Overview
The ingestion pipeline is a 4-stage document processing system that converts raw files into searchable, queryable knowledge. Each stage — conversion, chunking, embedding, and metadata extraction — is tracked independently per document, so a failure in one stage does not block progress in others. A document becomes searchable as soon as its chunks and embeddings are stored, even if metadata extraction has not yet completed.
The pipeline is queue-driven via BullMQ and Redis. A document worker pulls jobs from the document-processing queue and runs each stage sequentially with per-stage concurrency limiters to prevent any single bottleneck from overwhelming the system.
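The sequential per-document loop the worker runs can be sketched as a pure function (a minimal sketch, not the actual implementation: the stage names mirror the four stages, while the runner map and status record are illustrative):

```typescript
type Stage = "conversion" | "chunking" | "embedding" | "metadata";
type StageStatus = "pending" | "processing" | "completed" | "failed" | "skipped";

const STAGES: Stage[] = ["conversion", "chunking", "embedding", "metadata"];

// Runs each stage in order, recording a per-stage status. A failed stage
// stops the sequence but leaves earlier stages marked completed, so a
// later retry can resume from the failed stage alone.
async function processDocument(
  runners: Record<Stage, () => Promise<void>>,
): Promise<Record<Stage, StageStatus>> {
  const statuses = Object.fromEntries(
    STAGES.map((s) => [s, "pending"]),
  ) as Record<Stage, StageStatus>;
  for (const stage of STAGES) {
    statuses[stage] = "processing";
    try {
      await runners[stage]();
      statuses[stage] = "completed";
    } catch {
      statuses[stage] = "failed";
      break;
    }
  }
  return statuses;
}
```

In the real worker this loop runs inside a BullMQ job processor pulled from the `document-processing` queue, with each runner wrapped by its stage's concurrency limiter.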
Key Concepts
- 4-stage pipeline — Conversion, chunking, embedding, and metadata extraction run as distinct stages with independent status tracking.
- Stage-level status — Each stage has its own status field (`stageConversion`, `stageChunking`, `stageEmbedding`, `stageMetadata`) with values: `pending`, `processing`, `completed`, `failed`, `skipped`.
- Overall status derivation — A document's overall status is derived from its stage statuses: `pending` (all stages pending), `processing` (any stage in progress), `completed` (all stages completed/skipped), `failed` (any stage failed).
- Concurrency limiters — Each stage has its own concurrency cap to balance throughput and resource usage.
- Fire-and-forget tasks — After all stages complete, background tasks (feed suggestion, wiki update, agent triggering) run without blocking the response.
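The overall-status derivation rule can be expressed directly. This is a sketch under one stated assumption: the source does not say which status wins when both a failed and a processing stage exist, so `failed` is given precedence here.

```typescript
type StageStatus = "pending" | "processing" | "completed" | "failed" | "skipped";

// Derives a document's overall status from its four stage statuses.
// Precedence order (failed > processing > completed) is an assumption.
function deriveOverallStatus(
  stages: StageStatus[],
): "pending" | "processing" | "completed" | "failed" {
  if (stages.some((s) => s === "failed")) return "failed";
  if (stages.some((s) => s === "processing")) return "processing";
  if (stages.every((s) => s === "completed" || s === "skipped")) return "completed";
  return "pending"; // some stages still waiting, none active or failed
}
```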
How It Works
- Conversion (Stage 0) — Binary formats (PDF, DOCX, images) are converted to text via the Docling REST API. Text formats (`.txt`, `.md`, `.csv`, `.json`) skip this stage entirely. Legacy formats (`.doc`, `.rtf`, `.odt`) are pre-converted via LibreOffice to `.docx` before Docling processes them.
- Chunking (Stage 1) — The extracted text is split into chunks using smart break points (1000 characters default, 200-character overlap). Any existing chunks for the document in Qdrant are deleted before new ones are created.
- Embedding (Stage 2) — Vector embeddings are generated for each chunk via the OpenAI or OpenRouter API. Chunks are batched with concurrency control and upserted into a model-specific Qdrant collection.
- Metadata Extraction (Stage 3) — An LLM extracts structured metadata from the document: title, summary, category, keywords, document type, and data structure. For long documents, multi-region sampling is used to ensure representative coverage.
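The chunking stage can be approximated as follows. This is a simplified sketch: the real pipeline's "smart break point" logic is not shown here; this version just prefers the last newline or space before the size limit.

```typescript
// Character-based chunker with overlapping windows (illustrative only).
// Defaults mirror the documented 1000-character chunks with 200-character overlap.
function chunkText(text: string, size = 1000, overlap = 200): string[] {
  const chunks: string[] = [];
  let start = 0;
  while (start < text.length) {
    let end = Math.min(start + size, text.length);
    if (end < text.length) {
      // Prefer a natural break point (newline or space) inside the window,
      // but only if it keeps the chunk reasonably large.
      const window = text.slice(start, end);
      const breakAt = Math.max(window.lastIndexOf("\n"), window.lastIndexOf(" "));
      if (breakAt > size / 2) end = start + breakAt;
    }
    chunks.push(text.slice(start, end));
    if (end === text.length) break;
    start = end - overlap; // overlap preserves context across chunk boundaries
  }
  return chunks;
}
```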
After all four stages complete successfully, the following fire-and-forget tasks are triggered:
- Feed suggestion — `generateFeeds` evaluates whether the new document should be associated with any existing feeds.
- Wiki incremental update — `triggerWikiIncremental` updates the space's wiki with knowledge from the new document.
- Agent triggering — `triggerDocumentEventAgents` fires any agents configured with `on_document` triggers.
Why It Works This Way
Stage Separation Preserves Searchability
The 4-stage separation means a metadata extraction failure does not block the document from being searchable. Once chunks and embeddings are stored (stages 1 and 2), the document appears in search results. Metadata extraction can be retried independently without re-processing earlier stages.
Fire-and-Forget for Responsiveness
Background tasks like feed suggestion, wiki generation, and agent triggering use a fire-and-forget pattern (`fn().catch(err => logger.warn(...))`). This preserves API responsiveness — the upload request returns immediately after queueing, and downstream tasks run asynchronously.
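A minimal helper around that pattern might look like this (a sketch; the `fireAndForget` name and injectable `warn` parameter are illustrative, not the actual API):

```typescript
// Starts the task without awaiting it; failures are logged, never propagated,
// so a background-task error cannot fail the request that triggered it.
function fireAndForget(
  name: string,
  fn: () => Promise<void>,
  warn: (msg: string) => void = (msg) => console.warn(msg),
): void {
  fn().catch((err) => warn(`${name} failed: ${err}`));
}

// Usage after all four stages complete (names as documented above):
// fireAndForget("feed-suggestion", () => generateFeeds(documentId));
// fireAndForget("wiki-update", () => triggerWikiIncremental(spaceId));
```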
Per-Stage Concurrency Limiters
Each stage has its own concurrency cap (e.g., conversion: 5, embedding: 3). This prevents a burst of large PDFs from saturating the Docling service while embedding jobs starve, or vice versa. Binary and text document concurrency are also tracked separately since text documents skip conversion entirely.
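A per-stage cap of this kind is typically a small promise-based semaphore. The sketch below is in the spirit of the `ConcurrencyLimiter` utility listed in the code reference, not the real implementation:

```typescript
// Minimal concurrency limiter: at most `limit` tasks run at once;
// excess callers wait in a FIFO queue until a slot frees up.
class ConcurrencyLimiter {
  private active = 0;
  private queue: Array<() => void> = [];

  constructor(private readonly limit: number) {}

  async run<T>(task: () => Promise<T>): Promise<T> {
    if (this.active >= this.limit) {
      await new Promise<void>((resolve) => this.queue.push(resolve));
    }
    this.active++;
    try {
      return await task();
    } finally {
      this.active--;
      this.queue.shift()?.(); // wake the next waiting caller, if any
    }
  }
}

// One limiter per stage, mirroring the documented defaults:
// const conversionLimiter = new ConcurrencyLimiter(5);
// const embeddingLimiter = new ConcurrencyLimiter(3);
```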
Stage-Level Status Enables Targeted Retry
Because each stage tracks its own status, a failed metadata extraction can be retried without re-converting, re-chunking, or re-embedding the document. This reduces wasted compute and speeds up recovery from transient failures.
Configuration
| Env Var | Description |
|---|---|
| `DOCLING_API_URL` | Docling REST API endpoint (default `http://localhost:5001`) |
| `INGESTION_CONCURRENCY_WORKER` | Max concurrent document worker jobs (default 6) |
| `INGESTION_CONCURRENCY_CONVERSION` | Max concurrent conversion tasks (default 5) |
| `INGESTION_CONCURRENCY_EMBEDDING` | Max concurrent embedding tasks (default 3) |
| `INGESTION_CONCURRENCY_METADATA` | Max concurrent metadata extraction tasks (default 3) |
| `INGESTION_CONCURRENCY_BINARY_DOCS` | Max concurrent binary document jobs (default 4) |
| `INGESTION_CONCURRENCY_TEXT_DOCS` | Max concurrent text document jobs (default 8) |
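Reading these settings with their documented defaults can be sketched as follows (the `envInt` helper and `config` shape are illustrative, not the actual code):

```typescript
// Parses a numeric env var, falling back to the documented default when
// the variable is unset, empty, or not a number.
function envInt(name: string, fallback: number): number {
  const raw = process.env[name];
  if (raw === undefined || raw.trim() === "") return fallback;
  const parsed = Number(raw);
  return Number.isFinite(parsed) ? parsed : fallback;
}

const config = {
  doclingApiUrl: process.env.DOCLING_API_URL ?? "http://localhost:5001",
  workerConcurrency: envInt("INGESTION_CONCURRENCY_WORKER", 6),
  conversionConcurrency: envInt("INGESTION_CONCURRENCY_CONVERSION", 5),
  embeddingConcurrency: envInt("INGESTION_CONCURRENCY_EMBEDDING", 3),
  metadataConcurrency: envInt("INGESTION_CONCURRENCY_METADATA", 3),
};
```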
Code Reference
| File | Description |
|---|---|
| `apps/data-plane/src/services/ingestion.ts` | Pipeline orchestration and all 4 stages |
| `apps/data-plane/src/workers/document.ts` | BullMQ worker that processes document jobs |
| `apps/data-plane/src/services/upload.ts` | Upload handling, duplicate detection, queue submission |
| `apps/data-plane/src/lib/queue.ts` | BullMQ queue configuration |
| `apps/data-plane/src/lib/concurrency.ts` | `ConcurrencyLimiter` utility |
Relationships
- Record Manager — Deduplication and document-source linking runs before the pipeline starts
- Tabular Data & SQL — CSV and structured files detected during ingestion are routed to tabular storage
- Spaces — Every document belongs to a space; RLS enforces isolation throughout the pipeline
- Feeds — Feed suggestion runs as a fire-and-forget task after ingestion completes
- Agents — `on_document` agents are triggered after ingestion completes