第十一章:TurnLoop — 抢占、中止与多轮生命周期

上一章我们用 adk.Runner 实现了完整的 A2UI Web 应用。它能正常工作,但试试这个场景:

你问 Agent 一个复杂问题,它开始调用工具、生成长回答……但你忽然意识到问错了,想换一个问题。

在上一章的 Runner 模式下,你只能等它说完,或者刷新页面丢弃一切。

本章引入 adk.TurnLoop,让 Agent 支持两个用户侧可感知的新能力:抢占中止

前置条件

与第一章一致:需要配置一个可用的 ChatModel(OpenAI 或 Ark),详见第一章的"前置条件"部分。

运行 & 体验

quickstart/chatwitheino 目录下执行:

go run .

打开浏览器访问 http://localhost:8080,然后试试以下操作:

体验抢占(Preempt)

  1. 发送一个会触发长回答的问题,例如"详细解释一下 Eino 的所有组件"
  2. 在 Agent 还在回答时,直接发送一条新消息,例如"算了,就告诉我 ChatModel 是什么"
  3. 观察:旧回答立即停止,Agent 开始回答新问题

体验中止(Abort)

  1. 发送一个问题
  2. 在 Agent 回答过程中,点击右上角的 Abort 按钮
  3. 观察:Agent 立即停止,不再继续输出

这两个能力在上一章的 Runner 版本中都不存在。以下解释它们是如何实现的。

代码位置

为什么 Runner 做不到

上一章的 cmd/ch10 中,每个 /sessions/:id/chat 请求调用一次 runner.Run(ctx, messages)。Runner 是**单轮(single-turn)**模型——调用一次、执行一次、结束。如果用户在 Agent 执行过程中又发了一条消息,Runner 没有"正在运行的循环"可以接收它。

TurnLoop 则是一个持久运行的多轮(multi-turn)执行循环。它在轮次之间保持 idle 等待,随时可以通过 Push() 接收新输入并立即响应。正是因为有一个持续运行的循环,抢占和中止才成为可能——你可以打断一个正在进行的轮次,或者直接停止整个循环。

能力Ch10(Runner,单轮)Ch11(TurnLoop,多轮)
流式输出
审批 / 中断
跨轮次持久运行、实时响应新输入❌ 每次 Run () 独立✅ Push () 随时送入
抢占正在进行的回答✅ Push(item, WithPreempt(...))
中止 Agent✅ loop.Stop(WithImmediate())
灵活的 per-turn 输入构建❌ 业务层手动拼装✅ GenInput 回调

TurnLoop 的核心模型

TurnLoop 是一个基于推送的事件循环,以轮次(turn)为单位管理 Agent 的执行。与 Runner 的"调用一次、执行一次"不同,TurnLoop 持续运行:轮次结束后进入 idle 等待,新 item 到来时立即启动下一轮。

Push(item) → [队列] → GenInput(items) → Agent.Run() → OnAgentEvents(events)
                                ↑                              │
                                └──── idle 等待 / 下一轮 ←──────┘

关键概念:

  • Item:用户输入的载体。本示例定义为 ChatItem,可以携带用户消息或审批决定
  • GenInput:从队列中的 items 构建 Agent 输入(选择哪些 items 消费、哪些保留给下一轮)
  • OnAgentEvents:接收 Agent 输出的事件流,负责渲染和持久化
  • Push:向队列推入新 item,可附带抢占选项

一个 Session 对应一个 TurnLoop

在本示例的 Web 场景中,每个聊天 session 对应一个 TurnLoop 实例。当用户发送第一条消息时,服务端为该 session 创建一个 TurnLoop 并调用 Run() 启动它;后续消息通过 Push() 送入同一个循环。这个循环在轮次之间保持 idle 等待,直到 session 被删除或用户 abort。

这是 TurnLoop 最典型的使用模式:循环的生命周期与用户会话绑定。一个长期运行的 TurnLoop 让抢占和中止成为自然的操作——因为"正在运行的循环"始终存在,新输入随时可以送入。

常规流程:idle → 新消息 → 回答 → idle

最简单的场景是用户依次提问、等回答、再提下一个问题:

