
Agora — async-first ETL framework for Python.


Agora ETL Framework

Async-first ETL framework for Python.



Overview

Agora is an async-first ETL framework for Python. It provides a composable pipeline model — source, middleware chain, sink — with built-in support for fault tolerance, observability, checkpointing, and AI enrichment.

The core package is intentionally focused on runtime primitives and extension contracts. Integrations that depend on external systems are expected to live in plugin packages such as agora-etl-plugins.

Key features:

  • Fluent, immutable pipeline builder
  • Built-in dead-letter queue (DLQ) with replay
  • Resumable pipelines via checkpointing
  • Adaptive backpressure for fast-source / slow-sink scenarios
  • AI enrichment middlewares (enrich, classify, extract, translate, batch)
  • Fuzzy and exact deduplication
  • Scheduled pipelines with health monitoring
  • Plugin system via Python entry-points
  • CLI for scaffolding, running, and managing pipelines

Requirements

  • Python 3.11+

Install

pip install agora-etl                 # core only
pip install "agora-etl[file]"         # + Parquet support
pip install "agora-etl[all,dev]"      # everything + dev tools

Quick start

agora new my-pipeline
cd my-pipeline
agora run pipelines.example

Core concepts

A pipeline has three parts:

Source  →  Middleware chain  →  Sink(s)
  • Source — emits records one at a time
  • Middleware — transforms, filters, enriches, or validates each record
  • Sink — persists records to a destination

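Pipeline is immutable: every .pipe() and .filter() call returns a new instance instead of modifying the one it was called on, so a partially built pipeline can safely serve as the base for several branches. A complete example: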

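import asyncio
from dataclasses import dataclass
from typing import Any

from agora import Middleware, Pipeline, PipelineContext
from agora.sources.file.jsonlines import JsonLinesSource
from agora.sinks.file.jsonlines import JsonLinesSink
from agora.core.dlq import SQLiteDLQSink
from agora.core.checkpoint import SQLiteCheckpointStore


# Example record types; adapt the fields to your own data.
@dataclass
class RawRecord:
    id: str
    name: str | None
    score: float

    @classmethod
    def from_dict(cls, row: dict[str, Any]) -> "RawRecord":
        # Used as the source's row_mapper: one parsed JSON object in, one record out.
        return cls(id=row["id"], name=row.get("name"), score=float(row.get("score", 0.0)))


@dataclass
class CleanRecord:
    id: str
    name: str
    score: float  # carried through so the .filter() below can use it


class NormalizeMiddleware(Middleware[RawRecord, CleanRecord]):
    name = "normalize"

    async def process(self, record: RawRecord, ctx: PipelineContext) -> CleanRecord | None:
        # Returning None drops records that have no name.
        if not record.name:
            return None
        return CleanRecord(id=record.id, name=record.name.strip(), score=record.score)


async def main() -> None:
    summary = await (
        Pipeline(JsonLinesSource("data/records.jsonl", row_mapper=RawRecord.from_dict))
        .pipe(NormalizeMiddleware())
        .filter(lambda r: r.score > 0.8)  # keep only high-scoring records
        .build(
            JsonLinesSink("output/clean.jsonl"),
            dlq=SQLiteDLQSink(".dlq.db"),  # failed records are kept here for replay
            checkpoint=SQLiteCheckpointStore(".checkpoint.db"),
            checkpoint_every=100,  # commit progress every 100 records
            batch_size=50,
        )
        .run(max_records=10_000)
    )
    print(f"written={summary.records_written}  dropped={summary.records_dropped}  errors={summary.records_errored}")


asyncio.run(main())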
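Why branching is safe can be shown with a toy builder in plain Python. ToyPipeline and its Step alias are hypothetical and far simpler than Agora's Pipeline; they only illustrate the immutability contract described above: pipe() and filter() return new objects, so two branches built from the same base never see each other's steps.

```python
from __future__ import annotations

from collections.abc import Callable
from dataclasses import dataclass
from typing import Any

Step = Callable[[Any], Any]  # hypothetical: a step returns None to drop a record


@dataclass(frozen=True)
class ToyPipeline:
    """Hypothetical immutable builder; each step is a plain function."""

    steps: tuple[Step, ...] = ()

    def pipe(self, fn: Step) -> ToyPipeline:
        # Return a new instance instead of mutating self.
        return ToyPipeline(self.steps + (fn,))

    def filter(self, predicate: Callable[[Any], bool]) -> ToyPipeline:
        return self.pipe(lambda r: r if predicate(r) else None)

    def run(self, records: list[Any]) -> list[Any]:
        out = []
        for record in records:
            for step in self.steps:
                record = step(record)
                if record is None:
                    break  # dropped
            else:
                out.append(record)
        return out


base = ToyPipeline().pipe(str.strip)
upper = base.pipe(str.upper)                # branch 1
short = base.filter(lambda s: len(s) <= 3)  # branch 2, unaffected by branch 1
```

Because the dataclass is frozen and steps is a tuple, no method can change base after it is created; each branch only extends a copy.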

Built-in components

Sources

Component        Description
JsonLinesSource  Stream records from a JSONL file
CsvSource        Stream records from a CSV file
ParquetSource    Stream records from a Parquet file ([file] extra)
HTTPSource       Abstract base for HTTP polling sources

Sinks

Component      Description
JsonLinesSink  Write records as JSONL
CsvSink        Write records as CSV
ParquetSink    Write records to Parquet ([file] extra)
WebhookSink    POST records to an HTTP endpoint
StdoutSink     Print records to stdout
LogSink        Emit records via the structured logger

Middlewares

Component           Description
MapMiddleware       Apply a synchronous function to each record
FilterMiddleware    Drop records that do not match a predicate
RetryMiddleware     Retry a wrapped middleware when it raises an exception
ValidateMiddleware  Validate records against a Pydantic model
EnrichMiddleware    Enrich records with data from an async callable
DedupMiddleware     Drop duplicate records by a computed key
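Exact, key-based deduplication comes down to remembering which keys have already been seen. The dedup function below is a plain-Python sketch of that idea, not DedupMiddleware's code; its name and signature are hypothetical, and fuzzy matching is not covered.

```python
from collections.abc import Callable, Hashable, Iterable, Iterator
from typing import TypeVar

T = TypeVar("T")


def dedup(records: Iterable[T], key: Callable[[T], Hashable]) -> Iterator[T]:
    """Yield each record whose computed key has not been seen before (exact match)."""
    seen: set[Hashable] = set()
    for record in records:
        k = key(record)
        if k in seen:
            continue  # duplicate: drop it
        seen.add(k)
        yield record
```

The key function decides what counts as a duplicate; normalizing inside it (for example, key=lambda r: r["email"].strip().lower()) catches near-identical values. The seen set grows with every distinct key, which matters for long-running streams.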

AI middlewares (require an AIProvider plugin)

Component              Description
AIEnrichMiddleware     Add fields to each record using an LLM
AIClassifyMiddleware   Classify records into a fixed set of categories
AIExtractMiddleware    Extract structured fields from unstructured text
AIValidateMiddleware   Validate records using an LLM
AITranslateMiddleware  Translate text fields to a target language
AIBatchMiddleware      Batch multiple records into a single LLM call

CLI

agora new <name>          # scaffold a new project
agora run <module>        # run a pipeline once
agora worker              # start the worker pool
agora pipelines list      # list pipeline modules
agora plugins list        # list registered plugins
agora dlq replay          # replay failed records
agora config show         # show resolved settings
agora version             # print version

Configuration

Agora reads config from environment variables or an agora.env file:

AGORA_LOG_LEVEL=INFO
AGORA_HEALTH_HOST=127.0.0.1
AGORA_HEALTH_PORT=8080
AGORA_HEALTH_AUTH_TOKEN=my-secret
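How an env file and the process environment might be combined can be sketched as follows. Agora's actual loader, its precedence rules, and whether it handles quoted values are not described here, so load_settings is hypothetical: it reads KEY=VALUE lines from agora.env, skips blank lines and # comments, keeps only AGORA_-prefixed keys, and assumes real environment variables override the file (a common convention).

```python
import os
from collections.abc import Mapping
from pathlib import Path


def load_settings(env_file: Path, environ: Mapping[str, str] = os.environ) -> dict[str, str]:
    """Hypothetical loader merging AGORA_* keys from a KEY=VALUE file and the environment.

    Assumption: environment variables take precedence over the file; quotes are not stripped.
    """
    settings: dict[str, str] = {}
    if env_file.exists():
        for line in env_file.read_text().splitlines():
            line = line.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue  # skip blanks, comments, and malformed lines
            key, _, value = line.partition("=")
            settings[key.strip()] = value.strip()
    # Environment values overwrite file values with the same key.
    settings.update({k: v for k, v in environ.items() if k.startswith("AGORA_")})
    return {k: v for k, v in settings.items() if k.startswith("AGORA_")}
```

Passing environ explicitly, as the test of such a function would, keeps the loader independent of the real process environment.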

Import references inside config files are resolved by importing Python modules from your project, and importing a module runs its code. Treat agora.toml and any agora/v1 pipeline config as trusted input; never load one supplied by an untrusted user.
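The reason for that warning becomes concrete when you consider what resolving an import reference involves. The sketch below uses importlib and assumes a "module:attribute" reference syntax; Agora's actual syntax is not shown here, and resolve is a hypothetical helper.

```python
import importlib
from typing import Any


def resolve(ref: str) -> Any:
    """Resolve a "package.module:attribute" reference (hypothetical format).

    import_module() runs the target module's top-level code, which is why
    a config file containing such references must come from a trusted source.
    """
    module_name, sep, attr_path = ref.partition(":")
    if not sep or not module_name or not attr_path:
        raise ValueError(f"expected 'module:attribute', got {ref!r}")
    obj: Any = importlib.import_module(module_name)
    for part in attr_path.split("."):
        obj = getattr(obj, part)  # walk dotted attributes, e.g. "Class.method"
    return obj
```

A reference such as "json:dumps" is harmless, but a reference to any module on the import path executes that module's code, including code an attacker has placed there.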


License

Apache 2.0 — see LICENSE.



Download files


Source Distribution

agora_etl-0.1.5.tar.gz (280.7 kB)

Uploaded Source

Built Distribution


agora_etl-0.1.5-py3-none-any.whl (215.5 kB)

Uploaded Python 3

File details

Details for the file agora_etl-0.1.5.tar.gz.

File metadata

  • Download URL: agora_etl-0.1.5.tar.gz
  • Upload date:
  • Size: 280.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for agora_etl-0.1.5.tar.gz
Algorithm    Hash digest
SHA256       379a5da66b3875a4383cb1bcba3d137c42ff0e97fb790d00c6b77e841e2808dd
MD5          22083c143517add6ae183d485971c48a
BLAKE2b-256  d63cff14f837d2df47930cddf96df8e973c57f4a4c65403d4abb2f6a6bf9073c


Provenance

The following attestation bundles were made for agora_etl-0.1.5.tar.gz:

Publisher: release.yml on thanhtham010891/agora-etl

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file agora_etl-0.1.5-py3-none-any.whl.

File metadata

  • Download URL: agora_etl-0.1.5-py3-none-any.whl
  • Upload date:
  • Size: 215.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for agora_etl-0.1.5-py3-none-any.whl
Algorithm    Hash digest
SHA256       d32b6b2fe0d6ac06a2ed72199fe7bae89f372f3e4d6f53b2bee147ca0ec159aa
MD5          577432994fa8a4485a8d431b1d5e2594
BLAKE2b-256  6a74b0d507836a5274524e3fa1b340192ebbac459a3993203602080a587f7a90


Provenance

The following attestation bundles were made for agora_etl-0.1.5-py3-none-any.whl:

Publisher: release.yml on thanhtham010891/agora-etl

