ActiveMQ(四)——持久化
前言
什么是持久化消息?
保证消息只被传送一次和成功使用一次。在持久性消息传送至目标时,消息服务将其放入持久性数据存储。假如消息服务因为某种起因导致失败,它可以恢复此消息传送值相应的消费者。尽管这样添加了消息传送的开销,但却添加了可靠性。
代码同样会放到文章末尾供大家参考。
1.queue消息的持久和非持久
queue非持久,当生产者发送了消息,服务器宕机重启后,消费者时接收不到消息的。
queue持久化,当服务器宕机,消息仍然存在。queue消息默认时持久化的。
持久化消息,保证这些消息只被传送一次和成功使用一次。对于这些消息,可靠性是优先考虑的因素。
可靠性的另一个重要方面是确保持久性消息传送至目标后,消息服务在向消费者传送它们之前不会丢失这些消息。image.png
1)queue非持久化
代码和上篇文章的代码一样。image.png
非持久化只要要在创立生产者对象之后设置DerliveryMode属性为非持久化。
messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
2)queue持久化
queue默认就是持久化的,我们同样也可以在创立生产者对象之后手动设置持久化
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
大家可以分别启动一下持久化和非持久化的生产者,查看控制台的queue再重启ActiveMQ,看一下结果是如何的。
结果就是持久化的消息重启MQ后仍然存在,非持久化的则没有消息。
2.topic持久化
topic默认就是非持久化的,由于生产者生产消息时,消费者也要在线,这样消费者才能消费到消息。
topic消息持久化,只需消费者向MQ服务器注册过,所有生产者发布成功的消息,该消费者都能收到,不论是MQ服务器宕机还是消费者不在线。
topic持久化是依据上篇文章的topic进行改造的。
1)topic持久化之生产者
public class JmsProduce_persistence { public static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616"; public static final String TOPIC_NAME = "MQ-topic-persistence"; public static void main(String[] args) throws JMSException { //1.创立连接工厂,按照给定的url地址,采用默认的客户名和密码 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); //2.通过连接工厂,取得connection连接 Connection connection = activeMQConnectionFactory.createConnection(); //3.创立会话session //两个参数,第一个叫事务/第二个叫签收 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //4.创立目的地(具体是对立还是主题topic) Topic topic = session.createTopic(TOPIC_NAME); //5.创立消息的生产者 MessageProducer messageProducer = session.createProducer(topic); //设置持久化topic messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT); connection.start(); //通过使用messageProducer for(int i = 1; i<=4;i++){ //7.创立消息 TextMessage textMessage = session.createTextMessage("TOPIC_NAME---"+i); //8.通过messageProducer发送给mq messageProducer.send(textMessage); } //9.关闭资源 messageProducer.close(); session.close(); connection.close(); System.out.println("****TOPIC_NAME消息发送到MQ完成"); }
大家可以比照一下生产者topic持久化和非持久化的代码。
区别就是把start()方法放到了创立生产者对象之后,并且在start()之前手动设置了持久化。
2)topic持久化之消费者
public class JmsConsumer_persistence { public static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616"; public static final String TOPIC_NAME = "MQ-topic-persistence"; public static void main(String[] args) throws JMSException, IOException { System.out.println("***我是一号消费者"); //1.创立连接工厂,按照给定的url地址,采用默认的客户名和密码 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); //2.通过连接工厂,取得connection连接 Connection connection = activeMQConnectionFactory.createConnection(); //设置用户端ID,向MQ服务器注册自己的名称 connection.setClientID("merry"); //3.创立会话session //两个参数,第一个叫事务/第二个叫签收 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //4.创立目的地(具体是对立还是主题topic) Topic topic = session.createTopic(TOPIC_NAME); //创立一个topic订阅者对象。一参是topic,二参事订阅者名称 TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic,"remark..."); connection.start(); Message message = topicSubscriber.receive(); while (null != message){ TextMessage textMessage = (TextMessage) message; System.out.println("收到的持久化 topic:"+textMessage.getText()); message = topicSubscriber.receive(); } session.close(); connection.close(); }
topic消费者持久化需要设置一个注册名,要使用订阅对象TopicSubscriber。
3)测试topic持久化
①肯定要先运行一次消费者,等于向MQ注册,相似我订阅了这个topic。(最好是启动两个消费者,注册好后关闭一个,生产者发送消息后再启动,这样更直观的表现出持久化)
②而后再运行生产者发送消息
③之后无论消费者能否在线,都会收到消息。假如不在线的话,下次连接的时候,就会接受到消息。
image.png
3.demo 代码地址
github: Code-Gao/ActiveMQDemo
1. 本站所有资源来源于用户上传和网络,如有侵权请邮件联系站长!
2. 分享目的仅供大家学习和交流,您必须在下载后24小时内删除!
3. 不得使用于非法商业用途,不得违反国家法律。否则后果自负!
4. 本站提供的源码、模板、插件等等其他资源,都不包含技术服务请大家谅解!
5. 如有链接无法下载、失效或广告,请联系管理员处理!
6. 本站资源售价只是摆设,本站源码仅提供给会员学习使用!
7. 如遇到加密压缩包,请使用360解压,如遇到无法解压的请联系管理员
开心源码网 » ActiveMQ(四)——持久化