Eino 流式编程要点

💡 建议先看:Eino: 基础概念介绍 Eino: 编排的设计理念

编排流式概述

编排流式的 Graph 时,需要考虑的几个关键要素:

  • 组件/Lambda 中包含哪几种 Lambda 算子: 从 Invoke、Stream、Collect、Transform 中任选
  • 编排拓扑图中,上下游节点的输入、输出是否同为流或同为非流。
  • 如果上下游节点的流类型无法匹配。 需要借助 流化、合包 两个操作
    • 流化(Streaming):将 T 流化成单 Chunk 的 Stream[T]
    • 合包(Concat):将 Stream[T] 合并成一个完整的 T。Stream[T] 中的每一“帧”是这个完整 T 的一部分。

Eino 流式编程的内涵

  • 有的组件,天然支持分“帧”来输出,每次输出一个完整出参的一部分,即“流式”输出。流式输出完成后,需要下游把这些“帧”拼接(concat)成完整的出参。典型的例子,是 LLM。
  • 有的组件,天然支持分“帧”来输入,接收到不完整的入参时,就能开始有意义的业务处理,甚至完成业务处理的过程。比如 react agent 中用来判断是调 tool 还是结束运行的 branch 里面,拿到 LLM 的流式输出,从第一个帧里面就可以通过判断 message 是否包含 tool call 来做出决策。
  • 因此,一个组件,从入参角度看,有“非流式”入参和“流式”入参两种,从出参角度看,有“非流式”出参和“流式”出参两种。
  • 组合起来,有四种可能的流式编程范式

单个组件角度的流式

Eino 是个 “component first” 的框架,组件可以独立使用。定组件接口的时候,需要考虑流式编程的问题吗?简单的答案是不需要。复杂的答案是“以业务真实场景为准”。

组件自身的业务范式

一个典型的组件,比如 Chat Model,Retriever 等,根据实际的业务语义定接口就行,如果实际上支持某种流式的范式,就实现那一种流式范式,如果实际上某种流式范式没有真正的业务场景,那就不需要实现。比如

  • Chat Model,除了 Invoke 这种非流式的范式外,还天然支持 Stream 这种流式范式,因此 Chat Model 的接口中,实现了 Generate 和 Stream 两个接口。但是 Collect 和 Transform 没有对应的真实业务场景,所以就没有实现相应的接口:
type ChatModel interface {
    Generate(ctx context.Context**, **input []*schema.Message**, **opts ...Option) (*schema.Message**, **error)
    Stream(ctx context.Context**, **input []*schema.Message**, **opts ...Option) (
       *schema.StreamReader[*schema.Message]**, **error)
    // other methods omitted...
}
  • Retriever,除了 Invoke 这种非流式的范式外,另外三种流式范式都没有真实的业务场景,因此只实现了 Retrieve 一个接口:
type Retriever interface {
    Retrieve(ctx context.Context**, **query string**, **opts ...Option) ([]*schema.Document**, **error)
}

组件具体支持的范式

组件名称
是否实现 Invoke
是否实现 Stream
是否实现 Collect
是否实现 Transform
Chat model
yes
yes
no
no
Chat template
yes
no
no
no
Retriever
yes
no
no
no
Indexer
yes
no
no
no
Embedder
yes
no
no
no
Document Loader
yes
no
no
no
Document Transformer
yes
no
no
no
Tool
yes
yes
no
no

Eino 官方组件中,除了 Chat Model 和 Tool 额外支持 stream 外,其他所有组件都只支持 invoke。组件具体介绍参见:[更新中]Eino: Components 抽象&实现

Collect 和 Transform 两种流式范式,目前只在编排场景有用到。

多个组件编排角度的流式

组件在编排中的流式范式

一个组件,单独使用时,入参和出参的流式范式是框定的,不可能超出组件定义的接口范围。

  • 比如 Chat Model,入参只可能是非流式的 []Message,出参则可能是非流式的 Message 或者流式的 StreamReader[Message],因为 Chat Model 只实现了 Invoke 和 Stream 两个范式。

