Vomero

Event processing library for Python based on Redis Streams.

Vomero is an event-processing mini-library based on Redis Streams. It is still a work in progress, although the core features are already in place. The library is deliberately a thin abstraction over Redis, so it is advisable to get familiar with the key concepts of Redis Streams before using it.

Features

  • "At least one" processing semantics
  • Auto-recovery for failed tasks using auto-claiming
  • Easy scaling with consumer groups
  • Support for async
  • Strictly typed

Limitations

Vomero does not provide strict ordering inside consumer groups, aiming instead to take maximum advantage of low delivery latency. If strict ordering is a must for your project, you probably need another solution.

Quickstart

Key concepts

  • An Event is simply a sparse Python dict with some limitations: its values must be one of bytes, memoryview, str, int, or float (see the example after this list).
  • A Producer is an async function returning an Event. The returned Event is sent to the event stream.
  • A Consumer is an async function that takes at least one optional argument: the Event object. The consumer defines how the event is processed.
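
For instance, a valid event under these constraints could look like this (field names are illustrative, and it is assumed that Event can be used as a type annotation, as in the examples below):

from vomero import Event

event: Event = {
    "content": "Hello world",  # str
    "attempt": 1,              # int
    "score": 0.95,             # float
    "payload": b"\x00\x01",    # bytes
}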

Defining producers

from vomero import Streams, Event

streams = Streams()

# A producer has to be defined as an async function returning an Event
@streams.producer(stream="my-event-stream")
async def produce_event() -> Event:
    return {"content": "Hello world"}

Defining consumers

import typing

from vomero import Streams, Event

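# decode_responses=True is presumably passed through to the Redis client,
# so that event values come back as str rather than bytes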
streams = Streams(decode_responses=True)

# A consumer has to be defined as an async function that takes
# at least one optional argument (the event) and may return any type
@streams.consumer(
    stream="my-event-stream",
    consumer_group="my-consumer-group",
    consumer="my-consumer",
)
async def consume_and_print_event(event: typing.Optional[Event] = None) -> None:
    if event:
        content = event["content"]
        print(content)
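
Running the consumer once is again just an await; a minimal sketch, assuming a single call reads one pending entry from the consumer group (or passes event=None when the stream is empty):

import asyncio

# Processes at most one event per call
asyncio.run(consume_and_print_event())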

Running as a worker

import asyncio
import typing

from vomero import Streams, Event, run_as_worker

streams = Streams()

# The decorator's block parameter is recommended when running as a worker
# to enable graceful stopping on SIGINT and SIGTERM signals
@streams.consumer(
    stream="my-event-stream",
    consumer_group="my-consumer-group",
    consumer="my-consumer",
    block=100
)
async def consume_event(event: typing.Optional[Event] = None) -> None:
    ...

if __name__ == "__main__":
    # Running a consumer as a worker will call it in a loop
    # until SIGINT or SIGTERM is received
    asyncio.run(run_as_worker(consume_event))
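
Because consumers in the same consumer group share the stream's workload, scaling out should amount to starting more workers under distinct consumer names. A hypothetical sketch (the make_consumer factory is illustrative, not part of the library):

import asyncio
import typing

from vomero import Streams, Event, run_as_worker

streams = Streams()

def make_consumer(name: str):
    # Each worker joins the same consumer group under its own name,
    # so Redis distributes entries across the workers
    @streams.consumer(
        stream="my-event-stream",
        consumer_group="my-consumer-group",
        consumer=name,
        block=100,
    )
    async def consume_event(event: typing.Optional[Event] = None) -> None:
        ...

    return consume_event

async def main() -> None:
    workers = [make_consumer(f"worker-{i}") for i in range(3)]
    await asyncio.gather(*(run_as_worker(w) for w in workers))

if __name__ == "__main__":
    asyncio.run(main())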
