FastAPI Queue

FastAPI queue tasks

Introduction

FastAPI Queue helps you add tasks to a queue and run them in parallel, supporting both asynchronous and synchronous task handlers.

How to use

1. Instantiate the Queue

First, create an instance of the Queue class by providing a Redis instance and configuration options like concurrency and max retry attempts:

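# config/queue.py

from fastapi_queue_task import Queue
from redis.asyncio.utils import from_url

# REDIS_URL, QUEUE_MAX_CONCURRENCY, QUEUE_MAX_ATTEMPT, and env are assumed to
# come from your application's own settings; they are not part of the package.
redis = from_url(REDIS_URL)

queue = Queue('main', redis, {'concurrency': QUEUE_MAX_CONCURRENCY, 'max_attempt': QUEUE_MAX_ATTEMPT})

# Callback invoked for each task; task is a dict-like Task object
# exposing 'name', 'data', and 'attempt'.
def queue_processing(task):
  print('TASK_DETAIL')
  print('task name', task.get('name'))
  print('task data', task.get('data'))
  print('task attempt', task.get('attempt'))


def init_queue():
  # Only start the worker when the environment allows it.
  if env.IS_ALLOW_RUN_QUEUE:
    queue.run(queue_processing)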
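
# bootstrap/app.py

from contextlib import asynccontextmanager

from fastapi import FastAPI
from config.queue import init_queue

@asynccontextmanager
async def lifespan(app: FastAPI):
  # Start the queue worker when the application starts up.
  init_queue()
  yield

app = FastAPI(title='API', lifespan=lifespan)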
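
The concurrency and max_attempt options describe two ideas: a cap on how many tasks run at once, and a limit on how often a failing task is retried. The sketch below illustrates both with plain asyncio. It is not the package's implementation; process_all, run_with_retries, and flaky_handler are hypothetical names used only for illustration.

```python
import asyncio


async def run_with_retries(handler, task, max_attempt):
    """Call handler(task) until it succeeds or max_attempt is reached."""
    for attempt in range(1, max_attempt + 1):
        task['attempt'] = attempt
        try:
            await handler(task)
            return True
        except Exception:
            if attempt == max_attempt:
                return False
    return False


async def process_all(tasks, handler, concurrency, max_attempt):
    """Process tasks with at most `concurrency` handlers running at once."""
    semaphore = asyncio.Semaphore(concurrency)

    async def worker(task):
        async with semaphore:
            return await run_with_retries(handler, task, max_attempt)

    # gather returns results in the same order as the input tasks.
    return await asyncio.gather(*(worker(t) for t in tasks))


async def flaky_handler(task):
    # 'flaky' fails on its first attempt only; 'broken' always fails.
    if task['name'] == 'flaky' and task['attempt'] < 2:
        raise RuntimeError('temporary failure')
    if task['name'] == 'broken':
        raise RuntimeError('permanent failure')


def demo():
    tasks = [
        {'name': 'ok', 'data': {}},
        {'name': 'flaky', 'data': {}},
        {'name': 'broken', 'data': {}},
    ]
    results = asyncio.run(
        process_all(tasks, flaky_handler, concurrency=2, max_attempt=3)
    )
    return tasks, results
```

Calling demo() returns the tasks with their final attempt counts alongside a success flag per task, which makes it easy to see which tasks were retried and which were given up on.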

2. Add a Task to the Queue

Add a task to the queue by calling the queue's add method with the task name and the data to be processed:

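# mail_service.py
from uuid import UUID

from config.queue import queue

# General form: await queue.add(name='TASK_NAME', data={...})

# TrackViewTaskEnum and TrackHotspotMediaViewDto are defined elsewhere in the application.
async def track(channel_id: UUID, dto: TrackHotspotMediaViewDto):
  return await queue.add(
    TrackViewTaskEnum.POST,
    {
      'post_id': dto.post_id
    },
  )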
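
The example above relies on a task-name enum and a DTO that are not shown. One way they might look is sketched here. The enum value, the DTO fields, and build_task are all hypothetical, and the sketch assumes task data should be JSON-serializable before it reaches Redis, which the package documentation does not state.

```python
import json
from dataclasses import dataclass
from enum import Enum
from uuid import UUID, uuid4


class TrackViewTaskEnum(str, Enum):
    # Hypothetical task name; the str mixin keeps the value a plain string.
    POST = 'TRACK_VIEW_POST'


@dataclass
class TrackHotspotMediaViewDto:
    # Hypothetical DTO holding only the field used in the example above.
    post_id: UUID


def build_task(name: TrackViewTaskEnum, dto: TrackHotspotMediaViewDto) -> dict:
    """Build the name/data pair that would be passed to queue.add."""
    # UUIDs are converted to strings so the payload can be JSON-encoded.
    return {'name': name.value, 'data': {'post_id': str(dto.post_id)}}


dto = TrackHotspotMediaViewDto(post_id=uuid4())
task = build_task(TrackViewTaskEnum.POST, dto)
encoded = json.dumps(task)  # works because every value is a plain string
```

Converting identifiers such as UUIDs to strings up front avoids serialization surprises if the payload is later encoded for storage.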

3. Define a Task Handler

Define a task handler by decorating a BaseTask subclass with @task and the task name it should handle. The handler method can be declared with async def or as a plain def, and it runs asynchronously or synchronously accordingly:

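# tasks/track_view.py

from fastapi import Depends
from fastapi_queue_task import BaseTask, task

# TrackViewTaskEnum, TrackPostViewService, and TrackPostViewDataType are
# defined elsewhere in the application.
@task(TrackViewTaskEnum.POST)
class TrackViewPostTask(BaseTask):
  def __init__(self, track_post_view_service: TrackPostViewService = Depends()):
    self.track_post_view_service = track_post_view_service

  # Called with the data that was passed to queue.add for this task name.
  async def handler(self, data: TrackPostViewDataType):
    await self.track_post_view_service.handle_tracking(data)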

In this example, TrackViewPostTask is registered for tasks named TrackViewTaskEnum.POST. Because its handler method is declared with async def, each such task is processed asynchronously.
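
To make the decorator pattern concrete, here is a minimal sketch of how a name-to-handler registry could dispatch both synchronous and asynchronous handlers. It is an illustration under assumptions, not the package's actual code: register_task, TASK_REGISTRY, and dispatch are hypothetical names, and dependency injection via Depends is left out.

```python
import asyncio
import inspect

# Maps a task name to the class that handles it.
TASK_REGISTRY = {}


def register_task(name):
    """Class decorator that registers a handler class under a task name."""
    def decorator(cls):
        TASK_REGISTRY[name] = cls
        return cls
    return decorator


@register_task('SYNC_TASK')
class SyncTask:
    def handler(self, data):
        return f"sync:{data['value']}"


@register_task('ASYNC_TASK')
class AsyncTask:
    async def handler(self, data):
        await asyncio.sleep(0)  # stand-in for real asynchronous work
        return f"async:{data['value']}"


async def dispatch(name, data):
    """Look up the handler for a task name and run it, sync or async."""
    instance = TASK_REGISTRY[name]()
    if inspect.iscoroutinefunction(instance.handler):
        return await instance.handler(data)
    # Run blocking handlers in a worker thread so the event loop stays free.
    return await asyncio.to_thread(instance.handler, data)


sync_result = asyncio.run(dispatch('SYNC_TASK', {'value': 1}))
async_result = asyncio.run(dispatch('ASYNC_TASK', {'value': 2}))
```

Checking the handler with inspect.iscoroutinefunction lets one dispatcher serve both styles, and moving synchronous handlers to a thread keeps slow blocking work from stalling other tasks.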
