Skip to main content

A brokerless task queue using nng patterns

Project description

FastWorker

No Redis. No RabbitMQ. Just Python.

PyPI version Python Downloads License: MIT Ruff Docs

Background tasks in 30 seconds. Zero infrastructure.

pip install fastworker

Why FastWorker?

FastWorker Celery + Redis RabbitMQ AWS SQS
External dependencies 0 2+ (broker + backend) 1+ (RabbitMQ) SQS + IAM
Setup time 30 seconds 30+ minutes 30+ minutes 15+ minutes
Built-in dashboard Yes No (needs Flower) No (needs RabbitMQ UI) No (needs CloudWatch)
Worker discovery Automatic Manual config Manual config None
Cron/periodic tasks Built-in needs celery-beat needs scheduler needs CloudWatch Events
FastAPI integration Native (FastWorker(app)) Manual Manual Manual
Lines of config 1 15+ 20+ 10+

Celery — 10+ lines of config:

# celery_app.py
from celery import Celery

app = Celery("tasks", broker="redis://localhost:6379/0", backend="redis://localhost:6379/1")
app.conf.update(
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    timezone="UTC",
    enable_utc=True,
    task_track_started=True,
    task_acks_late=True,
    worker_prefetch_multiplier=1,
)
redis-server &                        # broker
celery -A celery_app worker &         # worker
celery -A celery_app beat &           # scheduler (for periodic tasks)
flower -A celery_app &                # monitoring (optional)

FastWorker — 1 line:

# mytasks.py
from fastworker import task

@task
def add(x: int, y: int) -> int:
    return x + y
fastworker control-plane --task-modules mytasks

Features

  • Zero Infrastructure — No Redis, RabbitMQ, or message broker. Pure Python.
  • FastAPI NativeFastWorker(app) auto-wires lifecycle, feels like a built-in feature.
  • Periodic & Cron Tasks@task(repeat_interval=60) or @task(cron="0 */6 * * *"). Built-in, no beat scheduler.
  • Built-in Dashboard — Real-time web UI with dark mode. Workers, queues, task history. No extra setup.
  • Automatic Worker Discovery — Workers and clients find each other on the network. Zero config.
  • Priority Queues — CRITICAL, HIGH, NORMAL, LOW. Tasks routed by urgency.
  • Result Caching — LRU cache with configurable TTL and size limits.
  • Task Callbacks — Real-time notifications when tasks complete.
  • OpenTelemetry — Optional distributed tracing and metrics.

FastWorker is designed for moderate-scale Python applications (1K-10K tasks/min). For extreme scale or complex workflows, see Limitations & Scope.

Quick Start

1. Install

pip install fastworker

2. Define Tasks

# mytasks.py
from fastworker import task

@task
def add(x: int, y: int) -> int:
    return x + y

@task(repeat_interval=300)
def refresh_cache():
    return {"cache": "refreshed"}

3. Run

fastworker control-plane --task-modules mytasks

The dashboard opens at http://127.0.0.1:8080.

Submit tasks:

fastworker submit --task-name add --args 5 3
fastworker submit --task-name add --args 10 20 --non-blocking

FastAPI Integration (v0.3.0)

from fastapi import FastAPI
from fastworker.integration.fastapi import FastWorker

app = FastAPI()
fw = FastWorker(app)  # done — lifecycle, discovery, everything


@task
def send_welcome_email(user_id: int, email: str) -> str:
    return f"Welcome email sent to {email}"


@app.post("/users/{user_id}/welcome")
async def welcome_user(user_id: int, email: str):
    task_id = await fw.delay("send_welcome_email", user_id, email)
    return {"task_id": task_id, "status": "queued"}


@app.get("/health")
async def health():
    return {"status": "healthy", "workers_online": fw.worker_count}
# Terminal 1
fastworker control-plane --task-modules app

# Terminal 2
uvicorn app:app --reload

Full FastAPI docs →

Periodic & Cron Tasks (v0.3.0)

@task(repeat_interval=60)        # every 60 seconds
def heartbeat():
    ...

@task(cron="*/5 * * * *")        # every 5 minutes
def sync_data():
    ...

@task(cron="0 9 * * 1-5")        # weekdays at 9am
def morning_report():
    ...

@task(repeat_interval=30, repeat_count=100)  # exactly 100 times
def limited_job():
    ...

Periodic tasks docs →

Project Structure — Grow From One File

FastWorker scales with your project:

# Level 1: Single file
mytasks.py

