在现代 Web 应用程序中,WebSocket 是实现实时通信的重要技术。它允许客户端与服务器之间建立持久的双向连接,从而实现即时数据传输。为了处理 WebSocket 接收到的数据并将其传递到应用程序的各个部分,我们通常会使用一种设计模式——事件总线(EventBus)。

新建一个文件 eventBus.ts 

type Handler = (...args: any[]) => void;

class EventBus {
  // 消息中心,记录所有事件及其处理函数
  private subs: Record<string, Function[]>;

  constructor() {
    // 初始化消息中心
    this.subs = Object.create(null);
  }

  // 注册事件
  on(eventType: string, handler: Function) {
    this.subs[eventType] = this.subs[eventType] || [];
    this.subs[eventType].push(handler);
  }
 // 移除事件
 off(eventType: string, handler: Function) {
  if (this.subs[eventType]) {
    this.subs[eventType] = this.subs[eventType].filter(h => h !== handler);
  }
}
  // 触发事件
  emit(eventType: string, ...args: any[]) {
    if (this.subs[eventType]) {
      this.subs[eventType].forEach(handler => {
        handler(...args);
      });
    }
  }
}

export default new EventBus();

关于代码的解释:  

  1. 事件注册 (on):

    • on 方法允许其他组件注册对特定事件的处理函数。这些函数会在事件被触发时被调用。
    • subs 对象用于存储事件及其对应的处理函数数组。
  2. 事件注销 (off):

    • off 方法允许组件注销对特定事件的处理函数。这有助于避免内存泄漏,尤其是在组件卸载时。
  3. 事件触发 (emit):

    • emit 方法用于触发特定事件,并将参数传递给所有注册的处理函数。
    • 这使得不同组件能够在不相互依赖的情况下接收数据和通知。

EventBus 在 WebSocket 中的应用

        在使用 WebSocket 的情况下,EventBus 可以有效地将服务器发送的数据分发给应用程序的各个部分。例如,在接收到新数据时,可以使用 eventBus.emit('newMessage', data) 将数据发送到注册了 newMessage 事件的所有组件。这样,组件可以根据接收到的新数据更新其状态或显示相应的信息。

 新建一个文件myWebSocket.ts

import eventBus from './eventBus'

// 定义 WebSocket 消息类型
enum ModeCodeEnum {
  MSG = 'message', // 普通消息
}

class MyWebSocket {
  private websocket: WebSocket | null = null
  private reconnectTimer: NodeJS.Timeout | null = null // 断线重连定时器
  private isReconnect = false // 记录是否断线重连
  private webSocketState = false // 记录 WebSocket 连接状态
  private reconnectAttempts = 0 // 当前重连次数
  private maxReconnectAttempts = 3 // 最大重连次数

  constructor(private url: string) {}

  /**
   * 初始化 WebSocket 连接
   * @param isReconnect 是否断线重连
   */
  init(isReconnect = false) {
    this.isReconnect = isReconnect
    this.websocket = new WebSocket(this.url)

    // 绑定 WebSocket 事件
    this.websocket.onopen = this.openHandler.bind(this)
    this.websocket.onclose = this.closeHandler.bind(this)
    this.websocket.onmessage = this.messageHandler.bind(this)
    this.websocket.onerror = this.errorHandler.bind(this)
  }

   // 解析接收到的消息
   private getMessage(event: MessageEvent): any {
    try {
      return JSON.parse(event.data); // 直接返回解析后的数据
    } catch (error) {
      console.log('收到非JSON消息:', event.data);
      return event.data; // 如果解析失败,返回原始数据
    }
  }

  // 发送消息
  sendMessage(data: object) {
    try {
      this.websocket?.send(JSON.stringify(data))
    } catch (error) {
      console.log('发送消息失败:', error)
    }
  }

  // 连接成功后的回调函数
  private openHandler() {
    console.log('====onopen 连接成功====')
    eventBus.emit('changeBtnState', 'open')
    this.webSocketState = true
    this.reconnectAttempts = 0 // 重置重连次数
  }

  // 收到服务器数据后的回调函数
  // 收到服务器数据后的回调函数
private messageHandler(event: MessageEvent) {
  const { data } = this.getMessage(event); // 解析消息
  // console.log('收到的消息:', data); // 打印收到的消息
  
  // 直接通过 EventBus 传递数据给 Vue 组件
  eventBus.emit('newMessage', data);
}

  // 连接关闭后的回调函数
  private closeHandler() {
    console.log('====onclose websocket关闭连接====')
    eventBus.emit('changeBtnState', 'close')
    this.webSocketState = false
    this.reconnectWebSocket()
  }

