模式切换
RuoYi-Vue-Plus 项目中 WebSocket 逻辑功能实现梳理
前端业务逻辑
前端连接方式:
javascript
ws://[后端ip]:[端口]/resource/websocket?clientid=[clientid]&Authorization=[token]
传输方式:
javascript
headers: {
Authorization: "Bearer " + getToken(),
clientid: import.meta.env.VITE_APP_CLIENT_ID
}
WebSocket 开关
.env
文件中定义了 WebSocket 开关 VITE_APP_WEBSOCKET
(开发环境默认关闭 WebSocket,因 Vite 的 Bug 导致如 WebSocket 无法连接则会崩溃)。
工具文件 websocket.ts
在前端中定义了一个 WebSocket 工具文件 websocket.ts
。该文件提供了一个 WebSocket 通信的全面实现,包括处理连接、错误、重连、心跳和消息传递。
websocket.ts
中实现了 WebSocket 通信的基本功能,包括连接管理、错误处理、心跳维持和消息处理,适用于需要与服务器保持长连接的场景。
模块和参数
模块:
- initWebSocket:初始化 WebSocket 连接。
- websocketonopen:处理连接成功。
- websocketonerror:处理连接错误。
- websocketclose:处理断开连接。
- resetHeart:重置心跳。
- sendSocketHeart:发送心跳消息。
- reconnect:重连。
- sendMsg:发送数据。
- websocketonmessage:接收数据。
- test:测试收到消息传递。
参数:
- url:socket 地址。
- websocket:WebSocket 实例。
- heartTime:心跳定时器实例。
- socketHeart:心跳次数。
- HeartTimeOut:心跳超时时间。
- socketError:错误次数。
详细功能
- 初始化 WebSocket
初始化 WebSocket 连接,设置必要的事件处理函数和心跳。
具体步骤:
- 环境变量检查:如果环境变量 VITE_APP_WEBSOCKET 为 'false',则直接返回,不初始化 WebSocket。
- 设置 socketUrl:将传入的 URL 赋值给 socketUrl。
- 创建 WebSocket 实例:使用传入的 URL 和包含 Authorization 和 clientid 的查询参数创建 WebSocket 实例。
- 设置事件处理函数: a. websocketonopen():设置连接成功处理函数。 b. websocketonmessage():设置消息接收处理函数。 c. websocketonerror():设置连接错误处理函数。 d. websocketclose():设置连接关闭处理函数。
- 发送心跳:调用 sendSocketHeart() 方法。
- 返回 WebSocket 实例。
typescript
export const initWebSocket = (url: any) => {
if (import.meta.env.VITE_APP_WEBSOCKET === 'false') {
return;
}
socketUrl = url;
websocket = new WebSocket(url + '?Authorization=Bearer ' + getToken() + '&clientid=' + import.meta.env.VITE_APP_CLIENT_ID);
websocketonopen();
websocketonmessage();
websocketonerror();
websocketclose();
sendSocketHeart();
return websocket;
};
- 连接成功
处理连接成功事件,并重置心跳。
具体步骤:
- 设置 onopen 事件处理函数:当 WebSocket 连接成功时,执行该函数。
- 日志输出:输出连接成功的日志信息。
- 重置心跳:调用 resetHeart() 方法重置心跳机制。
typescript
export const websocketonopen = () => {
websocket.onopen = function () {
console.log('连接 websocket 成功');
resetHeart();
};
};
- 连接错误
处理连接错误事件。
具体步骤:
- 设置 onerror 事件处理函数:当 WebSocket 连接出错时,执行该函数。
- 日志输出:输出连接失败的日志信息。
typescript
export const websocketonerror = () => {
websocket.onerror = function (e: any) {
console.log('连接 websocket 失败', e);
};
};
- 断开连接
处理断开连接事件。
具体步骤:
- 设置 onclose 事件处理函数:当 WebSocket 连接关闭时,执行该函数。
- 日志输出:输出连接关闭的日志信息。
typescript
export const websocketclose = () => {
websocket.onclose = function (e: any) {
console.log('断开连接', e);
};
};
- 重置心跳
重置心跳计数和错误计数,并重新开始发送心跳。
具体步骤:
- 重置计数器: a. socketHeart 重置为 0。 b. socketError 重置为 0。
- 清除心跳定时器:停止当前心跳定时器。
- 重新发送心跳:调用 sendSocketHeart() 方法重新启动心跳机制。
typescript
export const resetHeart = () => {
socketHeart = 0;
socketError = 0;
clearInterval(heartTime);
sendSocketHeart();
};
- 发送心跳
定时发送心跳消息,如果连接断开则尝试重连。
具体步骤:
- 设置心跳定时器:每隔 HeartTimeOut 毫秒(10秒)执行一次。
- 检查 WebSocket 状态: a. 如果连接正常(readyState == 1),则发送心跳消息 {"type": "ping"} 并增加心跳计数 socketHeart。 b. 如果连接不正常,则调用 reconnect() 方法尝试重连。
typescript
export const sendSocketHeart = () => {
heartTime = setInterval(() => {
if (websocket.readyState == 1) {
websocket.send(
JSON.stringify({
type: 'ping'
})
);
socketHeart = socketHeart + 1;
} else {
reconnect();
}
}, HeartTimeOut);
};
- 重连
如果连接错误次数不超过 2 次,尝试重连。
具体步骤:
- 检查错误次数:如果错误次数 socketError 不超过 2 次,执行以下操作: a. 停止当前心跳定时器。 b. 重新初始化 WebSocket 连接。 c. 增加错误计数 socketError。 d. 输出重连日志信息。
- 错误次数超过限制:输出日志信息并停止心跳定时器。
typescript
export const reconnect = () => {
if (socketError <= 2) {
clearInterval(heartTime);
initWebSocket(socketUrl);
socketError = socketError + 1;
console.log('socket重连', socketError);
} else {
console.log('重试次数已用完');
clearInterval(heartTime);
}
};
- 发送数据
通过 WebSocket 发送数据。
具体步骤:
发送数据:通过 WebSocket 连接发送指定的数据 data。
typescript
export const sendMsg = (data: any) => {
websocket.send(data);
};
- 接收数据
处理接收到的数据,根据消息内容执行相应的操作。
具体步骤:
- 设置 onmessage 事件处理函数:当 WebSocket 接收到消息时,执行该函数。
- 检查消息内容: a. 如果消息包含 'heartbeat',则调用 resetHeart() 方法重置心跳。 b. 如果消息包含 'ping',则直接返回,不处理该消息。
- 处理消息: a. 调用 addNotice() 方法添加通知。 b. 使用 ElNotification 显示通知。
- 返回消息内容:返回接收到的消息数据 e.data。
typescript
export const websocketonmessage = () => {
websocket.onmessage = function (e: any) {
if (e.data.indexOf('heartbeat') > 0) {
resetHeart();
}
if (e.data.indexOf('ping') > 0) {
return;
}
addNotice({
message: e.data,
read: false,
time: new Date().toLocaleString()
});
ElNotification({
title: '消息',
message: e.data,
type: 'success',
duration: 3000
});
return e.data;
};
};
初始化 WebSocket 连接
- 在 onMounted 钩子中,根据当前页面的协议(window.location.protocol)确定 WebSocket 的协议,如果是 https: 则使用 wss://,否则使用 ws://。
- 使用 initWebSocket 方法初始化 WebSocket 连接。
- 构建 WebSocket 的 URL,包括协议、主机、基础 API 路径(VITE_APP_BASE_API),以及具体的 WebSocket 资源路径(/resource/websocket)。
typescript
<script setup name="Index" lang="ts">
import { initWebSocket } from '@/utils/websocket';
onMounted(() => {
let protocol = window.location.protocol === 'https:' ? 'wss://' : 'ws://'
initWebSocket(protocol + window.location.host + import.meta.env.VITE_APP_BASE_API + "/resource/websocket");
});
</script>
后端业务逻辑
yml 文件中配置 WebSocket
- enabled 是否开启此功能
- path 应用路径
- allowedOrigins 设置访问源地址
yaml
--- # websocket
websocket:
# 如果关闭 需要和前端开关一起关闭
enabled: true
# 路径
path: /resource/websocket
# 设置访问源地址
allowedOrigins: '*'
如关闭 WebSocket 功能需连同前端 WebSocket 开关一同关闭,不然前端启动会报错。
bash
# websocket 开关(开发环境默认关闭ws 因vite的bug导致如ws无法连接则会崩溃)
VITE_APP_WEBSOCKET = false
WebSocketUtils 工具类
WebSocketUtils.java
是一个 WebSocket 工具类,用于处理 WebSocket 连接、消息发送、订阅发布等操作。
该工具类提供了 WebSocket 通信的基础设施,包括发送消息、订阅消息、发布消息等功能。利用 Redis 进行消息的发布订阅,以支持跨服务的 WebSocket 通信。通过这些方法,可以方便地在 WebSocket 连接中进行消息传递和处理。
java
@Slf4j
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class WebSocketUtils {
/**
* 发送消息
*
* @param sessionKey session主键 一般为用户id
* @param message 消息文本
*/
public static void sendMessage(Long sessionKey, String message) {
WebSocketSession session = WebSocketSessionHolder.getSessions(sessionKey);
sendMessage(session, message);
}
/**
* 订阅消息
*
* @param consumer 自定义处理
*/
public static void subscribeMessage(Consumer<WebSocketMessageDto> consumer) {
RedisUtils.subscribe(WEB_SOCKET_TOPIC, WebSocketMessageDto.class, consumer);
}
/**
* 发布订阅的消息
*
* @param webSocketMessage 消息对象
*/
public static void publishMessage(WebSocketMessageDto webSocketMessage) {
List<Long> unsentSessionKeys = new ArrayList<>();
// 当前服务内session,直接发送消息
for (Long sessionKey : webSocketMessage.getSessionKeys()) {
if (WebSocketSessionHolder.existSession(sessionKey)) {
WebSocketUtils.sendMessage(sessionKey, webSocketMessage.getMessage());
continue;
}
unsentSessionKeys.add(sessionKey);
}
// 不在当前服务内session,发布订阅消息
if (CollUtil.isNotEmpty(unsentSessionKeys)) {
WebSocketMessageDto broadcastMessage = new WebSocketMessageDto();
broadcastMessage.setMessage(webSocketMessage.getMessage());
broadcastMessage.setSessionKeys(unsentSessionKeys);
RedisUtils.publish(WEB_SOCKET_TOPIC, broadcastMessage, consumer -> {
log.info(" WebSocket发送主题订阅消息topic:{} session keys:{} message:{}",
WEB_SOCKET_TOPIC, unsentSessionKeys, webSocketMessage.getMessage());
});
}
}
/**
* 发布订阅的消息(群发)
*
* @param message 消息内容
*/
public static void publishAll(String message) {
WebSocketMessageDto broadcastMessage = new WebSocketMessageDto();
broadcastMessage.setMessage(message);
RedisUtils.publish(WEB_SOCKET_TOPIC, broadcastMessage, consumer -> {
log.info("WebSocket发送主题订阅消息topic:{} message:{}", WEB_SOCKET_TOPIC, message);
});
}
public static void sendPongMessage(WebSocketSession session) {
sendMessage(session, new PongMessage());
}
public static void sendMessage(WebSocketSession session, String message) {
sendMessage(session, new TextMessage(message));
}
private static void sendMessage(WebSocketSession session, WebSocketMessage<?> message) {
if (session == null || !session.isOpen()) {
log.warn("[send] session会话已经关闭");
} else {
try {
session.sendMessage(message);
} catch (IOException e) {
log.error("[send] session({}) 发送消息({}) 异常", session, message, e);
}
}
}
}
类注解
- @Slf4j:这是 Lombok 提供的注解,用于生成日志记录器对象 log,可以方便地进行日志记录。
- @NoArgsConstructor(access = AccessLevel.PRIVATE):生成一个私有的无参构造器,防止该工具类被实例化。
java
@Slf4j
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class WebSocketUtils {
sendMessage 方法
- 功能:根据 sessionKey(通常是用户 ID)获取 WebSocket 会话,并发送消息。
- 参数: a. sessionKey:会话主键。 b. message:消息文本。
- 内部逻辑: a. 从 WebSocketSessionHolder 中获取会话。 b. 调用重载的 sendMessage 方法发送消息。
java
public static void sendMessage(Long sessionKey, String message) {
WebSocketSession session = WebSocketSessionHolder.getSessions(sessionKey);
sendMessage(session, message);
}
subscribeMessage 方法
- 功能:订阅特定主题的消息。
- 参数: a. consumer:消息处理逻辑。
- 内部逻辑: a. 使用 RedisUtils 订阅 Redis 中的 WEB_SOCKET_TOPIC 主题,并处理 WebSocketMessageDto 类型的消息。
java
public static void subscribeMessage(Consumer<WebSocketMessageDto> consumer) {
RedisUtils.subscribe(WEB_SOCKET_TOPIC, WebSocketMessageDto.class, consumer);
}
publishMessage 方法
- 功能:发布消息到指定的 WebSocket 会话,如果会话不在当前服务,则发布到 Redis 主题。
- 参数: a. webSocketMessage:消息对象,包含消息文本和目标会话列表。
- 内部逻辑: a. 遍历 webSocketMessage 中的会话列表,发送消息给当前服务内的会话。 b. 对于不在当前服务内的会话,将消息发布到 Redis 主题中。
java
public static void publishMessage(WebSocketMessageDto webSocketMessage) {
List<Long> unsentSessionKeys = new ArrayList<>();
for (Long sessionKey : webSocketMessage.getSessionKeys()) {
if (WebSocketSessionHolder.existSession(sessionKey)) {
WebSocketUtils.sendMessage(sessionKey, webSocketMessage.getMessage());
continue;
}
unsentSessionKeys.add(sessionKey);
}
if (CollUtil.isNotEmpty(unsentSessionKeys)) {
WebSocketMessageDto broadcastMessage = new WebSocketMessageDto();
broadcastMessage.setMessage(webSocketMessage.getMessage());
broadcastMessage.setSessionKeys(unsentSessionKeys);
RedisUtils.publish(WEB_SOCKET_TOPIC, broadcastMessage, consumer -> {
log.info(" WebSocket发送主题订阅消息topic:{} session keys:{} message:{}",
WEB_SOCKET_TOPIC, unsentSessionKeys, webSocketMessage.getMessage());
});
}
}
publishAll 方法
- 功能:发布消息到所有订阅该主题的会话。
- 参数: a. message:消息内容。
- 内部逻辑: a. 构建一个 WebSocketMessageDto 对象并设置消息内容。 b. 使用 RedisUtils 将消息发布到 Redis 主题。
java
public static void publishAll(String message) {
WebSocketMessageDto broadcastMessage = new WebSocketMessageDto();
broadcastMessage.setMessage(message);
RedisUtils.publish(WEB_SOCKET_TOPIC, broadcastMessage, consumer -> {
log.info("WebSocket发送主题订阅消息topic:{} message:{}", WEB_SOCKET_TOPIC, message);
});
}
sendPongMessage 方法
- 功能:发送 Pong 消息响应。
- 参数: a. session:WebSocket 会话。
- 内部逻辑: a. 调用 sendMessage 方法,发送 PongMessage 对象。
java
public static void sendPongMessage(WebSocketSession session) {
sendMessage(session, new PongMessage());
}
sendMessage 方法(重载)
- 功能:发送文本消息。
- 参数: a. session:WebSocket 会话。 b. message:消息文本。
- 内部逻辑: a. 调用 sendMessage 方法,发送 TextMessage 对象。
java
public static void sendMessage(WebSocketSession session, String message) {
sendMessage(session, new TextMessage(message));
}
私有的 sendMessage 方法
- 功能:发送 WebSocket 消息。
- 参数: a. session:WebSocket 会话。 b. message:消息对象,类型为 WebSocketMessage<?>。
- 内部逻辑: a. 检查会话是否为 null 或已关闭。如果是,记录警告日志。 b. 尝试通过会话发送消息。如果发送过程中出现 IOException,记录错误日志。
java
private static void sendMessage(WebSocketSession session, WebSocketMessage<?> message) {
if (session == null || !session.isOpen()) {
log.warn("[send] session会话已经关闭");
} else {
try {
session.sendMessage(message);
} catch (IOException e) {
log.error("[send] session({}) 发送消息({}) 异常", session, message, e);
}
}
}
使用 publishAll 方法群发消息
例如框架内的新增通知公告后,使用 publishAll 方法群发消息。
java
/**
* 新增通知公告
*/
@SaCheckPermission("system:notice:add")
@Log(title = "通知公告", businessType = BusinessType.INSERT)
@PostMapping
public R<Void> add(@Validated @RequestBody SysNoticeBo notice) {
int rows = noticeService.insertNotice(notice);
if (rows <= 0) {
return R.fail();
}
String type = dictService.getDictLabel("sys_notice_type", notice.getNoticeType());
WebSocketUtils.publishAll("[" + type + "] " + notice.getNoticeTitle());
return R.ok();
}
通知公告功能实现业务逻辑
前端在 index.vue
的 onMounted
钩子中初始化了 WebSocket 连接。在 websocketonmessage 方法中,当后端通过 WebSocket 发送消息(例如新增通知公告)时能够接收到消息,并通过 Piana 将消息响应式地存储到客户端的内存中。
在 store/modules 目录下定义的 notice.ts 文件定义了一个 Pinia 存储,用于管理通知信息。它提供了添加、删除、标记已读和清空通知的方法,并将这些方法和状态暴露出来,供 Vue 组件使用。
typescript
import { defineStore } from 'pinia';
interface NoticeItem {
title?: string;
read: boolean;
message: any;
time: string;
}
export const useNoticeStore = defineStore('notice', () => {
const state = reactive({
notices: [] as NoticeItem[]
});
const addNotice = (notice: NoticeItem) => {
state.notices.push(notice);
};
const removeNotice = (notice: NoticeItem) => {
state.notices.splice(state.notices.indexOf(notice), 1);
};
//实现全部已读
const readAll = () => {
state.notices.forEach((item) => {
item.read = true;
});
};
const clearNotice = () => {
state.notices = [];
};
return {
state,
addNotice,
removeNotice,
readAll,
clearNotice
};
});
export default useNoticeStore;