FastAPI Queue

FastAPI queue tasks

Introduction

FastAPI Queue helps you add tasks to a queue and run them in parallel, supporting both asynchronous and synchronous task handlers.
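
To make the "asynchronous and synchronous" idea concrete, here is a minimal sketch of how a worker might run both kinds of handler under a concurrency limit. It uses only the standard library. `run_handler`, `process_all`, and the two sample handlers are hypothetical names for illustration and are not part of fastapi_queue_task, whose internals may differ.

```python
import asyncio
import inspect


async def run_handler(handler, data):
    """Run a handler whether it is declared with `def` or `async def`."""
    if inspect.iscoroutinefunction(handler):
        return await handler(data)
    # Run blocking handlers in a worker thread so the event loop stays free.
    return await asyncio.to_thread(handler, data)


async def process_all(tasks, concurrency):
    """Process (handler, data) pairs with at most `concurrency` running at once."""
    semaphore = asyncio.Semaphore(concurrency)

    async def worker(handler, data):
        async with semaphore:
            return await run_handler(handler, data)

    # gather returns results in the same order as the input pairs.
    return await asyncio.gather(*(worker(h, d) for h, d in tasks))


def sync_double(data):
    return data * 2


async def async_increment(data):
    await asyncio.sleep(0)
    return data + 1


results = asyncio.run(
    process_all([(sync_double, 2), (async_increment, 2)], concurrency=2)
)
```

The semaphore plays the role of a concurrency setting: raising it lets more tasks run at the same time, while a value of 1 processes them one by one.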

How to use

1. Instantiate the Queue

First, create an instance of the Queue class by providing a Redis instance and configuration options like concurrency and max retry attempts:

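# config/queue.py

from fastapi_queue_task import Queue
from redis.asyncio.utils import from_url

# REDIS_URL, QUEUE_MAX_CONCURRENCY, QUEUE_MAX_ATTEMPT, and env are assumed to
# come from your application's settings; the library does not provide them.
redis = from_url(REDIS_URL)

queue = Queue('main', redis, {'concurrency': QUEUE_MAX_CONCURRENCY, 'max_attempt': QUEUE_MAX_ATTEMPT})

# Task is the task payload type; import it from wherever your installed
# version of the library exposes it.
def queue_processing(task: Task):
  print('TASK_DETAIL')
  print('task name', task.get('name'))
  print('task data', task.get('data'))
  print('task attempt', task.get('attempt'))


def init_queue():
  # Start the worker only where queue processing is enabled.
  if env.IS_ALLOW_RUN_QUEUE:
    queue.run(queue_processing)

# bootstrap/app.py

from contextlib import asynccontextmanager

from fastapi import FastAPI

from config.queue import init_queue

@asynccontextmanager
async def lifespan(app: FastAPI):
  # Start queue processing when the application starts.
  init_queue()
  yield

app = FastAPI(title='API', lifespan=lifespan)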
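
The `max_attempt` option caps how many times a failing task is tried. As a rough illustration of that idea (not the library's actual retry logic), the sketch below calls a handler until it succeeds or the attempt limit is reached. `run_with_retries` and `flaky_handler` are hypothetical names; the task dictionary mirrors the `name`, `data`, and `attempt` keys read in `queue_processing`.

```python
def run_with_retries(task, handler, max_attempt):
    """Call handler(task) until it succeeds or max_attempt tries are used.

    Returns True on success, False once the attempts are exhausted.
    """
    while task['attempt'] < max_attempt:
        task['attempt'] += 1
        try:
            handler(task)
            return True
        except Exception as error:
            print(f"attempt {task['attempt']} of {task['name']} failed: {error}")
    return False


calls = []


def flaky_handler(task):
    # Fails on the first two attempts, then succeeds on the third.
    calls.append(task['attempt'])
    if task['attempt'] < 3:
        raise RuntimeError('temporary failure')


task = {'name': 'SEND_MAIL', 'data': {}, 'attempt': 0}
succeeded = run_with_retries(task, flaky_handler, max_attempt=3)
```

With `max_attempt=2` the same handler would never reach its third, successful try, and `run_with_retries` would return `False`.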

2. Add a Task to the Queue

You can add a task to the queue by calling the add_to_queue method, passing the task name and the data to be processed:

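# mail_service.py
from uuid import UUID

from config.queue import queue

# General form (call it from inside an async function):
#   await queue.add_to_queue(name='TASK_NAME', data={})

# TrackViewTaskEnum and TrackHotspotMediaViewDto are application-defined.
# NOTE: the text names this method add_to_queue, while this example calls
# queue.add; check which one your installed version provides.
async def track(channel_id: UUID, dto: TrackHotspotMediaViewDto):
    return await queue.add(
      TrackViewTaskEnum.POST,
      {
        'post_id': dto.post_id
      },
    )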
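
For intuition about what adding a task involves, the sketch below serializes a task to JSON and appends it to a list-backed queue, a common pattern for Redis-based queues. This is an assumption about the general approach, not a description of how fastapi_queue_task stores tasks. `InMemoryQueue` is a hypothetical stand-in so the example runs without a Redis server.

```python
import asyncio
import json
from collections import deque
from typing import Any, Optional


class InMemoryQueue:
    """Hypothetical stand-in for a Redis-backed queue (illustration only)."""

    def __init__(self, name: str):
        self.name = name
        self._items = deque()

    async def add(self, name: str, data: Any = None) -> None:
        # Store the task as JSON, starting with zero attempts.
        payload = {
            'name': name,
            'data': data if data is not None else {},
            'attempt': 0,
        }
        self._items.append(json.dumps(payload))

    async def pop(self) -> Optional[dict]:
        # Tasks come out in the order they were added (FIFO).
        if not self._items:
            return None
        return json.loads(self._items.popleft())


async def main():
    queue = InMemoryQueue('main')
    await queue.add('TRACK_VIEW_POST', {'post_id': 42})
    return await queue.pop()


task = asyncio.run(main())
```

Because the payload goes through JSON, the data passed with a task should be limited to JSON-serializable values in a design like this one.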

3. Define a Task Handler

Define the task handler as a class decorated with @task and give it a handler method. The handler runs asynchronously or synchronously depending on whether it is declared with async def or def:

# tasks/track_view.py

from fastapi import Depends
from fastapi_queue_task import BaseTask, task

# TrackViewTaskEnum, TrackPostViewService, and TrackPostViewDataType are
# application-defined; import them from your own modules.
@task(TrackViewTaskEnum.POST)
class TrackViewPostTask(BaseTask):
  def __init__(self, track_post_view_service: TrackPostViewService = Depends()):
    self.track_post_view_service = track_post_view_service

  async def handler(self, data: TrackPostViewDataType):
    await self.track_post_view_service.handle_tracking(data)

In this example, TrackViewPostTask is registered as the handler for TrackViewTaskEnum.POST tasks. Because its handler method is declared with async def, each task of that type is processed asynchronously.
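
One common way a decorator such as `@task` can connect a task name to its handler class is a registry dictionary. The sketch below shows that pattern under stated assumptions: `register_task`, `TASK_REGISTRY`, and `dispatch` are hypothetical names, and the real fastapi_queue_task internals (including how `Depends()` arguments are resolved) may differ.

```python
import asyncio
import inspect

# Maps a task name to the class that handles it (hypothetical registry).
TASK_REGISTRY = {}


def register_task(name):
    """Class decorator that records the handler class under a task name."""
    def decorator(cls):
        TASK_REGISTRY[name] = cls
        return cls
    return decorator


@register_task('TRACK_VIEW_POST')
class TrackViewPostTask:
    async def handler(self, data):
        return f"tracked post {data['post_id']}"


@register_task('CLEANUP')
class CleanupTask:
    def handler(self, data):  # a synchronous handler
        return 'cleaned'


async def dispatch(task):
    """Look up the handler class by task name and run its handler."""
    instance = TASK_REGISTRY[task['name']]()
    result = instance.handler(task['data'])
    # An async handler returns an awaitable; a sync one returns the value.
    if inspect.isawaitable(result):
        result = await result
    return result


outcome = asyncio.run(
    dispatch({'name': 'TRACK_VIEW_POST', 'data': {'post_id': 7}})
)
```

Keeping the lookup in one dictionary means the code that enqueues a task only needs to know the task name, not the class that will eventually process it.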

Download files

Source Distribution

fastapi_queue_task-1.0.1.tar.gz (7.9 kB)

Uploaded Source

Built Distribution

fastapi_queue_task-1.0.1-py3-none-any.whl (8.2 kB)

Uploaded Python 3

File details

Details for the file fastapi_queue_task-1.0.1.tar.gz.

File metadata

  • Download URL: fastapi_queue_task-1.0.1.tar.gz
  • Size: 7.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.1 CPython/3.10.14

File hashes

Hashes for fastapi_queue_task-1.0.1.tar.gz
Algorithm Hash digest
SHA256 0d70e07116db1c5110217342a25c75c3bc688a4326f37fc0487f2eb67d6671c1
MD5 1e752684dd1e1c335ea4549848659be3
BLAKE2b-256 5c2bcc5f45c6fce746ca13a175cd4f38182dfafd7514de2efce01274afd0546e

File details

Details for the file fastapi_queue_task-1.0.1-py3-none-any.whl.

File hashes

Hashes for fastapi_queue_task-1.0.1-py3-none-any.whl
Algorithm Hash digest
SHA256 4a12958308685a140c986f136fbe65ea18d4608892e535dc3c5b32c171069914
MD5 3b695515812524082234854728b9dbe1
BLAKE2b-256 8834b0dd3af86007c136cb8274fba1c0f199319a5749a9d799d5d20f41e0e051