  // 连接出错的回调函数
  private errorHandler() {
    console.log('====onerror websocket连接出错====')
    eventBus.emit('changeBtnState', 'close')
    this.webSocketState = false
    this.reconnectWebSocket()
  }

  // 重新连接
  private reconnectWebSocket() {
    if (!this.isReconnect) return
    if (this.reconnectAttempts >= this.maxReconnectAttempts) {
      console.log('重连次数已达上限,停止重连')
      this.close()
      return
    }

    console.log(`尝试重新连接 WebSocket... (${this.reconnectAttempts + 1}/${this.maxReconnectAttempts})`)
    this.reconnectTimer = setTimeout(() => {
      eventBus.emit('reconnect')
      this.reconnectAttempts++
      this.init(true) // 重新初始化 WebSocket
    }, 5000) // 重连间隔
  }

  // 关闭 WebSocket 连接
  close() {
    this.websocket?.close()
    this.reconnectTimer && clearTimeout(this.reconnectTimer)
    this.websocket = null
  }
}

export default MyWebSocket

关键功能

  1. WebSocket 连接管理:
    init(isReconnect = false):初始化 WebSocket 连接,接受一个参数指示是否为断线重连。该方法绑定了 WebSocket 的各种事件处理程序。
  2. 消息解析:
    getMessage(event: MessageEvent):接收服务器发送的消息并尝试将其解析为 JSON 格式。如果解析失败,返回原始消息。这使得类能够处理不同类型的数据。
  3. 消息发送:
    sendMessage(data: object):将数据序列化为 JSON 格式并发送给服务器。这个方法包括错误处理,以确保消息发送的可靠性。
  4. 事件处理:

    openHandler():连接成功后调用,更新连接状态并通过 eventBus 通知应用程序  连接状态变化。

    messageHandler(event: MessageEvent):当接收到服务器数据时被调用,解析消息并通过 eventBus 将其传递给其他组件。

    closeHandler():连接关闭时调用,更新状态并尝试重新连接。

    errorHandler():连接出错时调用,同样尝试重新连接。

      5. 重连机制:
  • reconnectWebSocket():如果连接关闭并且设置了重连选项,该方法将尝试在设定的时间间隔内重新连接 WebSocket。重连次数有最大限制,以防止无限重连。

  • 6. 关闭连接:

  • close():关闭 WebSocket 连接并清理相关的定时器和状态。

vue组件中的应用 (测试)

新建webSocketDemo.vue文件 

<template>
  <div>
    <button @click="connectWebSocket">连接</button>
    <button @click="closeWebSocket" :disabled="!isConnected">关闭</button>
    <div v-if="receivedData">
      <h3>实时更新的数据:</h3>
      <p>值: {{ receivedData.value }}</p>
      <p>时间戳: {{ receivedData.timestamp }}</p>
    </div>
  </div>
</template>

<script lang="ts">
import {defineComponent, ref, onMounted, onBeforeUnmount} from 'vue'
import MyWebSocket from '@/hooks/myWebSocket'
import eventBus from '@/hooks/eventBus'

export default defineComponent({
  name: 'WebSocketComponent',
  setup() {
    const taskId = 'proceOpening1' // 根据具体的业务逻辑来设置 taskId
    // const wsUrl = `ws://192.168.2.127:8865/websocket/${taskId}`
    const wsUrl = `ws://192.168.2.127:8865/websocket`
    // console.log(wsUrl);
    
    let myWS: MyWebSocket | null = null
    const isConnected = ref(false)
    const receivedData = ref<any>(null)

    const setButtonState = (state: 'open' | 'close') => {
      isConnected.value = state === 'open'
    }

    const handleNewMessage = (data: any) => {
      receivedData.value = data
      console.log(data);
      
    }

    // 连接 WebSocket
    const connectWebSocket = () => {
      if (myWS) {
        console.log('WebSocket 连接已存在,不能重复连接。')
        return
      }
      myWS = new MyWebSocket(wsUrl)
      myWS.init(true) // 初始化 WebSocket

      eventBus.on('newMessage', handleNewMessage)
      eventBus.on('changeBtnState', setButtonState)
    }

    // 关闭 WebSocket
    const closeWebSocket = () => {
      if (myWS) {
        myWS.close()
        myWS = null
        setButtonState('close')
      }
    }

    onBeforeUnmount(() => {
      if (myWS) {
        myWS.close()
      }
    })

    return {
      connectWebSocket,
      closeWebSocket,
      isConnected,
      receivedData,
    }
  },
})
</script>

