FastAPI Queue
FastAPI queue tasks
Introduction
FastAPI Queue helps you add tasks to a queue and run them in parallel, supporting both asynchronous and synchronous task handlers.
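As a rough illustration of what "running tasks in parallel with both asynchronous and synchronous handlers" can mean, here is a minimal sketch using only the standard library. It is not the library's implementation: `run_handler`, `run_all`, and the two sample handlers are hypothetical names, and the concurrency limit is modelled with an `asyncio.Semaphore`.

```python
import asyncio
import inspect
import time


async def run_handler(handler, data):
    """Run an async handler directly, or a sync handler in a worker thread."""
    if inspect.iscoroutinefunction(handler):
        return await handler(data)
    # asyncio.to_thread keeps a blocking handler from stalling the event loop.
    return await asyncio.to_thread(handler, data)


async def run_all(tasks, concurrency):
    """Run (handler, data) pairs with at most `concurrency` in flight."""
    semaphore = asyncio.Semaphore(concurrency)

    async def guarded(handler, data):
        async with semaphore:
            return await run_handler(handler, data)

    # gather returns results in the same order as the input pairs.
    return await asyncio.gather(*(guarded(h, d) for h, d in tasks))


async def async_double(data):
    await asyncio.sleep(0.01)
    return data * 2


def sync_square(data):
    time.sleep(0.01)
    return data * data


results = asyncio.run(
    run_all([(async_double, 3), (sync_square, 4), (async_double, 5)], concurrency=2)
)
print(results)
```

The semaphore plays the role that a `concurrency` setting would play in a real queue: no more than that many handlers run at once, regardless of how many tasks are waiting.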
How to use
1. Instantiate the Queue
First, create an instance of the Queue class by providing a Redis instance and configuration options like concurrency and max retry attempts:
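# config/queue.py
from fastapi_queue_task import Queue
from redis.asyncio.utils import from_url

# REDIS_URL, QUEUE_MAX_CONCURRENCY, QUEUE_MAX_ATTEMPT, env and the Task type
# are assumed to come from your own application code and settings.
redis = from_url(REDIS_URL)
queue = Queue('main', redis, {'concurrency': QUEUE_MAX_CONCURRENCY, 'max_attempt': QUEUE_MAX_ATTEMPT})

def queue_processing(task: Task):
    # Log the details of every task the queue hands over.
    print('TASK_DETAIL')
    print('task name', task.get('name'))
    print('task data', task.get('data'))
    print('task attempt', task.get('attempt'))

def init_queue():
    # Start the worker only in environments that are allowed to run it.
    if env.IS_ALLOW_RUN_QUEUE:
        queue.run(queue_processing)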
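# bootstrap/app.py
from contextlib import asynccontextmanager
from fastapi import FastAPI
from config.queue import init_queue

@asynccontextmanager
async def lifespan(app: FastAPI):
    init_queue()  # start the queue worker on application startup
    yield

app = FastAPI(title='API', lifespan=lifespan)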
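The max retry attempts option suggests that a failed task is retried until a limit is reached. How the library counts attempts internally is not documented here, so the following is only a sketch under that assumption: `process_with_retry` and `flaky_handler` are hypothetical, and the task is a plain dict with the `name`, `data`, and `attempt` keys printed in the example above.

```python
import asyncio


async def process_with_retry(task, handler, max_attempt):
    """Call handler(task) until it succeeds or max_attempt tries are used."""
    while task['attempt'] < max_attempt:
        task['attempt'] += 1
        try:
            await handler(task)
            return True
        except Exception as exc:
            # A real queue would likely log this and re-enqueue the task.
            print(f"attempt {task['attempt']} failed: {exc}")
    return False


calls = []


async def flaky_handler(task):
    # Fails on the first two attempts, then succeeds.
    calls.append(task['attempt'])
    if task['attempt'] < 3:
        raise RuntimeError('temporary failure')


task = {'name': 'SEND_MAIL', 'data': {'to': 'user@example.com'}, 'attempt': 0}
succeeded = asyncio.run(process_with_retry(task, flaky_handler, max_attempt=3))
print(succeeded, task['attempt'])
```

With `max_attempt=3` the handler succeeds on its third try; with a lower limit the same task would be given up on and `process_with_retry` would return `False`.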
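2. Add a Task to the Queue
You can add a task to the queue by calling the add_to_queue method, passing the task name and the data to be processed. The example below calls queue.add with the same two arguments; use whichever method name your installed version provides.
# mail_service.py
from uuid import UUID

from config.queue import queue

# General form: await queue.add_to_queue(name='TASK_NAME', data={})
# TrackViewTaskEnum and TrackHotspotMediaViewDto are defined elsewhere in the application.
async def track(channel_id: UUID, dto: TrackHotspotMediaViewDto):
    # Enqueue the view-tracking task with the post ID as its payload.
    return await queue.add(
        TrackViewTaskEnum.POST,
        {
            'post_id': dto.post_id
        },
    )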
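To make the shape of an enqueued task concrete, here is a small in-memory sketch. It assumes a task is stored as a JSON record with the `name`, `data`, and `attempt` keys that the processing function in step 1 reads; the real storage format in Redis may differ. `InMemoryQueue` and the enum value `'TRACK_VIEW_POST'` are hypothetical, and the sketch is synchronous for brevity.

```python
import json
from collections import deque
from enum import Enum


class TrackViewTaskEnum(str, Enum):
    POST = 'TRACK_VIEW_POST'  # hypothetical value, for illustration only


class InMemoryQueue:
    """Stand-in for a Redis-backed queue; stores JSON strings in a deque."""

    def __init__(self):
        self._items = deque()

    def add(self, name, data):
        # Accept either an enum member or a plain string as the task name.
        record = {'name': getattr(name, 'value', name), 'data': data, 'attempt': 0}
        self._items.append(json.dumps(record))
        return len(self._items)

    def pop(self):
        # Tasks come back out in the order they were added (FIFO).
        return json.loads(self._items.popleft())


queue = InMemoryQueue()
position = queue.add(TrackViewTaskEnum.POST, {'post_id': 42})
task = queue.pop()
print(task.get('name'), task.get('data'), task.get('attempt'))
```

Serializing to JSON is one plausible reason the task data in the example above is a plain dict of simple values rather than the DTO object itself.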
3. Define a Task Handler
Define the task handler using the @task decorator. The handler will process the task asynchronously or synchronously based on its definition:
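# tasks/track_view.py
from fastapi import Depends
from fastapi_queue_task import BaseTask, task

# TrackViewTaskEnum, TrackPostViewService and TrackPostViewDataType are defined elsewhere in the application.
@task(TrackViewTaskEnum.POST)
class TrackViewPostTask(BaseTask):
    def __init__(self, track_post_view_service: TrackPostViewService = Depends()):
        # The service is injected through FastAPI's Depends().
        self.track_post_view_service = track_post_view_service

    async def handler(self, data: TrackPostViewDataType):
        # Delegate the actual tracking work to the service.
        await self.track_post_view_service.handle_tracking(data)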
In this example, TrackViewPostTask is registered for TrackViewTaskEnum.POST tasks, and its handler method processes each such task asynchronously.
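A decorator such as @task is commonly built on a registry that maps task names to handler classes. The sketch below shows that general pattern, not the library's actual code: `TASK_REGISTRY`, this `task` function, `dispatch`, and `ResizeImageTask` are all hypothetical, and dependency injection via Depends() is left out.

```python
import asyncio
import inspect

TASK_REGISTRY = {}


def task(name):
    """Class decorator that registers a handler class under a task name."""
    def decorator(cls):
        TASK_REGISTRY[name] = cls
        return cls
    return decorator


@task('TRACK_VIEW_POST')
class TrackViewPostTask:
    async def handler(self, data):  # asynchronous handler
        return f"tracked post {data['post_id']}"


@task('RESIZE_IMAGE')
class ResizeImageTask:
    def handler(self, data):  # synchronous handler
        return f"resized {data['file']}"


async def dispatch(task_record):
    """Look up the handler class by task name and call its handler."""
    handler_cls = TASK_REGISTRY[task_record['name']]
    result = handler_cls().handler(task_record['data'])
    # An async handler returns an awaitable; a sync one returns the value.
    if inspect.isawaitable(result):
        result = await result
    return result


outputs = [
    asyncio.run(dispatch({'name': 'TRACK_VIEW_POST', 'data': {'post_id': 7}})),
    asyncio.run(dispatch({'name': 'RESIZE_IMAGE', 'data': {'file': 'a.png'}})),
]
print(outputs)
```

Checking the return value with `inspect.isawaitable` is one simple way to let the same dispatcher serve handlers written with either `def` or `async def`.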