Skip to main content

A fast, production-ready task queue for Python

Project description

tarsq

The background job runtime for Python. Tasks run in their own subprocess. Timeouts are hard kills. Crashed workers restart themselves.

Features

  • @task decorator to register handlers — sync and async
  • @schedule decorator for cron-based recurring tasks
  • dispatch() to enqueue jobs from anywhere in your app
  • status() to track job progress
  • Each task runs in its own subprocess — timeouts enforced with a hard process kill
  • Automatic retries with exponential backoff
  • Per-worker context via on_startup
  • Crashed worker auto-restart
  • Stuck job recovery on startup

Requirements

  • Python 3.11+
  • Redis

Installation

pip install tarsq

Quick start

1. Define your tasks (myapp/tasks.py):

from tarsq import task

@task("send_email", timeout=10, max_retries=2)
def send_email(ctx, payload):
    print(f"Sending email to {payload['to']}")

@task("resize_image")
async def resize_image(ctx, payload):
    print(f"Resizing image at {payload['url']}")

2. Configure the worker (myapp/worker_settings.py):

from tarsq import WorkerSettings

class MyWorkerSettings(WorkerSettings):
    app = "myapp.tasks"
    workers = 4

3. Start the worker:

tarsq --settings myapp.worker_settings.MyWorkerSettings

Or without a settings class:

tarsq --app myapp.tasks --workers 4

4. Dispatch jobs from anywhere in your application:

from tarsq import dispatch, status

job_id = dispatch("send_email", payload={"to": "user@example.com"})

job = status(job_id)
print(job.status)  # queued | in_progress | completed | failed

The @task decorator

@task("send_email", timeout=30, max_retries=3)
def send_email(ctx, payload):
    ...
Parameter Type Default Description
name str Unique task name. Must match what is passed to dispatch()
timeout int 30 Max seconds before the task process is killed. Must be > 0
max_retries int 3 Retry attempts on failure. Must be >= 0

Both sync and async functions are supported. Each task receives:

  • ctx — dict populated by on_startup, containing shared resources
  • payload — the dict passed to dispatch()

Scheduled tasks

Use @schedule to register tasks that run automatically on a cron schedule:

from tarsq import schedule

@schedule("daily_report", cron="every day at 9am")
def daily_report(ctx, payload):
    generate_report()

@schedule("cleanup", cron="0 2 * * *")
def cleanup(ctx, payload):
    purge_old_records()

Built-in presets:

Preset Cron expression
"every minute" * * * * *
"every 5 minutes" */5 * * * *
"every hour" 0 * * * *
"every day at midnight" 0 0 * * *
"every day at 9am" 0 9 * * *
"every monday" 0 0 * * 1

Standard 5-field cron expressions are also accepted.

WorkerSettings

Attribute Type Default Description
app str None Dotted module path to import so @task decorators register
workers int 5 Number of concurrent worker processes
on_startup callable None Sync or async function called inside each worker process on spawn. Receives ctx as argument
on_shutdown callable None Sync or async function called once in the main process after all workers exit

Inheritance from tarsq's WorkerSettings is optional — only define the attributes you need.

Context & on_startup

on_startup runs inside each worker process after it spawns. Use it to initialise per-process resources and store them in ctx. Every task handler receives this ctx.

from tarsq import WorkerSettings

def startup(ctx):
    ctx["db"] = SessionLocal  # store the factory, not a live session

class MyWorkerSettings(WorkerSettings):
    app = "myapp.tasks"
    on_startup = startup
@task("create_user")
def create_user(ctx, payload):
    db = ctx["db"]()  # create a session inside the task
    ...
    db.close()

Note: ctx is pickled when passed to the task subprocess. Do not store live resources (db sessions, open connections) directly in ctx — store factories or classes instead. tarsq will warn you at startup if it detects unpicklable values.

Job status

from tarsq import status

job = status(job_id)

job.job_id      # UUID string
job.task        # task name
job.status      # "queued" | "in_progress" | "completed" | "failed"
job.retries     # number of retry attempts so far
job.created_at  # ISO 8601 timestamp
job.updated_at  # ISO 8601 timestamp of last status change

Returns None if no job with the given ID exists.

Environment variables

Variable Default Description
REDIS_HOST localhost Redis host
REDIS_PORT 6379 Redis port
REDIS_PASSWORD None Redis password

Variables can be set in a .env file in your project root.

CLI reference

tarsq [--settings <path>] [--app <module>] [--workers <n>]
Option Description
--settings Dotted path to a WorkerSettings class
--app Dotted module path containing @task/@schedule handlers
--workers Number of worker processes (overrides WorkerSettings)

CLI args take priority over WorkerSettings.

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

tarsq-0.1.17.tar.gz (11.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

tarsq-0.1.17-py3-none-any.whl (14.3 kB view details)

Uploaded Python 3

File details

Details for the file tarsq-0.1.17.tar.gz.

File metadata

  • Download URL: tarsq-0.1.17.tar.gz
  • Upload date:
  • Size: 11.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.3.4 CPython/3.12.13 Darwin/25.0.0

File hashes

Hashes for tarsq-0.1.17.tar.gz
Algorithm Hash digest
SHA256 b1b5bb1b56283021a3b6c25aa63084b24c5838296a42323c20b4f8e7da04e928
MD5 aef94682706b6e9b550f51eeb5d3c051
BLAKE2b-256 b2f1355d72ace2b588b0ba441def10e7d00f61206edf71a086ade39924eee2cf

See more details on using hashes here.

File details

Details for the file tarsq-0.1.17-py3-none-any.whl.

File metadata

  • Download URL: tarsq-0.1.17-py3-none-any.whl
  • Upload date:
  • Size: 14.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.3.4 CPython/3.12.13 Darwin/25.0.0

File hashes

Hashes for tarsq-0.1.17-py3-none-any.whl
Algorithm Hash digest
SHA256 1dd2e3b6ae387483baa5c4c254d684e2685e6d28c9cd3448cfe4d718c48f13a9
MD5 b4c01905580d59113d94f5dbe02e7ee6
BLAKE2b-256 224e60c9f2cb32916906ffaf6d29088ca433cacb9e12a1ec5effa8635ac9f46d

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page