Crash-safe, stateful processing on top of Redis Streams

atomic-redis-streams

atomic-redis-streams is a small Python library that adds crash-safe, stateful processing on top of Redis Streams.

Raw Redis Streams give you persistence and at-least-once delivery, but they do not provide:

  • Atomic application state that survives crashes
  • Exactly-once processing (no message is processed twice after a restart)
  • Downstream publishes that happen only if the callback succeeds

This library fills that gap. It wraps Redis Streams consumers in a transaction model where the callback result, application state, consumer position, and any outgoing publishes all commit together or not at all.
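To make the "all or nothing" idea concrete, here is a minimal in-memory sketch of such a commit model. It is not the library's implementation: ToyStore, ToyTransaction, and count_and_forward are hypothetical names, and a plain Python object stands in for Redis. In Redis itself, a commit step like this is typically done with a MULTI/EXEC transaction or a Lua script.

```python
import copy


class ToyTransaction:
    """Hypothetical: buffers outgoing publishes until the handler has finished."""

    def __init__(self):
        self.pending = []

    def publish(self, stream, fields):
        self.pending.append((stream, fields))


class ToyStore:
    """Hypothetical in-memory stand-in for Redis: state, position, and streams."""

    def __init__(self):
        self.state = {"count": 0}
        self.position = None
        self.streams = {}

    def process(self, message_id, message, handler):
        # Work on a copy so a failing handler leaves the committed state untouched.
        working_state = copy.deepcopy(self.state)
        tx = ToyTransaction()
        try:
            handler(working_state, message, tx)
        except Exception:
            return False  # nothing is committed, buffered publishes are dropped
        # Commit step: state, position, and publishes become visible together.
        self.state = working_state
        self.position = message_id
        for stream, fields in tx.pending:
            self.streams.setdefault(stream, []).append(fields)
        return True


def count_and_forward(state, message, tx):
    state["count"] += 1
    tx.publish("out", {"seen": message["id"]})
    if message.get("poison"):
        raise ValueError("handler failed after publishing")


store = ToyStore()
ok = store.process("1-0", {"id": "a"}, count_and_forward)
failed = store.process("2-0", {"id": "b", "poison": True}, count_and_forward)
```

After the second call, the store still reflects only the first message: the counter, the position, and the "out" stream are unchanged by the failed handler.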

Installation

pip install atomic-redis-streams

Examples

The core concept is a handler: a class with a handle() method that the library calls for each message. You point a RedisStreamConsumer at a Redis stream, give it a handler, and it takes care of the rest.

Stateless consumer

The simplest handler just implements handle(). Use this when you don't need to track any state between messages:

from atomic_redis_streams import RedisStreamConsumer, TransactionManager

class Logger:
    def handle(self, message: dict, transaction: TransactionManager) -> None:
        print(f"Received: {message}")

consumer = RedisStreamConsumer(
    stream="events",
    consumer_id="logger",
    handler=Logger(),
    redis_url="redis://localhost:6379",
)
consumer.start()

Stateful consumer with crash recovery

Extend Persistable to persist state atomically with each message. On restart, state is restored and no message is processed twice:

from atomic_redis_streams import Persistable, PersistableAttribute, RedisStreamConsumer, TransactionManager

class OrderCounter(Persistable):
    count = PersistableAttribute(default=0)

    @property
    def unique_id(self) -> str:
        return "order-counter"

    def handle(self, message: dict, transaction: TransactionManager) -> None:
        self.count += 1
        print(f"Processed order {message['order_id']} (total: {self.count})")

consumer = RedisStreamConsumer(
    stream="orders",
    consumer_id="order-processor",
    handler=OrderCounter(),
    redis_url="redis://localhost:6379",
)
consumer.start()
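How PersistableAttribute works internally is not described here. As a rough mental model only, a descriptor with a default value plus a snapshot/restore pair could look like the sketch below. ToyAttribute, ToyPersistable, snapshot(), and restore() are hypothetical names and are not part of the library's API.

```python
class ToyAttribute:
    """Hypothetical descriptor: a per-instance value with a class-level default."""

    def __init__(self, default=None):
        self.default = default

    def __set_name__(self, owner, name):
        self.name = name

    def __get__(self, instance, owner=None):
        if instance is None:
            return self
        return instance.__dict__.get(self.name, self.default)

    def __set__(self, instance, value):
        instance.__dict__[self.name] = value


class ToyPersistable:
    """Hypothetical base class that collects ToyAttribute fields into a dict."""

    def snapshot(self):
        names = [
            name
            for name, value in vars(type(self)).items()
            if isinstance(value, ToyAttribute)
        ]
        return {name: getattr(self, name) for name in names}

    def restore(self, data):
        for name, value in data.items():
            setattr(self, name, value)


class Counter(ToyPersistable):
    count = ToyAttribute(default=0)


counter = Counter()
counter.count += 1
counter.count += 1
saved = counter.snapshot()

# A "restarted" instance starts from the default, then picks up the saved state.
restarted = Counter()
restarted.restore(saved)
```

The point of the sketch is only that declared attributes give a base class a well-defined set of fields to save with each message and to reload on startup.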

Atomic downstream publishing

Use transaction.publish() to forward messages to another stream. The publish is committed atomically with the handler, so if the handler raises, the publish is rolled back:

from atomic_redis_streams import RedisStreamConsumer, TransactionManager

class OrderRouter:
    def handle(self, message: dict, transaction: TransactionManager) -> None:
        transaction.publish("invoices", {"order_id": message["order_id"]})
        # if an exception is raised here, the above publish() call is rolled back

consumer = RedisStreamConsumer(
    stream="orders",
    consumer_id="order-router",
    handler=OrderRouter(),
    redis_url="redis://localhost:6379",
)
consumer.start()
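Because a handler is a plain class, its routing logic can be exercised without a running Redis by passing in a stand-in for the transaction. The sketch below assumes only the publish(stream, message) call shape used above. RecordingTransaction is a hypothetical test double, not something the library ships, and the real TransactionManager may expose more than publish().

```python
class RecordingTransaction:
    """Hypothetical test double: records publish() calls instead of sending them."""

    def __init__(self):
        self.published = []

    def publish(self, stream, message):
        self.published.append((stream, message))


class OrderRouter:
    # Same handler body as above, with type hints omitted so the sketch
    # runs without the library installed.
    def handle(self, message, transaction):
        transaction.publish("invoices", {"order_id": message["order_id"]})


tx = RecordingTransaction()
OrderRouter().handle({"order_id": "42"}, tx)
```

After the call, tx.published holds the single ("invoices", {"order_id": "42"}) entry, which a unit test can assert on directly.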

Full example

This example combines all three features: reading a message, updating persistent state, and publishing to a downstream stream. All three happen atomically:

from atomic_redis_streams import Persistable, PersistableAttribute, RedisStreamConsumer, TransactionManager

RESTOCK_THRESHOLD = 10

class InventoryTracker(Persistable):
    stock = PersistableAttribute(default=0)

    @property
    def unique_id(self) -> str:
        return "inventory-tracker"

    def handle(self, message: dict, transaction: TransactionManager) -> None:
        self.stock -= int(message["quantity"])
        if self.stock <= RESTOCK_THRESHOLD:
            transaction.publish("restock-alerts", {
                "sku": message["sku"],
                "stock": str(self.stock),
            })

consumer = RedisStreamConsumer(
    stream="sales",
    consumer_id="inventory-tracker",
    handler=InventoryTracker(),
    redis_url="redis://localhost:6379",
)
consumer.start()

If the process crashes mid-flight, the stock count is not updated and the restock alert is not published. On restart, processing resumes from the last successfully committed message.
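The restart behaviour can be illustrated with a small simulation. This is a sketch under stated assumptions, not the library's code: the run() function, the committed dict, the starting stock of 20, and the crash_on parameter are all made up for the illustration. A crash before the commit step leaves the saved stock and position untouched, so the interrupted message is processed again on restart and counted once.

```python
RESTOCK_THRESHOLD = 10


def run(messages, committed, crash_on=None):
    """Process messages after committed['position']; optionally crash before a commit."""
    stock = committed["stock"]
    for index, message in enumerate(messages):
        if index <= committed["position"]:
            continue  # already committed before the restart
        new_stock = stock - int(message["quantity"])
        alerts = []
        if new_stock <= RESTOCK_THRESHOLD:
            alerts.append({"sku": message["sku"], "stock": str(new_stock)})
        if index == crash_on:
            raise RuntimeError("simulated crash before commit")
        # Commit step: stock, position, and alerts are saved together.
        committed["stock"] = new_stock
        committed["position"] = index
        committed["alerts"].extend(alerts)
        stock = new_stock


messages = [
    {"sku": "A", "quantity": "3"},
    {"sku": "A", "quantity": "4"},
    {"sku": "A", "quantity": "5"},
]
committed = {"stock": 20, "position": -1, "alerts": []}

try:
    run(messages, committed, crash_on=1)  # crashes while handling the second sale
except RuntimeError:
    pass
after_crash = dict(committed, alerts=list(committed["alerts"]))

run(messages, committed)  # restart: resumes with the second sale
```

After the crash, only the first sale is committed (stock 17). After the restart, the stock is 8 and exactly one restock alert exists, as if the crash had never happened.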

Security

This library uses pickle to serialize and deserialize Persistable handler state in Redis. It assumes a trusted Redis connection. If an attacker can write arbitrary bytes to your Redis instance, they can execute arbitrary code when the state is restored on startup. Do not use this library against an untrusted or publicly accessible Redis instance.
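The risk comes from how pickle works in general, independent of this library: a pickled object can name any callable to run while it is being loaded. The harmless sketch below uses only the standard library. The Payload class is made up for the demonstration and evaluates a fixed arithmetic expression, where a real attacker would substitute something destructive.

```python
import pickle


class Payload:
    def __reduce__(self):
        # Tells pickle: "to rebuild this object, call eval('6 * 7')".
        return (eval, ("6 * 7",))


blob = pickle.dumps(Payload())

# Loading the bytes runs the callable chosen by whoever produced them.
result = pickle.loads(blob)
```

Here result is the integer 42 rather than a Payload instance, which shows that loading the bytes executed code chosen by their author.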

Download files

Source Distribution

atomic_redis_streams-0.1.0.tar.gz (66.2 kB)

Uploaded Source

Built Distribution

atomic_redis_streams-0.1.0-py3-none-any.whl (20.8 kB)

Uploaded Python 3

File details

Details for the file atomic_redis_streams-0.1.0.tar.gz.

File metadata

  • Download URL: atomic_redis_streams-0.1.0.tar.gz
  • Upload date:
  • Size: 66.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.13

File hashes

Hashes for atomic_redis_streams-0.1.0.tar.gz
Algorithm Hash digest
SHA256 4e3994b5e3bbc4a7f01da5add08473e4ffc6bac3577c6b9bc2f3a01a65e6677b
MD5 efd68fceb7c9f6d19d3994cdfb4e6246
BLAKE2b-256 d82da3a61105c8d3d91b7f6e0ae9da025e8d956c7b516d61fd5d9dd9a1a06000

Provenance

The following attestation bundles were made for atomic_redis_streams-0.1.0.tar.gz:

Publisher: publish.yml on imandrefsilva/atomic-redis-streams

File details

Details for the file atomic_redis_streams-0.1.0-py3-none-any.whl.

File hashes

Hashes for atomic_redis_streams-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 b9cfa8642762aba7b51eeaad7c903f94a6c7b5afc071f4f8bf24bf581641edf5
MD5 52e32836558ffb1dece60450236b77e7
BLAKE2b-256 5a083851e9405d387023f882f56e44bfbdcb85e8eade10ab5d1ecba766dadb48

Provenance

The following attestation bundles were made for atomic_redis_streams-0.1.0-py3-none-any.whl:

Publisher: publish.yml on imandrefsilva/atomic-redis-streams
