import lombok.extern.slf4j.Slf4j;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
@Slf4j
public class SseEmitterUtil {
* 使用 map 对象缓存userId
private static Map
sseEmitterMap = new ConcurrentHashMap<>();
* 创建连接
* @param userId 用户ID
* @return SseEmitter
public static SseEmitter connect(String userId) {
// 设置超时时间,0表示不过期。默认30秒,超过时间未完成会抛出异常:AsyncRequestTimeoutException
SseEmitter sseEmitter = new SseEmitter(1L);
// 注册回调
sseEmitter.onCompletion(completionCallBack(userId));
sseEmitter.onError(errorCallBack(userId));
sseEmitter.onTimeout(timeoutCallBack(userId));
// 缓存
sseEmitterMap.put(userId, sseEmitter);
log.info("创建新的sse连接,当前用户:{}", userId);
return sseEmitter;
* 移除用户连接
public static void removeUser(String userId) {
sseEmitterMap.remove(userId);
log.info("移除用户:{}", userId);
private static Runnable completionCallBack(String userId) {
return () -> {
log.info("结束连接:{}", userId);
removeUser(userId);
private static Runnable timeoutCallBack(String userId) {
return () -> {
log.info("连接超时:{}", userId);
removeUser(userId);
private static Consumer errorCallBack(String userId) {
return throwable -> {
log.info("连接异常:{}", userId);
removeUser(userId);
package com.webstockdemo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.util.Random;
@RestController
@Slf4j
public class WebStockController {
* 用于创建连接
@CrossOrigin
@GetMapping("/sse/connect/{userId}")
public SseEmitter connect(@PathVariable String userId) throws IOException {
SseEmitter connect = SseEmitterUtil.connect(userId);
connect.send(new Random().nextInt());
return connect;
* 关闭连接
@CrossOrigin
@GetMapping("/sse/close/{userid}")
public String close(@PathVariable("userid") String userid) {
SseEmitterUtil.removeUser(userid);
return "连接关闭";
jsp 页面代码
<!DOCTYPE html>
<html lang="en">
<meta charset="UTF-8">
<title>消息推送</title>
</head>
<button onclick="closeSse()">关闭连接</button>
<div id="message"></div>
</body>
<script>
let source = null;
const userId = new Date().getTime();
if (window.EventSource) {
// 建立连接
source = new EventSource('http://localhost:8080/sse/connect/' + userId);
* 触发open事件
source.addEventListener('open', function (e) {
setMessageInnerHTML("建立连接。。。");
}, false);
* 客户端收到服务器发来的数据
// 监听消息并打印
source.onmessage = function (evt) {
console.log("message", evt.data, evt)
source.addEventListener("me", function (evt) {
console.log("event", evt.data)
// 事件流如果不关闭会自动刷新请求,所以我们需要根据条件手动关闭
// if (evt.data == 3) {
// source.close();
setMessageInnerHTML(evt.data);
* 触发error事件
source.addEventListener('error', function (e) {
if (e.readyState === EventSource.CLOSED) {
setMessageInnerHTML("连接关闭");
} else {
console.log("服务器异常:"+JSON.stringify(e));
}, false);
} else {
setMessageInnerHTML("你的浏览器不支持SSE");
// 监听窗口关闭事件,主动去关闭sse连接,如果服务端设置永不过期,浏览器关闭后手动清理服务端数据
window.onbeforeunload = function () {
// closeSse();
// 关闭Sse连接
function closeSse() {
source.close();
const httpRequest = new XMLHttpRequest();
httpRequest.open('GET', 'http://localhost:8080/sse/close/' + userId, true);
httpRequest.send();
console.log("close");
// 将消息显示在网页上
function setMessageInnerHTML(innerHTML) {
document.getElementById('message').innerHTML += innerHTML + '<br/>';
</script>
</html>
带有Spring Boot Reactive + Kafka +服务器已发送事件+ Cassandra的流。
使用服务器发送事件和Kafka的HTTP流,通过Spring Boot Reactive和Reactor Kafka实现。 此外,还使用Spring Data将数据存储在Cassandra中。
用户通过Javascript(EventSource)连接到SSE(服务器发送事件)的服务以监听事件(来自kafka)。
用户与发送HTTP请求并生成事件(到kafka)并存储在Cassanda中的服务进行交互。
快速开始。
git clone https://github.com/renatoaguimaraes/spring-reactive-kafka-sse.git spring-reactive-kafka-sse
cd ./spring-reactive-kafka-s
#PHP SSE客户端
(从Python SSE客户端移植: : )
这是一个PHP客户端库,用于遍历http服务器发送事件(SSE)流(在浏览器内部Javascript接口的名称之后,也称为EventSource)。
用法示例:
$ client = new SseClient \ Client ( 'https://eventsource.firebaseio-demo.com/.json' );
// returns generator
$ events = $ client -> getEvents ();
// blocks until new event arrive
foreach ( $ events as $ event ) {
print_r ( $ event );
cd ~/Library/Application\ Support/TextMate/Bundles/
git clone https://github.com/nanoant/assembly.tmbundle.git
这个包是在 MIT 许可下发布的:
特此授予任何人免费获得本软件副本和相关文档文件(“软件”)的许可,不受限制地处理本软件,包括但不限于使用、复制、修改、合并的权利、发布、分发、再许可和/或出售软件的副本,并允许向其提供软件的人员这样做,但须符合以下条件:
上述版权声明和本许可声明应包含在软件的所有副本或重要部分中。
本软件按“原样”提供,不提供任何形式的明示或暗示的保证,包括但不限于适销性、特定用途的适用性和不侵权的
PHP SSE:服务器发送的事件
一个简单高效的库通过PHP实现了HTML5的服务器发送的事件,用于将事件从服务器实时推送到客户端,并且比Websocket更容易,而不是AJAX请求。
PHP 5.4或更高版本
通过Composer( )安装
composer require " hhxsv5/php-sse:~2.0 " -vvv
运行PHP Web服务器
cd examples
php -S 127.0.0.1:9001 -t .
开启网址http://127.0.0.1:9001/index.html
Javascript示范
客户端:从服务器接收事件。
// withCredentials=true: pass the cross-domain cookies to server-side
const source = new EventSource (
手机端登录接口
生成PC端二维码接口
PC端监听二维码session状态接口,目前定义状态:0 二维码生成成功 ,1 手机端扫码成功 2手机端确认登录 -1 sessionId过期失效
二维码扫描通知,手机端扫描成功会调用此接口,发送session通知
手机端确认通知,手机端确认登录会调用此接口,发送确认登录通知
显示登录扫描二维码,使用base64编码显示二维码
二维码显示成功后,使用SSE方式开启二维码session监听状态,状态:0 二维码生成成功 ,1 手机端扫码成功 2手机端确认登录 -1 sessionId过期失效
调用登录接口,跳转到扫描二维码界面
扫描二维码,发送扫描通知
二维码有效,跳转到确认登录界面
在确认登录界面点击确认登录,发送确认登录通知
对于流式返回,Spring Boot提供了两种不同的方式:
1. 使用ResponseBodyEmitter:ResponseBodyEmitter是一种异步的,基于流的响应机制,可以实现将大量数据分批次发送给客户端。
2. 使用SseEmitter:SseEmitter是Server-Sent Events(SSE)的一种实现,使用起来也非常简单,可以用于向客户端推送实时事件、消息和数据。
同时,Spring Boot还支持使用reactive编程模型,通过WebFlux框架提供了响应式编程的能力。使用WebFlux,你可以将响应作为响应式流进行处理,并在需要的时候推送数据到客户端。