没有简单,通用的方法来以可接受的性能在Web应用程序中实现
服务器到客户端的
异步通信。
HTTP是
客户端-服务器
计算模型中的请求-响应协议。为了开始交换,客户端向服务器提交请求。为了完成交换,服务器将响应返回给客户端。在HTTP协议中,
客户端
是消息交换的发起者。
在某些情况下,
服务器
应该是发起者
。实现此目的的方法之一是允许服务器将消息推送到
发布/订阅
计算模型中的客户端。
服务器发送事件(SSE)是一种用于为
特定
Web应用程序实现异步
服务器到客户端
通信的
简单
技术。
有几种技术可以使客户端从服务器接收有关异步更新的消息。它们可以分为两类:
客户端
请求
和
服务器请求
。
在
客户
端请求技术中,客户端会定期请求服务器进行更新。服务器可以响应更新,也可以响应尚未更新的特殊响应。
客户端请求
有两种类型:
短轮询
和
长轮询
。
客户端定期向服务器发送请求。如果服务器有更新,它将向客户端发送响应并关闭连接。如果服务器没有更新,它将向客户端发送特殊响应,并关闭连接。
比如客户端使用 ajax 定时执行。
长时间轮询
客户端向服务器发送请求。如果服务器有更新,它将向客户端发送响应并关闭连接。如果服务器没有更新,它将保持连接,直到有可用的更新为止。当有可用更新时,服务器将响应发送到客户端并关闭连接。如果更新在某些超时时间内不可用,则服务器会向客户端发送特殊响应,并关闭连接。
服务器推送
在
服务器推送
技术中,服务器在客户端可用后立即主动向客户端发送消息。其中,
服务器推送
有两种类型:
服务器发送事件(SSE)
和
WebSocket
。
服务器发送的事件(SSE)
服务器发送的事件是一种仅从服务器向基于浏览器的Web应用程序中的客户端发送文本消息的技术。服务器发送的事件基于HTTP协议中的
持久连接
。服务器发送的事件具有W3C
标准化
的网络协议和EventSource客户端接口,作为HTML5标准套件的一部分。
WebSocket
WebSocket是一种在Web应用程序中实现同时,双向,实时通信的技术。WebSocket基于除HTTP之外的协议,因此它可能需要对网络基础架构(代理服务器,NAT,防火墙等)进行额外的设置。但是,WebSocket可以提供使用基于HTTP的技术难以实现的性能。
SSE网络协议
要订阅服务器事件,客户端应
GET
使用标头进行请求:
-
Accept: text/event-stream
指示标准要求的事件的
媒体类型
-
Cache-Control: no-cache
禁用所有事件缓存
-
Connection: keep-alive
指示正在使用
持久连接
GET /sse HTTP/1.1
Accept: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
服务器应使用以下标头的响应来确认订阅:
Content-Type: text/event-stream;charset=UTF-8
指示标准要求的媒体类型和事件编码Transfer-Encoding: chunked
表示服务器流式传输动态生成的内容,因此事先不知道内容大小
HTTP/1.1 200
Content-Type: text/event-stream;charset=UTF-8
Transfer-Encoding: chunked
订阅后,服务器将在消息可用后立即发送消息。事件是UTF-8
编码的文本消息。事件之间用两个换行符分隔\n\n
。每个事件都包含一个或多个name: value
字段,以单个换行符分隔\n
。
在该data
字段中,服务器可以发送事件数据。
data: The first event.
data: The second event.
服务器可以data
通过一个换行符将字段分成几行\n
。
data: The third
data: event.
id
服务器可以在该字段中发送唯一的事件标识符。如果连接断开,客户端应自动重新连接并发送id
带有header 的最后一个接收到的事件Last-Event-ID
。
id: 1
data: The first event.
id: 2
data: The second event.
event
服务器可以在该字段中发送事件类型。服务器可以在同一预订中发送不同类型的事件,也可以不发送任何类型的事件。
event: type1
data: An event of type1.
event: type2
data: An event of type2.
data: An event without any type.
retry
服务器可以在该字段中发送超时(以毫秒为单位),在此之后客户端应在连接断开时自动重新连接。如果未指定此字段,则标准值为3000毫秒。
retry: 1000
如果一行以冒号开头:
,则客户端应将其忽略。这可用于从服务器发送注释或防止某些代理服务器因超时而关闭连接。
: ping
SSE客户端:EventSource界面
要打开连接,应创建一个EventSource
对象。
var eventSource = new EventSource('/sse);
尽管服务器发送事件旨在将事件从服务器发送到客户端,但是仍然可以使用GET
查询参数将数据从客户端传递到服务器。
var eventSource = new EventSource('/sse?event=type1);
eventSource.close();
eventSource = new EventSource('/sse?event=type1&event=type2);
要关闭连接,应将其称为method close()
。
eventSource.close();
有readyState
一个表示连接状态的属性:
EventSource.CONNECTING = 0
-尚未建立连接,或者连接已关闭并且客户端正在重新连接EventSource.OPEN = 1
-客户端具有打开的连接,并在接收事件时处理事件EventSource.CLOSED = 2
-连接未打开,并且客户端没有尝试重新连接,或者出现致命错误或close()
调用了该方法
要处理连接的建立,应将其预订给onopen
事件处理程序。
eventSource.onopen = function () {
console.log('connection is established');
要处理连接状态中的某些更改或致命错误,应将其预订给onerrror
事件处理程序。
eventSource.onerror = function (event) {
console.log('connection state: ' + eventSource.readyState + ', error: ' + event);
要处理没有该event
字段的接收事件,应将其预订给onmessage
事件处理程序。
eventSource.onmessage = function (event) {
console.log('id: ' + event.lastEventId + ', data: ' + event.data);
要使用该event
字段处理接收事件,应为该事件订阅事件处理程序。
eventSource.addEventListener('type1', function (event) {
console.log('id: ' + event.lastEventId + ', data: ' + event.data);
}, false);
SSE Java服务器:Spring Web MVC
Spring Web MVC框架5.2.0基于Servlet 3.1 API,并使用线程池来实现异步Java Web应用程序。此类应用程序可以在Servlet 3.1+容器(例如Tomcat 8.5和Jetty 9.3)上运行。
要使用Spring Web MVC框架实现发送事件:
- 创建一个控制器类,并用
@RestController
注释对其进行标记 - 创建一个创建客户端连接的方法,该方法返回SseEmitter,处理
GET
请求并产生text/event-stream
- 创建一个new
SseEmitter
,以保存它并从方法中返回它
- 在另一个线程中异步发送事件,获取已保存的内容,
SseEmitter
并SseEmitter.send
根据需要多次调用方法
- 完成发送事件,请调用SseEmitter.complete()方法
- 要完成特殊的事件发送,请调用SseEmitter.completeWithError()方法
简化的控制器源:
@RestController
public class SseWebMvcController
private SseEmitter emitter;
@GetMapping(path="/sse", produces=MediaType.TEXT_EVENT_STREAM_VALUE)
SseEmitter createConnection() {
emitter = new SseEmitter();
return emitter;
// in another thread
void sendEvents() {
try {
emitter.send("Alpha");
emitter.send("Omega");
emitter.complete();
} catch(Exception e) {
emitter.completeWithError(e);
若要仅使用data
字段发送事件,应使用SseEmitter.send(Object object)方法。要发送的事件与领域data
,id
,event
,retry
和意见,应当使用 SseEmitter.send(SseEmitter.SseEventBuilder建设者)方法。
在下面的示例中,为了将相同的事件发送给许多客户端,实现了SseEmitters类。要创建客户端连接,有一种add(SseEmitter emitter)
方法将a保存SseEmitter
在线程安全的容器中。为了异步发送事件,有一种send(Object obj)
方法可以将相同的事件发送到所有连接的客户端。
简化的类源:
class SseEmitters {
private final List<SseEmitter> emitters = new CopyOnWriteArrayList<>();
SseEmitter add(SseEmitter emitter) {
this.emitters.add(emitter);
emitter.onCompletion(() -> {
this.emitters.remove(emitter);
emitter.onTimeout(() -> {
emitter.complete();
this.emitters.remove(emitter);
return emitter;
void send(Object obj) {
List<SseEmitter> failedEmitters = new ArrayList<>();
this.emitters.forEach(emitter -> {
try {
emitter.send(obj);
} catch (Exception e) {
emitter.completeWithError(e);
failedEmitters.add(emitter);
this.emitters.removeAll(failedEmitters);
处理持久性周期性事件流
在此示例中,服务器每秒发送一次持续时间很短的定期事件流-有限的单词流(快速的棕色狐狸跳过懒惰的狗 pangram),直到单词完成为止。
为了实现这一点,使用了提到的SseEmitters类。为了异步并定期发送事件,已创建了一个缓存的线程池。因为事件流是持久的,所以每个客户端连接都在controller方法内部将单独的任务提交给线程池。
简化的控制器源:
@Controller
@RequestMapping("/sse/mvc")
public class WordsController {
private static final String[] WORDS = "The quick brown fox jumps over the lazy dog.".split(" ");
private final ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
@GetMapping(path = "/words", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
SseEmitter getWords() {
SseEmitter emitter = new SseEmitter();
cachedThreadPool.execute(() -> {
try {
for (int i = 0; i < WORDS.length; i++) {
emitter.send(WORDS[i]);
TimeUnit.SECONDS.sleep(1);
emitter.complete();
} catch (Exception e) {
emitter.completeWithError(e);
return emitter;
具有EventSource
JavaScript客户端的事件客户端源。
<!DOCTYPE html>
<html lang="en">
<meta charset="UTF-8">
<title>Server-Sent Events client example with EventSource</title>
</head>
<script>
if (window.EventSource == null) {
alert('The browser does not support Server-Sent Events');
} else {
var eventSource = new EventSource('/sse/mvc/words');
eventSource.onopen = function () {
console.log('connection is established');
eventSource.onerror = function (error) {
console.log('connection state: ' + eventSource.readyState + ', error: ' + event);
eventSource.onmessage = function (event) {
console.log('id: ' + event.lastEventId + ', data: ' + event.data);
if (event.data.endsWith('.')) {
eventSource.close();
console.log('connection is closed');
</script>
</body>
</html>
EventSource
浏览器中带有JavaScript客户端的事件客户端示例。在客户端使用了自动重新连接,在服务器端使用了已实现的重新连接。
处理持久的周期性事件
在此示例中,服务器发送持久的周期性事件流-服务器性能信息的每秒潜在无限流:
- 承诺的虚拟内存大小
- 交换空间总大小
- 可用交换空间大小
- 物理内存总大小
- 可用物理内存大小
- 系统CPU负载
- 处理CPU负载
为了实现此目的,实现了PerformanceService类,该类使用OperatingSystemMXBean类从操作系统读取性能信息。还使用了提到的SseEmitters类。为了异步并定期发送事件,已创建了计划的线程池。因为事件流是持久的,所以将单个任务提交到线程池以将事件同时发送到所有客户端。
简化的控制器示例:
@RestController
@RequestMapping("/sse/mvc")
public class PerformanceController {
private final PerformanceService performanceService;
PerformanceController(PerformanceService performanceService) {
this.performanceService = performanceService;
private final AtomicInteger id = new AtomicInteger();
private final ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(1);
private final SseEmitters emitters = new SseEmitters();
@PostConstruct
void init() {
scheduledThreadPool.scheduleAtFixedRate(() -> {
emitters.send(performanceService.getPerformance());
}, 0, 1, TimeUnit.SECONDS);
@GetMapping(path = "/performance", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
SseEmitter getPerformance() {
return emitters.add();
处理非周期性事件
在此示例中,服务器发送有关正在监视的文件夹中的文件更改(创建,修改,删除)的非定期事件流。使用该文件夹后,System.getProperty("user.home")
属性将提供当前用户的主文件夹。
为了实现此目的,实现了FolderWatchService类,该类使用Java NIO文件监视功能。还使用了提到的SseEmitters类。为了异步和非周期性地发送事件,FolderWatchService类将生成Spring应用程序事件,这些事件由控制器(通过实现侦听器方法)消耗。
一个简化的服务器示例:
@RestController
@RequestMapping("/sse/mvc")
public class FolderWatchController implements ApplicationListener<FolderChangeEvent> {
private final FolderWatchService folderWatchService;
FolderWatchController(FolderWatchService folderWatchService) {
this.folderWatchService = folderWatchService;
private final SseEmitters emitters = new SseEmitters();
@PostConstruct
void init() {
folderWatchService.start(System.getProperty("user.home"));
@GetMapping(path = "/folder-watch", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
SseEmitter getFolderWatch() {
return emitters.add(new SseEmitter());
@Override
public void onApplicationEvent(FolderChangeEvent event) {
emitters.send(event.getEvent());
SSE Java服务器:Spring Web Flux
Spring Web Flux框架5.2.0基于Reactive Streams API,并使用事件循环计算模型来实现异步Java Web应用程序。这样的应用可以在非阻挡的Web服务器等的Netty 4.1和1.4暗流运行和上的Servlet容器3.1+如Tomcat 8.5和9.3码头。
要使用Spring Web Flux框架实现发送事件:
- 创建一个控制器类,并用
@RestController
注释对其进行标记 - 创建一个创建客户端连接并发送事件的方法,该方法返回Flux,处理
GET
请求并产生text/event-stream
- 创建一个新的
Flux
并从方法中返回它
简化的控制器源:
@RestController
public class ExampleController
@GetMapping(path="/sse", produces=MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> createConnectionAndSendEvents() {
return Flux.just("Alpha", "Omega");
要仅使用data
字段发送事件,应使用该Flux<T>
类型。要发送的事件与领域data
,id
,event
,retry
和意见,应当使用的Flux<ServerSentEvent<T>>
类型。
处理持久性周期性事件流
在此示例中,服务器每秒发送一次持续时间很短的定期事件流-有限的单词流(快速的棕色狐狸跳过懒惰的狗 pangram),直到单词完成为止。
要实现这一点:
- 创建类型为a
Flux
的单词Flux.just(WORDS)
Flux<String>
- 创建一个 类型每秒
Flux
发出递增long
值Flux.interval(Duration.ofSeconds(1))
的Flux<Long>
- 通过
zip
方法将它们组合在一起键入Flux<Tuple2<String,Long>>
- 通过
map(Tuple2::getT1)
类型提取元组的第一个元素Flux<String>
简化的控制器源:
@RestController
@RequestMapping("/sse/flux")
public class WordsController {
private static final String[] WORDS = "The quick brown fox jumps over the lazy dog.".split(" ");
@GetMapping(path = "/words", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
Flux<String> getWords() {
return Flux
.zip(Flux.just(WORDS), Flux.interval(Duration.ofSeconds(1)))
.map(Tuple2::getT1);
此示例的事件客户端与Web MVC示例中使用的事件客户端相同。
处理持久的周期性事件
在此示例中,服务器发送持久的周期性事件流-服务器性能信息的每秒潜在无限流。
要实现这一点:
- 创建一个类型每秒
Flux
发出递增long
值Flux.interval(Duration.ofSeconds(1))
的Flux<Long>
- 通过
map(sequence -> performanceService.getPerformance())
方法将其转换为类型Flux<Performance>
简化的控制器示例:
@RestController
@RequestMapping("/sse/flux")
public class PerformanceController {
private final PerformanceService performanceService;
PerformanceController(PerformanceService performanceService) {
this.performanceService = performanceService;
@GetMapping(path = "/performance", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
Flux<Performance> getPerformance() {
return Flux
.interval(Duration.ofSeconds(1))
.map(sequence -> performanceService.getPerformance());
此示例的事件客户端与Web MVC示例中使用的事件客户端相同。
处理非周期性事件
在此示例中,服务器发送有关正在监视的文件夹中的文件更改(创建,修改,删除)的非定期事件流。使用该文件夹后,System.getProperty("user.home")
属性将提供当前用户的主文件夹。
为了实现此目的,实现了FolderWatchService类,该类使用Java NIO文件监视功能。为了异步和非周期性地发送事件,FolderWatchService类将生成Spring应用程序事件,这些事件由控制器(通过实现侦听器方法)消耗。控制器侦听器方法将事件发送到SubscribableChannel
,在控制器方法中订阅该Flux
事件以产生事件。
简化的控制器示例:
@RestController
@RequestMapping("/sse/flux")
public class FolderWatchController implements ApplicationListener<FolderChangeEvent> {
private final FolderWatchService folderWatchService;
FolderWatchController(FolderWatchService folderWatchService) {
this.folderWatchService = folderWatchService;
private final SubscribableChannel subscribableChannel = MessageChannels.publishSubscribe().get();
@PostConstruct
void init() {
folderWatchService.start(System.getProperty("user.home"));
@GetMapping(path = "/folder-watch", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
Flux<FolderChangeEvent.Event> getFolderWatch() {
return Flux.create(sink -> {
MessageHandler handler = message -> sink.next(FolderChangeEvent.class.cast(message.getPayload()).getEvent());
sink.onCancel(() -> subscribableChannel.unsubscribe(handler));
subscribableChannel.subscribe(handler);
}, FluxSink.OverflowStrategy.LATEST);
@Override
public void onApplicationEvent(FolderChangeEvent event) {
subscribableChannel.send(new GenericMessage<>(event));
此示例的事件客户端与Web MVC示例中使用的事件客户端相同。
SSE限制
SSE 在设计上有局限性:
- 从服务器到客户端只能在一个方向上发送消息
- 可以只发送短信;尽管可以使用
Base64
编码和gzip
压缩来发送二进制消息,但效率低下。
但是SSE 在实施方面也存在局限性 :
- Internet Explorer / Edge和许多移动浏览器不支持SSE。尽管可以使用polyfills,但效率低下
- 许多浏览器允许打开数量非常有限的SSE连接(对于Chrome,Firefox,每个浏览器最多支持6个连接)
原文链接:https://github.com/aliakh/demo-spring-sse
如果有人购买商品,后台实时推送一个消息到用户。如果用websocket来做还需要写websocket服务器和配置。用轮询,会影响服务器性能,还达不到实时效果。有没有别的选择呢?当然有!
一、简介:Server-Sent 事件指的是网页自动获取来自服务器的更新。
二、局限性:
1.WebSocket 比 SSE 更强大,Websocket 在客户端和服务器之间建立了双向的实时通信。而 SSE 只支持从服务器到客户端的单向实时通信。
2.WebSocket 在浏览器方面支持更全面,IE / Edge 几
今天我们开始来学习下 WebFlux,为什么突然要学这个东西?
因为我之前是想学习 Spring Cloud Gateway 来着,然后发现它是基于 Spring5.0+SpringBoot2.0+WebFlux等技术开发的。所以学之前才要来简单了解下 WebFlux 技术。
然后要学习 WebFlux 时我发现又需要 Java 8 中的函数式编程、Stream 流等技术作为前置知识。环环相扣啊,套娃一样。
所以前面还有两篇学习的文章:来系统学习下 lambda 表达式吧和来一起学习下 Java 8 的 S
适用场景:异步非阻塞。
一个日志监控系统,我们的前端页面将不再需要通过“命令式”的轮询的方式不断向服务器请求数据然后进行更新,而是在建立好通道之后,数据流从系统源源不断流向页面,从而展现实时的指标变化曲线;
一个社交平台,朋友的动态、点赞和留言不是手动刷出来的,而是当后台数据变化的时候自动体现到界面上的。
响应式编程(reactive programming)是一种基于数据流(data stream)和变化传递(
【SpringBoot WEB系列】SSE 服务器发送事件详解
SSE 全称Server Sent Event,直译一下就是服务器发送事件,一般的项目开发中,用到的机会不多,可能很多小伙伴不太清楚这个东西,到底是干啥的,有啥用
本文主要知识点如下:
SSE 扫盲,应用场景分析
借助异步请求实现 sse 功能,加深概念理解
使用SseEmitter实现一个简单的推送示例
I. SSE 扫...
基于http协议交互的推送方法大概方法如下:
轮询(ajax),比较耗费服务器资源。COMET方式(COMET 技术并不是 HTML 5 )
websocket 双向数据推送,灵活,功能强大
Server-sent-event(简称SSE),单项数据推送(Server-sent Events 规范是 HTML 5 规范的一个组成部分)
这里我们研究一下SSE;
一、什么是SSE
Server-sent Events 规范是 HTML 5 规范的一个组成部分,具体的规范文档见参考资源。该规范比较简单,主要由两