Eino: Workflow Orchestration Framework
What Is Eino Workflow?
An orchestration API at the same architectural level as Graph:
flowchart LR
E[Eino compose engine]
G[Graph API]
W[Workflow API]
C[Chain API]
E --> G
E --> W
G --> C
Key traits:
- Same capability level as Graph; both orchestrate “LLM-centric information flow”.
- Node types, streaming, callbacks, options, state, interrupt & checkpoint are consistent.
- Implements `AnyGraph`; can be added as a child to Graph/Chain/Workflow via `AddGraphNode`.
- Can host other Graph/Chain/Workflow as children.
- Field-level mapping: node input can be a composition of arbitrary fields from arbitrary predecessors.
- Natively supports `struct`, `map`, and arbitrarily nested combinations.
- Control flow and data flow can be separated. In Graph, edges carry both; in Workflow, you may split them.
- No cycles (e.g., ChatModel→ToolsNode→ChatModel loops). `NodeTriggerMode` is fixed to `AllPredecessor`.
Why Workflow?
Flexible Input/Output Types
For example, suppose you need to orchestrate two lambda nodes wrapping two existing business functions f1 and f2, each with its own struct input and output types tailored to its business scenario:
When orchestrating with Workflow, map f1’s output field F1 directly to f2’s input field F3, while preserving the original function signatures of f1 and f2. The effect achieved is: each node’s input/output is “determined by the business scenario”, without needing to consider “who provides my input and who uses my output”.
When orchestrating with Graph, due to the “type alignment” requirement, if f1 → f2, then f1’s output type and f2’s input type need to align. You must choose one of two options:
- Define a new common struct, and change both f1’s output type and f2’s input type to this common struct. This has costs and may intrude on business logic.
- Change both f1’s output type and f2’s input type to map. This loses the strong typing alignment characteristic.
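To make the contrast concrete, here is a minimal, framework-free sketch of what field-level mapping does internally (plain Go with `reflect`; `mapField`, `f1Output`, and `f2Input` are hypothetical names for illustration, not Eino APIs): one exported field of the upstream output is copied into one exported field of the downstream input, while both function signatures stay untouched.

```go
package main

import (
	"fmt"
	"reflect"
)

// Hypothetical business types: f1's output and f2's input have
// different shapes, and only one field needs to flow between them.
type f1Output struct {
	F1 string
	F2 int
}

type f2Input struct {
	F3 string
	F4 bool
}

// mapField copies one exported field of src into one exported field
// of dst (a struct pointer) via reflection, leaving both types as-is.
func mapField(src any, srcField string, dst any, dstField string) error {
	sv := reflect.ValueOf(src).FieldByName(srcField)
	dv := reflect.ValueOf(dst).Elem().FieldByName(dstField)
	if !sv.IsValid() || !dv.IsValid() || !dv.CanSet() {
		return fmt.Errorf("bad field mapping: %s -> %s", srcField, dstField)
	}
	if !sv.Type().AssignableTo(dv.Type()) {
		return fmt.Errorf("cannot assign %s to %s", sv.Type(), dv.Type())
	}
	dv.Set(sv)
	return nil
}

func main() {
	out := f1Output{F1: "hello", F2: 42}
	var in f2Input
	if err := mapField(out, "F1", &in, "F3"); err != nil {
		panic(err)
	}
	fmt.Println(in.F3) // hello
}
```

This is only the mechanical core; Workflow additionally validates mappings at compile time wherever the declared types allow.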
Separate Control and Data Flow
Consider the following scenario:
Node D references certain output fields from A, B, and C simultaneously. The dashed line from A to D is purely “data flow”, carrying no “control” information: whether A completes execution does not determine whether D starts execution.
The bold arrow from node D to E represents that node E does not reference any output from node D; it is purely “control flow”, carrying no “data”. Whether D completes execution determines whether E starts execution, but D’s output does not affect E’s input.
Other lines in the diagram combine control flow and data flow.
Note that data flow can only be transmitted when a control flow path exists. For example, the A→D data flow depends on the existence of A→branch→B→D or A→branch→C→D control flow. Data flow can only reference outputs from predecessor nodes.
Consider, for example, this “cross-node” data passing scenario:
In the diagram above, the chat template node’s input can be very explicit:
map[string]any{"prompt": "prompt from START", "context": "retrieved context"}
In contrast, if using Graph or Chain API, you must choose one of two options:
- Use OutputKey to convert node output types (OutputKey cannot be set on the START node, so an extra passthrough node is needed), and the ChatTemplate node’s input will include the full output of START and retriever (rather than just the specific fields actually needed).
- Put START node’s prompt in state, and ChatTemplate reads from state. This introduces additional state.
Using Workflow
Simplest Workflow
START → node → END
// creates and invokes a simple workflow with only a Lambda node.
// Since all field mappings are ALL to ALL mappings
// (by using AddInput without field mappings),
// this simple workflow is equivalent to a Graph: START -> lambda -> END.
func main() {
// create a Workflow, just like creating a Graph
wf := compose.NewWorkflow[int, string]()
// add a lambda node to the Workflow, just like adding the lambda to a Graph
wf.AddLambdaNode("lambda", compose.InvokableLambda(
func(ctx context.Context, in int) (string, error) {
return strconv.Itoa(in), nil
})).
// add an input to this lambda node from START.
// this means mapping all output of START to the input of the lambda.
// the effect of AddInput is to set both a control dependency
// and a data dependency.
AddInput(compose.START)
// obtain the compose.END of the workflow for method chaining
wf.End().
// add an input to compose.END,
// which means 'using ALL output of lambda node as output of END'.
AddInput("lambda")
// compile the Workflow, just like compiling a Graph
run, err := wf.Compile(context.Background())
if err != nil {
logs.Errorf("workflow compile error: %v", err)
return
}
// invoke the Workflow, just like invoking a Graph
result, err := run.Invoke(context.Background(), 1)
if err != nil {
logs.Errorf("workflow run err: %v", err)
return
}
logs.Infof("%v", result)
}
Core APIs:
- `func NewWorkflow[I, O any](opts ...NewGraphOption) *Workflow[I, O]`
  - Creates a new Workflow. Signature is identical to `NewGraph`.
- `func (wf *Workflow[I, O]) AddChatModelNode(key string, chatModel model.BaseChatModel, opts ...GraphAddNodeOpt) *WorkflowNode`
  - Adds a new node to the Workflow. Supported node types are identical to Graph.
  - Unlike Graph’s AddXXXNode, which returns an error immediately, Workflow defers error handling to the final Compile step.
  - AddXXXNode returns a `*WorkflowNode`, allowing subsequent field mapping operations via method chaining.
- `func (n *WorkflowNode) AddInput(fromNodeKey string, inputs ...*FieldMapping) *WorkflowNode`
  - Adds input field mappings to a WorkflowNode. Returns the `*WorkflowNode` for continued method chaining.
- `func (wf *Workflow[I, O]) Compile(ctx context.Context, opts ...GraphCompileOption) (Runnable[I, O], error)`
  - Compiles the Workflow. Signature is identical to compiling a Graph.
Field Mapping
START (struct input) → [parallel lambda c1, c2] → END (map output).
We demonstrate counting occurrences of a substring in two different fields. The workflow input is an Eino Message plus a SubStr; c1 counts occurrences in Content, c2 counts occurrences in ReasoningContent. The two lambdas run in parallel and map their results to END.
In the diagram below, the workflow’s overall input is a message struct, both c1 and c2 lambdas have counter struct inputs, both output int, and the workflow’s overall output is map[string]any:
// demonstrates the field mapping ability of eino workflow.
func main() {
type counter struct {
FullStr string // exported because we will do field mapping for this field
SubStr string // exported because we will do field mapping for this field
}
// wordCounter is a lambda function that counts occurrences of SubStr within FullStr
wordCounter := func(ctx context.Context, c counter) (int, error) {
return strings.Count(c.FullStr, c.SubStr), nil
}
type message struct {
*schema.Message // exported because we will do field mapping for this field
SubStr string // exported because we will do field mapping for this field
}
// create a workflow just like a Graph
wf := compose.NewWorkflow[message, map[string]any]()
// add lambda c1 just like in Graph
wf.AddLambdaNode("c1", compose.InvokableLambda(wordCounter)).
AddInput(compose.START, // add an input from START, specifying 2 field mappings
// map START's SubStr field to lambda c1's SubStr field
compose.MapFields("SubStr", "SubStr"),
// map START's Message's Content field to lambda c1's FullStr field
compose.MapFieldPaths([]string{"Message", "Content"}, []string{"FullStr"}))
// add lambda c2 just like in Graph
wf.AddLambdaNode("c2", compose.InvokableLambda(wordCounter)).
AddInput(compose.START, // add an input from START, specifying 2 field mappings
// map START's SubStr field to lambda c2's SubStr field
compose.MapFields("SubStr", "SubStr"),
// map START's Message's ReasoningContent field to lambda c2's FullStr field
compose.MapFieldPaths([]string{"Message", "ReasoningContent"}, []string{"FullStr"}))
wf.End(). // Obtain the compose.END for method chaining
// add an input from c1, mapping full output of c1 to the map key 'content_count'
AddInput("c1", compose.ToField("content_count")).
// also add an input from c2, mapping full output of c2 to the map key 'reasoning_content_count'
AddInput("c2", compose.ToField("reasoning_content_count"))
// compile the workflow just like compiling a Graph
run, err := wf.Compile(context.Background())
if err != nil {
logs.Errorf("workflow compile error: %v", err)
return
}
// invoke the workflow just like invoking a Graph
result, err := run.Invoke(context.Background(), message{
Message: &schema.Message{
Role: schema.Assistant,
Content: "Hello world!",
ReasoningContent: "I need to say something meaningful",
},
SubStr: "o", // would like to count the occurrences of 'o'
})
if err != nil {
logs.Errorf("workflow run err: %v", err)
return
}
logs.Infof("%v", result)
}
The AddInput method accepts 0–n field mappings and can be called multiple times. This means:
- A node can reference any number of fields from a single predecessor’s output.
- A node can reference fields from any number of predecessors.
- A mapping can be whole→field, field→whole, whole→whole, or nested field↔nested field.
- Different mapping types have different APIs:
  - Top-level field → top-level field: `MapFields(string, string)`
  - Whole output → top-level field: `ToField(string)`
  - Top-level field → whole input: `FromField(string)`
  - Nested field → nested field: `MapFieldPaths(FieldPath, FieldPath)`
  - Whole output → nested field: `ToFieldPath(FieldPath)`
  - Nested field → whole input: `FromFieldPath(FieldPath)`
  - Whole output → whole input: `AddInput(fromKey)` with no `FieldMapping`
Advanced Features
Data-only Edges (No Control Flow)
Imagine a simple scenario: START → adder node → multiplier node → END. The “multiplier node” multiplies one field from START with the result from the adder node:
In the diagram above, the multiplier node executes after the adder node, meaning the “multiplier node” is controlled by the “adder node”. However, the START node does not directly control the “multiplier node”; it only passes data to it. In code, use AddInputWithOptions(fromNode, fieldMappings, WithNoDirectDependency) to specify a pure data flow:
func main() {
type calculator struct {
Add []int
Multiply int
}
adder := func(ctx context.Context, in []int) (out int, err error) {
for _, i := range in {
out += i
}
return out, nil
}
type mul struct {
A int
B int
}
multiplier := func(ctx context.Context, m mul) (int, error) {
return m.A * m.B, nil
}
wf := compose.NewWorkflow[calculator, int]()
wf.AddLambdaNode("adder", compose.InvokableLambda(adder)).
AddInput(compose.START, compose.FromField("Add"))
wf.AddLambdaNode("mul", compose.InvokableLambda(multiplier)).
AddInput("adder", compose.ToField("A")).
AddInputWithOptions(compose.START, []*compose.FieldMapping{compose.MapFields("Multiply", "B")},
// use WithNoDirectDependency to declare a 'data-only' dependency,
// in this case, START node's execution status will not determine whether 'mul' node can execute.
// START node only passes one field of its output to 'mul' node.
compose.WithNoDirectDependency())
wf.End().AddInput("mul")
runner, err := wf.Compile(context.Background())
if err != nil {
logs.Errorf("workflow compile error: %v", err)
return
}
result, err := runner.Invoke(context.Background(), calculator{
Add: []int{2, 5},
Multiply: 3,
})
if err != nil {
logs.Errorf("workflow run err: %v", err)
return
}
logs.Infof("%d", result)
}
New API introduced in this example:
func (n *WorkflowNode) AddInputWithOptions(fromNodeKey string, inputs []*FieldMapping, opts ...WorkflowAddInputOpt) *WorkflowNode {
return n.addDependencyRelation(fromNodeKey, inputs, getAddInputOpts(opts))
}
And the new Option:
func WithNoDirectDependency() WorkflowAddInputOpt {
return func(opt *workflowAddInputOpts) {
opt.noDirectDependency = true
}
}
Combined, these can add pure “data dependency relationships” to nodes.
Control-only Edges (No Data Flow)
Imagine a “sequential bidding with confidential prices” scenario: START → bidder1 → threshold check → bidder2 → END:
In the diagram above, regular lines are “control + data”, dashed lines are “data only”, and bold lines are “control only”. The logic is: input an initial price, bidder1 makes bid1, a branch checks if it’s high enough - if so, end directly; otherwise, pass the initial price to bidder2 for bid2, and finally aggregate both bids for output.
After bidder1 bids, an announcement is made: “bidder completed bidding”. Note that bidder1→announcer is a bold solid line, “control only”, because the amount must be kept confidential when announcing!
The two bold lines from the branch are both “control only” because neither bidder2 nor END depends on data from the branch. In code, use AddDependency(fromNode) to specify pure control flow:
func main() {
bidder1 := func(ctx context.Context, in float64) (float64, error) {
return in + 1.0, nil
}
bidder2 := func(ctx context.Context, in float64) (float64, error) {
return in + 2.0, nil
}
announcer := func(ctx context.Context, in any) (any, error) {
logs.Infof("bidder1 has lodged his bid!")
return nil, nil
}
wf := compose.NewWorkflow[float64, map[string]float64]()
wf.AddLambdaNode("b1", compose.InvokableLambda(bidder1)).
AddInput(compose.START)
// add a node to announce that bidder1 has lodged his bid.
// It should execute strictly after bidder1, so we use `AddDependency("b1")`.
// Note that `AddDependency()` only forms a control relationship,
// not a data-passing relationship.
wf.AddLambdaNode("announcer", compose.InvokableLambda(announcer)).
AddDependency("b1")
// add a branch just like adding branch in Graph.
wf.AddBranch("b1", compose.NewGraphBranch(func(ctx context.Context, in float64) (string, error) {
if in > 5.0 {
return compose.END, nil
}
return "b2", nil
}, map[string]bool{compose.END: true, "b2": true}))
wf.AddLambdaNode("b2", compose.InvokableLambda(bidder2)).
// b2 executes strictly after b1 (through branch dependency),
// but does not rely on b1's output,
// which means b2 depends on b1 conditionally,
// but no data passing between them.
AddInputWithOptions(compose.START, nil, compose.WithNoDirectDependency())
wf.End().AddInput("b1", compose.ToField("bidder1")).
AddInput("b2", compose.ToField("bidder2"))
runner, err := wf.Compile(context.Background())
if err != nil {
logs.Errorf("workflow compile error: %v", err)
return
}
result, err := runner.Invoke(context.Background(), 3.0)
if err != nil {
logs.Errorf("workflow run err: %v", err)
return
}
logs.Infof("%v", result)
}
New API introduced in this example:
func (n *WorkflowNode) AddDependency(fromNodeKey string) *WorkflowNode {
return n.addDependencyRelation(fromNodeKey, nil, &workflowAddInputOpts{dependencyWithoutInput: true})
}
You can use AddDependency to specify pure “control dependency relationships” for nodes.
Branch
In the example above, we added a branch in almost the same way as with the Graph API:
// add a branch just like adding branch in Graph.
wf.AddBranch("b1", compose.NewGraphBranch(func(ctx context.Context, in float64) (string, error) {
if in > 5.0 {
return compose.END, nil
}
return "b2", nil
}, map[string]bool{compose.END: true, "b2": true}))
Branch semantics are the same as Graph’s AllPredecessor mode:
- There is exactly one ‘fromNode’, meaning a branch can only have one predecessor control node.
- Can be single-select (`NewGraphBranch`) or multi-select (`NewGraphMultiBranch`).
- Selected branches can execute; unselected branches are marked as skipped.
- A node can only execute when all incoming edges are complete (success or skip) and at least one edge succeeded (like END in the example above).
- If all incoming edges of a node are skipped, all outgoing edges of that node are automatically marked as skipped.
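The skip semantics above can be sketched as a small, framework-free decision function (plain Go; `status` and `resolve` are illustrative names, not Eino APIs):

```go
package main

import "fmt"

// status is a simplified completion state for a predecessor node
// under AllPredecessor semantics.
type status int

const (
	pending status = iota // not finished yet
	success               // executed successfully
	skipped               // marked as skip by an unselected branch
)

// resolve decides what happens to a node once its predecessors settle:
// it stays pending until every incoming edge is complete, runs if at
// least one predecessor succeeded, and is skipped if all predecessors
// were skipped (propagating the skip to its outgoing edges).
func resolve(preds []status) status {
	anySuccess := false
	for _, p := range preds {
		if p == pending {
			return pending
		}
		if p == success {
			anySuccess = true
		}
	}
	if anySuccess {
		return success
	}
	return skipped
}

func main() {
	fmt.Println(resolve([]status{success, skipped}) == success) // true: node may run
	fmt.Println(resolve([]status{skipped, skipped}) == skipped) // true: skip propagates
}
```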
Additionally, there is one key difference between workflow branch and graph branch:
- Graph branch is always “control and data combined”; the downstream node’s input of a branch is always the output of the branch’s fromNode.
- Workflow branch is always “control only”; the downstream node’s input is specified via `AddInputWithOptions`.
Related API:
func (wf *Workflow[I, O]) AddBranch(fromNodeKey string, branch *GraphBranch) *WorkflowBranch {
wb := &WorkflowBranch{
fromNodeKey: fromNodeKey,
GraphBranch: branch,
}
wf.workflowBranches = append(wf.workflowBranches, wb)
return wb
}
Signature is almost identical to Graph.AddBranch, allowing you to add a branch to the workflow.
Static Values
Let’s modify the “bidding” example above by giving bidder1 and bidder2 each a static “budget” configuration:
budget1 and budget2 will be injected into bidder1 and bidder2’s inputs as “static values” respectively. Use the SetStaticValue method to configure static values for workflow nodes:
func main() {
type bidInput struct {
Price float64
Budget float64
}
bidder := func(ctx context.Context, in bidInput) (float64, error) {
if in.Price >= in.Budget {
return in.Budget, nil
}
return in.Price + rand.Float64()*in.Budget, nil
}
wf := compose.NewWorkflow[float64, map[string]float64]()
wf.AddLambdaNode("b1", compose.InvokableLambda(bidder)).
AddInput(compose.START, compose.ToField("Price")).
// set 'Budget' field to 3.0 for b1
SetStaticValue([]string{"Budget"}, 3.0)
// add a branch just like adding branch in Graph.
wf.AddBranch("b1", compose.NewGraphBranch(func(ctx context.Context, in float64) (string, error) {
if in > 5.0 {
return compose.END, nil
}
return "b2", nil
}, map[string]bool{compose.END: true, "b2": true}))
wf.AddLambdaNode("b2", compose.InvokableLambda(bidder)).
// b2 executes strictly after b1, but does not rely on b1's output,
// which means b2 depends on b1, but no data passing between them.
AddDependency("b1").
AddInputWithOptions(compose.START, []*compose.FieldMapping{compose.ToField("Price")}, compose.WithNoDirectDependency()).
// set 'Budget' field to 4.0 for b2
SetStaticValue([]string{"Budget"}, 4.0)
wf.End().AddInput("b1", compose.ToField("bidder1")).
AddInput("b2", compose.ToField("bidder2"))
runner, err := wf.Compile(context.Background())
if err != nil {
logs.Errorf("workflow compile error: %v", err)
return
}
result, err := runner.Invoke(context.Background(), 3.0)
if err != nil {
logs.Errorf("workflow run err: %v", err)
return
}
logs.Infof("%v", result)
}
Related API:
func (n *WorkflowNode) SetStaticValue(path FieldPath, value any) *WorkflowNode {
n.staticValues[path.join()] = value
return n
}
Use this method to set static values on specified fields of Workflow nodes.
Streaming Effects
Returning to the previous “character counting” example, if our workflow’s input is no longer a single message but a message stream, and our counting function can count each message chunk in the stream separately and return a “count stream”:
We make some modifications to the previous example:
- Change `InvokableLambda` to `TransformableLambda`, so it can consume streams and produce streams.
- Change the `SubStr` in the input to a static value, injected into c1 and c2.
- Change the Workflow’s overall input to `*schema.Message`.
- Call the workflow using Transform, passing a stream containing 2 `*schema.Message` chunks.
The completed code:
// demonstrates the stream field mapping ability of eino workflow.
// It's modified from 2_field_mapping.
func main() {
type counter struct {
FullStr string // exported because we will do field mapping for this field
SubStr string // exported because we will do field mapping for this field
}
// wordCounter is a transformable lambda function that
// counts occurrences of SubStr within FullStr for each chunk.
wordCounter := func(ctx context.Context, c *schema.StreamReader[counter]) (
*schema.StreamReader[int], error) {
var subStr, cachedStr string
return schema.StreamReaderWithConvert(c, func(co counter) (int, error) {
if len(co.SubStr) > 0 {
// static values will not always come in the first chunk,
// so before the static value (SubStr) comes in,
// we need to cache the full string
subStr = co.SubStr
fullStr := cachedStr + co.FullStr
cachedStr = ""
return strings.Count(fullStr, subStr), nil
}
if len(subStr) > 0 {
return strings.Count(co.FullStr, subStr), nil
}
cachedStr += co.FullStr
return 0, schema.ErrNoValue
}), nil
}
// create a workflow just like a Graph
wf := compose.NewWorkflow[*schema.Message, map[string]int]()
// add lambda c1 just like in Graph
wf.AddLambdaNode("c1", compose.TransformableLambda(wordCounter)).
AddInput(compose.START, // add an input from START, specifying 1 field mapping
// map START's Content field to lambda c1's FullStr field
compose.MapFields("Content", "FullStr")).
// we can set static values even if the input will be stream
SetStaticValue([]string{"SubStr"}, "o")
// add lambda c2 just like in Graph
wf.AddLambdaNode("c2", compose.TransformableLambda(wordCounter)).
AddInput(compose.START, // add an input from START, specifying 1 field mapping
// map START's ReasoningContent field to lambda c2's FullStr field
compose.MapFields("ReasoningContent", "FullStr")).
SetStaticValue([]string{"SubStr"}, "o")
wf.End(). // Obtain the compose.END for method chaining
// add an input from c1,
// mapping full output of c1 to the map key 'content_count'
AddInput("c1", compose.ToField("content_count")).
// also add an input from c2,
// mapping full output of c2 to the map key 'reasoning_content_count'
AddInput("c2", compose.ToField("reasoning_content_count"))
// compile the workflow just like compiling a Graph
run, err := wf.Compile(context.Background())
if err != nil {
logs.Errorf("workflow compile error: %v", err)
return
}
// call the workflow using Transform just like calling a Graph with Transform
result, err := run.Transform(context.Background(),
schema.StreamReaderFromArray([]*schema.Message{
{
Role: schema.Assistant,
ReasoningContent: "I need to say something meaningful",
},
{
Role: schema.Assistant,
Content: "Hello world!",
},
}))
if err != nil {
logs.Errorf("workflow run err: %v", err)
return
}
var contentCount, reasoningCount int
for {
chunk, err := result.Recv()
if err != nil {
if err == io.EOF {
result.Close()
break
}
logs.Errorf("workflow receive err: %v", err)
return
}
logs.Infof("%v", chunk)
contentCount += chunk["content_count"]
reasoningCount += chunk["reasoning_content_count"]
}
logs.Infof("content count: %d", contentCount)
logs.Infof("reasoning count: %d", reasoningCount)
}
Based on the example above, we summarize some characteristics of workflow streaming:
- Still 100% Eino stream: four paradigms (invoke, stream, collect, transform), automatically converted, copied, concatenated, and merged by the Eino framework.
- Field mapping configuration doesn’t require special handling for streams: regardless of whether the actual input/output is a stream, the `AddInput` syntax is the same. The Eino framework handles stream-based mapping.
- Static values don’t require special handling for streams: even if the actual input is a stream, you can use `SetStaticValue` the same way. The Eino framework will place static values in the input stream, but not necessarily in the first chunk read.
Field Mapping Scenarios
Type Alignment
Workflow follows the same type alignment rules as Graph, except the alignment granularity changes from complete input/output alignment to alignment between mapped field pairs. Specifically:
- Identical types: passes compile-time validation and will definitely align.
- Different types, but upstream is assignable to downstream (e.g., upstream is a concrete type, downstream is `any`): passes compile-time validation and will definitely align.
- Upstream cannot be assigned to downstream (e.g., upstream is `int`, downstream is `string`): compile error.
- Upstream may be assignable to downstream (e.g., upstream is `any`, downstream is `int`): cannot be determined at compile time, so the check is deferred to runtime. At runtime, the actual upstream type is extracted and checked; if it cannot be assigned to downstream, an error is thrown.
Merge Scenarios
Merge refers to situations where a node’s input is mapped from multiple FieldMappings:
- Mapping to multiple different fields: supported
- Mapping to the same single field: not supported
- Mapping to the whole input while also having mappings to specific fields: conflict, not supported
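A minimal sketch of this conflict rule (plain Go; `checkMerge` and the empty-string convention for “whole input” are illustrative assumptions, not Eino's implementation):

```go
package main

import "fmt"

// checkMerge validates a set of mapping targets for one node's input:
// "" stands for "the whole input". Distinct fields may coexist, but a
// duplicate target, or a whole-input mapping combined with anything
// else, is a conflict.
func checkMerge(targets []string) error {
	seen := make(map[string]bool)
	whole := false
	for _, t := range targets {
		if seen[t] {
			return fmt.Errorf("duplicate mapping to field %q", t)
		}
		seen[t] = true
		if t == "" {
			whole = true
		}
	}
	if whole && len(targets) > 1 {
		return fmt.Errorf("whole-input mapping conflicts with field mappings")
	}
	return nil
}

func main() {
	fmt.Println(checkMerge([]string{"A", "B"})) // <nil>: different fields merge fine
	fmt.Println(checkMerge([]string{"A", "A"})) // error: same field mapped twice
	fmt.Println(checkMerge([]string{"", "A"}))  // error: whole input plus a field
}
```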
Nested map[string]any
For example, this mapping: ToFieldPath([]string{"a","b"}), where the target node’s input type is map[string]any, the mapping order is:
- First level “a”: result is `map[string]any{"a": nil}`
- Second level “b”: result is `map[string]any{"a": map[string]any{"b": x}}`
As you can see, at the second level, the Eino framework automatically replaces any with the actual map[string]any.
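The two-step construction above can be sketched as a generic path setter (plain Go; `setPath` is an illustrative helper, not Eino's code):

```go
package main

import "fmt"

// setPath materializes a nested FieldPath inside a map[string]any
// target: each missing intermediate level is created as another
// map[string]any before the final value is written.
func setPath(m map[string]any, path []string, v any) {
	cur := m
	for _, key := range path[:len(path)-1] {
		next, ok := cur[key].(map[string]any)
		if !ok {
			next = map[string]any{}
			cur[key] = next
		}
		cur = next
	}
	cur[path[len(path)-1]] = v
}

func main() {
	m := map[string]any{}
	setPath(m, []string{"a", "b"}, 42)
	fmt.Println(m) // map[a:map[b:42]]
}
```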
CustomExtractor
In some scenarios, standard field mapping semantics cannot support the requirement. For example, if upstream is []int and you want to extract the first element to map to downstream, use WithCustomExtractor:
t.Run("custom extract from array element", func(t *testing.T) {
wf := NewWorkflow[[]int, map[string]int]()
wf.End().AddInput(START, ToField("a", WithCustomExtractor(func(input any) (any, error) {
return input.([]int)[0], nil
})))
r, err := wf.Compile(context.Background())
assert.NoError(t, err)
result, err := r.Invoke(context.Background(), []int{1, 2})
assert.NoError(t, err)
assert.Equal(t, map[string]int{"a": 1}, result)
})
When using WithCustomExtractor, all compile-time type alignment checks cannot be performed and can only be deferred to runtime validation.
Constraints
- Map key restrictions: only `string`, or string aliases (types that can be converted to `string`).
- Unsupported CompileOptions: `WithNodeTriggerMode`, because it’s fixed to `AllPredecessor`; `WithMaxRunSteps`, because there are no cycles.
- If the mapping source is a Map Key, the Map must contain this key. However, if the mapping source is a Stream, Eino cannot determine whether this key appears at least once across all frames in the stream, so validation cannot be performed for Streams.
- If the mapping source field or target field belongs to a struct, these fields must be exported, because reflection is used internally.
- Nil mapping source: generally supported; it only errors when the mapping target cannot hold nil, such as basic types (`int`, etc.).
Real-world Usage
Coze-Studio Workflow
Coze-Studio open source version’s workflow engine is built on the Eino Workflow orchestration framework. See: 11. Adding New Workflow Node Types (Backend)