Toolset for asynchronous message processing backed by Redis as the message broker
redispatcher
redispatcher is a small library that lets you define a pool of workers that listen for messages added to queues in Redis. This allows you to run long-running background tasks asynchronously, such as sending a welcome email after a user registers.
redispatcher relies on
- aioredis to publish to Redis queues and for your consumers to read from Redis
- pydantic to validate all messages and make sure they conform to the shape you specify
You should try redispatcher if you
- Have a redis instance
- Have a web service that needs to process long-running tasks in the background
- Don't want to deal with setting up RabbitMQ and cumbersome libraries like Celery
Overview
redispatcher can be broken down into three (ish) parts.
Consumer
It all begins with a consumer. A consumer is just a class that defines the structure of the messages it will be listening for and a function that implements the logic for processing each message.
Publishing
Every consumer you define provides a publish method that you can use to queue up messages. Because redispatcher uses Pydantic, every message you send or receive is validated against the shape you defined on the consumer.
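To illustrate the kind of validation described above, here is a minimal sketch that uses pydantic directly rather than redispatcher itself. The `Message` fields mirror the example consumer further down; `invalid_fields` is a hypothetical helper written only for this illustration and is not part of redispatcher.

```python
from pydantic import BaseModel, ValidationError


class Message(BaseModel):
    # Same shape as the example consumer's message in this README.
    email: str
    name: str
    registered: bool


def invalid_fields(**fields):
    """Hypothetical helper: return the names of fields that fail validation."""
    try:
        Message(**fields)
    except ValidationError as exc:
        # Each error entry carries a "loc" tuple whose first item is the field name.
        return [error["loc"][0] for error in exc.errors()]
    return []


if __name__ == "__main__":
    # A well-formed message passes validation.
    print(invalid_fields(email="ada@example.com", name="Ada", registered=True))
    # A message with missing fields is rejected before it could be queued.
    print(invalid_fields(email="ada@example.com"))
```

The point of the sketch is that a malformed message fails at construction time, so a bad payload is caught where it is created instead of surfacing later inside a worker.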
Consumer Pool
A consumer pool is a separate process that listens for all relevant messages queued up in Redis and dispatches them to the designated consumers to be processed.
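The dispatching idea can be sketched with the standard library alone. This is not how redispatcher is implemented: in-memory `asyncio.Queue` objects stand in for Redis queues, and `send_welcome_email`, `worker`, and `run_pool` are hypothetical names chosen for this illustration.

```python
import asyncio
import json


async def send_welcome_email(payload):
    # Stand-in for a consumer's message-processing logic.
    return f"welcome email queued for {payload['email']}"


async def worker(queue_name, queue, handler, results):
    # Wait for serialized messages on one queue and hand each to its handler.
    while True:
        raw = await queue.get()
        try:
            payload = json.loads(raw)
            results.append((queue_name, await handler(payload)))
        finally:
            queue.task_done()


async def run_pool(handlers, messages):
    # One in-memory queue and one worker task per queue name.
    queues = {name: asyncio.Queue() for name in handlers}
    results = []
    workers = [
        asyncio.create_task(worker(name, queues[name], handler, results))
        for name, handler in handlers.items()
    ]
    # "Publish" each message as JSON to the queue named by its key.
    for queue_name, payload in messages:
        await queues[queue_name].put(json.dumps(payload))
    # Wait until every queued message has been processed, then stop the workers.
    for queue in queues.values():
        await queue.join()
    for task in workers:
        task.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return results


if __name__ == "__main__":
    handlers = {"my-queue-key": send_welcome_email}
    messages = [("my-queue-key", {"email": "ada@example.com"})]
    print(asyncio.run(run_pool(handlers, messages)))
```

In the real library the pool runs as a separate, long-lived process and reads from Redis; the sketch only shows the routing of a queued message to the handler registered for its queue.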
Install
$ pip install redispatcher
Basic Consumer
# my_consumer.py
from redispatcher import BaseConsumer


class MyConsumer(BaseConsumer):
    QUEUE = "my-queue-key"

    class Message(BaseConsumer.Message):
        email: str
        name: str
        registered: bool

    async def process_message(self, message: Message):
        print(f"processing message {message}")
        ...
Running your consumers in a pool
Defining your pool
# pool.py
from redispatcher import ConsumerPool, RedispatcherConfig, ConsumerConfig
from my_consumer import MyConsumer

config = RedispatcherConfig(
    redis_dsn="rediss://",  # if not provided, will read from env
    consumers=[
        ConsumerConfig(
            consumer_class=MyConsumer
        )
    ]
)

if __name__ == "__main__":
    consumer_pool = ConsumerPool(config)
    consumer_pool.start()
$ python pool.py
Publishing messages to your pool
# endpoint.py
from my_consumer import MyConsumer
from clients import my_aioredis_client

# `app` is assumed to be your web framework's application object, defined elsewhere


@app.post("/signup")
async def signup():
    # queue up work to send a welcome email while we continue with the rest of our endpoint logic
    await MyConsumer.publish(MyConsumer.Message(email=..., name=..., registered=True), my_aioredis_client)
Advanced usage
We built redispatcher with a couple of handy utilities but kept it as minimal as possible, so you can subclass your own consumers and implement any logging, tracing, or similar logic yourself.
Take a look at examples/nicer_consumer.py and examples/example_publisher.py for some examples of what's possible.
Contributing
If you have a suggestion on how to improve redispatcher or you run into a bug, file an issue at github.com/rafalstapinski/redispatcher/issues.
If you want to contribute, open a PR at github.com/rafalstapinski/redispatcher.