A simple and reliable tasks manager. Attempt for a celery like, asyncio friendly.
Project description
aio-task
Reliable and perfoment asynchronous tasks manager that is asyncio friendly.
Key Features
- A simple worker interface to register coroutines as tasks.
- A simple broker interface to produce and fetch tasks.
- Broker and worker(s) can be setup in a single program avoiding external service dependencies (by using dummies queue and cache).
- Task is not lost if worker crash during processing it, it's keep in queue and re-processed until a worker ack.
- Task exceptions are not lost: you will retreiv them in the task's result.
- Works out of the box with rabbitmq and redis.
- Easily hackable to add new queuing/caching services
Getting Started
Use examples/docker-compose.yml
to bring up a rabbitmq and a redis to run this example.
Worker → run tasks
import asyncio
from aio_task import Worker
async def addition(a, b):
""" Task example. """
return a + b
async def start_worker():
rabbitmq_config = {"url": "amqp://guest:guest@localhost:5672",
"routing_key": "tasks_queue"}
redis_config = {"address": "redis://localhost"}
worker = await Worker.create("rabbitmq", rabbitmq_config,
"redis", redis_config)
worker.register_handler(addition)
await worker.start()
return worker
loop = asyncio.get_event_loop()
worker = loop.run_until_complete(start_worker())
try:
loop.run_forever()
except KeyboardInterrupt:
loop.run_until_complete(worker.close()) # gracefull shutdown
loop.close()
Broker → produce tasks
import asyncio
from aio_task import Broker
async def sample_addition():
# setup broker
rabbitmq_config = {"url": "amqp://guest:guest@localhost:5672",
"routing_key": "tasks_queue"}
redis_config = {"address": "redis://localhost"}
return await Broker.create("rabbitmq", rabbitmq_config,
"redis", redis_config)
# produce task
task_id = await broker.create_task("addition", {"a": 1, "b": 2})
await asyncio.sleep(0.1)
# fetch task
task = await broker.get_task(task_id)
print(task)
await broker.close() # gracefull shutdown
loop = asyncio.get_event_loop()
loop.run_until_complete(sample_addition())
loop.run_until_complete(broker.close())
More examples in examples/...
Run tests
unit tests
pip install -e .[test]
pytest -xvs tests/unit
integration tests
pip install -e .[test]
docker-compose -f tests/integration/compose/docker-compose up -d
pytest -xvs tests/integration
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
aio-task-1.0.1.tar.gz
(19.5 kB
view hashes)
Built Distribution
aio_task-1.0.1-py3-none-any.whl
(27.3 kB
view hashes)