A lightweight, high-concurrency data ingestion and chunking pipeline for RAG systems.
Project description
Tributary
A lightweight, high-concurrency data ingestion and chunking pipeline for Retrieval-Augmented Generation (RAG) systems.
The Problem
Building a RAG pipeline means wiring together document loading, text extraction, chunking, embedding, and vector storage — each with different APIs, async patterns, and failure modes. Most teams end up with brittle scripts that process files sequentially, can't handle mixed formats, and break silently when one document fails.
Tributary handles the plumbing so you can focus on your data. It processes documents concurrently, auto-detects file formats, batches embedding API calls, caches duplicate chunks, and reports exactly what failed and why.
Installation
pip install tributary-ai # Core only (~16 MB)
pip install tributary-ai[openai] # + OpenAI embedder
pip install tributary-ai[pdf,s3] # + PDF extraction + S3 source
pip install tributary-ai[all] # Everything
Optional dependency groups: pdf, s3, gcs, azure, web, sentence, token, openai, cohere, pinecone, qdrant, chroma, pgvector, dashboard, all.
Import stays import tributary regardless of which extras you install.
Docker:
docker build -t tributary .
docker run -v ./docs:/app/docs tributary run --config /app/config/pipeline.yaml
# Or with docker-compose
docker compose up tributary
docker compose --profile dashboard up # with live dashboard
Quickstart
import asyncio
from tributary.sources.local_source import LocalSource
from tributary.chunkers.fixed_chunker import FixedChunker
from tributary.embedders.custom_embedder import CustomEmbedder
from tributary.destinations.json_destination import JSONDestination
from tributary.pipeline.orchestrator import Pipeline
pipeline = Pipeline(
source=LocalSource(directory="./docs", extensions=[".txt", ".md", ".pdf"]),
chunker=FixedChunker(chunk_size=500, overlap=50),
embedder=CustomEmbedder(embed_fn=lambda texts: [[0.0] * 384 for _ in texts]),
destination=JSONDestination("output.jsonl"),
)
result = asyncio.run(pipeline.run())
print(f"Processed {result.successful}/{result.total_documents} documents in {result.time_ms:.0f}ms")
print(f"Metrics: {result.metrics}")
CLI
# Scaffold a new config interactively
tributary init --output pipeline.yaml
# Validate config without running
tributary validate --config pipeline.yaml
# Dry run — show what the config would do
tributary inspect --config pipeline.yaml
# Run the pipeline (with progress bar and rich output)
tributary run --config pipeline.yaml
# Benchmark throughput on sample data
tributary benchmark --docs-dir ./docs --chunk-size 500 --workers 3
# Estimate embedding API costs
tributary cost-estimate --docs-dir ./docs --model text-embedding-3-small
# Run with real-time web dashboard
tributary dashboard --config pipeline.yaml --port 8765
Example pipeline.yaml:
source:
type: local
params:
directory: ./docs
extensions: [".txt", ".md", ".pdf"]
chunker:
strategy: recursive
params:
chunk_size: 500
overlap: 50
embedder:
provider: openai
params:
api_key: your-api-key-here
destination:
type: json
params:
file_path: ./output.jsonl
pipeline:
max_workers: 3
batch_size: 256
# Resilience (all optional)
state_store:
path: .tributary_state.json
retry_policy:
max_retries: 3
base_delay: 1.0
max_delay: 30.0
dead_letter_queue:
path: .tributary_dlq.jsonl
# Adaptive batch sizing (optional)
adaptive_batching:
initial_batch_size: 64
min_batch_size: 8
max_batch_size: 512
target_latency_ms: 2000
Everything except on_event callbacks is configurable via YAML. For custom embedding functions, event callbacks, or multi-pass logic, use the Python API directly — see examples/.
Architecture
Source Extractor Chunker Embedder Destination
────── ───────── ─────── ──────── ───────────
LocalSource TextExtractor FixedChunker OpenAIEmbedder JSONDestination
S3Source MarkdownExtractor RecursiveChunker CohereEmbedder PineconeDestination
GCSSource HTMLExtractor SentenceChunker CustomEmbedder QdrantDestination
AzureBlobSource CSVExtractor TokenBasedChunker ChromaDestination
WebScraperSource JSONExtractor SlidingWindowChunker PgvectorDestination
PDFExtractor
fetch() ────> extract() ────> chunk() ────> embed() ────> store()
│ │ │ │ │
async gen auto-detect thread batched concurrent
+ backpres by extension offloaded + cached + locked
Feature Matrix
Sources
| Source | Description | Auth |
|---|---|---|
LocalSource |
Local filesystem, files or directories | None |
S3Source |
AWS S3 buckets with prefix filtering | AWS credentials |
GCSSource |
Google Cloud Storage with pagination | GCP credentials |
AzureBlobSource |
Azure Blob Storage containers | Connection string or account URL |
WebScraperSource |
HTTP/HTTPS URLs with concurrency control | None |
Extractors
| Format | Extractor | Library |
|---|---|---|
.txt |
TextExtractor |
Built-in |
.md, .markdown |
MarkdownExtractor |
markdown-it-py |
.html, .htm |
HTMLExtractor |
BeautifulSoup4 + lxml |
.csv, .tsv |
CSVExtractor |
Built-in csv module |
.json |
JSONExtractor |
Built-in json module |
.pdf |
PDFExtractor |
PyMuPDF |
Extractors are auto-detected by file extension. All handle UTF-8 with Latin-1 fallback and log encoding issues with structlog.
Chunkers
| Strategy | Class | Key Parameters |
|---|---|---|
| Fixed-size | FixedChunker |
chunk_size, overlap |
| Recursive | RecursiveChunker |
chunk_size, overlap, separators |
| Sentence | SentenceChunker |
sentences_per_chunk, overlap_sentences |
| Token-based | TokenBasedChunker |
chunk_size, overlap, tokenizer |
| Sliding window | SlidingWindowChunker |
window_size, step_size |
| Conditional | ChunkerRouter |
default, rules (by extension) |
Conditional routing — different chunking strategies per file type:
chunker:
strategy: fixed
params: { chunk_size: 500, overlap: 50 }
routing:
".pdf":
strategy: recursive
params: { chunk_size: 800, overlap: 100 }
".md":
strategy: sentence
params: { sentences_per_chunk: 5, overlap_sentences: 1 }
# Python API
from tributary.chunkers.router import ChunkerRouter
router = ChunkerRouter(
default=FixedChunker(chunk_size=500),
rules={".pdf": RecursiveChunker(chunk_size=800), ".md": SentenceChunker()},
)
Embedders
| Provider | Class | Default Model |
|---|---|---|
| OpenAI | OpenAIEmbedder |
text-embedding-3-small |
| Cohere | CohereEmbedder |
embed-english-v3.0 |
| Custom function | CustomEmbedder |
Any sync/async callable |
Need a different provider? CustomEmbedder accepts any sync or async function, so you can plug in Vertex AI, Bedrock, Voyage AI, or any other embedding API in one line.
Destinations
| Destination | Class | Type |
|---|---|---|
| JSON Lines | JSONDestination |
File |
| Pinecone | PineconeDestination |
Managed cloud |
| Qdrant | QdrantDestination |
Self-hosted / cloud |
| ChromaDB | ChromaDestination |
In-memory / local |
| pgvector | PgvectorDestination |
PostgreSQL extension |
| Multi | MultiDestination |
Fan-out to multiple destinations simultaneously |
Multi-destination — send embeddings to multiple destinations at once:
# YAML config
destination:
- type: json
params: { file_path: ./backup.jsonl }
- type: qdrant
params: { collection_name: docs, url: "http://localhost:6333" }
# Python API
from tributary.destinations.multi_destination import MultiDestination
destination = MultiDestination([
JSONDestination("backup.jsonl"),
QdrantDestination(collection_name="docs"),
])
Performance Features
- Concurrent workers — N workers process documents in parallel via asyncio producer-consumer queue
- Backpressure — bounded queue (
queue_size) pauses the producer when workers fall behind - Batched embedding — chunks are grouped into configurable batches (default 256) to minimize API round trips
- Concurrent embedding — multiple embedding batches fire simultaneously per worker, controlled by semaphore (
max_concurrent_embeds) - LRU embedding cache — duplicate chunks (shared headers/footers) are embedded once and cached via
OrderedDict - Thread-offloaded chunking — CPU-bound chunking runs in
asyncio.to_threadto avoid blocking the event loop - Per-stage metrics — extraction, chunking, embedding, and storage are individually timed with min/avg/max stats
- Event callbacks — sync or async hooks for
pipeline_started,document_started,document_completed,document_failed,pipeline_completed - Connection pooling — all destinations use persistent connections via
connect()/close()lifecycle, initialized once and reused across all batches - Adaptive batch sizing — auto-tunes embedding batch size based on API response latency, halves on errors (rate limits/timeouts), respects min/max bounds
- Lazy dependency loading — optional packages (OpenAI, Pinecone, PyMuPDF, etc.) are only loaded when used, with interactive install prompts for missing dependencies
Reliability & Resilience
from tributary.pipeline.state_store import StateStore
from tributary.pipeline.retry import RetryPolicy, DeadLetterQueue
pipeline = Pipeline(
source=..., chunker=..., embedder=..., destination=...,
state_store=StateStore(".tributary_state.json"),
retry_policy=RetryPolicy(max_retries=3, base_delay=1.0),
dead_letter_queue=DeadLetterQueue(".tributary_dlq.jsonl"),
checkpoint_interval=10,
)
| Feature | How it works |
|---|---|
| Document deduplication | SHA-256 hash of content — already-processed documents are skipped on restart |
| Idempotent restart | StateStore loads from disk on init, pipeline resumes where it left off |
| Retry with exponential backoff | Failed documents retry up to max_retries times with base_delay * 2^attempt delay |
| Dead-letter queue | After all retries exhausted, failed documents are persisted to a JSONL file for inspection |
| Checkpointing | State saved to disk every N documents (configurable via checkpoint_interval) |
| Graceful shutdown | SIGINT/SIGTERM stops fetching new documents, finishes current work, saves state |
All resilience features are opt-in — configurable via YAML or Python API. Pass nothing and the pipeline works exactly as before.
Observability
Correlation IDs — each document gets a unique 12-character ID that flows through every log line, making it easy to trace a single document across extraction, chunking, embedding, and storage stages.
Cost estimation — estimate embedding API costs before running:
tributary cost-estimate --docs-dir ./docs --model text-embedding-3-small
from tributary.pipeline.cost_estimator import estimate_cost
est = estimate_cost(chunks, model_name="text-embedding-3-small")
print(f"~{est.estimated_tokens:,} tokens, ~${est.estimated_cost_usd:.4f}")
OpenTelemetry export — bridge pipeline events to Prometheus/Grafana/Datadog:
from tributary.pipeline.otel_exporter import TributaryMetricsExporter
exporter = TributaryMetricsExporter(service_name="my-pipeline")
pipeline = Pipeline(..., on_event=exporter.on_event)
Exposes counters (documents.processed, documents.failed, pipeline.runs) and histograms (pipeline.duration_ms, document.chunks). Requires opentelemetry-sdk — gracefully no-ops if not installed.
Real-time dashboard — browser-based live view of pipeline progress:
tributary dashboard --config pipeline.yaml --port 8765
Opens a web dashboard at http://localhost:8765 showing live document count, success/failure rates, docs/sec throughput, event log, and failure details — all streamed via WebSocket.
Middleware / Hooks
Inject custom logic between pipeline stages without modifying the orchestrator:
from tributary.pipeline.hooks import PipelineHooks
hooks = PipelineHooks()
@hooks.after_extract
def skip_tiny_documents(extraction, source_name):
if extraction.char_count < 100:
return None # skip this document
return extraction
@hooks.after_chunk
def filter_short_chunks(chunks, source_name):
return [c for c in chunks if c.char_count > 50]
@hooks.before_embed
def normalize_text(texts, source_name):
return [t.lower().strip() for t in texts]
pipeline = Pipeline(..., hooks=hooks)
| Hook | Receives | Can |
|---|---|---|
after_extract |
ExtractionResult |
Modify text, skip document (return None) |
after_chunk |
list[ChunkResult] |
Filter, reorder, modify chunks |
before_embed |
list[str] |
Transform text before embedding |
after_embed |
list[EmbeddingResult] |
Filter embeddings before storage |
Multiple hooks per stage chain in registration order. No hooks registered = zero overhead.
Built-in: Chunk Quality Scoring — detect and filter garbage chunks before they waste embedding API calls:
from tributary.pipeline.quality import ChunkQualityScorer
scorer = ChunkQualityScorer(min_score=0.3)
hooks.after_chunk(scorer.as_chunk_filter())
Scores each chunk 0.0-1.0 based on 5 signals: length, whitespace ratio, alphabetic ratio, repetition, and sentence structure. Chunks below min_score are dropped before embedding.
Examples
The examples/ directory shows things the CLI can't do:
| Example | What it demonstrates |
|---|---|
local_to_json.py |
Compare 3 chunking strategies on the same documents |
pdf_recursive_chunking.py |
Two-pass pipeline with automatic retry on failures |
with_events_and_metrics.py |
Live progress callbacks and per-stage performance analysis |
Webhook Notifications
POST to a URL when the pipeline completes or a document fails:
pipeline:
webhook:
url: https://example.com/hook
events: ["pipeline_completed", "document_failed"]
headers:
Authorization: "Bearer token"
timeout: 10
Failures are logged and swallowed — a webhook timeout never crashes the pipeline.
Config Inheritance
Create a base config and override per environment:
# base.yaml
source:
type: local
params: { directory: ./docs }
chunker:
strategy: recursive
params: { chunk_size: 500 }
embedder:
provider: openai
destination:
type: json
params: { file_path: ./output.jsonl }
# production.yaml
extends: base.yaml
source:
params: { directory: /data/production }
destination:
type: qdrant
params: { collection_name: prod, url: "http://qdrant:6333" }
Deep merge: nested dicts merge recursively, scalars and lists are replaced. Supports chaining (A extends B extends C).
Tests
pytest -v # 317 tests passing
Design Patterns
| Pattern | Where | Why |
|---|---|---|
| Strategy | Chunkers, Embedders, Destinations | Swap algorithms without changing the pipeline |
| Factory + Registry | get_source(), get_chunker(), get_embedder(), get_destination(), get_extractor_for_extension() |
Create components by name string |
| Producer-Consumer | Pipeline orchestrator | Decouple document fetching from processing |
| Abstract Base Class | BaseSource, BaseExtractor, BaseChunker, BaseEmbedder, BaseDestination |
Enforce interface contracts |
| Template Method | BaseEmbedder.embed_chunks() wraps embed() |
Base class handles caching + metadata, subclass handles vectors |
| Observer | on_event callback |
Monitor pipeline progress without coupling |
| Feedback Loop | AdaptiveBatcher |
Auto-tune batch size from embedding API response times |
| Composite | MultiDestination |
Fan-out to multiple destinations as one |
| Router | ChunkerRouter |
Route documents to different chunkers by file type |
| Interceptor | PipelineHooks |
Inject custom logic between stages via decorator chain |
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file tributary_ai-0.1.0.tar.gz.
File metadata
- Download URL: tributary_ai-0.1.0.tar.gz
- Upload date:
- Size: 325.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.11.3 {"installer":{"name":"uv","version":"0.11.3","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":null,"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
b4f9aa23d42f1db42d3f8ff4efa7f364b25f82c0192ec065363c0dd9c0d830b6
|
|
| MD5 |
e6946a4ff8bd447c96ed904d13ea5bf4
|
|
| BLAKE2b-256 |
576803ac32dc07bb367c87a949f82033dbef2cecfcbf19b280087228c671fe7a
|
File details
Details for the file tributary_ai-0.1.0-py3-none-any.whl.
File metadata
- Download URL: tributary_ai-0.1.0-py3-none-any.whl
- Upload date:
- Size: 67.2 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.11.3 {"installer":{"name":"uv","version":"0.11.3","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":null,"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
cbc1fcb3763536b824c0a52950965fbb6c96af923b55f72755fde26f06a93991
|
|
| MD5 |
2442705eb51be137024ab0904d966b9d
|
|
| BLAKE2b-256 |
c9277a8b2497e2fe693576ac5d0da509c285e365e196743ecf5fe79797eea4a7
|