# Level 2: Package
tasks/
├── __init__.py
├── emails.py
└── reports.py

# Level 3: Organized
app/
├── tasks/
│   ├── background.py
│   └── scheduled.py
├── services/
└── models/

# Level 4: FastAPI
app/
├── api/
├── tasks/
├── main.py      # FastWorker(app)
└── ...

Project structure guide →

Client Usage

from fastworker import Client

client = Client()
await client.start()

# Non-blocking — returns task ID immediately
task_id = await client.delay("add", 5, 3)

# Blocking — waits for result
result = await client.submit_task("add", args=(5, 3))
print(result.result)  # 8

# Batch submit
task_ids = await client.submit_batch([
    {"task_name": "add", "args": (1, 2)},
    {"task_name": "add", "args": (3, 4)},
])

# With callback
task_id = await client.delay_with_callback(
    "process_data", "tcp://127.0.0.1:6000", data,
    callback_data={"source": "api"},
)

# Query status
result = await client.get_task_result(task_id)

client.stop()

CLI Reference

# Control plane (with dashboard)
fastworker control-plane --task-modules mytasks

# Subworker (for scaling)
fastworker subworker --worker-id w1 --control-plane-address tcp://127.0.0.1:5555 --task-modules mytasks

# Submit tasks
fastworker submit --task-name add --args 5 3
fastworker submit --task-name add --args 5 3 --non-blocking
fastworker submit --task-name add --args 5 3 --priority critical
fastworker submit --task-name report --args '"Q1"' --countdown 60

# List tasks
fastworker list --task-modules mytasks
fastworker list --task-modules mytasks --list-periodic
fastworker list --task-modules mytasks --tree

# Task management
fastworker status --task-id <uuid>
fastworker cancel --task-id <uuid>

Dashboard

Start the control plane and open http://127.0.0.1:8080.

  • Real-time worker status and load metrics
  • Queue sizes by priority
  • Task history with status and timing
  • Cache utilization stats
  • Dark mode
  • Auto-refresh
fastworker control-plane --gui-host 0.0.0.0 --gui-port 9000 --task-modules mytasks
fastworker control-plane --no-gui --task-modules mytasks  # disable dashboard

Extending FastWorker

Clear extension points for custom behavior:

  • Task Hooks@task(before=..., after=...) for per-task middleware
  • Event Bus — Subscribe to task.success, task.failure, worker.inactive events
  • Custom Serializers — Implement your own serialization format
  • Result Backends — Redis, S3, PostgreSQL persistence

Extending FastWorker →

Development

git clone https://github.com/neul-labs/fastworker.git
cd fastworker

uv sync
uv run pytest
uv run ruff check .

Requirements

  • Python 3.12+
  • pynng >= 0.8.1
  • pydantic >= 2.0.0

Documentation

License

MIT — see LICENSE.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

fastworker-0.3.0.tar.gz (205.4 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

fastworker-0.3.0-py3-none-any.whl (144.5 kB view details)

Uploaded Python 3

File details

Details for the file fastworker-0.3.0.tar.gz.

File metadata

  • Download URL: fastworker-0.3.0.tar.gz
  • Upload date:
  • Size: 205.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for fastworker-0.3.0.tar.gz
Algorithm Hash digest
SHA256 883b03c2fb0064c0ef09f3fa4fda72b8f6591013d75d95ddbb17c9fda676ff8f
MD5 7ae319d526cba6a61a8446d454fccff7
BLAKE2b-256 6d029618dc5c859c827a531e930a955973e6991df47775873e941451d2de76a2

See more details on using hashes here.

Provenance

The following attestation bundles were made for fastworker-0.3.0.tar.gz:

Publisher: publish.yml on neul-labs/fastworker

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file fastworker-0.3.0-py3-none-any.whl.

File metadata

  • Download URL: fastworker-0.3.0-py3-none-any.whl
  • Upload date:
  • Size: 144.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for fastworker-0.3.0-py3-none-any.whl
Algorithm Hash digest
SHA256 661ddf657a8c0f721a8144588b5955c0763899d85a97330335b9d192445737dc
MD5 21762b9ac330fdc85d8b4d662a95cb26
BLAKE2b-256 d5af518fdab25c673f868a750cbcf7644ec5c5e86bc45046991b950ef6a59eeb

See more details on using hashes here.

Provenance

The following attestation bundles were made for fastworker-0.3.0-py3-none-any.whl:

Publisher: publish.yml on neul-labs/fastworker

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page