vue3+ ts + Echarts大屏之WebSocket 实现数据实时刷新(有后端代码)
在现代 Web 应用程序中,WebSocket 是实现实时通信的重要技术。它允许客户端与服务器之间建立持久的双向连接,从而实现即时数据传输。为了处理 WebSocket 接收到的数据并将其传递到应用程序的各个部分,我们通常会使用一种设计模式——事件总线(EventBus)。
在现代 Web 应用程序中,WebSocket 是实现实时通信的重要技术。它允许客户端与服务器之间建立持久的双向连接,从而实现即时数据传输。为了处理 WebSocket 接收到的数据并将其传递到应用程序的各个部分,我们通常会使用一种设计模式——事件总线(EventBus)。
新建一个文件 eventBus.ts
type Handler = (...args: any[]) => void;
class EventBus {
// 消息中心,记录所有事件及其处理函数
private subs: Record<string, Function[]>;
constructor() {
// 初始化消息中心
this.subs = Object.create(null);
}
// 注册事件
on(eventType: string, handler: Function) {
this.subs[eventType] = this.subs[eventType] || [];
this.subs[eventType].push(handler);
}
// 移除事件
off(eventType: string, handler: Function) {
if (this.subs[eventType]) {
this.subs[eventType] = this.subs[eventType].filter(h => h !== handler);
}
}
// 触发事件
emit(eventType: string, ...args: any[]) {
if (this.subs[eventType]) {
this.subs[eventType].forEach(handler => {
handler(...args);
});
}
}
}
export default new EventBus();
关于代码的解释:
事件注册 (
on):
on方法允许其他组件注册对特定事件的处理函数。这些函数会在事件被触发时被调用。subs对象用于存储事件及其对应的处理函数数组。事件注销 (
off):
off方法允许组件注销对特定事件的处理函数。这有助于避免内存泄漏,尤其是在组件卸载时。事件触发 (
emit):
emit方法用于触发特定事件,并将参数传递给所有注册的处理函数。- 这使得不同组件能够在不相互依赖的情况下接收数据和通知。
EventBus 在 WebSocket 中的应用
在使用 WebSocket 的情况下,
EventBus可以有效地将服务器发送的数据分发给应用程序的各个部分。例如,在接收到新数据时,可以使用eventBus.emit('newMessage', data)将数据发送到注册了newMessage事件的所有组件。这样,组件可以根据接收到的新数据更新其状态或显示相应的信息。
新建一个文件myWebSocket.ts
import eventBus from './eventBus'
// 定义 WebSocket 消息类型
enum ModeCodeEnum {
MSG = 'message', // 普通消息
}
class MyWebSocket {
private websocket: WebSocket | null = null
private reconnectTimer: NodeJS.Timeout | null = null // 断线重连定时器
private isReconnect = false // 记录是否断线重连
private webSocketState = false // 记录 WebSocket 连接状态
private reconnectAttempts = 0 // 当前重连次数
private maxReconnectAttempts = 3 // 最大重连次数
constructor(private url: string) {}
/**
* 初始化 WebSocket 连接
* @param isReconnect 是否断线重连
*/
init(isReconnect = false) {
this.isReconnect = isReconnect
this.websocket = new WebSocket(this.url)
// 绑定 WebSocket 事件
this.websocket.onopen = this.openHandler.bind(this)
this.websocket.onclose = this.closeHandler.bind(this)
this.websocket.onmessage = this.messageHandler.bind(this)
this.websocket.onerror = this.errorHandler.bind(this)
}
// 解析接收到的消息
private getMessage(event: MessageEvent): any {
try {
return JSON.parse(event.data); // 直接返回解析后的数据
} catch (error) {
console.log('收到非JSON消息:', event.data);
return event.data; // 如果解析失败,返回原始数据
}
}
// 发送消息
sendMessage(data: object) {
try {
this.websocket?.send(JSON.stringify(data))
} catch (error) {
console.log('发送消息失败:', error)
}
}
// 连接成功后的回调函数
private openHandler() {
console.log('====onopen 连接成功====')
eventBus.emit('changeBtnState', 'open')
this.webSocketState = true
this.reconnectAttempts = 0 // 重置重连次数
}
// 收到服务器数据后的回调函数
// 收到服务器数据后的回调函数
private messageHandler(event: MessageEvent) {
const { data } = this.getMessage(event); // 解析消息
// console.log('收到的消息:', data); // 打印收到的消息
// 直接通过 EventBus 传递数据给 Vue 组件
eventBus.emit('newMessage', data);
}
// 连接关闭后的回调函数
private closeHandler() {
console.log('====onclose websocket关闭连接====')
eventBus.emit('changeBtnState', 'close')
this.webSocketState = false
this.reconnectWebSocket()
}
// 连接出错的回调函数
private errorHandler() {
console.log('====onerror websocket连接出错====')
eventBus.emit('changeBtnState', 'close')
this.webSocketState = false
this.reconnectWebSocket()
}
// 重新连接
private reconnectWebSocket() {
if (!this.isReconnect) return
if (this.reconnectAttempts >= this.maxReconnectAttempts) {
console.log('重连次数已达上限,停止重连')
this.close()
return
}
console.log(`尝试重新连接 WebSocket... (${this.reconnectAttempts + 1}/${this.maxReconnectAttempts})`)
this.reconnectTimer = setTimeout(() => {
eventBus.emit('reconnect')
this.reconnectAttempts++
this.init(true) // 重新初始化 WebSocket
}, 5000) // 重连间隔
}
// 关闭 WebSocket 连接
close() {
this.websocket?.close()
this.reconnectTimer && clearTimeout(this.reconnectTimer)
this.websocket = null
}
}
export default MyWebSocket
关键功能
WebSocket 连接管理:
init(isReconnect = false):初始化 WebSocket 连接,接受一个参数指示是否为断线重连。该方法绑定了 WebSocket 的各种事件处理程序。消息解析:
getMessage(event: MessageEvent):接收服务器发送的消息并尝试将其解析为 JSON 格式。如果解析失败,返回原始消息。这使得类能够处理不同类型的数据。消息发送:
sendMessage(data: object):将数据序列化为 JSON 格式并发送给服务器。这个方法包括错误处理,以确保消息发送的可靠性。事件处理:
openHandler():连接成功后调用,更新连接状态并通过eventBus通知应用程序 连接状态变化。
messageHandler(event: MessageEvent):当接收到服务器数据时被调用,解析消息并通过eventBus将其传递给其他组件。
closeHandler():连接关闭时调用,更新状态并尝试重新连接。
errorHandler():连接出错时调用,同样尝试重新连接。5. 重连机制:
reconnectWebSocket():如果连接关闭并且设置了重连选项,该方法将尝试在设定的时间间隔内重新连接 WebSocket。重连次数有最大限制,以防止无限重连。6. 关闭连接:
close():关闭 WebSocket 连接并清理相关的定时器和状态。
vue组件中的应用 (测试)
新建webSocketDemo.vue文件
<template>
<div>
<button @click="connectWebSocket">连接</button>
<button @click="closeWebSocket" :disabled="!isConnected">关闭</button>
<div v-if="receivedData">
<h3>实时更新的数据:</h3>
<p>值: {{ receivedData.value }}</p>
<p>时间戳: {{ receivedData.timestamp }}</p>
</div>
</div>
</template>
<script lang="ts">
import {defineComponent, ref, onMounted, onBeforeUnmount} from 'vue'
import MyWebSocket from '@/hooks/myWebSocket'
import eventBus from '@/hooks/eventBus'
export default defineComponent({
name: 'WebSocketComponent',
setup() {
const taskId = 'proceOpening1' // 根据具体的业务逻辑来设置 taskId
// const wsUrl = `ws://192.168.2.127:8865/websocket/${taskId}`
const wsUrl = `ws://192.168.2.127:8865/websocket`
// console.log(wsUrl);
let myWS: MyWebSocket | null = null
const isConnected = ref(false)
const receivedData = ref<any>(null)
const setButtonState = (state: 'open' | 'close') => {
isConnected.value = state === 'open'
}
const handleNewMessage = (data: any) => {
receivedData.value = data
console.log(data);
}
// 连接 WebSocket
const connectWebSocket = () => {
if (myWS) {
console.log('WebSocket 连接已存在,不能重复连接。')
return
}
myWS = new MyWebSocket(wsUrl)
myWS.init(true) // 初始化 WebSocket
eventBus.on('newMessage', handleNewMessage)
eventBus.on('changeBtnState', setButtonState)
}
// 关闭 WebSocket
const closeWebSocket = () => {
if (myWS) {
myWS.close()
myWS = null
setButtonState('close')
}
}
onBeforeUnmount(() => {
if (myWS) {
myWS.close()
}
})
return {
connectWebSocket,
closeWebSocket,
isConnected,
receivedData,
}
},
})
</script>
<style scoped>
button {
margin: 5px;
}
</style>
后端java文件
新建一个webSocketServer222.java
package org.forbes.webSocket;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.forbes.biz.service.bi.IWorkshopWorkmanshipService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@ServerEndpoint("/websocket")
@Component
public class webSocketServer222 {
private static final Set<Session> sessions = Collections.synchronizedSet(new HashSet<Session>());
private static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5);
private ObjectMapper objectMapper = new ObjectMapper(); // Jackson 用于转换对象为 JSON
// 建立连接
@OnOpen
public void onOpen(Session session) {
System.out.println("WebSocket 连接已经建立。");
sessions.add(session);
startScheduledTask(); // 启动定时任务,向前端推送模拟数据
}
// 定时生成模拟数据并推送给前端
private void startScheduledTask() {
scheduler.scheduleAtFixedRate(() -> {
try {
// 模拟数据,假设是一个简单的数字或对象
Map<String, Object> response = new HashMap<>();
response.put("data", generateMockData()); // 调用生成模拟数据的方法
String jsonResponse = objectMapper.writeValueAsString(response);
sendMessageToAllSessions(jsonResponse);
} catch (JsonProcessingException e) {
System.out.println("JSON 序列化错误:" + e.getMessage());
e.printStackTrace();
}
}, 0, 4, TimeUnit.SECONDS); // 每4秒推送一次
}
// 模拟数据生成方法
private Map<String, Object> generateMockData() {
Random random = new Random();
Map<String, Object> data = new HashMap<>();
data.put("value", random.nextInt(100)); // 随机生成 0-100 的数字
data.put("timestamp", new Date()); // 当前时间戳
return data;
}
// 发送消息到所有连接的客户端
private void sendMessageToAllSessions(String message) {
sessions.forEach(s -> {
if (s.isOpen()) {
try {
s.getBasicRemote().sendText(message);
} catch (IOException e) {
System.out.println("发送消息时出错:" + e.getMessage());
e.printStackTrace();
}
}
});
}
// 关闭连接
@OnClose
public void onClose(Session session) {
System.out.println("WebSocket 连接已经关闭。");
sessions.remove(session);
if (sessions.isEmpty()) {
stopScheduledTask(); // 如果没有连接,停止定时任务
}
}
// 停止定时任务
private void stopScheduledTask() {
scheduler.shutdown(); // 或者 scheduler.shutdownNow();
System.out.println("定时任务已停止。");
}
// 连接出错
@OnError
public void onError(Session session, Throwable t) {
System.out.println("WebSocket 连接出现错误:" + t.getMessage());
t.printStackTrace();
}
// 收到客户端消息
@OnMessage
public void onMessage(String message, Session session) {
System.out.println("收到客户端消息:" + message);
// 可以在这里处理客户端消息
}
}


