一、代码

package org.springblade.modules.api.controller;

import org.springblade.modules.api.entity.Auth;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.*;

/**
 * Lightweight real-time push endpoint built on Server-Sent Events (SSE).
 *
 * <p>Clients subscribe through {@code GET /test/stream}; every payload posted to
 * {@code POST /test/data} is then broadcast to all subscribers registered at that
 * moment. Intended for scenarios where the server proactively pushes data to many
 * clients (monitoring, notifications, collaboration tools, ...).
 */
@RestController
@RequestMapping("/test")
@CrossOrigin(origins = "*")
public class TestController {

    /** Upper bound on the number of simultaneously registered SSE subscribers. */
    private static final int MAX_CONNECTIONS = 1000;

    /** Currently registered subscribers; iterated on every broadcast. */
    private final List<SseEmitter> emitters = new CopyOnWriteArrayList<>();

    /**
     * Registers a new SSE subscriber.
     *
     * @return the emitter bound to the caller's connection, or {@code null} when
     *         {@link #MAX_CONNECTIONS} subscribers are already registered
     */
    @GetMapping("/stream")
    public SseEmitter streamData() {
        if (emitters.size() >= MAX_CONNECTIONS) {
            // Reject the new connection.
            // NOTE(review): a null return presumably ends up as an empty response rather
            // than an explicit error status; consider answering 503 instead — confirm
            // against the client's expectations before changing.
            return null;
        }
        // Long.MAX_VALUE: the subscription should effectively never time out server-side.
        SseEmitter emitter = new SseEmitter(Long.MAX_VALUE);
        emitters.add(emitter);

        // Whatever way the connection ends, the emitter must leave the registry.
        emitter.onCompletion(() -> emitters.remove(emitter));
        emitter.onTimeout(() -> {
            emitter.complete();
            emitters.remove(emitter);
        });
        emitter.onError((e) -> {
            emitter.completeWithError(e);
            emitters.remove(emitter);
        });

        return emitter;
    }

    /**
     * Broadcasts {@code data} to every registered subscriber and prunes the ones
     * that can no longer be written to.
     *
     * <p>A failure on one subscriber never prevents delivery to the others: besides
     * {@link IOException} (broken connection), {@code send} may throw
     * {@link IllegalStateException} when the emitter has already completed but its
     * completion callback has not yet removed it from the registry.
     *
     * @param data payload sent as the SSE event data
     */
    private void sendToAllClients(Auth data) {
        List<SseEmitter> deadEmitters = new ArrayList<>();
        for (SseEmitter emitter : emitters) {
            try {
                emitter.send(SseEmitter.event()
                        .data(data)
                        .reconnectTime(3000));
            } catch (IOException | IllegalStateException e) {
                deadEmitters.add(emitter);
                emitter.completeWithError(e);
            }
        }
        emitters.removeAll(deadEmitters);
    }

    /**
     * Accepts a payload, tags it with a freshly generated client id and pushes it
     * to all SSE subscribers.
     *
     * @param data request body to broadcast
     * @return a 200 response with a fixed confirmation message
     */
    @PostMapping("/data")
    public ResponseEntity<String> saveData(@RequestBody Auth data) {
        System.out.println("Received data: " + data);
        // Placeholder for the business logic: accumulate the daily figures and
        // compare two consecutive data sets.
        data.setClientId(UUID.randomUUID().toString());
        sendToAllClients(data);
        return ResponseEntity.ok("Data saved and sent to clients");
    }
}

二、post测试

建立连接(GET 请求 /test/stream)

请求头中需要添加以下参数:
Accept: text/event-stream

在这里插入图片描述

发送数据(POST 请求 /test/data)
在这里插入图片描述

三、封装的工具类

