Skip to content

RuoYi-Vue-Plus 项目中 WebSocket 逻辑功能实现梳理

前端业务逻辑

前端连接方式:

javascript
ws://[后端ip]:[端口]/resource/websocket?clientid=[clientid]&Authorization=[token]

传输方式:

javascript
headers: {
    Authorization: "Bearer " + getToken(),
    clientid: import.meta.env.VITE_APP_CLIENT_ID
}

WebSocket 开关

.env 文件中定义了 WebSocket 开关 VITE_APP_WEBSOCKET(开发环境默认关闭 WebSocket,因 Vite 的 Bug 导致如 WebSocket 无法连接则会崩溃)。

24082401.png

工具文件 websocket.ts

在前端中定义了一个 WebSocket 工具文件 websocket.ts。该文件提供了一个 WebSocket 通信的全面实现,包括处理连接、错误、重连、心跳和消息传递。

websocket.ts 中实现了 WebSocket 通信的基本功能,包括连接管理、错误处理、心跳维持和消息处理,适用于需要与服务器保持长连接的场景。

模块和参数

模块:

  • initWebSocket:初始化 WebSocket 连接。
  • websocketonopen:处理连接成功。
  • websocketonerror:处理连接错误。
  • websocketclose:处理断开连接。
  • resetHeart:重置心跳。
  • sendSocketHeart:发送心跳消息。
  • reconnect:重连。
  • sendMsg:发送数据。
  • websocketonmessage:接收数据。
  • test:测试收到消息传递。

参数:

  • url:socket 地址。
  • websocket:WebSocket 实例。
  • heartTime:心跳定时器实例。
  • socketHeart:心跳次数。
  • HeartTimeOut:心跳超时时间。
  • socketError:错误次数。

详细功能

  • 初始化 WebSocket

初始化 WebSocket 连接,设置必要的事件处理函数和心跳。

具体步骤:

  1. 环境变量检查:如果环境变量 VITE_APP_WEBSOCKET 为 'false',则直接返回,不初始化 WebSocket。
  2. 设置 socketUrl:将传入的 URL 赋值给 socketUrl。
  3. 创建 WebSocket 实例:使用传入的 URL 和包含 Authorization 和 clientid 的查询参数创建 WebSocket 实例。
  4. 设置事件处理函数: a. websocketonopen():设置连接成功处理函数。 b. websocketonmessage():设置消息接收处理函数。 c. websocketonerror():设置连接错误处理函数。 d. websocketclose():设置连接关闭处理函数。
  5. 发送心跳:调用 sendSocketHeart() 方法。
  6. 返回 WebSocket 实例。
typescript
export const initWebSocket = (url: any) => {
  if (import.meta.env.VITE_APP_WEBSOCKET === 'false') {
    return;
  }
  socketUrl = url;
  websocket = new WebSocket(url + '?Authorization=Bearer ' + getToken() + '&clientid=' + import.meta.env.VITE_APP_CLIENT_ID);
  websocketonopen();
  websocketonmessage();
  websocketonerror();
  websocketclose();
  sendSocketHeart();
  return websocket;
};
  • 连接成功

处理连接成功事件,并重置心跳。

具体步骤:

  1. 设置 onopen 事件处理函数:当 WebSocket 连接成功时,执行该函数。
  2. 日志输出:输出连接成功的日志信息。
  3. 重置心跳:调用 resetHeart() 方法重置心跳机制。
typescript
export const websocketonopen = () => {
  websocket.onopen = function () {
    console.log('连接 websocket 成功');
    resetHeart();
  };
};
  • 连接错误

处理连接错误事件。

具体步骤:

  1. 设置 onerror 事件处理函数:当 WebSocket 连接出错时,执行该函数。
  2. 日志输出:输出连接失败的日志信息。
typescript
export const websocketonerror = () => {
  websocket.onerror = function (e: any) {
    console.log('连接 websocket 失败', e);
  };
};
  • 断开连接

处理断开连接事件。

