Crash-safe, stateful processing on top of Redis Streams
Project description
atomic-redis-streams
atomic-streams is a small Python library that adds crash-safe, stateful processing on top of Redis Streams.
Raw Redis Streams give you persistence and at-least-once delivery but do not give:
- Atomic application state that survives crashes
- Exactly-once processing (no duplicate messages after restart)
- Downstream message publications that only happen if the callback succeeded
This library fills that gap. It wraps Redis Streams consumers in a transaction model where the callback result, application state, consumer position, and any outgoing publishes all commit together or not at all.
Installation
pip install atomic-redis-streams
Examples
The core concept is a handler: a class with a handle() method that the library calls for each message. You point a RedisStreamConsumer at a Redis stream, give it a handler, and it takes care of the rest.
Stateless consumer
The simplest handler just implements handle(). Use this when you don't need to track any state between messages:
from atomic_redis_streams import RedisStreamConsumer, TransactionManager
class Logger:
def handle(self, message: dict, transaction: TransactionManager) -> None:
print(f"Received: {message}")
consumer = RedisStreamConsumer(
stream="events",
consumer_id="logger",
handler=Logger(),
redis_url="redis://localhost:6379",
)
consumer.start()
Stateful consumer with crash recovery
Extend Persistable to persist state atomically with each message. On restart, state is restored and no message is processed twice:
from atomic_redis_streams import Persistable, PersistableAttribute, RedisStreamConsumer, TransactionManager
class OrderCounter(Persistable):
count = PersistableAttribute(default=0)
@property
def unique_id(self) -> str:
return "order-counter"
def handle(self, message: dict, transaction: TransactionManager) -> None:
self.count += 1
print(f"Processed order {message['order_id']} (total: {self.count})")
consumer = RedisStreamConsumer(
stream="orders",
consumer_id="order-processor",
handler=OrderCounter(),
redis_url="redis://localhost:6379",
)
consumer.start()
Atomic downstream publishing
Use transaction.publish() to forward messages to another stream. The publish is committed atomically with the handler, so if the handler raises, the publish is rolled back:
from atomic_redis_streams import RedisStreamConsumer, TransactionManager
class OrderRouter:
def handle(self, message: dict, transaction: TransactionManager) -> None:
transaction.publish("invoices", {"order_id": message["order_id"]})
# if an exception is raised here, the above publish() call is rolled back
consumer = RedisStreamConsumer(
stream="orders",
consumer_id="order-router",
handler=OrderRouter(),
redis_url="redis://localhost:6379",
)
consumer.start()
Full example
This example combines all three features: reading a message, updating persistent state, and publishing to a downstream stream. All three happen atomically:
from atomic_redis_streams import Persistable, PersistableAttribute, RedisStreamConsumer, TransactionManager
RESTOCK_THRESHOLD = 10
class InventoryTracker(Persistable):
stock = PersistableAttribute(default=0)
@property
def unique_id(self) -> str:
return "inventory-tracker"
def handle(self, message: dict, transaction: TransactionManager) -> None:
self.stock -= int(message["quantity"])
if self.stock <= RESTOCK_THRESHOLD:
transaction.publish("restock-alerts", {
"sku": message["sku"],
"stock": str(self.stock),
})
consumer = RedisStreamConsumer(
stream="sales",
consumer_id="inventory-tracker",
handler=InventoryTracker(),
redis_url="redis://localhost:6379",
)
consumer.start()
If the process crashes mid-flight, the stock count is not updated and the restock alert is not published. On restart, processing resumes from the last successfully committed message.
Security
This library uses pickle to serialize and deserialize Persistable handler state in Redis. It assumes a trusted Redis connection. If an attacker can write arbitrary bytes to your Redis instance, they can execute arbitrary code when the state is restored on startup. Do not use this library against an untrusted or publicly accessible Redis instance.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file atomic_redis_streams-0.1.0.tar.gz.
File metadata
- Download URL: atomic_redis_streams-0.1.0.tar.gz
- Upload date:
- Size: 66.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.13
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
4e3994b5e3bbc4a7f01da5add08473e4ffc6bac3577c6b9bc2f3a01a65e6677b
|
|
| MD5 |
efd68fceb7c9f6d19d3994cdfb4e6246
|
|
| BLAKE2b-256 |
d82da3a61105c8d3d91b7f6e0ae9da025e8d956c7b516d61fd5d9dd9a1a06000
|
Provenance
The following attestation bundles were made for atomic_redis_streams-0.1.0.tar.gz:
Publisher:
publish.yml on imandrefsilva/atomic-redis-streams
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
atomic_redis_streams-0.1.0.tar.gz -
Subject digest:
4e3994b5e3bbc4a7f01da5add08473e4ffc6bac3577c6b9bc2f3a01a65e6677b - Sigstore transparency entry: 2011593784
- Sigstore integration time:
-
Permalink:
imandrefsilva/atomic-redis-streams@db0620dc54194b24e73f793edcd733840a07bca0 -
Branch / Tag:
refs/tags/v0.1.0 - Owner: https://github.com/imandrefsilva
-
Access:
private
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish.yml@db0620dc54194b24e73f793edcd733840a07bca0 -
Trigger Event:
release
-
Statement type:
File details
Details for the file atomic_redis_streams-0.1.0-py3-none-any.whl.
File metadata
- Download URL: atomic_redis_streams-0.1.0-py3-none-any.whl
- Upload date:
- Size: 20.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.13
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
b9cfa8642762aba7b51eeaad7c903f94a6c7b5afc071f4f8bf24bf581641edf5
|
|
| MD5 |
52e32836558ffb1dece60450236b77e7
|
|
| BLAKE2b-256 |
5a083851e9405d387023f882f56e44bfbdcb85e8eade10ab5d1ecba766dadb48
|
Provenance
The following attestation bundles were made for atomic_redis_streams-0.1.0-py3-none-any.whl:
Publisher:
publish.yml on imandrefsilva/atomic-redis-streams
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
atomic_redis_streams-0.1.0-py3-none-any.whl -
Subject digest:
b9cfa8642762aba7b51eeaad7c903f94a6c7b5afc071f4f8bf24bf581641edf5 - Sigstore transparency entry: 2011593889
- Sigstore integration time:
-
Permalink:
imandrefsilva/atomic-redis-streams@db0620dc54194b24e73f793edcd733840a07bca0 -
Branch / Tag:
refs/tags/v0.1.0 - Owner: https://github.com/imandrefsilva
-
Access:
private
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish.yml@db0620dc54194b24e73f793edcd733840a07bca0 -
Trigger Event:
release
-
Statement type: