消息中间件——RabbitMQ整合Spring AMQP实战!

作者 : 开心源码 本文共3897个字,预计阅读时间需要10分钟 发布时间: 2022-05-13 共160人阅读

推荐阅读:Java 面试如何坐等 offer?

1. AMQP 核心组件

① RabbitAdmin

② SpringAMQP公告

③ RabbitTemplate

④ SimpleMessageListenerContainer

⑤ MessageListenerAdapter

⑥ MessageConverter

2. RabbitAdmin

RabbitAdmin类可以很好的才注意RabbitMQ,在Spring中直接进行诸如就可。

注意:

① autoStartUp必需要设置为true,否则Spring容器不会加载RabbitAdmin类

② RabbitAdmin底层实现就是从Spring容器中获取Exchange、Bingding、RoutingKey以及Queue的@Bean公告

③ 使用RabbitTemplate的execute方法执行对应的什么、修改、删除等一系列RabbitMQ基础功能操作

④ 例如:增加一个交换机、删除一个绑定、清空一个队列里的消息等等

2.1 代码演示

2.1.1 引入Pom文件

2.1.2 配置Bean

2.1.3 测试类

通过以上代码,可以自行测试一下结果。

2.2?RabbitAdmin源码

实现了InitializingBean接口,表明在Bean配置加载完后再加载RabbitAdmin配置。找到afterPropertiesSet()方法中最要的initialize()初始化方法。

可以看到Exchange、Queue、Binding都是从Spring容器中获取三种类型,加载到上方定义的contextExchanges、contextQueues、contextBindings三种容器中。

后续的源码中,也可以看出通过挑选Spring容器中RabbitMQ的信息之后,再去建立RabbitMQ服务器的连接。主要通过Spring以@Bean的方式,将配置加载到Spring容器之后,再从容器中获取相关信息,再去建立连接。

3. SpringAMQP公告

在Rabbit基础API里面公告一个Exchange、公告一个绑定、一个队列

-使用SpringAMQP去公告,就需要使用SpringAMQP的如下模式,即公告@Bean方式

3.1 代码演示

再次运行ApplicationTests类中testAdmin()方法,可以在控制台中,查看到一个Exchange绑定两个Queue。

4. RabbitTemplate

RabbitTemplate,即消息模板

① 我们在与SpringAMQP整合的时候进行发送消息的关键词

② 该类提供了丰富的发送消息方法,包括可靠性投递消息方法、回调监听消息接口ConfirmCallback、返回值确认接口ReturnCallback等等。同样我们需要进行注入到Spring容器中,而后直接使用

③ 在与SPring整合时需要实例化,但是在与SpringBoot整合时,在配置文件里增加配置就可

4.1 代码演示

4.1.1 RabbitMQConfig类

在RabbitMQConfig类中写RabbitTemplate配置

4.1.2 ApplicationTests类

在ApplicationTests测试类中增加测试方法,进行测试。

4.1.3 查看管控台

运行前,可以看到queue001中是没有消息的。

运行testSendMessage()方法。并获取消息。

4.1.4 简单写法

我们往topic001中发送了两条消息,topic002中发送了一条消息。运行testSendMessage2() 接下来再查看下管控台。

可以看到topic001中已经有了三条消息,刚才发送的消息也还在。GetMessage并不是消费消息,而只是获取消息。

5. SimpleMessageListenerContainer

简单消息监听容器

① 这个类非常的强大,我们可以对它进行很多设置,对于消费者的配置项,这个类都可以满足

② 监听队列(多个队列)、自动启动、自动公告功能

③ 设置事务特性、事务管理器、事务属性、事务容器(并发)、能否开启事务、回滚消息等

④ 设置消费者数量、最小最大数量、批量消费

⑤ 设置消息确认和自动确认模式、能否重回队列、异常捕捉handler函数

⑥ 设置消费者标签生成策略、能否独占模式、消费者属性等

⑦ 设置具体的监听器、消息转换器等等。

注意:

SimpleMessageListenerContainer可以进行动态设置,比方在运行中的应用可以动态的修改其消费者数量的大小、接收消息的模式等

很多机遇RabbitMQ的自制定话后台管控台在进行动态设置的时候,也是根据这一特性去实现的。所以可以看出SpringAMQP非常的强大

思考

SimpleMessageListenerContainer为什么可以动态感知配置变更?

5.1 代码演示

5.1.1 RabbitMQConfig类

配置中增加如下代码:

