Lightweight production-ready background task worker with cron, rate limiting and JSON observability
PyBgWorker
A lightweight, production-ready background task library for Python.
PyBgWorker provides a durable SQLite-backed task queue, scheduling (cron and countdown/ETA), rate limiting, retries, and structured observability without external infrastructure.
It is designed to be simple, reliable, and easy to deploy.
Features
- Persistent SQLite task queue
- Multi-worker safe execution
- Task scheduling: cron + countdown/ETA
- Retry + failure handling
- Task cancellation support
- Crash isolation via subprocess
- Task priority execution
- Task status tracking
- Result storage and retrieval
- Worker statistics and monitoring
- JSON structured logging
- Task duration tracking
- Rate limiting (overload protection)
- Heartbeat monitoring
- Configurable single-worker concurrency
- CLI tools: inspect, retry, failed, purge, cancel, stats
- Production-safe worker loop
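To illustrate how a SQLite-backed queue can stay safe with several workers, here is a minimal sketch of an atomic "claim next task" step. It is not PyBgWorker's actual implementation: the `tasks` table, its columns, and the helper names `open_queue`, `enqueue`, and `claim_next` are all hypothetical.

```python
import sqlite3

# Hypothetical schema for illustration; PyBgWorker's real tables may differ.
SCHEMA = """
CREATE TABLE IF NOT EXISTS tasks (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    name TEXT NOT NULL,
    priority INTEGER NOT NULL DEFAULT 0,
    status TEXT NOT NULL DEFAULT 'queued',
    worker TEXT
)
"""


def open_queue(path=":memory:"):
    # isolation_level=None leaves transaction control to us, so we can
    # issue BEGIN IMMEDIATE / COMMIT explicitly.
    conn = sqlite3.connect(path, isolation_level=None)
    conn.execute(SCHEMA)
    return conn


def enqueue(conn, name, priority=0):
    cur = conn.execute(
        "INSERT INTO tasks (name, priority) VALUES (?, ?)", (name, priority)
    )
    return cur.lastrowid


def claim_next(conn, worker):
    # BEGIN IMMEDIATE takes the write lock up front, so two workers
    # cannot both select and claim the same queued row.
    conn.execute("BEGIN IMMEDIATE")
    try:
        row = conn.execute(
            "SELECT id, name FROM tasks WHERE status = 'queued' "
            "ORDER BY priority DESC, id ASC LIMIT 1"
        ).fetchone()
        if row is not None:
            conn.execute(
                "UPDATE tasks SET status = 'running', worker = ? WHERE id = ?",
                (worker, row[0]),
            )
        conn.execute("COMMIT")
        return row
    except Exception:
        conn.execute("ROLLBACK")
        raise
```

In this sketch, higher `priority` values are claimed first and ties fall back to insertion order; the real library may order tasks differently.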
Installation
pip install pybgworker
Basic Usage
Define a task
from pybgworker.task import task

@task(name="add")
def add(a, b):
    return a + b
Enqueue a task
add.delay(1, 2)
Run worker
python -m pybgworker.cli run --app example
Worker Concurrency
Run multiple tasks in parallel within a single worker process:
PYBGWORKER_CONCURRENCY=4 python -m pybgworker.cli run --app example
Or with a CLI flag:
python -m pybgworker.cli run --app example --concurrency 4
Concurrency defaults to 1, which keeps the earlier one-task-at-a-time behavior.
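As a rough sketch of how a worker might resolve this setting, the function below reads the `--concurrency` flag first and falls back to `PYBGWORKER_CONCURRENCY`, then to 1. The precedence (flag over environment variable) and the helper name `resolve_concurrency` are assumptions, not confirmed PyBgWorker behavior.

```python
import argparse
import os


def resolve_concurrency(argv, environ=os.environ):
    """Pick the concurrency level: CLI flag, then env var, then 1 (assumed order)."""
    parser = argparse.ArgumentParser(add_help=False)
    parser.add_argument("--concurrency", type=int, default=None)
    # parse_known_args ignores other flags such as --app.
    args, _unknown = parser.parse_known_args(argv)

    if args.concurrency is not None:
        value = args.concurrency
    else:
        value = int(environ.get("PYBGWORKER_CONCURRENCY", "1"))

    if value < 1:
        raise ValueError("concurrency must be at least 1")
    return value
```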
Cron Scheduler
Run recurring tasks:
from pybgworker.scheduler import cron
from pybgworker.task import task

@task(name="heartbeat_task")
@cron("*/1 * * * *")
def heartbeat():
    print("alive")
Cron runs automatically inside the worker.
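For readers unfamiliar with cron syntax, the sketch below shows how a five-field expression such as `*/1 * * * *` can be checked against a timestamp. It is a simplified, standalone matcher (only `*`, `*/n`, plain numbers, and comma lists; no ranges or names) and says nothing about how PyBgWorker parses expressions internally. The names `field_matches` and `cron_matches` are hypothetical.

```python
from datetime import datetime


def field_matches(field, value):
    """Match one cron field: '*', '*/n', a number, or a comma list of these."""
    for part in field.split(","):
        if part == "*":
            return True
        if part.startswith("*/"):
            if value % int(part[2:]) == 0:
                return True
        elif int(part) == value:
            return True
    return False


def cron_matches(expr, when):
    """Return True if the five-field cron expression matches the datetime."""
    minute, hour, day, month, weekday = expr.split()
    # Cron counts Sunday as 0; datetime.weekday() counts Monday as 0.
    cron_weekday = (when.weekday() + 1) % 7
    return (
        field_matches(minute, when.minute)
        and field_matches(hour, when.hour)
        and field_matches(day, when.day)
        and field_matches(month, when.month)
        and field_matches(weekday, cron_weekday)
    )
```

Here `*/1` in the minute field matches every minute, so the `heartbeat` task above would be due once per minute. Note that this sketch combines the day-of-month and weekday fields with AND, which is simpler than traditional cron semantics.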
JSON Logging
All worker events are structured JSON:
{"event":"task_start","task_id":"..."}
{"event":"task_success","duration":0.12}
This enables:
- monitoring
- analytics
- alerting
- observability pipelines
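Because each log line is a standalone JSON object, a small script can aggregate them. The sketch below counts events and averages the `duration` of `task_success` records. It assumes only the `event` and `duration` fields shown in the sample lines above; the function name `summarize` is hypothetical and other fields may exist in real logs.

```python
import json


def summarize(lines):
    """Count events and average task_success durations from JSON log lines."""
    counts = {}
    durations = []
    for line in lines:
        line = line.strip()
        if not line:
            continue
        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            continue  # skip non-JSON output mixed into the stream
        if not isinstance(record, dict):
            continue
        event = record.get("event", "unknown")
        counts[event] = counts.get(event, 0) + 1
        if event == "task_success" and "duration" in record:
            durations.append(float(record["duration"]))
    average = sum(durations) / len(durations) if durations else None
    return counts, average
```

The same loop could feed an alerting rule, for example by flagging a run where failure events outnumber successes.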
Rate Limiting
Protect infrastructure from overload:
RATE_LIMIT = 5 # tasks per second
This keeps execution predictable under heavy load.
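One common way to enforce a "tasks per second" limit is a token bucket. The sketch below is a generic illustration of that technique and is not taken from PyBgWorker's source; the class name `TokenBucket` and its parameters are hypothetical.

```python
import time


class TokenBucket:
    """Allow at most `rate` acquisitions per second, with bursts up to `capacity`."""

    def __init__(self, rate, capacity=None, clock=time.monotonic):
        self.rate = float(rate)
        self.capacity = float(rate if capacity is None else capacity)
        self.clock = clock  # injectable so the limiter can be tested
        self.tokens = self.capacity
        self.updated = clock()

    def try_acquire(self):
        """Take one token if available; return False when over the limit."""
        now = self.clock()
        # Refill in proportion to elapsed time, capped at capacity.
        elapsed = now - self.updated
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.updated = now
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False
```

A worker loop using this sketch would call `try_acquire()` before starting a task and sleep briefly when it returns `False`.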
Retry Backoff + Jitter
Enable exponential backoff and jitter to spread retry load:
from pybgworker.task import task
@task(
    name="api_call",
    retries=5,
    retry_delay=2,
    retry_backoff=True,
    retry_backoff_factor=2,
    retry_max_delay=60,
    retry_jitter=0.2,
)
def api_call():
    ...
- retry_backoff: enable exponential backoff
- retry_backoff_factor: multiplier per attempt (default 2)
- retry_max_delay: cap delay in seconds
- retry_jitter: randomize delay (ratio if <= 1, or seconds if > 1)
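The way these options could combine into a concrete delay is sketched below. The exact formula PyBgWorker uses is not documented here, so this is an assumption: attempts are numbered from 0, the cap is applied before jitter, and jitter is symmetric around the delay. The function name `retry_delay_for` is hypothetical.

```python
import random


def retry_delay_for(
    attempt,
    retry_delay=2,
    retry_backoff=True,
    retry_backoff_factor=2,
    retry_max_delay=60,
    retry_jitter=0.2,
    rng=random,
):
    """Compute the wait before retry number `attempt` (0-based, assumed)."""
    delay = float(retry_delay)
    if retry_backoff:
        # Exponential growth: base delay times factor^attempt.
        delay *= retry_backoff_factor ** attempt
    if retry_max_delay is not None:
        delay = min(delay, retry_max_delay)
    if retry_jitter:
        # A value <= 1 is treated as a ratio of the delay, otherwise as seconds.
        spread = delay * retry_jitter if retry_jitter <= 1 else retry_jitter
        delay += rng.uniform(-spread, spread)
    return max(0.0, delay)
```

With jitter disabled and the settings from the example above, the successive delays under these assumptions are 2, 4, 8, 16, 32, and then 60 seconds once the cap applies.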
CLI Commands
Inspect queue:
python -m pybgworker.cli inspect
Retry failed task:
python -m pybgworker.cli retry <task_id>
Cancel task:
python -m pybgworker.cli cancel <task_id>
Purge queued tasks:
python -m pybgworker.cli purge
Observability
PyBgWorker logs:
- worker start
- cron events
- task start
- success
- retry
- failure
- timeout
- crash
- heartbeat errors
All of these events are emitted as machine-readable JSON.
Database Cleanup
Enable automatic retention cleanup for completed tasks:
python -m pybgworker.cli run --app example --retention-days 30
Environment variable alternative:
PYBGWORKER_RETENTION_DAYS=30 python -m pybgworker.cli run --app example
Optional cleanup interval (hours, default 24):
python -m pybgworker.cli run --app example --cleanup-interval-hours 12
When enabled, PyBgWorker prunes finished tasks older than the retention window and runs a VACUUM after deletions.
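The prune-then-VACUUM step can be pictured with the sketch below. It runs against a hypothetical `tasks` table with `status` and `finished_at` (Unix seconds) columns; the status names and the helpers `make_demo_db` and `prune_finished` are assumptions, not PyBgWorker's real schema or API.

```python
import sqlite3
import time

# Hypothetical set of terminal statuses eligible for cleanup.
FINISHED_STATUSES = ("success", "dead", "cancelled")


def make_demo_db():
    """Create an in-memory database with an illustrative tasks table."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE tasks (id INTEGER PRIMARY KEY, status TEXT, finished_at REAL)"
    )
    return conn


def prune_finished(conn, retention_days, now=None):
    """Delete finished tasks older than the retention window, then VACUUM."""
    now = time.time() if now is None else now
    cutoff = now - retention_days * 86400
    placeholders = ",".join("?" for _ in FINISHED_STATUSES)
    cur = conn.execute(
        f"DELETE FROM tasks WHERE status IN ({placeholders}) AND finished_at < ?",
        (*FINISHED_STATUSES, cutoff),
    )
    deleted = cur.rowcount
    conn.commit()  # VACUUM cannot run inside an open transaction
    if deleted:
        conn.execute("VACUUM")
    return deleted
```

Running VACUUM only after an actual deletion avoids rewriting the database file when nothing was removed.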
Failed vs Dead
- failed: a task failed but may still be retried (or was manually marked failed).
- dead: a task exhausted all retries and was moved to a terminal state for inspection.
Use pybgworker failed to see both failed + dead, or pybgworker dead for dead-only.
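The distinction can be expressed as a small decision rule. The sketch below assumes that `retries=N` allows N retries after the first attempt and that `attempts_made` counts every execution so far, including the one that just failed; the helper name `status_after_failure` is hypothetical.

```python
def status_after_failure(attempts_made, max_retries):
    """Return 'failed' while retries remain, 'dead' once they are exhausted."""
    # The first attempt plus max_retries retries gives max_retries + 1 runs.
    if attempts_made <= max_retries:
        return "failed"
    return "dead"
```

Under these assumptions, a task with `retries=2` is reported as failed after its first and second runs and as dead after its third.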
Design Goals
- zero external dependencies
- SQLite durability
- safe multiprocessing
- operator-friendly CLI
- production observability
- infrastructure protection
Roadmap
Planned but not yet included:
- Task/result TTL cleanup (beyond retention of finished tasks)
- Multiple named queues + routing
- Pluggable backends (Redis first)
- Cluster coordination / leader election for scheduler
- Metrics endpoint and health checks
- Dashboard API + web UI
- Workflow pipelines / DAGs
Feedback and Issues
For feedback, enhancement requests, or error reports, please use this form:
https://forms.gle/bUFRximzAGN6bCBQA
License
MIT License
File details
Details for the file pybgworker-0.3.0.tar.gz.
File metadata
- Download URL: pybgworker-0.3.0.tar.gz
- Upload date:
- Size: 18.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 67e76e137c69ab746fd86b5270263d1988d5d2469998f5f2f964a11c773b6e5d |
| MD5 | 90ca02513d713a4689073b6d9f36dc9f |
| BLAKE2b-256 | aac7fbe5e219de21e93666939d40a9ba0d4bd7f76c6b88c01327a02e4c470359 |
File details
Details for the file pybgworker-0.3.0-py3-none-any.whl.
File metadata
- Download URL: pybgworker-0.3.0-py3-none-any.whl
- Upload date:
- Size: 19.1 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | eedd4720c9831e2c53d30f864bdf27d7546dc6ec6f712243e11a9817233803d2 |
| MD5 | 93a3a5931f17df16da6e76a4d42c712c |
| BLAKE2b-256 | 3562a8f15fdea21545fd11ab18d1344f0835227b8d0d3e0065dcee0c2c3063cb |