HTTP 升级 SSE(Server-Sent Events)

本文档包含:如何把普通 HTTP 请求变为 SSE、服务端/客户端示例、fetch 流式解析、代理配置与常见问题,可直接在项目中按需使用。

SSE 不需要像 WebSocket 一样“升级”,它本质是一个保持的普通 HTTP 响应(HTTP/1.1 长连接)。只要把响应头设为 text/event-stream,并按 SSE 文本协议持续写入数据即可。

核心要点

  • 不是升级协议,无需 Upgrade 握手。
  • 关键响应头:
    • Content-Type: text/event-stream; charset=utf-8
    • Cache-Control: no-cache, no-transform
    • Connection: keep-alive
    • Access-Control-Allow-Origin: *(跨域需要时)
  • 消息格式(以空行 \n\n 作为分隔):
    • 基本:data: <payload>\n\n
    • 可选:
      • event: <name>\n
      • id: <id>\n
      • retry: <ms>\n
  • 建议定期发送注释心跳:: ping\n\n,防止中间层超时断开。
  • 客户端可用 EventSource,或 fetch + ReadableStream 自行解析。

服务端(Node/Express)示例:带心跳与可控结束

  1. import express from 'express';
  2. const app = express();
  3. app.get('/sse', (req, res) => {
  4. res.setHeader('Content-Type', 'text/event-stream; charset=utf-8');
  5. res.setHeader('Cache-Control', 'no-cache, no-transform');
  6. res.setHeader('Connection', 'keep-alive');
  7. // 设置 CORS 相关头
  8. res.setHeader('Access-Control-Allow-Origin', '*');
  9. res.setHeader('Access-Control-Allow-Methods', 'GET, POST, PUT, DELETE, OPTIONS');
  10. )
  11. // 若使用了压缩中间件(compression),需对该路由禁用压缩
  12. // 设置状态码
  13. res.status(200);
  14. // 立即发送响应头,这个很重要,否则客户端可能会一直重试
  15. res.flushHeaders?.();
  16. // 心跳,防止链路被中间层回收
  17. const heartbeat = setInterval(() => {
  18. res.write(`: ping\n\n`);
  19. }, 15000);
  20. let id = 1;
  21. const timer = setInterval(() => {
  22. res.write(`id: ${id}\n`);
  23. res.write(`data: ${JSON.stringify({ time: Date.now() })}\n\n`);
  24. id++;
  25. // 示例:发到第 5 条后主动结束
  26. if (id > 5) endSSE('done');
  27. }, 3000);
  28. let closed = false;
  29. function endSSE(reason?: string) {
  30. if (closed) return;
  31. closed = true;
  32. clearInterval(timer);
  33. clearInterval(heartbeat);
  34. // 可选:先通知客户端“结束事件”
  35. res.write(`event: end\n`);
  36. res.write(`data: ${JSON.stringify({ reason: reason ?? 'normal' })}\n\n`);
  37. // 主动关闭响应 => SSE 连接关闭
  38. res.end();
  39. }
  40. // 客户端断开时清理
  41. req.on('close', () => {
  42. if (!closed) {
  43. clearInterval(timer);
  44. clearInterval(heartbeat);
  45. res.end();
  46. closed = true;
  47. }
  48. });
  49. });
  50. app.listen(3000, () => console.log('SSE server at http://localhost:3000/sse'));
JavaScript复制

要点回顾:

  • 主动结束只需 res.end();可在此之前发送一个自定义 event: end 方便客户端收尾。
  • 心跳使用以冒号开头的注释行(不会被客户端 message 监听触发)。

客户端:EventSource 示例

使用场景: 页面上的表格数据需要服务端试试推送,直到用户离开页面。

  1. const es = new EventSource('http://localhost:3000/sse');
  2. es.addEventListener('end', (e) => {
  3. console.log('server end', e.data);
  4. // 收到结束事件后,可主动关闭本地连接(可选)
  5. es.close();
  6. });
  7. es.onmessage = (e) => {
  8. console.log('message', e.lastEventId, e.data);
  9. };
  10. es.onerror = (e) => {
  11. console.warn('sse error', e);
  12. // EventSource 会自动重连,间隔受服务端 `retry:` 影响
  13. };
JavaScript复制

客户端:fetch + ReadableStream 示例

