Taqu Task Queue system
Python Task Queue system built on Azure Service Bus queues and pydantic models.
What is this?
Lots of systems benefit from having a queue for background tasks, that run independently of e.g. your APIs or other processes. This can help e.g. by enhancing your API performance, limiting the effects of traffic peaks, as well as scaling out with parallel processing of various things.
The defacto standard for Python is Celery + RabbitMQ, but hosting RabbitMQ is another liability and not always the most fun experience, and Celery doesn’t support asyncio yet. Fully hosted solutions such as the Azure Service Bus help you get off the ground faster, with less things to worry about, and can allow you to save on costs as well.
Primarily intended for use with asyncio (from taqu.aio module), but works with non-async code just as well (using imports from the taqu module).
Supports all the basic things you could need:
- Fast insertion of tasks to queue
- Async task processing
- Easy to scale workers
- Retry logic - if there’s an uncaught exception the task will automatically be put back in the queue
- Clean shutdown on Ctrl+C (waits until tasks finish processing)
Licensing is important. This project itself uses BSD 3-clause license, but e.g. Azure library for Storage Bus and other such libraries used by it may have their own licenses.
For more information check the LICENSE -file.
In the Azure Portal set up a new Service Bus (any tier is fine), and then a queue in it. You probably want to enable partitioning, maybe also dead-lettering. Then you’ll want to get the access credentials for your code. Ensure you’ve got the Azure CLI installed and then run:
az login # Ensure you're logged in to Azure az account list # List subscriptions az account set --subscription <subscriptionId> # Set active subscription az servicebus namespace authorization-rule keys list \ --resource-group <rgName> \ --namespace-name <namespace> \ --name RootManageSharedAccessKey \ --query primaryConnectionString \ --output tsv
Also you’ll need the Taqu library installed, e.g. for use with the Azure:
pip install taqu[azure]
Then, you can set up your worker, here’s an example worker.py that you can just run with python worker.py:
import asyncio from taqu.aio import TaquAzureWorker from pydantic import BaseModel CONNECTION_STRING = "..." worker = TaquAzureWorker(CONNECTION_STRING) class CreateUser(BaseModel): username: str async def create_user(user: CreateUser): print(user.username) worker.register(create_user) worker.run()
With the worker in place, you can create a client and send some tasks
from taqu import TaquAzureClient from pydantic import BaseModel CONNECTION_STRING = "..." taqu = TaquAzureClient(CONNECTION_STRING) class CreateUser(BaseModel): username: str taqu.add_task(CreateUser(username="my_new_username"))
You can also check out the examples.
This project is run on GitHub using the issue tracking and pull requests here. If you want to contribute, feel free to submit issues (incl. feature requests) or PRs here.
To test changes locally python setup.py develop is a good way to run this, and you can python setup.py develop --uninstall afterwards (you might want to also use the --user flag).
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.