Asynchronous Redis-based reliable queue package
Torrelque
Torrelque is a Python package that provides asynchronous, reliable Redis-backed queues.
In short, the package implements the queue described in this blog post [1], with some required changes and improvements. The same engineers also give a good overview in their presentation, Redis as a Reliable Work Queue [2].
Prior to version 0.2, Torrelque was Tornado-specific. Since version 5, Tornado runs on the asyncio event loop by default, so Torrelque can still be used with it.
Install
pip install Torrelque
Use
#!/usr/bin/env python3.7

import random
import logging
import asyncio

import aredis
from torrelque import Torrelque


logger = logging.getLogger(__name__)


async def produce():
    redis = aredis.StrictRedis(decode_responses=True)
    queue = Torrelque(redis)
    while True:
        # Enqueue a batch of 5 tasks, then pause for 10 seconds
        for _ in range(5):
            task = {'value': random.randint(0, 99)}
            logger.debug('Produced task %s', task)
            await queue.enqueue(task)
        await asyncio.sleep(10)


async def process(task_data):
    logger.debug('Consumed task %s', task_data)
    await asyncio.sleep(1)


async def consume():
    redis = aredis.StrictRedis(decode_responses=True)
    queue = Torrelque(redis)
    while True:
        task_id, task_data = await queue.dequeue()
        if not task_id:
            continue

        try:
            await process(task_data)
            # Processing succeeded: remove the task from the queue
            await queue.release(task_id)
        except Exception:
            logger.exception('Job processing has failed')
            # Processing failed: make the task available again in 30 seconds
            await queue.requeue(task_id, delay=30)


async def main():
    # Keep references to the consumer tasks so they are not garbage-collected
    consumers = [asyncio.create_task(consume()) for _ in range(4)]
    await produce()


if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(message)s')
    asyncio.run(main())
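The task life cycle used in the example above (enqueue, dequeue, then either release or requeue) can be illustrated without a Redis server. The following is a minimal in-memory sketch, not Torrelque's actual implementation: the class name InMemoryReliableQueue, its internal attributes, and the choice to return (None, None) from an empty queue are all assumptions made for illustration. It only shows the idea that a dequeued task stays tracked until a consumer explicitly releases it.

```python
import asyncio
import time
import uuid


class InMemoryReliableQueue:
    """Toy model of a reliable queue (hypothetical, not Torrelque's code)."""

    def __init__(self):
        self._pending = []   # task ids ready to be dequeued, in FIFO order
        self._working = {}   # task id -> time it was handed to a consumer
        self._delayed = {}   # task id -> earliest time it may run again
        self._tasks = {}     # task id -> task payload

    async def enqueue(self, data):
        task_id = uuid.uuid4().hex
        self._tasks[task_id] = data
        self._pending.append(task_id)
        return task_id

    async def dequeue(self):
        self._promote_delayed()
        if not self._pending:
            # Assumption of this sketch: an empty queue yields (None, None)
            return None, None
        task_id = self._pending.pop(0)
        # The payload is not deleted here; the task is only marked in-flight
        self._working[task_id] = time.monotonic()
        return task_id, self._tasks[task_id]

    async def release(self, task_id):
        # Successful processing: forget the task entirely
        del self._working[task_id]
        del self._tasks[task_id]

    async def requeue(self, task_id, delay=0):
        # Failed processing: make the task available again, optionally later
        del self._working[task_id]
        if delay > 0:
            self._delayed[task_id] = time.monotonic() + delay
        else:
            self._pending.append(task_id)

    def _promote_delayed(self):
        # Move delayed tasks whose time has come back to the pending list
        now = time.monotonic()
        due = [tid for tid, ts in self._delayed.items() if ts <= now]
        for tid in due:
            del self._delayed[tid]
            self._pending.append(tid)


async def demo():
    queue = InMemoryReliableQueue()
    await queue.enqueue({'value': 1})
    task_id, _ = await queue.dequeue()
    await queue.requeue(task_id)              # simulate a failed attempt
    retry_id, retry_data = await queue.dequeue()
    await queue.release(retry_id)             # simulate a successful attempt
    return task_id == retry_id, retry_data, len(queue._tasks)


if __name__ == '__main__':
    print(asyncio.run(demo()))
```

In this model a failed task comes back under the same task id, and its payload is dropped only on release. A Redis-backed queue would keep the equivalent state in Redis so that it survives a consumer crash.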
Prior art
There is a related synchronous task queue package called saferedisqueue [3].