Eino: Interrupt & CheckPoint User Manual

Introduction

With the Interrupt & CheckPoint feature, you can pause Graph execution at a specified location and later resume it from that breakpoint. For a StateGraph, the State can also be modified before resuming.

💡 Resuming from a breakpoint only restores the input and the data produced by each node at runtime. The Graph orchestration and the CallOptions must therefore be exactly the same as in the interrupted run.

Use Interrupt

Interrupt can pause the Graph before or after a specified Node executes. Set interrupts at Compile time by passing the WithInterruptBeforeNodes and WithInterruptAfterNodes options:

import (
    "context"

    "github.com/cloudwego/eino/compose"
)

func main() {
    ctx := context.Background()

    g := compose.NewGraph[string, string]()
    err := g.AddLambdaNode("node1", compose.InvokableLambda(func(ctx context.Context, input string) (output string, err error) {
        return input + " -> node1", nil // node1 logic
    }))
    if err != nil {/* error handle */}
    err = g.AddLambdaNode("node2", compose.InvokableLambda(func(ctx context.Context, input string) (output string, err error) {
        return input + " -> node2", nil // node2 logic
    }))
    if err != nil {/* error handle */}

    /** other graph composed code, e.g. edges START -> node1 -> node2 -> END
    xxx
    */

    // pause after node1 finishes and before node2 starts
    runner, err := g.Compile(ctx, compose.WithInterruptAfterNodes([]string{"node1"}), compose.WithInterruptBeforeNodes([]string{"node2"}))
    if err != nil {/* error handle */}
    _ = runner
}

💡 Currently, breakpoints can only be set at Compile time. If you need to set them per request, feel free to submit a suggestion.

The error returned by a run tells you whether the run was interrupted and carries the interrupt information:

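// compose/checkpoint.go

type InterruptInfo struct {
    State       any                      // the graph State at the time of interruption (for a StateGraph)
    BeforeNodes []string                 // nodes interrupted before they executed
    AfterNodes  []string                 // nodes interrupted after they executed
    SubGraph    map[string]InterruptInfo // interrupt info of nested subgraphs
}

// ExtractInterruptInfo reports whether err is an interrupt and, if so, returns its details.
func ExtractInterruptInfo(err error) (info *InterruptInfo, existed bool)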

For example:

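import "github.com/cloudwego/eino/compose"

/* graph compose code
 * g := compose.NewGraph[string, string]()
 * ...
 * runner, err := g.Compile(ctx, ...)
 */

result, err := runner.Invoke(ctx, input)
if info, ok := compose.ExtractInterruptInfo(err); ok {
    // the run was interrupted: inspect info.BeforeNodes, info.AfterNodes, info.State
} else if err != nil {
    // handle other errors
}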

💡 When an Interrupt occurs, the returned output is the zero value and has no meaning.

Using CheckPoint

A CheckPoint records the running state of the Graph, which allows execution to resume after an Interrupt.

Implement CheckPointStore

CheckPointStore is a KV storage interface whose keys are strings and whose values are []byte. We do not provide a wrapper or a default implementation, so users need to implement it themselves to store checkpoints.

// compose/checkpoint.go

type CheckPointStore interface {
    Get(ctx context.Context, key string) (value []byte, existed bool, err error)
    Set(ctx context.Context, key string, value []byte) (err error)
}

Register serialization method

Saving and loading a CheckPoint involves serializing and deserializing the inputs and outputs of Graph nodes as well as the State. If only simple types or Eino built-in types (such as Message or Document) are used, no extra work is needed. If custom structs are involved, their types must be registered in advance with RegisterSerializableType, which Eino provides:

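import "github.com/cloudwego/eino/compose"

type MyStruct struct {
    // struct body
}

// func RegisterSerializableType[T any](name string) error
func init() {
    // register once at startup; the name must never change afterwards
    if err := compose.RegisterSerializableType[MyStruct]("MyStruct"); err != nil {
        panic(err)
    }
}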

For a registered type, extra type information is recorded during serialization. As a result, Eino can restore the correct concrete type during deserialization even when the target type is not specified (for example, when deserializing into interface{}). The name passed at registration is the key that uniquely identifies the type; once chosen, it must never change, otherwise previously persisted checkpoints can no longer be restored correctly.

💡 Unexported struct fields cannot be accessed, so they are not stored or restored.

If a registered type implements json.Marshaler and json.Unmarshaler, those custom methods are used to serialize and deserialize it.

// encoding/json

type Marshaler interface {
    MarshalJSON() ([]byte, error)
}

type Unmarshaler interface {
    UnmarshalJSON([]byte) error
}

Turn on CheckPoint

After implementing a CheckPointStore, pass it as an option when compiling the Graph to bind it to the Graph:

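import (
    "context"

    "github.com/cloudwego/eino/compose"
)

func main() {
    ctx := context.Background()

    /** graph composed code, including creating store,
    your own CheckPointStore implementation
    xxx
    */

    runner, err := g.Compile(ctx, compose.WithCheckPointStore(store), compose.WithInterruptBeforeNodes([]string{"node2"}))
    if err != nil {/* error handle */}
    _ = runner // invoke it with a CheckPoint ID, as described below
}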

After that, specify the CheckPoint for each request through CallOptions:

// compose/checkpoint.go

func WithCheckPointID(checkPointID string) Option
func WithStateModifier(sm StateModifier) Option

type StateModifier func(ctx context.Context, path NodePath, state any) error

The CheckPoint ID is used as the key in the CheckPointStore. When the graph runs, it checks whether this ID exists in the CheckPointStore: if it does, the graph resumes from that checkpoint. When an interrupt occurs, the graph state is saved under this ID.

The StateModifier is applied when the Graph resumes, letting you modify the State before execution continues. The path argument is meaningful for nested graphs; when the graph is not nested, it is an empty array.

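/* graph compose and compile
xxx
*/

// first run: interrupts and saves the state under id
id := GenUUID() // GenUUID: any unique-ID generator of your choice
_, err := runner.Invoke(ctx, input, compose.WithCheckPointID(id))
if _, ok := compose.ExtractInterruptInfo(err); !ok { /* not interrupted: handle result or error */ }

// resume from id, modifying the State first
_, err = runner.Invoke(ctx, input /*unused on resume*/,
    compose.WithCheckPointID(id),
    compose.WithStateModifier(func(ctx context.Context, path compose.NodePath, state any) error {
        state.(*testState).Field1 = "hello" // testState is the graph's State type
        return nil
    }),
)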

💡 The input is not read when resuming, so an empty (zero-value) input can be passed.

Dynamic Interrupt

A node returning a special error can dynamically trigger an Interrupt:

// compose/checkpoint.go
var InterruptAndRerun = errors.New("interrupt and rerun")

When a node returns this error, the Eino Graph interrupts. On resume, the node runs again; before it reruns, the StateModifier (if configured) is called to modify the State.

In this case, the rerun node receives a zero-value input instead of its original input. If the original input is still needed on the rerun, save it to the State beforehand.

CheckPoint in Streaming

When a CheckPoint is saved in streaming mode, the stream chunks must be concatenated into a single value, so a concat function must be registered for each chunk type:

// compose/stream_concat.go
func RegisterStreamChunkConcatFunc[T any](fn func([]T) (T, error))

// example
type TestStruct struct {
    Body string
}

// RegisterStreamChunkConcatFunc is not thread-safe, so call it during initialization.
func init() {
    compose.RegisterStreamChunkConcatFunc(func(ss []TestStruct) (TestStruct, error) {
        ret := TestStruct{Body: ""}
        for i := range ss {
            ret.Body += ss[i].Body
        }
        return ret, nil
    })
}

Eino provides built-in concat functions for *schema.Message, []*schema.Message, and string.
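For chunk types with more than one field, the concat function decides how each field merges. Below is a hedged example for a hypothetical ArgsChunk type (not an Eino type): text fragments are appended with strings.Builder, the call ID keeps its first non-empty value, and chunks from different calls are rejected. The function has the func([]T) (T, error) shape that RegisterStreamChunkConcatFunc expects.

```go
package main

import (
	"fmt"
	"strings"
)

// ArgsChunk is a hypothetical streamed chunk type.
type ArgsChunk struct {
	CallID string // typically set only on the first chunk
	Args   string // arrives in fragments
}

// concatArgsChunks merges chunks: the first non-empty CallID wins and
// Args fragments are appended in stream order.
func concatArgsChunks(chunks []ArgsChunk) (ArgsChunk, error) {
	var out ArgsChunk
	var args strings.Builder
	for _, c := range chunks {
		switch {
		case out.CallID == "":
			out.CallID = c.CallID
		case c.CallID != "" && c.CallID != out.CallID:
			return ArgsChunk{}, fmt.Errorf("cannot concat chunks of different calls: %q and %q", out.CallID, c.CallID)
		}
		args.WriteString(c.Args)
	}
	out.Args = args.String()
	return out, nil
}
```

It would be registered once during initialization, for example `func init() { compose.RegisterStreamChunkConcatFunc(concatArgsChunks) }`, with the type parameter inferred as ArgsChunk.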

Interrupt & CheckPoint in Nested Graphs

If the parent graph has a CheckPointStore set, you can enable Interrupt & CheckPoint for a subgraph by passing interrupt nodes through WithGraphCompileOptions when adding it with AddGraphNode. If the parent graph does not set a CheckPointStore, Compile returns an error.

/* graph compose code
xxx
*/
err := g.AddGraphNode("node1", subGraph, compose.WithGraphCompileOptions(
    compose.WithInterruptAfterNodes([]string{"node2"}), // node2 is a node inside subGraph
))
if err != nil {/* error handle */}

runner, err := g.Compile(ctx, compose.WithCheckPointStore(store))
if err != nil {/* error handle */}

If the interrupt happens in the subgraph, the State modified on resume is the subgraph's State. TODO: explain the use of Path in StateModifier.

Example

https://github.com/cloudwego/eino-examples/tree/main/compose/graph/react_with_interrupt