Skip to main content

Rust-powered task queue for Python. No broker required.

Project description

taskito

PyPI version Python versions License

A Rust-powered task queue for Python. No broker required — just SQLite or Postgres.

pip install taskito                # SQLite (default)
pip install taskito[postgres]      # with Postgres backend

Quickstart

1. Define tasks in tasks.py:

from taskito import Queue

queue = Queue(db_path="tasks.db")

@queue.task()
def add(a: int, b: int) -> int:
    return a + b

2. Start a worker in one terminal:

taskito worker --app tasks:queue

3. Enqueue jobs from another terminal or script:

from tasks import add

job = add.delay(2, 3)
print(job.result(timeout=10))  # 5

Why taskito?

Most Python task queues require a separate broker (Redis, RabbitMQ) even for single-machine workloads. taskito embeds everything — storage, scheduling, and worker management — into a single pip install with no external dependencies beyond Python itself. For distributed setups, an optional Postgres backend enables multi-machine workers with the same API.

The heavy lifting runs in Rust: a Tokio async scheduler, OS thread worker pool with crossbeam channels, and Diesel ORM over SQLite in WAL mode. Python's GIL is only held during task execution.

Features

  • Priority queues — higher priority jobs run first
  • Retry with exponential backoff — automatic retries with jitter
  • Dead letter queue — inspect and replay failed jobs
  • Rate limiting — token bucket with "100/m" syntax
  • Task dependenciesdepends_on for DAG workflows with cascade cancel
  • Task workflowschain, group, chord primitives
  • Periodic tasks — cron scheduling with seconds granularity
  • Progress tracking — report and read progress from inside tasks
  • Job cancellation — cancel pending or running jobs
  • Unique tasks — deduplicate active jobs by key
  • Batch enqueuetask.map() for high-throughput bulk inserts
  • Named queues — route tasks to isolated queues
  • Hooks — before/after/success/failure middleware
  • Per-task middlewareTaskMiddleware with before/after/on_retry hooks
  • Pluggable serializersCloudpickleSerializer (default), JsonSerializer, or custom
  • Cancel running tasks — cooperative cancellation with check_cancelled()
  • Soft timeoutscheck_timeout() inside tasks
  • Worker heartbeat — monitor worker health via queue.workers()
  • Job expirationexpires parameter for time-sensitive jobs
  • Exception filteringretry_on / dont_retry_on for selective retries
  • OpenTelemetry — optional tracing integration via pip install taskito[otel]
  • Async supportawait job.aresult(), await queue.astats()
  • Web dashboardtaskito dashboard --app myapp:queue serves a built-in monitoring UI
  • FastAPI integrationTaskitoRouter for instant REST API over the queue
  • Postgres backend — optional multi-machine storage via PostgreSQL
  • CLItaskito worker, taskito info --watch, taskito dashboard

Examples

Retry with Backoff

@queue.task(max_retries=5, retry_backoff=2.0)
def fetch_url(url: str) -> str:
    return requests.get(url).text

Priority Queues

urgent_report.apply_async(args=[data], priority=10)
bulk_report.delay(data)  # default priority 0

Rate Limiting

@queue.task(rate_limit="100/m")
def call_api(endpoint: str) -> dict:
    return requests.get(endpoint).json()

Task Dependencies

download = fetch_file.delay("data.csv")
parsed = parse_file.apply_async(
    args=["data.csv"],
    depends_on=[download.id],
)
# parsed waits until download completes; if download fails, parsed is cancelled

Workflows

from taskito import chain, group, chord

# Sequential pipeline — each step receives the previous result
chain(fetch.s(url), parse.s(), store.s()).apply()

# Parallel fan-out
group(process.s(chunk) for chunk in chunks).apply()

# Parallel + callback when all complete
chord([download.s(u) for u in urls], merge.s()).apply()

Periodic Tasks

@queue.periodic(cron="0 0 */6 * * *")
def cleanup_temp_files():
    ...

Progress Tracking

from taskito import current_job

@queue.task()
def train_model(epochs: int):
    for i in range(epochs):
        ...
        current_job.update_progress(int((i + 1) / epochs * 100))

Hooks

@queue.on_failure
def alert_on_failure(task_name, args, kwargs, error):
    slack.post(f"Task {task_name} failed: {error}")

Exception Filtering

@queue.task(
    max_retries=5,
    retry_on=[ConnectionError, TimeoutError],
    dont_retry_on=[ValueError],
)
def fetch_data(url: str) -> dict:
    return requests.get(url).json()

Per-Task Middleware

from taskito import TaskMiddleware

class TimingMiddleware(TaskMiddleware):
    def before(self, ctx):
        ctx._start = time.time()

    def after(self, ctx, result, error):
        elapsed = time.time() - ctx._start
        print(f"{ctx.task_name} took {elapsed:.2f}s")

@queue.task(middleware=[TimingMiddleware()])
def process(data):
    ...

Delayed Scheduling

# Run 30 minutes from now
reminder.apply_async(args=[user_id, msg], delay=1800)

Unique Tasks

report.apply_async(args=[user_id], unique_key=f"report:{user_id}")
# Second enqueue with same key is silently deduplicated while first is active

