
FastPluggy Task Runner


A powerful and extensible task execution framework for Python, built on top of FastPluggy.
Easily register, run, monitor, and schedule background tasks with full support for retries, logging, live WebSocket updates, and notifications.


✨ Features

  • 🔧 Task registration with metadata, retries, scheduling, and custom parameters
  • 🧠 Dynamic form generation from metadata
  • 📡 Live logs and WebSocket updates
  • 📅 CRON-based scheduler with optional notification rules
  • 🔁 Retry logic with auto-link to parent task
  • 🔒 Non-concurrent task execution with lock tracking
  • 🧩 Extensible subscribers system (Console, Slack, Webhook...)
  • 📊 Admin UI to manage tasks, schedules, locks, and reports
  • 💾 Persistent task context and rehydration
  • 📈 Task metrics from process/thread info

🛠️ How It Works

@TaskWorker.register(
    description="Sync data every 5 mins",
    schedule="*/5 * * * *",
    max_retries=3,
    allow_concurrent=False
)
def sync_data_task():
    print("Sync running...")

For detailed instructions on creating tasks and triggering them from JavaScript, see the Task Creation and JS Triggering Guide.

For information about Jinja template global variables available for task triggering, see the Jinja Template Globals documentation.
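The decorator pattern above can be sketched as a plain-Python mock. This is an illustrative, self-contained reimplementation of how a registry decorator like `TaskWorker.register` might attach metadata; it is not the plugin's actual code, and all names inside the sketch (`TaskMeta`, `MiniRegistry`) are assumptions.

```python
# Illustrative sketch only: a minimal task registry mimicking the
# @TaskWorker.register(...) pattern shown above. The real FastPluggy
# implementation differs; this just demonstrates the decorator mechanics.
from dataclasses import dataclass
from typing import Callable, Dict, Optional


@dataclass
class TaskMeta:
    func: Callable
    description: str = ""
    schedule: Optional[str] = None   # CRON expression, e.g. "*/5 * * * *"
    max_retries: int = 0
    allow_concurrent: bool = True


class MiniRegistry:
    """Collects task functions together with their metadata."""

    def __init__(self) -> None:
        self.tasks: Dict[str, TaskMeta] = {}

    def register(self, **meta):
        def decorator(func: Callable) -> Callable:
            # Store the function under its name with the supplied metadata;
            # return it unchanged so it stays directly callable.
            self.tasks[func.__name__] = TaskMeta(func=func, **meta)
            return func
        return decorator


registry = MiniRegistry()


@registry.register(description="Sync data every 5 mins",
                   schedule="*/5 * * * *",
                   max_retries=3,
                   allow_concurrent=False)
def sync_data_task():
    return "sync complete"
```

The decorated function stays directly callable, while the registry keeps enough metadata to drive scheduling, retries, and UI form generation.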


📋 Roadmap

✅ Completed / In Progress

  • Task registration with metadata (description, tags, max_retries, schedule, allow_concurrent)
  • Dynamic task form rendering via metadata
  • Notification/subscribers system with:
    • Console / webhook / Slack (optional)
    • Selectable events: task_started, task_failed, logs, etc.
  • Context/report tracking in DB
  • Task retry linking via parent_task_id
  • CRON-based scheduler loop
  • Web UI for:
    • Task logs
    • Task reports
    • Scheduled tasks
    • Locks
    • Running task status
  • Lock manager (TaskLockManager) with DB tracking
  • Cancel button for live-running tasks

📌 Upcoming Features

🔁 Task Queue Enhancements

  • Priority & rate-limit execution
  • Per-user concurrency limits
  • Task dependencies / DAG runner

🧠 Task Registry & Detection

  • Auto-discovery of task definitions from modules
  • Celery-style shared task detection

💾 Persistence & Rehydration

  • Save function reference + args for replay/retry
  • Task dependency tree and retry visualization

🌐 Remote Workers

  • Register and manage remote workers
  • Assign tasks based on tags/strategies
  • Remote heartbeat & health monitoring

📈 Observability

  • Task metrics via psutil (CPU, memory, threads)
  • UI views for thread/process diagnostics
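The kind of per-process snapshot planned above can be sketched with the standard library alone (the plugin itself targets psutil for richer data); the field names here are illustrative, not the plugin's schema.

```python
# Illustrative stdlib-only sketch of a per-process metrics snapshot.
# The real feature would use psutil for richer CPU/memory/thread data;
# field names here are assumptions, not the plugin's schema.
import os
import threading
import time


def process_metrics() -> dict:
    cpu = os.times()  # cumulative user/system CPU seconds for this process
    return {
        "pid": os.getpid(),
        "cpu_user_s": cpu.user,
        "cpu_system_s": cpu.system,
        "thread_count": threading.active_count(),
        "captured_at": time.time(),
    }
```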

Standalone Worker

Run task workers as a standalone long-running process, independent of the FastAPI dev server:

# Worker only (consumes and executes tasks)
fastpluggy tasks-worker start

# Worker + scheduler (beat) — simple setups, dev
fastpluggy tasks-worker start --beat

# Standalone scheduler (recommended for production)
fastpluggy tasks-worker beat

# Use RabbitMQ in production
fastpluggy tasks-worker start --broker-type rabbitmq --broker-dsn amqp://user:pass@rabbit:5672/

# Consume only specific topics with 4 threads per worker
fastpluggy tasks-worker start --topics email,reports --max-workers 4

# Verbose logging for debugging
fastpluggy tasks-worker start --log-level DEBUG

# Multiple workers with PostgreSQL broker
fastpluggy tasks-worker start -n 3 --broker-type postgres --broker-dsn postgresql://localhost/tasks

The process blocks until interrupted with Ctrl+C or SIGTERM, then performs a graceful shutdown.
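The block-until-signal behaviour can be sketched with stdlib signal handling. This is a simplified illustration, assuming a polling loop and an event flag; the worker's real shutdown path (draining in-flight tasks, closing broker connections) is more involved, and the function names are illustrative.

```python
# Illustrative sketch: block until Ctrl+C (SIGINT) or SIGTERM arrives,
# then exit the loop cleanly. Names are illustrative, not the CLI's code.
import signal
import threading

shutdown = threading.Event()


def _handle_signal(signum, frame):
    # Setting the event wakes the main loop so it can finish in-flight
    # work and exit cleanly instead of being killed mid-task.
    shutdown.set()


def run_worker_loop(poll_interval: float = 1.0) -> str:
    signal.signal(signal.SIGINT, _handle_signal)
    signal.signal(signal.SIGTERM, _handle_signal)
    while not shutdown.is_set():
        # ... consume and execute tasks here ...
        shutdown.wait(poll_interval)
    return "graceful shutdown"
```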

RabbitMQ vhost auto-creation

When using the RabbitMQ broker, the worker automatically creates the vhost specified in the DSN if it does not already exist. This uses the RabbitMQ Management HTTP API (port 15672) and grants full permissions to the connecting user. If the management API is unreachable (not exposed, firewalled, or the user lacks admin rights), the check is silently skipped — the worker will connect normally if the vhost already exists, or fail with a clear error if it doesn't.
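The pre-flight check described above can be sketched with the standard library. This is a best-effort illustration, not the broker's actual code: the helper names are assumptions, and only the DSN parsing is exercised here (the HTTP calls target the real RabbitMQ Management API endpoints `PUT /api/vhosts/<name>` and `PUT /api/permissions/<vhost>/<user>`).

```python
# Illustrative sketch of the vhost pre-flight described above.
# Helper names are assumptions; the management endpoints are real.
import base64
import json
import urllib.parse
import urllib.request


def vhost_from_dsn(dsn: str) -> str:
    """Extract the vhost from an AMQP DSN, defaulting to '/'."""
    path = urllib.parse.urlsplit(dsn).path
    # "amqp://user:pass@host:5672/"        -> default vhost "/"
    # "amqp://user:pass@host:5672/myvhost" -> "myvhost"
    vhost = urllib.parse.unquote(path.lstrip("/"))
    return vhost or "/"


def ensure_vhost(dsn: str, mgmt_port: int = 15672) -> None:
    """Best-effort vhost creation via the RabbitMQ Management HTTP API.

    Any failure is swallowed, matching the 'silently skipped' behaviour
    described above.
    """
    parts = urllib.parse.urlsplit(dsn)
    quoted = urllib.parse.quote(vhost_from_dsn(dsn), safe="")
    base = f"http://{parts.hostname}:{mgmt_port}/api"
    token = base64.b64encode(
        f"{parts.username}:{parts.password}".encode()).decode()
    headers = {"Authorization": f"Basic {token}",
               "Content-Type": "application/json"}
    try:
        # PUT /api/vhosts/<name> is idempotent: creates or leaves as-is.
        req = urllib.request.Request(f"{base}/vhosts/{quoted}",
                                     data=b"{}", headers=headers, method="PUT")
        urllib.request.urlopen(req, timeout=3)
        # Grant the connecting user full permissions on the vhost.
        perms = json.dumps({"configure": ".*", "write": ".*",
                            "read": ".*"}).encode()
        req = urllib.request.Request(
            f"{base}/permissions/{quoted}/{parts.username}",
            data=perms, headers=headers, method="PUT")
        urllib.request.urlopen(req, timeout=3)
    except OSError:
        pass  # management API unreachable: skip, as described above
```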

Production deployment

For production, run the scheduler (beat) and workers as separate processes:

# One beat process — reads scheduled tasks from DB, submits when due
fastpluggy tasks-worker beat --broker-type rabbitmq --broker-dsn amqp://...

# N worker processes — consume and execute tasks
fastpluggy tasks-worker start -n 4 --broker-type rabbitmq --broker-dsn amqp://...

Options

| Option | Description |
|---|---|
| `-n, --workers` | Number of workers to start (default: `$WORKER_NUMBER` or 1) |
| `--beat` | Also start the scheduler alongside workers (for `start` command) |
| `--broker-type` | Broker backend: `local`, `memory`, `rabbitmq`, `postgres` (overrides `$BROKER_TYPE`) |
| `--broker-dsn` | Broker connection string (overrides `$BROKER_DSN`) |
| `--topics` | Comma-separated list of topics to consume (default: all) |
| `--max-workers` | Thread pool size per worker (default: 8) |
| `--log-level` | Logging level: `DEBUG`, `INFO`, `WARNING`, `ERROR` (default: `INFO`) |

Topic Routing

Topics determine which queue tasks are published to and consumed from. Resolution order:

  1. FORCE_TASK_TOPIC — if set, overrides everything. Both publish and consume use this value.
  2. Explicit topic= argument — passed to TaskWorker.submit(my_task, topic="email").
  3. Function metadata — set via @TaskWorker.register(topic="reports").
  4. DEFAULT_TOPIC — fallback (default: "default").

| Setting | Env var | Description |
|---|---|---|
| `default_topic` | `DEFAULT_TOPIC` | Fallback topic for publish and consume (default: `"default"`) |
| `force_task_topic` | `FORCE_TASK_TOPIC` | Hard override; locks both publish and consume to this value |
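The four-step resolution order above can be sketched as a pure function. This is an illustrative reimplementation, not the plugin's code; in particular, the `_topic` attribute stands in for whatever metadata `@TaskWorker.register(topic=...)` actually records.

```python
# Illustrative sketch of the topic resolution order described above.
import os
from typing import Callable, Optional

DEFAULT_TOPIC = os.environ.get("DEFAULT_TOPIC", "default")


def resolve_topic(func: Optional[Callable] = None,
                  explicit: Optional[str] = None) -> str:
    # 1. FORCE_TASK_TOPIC overrides everything (publish and consume).
    forced = os.environ.get("FORCE_TASK_TOPIC")
    if forced:
        return forced
    # 2. Explicit topic= argument, as in TaskWorker.submit(task, topic="email").
    if explicit:
        return explicit
    # 3. Function metadata set at registration time.
    meta_topic = getattr(func, "_topic", None) if func else None
    if meta_topic:
        return meta_topic
    # 4. Fallback.
    return DEFAULT_TOPIC
```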

Examples:

# Pin a worker to a specific queue (useful for dedicated workers or debugging)
FORCE_TASK_TOPIC=gpu-worker fastpluggy tasks-worker start

# Two workers on the same machine, each consuming a different queue
FORCE_TASK_TOPIC=worker-a fastpluggy tasks-worker start &
FORCE_TASK_TOPIC=worker-b fastpluggy tasks-worker start &

# Normal mode — worker consumes all topics, tasks route via metadata or default
fastpluggy tasks-worker start

🧪 Testing

This plugin includes comprehensive test coverage with pytest.

Running Tests Locally

# Install development dependencies
pip install -e ".[dev]"

# Run all tests
pytest tests/

# Run tests with coverage report
pytest tests/ --cov=src --cov-report=term-missing --cov-report=html

# Run specific test file
pytest tests/test_runner_topics.py -v

# Run tests with specific markers
pytest tests/ -m unit  # Only unit tests
pytest tests/ -m "not slow"  # Skip slow tests

CI/CD Integration

Tests are automatically run in the GitLab CI/CD pipeline on:

  • Merge requests
  • Main branch commits

Coverage reports are generated and stored as artifacts for 30 days.


📦 Tech Stack

  • FastAPI + FastPluggy
  • SQLAlchemy + SQLite/PostgreSQL
  • WTForms + Jinja2 + Bootstrap (Tabler)
  • WebSockets for real-time feedback
  • Plugin-ready & modular architecture

🧠 Philosophy

This runner is built to be:

  • Introspective: auto-generate UIs from functions
  • Composable: integrate with your FastPluggy app
  • Scalable: support single-machine and multi-worker environments
  • Extensible: notifiers, hooks, CRON, logs

📎 License

MIT – Use freely and contribute 💙


🚀 Contributions Welcome!

Open issues, send PRs, share ideas —
Let’s build the most pluggable Python task runner together.

Warning:

This plugin does not work with SQLite because its database models require JSONB columns; use PostgreSQL.
