Skip to main content

Redis Pub/Sub event bus backend for varco — RedisEventBus built on redis.asyncio

Project description

varco-redis

Redis backend for varco.

Provides two independent subsystems backed by Redis:

  • RedisEventBus — Pub/Sub event bus implementing AbstractEventBus from varco_core
  • RedisCache — async cache backend implementing CacheBackend from varco_core

Installation

pip install varco-redis
# or with uv:
uv add varco-redis

Event bus quick start

from varco_redis import RedisEventBus, RedisEventBusSettings
from varco_core.event import BusEventProducer, EventConsumer, listen, Event

# Define your events
class OrderPlacedEvent(Event):
    __event_type__ = "order.placed"
    order_id: str
    total: float

# Configure the bus
settings = RedisEventBusSettings(url="redis://localhost:6379/0")

async def main():
    async with RedisEventBus(settings) as bus:
        # --- Consumer side ---
        class OrderConsumer(EventConsumer):
            @listen(OrderPlacedEvent, channel="orders")
            async def on_placed(self, event: OrderPlacedEvent) -> None:
                print(f"Order placed: {event.order_id}")

        OrderConsumer().register_to(bus)

        # Give the Pub/Sub subscription time to establish
        import asyncio
        await asyncio.sleep(0.1)

        # --- Producer side ---
        producer = BusEventProducer(bus)
        await producer._produce(
            OrderPlacedEvent(order_id="abc", total=99.0),
            channel="orders",
        )

Event bus configuration

from varco_redis import RedisEventBusSettings

settings = RedisEventBusSettings(
    url="redis://redis.internal:6379/0",   # Redis connection URL
    channel_prefix="prod:",               # optional — "orders" → "prod:orders"
    socket_timeout=5.0,                   # seconds, None = no timeout
)
Field Default Env var Description
url "redis://localhost:6379/0" VARCO_REDIS_URL Redis connection URL
channel_prefix "" VARCO_REDIS_CHANNEL_PREFIX Prepended to every channel name
decode_responses False Must be False — bus expects raw bytes
socket_timeout None VARCO_REDIS_SOCKET_TIMEOUT Socket operation timeout in seconds
redis_kwargs {} Extra kwargs for redis.asyncio.from_url()

Event bus lifecycle

# Explicit lifecycle
bus = RedisEventBus(settings)
await bus.start()    # connects to Redis, starts listener task
# ... use bus ...
await bus.stop()     # cancels listener, closes connection

# Context manager (recommended)
async with RedisEventBus(settings) as bus:
    ...

Wildcard subscriptions

Subscribing with channel=CHANNEL_ALL (the default) uses Redis PSUBSCRIBE "*" — the handler receives events from every channel published to this Redis instance. Use channel_prefix to scope channels to your service and avoid cross-service interference:

# All events with prefix "svc-a:" on this Redis
settings = RedisEventBusSettings(channel_prefix="svc-a:")
bus.subscribe(MyEvent, handler)  # receives from all "svc-a:*" channels

Cache quick start

from varco_redis.cache import RedisCache, RedisCacheSettings

cache_settings = RedisCacheSettings(
    url="redis://localhost:6379/0",
    key_prefix="myapp:",   # all keys stored as "myapp:<key>"
    default_ttl=300,       # seconds; None = no expiry
)

async with RedisCache(cache_settings) as cache:
    await cache.set("user:42", {"name": "Alice"})
    user = await cache.get("user:42")    # returns dict or None
    await cache.delete("user:42")
    await cache.clear()                  # removes all "myapp:*" keys

Cache configuration

from varco_redis.cache import RedisCacheSettings

settings = RedisCacheSettings(
    url="redis://localhost:6379/0",
    key_prefix="prod:",
    default_ttl=600,
    socket_timeout=2.0,
)
Field Default Env var Description
url "redis://localhost:6379/0" VARCO_REDIS_CACHE_URL Redis connection URL
key_prefix "" VARCO_REDIS_CACHE_KEY_PREFIX Prepended to every stored key
default_ttl None VARCO_REDIS_CACHE_DEFAULT_TTL Default TTL in seconds; None = no expiry
decode_responses False Must be False — cache stores raw bytes
socket_timeout None VARCO_REDIS_CACHE_SOCKET_TIMEOUT Socket operation timeout in seconds
redis_kwargs {} Extra kwargs for redis.asyncio.from_url()

