LTQ

A lightweight, async-first task queue built on Redis.

Installation

pip install ltq
# or
uv add ltq

Broker Backends

LTQ supports multiple broker backends:

  • Redis (default): broker_url="redis://localhost:6379"
  • Memory: broker_url="memory://" (useful for testing)

All workers and schedulers accept a broker_url parameter.

Quick Start

import asyncio
import ltq

worker = ltq.Worker("emails", broker_url="redis://localhost:6379")

@worker.task()
async def send_email(to: str, subject: str, body: str) -> None:
    # your async code here
    pass

async def main():
    # Enqueue a task
    await send_email.send("user@example.com", "Hello", "World")

    # Or enqueue multiple tasks
    for email in ["a@example.com", "b@example.com"]:
        await send_email.send(email, "Hi", "Message")

asyncio.run(main())

Each worker has a namespace (e.g., "emails"), and tasks are automatically namespaced as {namespace}:{function_name}.
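As a small illustration of the naming rule, the sketch below builds a task name from a namespace and a function. The helper task_name is hypothetical and not part of LTQ; it only mirrors the {namespace}:{function_name} pattern described above.

```python
from typing import Callable


def task_name(namespace: str, func: Callable[..., object]) -> str:
    """Hypothetical helper (not an LTQ API): mirror {namespace}:{function_name}."""
    return f"{namespace}:{func.__name__}"


async def send_email(to: str, subject: str, body: str) -> None:
    """Stand-in for the task defined in the Quick Start."""


print(task_name("emails", send_email))  # emails:send_email
```

The resulting string has the same shape as the queue names passed to the CLI commands, such as emails:send_email.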

Running Workers

# Run a single worker
ltq run myapp:worker

# With options
ltq run myapp:worker --concurrency 100 --log-level DEBUG

Running an App

Register multiple workers into an App to run them together:

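import ltq

# emails_worker and notifications_worker are ltq.Worker instances defined elsewhere
app = ltq.App()
app.register_worker(emails_worker)
app.register_worker(notifications_worker)

Then start the app from the command line:

ltq run --app myapp:app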

App Middleware

Apply middleware globally to all workers in an app:

import ltq
from ltq.middleware import Sentry

app = ltq.App(middlewares=[Sentry(dsn="https://...")])

# Or register after creation
app.register_middleware(Sentry(dsn="https://..."))
# MyMiddleware is your own Middleware subclass; pos=0 puts it first
app.register_middleware(MyMiddleware(), pos=0)

# When workers are registered, app middlewares are prepended to each worker's stack
app.register_worker(emails_worker)
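The ordering that results from prepending can be pictured with plain lists. This is only a sketch of the described behavior, not LTQ's implementation: effective_stack is a hypothetical helper, and middleware names are represented as strings to keep the example free of dependencies.

```python
def effective_stack(app_middlewares: list[str], worker_middlewares: list[str]) -> list[str]:
    """Hypothetical helper: app middlewares come first, then the worker's own."""
    return [*app_middlewares, *worker_middlewares]


# A worker that keeps the default stack, registered into an app that adds Sentry.
stack = effective_stack(["Sentry"], ["MaxTries", "MaxAge", "MaxRate"])
print(stack)  # ['Sentry', 'MaxTries', 'MaxAge', 'MaxRate']
```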

Threading Model

By default, App runs each worker in its own thread with a separate event loop. This provides isolation between workers while keeping them in the same process. Workers won't block each other since each has its own async event loop.
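The thread-per-worker model can be sketched with the standard library alone. The code below is an illustration of the general technique, not LTQ's source: worker_loop and run_in_thread are hypothetical names, and the sleep stands in for a worker polling its broker.

```python
import asyncio
import threading

# Maps worker name -> the event loop that served it (kept alive for comparison).
loops: dict[str, asyncio.AbstractEventLoop] = {}


async def worker_loop(name: str) -> None:
    # Stand-in for a worker's polling loop; a real worker would pull tasks here.
    await asyncio.sleep(0.01)
    loops[name] = asyncio.get_running_loop()


def run_in_thread(name: str) -> threading.Thread:
    # asyncio.run creates a fresh event loop owned by the thread that calls it.
    thread = threading.Thread(target=lambda: asyncio.run(worker_loop(name)), name=name)
    thread.start()
    return thread


threads = [run_in_thread(name) for name in ("emails", "notifications")]
for thread in threads:
    thread.join()

print(loops["emails"] is not loops["notifications"])  # True
```

Because each thread owns its loop, a slow coroutine in one worker does not delay coroutines scheduled on the other worker's loop.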

For maximum isolation (separate memory, crash protection), run each worker in its own process:

# Terminal 1
ltq run myapp:emails_worker

# Terminal 2
ltq run myapp:notifications_worker

This gives you full process isolation at the cost of more overhead.

Queue Management

Manage queues using the CLI:

# Clear a task queue
ltq clear emails:send_email

# Get queue size
ltq size emails:send_email

# With a custom Redis URL
ltq clear emails:send_email --redis-url redis://localhost:6380
ltq size emails:send_email --redis-url redis://localhost:6380

Queue names follow the same pattern as task names: {namespace}:{function_name}, where the namespace is the worker's name.

Scheduler

Run tasks on a cron schedule (requires ltq[scheduler]):

import ltq

scheduler = ltq.Scheduler()
scheduler.cron("*/5 * * * *", send_email.message("admin@example.com", "Report", "..."))
scheduler.start()  # Runs scheduler in blocking mode with asyncio.run()
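For readers unfamiliar with cron syntax, the first field of "*/5 * * * *" selects the minutes at which the job fires. The sketch below expands that field with a deliberately simplified matcher. minute_matches is a hypothetical helper that handles only "*", "*/N", and a single number; it does not show how LTQ parses cron expressions.

```python
def minute_matches(field: str, minute: int) -> bool:
    """Simplified check of a cron minute field: "*", "*/N", or a single number."""
    if field == "*":
        return True
    if field.startswith("*/"):
        return minute % int(field[2:]) == 0
    return minute == int(field)


# Minutes of the hour at which "*/5 * * * *" fires.
fire_minutes = [m for m in range(60) if minute_matches("*/5", m)]
print(fire_minutes)  # [0, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55]
```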

Task Options

Configure task behavior with options:

from datetime import timedelta

@worker.task(max_tries=3, max_age=timedelta(hours=1), max_rate="10/s")
async def send_email(to: str, subject: str, body: str) -> None:
    # your async code here
    pass

Available options:

  • max_tries (int): Maximum retry attempts before rejecting the message
  • max_age (timedelta): Maximum message age before rejection
  • max_rate (str): Rate limit in format "N/s", "N/m", or "N/h" (requests per second/minute/hour)
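To make the max_rate format concrete, here is a minimal sketch that turns a rate string into a count and a time window. parse_rate and min_interval are hypothetical helpers written for this example; how LTQ parses and enforces the limit internally is not shown here.

```python
import re

# Seconds per unit suffix in the "N/s", "N/m", "N/h" format.
_UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600}


def parse_rate(rate: str) -> tuple[int, int]:
    """Return (count, window_seconds) for a string such as "10/s"."""
    match = re.fullmatch(r"(\d+)/([smh])", rate)
    if match is None or int(match.group(1)) == 0:
        raise ValueError(f"invalid rate: {rate!r}")
    return int(match.group(1)), _UNIT_SECONDS[match.group(2)]


def min_interval(rate: str) -> float:
    """Average spacing in seconds between executions at the given rate."""
    count, window = parse_rate(rate)
    return window / count


print(parse_rate("10/s"))    # (10, 1)
print(min_interval("30/m"))  # 2.0
```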

Middleware

Middlewares are async context managers that wrap task execution. The default stack is [MaxTries(), MaxAge(), MaxRate()], so you only need to pass middlewares when you want to customize the stack or add to it:

import ltq
from ltq.middleware import MaxTries, MaxAge, MaxRate, Sentry

worker = ltq.Worker(
    "emails",
    broker_url="redis://localhost:6379",
    middlewares=[
        MaxTries(),
        MaxAge(),
        MaxRate(),
        Sentry(dsn="https://..."),
    ],
)

Built-in: MaxTries, MaxAge, MaxRate, Sentry (requires ltq[sentry])

You can also register middleware after creating the worker:

worker.register_middleware(Sentry(dsn="https://..."))

# Insert at a specific position (default is -1, which appends).
# MyMiddleware is your own Middleware subclass.
worker.register_middleware(MyMiddleware(), pos=0)

Custom middleware:

from contextlib import asynccontextmanager
from ltq.middleware import Middleware
from ltq.message import Message
from ltq.task import Task

class Logger(Middleware):
    @asynccontextmanager
    async def __call__(self, message: Message, task: Task):
        print(f"Processing {message.task_name}")
        yield  # the task runs here; the next line is reached only if it did not raise
        print(f"Completed {message.task_name}")
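To show how a stack of async context managers can wrap one task, the sketch below nests two of them with contextlib.AsyncExitStack. It uses only the standard library and makes two assumptions that the text above does not confirm: that the first middleware in the list is the outermost, and that middlewares are entered in list order. make_middleware and run_task are hypothetical names, not LTQ APIs.

```python
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager

events: list[str] = []


def make_middleware(name: str):
    """Build a toy middleware that records when it is entered and exited."""
    @asynccontextmanager
    async def middleware(task_name: str):
        events.append(f"{name}: before {task_name}")
        try:
            yield
        finally:
            # Runs whether the task succeeded or raised.
            events.append(f"{name}: after {task_name}")
    return middleware


async def run_task(middlewares, task_name: str, task) -> None:
    # Enter middlewares in list order; AsyncExitStack exits them in reverse.
    async with AsyncExitStack() as stack:
        for middleware in middlewares:
            await stack.enter_async_context(middleware(task_name))
        await task()


async def send_email() -> None:
    events.append("task body")


asyncio.run(
    run_task(
        [make_middleware("outer"), make_middleware("inner")],
        "emails:send_email",
        send_email,
    )
)
for event in events:
    print(event)
```

Under these assumptions, code before the yield runs from the first middleware to the last, and code after the yield runs in the opposite order, so the first middleware sees the outcome of everything inside it.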