适用于需要自定义请求头/鉴权或手动解析的场景,如 AI chat 通常需要通过 POST 的方式发送请求体,然后内容全部返回后结束当前 SSE 推送,等待用户发起新的 chat 请求。

  1. const res = await fetch('http://localhost:3000/sse', {
  2. headers: { Accept: 'text/event-stream' },
  3. // 若需要取消,可传入 signal: abortController.signal
  4. });
  5. if (!res.ok) throw new Error(`HTTP ${res.status}`);
  6. const reader = res.body!.getReader();
  7. const decoder = new TextDecoder();
  8. let buffer = '';
  9. try {
  10. while (true) {
  11. const { value, done } = await reader.read();
  12. if (done) {
  13. console.log('stream closed');
  14. break;
  15. }
  16. buffer += decoder.decode(value, { stream: true });
  17. // 以 \n\n 分帧,解析行:event:/data:/id:/retry:
  18. let idx;
  19. while ((idx = buffer.indexOf('\n\n')) >= 0) {
  20. const raw = buffer.slice(0, idx);
  21. buffer = buffer.slice(idx + 2);
  22. const lines = raw.split('\n');
  23. let event = 'message';
  24. let data: string[] = [];
  25. let id: string | undefined;
  26. for (const line of lines) {
  27. if (!line || line.startsWith(':')) continue; // 注释/空行
  28. const [k, v = ''] = line.split(':', 2);
  29. const val = v.startsWith(' ') ? v.slice(1) : v;
  30. if (k === 'event') event = val;
  31. else if (k === 'data') data.push(val);
  32. else if (k === 'id') id = val;
  33. }
  34. const payload = data.join('\n');
  35. // 自行分发处理
  36. console.log('evt=', event, 'id=', id, 'data=', payload);
  37. }
  38. }
  39. } catch (err) {
  40. // AbortError/网络错误等会在这里抛出
  41. console.error('stream error', err);
  42. } finally {
  43. reader.releaseLock();
  44. }
JavaScript复制

何时 reader.read()donetrue

  • 正常读完响应体:
    • Content-Length 指定的字节读尽;或
    • chunked 传输收到终止块;或
    • 服务端/代理主动结束(调用 res.end() 或关闭响应)。
    • 响应体为空:第一次 read() 就会 { done: true }
  • 你主动关闭:
    • 调用 reader.cancel()response.body.cancel() 后,后续 read() 会返回 done: true(或当前 read Promise 直接 reject)。
  • 不会返回 done: true 的情况(而是抛错):
    • AbortController 中止:read()DOMException: AbortError
    • 网络/代理异常:read() 通常抛 TypeError
  • 与 SSE:
    • 只要链路不断开,done 会一直为 false
    • 服务端 res.end()、代理超时回收、后端进程退出时,下一次 read() 才会 done: true

反向代理与中间层配置(以 Nginx 为例)

  • 关闭缓冲与延迟:
    • proxy_buffering off;
    • proxy_cache off;
  • 延长超时:
    • proxy_read_timeout 1h;
    • keepalive_timeout 1h;
  • 禁用压缩(避免分块被聚合/缓冲):
    • gzip off;(或对该路径禁用)
  • 定期心跳:服务端发送 : ping\n\n 注释行保持活跃。

常见问题排查

  • 看不到任何消息:
    • 是否为 text/event-stream?是否每条消息以 \n\n 结束?
    • 代理是否启用缓冲/压缩?(关闭)
  • EventSource 自动重连太频繁:
    • 服务端发送 retry: <ms> 指定重连间隔。
  • CORS 报错:
    • 设置 Access-Control-Allow-Origin 和必要的 CORS 头。
  • 与框架中间件冲突:
    • 对 SSE 路由禁用 body parser、压缩和缓存相关中间件。

Checklist

  • [ ] 路由响应头:text/event-streamno-cachekeep-alive、CORS
  • [ ] 消息以 \n\n 结尾;必要时 event:id:retry:
  • [ ] 定时 : ping 心跳,如果服务端没有需要长时间执行的任务,可以不设置
  • [ ] 代理关闭缓冲、延长超时、禁用压缩
  • [ ] 需要时可发送 event: end,随后 res.end() 主动关闭

SSE 代码示例

参见 github sse-template

编程笔记 & 随笔杂谈