AsyncQ
A simple worker task queue with async support
Installation
pip install async_q
Features
- Submit tasks asynchronously
- Task routing
- Distributed async worker process
Use Case: Asynchronous Task Queue for I/O-Bound Task Processing
Overview
When you need to execute I/O-bound tasks asynchronously within a single application, the "async_q" library provides a straightforward solution. This use case shows how to set up and use the library for that purpose.
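To see why asynchronous execution suits I/O-bound work, here is a minimal sketch that uses only the standard asyncio module, not async_q itself. The names fake_io and run_all are illustrative and not part of the library. Because the waits overlap, the total time stays close to a single delay instead of the sum of all delays.

```python
import asyncio
import time


async def fake_io(idx, delay):
    # Stand-in for an I/O wait such as a network call (illustrative only)
    await asyncio.sleep(delay)
    return idx


async def run_all(n=5, delay=0.2):
    # Start all waits concurrently and collect the results in submission order
    start = time.perf_counter()
    results = await asyncio.gather(*(fake_io(i, delay) for i in range(n)))
    elapsed = time.perf_counter() - start
    return results, elapsed


if __name__ == '__main__':
    results, elapsed = asyncio.run(run_all())
    print(results, f'{elapsed:.2f}s')
```

Running the five waits one after another would take about n * delay seconds; run concurrently, they finish in roughly one delay.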
Prerequisites
Before using the "async_q" library, ensure you have Python 3.8 and Redis (version 5.0 or later) installed, along with the required dependencies.
Setting Up the Application
1. Creating the Async Task Queue App
Begin by creating an instance of AsyncTaskQueue to set up your application. In this example, we name the app async_q_app and configure it to use Redis as the message broker.
# main_app.py
from async_q import AsyncTaskQueue, RedisBuilder

# Define Async Task Queue App
async_q_app = AsyncTaskQueue(
    redis_builder=RedisBuilder(
        port='6379',
    )
)
2. Defining the Task Function
Next, define a task function that will be submitted to the queue for processing. In this example, the task function is named my_task. It simulates I/O waiting with a specified delay.
# my_task.py
import asyncio
import logging

from async_q import submit_task

# For initializing the app
from main_app import async_q_app


async def my_task(idx, delay=2, *args, **kwargs):
    # Simulate I/O waiting
    await asyncio.sleep(delay)
    logging.info(f'{idx} has finished the task. Task ID: {kwargs.get("task_id")}')
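Before submitting the task to the queue, it can help to run the coroutine directly. The sketch below copies my_task into a standalone script with no async_q imports and awaits it with asyncio.run. The task_id value 'local-test' is a made-up placeholder for this check. Note that logging.info is below Python's default WARNING level, so the message only appears once logging is configured; whether the async_q worker configures logging for you is not stated here, so treat the basicConfig call as an assumption you may need.

```python
import asyncio
import logging

# Make INFO-level messages visible; the root logger defaults to WARNING
logging.basicConfig(level=logging.INFO)


async def my_task(idx, delay=2, *args, **kwargs):
    # Simulate I/O waiting
    await asyncio.sleep(delay)
    logging.info(f'{idx} has finished the task. Task ID: {kwargs.get("task_id")}')


if __name__ == '__main__':
    # 'local-test' is a placeholder ID used only for this local run
    asyncio.run(my_task(0, delay=0.1, task_id='local-test'))
```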
3. Submitting Tasks
To submit tasks for processing, use the submit_task function. In this example, we submit 20 tasks to be processed by the queue.
if __name__ == '__main__':
    for i in range(20):
        submit_task(my_task, kwargs={'idx': i, 'delay': 10})
The submit_task function also accepts a queue argument, which defaults to default. Set it to route a particular task to a different queue; if no queue is provided, the task goes to the default queue. The following example passes the default value explicitly.
if __name__ == '__main__':
    for i in range(20):
        # Submit a task with a default queue value
        submit_task(my_task, kwargs={'idx': i, 'delay': 10, 'queue': 'default'})
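The idea behind task routing can be sketched in memory with one asyncio.Queue per queue name. This is a conceptual illustration only, not how async_q is implemented: the dictionary of queues stands in for the Redis broker, and the payload strings and the worker and demo functions are invented for the example.

```python
import asyncio


async def worker(queue, handled):
    # Drain a single named queue; this worker never sees tasks from other queues
    while not queue.empty():
        payload = queue.get_nowait()
        await asyncio.sleep(0)  # stand-in for real I/O work
        handled.append(payload)


async def demo():
    # One in-memory queue per queue name (hypothetical stand-in for the broker)
    queues = {'default': asyncio.Queue(), 'mail_ps': asyncio.Queue()}
    submitted = [
        ('default', 'resize-1'),
        ('mail_ps', 'welcome-mail'),
        ('default', 'resize-2'),
    ]
    for name, payload in submitted:
        queues[name].put_nowait(payload)

    # One worker per queue, each recording what it handled
    handled = {name: [] for name in queues}
    await asyncio.gather(*(worker(q, handled[name]) for name, q in queues.items()))
    return handled


if __name__ == '__main__':
    print(asyncio.run(demo()))
```

Each worker only handles the tasks placed on its own queue, which is the behavior routing is meant to provide.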
Running Worker Processes
4. Initializing Worker Processes
To process incoming tasks, you need to start worker processes. The -c flag sets the level of concurrency. The following example starts five worker processes that use the async_q_app instance defined in main_app.py. By default, these workers process tasks only from the default queue.
$ python -m async_q -a main_app.py:async_q_app -c 5
You can also assign workers to a different queue with the -q flag, as demonstrated below:
$ python -m async_q -a main_app.py:async_q_app -c 5 -q mail_ps
By making these adjustments, you can tailor the behavior of your worker processes to suit your application's specific requirements.
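To build intuition for what a concurrency level of 5 means, here is an in-process analogy using plain asyncio: a fixed number of worker coroutines pull from one shared queue, so no more than that number of tasks is ever in flight. This is a sketch under that assumption and does not reflect how async_q schedules work internally. The function name run_with_concurrency is hypothetical.

```python
import asyncio


async def run_with_concurrency(n_tasks, concurrency, delay):
    # Fill a shared queue with task indexes
    queue = asyncio.Queue()
    for i in range(n_tasks):
        queue.put_nowait(i)

    in_flight = 0  # tasks currently waiting on simulated I/O
    peak = 0       # highest number of simultaneous tasks observed
    done = []

    async def worker():
        nonlocal in_flight, peak
        while True:
            try:
                idx = queue.get_nowait()
            except asyncio.QueueEmpty:
                return  # nothing left to process
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(delay)  # simulated I/O wait
            in_flight -= 1
            done.append(idx)

    # The number of worker coroutines caps the concurrency
    await asyncio.gather(*(worker() for _ in range(concurrency)))
    return peak, done


if __name__ == '__main__':
    peak, done = asyncio.run(run_with_concurrency(20, 5, 0.1))
    print(f'peak concurrency: {peak}, tasks completed: {len(done)}')
```

With 20 tasks and a concurrency of 5, the tasks complete in about four batches, so a higher -c value shortens total time for I/O-bound work as long as the external resource can keep up.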
5. Submitting Tasks for Processing
With the worker processes running, you can now submit tasks for processing. Use the following command to execute the my_task.py script, which submits tasks to the queue.
$ python my_task.py
Result
The worker processes will asynchronously process the submitted tasks with the specified delays. You can monitor the progress and completion of tasks through the log messages generated by the my_task function. The "async_q" library is suitable for I/O-bound workloads that benefit from asynchronous processing within a single application.
Todo
- Test and check backward compatibility