但是,一个组件,一旦处在多个组件组合使用的“编排”场景中,它的入参和出参就没那么固定了,而是取决于这个组件在编排场景中的“上游输出”和“下游输入”。比如 React Agent 的典型编排示意图:

上图中,如果 Tool 是个 StreamableTool,也就是输出是 StreamReader[Message],则 Tool -> ChatModel 就可能是流式的输出。但是 Chat Model 并没有接收流式输入的业务场景,也没有对应的接口。这时 Eino 框架会自动帮助 ChatModel 补足接收流式输入的能力:

上面的 Concat message stream 是 Eino 框架自动提供的能力,即使不是 message,是任意的 T,只要满足特定的条件,Eino 框架都会自动去做这个 StreamReader[T] 到 T 的转化,这个条件是:在编排中,当一个组件的上游输出是 StreamReader[T],但是组件只提供了 T 作为输入的业务接口时,框架会自动将 StreamReader[T] concat 成 T,再输入给这个组件。

💡 框架自动将 StreamReader[T] concat 成 T 的过程,可能需要用户提供一个 Concat function。详见 Eino: 编排的设计理念 中关于“合并帧”的章节。

另一方面,考虑一个相反的例子。还是 React Agent,这次是一个更完整的编排示意图:

在上图中,branch 接收 chat model 输出的 message,并根据 message 中是否包含 tool call,来选择直接结束 agent 本次运行并将 message 输出,还是调用 Tool 并将调用结果再次给 Chat Model 循环处理。由于这个 Branch 可以通过 message stream 的首个帧就完成逻辑判断,因此我们给这个 Branch 定义的是 Collect 接口,即流式输入,非流式输出:

