Reduction
adk/middlewares/reduction
💡 本中间件在 v0.8.0 版本引入。
概述
reduction 中间件管理 Agent 对话中工具输出占用的 token 数量,分为两个阶段:
- 截断(Truncation):工具调用返回时立即触发。单次输出超过
MaxLengthForTrunc时,完整内容存入 Backend,消息替换为截断摘要。 - 清理(Clear):模型调用前触发(
BeforeModelRewriteState)。总 token 超过MaxTokensForClear时,遍历历史消息,将旧的工具参数和结果卸载到 Backend。
架构
Tool 调用返回结果
│
▼
┌─────────────────────────────────────────────────────────────┐
│ WrapInvokableToolCall / WrapStreamableToolCall │
│ WrapEnhancedInvokableToolCall / WrapEnhancedStreamable │
│ │
│ Truncation(可通过 SkipTruncation 跳过) │
│ 结果长度 > MaxLengthForTrunc? │
│ 是 → 截断内容,完整内容存到 Backend │
│ 否 → 原样返回 │
└─────────────────────────────────────────────────────────────┘
│
▼
结果加入 Messages
│
▼
┌─────────────────────────────────────────────────────────────┐
│ BeforeModelRewriteState │
│ │
│ Clear(可通过 SkipClear 跳过) │
│ 总 token > MaxTokensForClear? │
│ 是 → ClearMessageRewriter 预处理 │
│ → 旧工具结果存到 Backend,替换为文件路径 │
│ → ClearAtLeastTokens 最小释放量检查 │
│ → ClearPostProcess 回调 │
│ 否 → 不处理 │
└─────────────────────────────────────────────────────────────┘
│
▼
调用 Model
泛型体系
本中间件采用 ADK 标准泛型模式,同时支持 *schema.Message 和 *schema.AgenticMessage:
// 泛型配置,M 约束为 adk.MessageType
type TypedConfig[M adk.MessageType] struct { ... }
// 向后兼容别名
type Config = TypedConfig[*schema.Message]
构造函数同样提供泛型和非泛型两种:
func NewTyped[M adk.MessageType](ctx context.Context, config *TypedConfig[M]) (adk.TypedChatModelAgentMiddleware[M], error)
func New(ctx context.Context, config *Config) (adk.ChatModelAgentMiddleware, error)
配置
TypedConfig[M] 主配置
| 字段 | 类型 | 说明 |
| Backend | Backend | 存储后端。SkipTruncation为 false 时必填;仅做 Clear 且不需要 offload 时可为 nil。 |
| SkipTruncation | bool | 跳过截断阶段。 |
| SkipClear | bool | 跳过清理阶段。 |
| ReadFileToolName | string | 用于读取卸载内容的工具名。默认"read_file"。 |
| RootDir | string | 保存内容的根目录。默认"/tmp"。截断内容存到 {RootDir}/trunc/{tool_call_id},清理内容存到{RootDir}/clear/{tool_call_id}。 |
| GenTruncOffloadFilePath | func(ctx, *ToolDetail) (string, error) | 自定义截断文件路径生成。设置后 RootDir 对截断不生效。适用于 tool_call_id 不唯一的场景。 |
| GenClearOffloadFilePath | func(ctx, *ToolDetail) (string, error) | 自定义清理文件路径生成。设置后 RootDir 对清理不生效。 |
| MaxLengthForTrunc | int | 触发截断的最大字符长度。默认50000。 |
| TruncExcludeTools | []string | 不截断的工具名列表。 |
| TokenCounter | func(ctx, []M, []*schema.ToolInfo) (int64, error) | token 计数函数。默认使用字符数/4 估算。建议用 tiktoken-go/tokenizer 替换。 |
| MaxTokensForClear | int64 | 触发清理的 token 阈值。默认160000。 |
| ClearRetentionSuffixLimit | int | 保留最近 N 轮 assistant 消息不清理。默认1。 |
| ClearAtLeastTokens | int64 | 清理至少释放的 token 量。未达标则不执行清理(避免无谓破坏 prompt cache)。默认0。 |
| ClearExcludeTools | []string | 不清理的工具名列表。 |
| ClearMessageRewriter | func(ctx, M, []M) ([]M, error) | 清理前的消息重写回调。参数为 toolCallMsg 和对应的 toolResponseMsgs。可用于将 write_file/edit_file 调用重写为 system-reminder。返回 nil 表示移除该组消息。 |
| ClearPostProcess | func(ctx, *adk.TypedChatModelAgentState[M]) context.Context | 清理完成后的回调,可保存状态或发送通知。返回可能更新后的 context。 |
| ToolConfig | map[string]*ToolReductionConfig | 按工具名配置,优先级高于全局。 |
ToolReductionConfig 工具级配置
type ToolReductionConfig struct {
Backend Backend
SkipTruncation bool
TruncHandler func(ctx context.Context, detail *ToolDetail) (*TruncResult, error)
SkipClear bool
ClearHandler func(ctx context.Context, detail *ToolDetail) (*ClearResult, error)
}
TruncHandler/ClearHandler为 nil 且未跳过时,使用全局默认 handler。Backend为该工具独立的存储后端,可覆盖全局 Backend。
ToolDetail 工具详情
type ToolDetail struct {
ToolContext *adk.ToolContext
ToolArgument *schema.ToolArgument
ToolResult *schema.ToolResult // 非流式
StreamToolResult *schema.StreamReader[*schema.ToolResult] // 流式
}
TruncResult 截断结果
type TruncResult struct {
NeedTrunc bool
ToolResult *schema.ToolResult // NeedTrunc && 非流式时必填
StreamToolResult *schema.StreamReader[*schema.ToolResult] // NeedTrunc && 流式时必填
NeedOffload bool
OffloadFilePath string // NeedOffload 时必填
OffloadContent string // NeedOffload 时必填
}
ClearResult 清理结果
type ClearResult struct {
NeedClear bool
ToolArgument *schema.ToolArgument // NeedClear 时必填
ToolResult *schema.ToolResult // NeedClear 时必填
NeedOffload bool
OffloadFilePath string // NeedOffload 时必填
OffloadContent string // NeedOffload 时必填
}
Backend 接口
// 定义于 reduction/internal,通过类型别名导出
type Backend interface {
Write(context.Context, *filesystem.WriteRequest) error
}
filesystem.WriteRequest 包含 FilePath string 和 Content string 两个字段。
创建中间件
基本用法
import "github.com/cloudwego/eino/adk/middlewares/reduction"
middleware, err := reduction.New(ctx, &reduction.Config{
Backend: myBackend,
})
agent, err := adk.NewChatModelAgent(ctx, &adk.ChatModelAgentConfig{
Model: chatModel,
Middlewares: []adk.ChatModelAgentMiddleware{middleware},
})
泛型用法(AgenticMessage)
middleware, err := reduction.NewTyped[*schema.AgenticMessage](ctx, &reduction.TypedConfig[*schema.AgenticMessage]{
Backend: myBackend,
TokenCounter: myAgenticTokenCounter,
})
agent, err := adk.NewTypedChatModelAgent(ctx, &adk.TypedChatModelAgentConfig[*schema.AgenticMessage]{
Model: chatModel,
Middlewares: []adk.TypedChatModelAgentMiddleware[*schema.AgenticMessage]{middleware},
})
自定义配置
middleware, err := reduction.New(ctx, &reduction.Config{
Backend: myBackend,
RootDir: "/data/agent",
MaxLengthForTrunc: 30000,
MaxTokensForClear: 100000,
ClearRetentionSuffixLimit: 2,
ClearAtLeastTokens: 10000,
TruncExcludeTools: []string{"search_tool"},
ClearExcludeTools: []string{"read_file"},
ClearMessageRewriter: func(ctx context.Context, toolCallMsg *schema.Message, toolResponseMsgs []*schema.Message) ([]*schema.Message, error) {
// 将 write_file 调用重写为 system-reminder
return []*schema.Message{schema.UserMessage("<system-reminder>file written</system-reminder>")}, nil
},
ClearPostProcess: func(ctx context.Context, state *adk.ChatModelAgentState) context.Context {
log.Printf("Clear completed, messages: %d", len(state.Messages))
return ctx
},
ToolConfig: map[string]*reduction.ToolReductionConfig{
"grep": {Backend: grepBackend},
"read_file": {SkipClear: true},
},
})
仅截断
middleware, err := reduction.New(ctx, &reduction.Config{
Backend: myBackend,
SkipClear: true,
})
仅清理
middleware, err := reduction.New(ctx, &reduction.Config{
SkipTruncation: true,
MaxTokensForClear: 100000,
// Backend 为 nil 时,清理仍会替换内容为占位符,但不执行 offload
})
工作原理
Truncation(截断)
在 WrapInvokableToolCall / WrapStreamableToolCall / WrapEnhancedInvokableToolCall / WrapEnhancedStreamableToolCall 中处理:
- 工具返回结果
- 检查
TruncExcludeTools,命中则跳过 - 查找 ToolConfig → 全局 defaultConfig,获取 TruncHandler
- TruncHandler 判定:读取完整输出,检查所有 text 部分总长度是否超过
MaxLengthForTrunc - 超过则:保留首尾各
MaxLengthForTrunc/(textParts*2)字符作为预览,完整内容存到 Backend - 返回截断通知,告知 agent 完整内容的文件路径
💡 对于流式工具,默认 TruncHandler 会等待完整流读取完毕后再决定是否截断。若需严格增量流式行为,请为该工具提供自定义 TruncHandler。
Clear(清理)
在 BeforeModelRewriteState 中处理:
- 用
TokenCounter计算总 token - 未超过
MaxTokensForClear则跳过 - 确定清理范围:从第一条未处理的 assistant 消息开始,到
len(messages) - ClearRetentionSuffixLimit轮结束 - 若配置了
ClearMessageRewriter,先对范围内消息执行重写预处理 - 遍历范围内的 tool call 消息,跳过
ClearExcludeTools - 对每个 tool call 调用 ClearHandler,替换参数和结果
- 如设置了
ClearAtLeastTokens:先在副本上操作,对比清理前后 token 差值,不达标则放弃本次清理 - 达标后执行实际 offload 写入,更新 state.Messages
- 调用
ClearPostProcess
多语言支持
截断和清理的提示文字支持中英文自动切换:
adk.SetLanguage(adk.LanguageChinese) // 中文
adk.SetLanguage(adk.LanguageEnglish) // 英文(默认)
注意事项
SkipTruncation为 false 时,Backend必须设置- 默认 TokenCounter 用字符数/4 估算,建议使用
github.com/tiktoken-go/tokenizer替换 - 已处理过的消息通过 Extra 字段打标记
_reduction_mw_processed,不会重复处理 ToolConfig中配置优先级高于全局;若 ToolConfig 中仅设置了SkipTruncation: false但未提供TruncHandler,则回退到默认 handlerGenTruncOffloadFilePath/GenClearOffloadFilePath适用于 tool_call_id 不唯一的场景(如 retry),防止文件覆盖ClearMessageRewriter在清理范围确定后、逐工具清理前执行,适合将 write/edit 类调用压缩为简短提示ClearAtLeastTokens设为 0 表示只要超阈值就执行清理;大于 0 时可避免微量清理破坏 prompt cache- Legacy API(
NewClearToolResult、NewToolResultMiddleware)已废弃,建议迁移到New/NewTyped