Eino Streaming Essentials
💡 Recommended reading first: Eino: Overview and Eino: Orchestration Design Principles
Streaming in Orchestration: Overview
Key factors when orchestrating streaming graphs:
- Which of the four Lambda operators (Invoke, Stream, Collect, Transform) does each component or Lambda implement? A component may implement any subset of them.
- In the topology, do upstream outputs and downstream inputs match (both streaming or both non-streaming)?
- If they don’t match, use the streaming helpers:
  - Streaming: wrap T into a single-chunk Stream[T].
  - Concat: merge Stream[T] into a complete T. Each frame in Stream[T] is a piece of the final T.
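The two helpers can be sketched in plain Go. This is a minimal, self-contained illustration under stated assumptions: StreamReader is a hypothetical slice-backed stand-in for Eino's schema.StreamReader[T], and Box and Concat are illustrative names, not Eino APIs.

```go
package main

import (
	"fmt"
	"io"
)

// StreamReader is a hypothetical, slice-backed stand-in for Eino's
// schema.StreamReader[T]. Recv returns io.EOF once every frame is read.
type StreamReader[T any] struct {
	frames []T
	pos    int
}

func (s *StreamReader[T]) Recv() (T, error) {
	var zero T
	if s.pos >= len(s.frames) {
		return zero, io.EOF
	}
	f := s.frames[s.pos]
	s.pos++
	return f, nil
}

// Box wraps a complete value into a single-frame (pseudo) stream.
func Box[T any](v T) *StreamReader[T] {
	return &StreamReader[T]{frames: []T{v}}
}

// Concat drains the stream and merges its frames with a caller-supplied
// function, because only the caller knows how pieces of T combine.
func Concat[T any](s *StreamReader[T], merge func([]T) (T, error)) (T, error) {
	var frames []T
	for {
		f, err := s.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			var zero T
			return zero, err
		}
		frames = append(frames, f)
	}
	if len(frames) == 1 {
		return frames[0], nil // a single frame is already the complete value
	}
	return merge(frames)
}

// joinStrings is an example merge function for string frames.
func joinStrings(frames []string) (string, error) {
	out := ""
	for _, f := range frames {
		out += f
	}
	return out, nil
}

func main() {
	s := &StreamReader[string]{frames: []string{"Hel", "lo", "!"}}
	full, _ := Concat(s, joinStrings)
	fmt.Println(full) // Hello!

	same, _ := Concat(Box("one piece"), joinStrings)
	fmt.Println(same) // one piece
}
```

The single-frame shortcut in Concat reflects why boxing is always safe, while concatenating a real multi-frame stream may need a merge function.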
Semantics of Streaming in Eino
- Some components naturally emit frames — partial outputs of the final result — i.e., streaming output. Downstream must concat frames into a complete output. A typical example is an LLM.
- Some components naturally accept frames and can begin meaningful processing before receiving the full input. For example, in a ReAct agent, a branch may decide to call a tool or end execution by inspecting the first frame of the ChatModel’s output.
- Thus, each component may accept non-streaming or streaming input, and produce non-streaming or streaming output.
- Combined, there are four streaming paradigms:
| Function | Pattern | Interaction | Lambda Constructor |
| --- | --- | --- | --- |
| Invoke | Non-streaming in, non-streaming out | Ping-Pong | compose.InvokableLambda() |
| Stream | Non-streaming in, streaming out | Server-Streaming | compose.StreamableLambda() |
| Collect | Streaming in, non-streaming out | Client-Streaming | compose.CollectableLambda() |
| Transform | Streaming in, streaming out | Bidirectional-Streaming | compose.TransformableLambda() |
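To make the four shapes concrete, here is a self-contained Go sketch. The function types and the toy Lambdas (upper, words, countFrames, shout) are hypothetical illustrations; the functions passed to Eino's real Lambda constructors also take a context.Context (and, in some variants, options), which are omitted here, and StreamReader is a slice-backed stand-in for schema.StreamReader[T].

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// StreamReader is a hypothetical, slice-backed stand-in for schema.StreamReader[T].
type StreamReader[T any] struct {
	frames []T
	pos    int
}

func (s *StreamReader[T]) Recv() (T, error) {
	var zero T
	if s.pos >= len(s.frames) {
		return zero, io.EOF
	}
	f := s.frames[s.pos]
	s.pos++
	return f, nil
}

// Hypothetical function shapes for the four paradigms.
type InvokeFn[I, O any] func(in I) (O, error)
type StreamFn[I, O any] func(in I) (*StreamReader[O], error)
type CollectFn[I, O any] func(in *StreamReader[I]) (O, error)
type TransformFn[I, O any] func(in *StreamReader[I]) (*StreamReader[O], error)

// Invoke (ping-pong): one value in, one value out.
var upper InvokeFn[string, string] = func(in string) (string, error) {
	return strings.ToUpper(in), nil
}

// Stream (server-streaming): one value in, one frame per word out.
var words StreamFn[string, string] = func(in string) (*StreamReader[string], error) {
	return &StreamReader[string]{frames: strings.Fields(in)}, nil
}

// Collect (client-streaming): frames in, a single count out.
var countFrames CollectFn[string, int] = func(in *StreamReader[string]) (int, error) {
	n := 0
	for {
		_, err := in.Recv()
		if err == io.EOF {
			return n, nil
		}
		if err != nil {
			return 0, err
		}
		n++
	}
}

// Transform (bidirectional): frames in, frames out. For brevity this reads
// the whole input first; a real Transform would emit each output frame as
// soon as the matching input frame arrives.
var shout TransformFn[string, string] = func(in *StreamReader[string]) (*StreamReader[string], error) {
	out := &StreamReader[string]{}
	for {
		f, err := in.Recv()
		if err == io.EOF {
			return out, nil
		}
		if err != nil {
			return nil, err
		}
		out.frames = append(out.frames, strings.ToUpper(f))
	}
}

func main() {
	u, _ := upper("hi")
	fmt.Println(u) // HI
	ws, _ := words("eino streams frames")
	n, _ := countFrames(ws)
	fmt.Println(n) // 3
}
```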
Streaming at the Single-Component Level
Eino is a “component-first” framework; components can be used independently. When defining component interfaces, streaming is guided by real business semantics.
Business Semantics of Components
- ChatModel: besides Invoke (non-streaming), it naturally supports Stream (streaming). It therefore implements Generate and Stream, but not Collect or Transform:
```go
type ChatModel interface {
	Generate(ctx context.Context, input []*schema.Message, opts ...Option) (*schema.Message, error)
	Stream(ctx context.Context, input []*schema.Message, opts ...Option) (
		*schema.StreamReader[*schema.Message], error)
	// other methods omitted...
}
```
- Retriever: only Invoke has a real use case; the other paradigms don’t fit typical scenarios, so it exposes just Retrieve:
```go
type Retriever interface {
	Retrieve(ctx context.Context, query string, opts ...Option) ([]*schema.Document, error)
}
```
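A toy component shows what "implement only the paradigms that fit" means in practice. The sketch is hypothetical: Message and StreamReader are trimmed-down stand-ins for schema.Message and schema.StreamReader[T], the interface drops ctx and options, and echoModel is an invented fake model that streams the last input message back one word per frame. Generate is derived from Stream by concatenating the frames, one simple way to keep both paths consistent.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"strings"
)

// Message is a hypothetical, trimmed-down stand-in for schema.Message.
type Message struct {
	Role    string
	Content string
}

// StreamReader is a hypothetical, slice-backed stand-in for schema.StreamReader[T].
type StreamReader[T any] struct {
	frames []T
	pos    int
}

func (s *StreamReader[T]) Recv() (T, error) {
	var zero T
	if s.pos >= len(s.frames) {
		return zero, io.EOF
	}
	f := s.frames[s.pos]
	s.pos++
	return f, nil
}

// ChatModel mirrors the interface above without ctx and options.
type ChatModel interface {
	Generate(input []*Message) (*Message, error)
	Stream(input []*Message) (*StreamReader[*Message], error)
}

// echoModel is a fake model: it "answers" with the last input message,
// streamed one word per frame. It has no Collect or Transform methods.
type echoModel struct{}

func (echoModel) Stream(input []*Message) (*StreamReader[*Message], error) {
	if len(input) == 0 {
		return nil, errors.New("no input messages")
	}
	out := &StreamReader[*Message]{}
	for i, w := range strings.Fields(input[len(input)-1].Content) {
		if i > 0 {
			w = " " + w // keep the separator so frames concatenate cleanly
		}
		out.frames = append(out.frames, &Message{Role: "assistant", Content: w})
	}
	return out, nil
}

// Generate reuses Stream and concatenates the frames into one message.
func (m echoModel) Generate(input []*Message) (*Message, error) {
	sr, err := m.Stream(input)
	if err != nil {
		return nil, err
	}
	var b strings.Builder
	for {
		f, err := sr.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			return nil, err
		}
		b.WriteString(f.Content)
	}
	return &Message{Role: "assistant", Content: b.String()}, nil
}

// Compile-time check that echoModel satisfies ChatModel.
var _ ChatModel = echoModel{}

func main() {
	msg, _ := echoModel{}.Generate([]*Message{{Role: "user", Content: "hello streaming world"}})
	fmt.Println(msg.Content) // hello streaming world
}
```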
Which Paradigms Components Implement
| Component | Invoke | Stream | Collect | Transform |
| --- | --- | --- | --- | --- |
| ChatModel | yes | yes | no | no |
| ChatTemplate | yes | no | no | no |
| Retriever | yes | no | no | no |
| Indexer | yes | no | no | no |
| Embedder | yes | no | no | no |
| Document Loader | yes | no | no | no |
| Document Transformer | yes | no | no | no |
| Tool | yes | yes | no | no |
Among the official Eino components, only ChatModel and Tool also support Stream; all others support only Invoke. See: Eino: Components
Collect and Transform are generally useful only within orchestration.
Streaming Across Multiple Components (Orchestration)
Component Paradigms in Orchestration
When used standalone, a component’s inputs and outputs are fixed by its interface. For example:
- ChatModel takes a non-streaming []Message as input and outputs either a non-streaming Message or a streaming StreamReader[Message].
In orchestration, inputs/outputs depend on upstream/downstream. Consider a typical ReAct agent:
If the Tool is a StreamableTool (its output is StreamReader[Message]), the Tool → ChatModel edge may carry a stream. However, ChatModel does not accept streaming input, so Eino automatically bridges the gap by concatenating the stream into a non-streaming input.
Eino’s automatic StreamReader[T] → T conversion applies whenever a component expects T but upstream produces StreamReader[T]. You may need to provide a custom concat function for T.
💡 The StreamReader[T] → T conversion may require a user-provided concat function. See Orchestration Design Principles under “merge frames”.
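For a custom output type, merging frames means deciding field by field how partial values combine. The sketch below assumes a hypothetical Report type and a concat function of the shape func([]T) (T, error); how such a function is registered with Eino is covered in Orchestration Design Principles and is not shown here.

```go
package main

import (
	"errors"
	"fmt"
)

// Report is a hypothetical custom output type. Each streamed frame carries
// a piece of the final value.
type Report struct {
	Text    string   // streamed incrementally: frames are appended
	Sources []string // each frame may add more sources
	Done    bool     // set on the final frame
}

// concatReports merges frames into one complete Report, field by field.
// A generic framework cannot guess these rules for an arbitrary struct,
// which is why a user-provided concat function may be needed.
func concatReports(frames []Report) (Report, error) {
	if len(frames) == 0 {
		return Report{}, errors.New("no frames to concat")
	}
	var out Report
	for _, f := range frames {
		out.Text += f.Text
		out.Sources = append(out.Sources, f.Sources...)
		out.Done = f.Done // the last frame decides
	}
	return out, nil
}

func main() {
	r, _ := concatReports([]Report{
		{Text: "Eino ", Sources: []string{"a"}},
		{Text: "streams", Sources: []string{"b"}, Done: true},
	})
	fmt.Printf("%q %v %v\n", r.Text, r.Sources, r.Done) // "Eino streams" [a b] true
}
```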
Conversely, consider another ReAct diagram:
Here, the branch reads the ChatModel’s output and decides whether to end or call a tool. Since the branch can decide from the first frame, define it with Collect (streaming in, non-streaming out):
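```go
// nodeKeyTools is assumed to be the key of the tools node, declared elsewhere.
compose.NewStreamGraphBranch(func(ctx context.Context, sr *schema.StreamReader[*schema.Message]) (endNode string, err error) {
	defer sr.Close() // release the stream on every return path
	// The first frame is enough to tell whether the model requested tool calls.
	msg, err := sr.Recv()
	if err != nil {
		return "", err
	}
	if len(msg.ToolCalls) == 0 {
		return compose.END, nil // no tool calls: end the graph
	}
	return nodeKeyTools, nil // route to the tools node
}, map[string]bool{compose.END: true, nodeKeyTools: true}) // every node this branch may route to
```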
If the agent is invoked via Stream, ChatModel outputs StreamReader[Message], matching the branch’s input.
If the agent is invoked via Generate, ChatModel outputs Message. Eino automatically wraps Message into a single-frame StreamReader[Message] (pseudo-stream) to match the branch.
Summary: When upstream outputs T but downstream expects StreamReader[T], Eino wraps T into a single-frame StreamReader[T].
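Because Generate output is boxed, the same first-frame branch works in both modes. The self-contained sketch below models this under stated assumptions: Message, StreamReader, box, route, nodeEnd, and nodeTools are hypothetical stand-ins (not Eino APIs) for schema.Message, schema.StreamReader, the automatic boxing, the branch condition, compose.END, and the tools node key.

```go
package main

import (
	"fmt"
	"io"
)

// Message is a hypothetical stand-in for schema.Message; only the fields
// the branch needs are kept.
type Message struct {
	Content   string
	ToolCalls []string
}

// StreamReader is a hypothetical, slice-backed stand-in for schema.StreamReader[T].
type StreamReader[T any] struct {
	frames []T
	pos    int
	closed bool
}

func (s *StreamReader[T]) Recv() (T, error) {
	var zero T
	if s.pos >= len(s.frames) {
		return zero, io.EOF
	}
	f := s.frames[s.pos]
	s.pos++
	return f, nil
}

// Close marks the stream as released.
func (s *StreamReader[T]) Close() { s.closed = true }

// box wraps a complete value as a single-frame pseudo-stream.
func box[T any](v T) *StreamReader[T] {
	return &StreamReader[T]{frames: []T{v}}
}

// Hypothetical node keys standing in for compose.END and the tools node.
const (
	nodeEnd   = "END"
	nodeTools = "tools"
)

// route decides from the first frame only, like the branch above.
func route(sr *StreamReader[*Message]) (string, error) {
	defer sr.Close()
	msg, err := sr.Recv()
	if err != nil {
		return "", err
	}
	if len(msg.ToolCalls) == 0 {
		return nodeEnd, nil
	}
	return nodeTools, nil
}

func main() {
	// Stream mode: a real multi-frame stream; only the first frame is read.
	live := &StreamReader[*Message]{frames: []*Message{
		{ToolCalls: []string{"search"}},
		{Content: "frames after the first are never read"},
	}}
	next, _ := route(live)
	fmt.Println(next) // tools

	// Generate mode: the complete message is boxed into one frame.
	next, _ = route(box(&Message{Content: "final answer"}))
	fmt.Println(next) // END
}
```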
Streaming Paradigms of Orchestration Aids
Branch is an orchestration-only aid, not a standalone component. Others include:
| Element | Use Case | Invoke | Stream | Collect | Transform |
| --- | --- | --- | --- | --- | --- |
| Branch | Select a downstream node dynamically based on upstream output | yes | no | yes | no |
| StatePreHandler | Modify State/Input before entering a node. Streaming-friendly. | yes | no | no | yes |
| StatePostHandler | Modify State/Output after a node. Streaming-friendly. | yes | no | no | yes |
| Passthrough | Balance node counts across parallel branches by inserting passthroughs. Input equals output. | yes | no | no | yes |
| Lambda | Encapsulate custom business logic; implement the paradigm that matches your logic. | yes | yes | yes | yes |
Orchestration artifacts (compiled Chain/Graph) can be treated as components themselves — used standalone or as nodes within higher-level orchestration.
Streaming at Orchestration Level (Whole Graph)
“Business” Paradigms of Orchestration Artifacts
As a component, a compiled artifact has no business semantics — it serves orchestration. It must support all paradigms:
```go
type Runnable[I, O any] interface {
	Invoke(ctx context.Context, input I, opts ...Option) (output O, err error)
	Stream(ctx context.Context, input I, opts ...Option) (output *schema.StreamReader[O], err error)
	Collect(ctx context.Context, input *schema.StreamReader[I], opts ...Option) (output O, err error)
	Transform(ctx context.Context, input *schema.StreamReader[I], opts ...Option) (output *schema.StreamReader[O], err error)
}
```
For a specific compiled graph, the correct paradigm depends on its business scenario. A ReAct-like graph typically fits Invoke and Stream.
Runtime Paradigms of Components Inside a Compiled Graph
Viewed as a “component”, a compiled Graph’s internal implementation is data flowing among its nodes according to your specified flow direction and streaming paradigms. Flow direction is out of scope here; runtime paradigms are determined by how the Graph is triggered.
When you call a compiled graph via Invoke, all internal components run in the Invoke paradigm. If a component does not implement Invoke, Eino wraps it using the first available option:
- If Stream exists, wrap Stream as Invoke by concatenating the output stream.
- Else if Collect exists, wrap Collect as Invoke by boxing the non-streaming input into a single-frame stream.
- Else use Transform, wrapping it as Invoke by boxing the input into a single-frame stream and concatenating the output stream.
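The fallback order above can be sketched as a single selector. This is a hypothetical model, not Eino's implementation: Component, asInvoke, concatStream, box, and joinAll are illustrative names, the function types omit ctx and options, and StreamReader stands in for schema.StreamReader[T].

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// StreamReader is a hypothetical, slice-backed stand-in for schema.StreamReader[T].
type StreamReader[T any] struct {
	frames []T
	pos    int
}

func (s *StreamReader[T]) Recv() (T, error) {
	var zero T
	if s.pos >= len(s.frames) {
		return zero, io.EOF
	}
	f := s.frames[s.pos]
	s.pos++
	return f, nil
}

// box wraps a value into a single-frame pseudo-stream.
func box[T any](v T) *StreamReader[T] { return &StreamReader[T]{frames: []T{v}} }

// concatStream drains sr and merges all frames with concat.
func concatStream[T any](sr *StreamReader[T], concat func([]T) (T, error)) (T, error) {
	var frames []T
	for {
		f, err := sr.Recv()
		if err == io.EOF {
			return concat(frames)
		}
		if err != nil {
			var zero T
			return zero, err
		}
		frames = append(frames, f)
	}
}

// Component records which paradigms a node implements; nil means "not
// implemented". The field types are simplified (no ctx, no options).
type Component[I, O any] struct {
	Invoke    func(I) (O, error)
	Stream    func(I) (*StreamReader[O], error)
	Collect   func(*StreamReader[I]) (O, error)
	Transform func(*StreamReader[I]) (*StreamReader[O], error)
}

// asInvoke returns an Invoke-style function, wrapping the first available
// paradigm in the order listed above. It assumes at least one is set.
func asInvoke[I, O any](c Component[I, O], concat func([]O) (O, error)) func(I) (O, error) {
	switch {
	case c.Invoke != nil:
		return c.Invoke
	case c.Stream != nil: // concat the output stream
		return func(in I) (O, error) {
			sr, err := c.Stream(in)
			if err != nil {
				var zero O
				return zero, err
			}
			return concatStream(sr, concat)
		}
	case c.Collect != nil: // box the input into a single-frame stream
		return func(in I) (O, error) { return c.Collect(box(in)) }
	default: // Transform: box the input, concat the output stream
		return func(in I) (O, error) {
			sr, err := c.Transform(box(in))
			if err != nil {
				var zero O
				return zero, err
			}
			return concatStream(sr, concat)
		}
	}
}

// joinAll is an example concat function for string frames.
func joinAll(frames []string) (string, error) { return strings.Join(frames, ""), nil }

func main() {
	// A node that implements only Stream: one frame per character.
	spell := Component[string, string]{Stream: func(s string) (*StreamReader[string], error) {
		return &StreamReader[string]{frames: strings.Split(s, "")}, nil
	}}
	out, _ := asInvoke(spell, joinAll)("eino")
	fmt.Println(out) // eino
}
```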
When you call via Stream / Collect / Transform, all internal components run in the Transform paradigm. If a component does not implement Transform, Eino wraps using the first available option:
- If Stream exists, wrap Stream as Transform by concatenating the input stream.
- Else if Collect exists, wrap Collect as Transform by boxing the non-streaming output into a single-frame stream.
- Else wrap Invoke as Transform by concatenating the input stream and boxing the output into a single-frame stream.
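The Transform-side fallbacks mirror the Invoke side, with concatenation moved to the input and boxing moved to the output. As before, this is a hypothetical model rather than Eino's code: Component, asTransform, concatStream, box, and joinAll are illustrative names, and StreamReader stands in for schema.StreamReader[T].

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// StreamReader is a hypothetical, slice-backed stand-in for schema.StreamReader[T].
type StreamReader[T any] struct {
	frames []T
	pos    int
}

func (s *StreamReader[T]) Recv() (T, error) {
	var zero T
	if s.pos >= len(s.frames) {
		return zero, io.EOF
	}
	f := s.frames[s.pos]
	s.pos++
	return f, nil
}

// box wraps a value into a single-frame pseudo-stream.
func box[T any](v T) *StreamReader[T] { return &StreamReader[T]{frames: []T{v}} }

// concatStream drains sr and merges all frames with concat.
func concatStream[T any](sr *StreamReader[T], concat func([]T) (T, error)) (T, error) {
	var frames []T
	for {
		f, err := sr.Recv()
		if err == io.EOF {
			return concat(frames)
		}
		if err != nil {
			var zero T
			return zero, err
		}
		frames = append(frames, f)
	}
}

// Component records which paradigms a node implements; nil means missing.
type Component[I, O any] struct {
	Invoke    func(I) (O, error)
	Stream    func(I) (*StreamReader[O], error)
	Collect   func(*StreamReader[I]) (O, error)
	Transform func(*StreamReader[I]) (*StreamReader[O], error)
}

// asTransform wraps the first available paradigm in the order listed above.
// concatIn merges input frames; only the Stream and Invoke fallbacks use it.
func asTransform[I, O any](c Component[I, O], concatIn func([]I) (I, error)) func(*StreamReader[I]) (*StreamReader[O], error) {
	switch {
	case c.Transform != nil:
		return c.Transform
	case c.Stream != nil: // concat the input stream, then stream the output
		return func(in *StreamReader[I]) (*StreamReader[O], error) {
			whole, err := concatStream(in, concatIn)
			if err != nil {
				return nil, err
			}
			return c.Stream(whole)
		}
	case c.Collect != nil: // box the non-streaming output
		return func(in *StreamReader[I]) (*StreamReader[O], error) {
			out, err := c.Collect(in)
			if err != nil {
				return nil, err
			}
			return box(out), nil
		}
	default: // Invoke: concat the input, box the output
		return func(in *StreamReader[I]) (*StreamReader[O], error) {
			whole, err := concatStream(in, concatIn)
			if err != nil {
				return nil, err
			}
			out, err := c.Invoke(whole)
			if err != nil {
				return nil, err
			}
			return box(out), nil
		}
	}
}

// joinAll is an example concat function for string frames.
func joinAll(frames []string) (string, error) { return strings.Join(frames, ""), nil }

func main() {
	// An Invoke-only node (string length) run in Transform mode.
	lengthOf := Component[string, int]{Invoke: func(s string) (int, error) { return len(s), nil }}
	out, _ := asTransform(lengthOf, joinAll)(&StreamReader[string]{frames: []string{"Ei", "no"}})
	n, _ := out.Recv()
	fmt.Println(n) // 4
}
```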
In summary, Eino’s automatic conversions between T and Stream[T] are:
- T → Stream[T]: box T into a single-frame stream (pseudo-stream).
- Stream[T] → T: concat the stream into a complete T. For streams with more than one frame, you may need to provide a concat function.
You might wonder why a graph-level Invoke enforces Invoke internally, and Stream/Collect/Transform enforces Transform internally. Consider these counterexamples:
- Two components A and B composed into a Chain, called via Invoke. Suppose A implements Stream and B implements Collect.
  - Choice 1: A runs as Stream, B runs as Collect. The overall Chain remains Invoke to the caller while preserving true streaming semantics inside (no concat; A’s output stream feeds B in real time).
  - Current Eino behavior: both A and B run as Invoke. A’s output stream is concatenated into a full value; B’s input is boxed into a pseudo-stream. True streaming semantics inside are lost.
- Two components A and B composed into a Chain, called via Collect. Suppose A implements Transform and Collect, and B implements Invoke.
  - Choice 1: A runs as Collect, B runs as Invoke. The Chain as a whole remains Collect, with no automatic conversions or boxing.
  - Current Eino behavior: both A and B run as Transform. Since B only implements Invoke, its input may require concat from a real stream, potentially needing a user-provided concat function that could have been avoided.
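The first counterexample can be made concrete by counting how many frames B actually receives. The sketch models the two strategies with hypothetical components (streamA, collectB) and helpers (concatAll, box); it illustrates the rule described above and is not Eino's implementation.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// StreamReader is a hypothetical, slice-backed stand-in for schema.StreamReader[T].
type StreamReader[T any] struct {
	frames []T
	pos    int
}

func (s *StreamReader[T]) Recv() (T, error) {
	var zero T
	if s.pos >= len(s.frames) {
		return zero, io.EOF
	}
	f := s.frames[s.pos]
	s.pos++
	return f, nil
}

// box wraps a value into a single-frame pseudo-stream.
func box[T any](v T) *StreamReader[T] { return &StreamReader[T]{frames: []T{v}} }

// streamA is a hypothetical component A that implements only Stream:
// it emits its input three times, one frame each.
func streamA(in string) *StreamReader[string] {
	return &StreamReader[string]{frames: []string{in, in, in}}
}

// collectB is a hypothetical component B that implements only Collect:
// it joins the frames and also reports how many frames it received.
func collectB(sr *StreamReader[string]) (frames int, text string) {
	var parts []string
	for {
		f, err := sr.Recv()
		if err != nil { // io.EOF (or any error) ends the stream here
			return len(parts), strings.Join(parts, "")
		}
		parts = append(parts, f)
	}
}

// concatAll drains a string stream into one value (a Stream[T] -> T concat).
func concatAll(sr *StreamReader[string]) string {
	text := ""
	for {
		f, err := sr.Recv()
		if err != nil {
			return text
		}
		text += f
	}
}

// choice1: A runs as Stream and B as Collect, so B consumes A's real frames.
func choice1(in string) (int, string) { return collectB(streamA(in)) }

// invokeInside: both run as Invoke, as described for an outer Invoke call.
// A's stream is concatenated, then B receives a boxed single frame.
func invokeInside(in string) (int, string) {
	whole := concatAll(streamA(in)) // A: Stream wrapped as Invoke
	return collectB(box(whole))     // B: Collect wrapped as Invoke
}

func main() {
	n1, t1 := choice1("ab")
	n2, t2 := invokeInside("ab")
	fmt.Println(n1, t1) // 3 ababab
	fmt.Println(n2, t2) // 1 ababab
}
```

Both strategies produce the same final text; the difference is that under the current rule B never sees A's frames as they arrive.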
Generalizing to arbitrary graphs, it is hard to define a universal rule that is always better and still easy to understand. The relevant factors include which paradigms each component implements, the goal of maximizing true streaming, and whether concat functions are available; in complex graphs, their combinations multiply quickly. Even if a better universal rule exists, the cost of explaining and using it would likely outweigh its benefit. Eino’s design therefore favors clarity and predictability.
By design:
- Invoke outside → Invoke inside: no streaming inside.
- Stream/Collect/Transform outside → Transform inside: streaming inside; Stream[T] → T may require a concat function.
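The two rules, combined with the wrapping order listed earlier, determine whether a given node ends up concatenating a stream. The sketch encodes that reasoning with hypothetical names (Paradigm, Impl, innerParadigm, mayNeedConcat); it restates the documented rules and does not call any Eino API.

```go
package main

import "fmt"

// Paradigm is a hypothetical enum for the four ways to run a node or graph.
type Paradigm int

const (
	Invoke Paradigm = iota
	Stream
	Collect
	Transform
)

func (p Paradigm) String() string {
	return [...]string{"Invoke", "Stream", "Collect", "Transform"}[p]
}

// innerParadigm encodes the rule above: an outer Invoke keeps every node in
// Invoke; any other outer call runs every node in Transform.
func innerParadigm(outer Paradigm) Paradigm {
	if outer == Invoke {
		return Invoke
	}
	return Transform
}

// Impl records which paradigms a node implements natively (hypothetical).
type Impl struct{ Invoke, Stream, Collect, Transform bool }

// mayNeedConcat reports whether running a node in the given inner paradigm,
// using the wrapping order described earlier, involves a Stream[T] -> T
// concat step (which may require a user-provided concat function).
func mayNeedConcat(inner Paradigm, n Impl) bool {
	if inner == Invoke {
		switch {
		case n.Invoke:
			return false
		case n.Stream:
			return true // the output stream is concatenated
		case n.Collect:
			return false // the input is boxed instead
		default:
			return true // Transform: the output stream is concatenated
		}
	}
	switch {
	case n.Transform:
		return false
	case n.Stream:
		return true // the input stream is concatenated
	case n.Collect:
		return false // the output is boxed instead
	default:
		return true // Invoke: the input stream is concatenated
	}
}

func main() {
	for _, outer := range []Paradigm{Invoke, Stream, Collect, Transform} {
		fmt.Printf("outer %s -> inner %s\n", outer, innerParadigm(outer))
	}
	// Counterexample 2: B implements only Invoke, and the chain is called via Collect.
	fmt.Println(mayNeedConcat(innerParadigm(Collect), Impl{Invoke: true})) // true
}
```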