了解RabbitMQ中的AMQP-0-9-1模型
前提
之前有个打算在学习RabbitMQ之前,把AMQP详细阅读一次,挑出里面的重点内容。后来找了下RabbitMQ的官方文档,发现了有一篇文档专门详情了RabbitMQ中实现的AMQP模型部分,于是直接基于此文档和个人了解写下这篇文章。
AMQP协议
AMQP全称是Advanced Message Queuing Protocol,它是一个(分布式)消息传递协议,使用和符合此协议的用户端能够基于使用和符合此协议的消息传递中间件代理商(Broker,也就是经纪人,个人感觉叫代理商合口少量)进行通信。AMQP目前已经推出协议1.0,实现此协议的比较知名的产品有StormMQ、RabbitMQ、Apache Qpid等。RabbitMQ实现的AMQP版本是0.9.1,官方文档中也提供了该协议pdf文本下载,有兴趣可以翻阅一下。
消息中间件代理商的职责
Messaging Broker,这里称为消息中间件代理商。它的职责是从发布者(Publisher,或者者有些时候称为Producer,生产者)接收消息,而后把消息路由到消费者(Consumer,或者者有些时候称为Listener,监听者)。
由于消息中间件代理商、发布者用户端和消费者用户端都是基于AMQP这一网络消息协议,所以消息中间件代理商、发布者用户端和消费者用户端可以在不同的机器上,从而实现分布式通讯和服务解耦。
消息中间件代理商不仅仅提供了消息接收和消息路由这两个基本功能,还有其余高级的特性如消息持久化功能、监控功能等等。
AMQP-0-9-1在RabbitMQ中的基本模型
AMQP-0-9-1模型的基本视图是:消息发布者消息发布到交换器(Exchange)中,交换器的角色有点相似于日常见到的邮局或者者信箱。而后,交换器把消息的副本分发到队列(Queue)中,分发消息的时候遵循的规则叫做绑定(Binding)。接着,消息中间件代理商向订阅队列的消费者发送消息(push模式),或者者消费者也可以主动从队列中拉取消息(fetch/pull模式)。

发布者在发布消息的时候可以指定消息属性(消息元数据),某些消息元数据可能由消息中间件代理商使用,其余消息元数据对于消息中间件代理商而言是不透明的,仅供消息消费者使用。
因为网络是不可靠的,用户端可能无法接收消息或者者解决消息失败,这个时候消息中间件代理商无法感知消息能否正确传递到消费者中,因而AMQP模型提供了消息确认(Message Acknowledgement)的概念:当消息传递到消费者,消费者可以自动向消息中间件代理商确认消息已经接收成功或者者由应用程序开发者选择手动确认消息已经接收成功并且向消息中间件代理商确认消息,消息中间件代理商只有在接收到该消息(或者者消息组)确实认通知后才会从队列中完全删除该消息。
在某些情况下,交换器无法正确路由到队列中,那么该消息就会返回给发布者,或者者丢弃,或者者假如消息中间件代理商实现了”死信队列(Dead Letter Queue)”扩展,消息会被放置到死信队列中。消息发布者可以选择使用对应的参数控制路由失败的解决策略。
交换器和交换器类型
交互器(Exchange)是消息发送的第一站目的地,它的作用就是就收消息并且将其路由到零个或者者多个队列。路由消息的算法取决于交互器的类型和路由规则(也就是Binding)。RabbitMQ消息中间件代理商支持四种类型的交互器,分别是:
| 交换器类型 | Broker默认预公告的交换器 |
|---|---|
| Direct | (空字符串[(AMQP default)])和amq.direct |
| Fanout | amq.fanout |
| Topic | amq.topic |
| Headers | amq.match (和RabbitMQ中的amq.headers) |
公告交互器的时候需要提供少量列的属性,其中比较重要的属性如下:
- Name:交互器的名称。
- Type:交换器的类型。
- Durability:(交换器)持久化特性,假如启动此特性,则Broker重启后交换器仍然存在,否则交换器会被删除。
- Auto-delete:能否自动删除,假如启用此特性,当最后一个队列解除与交换器的绑定关系,交换器会被删除。
- Arguments:可选参数,一般配合插件或者者Broker的特性使用。
之所以存在Durability和Auto-delete特性是由于并发所有的场景和用例都要求交互器是持久化的。
Direct交换器
Direct类型的交换器基于消息路由键(RoutingKey)把消息传递到队列中。Direct交换器是消息单播路由的理想实现(当然,用于多播路由也可以),它的工作原理如下:
- 队列使用路由键K绑定到交换器。
- 当具备路由键R的新消息到达交换器的时候,假如K = R,那么交换器会把消息传递到队列中。

