A lightweight, async-first task queue built on Redis.
Installation
pip install ltq
# or
uv add ltq
Broker Backends
LTQ supports multiple broker backends:
- Redis (default): broker_url="redis://localhost:6379"
- Memory: broker_url="memory://" (useful for testing)
All workers and schedulers accept a broker_url parameter.
Quick Start
import asyncio
import ltq
worker = ltq.Worker("emails", broker_url="redis://localhost:6379")
@worker.task()
async def send_email(to: str, subject: str, body: str) -> None:
    # your async code here
    pass
async def main():
    # Enqueue a task
    await send_email.send("user@example.com", "Hello", "World")
    # Or enqueue multiple tasks
    for email in ["a@example.com", "b@example.com"]:
        await send_email.send(email, "Hi", "Message")
asyncio.run(main())
Each worker has a namespace (e.g., "emails"), and tasks are automatically namespaced as {namespace}:{function_name}.
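The naming rule can be illustrated in plain Python. This is only a sketch of the {namespace}:{function_name} convention; queue_name is a hypothetical helper written for this example and is not part of LTQ's API.

```python
def queue_name(namespace: str, func) -> str:
    """Build a queue name from a worker namespace and a task function."""
    return f"{namespace}:{func.__name__}"


async def send_email(to: str, subject: str, body: str) -> None:
    pass


# Hypothetical helper: mirrors the documented naming pattern only.
print(queue_name("emails", send_email))
```

The resulting string has the same shape as the queue names passed to the CLI commands, such as emails:send_email.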
Running Workers
# Run a single worker
ltq run myapp:worker
# With options
ltq run myapp:worker --concurrency 100 --log-level DEBUG
Running an App
Register multiple workers into an App to run them together:
import ltq
app = ltq.App()
app.register_worker(emails_worker)
app.register_worker(notifications_worker)
ltq run --app myapp:app
App Middleware
Apply middleware globally to all workers in an app:
from ltq.middleware import Sentry
app = ltq.App(middlewares=[Sentry(dsn="https://...")])
# Or register after creation
app.register_middleware(Sentry(dsn="https://..."))
app.register_middleware(MyMiddleware(), pos=0)
# When workers are registered, app middlewares are prepended to each worker's stack
app.register_worker(emails_worker)
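As a rough mental model of "prepended", the effective stack for a worker is the app's middlewares followed by the worker's own. The sketch below uses plain strings in place of middleware objects; effective_stack is a hypothetical function for illustration, not how LTQ necessarily implements it.

```python
def effective_stack(app_middlewares: list[str], worker_middlewares: list[str]) -> list[str]:
    # App-level entries come first, followed by the worker's own stack.
    return [*app_middlewares, *worker_middlewares]


# Assumed example: one app-level middleware plus the documented default worker stack.
stack = effective_stack(["Sentry"], ["MaxTries", "MaxAge", "MaxRate"])
print(stack)
```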
Threading Model
By default, App runs each worker in its own thread with a separate event loop. This provides isolation between workers while keeping them in the same process. Workers won't block each other since each has its own async event loop.
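The thread-per-worker idea can be sketched with the standard library alone. This is a minimal illustration under the assumption that each thread simply calls asyncio.run, which creates a fresh event loop for that thread; worker_loop and run_in_thread are hypothetical names, and LTQ's actual internals may differ.

```python
import asyncio
import threading


async def worker_loop(name: str, results: list[str]) -> None:
    # Stand-in for a worker's polling loop.
    await asyncio.sleep(0.01)
    results.append(f"{name} ran in {threading.current_thread().name}")


def run_in_thread(name: str, results: list[str]) -> threading.Thread:
    # asyncio.run creates and closes a new event loop in the calling thread.
    thread = threading.Thread(
        target=lambda: asyncio.run(worker_loop(name, results)),
        name=f"worker-{name}",
    )
    thread.start()
    return thread


results: list[str] = []
threads = [run_in_thread(n, results) for n in ("emails", "notifications")]
for t in threads:
    t.join()
print(sorted(results))
```

Because every thread owns its loop, a slow coroutine in one worker does not stall the other worker's loop, although all threads still share one process and its memory.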
For maximum isolation (separate memory, crash protection), run each worker in its own process:
# Terminal 1
ltq run myapp:emails_worker
# Terminal 2
ltq run myapp:notifications_worker
This gives you full process isolation at the cost of more overhead.
Queue Management
Manage queues using the CLI:
# Clear a task queue
ltq clear emails:send_email
# Get queue size
ltq size emails:send_email
# With custom Redis URL
ltq clear emails:send_email --redis-url redis://localhost:6380
ltq size emails:send_email --redis-url redis://localhost:6380
Queue names are automatically namespaced as {worker_name}:{function_name}.
Scheduler
Run tasks on a cron schedule (requires ltq[scheduler]):
import ltq
scheduler = ltq.Scheduler()
scheduler.cron("*/5 * * * *", send_email.message("admin@example.com", "Report", "..."))
scheduler.start() # Runs scheduler in blocking mode with asyncio.run()
Task Options
Configure task behavior with options:
from datetime import timedelta
@worker.task(max_tries=3, max_age=timedelta(hours=1), max_rate="10/s")
async def send_email(to: str, subject: str, body: str) -> None:
    # your async code here
    pass
Available options:
- max_tries (int): Maximum retry attempts before rejecting the message
- max_age (timedelta): Maximum message age before rejection
- max_rate (str): Rate limit in format "N/s", "N/m", or "N/h" (requests per second/minute/hour)
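To make the max_rate format concrete, here is a small sketch of how a string such as "10/s" could be interpreted. parse_rate and min_interval are hypothetical helpers written for this example; LTQ's own parsing and enforcement may work differently.

```python
_UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600}


def parse_rate(rate: str) -> tuple[int, int]:
    """Split "N/unit" into (count, period in seconds)."""
    count, _, unit = rate.partition("/")
    if unit not in _UNIT_SECONDS or not count.isdecimal() or int(count) == 0:
        raise ValueError(f"invalid rate: {rate!r}")
    return int(count), _UNIT_SECONDS[unit]


def min_interval(rate: str) -> float:
    """Average spacing in seconds between tasks at the given rate."""
    count, period = parse_rate(rate)
    return period / count


print(parse_rate("10/s"), min_interval("30/m"))
```

Under this reading, "30/m" allows one task every two seconds on average.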
Middleware
Each middleware is an async context manager that wraps task execution. The default stack is [MaxTries(), MaxAge(), MaxRate()], so you only need to pass middlewares when you want to customize the stack or add to it:
from ltq.middleware import MaxTries, MaxAge, MaxRate, Sentry
worker = ltq.Worker(
    "emails",
    broker_url="redis://localhost:6379",
    middlewares=[
        MaxTries(),
        MaxAge(),
        MaxRate(),
        Sentry(dsn="https://..."),
    ],
)
Built-in: MaxTries, MaxAge, MaxRate, Sentry (requires ltq[sentry])
You can also register middleware after creating the worker:
worker.register_middleware(Sentry(dsn="https://..."))
# Insert at specific position (default is -1 for append)
worker.register_middleware(MyMiddleware(), pos=0)
Custom middleware:
from contextlib import asynccontextmanager
from ltq.middleware import Middleware
from ltq.message import Message
from ltq.task import Task
class Logger(Middleware):
    @asynccontextmanager
    async def __call__(self, message: Message, task: Task):
        print(f"Processing {message.task_name}")
        yield
        print(f"Completed {message.task_name}")