Official plugins for agora-etl — Kafka, PostgreSQL, Redis, cron scheduling, and distributed coordination.

Agora ETL Plugins

Official plugin collection for agora-etl — Redis, cron scheduling, distributed coordination, Kafka, and PostgreSQL.

Overview

agora-etl-plugins extends agora-etl with production-ready integrations. Plugins are discovered through Python entry points: install the package and they register themselves, with no manual wiring.

from agora import Pipeline
from agora_plugins.redis.sources import RedisStreamSource
from agora_plugins.redis.sinks import RedisSink

summary = await (
    Pipeline(RedisStreamSource(url="redis://localhost:6379", stream="events", group="my-group", consumer="worker-1"))
    .build(RedisSink(url="redis://localhost:6379", key_fn=lambda r: r["id"]))
    .run()
)
print(f"written={summary.records_written}  errors={summary.records_errored}")

Install

pip install "agora-etl-plugins[redis]"        # Redis source, sink, state, DLQ, dedup, AI cache
pip install "agora-etl-plugins[cron]"         # Cron schedule support for ScheduledPipeline
pip install "agora-etl-plugins[distributed]"  # Redis-backed distributed worker coordination
pip install "agora-etl-plugins[kafka]"        # Kafka source and sink
pip install "agora-etl-plugins[postgres]"     # PostgreSQL source, sink, DLQ, schema adapter
pip install "agora-etl-plugins[all]"          # Everything in one install

Available plugins

Redis [redis]

Full Redis integration — streaming ingestion, writes, dead-letter queue, state, deduplication, and LLM response caching.

| Component | Type | Description |
| --- | --- | --- |
| RedisStreamSource | Source | Consume records from a Redis Stream via XREADGROUP |
| RedisSink | Sink | Write records to Redis (SET / LPUSH / RPUSH / XADD) |
| RedisDLQSink | Sink | Route failed records to a Redis-backed dead-letter queue |
| RedisDLQSource | Source | Replay failed records from the Redis DLQ |
| RedisBackend | State | Redis-backed state backend with TTL and membership support |
| RedisStore | Dedup | Exact-match deduplication via Redis SET NX |
| RedisEmbeddingStore | Dedup | Semantic deduplication using cosine similarity (up to ~10k entries) |
| RedisLLMCache | AI Cache | Distributed LLM response cache backed by Redis |

from agora_plugins.redis.sources import RedisStreamSource
from agora_plugins.redis.sinks import RedisSink
from agora_plugins.redis.dlq import RedisDLQSink, RedisDLQSource
from agora_plugins.redis.state import RedisBackend
from agora_plugins.redis.dedup.stores import RedisStore, RedisEmbeddingStore
from agora_plugins.redis.ai.cache import RedisLLMCache
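
RedisEmbeddingStore is described above as deduplicating by cosine similarity. The sketch below illustrates that idea with an in-memory list; it is not the plugin's implementation. The class name `InMemoryEmbeddingDedup`, the `threshold` parameter, and its default of 0.95 are assumptions made for illustration.

```python
import math


def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0  # treat a zero vector as similar to nothing
    return dot / (norm_a * norm_b)


class InMemoryEmbeddingDedup:
    """Hypothetical stand-in: flags an embedding that is close to one seen before."""

    def __init__(self, threshold=0.95):  # threshold value is an assumption
        self.threshold = threshold
        self._seen = []

    def is_duplicate(self, embedding):
        # Linear scan over every stored embedding.
        for prior in self._seen:
            if cosine_similarity(prior, embedding) >= self.threshold:
                return True
        self._seen.append(list(embedding))
        return False


dedup = InMemoryEmbeddingDedup()
first = dedup.is_duplicate([1.0, 0.0])      # nothing stored yet
near = dedup.is_duplicate([0.99, 0.01])     # almost the same direction
other = dedup.is_duplicate([0.0, 1.0])      # orthogonal to the first
```

A brute-force scan like this costs one comparison per stored entry, which is one plausible reason a store of this kind would be bounded to a modest number of entries.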

RedisStreamSource

Provides at-least-once delivery via consumer groups. A message is acknowledged only after the downstream write succeeds, so a failed write leaves it pending for redelivery.

source = RedisStreamSource(
    url="redis://localhost:6379",
    stream="agora:events",
    group="pipeline-1",
    consumer="worker-1",
    deserializer=lambda fields: MyRecord(**fields),
    batch_size=100,
    block_ms=2000,
    reclaim_idle_ms=60_000,   # reclaim stale pending messages from dead consumers
)
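
To make the acknowledge-after-write rule concrete, here is a minimal in-memory model of a consumer group's pending list. It does not talk to Redis and is not the plugin's code; `FakeStream` and `process` are hypothetical names used only for this sketch.

```python
class FakeStream:
    """In-memory stand-in for a stream with a pending-entries list."""

    def __init__(self, messages):
        self._queue = list(messages)  # (id, payload) pairs not yet delivered
        self.pending = {}             # delivered but not yet acknowledged

    def read(self, count):
        batch = self._queue[:count]
        self._queue = self._queue[count:]
        for msg_id, payload in batch:
            self.pending[msg_id] = payload
        return batch

    def ack(self, msg_id):
        self.pending.pop(msg_id, None)

    def reclaim(self):
        """Return everything still pending so it can be processed again."""
        return list(self.pending.items())


def process(stream, write, batch_size=100):
    """Write each message, acknowledging only the ones whose write succeeded."""
    written = 0
    for msg_id, payload in stream.read(batch_size):
        try:
            write(payload)
        except Exception:
            continue  # no ack: the message stays pending for redelivery
        stream.ack(msg_id)
        written += 1
    return written


def flaky_write(payload):
    if payload == "b":
        raise RuntimeError("simulated sink failure")


stream = FakeStream([("1", "a"), ("2", "b"), ("3", "c")])
written = process(stream, flaky_write)
```

The message whose write failed remains in `stream.pending`, which is the property that lets a later reclaim pass pick it up. A consequence of this design is that a record can be delivered more than once, so downstream writes should tolerate repeats.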

RedisSink

Supports four write modes: set, lpush, rpush, xadd.

import json

# Write as Redis Stream entries
sink = RedisSink(
    url="redis://localhost:6379",
    key_fn=lambda r: "agora:processed",
    serializer=lambda r: {"id": r["id"], "value": r["value"]},
    mode="xadd",
    maxlen=10_000,
)

# Write as key-value pairs with TTL
sink = RedisSink(
    url="redis://localhost:6379",
    key_fn=lambda r: f"cache:{r['id']}",
    serializer=lambda r: json.dumps(r),
    mode="set",
    ttl_seconds=3600,
)

Dead-letter queue

from agora import Pipeline
from agora_plugins.redis.dlq import RedisDLQSink, RedisDLQSource

# Route failures to DLQ
summary = await (
    Pipeline(source)
    .build(sink, dlq=RedisDLQSink(url="redis://localhost:6379"))
    .run()
)

# Replay failed records
dlq_source = RedisDLQSource(
    url="redis://localhost:6379",
    pipeline_id="my-pipeline",
    stage="sink_write",
)
async for record in dlq_source.stream():
    print(record.error_message)

Cron [cron]

Adds cron expression support to ScheduledPipeline. Without this plugin, only interval-based scheduling is available.

from agora.runner import Schedule, ScheduledPipeline

pipeline = ScheduledPipeline(
    factory=lambda: my_pipeline,
    schedule=Schedule.cron("0 9 * * 1-5"),  # weekdays at 9am
)
await pipeline.start()

Supported expression format: standard 5-field cron (minute hour day month weekday).
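
As an illustration of how a 5-field expression is read, the sketch below checks whether a given datetime matches one. It is a simplified matcher written for this README, not the plugin's parser: it handles `*`, single values, ranges, comma lists, and `/step` on `*` or a range, and it requires all five fields to match (real cron implementations treat day-of-month and day-of-week specially when both are restricted). The names `expand_field` and `matches` are hypothetical.

```python
from datetime import datetime

# Inclusive bounds for minute, hour, day of month, month, weekday (0 = Sunday).
FIELD_RANGES = [(0, 59), (0, 23), (1, 31), (1, 12), (0, 6)]


def expand_field(field, low, high):
    """Expand one cron field into the set of integers it allows."""
    values = set()
    for part in field.split(","):
        step = 1
        if "/" in part:
            part, step_text = part.split("/")
            step = int(step_text)
        if part == "*":
            start, end = low, high
        elif "-" in part:
            start_text, end_text = part.split("-")
            start, end = int(start_text), int(end_text)
        else:
            start = end = int(part)
        values.update(range(start, end + 1, step))
    return values


def matches(expression, moment):
    """Return True if `moment` falls on a minute the expression selects."""
    fields = expression.split()
    if len(fields) != 5:
        raise ValueError("expected 5 fields: minute hour day month weekday")
    minute, hour, day, month, weekday = (
        expand_field(f, lo, hi) for f, (lo, hi) in zip(fields, FIELD_RANGES)
    )
    cron_weekday = moment.isoweekday() % 7  # Python: Monday=1..Sunday=7
    return (
        moment.minute in minute
        and moment.hour in hour
        and moment.day in day
        and moment.month in month
        and cron_weekday in weekday
    )


monday_9am = matches("0 9 * * 1-5", datetime(2024, 1, 1, 9, 0))
saturday_9am = matches("0 9 * * 1-5", datetime(2024, 1, 6, 9, 0))
```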


Distributed [distributed]

Redis-backed distributed worker coordination. Prevents duplicate pipeline runs when multiple worker instances are deployed.

Each worker acquires a per-pipeline lease before each run and releases it atomically via a Lua script. Workers register heartbeats so the fleet is visible via agora plugins list.

from agora_plugins.distributed import RedisWorkerCoordinator, DistributedConfig
from agora.runner import WorkerPool

config = DistributedConfig()  # reads AGORA_DISTRIBUTED_* env vars
coordinator = RedisWorkerCoordinator(
    redis_url=config.redis_url,
    lease_ttl_seconds=config.lease_ttl_seconds,
    heartbeat_interval=config.heartbeat_interval,
)

pool = WorkerPool(coordinator=coordinator)
pool.register(my_pipeline)
await pool.run()
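
The lease behaviour described above can be modelled without Redis. The sketch below is an assumption-laden illustration, not the coordinator's code: `InMemoryLeaseStore` is a hypothetical class, and `RELEASE_SCRIPT` shows the widely used compare-and-delete Lua pattern that a Redis-backed release typically relies on, which may differ from the script the plugin ships.

```python
import time
import uuid

# Common compare-and-delete pattern: delete the key only if the caller still
# owns it. Shown for reference; assumed, not copied from the plugin.
RELEASE_SCRIPT = """
if redis.call("get", KEYS[1]) == ARGV[1] then
    return redis.call("del", KEYS[1])
else
    return 0
end
"""


class InMemoryLeaseStore:
    """Hypothetical single-process model of per-pipeline leases."""

    def __init__(self, clock=time.monotonic):
        self._leases = {}  # key -> (owner token, expiry time)
        self._clock = clock

    def acquire(self, key, ttl_seconds):
        """Return an owner token, or None if someone else holds a live lease."""
        now = self._clock()
        current = self._leases.get(key)
        if current is not None and current[1] > now:
            return None
        token = uuid.uuid4().hex
        self._leases[key] = (token, now + ttl_seconds)
        return token

    def release(self, key, token):
        """Release only if `token` still owns the lease."""
        current = self._leases.get(key)
        if current is not None and current[0] == token:
            del self._leases[key]
            return True
        return False


now = [0.0]  # controllable clock for the demonstration
store = InMemoryLeaseStore(clock=lambda: now[0])
first_token = store.acquire("my-pipeline", ttl_seconds=300)
blocked = store.acquire("my-pipeline", ttl_seconds=300)
now[0] = 301.0  # the first lease has expired while its run is still going
second_token = store.acquire("my-pipeline", ttl_seconds=300)
stale_release = store.release("my-pipeline", first_token)
```

The last three lines show why the lease must outlive the longest run: once it expires, a second worker can acquire it while the first is still working, and the first worker's release is then rejected because its token no longer matches.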

Environment variables:

| Variable | Default | Description |
| --- | --- | --- |
| AGORA_DISTRIBUTED_REDIS_URL | redis://localhost:6379 | Redis connection URL |
| AGORA_DISTRIBUTED_LEASE_TTL_SECONDS | 300 | Lease duration in seconds; must exceed the longest pipeline run |
| AGORA_DISTRIBUTED_HEARTBEAT_INTERVAL | 30 | Heartbeat interval in seconds |
| AGORA_DISTRIBUTED_KEY_PREFIX | agora:distributed: | Redis key namespace |
| AGORA_DISTRIBUTED_FALLBACK_TO_LOCAL | false | If true, continue without coordination when Redis is unavailable (risks duplicate runs) |
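
For readers who want to see how such variables map to typed settings, here is a minimal sketch that reads the same names and defaults from the environment. `EnvSettings` is a hypothetical class written for this example and is not `DistributedConfig`; in particular, the set of strings accepted as true is an assumption.

```python
import os
from dataclasses import dataclass

PREFIX = "AGORA_DISTRIBUTED_"


def _parse_bool(text):
    # Accepted truthy spellings are an assumption, not documented behaviour.
    return text.strip().lower() in {"1", "true", "yes", "on"}


@dataclass(frozen=True)
class EnvSettings:
    """Hypothetical settings holder using the documented defaults."""

    redis_url: str = "redis://localhost:6379"
    lease_ttl_seconds: int = 300
    heartbeat_interval: int = 30
    key_prefix: str = "agora:distributed:"
    fallback_to_local: bool = False

    @classmethod
    def from_env(cls, environ=None):
        env = os.environ if environ is None else environ
        defaults = cls()

        def get(name, default, convert=str):
            key = PREFIX + name
            return convert(env[key]) if key in env else default

        return cls(
            redis_url=get("REDIS_URL", defaults.redis_url),
            lease_ttl_seconds=get("LEASE_TTL_SECONDS", defaults.lease_ttl_seconds, int),
            heartbeat_interval=get("HEARTBEAT_INTERVAL", defaults.heartbeat_interval, int),
            key_prefix=get("KEY_PREFIX", defaults.key_prefix),
            fallback_to_local=get("FALLBACK_TO_LOCAL", defaults.fallback_to_local, _parse_bool),
        )


# Passing a dict instead of os.environ keeps the example deterministic.
settings = EnvSettings.from_env({
    "AGORA_DISTRIBUTED_LEASE_TTL_SECONDS": "600",
    "AGORA_DISTRIBUTED_FALLBACK_TO_LOCAL": "true",
})
```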

Kafka [kafka]

Kafka source and sink built on aiokafka, with async serializers and bounded pending acknowledgements for backpressure-aware writes.

import json

from agora_plugins.kafka import KafkaSink, KafkaSource

sink = KafkaSink(
    topic="events",
    bootstrap_servers="localhost:9092",
    serializer=lambda record: json.dumps(record).encode(),
)

source = KafkaSource(
    topics=["events"],
    bootstrap_servers="localhost:9092",
    group_id="agora-consumer",
    deserializer=lambda payload: json.loads(payload.decode()),
)
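
The phrase "bounded pending acknowledgements" suggests a cap on the number of sends that are in flight at once. One common way to get that behaviour in asyncio is a semaphore, sketched below. This is an illustration of the technique only: `BoundedSender`, `max_pending`, and the `send` callable are hypothetical and do not reflect KafkaSink's internals or aiokafka's API.

```python
import asyncio


class BoundedSender:
    """Hypothetical wrapper that caps the number of unacknowledged sends."""

    def __init__(self, send, max_pending=100):
        self._send = send                         # async callable taking one record
        self._slots = asyncio.Semaphore(max_pending)
        self._tasks = set()

    async def submit(self, record):
        # Waits here once max_pending sends are outstanding: this is the
        # backpressure that slows the producer side down.
        await self._slots.acquire()
        task = asyncio.create_task(self._deliver(record))
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)

    async def _deliver(self, record):
        try:
            await self._send(record)
        finally:
            self._slots.release()  # free the slot whether or not the send failed

    async def flush(self):
        """Wait for every outstanding send to finish."""
        if self._tasks:
            await asyncio.gather(*self._tasks)


async def demo():
    delivered = []

    async def fake_send(record):
        await asyncio.sleep(0)  # stand-in for a broker round trip
        delivered.append(record)

    sender = BoundedSender(fake_send, max_pending=2)
    for record in range(5):
        await sender.submit(record)
    await sender.flush()
    return delivered


delivered = asyncio.run(demo())
```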

PostgreSQL [postgres]

PostgreSQL source, sink, DLQ, and schema adapter built on psycopg.

from agora_plugins.postgres import PostgresSink, PostgresSource

source = PostgresSource(
    dsn="postgresql://localhost/agora",
    query="SELECT id, name, score FROM public.events ORDER BY id",
    row_mapper=lambda row: row,
)

sink = PostgresSink(
    dsn="postgresql://localhost/agora",
    table="public.events",
    row_mapper=lambda record: record,
    conflict_key="id",
)
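
The `conflict_key` argument suggests upsert-style writes. Assuming it corresponds to PostgreSQL's `INSERT ... ON CONFLICT` clause (an assumption; the README does not say so), the statement involved might look like the one built below. `build_upsert` is a hypothetical helper written for this example, not part of the plugin, and it uses the `%s` placeholder style that psycopg accepts.

```python
import re

# Conservative identifier checks so names can be interpolated safely.
_COLUMN = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")
_TABLE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*(\.[A-Za-z_][A-Za-z0-9_]*)?$")


def build_upsert(table, columns, conflict_key):
    """Build an INSERT ... ON CONFLICT statement with %s placeholders."""
    if not _TABLE.match(table):
        raise ValueError(f"unsafe table name: {table!r}")
    for name in columns:
        if not _COLUMN.match(name):
            raise ValueError(f"unsafe column name: {name!r}")
    if conflict_key not in columns:
        raise ValueError("conflict_key must be one of the columns")

    column_list = ", ".join(columns)
    placeholders = ", ".join(["%s"] * len(columns))
    updates = ", ".join(
        f"{name} = EXCLUDED.{name}" for name in columns if name != conflict_key
    )
    # With no other columns to update, fall back to skipping the row.
    action = f"DO UPDATE SET {updates}" if updates else "DO NOTHING"
    return (
        f"INSERT INTO {table} ({column_list}) VALUES ({placeholders}) "
        f"ON CONFLICT ({conflict_key}) {action}"
    )


statement = build_upsert("public.events", ["id", "name", "score"], "id")
```

Row values would be passed separately as query parameters rather than formatted into the string.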

Plugin auto-discovery

Source and sink plugins register themselves via Python entry-points. After installing, run:

agora plugins list

to see the currently registered source, sink, and middleware plugins.


Requirements

  • Python 3.11+
  • agora-etl >= 0.1.2

License

Apache 2.0 — see LICENSE.
