Agora ETL Plugins

Official plugin collection for agora-etl — Redis, cron scheduling, and distributed coordination.


Overview

agora-etl-plugins extends agora-etl with production-ready integrations. Plugins are discovered through Python entry points: once the package is installed they register themselves, with no manual wiring.

import asyncio

from agora import Pipeline
from agora_plugins.redis.sources import RedisStreamSource
from agora_plugins.redis.sinks import RedisSink

async def main() -> None:
    source = RedisStreamSource(
        url="redis://localhost:6379", stream="events", group="my-group", consumer="worker-1"
    )
    sink = RedisSink(url="redis://localhost:6379", key_fn=lambda r: r["id"])
    # run() is awaited, so the pipeline must be started from inside a coroutine.
    summary = await Pipeline(source).build(sink).run()
    print(f"written={summary.records_written}  errors={summary.records_errored}")

asyncio.run(main())

Install

pip install "agora-etl-plugins[redis]"        # Redis source, sink, state, DLQ, dedup, AI cache
pip install "agora-etl-plugins[cron]"         # Cron schedule support for ScheduledPipeline
pip install "agora-etl-plugins[distributed]"  # Redis-backed distributed worker coordination
pip install "agora-etl-plugins[all]"          # Everything

Available plugins

Redis [redis]

Full Redis integration — streaming ingestion, writes, dead-letter queue, state, deduplication, and LLM response caching.

| Component | Type | Description |
|---|---|---|
| RedisStreamSource | Source | Consume records from a Redis Stream via XREADGROUP |
| RedisSink | Sink | Write records to Redis (SET / LPUSH / RPUSH / XADD) |
| RedisDLQSink | Sink | Route failed records to a Redis-backed dead-letter queue |
| RedisDLQSource | Source | Replay failed records from the Redis DLQ |
| RedisBackend | State | Redis-backed state backend with TTL and membership support |
| RedisStore | Dedup | Exact-match deduplication via Redis SET NX |
| RedisEmbeddingStore | Dedup | Semantic deduplication using cosine similarity (up to ~10k entries) |
| RedisLLMCache | AI Cache | Distributed LLM response cache backed by Redis |

from agora_plugins.redis.sources import RedisStreamSource
from agora_plugins.redis.sinks import RedisSink
from agora_plugins.redis.dlq import RedisDLQSink, RedisDLQSource
from agora_plugins.redis.state import RedisBackend
from agora_plugins.redis.dedup.stores import RedisStore, RedisEmbeddingStore
from agora_plugins.redis.ai.cache import RedisLLMCache
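
The component table describes RedisEmbeddingStore as semantic deduplication by cosine similarity with a practical ceiling of roughly 10k entries, which suggests a linear scan over stored vectors. The sketch below illustrates that idea in memory only. It is not the plugin's code: the class name `InMemoryEmbeddingStore`, the `add_if_new` method, and the `0.95` threshold are all assumptions made for illustration.

```python
import math
from typing import Sequence


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine of the angle between two vectors; 0.0 if either has zero length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0
    return dot / (norm_a * norm_b)


class InMemoryEmbeddingStore:
    """Hypothetical stand-in: brute-force similarity check over stored vectors."""

    def __init__(self, threshold: float = 0.95) -> None:  # threshold is an assumption
        self.threshold = threshold
        self._vectors: list[list[float]] = []

    def add_if_new(self, vector: Sequence[float]) -> bool:
        """Store the vector and return True unless a near-duplicate already exists."""
        for stored in self._vectors:
            if cosine_similarity(vector, stored) >= self.threshold:
                return False
        self._vectors.append(list(vector))
        return True
```

Every lookup compares against every stored vector, so the cost grows linearly with the number of entries. That is consistent with a stated limit in the low tens of thousands.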

RedisStreamSource

Provides at-least-once delivery via Redis consumer groups. A message is acknowledged only after the downstream write succeeds, so a record whose write fails stays pending and can be delivered again.

source = RedisStreamSource(
    url="redis://localhost:6379",
    stream="agora:events",
    group="pipeline-1",
    consumer="worker-1",
    deserializer=lambda fields: MyRecord(**fields),  # MyRecord is your own record class
    batch_size=100,
    block_ms=2000,
    reclaim_idle_ms=60_000,   # reclaim stale pending messages from dead consumers
)
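
To make the acknowledge-after-write behaviour concrete, here is a toy in-memory model of it. It is a sketch under simplifying assumptions, not the plugin's implementation and not real Redis semantics: `InMemoryStream` and `consume` are hypothetical names, and the model redelivers pending entries on the next read, whereas with Redis pending entries have to be re-read or reclaimed explicitly.

```python
from typing import Callable


class InMemoryStream:
    """Toy stand-in for a stream consumer group (hypothetical, for illustration)."""

    def __init__(self, entries: list[tuple[str, dict]]) -> None:
        self._new = list(entries)          # entries never delivered
        self._pending: dict[str, dict] = {}  # delivered but not yet acknowledged

    def read(self, count: int) -> list[tuple[str, dict]]:
        # Unacknowledged entries come back first, then new ones fill the batch.
        batch = list(self._pending.items())[:count]
        while len(batch) < count and self._new:
            entry_id, fields = self._new.pop(0)
            self._pending[entry_id] = fields
            batch.append((entry_id, fields))
        return batch

    def ack(self, entry_id: str) -> None:
        self._pending.pop(entry_id, None)

    def pending_count(self) -> int:
        return len(self._pending)


def consume(stream: InMemoryStream, write: Callable[[dict], None],
            batch_size: int = 2, max_rounds: int = 10) -> None:
    """Acknowledge each entry only after `write` returns without raising."""
    for _ in range(max_rounds):
        batch = stream.read(batch_size)
        if not batch:
            break
        for entry_id, fields in batch:
            try:
                write(fields)
            except Exception:
                continue  # no ack: the entry stays pending and is retried
            stream.ack(entry_id)
```

Because the acknowledgement comes after the write, a crash between the two can cause the same record to be written twice. Downstream writes should therefore be idempotent or deduplicated.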

RedisSink

Supports four write modes: set, lpush, rpush, xadd.

import json  # used by the serializer in the key-value example further down

# Write as Redis Stream entries
sink = RedisSink(
    url="redis://localhost:6379",
    key_fn=lambda r: "agora:processed",
    serializer=lambda r: {"id": r["id"], "value": r["value"]},
    mode="xadd",
    maxlen=10_000,
)

# Write as key-value pairs with TTL
sink = RedisSink(
    url="redis://localhost:6379",
    key_fn=lambda r: f"cache:{r['id']}",
    serializer=lambda r: json.dumps(r),
    mode="set",
    ttl_seconds=3600,
)

Dead-letter queue

from agora_plugins.redis.dlq import RedisDLQSink, RedisDLQSource

# Route failures to DLQ
summary = await (
    Pipeline(source)
    .build(sink, dlq=RedisDLQSink(url="redis://localhost:6379"))
    .run()
)

# Replay failed records
dlq_source = RedisDLQSource(
    url="redis://localhost:6379",
    pipeline_id="my-pipeline",
    stage="sink_write",
)
async for record in dlq_source.stream():
    print(record.error_message)

Cron [cron]

Adds cron expression support to ScheduledPipeline. Without this plugin, only interval-based scheduling is available.

from agora.runner import ScheduledPipeline

pipeline = ScheduledPipeline(
    factory=lambda: my_pipeline,
    schedule="0 9 * * 1-5",  # weekdays at 9am
)
await pipeline.run()

Supported expression format: standard 5-field cron (minute hour day month weekday).
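
For readers unfamiliar with the 5-field format, the following sketch checks whether a given datetime matches an expression. It is a simplified illustration, not the matcher this plugin uses: `expand_field` and `cron_matches` are hypothetical names, only `*`, single values, ranges, lists, and `/step` are handled, and all five fields are combined with AND (classic cron treats day-of-month and day-of-week as OR when both are restricted).

```python
from datetime import datetime

# Inclusive bounds for: minute, hour, day of month, month, day of week (0 = Sunday).
FIELD_RANGES = [(0, 59), (0, 23), (1, 31), (1, 12), (0, 6)]


def expand_field(field: str, low: int, high: int) -> set[int]:
    """Expand one cron field such as '*/15', '1-5' or '0,30' into a set of ints."""
    values: set[int] = set()
    for part in field.split(","):
        base, _, step_text = part.partition("/")
        step = int(step_text) if step_text else 1
        if base == "*":
            start, end = low, high
        elif "-" in base:
            start_text, end_text = base.split("-", 1)
            start, end = int(start_text), int(end_text)
        else:
            start = end = int(base)
        values.update(range(start, end + 1, step))
    return values


def cron_matches(expression: str, moment: datetime) -> bool:
    """Return True if `moment` falls on a minute selected by `expression`."""
    fields = expression.split()
    if len(fields) != 5:
        raise ValueError("expected 5 fields: minute hour day month weekday")
    allowed = [expand_field(f, lo, hi) for f, (lo, hi) in zip(fields, FIELD_RANGES)]
    weekday = moment.isoweekday() % 7  # isoweekday: Mon=1..Sun=7, cron: Sun=0
    actual = [moment.minute, moment.hour, moment.day, moment.month, weekday]
    return all(value in choices for value, choices in zip(actual, allowed))
```

With this helper, `cron_matches("0 9 * * 1-5", datetime(2024, 1, 1, 9, 0))` is true because 1 January 2024 is a Monday, while the same time on Saturday 6 January does not match.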


Distributed [distributed]

Redis-backed distributed worker coordination. Prevents duplicate pipeline runs when multiple worker instances are deployed.

Each worker acquires a per-pipeline lease before each run and releases it atomically via a Lua script. Workers register heartbeats so the fleet is visible via agora plugins list.

from agora_plugins.distributed import RedisWorkerCoordinator, DistributedConfig
from agora.runner import WorkerPool

config = DistributedConfig()  # reads AGORA_DISTRIBUTED_* env vars
coordinator = RedisWorkerCoordinator(
    redis_url=config.redis_url,
    lease_ttl_seconds=config.lease_ttl_seconds,
    heartbeat_interval=config.heartbeat_interval,
)

pool = WorkerPool(pipelines=[my_pipeline], coordinator=coordinator)
await pool.run()

Environment variables:

| Variable | Default | Description |
|---|---|---|
| AGORA_DISTRIBUTED_REDIS_URL | redis://localhost:6379 | Redis connection URL |
| AGORA_DISTRIBUTED_LEASE_TTL_SECONDS | 300 | Lease duration; must exceed the longest pipeline run |
| AGORA_DISTRIBUTED_HEARTBEAT_INTERVAL | 30 | Heartbeat interval in seconds |
| AGORA_DISTRIBUTED_KEY_PREFIX | agora:distributed: | Redis key namespace |
| AGORA_DISTRIBUTED_FALLBACK_TO_LOCAL | false | If true, continue without coordination when Redis is unavailable (risks duplicate runs) |
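
As an illustration of how these variables and defaults fit together, the sketch below reads them into a small settings object. It is not DistributedConfig itself: `DistributedSettings` and `load_settings` are hypothetical names, and the set of strings accepted as true for the boolean flag is an assumption.

```python
import os
from dataclasses import dataclass
from typing import Mapping

PREFIX = "AGORA_DISTRIBUTED_"


@dataclass(frozen=True)
class DistributedSettings:
    """Hypothetical settings holder; defaults mirror the table above."""
    redis_url: str = "redis://localhost:6379"
    lease_ttl_seconds: int = 300
    heartbeat_interval: int = 30
    key_prefix: str = "agora:distributed:"
    fallback_to_local: bool = False


def load_settings(env: Mapping[str, str] = os.environ) -> DistributedSettings:
    """Build settings from a mapping of environment variables."""
    defaults = DistributedSettings()
    flag = env.get(PREFIX + "FALLBACK_TO_LOCAL", "false")
    return DistributedSettings(
        redis_url=env.get(PREFIX + "REDIS_URL", defaults.redis_url),
        lease_ttl_seconds=int(env.get(PREFIX + "LEASE_TTL_SECONDS", defaults.lease_ttl_seconds)),
        heartbeat_interval=int(env.get(PREFIX + "HEARTBEAT_INTERVAL", defaults.heartbeat_interval)),
        key_prefix=env.get(PREFIX + "KEY_PREFIX", defaults.key_prefix),
        # Assumption: "1", "true" and "yes" (any case) count as true.
        fallback_to_local=flag.strip().lower() in {"1", "true", "yes"},
    )
```

Passing a plain dict instead of `os.environ` makes the function easy to exercise in tests, for example `load_settings({"AGORA_DISTRIBUTED_LEASE_TTL_SECONDS": "600"})`.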

Plugin auto-discovery

All plugins register themselves via Python entry-points. After installing, run:

agora plugins list

to see all registered plugins and their capabilities.


Requirements

  • Python 3.11+
  • agora-etl >= 0.1.0

License

Apache 2.0 — see LICENSE.
