rabbitmq学习笔记

一、消息确认机制

成都创新互联公司成立于2013年,先为彰武等服务建站,彰武等地企业,进行企业商务咨询服务。为彰武企业网站制作PC+手机+微官网三网同步一站式服务解决您的所有建站问题。

rabbitmq在发送消息后立即从内存中删除消息,因此如果消费者处理消息耗时较长,在处理过程中消费者被kill,则处理中的消息、以及其他发往该消费者的消息都将丢失。

为了保证消息不丢失,rabbitmq支持消息确认机制,消费者可以发送ack告诉rabbitm指定消息已经收到并处理,因此rabbitmq可以删除该消息。

如果消费者死掉(channel关闭、connection关闭、或者TCP connection丢失),导致rabbitmq没有收到ack,rabbitmq将把消息重入队列。

不存在消息超时,这意味着处理一个消息非常长的时间也是ok的。

消息确认机制默认是开启的,通过在channel.basic_consume中设置no_ack=True关闭。

注意消费者在处理消息后,不要忘记调用channel.basic_ack进行消息确认,否则rabbitmq将不断消耗内存把消息重入队列。

二、队列/消息持久化

为了防止rabbitmq服务终止导致队列和消息丢失,需要将队列和消息标记为持久化的:

  1. 确保rabbitmq永远不丢失队列,需要将队列 声明为持久化的:

  2. channel.queue_declare(queue='task_queue', durable=True)
  3. 将消息声明为持久化的:

  4. channel.basic_publish(exchange='',
                          routing_key="task_queue",
                          body=message,
                          properties=pika.BasicProperties(
                             delivery_mode = 2, # make message persistent
                          ))

注意:尽管已经很健壮了,但是仍然无法完全保证消息不会丢失,例如rabbitmq接收消息但是还没有保存到硬盘的情况。

三、exchange

简单的说,exchange的一端接收消息,另一端把消息放进队列。

在rabbitmq中生产者不会将请求直接发送给消费者,生产值只会把消息发给exchange,exchange收到消息后需要知道怎么做:添加到特定队列、添加到多个队列、还是丢弃。

exchange的类型包括direct,topic,headers,fanout

四、绑定

exchange和queue之间的联系被称为绑定(binding),可以简单的看:队列对于特定exchange上的消息感兴趣

channel.queue_bind(exchange='logs',
                   queue=result.method.queue)

此时'logs' exchange将添加消息到指定queue

绑定可以使用一个额外的routing_key参数,例如:

channel.queue_bind(exchange=exchange_name,
                   queue=queue_name,
                   routing_key='black')

对于fanout类型的exchange来说,routing_key参数是被忽略的

五、topic exchange

发往topic exchange的消息不能携带任意的routing_key,必须是以点隔开的一串字符,最大255个字节

binding key也必须是相同的形式,注意binding key有两个重要的特殊情况:

* 可以替代一个单词

#可以替代零个或多个单词

例如,如果binding key是*.orange.*,则可以匹配所有.orange.的key,但是如果key不是*.*.*的形式,例如orange,或者quick.orange.male.rabbit,则消息将被丢弃。

如果binding key是lazy.#,则类似于带有lazy.orange.male.rabbit的key的消息可以匹配。

topic exchange非常强大,通过匹配routing_key可以表现的像存在多个exchange

六、RPC

为了接收响应,客户端需要在发送请求时附加发送回调队列地址:

result = channel.queue_declare(exclusive=True)
callback_queue = result.method.queue

channel.basic_publish(exchange='',
                      routing_key='rpc_queue',
                      properties=pika.BasicProperties(
                            reply_to = callback_queue,
                            ),
                      body=request)
                      
# ... and some code to read a response message from the callback_queue ...

本文名称:rabbitmq学习笔记
文章位置:http://pcwzsj.com/article/gdsddp.html