默认交换器
默认交换器(Default Exchange)是一种特殊的Direct交互器,它的名称是空字符串(也就是””),它由消息中间件代理商预公告,在RabbitMQ Broker中,它在Web管理界面中的名称是(AMQP default)。每个新创立的队列都会绑定到默认交换器,路由键就是该队列的队列名,也就是所有的队列都可以通过默认交换器进行消息投递,只要要指定路由键为相应的队列名就可。

Fanout交换器
Fanout其实是一个组合单词,fan也就是扇形,out就是向外发散的意思,Fanout交换器可以想象为”扇形”交换器。Fanout交换器会忽略路由键,它会路由消息到所有绑定到它的队列。也就是说,假如有N个队列绑定到一个Fanout交换器,当一个新的消息发布到该Fanout交换器,那么这条新消息的一个副本会分发到这N个队列中。Fanout交换器是消息广播路由的理想实现。

Topic交换器
Topic交换器基于路由键和绑定队列和交换器的模式进行匹配从而把消息路由到一个或者者多个队列。绑定队列和交换器的Topic模式(这个模式串其实就是公告绑定时候的路由键,和消息发布的路由键并非同一个)一般使用点号(dot,也就是’.’)分隔,例如source.target.key,绑定模式支持通配符:
- 符号’#’匹配一个或者者多个词,例如:
source.target.#可以匹配source.target.doge、source.target.doge.throwable等等。 - 符号’*’只能匹配一个词,例如:
source.target.*可以匹配source.target.doge、source.target.throwable等等。
对每一条消息,Topic交换器会遍历所有的绑定关系,检查消息指定的路由键能否匹配绑定关系中的路由键,假如匹配,则将消息推送到相应队列。

Topic交换器是消息多播路由的理想实现。
Headers交换器
Headers交换器是一种不常用的交换器,它使用多个属性进行路由,这些属性一般称为消息头,它不使用路由键进行消息路由。消息头(Message Headers)是消息属性(消息元数据)部分,因而,使用Headers交换器在建立队列和交换器的绑定关系的时候需要指定一组键值对,发送消息到Headers交换器时候,需要在消息属性中携带一组键值对作为消息头。消息头属性支持匹配规则x-match如下:
- x-match = all:表示所有的键值对都匹配才能接受到消息。
- x-match = any:表示只需存在键值对匹配就能接受到消息。
Headers交换器也是忽略路由键的,只依赖于消息属性中的消息头进行消息路由。

