RabbitMQ基础
1.传递模式
点对点模式和发布/订阅模式
点对点模式基于队列
,消息生产者发送消息到队列,消息消费者从队列中接收消息,队列的存在使得消息的异步传输成为可能;发布/订阅模式定义了如何向一个内容节点发布和订阅消息,这个内容节点称为主题(topic)
,主题可以认为是消息传递的中介,消息发布者将消息发布到某个主题,而消息订阅者则从主题中订阅消息。主题使得消息的订阅者和消息的发布者相互保持独立,不需要进行接触即可保证消息传递。
2.作用
异步
,解耦
,削峰
3.相关概念
Broker:消息中间件的服务节点。一个RabbitMQ Broker可以简单的看作一个RabbitMQ服务节点,或者RabbitMQ实例。
Exchange:交换器。生产者将消息发送到Exchange,由交换器将消息路由到一个或多个队列中。如果路由不到,或许会返回给生产者,或许直接丢弃。交换器有四种类型。不同类型有着不同的路由策略。
RoutingKey:路由健。生产者将消息发给交换器的时候,一般会指定一个RoutingKey,用来指定这个消息的路由规则,而这个RoutingKey需要与交换器类型和绑定键(BindingKey)联合使用才会生效。
Binding:绑定。通过绑定将交换器与队列关联起来,在绑定的时候一般会指定一个绑定键(BindingKey),这样就知道如何正确的将消息路由到队列了。
生产者将消息发送给交换器时,需要一个RoutingKey,当BindingKey和RoutingKey相匹配时,消息会被路由到对应的队列中。在绑定多个队列到同一个交换器时,这些绑定允许使用相同的BindingKey。 BindingKey并不是在所有情况下都生效,它依赖于交换器类型,比如fanout类型的交换器就会无视BindingKey,而是将消息路由到所有绑定到该交换器的队列中。 在direct交换器类型下,RoutingKey和BindingKey需要完全匹配才能使用,但是在topic交换器类型下,RoutingKey和BindingKey之间需要模糊匹配,二者并不是相同的。 BindingKey其实也属于路由健的一种,BindingKey是在绑定时候使用的路由健。
交换器类型
fanout,把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中。
direct,把消息路由到BindingKey和RoutingKey完全匹配的队列中。
topic,将消息路由到BindingKey和RoutingKey相匹配的队列中,匹配规则如下:
RoutingKey为一个点号“.”分割的字符串,如“com.rabbitmq.client”,被点号分割开的每一段独立的字符串称为一个单词 BindingKey和RoutingKey也是点号“.”分割的字符串 BindingKey中可以存在两种特殊字符床“”和“#”,用于做模糊匹配,其中“#”用于匹配一个单词,“”用于匹配多个单词。
headers,不依赖于路由健的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。 在绑定队列和交换器时制定一组键值对,当发送消息到交换器时,RabbitMQ会获取到该消息的headers,对比其中的键值对是否完全匹配队列和交换器绑定时指定的键值对,如果完全匹配则消息会路由到该队列。该类型交换器性能差,一般不用。
Connection,Channel
生产者和消费者都要和RabbitMQ Broker建立连接,这个连接是一条TCP连接,也就是Connection。一旦TCP连接建立起来,客户端紧接着创建一个AMQP信道,也就是Channel,每个信道都会被指派唯一一个I。信道是建立在Connection之上的虚拟连接。
RabbitMQ使用多路复用,一个连接下,每个线程各自一个Channel实例,多线程间共享Channel实例是非线程安全的。 一般情况下,一个Channel对应一个消费者,这样每个消费者之间没有任何关联,一个消费者运行,不会阻塞其他消费者的调用。
4.消费消息模式
推(Push)模式,Basic.Consume(),通过持续订阅的方式来消费消息,直到取消队列的订阅为止。在接收期间,RabbitMQ会不断的推送消息给消费者。
拉(Pull)模式,Basic.Get(),可以单条的获取消息。 要实现高吞吐量,消费者应该使用推模式,如果只想从队列中获得单条消息而不是持续订阅,可以使用拉模式。但是不能将Basic.Get放在一个循环里代替Basic.Consume,这样会严重影响RabbitMQ的性能。
5.消费端的确认与拒绝
RabbitMQ的消息确认机制可以保证消息从队列可靠的到达消费者,指定autoAck参数,当为false时,RabbitMQ会等待消费者显式的回复确认信号后才从内存或磁盘中删除消息;当为true时,RabbitMQ会自动把发送出去的消息置为确认,然后从内存或磁盘中删除,而不管消费者是否真正的消费到了这些消息。
当autoAck为false时,对于RabbitMQ而言,队列中的消息分为两部分:等待投递给消费者的消息
和已经投递给消费者,但是还没有收到消费者确认信号的消息
。如果RabbitMQ一直没有收到消费者的确认信号,并且消费此消息的消费者已经断开连接,则RabbitMQ会安排该消息重新进入队列,等待下一次投递给消费者。
RabbitMQ不会为未确认的消息设置过期时间,它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息的消费者连接是否已经断开
。
RabbitMQ的web管理平台,当前队列中的Ready状态和Unacknowledged状态的消息数,分别对应等待投递给消费者的消息数和已经投递给消费者但是未收到确认信号的消息数。
6.消息传输
mandatory(强制性)和immediate(即时)是channel.basicPubish方法中的两个参数,它们都有当消息传递过程中不可达目的地时将消息返回给生产者的功能。备份交换机(Alternate Exchange)可以将未能被交换机路由的消息(没有绑定队列或者没有匹配的绑定)存储起来,而不用返回给客户端。
mandatory参数:当为true时,交换器无法根据自身的类型和路由健找到一个符合条件的队列,那么RabbitMQ会调用Basic.Return命令将消息返回给生产者;当为false时,则直接将消息丢弃。
immediate参数:当为true时,如果交换器在将消息路由到队列时发现队列上并不存在任何消费者,那么这条消息不会存入队列中,当与路由健匹配的所有队列都没有消费者时,消息会通过Basic.Return返回至生产者。
mandatory参数告诉服务器至少将该消息路由到一个队列中,否则将消息返回给生产者。immediate参数告诉服务器,如果该消息关联的队列上有消费者,则立刻投递;如果所有匹配的队列上都没有消费者,则直接将消息返还给生产者,不用将消息存入队列而等待消费者。
备份交换机(Alternate) 如果不想消息丢失,可以使用备份交换器,这样可以将未被路由的消息存储在RabbitMQ中,再在需要的时候去处理这些消息。可以通过在声明交换器的时候添加alternate-wxchange参数来实现。
如果设置的备份交换器不存在,客户端和RabbitMQ服务器都不会有异常出现,此时消息会丢失; 如果备份交换器没有绑定任何队列,客户端和RabbitMQ服务器都不会有异常出现,此时消息会丢失; 如果备份交换器没有任何匹配的队列,客户端和RabbitMQ服务器都不会有异常出现,此时消息会丢失; 如果备份交换器和mandatory参数一起使用,那么mandatory参数无效。
过期时间(TTL,Time to Live)
目前有两种方法可以设置消息的TTL,第一种方法是通过队列属性设置
,第二种是对消息本身进行单独设置
。消息在队列中的生产时间一旦超过设置的TTL值时,就会变成“死信”(Dead Message),消费者将无法再收到该消息(不是绝对的)。
如果不设置TTL,则表示此消息不会过期;如果将TTL设置为0,则表示除非此时可以直接将消息投递到消费者,否则该消息会被立即丢弃。
死信队列
DLX(Dead Letter Exchange),死信交换器。当消息在一个队列中变成死信之后,它能被重新发送到另一个交换器中,这个交换器就是DLX,绑定DLX的队列就是死信队列。
消息变成死信由于几种情况:
消息被拒绝(Basic.Reject/Basic.Nack)
,并且设置requeue参数为false(未被确认的消息会被重新加入到队列中,为false时,同一条消息会被分配给与之前相同的消费者,为true时,可能会被分配给与之前不同的消费者);
消息过期
;
队列达到最大长度
。
DLX也是一个正常的交换器,通过设置x-dead-letter-exchange参数来为这个队列添加DLX。
当队列中存在死信时,RabbtiMQ会自动将这个消息重新发布到设置DLX上去,进而进入死信队列。可以监听这个队列中的消息进行相应的处理,这个特性与将消息的TTL设置为0配合使用弥补immediate参数的功能
。
web管理页面中,队列被标记了“D”,是durable的缩写,既设置了队列持久化,DLX指的就是x-dead-letter-exchange参数。
延迟队列 延迟队列存储的对象是对应的延迟消息,所谓“延迟消息”是指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。 可以通过DLX和TTL模拟出延迟队列的功能。
优先级队列 具有高优先级的队列具有高的优先权,优先级高的消息具备优先被消费的特权。 设置队列的x-max-priority参数来实现。 web管理页面中,pri标识。
持久化
RabbitMQ的持久化分为三个部分:交换器的持久化
,队列的持久化
,消息的持久化
。
交换器的持久化是通过在声明队列时将durable参数设置为true实现的,如果交换器不设置持久化,在RabbitMQ重启后,相关的交换器的元数据会丢失,不过消息不会丢失,只是不能将消息发送到这个交换器中了,一个长期使用的交换器,应该置为持久化。
队列的持久化通过在声明队列时将durable参数设置为true实现,如果队列不持久化,那么RabbitMQ重启后,相关队列的元数据会丢失,此时数据也会丢失。
队列的持久化能保证其本身的元数据不会因异常情况而丢失,但是并不能保证内部所存储的消息不会丢失。要保证消息不会丢失,需要将其设置为持久化,将消息的投递模式(BasicProperties中的deliveryMode属性)设置为2即可实现消息的持久化。
设置了队列和消息的持久化,当RabbitMQ服务重启后,消息依旧存在。
对于可靠性不是那么高的消息可以不采用持久化处理以提高整体的吞吐量。
将交换器,队列,消息都设置了持久化之后,就能保证数据不丢失了吗?不能! 消费者端,如果订阅消费队列时将autoAck参数设置为true,当消费者接收到消息后,还没来得及处理就宕机了,这样也算数据丢失。 在持久化的消息正确存入RabbitMQ之后,还需要有一段时间(很短)才能存入磁盘,RabbitMQ不会为每条数据都进行同步存盘的处理,可能仅仅保存到操作系统缓存中而不是物理磁盘中。如果这段时间内服务发生异常情况,消息保存还没来得及落盘,那么这些消息将丢失。 可以引入RabbitMQ的镜像队列机制来解决。相当于设置了副本,如果主节点在此特殊时间内挂掉,可以自动切换到从节点,这样有效的保证了高可用性,除非整个集群都挂掉。生产环境中的关键业务队列一般都会设置镜像队列。 还可以在发送端引入事务机制或者发送方确认机制来保证消息已经正确的发送并存储至RabbitMQ中,前提还要保证在调用channel.basicPublish方法的时候交换器能够将消息正确路由到相应的队列中。
7.生产者确认
生产者将消息发送之后,默认情况下生产者不知道消息有没有正确的到达服务器,如果在消息到达服务器之前消息已经丢失,持久化操作也解决不了消息丢失的问题。
RabbitMQ提供两种解决方式:事务机制
,发送方确认机制(publisher confirm)
事务机制 三个相关的方法:channel.txSelect,channel.txCommit,channel.txRollback channel.txSelect用于将当前的信道设置成事务模式,channel.txCommit用于提交事务,channel.txRollback用于事务回滚。 在通过channel.txSelect方法开启事务之后,我们便可以发布消息给RabbitMQ了,如果事务提交成功,则消息一定到达了RabbitMQ,如果在事务提交执行之前由于RabbitMQ异常奔溃或者其他原因抛出异常,这个时候我们便可以将其捕获,进而通过执行channel.txRollback方法实现事务回滚。
使用RabbitMQ的事务模式在正常流程上多4个步骤: 客户端发送Tx.Select,将信道设置为事务模式; Broker回复Tx.Select-Ok,确认已将信道设置为事务模式; 在发送玩消息之后,客户端发送Tx.Commit提交事务; Broker回复Tx.Commit-Ok,确认事务提交。 事务机制能解决发送方和RabbitMQ之间消息确认的问题,只有消息成功被RabbitMQ接收,事务才能提交成功,否则便可在捕获异常之后进行事务回滚,与此同时可以进行消息重发。但是事务机制性能太差。
发送方确认机制 生产者将信道设置为confirm(确认)模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID,一旦消息被投递到所有匹配的队列之后,RabbitMQ就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一ID),这就使得生产者知晓消息已经正确到达了目的地了。 如果消息和队列是可持久化的,那么确认消息会在消息写入磁盘之后发出。RabbitMQ回传给生产者的确认消息中的deliveryTag包含了确认消息的序号,此外RabbitMQ也可以设置channel.basicAck方法中的mutiple参数,表示到这个序号之前的所有消息都已经得到了处理。
事务机制在一条消息发送之后会使发送端阻塞,以等待RabbitMQ的回应,之后才能继续发送下一条消息。 发送方确认机制最大的好处是它是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用程序便可以通过回调方法来处理该确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack(Basic.Nack)命令,生产者应用程序同样可以在回调方法中处理该Nack命令。 事务机制和发送方确认机制是互斥的,不能共存。 事务机制和发送方确认机制确保的是消息能够正确的发送至RabbitMQ的交换器,如果此交换器没有匹配的队列,那么消息也会丢失。所以在使用这两种机制的时候要确保所涉及的交换器能够有匹配的队列,也就是说,发送方要配合mandatory参数或者备份交换器一起使用来提高消息传输的可靠性。
8.消费端的问题
消息分发,消息顺序性,弃用QueueingConsumer
消息分发:当队列拥有多个消费者时,队列收到的消息以轮询(round-robin)的分发方式发送给消费者。每条消息只会发送给订阅列表里的一个消费者。但是很多时候轮询的分发机制有问题,如果某些消费者任务繁重,来不及消费那么多的消息,而某些其他消费者很快就处理完了所分配的消息,进而进程空闲,这样会造成整体应用吞吐量下降。
使用channel.basicQos
方法,允许限制信道上的消费者所能保持的最大未确认消息的数量。举例说明,在订阅消费队列前,消费端程序调用channel.basicQos(5),之后订阅了某个队列进行消费。RabbitMQ会保存一个消费者的列表,每发送一条消息都会为对应的消费者计数,如果达到了所设定的上限,那么RabbitMQ就不会向这个消费者再发送任何消息,直到消费者确认了某条消息之后,RabbitMQ将相应的计数减1,之后消费者可以继续接收消息,直到再次到达计数上限。
Basic.Qos的使用对于
拉模式
的消费方式无效。
channel.basicQos有三种类型的重载方法:
void basicQos(int prefetchCount);
void basicQos(int prefetchCount, boolean global);
void basicQos(int prefetchSize, int prefetchCount, boolean glbal);
前面介绍的都是prefetchCount参数,当prefetchCount设置为0则表示没有上限。prefetchSize参数表示消费者所能接收未确认消息的总体大小的上限,单位是B,设置为0则表示没有上限。
一个信道上可以同时消费多个队列当设置prefetchCount大于0时,这个信道需要和各个队列协调以确保发送的消息都没有超过所限定的prefetchCount的值,这样会使RabbitMQ性能降低,当global参数为true时,信道上所有的消费者都需要遵从prefetchCount的限定值。
消息顺序性:消费者消费到的消息和发送者发布的消息的顺序是一致的。如果有多个生产者同时发送消息,无法确定消息到达Broker的前后顺序,也就无法验证消息的顺序性。下面的情形会导致RabbitMQ消息的顺序性被打破:
如果生产者使用了事务机制,在发送消息之后遇到异常进行事务回滚,那么需要重新补偿发送这条消息,如果补偿发送是在另一个线程实现的,那么消息在生产者这个源头就出现了错序。如果启用发送者确认机制,在发送超时,中断,或者收到RabbitMQ的Basic.Nack命令时,同样需要补偿发送,结果与事务机制一样会错序。
如果生产者发送的消息设置了不同的超时时间,并且也设置了死信队列,整体上相当于一个延时队列,那么消费者在消费这个队列的时候,消息的顺序必然不会和生产者发送消息的顺序一致。
如果消息设置了优先级,那么消费者消费到的消息也必然不是顺序性的。
如果一个队列按照先后顺序有msg1,msg2,msg3,msg4这4个消息,同时有ConsumerA和ConsumerB两个消费者同时订阅了这个队列。队列中的消息轮询分发到各个消费者之中,ConsumerA中的消息为msg1和msg3,ConsumerB中的消息为msg2和msg4。ConsumerA收到消息msg1之后并不小处理而调用了Basic.Nack/.Reject将消息拒绝,与此同时将requeue设置为true,这样这条消息就可以重新存入队列中。消息msg1之后被发送到了ConsumerB中,此时ConsumerB已经消费了msg2,msg4,之后再消费msg1,这样消息顺序性也就错乱了。或者消息msg1又重新发送到ConsumerA中,此时ConsumerA已经消费了msg3,再消费msg1的话消息顺序性也无法得到保障。同时可以用在Basic.Recover这个AMQP命令中。
如果要保证消息的顺序性,需要业务方法使用RabbitMQ之后做进一步处理,比如在消息体内添加全局有序标识来实现。
弃用QueueingConsumer:订阅消费的方式通过继承DefaulatConsumer类来实现。建议不要使用QueueingConsumer类来实现订阅消费。QueueingConsumer类在3.x中广泛使用,4.x中被废弃。
本身有几大缺陷,最主要的是内存溢出
问题,如果由于某些原因,队列中堆积了比较多的消息,就可能导致消费者客户端内存溢出假死,于是发生恶性循环,队列消息不断堆积而得不到消化。
可以使用Basic.Qos解决,限制某个消费者所保持未确认消息的数量,也就间接的限制了QueueingConsumer中的LinkedBlockingQuere的大小。一定要在调用Basic.Consumer之前调用Basic.Qos才能生效。
还包含以下一些缺陷:
QueueingConsumer会拖累同一个Connection下的所有信道,使其性能下降;
同步递归调用QueueingConsumer会产生死锁;
RabbitMQ的自动连接恢复机制(automatic connection recovery)不支持QueueingConsumer的形式;
QueueingConsumer不是事件驱动的。
9.消息传输保障
一般消息中间件的消息传输保障分为三个层级:
最多一次,消息可能会丢失,但绝不会重复传输;
最少一次,消息绝不会丢失,但可能会重复传输;
恰好一次,每条消息肯定会被传输一次且仅传输一次。
RabbitMQ支持“最多一次
”和“最少一次
”,其中“最少一次”投递实现需要考虑以下内容:
消息生产者需要开启事务机制或生产者确认机制,以确保消息可以可靠的传输到RabbitMQ中;
消息生产者需要配合使用mandatory参数或者备份交换器来确保消息能从交换器路由到队列中,进而能够保存下来而不会被丢弃;
消息和队列都需要进行持久化处理,以确保RabbitMQ服务器在遇到异常情况时不会造成消息丢失;
消费者在消费消息的同时需要将autoAck蛇者未false,然后通过手动确认的方式去确认已经正确消费的消息,以避免在消费端引起不必要的消息丢失。
“最多一次”无须考虑以上内容,生产者随意发送,消费者随意消费,不过很难确保消息不会丢失。
“恰好一次”是RabbitMQ无法保障的。消费者在消费完一条消息之后向RabbitMQ发送Basic.Ack命令,此时由于网络断开或其他原因造成RabbitMQ并没有收到这个确认命令,那么RabbitMQ不会将此消息标记删除。在重新建立连接之后,消费者还是会消费到这一条消息,这就造成了重复消息。另一种情况,生产者在使用发送者确认机制的时候,发送完一条消息等待RabbitMQ返回确认通知,此时网络断开,生产者捕获到异常情况,为了确保消息可靠性选择重新发送,这样RabbitMQ中就有两条同样的消息,在消费的时候,消费者就会重复消费。
RabbitMQ没有去重机制来保证“恰好一次”,去重处理一般在业务客户端中实现,比如业务消息具备幂等性,或者使用Redis进行去重等等。
10.分布式部署
RabbitMQ可以通过3中方式实现分布式部署:集群
,Federation
,Shovel
。
集群将多个Broker节点连接起来组成逻辑上独立的单个Broker。集群内部借助Erlang进行消息传输,所以集群中的每个节点的Erlang cookie必须保持一致。同时,集群内部的网络必须是可靠的,RabbitMQ和Erlang的版本也必须一致。虚拟主机,交换器,用户,权限等都会自动备份到集群中的各个节点。队列可能部署单个节点或被镜像到多个节点中。通常使用集群的部署方式来提高可靠性和吞吐量,不过集群只能部署在局域网内。
Federation,翻译未“联邦”,Federation可以通过AMQP协议让原本发送到某个Broker(或集群)中的交换器(或队列)上的消息能够转发到另一个Broker(或集群)中的交换器(或队列)上,两方的交换器(或队列)看起来是以一种“联邦”的形式在运作。
联邦交换器(federated exchange)通过单向点对点的连接形式通信。默认情况下,消息只会由Federation连接转发一次,可以允许有复杂的路由拓扑来提高转发次数。在Federation连接上,消息可能不会被转发,如果消息到达了联邦交换器之后路由不到合适的队列,那么它也不会被再次转发到原来的地方。可以通过Federation连接广域网中的各个RabbitMQ服务器来生产和消费消息。联邦队列(federation queue)也是通过单向点对点连接进行通信的,消息可以根据具体的配置消费者的状态在联邦队列中游离任意次数。
Shovel,工作在比Federation更低一层上。Federation从一个交换器中转发消息到另一个交换器,Shovel只是简单的从某个Broker上的队列中消费消息,然后转发消息到另一个Broker上的交换器。Shovel也可以在单独的一台服务器上去转发消息,比如将一个队列中的数据移动到另一个队列中。如果想获得比Federation更多的控制,可以在广域网中使用Shovel连接各个RabbitMQ Broker来生产或消费消息。
3中方式比较:
Federation/Shovel | 集群 |
---|---|
各个Borker节点之间逻辑分离 | 逻辑上是一个Broker节点 |
各个Broker节点之间可以运行不同版本的Erlang和RabbitMQ | 各个Broker节点之间必须运行相同版本的Erlang和RabbitMQ |
各个Broker节点之间可以在广域网中相连,当然必须要授予适当的用户和权限 | 各个Broker节点之间必须在可信赖的局域网中相连,通过Erlang内部节点传递消息,但节点间需要有相同的Erlang cookie |
各个Broker节点之间能以任何拓扑逻辑部署,连接可以是单向的或者是双向的 | 所有Broker节点都双向连接所有其他节点 |
从CAP理论中选择可用性和分区容错性,即AP | 从CAP理论中选择一致性和可用性,即CA |
一个Broker中的交换器可以是Federation生成的或者是本地的 | 集群中所有Broker节点中的交换器都是一样的,要么全有要么全无 |
客户端只能看到它所连接的Broker节点上的队列 | 客户端连接到集群中的任何Broker节点都可以看到所有的队列 |