自己动手撸一个分布式IM(即时通讯) 系统

作者 : 开心源码 本文共3986个字,预计阅读时间需要10分钟 发布时间: 2022-05-12 共200人阅读

项目详情:CIM(CROSS-IM)?一款面向开发者的?IM(即时通讯)系统,同时提供了少量组件帮助开发者构建一款属于自己可水平扩展的?IM?。

借助?CIM?你可以实现以下需求:

IM?即时通讯系统。

适用于?App?的消息推送中间件。

IOT?海量连接场景中的消息透传中间件。

架构设计

下面来看看具体的架构设计:

CIM?中的各个组件均采用?Spring Boot?构建。

采用?Netty + Google Protocol Buffer?构建底层通信。

Redis?存放各个用户端的路由信息、账号信息、在线状态等。

Zookeeper?用于?IM-server?服务的注册与发现。

整体主要由以下板块组成:

cim-server,IM?服务端:用于接收?Client?连接、消息透传、消息推送等功能。支持集群部署。

cim-forward-route,消息路由服务器:用于解决消息路由、消息转发、客户登录、客户下线以及少量经营工具(获取在线客户数等)。

cim-client,IM?用户端:给客户使用的消息终端,一个命令就可启动并向其余人发起通讯(群聊、私聊);同时内置了少量常用命令方便使用。

流程图

整体的流程也比较简单,流程图如下:

用户端向?Route?发起登录。

登录成功从?Zookeeper?中选择可用 im-server?返回给用户端,并保存登录、路由信息到?Redis。

用户端向 im-server?发起长连接,成功后保持心跳。

用户端下线时通过?Route?清理状态信息。

所以当我们自己部署时需要以下步骤:

搭建基础中间件?Redis、Zookeeper。

部署?cim-server,这是真正的 IM 服务器,为了满足性能需求所以支持水平扩展,只要要注册到同一个?Zookeeper?就可。

部署?cim-forward-route,这是路由服务器,所有的消息都需要经过它。因为它是无状态的,所以也可以利用?Nginx?代理商提高可用性。

cim-client?真正面向客户的用户端;启动之后会自动连接 IM 服务器便可以在控制台收发消息了。

更多使用详情可以参考快速启动。

详细设计

接下来重点看看具体的实现,比方群聊、私聊消息如何流转;IM 服务端负载均衡;服务如何注册发现等等。

IM 服务端

先来看看服务端;主要是实现用户端上下线、消息下发等功能。

首先是服务启动:

因为是在?Spring Boot?中搭建的,所以在应用启动时需要启动?Netty?服务。

从?Pipline?中可以看出使用了?Protobuf?的编解码(具体报文在用户端中分析)。

注册发现

需要满足?IM?服务端的水平扩展需求,所以?cim-server?是需要将自身数据发布到注册中心的。

所以在应用启动成功后需要将自身数据注册到?Zookeeper?中。

最主要的目的就是将当前应用的?ip + cim-server-port+ http-port?注册上去。

上图是我在演示环境中注册的两个?cim-server?实例(因为在一台服务器,所以只是端口不同)。

这样在用户端(监听这个?Zookeeper?节点)就能实时的知道目前可用的服务信息。

登录

当用户端请求?cim-forward-route?中的登录接口(详见下文)做完业务验证(就相当于日常登录其余网站一样)之后,用户端会向服务端发起一个长连接,如之前的流程所示:

这时用户端会发送一个特殊报文,表明当前是登录信息。服务端收到后就需要将该用户端的?userID?和当前?Channel?通道关系保存起来。

同时也缓存了客户的信息,也就是?userID?和客户名。

离线

当用户端断线后也需要将刚才缓存的信息清理掉。

同时也需要调用?Route?接口清理相关信息(具体接口看下文)。

IM 路由

从架构图中可以看出,路由层是非常重要的一环;它提供了一系列的?HTTP?服务承接了用户端和服务端。目前主要是以下几个接口:

①注册接口

因为每一个用户端都是需要登录才能使用的,所以第一步自然是注册。

这里就设计的比较简单,直接利用?Redis?来存储客户信息;客户信息也只有?ID?和?userName?而已。

只是为了方便查询在?Redis?中的?KV?又反过来存储了一份?VK,这样?ID?和 userName?都必需唯一。

②登录接口

这里的登录和?cim-server?中的登录不一样,具备业务性质:

登录成功之后需要判断能否是重复登录(一个客户只能运行一个用户端)。

登录成功后需要从?Zookeeper?中获取服务列表(cim-server)并根据某种算法选择一台服务返回给用户端。

登录成功之后还需要保存路由信息,也就是当前客户分配的服务实例保存到?Redis?中。

为了实现只能一个客户登录,使用了?Redis?中的?Set?来保存登录信息;利用 userID?作为?Key?,重复的登录就会写入失败。

相似于 Java 中的 HashSet,只能去重保存。

获取一台可用的路由实例也比较简单:

先从?Zookeeper?获取所有的服务实例做一个内部缓存。

轮询选择一台服务器(目前只有这一种算法,后续会新添加)。

当然要获取?Zookeeper?中的服务实例前,自然是需要监听?cim-server?之前注册上去的那个节点。

具体代码如下:

也是在应用启动之后监听?Zookeeper?中的路由节点,一旦发生变化就会升级内部缓存。

这里使用的是 Guava 的 Cache,它基于 Concurrent HashMap,所以可以保证清理、新添加缓存的原子性。

③群聊接口

这是一个真正发消息的接口,实现的效果就是其中一个用户端发消息,其他所有用户端都能收到!

流程一定是用户端发送一条消息到服务端,服务端收到后在上文详情的?SessionSocketHolder?中遍历所有?Channel(通道)而后下发消息就可。

服务端是单机倒也可以,但现在是集群设计。所以所有的用户端会根据之前的轮询算法分配到不同的?cim-server?实例中。

因而就需要路由层来发挥作用了:

路由接口收到消息后首先遍历出所有的用户端和服务实例的关系。路由关系在?Redis?中的存放如下:

因为?Redis?单线程的特质,当数据量大时;一旦使用 Keys 匹配所有?cim-route:*?数据,会导致 Redis 不能解决其余请求。

所以这里改为使用 Scan 命令来遍历所有的?cim-route:*。接着会挨个调用每个用户端所在的服务端的?HTTP?接口用于推送消息。

在?cim-server?中的实现如下:

cim-server?收到消息后会在内部缓存中查询该 userID 的通道,接着只要要发消息就可。

④在线客户接口

这是一个辅助接口,可以查询出当前在线客户信息。

实现也很简单,也就是查询之前保存 ”客户登录状态的那个去重?set?“就可。

⑤私聊接口

之所以说获取在线客户是一个辅助接口,其实就是用于辅助私聊使用的。

一般我们使用私聊的前提一定得知道当前哪些客户在线,接着你才会知道你要和谁进行私聊。

相似于这样:

在我们这个场景中,私聊的前提就是需要取得在线客户的?userID。

所以私聊接口在收到消息后需要查询到接收者所在的?cim-server?实例信息,后续的步骤就和群聊一致了。调用接收者所在实例的?HTTP?接口下发信息。

只是群聊是遍历所有的在线客户,私聊只发送一个的区别。

⑥下线接口

一旦用户端下线,我们就需要将之前存放在?Redis?中的少量信息删除掉(路由信息、登录状态)。

IM 用户端

用户端中的少量逻辑其实在上文已经谈到少量了。

登录

第一步也就是登录,需要在启动时调用?Route?的登录接口,取得?cim-server?信息再创立连接。

登录过程中?Route?接口会判断能否为重复登录,重复登录则会直接退出程序。

接下来是利用?Route?接口返回的?cim-server?实例信息(ip+port)创立连接。

最后一步就是发送一个登录标志的信息到服务端,让它保持用户端和?Channel?的关系。

自己设置协议

上文提到的少量登录报文、真正的消息报文这些都是在我们自己设置协议中可以区别出来的。

因为是使用?Google Protocol Buffer?编解码,所以先看看原始格式。

其实这个协议中目前一共就三个字段:

requestId?可以了解为?userId。

reqMsg?就是真正的消息。

type?也就是上文提到的消息类别。

目前主要是三种类型,分别对应不同的业务:

心跳

为了保持用户端和服务端的连接,每隔一段时间没有发送消息都需要自动的发送心跳。

目前的策略是每隔一分钟就发送一个心跳包到服务端:

这样服务端每隔一分钟没有收到业务消息时就会收到 Ping?的心跳包:

内置命令

用户端也内置了少量基本命令来方便使用。

比方输入?:q?就会退出用户端,同时会关闭少量系统资源。

当输入?:olu(onlineUser?的简写)就会去调用?Route?的获取所有在线客户接口。

群聊

群聊的使用非常简单,只要要在控制台输入消息回车就可。这时会去调用?Route?的群聊接口。

私聊

私聊也是同理,但前提是需要触发关键字;使用?userId;; 消息内容这样的格式才会给某个客户发送消息,所以一般都需要先使用?:olu?命令获取所有在线客户才方便使用。

消息回调

为了满足少量定制需求,比方消息需要保存之类的。所以在用户端收到消息之后会回调一个接口,在这个接口中可以自己设置实现。

因而先创立了一个?Caller?的?Bean,这个?Bean?中包含了一个?CustomMsgHandleListener?接口,需要自行解决只要要实现此接口就可。

最后,给大家推荐一个Java进阶内推交流群851531810,不论你在地球哪个方位,不论你参与工作几年都欢迎你的入驻!(群内会免费提供少量群主收藏的免费学习书籍资料以及整理好的几百道面试题和答案文档!)

说明
1. 本站所有资源来源于用户上传和网络,如有侵权请邮件联系站长!
2. 分享目的仅供大家学习和交流,您必须在下载后24小时内删除!
3. 不得使用于非法商业用途,不得违反国家法律。否则后果自负!
4. 本站提供的源码、模板、插件等等其他资源,都不包含技术服务请大家谅解!
5. 如有链接无法下载、失效或广告,请联系管理员处理!
6. 本站资源售价只是摆设,本站源码仅提供给会员学习使用!
7. 如遇到加密压缩包,请使用360解压,如遇到无法解压的请联系管理员
开心源码网 » 自己动手撸一个分布式IM(即时通讯) 系统

发表回复