队列
AMQP 0-9-1模型中的队列与其余消息或者者任务队列系统中的队列非常类似:它们存储应用程序所使用的消息。队列和交换器的基本属性有相似的地方:
- Name:队列名称。
- Durable:能否持久化,开启持久化意味着消息中间件代理商重启后队列仍然存在,否则队列会被删除。
- Exclusive:能否独占的,开启队列独占特性意味着队列只能被一个连接使用并且连接关闭之后队列会被删除。
- Auto-delete:能否自动删除,开启自动删除特性意味着队列至少有一个消费者并且最后一个消费者解除订阅状态(一般是消费者对应的通道关闭)后队列会自动删除。
- Arguments:队列参数,一般和消息中间件代理商或者者插件的特性相关,如消息的过期时间(Message TTL)和队列长度等。
一个队列只有被公告(Declare)了才能使用,也就是队列的第一次公告就是队列的创立操作(由于第一次公告的时候队列并不存在)。假如使用相同的参数再次公告已经存在的队列,那么此次公告会不生效(当然也不会出现异常)。但是假如使用不相同的参数再次公告已经存在的队列,那么会抛出通道级别的异常,异常代码是406(PRECONDITION_FAILED)。
队列名称
队列名必需由255字节(bytes)长度以内的UTF-8编码字符组成。实现AMQP 0-9-1规范的消息中间件代理商具有自动生成随机队列名的功能,也就是在公告队列的时候,队列名指定为空字符串,那么消息中间件代理商会自动生成一个队列名,并且在队列公告的返回结果中带上对应的队列名。
以”amq.”开头的队列是由消息中间件代理商内部生成的,有其特殊的作用,因而不能公告此类名称的新队列,否则会导致通道级别的异常,异常代码为403(ACCESS_REFUSED)。
队列的持久化特性
持久化的队列会持久化到磁盘中,这种队列在消息中间件代理商重启后不会被删除。不开启持久化特性的队列称为瞬时(transient)队列,并非所有的场景都需要开启队列的持久化特性。
队列的持久化特性并不意味着路由到它上面的消息是持久化的,也就是队列的持久化跟消息的持久化是两回事。假如息中间件代理商挂了,它重启后会重新公告开启了持久化特性的队列,这些队列中只有使用了消息持久化特性的消息会被恢复。
绑定
绑定(Binding)是交换器路由消息到队列的规则。例如交换器E可以路由消息到队列Q,那么Q必需通过肯定的规则绑定到E。绑定中使用的某些交换器的类型决定了它可以使用可选的路由键(RoutingKey)。路由键的作用相似于过滤器,可以挑选某些发布到交换器的消息路由到目标队列。
假如发布的消息没有路由到任意一个目标队列,例如,消息已经发布到交换器,交换器中没有任何绑定,这个时候消息会被丢弃或者者返回给发布者,取决于消息发布者发布消息时候使用的参数。
消费者
假如队列只有发布者生产消息,那么是没有意义的,必需有消费者对消息进行使用,或者者叫这个操作为消息消费,消息消费的方式有两种:
- 消息代理商中间件向消费者推送消息(推模式,代表方法是
basic.consume)。 - 消费者主动向消息代理商中间件拉取消息(拉模式,代表方法是
basic.get)。
使用推模式的情况下,消费者必需指定需要订阅的队列。每个队列可以存在多个消费者,或者者仅仅注册一个独占的消费者。
每个消费者(订阅者)都有一个称为消费者标签(consumer tag)的标识符,消费者标签是一个字符串。通过消费者标签可以实现取消订阅的操作。
消息确认
消费者应用程序有可能在接收和解决消息的时候崩溃,也有可能由于网络起因导致消息中间件代理商投递消息到消费者的时候失败了,这样就会催生一个问题:AMQP消息中间件代理商应该在什么时候从队列中删除消息?因而,AMQP 0-9-1规范提供了两种选择:
- 消息中间件代理商向应用程序发送消息(使用AMQP方法
basic.deliver或者basic.get-ok)。 - 应用程序收到消息后向消息中间件代理商发送确认(使用AMQP方法
basic.ack<= 个人感觉这个地方少写了basic.nack和basic.reject)
前一种称为自动确认模型(动作触发的同时进行了消息确认),后一种称为显式确认模型。显式确认模型中,需要消费者主动向消息中间件代理商进行消息主动确认,这个消息主动确认动作的执行时机完全由应用程序控制。消息主动确认有三种方式:积极确认(ack)、消极确认(nack)和拒绝(reject)。
预取消息
预取消息(Prefetching Messages)是一个特性。对于多个消费者共享同一个队列的情况,能够告知消息中间件代理商在发送下一个确认之前指定每个消费者一次可以接收消息的消息量。这个特性可以了解为简单的负载均衡技术,在批量发布消息的场景下能够提高吞吐量。
消息属性和有效负载
AMQP模型中,消息具备属性值。AMQP 0-9-1规范定义了少量常见的属性,一般开发人员不需要太关注这些属性:
- Content type
- Content encoding
- Routing key
- Delivery mode (persistent or not)
- Message priority
- Message publishing timestamp
- Expiration period
- Publisher application id
这些通用的属性一般是消息中间件代理商使用的,还有可以定制的可选属性header,形式是键值对,相似于HTTP中的请求头。消息属性是在发布消息的时候设置的。
AMQP消息还有一个有效载荷(payload,其实就是消息数据体),AMQP代理商将其视为不透明的字节数组,也就是AMQP代理商不会检查或者者修改消息的有效载荷。有些消息可能只包含属性而没有有效负载。通常使用序列化格式(如JSON,Thrift,Protocol Buffers和MessagePack)来序列化和结构化数据,以便将其作为消息有效负载发布。在一般商定下,消息属性中的Content type和Content encoding一般可以表明其序列化的方式。
消息发布支持消息的持久化特性,消息持久化特性开启后,消息中间件代理商会把消息保存到磁盘中,假如重启代理商消息也不会丢失。开启消息持久化特性将会影响性能,主要是由于涉及到刷盘操作。
AMQP-0-9-1方法
AMQP 0-9-1定义了少量方法,对应了用户端和消息中间件代理商之间交互的少量操作方法,这些操作方法的设计跟面向对象编程语言中的方法没有任何共同之处。常用的交换器相关的操作方法有:
- exchange.declare
- exchange.declare-OK
- exchange.delete
- exchange.delete-OK
在逻辑上,上面几个操作方法在用户端和消息中间件代理商之间的交互如下:

