Skip to main content

Redis Pub/Sub event bus backend for varco — RedisEventBus built on redis.asyncio

Project description

varco-redis

Redis Pub/Sub event bus backend for varco.

RedisEventBus implements AbstractEventBus from varco_core using redis.asyncio. Published events are serialized to JSON (via JsonEventSerializer) and sent to Redis Pub/Sub channels. A background listener task polls those channels and dispatches messages to locally registered handlers.


Installation

pip install varco-redis
# or with uv:
uv add varco-redis

Quick start

from varco_redis import RedisEventBus, RedisConfig
from varco_core.event import BusEventProducer, EventConsumer, listen, Event

# Define your events
class OrderPlacedEvent(Event):
    __event_type__ = "order.placed"
    order_id: str
    total: float

# Configure the bus
config = RedisConfig(url="redis://localhost:6379/0")

async def main():
    async with RedisEventBus(config) as bus:
        # --- Consumer side ---
        class OrderConsumer(EventConsumer):
            @listen(OrderPlacedEvent, channel="orders")
            async def on_placed(self, event: OrderPlacedEvent) -> None:
                print(f"Order placed: {event.order_id}")

        OrderConsumer().register_to(bus)

        # Give the Pub/Sub subscription time to establish
        import asyncio
        await asyncio.sleep(0.1)

        # --- Producer side ---
        producer = BusEventProducer(bus)
        await producer._produce(
            OrderPlacedEvent(order_id="abc", total=99.0),
            channel="orders",
        )

Configuration

from varco_redis import RedisConfig

config = RedisConfig(
    url="redis://redis.internal:6379/0",   # Redis connection URL
    channel_prefix="prod:",               # optional — "orders" → "prod:orders"
    socket_timeout=5.0,                   # seconds, None = no timeout
)
Field Default Description
url "redis://localhost:6379/0" Redis connection URL
channel_prefix "" Prepended to every channel name
decode_responses False Must be False — bus expects raw bytes
socket_timeout None Socket operation timeout in seconds
redis_kwargs {} Extra kwargs for redis.asyncio.from_url()

Lifecycle

# Explicit lifecycle
bus = RedisEventBus(config)
await bus.start()    # connects to Redis, starts listener task
# ... use bus ...
await bus.stop()     # cancels listener, closes connection

# Context manager (recommended)
async with RedisEventBus(config) as bus:
    ...

Wildcard subscriptions

Subscribing with channel=CHANNEL_ALL (the default) uses Redis PSUBSCRIBE "*" — the handler receives events from every channel published to this Redis instance. Use channel_prefix to scope channels to your service and avoid cross-service interference:

# All events with prefix "svc-a:" on this Redis
config = RedisConfig(channel_prefix="svc-a:")
bus.subscribe(MyEvent, handler)  # receives from all "svc-a:*" channels

Running tests

# Unit tests (no Redis required)
uv sync
uv run pytest

# Integration tests (requires Docker)
VARCO_RUN_INTEGRATION=1 uv run pytest -m integration

Delivery semantics

Redis Pub/Sub provides at-most-once delivery — messages published while no subscriber is connected are silently dropped. If you need at-least-once or exactly-once delivery, use Redis Streams (planned as varco_redis.streams in a future release) or switch to varco_kafka.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

varco_redis-0.0.1.tar.gz (16.3 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

varco_redis-0.0.1-py3-none-any.whl (13.0 kB view details)

Uploaded Python 3

File details

Details for the file varco_redis-0.0.1.tar.gz.

File metadata

  • Download URL: varco_redis-0.0.1.tar.gz
  • Upload date:
  • Size: 16.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.10.12 {"installer":{"name":"uv","version":"0.10.12","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Debian GNU/Linux","version":"12","id":"bookworm","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}

File hashes

Hashes for varco_redis-0.0.1.tar.gz
Algorithm Hash digest
SHA256 d78d1cb53be6a23f29145f198f661fe93c6e78140cea542190a374ace8f9c658
MD5 c0e52ceaf94fe2db119384348f4c7484
BLAKE2b-256 24e8f752e9c22c476dc6ac6622d87b58aff13c31f02e1ccb7cd883661dc7dd5a

See more details on using hashes here.

File details

Details for the file varco_redis-0.0.1-py3-none-any.whl.

File metadata

  • Download URL: varco_redis-0.0.1-py3-none-any.whl
  • Upload date:
  • Size: 13.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.10.12 {"installer":{"name":"uv","version":"0.10.12","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Debian GNU/Linux","version":"12","id":"bookworm","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}

File hashes

Hashes for varco_redis-0.0.1-py3-none-any.whl
Algorithm Hash digest
SHA256 bd6c00e6f79575bac10090a81beda038388750bb8635231c4ea8525d455c9461
MD5 40bdfc6e30eaf63f6414e65aa86f73c5
BLAKE2b-256 f58cbb33087be4e50528b999d09fa74f9cf441c088492b0167618fdd4a28e24e

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page