Skip to main content

Add your description here

Project description

LTQ

A lightweight, Async-first task queue built on Redis.

Installation

pip install ltq
# or
uv add ltq

Quick Start

import asyncio
import ltq

worker = ltq.Worker(url="redis://localhost:6379")

@worker.task()
async def send_email(to: str, subject: str, body: str) -> None:
    # your async code here
    pass

async def main():
    # Enqueue a task
    await send_email.send("user@example.com", "Hello", "World")

    # Or dispatch in bulk
    messages = [
        send_email.message("a@example.com", "Hi", "A"),
        send_email.message("b@example.com", "Hi", "B"),
    ]
    await ltq.dispatch(messages)

asyncio.run(main())

Each task gets its own queue by default. To share a queue between tasks, pass queue_name:

@worker.task(queue_name="emails")
async def send_email(...): ...

@worker.task(queue_name="emails")
async def send_newsletter(...): ...

Running Workers

# Run a single worker
ltq myapp:worker

# With options
ltq myapp:worker --concurrency 100 --log-level DEBUG

Running an App

Register multiple workers into an App to run them together:

import ltq

app = ltq.App()
app.register_worker(emails_worker)
app.register_worker(notifications_worker)
ltq --app myapp:app

Scheduler

Run tasks on a cron schedule (requires ltq[scheduler]):

import ltq

scheduler = ltq.Scheduler()
scheduler.cron("*/5 * * * *", send_email.message("admin@example.com", "Report", "..."))
scheduler.run()

Middleware

Add middleware to handle cross-cutting concerns:

from ltq.middleware import Retry, RateLimit, Timeout

worker = ltq.Worker(
    url="redis://localhost:6379",
    middlewares=[
        Retry(max_retries=3, min_delay=1.0),
        RateLimit(requests_per_second=10),
        Timeout(timeout=30.0),
    ],
)

Built-in: Retry, RateLimit, Timeout, Sentry (requires ltq[sentry])

Custom middleware:

from ltq.middleware import Middleware, Handler
from ltq.message import Message

class Logger(Middleware):
    async def handle(self, message: Message, next_handler: Handler):
        print(f"Processing {message.task}")
        result = await next_handler(message)
        print(f"Completed {message.task}")
        return result

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

ltq-0.3.0.tar.gz (7.6 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

ltq-0.3.0-py3-none-any.whl (11.5 kB view details)

Uploaded Python 3

File details

Details for the file ltq-0.3.0.tar.gz.

File metadata

  • Download URL: ltq-0.3.0.tar.gz
  • Upload date:
  • Size: 7.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for ltq-0.3.0.tar.gz
Algorithm Hash digest
SHA256 d09514b3163514e1020cdd266e3204c2665f7358fda1f3cecdf92f7fd72f4f62
MD5 7b08640feeb8e20538d121118a61d4e0
BLAKE2b-256 fa0857f0a09097fa96bb99d2eb83aed0a4b2b5e9a3541a041403c9b7db4d3d25

See more details on using hashes here.

Provenance

The following attestation bundles were made for ltq-0.3.0.tar.gz:

Publisher: publish.yml on tclesius/ltq

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file ltq-0.3.0-py3-none-any.whl.

File metadata

  • Download URL: ltq-0.3.0-py3-none-any.whl
  • Upload date:
  • Size: 11.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for ltq-0.3.0-py3-none-any.whl
Algorithm Hash digest
SHA256 ac3d3d6c5fe833932c35ae7cca4c837974c0b369ed2bb57e473fcb8a2be3e8e7
MD5 da3e54f1051d309fea1f4480144d112a
BLAKE2b-256 7b51318fdf0394fcac346d367485f07fe64d9b31f17f2da331ef8cc490dafaa5

See more details on using hashes here.

Provenance

The following attestation bundles were made for ltq-0.3.0-py3-none-any.whl:

Publisher: publish.yml on tclesius/ltq

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page