Layered cache (L1 memory + L2 Redis)

from varco_core.cache import InMemoryCache, LayeredCache, TTLStrategy
from varco_redis.cache import RedisCache, RedisCacheSettings

l1 = InMemoryCache(strategy=TTLStrategy(60))       # fast in-process layer
l2 = RedisCache(RedisCacheSettings(key_prefix="app:"))  # shared Redis layer

async with LayeredCache(l1, l2, promote_ttl=60) as cache:
    await cache.set("product:1", product, ttl=300)
    # First read: L1 miss → L2 hit → promote to L1
    result = await cache.get("product:1")
    # Second read: served from L1 (no network round-trip)

Cache + CachedService with cross-process invalidation

from varco_core.cache import CachedService, LayeredCache, TTLStrategy, InMemoryCache
from varco_redis.cache import RedisCache, RedisCacheSettings
from varco_redis import RedisEventBus, RedisEventBusSettings

bus_settings = RedisEventBusSettings(url="redis://localhost:6379/0")
cache_settings = RedisCacheSettings(url="redis://localhost:6379/0", key_prefix="posts:")

async with (
    RedisEventBus(bus_settings) as bus,
    LayeredCache(
        InMemoryCache(strategy=TTLStrategy(60)),
        RedisCache(cache_settings),
        promote_ttl=60,
    ) as cache,
):
    cached = CachedService(
        post_service,
        cache,
        namespace="posts",
        default_ttl=300,
        bus=bus,                           # publish invalidation events
        bus_channel="posts.invalidations", # other nodes subscribe here
    )

    post = await cached.get(42)           # cached
    posts = await cached.list()           # cached
    await cached.update(42, {"title": "New"})  # invalidates + publishes event

DI integration

from providify import DIContainer
from varco_core.cache import CacheBackend
from varco_redis.cache import RedisCacheConfiguration

container = DIContainer()
await container.ainstall(RedisCacheConfiguration)

cache = await container.aget(CacheBackend)  # RedisCache singleton
await container.ashutdown()

Running tests

# Unit tests (no Redis required)
uv sync
uv run pytest

# Integration tests (requires Docker)
VARCO_RUN_INTEGRATION=1 uv run pytest -m integration

Delivery semantics

Redis Pub/Sub provides at-most-once delivery — messages published while no subscriber is connected are silently dropped. If you need at-least-once or exactly-once delivery, use Redis Streams (planned as varco_redis.streams in a future release) or switch to varco_kafka.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

varco_redis-0.1.0.tar.gz (78.3 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

varco_redis-0.1.0-py3-none-any.whl (63.6 kB view details)

Uploaded Python 3

File details

Details for the file varco_redis-0.1.0.tar.gz.

File metadata

  • Download URL: varco_redis-0.1.0.tar.gz
  • Upload date:
  • Size: 78.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.10.12 {"installer":{"name":"uv","version":"0.10.12","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Debian GNU/Linux","version":"12","id":"bookworm","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}

File hashes

Hashes for varco_redis-0.1.0.tar.gz
Algorithm Hash digest
SHA256 41f485a9248762d36a950015056dc93947d882f4e77ba48f975bdc27ab6b8979
MD5 a86bcf97d71594030968908dd1acc06d
BLAKE2b-256 7b24e7cdb561e1e2a120427e88317eb79bbbe511769597318eb971e2724aabef

See more details on using hashes here.

File details

Details for the file varco_redis-0.1.0-py3-none-any.whl.

File metadata

  • Download URL: varco_redis-0.1.0-py3-none-any.whl
  • Upload date:
  • Size: 63.6 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.10.12 {"installer":{"name":"uv","version":"0.10.12","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Debian GNU/Linux","version":"12","id":"bookworm","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}

File hashes

Hashes for varco_redis-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 4c040b3b04079af546e201a74172a983d7447d248f628f48229b853504e89d0e
MD5 388042fc0a8c401d31ca5fa2e9dbfb73
BLAKE2b-256 31e9437bea82227c37994b4bb596dcb7783c867ea802335f92038e805bda9598

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page