FastAPI Integration

from fastapi import FastAPI
from taskito.contrib.fastapi import TaskitoRouter

app = FastAPI()
app.include_router(TaskitoRouter(queue), prefix="/tasks")
# GET /tasks/stats, GET /tasks/jobs/{id}, GET /tasks/jobs/{id}/progress (SSE), ...

Batch Enqueue

jobs = send_email.map([("alice@x.com",), ("bob@x.com",), ("carol@x.com",)])

Async Support

job = expensive_task.delay(data)
result = await job.aresult(timeout=30)
stats = await queue.astats()

Testing

taskito includes a built-in test mode — no worker needed:

def test_add():
    with queue.test_mode() as results:
        add.delay(2, 3)
        assert results[0].return_value == 5

Documentation

Full documentation with guides, API reference, architecture diagrams, and examples:

Read the docs →

Coming from Celery? See the Migration Guide.

Comparison

Feature taskito Celery RQ Dramatiq Huey
Broker required No Yes Yes Yes Yes
Core language Rust + Python Python Python Python Python
Priority queues Yes Yes No No Yes
Rate limiting Yes Yes No Yes No
Dead letter queue Yes No Yes No No
Task dependencies Yes No No No No
Task chaining Yes Yes No Yes No
Built-in dashboard Yes No No No No
FastAPI integration Yes No No No No
Per-task middleware Yes No No Yes No
Cancel running tasks Yes Yes No No No
Custom serializers Yes Yes No No No
Postgres backend Yes Yes No No No
Setup pip install Broker + backend Redis Broker Redis

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

taskito-0.2.3.tar.gz (93.9 kB view details)

Uploaded Source

Built Distributions

If you're not sure about the file name format, learn more about wheel file names.

taskito-0.2.3-cp312-cp312-win_amd64.whl (1.6 MB view details)

Uploaded CPython 3.12Windows x86-64

taskito-0.2.3-cp312-cp312-macosx_11_0_arm64.whl (1.8 MB view details)

Uploaded CPython 3.12macOS 11.0+ ARM64

taskito-0.2.3-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (2.0 MB view details)

Uploaded CPython 3.8manylinux: glibc 2.17+ x86-64

File details

Details for the file taskito-0.2.3.tar.gz.

File metadata

  • Download URL: taskito-0.2.3.tar.gz
  • Upload date:
  • Size: 93.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for taskito-0.2.3.tar.gz
Algorithm Hash digest
SHA256 9c85c652790ffb770c91ec42f943b5a044a5fc7afd7a2c555956c01f46eff42f
MD5 68ec9b48dbdfc6adc9d55bf2143579a1
BLAKE2b-256 959112696f2b76a28194e097e2fb0f64865812145cf86488ae6c62f59243a8ab

See more details on using hashes here.

Provenance

The following attestation bundles were made for taskito-0.2.3.tar.gz:

Publisher: publish.yml on pratyush618/taskito

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file taskito-0.2.3-cp312-cp312-win_amd64.whl.

File metadata

  • Download URL: taskito-0.2.3-cp312-cp312-win_amd64.whl
  • Upload date:
  • Size: 1.6 MB
  • Tags: CPython 3.12, Windows x86-64
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for taskito-0.2.3-cp312-cp312-win_amd64.whl
Algorithm Hash digest
SHA256 56749a64c2e483af4b1dcb30d472bcb9cfd2ba5f8ab08e56571cbaa9b9a3dc72
MD5 72f7f9f51bdeba3c65919f8fd81f7e92
BLAKE2b-256 b1742ef83f098f48784e275fd4986b093dec2c583f42e84d7476ce00067ea60d

See more details on using hashes here.

Provenance

The following attestation bundles were made for taskito-0.2.3-cp312-cp312-win_amd64.whl:

Publisher: publish.yml on pratyush618/taskito

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file taskito-0.2.3-cp312-cp312-macosx_11_0_arm64.whl.

File metadata

File hashes

Hashes for taskito-0.2.3-cp312-cp312-macosx_11_0_arm64.whl
Algorithm Hash digest
SHA256 d5d5478fcd1aaef8cabb99caaf3cf36c52ed7a8fde1b759a502118dc680601be
MD5 dbcf4b8c2ef17cae41765ccb277851b9
BLAKE2b-256 fdc1844ed969db32228604322abd6438f9261b436f67263ad79461332677359c

See more details on using hashes here.

Provenance

The following attestation bundles were made for taskito-0.2.3-cp312-cp312-macosx_11_0_arm64.whl:

Publisher: publish.yml on pratyush618/taskito

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file taskito-0.2.3-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.

File metadata

File hashes

Hashes for taskito-0.2.3-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl
Algorithm Hash digest
SHA256 70bedf72bde9fea95ac9f9c21be3bed271808abd6ccc020b69b770aa83a12cbf
MD5 ed966497e76b55451125221ea8e91b80
BLAKE2b-256 726f27b1c8beb38e547846fa201188db72edea7b4f5e4a3d8fde410c2866a9b1

See more details on using hashes here.

Provenance

The following attestation bundles were made for taskito-0.2.3-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl:

Publisher: publish.yml on pratyush618/taskito

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page