Skip to main content

Redis Pub/Sub event bus backend for varco — RedisEventBus built on redis.asyncio

Project description

varco-redis

Redis backend for varco.

Provides two independent subsystems backed by Redis:

  • RedisEventBus — Pub/Sub event bus implementing AbstractEventBus from varco_core
  • RedisCache — async cache backend implementing CacheBackend from varco_core

Installation

pip install varco-redis
# or with uv:
uv add varco-redis

Event bus quick start

from varco_redis import RedisEventBus, RedisEventBusSettings
from varco_core.event import BusEventProducer, EventConsumer, listen, Event

# Define your events
class OrderPlacedEvent(Event):
    __event_type__ = "order.placed"
    order_id: str
    total: float

# Configure the bus
settings = RedisEventBusSettings(url="redis://localhost:6379/0")

async def main():
    async with RedisEventBus(settings) as bus:
        # --- Consumer side ---
        class OrderConsumer(EventConsumer):
            @listen(OrderPlacedEvent, channel="orders")
            async def on_placed(self, event: OrderPlacedEvent) -> None:
                print(f"Order placed: {event.order_id}")

        OrderConsumer().register_to(bus)

        # Give the Pub/Sub subscription time to establish
        import asyncio
        await asyncio.sleep(0.1)

        # --- Producer side ---
        producer = BusEventProducer(bus)
        await producer._produce(
            OrderPlacedEvent(order_id="abc", total=99.0),
            channel="orders",
        )

Event bus configuration

from varco_redis import RedisEventBusSettings

settings = RedisEventBusSettings(
    url="redis://redis.internal:6379/0",   # Redis connection URL
    channel_prefix="prod:",               # optional — "orders" → "prod:orders"
    socket_timeout=5.0,                   # seconds, None = no timeout
)
Field Default Env var Description
url "redis://localhost:6379/0" VARCO_REDIS_URL Redis connection URL
channel_prefix "" VARCO_REDIS_CHANNEL_PREFIX Prepended to every channel name
decode_responses False Must be False — bus expects raw bytes
socket_timeout None VARCO_REDIS_SOCKET_TIMEOUT Socket operation timeout in seconds
redis_kwargs {} Extra kwargs for redis.asyncio.from_url()

Event bus lifecycle

# Explicit lifecycle
bus = RedisEventBus(settings)
await bus.start()    # connects to Redis, starts listener task
# ... use bus ...
await bus.stop()     # cancels listener, closes connection

# Context manager (recommended)
async with RedisEventBus(settings) as bus:
    ...

Wildcard subscriptions

Subscribing with channel=CHANNEL_ALL (the default) uses Redis PSUBSCRIBE "*" — the handler receives events from every channel published to this Redis instance. Use channel_prefix to scope channels to your service and avoid cross-service interference:

# All events with prefix "svc-a:" on this Redis
settings = RedisEventBusSettings(channel_prefix="svc-a:")
bus.subscribe(MyEvent, handler)  # receives from all "svc-a:*" channels

Cache quick start

from varco_redis.cache import RedisCache, RedisCacheSettings

cache_settings = RedisCacheSettings(
    url="redis://localhost:6379/0",
    key_prefix="myapp:",   # all keys stored as "myapp:<key>"
    default_ttl=300,       # seconds; None = no expiry
)

async with RedisCache(cache_settings) as cache:
    await cache.set("user:42", {"name": "Alice"})
    user = await cache.get("user:42")    # returns dict or None
    await cache.delete("user:42")
    await cache.clear()                  # removes all "myapp:*" keys

Cache configuration

from varco_redis.cache import RedisCacheSettings

