LTQ

A lightweight, async-first task queue built on Redis.

Installation

pip install ltq
# or
uv add ltq

Quick Start

import asyncio
import ltq

worker = ltq.Worker(url="redis://localhost:6379")

@worker.task()
async def send_email(to: str, subject: str, body: str) -> None:
    # your async code here
    pass

async def main():
    # Enqueue a task
    await send_email.send("user@example.com", "Hello", "World")

    # Or dispatch in bulk
    messages = [
        send_email.message("a@example.com", "Hi", "A"),
        send_email.message("b@example.com", "Hi", "B"),
    ]
    await ltq.dispatch(messages)

asyncio.run(main())
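To make the difference between send, message, and dispatch concrete, here is a minimal in-memory sketch. It does not use LTQ or Redis; SketchBroker, SketchTask, SketchMessage, and drain are hypothetical stand-ins invented for illustration, and LTQ's real internals may differ.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Dict, List, Tuple


@dataclass
class SketchMessage:
    """Hypothetical message: a task name plus positional arguments."""
    task: str
    args: Tuple[Any, ...]


class SketchBroker:
    """Hypothetical in-memory stand-in for the Redis-backed queue."""

    def __init__(self) -> None:
        self.pending: List[SketchMessage] = []
        self.tasks: Dict[str, Callable[..., Awaitable[Any]]] = {}

    def task(self, fn: Callable[..., Awaitable[Any]]) -> "SketchTask":
        # Register the coroutine function and return a handle for enqueueing.
        self.tasks[fn.__name__] = fn
        return SketchTask(fn.__name__, self)

    async def dispatch(self, messages: List[SketchMessage]) -> None:
        # Enqueue several prebuilt messages in one call.
        self.pending.extend(messages)

    async def drain(self) -> List[Any]:
        # Roughly what a worker loop does: pop a message, await its task.
        results = []
        while self.pending:
            msg = self.pending.pop(0)
            results.append(await self.tasks[msg.task](*msg.args))
        return results


class SketchTask:
    def __init__(self, name: str, broker: SketchBroker) -> None:
        self.name = name
        self.broker = broker

    def message(self, *args: Any) -> SketchMessage:
        # Build a message without enqueueing it.
        return SketchMessage(self.name, args)

    async def send(self, *args: Any) -> None:
        # Build a message and enqueue it immediately.
        await self.broker.dispatch([self.message(*args)])


async def demo() -> List[Any]:
    broker = SketchBroker()

    @broker.task
    async def greet(name: str) -> str:
        return f"hello {name}"

    await greet.send("a")                                            # single enqueue
    await broker.dispatch([greet.message("b"), greet.message("c")])  # bulk enqueue
    return await broker.drain()
```

In this sketch, send is just message followed by a one-element dispatch, which is one plausible way to read the Quick Start above.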

Each task gets its own queue by default. To share a queue between tasks, pass queue_name:

@worker.task(queue_name="emails")
async def send_email(...): ...

@worker.task(queue_name="emails")
async def send_newsletter(...): ...
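The effect of queue_name can be pictured with a small grouping sketch. It assumes, purely for illustration, that a task's default queue is named after the task itself; the source does not state how LTQ actually names default queues, and resolve_queue_name and group_by_queue are hypothetical helpers.

```python
from collections import defaultdict
from typing import Dict, List, Optional, Tuple


def resolve_queue_name(task_name: str, queue_name: Optional[str] = None) -> str:
    # Assumption: without an explicit queue_name, the task name is the queue name.
    return queue_name if queue_name is not None else task_name


def group_by_queue(tasks: List[Tuple[str, Optional[str]]]) -> Dict[str, List[str]]:
    # Map each queue to the tasks that would read from it.
    groups: Dict[str, List[str]] = defaultdict(list)
    for task_name, queue_name in tasks:
        groups[resolve_queue_name(task_name, queue_name)].append(task_name)
    return dict(groups)


queues = group_by_queue([
    ("send_email", "emails"),
    ("send_newsletter", "emails"),
    ("resize_image", None),
])
```

Here the two email tasks land in one shared queue while resize_image keeps a queue of its own.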

Running Workers

# Run a single worker
ltq myapp:worker

# With options
ltq myapp:worker --concurrency 100 --log-level DEBUG
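As a rough mental model of a concurrency limit such as --concurrency 100, the sketch below caps the number of coroutines running at once with asyncio.Semaphore. This is an assumption about what the option means, not LTQ's implementation; run_bounded is a hypothetical helper.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Tuple


async def run_bounded(
    factories: List[Callable[[], Awaitable[Any]]],
    concurrency: int,
) -> Tuple[List[Any], int]:
    """Run all jobs with at most `concurrency` in flight; return results and the observed peak."""
    sem = asyncio.Semaphore(concurrency)
    in_flight = 0
    peak = 0

    async def guarded(factory: Callable[[], Awaitable[Any]]) -> Any:
        nonlocal in_flight, peak
        async with sem:  # waits here while `concurrency` jobs are already running
            in_flight += 1
            peak = max(peak, in_flight)
            try:
                return await factory()
            finally:
                in_flight -= 1

    # gather preserves the order of the inputs in its result list.
    results = await asyncio.gather(*(guarded(f) for f in factories))
    return list(results), peak


def make_job(i: int) -> Callable[[], Awaitable[int]]:
    async def job() -> int:
        await asyncio.sleep(0.01)  # simulate I/O-bound work
        return i * 2
    return job
```

For example, asyncio.run(run_bounded([make_job(i) for i in range(10)], concurrency=3)) runs ten jobs while never letting more than three overlap.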

Running an App

Register multiple workers into an App to run them together:

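import ltq

# emails_worker and notifications_worker are ltq.Worker instances defined elsewhere
app = ltq.App()
app.register_worker(emails_worker)
app.register_worker(notifications_worker)

Then start the app from the command line:

ltq --app myapp:app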

Scheduler

Run tasks on a cron schedule (requires ltq[scheduler]):

import ltq

scheduler = ltq.Scheduler()
scheduler.cron("*/5 * * * *", send_email.message("admin@example.com", "Report", "..."))
scheduler.run()
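For readers unfamiliar with cron syntax, the sketch below expands a five-field expression such as "*/5 * * * *" into the concrete values each field matches. It is a simplified illustration that handles only *, */n, a-b, a-b/n, single numbers, and comma lists; it is not the parser LTQ uses, and expand_cron_field and expand_cron are hypothetical helpers.

```python
from typing import Dict, List

# Field names and inclusive ranges of a standard five-field cron expression.
FIELD_RANGES = [
    ("minute", 0, 59),
    ("hour", 0, 23),
    ("day_of_month", 1, 31),
    ("month", 1, 12),
    ("day_of_week", 0, 6),
]


def expand_cron_field(field: str, lo: int, hi: int) -> List[int]:
    """Expand one cron field into the sorted list of values it matches."""
    values = set()
    for part in field.split(","):
        step = 1
        if "/" in part:
            part, step_text = part.split("/", 1)
            step = int(step_text)
        if part == "*":
            start, end = lo, hi
        elif "-" in part:
            a, b = part.split("-", 1)
            start, end = int(a), int(b)
        else:
            start = end = int(part)
        values.update(range(start, end + 1, step))
    return sorted(values)


def expand_cron(expr: str) -> Dict[str, List[int]]:
    """Expand every field of a five-field cron expression."""
    parts = expr.split()
    if len(parts) != 5:
        raise ValueError("expected five whitespace-separated fields")
    return {
        name: expand_cron_field(text, lo, hi)
        for text, (name, lo, hi) in zip(parts, FIELD_RANGES)
    }


schedule = expand_cron("*/5 * * * *")
```

With this expression, schedule["minute"] holds every fifth minute from 0 to 55 and the other fields match all of their values, so the task in the example above would be enqueued every five minutes.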

Middleware

Add middleware to handle cross-cutting concerns:

import ltq
from ltq.middleware import Retry, RateLimit, Timeout

worker = ltq.Worker(
    url="redis://localhost:6379",
    middlewares=[
        Retry(max_retries=3, min_delay=1.0),
        RateLimit(requests_per_second=10),
        Timeout(timeout=30.0),
    ],
)
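To illustrate what a retry policy like Retry(max_retries=3, min_delay=1.0) might do, here is a standalone sketch. The doubling backoff is an assumption made for the example; the source does not say how LTQ spaces its retries, and call_with_retry is a hypothetical helper rather than part of ltq.middleware.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def call_with_retry(
    fn: Callable[[], Awaitable[Any]],
    *,
    max_retries: int,
    min_delay: float,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> Any:
    """Await fn(); on failure retry up to max_retries times with doubling delays."""
    attempt = 0
    while True:
        try:
            return await fn()
        except Exception:
            if attempt >= max_retries:
                raise  # retries exhausted: surface the last error
            # Assumed backoff: min_delay, 2 * min_delay, 4 * min_delay, ...
            await sleep(min_delay * (2 ** attempt))
            attempt += 1
```

The sleep parameter is injectable so the policy can be exercised in tests without actually waiting.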

Built-in middleware: Retry, RateLimit, Timeout, and Sentry (requires ltq[sentry]).

Custom middleware:

from ltq.middleware import Middleware, Handler
from ltq.message import Message

class Logger(Middleware):
    async def handle(self, message: Message, next_handler: Handler):
        print(f"Processing {message.task}")
        result = await next_handler(message)
        print(f"Completed {message.task}")
        return result
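The handle(message, next_handler) signature suggests that middleware are composed into a nested chain around the task. The sketch below shows one common way to build such a chain. It assumes the first middleware in the list is the outermost, which the source does not confirm; SketchMiddleware, Tag, and build_chain are hypothetical stand-ins, not LTQ classes.

```python
import asyncio
from typing import Any, Awaitable, Callable, List

SketchHandler = Callable[[Any], Awaitable[Any]]


class SketchMiddleware:
    """Hypothetical base class mirroring the handle() signature above."""

    async def handle(self, message: Any, next_handler: SketchHandler) -> Any:
        return await next_handler(message)


class Tag(SketchMiddleware):
    """Records when control enters and leaves this middleware."""

    def __init__(self, name: str, log: List[str]) -> None:
        self.name = name
        self.log = log

    async def handle(self, message: Any, next_handler: SketchHandler) -> Any:
        self.log.append(f"{self.name}:before")
        result = await next_handler(message)
        self.log.append(f"{self.name}:after")
        return result


def _wrap(mw: SketchMiddleware, nxt: SketchHandler) -> SketchHandler:
    async def wrapped(message: Any) -> Any:
        return await mw.handle(message, nxt)
    return wrapped


def build_chain(middlewares: List[SketchMiddleware], final: SketchHandler) -> SketchHandler:
    # Wrap from the inside out so the first middleware ends up outermost.
    handler = final
    for mw in reversed(middlewares):
        handler = _wrap(mw, handler)
    return handler


async def demo_chain() -> List[str]:
    log: List[str] = []

    async def task(message: Any) -> str:
        log.append("task")
        return f"done {message}"

    chain = build_chain([Tag("outer", log), Tag("inner", log)], task)
    await chain("msg-1")
    return log
```

Running demo_chain shows the onion-style ordering: each middleware sees the message on the way in and the result on the way out, in reverse order.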

Download files

Source Distribution

ltq-0.3.1.tar.gz (7.6 kB)

  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

Hashes for ltq-0.3.1.tar.gz
Algorithm Hash digest
SHA256 1007c75f68abd1ed51fa95747704f7ef2a318a110657d7fb2e708a9ce27df502
MD5 0ba09de920e95a7be3256a93b782e096
BLAKE2b-256 23c47eb4a68466d13ae439865191e59423b6d09434dde5fecccb2cc3d28d1eec

Built Distribution

ltq-0.3.1-py3-none-any.whl (11.5 kB)

  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

Hashes for ltq-0.3.1-py3-none-any.whl
Algorithm Hash digest
SHA256 c5f9706e0e5d3402783a2fece0e256785a9911a43cecffeccbee47569ee23ab7
MD5 e9ed3bf99d2fd595956090ad349b3349
BLAKE2b-256 f635f2616d88d844e566ec7af352061b15014315c14ad3a8214f51ed1e289046

Provenance

Attestation bundles were made for ltq-0.3.1.tar.gz and ltq-0.3.1-py3-none-any.whl.

Publisher: publish.yml on tclesius/ltq

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.
