Task Runner plugin for Fastpluggy
Project description
FastPluggy Task Runner
A powerful and extensible task execution framework for Python, built on top of FastPluggy.
Easily register, run, monitor, and schedule background tasks with full support for retries, logging, live WebSocket updates, and notifications.
✨ Features
- 🔧 Task registration with metadata, retries, scheduling, and custom parameters
- 🧠 Dynamic form generation from metadata
- 📡 Live logs and WebSocket updates
- 📅 CRON-based scheduler with optional notification rules
- 🔁 Retry logic with auto-link to parent task
- 🔒 Non-concurrent task execution with lock tracking
- 🧩 Extensible subscribers system (Console, Slack, Webhook...)
- 📊 Admin UI to manage tasks, schedules, locks, and reports
- 💾 Persistent task context and rehydration
- 📈 Task metrics from process/thread info
🛠️ How It Works
@TaskWorker.register(
description="Sync data every 5 mins",
schedule="*/5 * * * *",
max_retries=3,
allow_concurrent=False
)
def sync_data_task():
print("Sync running...")
For detailed instructions on creating tasks and triggering them from JavaScript, see the Task Creation and JS Triggering Guide.
For information about Jinja template global variables available for task triggering, see the Jinja Template Globals documentation.
📋 Roadmap
✅ Completed / In Progress
- Task registration with metadata (
description,tags,max_retries,schedule,allow_concurrent) - Dynamic task form rendering via metadata
- Notification/subscribers system with:
- Console / webhook / Slack (optional)
- Selectable events:
task_started,task_failed,logs, etc.
- Context/report tracking in DB
- Task retry linking via
parent_task_id - CRON-based scheduler loop
- Web UI for:
- Task logs
- Task reports
- Scheduled tasks
- Locks
- Running task status
- Lock manager (
TaskLockManager) with DB tracking - Cancel button for live-running tasks
📌 Upcoming Features
🔁 Task Queue Enhancements
- Priority & rate-limit execution
- Per-user concurrency limits
- Task dependencies / DAG runner
🧠 Task Registry & Detection
- Auto-discovery of task definitions from modules
- Celery-style shared task detection
💾 Persistence & Rehydration
- Save function reference + args for replay/retry
- Task dependency tree and retry visualization
🌐 Remote Workers
- Register and manage remote workers
- Assign tasks based on tags/strategies
- Remote heartbeat & health monitoring
📈 Observability
- Task metrics via
psutil(CPU, memory, threads) - UI views for thread/process diagnostics
Standalone Worker
Run task workers as a standalone long-running process, independent of the FastAPI dev server:
# Worker only (consumes and executes tasks)
fastpluggy tasks-worker start
# Worker + scheduler (beat) — simple setups, dev
fastpluggy tasks-worker start --beat
# Standalone scheduler (recommended for production)
fastpluggy tasks-worker beat
# Use RabbitMQ in production
fastpluggy tasks-worker start --broker-type rabbitmq --broker-dsn amqp://user:pass@rabbit:5672/
# Consume only specific topics with 4 threads per worker
fastpluggy tasks-worker start --topics email,reports --max-workers 4
# Verbose logging for debugging
fastpluggy tasks-worker start --log-level DEBUG
# Multiple workers with PostgreSQL broker
fastpluggy tasks-worker start -n 3 --broker-type postgres --broker-dsn postgresql://localhost/tasks
The process blocks until interrupted with Ctrl+C or SIGTERM, then performs a graceful shutdown.
RabbitMQ vhost auto-creation
When using the RabbitMQ broker, the worker automatically creates the vhost specified in the DSN if it does not already exist. This uses the RabbitMQ Management HTTP API (port 15672) and grants full permissions to the connecting user. If the management API is unreachable (not exposed, firewalled, or the user lacks admin rights), the check is silently skipped — the worker will connect normally if the vhost already exists, or fail with a clear error if it doesn't.
Production deployment
For production, run the scheduler (beat) and workers as separate processes:
# One beat process — reads scheduled tasks from DB, submits when due
fastpluggy tasks-worker beat --broker-type rabbitmq --broker-dsn amqp://...
# N worker processes — consume and execute tasks
fastpluggy tasks-worker start -n 4 --broker-type rabbitmq --broker-dsn amqp://...
Options
| Option | Description |
|---|---|
-n, --workers |
Number of workers to start (default: $WORKER_NUMBER or 1) |
--beat |
Also start the scheduler alongside workers (for start command) |
--broker-type |
Broker backend: local, memory, rabbitmq, postgres (overrides $BROKER_TYPE) |
--broker-dsn |
Broker connection string (overrides $BROKER_DSN) |
--topics |
Comma-separated list of topics to consume (default: all) |
--max-workers |
Thread pool size per worker (default: 8) |
--log-level |
Logging level: DEBUG, INFO, WARNING, ERROR (default: INFO) |
Topic Routing
Topics determine which queue tasks are published to and consumed from. Resolution order:
FORCE_TASK_TOPIC— if set, overrides everything. Both publish and consume use this value.- Explicit
topic=argument — passed toTaskWorker.submit(my_task, topic="email"). - Function metadata — set via
@TaskWorker.register(topic="reports"). DEFAULT_TOPIC— fallback (default:"default").
| Setting | Env var | Description |
|---|---|---|
default_topic |
DEFAULT_TOPIC |
Fallback topic for publish and consume (default: "default") |
force_task_topic |
FORCE_TASK_TOPIC |
Hard override — locks both publish and consume to this value |
Examples:
# Pin a worker to a specific queue (useful for dedicated workers or debugging)
FORCE_TASK_TOPIC=gpu-worker fastpluggy tasks-worker start
# Two workers on the same machine, each consuming a different queue
FORCE_TASK_TOPIC=worker-a fastpluggy tasks-worker start &
FORCE_TASK_TOPIC=worker-b fastpluggy tasks-worker start &
# Normal mode — worker consumes all topics, tasks route via metadata or default
fastpluggy tasks-worker start
🧪 Testing
This plugin includes comprehensive test coverage with pytest.
Running Tests Locally
# Install development dependencies
pip install -e ".[dev]"
# Run all tests
pytest tests/
# Run tests with coverage report
pytest tests/ --cov=src --cov-report=term-missing --cov-report=html
# Run specific test file
pytest tests/test_runner_topics.py -v
# Run tests with specific markers
pytest tests/ -m unit # Only unit tests
pytest tests/ -m "not slow" # Skip slow tests
CI/CD Integration
Tests are automatically run in the GitLab CI/CD pipeline on:
- Merge requests
- Main branch commits
Coverage reports are generated and stored as artifacts for 30 days.
📦 Tech Stack
- FastAPI + FastPluggy
- SQLAlchemy + SQLite/PostgreSQL
- WTForms + Jinja2 + Bootstrap (Tabler)
- WebSockets for real-time feedback
- Plugin-ready & modular architecture
🧠 Philosophy
This runner is built to be:
- Introspective: auto-generate UIs from functions
- Composable: integrate with your FastPluggy app
- Scalable: support single-machine and multi-worker environments
- Extensible: notifiers, hooks, CRON, logs
📎 License
MIT – Use freely and contribute 💙
🚀 Contributions Welcome!
Open issues, send PRs, share ideas —
Let’s build the most pluggable Python task runner together.
Warning:
Does not work with SQLite due to JSONB field requirements.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file fastpluggy_tasks_worker-0.3.282.tar.gz.
File metadata
- Download URL: fastpluggy_tasks_worker-0.3.282.tar.gz
- Upload date:
- Size: 204.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
a958c7805963acc05fc335879063326ba8a05b078f999b804b82d445ad1754a1
|
|
| MD5 |
c252863ed84b6735994c4ecf48ce5245
|
|
| BLAKE2b-256 |
ac5d437a6bf78a3809c276981d5e0ab8f94d98ded9fa61507e7c372c372c7fa3
|
File details
Details for the file fastpluggy_tasks_worker-0.3.282-py3-none-any.whl.
File metadata
- Download URL: fastpluggy_tasks_worker-0.3.282-py3-none-any.whl
- Upload date:
- Size: 233.1 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
f01be15875a78403b17523c8592c95168da9ffae17c67fba593d4e8f69ac245a
|
|
| MD5 |
31647bfb98661aeac504e709bea28945
|
|
| BLAKE2b-256 |
32c54b83144781d308208416e9f87651cde5039203abeeab847198b61e790538
|