对于队列,也有相似的操作方法:
- queue.declare
- queue.declare-OK
- queue.delete
- queue.delete-OK

并非所有的AMQP操作方法都有响应结果操作方法,像消息发布方法basic.publish的使用是最广泛的,此操作方法没有对应的响应结果操作方法。有些操作方法可能有多个响应结果(操作方法),例如basic.get。
连接(Connection)
AMQP的连接(Connection)通常是长期存在的。AMQP是一种使用TCP进行可靠传递的应用程序级协议。AMQP连接使用客户身份验证,可以使用TLS(SSL)进行保护。当应用程序不再需要连接到AMQP代理商时,它应该正常关闭AMQP连接,而不是忽然关闭底层TCP连接。
通道(Channel)
某些应用程序需要与AMQP代理商程序建立多个连接。但是,不希望同时打开许多TCP连接,由于这样做会消耗系统资源并使配置防火墙变得十分困难。通道(Channel)可以认为是”共享一个单独的TCP连接的轻量级连接”,一个AMQP连接可以拥有多个通道。
对于使用了多线程解决的应用程序,有一种使用场景十分普遍:每个线程开启一个新的通道使用,这些通道是线程间隔离的。
另外,每个特定的通道和其余通道是相互隔离的,每个执行的AMQP操作方法(包括响应)都携带一个通道的唯一标识,这样用户端就能通过该通道的唯一标识得知操作方法是对应哪个通道发生的。
虚拟主机(Virtual Host)
为了使单个消息中间件代理商可以托管多个完全隔离的”环境”(这里的隔离指的是客户组、交互器、队列等),AMQP提供了虚拟主机(Virtual Host)的概念。多个虚拟主机相似于许多主流的Web服务器的虚拟主机,提供了AMQP组件完全隔离的环境。AMQP用户端可以在连接消息中间件代理商时指定需要连接的虚拟主机。
个人了解
关于Exchange、Queue和Binding
了解RabbitMQ中的AMQP模型,其实从开发者的角度来看,最重要的是Exchange、Queue、Binding三者的关系,这里谈谈个人的见地。消息的发布第一站总是Exchange,从模型上看,消息发布无法直接发送到队列中。Exchange本身不存储消息,它在接收到消息之后,会基于路由规则也就是Binding,把消息路由到目标Queue中。从实际操作来看,公告路由规则总是在发布消息和消费消息之前,也就是一般步骤如下:
- 1、公告Exchange。
- 2、公告Queue。
- 3、基于Exchange和Queue公告Binding,这个过程有可能自己设置一个RoutingKey。
- 4、通过Exchange消息发布,这个过程有可能使用到上一步定义的RoutingKey。
- 5、通过Queue消费消息。
我们最关注的两个阶段,消息发布和消息消费中,消息发布实际上只跟Exchange有关,而消息消费实际上只跟Queue有关。Binding实际上就是Exchange和Queue的契约关系,会直接影响消息发布阶段的消息路由。那么,路由失败一般是什么情况导致的?路由失败,其实就是消息已经发布到Exchange,而Exchange中从既有的Binding中无法找到存在的目标Queue用于传递消息副本(一般而言,很少人会发送消息到一个不存在的Exchange)。消息路由失败,从了解AMQP的模型来看,可以从根本上避免的,除非是消息发布者成心胡乱使用或者者人为错误使用了未存在的RoutingKey、Exchange或者者说是Binding关系而导致的。
关于Exchange的类型
AMQP-0-9-1模型中支持了四种交换器direct(单播)、fanout(广播)、topic(多播)、headers,实际上,从使用者角度来看,四种交换器的功能是可以相互取代的。例如可以使用fanout类型交换器实现广播,其实使用direct类型交换器也是可以实现广播的,只是对应的direct类型交换器需要通过多个路由键绑定到多个目标队列中。在面对生产环境的技术选型的时候,我们需要考虑性能、维护难度、正当性等角度去考虑选择什么类型的交换器,就上面的广播消息的例子,显然使用fanout类型交换器可以避免公告多个绑定关系,这样在性能、正当性上是更优的选择。
关于负载均衡
在AMQP-0-9-1模型中,负载均衡的实现是基于消费者而不是基于队列(精确来说应该是消息传递到队列的方式)。实际上,出现消息生产速度大大超过消费者的消费速度的时候,队列中有可能会出现消息积压。AMQP-0-9-1模型中没有提供基于队列负载均衡的特性,也就是出现消息生产速度大大超过消费者的消费速度时候,并不会把消息路由到多个队列中,而是通过预取消息(Prefetching Messages)的特性,确定消息者的消费能力,从而调整消息中间件代理商推送消息到对应消费者的数量,这样就能够实现消费速度快的消费者能够消费更多的消息,减少产生有消费者处于饥饿状态和有消费者长期处于忙碌状态的问题。
关于消息确认机制
AMQP中提供的消息确认机制主要包括积极确认(一般叫ack,Acknowledgement)、消极确认(一般叫nack,Negative Acknowledgement)和拒绝(reject)。消息确认机制是保证消息不丢失的重要措施,当消费者接收到消息中间件代理商推送的消息时候,需要主动通知消息中间件代理商消息已经确认投递成功,而后消息中间件代理商才会从队列中删除对应的消息。没有主动确认的消息就会变为”nack”状态,可以想象为暂存在队列的”nack区”中,这些消息不会投递到消费者,直到消费者重启后,”nack区”中的消息会重新变为”ready”状态,可以重新投递给消费者。
来源:https://www.cnblogs.com/throwable/p/12275645.html?utm_source=tuicool&utm_medium=referral
1. 本站所有资源来源于用户上传和网络,如有侵权请邮件联系站长!
2. 分享目的仅供大家学习和交流,您必须在下载后24小时内删除!
3. 不得使用于非法商业用途,不得违反国家法律。否则后果自负!
4. 本站提供的源码、模板、插件等等其他资源,都不包含技术服务请大家谅解!
5. 如有链接无法下载、失效或广告,请联系管理员处理!
6. 本站资源售价只是摆设,本站源码仅提供给会员学习使用!
7. 如遇到加密压缩包,请使用360解压,如遇到无法解压的请联系管理员
开心源码网 » 了解RabbitMQ中的AMQP-0-9-1模型