
A fast, production-ready task queue for Python

tarsq

The background job runtime for Python. Each task runs in its own subprocess. Timeouts are hard kills. Crashed workers restart themselves.

Features

  • @task decorator to register handlers — sync and async
  • @schedule decorator for cron-based recurring tasks
  • dispatch() to enqueue jobs from anywhere in your app
  • status() to track job progress
  • Each task runs in its own subprocess — timeouts enforced with a hard process kill
  • Automatic retries with exponential backoff
  • Per-worker context via on_startup
  • Crashed worker auto-restart
  • Stuck job recovery on startup
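To make "timeouts enforced with a hard process kill" concrete, here is a minimal standalone sketch using only the standard library. It is not tarsq's implementation: the function name run_with_hard_timeout and the returned strings are made up for illustration. It only shows the general pattern of waiting on a child process and killing it when the deadline passes.

```python
import subprocess
import sys


def run_with_hard_timeout(code: str, timeout: float) -> str:
    """Run Python source in a child process and kill it if it exceeds `timeout` seconds.

    Hypothetical helper for illustration; not part of tarsq.
    """
    proc = subprocess.Popen([sys.executable, "-c", code])
    try:
        returncode = proc.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        proc.kill()  # SIGKILL on POSIX, TerminateProcess on Windows
        proc.wait()  # reap the child so it does not linger as a zombie
        return "timed_out"
    return "completed" if returncode == 0 else "failed"


if __name__ == "__main__":
    print(run_with_hard_timeout("print('quick job')", timeout=5))
    print(run_with_hard_timeout("import time; time.sleep(60)", timeout=0.5))
```

Because the work runs in a separate process, a task stuck in a blocking call or a tight loop can still be stopped, which is not possible when cancelling a thread or coroutine cooperatively.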

Requirements

  • Python 3.11+
  • Redis

Installation

pip install tarsq

Quick start

1. Define your tasks (myapp/tasks.py):

from tarsq import task

@task("send_email", timeout=10, max_retries=2)
def send_email(ctx, payload):
    print(f"Sending email to {payload['to']}")

@task("resize_image")
async def resize_image(ctx, payload):
    print(f"Resizing image at {payload['url']}")

2. Configure the worker (myapp/worker_settings.py):

from tarsq import WorkerSettings

class MyWorkerSettings(WorkerSettings):
    app = "myapp.tasks"
    workers = 4

3. Start the worker:

tarsq --settings myapp.worker_settings.MyWorkerSettings

Or without a settings class:

tarsq --app myapp.tasks --workers 4

4. Dispatch jobs from anywhere in your application:

from tarsq import dispatch, status

job_id = dispatch("send_email", payload={"to": "user@example.com"})

job = status(job_id)
print(job.status)  # queued | in_progress | completed | failed

The @task decorator

@task("send_email", timeout=30, max_retries=3)
def send_email(ctx, payload):
    ...

| Parameter | Type | Default | Description |
|---|---|---|---|
| name | str | | Unique task name. Must match what is passed to dispatch() |
| timeout | int | 30 | Max seconds before the task process is killed. Must be > 0 |
| max_retries | int | 3 | Retry attempts on failure. Must be >= 0 |

Both sync and async functions are supported. Each task receives:

  • ctx — dict populated by on_startup, containing shared resources
  • payload — the dict passed to dispatch()
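The interaction between max_retries and exponential backoff can be sketched as follows. The base delay of 1 second, the doubling factor, and the 60 second cap are assumptions chosen for illustration; tarsq's actual delays are not documented here. The sketch also assumes that max_retries counts retries after the first attempt, so max_retries=3 allows up to four runs.

```python
import time


def backoff_delays(max_retries: int, base: float = 1.0, factor: float = 2.0,
                   cap: float = 60.0) -> list[float]:
    """Delay before each retry: base, base*factor, base*factor**2, ... capped at `cap`."""
    if max_retries < 0:
        raise ValueError("max_retries must be >= 0")
    return [min(cap, base * factor ** attempt) for attempt in range(max_retries)]


def run_with_retries(fn, max_retries: int, sleep=time.sleep) -> str:
    """Call fn() once, then retry up to max_retries times with growing delays."""
    delays = backoff_delays(max_retries)
    for attempt in range(max_retries + 1):
        try:
            fn()
            return "completed"
        except Exception:
            if attempt == max_retries:
                return "failed"  # retries exhausted
            sleep(delays[attempt])


if __name__ == "__main__":
    print(backoff_delays(3))  # [1.0, 2.0, 4.0]
```

Passing sleep as a parameter keeps the retry loop testable without real waiting.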

Scheduled tasks

Use @schedule to register tasks that run automatically on a cron schedule:

from tarsq import schedule

@schedule("daily_report", cron="every day at 9am")
def daily_report(ctx, payload):
    generate_report()

@schedule("cleanup", cron="0 2 * * *")
def cleanup(ctx, payload):
    purge_old_records()

Built-in presets:

| Preset | Cron expression |
|---|---|
| "every minute" | * * * * * |
| "every 5 minutes" | */5 * * * * |
| "every hour" | 0 * * * * |
| "every day at midnight" | 0 0 * * * |
| "every day at 9am" | 0 9 * * * |
| "every monday" | 0 0 * * 1 |

Standard 5-field cron expressions are also accepted.
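As an illustration of how a cron argument might be resolved, the sketch below maps the presets from the table to their expressions and passes anything else through after a basic field-count check. The names PRESETS and resolve_cron are hypothetical, and exact-match lookup is an assumption; tarsq's own parsing may be stricter or more lenient.

```python
# Preset names and expressions copied from the table above.
PRESETS = {
    "every minute": "* * * * *",
    "every 5 minutes": "*/5 * * * *",
    "every hour": "0 * * * *",
    "every day at midnight": "0 0 * * *",
    "every day at 9am": "0 9 * * *",
    "every monday": "0 0 * * 1",
}


def resolve_cron(spec: str) -> str:
    """Return the 5-field cron expression for a preset name or a raw expression."""
    expr = PRESETS.get(spec, spec).strip()
    if len(expr.split()) != 5:
        raise ValueError(f"expected a preset or a 5-field cron expression, got {spec!r}")
    return expr


if __name__ == "__main__":
    print(resolve_cron("every day at 9am"))  # 0 9 * * *
    print(resolve_cron("0 2 * * *"))         # 0 2 * * *
```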

WorkerSettings

| Attribute | Type | Default | Description |
|---|---|---|---|
| app | str | None | Dotted module path to import so @task decorators register |
| workers | int | 5 | Number of concurrent worker processes |
| on_startup | callable | None | Sync or async function called inside each worker process on spawn. Receives ctx as argument |
| on_shutdown | callable | None | Sync or async function called once in the main process after all workers exit |

Inheritance from tarsq's WorkerSettings is optional — only define the attributes you need.
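One plausible way for a settings class to work without inheritance is attribute lookup with defaults. The sketch below is an assumption about the mechanism, not tarsq's code: read_settings and DEFAULTS are hypothetical names, and the default values are taken from the table above.

```python
# Defaults as listed in the WorkerSettings table.
DEFAULTS = {"app": None, "workers": 5, "on_startup": None, "on_shutdown": None}


def read_settings(settings_cls: type) -> dict:
    """Collect known attributes from any class, falling back to the defaults."""
    return {name: getattr(settings_cls, name, default)
            for name, default in DEFAULTS.items()}


class PlainSettings:  # no tarsq base class; only two attributes defined
    app = "myapp.tasks"
    workers = 2


if __name__ == "__main__":
    print(read_settings(PlainSettings))
```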

Context & on_startup

on_startup runs inside each worker process after it spawns. Use it to initialise per-process resources and store them in ctx. Every task handler receives this ctx.

from tarsq import WorkerSettings

def startup(ctx):
    ctx["db"] = SessionLocal  # store the factory, not a live session

class MyWorkerSettings(WorkerSettings):
    app = "myapp.tasks"
    on_startup = startup

from tarsq import task

@task("create_user")
def create_user(ctx, payload):
    db = ctx["db"]()  # create a session inside the task
    try:
        ...
    finally:
        db.close()  # release the session even if the task raises

Note: ctx is pickled when passed to the task subprocess. Do not store live resources (db sessions, open connections) directly in ctx — store factories or classes instead. tarsq will warn you at startup if it detects unpicklable values.
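You can check a ctx dict for unpicklable values yourself before starting a worker. The helper below is a sketch using the standard pickle module; find_unpicklable is a hypothetical name, and this is not necessarily how tarsq performs its own startup check.

```python
import pickle
import threading


def find_unpicklable(ctx: dict) -> list[str]:
    """Return the keys in ctx whose values cannot be pickled."""
    bad = []
    for key, value in ctx.items():
        try:
            pickle.dumps(value)
        except Exception:  # PicklingError, TypeError, AttributeError, ...
            bad.append(key)
    return bad


if __name__ == "__main__":
    ctx = {
        "retries": 3,               # plain data pickles fine
        "factory": dict,            # an importable class or factory pickles by reference
        "lock": threading.Lock(),   # a live OS-level resource does not pickle
    }
    print(find_unpicklable(ctx))  # ['lock']
```

Live database sessions and open sockets typically fail for the same reason as the lock: they wrap process-local state that cannot be serialized.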

Job status

from tarsq import status

job = status(job_id)

job.job_id      # UUID string
job.task        # task name
job.status      # "queued" | "in_progress" | "completed" | "failed"
job.retries     # number of retry attempts so far
job.created_at  # ISO 8601 timestamp
job.updated_at  # ISO 8601 timestamp of last status change

Returns None if no job with the given ID exists.
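A common use of status() is polling until a job reaches a terminal state. The helper below is a sketch, not part of tarsq: wait_for_job, its timeout and interval parameters, and the choice to raise LookupError for an unknown ID are all assumptions. The status function is passed in, so the example runs without Redis by using a stand-in.

```python
import time
from types import SimpleNamespace

TERMINAL_STATES = {"completed", "failed"}


def wait_for_job(job_id, get_status, timeout=30.0, interval=0.5,
                 sleep=time.sleep, clock=time.monotonic):
    """Poll get_status(job_id) until the job completes, fails, or the timeout expires."""
    deadline = clock() + timeout
    while True:
        job = get_status(job_id)
        if job is None:
            raise LookupError(f"no job with id {job_id!r}")
        if job.status in TERMINAL_STATES:
            return job
        if clock() >= deadline:
            raise TimeoutError(f"job {job_id!r} still {job.status} after {timeout}s")
        sleep(interval)


if __name__ == "__main__":
    # Stand-in for the real status(): reports three states in sequence.
    states = iter(["queued", "in_progress", "completed"])
    fake_status = lambda job_id: SimpleNamespace(job_id=job_id, status=next(states))
    job = wait_for_job("demo-id", fake_status, interval=0.01)
    print(job.status)
```

In an application you would pass tarsq's status function as get_status, for example wait_for_job(job_id, status).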

Environment variables

| Variable | Default | Description |
|---|---|---|
| REDIS_HOST | localhost | Redis host |
| REDIS_PORT | 6379 | Redis port |
| REDIS_PASSWORD | None | Redis password |

Variables can be set in a .env file in your project root.
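The sketch below shows how these variables and their defaults could be read into connection settings. redis_settings is a hypothetical helper, not tarsq's API, and it does not load a .env file itself; it only reads whatever mapping it is given (the process environment by default).

```python
import os
from typing import Mapping


def redis_settings(env: Mapping[str, str] = os.environ) -> dict:
    """Read Redis connection settings, applying the documented defaults."""
    return {
        "host": env.get("REDIS_HOST", "localhost"),
        "port": int(env.get("REDIS_PORT", "6379")),
        # Treat an unset or empty password as "no password".
        "password": env.get("REDIS_PASSWORD") or None,
    }


if __name__ == "__main__":
    print(redis_settings({}))  # {'host': 'localhost', 'port': 6379, 'password': None}
    print(redis_settings({"REDIS_HOST": "redis.internal", "REDIS_PORT": "6380"}))
```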

CLI reference

tarsq [--settings <path>] [--app <module>] [--workers <n>]

| Option | Description |
|---|---|
| --settings | Dotted path to a WorkerSettings class |
| --app | Dotted module path containing @task/@schedule handlers |
| --workers | Number of worker processes (overrides WorkerSettings) |

CLI args take priority over WorkerSettings.
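The precedence rule can be illustrated with a small argparse sketch: a value given on the command line wins, otherwise the settings value is used, otherwise the default. This is not tarsq's actual parser. build_parser and effective_config are hypothetical names, and settings are modelled as a plain dict to keep the example self-contained.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Parser mirroring the three documented options (illustrative only)."""
    parser = argparse.ArgumentParser(prog="tarsq")
    parser.add_argument("--settings")
    parser.add_argument("--app")
    parser.add_argument("--workers", type=int)
    return parser


def effective_config(argv: list[str], settings: dict) -> dict:
    """Merge CLI arguments over settings values; CLI wins when both are present."""
    args = build_parser().parse_args(argv)
    return {
        "app": args.app if args.app is not None else settings.get("app"),
        "workers": args.workers if args.workers is not None else settings.get("workers", 5),
    }


if __name__ == "__main__":
    settings = {"app": "myapp.tasks", "workers": 4}
    print(effective_config(["--workers", "8"], settings))
```

Comparing against None rather than relying on truthiness keeps an explicit value from being mistaken for "not provided".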

License

MIT