<style scoped>
button {
  margin: 5px;
}
</style>

后端java文件

新建一个webSocketServer222.java

package org.forbes.webSocket;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.forbes.biz.service.bi.IWorkshopWorkmanshipService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

@ServerEndpoint("/websocket")
@Component
public class webSocketServer222 {
    private static final Set<Session> sessions = Collections.synchronizedSet(new HashSet<Session>());
    private static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5);
    private ObjectMapper objectMapper = new ObjectMapper(); // Jackson 用于转换对象为 JSON

    // 建立连接
    @OnOpen
    public void onOpen(Session session) {
        System.out.println("WebSocket 连接已经建立。");
        sessions.add(session);
        startScheduledTask(); // 启动定时任务,向前端推送模拟数据
    }

    // 定时生成模拟数据并推送给前端
    private void startScheduledTask() {
        scheduler.scheduleAtFixedRate(() -> {
            try {
                // 模拟数据,假设是一个简单的数字或对象
                Map<String, Object> response = new HashMap<>();
                response.put("data", generateMockData()); // 调用生成模拟数据的方法

                String jsonResponse = objectMapper.writeValueAsString(response);
                sendMessageToAllSessions(jsonResponse);
            } catch (JsonProcessingException e) {
                System.out.println("JSON 序列化错误:" + e.getMessage());
                e.printStackTrace();
            }
        }, 0, 4, TimeUnit.SECONDS); // 每4秒推送一次
    }

    // 模拟数据生成方法
    private Map<String, Object> generateMockData() {
        Random random = new Random();
        Map<String, Object> data = new HashMap<>();
        data.put("value", random.nextInt(100)); // 随机生成 0-100 的数字
        data.put("timestamp", new Date()); // 当前时间戳
        return data;
    }

    // 发送消息到所有连接的客户端
    private void sendMessageToAllSessions(String message) {
        sessions.forEach(s -> {
            if (s.isOpen()) {
                try {
                    s.getBasicRemote().sendText(message);
                } catch (IOException e) {
                    System.out.println("发送消息时出错:" + e.getMessage());
                    e.printStackTrace();
                }
            }
        });
    }

    // 关闭连接
    @OnClose
    public void onClose(Session session) {
        System.out.println("WebSocket 连接已经关闭。");
        sessions.remove(session);
        if (sessions.isEmpty()) {
            stopScheduledTask(); // 如果没有连接,停止定时任务
        }
    }

    // 停止定时任务
    private void stopScheduledTask() {
        scheduler.shutdown(); // 或者 scheduler.shutdownNow();
        System.out.println("定时任务已停止。");
    }

    // 连接出错
    @OnError
    public void onError(Session session, Throwable t) {
        System.out.println("WebSocket 连接出现错误:" + t.getMessage());
        t.printStackTrace();
    }

    // 收到客户端消息
    @OnMessage
    public void onMessage(String message, Session session) {
        System.out.println("收到客户端消息:" + message);
        // 可以在这里处理客户端消息
    }
}

 

 

前端业务代码(只展示部分 提供个思路 我的工作需求是实现第一次使用 Axios 请求数据,然后使用 WebSocket 实时刷新数据 根据自己业务逻辑 进行修改)

import MyWebSocket from '@/hooks/myWebSocket'
import eventBus from '@/hooks/eventBus'
let myWS: MyWebSocket | null = null
const getProceCollecAllData = async () => {
  try {
    const res = await reqProceOpening1()
    if (res && res.code === 200) {
      const result = res.result
      // 放入自己的请求逻辑
      // 初始化 WebSocket 连接
      initWebSocket()
    }
  } catch (error) {
    return Promise.reject(error)
  }
}
// 使用 WebSocket 实时刷新数据
const initWebSocket = () => {
  if (myWS) return // 防止重复连接
  const taskId = 'proceOpening1' // 根据具体的业务逻辑来设置 taskId
  const wsUrl = `ws://XXX/websocket/${taskId}`
  myWS = new MyWebSocket(wsUrl)
  myWS.init(true)
  console.log('WebSocket 已连接')

  // 监听 WebSocket 消息
  eventBus.on('newMessage', (data: any) => {
    // console.log(data, '接收到的数据')

    // 处理 webSocketData1 的数据
    if (data.webSocketData1) {
      
    }

    // 处理 webSocketData2 的数据
    if (data.webSocketData2) {
     
    }
  })
}

onMounted(() => {
  getProceCollecAllData()
})

onUnmounted(() => {
  if (myWS) {
    myWS.close()
  }
})

 

Logo

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。

更多推荐