taskito

A Rust-powered task queue for Python. No broker required — just SQLite.

pip install taskito

Quickstart

1. Define tasks in tasks.py:

from taskito import Queue

queue = Queue(db_path="tasks.db")

@queue.task()
def add(a: int, b: int) -> int:
    return a + b

2. Start a worker in one terminal:

taskito worker --app tasks:queue

3. Enqueue jobs from another terminal or script:

from tasks import add

job = add.delay(2, 3)
print(job.result(timeout=10))  # 5

Why taskito?

Most Python task queues require a separate broker (Redis, RabbitMQ) even for single-machine workloads. taskito embeds everything — storage, scheduling, and worker management — into a single pip install with no external dependencies beyond Python itself.

The heavy lifting runs in Rust: a Tokio async scheduler, OS thread worker pool with crossbeam channels, and Diesel ORM over SQLite in WAL mode. Python's GIL is only held during task execution.
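
To make the storage idea concrete, here is a minimal sketch of a broker-free job table on SQLite in WAL mode, using only Python's standard sqlite3 module. It is not taskito's implementation, which the paragraph above places in Rust with Diesel. The jobs table, its columns, and the open_queue, enqueue, and claim_next helpers are all hypothetical names chosen for illustration.

```python
import os
import sqlite3
import tempfile


def open_queue(path):
    """Open (or create) a hypothetical job table with WAL journaling."""
    # isolation_level=None puts sqlite3 in autocommit mode so that
    # transactions can be opened explicitly below.
    conn = sqlite3.connect(path, isolation_level=None)
    conn.execute("PRAGMA journal_mode=WAL")
    conn.execute(
        "CREATE TABLE IF NOT EXISTS jobs ("
        " id INTEGER PRIMARY KEY AUTOINCREMENT,"
        " payload TEXT NOT NULL,"
        " priority INTEGER NOT NULL DEFAULT 0,"
        " status TEXT NOT NULL DEFAULT 'pending')"
    )
    return conn


def enqueue(conn, payload, priority=0):
    """Insert a pending job and return its row id."""
    cur = conn.execute(
        "INSERT INTO jobs (payload, priority) VALUES (?, ?)", (payload, priority)
    )
    return cur.lastrowid


def claim_next(conn):
    """Mark the highest-priority pending job as running and return it."""
    conn.execute("BEGIN IMMEDIATE")  # take the write lock before reading
    try:
        row = conn.execute(
            "SELECT id, payload FROM jobs WHERE status = 'pending'"
            " ORDER BY priority DESC, id ASC LIMIT 1"
        ).fetchone()
        if row is not None:
            conn.execute("UPDATE jobs SET status = 'running' WHERE id = ?", (row[0],))
        conn.execute("COMMIT")
        return row
    except Exception:
        conn.execute("ROLLBACK")
        raise


# Usage: two jobs, claimed in priority order.
db_path = os.path.join(tempfile.mkdtemp(), "demo.db")
conn = open_queue(db_path)
enqueue(conn, "low", priority=0)
enqueue(conn, "high", priority=10)
first = claim_next(conn)
second = claim_next(conn)
third = claim_next(conn)  # nothing left to claim
```

Taking the write lock with BEGIN IMMEDIATE before the SELECT is what keeps two workers from claiming the same row in this sketch. WAL mode lets readers continue while a writer holds that lock.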

Features

  • Priority queues: higher priority jobs run first
  • Retry with exponential backoff: automatic retries with jitter
  • Dead letter queue: inspect and replay failed jobs
  • Rate limiting: token bucket with "100/m" syntax
  • Task dependencies: depends_on for DAG workflows with cascade cancel
  • Task workflows: chain, group, chord primitives
  • Periodic tasks: cron scheduling with seconds granularity
  • Progress tracking: report and read progress from inside tasks
  • Job cancellation: cancel pending or running jobs
  • Unique tasks: deduplicate active jobs by key
  • Batch enqueue: task.map() for high-throughput bulk inserts
  • Named queues: route tasks to isolated queues
  • Hooks: before/after/success/failure middleware
  • Per-task middleware: TaskMiddleware with before/after/on_retry hooks
  • Pluggable serializers: CloudpickleSerializer (default), JsonSerializer, or custom
  • Cancel running tasks: cooperative cancellation with check_cancelled()
  • Soft timeouts: check_timeout() inside tasks
  • Worker heartbeat: monitor worker health via queue.workers()
  • Job expiration: expires parameter for time-sensitive jobs
  • Exception filtering: retry_on / dont_retry_on for selective retries
  • OpenTelemetry: optional tracing integration via pip install taskito[otel]
  • Async support: await job.aresult(), await queue.astats()
  • Web dashboard: taskito dashboard --app myapp:queue serves a built-in monitoring UI
  • FastAPI integration: TaskitoRouter for instant REST API over the queue
  • CLI: taskito worker, taskito info --watch, taskito dashboard

Examples

Retry with Backoff

import requests

@queue.task(max_retries=5, retry_backoff=2.0)
def fetch_url(url: str) -> str:
    return requests.get(url).text
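
The exact delay formula behind retry_backoff is not spelled out here, so the following is only a sketch of one common scheme: an exponentially growing ceiling with "full jitter" (a uniform random delay below that ceiling). The backoff_delay function and its base_delay, max_delay, and jitter parameters are assumptions, not part of taskito's API.

```python
import random
from typing import Optional


def backoff_delay(attempt, retry_backoff=2.0, base_delay=1.0,
                  max_delay=300.0, jitter=True,
                  rng: Optional[random.Random] = None):
    """Seconds to wait before retry number `attempt` (0-based).

    Hypothetical helper: the ceiling grows as base_delay * retry_backoff**attempt
    and is capped at max_delay.
    """
    ceiling = min(max_delay, base_delay * retry_backoff ** attempt)
    if not jitter:
        return ceiling
    # Full jitter: pick uniformly between 0 and the ceiling so that many
    # failing jobs do not all retry at the same instant.
    return (rng or random).uniform(0.0, ceiling)


# Deterministic ceilings for the first five attempts.
delays = [backoff_delay(n, jitter=False) for n in range(5)]
```

With retry_backoff=2.0 and a one-second base, the ceilings double on each attempt until they reach the cap.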

Priority Queues

urgent_report.apply_async(args=[data], priority=10)
bulk_report.delay(data)  # default priority 0

Rate Limiting

import requests

@queue.task(rate_limit="100/m")
def call_api(endpoint: str) -> dict:
    return requests.get(endpoint).json()
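
For readers unfamiliar with token buckets, here is a small standalone sketch of the idea behind a spec such as "100/m". It is not taskito's rate limiter. Only the "100/m" form appears in this document, so the "s" and "h" units, the parse_rate function, and the TokenBucket class are assumptions made for the example.

```python
import time

# Assumed unit suffixes; only "m" is confirmed by the "100/m" example.
_PERIODS = {"s": 1.0, "m": 60.0, "h": 3600.0}


def parse_rate(spec):
    """Split a spec such as "100/m" into (count, period_in_seconds)."""
    count, unit = spec.split("/")
    return int(count), _PERIODS[unit]


class TokenBucket:
    """Bucket holding up to `count` tokens, refilled continuously."""

    def __init__(self, spec, clock=time.monotonic):
        self.capacity, period = parse_rate(spec)
        self.refill_per_sec = self.capacity / period
        self.tokens = float(self.capacity)
        self.clock = clock
        self.updated = clock()

    def try_acquire(self):
        """Consume one token if available; return whether the call may run."""
        now = self.clock()
        elapsed = now - self.updated
        self.updated = now
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_per_sec)
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False


# Usage with a fake clock so the behaviour is deterministic.
now = [0.0]
bucket = TokenBucket("2/s", clock=lambda: now[0])
burst = [bucket.try_acquire() for _ in range(3)]  # third call finds the bucket empty
now[0] = 0.5                                      # half a second refills one token
after_wait = bucket.try_acquire()
```

Injecting the clock keeps the sketch testable without sleeping. A real limiter would also need locking if several threads share a bucket.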

Task Dependencies

download = fetch_file.delay("data.csv")
parsed = parse_file.apply_async(
    args=["data.csv"],
    depends_on=[download.id],
)
# parsed waits until download completes; if download fails, parsed is cancelled
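
The cascade described in the comment above can be pictured as a graph walk: when one job fails, everything downstream of it is cancelled. The sketch below shows that walk over a plain dictionary. It illustrates the idea only, and the cascade_cancel function and the job names are hypothetical.

```python
from collections import defaultdict, deque


def cascade_cancel(failed_id, depends_on):
    """Return every job that transitively depends on failed_id.

    depends_on maps a job id to the list of job ids it waits for.
    """
    # Invert the mapping: parent -> jobs that wait on it.
    dependents = defaultdict(list)
    for job, parents in depends_on.items():
        for parent in parents:
            dependents[parent].append(job)

    cancelled = set()
    pending = deque([failed_id])
    while pending:
        current = pending.popleft()
        for child in dependents[current]:
            if child not in cancelled:
                cancelled.add(child)
                pending.append(child)
    return cancelled


# Usage: download -> parse -> store -> report, and report also waits on audit.
graph = {
    "parse": ["download"],
    "store": ["parse"],
    "report": ["store", "audit"],
    "audit": [],
}
after_download_fails = cascade_cancel("download", graph)
after_audit_fails = cascade_cancel("audit", graph)
```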

Workflows

from taskito import chain, group, chord

# Sequential pipeline — each step receives the previous result
chain(fetch.s(url), parse.s(), store.s()).apply()

# Parallel fan-out
group(process.s(chunk) for chunk in chunks).apply()

# Parallel + callback when all complete
chord([download.s(u) for u in urls], merge.s()).apply()
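
The data flow of the three primitives can be shown with ordinary functions. The sketch below runs everything in-process and sequentially, so it says nothing about how taskito schedules the work on workers. The run_chain, run_group, and run_chord helpers are hypothetical names for illustration.

```python
from functools import reduce


def run_chain(initial, *steps):
    """Feed each step the previous step's result."""
    return reduce(lambda acc, step: step(acc), steps, initial)


def run_group(func, items):
    """Apply func to every item independently and collect the results."""
    return [func(item) for item in items]


def run_chord(func, items, callback):
    """Run a group, then hand the full list of results to a callback."""
    return callback(run_group(func, items))


# Usage
chained = run_chain(2, lambda x: x + 1, lambda x: x * 10)
grouped = run_group(len, ["a", "bb"])
merged = run_chord(len, ["a", "bb", "ccc"], sum)
```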

Periodic Tasks

@queue.periodic(cron="0 */6 * * *")
def cleanup_temp_files():
    ...

Progress Tracking

from taskito import current_job

@queue.task()
def train_model(epochs: int):
    for i in range(epochs):
        ...
        current_job.update_progress(int((i + 1) / epochs * 100))

Hooks

@queue.on_failure
def alert_on_failure(task_name, args, kwargs, error):
    slack.post(f"Task {task_name} failed: {error}")

Exception Filtering

import requests

@queue.task(
    max_retries=5,
    retry_on=[ConnectionError, TimeoutError],
    dont_retry_on=[ValueError],
)
def fetch_data(url: str) -> dict:
    return requests.get(url).json()
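
How the two lists interact is not specified here. The sketch below assumes one plausible rule: dont_retry_on takes precedence, and an empty retry_on means "retry on anything else". The should_retry function is hypothetical and only illustrates that rule.

```python
def should_retry(exc, retry_on=(), dont_retry_on=()):
    """Decide whether a failed task should be retried (assumed precedence)."""
    # Assumption: an exception on the deny list is never retried.
    if isinstance(exc, tuple(dont_retry_on)):
        return False
    # Assumption: a non-empty allow list restricts retries to those types.
    if retry_on:
        return isinstance(exc, tuple(retry_on))
    return True


# Usage with the lists from the example above.
allow = [ConnectionError, TimeoutError]
deny = [ValueError]
decisions = {
    "connection": should_retry(ConnectionError(), allow, deny),
    "value": should_retry(ValueError(), allow, deny),
    "key": should_retry(KeyError(), allow, deny),
    "unfiltered": should_retry(KeyError()),
}
```

Because isinstance honours subclasses, a ConnectionResetError would match ConnectionError under this rule.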

Per-Task Middleware

import time

from taskito import TaskMiddleware

class TimingMiddleware(TaskMiddleware):
    def before(self, ctx):
        ctx._start = time.time()

    def after(self, ctx, result, error):
        elapsed = time.time() - ctx._start
        print(f"{ctx.task_name} took {elapsed:.2f}s")

@queue.task(middleware=[TimingMiddleware()])
def process(data):
    ...

Delayed Scheduling

# Run 30 minutes from now
reminder.apply_async(args=[user_id, msg], delay=1800)

Unique Tasks

report.apply_async(args=[user_id], unique_key=f"report:{user_id}")
# Second enqueue with same key is silently deduplicated while first is active
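
Deduplication by key amounts to remembering which keys currently have an active job. The sketch below shows that bookkeeping in memory. What taskito returns for the duplicate call is not stated here, so returning the existing job id is an assumption, as are the UniqueRegistry class and its method names.

```python
class UniqueRegistry:
    """In-memory stand-in for 'one active job per unique key'."""

    def __init__(self):
        self._active = {}   # unique_key -> id of the active job
        self._next_id = 1

    def enqueue(self, unique_key):
        """Return (job_id, created); reuse the active job if the key is taken."""
        if unique_key in self._active:
            return self._active[unique_key], False
        job_id = self._next_id
        self._next_id += 1
        self._active[unique_key] = job_id
        return job_id, True

    def finish(self, unique_key):
        """Release the key once its job is no longer active."""
        self._active.pop(unique_key, None)


# Usage
registry = UniqueRegistry()
first_call = registry.enqueue("report:1")
duplicate = registry.enqueue("report:1")   # deduplicated while the first is active
registry.finish("report:1")
after_finish = registry.enqueue("report:1")  # key is free again
```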

FastAPI Integration

from fastapi import FastAPI
from taskito.contrib.fastapi import TaskitoRouter

app = FastAPI()
app.include_router(TaskitoRouter(queue), prefix="/tasks")
# GET /tasks/stats, GET /tasks/jobs/{id}, GET /tasks/jobs/{id}/progress (SSE), ...

Batch Enqueue

jobs = send_email.map([("alice@x.com",), ("bob@x.com",), ("carol@x.com",)])

Async Support

# await needs a coroutine; run this with asyncio.run(main())
async def main():
    job = expensive_task.delay(data)
    result = await job.aresult(timeout=30)
    stats = await queue.astats()

Testing

taskito includes a built-in test mode — no worker needed:

from tasks import add, queue

def test_add():
    with queue.test_mode() as results:
        add.delay(2, 3)
        assert results[0].return_value == 5

Documentation

Full documentation with guides, API reference, architecture diagrams, and examples:

Read the docs →

Coming from Celery? See the Migration Guide.

Comparison

Feature               taskito        Celery            RQ      Dramatiq  Huey
Broker required       No             Yes               Yes     Yes       Yes
Core language         Rust + Python  Python            Python  Python    Python
Priority queues       Yes            Yes               No      No        Yes
Rate limiting         Yes            Yes               No      Yes       No
Dead letter queue     Yes            No                Yes     No        No
Task dependencies     Yes            No                No      No        No
Task chaining         Yes            Yes               No      Yes       No
Built-in dashboard    Yes            No                No      No        No
FastAPI integration   Yes            No                No      No        No
Per-task middleware   Yes            No                No      Yes       No
Cancel running tasks  Yes            Yes               No      No        No
Custom serializers    Yes            Yes               No      No        No
Setup                 pip install    Broker + backend  Redis   Broker    Redis

License

MIT