package org.springblade.modules.api.utils;

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springblade.common.utils.CommonUtil;
import org.springblade.core.tool.api.R;
import org.springblade.core.tool.utils.Func;
import org.springblade.modules.api.entity.FlowMeterDetail;
import org.springblade.modules.api.entity.LedgerAlgalMudTransport;
import org.springblade.modules.api.mapper.LedgerAlgalMudTransportMapper;
import org.springblade.modules.api.vo.AppletHomeVO;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import javax.annotation.PostConstruct;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * SSE push helper: keeps a registry of connected {@link SseEmitter}s and
 * broadcasts payloads to all of them.
 */
@Slf4j
@Service
@AllArgsConstructor
public class SseEmitterUtils {
    /** Maximum number of simultaneously registered SSE connections. */
    private static final int MAX_CONNECTIONS = 1000;

    /** Currently registered subscribers; iterated on every broadcast. */
    private final List<SseEmitter> emitters = new CopyOnWriteArrayList<>();

   /* Disabled heartbeat support, kept for reference.

    // Heartbeat scheduler
    private final ScheduledExecutorService heartbeatScheduler = Executors.newSingleThreadScheduledExecutor();

    // Starts the heartbeat after construction
    @PostConstruct
    private void init() {
        // Send a heartbeat every 5 seconds (adjust the interval as needed)
        heartbeatScheduler.scheduleAtFixedRate(() -> {
            try {
                this.sendHeartbeat();
            } catch (Exception e) {
                log.error("心跳发送异常", e);
            }
        }, 5, 5, TimeUnit.SECONDS);
    }

    // Sends one heartbeat message to every connected client
    private void sendHeartbeat() {
        if (emitters.isEmpty()) return;

        log.debug("发送SSE心跳包,当前连接数:{}", emitters.size());
        this.sendToAllClients("发送SSE心跳包,当前连接数:" + emitters.size());
    }*/

    /**
     * Registers a new SSE subscriber.
     *
     * @return the new emitter, or {@code null} when {@link #MAX_CONNECTIONS}
     *         subscribers are already registered (callers must handle the null)
     */
    public SseEmitter streamData() {
        if (emitters.size() >= MAX_CONNECTIONS) {
            return null; // reject the new connection
        }
        // Long.MAX_VALUE: the subscription should effectively never time out server-side.
        SseEmitter emitter = new SseEmitter(Long.MAX_VALUE);
        emitters.add(emitter);

        // Whatever way the connection ends, the emitter must leave the registry.
        emitter.onCompletion(() -> emitters.remove(emitter));
        emitter.onTimeout(() -> {
            emitter.complete();
            emitters.remove(emitter);
        });
        emitter.onError((e) -> {
            emitter.completeWithError(e);
            emitters.remove(emitter);
        });
        return emitter;
    }

    /**
     * Pushes {@code data} to every registered front-end client and prunes the
     * clients that can no longer be written to.
     *
     * <p>A failure on one client never prevents delivery to the others: besides
     * {@link IOException} (broken connection), {@code send} may throw
     * {@link IllegalStateException} when the emitter has already completed but its
     * completion callback has not yet removed it from the registry.
     *
     * @param data payload sent as the SSE event data
     */
    private void sendToAllClients(Object data) {
        List<SseEmitter> deadEmitters = new ArrayList<>();
        for (SseEmitter emitter : emitters) {
            try {
                emitter.send(SseEmitter.event()
                        .data(data)
                        .reconnectTime(3000));
            } catch (IOException | IllegalStateException e) {
                // Routine for disconnected clients, hence debug level only.
                log.debug("Dropping SSE client after failed send: {}", e.getMessage());
                deadEmitters.add(emitter);
                emitter.completeWithError(e);
            }
        }
        emitters.removeAll(deadEmitters);
    }

    /**
     * Logs the payload and broadcasts it, wrapped via {@code R.data(...)}, to all
     * SSE clients.
     *
     * @param data payload to push
     * @param name label identifying the sender; used only as the log message prefix
     */
    public void sendDate(Object data, String name) {
        // Serialize to JSON only when the message will actually be logged.
        if (log.isInfoEnabled()) {
            log.info("{}推送sse数据:{}", name, Func.toJson(data));
        }
        sendToAllClients(R.data(data));
    }


}

Logo

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。

更多推荐