具体步骤:

  1. 设置 onclose 事件处理函数:当 WebSocket 连接关闭时,执行该函数。
  2. 日志输出:输出连接关闭的日志信息。
typescript
export const websocketclose = () => {
  websocket.onclose = function (e: any) {
    console.log('断开连接', e);
  };
};
  • 重置心跳

重置心跳计数和错误计数,并重新开始发送心跳。

具体步骤:

  1. 重置计数器: a. socketHeart 重置为 0。 b. socketError 重置为 0。
  2. 清除心跳定时器:停止当前心跳定时器。
  3. 重新发送心跳:调用 sendSocketHeart() 方法重新启动心跳机制。
typescript
export const resetHeart = () => {
  socketHeart = 0;
  socketError = 0;
  clearInterval(heartTime);
  sendSocketHeart();
};
  • 发送心跳

定时发送心跳消息,如果连接断开则尝试重连。

具体步骤:

  1. 设置心跳定时器:每隔 HeartTimeOut 毫秒(10秒)执行一次。
  2. 检查 WebSocket 状态: a. 如果连接正常(readyState == 1),则发送心跳消息 {"type": "ping"} 并增加心跳计数 socketHeart。 b. 如果连接不正常,则调用 reconnect() 方法尝试重连。
typescript
export const sendSocketHeart = () => {
  heartTime = setInterval(() => {
    if (websocket.readyState == 1) {
      websocket.send(
        JSON.stringify({
          type: 'ping'
        })
      );
      socketHeart = socketHeart + 1;
    } else {
      reconnect();
    }
  }, HeartTimeOut);
};
  • 重连

如果连接错误次数不超过 2 次,尝试重连。

具体步骤:

  1. 检查错误次数:如果错误次数 socketError 不超过 2 次,执行以下操作: a. 停止当前心跳定时器。 b. 重新初始化 WebSocket 连接。 c. 增加错误计数 socketError。 d. 输出重连日志信息。
  2. 错误次数超过限制:输出日志信息并停止心跳定时器。
typescript
export const reconnect = () => {
  if (socketError <= 2) {
    clearInterval(heartTime);
    initWebSocket(socketUrl);
    socketError = socketError + 1;
    console.log('socket重连', socketError);
  } else {
    console.log('重试次数已用完');
    clearInterval(heartTime);
  }
};
  • 发送数据

通过 WebSocket 发送数据。

具体步骤:

发送数据:通过 WebSocket 连接发送指定的数据 data。

typescript
export const sendMsg = (data: any) => {
  websocket.send(data);
};
  • 接收数据

处理接收到的数据,根据消息内容执行相应的操作。

具体步骤:

  1. 设置 onmessage 事件处理函数:当 WebSocket 接收到消息时,执行该函数。
  2. 检查消息内容: a. 如果消息包含 'heartbeat',则调用 resetHeart() 方法重置心跳。 b. 如果消息包含 'ping',则直接返回,不处理该消息。
  3. 处理消息: a. 调用 addNotice() 方法添加通知。 b. 使用 ElNotification 显示通知。
  4. 返回消息内容:返回接收到的消息数据 e.data。
typescript
export const websocketonmessage = () => {
  websocket.onmessage = function (e: any) {
    if (e.data.indexOf('heartbeat') > 0) {
      resetHeart();
    }
    if (e.data.indexOf('ping') > 0) {
      return;
    }
    addNotice({
      message: e.data,
      read: false,
      time: new Date().toLocaleString()
    });
    ElNotification({
      title: '消息',
      message: e.data,
      type: 'success',
      duration: 3000
    });
    return e.data;
  };
};

初始化 WebSocket 连接

  1. 在 onMounted 钩子中,根据当前页面的协议(window.location.protocol)确定 WebSocket 的协议,如果是 https: 则使用 wss://,否则使用 ws://。
  2. 使用 initWebSocket 方法初始化 WebSocket 连接。
  3. 构建 WebSocket 的 URL,包括协议、主机、基础 API 路径(VITE_APP_BASE_API),以及具体的 WebSocket 资源路径(/resource/websocket)。
