Skip to main content

FastAPI Queue

Project description

FastAPI queue tasks

Introduction

FastAPI Queue helps you add tasks to a queue and run them in parallel, supporting both asynchronous and synchronous task handlers.

How to use

1. Instantiate the Queue

First, create an instance of the Queue class by providing a Redis instance and configuration options like concurrency and max retry attempts:

# config/queue.py

from fastapi_queue_task import Queue
from redis.asyncio.utils import from_url

queue = Queue('main', redis, {'concurrency': QUEUE_MAX_CONCURRENCY, 'max_attempt': QUEUE_MAX_ATTEMP})

def queue_processing(task: Task):
  print('TASK_DETAIL')
  print('task name', task.get('name'))
  print('task data', task.get('data'))
  print('task attempt', task.get('attempt'))


def init_queue():
  if env.IS_ALLOW_RUN_QUEUE:
    queue.run(queue_processing)
# bootstrap/app.py

@asynccontextmanager
async def lifespan(app: FastAPI):
  init_queue()
  yield

app = FastAPI(title='API', lifespan=lifespan)

2. Add task to queue:

You can add a task to the queue by calling the add_to_queue method, passing the task name and the data to be processed:

# mail_service.py
from config import queue

await queue.add(name="TASK_NAME", data: Any = {})

async def track(channel_id: UUID, dto: TrackHotspotMediaViewDto):
    return await queue.add(
      TrackViewTaskEnum.POST,
      {
        'post_id': dto.post_id
      },
    )

3. Define a Task Handler

Define the task handler using the @task decorator. The handler will process the task asynchronously or synchronously based on its definition:

# tasks/track_view.py

from fastapi_queue_task import BaseTask, task

@task(TrackViewTaskEnum.POST)
class TrackViewPostTask(BaseTask):
  def __init__(self, track_post_view_service: TrackPostViewService = Depends()):
    self.track_post_view_service = track_post_view_service

  async def handler(self, data: TrackPostViewDataType):
    await self.track_post_view_service.handle_tracking(data)

In this example, the task TrackViewPostTask is added to the queue, and the handler method will process the task asynchronously.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

fastapi_queue_task-1.0.4.tar.gz (7.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

fastapi_queue_task-1.0.4-py3-none-any.whl (8.2 kB view details)

Uploaded Python 3

File details

Details for the file fastapi_queue_task-1.0.4.tar.gz.

File metadata

  • Download URL: fastapi_queue_task-1.0.4.tar.gz
  • Upload date:
  • Size: 7.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.10.16

File hashes

Hashes for fastapi_queue_task-1.0.4.tar.gz
Algorithm Hash digest
SHA256 7e95057cc5dc1a318d569875fed06e4177e024afd341e548129950d1e575d85e
MD5 fe2c942c2645802ea788d46729339956
BLAKE2b-256 3c14cecfec45b93d8ec8096fa6784c455feac6e3eae76cfebd0d75b63786dba7

See more details on using hashes here.

File details

Details for the file fastapi_queue_task-1.0.4-py3-none-any.whl.

File metadata

File hashes

Hashes for fastapi_queue_task-1.0.4-py3-none-any.whl
Algorithm Hash digest
SHA256 38f95ea8d4c771f09aa36bf9d963338d26e17458352d7d7e6a149152ce40c484
MD5 2b7990d8407b7786322619749b0a9c9e
BLAKE2b-256 ea8afda7315026fd6621f0176ff597fdc2f640f741bb5fee3db7bb41a55a3cb2

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page