Related: Schema, CLI, Observability, Upsert
Stream documents into a physical collection via JSONL bulk import with retries, stable memory, and notifications.
The Indexer focuses on importing documents. It does not create schemas, swap aliases, or enforce retention. Use Schema.apply! (or the schema:apply task) for full blue/green lifecycle: create new physical → import → alias swap → retention cleanup.
API
SearchEngine::Indexer.import!(SearchEngine::Book, into: "books_20251001_010203_001", enum: enumerable_batches, batch_size: 2000, max_parallel: 4)
- into: physical collection name
- enum: enumerable yielding batches (Arrays of Hash documents)
- batch_size: soft guard for JSONL production; batches are not re-sliced unless handling 413
- action: defaults to :upsert
- max_parallel: maximum number of parallel threads for batch processing (default: 1, sequential). Set via the mapper DSL with max_parallel or override at runtime. See Parallel batch processing.
Data flow
- One JSON object per line
- Newline between documents; trailing newline optional
- Strings are escaped by the JSON library
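For example, a minimal sketch of how a batch of documents becomes a JSONL body (field values are illustrative):
require "json"

batch = [
  { id: "1", publisher_id: 7, author_name: "Ada",   price_cents: 1299 },
  { id: "2", publisher_id: 7, author_name: "Grace", price_cents: 1599 }
]

# One JSON object per line, newline between documents
jsonl = batch.map { |doc| JSON.generate(doc) }.join("\n")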
Retries & backoff
- Transient errors (timeouts, connection, 429, 5xx) are retried with exponential backoff and jitter
- Non-transient errors (401/403/404/400/422) are not retried
- 413 Payload Too Large splits the batch recursively until it fits
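A minimal sketch of the retry policy described above (illustrative only; the error class and attempt limit below are assumptions, not the gem's internals):
class TransientError < StandardError; end  # stands in for timeouts, connection errors, 429, 5xx

MAX_ATTEMPTS = 5  # assumed limit for this sketch

def with_retries
  attempts = 0
  begin
    attempts += 1
    yield
  rescue TransientError
    raise if attempts >= MAX_ATTEMPTS
    base = 0.1 * (2**attempts)   # exponential backoff
    sleep(base + rand * base)    # jitter avoids synchronized retries
    retry
  end
end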
Parallel batch processing
For large collections, you can speed up indexing by processing multiple batches simultaneously using parallel threads. This is especially useful when network latency is a bottleneck.
Configuration
Enable parallel processing in your mapper DSL:
class SearchEngine::Book < SearchEngine::Base
collection "books"
index do
source :active_record, model: ::Book, batch_size: 2000
max_parallel 4 # Process up to 4 batches concurrently
map do |r|
{ title: r.title, author_id: r.author_id }
end
end
end
The max_parallel setting controls how many batches are processed simultaneously. Each thread gets its own Typesense client instance and buffer to avoid conflicts.
Usage examples
# Parallel processing is automatically enabled when max_parallel > 1 is set in the mapper DSL
SearchEngine::Book.rebuild_partition!(partition: publisher_id)
# Override the DSL setting at runtime
docs_enum = build_docs_enum(rows_enum, mapper)
SearchEngine::Indexer.import!(
SearchEngine::Book,
into: "books_20251001_010203_001",
enum: docs_enum,
max_parallel: 8 # Use 8 threads instead of the DSL default
)
Use parallel processing when:
- Large collections with thousands of batches
- Network latency is the main bottleneck (not CPU or memory)
- Your Typesense server can handle concurrent requests
- You have sufficient memory for multiple buffers
Avoid parallel processing when:
- Small collections (< 100 batches)
- CPU-bound workloads (parallel processing adds overhead)
- Very memory-constrained environments
- When debugging (sequential processing has clearer error messages)
How it works
Parallel batch processing uses a producer-consumer pattern with a thread pool:
- Producer thread: Fetches batches from your source (ActiveRecord, SQL, etc.) and adds them to a queue
- Worker threads: Multiple threads pull batches from the queue and process them concurrently
- Thread safety: Each worker gets its own Typesense client and buffer to avoid conflicts
- Statistics: All counters are synchronized using a mutex to ensure accurate reporting
The queue has a capacity of max_parallel * 2 to keep workers busy while the producer fetches more batches. Progress is logged every 10 batches when log_batches is enabled.
When a batch fails, the error is caught and recorded with full statistics (document count, failure details) just like in sequential processing. Failed batches are properly counted in the summary’s docs_total, failed_total, and batches array to ensure accurate reporting.
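A simplified sketch of the producer-consumer pattern (not the gem's implementation; the document batches and import lambda are stand-ins):
require "thread"

# Illustrative stand-ins for the gem's internals:
docs_enum    = [[{ id: "1" }], [{ id: "2" }], [{ id: "3" }]]  # batches of documents
import_batch = ->(batch) { batch.size }                        # pretend import, returns success count

max_parallel = 4
queue = SizedQueue.new(max_parallel * 2) # bounded so the producer cannot outrun the workers

producer = Thread.new do
  docs_enum.each { |batch| queue << batch } # producer: fetch batches from the source
  max_parallel.times { queue << :done }     # one sentinel per worker
end

stats_mutex   = Mutex.new
success_total = 0

workers = max_parallel.times.map do
  Thread.new do
    # each worker would hold its own Typesense client and JSONL buffer
    while (batch = queue.pop) != :done
      count = import_batch.call(batch)
      stats_mutex.synchronize { success_total += count } # counters synchronized via mutex
    end
  end
end

[producer, *workers].each(&:join)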
Memory notes
- Operates strictly batch-by-batch, reusing a single buffer per thread
- No accumulation of all records in memory; per-batch array may be materialized to support 413 splitting
- Parallel processing: each worker thread maintains its own buffer and client instance (memory usage scales with max_parallel)
Instrumentation
- Emits search_engine.indexer.batch_import per attempted batch
- Payload includes: collection, batch_index, docs_count, success_count, failure_count, attempts, duration_ms, http_status, bytes_sent, transient_retry, error_sample
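For example, you can tap the event with ActiveSupport::Notifications (payload keys taken from the list above):
ActiveSupport::Notifications.subscribe("search_engine.indexer.batch_import") do |*args|
  event   = ActiveSupport::Notifications::Event.new(*args)
  payload = event.payload
  Rails.logger.info(
    "[indexer] #{payload[:collection]} batch=#{payload[:batch_index]} " \
    "ok=#{payload[:success_count]} failed=#{payload[:failure_count]} " \
    "attempts=#{payload[:attempts]} #{payload[:duration_ms]}ms"
  )
end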
Dry-run
SearchEngine::Indexer.dry_run!(…) builds JSONL for the first batch only and returns { collection, action, bytes_estimate, docs_count, sample_line }
Data Sources
Adapters provide batched records for the Indexer in a memory-stable way. Each adapter implements each_batch(partition:, cursor:) and yields arrays.
Examples:
source :active_record, model: ::Book, scope: -> { where(published: true) }, batch_size: 2000
source :sql, sql: "SELECT * FROM books WHERE published = TRUE", fetch_size: 2000
source :lambda do |cursor: nil, partition: nil|
Enumerator.new { |y| external_api.each_page(cursor) { |rows| y << rows } }
end
When source is not :active_record, you must define identify_by on the model. The mapper ignores any id returned from map, so SQL/lambda rows need an explicit identity strategy. See Models.
partition and cursor are opaque; adapters interpret them per-domain (e.g., id ranges, keyset predicates, external API tokens).
- Instrumentation: emits search_engine.source.batch_fetched and search_engine.source.error.
Mapper
Backlinks: Models, Schema
class SearchEngine::Book < SearchEngine::Base
collection "books"
attribute :publisher_id, :integer
attribute :author_id, :integer
attribute :author_name, :string
attribute :price_cents, :integer
# Default identity is record.id.to_s for ActiveRecord sources.
# For :sql or :lambda sources, set identify_by.
# identify_by :isbn
# identify_by ->(r) { "#{r.publisher_id}-#{r.id}" }
index do
source :active_record, model: ::Book, scope: -> { where(published: true) }
map do |r|
# :id from map is ignored; mapper injects computed id
{ publisher_id: r.publisher_id, author_id: r.author_id, author_name: r.author&.name, price_cents: r.price_cents }
end
end
end
Model → Document mapping:
| Model field | Document field | Transform |
|---|---|---|
| id | id | identity |
| publisher_id | publisher_id | identity |
| author_id | author_id | identity |
| author.name | author_name | rename + safe navigation |
| price_cents | price_cents | identity |
Validation:
- Missing required fields: the mapper validates declared attributes; id is injected and not required to be declared.
- Unknown fields: warns by default; set SearchEngine.config.mapper.strict_unknown_keys = true to error.
- Type checks: invalid types are reported (e.g., Invalid type for field :price_cents (expected Integer, got String: "12.3").).
- Coercions: enable with SearchEngine.config.mapper.coercions[:enabled] = true (safe integer/float/bool only).
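A minimal initializer sketch combining the two settings above (file path is a suggestion):
# config/initializers/search_engine.rb
SearchEngine.configure do |c|
  c.mapper.strict_unknown_keys = true  # raise instead of warning on unknown fields
  c.mapper.coercions[:enabled] = true  # allow safe integer/float/bool coercions
end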
Array empty filtering (hidden flags)
If an array attribute is declared with empty_filtering: true, the mapper auto-populates a hidden boolean <name>_empty per document:
attribute :category_ids, [:string], empty_filtering: true
# Mapper stores: category_ids_empty = category_ids.nil? || category_ids.empty?
Hidden fields are included in the schema and documents but are not exposed via model APIs/inspect.
Runtime API:
- mapper = SearchEngine::Mapper.for(SearchEngine::Book)
- docs, report = mapper.map_batch!(rows, batch_index: 1)
- Emits search_engine.mapper.batch_mapped per batch with: collection, batch_index, docs_count, duration_ms, missing_required_count, extra_keys_count, invalid_type_count, coerced_count.
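Putting the two calls together, a hedged sketch of mapping rows yourself and importing the result (the report accessor is an assumption based on the instrumentation payload above):
mapper = SearchEngine::Mapper.for(SearchEngine::Book)
rows   = ::Book.where(published: true).limit(2000).to_a

docs, report = mapper.map_batch!(rows, batch_index: 1)
warn "type errors in batch" if report.invalid_type_count.to_i > 0  # assumed accessor

# enum expects batches (Arrays of Hash documents); here we pass a single batch
SearchEngine::Indexer.import!(SearchEngine::Book, into: "books_20251001_010203_001", enum: [docs])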
Partitioning
Backlinks: Schema, CLI, Observability
index do
partitions { Publisher.pluck(:id) }
partition_fetch { |publisher_id| ::Book.where(publisher_id: publisher_id).in_batches(of: 2000) }
before_partition { |publisher_id| delete_by filter_by: "publisher_id:=#{publisher_id}" }
after_partition { |publisher_id| nil } # custom metrics
end
SearchEngine::Indexer.rebuild_partition!(SearchEngine::Book, partition: publisher_id)
Model-level shortcut
You can call the same operation directly on the collection model. This delegates to the Indexer and returns the same SearchEngine::Indexer::Summary (or an Array when multiple partitions are provided).
SearchEngine::Book.rebuild_partition!(partition: publisher_id)
SearchEngine::Book.rebuild_partition!(partition: publisher_id, into: "optional_physical_collection")
# Multiple keys return an Array<Summary>:
SearchEngine::Book.rebuild_partition!(partition: [1, 2, 3])
Summary return value
SearchEngine::Indexer::Summary includes:
collection: logical collection name
status: :ok, :partial, or :failed
batches_total: total number of batches processed
docs_total: total documents processed
success_total: successfully indexed documents
failed_total: failed documents
failed_batches_total: count of batches with failures
duration_ms_total: total wall-clock duration in milliseconds
batches: array of per-batch stats
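For example, a sketch of inspecting the summary returned by a partition rebuild (the per-batch entry keys are assumed to mirror the batch_import payload):
summary = SearchEngine::Book.rebuild_partition!(partition: publisher_id)

if summary.status == :ok
  puts "indexed #{summary.success_total}/#{summary.docs_total} docs in #{summary.duration_ms_total}ms"
else
  summary.batches.each do |batch|
    warn "batch failed: #{batch.inspect}" if batch[:failure_count].to_i > 0  # assumed key
  end
end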
Notes:
- partitions must return an Enumerable of keys; partition_fetch must return an Enumerable of batches (Arrays of records).
- Hooks are optional; if provided, they must accept exactly one argument (the partition key).
- When partition_fetch is missing, the source adapter is used with the partition passed through; for ActiveRecord sources, provide a Hash/Range partition or define partition_fetch.
Model indexation (.index_collection)
Backlinks: Schema, CLI, Observability, Models
High-level convenience API that orchestrates schema lifecycle and (partitioned) indexing from the model class.
# Full indexation flow (create/apply schema if needed, reindex, retention)
SearchEngine::Book.index_collection
# Partial indexation by key(s) (requires collection present and schema in-sync)
SearchEngine::Book.index_collection(partition: publisher_id)
SearchEngine::Book.index_collection(partition: [publisher_id_1, publisher_id_2])
Behavior:
- Full flow when partition is nil:
  1. Presence check (Schema.diff).
  2. If missing → create new physical and apply schema, then import; else skip.
  3. If present → check drift; report in_sync vs drift.
  4. If drift → apply schema (create new physical, import, alias swap, retention).
  5. If nothing was applied in steps 2–4 → index into the current alias (single or per partition).
  6. Retention cleanup: skipped when Schema.apply! ran (already handled); otherwise best‑effort cleanup of old physicals beyond keep_last.
- Partial flow when partition: is set:
  1. Presence check; if missing → quit early with a message.
  2. Schema status; if drift → quit early with a message to run full indexation.
  3. Index only the provided partition(s) into the current alias.
Preflight dependency indexation (optional)
You can ask the engine to walk direct and transitive belongs_to dependencies and ensure they are ready before indexing the current collection:
# Ensure dependencies exist (create/apply+index only when missing)
SearchEngine::Book.index_collection(pre: :ensure)
# Ensure dependencies exist and also fix drift (apply+index when missing or drift)
SearchEngine::Book.index_collection(pre: :index)
# Partial with preflight
SearchEngine::Book.index_collection(partition: publisher_id, pre: :ensure)
Notes:
- Preflight walks only belongs_to edges recursively, skipping unregistered collections.
- Cycles are guarded with a visited set; already-visited collections are skipped.
- Default remains unchanged when pre: is omitted.
Output
- Emits concise console lines for each step and per‑partition result, for example:
Step 1: Presence — processing → present
Step 3: Check Schema Status — in_sync
Step 5: Indexation — processing
partition=123 → status=ok docs=2000 failed=0 batches=10 duration_ms=1523.4
Step 5: Indexation — done
Step 6: Retention Cleanup — skip (handled by schema apply)
Targeted bulk helpers (Bulk.index_collections / Bulk.reindex_collections!)
- Run blue/green or destructive indexation for a specific set of collections (not just “all”).
- Two-stage plan:
- Stage 1: inputs that are not referrers of other inputs (referenced-first order).
- Stage 2: unique referencers of any input, topologically sorted and processed once.
- Cascades are suppressed inside individual runs; the final cascade stage handles referencers exactly once.
# Blue/green selected collections
SearchEngine::Bulk.index_collections(:books, :publishers)
# Destructive reindex of a subset with an injected client
SearchEngine::Bulk.reindex_collections!(
SearchEngine::Book,
"authors",
client: SearchEngine.client
)
Use these helpers when you need a targeted rollout (only changed collections) but still want reference-aware ordering.
Return value
Bulk.index_collections and Bulk.reindex_collections! return a hash with:
mode: :index or :reindex
inputs: array of input collection names
stage_1: array of stage 1 collection names
cascade: array of cascade stage collection names
inputs_count: number of input collections
stage_1_count: number of stage 1 collections
cascade_count: number of cascade collections
failed_collections_total: count of unresolved collection targets
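For example, the returned hash can be checked to confirm the executed plan (a minimal sketch):
result = SearchEngine::Bulk.index_collections(:books, :publishers)

puts "mode=#{result[:mode]} inputs=#{result[:inputs].join(', ')}"
puts "stage 1: #{result[:stage_1].join(', ')}"
puts "cascade: #{result[:cascade].join(', ')}"
warn "#{result[:failed_collections_total]} targets could not be resolved" if result[:failed_collections_total] > 0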
Bulk indexing (all collections)
Use Bulk helpers to index or reindex every declared SearchEngine collection discovered under your configured search_engine_models directory. The engine ensures models are eagerly loaded, discovers collections, and orchestrates a two‑stage, reference‑aware run (inputs first, then a deduped cascade of referencers).
# Non‑destructive (blue/green when needed via .index_collection per model)
SearchEngine::Bulk.index_all
# Destructive: drop + index per model
SearchEngine::Bulk.reindex_all!
# Optional client override
SearchEngine::Bulk.index_all(client: SearchEngine.client)
Notes:
- index_all uses each model’s .index_collection (presence/drift checks, apply+retention when needed; otherwise index into current alias).
- reindex_all! uses each model’s .reindex_collection! (drop active physical, then .index_collection).
- Discovery leverages the engine’s dedicated loader and CollectionResolver.models_map; ensure your collections live under app/search_engine (default) or your configured SearchEngine.config.search_engine_models path.
- Emits search_engine.bulk.run with { inputs, stage_1, cascade, counts, failed_collections_total }. See Observability for payload details.
- Return value: same hash structure as Bulk.index_collections / Bulk.reindex_collections!, including failed_collections_total.
Backlinks: Models · Schema · Observability
Stale Deletes
Backlinks: CLI, Observability, Troubleshooting, Configuration
Define stale rules once. The same compiled filter powers .stale,
.cleanup, Indexer.delete_stale!, and index:delete_stale.
class SearchEngine::Order < SearchEngine::Base
index do
# Register one or more stale rules; they will be OR‑merged
stale scope: :cancelled # scope must return a Relation
stale :archived # attribute equality => { archived: true }
stale filter: "status:=cancelled" # raw Typesense fragment
stale({ ends_at: ..Time.zone.now })
stale do |partition:| # custom per‑partition rule
partition ? { store_id: partition, archived: true } : "archived:=true"
end
end
end
SearchEngine::Indexer.delete_stale!(SearchEngine::Order, partition: 42)
- Compiles all declared stale rules into an OR‑merged filter_by string and issues DELETE /collections/:collection/documents with filter_by.
- If no rules are defined or all resolve to blank for the given partition, deletion is skipped.
- Strict-mode guardrails block suspicious catch-alls; enable via SearchEngine.config.stale_deletes.strict_mode = true.
- Dry-run preview: SearchEngine::Indexer.delete_stale!(…, dry_run: true) returns a summary without deleting.
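For example, previewing the compiled deletion before running it for real:
preview = SearchEngine::Indexer.delete_stale!(SearchEngine::Order, partition: 42, dry_run: true)
pp preview  # inspect the returned summary; nothing is deleted

SearchEngine::Indexer.delete_stale!(SearchEngine::Order, partition: 42)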
Typical filters:
| Pattern | Example |
|---|---|
| Archived flag | archived:=true |
| Partition + archived | store_id:=123 && archived:=true |
| Date threshold | updated_at:<"2025-01-01T00:00:00Z" or updated_at:<1704067200 |
Events:
search_engine.stale_deletes.started — { collection, into, partition, filter_hash }
search_engine.stale_deletes.skipped — { reason, collection, into, partition }
search_engine.stale_deletes.finished — { collection, into, partition, duration_ms, deleted_count }
search_engine.stale_deletes.error — { collection, into, partition, error_class, message_truncated }
Config:
- SearchEngine.config.stale_deletes.enabled = true
- SearchEngine.config.stale_deletes.strict_mode = false
- SearchEngine.config.stale_deletes.timeout_ms = nil
- SearchEngine.config.stale_deletes.estimation_enabled = false
Inspect stale docs via a Relation
You can preview or chain on the stale set using a scope-like class method that returns a Relation:
# Partition-specific stale docs compiled from `stale` rules
SearchEngine::Order.stale(partition: 42)
# Whole-collection stale docs (if none of the rules depend on partition)
SearchEngine::Order.stale
# Chainable like AR
SearchEngine::Order.stale(partition: 42).limit(10).pluck(:id)
Notes:
- .stale compiles only stale rules and returns an empty relation when no rules produce a filter for the given partition (implemented as a safe, always‑false filter so Typesense never 400s).
- For destructive deletes, prefer Model.cleanup or the index:delete_stale task.
Stale cleanup DSL & model helper
Define stale cleanup rules alongside your index DSL. Each stale call registers
one rule; cleanup runs them with OR semantics and uses the same compiled filter as
Indexer.delete_stale!.
class SearchEngine::Book < SearchEngine::Base
collection "books"
index do
stale scope: :archived # reuse a model scope (must return a Relation)
stale :marked_as_stale # attribute equality => { marked_as_stale: true }
stale filter: "status:=archived" # raw Typesense fragment
stale({ ends_at: ..Time.zone.now }) # hash converted via Sanitizer
stale do |partition:| # custom filter per partition
partition ? "store_id:=#{partition} && archived:=true" : "archived:=true"
end
map do |record|
# ...
end
end
end
Runtime cleanup is available directly on the model:
# Deletes documents matching any declared stale rules
SearchEngine::Book.cleanup
SearchEngine::Book.cleanup(partition: 42)
SearchEngine::Book.cleanup(into: "books_20251011_120000_001")
# Clear Typesense cache after cleanup
SearchEngine::Book.cleanup(clear_cache: true)
SearchEngine::Base.cleanup delegates to SearchEngine::Deletion.delete_by, emitting the same search_engine.indexer.delete_stale instrumentation. When no stale configuration exists, it logs a skip and returns 0.
Pass clear_cache: true to clear the Typesense search cache after cleanup
finishes. This invokes SearchEngine::Cache.clear and emits
search_engine.cache.clear. Failures are logged and do not affect the cleanup
result.
Available stale inputs:
| Form | Result |
|---|---|
| stale scope: :archived | Invokes a model scope; must return a Relation |
| stale :flagged / stale attribute: :flagged, value: false | Attribute equality Hash |
| stale({ flag: true, archived: true }) | Sanitized filter_by hash |
| stale filter: "status:=archived" | Raw Typesense filter string |
| stale relation | Uses the relation’s compiled filter_by |
| stale do \|partition:\| … end | Custom block returning String/Hash/Relation |
Notes:
- Multiple rules are OR‑ed together.
- Block arguments receive partition:; return a String, Hash, or Relation. nil/blank results are ignored.
- Scope helpers receive the partition as the first positional argument when their arity is ≥ 1, or as partition: when they declare a keyword.
- Attribute Hash inputs default values to true when omitted (e.g., stale :flagged).
See also: Deletion for low-level helpers and Observability for emitted events.
Dispatcher
Backlinks: CLI, Observability, Configuration, Jobs
SearchEngine.configure do |c|
c.indexer.dispatch = :active_job # or :inline
c.indexer.queue_name = "search_index"
end
- What it does: routes per-partition rebuilds either synchronously (inline) or via ActiveJob.
- Note: This setting controls SearchEngine::Indexer logic (used by rebuild_partition!, index_collection flow). It does not affect ActiveRecordSyncable callbacks, which are always inline.
- API: SearchEngine::Dispatcher.dispatch!(SearchEngine::Book, partition: key, into: nil, mode: nil, queue: nil, metadata: {})
- Returns an object describing the action: for ActiveJob { mode: :active_job, job_id, queue, collection, partition, into }; for inline { mode: :inline, collection, partition, into, indexer_summary, duration_ms }.
- Mode resolution: mode || SearchEngine.config.indexer.dispatch with a fallback to :inline if ActiveJob is unavailable.
- Queue name: taken from queue || SearchEngine.config.indexer.queue_name.
- ActiveJob job: SearchEngine::IndexPartitionJob.
- Args: collection_class_name (String), partition (JSON-serializable), optional into (String), optional metadata (Hash).
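A sketch of dispatching a partition rebuild and branching on the returned mode (assuming the returned description can be read like a Hash, per the shapes above):
result = SearchEngine::Dispatcher.dispatch!(SearchEngine::Book, partition: publisher_id)

case result[:mode]
when :active_job
  Rails.logger.info("enqueued #{result[:job_id]} on queue #{result[:queue]}")
when :inline
  Rails.logger.info("indexed inline in #{result[:duration_ms]}ms: #{result[:indexer_summary].status}")
end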
When to choose model-level upserts
For light-touch updates or small repair batches, the model helpers described in Upsert are often faster than orchestrating a full Indexer run:
- Single or sparse fixes — use SearchEngine::Model.upsert(record: …) to map and stream one document without building custom JSONL buffers.
- Small batches — SearchEngine::Model.upsert_bulk(records: …) streams an enumerable of records through the mapper and client, reusing the same validation path as the Indexer.
- Blue/green touch-ups — the helpers accept into: / partition: to target a specific physical collection during staged rollout.
Reach for the Indexer when you need full rebuilds, partition orchestration, retry/backoff logic, or instrumented batch statistics. Use the upsert helpers when you only need to nudge a handful of documents into place.
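A hedged sketch of those helpers in practice (see Upsert for the exact signatures; calling them on the model class is an assumption here):
# Repair a single document
SearchEngine::Book.upsert(record: ::Book.find(42))

# Stream a small batch through the same mapper/validation path
SearchEngine::Book.upsert_bulk(records: ::Book.where(updated_at: 1.hour.ago..Time.current))

# Target a specific physical collection during a blue/green rollout
SearchEngine::Book.upsert(record: ::Book.find(42), into: "books_20251001_010203_001")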
Troubleshooting
- Bulk import shape errors: Ensure each document is a flat Hash with required keys and valid types.
- Retry exhaustion: Inspect search_engine.indexer.batch_import events; increase backoff or fix upstream issues.
- Stale deletes strict block: Verify your filter is not a catch-all; use partition guards.
Backlinks: CLI, Troubleshooting