Tributary

A lightweight, high-concurrency data ingestion and chunking pipeline for Retrieval-Augmented Generation (RAG) systems.


The Problem

Building a RAG pipeline means wiring together document loading, text extraction, chunking, embedding, and vector storage — each with different APIs, async patterns, and failure modes. Most teams end up with brittle scripts that process files sequentially, can't handle mixed formats, and break silently when one document fails.

Tributary handles the plumbing so you can focus on your data. It processes documents concurrently, auto-detects file formats, batches embedding API calls, caches duplicate chunks, and reports exactly what failed and why.


Installation

pip install tributary-ai                # Core only (~16 MB)
pip install tributary-ai[openai]        # + OpenAI embedder
pip install tributary-ai[pdf,s3]        # + PDF extraction + S3 source
pip install tributary-ai[all]           # Everything

Optional dependency groups: pdf, s3, gcs, azure, web, sentence, token, openai, cohere, pinecone, qdrant, chroma, pgvector, dashboard, all.

The import is always import tributary, regardless of which extras you install.

Docker:

docker build -t tributary .
docker run -v ./docs:/app/docs tributary run --config /app/config/pipeline.yaml

# Or with docker-compose
docker compose up tributary
docker compose --profile dashboard up   # with live dashboard

Quickstart

import asyncio
from tributary.sources.local_source import LocalSource
from tributary.chunkers.fixed_chunker import FixedChunker
from tributary.embedders.custom_embedder import CustomEmbedder
from tributary.destinations.json_destination import JSONDestination
from tributary.pipeline.orchestrator import Pipeline

pipeline = Pipeline(
    source=LocalSource(directory="./docs", extensions=[".txt", ".md", ".pdf"]),
    chunker=FixedChunker(chunk_size=500, overlap=50),
    embedder=CustomEmbedder(embed_fn=lambda texts: [[0.0] * 384 for _ in texts]),
    destination=JSONDestination("output.jsonl"),
)

result = asyncio.run(pipeline.run())
print(f"Processed {result.successful}/{result.total_documents} documents in {result.time_ms:.0f}ms")
print(f"Metrics: {result.metrics}")

CLI

# Scaffold a new config interactively
tributary init --output pipeline.yaml

# Validate config without running
tributary validate --config pipeline.yaml

# Dry run — show what the config would do
tributary inspect --config pipeline.yaml

# Run the pipeline (with progress bar and rich output)
tributary run --config pipeline.yaml

# Benchmark throughput on sample data
tributary benchmark --docs-dir ./docs --chunk-size 500 --workers 3

# Estimate embedding API costs
tributary cost-estimate --docs-dir ./docs --model text-embedding-3-small

# Run with real-time web dashboard
tributary dashboard --config pipeline.yaml --port 8765

Example pipeline.yaml:

source:
  type: local
  params:
    directory: ./docs
    extensions: [".txt", ".md", ".pdf"]

chunker:
  strategy: recursive
  params:
    chunk_size: 500
    overlap: 50

embedder:
  provider: openai
  params:
    api_key: your-api-key-here

destination:
  type: json
  params:
    file_path: ./output.jsonl

pipeline:
  max_workers: 3
  batch_size: 256

  # Resilience (all optional)
  state_store:
    path: .tributary_state.json
  retry_policy:
    max_retries: 3
    base_delay: 1.0
    max_delay: 30.0
  dead_letter_queue:
    path: .tributary_dlq.jsonl

  # Adaptive batch sizing (optional)
  adaptive_batching:
    initial_batch_size: 64
    min_batch_size: 8
    max_batch_size: 512
    target_latency_ms: 2000
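
The adaptive_batching feedback loop configured above can be sketched as follows (a simplification: the doubling growth step is an assumption; only the halve-on-error rule, the latency target, and the min/max bounds come from the config):

```python
def next_batch_size(current, latency_ms, errored,
                    min_size=8, max_size=512, target_latency_ms=2000):
    """One step of the feedback loop: halve the batch on errors (rate
    limits, timeouts), grow it when the API responds faster than the
    target latency, and always clamp to the configured bounds."""
    if errored:
        proposed = current // 2
    elif latency_ms < target_latency_ms:
        proposed = current * 2  # growth policy is an assumption
    else:
        proposed = current
    return max(min_size, min(max_size, proposed))
```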

Everything except on_event callbacks is configurable via YAML. For custom embedding functions, event callbacks, or multi-pass logic, use the Python API directly — see examples/.
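
For event callbacks, a minimal sketch (the callback signature and payload shape are assumptions here; only the event names below are documented):

```python
def log_event(event):
    """Print each pipeline event as it arrives.

    Documented event names: pipeline_started, document_started,
    document_completed, document_failed, pipeline_completed.
    The event object's shape is an assumption; we fall back to str().
    """
    name = getattr(event, "name", None) or str(event)
    print(f"[tributary] {name}")
    return name

# Hypothetical wiring, mirroring the on_event parameter shown later:
# pipeline = Pipeline(..., on_event=log_event)
```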


Architecture

Source              Extractor          Chunker              Embedder          Destination
──────              ─────────          ───────              ────────          ───────────
LocalSource         TextExtractor      FixedChunker         OpenAIEmbedder    JSONDestination
S3Source            MarkdownExtractor  RecursiveChunker     CohereEmbedder    PineconeDestination
GCSSource           HTMLExtractor      SentenceChunker      CustomEmbedder    QdrantDestination
AzureBlobSource     CSVExtractor       TokenBasedChunker                      ChromaDestination
WebScraperSource    JSONExtractor      SlidingWindowChunker                   PgvectorDestination
                    PDFExtractor

    fetch() ────> extract() ────> chunk() ────> embed() ────> store()
         │             │              │             │             │
     async gen     auto-detect     thread        batched      concurrent
     + backpressure by extension   offloaded     + cached     + locked

Feature Matrix

Sources

| Source | Description | Auth |
|---|---|---|
| LocalSource | Local filesystem, files or directories | None |
| S3Source | AWS S3 buckets with prefix filtering | AWS credentials |
| GCSSource | Google Cloud Storage with pagination | GCP credentials |
| AzureBlobSource | Azure Blob Storage containers | Connection string or account URL |
| WebScraperSource | HTTP/HTTPS URLs with concurrency control | None |

Extractors

| Format | Extractor | Library |
|---|---|---|
| .txt | TextExtractor | Built-in |
| .md, .markdown | MarkdownExtractor | markdown-it-py |
| .html, .htm | HTMLExtractor | BeautifulSoup4 + lxml |
| .csv, .tsv | CSVExtractor | Built-in csv module |
| .json | JSONExtractor | Built-in json module |
| .pdf | PDFExtractor | PyMuPDF |

Extractors are auto-detected by file extension. All handle UTF-8 with Latin-1 fallback and log encoding issues with structlog.
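
The fallback pattern described above looks roughly like this (a generic sketch of the technique, not Tributary's actual extractor code):

```python
from pathlib import Path

def read_text_with_fallback(path):
    """Read a file as UTF-8, falling back to Latin-1 on decode errors.

    Latin-1 maps every byte to a code point, so the fallback never
    raises; mojibake is still possible, which is why a real pipeline
    should log which encoding was used.
    """
    data = Path(path).read_bytes()
    try:
        return data.decode("utf-8"), "utf-8"
    except UnicodeDecodeError:
        return data.decode("latin-1"), "latin-1"
```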

Chunkers

| Strategy | Class | Key Parameters |
|---|---|---|
| Fixed-size | FixedChunker | chunk_size, overlap |
| Recursive | RecursiveChunker | chunk_size, overlap, separators |
| Sentence | SentenceChunker | sentences_per_chunk, overlap_sentences |
| Token-based | TokenBasedChunker | chunk_size, overlap, tokenizer |
| Sliding window | SlidingWindowChunker | window_size, step_size |
| Conditional | ChunkerRouter | default, rules (by extension) |

Conditional routing — different chunking strategies per file type:

chunker:
  strategy: fixed
  params: { chunk_size: 500, overlap: 50 }
  routing:
    ".pdf":
      strategy: recursive
      params: { chunk_size: 800, overlap: 100 }
    ".md":
      strategy: sentence
      params: { sentences_per_chunk: 5, overlap_sentences: 1 }
# Python API
from tributary.chunkers.router import ChunkerRouter

router = ChunkerRouter(
    default=FixedChunker(chunk_size=500),
    rules={".pdf": RecursiveChunker(chunk_size=800), ".md": SentenceChunker()},
)

Embedders

| Provider | Class | Default Model |
|---|---|---|
| OpenAI | OpenAIEmbedder | text-embedding-3-small |
| Cohere | CohereEmbedder | embed-english-v3.0 |
| Custom function | CustomEmbedder | Any sync/async callable |

Need a different provider? CustomEmbedder accepts any sync or async function, so you can plug in Vertex AI, Bedrock, Voyage AI, or any other embedding API in one line.
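
For example, a deterministic toy embedding function (hypothetical, purely for illustration) can stand in for a real provider while testing a pipeline:

```python
import hashlib

def toy_embed(texts):
    """Map each text to a fixed 8-dimensional vector derived from its
    SHA-256 digest. Deterministic and dependency-free, which makes it
    handy for exercising a pipeline without calling a paid API."""
    vectors = []
    for text in texts:
        digest = hashlib.sha256(text.encode("utf-8")).digest()
        vectors.append([b / 255.0 for b in digest[:8]])
    return vectors

# Plugged in exactly like the lambda in the Quickstart:
# embedder = CustomEmbedder(embed_fn=toy_embed)
```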

Destinations

| Destination | Class | Type |
|---|---|---|
| JSON Lines | JSONDestination | File |
| Pinecone | PineconeDestination | Managed cloud |
| Qdrant | QdrantDestination | Self-hosted / cloud |
| ChromaDB | ChromaDestination | In-memory / local |
| pgvector | PgvectorDestination | PostgreSQL extension |
| Multi | MultiDestination | Fan-out to multiple destinations simultaneously |

Multi-destination — send embeddings to multiple destinations at once:

# YAML config
destination:
  - type: json
    params: { file_path: ./backup.jsonl }
  - type: qdrant
    params: { collection_name: docs, url: "http://localhost:6333" }
# Python API
from tributary.destinations.multi_destination import MultiDestination

destination = MultiDestination([
    JSONDestination("backup.jsonl"),
    QdrantDestination(collection_name="docs"),
])

Performance Features

  • Concurrent workers — N workers process documents in parallel via asyncio producer-consumer queue
  • Backpressure — bounded queue (queue_size) pauses the producer when workers fall behind
  • Batched embedding — chunks are grouped into configurable batches (default 256) to minimize API round trips
  • Concurrent embedding — multiple embedding batches fire simultaneously per worker, controlled by semaphore (max_concurrent_embeds)
  • LRU embedding cache — duplicate chunks (shared headers/footers) are embedded once and cached via OrderedDict
  • Thread-offloaded chunking — CPU-bound chunking runs in asyncio.to_thread to avoid blocking the event loop
  • Per-stage metrics — extraction, chunking, embedding, and storage are individually timed with min/avg/max stats
  • Event callbacks — sync or async hooks for pipeline_started, document_started, document_completed, document_failed, pipeline_completed
  • Connection pooling — all destinations use persistent connections via connect()/close() lifecycle, initialized once and reused across all batches
  • Adaptive batch sizing — auto-tunes embedding batch size based on API response latency, halves on errors (rate limits/timeouts), respects min/max bounds
  • Lazy dependency loading — optional packages (OpenAI, Pinecone, PyMuPDF, etc.) are only loaded when used, with interactive install prompts for missing dependencies
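
The LRU cache idea from the list above, in miniature (a generic OrderedDict sketch, not the library's implementation):

```python
from collections import OrderedDict

class LRUCache:
    """Tiny LRU cache: recently used keys move to the end of the
    OrderedDict; the oldest entry is evicted once capacity is exceeded."""

    def __init__(self, capacity=1024):
        self.capacity = capacity
        self._data = OrderedDict()

    def get(self, key):
        if key not in self._data:
            return None
        self._data.move_to_end(key)  # mark as most recently used
        return self._data[key]

    def put(self, key, value):
        self._data[key] = value
        self._data.move_to_end(key)
        if len(self._data) > self.capacity:
            self._data.popitem(last=False)  # evict least recently used
```

In the pipeline, the cache key would be a hash of the chunk text, so repeated boilerplate such as shared headers and footers is embedded only once.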

Reliability & Resilience

from tributary.pipeline.state_store import StateStore
from tributary.pipeline.retry import RetryPolicy, DeadLetterQueue

pipeline = Pipeline(
    source=..., chunker=..., embedder=..., destination=...,
    state_store=StateStore(".tributary_state.json"),
    retry_policy=RetryPolicy(max_retries=3, base_delay=1.0),
    dead_letter_queue=DeadLetterQueue(".tributary_dlq.jsonl"),
    checkpoint_interval=10,
)

| Feature | How it works |
|---|---|
| Document deduplication | SHA-256 hash of content — already-processed documents are skipped on restart |
| Idempotent restart | StateStore loads from disk on init, pipeline resumes where it left off |
| Retry with exponential backoff | Failed documents retry up to max_retries times with base_delay * 2^attempt delay |
| Dead-letter queue | After all retries are exhausted, failed documents are persisted to a JSONL file for inspection |
| Checkpointing | State saved to disk every N documents (configurable via checkpoint_interval) |
| Graceful shutdown | SIGINT/SIGTERM stops fetching new documents, finishes current work, saves state |
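
As a quick sanity check of the backoff schedule, base_delay * 2^attempt capped at max_delay works out to (note: whether RetryPolicy adds jitter is not documented, so none is shown):

```python
def backoff_delay(attempt, base_delay=1.0, max_delay=30.0):
    """Delay in seconds before retry number `attempt` (0-indexed),
    doubling each time and capped at max_delay."""
    return min(base_delay * (2 ** attempt), max_delay)

# With the defaults above, attempts 0..4 wait 1, 2, 4, 8, 16 seconds;
# attempt 5 onward is capped at 30 seconds.
```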

All resilience features are opt-in — configurable via YAML or Python API. Pass nothing and the pipeline works exactly as before.


Observability

Correlation IDs — each document gets a unique 12-character ID that flows through every log line, making it easy to trace a single document across extraction, chunking, embedding, and storage stages.
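
A 12-character ID like that can be generated from the standard library (a sketch of the idea; how Tributary actually derives its IDs is not documented):

```python
import uuid

def make_correlation_id():
    """First 12 hex characters of a UUID4: short enough to scan in log
    lines, long enough (48 random bits) that collisions within a single
    pipeline run are vanishingly unlikely."""
    return uuid.uuid4().hex[:12]
```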

Cost estimation — estimate embedding API costs before running:

tributary cost-estimate --docs-dir ./docs --model text-embedding-3-small
from tributary.pipeline.cost_estimator import estimate_cost

est = estimate_cost(chunks, model_name="text-embedding-3-small")
print(f"~{est.estimated_tokens:,} tokens, ~${est.estimated_cost_usd:.4f}")

OpenTelemetry export — bridge pipeline events to Prometheus/Grafana/Datadog:

from tributary.pipeline.otel_exporter import TributaryMetricsExporter

exporter = TributaryMetricsExporter(service_name="my-pipeline")
pipeline = Pipeline(..., on_event=exporter.on_event)

Exposes counters (documents.processed, documents.failed, pipeline.runs) and histograms (pipeline.duration_ms, document.chunks). Requires opentelemetry-sdk — gracefully no-ops if not installed.

Real-time dashboard — browser-based live view of pipeline progress:

tributary dashboard --config pipeline.yaml --port 8765

Opens a web dashboard at http://localhost:8765 showing live document count, success/failure rates, docs/sec throughput, event log, and failure details — all streamed via WebSocket.


Middleware / Hooks

Inject custom logic between pipeline stages without modifying the orchestrator:

from tributary.pipeline.hooks import PipelineHooks

hooks = PipelineHooks()

@hooks.after_extract
def skip_tiny_documents(extraction, source_name):
    if extraction.char_count < 100:
        return None  # skip this document
    return extraction

@hooks.after_chunk
def filter_short_chunks(chunks, source_name):
    return [c for c in chunks if c.char_count > 50]

@hooks.before_embed
def normalize_text(texts, source_name):
    return [t.lower().strip() for t in texts]

pipeline = Pipeline(..., hooks=hooks)

| Hook | Receives | Can |
|---|---|---|
| after_extract | ExtractionResult | Modify text, skip document (return None) |
| after_chunk | list[ChunkResult] | Filter, reorder, modify chunks |
| before_embed | list[str] | Transform text before embedding |
| after_embed | list[EmbeddingResult] | Filter embeddings before storage |

Multiple hooks per stage chain in registration order. No hooks registered = zero overhead.

Built-in: Chunk Quality Scoring — detect and filter garbage chunks before they waste embedding API calls:

from tributary.pipeline.quality import ChunkQualityScorer

scorer = ChunkQualityScorer(min_score=0.3)
hooks.after_chunk(scorer.as_chunk_filter())

Scores each chunk 0.0-1.0 based on 5 signals: length, whitespace ratio, alphabetic ratio, repetition, and sentence structure. Chunks below min_score are dropped before embedding.
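
Two of those five signals, the whitespace ratio and the alphabetic ratio, can be sketched as follows (a toy illustration; the real scorer's signals and weighting differ):

```python
def toy_quality_score(text):
    """Score a chunk in [0.0, 1.0] from two signals: penalize chunks
    that are mostly whitespace or mostly non-alphabetic characters."""
    if not text:
        return 0.0
    whitespace_ratio = sum(c.isspace() for c in text) / len(text)
    alpha_ratio = sum(c.isalpha() for c in text) / len(text)
    # Equal-weight average of "not whitespace" and "is alphabetic".
    return ((1 - whitespace_ratio) + alpha_ratio) / 2
```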


Examples

The examples/ directory shows things the CLI can't do:

| Example | What it demonstrates |
|---|---|
| local_to_json.py | Compare 3 chunking strategies on the same documents |
| pdf_recursive_chunking.py | Two-pass pipeline with automatic retry on failures |
| with_events_and_metrics.py | Live progress callbacks and per-stage performance analysis |

Webhook Notifications

POST to a URL when the pipeline completes or a document fails:

pipeline:
  webhook:
    url: https://example.com/hook
    events: ["pipeline_completed", "document_failed"]
    headers:
      Authorization: "Bearer token"
    timeout: 10

Failures are logged and swallowed — a webhook timeout never crashes the pipeline.
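
That log-and-swallow behavior is a common pattern; here is a generic standard-library sketch of it (not Tributary's actual HTTP client):

```python
import json
import urllib.request

def post_webhook(url, payload, headers=None, timeout=10):
    """POST a JSON payload; log and swallow any failure so a slow or
    dead webhook endpoint never takes the pipeline down with it."""
    body = json.dumps(payload).encode("utf-8")
    request = urllib.request.Request(
        url, data=body,
        headers={"Content-Type": "application/json", **(headers or {})},
    )
    try:
        with urllib.request.urlopen(request, timeout=timeout) as response:
            return 200 <= response.status < 300
    except Exception as exc:  # timeout, DNS failure, non-2xx, ...
        print(f"webhook delivery failed (ignored): {exc}")
        return False
```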


Config Inheritance

Create a base config and override per environment:

# base.yaml
source:
  type: local
  params: { directory: ./docs }
chunker:
  strategy: recursive
  params: { chunk_size: 500 }
embedder:
  provider: openai
destination:
  type: json
  params: { file_path: ./output.jsonl }
# production.yaml
extends: base.yaml
source:
  params: { directory: /data/production }
destination:
  type: qdrant
  params: { collection_name: prod, url: "http://qdrant:6333" }

Deep merge: nested dicts merge recursively, scalars and lists are replaced. Supports chaining (A extends B extends C).
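
The stated merge rule (nested dicts merge recursively, scalars and lists are replaced) can be sketched as a generic implementation:

```python
def deep_merge(base, override):
    """Recursively merge `override` into `base`: nested dicts merge key
    by key, while scalars and lists in `override` replace the base value
    outright. Returns a new dict; inputs are not mutated."""
    merged = dict(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = value
    return merged
```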


Tests

pytest -v  # 317 tests passing

Design Patterns

| Pattern | Where | Why |
|---|---|---|
| Strategy | Chunkers, Embedders, Destinations | Swap algorithms without changing the pipeline |
| Factory + Registry | get_source(), get_chunker(), get_embedder(), get_destination(), get_extractor_for_extension() | Create components by name string |
| Producer-Consumer | Pipeline orchestrator | Decouple document fetching from processing |
| Abstract Base Class | BaseSource, BaseExtractor, BaseChunker, BaseEmbedder, BaseDestination | Enforce interface contracts |
| Template Method | BaseEmbedder.embed_chunks() wraps embed() | Base class handles caching + metadata, subclass handles vectors |
| Observer | on_event callback | Monitor pipeline progress without coupling |
| Feedback Loop | AdaptiveBatcher | Auto-tune batch size from embedding API response times |
| Composite | MultiDestination | Fan-out to multiple destinations as one |
| Router | ChunkerRouter | Route documents to different chunkers by file type |
| Interceptor | PipelineHooks | Inject custom logic between stages via decorator chain |
