RabbitMQ实时获取队列级别生产和消费应用

作者 : 开心源码 本文共2029个字,预计阅读时间需要6分钟 发布时间: 2022-05-12 共168人阅读

背景

生产中有这么一种场景:业务方会问某一队列当前有哪些应用在发送或者消费

方式1:人工通过RMQ插件Management管理端,只能看Channels、Consumers信息,很难从中大量的信息中提取某一队列对应的生产者和消费者相关信息

方式2:通过监控比方Cat看当前该队列MQ相关打点,可行但很容易漏掉。许多Task类型的应用不是时刻都有MQ打点

以上两种方式均很难满足业务要求。

思路

Producer,Consumer 通过 Channel 来和 Broker通信,而Channel则是复用Socket连接。

在Broker实现中,Channel进程(发送或者者消费)都会监视(monitor)队列进程,如下图:

通过rabbitmq_top插件相关板块获取Channel进程信息相似如下:

因而,假如我们能够获取该队列对应的发送和消费Channels进程信息,即可以提取出发送/消费应用的IP。

步骤

1、通过获取队列进程 Pid 的monitored_by信息,并利用rabbitmq_top插件相关板块从中提取出类型为rabbit_channel的进程 Chs:

{monitored_by,Res} = rpc:call(node(Pid), erlang, process_info, [Pid, monitored_by]),Chs = lists:foldl(fun(E,Acc) -> case lists:keyfind(type,1,rpc:call(node(E), rabbit_top_util, obtain_name, [E])) of                                         {type,rabbit_channel} -> [E|Acc];                                        _ -> Acc end end, [], Res).

2、通过rabbitmq_management插件相关板块方法获取队列进程对应的消费者Channel进程 ConsumerChs:

Consumers4VHost = rpc:call(Node, erlang, apply, [fun() -> rabbit_mgmt_db:get_all_consumers(list_to_binary(Vhost)) end,[]]),QName = Q#amqqueue.name#resource.name,Consumers4Q = lists:filter(fun(T) -> case lists:keyfind(queue, 1, T) of                                                 {queue,QRes} -> case lists:keyfind(name, 1, QRes)  of                                                                      {name,QName} -> true;                                                                      _ -> false  end;                                                  _ -> false  end end, Consumers4VHost),ConsumerChs = lists:foldl(fun(E,Acc) -> case lists:keyfind(channel_pid, 1, E) of                                                {channel_pid,Ch} -> [Ch|Acc];                                                _ -> Acc end end, [], Consumers4Q).

3、生产者Channel进程 ProducerChs:

ProducerChs = Chs -- ConsumerChs.

4、这样我们获取了队列进程对应的生产者和消费者Channel进程列表,接下来从Channel进程信息中提取用户端IP即可以了:

lists:foldl(fun(E,Acc) -> case lists:keyfind(connection_name, 1, rpc:call(node(E), rabbit_top_util, obtain_name, [E])) of                                   {connection_name,Conn} ->                                       ConnStr = binary_to_list(Conn),                                      Ip = string:substr(ConnStr, 1, string:chr(ConnStr, $:) - 1),                                      [Ip|Acc];                                  _ -> Acc  end  end, [], ChsList).

最后通过去重上面的IP列表集合,得到该队列对应的生产者和消费者IP信息,通过调用相关接口反查IP对应的应用名就可。

相关拓展:

1、如何获取队列进程Pid?通过获取队列amqquque记录:

QRes = rabbit_misc:r(list_to_binary(VHost), queue, list_to_binary(Q)),[Queue] = rpc:call(Node, rabbit_amqqueue, lookup, [[QRes]]).

2、如何调试上述功能?
利用Erlang shell,确保集群节点已启用management、top插件

3、生产环境实践:
Escript方式运行脚本;比较好的方法是以Erlang Web Server的方式运行,并提供HTTP接口

4、其余场景
僵死队列问题、队列进程(热迁移)负载均衡、获取队列进程、delegate进程关键指标(如邮箱大小、占用内存)等,都可以利用Broker/Plugin既有板块代码进行处理,前提需要熟习服务端相关板块代码,多进行调试。

说明
1. 本站所有资源来源于用户上传和网络,如有侵权请邮件联系站长!
2. 分享目的仅供大家学习和交流,您必须在下载后24小时内删除!
3. 不得使用于非法商业用途,不得违反国家法律。否则后果自负!
4. 本站提供的源码、模板、插件等等其他资源,都不包含技术服务请大家谅解!
5. 如有链接无法下载、失效或广告,请联系管理员处理!
6. 本站资源售价只是摆设,本站源码仅提供给会员学习使用!
7. 如遇到加密压缩包,请使用360解压,如遇到无法解压的请联系管理员
开心源码网 » RabbitMQ实时获取队列级别生产和消费应用

发表回复