typescript
<script setup name="Index" lang="ts">
import { initWebSocket } from '@/utils/websocket';

onMounted(() => {
  let protocol = window.location.protocol === 'https:' ? 'wss://' : 'ws://'
  initWebSocket(protocol + window.location.host + import.meta.env.VITE_APP_BASE_API + "/resource/websocket");
});
</script>

24082402.png

后端业务逻辑

yml 文件中配置 WebSocket

  • enabled 是否开启此功能
  • path 应用路径
  • allowedOrigins 设置访问源地址
yaml
--- # websocket
websocket:
  # 如果关闭 需要和前端开关一起关闭
  enabled: true
  # 路径
  path: /resource/websocket
  # 设置访问源地址
  allowedOrigins: '*'

24082403.png

如关闭 WebSocket 功能需连同前端 WebSocket 开关一同关闭,不然前端启动会报错。

bash
# websocket 开关(开发环境默认关闭ws 因vite的bug导致如ws无法连接则会崩溃)
VITE_APP_WEBSOCKET = false

WebSocketUtils 工具类

WebSocketUtils.java 是一个 WebSocket 工具类,用于处理 WebSocket 连接、消息发送、订阅发布等操作。

该工具类提供了 WebSocket 通信的基础设施,包括发送消息、订阅消息、发布消息等功能。利用 Redis 进行消息的发布订阅,以支持跨服务的 WebSocket 通信。通过这些方法,可以方便地在 WebSocket 连接中进行消息传递和处理。

java
@Slf4j
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class WebSocketUtils {

    /**
     * 发送消息
     *
     * @param sessionKey session主键 一般为用户id
     * @param message    消息文本
     */
    public static void sendMessage(Long sessionKey, String message) {
        WebSocketSession session = WebSocketSessionHolder.getSessions(sessionKey);
        sendMessage(session, message);
    }

    /**
     * 订阅消息
     *
     * @param consumer 自定义处理
     */
    public static void subscribeMessage(Consumer<WebSocketMessageDto> consumer) {
        RedisUtils.subscribe(WEB_SOCKET_TOPIC, WebSocketMessageDto.class, consumer);
    }

    /**
     * 发布订阅的消息
     *
     * @param webSocketMessage 消息对象
     */
    public static void publishMessage(WebSocketMessageDto webSocketMessage) {
        List<Long> unsentSessionKeys = new ArrayList<>();
        // 当前服务内session,直接发送消息
        for (Long sessionKey : webSocketMessage.getSessionKeys()) {
            if (WebSocketSessionHolder.existSession(sessionKey)) {
                WebSocketUtils.sendMessage(sessionKey, webSocketMessage.getMessage());
                continue;
            }
            unsentSessionKeys.add(sessionKey);
        }
        // 不在当前服务内session,发布订阅消息
        if (CollUtil.isNotEmpty(unsentSessionKeys)) {
            WebSocketMessageDto broadcastMessage = new WebSocketMessageDto();
            broadcastMessage.setMessage(webSocketMessage.getMessage());
            broadcastMessage.setSessionKeys(unsentSessionKeys);
            RedisUtils.publish(WEB_SOCKET_TOPIC, broadcastMessage, consumer -> {
                log.info(" WebSocket发送主题订阅消息topic:{} session keys:{} message:{}",
                    WEB_SOCKET_TOPIC, unsentSessionKeys, webSocketMessage.getMessage());
            });
        }
    }

    /**
     * 发布订阅的消息(群发)
     *
     * @param message 消息内容
     */
    public static void publishAll(String message) {
        WebSocketMessageDto broadcastMessage = new WebSocketMessageDto();
        broadcastMessage.setMessage(message);
        RedisUtils.publish(WEB_SOCKET_TOPIC, broadcastMessage, consumer -> {
            log.info("WebSocket发送主题订阅消息topic:{} message:{}", WEB_SOCKET_TOPIC, message);
        });
    }

    public static void sendPongMessage(WebSocketSession session) {
        sendMessage(session, new PongMessage());
    }

    public static void sendMessage(WebSocketSession session, String message) {
        sendMessage(session, new TextMessage(message));
    }

    private static void sendMessage(WebSocketSession session, WebSocketMessage<?> message) {
        if (session == null || !session.isOpen()) {
            log.warn("[send] session会话已经关闭");
        } else {
            try {
                session.sendMessage(message);
            } catch (IOException e) {
                log.error("[send] session({}) 发送消息({}) 异常", session, message, e);
            }
        }
    }
}

