Eino ADK: Agent 扩展
Agent Runner
Runner 是 Eino ADK 中负责执行 Agent 的核心引擎。它的主要作用是管理和控制 Agent 的整个生命周期,如处理多 Agent 协作,保存传递上下文等,interrupt、callback 等切面能力也均依赖 Runner 实现。任何 Agent 都应通过 Runner 来运行。
Interrupt & Resume
该功能允许一个正在运行的 Agent 主动中断其执行,保存当前状态,并在稍后从中断点恢复执行。这对于处理需要外部输入、长时间等待或可暂停的任务流非常有用。
Interrupted Action
在 Agent 的执行过程中,可以通过产生包含 Interrupted Action 的 AgentEvent 来主动中断 Runner 的运行:
// github.com/cloudwego/eino/adk/interface.go
type AgentAction struct {
// other actions
Interrupted *InterruptInfo
// other actions
}
// github.com/cloudwego/eino/adk/interrupt.go
type InterruptInfo struct {
Data any
}
当中断发生时,可以通过 InterruptInfo 结构体附带自定义的中断信息。此信息:
- 会被传递给调用者,可以通过该信息向调用者说明中断原因等
- 如果后续需要恢复 Agent 运行,InterruptInfo 会在恢复时重新传递给中断的 Agent,Agent 可以依据该信息恢复运行
状态持久化 (Checkpoint)
当 Runner 捕获到这个带有 Interrupted Action 的 Event 时,会立即终止当前的执行流程。 如果:
- Runner 中设置了 CheckPointStore
// github.com/cloudwego/eino/adk/runner.go
type RunnerConfig struct {
// other fields
CheckPointStore CheckPointStore
}
// github.com/cloudwego/eino/adk/interrupt.go
type CheckPointStore interface {
Set(ctx context.Context, key string, value []byte) error
Get(ctx context.Context, key string) ([]byte, bool, error)
}
- 调用 Runner 时通过 AgentRunOption WithCheckPointID 传入 CheckPointID
// github.com/cloudwego/eino/adk/interrupt.go
func WithCheckPointID(id string) _AgentRunOption_
Runner 在终止运行后会将当前运行状态(原始输入、对话历史等)以及 Agent 抛出的 InterruptInfo 以 CheckPointID 为 key 持久化到 CheckPointStore 中。
💡 为了保存 interface 中数据的原本类型,Eino ADK 使用 gob(https://pkg.go.dev/encoding/gob)序列化运行状态。因此在使用自定义类型时需要提前使用 gob.Register 或 gob.RegisterName 注册类型(更推荐后者,前者使用路径加类型名作为默认名字,因此类型的位置和名字均不能发生变更)。Eino 会自动注册框架内置的类型。
Resume
运行中断,调用 Runner 的 Resume 接口传入中断时的 CheckPointID 可以恢复运行:
// github.com/cloudwego/eino/adk/runner.go
func (r *Runner) Resume(ctx context.Context, checkPointID string, opts ...AgentRunOption) (*AsyncIterator[*AgentEvent], error)
恢复 Agent 运行需要发生中断的 Agent 实现了 ResumableAgent 接口, Runner 从 CheckPointerStore 读取运行状态并恢复运行,其中 InterruptInfo 和上次运行配置的 EnableStreaming 会作为输入提供给 Agent:
// github.com/cloudwego/eino/adk/interface.go
type ResumableAgent interface {
Agent
Resume(ctx context.Context, info *ResumeInfo, opts ...AgentRunOption) *AsyncIterator[*AgentEvent]
}
// github.com/cloudwego/eino/adk/interrupt.go
type ResumeInfo struct {
EnableStreaming bool
*_InterruptInfo_
}
Resume 如果向 Agent 传入新信息,可以定义 AgentRunOption,在调用 Runner.Resume 时传入。
Callback
TODO