
Agora — async-first ETL framework for Python.


Agora ETL Framework



Overview

Agora is an async-first ETL framework for Python. It provides a composable pipeline model — source, middleware chain, sink — with built-in support for fault tolerance, observability, checkpointing, and AI enrichment.

Key features:

  • Fluent, immutable pipeline builder
  • Built-in dead-letter queue (DLQ) with replay
  • Resumable pipelines via checkpointing
  • Adaptive backpressure for fast-source / slow-sink scenarios
  • AI enrichment middlewares (enrich, classify, extract, translate, batch)
  • Fuzzy and exact deduplication
  • Scheduled pipelines with health monitoring
  • Plugin system via Python entry-points
  • CLI for scaffolding, running, and managing pipelines
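This page doesn't describe how Agora's adaptive backpressure works internally. The underlying idea, automatically slowing a fast source to the pace of a slow sink, can be sketched with the standard library's bounded asyncio.Queue. This is a generic (non-adaptive) technique shown for illustration, not Agora's implementation; fast_source, slow_sink and run are hypothetical names.

```python
import asyncio


async def fast_source(queue: asyncio.Queue, n: int) -> None:
    # Produce n records as fast as possible. put() suspends while the queue is
    # full, which is what throttles the producer to the consumer's pace.
    for i in range(n):
        await queue.put({"id": i})
    await queue.put(None)  # sentinel: no more records


async def slow_sink(queue: asyncio.Queue, written: list, peak: list) -> None:
    while True:
        peak[0] = max(peak[0], queue.qsize())  # track how much was ever buffered
        record = await queue.get()
        if record is None:
            break
        await asyncio.sleep(0.001)  # simulate a slow write
        written.append(record["id"])


async def run(n: int = 50, maxsize: int = 5) -> tuple[list, int]:
    queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)  # bounded buffer
    written: list = []
    peak = [0]
    await asyncio.gather(fast_source(queue, n), slow_sink(queue, written, peak))
    return written, peak[0]


if __name__ == "__main__":
    written, peak = asyncio.run(run())
    print(f"wrote {len(written)} records; peak buffered = {peak}")
```

Memory stays bounded by maxsize no matter how fast the source is; an adaptive scheme would additionally tune that bound or the source's pace at runtime.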

Requirements

  • Python 3.11+

Install

pip install agora-etl                  # core only
pip install "agora-etl[file]"          # + Parquet / CSV / JSON Lines support
pip install "agora-etl[all,dev]"       # everything + dev tools

Quick start

agora new my-pipeline
cd my-pipeline
agora run pipelines.example

Core concepts

A pipeline has three parts:

Source  →  Middleware chain  →  Sink(s)
  • Source — emits records one at a time
  • Middleware — transforms, filters, enriches, or validates each record
  • Sink — persists records to a destination

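Pipeline is immutable: every .pipe() and .filter() call returns a new instance instead of modifying the existing one, so several pipelines can safely branch off a shared prefix. The following example reads JSON Lines, normalizes and filters records, and writes them out with a dead-letter queue and checkpointing: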

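import asyncio
from dataclasses import dataclass

from agora import Pipeline
from agora import Middleware, PipelineContext  # assumed import path; adjust to your installed version
from agora.sources.file.jsonlines import JsonLinesSource
from agora.sinks.file.jsonlines import JsonLinesSink
from agora.core.dlq import SQLiteDLQSink
from agora.core.checkpoint import SQLiteCheckpointStore


@dataclass
class RawRecord:
    id: str
    name: str | None
    score: float

    @classmethod
    def from_dict(cls, row: dict) -> "RawRecord":
        # Assumes row_mapper receives each parsed JSON line as a dict.
        return cls(id=row["id"], name=row.get("name"), score=float(row.get("score", 0.0)))


@dataclass
class CleanRecord:
    id: str
    name: str
    score: float  # carried through so the .filter() step can use it


class NormalizeMiddleware(Middleware[RawRecord, CleanRecord]):
    name = "normalize"

    async def process(self, record: RawRecord, ctx: PipelineContext) -> CleanRecord | None:
        if not record.name:
            return None  # drop records without a name
        return CleanRecord(id=record.id, name=record.name.strip(), score=record.score)


async def main() -> None:
    summary = await (
        Pipeline(JsonLinesSource("data/records.jsonl", row_mapper=RawRecord.from_dict))
        .pipe(NormalizeMiddleware())
        .filter(lambda r: r.score > 0.8)  # keep high-scoring records only
        .build(
            JsonLinesSink("output/clean.jsonl"),
            dlq=SQLiteDLQSink(".dlq.db"),
            checkpoint=SQLiteCheckpointStore(".checkpoint.db"),
            checkpoint_every=100,
            batch_size=50,
        )
        .run(max_records=10_000)
    )
    print(f"written={summary.records_written}  dropped={summary.records_dropped}  errors={summary.records_errored}")


asyncio.run(main())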
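The branching guarantee follows from immutability: if .pipe() copies the stage list into a new object rather than mutating it, two pipelines built from a shared prefix cannot interfere with each other. A toy builder illustrates the pattern with a frozen dataclass; ToyPipeline is hypothetical and is not Agora's Pipeline class.

```python
from collections.abc import Callable
from dataclasses import dataclass, replace
from typing import Any


@dataclass(frozen=True)
class ToyPipeline:
    stages: tuple[Callable[[Any], Any], ...] = ()

    def pipe(self, fn: Callable[[Any], Any]) -> "ToyPipeline":
        # Return a new instance with one more stage; self is never modified.
        return replace(self, stages=self.stages + (fn,))

    def filter(self, pred: Callable[[Any], bool]) -> "ToyPipeline":
        # A filter is just a stage that maps rejected records to None.
        return self.pipe(lambda r: r if pred(r) else None)

    def run(self, records: list) -> list:
        out = []
        for r in records:
            for stage in self.stages:
                r = stage(r)
                if r is None:
                    break
            else:
                out.append(r)
        return out


base = ToyPipeline().pipe(lambda r: {**r, "score": round(r["score"], 2)})
high = base.filter(lambda r: r["score"] > 0.8)   # branch 1
low = base.filter(lambda r: r["score"] <= 0.8)   # branch 2; base is unchanged

if __name__ == "__main__":
    data = [{"id": 1, "score": 0.91}, {"id": 2, "score": 0.42}]
    print(len(base.stages), high.run(data), low.run(data))
```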

Built-in components

Sources

Component          Description
JsonLinesSource    Stream records from a JSONL file
CsvSource          Stream records from a CSV file
ParquetSource      Stream records from a Parquet file ([file] extra)
HTTPSource         Abstract base for HTTP polling sources

Sinks

Component          Description
JsonLinesSink      Write records as JSONL
CsvSink            Write records as CSV
ParquetSink        Write records to Parquet ([file] extra)
WebhookSink        POST records to an HTTP endpoint
StdoutSink         Print records to stdout
LogSink            Emit records via the structured logger

Middlewares

Component             Description
MapMiddleware         Apply a synchronous function to each record
FilterMiddleware      Drop records that do not match a predicate
RetryMiddleware       Retry a middleware on exception
ValidateMiddleware    Validate records against a Pydantic model
EnrichMiddleware      Enrich records with data from an async callable
DedupMiddleware       Drop duplicate records by a computed key
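The semantics of two of these, dropping duplicates by a computed key and retrying on exception, can be sketched as plain async functions. This is an illustrative model only: the real constructors of DedupMiddleware and RetryMiddleware aren't documented here, and make_dedup, with_retry, attempts, delay and flaky_enrich are all hypothetical.

```python
import asyncio
from collections.abc import Awaitable, Callable, Hashable
from typing import Any


def make_dedup(key: Callable[[Any], Hashable]) -> Callable[[Any], Awaitable[Any]]:
    seen: set = set()  # in-memory, so exact dedup for the lifetime of one run

    async def dedup(record: Any) -> Any:
        k = key(record)
        if k in seen:
            return None  # duplicate: drop
        seen.add(k)
        return record

    return dedup


def with_retry(fn: Callable[[Any], Awaitable[Any]], attempts: int = 3,
               delay: float = 0.0) -> Callable[[Any], Awaitable[Any]]:
    async def wrapped(record: Any) -> Any:
        for attempt in range(1, attempts + 1):
            try:
                return await fn(record)
            except Exception:
                if attempt == attempts:
                    raise  # give up; a real pipeline might route this to the DLQ
                await asyncio.sleep(delay * attempt)  # linear backoff
    return wrapped


calls = {"n": 0}


async def flaky_enrich(record: dict) -> dict:
    # Fails on the first two calls, then succeeds, to exercise the retry wrapper.
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient")
    return {**record, "enriched": True}


async def demo() -> list:
    dedup = make_dedup(lambda r: r["email"].lower())  # case-insensitive key
    enrich = with_retry(flaky_enrich, attempts=3)
    out = []
    for r in [{"email": "A@x.io"}, {"email": "a@x.io"}, {"email": "b@x.io"}]:
        r = await dedup(r)
        if r is not None:
            out.append(await enrich(r))
    return out


if __name__ == "__main__":
    print(asyncio.run(demo()))
```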

AI middlewares (require an AIProvider plugin)

Component                Description
AIEnrichMiddleware       Add fields to each record using an LLM
AIClassifyMiddleware     Classify records into a fixed set of categories
AIExtractMiddleware      Extract structured fields from unstructured text
AIValidateMiddleware     Validate records using an LLM
AITranslateMiddleware    Translate text fields to a target language
AIBatchMiddleware        Batch multiple records into a single LLM call

CLI

agora new <name>          # scaffold a new project
agora run <module>        # run a pipeline once
agora worker              # start the worker pool
agora pipelines list      # list pipeline modules
agora plugins list        # list registered plugins
agora dlq list            # inspect dead-letter queue
agora dlq replay          # replay failed records
agora config show         # show resolved settings
agora version             # print version
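The dlq list and dlq replay commands imply a persistent store of failed records that can be inspected and fed back through processing. That idea can be sketched with the standard library's sqlite3 module; ToyDLQ, its table layout and its methods are hypothetical and are not the schema or API of SQLiteDLQSink.

```python
import json
import sqlite3
from collections.abc import Callable


class ToyDLQ:
    """Hypothetical dead-letter queue: failed records are parked in SQLite for later replay."""

    def __init__(self, path: str = ":memory:") -> None:
        self.db = sqlite3.connect(path)
        with self.db:
            self.db.execute(
                "CREATE TABLE IF NOT EXISTS dlq (id INTEGER PRIMARY KEY, record TEXT, error TEXT)"
            )

    def put(self, record: dict, error: Exception) -> None:
        # Store the failed record as JSON together with the error that caused the failure.
        with self.db:
            self.db.execute(
                "INSERT INTO dlq (record, error) VALUES (?, ?)",
                (json.dumps(record), repr(error)),
            )

    def entries(self) -> list[tuple[int, dict, str]]:
        # Roughly what a "list" command would show.
        rows = self.db.execute("SELECT id, record, error FROM dlq ORDER BY id").fetchall()
        return [(row_id, json.loads(rec), err) for row_id, rec, err in rows]

    def replay(self, process: Callable[[dict], None]) -> int:
        # Re-run each parked record; remove it only if processing now succeeds.
        replayed = 0
        for row_id, record, _ in self.entries():
            try:
                process(record)
            except Exception:
                continue  # still failing: keep it in the queue
            with self.db:
                self.db.execute("DELETE FROM dlq WHERE id = ?", (row_id,))
            replayed += 1
        return replayed


if __name__ == "__main__":
    dlq = ToyDLQ()
    written: list[dict] = []

    def write(record: dict) -> None:
        if "amount" not in record:
            raise KeyError("amount")
        written.append(record)

    for rec in [{"id": 1}, {"id": 2, "amount": 5}]:
        try:
            write(rec)
        except Exception as exc:
            dlq.put(rec, exc)

    print(dlq.entries())       # record 1 is parked with its KeyError
    print(dlq.replay(write))   # still missing "amount", so nothing is replayed
```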

Configuration

Agora reads config from environment variables or an agora.env file:

AGORA_LOG_LEVEL=INFO
AGORA_HEALTH_HOST=127.0.0.1
AGORA_HEALTH_PORT=8080
AGORA_HEALTH_AUTH_TOKEN=my-secret

License

Apache 2.0 — see LICENSE.



Download files


Source Distribution

agora_etl-0.1.0.tar.gz (231.0 kB)

Built Distribution


agora_etl-0.1.0-py3-none-any.whl (211.3 kB)

File details

Details for the file agora_etl-0.1.0.tar.gz.

File metadata

  • Download URL: agora_etl-0.1.0.tar.gz
  • Size: 231.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: Hatch/1.16.5 cpython/3.11.9 HTTPX/0.28.1

File hashes

Hashes for agora_etl-0.1.0.tar.gz
Algorithm Hash digest
SHA256 358967318a1641fd365ce41b6ddb0c6d72faa75c62213d368690198440e232db
MD5 20c93836e4de17b93523220520bf8df5
BLAKE2b-256 a33b1e5b45c54a77992abf1a697572970de3a5ed27bec3c28a12a06e0a73cca8


File details

Details for the file agora_etl-0.1.0-py3-none-any.whl.

File metadata

  • Download URL: agora_etl-0.1.0-py3-none-any.whl
  • Size: 211.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: Hatch/1.16.5 cpython/3.11.9 HTTPX/0.28.1

File hashes

Hashes for agora_etl-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 d09fc7c7c7c4e4669f7ba36f1f6227d1f924b21a51e4bff28e9a1e6f72ddcc35
MD5 97aee39f851af8c878b97a2528f44515
BLAKE2b-256 146ffe85bf9300152cbf4e85b2fec73ba35b4799c587cc800719e8de13a60484

