分布式架构学习:消息中间件RabbitMQ可靠性投递与生产实践
本章重点:
可靠性投递
1.确保消息发送到RabbitMQ服务器
2.确保消息被正确的路由
3.确保消息在队列正确地存储
4.确保消息从队列正确地投递到消费者
5.消费者回调
6.补偿机制
7.消息幂等性
8.消息的顺序性
可靠性投递
首先需要明确,效率和可靠性是无法兼得的,假如要保证每一个环节都成功,势必会对消息的收发效率造成影响,如过是少量业务实时性要求不是特别高的场合,可以牺牲可靠性来换取效率。

- ①代表消息从生产者发送到Exchange
- ②代表消息从Exchange路由到Queue
- ③ 代表消息在Queue中存储;
- ④ 代表消费者订阅Queue并消费消息。
1.确保消息发送到RabbitMQ服务器
可能由于网络或者者Broker的问题导致①失败,而生产者是无法得知消息能否正确发送到Broker的。
有两种处理方案:
第一种是Transaction事务模式
第二种是Confirm确认模式
1.在通过channel.txSelect方法开启事务之后,我们便可以发布消息给RabbitMQ了,假如事务提交成功,则消息肯定 到达了RabbitMQ中,假如在事务提交执行之前因为RabbitMQ异常崩溃或者者其余起因抛出异常,这个时候我们便可以将其捕获,进而通过执行channel.txRollback方法来实现事务回滚。使用事务机制的话会“吸干”RabbitMQ的性 能,一般不建议使用。
2.生产者通过调用channel.con?rmSelect方法(即Con?rm.Select命令)将信道设置为con?rm模式。一旦消息被投递到所有匹配的队列之后,RabbitMQ就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一ID),这就使得生产者知晓消息已经正确到达了目的地了。
2.确保消息被正确的路由
可能由于路由关键字错误,或者者队列不存在,或者者队列名称错误导致②失败。
- 使用mandatory参数和ReturnListener,可以实现消息无法路由的时候返回给生产者。
- 另一种方式就是使用备份交换机(alternate-exchange),无法路由的消息会发送到这个交换机上。
Map<String,Object> arguments = new HashMap<String,Object>();// 指定交换机的备份交换机arguments.put("alternate-exchange","ALTERNATE_EXCHANGE"); channel.exchangeDeclare("TEST_EXCHANGE","topic", false, false, false, arguments);3.确保消息在队列正确地存储
可能由于系统宕机、重启、关闭等等情况导致存储在队列的消息丢失,即③出现问题。
处理方案:
1.队列持久化
// String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments channel.queueDeclare(QUEUE_NAME, true, false, false, null);2.交换机持久化
// String exchange, boolean durable channel.exchangeDeclare("MY_EXCHANGE","true");3.消息持久化
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() // 2代表持久化,其余代表瞬态 .deliveryMode(2) .build(); channel.basicPublish("", QUEUE_NAME, properties, msg.getBytes());4.确保消息从队列正确地投递到消费者
假如消费者收到消息后未来得及解决即发生异常,或者者解决过程中发生异常,会导致④失败。
为了保证消息从队列可靠性到达消费者,RabbitMQ提供了消息确认机制(message acknowledgement),消费者在订阅队列时,可以指定autoAck参数,当autoAck等于false时,RabbitMQ会等待消费者显示地回复确认消息才从队列中删除该消息。
假如消息消费失败,也可以调用Basic.Reject或者者BasicNack来拒绝当前消息而不是确认,假如requere参数为true,可以把这条消息重新存入队列,以便发送给下一个消费者。
5.消费者回调
消费者解决消息之后,可以再发送一条消息给生产者,或者者调用生产者地API,告知消息解决完毕。
6.补偿机制
对于肯定时间没有响应地消息,可以设置一个定时重发地机制,但是要控制次数,比方最多重复三次,否则会造成消息堆积。
7.消息幂等性
服务端是没有这种控制的,只能在消费端控制。
如何避免消息的重复消费?
消息重复消费可能会有两个起因:
- 生产者的问题。环节①重复发送消息,比方在开启Confirm模式但未收到确认
- 环节④出了问题,因为消费者未发送ACK或者者其它起因,消息重复投递
对于重复发送的消息,可以对每一条消息生成一个唯一的业务id,通过日志或者者建表来做重复控制。
8.消息的顺序性
消息的顺序性是指消费者消费消息的顺序跟生产者投递消息的顺序是一致的。
在RabbitMQ中,一个队列有多个消费者时,因为不同的消费者消费消息的速度是不一样的,顺序无法保证
读者分享
觉得不错的朋友可以点点左下角的拇指小赞一下,同时在这给大家分享少量免费的架构资料(包括 视频,课件,面试专题,学习笔记等)关注官方微信公众号,那里每天都会有技术干货、技术动向、职业生涯、行业热点、职场趣事等一切有关于程序员的内容分享。更有海量Java架构、移动互联网架构相关源码视频,面试资料,电子书籍截止于4月28日免费发放。学习资源丰富实用,有需要的朋友可以来关注,扫描下方二维码关注wx公众号免费获取↓↓↓

资源大本营↓↓↓
Java架构资料
Java源码解析,到各种框架学习,再到项目实战,一应俱全,包括但不限于:Spring、Mybatis等源码、Java进阶、Java架构师、虚拟机、性能优化、并发编程、数据结构和算法。



移动互联网架构资料
Android进阶、Android架构、APP开发、NDK板块开发、小程序开发、微信开发。

1. 本站所有资源来源于用户上传和网络,如有侵权请邮件联系站长!
2. 分享目的仅供大家学习和交流,您必须在下载后24小时内删除!
3. 不得使用于非法商业用途,不得违反国家法律。否则后果自负!
4. 本站提供的源码、模板、插件等等其他资源,都不包含技术服务请大家谅解!
5. 如有链接无法下载、失效或广告,请联系管理员处理!
6. 本站资源售价只是摆设,本站源码仅提供给会员学习使用!
7. 如遇到加密压缩包,请使用360解压,如遇到无法解压的请联系管理员
开心源码网 » 分布式架构学习:消息中间件RabbitMQ可靠性投递与生产实践