Related: Schema, CLI, Observability, Upsert
Stream documents into a physical collection via JSONL bulk import with retries, stable memory, and notifications.
The Indexer focuses on importing documents. It does not create schemas, swap aliases, or enforce retention. Use Schema.apply! (or the schema:apply task) for full blue/green lifecycle: create new physical → import → alias swap → retention cleanup.

API

SearchEngine::Indexer.import!(SearchEngine::Book, into: "books_20251001_010203_001", enum: enumerable_batches, batch_size: 2000, max_parallel: 4)
  • into: physical collection name
  • enum: enumerable yielding batches (Arrays of Hash documents)
  • batch_size: soft guard for JSONL production; batches are not re-sliced unless handling 413
  • action: defaults to :upsert
  • max_parallel: maximum number of parallel threads for batch processing (default: 1, sequential). Set via mapper DSL with max_parallel or override at runtime. See Parallel batch processing.
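
For example, a minimal enumerable of batches can be built from ActiveRecord like this (a sketch; any Enumerator yielding Arrays of Hash documents works, and the field mapping shown is illustrative):
docs_enum = Enumerator.new do |y|
  ::Book.in_batches(of: 2000) do |relation|
    # Yield one Array of Hash documents per batch
    y << relation.map { |b| { id: b.id.to_s, title: b.title, author_id: b.author_id } }
  end
end

SearchEngine::Indexer.import!(
  SearchEngine::Book,
  into: "books_20251001_010203_001",
  enum: docs_enum,
  batch_size: 2000
)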

Data flow

JSONL format

  • One JSON object per line
  • Newline between documents; trailing newline optional
  • Strings are escaped by the JSON library
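
As a rough sketch, a batch of Hash documents serializes to JSONL like this (illustrative only; the Indexer builds its payload internally):
require "json"

docs = [
  { id: "1", title: "Dune" },
  { id: "2", title: "Hyperion" }
]

# One JSON object per line, joined by newlines
jsonl = docs.map { |doc| JSON.generate(doc) }.join("\n")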

Retries & backoff

  • Transient errors (timeouts, connection, 429, 5xx) are retried with exponential backoff and jitter
  • Non-transient errors (401/403/404/400/422) are not retried
  • 413 Payload Too Large splits the batch recursively until it fits
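
In spirit, the retry policy resembles the following sketch (the actual attempt count, backoff base, and error classification live inside the Indexer; send_batch! is a hypothetical placeholder):
attempts = 0
begin
  attempts += 1
  send_batch!(jsonl)                            # hypothetical network call
rescue Timeout::Error, Errno::ECONNRESET => e   # transient-style errors
  raise if attempts >= 5                        # give up after a bounded number of attempts
  sleep((2**attempts) * 0.1 + rand * 0.1)       # exponential backoff + jitter
  retry
end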

Parallel batch processing

For large collections, you can speed up indexing by processing multiple batches simultaneously using parallel threads. This is especially useful when network latency is a bottleneck.

Configuration

Enable parallel processing in your mapper DSL:
class SearchEngine::Book < SearchEngine::Base
  collection "books"

  index do
    source :active_record, model: ::Book, batch_size: 2000
    max_parallel 4  # Process up to 4 batches concurrently

    map do |r|
      { title: r.title, author_id: r.author_id }
    end
  end
end
The max_parallel setting controls how many batches are processed simultaneously. Each thread gets its own Typesense client instance and buffer to avoid conflicts.

Usage examples

# Parallel processing is automatically enabled when max_parallel > 1 is set in the mapper DSL
SearchEngine::Book.rebuild_partition!(partition: publisher_id)
# Override the DSL setting at runtime
docs_enum = build_docs_enum(rows_enum, mapper)
SearchEngine::Indexer.import!(
  SearchEngine::Book,
  into: "books_20251001_010203_001",
  enum: docs_enum,
  max_parallel: 8  # Use 8 threads instead of the DSL default
)
Use parallel processing when:
  • Collections are large, with thousands of batches
  • Network latency is the main bottleneck (not CPU or memory)
  • Your Typesense server can handle concurrent requests
  • You have sufficient memory for multiple buffers
Avoid parallel processing when:
  • Collections are small (< 100 batches)
  • Workloads are CPU-bound (parallel processing adds overhead)
  • The environment is very memory-constrained
  • You are debugging (sequential processing has clearer error messages)

How it works

Parallel batch processing uses a producer-consumer pattern with a thread pool:
  1. Producer thread: Fetches batches from your source (ActiveRecord, SQL, etc.) and adds them to a queue
  2. Worker threads: Multiple threads pull batches from the queue and process them concurrently
  3. Thread safety: Each worker gets its own Typesense client and buffer to avoid conflicts
  4. Statistics: All counters are synchronized using a mutex to ensure accurate reporting
The queue has a capacity of max_parallel * 2 to keep workers busy while the producer fetches more batches. Progress is logged every 10 batches when log_batches is enabled. When a batch fails, the error is caught and recorded with full statistics (document count, failure details) just like in sequential processing. Failed batches are properly counted in the summary’s docs_total, failed_total, and batches array to ensure accurate reporting.
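
Conceptually, the pattern is close to this sketch (simplified; build_thread_local_client and process_batch! are hypothetical stand-ins for the internals):
queue = SizedQueue.new(max_parallel * 2)   # bounded queue keeps memory stable

producer = Thread.new do
  enum.each { |batch| queue << batch }
  max_parallel.times { queue << :done }    # one sentinel per worker
end

workers = Array.new(max_parallel) do
  Thread.new do
    client = build_thread_local_client     # each worker has its own client and buffer
    while (batch = queue.pop) != :done
      process_batch!(client, batch)        # stats recorded under a shared mutex
    end
  end
end

([producer] + workers).each(&:join)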

Memory notes

  • Operates strictly batch-by-batch, reusing a single buffer per thread
  • No accumulation of all records in memory; per-batch array may be materialized to support 413 splitting
  • Parallel processing: each worker thread maintains its own buffer and client instance (memory usage scales with max_parallel)

Instrumentation

  • Emits search_engine.indexer.batch_import per attempted batch
  • Payload includes: collection, batch_index, docs_count, success_count, failure_count, attempts, duration_ms, http_status, bytes_sent, transient_retry, error_sample
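
For example, a subscriber could log each batch (assuming the events are published via ActiveSupport::Notifications):
ActiveSupport::Notifications.subscribe("search_engine.indexer.batch_import") do |*, payload|
  Rails.logger.info(
    "[indexer] #{payload[:collection]} batch=#{payload[:batch_index]} " \
    "docs=#{payload[:docs_count]} failed=#{payload[:failure_count]} " \
    "duration_ms=#{payload[:duration_ms]}"
  )
end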

Dry-run

  • SearchEngine::Indexer.dry_run!(…) builds JSONL for the first batch only and returns { collection, action, bytes_estimate, docs_count, sample_line }
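
A quick sketch, assuming dry_run! accepts the same keywords as import! (the returned keys are the ones listed above):
preview = SearchEngine::Indexer.dry_run!(
  SearchEngine::Book,
  into: "books_20251001_010203_001",
  enum: docs_enum
)
preview[:bytes_estimate] # estimated JSONL payload size for the first batch
preview[:sample_line]    # first serialized document line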

Data Sources

Adapters provide batched records for the Indexer in a memory-stable way. Each adapter implements each_batch(partition:, cursor:) and yields arrays. Examples:
source :active_record, model: ::Book, scope: -> { where(published: true) }, batch_size: 2000
source :sql, sql: "SELECT * FROM books WHERE published = TRUE", fetch_size: 2000
source :lambda do |cursor: nil, partition: nil|
  Enumerator.new { |y| external_api.each_page(cursor) { |rows| y << rows } }
end
When source is not :active_record, you must define identify_by on the model. The mapper ignores any id returned from map, so SQL/lambda rows need an explicit identity strategy. See Models.
  • partition and cursor are opaque; adapters interpret them per-domain (e.g., id ranges, keyset predicates, external API tokens).
  • Instrumentation: emits search_engine.source.batch_fetched and search_engine.source.error.

Mapper

Backlinks: Models, Schema
class SearchEngine::Book < SearchEngine::Base
  collection "books"
  attribute :publisher_id, :integer
  attribute :author_id, :integer
  attribute :author_name, :string
  attribute :price_cents, :integer

  # Default identity is record.id.to_s for ActiveRecord sources.
  # For :sql or :lambda sources, set identify_by.
  # identify_by :isbn
  # identify_by ->(r) { "#{r.publisher_id}-#{r.id}" }

  index do
    source :active_record, model: ::Book, scope: -> { where(published: true) }
    map do |r|
      # :id from map is ignored; mapper injects computed id
      { publisher_id: r.publisher_id, author_id: r.author_id, author_name: r.author&.name, price_cents: r.price_cents }
    end
  end
end
Model → Document mapping:
Model field   | Document field | Transform
id            | id             | identity
publisher_id  | publisher_id   | identity
author_id     | author_id      | identity
author.name   | author_name    | rename + safe navigation
price_cents   | price_cents    | identity
Validation:
  • Missing required fields: the mapper validates declared attributes; id is injected and not required to be declared.
  • Unknown fields: warns by default; set SearchEngine.config.mapper.strict_unknown_keys = true to error.
  • Type checks: invalid types are reported (e.g., Invalid type for field :price_cents (expected Integer, got String: "12.3")).
  • Coercions: enable with SearchEngine.config.mapper.coercions[:enabled] = true (safe integer/float/bool only).
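Both switches can live in an initializer, for example (using the configure block shown elsewhere on this page):
SearchEngine.configure do |c|
  c.mapper.strict_unknown_keys = true   # error on unknown fields instead of warning
  c.mapper.coercions[:enabled] = true   # allow safe integer/float/bool coercions
end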

Array empty filtering (hidden flags)

If an array attribute is declared with empty_filtering: true, the mapper auto-populates a hidden boolean <name>_empty per document:
attribute :category_ids, [:string], empty_filtering: true
# Mapper stores: category_ids_empty = category_ids.nil? || category_ids.empty?
Hidden fields are included in the schema and documents but are not exposed via model APIs/inspect. Runtime API:
  • mapper = SearchEngine::Mapper.for(SearchEngine::Book)
  • docs, report = mapper.map_batch!(rows, batch_index: 1)
  • Emits search_engine.mapper.batch_mapped per batch with: collection, batch_index, docs_count, duration_ms, missing_required_count, extra_keys_count, invalid_type_count, coerced_count.

Partitioning

Backlinks: Schema, CLI, Observability
index do
  partitions { Publisher.pluck(:id) }
  partition_fetch { |publisher_id| ::Book.where(publisher_id: publisher_id).in_batches(of: 2000) }
  before_partition { |publisher_id| delete_by filter_by: "publisher_id:=#{publisher_id}" }
  after_partition  { |publisher_id| nil } # custom metrics
end
SearchEngine::Indexer.rebuild_partition!(SearchEngine::Book, partition: publisher_id)

Model-level shortcut

You can call the same operation directly on the collection model. This delegates to the Indexer and returns the same SearchEngine::Indexer::Summary (or an Array when multiple partitions are provided).
SearchEngine::Book.rebuild_partition!(partition: publisher_id)
SearchEngine::Book.rebuild_partition!(partition: publisher_id, into: "optional_physical_collection")
# Multiple keys return an Array<Summary>:
SearchEngine::Book.rebuild_partition!(partition: [1, 2, 3])

Summary return value

SearchEngine::Indexer::Summary includes:
  • collection: logical collection name
  • status: :ok, :partial, or :failed
  • batches_total: total number of batches processed
  • docs_total: total documents processed
  • success_total: successfully indexed documents
  • failed_total: failed documents
  • failed_batches_total: count of batches with failures
  • duration_ms_total: total wall-clock duration in milliseconds
  • batches: array of per-batch stats
Notes:
  • partitions must return an Enumerable of keys; partition_fetch must return an Enumerable of batches (Arrays of records).
  • Hooks are optional; if provided, they must accept exactly one argument (the partition key).
  • When partition_fetch is missing, the source adapter is used with the partition passed through; for ActiveRecord sources, provide a Hash/Range partition or define partition_fetch.
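
As a sketch, a caller might check a rebuild outcome via the fields above (assuming reader-style accessors on Summary):
summary = SearchEngine::Book.rebuild_partition!(partition: publisher_id)
if summary.status != :ok
  warn "#{summary.collection}: #{summary.failed_total}/#{summary.docs_total} documents failed " \
       "across #{summary.failed_batches_total} of #{summary.batches_total} batches"
end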

Model indexation (.index_collection)

Backlinks: Schema, CLI, Observability, Models
High-level convenience API that orchestrates schema lifecycle and (partitioned) indexing from the model class.
# Full indexation flow (create/apply schema if needed, reindex, retention)
SearchEngine::Book.index_collection

# Partial indexation by key(s) (requires collection present and schema in-sync)
SearchEngine::Book.index_collection(partition: publisher_id)
SearchEngine::Book.index_collection(partition: [publisher_id_1, publisher_id_2])
Behavior:
  • Full flow when partition is nil:
    1. Presence check (Schema.diff).
    2. If missing → create new physical and apply schema, then import; else skip.
    3. If present → check drift; report in_sync vs drift.
    4. If drift → apply schema (create new physical, import, alias swap, retention).
    5. If nothing was applied in steps 2–4 → index into the current alias (single or per partition).
    6. Retention cleanup: skipped when Schema.apply! ran (already handled); otherwise best‑effort cleanup of old physicals beyond keep_last.
  • Partial flow when partition: is set:
    1. Presence check; if missing → quit early with a message.
    2. Schema status; if drift → quit early with a message to run full indexation.
    3. Index only the provided partition(s) into the current alias.

Preflight dependency indexation (optional)

You can ask the engine to walk direct and transitive belongs_to dependencies and ensure they are ready before indexing the current collection:
# Ensure dependencies exist (create/apply+index only when missing)
SearchEngine::Book.index_collection(pre: :ensure)

# Ensure dependencies exist and also fix drift (apply+index when missing or drift)
SearchEngine::Book.index_collection(pre: :index)

# Partial with preflight
SearchEngine::Book.index_collection(partition: publisher_id, pre: :ensure)
Notes:
  • Preflight walks only belongs_to edges recursively, skipping unregistered collections.
  • Cycles are guarded with a visited set; already-visited collections are skipped.
  • Default remains unchanged when pre: is omitted.
Output
  • Emits concise console lines for each step and per‑partition result, for example:
Step 1: Presence — processing → present
Step 3: Check Schema Status — in_sync
Step 5: Indexation — processing
  partition=123 → status=ok docs=2000 failed=0 batches=10 duration_ms=1523.4
Step 5: Indexation — done
Step 6: Retention Cleanup — skip (handled by schema apply)

Targeted bulk helpers (Bulk.index_collections / Bulk.reindex_collections!)

  • Run blue/green or destructive indexation for a specific set of collections (not just “all”).
  • Two-stage plan:
    • Stage 1: inputs that are not referencers of other inputs (referenced-first order).
    • Stage 2: unique referencers of any input, topologically sorted and processed once.
  • Cascades are suppressed inside individual runs; the final cascade stage handles referencers exactly once.
# Blue/green selected collections
SearchEngine::Bulk.index_collections(:books, :publishers)

# Destructive reindex of a subset with an injected client
SearchEngine::Bulk.reindex_collections!(
  SearchEngine::Book,
  "authors",
  client: SearchEngine.client
)
Use these helpers when you need a targeted rollout (only changed collections) but still want reference-aware ordering.

Return value

Bulk.index_collections and Bulk.reindex_collections! return a hash with:
  • mode: :index or :reindex
  • inputs: array of input collection names
  • stage_1: array of stage 1 collection names
  • cascade: array of cascade stage collection names
  • inputs_count: number of input collections
  • stage_1_count: number of stage 1 collections
  • cascade_count: number of cascade collections
  • failed_collections_total: count of unresolved collection targets
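
A quick sketch of inspecting the returned plan (keys as listed above):
result = SearchEngine::Bulk.index_collections(:books, :publishers)
result[:stage_1]                  # collections run first (referenced-first order)
result[:cascade]                  # referencers processed exactly once in the cascade stage
result[:failed_collections_total] # unresolved collection targets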

Bulk indexing (all collections)

Use Bulk helpers to index or reindex every declared SearchEngine collection discovered under your configured search_engine_models directory. The engine ensures models are eagerly loaded, discovers collections, and orchestrates a two‑stage, reference‑aware run (inputs first, then a deduped cascade of referencers).
# Non‑destructive (blue/green when needed via .index_collection per model)
SearchEngine::Bulk.index_all

# Destructive: drop + index per model
SearchEngine::Bulk.reindex_all!

# Optional client override
SearchEngine::Bulk.index_all(client: SearchEngine.client)
Notes:
  • index_all uses each model’s .index_collection (presence/drift checks, apply+retention when needed; otherwise index into current alias).
  • reindex_all! uses each model’s .reindex_collection! (drop active physical, then .index_collection).
  • Discovery leverages the engine’s dedicated loader and CollectionResolver.models_map; ensure your collections live under app/search_engine (default) or your configured SearchEngine.config.search_engine_models path.
  • Emits search_engine.bulk.run with { inputs, stage_1, cascade, counts, failed_collections_total }. See Observability for payload details.
  • Return value: same hash structure as Bulk.index_collections / Bulk.reindex_collections!, including failed_collections_total.
Backlinks: Models · Schema · Observability

Stale Deletes

Backlinks: CLI, Observability, Troubleshooting, Configuration
Define stale rules once. The same compiled filter powers .stale, .cleanup, Indexer.delete_stale!, and index:delete_stale.
class SearchEngine::Order < SearchEngine::Base
  index do
    # Register one or more stale rules; they will be OR‑merged
    stale scope: :cancelled            # scope must return a Relation
    stale :archived                    # attribute equality => { archived: true }
    stale filter: "status:=cancelled"  # raw Typesense fragment
    stale({ ends_at: ..Time.zone.now })
    stale do |partition:|              # custom per‑partition rule
      partition ? { store_id: partition, archived: true } : "archived:=true"
    end
  end
end
SearchEngine::Indexer.delete_stale!(SearchEngine::Order, partition: 42)
  • Compiles all declared stale rules into an OR‑merged filter_by string and issues DELETE /collections/:collection/documents with filter_by.
  • If no rules are defined or all resolve to blank for the given partition, deletion is skipped.
  • Strict-mode guardrails block suspicious catch-alls; enable via SearchEngine.config.stale_deletes.strict_mode = true.
  • Dry-run preview: SearchEngine::Indexer.delete_stale!(…, dry_run: true) returns a summary without deleting.
Typical filters:
Pattern              | Example
Archived flag        | archived:=true
Partition + archived | store_id:=123 && archived:=true
Date threshold       | updated_at:<"2025-01-01T00:00:00Z" or updated_at:<1704067200
Events:
  • search_engine.stale_deletes.started { collection, into, partition, filter_hash }
  • search_engine.stale_deletes.skipped { reason, collection, into, partition }
  • search_engine.stale_deletes.finished { collection, into, partition, duration_ms, deleted_count }
  • search_engine.stale_deletes.error { collection, into, partition, error_class, message_truncated }
Config:
  • SearchEngine.config.stale_deletes.enabled = true
  • SearchEngine.config.stale_deletes.strict_mode = false
  • SearchEngine.config.stale_deletes.timeout_ms = nil
  • SearchEngine.config.stale_deletes.estimation_enabled = false

Inspect stale docs via a Relation

You can preview or chain on the stale set using a scope-like class method that returns a Relation:
# Partition-specific stale docs compiled from `stale` rules
SearchEngine::Order.stale(partition: 42)

# Whole-collection stale docs (if none of the rules depend on partition)
SearchEngine::Order.stale

# Chainable like AR
SearchEngine::Order.stale(partition: 42).limit(10).pluck(:id)
Notes:
  • .stale compiles only stale rules and returns an empty relation when no rules produce a filter for the given partition (implemented as a safe, always‑false filter so Typesense never 400s).
  • For destructive deletes, prefer Model.cleanup or the index:delete_stale task.

Stale cleanup DSL & model helper

Define stale cleanup rules alongside your index DSL. Each stale call registers one rule; cleanup runs them with OR semantics and uses the same compiled filter as Indexer.delete_stale!.
class SearchEngine::Book < SearchEngine::Base
  collection "books"

  index do
    stale scope: :archived              # reuse a model scope (must return a Relation)
    stale :marked_as_stale              # attribute equality => { marked_as_stale: true }
    stale filter: "status:=archived"    # raw Typesense fragment
    stale({ ends_at: ..Time.zone.now }) # hash converted via Sanitizer
    stale do |partition:|               # custom filter per partition
      partition ? "store_id:=#{partition} && archived:=true" : "archived:=true"
    end

    map do |record|
      # ...
    end
  end
end
Runtime cleanup is available directly on the model:
# Deletes documents matching any declared stale rules
SearchEngine::Book.cleanup
SearchEngine::Book.cleanup(partition: 42)
SearchEngine::Book.cleanup(into: "books_20251011_120000_001")
# Clear Typesense cache after cleanup
SearchEngine::Book.cleanup(clear_cache: true)
SearchEngine::Base.cleanup delegates to SearchEngine::Deletion.delete_by, emitting the same search_engine.indexer.delete_stale instrumentation. When no stale configuration exists, it logs a skip and returns 0. Pass clear_cache: true to clear the Typesense search cache after cleanup finishes. This invokes SearchEngine::Cache.clear and emits search_engine.cache.clear. Failures are logged and do not affect the cleanup result.
Available stale inputs:
Form                                                      | Result
stale scope: :archived                                    | Invokes a model scope; must return a Relation
stale :flagged / stale attribute: :flagged, value: false  | Attribute equality Hash
stale({ flag: true, archived: true })                     | Sanitized filter_by hash
stale filter: "status:=archived"                          | Raw Typesense filter string
stale relation                                            | Uses the relation's compiled filter_by
`stale do |partition:| … end`                             | Custom block returning String/Hash/Relation
Notes:
  • Multiple rules are OR‑ed together.
  • Block arguments receive partition:; return a String, Hash, or Relation. nil/blank results are ignored.
  • Scope helpers receive the partition as the first positional argument when their arity is ≥ 1, or as partition: when they declare a keyword.
  • Attribute Hash inputs default values to true when omitted (e.g., stale :flagged).
See also: Deletion for low-level helpers and Observability for emitted events.

Dispatcher

Backlinks: CLI, Observability, Configuration, Jobs
SearchEngine.configure do |c|
  c.indexer.dispatch = :active_job # or :inline
  c.indexer.queue_name = "search_index"
end
  • What it does: routes per-partition rebuilds either synchronously (inline) or via ActiveJob.
  • Note: This setting controls SearchEngine::Indexer logic (used by rebuild_partition!, index_collection flow). It does not affect ActiveRecordSyncable callbacks, which are always inline.
  • API: SearchEngine::Dispatcher.dispatch!(SearchEngine::Book, partition: key, into: nil, mode: nil, queue: nil, metadata: {})
    • Returns an object describing the action: for ActiveJob { mode: :active_job, job_id, queue, collection, partition, into }; for inline { mode: :inline, collection, partition, into, indexer_summary, duration_ms }.
  • Mode resolution: mode || SearchEngine.config.indexer.dispatch with a fallback to :inline if ActiveJob is unavailable.
  • Queue name: taken from queue || SearchEngine.config.indexer.queue_name.
ActiveJob job: SearchEngine::IndexPartitionJob.
  • Args: collection_class_name (String), partition (JSON-serializable), optional into (String), optional metadata (Hash).
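
For example, a per-partition rebuild can be dispatched and its result inspected like this (a sketch; it assumes the return value behaves like the hash shapes described above):
result = SearchEngine::Dispatcher.dispatch!(
  SearchEngine::Book,
  partition: 42,
  queue: "search_index"
)

result[:mode]   # :active_job or :inline, depending on configuration
result[:job_id] # present only when the ActiveJob path was taken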

When to choose model-level upserts

For light-touch updates or small repair batches, the model helpers described in Upsert are often faster than orchestrating a full Indexer run:
  • Single or sparse fixes — use SearchEngine::Model.upsert(record: …) to map and stream one document without building custom JSONL buffers.
  • Small batches — SearchEngine::Model.upsert_bulk(records: …) streams an enumerable of records through the mapper and client, reusing the same validation path as the Indexer.
  • Blue/green touch-ups — the helpers accept into: / partition: to target a specific physical collection during staged rollout.
Reach for the Indexer when you need full rebuilds, partition orchestration, retry/backoff logic, or instrumented batch statistics. Use the upsert helpers when you only need to nudge a handful of documents into place.

Troubleshooting

  • Bulk import shape errors: Ensure each document is a flat Hash with required keys and valid types.
  • Retry exhaustion: Inspect search_engine.indexer.batch_import events; increase backoff or fix upstream issues.
  • Stale deletes strict block: Verify your filter is not a catch-all; use partition guards.
Backlinks: CLI, Troubleshooting