Task queues, workers, schedules, and backend integrations for Litestar
Litestar Queues
Litestar Queues adds background task queues to Litestar applications. Define a task with a decorator, enqueue it from a route handler or service, and let a worker run it now, later, after a retry, or on a schedule.
Use it when a request should return quickly while work continues elsewhere: sending email, refreshing reports, syncing accounts, importing files, calling slow APIs, or running operational maintenance jobs.
What You Get
- Simple task API: @task(...) registers async or sync callables with defaults for queues, retries, priority, timeout, delay, and metadata.
- Litestar plugin: QueuePlugin wires a managed QueueService into Litestar dependency injection, app state, startup, shutdown, and CLI commands.
- Workers included: run an in-app worker for local/lightweight apps or a standalone worker process for production deployments.
- Scheduling: run recurring interval or five-field cron tasks.
- Result tracking: queued records move through pending, scheduled, running, completed, failed, and cancelled.
- Pluggable backends: start with the in-memory backend, then move to SQLSpec, Advanced Alchemy, Redis, Valkey, or Cloud Run execution when needed.
- Realtime events: publish lifecycle, progress, log, and custom task events to your own Litestar Channels setup.
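The result-tracking states listed above can be pictured as a small state machine. The sketch below is illustrative only: the status names come from the list, but the `TaskStatus` enum, the `ALLOWED` transition table, and the helper functions are hypothetical and are not the library's own types. In particular, which transitions the library actually permits is an assumption here.

```python
from enum import Enum


class TaskStatus(str, Enum):
    """Status names taken from the feature list; the enum itself is hypothetical."""

    PENDING = "pending"
    SCHEDULED = "scheduled"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"


# Assumed transition table, for illustration only.
ALLOWED: dict[TaskStatus, set[TaskStatus]] = {
    TaskStatus.SCHEDULED: {TaskStatus.PENDING, TaskStatus.CANCELLED},
    TaskStatus.PENDING: {TaskStatus.RUNNING, TaskStatus.CANCELLED},
    # Assumption: a failed attempt with retries left goes back to "scheduled".
    TaskStatus.RUNNING: {
        TaskStatus.COMPLETED,
        TaskStatus.FAILED,
        TaskStatus.SCHEDULED,
        TaskStatus.CANCELLED,
    },
    TaskStatus.COMPLETED: set(),
    TaskStatus.FAILED: set(),
    TaskStatus.CANCELLED: set(),
}


def can_transition(current: TaskStatus, new: TaskStatus) -> bool:
    """Return True when the assumed table allows moving from current to new."""
    return new in ALLOWED[current]


def is_terminal(status: TaskStatus) -> bool:
    """A status is terminal when the assumed table lists no outgoing transitions."""
    return not ALLOWED[status]
```

A table like this is mainly useful in tests or dashboards that need to tell finished records (completed, failed, cancelled) apart from ones still in flight.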
Install
pip install litestar-queues
The base install is intentionally small. It includes the task API, Litestar plugin, in-memory queue backend, immediate execution, and local workers. Persistent or remote integrations are optional extras.
Quick Start
Create an app.py:
from litestar import Litestar, post
from litestar.di import NamedDependency

from litestar_queues import QueueConfig, QueuePlugin, QueueService, task


@task("accounts.sync", queue="accounts", retries=3, timeout=300)
async def sync_account(account_id: str) -> dict[str, str]:
    return {"account_id": account_id, "status": "synced"}


@post("/accounts/{account_id:str}/sync")
async def create_sync_job(
    account_id: str,
    queue_service: NamedDependency[QueueService],
) -> dict[str, str]:
    result = await queue_service.enqueue(sync_account, account_id)
    return {"task_id": str(result.id), "status": result.status or "queued"}


app = Litestar(
    route_handlers=[create_sync_job],
    plugins=[QueuePlugin(config=QueueConfig())],
)
Run the app:
LITESTAR_APP=app:app litestar run --reload
Call the route:
curl -X POST http://127.0.0.1:8000/accounts/acct-123/sync
Trigger the same job in whichever form fits the caller:
# 1. Pass the decorated task object.
await queue_service.enqueue(sync_account, account_id)
# 2. Pass the registered task name when the caller should not import the task.
await queue_service.enqueue("accounts.sync", account_id)
# 3. Use the task helper when the QueuePlugin has an active default service.
await sync_account.enqueue(account_id)
If you enqueue by string to avoid importing the task function, make sure the module is loaded at startup so the decorator can register the task name:
app = Litestar(
    route_handlers=[create_sync_job],
    plugins=[
        QueuePlugin(
            config=QueueConfig(task_modules=("app.accounts.tasks",)),
        ),
    ],
)
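Why the module has to be loaded becomes clearer with a toy registry. The sketch below does not use the library: `register`, `resolve`, `_REGISTRY`, and `define_tasks` are hypothetical stand-ins that only show the general pattern of a decorator recording a callable under a name as an import-time side effect.

```python
from collections.abc import Callable
from typing import Any

# Hypothetical registry; the real library keeps its own internal structure.
_REGISTRY: dict[str, Callable[..., Any]] = {}


def register(name: str) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Toy stand-in for a task decorator: store the callable under its name."""

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        _REGISTRY[name] = func
        return func

    return decorator


def resolve(name: str) -> Callable[..., Any]:
    """Look a task up by name, failing loudly if nothing registered it."""
    try:
        return _REGISTRY[name]
    except KeyError:
        raise LookupError(
            f"task {name!r} is not registered; was its module imported?"
        ) from None


def define_tasks() -> None:
    """Stands in for importing a tasks module: the decorator runs as a side effect."""

    @register("accounts.sync")
    def sync_account(account_id: str) -> dict[str, str]:
        return {"account_id": account_id, "status": "synced"}
```

Until `define_tasks()` runs, `resolve("accounts.sync")` raises `LookupError`. That is the same failure mode a string-based enqueue would hit if nothing had imported the module that defines the task.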
All three forms can still override execution for one job:
await queue_service.enqueue(
    "accounts.sync",
    account_id,
    execution_backend="cloudrun",
    execution_profile="heavy",
)
By default, Litestar Queues uses in-memory queue storage and starts a local worker inside the Litestar process. That is useful for learning, tests, and small local apps. For production, use a persistent queue backend and usually run workers separately from the web process.
The Basic Model
Litestar Queues keeps two decisions separate:
- A queue backend stores task records and state.
- An execution backend decides where claimed work runs.
The default is queue_backend="memory" and execution_backend="local".
memory stores records inside the current Python process. local runs claimed
tasks in the worker process. immediate is available for inline execution,
mostly in tests.
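The split between storing work and running work can be sketched with two small classes. Everything below is hypothetical and uses only the standard library: `Record`, `MemoryQueue`, `LocalExecutor`, and `work_once` are illustrative names, not the library's interfaces.

```python
import asyncio
from collections import deque
from collections.abc import Awaitable, Callable
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class Record:
    """A stored task record: what to run, with which arguments, and its state."""

    name: str
    args: tuple[Any, ...]
    status: str = "pending"
    result: Any = None


class MemoryQueue:
    """Toy queue backend: keeps records and their state in this process only."""

    def __init__(self) -> None:
        self._pending: deque[Record] = deque()

    def put(self, record: Record) -> None:
        self._pending.append(record)

    def claim(self) -> Optional[Record]:
        """Hand the oldest pending record to a worker and mark it running."""
        if not self._pending:
            return None
        record = self._pending.popleft()
        record.status = "running"
        return record


class LocalExecutor:
    """Toy execution backend: awaits the callable in the worker's own event loop."""

    async def run(self, func: Callable[..., Awaitable[Any]], record: Record) -> Any:
        return await func(*record.args)


async def work_once(
    queue: MemoryQueue,
    executor: LocalExecutor,
    tasks: dict[str, Callable[..., Awaitable[Any]]],
) -> Optional[Record]:
    """Claim one record from the queue and run it through the executor."""
    record = queue.claim()
    if record is None:
        return None
    try:
        record.result = await executor.run(tasks[record.name], record)
        record.status = "completed"
    except Exception:
        record.status = "failed"
    return record
```

Because `work_once` only talks to the two objects through `claim` and `run`, either side could be swapped independently in this sketch, which is the point of keeping the two decisions separate.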
Running Workers
For local development, the in-app worker is the shortest path:
config = QueueConfig(in_app_worker=True)
For heavier deployments, turn off the in-app worker in the web app:
config = QueueConfig(in_app_worker=False)
Then run one or more standalone workers:
LITESTAR_APP=app:app litestar queues run --drain-timeout 30
Workers process every queue by default. Restrict a worker to one or more queue
names with --queue:
LITESTAR_APP=app:app litestar queues run --queue accounts --queue emails
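The effect of queue selection can be stated as a one-line predicate. This is a sketch of the rule described above, not the library's code; the function name `accepts` is hypothetical.

```python
from collections.abc import Iterable
from typing import Optional


def accepts(queue_name: str, selected: Optional[Iterable[str]] = None) -> bool:
    """Return True when a worker limited to `selected` should take work from `queue_name`.

    A missing or empty selection means "every queue", mirroring the default
    behaviour described in the text.
    """
    names = set(selected or ())
    return not names or queue_name in names
```

With the command above, a worker started with accounts and emails selected would skip a record on a reports queue, so some other worker needs to cover that queue.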
Documentation
Optional backend and execution choices
Install only the extras your app needs:
pip install "litestar-queues[sqlspec]"
pip install "litestar-queues[advanced-alchemy]"
pip install "litestar-queues[redis]"
pip install "litestar-queues[valkey]"
pip install "litestar-queues[cloudrun]"
| Name | Type | Typical use |
|---|---|---|
| memory | Queue backend | Tests, examples, and local in-process apps |
| sqlspec | Queue backend | SQL-backed persistence through SQLSpec adapters |
| advanced-alchemy | Queue backend | SQLAlchemy/Advanced Alchemy persistence |
| redis | Queue backend | Redis-backed task records and worker wakeups |
| valkey | Queue backend | Valkey-backed task records and worker wakeups |
| immediate | Execution backend | Inline execution for tests and scripts |
| local | Execution backend | In-process worker execution |
| cloudrun | Execution backend | Dispatch to Google Cloud Run Jobs |
SQLSpec example:
from sqlspec.adapters.aiosqlite import AiosqliteConfig
from litestar_queues import QueueConfig
from litestar_queues.backends.sqlspec import SQLSpecBackendConfig
queue_config = QueueConfig(
    queue_backend=SQLSpecBackendConfig(
        config=AiosqliteConfig(connection_config={"database": "queue.db"}),
        run_migrations=True,
    ),
    execution_backend="local",
)
Redis example:
from litestar_queues import QueueConfig
from litestar_queues.backends.redis import RedisBackendConfig
queue_config = QueueConfig(
    queue_backend=RedisBackendConfig(url="redis://localhost:6379/0"),
    execution_backend="local",
)
Task options, scheduling, events, and background responses
Task defaults can live on the decorator:
@task(
    "reports.render",
    queue="reports",
    priority=10,
    retries=2,
    timeout=120,
    run_after=30,
)
async def render_report(report_id: str) -> str:
    return report_id
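To make the retries and timeout options concrete, here is a standard-library sketch of what a worker might do with them. It is not the library's implementation. It assumes that `retries` counts extra attempts after the first and that `timeout` is a per-attempt limit in seconds; both are assumptions, and `run_with_retries` is a hypothetical name.

```python
import asyncio
from collections.abc import Awaitable, Callable
from typing import Any, Optional


async def run_with_retries(
    func: Callable[..., Awaitable[Any]],
    *args: Any,
    retries: int = 0,
    timeout: Optional[float] = None,
) -> Any:
    """Await func(*args), retrying on failure.

    Assumption: `retries` is the number of additional attempts after the
    first one, and `timeout` applies to each attempt separately.
    """
    if retries < 0:
        raise ValueError("retries must be zero or greater")
    last_error: Optional[Exception] = None
    for _attempt in range(retries + 1):
        try:
            # wait_for cancels the attempt and raises a timeout error when
            # the limit passes; timeout=None waits indefinitely.
            return await asyncio.wait_for(func(*args), timeout=timeout)
        except Exception as exc:
            last_error = exc
    assert last_error is not None
    raise last_error
```

A production worker would normally also wait between attempts and record each failure; this sketch retries immediately to stay short.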
Override those defaults for one enqueue call:
result = await queue_service.enqueue(
    render_report,
    "report-1",
    queue="slow-reports",
    priority=1,
    retries=5,
    timeout=600,
    metadata={"requested_by": "user-123"},
)
await result.wait(timeout=30)
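The relationship between decorator defaults and per-call overrides can be modelled as a simple merge. The sketch below is a hedged illustration using only dataclasses. `TaskOptions` and `merge_options` are hypothetical names, and the rule that an override of `None` means "keep the default" is an assumption.

```python
from dataclasses import dataclass, replace
from typing import Any, Optional


@dataclass(frozen=True)
class TaskOptions:
    """Hypothetical container for the options shown on the decorator."""

    queue: str = "default"
    priority: int = 0
    retries: int = 0
    timeout: Optional[float] = None


def merge_options(defaults: TaskOptions, **overrides: Any) -> TaskOptions:
    """Return a copy of `defaults` with every non-None override applied.

    The defaults object is frozen, so one enqueue call can never change
    what later calls see.
    """
    supplied = {key: value for key, value in overrides.items() if value is not None}
    return replace(defaults, **supplied)
```

Applied to the example above, merging the reports.render defaults with queue="slow-reports", priority=1, retries=5, and timeout=600 yields options for that one job, while the decorator's defaults stay as they were for every other call.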
Run recurring tasks with intervals or cron expressions:
from datetime import timedelta

from litestar_queues import task


@task("reports.refresh", interval=timedelta(minutes=15), jitter=30)
async def refresh_reports() -> None:
    ...


@task("billing.close-day", cron="0 0 * * *", timezone="UTC")
async def close_billing_day() -> None:
    ...
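For readers new to five-field cron syntax, the fields are minute, hour, day of month, month, and day of week, so "0 0 * * *" means midnight every day. The sketch below is a deliberately tiny matcher that understands only `*` and single integers. It is not the parser the library uses, and `cron_matches` is a hypothetical name. It also requires every field to match, which differs from classic cron when both day fields are restricted.

```python
from datetime import datetime


def _field_matches(field: str, value: int) -> bool:
    """A field matches when it is the wildcard or equals the value exactly."""
    return field == "*" or int(field) == value


def cron_matches(expression: str, moment: datetime) -> bool:
    """Return True when `moment` falls on a minute selected by `expression`.

    Supports only "*" and plain integers; ranges, lists, and steps are
    outside the scope of this sketch.
    """
    fields = expression.split()
    if len(fields) != 5:
        raise ValueError(
            "expected five fields: minute hour day-of-month month day-of-week"
        )
    minute, hour, day, month, weekday = fields
    # cron counts Sunday as 0, while datetime.weekday() counts Monday as 0.
    cron_weekday = (moment.weekday() + 1) % 7
    return (
        _field_matches(minute, moment.minute)
        and _field_matches(hour, moment.hour)
        and _field_matches(day, moment.day)
        and _field_matches(month, moment.month)
        and _field_matches(weekday, cron_weekday)
    )
```

Under these rules, the billing.close-day expression above matches 00:00 on any date and nothing else. The sketch ignores time zones, so the `moment` passed in should already be in the zone the schedule is meant for.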
Publish progress from inside a running task:
from litestar_queues import task
from litestar_queues.events import publish_task_log, publish_task_progress


@task("imports.process")
async def process_import(path: str) -> None:
    await publish_task_log("Import started")
    await publish_task_progress(current=5, total=10, message="Halfway done")
Queue work after a Litestar response is sent:
from litestar import Response, post

from litestar_queues import QueuedBackgroundTask


@post("/trigger")
async def trigger() -> Response[dict[str, str]]:
    return Response(
        {"status": "queued"},
        background=QueuedBackgroundTask(process_import, "/tmp/data.csv"),
    )
CLI commands
QueuePlugin adds a queues command group to the Litestar CLI:
# Start a standalone worker.
LITESTAR_APP=app:app litestar queues run --drain-timeout 30
# Process only selected queues.
LITESTAR_APP=app:app litestar queues run --queue accounts --max-concurrency 4
# Print queue status counts.
LITESTAR_APP=app:app litestar queues status
# Emit queue status as JSON.
LITESTAR_APP=app:app litestar queues status --json
# Check whether a scheduler canary task completed recently.
LITESTAR_APP=app:app litestar queues scheduler-health --minutes 5
Every command loads the application the same way the Litestar CLI does: via
LITESTAR_APP, --app, or the standard app discovery paths.
Development
# Install local development dependencies.
make install
# Run unit tests only.
make test-unit
# Run integration tests. Docker-backed services autoskip when unavailable.
make test-integration
# Build documentation.
make docs
# Run linting and type checks.
make lint
The source lives under src/litestar_queues. Tests live under
src/tests/unit and src/tests/integration.
Links
- Docs: https://cofin.github.io/litestar-queues/
- Source: https://github.com/cofin/litestar-queues
- Issues: https://github.com/cofin/litestar-queues/issues
- Litestar: https://litestar.dev/
License
MIT