settings = RedisCacheSettings(
    url="redis://localhost:6379/0",
    key_prefix="prod:",
    default_ttl=600,
    socket_timeout=2.0,
)
Field Default Env var Description
url "redis://localhost:6379/0" VARCO_REDIS_CACHE_URL Redis connection URL
key_prefix "" VARCO_REDIS_CACHE_KEY_PREFIX Prepended to every stored key
default_ttl None VARCO_REDIS_CACHE_DEFAULT_TTL Default TTL in seconds; None = no expiry
decode_responses False Must be False — cache stores raw bytes
socket_timeout None VARCO_REDIS_CACHE_SOCKET_TIMEOUT Socket operation timeout in seconds
redis_kwargs {} Extra kwargs for redis.asyncio.from_url()

Layered cache (L1 memory + L2 Redis)

from varco_core.cache import InMemoryCache, LayeredCache, TTLStrategy
from varco_redis.cache import RedisCache, RedisCacheSettings

l1 = InMemoryCache(strategy=TTLStrategy(60))       # fast in-process layer
l2 = RedisCache(RedisCacheSettings(key_prefix="app:"))  # shared Redis layer

async with LayeredCache(l1, l2, promote_ttl=60) as cache:
    await cache.set("product:1", product, ttl=300)
    # First read: L1 miss → L2 hit → promote to L1
    result = await cache.get("product:1")
    # Second read: served from L1 (no network round-trip)

Cache + CachedService with cross-process invalidation

from varco_core.cache import CachedService, LayeredCache, TTLStrategy, InMemoryCache
from varco_redis.cache import RedisCache, RedisCacheSettings
from varco_redis import RedisEventBus, RedisEventBusSettings

bus_settings = RedisEventBusSettings(url="redis://localhost:6379/0")
cache_settings = RedisCacheSettings(url="redis://localhost:6379/0", key_prefix="posts:")

async with (
    RedisEventBus(bus_settings) as bus,
    LayeredCache(
        InMemoryCache(strategy=TTLStrategy(60)),
        RedisCache(cache_settings),
        promote_ttl=60,
    ) as cache,
):
    cached = CachedService(
        post_service,
        cache,
        namespace="posts",
        default_ttl=300,
        bus=bus,                           # publish invalidation events
        bus_channel="posts.invalidations", # other nodes subscribe here
    )

    post = await cached.get(42)           # cached
    posts = await cached.list()           # cached
    await cached.update(42, {"title": "New"})  # invalidates + publishes event

DI integration

from providify import DIContainer
from varco_core.cache import CacheBackend
from varco_redis.cache import RedisCacheConfiguration

container = DIContainer()
await container.ainstall(RedisCacheConfiguration)

cache = await container.aget(CacheBackend)  # RedisCache singleton
await container.ashutdown()

Running tests

# Unit tests (no Redis required)
uv sync
uv run pytest

# Integration tests (requires Docker)
VARCO_RUN_INTEGRATION=1 uv run pytest -m integration

Connection settings

RedisConnectionSettings is a structured, env-var loadable config object that produces a URL and kwargs for redis.asyncio.

Plain connection

import redis.asyncio
from varco_redis.connection import RedisConnectionSettings

conn = RedisConnectionSettings(host="my-redis", port=6379, db=0)

client = redis.asyncio.from_url(conn.to_url(), **conn.to_redis_kwargs())
# to_url()  → "redis://my-redis:6379/0"

From environment variables

REDIS_HOST=my-redis
REDIS_PORT=6379
REDIS_DB=1
conn = RedisConnectionSettings.from_env()
client = redis.asyncio.from_url(conn.to_url(), **conn.to_redis_kwargs())

With password (AUTH)

conn = RedisConnectionSettings(host="my-redis", password="s3cret")
# to_url() → "redis://:s3cret@my-redis:6379/0"
client = redis.asyncio.from_url(conn.to_url(), **conn.to_redis_kwargs())

With ACL username + password (Redis 6+)

conn = RedisConnectionSettings(
    host="my-redis",
    username="alice",
    password="s3cret",
)
# to_url() → "redis://alice:s3cret@my-redis:6379/0"

