arque
Asyncio Reliable Queue (based on redis)
Inspired by Tom DeWire's article "Reliable Queueing in Redis (Part 1)" [1] [2] and the "torrelque" python module [3].
Features:
- Asynchronous: built on asyncio and aioredis
- Reliable: task data is kept in the Redis database at every stage of processing
- Throttling: limits the number of tasks being executed concurrently
- Delayed queue: defers task availability by a configurable delay
- Dead letters: moves task data to a failed queue after a predefined number of retry attempts
- Tested on Python 3.7 and Redis server versions 3.0.6 through 5.0.5
- Used in containerized applications (managed by Kubernetes) in high-load environments
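The reliability model behind these features follows the pattern from DeWire's article: a dequeued task is moved to a "working" structure rather than deleted, and is only removed once the consumer releases it; a failed task is requeued until a retry limit sends it to the dead-letter queue. A minimal pure-Python sketch of that state machine (not the arque API; class and method names here are illustrative only):

```python
from collections import deque

class ReliableQueueSketch:
    """Toy model of the pending/working/failed state machine.
    Illustrative only -- not the arque API."""

    def __init__(self, max_retries=3):
        self.pending = deque()  # tasks waiting to be consumed
        self.working = {}       # task_id -> (data, attempts) currently in flight
        self.failed = {}        # dead letters after too many retries
        self.max_retries = max_retries

    def enqueue(self, task_id, data):
        self.pending.append((task_id, data, 0))

    def dequeue(self):
        # Move the task to 'working' instead of deleting it, so a
        # crashed consumer never loses task data.
        task_id, data, attempts = self.pending.popleft()
        self.working[task_id] = (data, attempts)
        return task_id, data

    def release(self, task_id):
        # Successful processing: only now is the task truly removed.
        del self.working[task_id]

    def requeue(self, task_id):
        # Failed processing: retry, or dead-letter after max_retries.
        data, attempts = self.working.pop(task_id)
        if attempts + 1 >= self.max_retries:
            self.failed[task_id] = data
        else:
            self.pending.append((task_id, data, attempts + 1))
```

In Redis the same idea is implemented with atomic moves between keys (plus a sweeper that returns tasks whose consumer died back to pending), which is what arque manages for you.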
Install:
pip install arque
Usage:
import signal
import random
import logging
import asyncio
import aioredis
import time
from functools import wraps
from arque import Arque
logger = logging.getLogger(__name__)
async def shutdown(signal, loop):
    """Cleanup tasks tied to the service's shutdown."""
    logging.info(f"Received exit signal {signal.name}...")
    tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
    for task in tasks:
        task.cancel()
    logging.info(f"Cancelling {len(tasks)} outstanding tasks")
    # return_exceptions=True so the CancelledError raised inside each
    # cancelled task does not abort the gather itself
    await asyncio.gather(*tasks, return_exceptions=True)
    logging.info("Flushing metrics")
    loop.stop()
def aioredis_pool(host='redis://localhost', encoding='utf8'):
    def wrapper(func):
        @wraps(func)
        async def wrapped():
            redis = await aioredis.create_redis_pool(host, encoding=encoding)
            try:
                return await func(redis=redis)
            finally:
                redis.close()
                await redis.wait_closed()
        return wrapped
    return wrapper
@aioredis_pool(host='redis://localhost', encoding='utf8')
async def produce_task(redis=None):
    logger.info('Starting producing...')
    queue = Arque(redis=redis)
    while True:
        task = {'value': random.randint(0, 99)}
        task_id = f"custom_{task['value']}_{time.time()}"
        logger.debug('Produced task %s', task)
        await queue.enqueue(task, task_id=task_id, task_timeout=10, delay=1)
        await asyncio.sleep(1)
async def process(task_data):
    logger.debug('Consumed task %s', task_data)
    await asyncio.sleep(1)
@aioredis_pool(host='redis://localhost', encoding='utf8')
async def consume_task(redis=None):
    logger.info('Starting consuming...')
    queue = Arque(redis=redis, working_limit=3)
    while True:
        task_id, task_data = await queue.dequeue()
        if task_id == '__not_found__':
            continue
        if task_id == '__overloaded__':
            print(f'TASK ID: {task_id}')
            await asyncio.sleep(1)
            continue
        if task_id == '__marked_as_failed__':
            print(f'FAILED ID: {task_id}')
            continue
        try:
            await process(task_data)
            await queue.release(task_id)
        except Exception:
            logger.exception('Job processing has failed')
            await queue.requeue(task_id, delay=5)
            stats = await queue.get_stats()
            logger.info(stats)
@aioredis_pool(host='redis://localhost', encoding='utf8')
async def sweep_task(redis=None):
    logger.info('Starting sweeping...')
    queue = Arque(redis=redis, sweep_interval=5)
    await queue.schedule_sweep()
@aioredis_pool(host='redis://localhost', encoding='utf8')
async def stats_task(redis=None):
    logger.info('Starting stats...')
    queue = Arque(redis=redis)
    while True:
        stats = await queue.get_stats()
        logger.info(stats)
        await asyncio.sleep(5)
async def example():
    tasks = []
    for _ in range(5):
        tasks.append(consume_task())
    tasks.append(produce_task())
    tasks.append(sweep_task())
    tasks.append(stats_task())
    await asyncio.gather(*tasks)
if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(message)s')
    loop = asyncio.get_event_loop()
    signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT, signal.SIGUSR1)
    for s in signals:
        loop.add_signal_handler(s, lambda s=s: asyncio.create_task(shutdown(s, loop)))
    try:
        loop.run_until_complete(example())
    finally:
        loop.close()
        logging.info("Successfully shut down...")
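The event-loop boilerplate in the `__main__` block above predates `asyncio.run`. On Python 3.7+ the same structure can usually be expressed more compactly: register the signal handlers on the running loop, wait on an `asyncio.Event`, then cancel the workers. A hedged sketch of this alternative (the `coro_factory` name is illustrative; any zero-argument callable returning a coroutine, such as `example` above, would do):

```python
import asyncio
import signal

async def run_until_signal(coro_factory):
    """Run workers produced by coro_factory() until SIGHUP/SIGTERM/SIGINT,
    then cancel them cleanly."""
    loop = asyncio.get_running_loop()
    stop = asyncio.Event()
    # Translate termination signals into a normal return from this coroutine.
    for s in (signal.SIGHUP, signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(s, stop.set)
    worker = asyncio.create_task(coro_factory())
    await stop.wait()          # block until a signal arrives
    worker.cancel()            # cancellation propagates into the workers
    # return_exceptions=True swallows the resulting CancelledError
    await asyncio.gather(worker, return_exceptions=True)
```

With this helper, the entry point shrinks to `asyncio.run(run_until_signal(example))`, and `asyncio.run` takes care of creating and closing the loop.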
References:
[1] T. DeWire, "Reliable Queueing in Redis (Part 1)"
[2] T. DeWire, "Redis as a Reliable Work Queue" (PDF)
[3] torrelque