Rust-powered task queue for Python. No broker required.
Project description
taskito
A Rust-powered task queue for Python. No broker required — just SQLite or Postgres.
pip install taskito # SQLite (default)
pip install taskito[postgres] # with Postgres backend
Quickstart
1. Define tasks in tasks.py:
from taskito import Queue
queue = Queue(db_path="tasks.db")
@queue.task()
def add(a: int, b: int) -> int:
return a + b
2. Start a worker in one terminal:
taskito worker --app tasks:queue
3. Enqueue jobs from another terminal or script:
from tasks import add
job = add.delay(2, 3)
print(job.result(timeout=10)) # 5
Why taskito?
Most Python task queues require a separate broker (Redis, RabbitMQ) even for single-machine workloads. taskito embeds everything — storage, scheduling, and worker management — into a single pip install with no external dependencies beyond Python itself. For distributed setups, an optional Postgres backend enables multi-machine workers with the same API.
The heavy lifting runs in Rust: a Tokio async scheduler, OS thread worker pool with crossbeam channels, and Diesel ORM over SQLite in WAL mode. Python's GIL is only held during task execution.
Features
- Priority queues — higher priority jobs run first
- Retry with exponential backoff — automatic retries with jitter
- Dead letter queue — inspect and replay failed jobs
- Rate limiting — token bucket with
"100/m"syntax - Task dependencies —
depends_onfor DAG workflows with cascade cancel - Task workflows —
chain,group,chordprimitives - Periodic tasks — cron scheduling with seconds granularity
- Progress tracking — report and read progress from inside tasks
- Job cancellation — cancel pending or running jobs
- Unique tasks — deduplicate active jobs by key
- Batch enqueue —
task.map()for high-throughput bulk inserts - Named queues — route tasks to isolated queues
- Hooks — before/after/success/failure middleware
- Per-task middleware —
TaskMiddlewarewithbefore/after/on_retryhooks - Pluggable serializers —
CloudpickleSerializer(default),JsonSerializer, or custom - Cancel running tasks — cooperative cancellation with
check_cancelled() - Soft timeouts —
check_timeout()inside tasks - Worker heartbeat — monitor worker health via
queue.workers() - Job expiration —
expiresparameter for time-sensitive jobs - Exception filtering —
retry_on/dont_retry_onfor selective retries - OpenTelemetry — optional tracing integration via
pip install taskito[otel] - Async support —
await job.aresult(),await queue.astats() - Web dashboard —
taskito dashboard --app myapp:queueserves a built-in monitoring UI - FastAPI integration —
TaskitoRouterfor instant REST API over the queue - Postgres backend — optional multi-machine storage via PostgreSQL
- CLI —
taskito worker,taskito info --watch,taskito dashboard
Examples
Retry with Backoff
@queue.task(max_retries=5, retry_backoff=2.0)
def fetch_url(url: str) -> str:
return requests.get(url).text
Priority Queues
urgent_report.apply_async(args=[data], priority=10)
bulk_report.delay(data) # default priority 0
Rate Limiting
@queue.task(rate_limit="100/m")
def call_api(endpoint: str) -> dict:
return requests.get(endpoint).json()
Task Dependencies
download = fetch_file.delay("data.csv")
parsed = parse_file.apply_async(
args=["data.csv"],
depends_on=[download.id],
)
# parsed waits until download completes; if download fails, parsed is cancelled
Workflows
from taskito import chain, group, chord
# Sequential pipeline — each step receives the previous result
chain(fetch.s(url), parse.s(), store.s()).apply()
# Parallel fan-out
group(process.s(chunk) for chunk in chunks).apply()
# Parallel + callback when all complete
chord([download.s(u) for u in urls], merge.s()).apply()
Periodic Tasks
@queue.periodic(cron="0 0 */6 * * *")
def cleanup_temp_files():
...
Progress Tracking
from taskito import current_job
@queue.task()
def train_model(epochs: int):
for i in range(epochs):
...
current_job.update_progress(int((i + 1) / epochs * 100))
Hooks
@queue.on_failure
def alert_on_failure(task_name, args, kwargs, error):
slack.post(f"Task {task_name} failed: {error}")
Exception Filtering
@queue.task(
max_retries=5,
retry_on=[ConnectionError, TimeoutError],
dont_retry_on=[ValueError],
)
def fetch_data(url: str) -> dict:
return requests.get(url).json()
Per-Task Middleware
from taskito import TaskMiddleware
class TimingMiddleware(TaskMiddleware):
def before(self, ctx):
ctx._start = time.time()
def after(self, ctx, result, error):
elapsed = time.time() - ctx._start
print(f"{ctx.task_name} took {elapsed:.2f}s")
@queue.task(middleware=[TimingMiddleware()])
def process(data):
...
Delayed Scheduling
# Run 30 minutes from now
reminder.apply_async(args=[user_id, msg], delay=1800)
Unique Tasks
report.apply_async(args=[user_id], unique_key=f"report:{user_id}")
# Second enqueue with same key is silently deduplicated while first is active
FastAPI Integration
from fastapi import FastAPI
from taskito.contrib.fastapi import TaskitoRouter
app = FastAPI()
app.include_router(TaskitoRouter(queue), prefix="/tasks")
# GET /tasks/stats, GET /tasks/jobs/{id}, GET /tasks/jobs/{id}/progress (SSE), ...
Batch Enqueue
jobs = send_email.map([("alice@x.com",), ("bob@x.com",), ("carol@x.com",)])
Async Support
job = expensive_task.delay(data)
result = await job.aresult(timeout=30)
stats = await queue.astats()
Testing
taskito includes a built-in test mode — no worker needed:
def test_add():
with queue.test_mode() as results:
add.delay(2, 3)
assert results[0].return_value == 5
Documentation
Full documentation with guides, API reference, architecture diagrams, and examples:
Coming from Celery? See the Migration Guide.
Comparison
| Feature | taskito | Celery | RQ | Dramatiq | Huey |
|---|---|---|---|---|---|
| Broker required | No | Yes | Yes | Yes | Yes |
| Core language | Rust + Python | Python | Python | Python | Python |
| Priority queues | Yes | Yes | No | No | Yes |
| Rate limiting | Yes | Yes | No | Yes | No |
| Dead letter queue | Yes | No | Yes | No | No |
| Task dependencies | Yes | No | No | No | No |
| Task chaining | Yes | Yes | No | Yes | No |
| Built-in dashboard | Yes | No | No | No | No |
| FastAPI integration | Yes | No | No | No | No |
| Per-task middleware | Yes | No | No | Yes | No |
| Cancel running tasks | Yes | Yes | No | No | No |
| Custom serializers | Yes | Yes | No | No | No |
| Postgres backend | Yes | Yes | No | No | No |
| Setup | pip install |
Broker + backend | Redis | Broker | Redis |
License
MIT
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distributions
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file taskito-0.2.3.tar.gz.
File metadata
- Download URL: taskito-0.2.3.tar.gz
- Upload date:
- Size: 93.9 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
9c85c652790ffb770c91ec42f943b5a044a5fc7afd7a2c555956c01f46eff42f
|
|
| MD5 |
68ec9b48dbdfc6adc9d55bf2143579a1
|
|
| BLAKE2b-256 |
959112696f2b76a28194e097e2fb0f64865812145cf86488ae6c62f59243a8ab
|
Provenance
The following attestation bundles were made for taskito-0.2.3.tar.gz:
Publisher:
publish.yml on pratyush618/taskito
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
taskito-0.2.3.tar.gz -
Subject digest:
9c85c652790ffb770c91ec42f943b5a044a5fc7afd7a2c555956c01f46eff42f - Sigstore transparency entry: 1057014552
- Sigstore integration time:
-
Permalink:
pratyush618/taskito@bb84aa84292e67cdf6a8dc00f7cb1c18c1712881 -
Branch / Tag:
refs/tags/0.2.3 - Owner: https://github.com/pratyush618
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish.yml@bb84aa84292e67cdf6a8dc00f7cb1c18c1712881 -
Trigger Event:
release
-
Statement type:
File details
Details for the file taskito-0.2.3-cp312-cp312-win_amd64.whl.
File metadata
- Download URL: taskito-0.2.3-cp312-cp312-win_amd64.whl
- Upload date:
- Size: 1.6 MB
- Tags: CPython 3.12, Windows x86-64
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
56749a64c2e483af4b1dcb30d472bcb9cfd2ba5f8ab08e56571cbaa9b9a3dc72
|
|
| MD5 |
72f7f9f51bdeba3c65919f8fd81f7e92
|
|
| BLAKE2b-256 |
b1742ef83f098f48784e275fd4986b093dec2c583f42e84d7476ce00067ea60d
|
Provenance
The following attestation bundles were made for taskito-0.2.3-cp312-cp312-win_amd64.whl:
Publisher:
publish.yml on pratyush618/taskito
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
taskito-0.2.3-cp312-cp312-win_amd64.whl -
Subject digest:
56749a64c2e483af4b1dcb30d472bcb9cfd2ba5f8ab08e56571cbaa9b9a3dc72 - Sigstore transparency entry: 1057014559
- Sigstore integration time:
-
Permalink:
pratyush618/taskito@bb84aa84292e67cdf6a8dc00f7cb1c18c1712881 -
Branch / Tag:
refs/tags/0.2.3 - Owner: https://github.com/pratyush618
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish.yml@bb84aa84292e67cdf6a8dc00f7cb1c18c1712881 -
Trigger Event:
release
-
Statement type:
File details
Details for the file taskito-0.2.3-cp312-cp312-macosx_11_0_arm64.whl.
File metadata
- Download URL: taskito-0.2.3-cp312-cp312-macosx_11_0_arm64.whl
- Upload date:
- Size: 1.8 MB
- Tags: CPython 3.12, macOS 11.0+ ARM64
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
d5d5478fcd1aaef8cabb99caaf3cf36c52ed7a8fde1b759a502118dc680601be
|
|
| MD5 |
dbcf4b8c2ef17cae41765ccb277851b9
|
|
| BLAKE2b-256 |
fdc1844ed969db32228604322abd6438f9261b436f67263ad79461332677359c
|
Provenance
The following attestation bundles were made for taskito-0.2.3-cp312-cp312-macosx_11_0_arm64.whl:
Publisher:
publish.yml on pratyush618/taskito
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
taskito-0.2.3-cp312-cp312-macosx_11_0_arm64.whl -
Subject digest:
d5d5478fcd1aaef8cabb99caaf3cf36c52ed7a8fde1b759a502118dc680601be - Sigstore transparency entry: 1057014564
- Sigstore integration time:
-
Permalink:
pratyush618/taskito@bb84aa84292e67cdf6a8dc00f7cb1c18c1712881 -
Branch / Tag:
refs/tags/0.2.3 - Owner: https://github.com/pratyush618
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish.yml@bb84aa84292e67cdf6a8dc00f7cb1c18c1712881 -
Trigger Event:
release
-
Statement type:
File details
Details for the file taskito-0.2.3-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.
File metadata
- Download URL: taskito-0.2.3-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl
- Upload date:
- Size: 2.0 MB
- Tags: CPython 3.8, manylinux: glibc 2.17+ x86-64
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
70bedf72bde9fea95ac9f9c21be3bed271808abd6ccc020b69b770aa83a12cbf
|
|
| MD5 |
ed966497e76b55451125221ea8e91b80
|
|
| BLAKE2b-256 |
726f27b1c8beb38e547846fa201188db72edea7b4f5e4a3d8fde410c2866a9b1
|
Provenance
The following attestation bundles were made for taskito-0.2.3-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl:
Publisher:
publish.yml on pratyush618/taskito
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
taskito-0.2.3-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl -
Subject digest:
70bedf72bde9fea95ac9f9c21be3bed271808abd6ccc020b69b770aa83a12cbf - Sigstore transparency entry: 1057014556
- Sigstore integration time:
-
Permalink:
pratyush618/taskito@bb84aa84292e67cdf6a8dc00f7cb1c18c1712881 -
Branch / Tag:
refs/tags/0.2.3 - Owner: https://github.com/pratyush618
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish.yml@bb84aa84292e67cdf6a8dc00f7cb1c18c1712881 -
Trigger Event:
release
-
Statement type: