在微服务中用过 WebSocket 的有没有?来举个爪

虽说像 Spring Cloud Gateway 这类网关已经支持了 WebSocket 的转发

但是当我们在向客户端发送消息的时候仍会由于客户端的连接负载均衡到了其他的服务实例而发送不了消息

举个栗子:

假设有 service-a service-b 两个服务实例

客户端 client 通过网关的负载均衡连接到了 service-a

现在我们调用接口触发了给 client 发送消息的业务

好死不死,这个接口调用被负载均衡到了 service-b

service-b client 并没有建立连接以至于无法发送消息

一个注解就够了?

基于一些原因我实现了一个库来解决上述问题,只需一个启动注解

核心原理其实就是让 service-b 将消息转发给 service-a ,然后 service-a 再发给 client

这里是 Wiki,觉得不错的话记得 一键三连 Star 哦

给大家简单演示一下(最简)用法

WebSocket

先在启动类上添加注解 @EnableWebSocketLoadBalanceConcept 启用功能

@EnableWebSocketLoadBalanceConcept
@SpringBootApplication
public class WsServiceApplication {
    public static void main(String[] args) {
        SpringApplication.run(WsServiceApplication.class, args);

接着我们在需要发送消息的地方注入WebSocketLoadBalanceConcept就可以愉快的跨实例发消息啦

@RestController
@RequestMapping("/ws")
public class WsController {
    @Autowired
    private WebSocketLoadBalanceConcept concept;
    @RequestMapping("/send")
    public void send(@RequestParam String msg) {
        concept.send(msg);

Netty

先在启动类上添加注解@EnableNettyLoadBalanceConcept启用功能

@EnableNettyLoadBalanceConcept
@SpringBootApplication
public class NettyServiceApplication {
    public static void main(String[] args) {
        SpringApplication.run(NettyServiceApplication.class, args);

WebSocket多一步,配置NettyLoadBalanceHandler

@Component
public class NettySampleServer {
    @Autowired
    private NettyLoadBalanceConcept concept;
    public void start(int port) {
        EventLoopGroup boss = new NioEventLoopGroup(1);
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(boss, worker)
                    .channel(NioServerSocketChannel.class)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel channel) throws Exception {
                            ChannelPipeline pipeline = channel.pipeline();
                            pipeline.addLast(new LineBasedFrameDecoder(1024));
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(new StringDecoder());
                            //将连接交由 NettyLoadBalanceHandler 管理
                            pipeline.addLast(new NettyLoadBalanceHandler(concept));
            ChannelFuture future = bootstrap.bind(port).sync();
            future.channel().closeFuture().sync();
        } catch (Throwable e) {
            e.printStackTrace();
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();

接着我们在需要发送消息的地方注入NettyLoadBalanceConcept就可以愉快的跨实例发消息啦

@RestController
@RequestMapping("/netty")
public class NettyController {
    @Autowired
    private NettyLoadBalanceConcept concept;
    @RequestMapping("/send")
    public void send(@RequestParam String msg) {
        concept.send(msg);

是!不!是!非常简单!

是!不!是!非常方便!

是!不!是!非常心动!

是 2.0 版本啦

看到这里,可能有读者开始皱起了眉头,怎么有点眼熟呢,好像在哪里看过

(中二预警)

没错,一切都是石头门的选择,El Psy Congroo

(中二结束)

其实你应该是看过我的这篇文章 【Spring Cloud】一个配置注解实现 WebSocket 集群方案

对比之前的实现,新的版本带来了如下功能(主要)

功能1.x.x2.x.x
长连接类型WebSocket1. WebSocket
2. Netty
订阅(转发)方式服务间 ws(s) 双向连接1. 服务间 ws(s) 双向连接
2. Redis(Redisson)
3. Kafka
4. RabbitMQ
主从订阅(转发)不支持1. 主订阅(转发)失败切换到从订阅(转发)
2. 主订阅(转发)恢复切回到主订阅(转发)

在1.x.x版本中我只实现了WebSocket以及服务间ws(s)的双向连接转发,虽然我有规划其他的一些功能,但是基于工作量等因素就暂时实现了基本的核心功能,主要也不需要依赖其他的库,对于只有2-3个服务实例的场景还是比较适用的

然后过了一段时间我发现这个库的反响还不错,于是决定将之前规划的一些功能完善上去

更详细的用法可以看这里

接下来给大家说明一下核心理念:

继续用service-aservice-b举例

service-a通过订阅的方式监听service-b中的消息发送

service-a监听到service-b发送的消息之后,将消息也发送给连接自身的客户端

反过来,service-b也用同样的方式监听service-a

如果有3个服务实例,service-aservice-bservice-c也是一样

service-a监听service-bservice-c

service-b监听service-aservice-c

service-c监听service-aservice-b

如果有4,5,6...n个服务实例,和上述的逻辑一样,以此类推

因为WebSocket本身就可以用来发送消息

所以我们可以通过WebSocket在两个服务实例间转发消息(作为订阅通道)

把连接进行一个分类

类型说明
Client普通客户端
Subscriber订阅其他的服务消息的连接,该类型连接接收到的消息需要被转发
Observable其他服务监听自身消息的连接,发送消息时需要转发消息到该类型的连接

还是以service-aservice-b为例

service-a作为客户端连接service-b,可以看作service-a订阅监听service-b的消息发送

service-a持有的连接为Subscriberservice-b持有的连接为Observable

service-b在发送消息给客户端的时候,同时通过Observableservice-a也发送消息

service-a通过Subscriber收到service-b的消息,可以看作监听到service-b的消息发送

service-a再把消息发送给自己的客户端

我将订阅逻辑抽象成了ConnectionSubscriber

public interface ConnectionSubscriber {
    void subscribe(Consumer<Connection> consumer);

非常简单,只有一个subscribe方法,对于调用该方法的组件来说,不用关心具体怎么订阅的,只需要知道会返回作为Subscriber的连接就行了

我们可以实现成WebSocket连接,也可以实现成订阅Redis或是监听RabbitMQKafka

还可以实现多种方式,比如Kafka/RabbitMQ+Redis,也就是最新的主从订阅功能,默认使用Kafka/RabbitMQ可以避免消息丢失,当Kafka/RabbitMQ不可用时,切换到Redis转发,提高容错

简单的背后是更复杂的设计

虽然大家看我上面的使用实例可能会觉得很简单

但是整个框架其实还是蛮复杂的,源码量已经近w行了

因为很多逻辑我都通过接口抽象了出来方便扩展

ConnectionSubscriber连接订阅只是其中的一个组件,比如还有:

连接仓库ConnectionRepository用于缓存连接

方便自定义效率更高的算法来存取连接,当然默认就是用Map

连接服务管理器

连接服务管理器ConnectionServerManager用于获得其他服务的信息

服务实例间的ws(s)就是根据这个信息来连接的

默认通过DiscoveryClient获取其他实例的信息

当然也可以自定义通过数据库或是配置文件来获取

连接工厂ConnectionFactory用于扩展不同的连接

如目前已经实现的WebSocketConnectionFactoryNettyConnectionFactory

之后如果有新的长连接可以直接扩展

连接选择器

连接选择器ConnectionSelector用于在发送消息的时候确定发送给哪些连接

能够实现精确的条件发送,比如根据WebSocket的路径PathuserId或是分组group来发送消息

消息工厂MessageFactory用于将消息内容统一成Message方便添加消息头等参数

消息编解码适配器

消息编解码适配器MessageCodecAdapter用于适配消息的编解码器MessageEncoderMessageDecoder

如普通的客户端的消息要如何编码和解码,服务实例间转发的消息要如何编码和解码,都是可以自定义的

消息重试策略适配器

消息重试策略适配器MessageRetryStrategyAdapter用于指定消息重试策略MessageRetryStrategy

可以分别定义普通客户端消息发送的重试策略和服务实例间转发消息的重试策略

消息幂等校验器

当我们使用RabbitMQ或是Kafka来转发消息的时候,可能会存在重复消费的情况

可以自定义MessageIdempotentVerifier来实现消息重复的校验

整个生命周期会触发大量的事件发布

事件说明
ConnectionLoadBalanceConceptInitializeEventConcept初始化
ConnectionLoadBalanceConceptDestroyEventConcept销毁
ConnectionEstablishEvent连接建立
ConnectionCloseEvent连接关闭
ConnectionCloseErrorEvent连接关闭异常
ConnectionErrorEvent连接异常
ConnectionSubscribeErrorEvent连接订阅异常
MessagePrepareEvent消息准备
MessageSendEvent消息发送
MessageSendSuccessEvent消息发送成功
MessageSendErrorEvent消息发送异常
DeadMessageEvent当一个消息不会发送给任何一个连接
MessageDecodeErrorEvent消息解码异常
MessageForwardEvent消息转发
MessageForwardErrorEvent消息转发异常
MessageReceiveEvent消息接收
MessageDiscardEvent消息丢弃
MasterSlaveSwitchEvent主从切换
MasterSlaveSwitchErrorEvent主从切换异常
HeartbeatTimeoutEvent心跳超时
EventPublishErrorEvent事件发布异常
LoadBalanceMonitorEvent监控触发
UnknownCloseEvent未知的连接关闭
UnknownErrorEvent未知的连接异常
UnknownMessageEvent未知的消息

基于事件也能非常方便的实现一些自定义扩展

可以直接用Spring@EventListener来监听

其实写这个库的契机,之前有个前同事说他现在的公司想要把项目(设备和服务直连)做成微服务,然后遇到了类似的问题

分析了之后发现和公司之前遇到的微服务 + WebSocket很类似,抽象之后其实就是长连接 + 负载均衡的问题,WebSocket无非就是长连接的一种实现,于是就有了这个库

GitHub上还有其他的功能组件,可以先混个眼熟,说不定以后就用到了呐