Eino: Interrupt & CheckPoint使用手册
💡 注意:v0.3.26 版本中因为代码编写错误导致 CheckPoint 的序列化内容产生 break,新接入 CheckPoint 使用 v0.3.26 以后的版本,建议直接使用最新。
eino 提供了兼容分支,使用了 checkpoint 且版本低于 v0.3.26 的业务在升级 eino 时可以先升级到兼容分支,老数据淘汰后再升级到主干。
因为兼容分支会引入额外的性能开销并且一般来说业务 agent checkpoint 有不太长的有效期,所以分支没有合入主干。
介绍
使用 Interrupt & CheckPoint 功能,可以实现在指定位置暂停 Graph 执行并在之后断点续传,如果是 StateGraph,还可以在断点续传前修改 State。
💡 断点续传仅能复原输入和运行时各节点产生的数据,需要确保 Graph 编排完全相同,以及重新完整传入 CallOption(没有特殊情况应当保持一致,除非依赖 CallOption 在 Resume 时传递数据等)。
使用静态 Interrupt
静态 Interrupt 支持在指定 Node 执行前或执行后暂停 Graph,Compile 时传入 WithInterruptAfterNodes 与 WithInterruptBeforeNodes Option 来设置 Interrupt:
import (
"github.com/cloudwego/eino/compose"
)
func main() {
g := NewGraph[string, string]()
err := g.AddLambdaNode("node1", compose.InvokableLambda(func(ctx **context**._Context_, input string) (output string, err error) {/*invokable func*/})
if err != nil {/* error handle */}
err = g.AddLambdaNode("node2", compose.InvokableLambda(func(ctx **context**._Context_, input string) (output string, err error) {/*invokable func*/})
if err != nil {/* error handle */}
/** other graph composed code
xxx
*/
err = g.Compile(ctx, compose.WithInterruptAfterNodes([]string{"node1"}), compose.WithInterruptBeforeNodes([]string{"node2"}))
if err != nil {/* error handle */}
}
💡 目前仅支持 Compile 时设置静态断点,如果需要请求时设置,欢迎提出~
可以从运行返回的 error 中获得本次运行是否 Interrupt 以及 Interrupt 信息:
// compose/checkpoint.go
**type **InterruptInfo **struct **{
State any
BeforeNodes []string
AfterNodes []string
RerunNodes []string
RerunNodesExtra **map**[string]any
SubGraphs **map**[string]*InterruptInfo
InterruptContexts []*InterruptCtx
}
func ExtractInterruptInfo(err error) (info *InterruptInfo, existed bool) {}
例如:
import "github.com/cloudwego/eino/compse"
/***graph compose code
* g := NewGraph
* xxx
* runner := g.Compile
*/
result, err := runner.Invoke(ctx, input)
if info, ok := ExtractInterruptInfo(err); ok {
// handler info
}
if err != nil {
// handle error
}
💡 Interrupt 时 output 为空值,没有意义。
使用 CheckPoint
CheckPoint 记录 Graph 运行状态,使用 CheckPoint 可以在 Interrupt 后恢复运行。
实现 CheckPointerStore
CheckPointStore 是一个 key 类型为 string、value 类型为[]byte 的 KV 存储接口,我们没有提供封装和默认实现,需要用户自行实现,用来存储 checkpoint。
// compose/checkpoint.go
type CheckpointStore interface {
Get(ctx **context**._Context_, key string) (value []byte, existed bool,err error)
Set(ctx **context**._Context_, key string, value []byte) (err error)
}
注册序列化方法
CheckPoint 的保存和读取涉及对 Graph 节点输入输出以及 State 的序列化和反序列化,在仅使用简单类型或 eino 内置类型(比如 Message 或 Document)时,用户无需额外操作;当引入自定义 struct 时,需要提前注册类型,Eino 提供了注册方法 schema.RegisterName:
package main
import "github.com/cloudwego/eino/schema"
type MyState struct {
Counter int
Note string
}
func init() {
// Register the type with a stable name for serialization/persistence.
// Use the pointer form if you persist pointers to this type.
// It's recommended to register types within the `init()` function
// within the same file your type is declared.
schema.RegisterName[*MyState]("my_state_v1")
}
注册后的类型在序列化时将被额外记录类型信息,因此在反序列化时,即使不指明类型(比如反序列化到 interface{}),Eino 也可以反序列化出正确的类型。注册方法中的 key 唯一标识了这个类型,一旦确定了 key 需要保证其不能改变,否则已持久化的 checkpoint 将不能被正确恢复。
💡 结构体的未导出字段无法访问,因此不会被存储/恢复
默认情况下,会使用 eino 内置的序列化功能,此时,如果注册的类型实现了 json Marshaler 和 Unmarshaler,此类型的序列化和反序列化会使用自定义方法。
// encoding/json
type Marshaler interface {
MarshalJSON() ([]byte, error)
}
type Unmarshaler interface {
UnmarshalJSON([]byte) error
}
Eino 同时提供了将序列化方式改为 gob 的选项:
r, err := compose.NewChain[*AgentInput, Message]().
AppendLambda(compose.InvokableLambda(func(ctx context.Context, input *AgentInput) ([]Message, error) {
return a.genModelInput(ctx, instruction, input)
})).
AppendChatModel(a.model).
Compile(ctx, compose.WithGraphName(a.name),
compose.WithCheckPointStore(store),
compose.WithSerializer(&gobSerializer{}))
用户可以按偏好选择,选择后不建议轻易变更,历史数据不兼容。
开启 CheckPoint
创建 CheckPointStore 后在 Compile Graph 时作为 Option 传入,把 CheckPointer 绑定到 Graph:
import (
"github.com/cloudwego/eino/compose"
)
func main() {
/** graph composed code
xxx
*/
err = g.Compile(ctx, compose.WithCheckPointStore(store), compose.WithInterruptBeforeNodes([]string{"node2"}))
if err != nil {/* error handle */}
}
之后可以在请求时通过 CallOption 引入 CheckPoint:
// compose/checkpoint.go
func WithCheckPointID(checkPointID string, sm StateModifier) Option
Checkpoint id 会被作为 CheckPointStore 的 key 使用,graph 运行时会检查 CheckPointStore 是否存在此 id,如果存在则从 checkpoint 中恢复运行;interrupt 是会把 graph 状态保存到此 id 中。
动态 Interrupt
节点返回特殊错误可以动态地触发 Interrupt:
在 eino v0.7.0 之前
// eino/compose/interrupt.go
// emit a plain interrupt signal
var InterruptAndRerun = errors.New("interrupt and rerun")
// emit an interrupt signal with extra info
**func **NewInterruptAndRerunErr(extra any) error
Eino Graph 接收到节点返回此错误后会发生 interrupt,恢复运行时,会再次运行此节点,再次运行前会调用 StateModifier 修改 state(如果已配置)。
这种情况下,再次运行节点时输入会替换为空值,而不是原本的输入,如果再次运行时需要仍需要原本输入,需要提前保存到 State 中。
在 eino v0.7.0 及之后
增加了对“保存本地状态”、“透出内部中断信号”、“并行中断”的支持:
// eino/compose/interrupt.go
// emit an interrupt signal with user-facing info
func Interrupt(ctx context.Context, info any) error
// emit an interrupt signal with user-facing info AS WELL AS
// persistent LOCALLY-DEFINED state
func StatefulInterrupt(ctx context.Context, info any, state any) error
// emit an interrupt signal WRAPPING other interrupt signals
// emitted from inner processes,
// such as ToolsNode wrapping Tools.
func CompositeInterrupt(ctx context.Context, info any, state any, errs ...error)
详细设计参见:Eino human-in-the-loop 框架:技术架构指南
流式传输中的 CheckPoint
流式传输在保存 CheckPoint 时需要拼接数据流,因此需要注册拼接方法:
// compose/stream_concat.go
func RegisterStreamChunkConcatFunc[T any](fn func([]T) (T, error))
// example
type TestStruct struct {
Body string
}
// RegisterStreamChunkConcatFunc非线程安全,需要在初始化阶段使用
RegisterStreamChunkConcatFunc(func(ss []TestStruct)(TestStruct, error){
ret := TestStruct{Body:""}
for i := range ss {
ret.Body += ss[i].Body
}
return ret, nil
})
eino 默认提供了*schema.Message、[]*schema.Message 和 string 的 concat 方法。
嵌套图中的 Interrupt&CheckPoint
父图传入 CheckPointer 的前提下,AddGraphNode 时使用 WithGraphCompileOptions 传入 InterruptNodes 可以开启子图的 Interrupt&CheckPoint,父图未设置 CheckPointer 时会在 Compile 时报错。
/* graph compose code
xxx
*/
g.AddGraphNode("node1", subGraph, WithGraphCompileOptions(
WithInterruptAfterNodes([]string{"node2"}),
))
g.Compile(ctx, WithCheckPointStore(cp))
如果在子图中 interrupt,resume 时修改的 state 应为子图 state。TODO,说明下 StateModifier 中 Path 使用
恢复
恢复:Interrupt 并保存 checkpoint 后,后续的 graph 运行。
在 eino v0.7.0 之前
通过修改 State 来影响恢复时的行为。
// compose/checkpoint.go
type StateModifier func(ctx context.Context, path NodePath, state any) error
func WithStateModifier(sm StateModifier) GraphCompileOption
StateModifier 在 Graph 恢复运行时生效,可以在运行前修改 State,path 在嵌套图中生效,非嵌套视为空数组。
/* graph compose and compile
xxx
*/
// first run interrupt
id := GenUUID()
_, err := runner.Invoke(ctx, input, WithCheckPointID(id))
// resume from id
_, err = runner.Invoke(ctx, input/*unused*/,
WithCheckPointID(id),
WithStateModifier(func(ctx context.Context, path NodePath, state any) error{
state.(*testState).Field1 = "hello"
return nil
}),
)
💡 Resume 时 input 不会被读取,此时 input 传空即可。
在 eino v0.7.0 及之后
除了 StateModifier 之外,还可以选择性的恢复某个中断点,以及直接给指定的“中断点位”传递“恢复数据”:
// specifically resume particular interrupt point(s),
// without specifying resume data
func Resume(ctx context.Context, interruptIDs ...string) context.Context
// specifically resume one interrupt point, with custom resume data
func ResumeWithData(ctx context.Context, interruptID string, data any) context.Context
// specifically resume multiple interrupt points, each with custom resume data
func BatchResumeWithData(ctx context.Context, resumeData map[string]any) context.Context
其中,InterruptID 是从 interrupt error 中获取的:
interruptInfo, isInterrupt := ExtractInterruptInfo(err)
if isInterrupt {
// maybe multiple interrupt points exist here,
// we only take the first one for illustration purpose
interruptID = interruptInfo.InterruptContexts[0].ID
}
resumeData 是发生中断的点位定义的类型,比如一个 Tool 发生了中断并要求用户“审批”是否执行这个 Tool,自定义了一个 ApprovalResult 作为 resumeData:
func (i InvokableApprovableTool) InvokableRun(ctx context.Context, argumentsInJSON string,
opts ...tool.Option) (string, error) {
toolInfo, err := i.Info(ctx)
if err != nil {
return "", err
}
wasInterrupted, _, storedArguments := compose.GetInterruptState[string](ctx)
if !wasInterrupted { // initial invocation, interrupt and wait for approval
return "", compose.StatefulInterrupt(ctx, &ApprovalInfo{
ToolName: toolInfo.Name,
ArgumentsInJSON: argumentsInJSON,
ToolCallID: compose.GetToolCallID(ctx),
}, argumentsInJSON)
}
isResumeTarget, hasData, data := compose.GetResumeContext[*ApprovalResult](ctx)
if !isResumeTarget { // was interrupted but not explicitly resumed, reinterrupt and wait for approval again
return "", compose.StatefulInterrupt(ctx, &ApprovalInfo{
ToolName: toolInfo.Name,
ArgumentsInJSON: storedArguments,
ToolCallID: compose.GetToolCallID(ctx),
}, storedArguments)
}
if !hasData {
return "", fmt.Errorf("tool '%s' resumed with no data", toolInfo.Name)
}
if data.Approved {
return i.InvokableTool.InvokableRun(ctx, storedArguments, opts...)
}
if data.DisapproveReason != nil {
return fmt.Sprintf("tool '%s' disapproved, reason: %s", toolInfo.Name, *data.DisapproveReason), nil
}
return fmt.Sprintf("tool '%s' disapproved", toolInfo.Name), nil
}
例子
在 eino v0.7.0 之前
https://github.com/cloudwego/eino-examples/tree/main/compose/graph/react_with_interrupt
在 eino v0.7.0 之后
https://github.com/cloudwego/eino/blob/main/compose/resume_test.go
其中
TestInterruptStateAndResumeForRootGraph: 简单动态中断
TestInterruptStateAndResumeForSubGraph: 子图中断
TestInterruptStateAndResumeForToolInNestedSubGraph: 嵌套子图内部 tool 中断
TestMultipleInterruptsAndResumes: 并行中断
TestReentryForResumedTools: ReAct Agent 内 tool 中断,恢复后多次循环执行
TestGraphInterruptWithinLambda: Lambda 节点内包含独立 Graph 且内部中断