Asyncio Reliable Queue (based on redis)
Project description
arque
Asyncio Reliable Queue (based on redis)
Inspired by Tom DeWire's article "Reliable Queueing in Redis (Part 1)" [1] [2] and the "torrelque" python module [3].
Features:
- Asynchronous: based on asyncio and aioredis
- Reliable: at any moment task data stored in redis database
- Throttling: controls number of tasks in execution
- Delayed queue: defers task availability
- Dead letters: put task data in failed queue after number of predefined retry attempts
- Tested on Python 3.7 and redis server '>=3.0.6', '<=5.0.5'
- Used in containerized applications (managed by kubernetes) in high load environments
Install:
pip install arque
Usage:
import signal
import random
import logging
import asyncio
import aioredis
import time
from functools import wraps
from arque import Arque
logger = logging.getLogger(__name__)
async def shutdown(signal, loop):
"""Cleanup tasks tied to the service's shutdown."""
logging.info(f"Received exit signal {signal.name}...")
tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
[task.cancel() for task in tasks]
logging.info(f"Cancelling {len(tasks)}outstanding tasks")
await asyncio.gather(*tasks)
logging.info(f"Flushing metrics")
loop.stop()
def aioredis_pool(host='redis://localhost', encoding='utf8'):
def wrapper(func):
@wraps(func)
async def wrapped():
redis = await aioredis.create_redis_pool(host, encoding=encoding)
try:
return await func(redis=redis)
finally:
redis.close()
await redis.wait_closed()
return wrapped
return wrapper
@aioredis_pool(host='redis://localhost', encoding='utf8')
async def produce_task(redis=None):
logger.info('Starting producing...')
queue = Arque(redis=redis)
while True:
for _ in range(1):
task = {'value': random.randint(0, 99)}
task_id = f"custom_{task['value']}_{time.time()}"
logger.debug('Produced task %s', task)
await queue.enqueue(task, task_id=task_id, task_timeout=10, delay=1)
await asyncio.sleep(1)
async def process(task_data):
logger.debug('Consumed task %s', task_data)
await asyncio.sleep(1)
@aioredis_pool(host='redis://localhost', encoding='utf8')
async def consume_task(redis=None):
logger.info('Starting consuming...')
queue = Arque(redis=redis, working_limit=3)
while True:
task_id, task_data = await queue.dequeue()
if task_id == '__not_found__':
continue
if task_id == '__overloaded__':
print(f'TASK ID: {task_id}')
await asyncio.sleep(1)
continue
if task_id == '__marked_as_failed___':
print(f'FAILED ID: {task_id}')
continue
try:
await process(task_data)
await queue.release(task_id)
except Exception:
logger.exception('Job processing has failed')
await queue.requeue(task_id, delay=5)
stats = await queue.get_stats()
logger.info(stats)
@aioredis_pool(host='redis://localhost', encoding='utf8')
async def sweep_task(redis=None):
logger.info('Starting sweeping...')
queue = Arque(redis=redis, sweep_interval=5)
await queue.schedule_sweep()
@aioredis_pool(host='redis://localhost', encoding='utf8')
async def stats_task(redis=None):
logger.info('Starting stats...')
queue = Arque(redis=redis)
while True:
stats = await queue.get_stats()
logger.info(stats)
await asyncio.sleep(5)
async def example():
tasks = []
for _ in range(5):
tasks.append(consume_task())
tasks.append(produce_task())
tasks.append(sweep_task())
tasks.append(stats_task())
await asyncio.gather(*tasks)
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(message)s')
loop = asyncio.get_event_loop()
signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT, signal.SIGUSR1)
for s in signals:
loop.add_signal_handler(s, lambda s=s: asyncio.create_task(shutdown(s, loop)))
try:
loop.run_until_complete(example())
finally:
loop.close()
logging.info("Successfully shutdown...")
Reference
[1] Reliable Queueing in Redis (Part 1)
[2] DEWIRE Redis as a Reliable Work Queue.pdf
[3] torrelque
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
arque-1.1.0.tar.gz
(6.8 kB
view hashes)
Built Distribution
arque-1.1.0-py3-none-any.whl
(7.5 kB
view hashes)