Event processing library for Python based on Redis Streams
Vomero
Vomero is an event processing mini-library based on Redis Streams. It is still a work in progress, although the core features are already in place. The library deliberately provides little abstraction over Redis, so it is advisable to get familiar with the key concepts of Redis Streams before using it.
Features
- "At least once" processing semantics
- Auto-recovery for failed tasks using auto-claiming
- Easy scaling with consumer groups
- Support for async
- Strictly typed
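To make the first two features more concrete, here is a minimal in-memory sketch of at-least-once delivery with claim-based recovery. It uses neither Redis nor Vomero: `ToyGroup` and its methods are hypothetical names that only mimic the idea behind a consumer group's pending list, where an entry stays pending until it is acknowledged and can be claimed by another consumer once it has been idle for long enough.

```python
import itertools
from dataclasses import dataclass, field


@dataclass
class ToyGroup:
    """Hypothetical in-memory stand-in for a consumer group (not Vomero's API)."""

    queue: list = field(default_factory=list)    # entries not yet delivered
    pending: dict = field(default_factory=dict)  # entry id -> (owner, delivered_at, event)
    _ids: itertools.count = field(default_factory=lambda: itertools.count(1))

    def add(self, event: dict) -> int:
        """Append an event and return its entry id."""
        entry_id = next(self._ids)
        self.queue.append((entry_id, event))
        return entry_id

    def read(self, consumer: str, now: float):
        """Deliver the next new entry and remember it as pending."""
        if not self.queue:
            return None
        entry_id, event = self.queue.pop(0)
        self.pending[entry_id] = (consumer, now, event)
        return entry_id, event

    def ack(self, entry_id: int) -> None:
        """Acknowledge a processed entry so it is never delivered again."""
        self.pending.pop(entry_id, None)

    def autoclaim(self, consumer: str, now: float, min_idle: float):
        """Hand over entries that have been pending for at least min_idle."""
        claimed = []
        for entry_id, (_, delivered_at, event) in list(self.pending.items()):
            if now - delivered_at >= min_idle:
                self.pending[entry_id] = (consumer, now, event)
                claimed.append((entry_id, event))
        return claimed


group = ToyGroup()
first = group.add({"content": "Hello world"})

# Consumer "a" reads the entry but crashes before acknowledging it.
group.read("a", now=0.0)

# Later, consumer "b" claims the stale entry, processes it, and acknowledges it.
recovered = group.autoclaim("b", now=10.0, min_idle=5.0)
for entry_id, _event in recovered:
    group.ack(entry_id)
```

In this toy run the same entry is delivered to both consumers, which is why handlers under at-least-once semantics should tolerate seeing an event more than once.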
Limitations
Vomero does not provide strict ordering inside consumer groups, aiming to take maximum advantage of low delivery latency. If strict ordering is a must for your project, you probably need another solution.
Quickstart
Key concepts
- Event is simply a sparse Python dict with some limitations. The dictionary values must be one of: bytes, memoryview, str, int, float.
- Producer is an async function returning an Event object. The returned Event is sent to the event stream.
- Consumer is an async function that takes at least one optional argument: the Event object. The consumer defines how the event is to be processed.
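The value restriction on events can be checked before an event is produced. The helper below is a hypothetical sketch and not part of Vomero; it only tests whether every value of a dict has one of the types listed for Event.

```python
from typing import Any, Mapping

# Value types listed in the Event description.
ALLOWED_VALUE_TYPES = (bytes, memoryview, str, int, float)


def is_valid_event(candidate: Mapping[str, Any]) -> bool:
    """Return True if every value has an allowed type (hypothetical helper)."""
    return all(isinstance(value, ALLOWED_VALUE_TYPES) for value in candidate.values())
```

With this check, `{"content": "Hello world", "attempt": 1}` passes, while a dict holding `None`, a list, or a nested dict does not. Such values would have to be serialised first, for example to a JSON string.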
Defining producers
from vomero import Streams, Event

streams = Streams()

# A producer has to be defined as an async function returning an Event
@streams.producer(stream="my-event-stream")
async def produce_event() -> Event:
    return {"content": "Hello world"}
Defining consumers
import typing

from vomero import Streams, Event

streams = Streams(decode_responses=True)

# A consumer has to be defined as an async function that takes
# at least one optional argument (the event) and may return any type
@streams.consumer(
    stream="my-event-stream",
    consumer_group="my-consumer-group",
    consumer="my-consumer",
)
async def consume_and_print_event(event: typing.Optional[Event] = None) -> None:
    if event:
        content = event["content"]
        print(content)
Running as worker
import asyncio
import typing

from vomero import Streams, Event, run_as_worker

streams = Streams()

# The decorator's block parameter is recommended when running as a worker
# to enable graceful stopping on SIGINT and SIGTERM signals
@streams.consumer(
    stream="my-event-stream",
    consumer_group="my-consumer-group",
    consumer="my-consumer",
    block=100,
)
async def consume_event(event: typing.Optional[Event] = None) -> None:
    ...

if __name__ == "__main__":
    # Running a consumer as a worker will call it in a loop
    # until SIGINT or SIGTERM is received
    asyncio.run(run_as_worker(consume_event))
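For intuition, a worker loop with graceful stopping can be sketched with plain asyncio. This is not Vomero's implementation of `run_as_worker`: `worker_loop` and `install_stop_handlers` are hypothetical names, and the sketch assumes a Unix event loop, where `loop.add_signal_handler` is available.

```python
import asyncio
import signal
from typing import Awaitable, Callable


def install_stop_handlers(stop: asyncio.Event) -> None:
    """Set the stop flag when SIGINT or SIGTERM arrives (Unix event loops only)."""
    loop = asyncio.get_running_loop()
    for signum in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(signum, stop.set)


async def worker_loop(handler: Callable[[], Awaitable[None]], stop: asyncio.Event) -> int:
    """Call handler repeatedly until stop is set; return the number of calls."""
    calls = 0
    while not stop.is_set():
        # The handler is awaited to completion, so a stop request never
        # interrupts an event that is halfway through processing.
        await handler()
        calls += 1
    return calls
```

In a loop of this shape the stop flag is only checked between calls, so a bounded wait per call keeps the worker responsive to a stop request.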