A simple and reliable task manager: an attempt at a Celery-like, asyncio-friendly library.
aio-task
A simple and reliable asynchronous task manager that is asyncio-friendly.
Key Features
- A simple worker interface to register coroutines as tasks.
- A simple broker interface to produce and fetch tasks.
- Broker and worker(s) can be set up in a single program, avoiding external service dependencies (by using a dummy queue and cache).
- A task is not lost if a worker crashes while processing it: it stays in the queue and is re-processed until a worker acknowledges it.
- Task exceptions are not lost: you can retrieve them from the task's result.
- Supports RabbitMQ, Redis and Sentinel.
- Easily hackable to add new queuing/caching services.
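The "not lost until acknowledged" guarantee listed above can be illustrated with a small in-memory model. This is only a conceptual sketch, not aio-task's implementation: `ToyQueue` and all of its methods are hypothetical names invented for the illustration, and a real queue such as RabbitMQ handles redelivery itself.

```python
from collections import deque


class ToyQueue:
    """Hypothetical in-memory queue; not part of aio-task."""

    def __init__(self):
        self._pending = deque()  # tasks waiting for a worker
        self._unacked = {}       # delivered but not yet acknowledged
        self._next_tag = 0

    def publish(self, task):
        self._pending.append(task)

    def deliver(self):
        """Hand the next task to a worker and remember it until it is acked."""
        task = self._pending.popleft()
        self._next_tag += 1
        self._unacked[self._next_tag] = task
        return self._next_tag, task

    def ack(self, tag):
        """The worker finished: forget the task for good."""
        del self._unacked[tag]

    def requeue_unacked(self):
        """Simulate a worker crash: put unacknowledged tasks back in the queue."""
        while self._unacked:
            _, task = self._unacked.popitem()
            self._pending.appendleft(task)

    def __len__(self):
        return len(self._pending)


def run_demo():
    queue = ToyQueue()
    queue.publish({"name": "addition", "params": {"a": 1, "b": 2}})

    # A first worker receives the task but crashes before acknowledging it.
    queue.deliver()
    queue.requeue_unacked()
    still_queued = len(queue)

    # A second worker receives the same task, processes it, then acknowledges it.
    tag, task = queue.deliver()
    result = task["params"]["a"] + task["params"]["b"]
    queue.ack(tag)
    return still_queued, result, len(queue)


if __name__ == "__main__":
    print(run_demo())
```

The point of the design is that a task leaves the system only on an explicit acknowledgement, so a handler may run more than once for the same task and should tolerate that.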
Getting Started
Run docker-compose -f examples/docker-compose.yml up to bring up the RabbitMQ and Redis services needed to run this example.
Install
pip install aio-task
Worker → run tasks
import asyncio
from aio_task import Worker

async def addition(a, b):
    """ Task example. """
    return a + b

async def start_worker():
    rabbitmq_config = {"url": "amqp://guest:guest@localhost:5672",
                       "routing_key": "tasks_queue"}
    redis_config = {"address": "redis://localhost"}
    worker = await Worker.create("rabbitmq", rabbitmq_config,
                                 "redis", redis_config)
    worker.register_handler(addition)
    await worker.start()
    return worker

loop = asyncio.get_event_loop()
worker = loop.run_until_complete(start_worker())
try:
    loop.run_forever()
except KeyboardInterrupt:
    loop.run_until_complete(worker.close())  # graceful shutdown
loop.close()
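To make the idea of registering coroutines as tasks more concrete, here is a minimal sketch of how a name-to-coroutine registry could dispatch a task and keep any exception in the result. It is not aio-task's code: `ToyWorker`, its `process` method and the `status`/`result`/`error` fields are hypothetical, and the assumption that a task is addressed by the coroutine's `__name__` is inferred from the examples in this README. It uses `asyncio.run`, so it needs Python 3.7 or later.

```python
import asyncio


class ToyWorker:
    """Hypothetical stand-in for a worker; not aio-task's implementation."""

    def __init__(self):
        self._handlers = {}

    def register_handler(self, coro_func):
        # Assumption: the task name is the coroutine function's __name__.
        self._handlers[coro_func.__name__] = coro_func

    async def process(self, name, params):
        """Run the named handler and return a result record."""
        try:
            value = await self._handlers[name](**params)
            return {"status": "done", "result": value}
        except Exception as exc:
            # Keep the error in the result instead of losing it.
            return {"status": "failed", "error": repr(exc)}


async def addition(a, b):
    return a + b


async def division(a, b):
    return a / b


async def main():
    worker = ToyWorker()
    worker.register_handler(addition)
    worker.register_handler(division)
    ok = await worker.process("addition", {"a": 1, "b": 2})
    failed = await worker.process("division", {"a": 1, "b": 0})
    return ok, failed


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Passing the parameters as keyword arguments mirrors the `{"a": 1, "b": 2}` dictionary used when producing a task, and storing the exception in the record is one way to keep failures visible to whoever fetches the task later.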
Broker → produce tasks
import asyncio
from aio_task import Broker

async def sample_addition():
    # set up the broker
    rabbitmq_config = {"url": "amqp://guest:guest@localhost:5672",
                       "routing_key": "tasks_queue"}
    redis_config = {"address": "redis://localhost"}
    broker = await Broker.create("rabbitmq", rabbitmq_config,
                                 "redis", redis_config)
    # produce a task
    task_id = await broker.create_task("addition", {"a": 1, "b": 2})
    await asyncio.sleep(0.1)  # give a worker time to process the task
    # fetch the task
    task = await broker.get_task(task_id)
    print(task)
    await broker.close()  # graceful shutdown

# The broker is created and closed inside sample_addition(),
# so nothing else needs to be closed here.
loop = asyncio.get_event_loop()
loop.run_until_complete(sample_addition())
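The broker example waits a fixed 0.1 s before fetching the task, which can be too short when the worker is busy. One alternative is to poll until the task looks finished. The helper below is a generic sketch: `wait_for_task`, `fetch` and `is_done` are hypothetical names, and the `status`/`result` fields of the fake store are assumptions, since this README does not describe the shape of the object returned by `get_task`.

```python
import asyncio
import time


async def wait_for_task(fetch, is_done, timeout=5.0, interval=0.05):
    """Poll `fetch()` until `is_done(task)` is true or `timeout` seconds pass."""
    deadline = time.monotonic() + timeout
    while True:
        task = await fetch()
        if is_done(task):
            return task
        if time.monotonic() >= deadline:
            raise asyncio.TimeoutError("task did not finish in time")
        await asyncio.sleep(interval)


async def demo():
    # Fake result store standing in for the broker's cache (assumed shape).
    store = {"status": "pending", "result": None}

    async def fake_worker():
        await asyncio.sleep(0.1)
        store.update(status="done", result=3)

    async def fetch():
        return dict(store)

    worker = asyncio.ensure_future(fake_worker())
    task = await wait_for_task(fetch, lambda t: t["status"] == "done")
    await worker
    return task


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With aio-task, `fetch` could be something like `lambda: broker.get_task(task_id)`, with `is_done` adapted to whatever the returned task actually contains.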
💡 More examples are available in examples/.
Run tests
Unit tests
pip install -e .[test]
pytest -xvs tests/unit
Integration tests
pip install -e .[test]
docker-compose -f tests/integration/compose/docker-compose.yml up -d
IP_HOST=localhost pytest -xvs tests/integration