
Runnel

Distributed event processing for Python based on Redis Streams.

https://runnel.dev

Runnel allows you to easily create scalable stream processors, which operate on partitions of event streams in Redis. Runnel takes care of assigning partitions to workers and acknowledging events automatically, so you can focus on your application logic.

Whereas traditional job queues do not provide ordering guarantees, Runnel is designed to process partitions of your event stream strictly in the order events are created.
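The two ideas above can be pictured together in a minimal, library-independent sketch: a stable hash of the partition key routes every event with the same key to the same partition, and each partition is a FIFO queue, so same-key events are consumed strictly in the order they were sent. This uses `hashlib` for illustration only; it is not Runnel's actual implementation (which can use xxhash via the `runnel[fast]` extras).

```python
import hashlib
from collections import deque

NUM_PARTITIONS = 4

def partition_for(key: str, num_partitions: int = NUM_PARTITIONS) -> int:
    # Stable hash of the partition key: the same key always maps to the
    # same partition, regardless of which worker computes it.
    digest = hashlib.sha256(key.encode()).digest()
    return int.from_bytes(digest[:8], "big") % num_partitions

# One FIFO queue per partition: appending preserves creation order, so
# events sharing a key are always processed in the order they were sent.
partitions = [deque() for _ in range(NUM_PARTITIONS)]

for order_id in ["1", "2", "1", "3", "1"]:
    partitions[partition_for(order_id)].append(order_id)

# All three events for order "1" sit in a single partition, in send order.
print(list(partitions[partition_for("1")]))
```

This is why a key like `order_id` in the example below gives per-order ordering: events for different orders may interleave across partitions, but each order's history is replayed in sequence.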

Installation

pip install runnel

Basic Usage

from datetime import datetime

from runnel import App, Record

app = App(name="myapp", redis_url="redis://127.0.0.1")


# Specify event types using the Record class.
class Order(Record):
    order_id: int
    created_at: datetime
    amount: float


orders = app.stream("orders", record=Order, partition_by="order_id")


# Every 4 seconds, send an example record to the stream.
@app.timer(interval=4)
async def sender():
    await orders.send(Order(order_id=1, created_at=datetime.utcnow(), amount=9.99))


# Iterate over a continuous stream of events in your processors.
@app.processor(orders)
async def printer(events):
    async for order in events.records():
        print(f"processed {order.amount}")

Meanwhile, run the worker (assuming code in example.py and PYTHONPATH is set):

$ runnel worker example:app

Features

Designed to support a similar paradigm to Kafka Streams, but on top of Redis.

  • At least once processing semantics
  • Automatic partitioning of events by key
  • Each partition maintains strict ordering
  • Dynamic rebalance algorithm distributes partitions among workers on-the-fly
  • Support for nested Record types with custom serialisation and compression
  • Background tasks, including timers and cron-style scheduling
  • User-defined middleware for exception handling, e.g. dead-letter-queueing
  • A builtin batching mechanism to efficiently process events in bulk
  • A runnel[fast] bundle for C or Rust extension dependencies (uvloop, xxhash, orjson, lz4)
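The rebalance idea in the list above can be sketched independently of Runnel's actual algorithm (which is more sophisticated and happens on-the-fly): spread a fixed set of partitions as evenly as possible over whatever workers are currently alive, and recompute whenever the worker set changes. The function below is a hypothetical round-robin illustration, not Runnel's implementation:

```python
from typing import Dict, List

def rebalance(partitions: List[int], workers: List[str]) -> Dict[str, List[int]]:
    """Spread partitions round-robin over the live workers.

    Each partition is owned by exactly one worker, and ownership counts
    differ by at most one, so load stays balanced as workers come and go.
    """
    assignment: Dict[str, List[int]] = {w: [] for w in workers}
    for i, p in enumerate(sorted(partitions)):
        assignment[workers[i % len(workers)]].append(p)
    return assignment

# 16 partitions over 3 workers: ownership counts are 6/5/5.
plan = rebalance(list(range(16)), ["worker-a", "worker-b", "worker-c"])
print({w: len(ps) for w, ps in plan.items()})

# A fourth worker joins: recompute, and each worker now owns 4 partitions.
plan = rebalance(list(range(16)), ["worker-a", "worker-b", "worker-c", "worker-d"])
print({w: len(ps) for w, ps in plan.items()})
```

Because each partition has exactly one owner at a time, the per-partition ordering guarantee survives rebalancing: a partition's events are only ever consumed by its current owner.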

Documentation

Full documentation is available at https://runnel.dev.


Local development

To run the test suite locally, clone the repo and install the optional deps (e.g. via poetry install -E fast). Make sure Redis is running on localhost at port 6379, then run pytest.

See also

For a traditional task queue that doesn't provide ordering guarantees, see our sister project Fennel.
