Indexer - ES8

ES8 Indexer

This is an Elasticsearch 8.x indexer implementation for Eino that implements the Indexer interface. It integrates with Eino’s vector storage and retrieval system for semantic search.

Features

  • Implements github.com/cloudwego/eino/components/indexer.Indexer
  • Easy integration with Eino indexing
  • Configurable Elasticsearch parameters
  • Supports vector similarity search
  • Batch indexing operations
  • Custom field mapping
  • Flexible document embedding

Installation

go get github.com/cloudwego/eino-ext/components/indexer/es8@latest

Quick Start

Example usage (see components/indexer/es8/examples/indexer/add_documents.go for details):

import (
    "github.com/cloudwego/eino/components/embedding"
    "github.com/cloudwego/eino/schema"
    "github.com/elastic/go-elasticsearch/v8"
    "github.com/cloudwego/eino-ext/components/indexer/es8"
)

const (
    indexName          = "eino_example"
    fieldContent       = "content"
    fieldContentVector = "content_vector"
    fieldExtraLocation = "location"
    docExtraLocation   = "location"
)

func main() {
    ctx := context.Background()

    username := os.Getenv("ES_USERNAME")
    password := os.Getenv("ES_PASSWORD")
    httpCACertPath := os.Getenv("ES_HTTP_CA_CERT_PATH")

    cert, err := os.ReadFile(httpCACertPath)
    if err != nil { log.Fatalf("read file failed, err=%v", err) }

    client, err := elasticsearch.NewClient(elasticsearch.Config{
        Addresses: []string{"https://localhost:9200"},
        Username:  username,
        Password:  password,
        CACert:    cert,
    })
    if err != nil { log.Panicf("connect es8 failed, err=%v", err) }

    emb := createYourEmbedding()
    docs := loadYourDocs()

    indexer, err := es8.NewIndexer(ctx, &es8.IndexerConfig{
        Client:    client,
        Index:     indexName,
        BatchSize: 10,
        DocumentToFields: func(ctx context.Context, doc *schema.Document) (map[string]es8.FieldValue, error) {
            return map[string]es8.FieldValue{
                fieldContent:       { Value: doc.Content, EmbedKey: fieldContentVector },
                fieldExtraLocation: { Value: doc.MetaData[docExtraLocation] },
            }, nil
        },
        Embedding: emb,
    })
    if err != nil { log.Panicf("create indexer failed, err=%v", err) }

    ids, err := indexer.Store(ctx, docs)
    if err != nil { log.Panicf("create docs failed, err=%v", err) }
    fmt.Println(ids)
}

Configuration

Configure via IndexerConfig:

type IndexerConfig struct {
    Client *elasticsearch.Client // required: Elasticsearch client instance
    Index  string                // required: index name to store documents
    BatchSize int                // optional: batch size (default: 5)
    
    // required: map document fields to ES fields
    DocumentToFields func(ctx context.Context, doc *schema.Document) (map[string]FieldValue, error)
    
    // optional: only needed when embedding is required
    Embedding embedding.Embedder
}

type FieldValue struct {
    Value     any    // raw value to store
    EmbedKey  string // if set, Value will be embedded and saved under this field
    Stringify func(val any) (string, error) // optional: custom string conversion
}

More Details


Last modified December 12, 2025 : chore: update websocket docs (#1479) (967538e)