WebSocket使用-Springboot

本文websocket使用netty实现,原因:因为netty提供有心跳组件,方便实现心跳检测

一、添加依赖

在springboot项目中添加下面两个依赖

1
2
3
4
5
6
7
8
9
10
11
<!-- netty -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>${netty-all.version}</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>${hutool.version}</version>
</dependency>

二、netty的基本实现

netty服务器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package com.aixbox.websocketdemo.webSocket;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.NettyRuntime;
import io.netty.util.concurrent.Future;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

@Slf4j
@Configuration
public class NettyWebSocketServer {
//设置webSocket端口
public static final int WEB_SOCKET_PORT = 8090;
//webSocket处理器
public static final NettyWebSocketServerHandler NETTY_WEB_SOCKET_SERVER_HANDLER = new NettyWebSocketServerHandler();
// 创建线程池执行器
//负责管理连接的建立和关闭
private EventLoopGroup bossGroup = new NioEventLoopGroup(1);
//负责处理实际的数据传输和业务逻辑
private EventLoopGroup workerGroup = new NioEventLoopGroup(NettyRuntime.availableProcessors());

/**
* 启动 ws server
*
* @return
* @throws InterruptedException
*/
@PostConstruct
public void start() throws InterruptedException {
run();
}

/**
* 销毁
*/
@PreDestroy
public void destroy() {
Future<?> future = bossGroup.shutdownGracefully();
Future<?> future1 = workerGroup.shutdownGracefully();
future.syncUninterruptibly();
future1.syncUninterruptibly();
log.info("关闭 ws server 成功");
}

public void run() throws InterruptedException {
// 服务器启动引导对象
ServerBootstrap serverBootstrap = new ServerBootstrap();
//让这两个线程池处理请求
serverBootstrap.group(bossGroup, workerGroup)
//指定服务器端接受连接的 Channel 类型
.channel(NioServerSocketChannel.class)
//设置指定 Channel 的配置参数。在这里,设置了 SO_BACKLOG 参数,表示系统用于临时存放已完成三次握手的请求的队列的最大长度。这里设置为 128。
.option(ChannelOption.SO_BACKLOG, 128)
//设置指定 Channel 的配置参数。在这里,设置了 SO_KEEPALIVE 参数,表示是否开启 TCP 的 keepalive 检测机制。如果开启,TCP 连接在空闲一段时间后会发送心跳包以检测连接的存活性。
.option(ChannelOption.SO_KEEPALIVE, true)
// 为 bossGroup 添加 日志处理器
.handler(new LoggingHandler(LogLevel.INFO))
//为接受到的连接的子 Channel(即客户端连接的 SocketChannel)设置初始化器。
.childHandler(new ChannelInitializer<SocketChannel>() {
//使用 ChannelInitializer 类来初始化新连接的 SocketChannel 的处理器
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
//30秒客户端没有向服务器发送心跳则关闭连接
pipeline.addLast(new IdleStateHandler(30, 0, 0));
// 因为使用http协议连接,所以需要使用http的编码器,解码器
pipeline.addLast(new HttpServerCodec());
// 以块方式写,添加 chunkedWriter 处理器
pipeline.addLast(new ChunkedWriteHandler());
/**
* 作用:处理http请求和响应的聚合
* 说明:
* 1. http数据在传输过程中是分段的,HttpObjectAggregator可以把多个段聚合起来;
* 2. 这就是为什么当浏览器发送大量数据时,就会发出多次 http请求的原因
*/
pipeline.addLast(new HttpObjectAggregator(8192));
/**
* 作用:处理 WebSocket 升级握手,握手协议升级成ws协议
* 说明:
* 1. 对于 WebSocket,它的数据是以帧frame 的形式传递的;
* 2. 可以看到 WebSocketFrame 下面有6个子类
* 3. 浏览器发送请求时: ws://localhost:7000/hello 表示请求的uri
* 4. WebSocketServerProtocolHandler 核心功能是把 http协议升级为 ws 协议,保持长连接;
* 是通过一个状态码 101 来切换的
*/
pipeline.addLast(new WebSocketServerProtocolHandler("/"));
// 自定义handler ,处理业务逻辑
pipeline.addLast(NETTY_WEB_SOCKET_SERVER_HANDLER);
}
});
// 启动服务器,监听端口,阻塞直到启动成功
serverBootstrap.bind(WEB_SOCKET_PORT).sync();
}

}

websocket处理器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package com.aixbox.websocketdemo.webSocket;