compose.NewStreamGraphBranch(func(ctx context.Context**, **sr *schema.StreamReader[*schema.Message]) (endNode string**, **err error) {
    msg**, **err := sr.Recv()
    if err != nil {
       return ""**, **err
    }
    defer sr.Close()

    if len(msg.ToolCalls) == **0 **{
       return compose._END_**, **nil
    }

    return nodeKeyTools**, **nil
}

ReactAgent 有两个接口,Generate 和 Stream,分别实现了 Invoke 和 Stream 的流式编程范式。当一个 ReactAgent 以 Stream 的方式被调用时,Chat Model 的输出是 StreamReader[Message],因此 Branch 的输入是 StreamReader[Message],符合这个 Branch condition 的函数签名定义,不需要做任何的转换就可以运行。

但是,当这个 ReactAgent 以 Generate 的方式被调用时,Chat Model 的输出是 Message,因此 Branch 的输入也会是 Message,不符合 Branch Condition 的 StreamReader[Message] 的函数签名定义。这时,Eino 框架会自动将 Message 装箱成 StreamReader[Message],再传给 Branch,而这个 StreamReader 里面只会有一个帧。

💡 这种只有一个帧的流,俗称“假流”,因为它并没有带来流式的实际好处即“首包延迟低”,而是仅仅为了满足流式出入参接口签名的要求而做的简单装箱。

总结起来,就是:在编排中,当一个组件的上游输出是 T,但是组件只提供了 StreamReader[T] 作为输入的业务接口时,框架会自动将 T 装箱成 StreamReader[T] 的单帧流,再输入给这个组件。

编排辅助元素的流式范式

上面提到的 Branch,并不是一个可单独使用的组件,而是只在编排场景中才有意义的“编排辅助元素”,类似的仅编排场景有意义的“组件”,还有一些,详见下图:

组件名称
使用场景
是否实现 Invoke
是否实现 Stream
是否实现 Collect
是否实现 Transform
Branch
根据上游输出,在一组下游 Node 中动态选择一个
- 只能在接收到完整入参后才能判断的,实现 Invoke
- 可以在接收部分帧后做判断的,实现 Collect
- 两者只能实现一个
yes
no
yes
no
StatePreHandler
Graph中,进入 Node 前修改 State 或/与 Input。可支持流式。
yes
no
no
yes
StatePostHandler
Graph中,Node 完成后修改 State 或/与 Output。可支持流式
yes
no
no
yes
Passthrough
在并行情况下,为了打平每个并行分支的 Node 个数,可以给 Node 个数少的分支加 Passthrough 节点。Passthrough 节点的输入输出相同,跟随上游节点的输出或跟随下游节点的输入(预期应当相同)。
yes
no
no
yes
Lambda
封装官方组件未定义的业务逻辑。业务逻辑是哪种范式,就选择对应的那种流式范式来实现。
yes
yes
yes
yes

另外还有一种只有编排场景才有意义的“组件”,就是把编排产物作为一个整体来看待,比如编排后的 Chain,Graph。这些整体的编排产物,既可以作为“组件”来单独调用,也可以作为节点加入到更上级的编排产物中。

编排整体角度的流式

编排产物的“业务”范式

既然整体的编排产物,可以被看做一个“组件”,那从组件的视角可以提出问题:编排产物这个“组件”,有没有像 Chat Model 等组件那样的,符合“业务场景”的接口范式?答案是既“有”也“没有”。

  • “没有”:整体而言,Graph,Chain 等编排产物,自身是没有业务属性的,只为抽象的编排服务的,因此也就没有符合业务场景的接口范式。同时,编排需要支持各种范式的业务场景。所以,Eino 中代表编排产物的 Runnable[I, O] 接口,不做选择也无法选择,提供了所有流式范式的方法:
type Runnable[I**, **O any] interface {
    Invoke(ctx context.Context**, **input I**, **opts ...Option) (output O**, **err error)
    Stream(ctx context.Context**, **input I**, **opts ...Option) (output *schema.StreamReader[O]**, **err error)
    Collect(ctx context.Context**, **input *schema.StreamReader[I]**, **opts ...Option) (output O**, **err error)
    Transform(ctx context.Context**, **input *schema.StreamReader[I]**, **opts ...Option) (output *schema.StreamReader[O]**, **err error)
}
  • “有”:具体而言,某一个具体的 Graph、Chain,一定是承载了具体的业务逻辑的,因此也就一定有适合那个特定业务场景的流式范式。比如类似 React Agent 的 Graph,匹配的业务场景是 Invoke 和 Stream,因此这个 Graph 在调用时,符合逻辑的调用方式是 Invoke 和 Stream。虽然编排产物本身接口 Runnable[I, O] 中有 Collect 和 Transform 的方法,但是正常的业务场景不需要使用。

编排产物内部各组件在运行时的范式

从另一个角度看,既然编排产物整体可以被看做“组件”,那“组件”必然有自己的内部实现,比如 ChatModel 的内部实现逻辑,可能是把入参的 []Message 转化成各个模型的 API request,之后调用模型的 API,获取 response 后再转化成出参的 Message。那么类比的话,Graph 这个“组件”的内部实现是什么?是数据在 Graph 内部各个组件间以用户指定的流转方向和流式范式来流转。其中,“流转方向”不在当前讨论范围内,而各组件运行时的流式范式,则由 Graph 整体的触发方式决定,具体来说:

  • 如果用户通过 Invoke 来调用 Graph,则 Graph 内部所有组件都以 Invoke 范式来调用。如果某个组件,没有实现 Invoke 范式,则 Eino 框架自动根据组件实现了的流式范式,封装出 Invoke 调用范式,优先顺位如下:
    • 若组件实现了 Stream,则通过 Stream 封装 Invoke,即自动 concat 输出流。

  • 否则,若组件实现了 Collect,则通过 Collect 封装 Invoke,即非流式入参转单帧流。

  • 如果都没实现,则必须实现 Transform,通过 Transform 封装 Invoke,即入参转单帧流,出参 concat。

  • 如果用户通过 Stream/Collect/Transform 来调用 Graph,则 Graph 内部所有组件都以 Transform 范式来调用。如果某个组件,没有实现 Transform 范式,则 Eino 框架自动根据组件实现了的流式范式,封装出 Transform 调用范式,优先顺位如下:
    • 若组件实现了 Stream,则通过 Stream 封装 Transform,即自动 concat 输入流。

  • 否则,若组件实现了 Collect,则通过 Collect 封装 Transform,即非流式出参转单帧流。

  • 如果都没实现,则必须实现 Invoke,通过 Invoke 封装 Transform,即入参流 concat,出参转单帧流

结合上面穷举的各种案例,Eino 框架对 T 和 Stream[T] 的自动转换,可以总结为:

  • T -> Stream[T]: 将完整的 T 装箱为单帧的 Stream[T]。非流式变假流式。
  • Stream[T] -> T: 将 Stream[T] Concat 为完整的 T。当 Stream[T] 不是单帧流时,可能需要提供针对 T 的 Concat 方法。

看了上面的实现原理,可能会有疑问,为什么对 graph 的 Invoke,会要求所有内部组件都以 Invoke 调用?以及为什么对 graph 的 Stream/Collect/Transform,会要求所有内部组件都以 Transform 调用?毕竟,可以举出一些反例:

  • A, B 两个组件编排为一个 Chain,以 Invoke 调用。其中 A 的业务接口实现了 Stream,B 的业务接口实现了 Collect。这时 graph 内部组件的调用范式有两个选择:
    • A 以 stream 调用,B 以 collect 调用,整体的 Chain 依然是 Invoke 语义,同时保留了真流式的内部语义。即 A 的输出流不需要做 Concat,可以实时的输入到 B 中。
    • 目前 Eino 的实现,A、B 都以 Invoke 调用,需要把 A 的输出流 Concat,并把 B 的输入做成假流式。失去了真流式的内部语义。
  • A,B 两个组件编排为一个 Chain,以 Collect 调用。其中 A 实现了 Transform 和 Collect,B 实现了 Invoke。两个选择:
    • A 以 Collect 调用,B 以 Invoke 调用:整体还是 Collect 的语义,不需要框架做任何的自动转化和装箱操作。
    • 目前 Eino 的实现,A、B 都以 Transform 调用,由于 A 的业务接口里实现了 Transform,因此 A 的输出和 B 的输入都可能是真流式,而 B 的业务接口里只实现了 Invoke,根据上面的分析,B 的入参会需要由真流式 concat 成非流式。这时就需要用户额外提供 B 的入参的 cancat 函数,这本可以避免。

上面两个例子,都可以找到一个明确的、与 Eino 的约定不同的,但却更优的流式调用路径。但是,当泛化到任意的编排场景时,很难找到一个明确定义的、与 Eino 的约定不同的、却总是更优的普适的规则。比如,A->B->C,以 Collect 语义调用,是 A->B 的时候 Collect,还是 B->C 的时候 Collect?潜在的因素有 A、B、C 具体实现的业务接口,可能还有“尽量多的使用真流式”的判断,也许还有哪个参数实现了 Concat,哪个没有实现。如果是更复杂的 Graph,需要考虑的因素会快速增加。在这种情况下,即使框架能定义出一套明确的、更优的普适规则,也很难解释清楚,理解和使用成本会很高,很可能已经超过了这个新规则实际带来的好处。

综上,我们可以说,Eino 编排产物内部各组件在运行时的范式,是 By Design 的,明确如下:

  • 整体以 Invoke 调用,内部各组件均以 Invoke 调用,不存在任何流式的过程。
  • 整体以 Stream/Collect/Transform 调用,内部各组件均以 Transform 调用,当出现 Stream[T] -> T 的 concat 过程时,可能需要额外提供 T 的 concat function。

最后修改 January 20, 2025 : doc: eino doc update (#1214) (b324caa)