建站案例

神了,WebSocket竟然可以这么设计!

发表日期:2026-04-13

01 引言

长连接是业务项目中经常遇到的技术,往往用于数据向前端推送,如各种大屏、驾驶舱等实时数据的展示。单向推送可能会选择SSESSE因为AI时代的到来,逐步被大家熟知,而WebSocket作为经典的双向通讯,也经常被用来做数据推送。

今天聊一下,我发现的一种特殊的设计,可以单独将基于NettyWebSocket单独部署,接入时,只需要引入API,初始化客户端即可完成对接。直接隔离了WebSocket服务端的编码。

02 普通应用

WebSocket的普通接入,需要编写WebSocket服务端。通过浏览器原生 API即可实现。

2.1 前端代码

浏览器原生的代码:

js体验AI代码助手代码解读复制代码if ('WebSocket' in window) {   const websocket = new WebSocket("ws://localhost:9090/testWs"); } else {   alert('当前浏览器不支持 WebSocket'); }  websocket.onopen = function(event) {   console.log('WebSocket 连接成功'); };  websocket.onmessage = function(event) {   console.log('收到消息:', event.data); };  websocket.onerror = function(error) {   console.error('WebSocket 错误:', error); };  websocket.onclose = function(event) {   console.log('WebSocket 连接关闭'); };  // 发送消息 function sendMessage() {   const message = document.getElementById('text').value;   websocket.send(message); }  // 关闭连接 function closeConnection() {   websocket.close(); }

2.2 服务端代码

