Task queues, workers, schedules, and backend integrations for Litestar

Litestar Queues

Litestar Queues adds background task queues to Litestar applications. Define a task with a decorator, enqueue it from a route handler or service, and let a worker run it now, later, after a retry, or on a schedule.

Use it when a request should return quickly while work continues elsewhere: sending email, refreshing reports, syncing accounts, importing files, calling slow APIs, or running operational maintenance jobs.

What You Get

  • Simple task API: @task(...) registers async or sync callables with defaults for queues, retries, priority, timeout, delay, and metadata.
  • Litestar plugin: QueuePlugin wires a managed QueueService into Litestar dependency injection, app state, startup, shutdown, and CLI commands.
  • Workers included: run an in-app worker for local/lightweight apps or a standalone worker process for production deployments.
  • Scheduling: run recurring interval or five-field cron tasks.
  • Result tracking: each queued record moves through the pending, scheduled, running, completed, failed, and cancelled states.
  • Pluggable backends: start with the in-memory backend, then move to SQLSpec, Advanced Alchemy, Redis, Valkey, or Cloud Run execution when needed.
  • Realtime events: publish lifecycle, progress, log, and custom task events to your own Litestar Channels setup.
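
The result-tracking states listed above can be pictured as a small state machine. The sketch below is illustrative only: the state names come from the list, but the TaskStatus enum, the ALLOWED table, and the transitions in it are assumptions, not the library's actual model.

```python
from enum import Enum


class TaskStatus(str, Enum):
    """The six states named in the feature list (hypothetical enum)."""

    PENDING = "pending"
    SCHEDULED = "scheduled"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"


# Assumed transitions, for illustration; the library may allow others.
ALLOWED: dict[TaskStatus, set[TaskStatus]] = {
    TaskStatus.SCHEDULED: {TaskStatus.PENDING, TaskStatus.CANCELLED},
    TaskStatus.PENDING: {TaskStatus.RUNNING, TaskStatus.CANCELLED},
    # A failed attempt with retries left is assumed to go back to the queue.
    TaskStatus.RUNNING: {
        TaskStatus.COMPLETED,
        TaskStatus.FAILED,
        TaskStatus.PENDING,
        TaskStatus.SCHEDULED,
        TaskStatus.CANCELLED,
    },
    TaskStatus.COMPLETED: set(),
    TaskStatus.FAILED: set(),
    TaskStatus.CANCELLED: set(),
}


def can_transition(current: TaskStatus, new: TaskStatus) -> bool:
    """Return True when moving from `current` to `new` is in the table."""
    return new in ALLOWED[current]


def is_terminal(status: TaskStatus) -> bool:
    """A state with no outgoing transitions is final."""
    return not ALLOWED[status]
```

Under these assumptions, completed, failed, and cancelled are terminal, and a running task can return to pending or scheduled when a retry is due.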

Install

pip install litestar-queues

The base install is intentionally small. It includes the task API, Litestar plugin, in-memory queue backend, immediate execution, and local workers. Persistent or remote integrations are optional extras.

Quick Start

Create an app.py:

from litestar import Litestar, post
from litestar.di import NamedDependency

from litestar_queues import QueueConfig, QueuePlugin, QueueService, task


@task("accounts.sync", queue="accounts", retries=3, timeout=300)
async def sync_account(account_id: str) -> dict[str, str]:
    return {"account_id": account_id, "status": "synced"}


@post("/accounts/{account_id:str}/sync")
async def create_sync_job(
    account_id: str,
    queue_service: NamedDependency[QueueService],
) -> dict[str, str]:
    result = await queue_service.enqueue(sync_account, account_id)
    return {"task_id": str(result.id), "status": result.status or "queued"}


app = Litestar(
    route_handlers=[create_sync_job],
    plugins=[QueuePlugin(config=QueueConfig())],
)

Run the app:

LITESTAR_APP=app:app litestar run --reload

Call the route:

curl -X POST http://127.0.0.1:8000/accounts/acct-123/sync

Trigger the same job in whichever form fits the caller:

# 1. Pass the decorated task object.
await queue_service.enqueue(sync_account, account_id)

# 2. Pass the registered task name when the caller should not import the task.
await queue_service.enqueue("accounts.sync", account_id)

# 3. Use the task helper when the QueuePlugin has an active default service.
await sync_account.enqueue(account_id)

If you enqueue by string to avoid importing the task function, make sure the module is loaded at startup so the decorator can register the task name:

app = Litestar(
    route_handlers=[create_sync_job],
    plugins=[
        QueuePlugin(
            config=QueueConfig(task_modules=("app.accounts.tasks",)),
        ),
    ],
)
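
Why the module has to be loaded can be seen in a minimal registry sketch. It uses only the standard library; register, resolve, define_tasks, and _REGISTRY are hypothetical names for illustration, not the library's internals.

```python
from typing import Any, Callable

# Hypothetical module-level registry; the real library's internals may differ.
_REGISTRY: dict[str, Callable[..., Any]] = {}


def register(name: str) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Decorator that records a callable under a task name when it runs."""

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        if name in _REGISTRY:
            raise ValueError(f"task name already registered: {name}")
        _REGISTRY[name] = func
        return func

    return decorator


def resolve(name: str) -> Callable[..., Any]:
    """Look up a task by name, as a worker would for a string-enqueued job."""
    try:
        return _REGISTRY[name]
    except KeyError:
        raise LookupError(f"unknown task {name!r}; was its module imported?") from None


def define_tasks() -> None:
    """Stands in for importing a tasks module such as app.accounts.tasks."""

    @register("accounts.sync")
    def sync_account(account_id: str) -> dict[str, str]:
        return {"account_id": account_id, "status": "synced"}
```

Calling resolve("accounts.sync") before define_tasks() raises LookupError; after it, the lookup returns the function. A decorator only registers a name once the code containing it has executed, which is what loading the module at startup guarantees.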

All three forms can still override execution for one job:

await queue_service.enqueue(
    "accounts.sync",
    account_id,
    execution_backend="cloudrun",
    execution_profile="heavy",
)

By default, Litestar Queues uses in-memory queue storage and starts a local worker inside the Litestar process. That is useful for learning, tests, and small local apps. For production, use a persistent queue backend and usually run workers separately from the web process.

The Basic Model

Litestar Queues keeps two decisions separate:

  • A queue backend stores task records and state.
  • An execution backend decides where claimed work runs.

The defaults are queue_backend="memory" and execution_backend="local". The memory backend stores records inside the current Python process, and the local backend runs claimed tasks in the worker process. The immediate execution backend runs tasks inline and is mostly useful in tests.
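
The split between storage and execution can be sketched with two small interfaces. This is a standard-library illustration, not the library's code: Record, QueueBackend, ExecutionBackend, MemoryQueue, LocalExecution, work_once, and run_demo are all hypothetical names.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable, Optional, Protocol


@dataclass
class Record:
    """Hypothetical task record: what to run and where it stands."""

    func: Callable[..., Awaitable[Any]]
    args: tuple[Any, ...]
    status: str = "pending"
    result: Any = None


class QueueBackend(Protocol):
    """Stores task records and their state."""

    async def put(self, record: Record) -> None: ...
    async def claim(self) -> Optional[Record]: ...


class ExecutionBackend(Protocol):
    """Decides where claimed work runs."""

    async def execute(self, record: Record) -> Any: ...


@dataclass
class MemoryQueue:
    """Keeps records in the current process, in the spirit of "memory"."""

    records: list[Record] = field(default_factory=list)

    async def put(self, record: Record) -> None:
        self.records.append(record)

    async def claim(self) -> Optional[Record]:
        for record in self.records:
            if record.status == "pending":
                record.status = "running"
                return record
        return None


class LocalExecution:
    """Runs claimed work in the worker's own process, in the spirit of "local"."""

    async def execute(self, record: Record) -> Any:
        return await record.func(*record.args)


async def work_once(queue: QueueBackend, execution: ExecutionBackend) -> Optional[Record]:
    """Claim one record from storage and hand it to the execution backend."""
    record = await queue.claim()
    if record is None:
        return None
    try:
        record.result = await execution.execute(record)
        record.status = "completed"
    except Exception:
        record.status = "failed"
    return record


async def sync_account(account_id: str) -> dict[str, str]:
    return {"account_id": account_id, "status": "synced"}


def run_demo() -> Record:
    """Enqueue one job in memory and process it with the local executor."""

    async def main() -> Record:
        queue = MemoryQueue()
        await queue.put(Record(sync_account, ("acct-123",)))
        record = await work_once(queue, LocalExecution())
        assert record is not None
        return record

    return asyncio.run(main())
```

Because work_once depends only on the two interfaces, either side can be swapped without touching the other, which is the point of keeping the two decisions separate.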

Running Workers

For local development, the in-app worker is the shortest path:

config = QueueConfig(in_app_worker=True)

For heavier deployments, turn off the in-app worker in the web app:

config = QueueConfig(in_app_worker=False)

Then run one or more standalone workers:

LITESTAR_APP=app:app litestar queues run --drain-timeout 30

Workers process every queue by default. Restrict a worker to one or more queue names with --queue:

LITESTAR_APP=app:app litestar queues run --queue accounts --queue emails

Documentation

Optional backend and execution choices

Install only the extras your app needs:

pip install "litestar-queues[sqlspec]"
pip install "litestar-queues[advanced-alchemy]"
pip install "litestar-queues[redis]"
pip install "litestar-queues[valkey]"
pip install "litestar-queues[cloudrun]"

Name              Type               Typical use
memory            Queue backend      Tests, examples, and local in-process apps
sqlspec           Queue backend      SQL-backed persistence through SQLSpec adapters
advanced-alchemy  Queue backend      SQLAlchemy/Advanced Alchemy persistence
redis             Queue backend      Redis-backed task records and worker wakeups
valkey            Queue backend      Valkey-backed task records and worker wakeups
immediate         Execution backend  Inline execution for tests and scripts
local             Execution backend  In-process worker execution
cloudrun          Execution backend  Dispatch to Google Cloud Run Jobs

SQLSpec example:

from sqlspec.adapters.aiosqlite import AiosqliteConfig

from litestar_queues import QueueConfig
from litestar_queues.backends.sqlspec import SQLSpecBackendConfig

queue_config = QueueConfig(
    queue_backend=SQLSpecBackendConfig(
        config=AiosqliteConfig(connection_config={"database": "queue.db"}),
        run_migrations=True,
    ),
    execution_backend="local",
)

Redis example:

from litestar_queues import QueueConfig
from litestar_queues.backends.redis import RedisBackendConfig

queue_config = QueueConfig(
    queue_backend=RedisBackendConfig(url="redis://localhost:6379/0"),
    execution_backend="local",
)

Task options, scheduling, events, and background responses

Task defaults can live on the decorator:

@task(
    "reports.render",
    queue="reports",
    priority=10,
    retries=2,
    timeout=120,
    run_after=30,
)
async def render_report(report_id: str) -> str:
    return report_id

Override those defaults for one enqueue call:

result = await queue_service.enqueue(
    render_report,
    "report-1",
    queue="slow-reports",
    priority=1,
    retries=5,
    timeout=600,
    metadata={"requested_by": "user-123"},
)
await result.wait(timeout=30)
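
One way to picture how decorator defaults and per-call overrides combine is a merge in which explicitly passed values win. The sketch below assumes that model; TaskOptions, resolve_options, and the "default" queue name are hypothetical and not taken from the library.

```python
from dataclasses import dataclass, replace
from typing import Any, Optional


@dataclass(frozen=True)
class TaskOptions:
    """Hypothetical container for the options shown above."""

    queue: str = "default"  # assumed fallback queue name
    priority: int = 0
    retries: int = 0
    timeout: Optional[float] = None
    run_after: Optional[float] = None


def resolve_options(defaults: TaskOptions, **overrides: Any) -> TaskOptions:
    """Return per-job options: given overrides win, None means "not given"."""
    given = {key: value for key, value in overrides.items() if value is not None}
    return replace(defaults, **given)


# Values mirror the decorator and enqueue call in the examples above.
decorator_defaults = TaskOptions(
    queue="reports", priority=10, retries=2, timeout=120, run_after=30
)
job_options = resolve_options(
    decorator_defaults, queue="slow-reports", priority=1, retries=5, timeout=600
)
```

Here job_options keeps run_after=30 from the decorator because the call did not override it, and decorator_defaults is left unchanged for later jobs.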

Run recurring tasks with intervals or cron expressions:

from datetime import timedelta

from litestar_queues import task


@task("reports.refresh", interval=timedelta(minutes=15), jitter=30)
async def refresh_reports() -> None:
    ...


@task("billing.close-day", cron="0 0 * * *", timezone="UTC")
async def close_billing_day() -> None:
    ...
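
For readers new to five-field cron, the fields are minute, hour, day of month, month, and day of week, so "0 0 * * *" selects midnight every day. The matcher below is a simplified standard-library sketch and not the library's parser: expand and cron_matches are hypothetical names, month and weekday names and 7-for-Sunday are not supported, and day of month and day of week must both match.

```python
from datetime import datetime

# Field order of a five-field cron expression and each field's valid range.
FIELDS = (
    ("minute", 0, 59),
    ("hour", 0, 23),
    ("day", 1, 31),
    ("month", 1, 12),
    ("weekday", 0, 6),
)


def expand(field: str, low: int, high: int) -> set[int]:
    """Expand one field ("*", "5", "1-5", "*/15", "0,30") into a set of values."""
    values: set[int] = set()
    for part in field.split(","):
        base, _, step = part.partition("/")
        if base == "*":
            start, end = low, high
        elif "-" in base:
            first, last = base.split("-")
            start, end = int(first), int(last)
        else:
            start = end = int(base)
        if not (low <= start <= end <= high):
            raise ValueError(f"field {field!r} is outside {low}-{high}")
        values.update(range(start, end + 1, int(step or 1)))
    return values


def cron_matches(expression: str, moment: datetime) -> bool:
    """Return True when `moment` falls on a minute selected by `expression`."""
    parts = expression.split()
    if len(parts) != len(FIELDS):
        raise ValueError("expected five fields: minute hour day month weekday")
    actual = (
        moment.minute,
        moment.hour,
        moment.day,
        moment.month,
        moment.isoweekday() % 7,  # cron counts Sunday as 0
    )
    return all(
        value in expand(part, low, high)
        for part, value, (_, low, high) in zip(parts, actual, FIELDS)
    )
```

With this sketch, cron_matches("0 0 * * *", datetime(2024, 1, 1, 0, 0)) is true and the same expression one minute later is false.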

Publish progress from inside a running task:

from litestar_queues.events import publish_task_log, publish_task_progress


@task("imports.process")
async def process_import(path: str) -> None:
    await publish_task_log("Import started")
    await publish_task_progress(current=5, total=10, message="Halfway done")

Queue work after a Litestar response is sent:

from litestar import Response, post
from litestar_queues import QueuedBackgroundTask


@post("/trigger")
async def trigger() -> Response[dict[str, str]]:
    return Response(
        {"status": "queued"},
        background=QueuedBackgroundTask(process_import, "/tmp/data.csv"),
    )

CLI commands

QueuePlugin adds a queues command group to the Litestar CLI:

# Start a standalone worker.
LITESTAR_APP=app:app litestar queues run --drain-timeout 30

# Process only selected queues, with a concurrency setting for this worker.
LITESTAR_APP=app:app litestar queues run --queue accounts --max-concurrency 4

# Print queue status counts.
LITESTAR_APP=app:app litestar queues status

# Emit queue status as JSON.
LITESTAR_APP=app:app litestar queues status --json

# Check whether a scheduler canary task completed recently.
LITESTAR_APP=app:app litestar queues scheduler-health --minutes 5

Every command loads the application the same way the Litestar CLI does: via LITESTAR_APP, --app, or the standard app discovery paths.
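
The scheduler-health command above can be read as a freshness test: did the canary task finish within the last N minutes? The function below sketches that idea with the standard library. canary_is_healthy and its parameters are hypothetical, and how the real command finds the last completion time is not shown here.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def canary_is_healthy(
    last_completed_at: Optional[datetime],
    minutes: int,
    now: Optional[datetime] = None,
) -> bool:
    """Return True when the last completion is at most `minutes` old."""
    if last_completed_at is None:
        # The canary has never completed, so the scheduler is not proven alive.
        return False
    now = now or datetime.now(timezone.utc)
    return now - last_completed_at <= timedelta(minutes=minutes)
```

A monitoring probe could treat a False result as a sign that the scheduler has stopped producing work, assuming the canary is scheduled more often than the chosen window.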

Development

# Install local development dependencies.
make install

# Run unit tests only.
make test-unit

# Run integration tests. Docker-backed services autoskip when unavailable.
make test-integration

# Build documentation.
make docs

# Run linting and type checks.
make lint

The source lives under src/litestar_queues. Tests live under src/tests/unit and src/tests/integration.

License

MIT
