AMQP消息路由必须有三部分:交换器、队列和绑定。生产者把消息发布到交换器上;消息最终到达队列,并被消费者接收;绑定决定了消息如何从路由器路由到特定的队列。
从底部开始构造:队列
消费者通过以下两种方式从特定的队列中接受消息:
(1)通过AMQP的basic.consume命令订阅。这样做会将信道置为接收模式,直到取消对队列的订阅为止。订阅了消息后,消费者在消费(或者拒绝)最近接收的那条消息后,就能从队列中(可用的)自动接收下一条消息。如果消费者处理队列消息,并且/或者需要在消息一到达队列时就自动接收的话,应该使用basic.consume。
(2)某些时候,只想从队列获得单条消息而不是持续订阅。向队列请求单条消息是通过AMQP的basic.get命令实现的。这样做可以让消费者接收队列中的下一条消息。如果要获得更多消息的话,需要再次发送basic.get命令。不应该将basic.get放在一个循环里来代替basic.consume。因为这样做会影响Rabbit的性能。大致上讲,basic.get命令会订阅消息,获得单条消息,然后取消订阅。消费者理应始终使用basic.consume来实现高吞吐量。
如果至少有一个消费者订阅了队列的话,消息会立即发送给这些订阅的消费者。但是如果消息到达了无人订阅的队列呢?在这种情况下,消息会在队列中等待。一旦有消费者订阅到该队列,那么队列上的消息就会发送给消费者。
当Rabbit队列拥有多个消费者时,队列收到的消息将以循环(round-robin)的方式发送给消费者。每条消息只会发送给一个订阅的消费者。
消费者接收到的每一条消息都必须进行确认。消费者必须通过AMQP的basic.ack命令显示地向RabbitMQ发送一个确认,或者在订阅到队列的时候就将auto_ack参数设置为true。当设置了auto_ack时,一旦消费者接收消息,RabbitMQ会自动视其确认了消息。需要记住的是,消费者对消息的确认和告诉生产者消息已经被接收了这两件事毫不相关。因此,消费者通过确认命令告诉RabbitMQ它已经正确地接收了消息,同时RabbitMQ才能安全地把消息从队列中删除。
如果消费者收到一条消息,然后确认之前从Rabbit断开连接(或者从队列上取消订阅),RabbitMQ会认为这条消息没有分发,然后重新分发给下一个订阅的消费者。如果应用程序崩溃了,这样做可以确保消息会被发送给另一个消费者进行处理。
另一方面,如果应用程序有bug而忘记确认消息的话,Rabbit将不会给该消费者发送更多消息了。这是因为在上一条消息被确认之前,Rabbit会认为这个消费者并没有准备好接收下一条消息。如果处理消息内容非常耗时,则应用程序可以延迟确认该消息,直到消息处理完成。这样可以防止Rabbit持续不断的消息涌向应用程序而导致过载。
在收到消息后,如果想要明确拒绝而不是确认收到该消息的话,该如何呢?只要消息尚未确认,则有以下两个选择:
(1) 把消费者从RabbitMQ服务器断开连接。这会导致RabbitMQ自动重新把消息入队并发送给另一个消费者。这样做的好处是所有的RabbitMQ版本都支持。缺点是,这样连接/断开连接的方式会额外增加RabbitMQ的负担(如果消费者在处理每条消息时都遇到错误的话,会导致潜在的重大负荷)。
(2)如果正使用RabbitMQ 2.0.0或者更新的版本,那就使用AMQP的basic.reject命令。顾名思义:basic.reject允许消费者拒绝RabbitMQ发送的消息。如果把reject命令的requeue参数设置成true的话,RabbitMQ会将消息重新发送给下一个订阅的消费者。如果设置成false的话,RabbitMQ立即会把消息从队列中移除,而不会把它发送给新的消费者。也可以通过对消息确认的方式来简单地忽略该消息(这种忽略消息的方式的优势在于所有版本的RabbitMQ都支持)。如果检测到一条格式错误的消息而任何一个消费者都无法处理的时候,这样做就十分有用。
在将来的RabbitMQ版本中会支持一个特殊的“死信”(dead letter)队列,用来存放那些被拒绝而不重入队列的消息。死信队列让通过检测拒绝/未送达的消息来发现问题。如果应用程序想自动从死信队列功能中获益的话,需要使用reject命令并将requeue参数设置成false。
消费者和生产者都能使用AMQP的queue.declare命令来创建队列。但是如果消费者在同一条信道上订阅了另一个队列的话,就无法再声明队列了。必须首先取消订阅,将信道置为“传输”模式。当创建队列时,常常想要指定队列名称。消费者订阅队列时需要队列名称,并在创建绑定时也需要指定队列名称。如果不指定队列名称的话,Rabbit会分配一个随机名称并在queue.declare命令的响应中返回。以下是队列设置中另一些有用的参数:
- exclusive——如果设置为true的话,队列将变成私有的,此时只有你的应用程序才能够消费队列消息。当想要限制一个队列只有一个消费者的时候很有帮助。
- auto-delete——当最后一个消费者取消订阅的时候,队列就会自动移除。如果需要临时队列只为一个消费者服务的话,请结合使用auto-delete和exclusive。当消费者断开连接时,队列就被移除了。
如果尝试声明一个已经存在的队列会发生什么呢?只要声明参数完全匹配现存的队列的话,Rabbit就什么都不做,并成功返回,就好像这个队列已经创建成功一样(如果参数不匹配的话,队列声明尝试会失败)。如果只是想检测队列是否存在,则可以设置queue.declare的passive选项为true。在该设置下,如果队列存在,那么queue.declare命令会成功返回;如果队列不存在的话,queue.declare命令不会创建队列而会返回一个错误。
联合起来:交换器和绑定
服务器会根据路由键将消息从交换器路由到队列,但它是如何处理投递到多个队列的情况的呢?协议中定义的不同类型交换器发挥了作用。一共有四种类型:direct、fanout、topic、和headers。每一种类型实现了不同的路由算法。
headers交换器允许匹配AMQP消息的header而非路由键。除此之外,headers交换器和direct交换器完全一致,但性能会差很多。因此它并不太实用,而且几乎再也用不到了。
direct交换器非常简单:如果路由键匹配的话,消息就被投递到对应的队列。
服务器必须实现direct类型交换器,包含一个空白字符串名称的默认交换器。当声明一个队列时,它会自动绑定到默认交换器,并以队列名称作为路由键。这意味着可以使用如下代码发送消息到之前声明的队列去。前提是已经获得了信道实例:
1 | $channel->basic_publish($msg, '', 'queue_name'); |
第一个参数是想要发送的消息内容;第二个参数是一个空的字符串,指定了默认交换器;而第三个参数就是路由键了。当默认的direct交换器无法满足应用程序的需求时,可以声明自己的交换器。只需发送exchange.declare命令并设置合适的参数就行了。
fanout交换器会将收到的消息广播到绑定的队列上。消息通信模式很简单:当发送一条消息到fanout交换器时,它会把消息投递给所有附加在此交换器上的队列。这允许对单条消息做不同方式的反应。
topic交换器允许实现有趣的消息通信场景,它使得来自不同源头的消息能够到达同一个队列。
1 | $channel->basic_publish($msg, 'logs-exchange', 'error.msg-inbox'); |
单个”.”把路由键分为了几部分,”*”匹配特定位置的任意文本,”#”匹配所有规则。