运行之前写的testSendMessage2()方法,查看管控台中的相关信息以及控制台打印信息

6. MessageListenerAdapter

MessageListenerAdapter 即消息监听适配器

6.1 代码演示

6.1.1 适配器使用方式1

我们把之前的消息监听代码注释,可以不用直接加消息监听,而是采用MessageListenerAdapter的方式,通过适配器方式1,我们来学习下如何使用默认的handleMessage,自己设置方法名,自己设置转换器。

?默认handleMessage

MessageListenerAdapter 适配器类,熟习适配器模式的朋友一定理解适配器模式的话,可以通过适配器,适配自己的实现,这里我们适配自己设置的MessageDelegate类。我们即可以不采用监听的方式,采用适配的方式。

?自己设置MessageDelegate

MessageDelegate类中,方法名与参数handleMessage(byte[] messageBody)是固定的。为什么呢?

?MessageListenerAdapter源码分析

我们来看下MessageListenerAdapter底层代码

MessageListenerAdapter类中

默认方法名就是叫handleMessage。当然也可以自己去指定设置。通过messageListenerAdapter的代码我们可以看出如下核心属性

① defaultListenerMethod默认监听方法名称:用于设置监听方法名称

② Delegate 委托对象:实际真实的委托对象,用于解决消息

③ queueOrTagToMethodName 队列标识与方法名称组成集合

④ 可以逐个进行队列与方法名称的匹配

⑤ 队列和方法名称绑定,即指定队列里的消息会被绑定的方法所接受解决

测试一下默认使用的handleMessage方法。启动ApplicationTests类,运行testSendMessage()测试方法。

自己设置方法名

修改MessageDelegate()类

自己设置TextMessageConverter转换器

修改RabbitMQConfig类

修改MessageDelegate类

运行testSendMessage4Text()测试方法

注意:在发消息的时候,必需符合自己的转换器。

打印结果

6.1.2 适配器使用方式2

自己设置队列名称和方法名称。

运行 测试方法

运行结果:

7. MessageConverter消息转换器

我们在进行发送消息的时候,正常情况下消息体为二进制的数据方式进行传输,假如希望内部帮我们进行转换,或者者指定自己设置的转换器,就需要用到MessageConverter

① 自己设置常用转换器:MessageConverter,一般来讲都需要实现这个接口

②?写下面两个方法:toMessage:java对象转换为Message,fromMessage:Message对象转换为java对象

③ Json转换器:Jackson2JsonMessageConverter:可以进行Java对象的转换功能

④ DefaultJackson2JavaTypeMapper映射器:可以进行java对象的映射关系

⑤ 自己设置二进制转换器:比方图片类型、PDF、PPT、流媒体

7.1 代码演示

其实我们在详情MessageListenerAdapter的时候,中间就详情到了TextMessageConverter转换器,将二进制数据转换成字符串数据。

7.1.1 增加json格式的转换器

修改RabbitMQConfig类

修改MessageDelegate

定义一个Order对象

定义测试方法

打印结果:

7.1.2 增加支持Java对象转换

修改RabbitMQConfig类

修改MessageDelegate

定义测试方法

打印结果:

7.1.3 增加支持java对象多映射转换

修改RabbitMQConfig类

修改MessageDelegate

定义一个Packaged对象

定义测试方法

打印结果:

在通过单元测试运行testSendMappingMessage()方法时会存在一个问题:委派对象MessageDelegate可能会收不到对象。

由于单元测试spring容器在运行完毕之后就中止,不会等到消费者消费完消息之后再中止,所以需要通过正常启动springboot项目,可以看到正常消费消息。

7.1.4 增加全局转换器

修改RabbitMQConfig类

修改MessageDelegate

增加PDFMessageConverter

增加ImageMessageConverter

定义测试方法

可以自己测试下图片和pdf的保存。

说明
1. 本站所有资源来源于用户上传和网络,如有侵权请邮件联系站长!
2. 分享目的仅供大家学习和交流,您必须在下载后24小时内删除!
3. 不得使用于非法商业用途,不得违反国家法律。否则后果自负!
4. 本站提供的源码、模板、插件等等其他资源,都不包含技术服务请大家谅解!
5. 如有链接无法下载、失效或广告,请联系管理员处理!
6. 本站资源售价只是摆设,本站源码仅提供给会员学习使用!
7. 如遇到加密压缩包,请使用360解压,如遇到无法解压的请联系管理员
开心源码网 » 消息中间件——RabbitMQ整合Spring AMQP实战!

发表回复