java体验AI代码助手代码解读复制代码@Slf4j @Component public class WebSocketServer {      @Getter     private ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);      public void start() throws InterruptedException {         EventLoopGroup bossGroup = new NioEventLoopGroup();         EventLoopGroup workGroup = new NioEventLoopGroup();          ServerBootstrap serverBootstrap = new ServerBootstrap();         serverBootstrap.group(bossGroup, workGroup);         serverBootstrap.channel(NioServerSocketChannel.class);         serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>(){              @Override             protected void initChannel(SocketChannel socketChannel) throws Exception {                 ChannelPipeline pipeline = socketChannel.pipeline();                 pipeline.addLast(new HttpServerCodec());                 pipeline.addLast(new HttpObjectAggregator(65535));                 pipeline.addLast(new WebSocketServerProtocolHandler("/testWs"));                 // 自定义的handler,处理业务逻辑                 pipeline.addLast(new SimpleChannelInboundHandler<TextWebSocketFrame>() {                      @Override                     public void handlerAdded(ChannelHandlerContext ctx) throws Exception {                         // 建立客户端                         Channel channel = ctx.channel();                         log.info("客户端建立连接:channelId={}", channel.id());                         channelGroup.add(channel);                     }                      @Override                     public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {                         // 断开链接                         Channel channel = ctx.channel();                         log.info("客户端断开连接:channelId={}", channel.id());                         channelGroup.remove(channel);                     }                      @Override                     protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {                         // 接受消息                         Channel channel = ctx.channel();                         log.info("收到来自通道channelId[{}]发送的消息:{}", channel.id(), msg.text());                          // 广播通知所有的客户端                         channelGroup.writeAndFlush(new TextWebSocketFrame("收到来自channelId[" + channel.id() + "]发送的消息:" + msg.text() + "123_"));                     }                 });             }         });          // 配置完成,开始绑定server,通过调用sync同步方法阻塞直到绑定成功         ChannelFuture channelFuture = serverBootstrap.bind(9090).sync();         log.info("Server started and listen on:{}",channelFuture.channel().localAddress());         // 对关闭通道进行监听         channelFuture.channel().closeFuture().sync();     } }

2.3 效果演示

为了方便演示,我直接使用在线测试工具:

webfem.com/tools/ws/in…

2.4 设计思想

设计如图:

这就是一个简单的点对点的一个设计。这样的设计本身没有什么问题,但是面对不同的业务系统都要接入WebSocket,我们就需要将服务端的代码复制一份,然后修改成适合自己业务项目的逻辑。

如果业务项目比较多,就会出现大量重复的代码,如我们公司就有20多个业务系统。从《代码重构》这本书中,就得知这是一种坏的味道,需要我们想办法优化。

如何来优化呢?按照阿里程序员的说话,没有什么是加一个中间层不能解决的,如果不能那就再加一层。

03 独特的设计

3.1 总览

如何通过中间层去解耦呢?

为了将WebSocket能够复用,就需要通过一个中间层能够作为一个传递者。既可以让用户直接连接WebSocket,也可以通过中间层直接推送消息。

我们来看看最终的设计流程:

3.2 流程分析

在流程分析执之前,我们需要说明引入的中间层。

  • Socket中间客户端

  • Socket服务

Socket中间客户端

Socket中间客户端作为一个jar传递于业务项目中,用来代替WebSocket直接推送消息给Socket客户端。同时也会将WebSocket服务的IP和端口暴露给客户端。

Socket中间客户端是基于NettySocket客户端,通过Bootstrap bootstrap = new Bootstrap()实例化,遵循TCP协议。详见代码。

Socket服务

为什么需要引入Socket服务呢?这也是小编之前非常疑惑的地方,直到自己搭建才知道为什么这么设计。

由于Socket中间客户端无法直接连接WebSocket,那么那就要一个完全基于TCP协议的Socket服务,就可以和Socket中间客户端建立连接。

Socket服务WebSocket位于同一个服务,就可以获取到WebSocket的所有通道(channel),就可以将消息推送给客户端了。

运行流程

  • ① 客户端通过业务项目暴露的WebSOcketIP和端口给前端,前端用来建立WebSocket连接。当着这个主要针对H5。类似安卓或者IOS有支持TCPSDK,就可以直接连接Socket服务了。

  • ② 随着业务项目启动建立与Socket服务的连接,等待随时给Socket服务发送消息。

  • Socket服务接收到消息后,直接获取WebSocket的通道。然后通过通道可以推送消息。

  • ④ 获取到通道之后,就可以直接推送消息给前端了。

所以每次使用,只需要引入Jar,需要推送消息给客户端,只需要直接调用方法推送即可。

04 设计实现

4.1 WebSocket服务端

代码同2.2的代码

WebSocket服务的端口是9090

4.2 Socket服务端

java体验AI代码助手代码解读复制代码@Slf4j @Component public class SockerServer {      @Autowired     private WebSocketServer webSocketServer;      public void start() throws InterruptedException {         EventLoopGroup bossGroup = new NioEventLoopGroup();         EventLoopGroup workGroup = new NioEventLoopGroup();          ServerBootstrap serverBootstrap = new ServerBootstrap();         serverBootstrap.group(bossGroup, workGroup);         serverBootstrap.channel(NioServerSocketChannel.class);         serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>(){              @Override             protected void initChannel(SocketChannel socketChannel) throws Exception {                 ChannelPipeline pipeline = socketChannel.pipeline();                 pipeline.addLast(new DelimiterBasedFrameDecoder(2048, Unpooled.copiedBuffer("_".getBytes())));                 pipeline.addLast(new StringDecoder(StandardCharsets.UTF_8));                 pipeline.addLast(new StringEncoder(StandardCharsets.UTF_8));                 // 自定义的handler,处理业务逻辑                 pipeline.addLast(new SimpleChannelInboundHandler<>() {                      @Override                     public void handlerAdded(ChannelHandlerContext ctx) throws Exception {                         // 建立客户端                         Channel channel = ctx.channel();                         log.info("Socket客户端建立连接:channelId={}", channel.id());                     }                      @Override                     public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {                         // 断开链接                         Channel channel = ctx.channel();                         log.info("Socket客户端断开连接:channelId={}", channel.id());                     }                      @Override                     protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {                         // 接受消息                         Channel channel = ctx.channel();                         log.info("Socket收到来自通道channelId[{}]发送的消息:{}", channel.id(), msg);                         // 通过WebSocket将方法发送给客户端                         webSocketServer.getChannelGroup().writeAndFlush(new TextWebSocketFrame("收到来自channelId[" + channel.id() + "]发送的消息:" + msg + "123_"));                     }                 });             }         });          // 配置完成,开始绑定server,通过调用sync同步方法阻塞直到绑定成功         ChannelFuture channelFuture = serverBootstrap.bind(9091).sync();         log.info("Server started and listen on:{}",channelFuture.channel().localAddress());         // 对关闭通道进行监听         channelFuture.channel().closeFuture().sync();     } }

Socket服务的端口是9091

4.3 Socket中间客户端

java体验AI代码助手代码解读复制代码@Slf4j public class MockClient {      @Getter     private SocketChannel socketChannel;      public void connect() throws InterruptedException {         EventLoopGroup eventLoopGroup = new NioEventLoopGroup();         Bootstrap bootstrap = new Bootstrap();         bootstrap.channel(NioSocketChannel.class);         bootstrap.option(ChannelOption.SO_KEEPALIVE, true);         bootstrap.option(ChannelOption.SO_BACKLOG, 500);         bootstrap.group(eventLoopGroup);          bootstrap.handler(new ChannelInitializer() {             @Override             protected void initChannel(Channel channel) throws Exception {                 ChannelPipeline pipeline = channel.pipeline();                 pipeline.addLast(new DelimiterBasedFrameDecoder(2048, Unpooled.copiedBuffer("_".getBytes())));                 pipeline.addLast(new StringDecoder(StandardCharsets.UTF_8));                 pipeline.addLast(new StringEncoder(StandardCharsets.UTF_8));                 pipeline.addLast(new SimpleChannelInboundHandler<String>(){                     @Override                     protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {                         log.info("client receive: {}", msg);                     }                 });             }         });          ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9091).sync();         this.socketChannel = (SocketChannel) channelFuture.channel();     } }

Socket只是用来发送消息的,所以不同处理接受的消息。注意这里的中间客户端连接的是Socket服务,端口是9091

4.4 配置启动

java体验AI代码助手代码解读复制代码@Slf4j @Component public class StartConfig {      @Autowired     private WebSocketServer webSocketServer;     @Autowired     private SockerServer socketServer;       @PostConstruct     public void init() {         ExecutorService executorService = Executors.newFixedThreadPool(2);         executorService.execute(() -> {             log.info("websocket init ....");             try {                 webSocketServer.start();             } catch (InterruptedException e) {                 throw new RuntimeException(e);             }         });          executorService.execute(() -> {             log.info("socket init ....");             try {                 socketServer.start();             } catch (InterruptedException e) {                 throw new RuntimeException(e);             }         });     } }

这个就是独立部署的Socket服务配置,两个服务分别使用多线程启动。

4.5 模拟数据推送

java体验AI代码助手代码解读复制代码@Test void contextLoads() throws Exception {     MockClient mockClient = new MockClient();     mockClient.connect();     SocketChannel socketChannel = mockClient.getSocketChannel();      new Timer().schedule(new TimerTask() {         @Override         public void run() {             System.out.println("send msg...");             socketChannel.writeAndFlush("foo test..._");         }     }, 0, 2000);      System.in.read(); }

每个2s发送一次消息。

4.6 客户端

客户端同样用在线测试工具代替。

4.7 演示

05 小结

这就完成了WebSocket的解耦。关于Socket消息的编解码,有很多注意点,在搭建过程中,总会不成功, 需要根据连接的协议选择不同的编解码,才能正确的接受和发送信息。这些留到后面的文章继续介绍。


文章来源网络收集