LLM 生成一个完整答案可能要几秒甚至十几秒,如果等全部生成完再返回,用户盯着空白页面等待,体验极差。流式返回是现在所有 AI 产品的标配:LLM 生成一个 token,立刻推给浏览器,用户看到内容实时"打出来"的感觉。

实现这件事,最自然的选择是 SSE(Server-Sent Events)。它是 HTTP 协议原生支持的单向推流,浏览器端用 EventSource 接收,不需要 WebSocket 那么重——没有握手协议,断线自动重连,兼容性好。
整体设计是一条三段管道:
LLM provider
│
▼
LLMClient.Stream() → chan StreamChunk ← 我们的 LLM 层
│
▼
chatModelAdapter → schema.Pipe → Eino ReAct 循环 ← Eino 框架内部
│
▼
tokenStreamReader.Recv() → consumeEinoStream() ← 应用层消费 token
│
▼
HertzStreamer.Emit() → chan StreamEvent(缓冲64) ← SSE 推流层
│
▼
HTTP handler → data: {JSON}nn → 浏览器 ← SSE 帧
每个 Session 对应一个 channel,HTTP handler 注册进来消费,Agent 循环往里投事件。
用户发消息时,浏览器 POST /v1/sessions/{id}/messages,handler 注册 SSE 订阅、启动 Agent 循环、然后把事件一个个推给浏览器:
// interfaces/rest/handler.go — SendMessage(简化)
func (h *Handler) SendMessage(ctx context.Context, c *hertzapp.RequestContext) {
sid := model.SessionID(c.Param("id"))
// ... 解析请求 body ... // 1. 注册 SSE 订阅,拿到这个 session 的 channel
ch := h.streamer.Register(sid)
defer h.streamer.Unregister(sid) // 2. 启动 Agent 循环(goroutine,不阻塞 SSE 推流)
go func() {
h.runTurn.Handle(ctx, RunTurnInput{SessionID: sid, UserText: body.Content})
}() // 3. 设 SSE 响应头
c.Response.Header.Set("Content-Type", "text/event-stream")
c.Response.Header.Set("Cache-Control", "no-cache")
c.Response.Header.Set("Connection", "keep-alive")
c.SetStatusCode(http.StatusOK) // 4. 从 channel 读事件,逐个推 SSE 帧
for evt := range ch {
b, _ := json.Marshal(evt)
fmt.Fprintf(c.Response.BodyWriter(), "data: %snn", b)
if evt.Type == "done" || evt.Type == "error" {
return // 终态事件,关闭连接
}
}
}
三个关键点:
data: {JSON}nn,每次 Flush 浏览器就收到一帧。done 和 error 是终态,收到后关闭连接。defer 保证客户端断开时清理 channel,不泄漏。HertzStreamer 是这条管道的核心,维护 sessionID → channel 的映射:
type HertzStreamer struct {
mu sync.RWMutex
subs map[model.SessionID]chan<- port.StreamEvent
}func (s *HertzStreamer) Register(sid model.SessionID) <-chan port.StreamEvent {
ch := make(chan port.StreamEvent, 64)
s.mu.Lock()
s.subs[sid] = ch
s.mu.Unlock()
return ch
}
HTTP handler 建立 SSE 连接时调 Register,客户端断开时调 Unregister。64 个缓冲位应对 LLM 短时 burst 的 token。
Emit 里有一段看起来简单但很重要的逻辑:
func mustReach(eventType string) bool {
switch eventType {
case "interrupt", "done", "error":
return true
}
return false
}func (s *HertzStreamer) Emit(ctx context.Context, sid model.SessionID, evt port.StreamEvent) {
s.mu.RLock()
ch, ok := s.subs[sid]
s.mu.RUnlock()
if !ok {
return
}
if mustReach(evt.Type) {
select {
case ch <- evt:
case <-ctx.Done():
slog.Error("sse: critical event abandoned", "sid", sid, "type", evt.Type)
}
return
}
// 非关键事件:buffer 满就丢,保护 Agent 循环
select {
case ch <- evt:
default:
slog.Warn("sse: buffer full, dropping", "sid", sid, "type", evt.Type)
}
}
区分两类事件:
interrupt、done、error):阻塞投递,宁可暂停 Agent 循环也不能丢。interrupt 丢了用户就永远等不到审批弹窗;done 丢了前端流不会结束。turn.delta token 增量):非阻塞投递,buffer 满了直接丢。偶尔丢一个 token delta 不影响最终答案,但如果 Agent 因此阻塞,整个循环就会卡死。这个判断踩过坑。早期版本所有事件都阻塞,结果网络抖动时浏览器消费慢,64 个缓冲位很快满了,Agent 循环停在 Emit 里等,整个会话卡住。
port.StreamEvent 的 Type 字段标识事件类型,序列化成 JSON 放在 SSE 的 data: 行里。前端解析 JSON 后按 Type 分发处理:
| Type | 时机 | Payload |
|---|---|---|
turn.delta | 每个 LLM token | string |
turn.tool_call | LLM 发出工具调用 | ToolCall |
turn.tool_result | 工具执行完成 | {call_id, result} |
interrupt | HITL 触发 | *Interrupt |
done | 循环结束 | {state, turns, error} |
error | 系统错误 | string |
LLM 流要经过两层转换才能到达浏览器。第一层在 chatModelAdapter,第二层在 tokenStreamReader。
chatModelAdapter 在 infrastructure/einoadapter/chatmodel.go 里,把我们自己的 LLMClient.Stream() 返回的 <-chan llm.StreamChunk 转成 Eino 的 schema.StreamReader:
// chatmodel.go — Stream 方法(简化)
func (a *chatModelAdapter) Stream(ctx context.Context, input []*schema.Message, ...) (*schema.StreamReader[*schema.Message], error) {
// 1. 调我们的 LLMClient,拿到 chunk channel
chunkCh, _ := a.client.Stream(ctx, a.profile, req) // 2. 创建 Eino 的流式管道(Pipe = 一对读写端)
sr, sw := schema.Pipe[*schema.Message](16) // 3. goroutine 把我们的 chunk 逐个写进 Eino 的管道
go func() {
defer sw.Close()
for chunk := range chunkCh {
msg := llmChunkToEinoMessage(chunk) // 格式转换
sw.Send(msg, nil)
}
}()
return sr, nil
}
启动一个 goroutine,把我们的 channel 读出来,逐个转成 Eino 的消息格式,写进 Eino 的流式管道。两边的数据格式不一样,但流动方式一样——都是"生产者写,消费者读"。
Eino 的 ReAct 循环跑完后,返回的是 schema.StreamReader[*schema.Message]。应用层不直接用这个类型(它绑死了 Eino),而是通过 tokenStreamReader 包装成框架无关的 port.TokenReader:
// factory.go — tokenStreamReader(简化)
type tokenStreamReader struct {
sr *schema.StreamReader[*schema.Message]
}func (r *tokenStreamReader) Recv() (string, error) {
msg, err := r.sr.Recv() // 从 Eino 流里取一个消息
if msg != nil {
return msg.Content, nil // 只取文本内容
}
return "", nil
}
TokenReader 只有一个 Recv() 方法,返回字符串。应用层的 consumeEinoStream 调它拿 token,每拿到一个就投一帧 SSE。Eino 的类型被封装在 infrastructure 层里,application 层完全看不到。
两层的方向不一样:第一层是"我们的格式 → Eino 格式"(给 Eino 用),第二层是"Eino 格式 → 我们的格式"(给应用层用)。中间 ReAct 循环里数据全在 Eino 内部流动,我们不管。
SSE 推流的核心是三个设计决定:
mustReach 区分 interrupt/done/error 和普通 token delta,保护 Agent 循环不卡死,同时保证状态变更不丢失chatModelAdapter 把我们的 channel 转给 Eino,tokenStreamReader 把 Eino 的流式输出转回给应用层,application 层完全看不到 Eino 类型