ActiveMQ(四)——持久化

作者 : 开心源码 本文共3272个字,预计阅读时间需要9分钟 发布时间: 2022-05-13 共168人阅读

前言

什么是持久化消息?
保证消息只被传送一次和成功使用一次。在持久性消息传送至目标时,消息服务将其放入持久性数据存储。假如消息服务因为某种起因导致失败,它可以恢复此消息传送值相应的消费者。尽管这样添加了消息传送的开销,但却添加了可靠性。
代码同样会放到文章末尾供大家参考。

1.queue消息的持久和非持久

queue非持久,当生产者发送了消息,服务器宕机重启后,消费者时接收不到消息的。
queue持久化,当服务器宕机,消息仍然存在。queue消息默认时持久化的。
持久化消息,保证这些消息只被传送一次和成功使用一次。对于这些消息,可靠性是优先考虑的因素。

可靠性的另一个重要方面是确保持久性消息传送至目标后,消息服务在向消费者传送它们之前不会丢失这些消息。image.png

1)queue非持久化

代码和上篇文章的代码一样。image.png

非持久化只要要在创立生产者对象之后设置DerliveryMode属性为非持久化。

messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

2)queue持久化

queue默认就是持久化的,我们同样也可以在创立生产者对象之后手动设置持久化

messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);

大家可以分别启动一下持久化和非持久化的生产者,查看控制台的queue再重启ActiveMQ,看一下结果是如何的。
结果就是持久化的消息重启MQ后仍然存在,非持久化的则没有消息。

2.topic持久化

topic默认就是非持久化的,由于生产者生产消息时,消费者也要在线,这样消费者才能消费到消息。
topic消息持久化,只需消费者向MQ服务器注册过,所有生产者发布成功的消息,该消费者都能收到,不论是MQ服务器宕机还是消费者不在线。
topic持久化是依据上篇文章的topic进行改造的。

1)topic持久化之生产者

public class JmsProduce_persistence {    public static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";    public static final String TOPIC_NAME = "MQ-topic-persistence";    public static void main(String[] args) throws JMSException {        //1.创立连接工厂,按照给定的url地址,采用默认的客户名和密码        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);        //2.通过连接工厂,取得connection连接        Connection connection = activeMQConnectionFactory.createConnection();        //3.创立会话session        //两个参数,第一个叫事务/第二个叫签收        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);        //4.创立目的地(具体是对立还是主题topic)        Topic topic = session.createTopic(TOPIC_NAME);        //5.创立消息的生产者        MessageProducer messageProducer = session.createProducer(topic);        //设置持久化topic        messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);        connection.start();        //通过使用messageProducer        for(int i = 1; i<=4;i++){            //7.创立消息            TextMessage textMessage = session.createTextMessage("TOPIC_NAME---"+i);            //8.通过messageProducer发送给mq            messageProducer.send(textMessage);        }        //9.关闭资源        messageProducer.close();        session.close();        connection.close();        System.out.println("****TOPIC_NAME消息发送到MQ完成");    }

大家可以比照一下生产者topic持久化和非持久化的代码。
区别就是把start()方法放到了创立生产者对象之后,并且在start()之前手动设置了持久化。

2)topic持久化之消费者

public class JmsConsumer_persistence {    public static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";    public static final String TOPIC_NAME = "MQ-topic-persistence";    public static void main(String[] args) throws JMSException, IOException {        System.out.println("***我是一号消费者");        //1.创立连接工厂,按照给定的url地址,采用默认的客户名和密码        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);        //2.通过连接工厂,取得connection连接        Connection connection = activeMQConnectionFactory.createConnection();        //设置用户端ID,向MQ服务器注册自己的名称        connection.setClientID("merry");        //3.创立会话session        //两个参数,第一个叫事务/第二个叫签收        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);        //4.创立目的地(具体是对立还是主题topic)        Topic topic = session.createTopic(TOPIC_NAME);        //创立一个topic订阅者对象。一参是topic,二参事订阅者名称        TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic,"remark...");        connection.start();        Message message = topicSubscriber.receive();        while (null != message){            TextMessage textMessage = (TextMessage) message;            System.out.println("收到的持久化 topic:"+textMessage.getText());            message = topicSubscriber.receive();        }        session.close();        connection.close();    }

topic消费者持久化需要设置一个注册名,要使用订阅对象TopicSubscriber。

3)测试topic持久化

①肯定要先运行一次消费者,等于向MQ注册,相似我订阅了这个topic。(最好是启动两个消费者,注册好后关闭一个,生产者发送消息后再启动,这样更直观的表现出持久化)
②而后再运行生产者发送消息
③之后无论消费者能否在线,都会收到消息。假如不在线的话,下次连接的时候,就会接受到消息。

image.png

3.demo 代码地址

github: Code-Gao/ActiveMQDemo

说明
1. 本站所有资源来源于用户上传和网络,如有侵权请邮件联系站长!
2. 分享目的仅供大家学习和交流,您必须在下载后24小时内删除!
3. 不得使用于非法商业用途,不得违反国家法律。否则后果自负!
4. 本站提供的源码、模板、插件等等其他资源,都不包含技术服务请大家谅解!
5. 如有链接无法下载、失效或广告,请联系管理员处理!
6. 本站资源售价只是摆设,本站源码仅提供给会员学习使用!
7. 如遇到加密压缩包,请使用360解压,如遇到无法解压的请联系管理员
开心源码网 » ActiveMQ(四)——持久化

发表回复