Lightweight production-ready background task worker with cron, rate limiting and JSON observability

PyBgWorker

A lightweight, production-ready background task library for Python.

PyBgWorker provides a durable SQLite-backed task queue, scheduling (cron and countdown/ETA), rate limiting, retries, and structured observability without external infrastructure.

It is designed to be simple, reliable, and easy to deploy.


Features

  • Persistent SQLite task queue
  • Multi-worker safe execution
  • Task scheduling: cron + countdown/ETA
  • Retry + failure handling
  • Task cancellation support
  • Crash isolation via subprocess
  • Task priority execution
  • Task status tracking
  • Result storage and retrieval
  • Worker statistics and monitoring
  • JSON structured logging
  • Task duration tracking
  • Rate limiting (overload protection)
  • Heartbeat monitoring
  • Configurable single-worker concurrency
  • CLI tools: inspect, retry, failed, purge, cancel, stats
  • Production-safe worker loop
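To make the "persistent queue", "multi-worker safe" and "priority" items concrete, here is a minimal sketch of how a SQLite-backed queue can hand each task to exactly one worker. The table layout, the status values, and the helper names (open_queue, enqueue, claim_next) are assumptions made for illustration, as is the choice that a larger priority number runs first; PyBgWorker's actual schema and internals may differ.

```python
import sqlite3

def open_queue(path=":memory:"):
    # Hypothetical schema for illustration; the real tables may differ.
    # isolation_level=None puts the connection in autocommit mode so the
    # transaction boundaries below are explicit.
    conn = sqlite3.connect(path, isolation_level=None)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS tasks ("
        " id INTEGER PRIMARY KEY AUTOINCREMENT,"
        " name TEXT NOT NULL,"
        " priority INTEGER NOT NULL DEFAULT 0,"
        " status TEXT NOT NULL DEFAULT 'queued')"
    )
    return conn

def enqueue(conn, name, priority=0):
    cur = conn.execute(
        "INSERT INTO tasks (name, priority) VALUES (?, ?)", (name, priority)
    )
    return cur.lastrowid

def claim_next(conn):
    # BEGIN IMMEDIATE takes the write lock up front, so two workers
    # cannot both select and then claim the same row.
    conn.execute("BEGIN IMMEDIATE")
    try:
        row = conn.execute(
            "SELECT id, name FROM tasks WHERE status = 'queued'"
            " ORDER BY priority DESC, id ASC LIMIT 1"
        ).fetchone()
        if row is not None:
            conn.execute(
                "UPDATE tasks SET status = 'running' WHERE id = ?", (row[0],)
            )
        conn.execute("COMMIT")
        return row
    except Exception:
        conn.execute("ROLLBACK")
        raise
```

Because the claim happens inside a single write transaction, a second worker calling claim_next at the same time waits for the lock and then sees the row as already running.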

Installation

pip install pybgworker

Basic Usage

Define a task

from pybgworker.task import task

@task(name="add")
def add(a, b):
    return a + b

Enqueue a task

add.delay(1, 2)

Run worker

python -m pybgworker.cli run --app example

Worker Concurrency

Run multiple tasks in parallel within a single worker process:

PYBGWORKER_CONCURRENCY=4 python -m pybgworker.cli run --app example

Or with a CLI flag:

python -m pybgworker.cli run --app example --concurrency 4

Concurrency defaults to 1, which preserves backward-compatible behavior.
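As a rough illustration of bounded in-process concurrency, the sketch below resolves a concurrency value and runs callables on a pool of that size. Only the PYBGWORKER_CONCURRENCY variable name comes from this README. The helper names, the precedence (flag over environment variable), and the use of a thread pool are assumptions; the library may resolve the setting and execute tasks differently.

```python
import os
from concurrent.futures import ThreadPoolExecutor

def resolve_concurrency(cli_value=None, environ=None):
    # Assumed precedence: CLI flag, then environment variable, then 1.
    environ = os.environ if environ is None else environ
    if cli_value is not None:
        raw = cli_value
    else:
        raw = environ.get("PYBGWORKER_CONCURRENCY", "1")
    return max(1, int(raw))

def run_batch(funcs, concurrency):
    # Run callables on a bounded pool; results are returned in
    # submission order, not completion order.
    with ThreadPoolExecutor(max_workers=concurrency) as pool:
        futures = [pool.submit(f) for f in funcs]
        return [f.result() for f in futures]
```

With concurrency set to 1 the pool degenerates to sequential execution, which matches the default described above.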


Cron Scheduler

Run recurring tasks:

from pybgworker.scheduler import cron
from pybgworker.task import task

@task(name="heartbeat_task")
@cron("*/1 * * * *")
def heartbeat():
    print("alive")

The cron scheduler runs automatically inside the worker process.
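For readers unfamiliar with the five-field cron syntax used above, the following is a simplified sketch of how an expression such as "*/1 * * * *" can be checked against a point in time. The function names are hypothetical and this is not PyBgWorker's parser. It handles only "*", "*/n", single numbers, ranges and comma lists, and it requires both day fields to match, whereas classic cron treats restricted day-of-month and day-of-week fields as alternatives.

```python
from datetime import datetime

# (min, max) for: minute, hour, day of month, month, day of week
FIELD_RANGES = [(0, 59), (0, 23), (1, 31), (1, 12), (0, 6)]

def field_matches(field, value, lo_bound, hi_bound):
    for part in field.split(","):
        base, _, step = part.partition("/")
        step = int(step) if step else 1
        if base == "*":
            lo, hi = lo_bound, hi_bound
        elif "-" in base:
            lo, hi = (int(x) for x in base.split("-"))
        else:
            lo = hi = int(base)
        if lo <= value <= hi and (value - lo) % step == 0:
            return True
    return False

def cron_matches(expr, when):
    fields = expr.split()
    if len(fields) != 5:
        raise ValueError("expected 5 cron fields")
    # Cron counts Sunday as 0; datetime.weekday() counts Monday as 0.
    values = [when.minute, when.hour, when.day, when.month,
              (when.weekday() + 1) % 7]
    return all(
        field_matches(f, v, lo, hi)
        for f, v, (lo, hi) in zip(fields, values, FIELD_RANGES)
    )
```

Under this reading, "*/1 * * * *" matches every minute, so the heartbeat task above would be due once per minute.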


JSON Logging

All worker events are logged as structured JSON, one object per line:

{"event":"task_start","task_id":"..."}
{"event":"task_success","duration":0.12}

This enables:

  • monitoring
  • analytics
  • alerting
  • observability pipelines
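Since each log line is a standalone JSON object, downstream tooling can parse it with the standard library alone. The sketch below shows one way to produce lines in the shape shown above and to compute an average task duration from them. The helper names are hypothetical, and only the event and duration keys are taken from the sample output; other fields in real logs are not assumed here.

```python
import json

def log_event(event, **fields):
    # One compact JSON object per line, so logs can be parsed line by line.
    return json.dumps({"event": event, **fields}, separators=(",", ":"))

def average_duration(lines):
    # Average the "duration" of task_success events; None if there are none.
    durations = []
    for line in lines:
        record = json.loads(line)
        if record.get("event") == "task_success" and "duration" in record:
            durations.append(record["duration"])
    return sum(durations) / len(durations) if durations else None
```

The same line-by-line approach works for alerting, for example by counting failure events in a time window.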

Rate Limiting

Protect infrastructure from overload:

RATE_LIMIT = 5  # tasks per second

This keeps execution predictable under heavy load.
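A "tasks per second" limit is commonly enforced with a token bucket. The sketch below illustrates that general technique; it is an assumption that PyBgWorker works this way, and the TokenBucket class, its parameters, and the injectable clock are invented for the example.

```python
import time

class TokenBucket:
    def __init__(self, rate, capacity=None, clock=time.monotonic):
        self.rate = float(rate)  # tokens added per second
        # By default allow a burst of up to one second's worth of tasks.
        self.capacity = float(capacity if capacity is not None else rate)
        self.clock = clock
        self.tokens = self.capacity
        self.last = clock()

    def try_acquire(self):
        # Refill according to elapsed time, capped at capacity.
        now = self.clock()
        elapsed = now - self.last
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.last = now
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False
```

A worker loop would call try_acquire() before starting a task and sleep briefly when it returns False, so a backlog drains at a steady rate instead of all at once.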


Retry Backoff + Jitter

Enable exponential backoff and jitter to spread retry load:

from pybgworker.task import task

@task(
    name="api_call",
    retries=5,
    retry_delay=2,
    retry_backoff=True,
    retry_backoff_factor=2,
    retry_max_delay=60,
    retry_jitter=0.2,
)
def api_call():
    ...

  • retry_backoff: enable exponential backoff
  • retry_backoff_factor: multiplier applied per attempt (default 2)
  • retry_max_delay: maximum delay in seconds
  • retry_jitter: randomize the delay (treated as a ratio of the delay if <= 1, or as seconds if > 1)
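To show how these options can interact, here is a sketch of one plausible delay calculation. The parameter names mirror the decorator arguments above, but the formula itself is an assumption: in particular, applying the cap before the jitter and spreading the jitter symmetrically around the delay are illustrative choices, and the library may compute delays differently. The retry_delay_for name and the rng argument are hypothetical.

```python
import random

def retry_delay_for(attempt, retry_delay=2, retry_backoff=True,
                    retry_backoff_factor=2, retry_max_delay=60,
                    retry_jitter=0.0, rng=random):
    # attempt is 1 for the first retry.
    delay = float(retry_delay)
    if retry_backoff:
        delay *= retry_backoff_factor ** (attempt - 1)
    if retry_max_delay is not None:
        delay = min(delay, float(retry_max_delay))
    if retry_jitter:
        # Ratio of the delay when <= 1, absolute seconds otherwise.
        spread = delay * retry_jitter if retry_jitter <= 1 else retry_jitter
        delay += rng.uniform(-spread, spread)
    return max(0.0, delay)
```

With the settings from the example and jitter disabled, attempts 1 through 6 would wait 2, 4, 8, 16, 32 and 60 seconds under this formula, the last value being the cap.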

CLI Commands

Inspect queue:

python -m pybgworker.cli inspect

Retry failed task:

python -m pybgworker.cli retry <task_id>

Cancel task:

python -m pybgworker.cli cancel <task_id>

Purge queued tasks:

python -m pybgworker.cli purge

Observability

PyBgWorker logs:

  • worker start
  • cron events
  • task start
  • success
  • retry
  • failure
  • timeout
  • crash
  • heartbeat errors

All of these events are machine-readable.


Database Cleanup

Enable automatic retention cleanup for completed tasks:

python -m pybgworker.cli run --app example --retention-days 30

Environment variable alternative:

PYBGWORKER_RETENTION_DAYS=30 python -m pybgworker.cli run --app example

Optional cleanup interval (hours, default 24):

python -m pybgworker.cli run --app example --cleanup-interval-hours 12

When enabled, PyBgWorker prunes finished tasks older than the retention window and runs a VACUUM after deletions.
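The prune-then-VACUUM step can be pictured with plain sqlite3. This is a sketch under stated assumptions: the tasks table, the finished_at column (a Unix timestamp), the set of statuses treated as finished, and the prune_finished name are all hypothetical and only illustrate the behavior described above.

```python
import sqlite3
import time

def prune_finished(conn, retention_days, now=None):
    # Hypothetical schema: tasks(status TEXT, finished_at REAL).
    now = time.time() if now is None else now
    cutoff = now - retention_days * 86400
    cur = conn.execute(
        "DELETE FROM tasks"
        " WHERE status IN ('success', 'dead', 'cancelled')"
        " AND finished_at IS NOT NULL AND finished_at < ?",
        (cutoff,),
    )
    deleted = cur.rowcount
    conn.commit()
    if deleted:
        # VACUUM cannot run inside a transaction, hence the commit above.
        conn.execute("VACUUM")
    return deleted
```

Running VACUUM only when rows were actually deleted avoids rewriting the database file on every cleanup pass.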


Failed vs Dead

  • failed: a task failed but may still be retried (or was manually marked failed).
  • dead: a task exhausted all retries and was moved to a terminal state for inspection.

Use the pybgworker failed command to list both failed and dead tasks, or the pybgworker dead command to list dead tasks only.
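The distinction can be expressed as a small state rule. The sketch below is illustrative only: the function names and the task dictionaries are hypothetical, and the exact point at which the library moves a task from failed to dead is assumed to be when the configured retries are used up.

```python
def status_after_failure(retries_used, max_retries):
    # A task stays "failed" while retries remain, then becomes "dead".
    return "failed" if retries_used < max_retries else "dead"

def select_tasks(tasks, view):
    # The "failed" view lists failed and dead tasks; "dead" lists dead only.
    wanted = {"failed": {"failed", "dead"}, "dead": {"dead"}}[view]
    return [t for t in tasks if t["status"] in wanted]
```

Under this rule a task configured with retries=5 remains failed through its first five failures and becomes dead only once no retries are left.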


Design Goals

  • zero external dependencies
  • SQLite durability
  • safe multiprocessing
  • operator-friendly CLI
  • production observability
  • infrastructure protection

Roadmap

Planned but not yet included:

  • Task/result TTL cleanup (beyond retention of finished tasks)
  • Multiple named queues + routing
  • Pluggable backends (Redis first)
  • Cluster coordination / leader election for scheduler
  • Metrics endpoint and health checks
  • Dashboard API + web UI
  • Workflow pipelines / DAGs

Feedback and Issues

For feedback, enhancement requests, or error reports, please use this form:

https://forms.gle/bUFRximzAGN6bCBQA

License

MIT License

File details

Details for the file pybgworker-0.3.0.tar.gz.

File metadata

  • Download URL: pybgworker-0.3.0.tar.gz
  • Size: 18.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.12

File hashes

Hashes for pybgworker-0.3.0.tar.gz
Algorithm Hash digest
SHA256 67e76e137c69ab746fd86b5270263d1988d5d2469998f5f2f964a11c773b6e5d
MD5 90ca02513d713a4689073b6d9f36dc9f
BLAKE2b-256 aac7fbe5e219de21e93666939d40a9ba0d4bd7f76c6b88c01327a02e4c470359

File details

Details for the file pybgworker-0.3.0-py3-none-any.whl.

File metadata

  • Download URL: pybgworker-0.3.0-py3-none-any.whl
  • Size: 19.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.13.12

File hashes

Hashes for pybgworker-0.3.0-py3-none-any.whl
Algorithm Hash digest
SHA256 eedd4720c9831e2c53d30f864bdf27d7546dc6ec6f712243e11a9817233803d2
MD5 93a3a5931f17df16da6e76a4d42c712c
BLAKE2b-256 3562a8f15fdea21545fd11ab18d1344f0835227b8d0d3e0065dcee0c2c3063cb