With TLS / SSL

from varco_core.connection import SSLConfig
from pathlib import Path

ssl = SSLConfig(ca_cert=Path("/etc/ssl/redis-ca.pem"), verify=True)
conn = RedisConnectionSettings.with_ssl(ssl, host="prod-redis")
# to_url()           → "rediss://prod-redis:6379/0"
# to_redis_kwargs()  → {"decode_responses": False, "ssl": <SSLContext>}

client = redis.asyncio.from_url(conn.to_url(), **conn.to_redis_kwargs())

Or from env:

REDIS_HOST=prod-redis
REDIS_SSL__CA_CERT=/etc/ssl/redis-ca.pem
REDIS_SSL__VERIFY=true

With mTLS (client certificates)

ssl = SSLConfig(
    ca_cert=Path("/etc/ssl/ca.pem"),
    client_cert=Path("/etc/ssl/client.crt"),
    client_key=Path("/etc/ssl/client.key"),
)
conn = RedisConnectionSettings.with_ssl(ssl, host="prod-redis")

Connection settings reference

Env var Default Description
REDIS_HOST localhost Redis server hostname
REDIS_PORT 6379 Redis server port
REDIS_DB 0 Database index (0–15)
REDIS_PASSWORD AUTH password
REDIS_USERNAME ACL username (Redis 6+)
REDIS_DECODE_RESPONSES false Return strings instead of bytes
REDIS_SOCKET_TIMEOUT Socket timeout in seconds
REDIS_SSL__CA_CERT Path to CA certificate
REDIS_SSL__CLIENT_CERT Path to client certificate (mTLS)
REDIS_SSL__CLIENT_KEY Path to client private key (mTLS)
REDIS_SSL__VERIFY true TLS peer verification

Note: RedisConnectionSettings is a general-purpose connection config. RedisEventBusSettings (used by RedisEventBus) is a separate, independent class — existing code that uses the event bus is unaffected.


Delivery semantics

Redis Pub/Sub provides at-most-once delivery — messages published while no subscriber is connected are silently dropped. If you need at-least-once or exactly-once delivery, use Redis Streams (planned as varco_redis.streams in a future release) or switch to varco_kafka.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

varco_redis-1.0.6.tar.gz (84.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

varco_redis-1.0.6-py3-none-any.whl (69.4 kB view details)

Uploaded Python 3

File details

Details for the file varco_redis-1.0.6.tar.gz.

File metadata

  • Download URL: varco_redis-1.0.6.tar.gz
  • Upload date:
  • Size: 84.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.10.12 {"installer":{"name":"uv","version":"0.10.12","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Debian GNU/Linux","version":"12","id":"bookworm","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}

File hashes

Hashes for varco_redis-1.0.6.tar.gz
Algorithm Hash digest
SHA256 bc51e4aba95aa1607132f2b67e61cadf15f72e51db074764aa390d612a7875f2
MD5 6ef04d76aafe969ebe8d50d2a84d4300
BLAKE2b-256 176a35051f40be2499cdc93a0a394b39f77cac67437431beef4062ae0559ebea

See more details on using hashes here.

File details

Details for the file varco_redis-1.0.6-py3-none-any.whl.

File metadata

  • Download URL: varco_redis-1.0.6-py3-none-any.whl
  • Upload date:
  • Size: 69.4 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.10.12 {"installer":{"name":"uv","version":"0.10.12","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Debian GNU/Linux","version":"12","id":"bookworm","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}

File hashes

Hashes for varco_redis-1.0.6-py3-none-any.whl
Algorithm Hash digest
SHA256 0023ac6e85fe8ddd1bc94d633c67edeaf8799002276283073f057f2a169af9eb
MD5 f30d452483bb94f2d04b4f4a63a071aa
BLAKE2b-256 8482ec64c3845d405639b257882c631ab6571f49d2633085330e3829e1420b04

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page