java使用sse(Server-Sent Events)实现实时数据推送,用于监控,通知,实时统计
【代码】java使用sse(Server-Sent Events)实现实时数据推送,用于监控,通知,实时统计。
·
一、代码
package org.springblade.modules.api.controller;
import org.springblade.modules.api.entity.Auth;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.*;
/**
* 该接口是一个轻量级的实时数据推送服务,适用于需要服务端主动向多个客户端同步数据的场景(如监控、通知、协作工具等)。通过 SSE 技术,客户端可以高效接收实时更新,而服务端能集中管理数据的分发。
*/
@RestController
@RequestMapping("/test")
@CrossOrigin(origins = "*")
public class TestController {
private static final int MAX_CONNECTIONS = 1000;
private final List<SseEmitter> emitters = new CopyOnWriteArrayList<>();
@GetMapping("/stream")
public SseEmitter streamData() {
if (emitters.size() >= MAX_CONNECTIONS) {
return null; // 拒绝新的连接
}
SseEmitter emitter = new SseEmitter(Long.MAX_VALUE);
emitters.add(emitter);
emitter.onCompletion(() -> emitters.remove(emitter));
emitter.onTimeout(() -> {
emitter.complete();
emitters.remove(emitter);
});
emitter.onError((e) -> {
emitter.completeWithError(e);
emitters.remove(emitter);
});
return emitter;
}
private void sendToAllClients(Auth data) {
List<SseEmitter> deadEmitters = new ArrayList<>();
emitters.forEach(emitter -> {
try {
emitter.send(SseEmitter.event()
.data(data)
.reconnectTime(3000));
} catch (IOException e) {
deadEmitters.add(emitter);
emitter.completeWithError(e);
}
});
emitters.removeAll(deadEmitters);
}
@PostMapping("/data")
public ResponseEntity<String> saveData(@RequestBody Auth data) {
System.out.println("Received data: " + data);
/**
* 处理逻辑,每天的数字加,比较前后2次数据
*/
data.setClientId(UUID.randomUUID().toString());
sendToAllClients(data);
return ResponseEntity.ok("Data saved and sent to clients");
}
}
二、post测试
建立连接
头部需要加参数:
Accept:text/event-stream
发送数据
封装的工具类
package org.springblade.modules.api.utils;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springblade.common.utils.CommonUtil;
import org.springblade.core.tool.api.R;
import org.springblade.core.tool.utils.Func;
import org.springblade.modules.api.entity.FlowMeterDetail;
import org.springblade.modules.api.entity.LedgerAlgalMudTransport;
import org.springblade.modules.api.mapper.LedgerAlgalMudTransportMapper;
import org.springblade.modules.api.vo.AppletHomeVO;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import javax.annotation.PostConstruct;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* sse推送工具
*/
@Slf4j
@Service
@AllArgsConstructor
public class SseEmitterUtils {
//最大连接数
private static final int MAX_CONNECTIONS = 1000;
private final List<SseEmitter> emitters = new CopyOnWriteArrayList<>();
/* // 新增:心跳调度器
private final ScheduledExecutorService heartbeatScheduler = Executors.newSingleThreadScheduledExecutor();
// 新增:初始化心跳
@PostConstruct
private void init() {
// 每5秒发送一次心跳(可根据需要调整间隔)
heartbeatScheduler.scheduleAtFixedRate(() -> {
try {
this.sendHeartbeat();
} catch (Exception e) {
log.error("心跳发送异常", e);
}
}, 5, 5, TimeUnit.SECONDS);
}
// 新增:心跳发送方法
private void sendHeartbeat() {
if (emitters.isEmpty()) return;
log.debug("发送SSE心跳包,当前连接数:{}", emitters.size());
this.sendToAllClients("发送SSE心跳包,当前连接数:" + emitters.size());
}*/
public SseEmitter streamData() {
if (emitters.size() >= MAX_CONNECTIONS) {
return null; // 拒绝新的连接
}
SseEmitter emitter = new SseEmitter(Long.MAX_VALUE);
emitters.add(emitter);
emitter.onCompletion(() -> emitters.remove(emitter));
emitter.onTimeout(() -> {
emitter.complete();
emitters.remove(emitter);
});
emitter.onError((e) -> {
emitter.completeWithError(e);
emitters.remove(emitter);
});
return emitter;
}
/**
* 推送数据给前端
*
* @param data
*/
private void sendToAllClients(Object data) {
List<SseEmitter> deadEmitters = new ArrayList<>();
emitters.forEach(emitter -> {
try {
emitter.send(SseEmitter.event()
.data(data)
.reconnectTime(3000));
} catch (IOException e) {
deadEmitters.add(emitter);
emitter.completeWithError(e);
}
});
emitters.removeAll(deadEmitters);
}
public void sendDate(Object data, String name) {
log.info(name + "推送sse数据:" + Func.toJson(data));
sendToAllClients(R.data(data));
}
}
DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。
更多推荐



所有评论(0)