前端业务代码(只展示部分 提供个思路 我的工作需求是实现第一次使用 Axios 请求数据,然后使用 WebSocket 实时刷新数据 根据自己业务逻辑 进行修改)
import MyWebSocket from '@/hooks/myWebSocket'
import eventBus from '@/hooks/eventBus'
let myWS: MyWebSocket | null = null
const getProceCollecAllData = async () => {
try {
const res = await reqProceOpening1()
if (res && res.code === 200) {
const result = res.result
// 放入自己的请求逻辑
// 初始化 WebSocket 连接
initWebSocket()
}
} catch (error) {
return Promise.reject(error)
}
}
// 使用 WebSocket 实时刷新数据
const initWebSocket = () => {
if (myWS) return // 防止重复连接
const taskId = 'proceOpening1' // 根据具体的业务逻辑来设置 taskId
const wsUrl = `ws://XXX/websocket/${taskId}`
myWS = new MyWebSocket(wsUrl)
myWS.init(true)
console.log('WebSocket 已连接')
// 监听 WebSocket 消息
eventBus.on('newMessage', (data: any) => {
// console.log(data, '接收到的数据')
// 处理 webSocketData1 的数据
if (data.webSocketData1) {
}
// 处理 webSocketData2 的数据
if (data.webSocketData2) {
}
})
}
onMounted(() => {
getProceCollecAllData()
})
onUnmounted(() => {
if (myWS) {
myWS.close()
}
})
DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。
更多推荐


所有评论(0)