public interface ConnectionSubscriber {
void subscribe(Consumer<Connection> consumer);
非常简单,只有一个subscribe
方法,对于调用该方法的组件来说,不用关心具体怎么订阅的,只需要知道会返回作为Subscriber
的连接就行了
我们可以实现成WebSocket
连接,也可以实现成订阅Redis
或是监听RabbitMQ
和Kafka
还可以实现多种方式,比如Kafka/RabbitMQ
+Redis
,也就是最新的主从订阅功能,默认使用Kafka/RabbitMQ
可以避免消息丢失,当Kafka/RabbitMQ
不可用时,切换到Redis
转发,提高容错
简单的背后是更复杂的设计
虽然大家看我上面的使用实例可能会觉得很简单
但是整个框架其实还是蛮复杂的,源码量已经近w行了
因为很多逻辑我都通过接口抽象了出来方便扩展
ConnectionSubscriber
连接订阅只是其中的一个组件,比如还有:
连接仓库ConnectionRepository
用于缓存连接
方便自定义效率更高的算法来存取连接,当然默认就是用Map
了
连接服务管理器
连接服务管理器ConnectionServerManager
用于获得其他服务的信息
服务实例间的ws(s)
就是根据这个信息来连接的
默认通过DiscoveryClient
获取其他实例的信息
当然也可以自定义通过数据库或是配置文件来获取
连接工厂ConnectionFactory
用于扩展不同的连接
如目前已经实现的WebSocketConnectionFactory
和NettyConnectionFactory
之后如果有新的长连接可以直接扩展
连接选择器
连接选择器ConnectionSelector
用于在发送消息的时候确定发送给哪些连接
能够实现精确的条件发送,比如根据WebSocket
的路径Path
,userId
或是分组group
来发送消息
消息工厂MessageFactory
用于将消息内容统一成Message
方便添加消息头等参数
消息编解码适配器
消息编解码适配器MessageCodecAdapter
用于适配消息的编解码器MessageEncoder
和MessageDecoder
如普通的客户端的消息要如何编码和解码,服务实例间转发的消息要如何编码和解码,都是可以自定义的
消息重试策略适配器
消息重试策略适配器MessageRetryStrategyAdapter
用于指定消息重试策略MessageRetryStrategy
可以分别定义普通客户端消息发送的重试策略和服务实例间转发消息的重试策略
消息幂等校验器
当我们使用RabbitMQ
或是Kafka
来转发消息的时候,可能会存在重复消费的情况
可以自定义MessageIdempotentVerifier
来实现消息重复的校验
整个生命周期会触发大量的事件发布
事件 | 说明 |
---|
ConnectionLoadBalanceConceptInitializeEvent | Concept 初始化 |
ConnectionLoadBalanceConceptDestroyEvent | Concept 销毁 |
ConnectionEstablishEvent | 连接建立 |
ConnectionCloseEvent | 连接关闭 |
ConnectionCloseErrorEvent | 连接关闭异常 |
ConnectionErrorEvent | 连接异常 |
ConnectionSubscribeErrorEvent | 连接订阅异常 |
MessagePrepareEvent | 消息准备 |
MessageSendEvent | 消息发送 |
MessageSendSuccessEvent | 消息发送成功 |
MessageSendErrorEvent | 消息发送异常 |
DeadMessageEvent | 当一个消息不会发送给任何一个连接 |
MessageDecodeErrorEvent | 消息解码异常 |
MessageForwardEvent | 消息转发 |
MessageForwardErrorEvent | 消息转发异常 |
MessageReceiveEvent | 消息接收 |
MessageDiscardEvent | 消息丢弃 |
MasterSlaveSwitchEvent | 主从切换 |
MasterSlaveSwitchErrorEvent | 主从切换异常 |
HeartbeatTimeoutEvent | 心跳超时 |
EventPublishErrorEvent | 事件发布异常 |
LoadBalanceMonitorEvent | 监控触发 |
UnknownCloseEvent | 未知的连接关闭 |
UnknownErrorEvent | 未知的连接异常 |
UnknownMessageEvent | 未知的消息 |
基于事件也能非常方便的实现一些自定义扩展
可以直接用Spring
的@EventListener
来监听
其实写这个库的契机,之前有个前同事说他现在的公司想要把项目(设备和服务直连)做成微服务,然后遇到了类似的问题
分析了之后发现和公司之前遇到的微服务 + WebSocket
很类似,抽象之后其实就是长连接 + 负载均衡的问题,WebSocket
无非就是长连接的一种实现,于是就有了这个库
GitHub上还有其他的功能组件,可以先混个眼熟,说不定以后就用到了呐