// 用户发送第一条消息时,创建并启动 TurnLoop
loop := adk.NewTurnLoop(cfg)
loop.Push(&ChatItem{Query: "hello"})
loop.Run(ctx)
// → GenInput 构建输入 → Agent 执行 → OnAgentEvents 流式输出
// → 轮次结束,TurnLoop 进入 idle 等待

// 用户发送第二条消息(此时 loop 处于 idle)
loop.Push(&ChatItem{Query: "explain Eino's architecture"})
// → TurnLoop 唤醒,开始新一轮:GenInput → Agent → OnAgentEvents → idle

这个流程与上一章的 Runner 在用户体验上没有区别——区别在于 TurnLoop 的循环持续存在,不需要每次都重新创建。而一旦用户在 Agent 还在回答时发来新消息,就进入了下面的"抢占"场景。

抢占是怎么实现的

当用户在 Agent 回答过程中发送新消息时,业务层只需一行代码触发抢占:

loop.Push(item, adk.WithPreempt[*ChatItem, M](adk.AfterToolCalls))

TurnLoop 收到这个指令后:

  1. 等待当前 tool call 完成(AfterToolCalls 表示不打断正在执行的工具,避免不一致状态)
  2. 取消当前轮次——OnAgentEvents 的 context 被取消,旧轮次退出
  3. 从队列取出新 item,通过 GenInput 构建输入,启动新一轮

抢占模式可以根据业务需要选择不同的安全点:

模式具体行为
AfterToolCalls等待当前正在执行的工具调用完成后,再取消当前轮次并启动新一轮执行
AfterChatModel等待当前大模型调用完成后,再取消当前轮次并启动新一轮执行
AnySafePoint在任一安全点(如工具调用间隙、模型调用间隙)立即取消当前轮次并启动新一轮执行

本示例中 TurnLoop 运行在独立 goroutine 中,而 HTTP handler 需要把事件流写入 SSE 响应。两者之间通过 channel 协调(见 server/server.go 中的 iterEnvelope/iterResult 以及 handlerDone 信号机制)。这些是 HTTP 适配层的细节,不属于 TurnLoop API 本身。

中止是怎么实现的

中止更简单——直接停止整个 TurnLoop:

loop.Stop(adk.WithImmediate())  // 立即取消,不等待当前轮次
loop.Wait()                     // 等待完全退出

Stop 的三种模式

模式具体行为
loop.Stop()轮次边界退出:等待当前轮次完成后退出
loop.Stop(WithImmediate())立即退出:取消当前轮次的 context
loop.Stop(WithGraceful())安全点退出:在下一个安全点(如 tool call 之间)退出

TurnLoop 的配置

创建 TurnLoop 时,通过 TurnLoopConfig 指定回调和选项:

cfg := adk.TurnLoopConfig[*ChatItem, M]{
    // GenInput:每轮开始时调用,决定"这一轮 Agent 看到什么"
    // 从队列中选择 items 构建 Agent 输入,返回 Consumed(本轮处理)和 Remaining(留到后续轮次)
    GenInput: func(ctx context.Context, loop *adk.TurnLoop[*ChatItem, M], items []*ChatItem) (*adk.GenInputResult[*ChatItem, M], error) {
        // ...构建 AgentInput,持久化用户消息...
    },

    // PrepareAgent:每轮调用一次,返回本轮使用的 Agent
    // 本示例直接返回同一个 Agent,但你可以根据 items 动态选择不同 Agent
    PrepareAgent: func(ctx context.Context, loop *adk.TurnLoop[*ChatItem, M], consumed []*ChatItem) (adk.TypedAgent[M], error) {
        return agent, nil
    },

    // OnAgentEvents:接收 Agent 的事件流,负责渲染输出和持久化中间消息
    // 本示例通过 channel 把事件流转交给 HTTP handler 做 SSE 输出
    OnAgentEvents: func(ctx context.Context, tc *adk.TurnContext[*ChatItem, M], events *adk.AsyncIterator[*adk.TypedAgentEvent[M]]) error {
        // ...把 events 交给 HTTP handler,等待消费完成...
    },

    // 以下三个字段用于声明式 checkpoint(审批恢复),下一节详细介绍
    GenResume:    makeGenResume(),
    Store:        checkpointStore,
    CheckpointID: sessionID,
}

