HTML5 Server Sent Events

Server-Sent Events(以下简称:SSE) 是 HTML5 标准中的一个API,它提供了一种服务器主动向浏览器推送数据的方式。

SSE 与 WebSocket 类似,都允许浏览器“订阅”服务器端的数据源,每当有新数据产生时,服务器就会发送通知给浏览器,以实时更新页面内容。

和 WebSocket 相比,SSE 更适合于只需要单向通信的场景,例如股票行情、新闻推送、社交媒体状态更新以及AI聊天机器人等。相反,WebSocket 提供了双向通信,适合需要双向交互的应用,如多人游戏、聊天室等。
由于 WebSocket 所使用的的是双向全双工的连接,所以需要浏览器和服务器支持 WebSocket 协议才能工作,而 SSE 是一种构建在 HTTP 协议之上的传输方式,也就是不需要支持额外的通信协议就可以直接使用 SSE,此外 SSE 还有一些 WebSocket 没有的特性,比如自动重连、事件 ID 和发送任意事件等,这些都是 SSE 的优点

为了方便验证下面的客户端示例代码,可以先克隆 https://github.com/y1j2x34/sse-server-example 项目到本地运行 SSE 服务。

浏览器端如何使用SSE

EventSource API

首先创建一个 EventSource 对象

const url = "http://localhost:10086/sse"
const source = new EventSource(url, {
	withCredentials: false
  • url: 表示远程资源的地址
  • withCredentials: 默认为 false, 表示跨域时,是否包含 credentials 凭据
  • EventSource 对象创建后,会立即发起一个 GET 请求,并且还自动带上了Accept: text/event-stream请求头。

    并且可以看到多出了一个 EventStream 标签页:

    当接收到数据时, message 事件就会被触发,并且我们可以通过 e.data 获取到从服务端发来的数据。

    source.addEventListener('message', e => {
    	console.log(e.data)
    

    另外还有链接建立和关闭的事件:

    source.addEventListener('open', e => {
    	console.log('已建立连接')
    source.addEventListener('error', e => {
    	if(sourece.readyState === EventSource.CLOSED) {
    		console.log('连接已关闭')
    

    sourece.readyState 有以下这些状态值:

  • EventSource.CONNECTING: 连接正在进行
  • EventSource.OPEN: 连接建立
  • EventSource.CLOSED: 连接关闭
  • 需要注意的是,SSE 连接中断或者服务器返回数据失败, 都会触发error事件, 而不是 close 事件。而且根据 SSE 的规范,连接因为某些原因中断后,它还会自动重连。重连的间隔时间可以在后端响应数据中指定。

    fetch API

    下面这张截图是使用 ChatGPT 过程中, 前端向后台发送的 SSE 请求:

    很容易发现,这是一个由 fetch API 发起的 POST 请求,因为请求列表上的 Type 是 'fetch', 而且 EventSource 并不能指定请求方法。接下来我们一起看一下如何通过 fetch API 读取 SSE 数据。

    首先,我们先实现一个 chunkIterator 方法, 实现这个方法的目的是后续从 ReadableStream 中读取数据只要通过 for await ... of 语法就可以实现:

    * @param {ReadableStream<Uint8Array>} readableStream async function* chunkIterator(readableStream) { const reader = readableStream.getReader(); while(true) { const {value, done} = await reader.read(); if(done) { return yield value;

    接下来通过 fetch 发起 POST 请求,然后读取 response.body 的数据。

    (async () => {
        const response = await fetch('http://127.0.0.1:10086/sse', {
            method: 'POST',
            headers: {
                'Accept': 'text/event-stream',
        for await(const chunk of chunkIterator(response.body)) {
            // TODO:
    

    这时候读取到的 chunk 还是 Uint8Array, 我们要对其进行解码:

    const decoder = new TextDecoder();
    const chunkText = decoder.decode(chunk);
    // TODO:
    

    打印 chunkText 会发现输出类似这种格式的内容:

    'data: {"seq":2,"time":"2024-01-25T06:58:43.642Z"}\n\n'
    

    最后,去除多余的信息,便可获取到正文:

    const json = chunkText.replace(/^data:\s+/, '').replace(/\n+^/, '');
    

    以上,就是关于如何通过 fetch API 读取 SSE 信息的全部内容。整个过程还是有一点繁琐的,在实际项目中推荐使用靠谱的第三方库来实现。

    @microsoft/fetch-event-source 用法

    参考:https://github.com/Azure/fetch-event-source

    import { fetchEventSource } from '@microsoft/fetch-event-source';
    class RetriableError extends Error { }
    class FatalError extends Error { }
    fetchEventSource('http://127.0.0.1:10086/sse', {
        async onopen(response) {
            if (response.ok && response.headers.get('content-type') === EventStreamContentType) {
                return; // everything's good
            } else if (response.status >= 400 && response.status < 500 && response.status !== 429) {
                // client-side errors are usually non-retriable:
                throw new FatalError();
            } else {
                throw new RetriableError();
        onmessage(msg) {
            // if the server emits an error message, throw an exception
            // so it gets handled by the onerror callback below:
            if (msg.event === 'FatalError') {
                throw new FatalError(msg.data);
        onclose() {
            // if the server closes the connection unexpectedly, retry:
            throw new RetriableError();
        onerror(err) {
            if (err instanceof FatalError) {
                throw err; // rethrow to stop the operation
            } else {
                // do nothing to automatically retry. You can also
                // return a specific retry interval here.