A fast, production-ready task queue for Python
tarsq
The background job runtime for Python. Tasks run in their own subprocess. Timeouts are hard kills. Crashed workers restart themselves.
Features
- `@task` decorator to register handlers, both sync and async
- `@schedule` decorator for cron-based recurring tasks
- `dispatch()` to enqueue jobs from anywhere in your app
- `status()` to track job progress
- Each task runs in its own subprocess; timeouts are enforced with a hard process kill
- Automatic retries with exponential backoff
- Per-worker context via `on_startup`
- Crashed worker auto-restart
- Stuck job recovery on startup
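The subprocess-plus-hard-kill model in the list above can be illustrated with the standard library alone. This is a minimal sketch, not tarsq's implementation: `run_with_hard_timeout` is a hypothetical helper that runs a snippet in a child interpreter and relies on `subprocess.run(timeout=...)`, which kills the child once the deadline passes.

```python
import subprocess
import sys


def run_with_hard_timeout(code: str, timeout: float) -> str:
    """Run a Python snippet in a child process; kill it if it overruns.

    Hypothetical helper for illustration only, not part of tarsq.
    """
    try:
        result = subprocess.run([sys.executable, "-c", code], timeout=timeout)
    except subprocess.TimeoutExpired:
        # subprocess.run() has already killed and reaped the child here.
        return "timed_out"
    return "completed" if result.returncode == 0 else "failed"


if __name__ == "__main__":
    print(run_with_hard_timeout("print('quick job')", timeout=10))
    print(run_with_hard_timeout("import time; time.sleep(30)", timeout=0.5))
```

Because the work happens in a separate process, the kill cannot be blocked by the task itself, which is the property a cooperative in-thread timeout does not give you.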
Requirements
- Python 3.11+
- Redis
Installation
pip install tarsq
Quick start
1. Define your tasks (myapp/tasks.py):
from tarsq import task

@task("send_email", timeout=10, max_retries=2)
def send_email(ctx, payload):
    print(f"Sending email to {payload['to']}")

@task("resize_image")
async def resize_image(ctx, payload):
    print(f"Resizing image at {payload['url']}")
2. Configure the worker (myapp/worker_settings.py):
from tarsq import WorkerSettings

class MyWorkerSettings(WorkerSettings):
    app = "myapp.tasks"
    workers = 4
3. Start the worker:
tarsq --settings myapp.worker_settings.MyWorkerSettings
Or without a settings class:
tarsq --app myapp.tasks --workers 4
4. Dispatch jobs from anywhere in your application:
from tarsq import dispatch, status
job_id = dispatch("send_email", payload={"to": "user@example.com"})
job = status(job_id)
print(job.status) # queued | in_progress | completed | failed
The @task decorator
@task("send_email", timeout=30, max_retries=3)
def send_email(ctx, payload):
    ...
| Parameter | Type | Default | Description |
|---|---|---|---|
| `name` | `str` | required | Unique task name. Must match what is passed to `dispatch()` |
| `timeout` | `int` | `30` | Max seconds before the task process is killed. Must be > 0 |
| `max_retries` | `int` | `3` | Retry attempts on failure. Must be >= 0 |
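To make the interaction between `max_retries` and exponential backoff concrete, here is a sketch of how a delay schedule could be computed. The `base`, `factor`, and `cap` values are assumptions chosen for illustration; this README does not state the delays tarsq actually uses, and `backoff_delays` is a hypothetical helper.

```python
def backoff_delays(
    max_retries: int,
    base: float = 1.0,    # assumed first delay in seconds
    factor: float = 2.0,  # assumed growth factor per attempt
    cap: float = 60.0,    # assumed upper bound on any single delay
) -> list[float]:
    """Return the wait before each retry attempt, growing geometrically."""
    if max_retries < 0:
        raise ValueError("max_retries must be >= 0")
    return [min(cap, base * factor**attempt) for attempt in range(max_retries)]


if __name__ == "__main__":
    print(backoff_delays(3))  # [1.0, 2.0, 4.0]
```

With `max_retries=0` the schedule is empty, so a failed task would not be retried at all.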
Both sync and async functions are supported. Each task receives:
- `ctx`: dict populated by `on_startup`, containing shared resources
- `payload`: the dict passed to `dispatch()`
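One common way for a runner to accept both sync and async handlers with the same `(ctx, payload)` signature is to inspect the function and drive coroutines with `asyncio.run`. The sketch below shows that pattern; `call_handler` is a hypothetical name, and this is not necessarily how tarsq does it internally.

```python
import asyncio
import inspect


def call_handler(handler, ctx: dict, payload: dict):
    """Invoke a task handler, running it on an event loop if it is async."""
    if inspect.iscoroutinefunction(handler):
        # Async handlers return a coroutine that must be driven to completion.
        return asyncio.run(handler(ctx, payload))
    return handler(ctx, payload)


def sync_handler(ctx, payload):
    return f"sync:{payload['to']}"


async def async_handler(ctx, payload):
    await asyncio.sleep(0)  # stand-in for real async I/O
    return f"async:{payload['url']}"


if __name__ == "__main__":
    print(call_handler(sync_handler, {}, {"to": "user@example.com"}))
    print(call_handler(async_handler, {}, {"url": "https://example.com/a.png"}))
```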
Scheduled tasks
Use @schedule to register tasks that run automatically on a cron schedule:
from tarsq import schedule

@schedule("daily_report", cron="every day at 9am")
def daily_report(ctx, payload):
    generate_report()

@schedule("cleanup", cron="0 2 * * *")
def cleanup(ctx, payload):
    purge_old_records()
Built-in presets:
| Preset | Cron expression |
|---|---|
| `"every minute"` | `* * * * *` |
| `"every 5 minutes"` | `*/5 * * * *` |
| `"every hour"` | `0 * * * *` |
| `"every day at midnight"` | `0 0 * * *` |
| `"every day at 9am"` | `0 9 * * *` |
| `"every monday"` | `0 0 * * 1` |
Standard 5-field cron expressions are also accepted.
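The preset table amounts to a lookup with a fallback to raw cron strings. The following sketch shows that resolution logic; `resolve_cron` is a hypothetical helper, and the fallback only checks the field count rather than fully validating the expression.

```python
# Presets copied from the table above.
PRESETS = {
    "every minute": "* * * * *",
    "every 5 minutes": "*/5 * * * *",
    "every hour": "0 * * * *",
    "every day at midnight": "0 0 * * *",
    "every day at 9am": "0 9 * * *",
    "every monday": "0 0 * * 1",
}


def resolve_cron(spec: str) -> str:
    """Map a preset name to its cron expression, or pass through a 5-field one."""
    if spec in PRESETS:
        return PRESETS[spec]
    if len(spec.split()) != 5:
        raise ValueError(f"not a known preset or 5-field cron expression: {spec!r}")
    return spec


if __name__ == "__main__":
    print(resolve_cron("every day at 9am"))  # 0 9 * * *
    print(resolve_cron("0 2 * * *"))         # 0 2 * * *
```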
WorkerSettings
| Attribute | Type | Default | Description |
|---|---|---|---|
| `app` | `str` | `None` | Dotted module path to import so `@task` decorators register |
| `workers` | `int` | `5` | Number of concurrent worker processes |
| `on_startup` | callable | `None` | Sync or async function called inside each worker process on spawn. Receives `ctx` as argument |
| `on_shutdown` | callable | `None` | Sync or async function called once in the main process after all workers exit |
Inheritance from tarsq's WorkerSettings is optional — only define the attributes you need.
Context & on_startup
on_startup runs inside each worker process after it spawns. Use it to initialise per-process resources and store them in ctx. Every task handler receives this ctx.
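from tarsq import WorkerSettings, task

# SessionLocal is assumed to be your own session factory, defined elsewhere
# (for example, a SQLAlchemy sessionmaker).

def startup(ctx):
    ctx["db"] = SessionLocal  # store the factory, not a live session

class MyWorkerSettings(WorkerSettings):
    app = "myapp.tasks"
    on_startup = startup

@task("create_user")
def create_user(ctx, payload):
    db = ctx["db"]()  # create a session inside the task
    try:
        ...
    finally:
        db.close()  # release the session even if the task raises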
Note: `ctx` is pickled when passed to the task subprocess. Do not store live resources (db sessions, open connections) directly in `ctx`; store factories or classes instead. tarsq will warn you at startup if it detects unpicklable values.
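You can check a `ctx` dict for pickling problems yourself before starting a worker. The sketch below uses only the standard `pickle` module; `unpicklable_keys` is a hypothetical helper and is not the check tarsq runs at startup.

```python
import pickle
import threading


def unpicklable_keys(ctx: dict) -> list[str]:
    """Return the keys in ctx whose values cannot be pickled."""
    bad = []
    for key, value in ctx.items():
        try:
            pickle.dumps(value)
        except Exception:
            # pickle raises different exception types depending on the object.
            bad.append(key)
    return bad


if __name__ == "__main__":
    ctx = {
        "make_cache": dict,        # a class (factory) pickles by reference
        "lock": threading.Lock(),  # a live OS-level resource does not pickle
    }
    print(unpicklable_keys(ctx))  # ['lock']
```

The lock stands in for any live resource such as a socket or database session: the factory survives the trip to the subprocess, the live object does not.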
Job status
from tarsq import status
job = status(job_id)
job.job_id # UUID string
job.task # task name
job.status # "queued" | "in_progress" | "completed" | "failed"
job.retries # number of retry attempts so far
job.created_at # ISO 8601 timestamp
job.updated_at # ISO 8601 timestamp of last status change
Returns None if no job with the given ID exists.
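A typical use of the status object is to poll until the job reaches a terminal state. The sketch below keeps the polling loop independent of tarsq by taking the lookup function as a parameter; in an application you would pass `tarsq.status` as `fetch`. The helper name `wait_for_job`, the poll interval, and the timeout are all assumptions for illustration.

```python
import time

TERMINAL_STATES = {"completed", "failed"}


def wait_for_job(fetch, job_id: str, poll_interval: float = 0.5, timeout: float = 30.0):
    """Poll fetch(job_id) until the job completes or fails.

    Returns the final job object, or None if the job ID is unknown.
    Raises TimeoutError if no terminal state is reached in time.
    """
    deadline = time.monotonic() + timeout
    while True:
        job = fetch(job_id)
        if job is None:
            return None  # no job with this ID exists
        if job.status in TERMINAL_STATES:
            return job
        if time.monotonic() >= deadline:
            raise TimeoutError(f"job {job_id} still {job.status} after {timeout}s")
        time.sleep(poll_interval)
```

Polling adds load on Redis proportional to the number of waiters, so a longer `poll_interval` is usually the better default for slow tasks.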
Environment variables
| Variable | Default | Description |
|---|---|---|
| `REDIS_HOST` | `localhost` | Redis host |
| `REDIS_PORT` | `6379` | Redis port |
| `REDIS_PASSWORD` | `None` | Redis password |
Variables can be set in a .env file in your project root.
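The defaults in the table can be mirrored in your own code if you need the same connection settings elsewhere in the application. This is a sketch under that assumption; `redis_config` is a hypothetical helper, it does not load `.env` files, and it is not how tarsq reads its configuration.

```python
import os


def redis_config(env=None) -> dict:
    """Read Redis settings from the environment, using the documented defaults."""
    env = os.environ if env is None else env
    return {
        "host": env.get("REDIS_HOST", "localhost"),
        "port": int(env.get("REDIS_PORT", "6379")),
        "password": env.get("REDIS_PASSWORD"),  # None when unset
    }


if __name__ == "__main__":
    print(redis_config({"REDIS_HOST": "redis.internal", "REDIS_PORT": "6380"}))
```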
CLI reference
tarsq [--settings <path>] [--app <module>] [--workers <n>]
| Option | Description |
|---|---|
| `--settings` | Dotted path to a WorkerSettings class |
| `--app` | Dotted module path containing `@task`/`@schedule` handlers |
| `--workers` | Number of worker processes (overrides WorkerSettings) |
CLI args take priority over WorkerSettings.
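The precedence rule can be sketched with `argparse`: an option given on the command line wins, and anything left unset falls back to the settings class. This is an illustration of the rule only, not tarsq's CLI code; `resolve_config` and `ExampleSettings` are hypothetical names, and the `--settings` option is omitted to keep the sketch short.

```python
import argparse


class ExampleSettings:
    """Stand-in for a WorkerSettings subclass (hypothetical)."""
    app = "myapp.tasks"
    workers = 4


def resolve_config(argv: list[str], settings=ExampleSettings) -> dict:
    """Merge CLI options over settings attributes; CLI values take priority."""
    parser = argparse.ArgumentParser(prog="tarsq")
    parser.add_argument("--app")
    parser.add_argument("--workers", type=int)
    args = parser.parse_args(argv)
    return {
        "app": args.app if args.app is not None else settings.app,
        "workers": args.workers if args.workers is not None else settings.workers,
    }


if __name__ == "__main__":
    print(resolve_config(["--workers", "2"]))  # {'app': 'myapp.tasks', 'workers': 2}
```

Comparing against `None` rather than truthiness keeps an explicit CLI value from being silently replaced by the settings value.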
License
MIT