Distributed event processing for Python based on Redis Streams
Project description
Runnel
Distributed event processing for Python based on Redis Streams.
RunnelPy allows you to easily create scalable stream processors, which operate on partitions of event streams in Redis. RunnelPy takes care of assigning partitions to workers and acknowledging events automatically, so you can focus on your application logic.
Whereas traditional job queues do not provide ordering guarantees, RunnelPy is designed to process partitions of your event stream strictly in the order events are created.
Installation
pip install runnelpy
Basic Usage
from datetime import datetime
from runnelpy import App, Record
app = App(name="myapp", redis_url="redis://127.0.0.1")
# Specify event types using the Record class.
class Order(Record):
order_id: int
created_at: datetime
amount: float
orders = app.stream("orders", record=Order, partition_by="order_id")
# Every 4 seconds, send an example record to the stream.
@app.timer(interval=4)
async def sender():
await orders.send(Order(order_id=1, created_at=datetime.utcnow(), amount=9.99))
# Iterate over a continuous stream of events in your processors.
@app.processor(orders)
async def printer(events):
async for order in events.records():
print(f"processed {order.amount}")
Meanwhile, run the worker (assuming code in example.py
and PYTHONPATH
is set):
$ runnelpy worker example:app
Features
Designed to support a similar paradigm to Kafka Streams, but on top of Redis.
- At least once processing semantics
- Automatic partitioning of events by key
- Each partition maintains strict ordering
- Dynamic rebalance algorithm distributes partitions among workers on-the-fly
- Support for nested Record types with custom serialisation and compression
- Background tasks, including timers and cron-style scheduling
- User-defined middleware for exception handling, e.g. dead-letter-queueing
- A builtin batching mechanism to efficiently process events in bulk
- A
runnelpy[fast]
bundle for C or Rust extension dependencies (uvloop, xxhash, orjson, lz4)
Documentation
Full documenation is available at https://runnelpy.dev.
Blog posts
Essays about this project or the technology it's using:
Local development
To run the test suite locally, clone the repo and install the optional deps
(e.g. via poetry install -E fast
). Make sure Redis is running on localhost at
port 6379, then run pytest
.
See also
For a traditional task queue that doesn't provide ordering guarantees, see our sister project Fennel.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
File details
Details for the file runnelpy-0.2.13.tar.gz
.
File metadata
- Download URL: runnelpy-0.2.13.tar.gz
- Upload date:
- Size: 32.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.8.4 CPython/3.8.18 Linux/6.5.0-1025-azure
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 1ca2d6c8f09f4d2d0686a7b75279d183bf7fdf3260d920b30d5af1793dfb6fd5 |
|
MD5 | dba5af09417c348350199deb11cc7eb3 |
|
BLAKE2b-256 | e4735110b09575001417ada703307b80788a8e8a70d581c019ddec4de908779b |
File details
Details for the file runnelpy-0.2.13-py3-none-any.whl
.
File metadata
- Download URL: runnelpy-0.2.13-py3-none-any.whl
- Upload date:
- Size: 41.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.8.4 CPython/3.8.18 Linux/6.5.0-1025-azure
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | d9e0f4780a2902978a1492cb718f97ded80ee2de5ebad78e57915af807f58a08 |
|
MD5 | 6141c8c06ac52c1704e897dd53a50fdb |
|
BLAKE2b-256 | f1ca208db5299db2156da373405da3bc2c5a65dfe2ccd5abd36a0a41f36d17fa |