Eino: Lambda Guide

Introduction

Lambda is the simplest component type, allowing you to embed custom function logic in a workflow. Lambdas can implement one or more of the four paradigms formed by streaming/non-streaming input/output: Invoke, Stream, Collect, Transform.

The framework converts among paradigms under defined rules. See Overview for details (Runnable section).

Definition and Construction

Code: eino/compose/types_lambda.go

type Lambda struct { executor *composableRunnable }

Lambda function signatures:

type Invoke[I, O, TOption any] func(ctx context.Context, input I, opts ...TOption) (output O, err error)
type Stream[I, O, TOption any] func(ctx context.Context, input I, opts ...TOption) (output *schema.StreamReader[O], err error)
type Collect[I, O, TOption any] func(ctx context.Context, input *schema.StreamReader[I], opts ...TOption) (output O, err error)
type Transform[I, O, TOption any] func(ctx context.Context, input *schema.StreamReader[I], opts ...TOption) (output *schema.StreamReader[O], err error)

Usage

Examples: https://github.com/cloudwego/eino-examples/blob/main/components/lambda

Constructors

Eino components generally accept func(ctx, input, ...option) (output, error). For lambdas, simpler constructors are provided:

  • Provide exactly one paradigm function
    • Without custom options
    • With custom options
  • Provide any subset of the four paradigms via AnyLambda

Without Custom Options

  • InvokableLambda
// input and output can be any custom types
lambda := compose.InvokableLambda(func(ctx context.Context, input string) (string, error) {
    // some logic
})
  • StreamableLambda
// input is any type; output must be *schema.StreamReader[O]
lambda := compose.StreamableLambda(func(ctx context.Context, input string) (*schema.StreamReader[string], error) {
    // some logic
})
  • CollectableLambda
// input must be *schema.StreamReader[I]; output can be any type
lambda := compose.CollectableLambda(func(ctx context.Context, input *schema.StreamReader[string]) (string, error) {
    // some logic
})
  • TransformableLambda
// input and output must be *schema.StreamReader[I]
lambda := compose.TransformableLambda(func(ctx context.Context, input *schema.StreamReader[string]) (*schema.StreamReader[string], error) {
    // some logic
})

Shared options:

  • compose.WithLambdaType() — change component type (default: Lambda)
  • compose.WithLambdaCallbackEnable() — disable default node callbacks in Graph

With Custom Options

type Options struct { Field1 string }
type MyOption func(*Options)

lambda := compose.InvokableLambdaWithOption(
    func(ctx context.Context, input string, opts ...MyOption) (string, error) {
        // handle opts
        // some logic
    },
)

AnyLambda

Implement multiple paradigms at once:

type Options struct { Field1 string }
type MyOption func(*Options)

lambda, err := compose.AnyLambda(
    func(ctx context.Context, input string, opts ...MyOption) (string, error) { /* ... */ },
    func(ctx context.Context, input string, opts ...MyOption) (*schema.StreamReader[string], error) { /* ... */ },
    func(ctx context.Context, input *schema.StreamReader[string], opts ...MyOption) (string, error) { /* ... */ },
    func(ctx context.Context, input *schema.StreamReader[string], opts ...MyOption) (*schema.StreamReader[string], error) { /* ... */ },
)

In Orchestration

Graph

graph := compose.NewGraph[string, *MyStruct]()
graph.AddLambdaNode(
    "node1",
    compose.InvokableLambda(func(ctx context.Context, input string) (*MyStruct, error) {
        // some logic
    }),
)

Chain

chain := compose.NewChain[string, string]()
chain.AppendLambda(compose.InvokableLambda(func(ctx context.Context, input string) (string, error) {
    // some logic
}))

Built-in Lambdas

ToList

Convert a single element into a one-item slice:

lambda := compose.ToList[*schema.Message]()
chain := compose.NewChain[[]*schema.Message, []*schema.Message]()
chain.AppendChatModel(chatModel)
chain.AppendLambda(lambda)

MessageParser

Parse a JSON message (often from an LLM) into a struct:

// define target struct
type MyStruct struct {
    ID int `json:"id"`
}

// create parser
parser := schema.NewMessageJSONParser[*MyStruct](&schema.MessageJSONParseConfig{
    ParseFrom: schema.MessageParseFromContent,
    ParseKeyPath: "", // use "key.sub.grandsub" to parse subfields
})

// create parser lambda
parserLambda := compose.MessageParser(parser)

// use in Chain
chain := compose.NewChain[*schema.Message, *MyStruct]()
chain.AppendLambda(parserLambda)

// example
runner, err := chain.Compile(context.Background())
parsed, err := runner.Invoke(context.Background(), &schema.Message{
    Content: `{"id": 1}`,
})
// parsed.ID == 1

Parsing from tool call results is also supported:

// parse from tool call results
parser := schema.NewMessageJSONParser[*MyStruct](&schema.MessageJSONParseConfig{
    ParseFrom: schema.MessageParseFromToolCall,
})