import cn.hutool.core.util.StrUtil;
import cn.hutool.extra.spring.SpringUtil;
import cn.hutool.json.JSONUtil;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;


@Slf4j
@Sharable
public class NettyWebSocketServerHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {


// 读取客户端发送的请求报文
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
//打印接收到的消息
log.info("收到客户端消息:{}", msg.text());
//向请求回复消息
ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器收到消息:" + msg.text()));
}
}

:::info
在上面的NettyWebSocketServer创建了一个run方法用来启动netty服务器,并且使用@PostConstruct注解在项目启动时启动,使用@PreDestroy注解在项目关闭时关闭线程。
在run方法中设置了netty服务器的参数,以及设置了webSocket处理器。
:::
:::info
在NettyWebSocketServerHandler方法中则是接收了请求发送的消息以及给请求方推送接收到了消息
:::

使用apifox测试

image.png

三、添加心跳机制

:::info
因为前端用户如果下线,后端是无法感知的,所以需要维持一个心跳,前端一定时间发送一个心跳包维持心跳,我们在前门的run方法中已经添加了 pipeline.addLast(new IdleStateHandler(30, 0, 0));处理器,在30秒前端没有发送消息,这个处理器就会发送一个事件,我们通过检测事件关闭连接
:::

在NettyWebSocketServerHandler处理心跳,30秒没消息关闭连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package com.aixbox.websocketdemo.webSocket;

import cn.hutool.core.util.StrUtil;
import cn.hutool.extra.spring.SpringUtil;
import cn.hutool.json.JSONUtil;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;


@Slf4j
@Sharable
public class NettyWebSocketServerHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

/**
* 心跳检查
*
* @param ctx
* @param evt
* @throws Exception
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
//判断事件为IdleStateEvent
if (evt instanceof IdleStateEvent) {
IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
// 事件状态为读空闲
if (idleStateEvent.state() == IdleState.READER_IDLE) {
// 关闭用户的连接
ctx.channel().close();
}
}
//执行后续责任链
super.userEventTriggered(ctx, evt);
}



// 读取客户端发送的请求报文
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
//打印接收到的消息
log.info("收到客户端消息:{}", msg.text());
//向请求回复消息
ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器收到消息:" + msg.text()));
}
}

使用apifox测试

使用apifox测试,可以看到30秒没有消息,连接关闭了
image.png

四、管理WebSocket连接的channel

webSocket处理器添加握手完成后的事件中间channel保存到map中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package com.aixbox.websocketdemo.webSocket;

import cn.hutool.extra.spring.SpringUtil;
import com.aixbox.websocketdemo.service.WebSocketService;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;


@Slf4j
@Sharable
public class NettyWebSocketServerHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

private WebSocketService webSocketService;

// 当web客户端连接后,触发该方法
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
this.webSocketService = getService();
}

/**
* 心跳检查
*
* @param ctx
* @param evt
* @throws Exception
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
//判断事件为IdleStateEvent
if (evt instanceof IdleStateEvent) {
IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
// 事件状态为读空闲
if (idleStateEvent.state() == IdleState.READER_IDLE) {
// 关闭用户的连接
ctx.channel().close();
}
} else if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {
//握手完成,将连接放入缓存
this.webSocketService.connect(ctx.channel());
}
//执行后续责任链
super.userEventTriggered(ctx, evt);
}



// 读取客户端发送的请求报文
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
//前端可以发送一个json,根据不同的type,做不同的处理
//打印接收到的消息
log.info("收到客户端消息:{}", msg.text());
//向请求回复消息
ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器收到消息:" + msg.text()));
}

private WebSocketService getService() {
return SpringUtil.getBean(WebSocketService.class);
}
}

WebSocketServiceImpl实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.aixbox.websocketdemo.service.impl;

import com.aixbox.websocketdemo.service.WebSocketService;
import io.netty.channel.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.concurrent.ConcurrentHashMap;

/**
* Description: websocket处理类
* Author: <a href="https://github.com/zongzibinbin">abin</a>
* Date: 2023-03-19 16:21
*/
@Component
@Slf4j
public class WebSocketServiceImpl implements WebSocketService {

/**
* 所有已连接的websocket连接列表和一些额外参数
*/
private static final ConcurrentHashMap<Channel, String> ONLINE_WS_MAP = new ConcurrentHashMap<>();

/**
* 处理所有ws连接的事件
*
*/
@Override
public void connect(Channel channel) {
ONLINE_WS_MAP.put(channel, "");
}
}

下篇在vue项目中使用websocket