Distributed event processing for Python based on Redis Streams

Runnel

License: LGPLv3

RunnelPy allows you to easily create scalable stream processors, which operate on partitions of event streams in Redis. RunnelPy takes care of assigning partitions to workers and acknowledging events automatically, so you can focus on your application logic.

Whereas traditional job queues do not provide ordering guarantees, RunnelPy is designed to process partitions of your event stream strictly in the order events are created.
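To illustrate why partitioning by key preserves per-key ordering, here is a minimal sketch in plain Python. It does not use RunnelPy, and the helper names `partition_for` and `route` are hypothetical. The CRC32 hash is an assumption chosen for illustration, not necessarily the hash RunnelPy uses internally.

```python
import zlib
from collections import defaultdict


def partition_for(key, partition_count):
    """Map a key to a partition index using a stable hash (illustrative only)."""
    digest = zlib.crc32(str(key).encode("utf-8"))
    return digest % partition_count


def route(events, partition_by, partition_count):
    """Group events into partitions, keeping arrival order within each one."""
    partitions = defaultdict(list)
    for event in events:
        index = partition_for(event[partition_by], partition_count)
        partitions[index].append(event)
    return dict(partitions)


events = [
    {"order_id": 1, "amount": 9.99},
    {"order_id": 2, "amount": 5.00},
    {"order_id": 1, "amount": 12.50},
]

# Both events for order_id=1 land in the same partition, in creation order.
routed = route(events, partition_by="order_id", partition_count=4)
```

Because every event with the same key hashes to the same partition, a single consumer reading that partition sees those events in the order they were appended.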

Installation

pip install runnelpy

Basic Usage

from datetime import datetime

from runnelpy import App, Record

app = App(name="myapp", redis_url="redis://127.0.0.1")


# Specify event types using the Record class.
class Order(Record):
    order_id: int
    created_at: datetime
    amount: float


orders = app.stream("orders", record=Order, partition_by="order_id")


# Every 4 seconds, send an example record to the stream.
@app.timer(interval=4)
async def sender():
    await orders.send(Order(order_id=1, created_at=datetime.utcnow(), amount=9.99))


# Iterate over a continuous stream of events in your processors.
@app.processor(orders)
async def printer(events):
    async for order in events.records():
        print(f"processed {order.amount}")

Meanwhile, run the worker (assuming the code is in example.py and PYTHONPATH is set):

$ runnelpy worker example:app

Features

Designed to support a similar paradigm to Kafka Streams, but on top of Redis.

  • At least once processing semantics
  • Automatic partitioning of events by key
  • Each partition maintains strict ordering
  • Dynamic rebalance algorithm distributes partitions among workers on-the-fly
  • Support for nested Record types with custom serialisation and compression
  • Background tasks, including timers and cron-style scheduling
  • User-defined middleware for exception handling, e.g. dead-letter-queueing
  • A builtin batching mechanism to efficiently process events in bulk
  • A runnelpy[fast] bundle for C or Rust extension dependencies (uvloop, xxhash, orjson, lz4)
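At-least-once semantics mean that an event may occasionally be delivered more than once, for example if a worker stops before its acknowledgement is recorded. A common way to cope is to make handlers idempotent. The sketch below is plain Python, not RunnelPy API: the `IdempotentHandler` class and the in-memory `seen` set are hypothetical, and a real deployment would likely keep the processed IDs in durable storage.

```python
class IdempotentHandler:
    """Applies each event at most once, keyed by a unique event ID."""

    def __init__(self):
        self.seen = set()  # IDs of events already applied (in memory only)
        self.total = 0.0

    def handle(self, event_id, amount):
        """Return True if the event was applied, False if it was a duplicate."""
        if event_id in self.seen:
            return False
        self.seen.add(event_id)
        self.total += amount
        return True


handler = IdempotentHandler()

# The event "a" is delivered twice, as can happen under at-least-once delivery.
deliveries = [("a", 10.0), ("b", 5.0), ("a", 10.0)]
applied = [handler.handle(event_id, amount) for event_id, amount in deliveries]
```

With this approach the redelivered event is ignored, so the running total reflects each event exactly once.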

Documentation

Full documentation is available at https://runnelpy.dev.

Local development

To run the test suite locally, clone the repo and install the optional deps (e.g. via poetry install -E fast). Make sure Redis is running on localhost at port 6379, then run pytest.
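Before running pytest, it can help to confirm that something is listening on the Redis port. The following is a minimal sketch using only the standard library. The function name `redis_port_is_open` is hypothetical, and the check only verifies that a TCP connection succeeds, not that the listener is actually Redis.

```python
import socket


def redis_port_is_open(host="localhost", port=6379, timeout=1.0):
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and name-resolution failures.
        return False
```

Calling `redis_port_is_open()` with no arguments checks the default address the test suite expects, localhost on port 6379.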

See also

For a traditional task queue that doesn't provide ordering guarantees, see our sister project Fennel.

Download files

Source Distribution

runnelpy-0.3.1.tar.gz (31.5 kB)

Uploaded Source

Built Distribution

runnelpy-0.3.1-py3-none-any.whl (42.2 kB)

Uploaded Python 3

File details

Details for the file runnelpy-0.3.1.tar.gz.

File metadata

  • Download URL: runnelpy-0.3.1.tar.gz
  • Size: 31.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.1.2 CPython/3.13.2 Linux/6.8.0-1021-azure

File hashes

Hashes for runnelpy-0.3.1.tar.gz

Algorithm    Hash digest
SHA256       ed1c8568135500c85f892e0d58ad72c4195faeb1afb7dd5eb358cd0be421b3ad
MD5          3165d8305e0827644e2ba895fabaa4e3
BLAKE2b-256  48110fed5590d5374a4ef9a6f7b115b35b84d589e148858933586e5549251942

File details

Details for the file runnelpy-0.3.1-py3-none-any.whl.

File metadata

  • Download URL: runnelpy-0.3.1-py3-none-any.whl
  • Size: 42.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.1.2 CPython/3.13.2 Linux/6.8.0-1021-azure

File hashes

Hashes for runnelpy-0.3.1-py3-none-any.whl

Algorithm    Hash digest
SHA256       33fa5c6d726a0129af77694299893ef96c063a06f91b1925b003cd86c9627d6a
MD5          ae0b9919afb1aff48b86109f0ab455fb
BLAKE2b-256  4642ef00fd252382224ba37a4040a4e8b6ab25bb24fb8566012d5652cb9d9882
