Indexer - ES8

ES8 索引器

这是一个 Eino 的 Elasticsearch 8.x 索引器实现,它实现了 Indexer 接口。这使得与 Eino 的向量存储和检索系统无缝集成,从而增强了语义搜索能力。

特性

  • 实现了 github.com/cloudwego/eino/components/indexer.Indexer
  • 易于与 Eino 的索引系统集成
  • 可配置的 Elasticsearch 参数
  • 支持向量相似度搜索
  • 批量索引操作
  • 支持自定义字段映射
  • 灵活的文档向量化

安装

go get github.com/cloudwego/eino-ext/components/indexer/es8@latest

快速开始

这是一个如何使用索引器的快速示例,你可以阅读 components/indexer/es8/examples/indexer/add_documents.go 获取更多细节:

import (
        "github.com/cloudwego/eino/components/embedding"
        "github.com/cloudwego/eino/schema"
        "github.com/elastic/go-elasticsearch/v8"
        "github.com/cloudwego/eino-ext/components/indexer/es8" // 导入 es8 索引器
)

const (
        indexName          = "eino_example"
        fieldContent       = "content"
        fieldContentVector = "content_vector"
        fieldExtraLocation = "location"
        docExtraLocation   = "location"
)

func main() {
        ctx := context.Background()

        // es 支持多种连接方式
        username := os.Getenv("ES_USERNAME")
        password := os.Getenv("ES_PASSWORD")
        httpCACertPath := os.Getenv("ES_HTTP_CA_CERT_PATH")

        cert, err := os.ReadFile(httpCACertPath)
        if err != nil {
                log.Fatalf("read file failed, err=%v", err)
        }

        client, err := elasticsearch.NewClient(elasticsearch.Config{
                Addresses: []string{"https://localhost:9200"},
                Username:  username,
                Password:  password,
                CACert:    cert,
        })
        if err != nil {
                log.Panicf("connect es8 failed, err=%v", err)
        }

        // 创建 embedding 组件
        emb := createYourEmbedding()

        // 加载文档
        docs := loadYourDocs()

        // 创建 es 索引器组件
        indexer, err := es8.NewIndexer(ctx, &es8.IndexerConfig{
                Client:    client,
                Index:     indexName,
                BatchSize: 10,
                DocumentToFields: func(ctx context.Context, doc *schema.Document) (field2Value map[string]es8.FieldValue, err error) {
                        return map[string]es8.FieldValue{
                                fieldContent: {
                                        Value:    doc.Content,
                                        EmbedKey: fieldContentVector, // 对文档内容进行向量化并保存向量到 "content_vector" 字段
                                },
                                fieldExtraLocation: {
                                        Value: doc.MetaData[docExtraLocation],
                                },
                        }, nil
                },
                Embedding: emb, // 替换为真实的 embedding 组件
        })
        if err != nil {
                log.Panicf("create indexer failed, err=%v", err)
        }

        ids, err := indexer.Store(ctx, docs)
        if err != nil {
                log.Panicf("create docs failed, err=%v", err)
        }

        fmt.Println(ids)
        // 与 Eino 系统一起使用
        // ... 配置并与 Eino 一起使用
}

配置

索引器可以通过 IndexerConfig 结构体进行配置:

type IndexerConfig struct {
        Client *elasticsearch.Client // 必填:Elasticsearch 客户端实例
        Index  string                // 必填:存储文档的索引名称
        BatchSize int                // 可选:embedding 的最大文本大小(默认:5)
        
        // 必填:将文档字段映射到 Elasticsearch 字段的函数
        DocumentToFields func(ctx context.Context, doc *schema.Document) (map[string]FieldValue, error)
        
        // 可选:仅当需要向量化时才需要
        Embedding embedding.Embedder
}

// FieldValue 定义了字段应如何存储和向量化
type FieldValue struct {
        Value     any    // 要存储的原始值
        EmbedKey  string // 如果设置,Value 将被向量化并保存
        Stringify func(val any) (string, error) // 可选:自定义字符串转换
}

更多详情


最后修改 December 12, 2025 : chore: update websocket docs (#1479) (967538e)