类注解

  1. @Slf4j:这是 Lombok 提供的注解,用于生成日志记录器对象 log,可以方便地进行日志记录。
  2. @NoArgsConstructor(access = AccessLevel.PRIVATE):生成一个私有的无参构造器,防止该工具类被实例化。
java
@Slf4j
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class WebSocketUtils {

sendMessage 方法

  1. 功能:根据 sessionKey(通常是用户 ID)获取 WebSocket 会话,并发送消息。
  2. 参数: a. sessionKey:会话主键。 b. message:消息文本。
  3. 内部逻辑: a. 从 WebSocketSessionHolder 中获取会话。 b. 调用重载的 sendMessage 方法发送消息。
java
public static void sendMessage(Long sessionKey, String message) {
    WebSocketSession session = WebSocketSessionHolder.getSessions(sessionKey);
    sendMessage(session, message);
}

subscribeMessage 方法

  1. 功能:订阅特定主题的消息。
  2. 参数: a. consumer:消息处理逻辑。
  3. 内部逻辑: a. 使用 RedisUtils 订阅 Redis 中的 WEB_SOCKET_TOPIC 主题,并处理 WebSocketMessageDto 类型的消息。
java
public static void subscribeMessage(Consumer<WebSocketMessageDto> consumer) {
    RedisUtils.subscribe(WEB_SOCKET_TOPIC, WebSocketMessageDto.class, consumer);
}

publishMessage 方法

  1. 功能:发布消息到指定的 WebSocket 会话,如果会话不在当前服务,则发布到 Redis 主题。
  2. 参数: a. webSocketMessage:消息对象,包含消息文本和目标会话列表。
  3. 内部逻辑: a. 遍历 webSocketMessage 中的会话列表,发送消息给当前服务内的会话。 b. 对于不在当前服务内的会话,将消息发布到 Redis 主题中。
java
public static void publishMessage(WebSocketMessageDto webSocketMessage) {
    List<Long> unsentSessionKeys = new ArrayList<>();
    for (Long sessionKey : webSocketMessage.getSessionKeys()) {
        if (WebSocketSessionHolder.existSession(sessionKey)) {
            WebSocketUtils.sendMessage(sessionKey, webSocketMessage.getMessage());
            continue;
        }
        unsentSessionKeys.add(sessionKey);
    }
    if (CollUtil.isNotEmpty(unsentSessionKeys)) {
        WebSocketMessageDto broadcastMessage = new WebSocketMessageDto();
        broadcastMessage.setMessage(webSocketMessage.getMessage());
        broadcastMessage.setSessionKeys(unsentSessionKeys);
        RedisUtils.publish(WEB_SOCKET_TOPIC, broadcastMessage, consumer -> {
            log.info(" WebSocket发送主题订阅消息topic:{} session keys:{} message:{}",
                WEB_SOCKET_TOPIC, unsentSessionKeys, webSocketMessage.getMessage());
        });
    }
}

publishAll 方法

  1. 功能:发布消息到所有订阅该主题的会话。
  2. 参数: a. message:消息内容。
  3. 内部逻辑: a. 构建一个 WebSocketMessageDto 对象并设置消息内容。 b. 使用 RedisUtils 将消息发布到 Redis 主题。
java
public static void publishAll(String message) {
    WebSocketMessageDto broadcastMessage = new WebSocketMessageDto();
    broadcastMessage.setMessage(message);
    RedisUtils.publish(WEB_SOCKET_TOPIC, broadcastMessage, consumer -> {
        log.info("WebSocket发送主题订阅消息topic:{} message:{}", WEB_SOCKET_TOPIC, message);
    });
}

sendPongMessage 方法

  1. 功能:发送 Pong 消息响应。
  2. 参数: a. session:WebSocket 会话。
  3. 内部逻辑: a. 调用 sendMessage 方法,发送 PongMessage 对象。
java
public static void sendPongMessage(WebSocketSession session) {
    sendMessage(session, new PongMessage());
}

sendMessage 方法(重载)

  1. 功能:发送文本消息。
  2. 参数: a. session:WebSocket 会话。 b. message:消息文本。
  3. 内部逻辑: a. 调用 sendMessage 方法,发送 TextMessage 对象。
java
public static void sendMessage(WebSocketSession session, String message) {
    sendMessage(session, new TextMessage(message));
}

私有的 sendMessage 方法

  1. 功能:发送 WebSocket 消息。
  2. 参数: a. session:WebSocket 会话。 b. message:消息对象,类型为 WebSocketMessage<?>。
  3. 内部逻辑: a. 检查会话是否为 null 或已关闭。如果是,记录警告日志。 b. 尝试通过会话发送消息。如果发送过程中出现 IOException,记录错误日志。
java
private static void sendMessage(WebSocketSession session, WebSocketMessage<?> message) {
    if (session == null || !session.isOpen()) {
        log.warn("[send] session会话已经关闭");
    } else {
        try {
            session.sendMessage(message);
        } catch (IOException e) {
            log.error("[send] session({}) 发送消息({}) 异常", session, message, e);
        }
    }
}

使用 publishAll 方法群发消息

例如框架内的新增通知公告后,使用 publishAll 方法群发消息。

java
/**
 * 新增通知公告
 */
@SaCheckPermission("system:notice:add")
@Log(title = "通知公告", businessType = BusinessType.INSERT)
@PostMapping
public R<Void> add(@Validated @RequestBody SysNoticeBo notice) {
    int rows = noticeService.insertNotice(notice);
    if (rows <= 0) {
        return R.fail();
    }
    String type = dictService.getDictLabel("sys_notice_type", notice.getNoticeType());
    WebSocketUtils.publishAll("[" + type + "] " + notice.getNoticeTitle());
    return R.ok();
}

通知公告功能实现业务逻辑

前端在 index.vueonMounted 钩子中初始化了 WebSocket 连接。在 websocketonmessage 方法中,当后端通过 WebSocket 发送消息(例如新增通知公告)时能够接收到消息,并通过 Piana 将消息响应式地存储到客户端的内存中。

在 store/modules 目录下定义的 notice.ts 文件定义了一个 Pinia 存储,用于管理通知信息。它提供了添加、删除、标记已读和清空通知的方法,并将这些方法和状态暴露出来,供 Vue 组件使用。

typescript
import { defineStore } from 'pinia';

interface NoticeItem {
  title?: string;
  read: boolean;
  message: any;
  time: string;
}

export const useNoticeStore = defineStore('notice', () => {
  const state = reactive({
    notices: [] as NoticeItem[]
  });

  const addNotice = (notice: NoticeItem) => {
    state.notices.push(notice);
  };

  const removeNotice = (notice: NoticeItem) => {
    state.notices.splice(state.notices.indexOf(notice), 1);
  };

  //实现全部已读
  const readAll = () => {
    state.notices.forEach((item) => {
      item.read = true;
    });
  };

  const clearNotice = () => {
    state.notices = [];
  };
  return {
    state,
    addNotice,
    removeNotice,
    readAll,
    clearNotice
  };
});

export default useNoticeStore;
编程洪同学服务平台是一个广泛收集编程相关内容和资源,旨在满足编程爱好者和专业开发人员的需求的网站。无论您是初学者还是经验丰富的开发者,都可以在这里找到有用的信息和资料,我们将助您提升编程技能和知识。
专业开发
高端定制
售后无忧
站内资源均为本站制作或收集于互联网等平台,如有侵权,请第一时间联系本站,敬请谅解!本站资源仅限于学习与参考,严禁用于各种非法活动,否则后果自行负责,本站概不承担!