loop := adk.NewTurnLoop(cfg)
回调调用时机职责
GenInput队列中有 items 时选择消费哪些 items,构建 Agent 输入(可决定哪些 items 保留给下一轮)
PrepareAgentGenInput 之后返回本轮使用的 Agent 实例,支持动态调整 Agent 配置
OnAgentEventsAgent 产出事件流时消费事件、渲染输出、持久化结果,是业务层处理 Agent 输出的核心入口
GenResume从 checkpoint 恢复时从新 Push 进来的 items 中提取审批结果,构建
ResumeParams
,实现审批恢复的自动化
Store + CheckpointID启用声明式 checkpoint,TurnLoop 自动处理执行状态的保存与恢复

完整的回调实现请参考 server/server.go

声明式 Checkpoint:审批恢复的自动化

在第七章(Runner 模式)中,审批恢复需要业务层手动调用 runner.ResumeWithParams(),自己判断"这次是正常执行还是恢复执行"。TurnLoop 提供了更简洁的方式——在配置中声明 StoreCheckpointID(见上一节),TurnLoop 会自动处理保存与恢复:

  1. Agent 执行到审批 interrupt 时,TurnLoop 自动将执行状态保存到 Store(以 CheckpointID 为 key)
  2. 用户做出审批决定后,业务层创建一个新的 TurnLoop(使用相同的 CheckpointID),并 Push 审批 item
  3. 新 TurnLoop Run() 时,检测到 checkpoint 存在,自动调用 GenResume(而非 GenInput)获取恢复参数
  4. Agent 从 interrupt 点继续执行

GenResume 的职责就是从新 Push 进来的 items 中提取审批结果,构建 ResumeParams

GenResume: func(ctx context.Context, loop *adk.TurnLoop[*ChatItem, M],
    canceledItems, unhandledItems, newItems []*ChatItem,
) (*adk.GenResumeResult[*ChatItem, M], error) {
    // newItems 包含审批恢复时 Push 的 item
    item := newItems[0]
    return &adk.GenResumeResult[*ChatItem, M]{
        ResumeParams: &adk.ResumeParams{
            InterruptID: item.InterruptID,
            ApprovalResult: item.ApprovalResult,
        },
    }, nil
}

相比 Runner 的 ResumeWithParams(),声明式 checkpoint 让业务层不需要管理"正常执行 vs 恢复执行"的分支——TurnLoop 根据 checkpoint 是否存在自动选择走 GenInput 还是 GenResume

本章小结

  • TurnLoop 是一个持久运行的多轮执行循环,生命周期与用户会话绑定
  • 常规流程Push(item) → GenInput → Agent → OnAgentEvents → idle → 等待下一个 Push
  • 抢占Push(item, WithPreempt(AfterToolCalls)) 一行代码取消当前轮次并开始新一轮
  • 中止loop.Stop(WithImmediate()) 一行代码终止整个循环
  • 声明式 checkpoint:配置 Store + CheckpointID,TurnLoop 自动处理 interrupt 的保存与恢复
  • 回调的具体实现请参考 server/server.go

系列收尾:完整 Agent 应用骨架

到本章为止,我们用一个可以实际运行的 Agent 串起了 Eino 的核心能力:

  • 运行时:Runner / TurnLoop 驱动执行,支持流式输出、抢占与中止
  • 工具层:Filesystem / Shell 等 Tool 能力接入,工具错误可被安全处理
  • 中间件:可插拔的 middleware/handler,用于错误处理、重试、审批等横切能力
  • 可观测:callbacks/trace 能力把关键链路打通,便于调试与线上观测
  • 人机协作:interrupt/resume + checkpoint 支持审批、补参、分支选择等交互式流程
  • 确定性编排:compose(graph/chain/workflow)把复杂业务流程组织为可维护、可复用的执行图
  • 业务交付:A2UI 协议把 Agent 能力以流式 UI 的形式呈现给用户
  • 执行控制:TurnLoop 提供抢占、中止、多轮生命周期管理,适配真实业务场景的复杂交互需求

你可以在这个骨架上逐步替换/扩展任意环节:模型、工具、存储、工作流、前端渲染协议,而不需要推倒重来。