A brokerless task queue using nng patterns
FastWorker
No Redis. No RabbitMQ. Just Python.
Background tasks in 30 seconds. Zero infrastructure.
pip install fastworker
Why FastWorker?
| | FastWorker | Celery + Redis | RabbitMQ | AWS SQS |
|---|---|---|---|---|
| External dependencies | 0 | 2+ (broker + backend) | 1+ (RabbitMQ) | SQS + IAM |
| Setup time | 30 seconds | 30+ minutes | 30+ minutes | 15+ minutes |
| Built-in dashboard | Yes | No (needs Flower) | No (needs RabbitMQ UI) | No (needs CloudWatch) |
| Worker discovery | Automatic | Manual config | Manual config | None |
| Cron/periodic tasks | Built-in | needs celery-beat | needs scheduler | needs CloudWatch Events |
| FastAPI integration | Native (FastWorker(app)) | Manual | Manual | Manual |
| Lines of config | 1 | 15+ | 20+ | 10+ |
Celery — 10+ lines of config:
# celery_app.py
from celery import Celery
app = Celery("tasks", broker="redis://localhost:6379/0", backend="redis://localhost:6379/1")
app.conf.update(
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
    timezone="UTC",
    enable_utc=True,
    task_track_started=True,
    task_acks_late=True,
    worker_prefetch_multiplier=1,
)
redis-server & # broker
celery -A celery_app worker & # worker
celery -A celery_app beat & # scheduler (for periodic tasks)
flower -A celery_app & # monitoring (optional)
FastWorker — 1 line:
# mytasks.py
from fastworker import task
@task
def add(x: int, y: int) -> int:
    return x + y
fastworker control-plane --task-modules mytasks
Features
- Zero Infrastructure: No Redis, RabbitMQ, or message broker. Pure Python.
- FastAPI Native: FastWorker(app) auto-wires lifecycle and feels like a built-in feature.
- Periodic & Cron Tasks: @task(repeat_interval=60) or @task(cron="0 */6 * * *"). Built-in, no beat scheduler.
- Built-in Dashboard: Real-time web UI with dark mode. Workers, queues, task history. No extra setup.
- Automatic Worker Discovery: Workers and clients find each other on the network. Zero config.
- Priority Queues: CRITICAL, HIGH, NORMAL, LOW. Tasks routed by urgency.
- Result Caching: LRU cache with configurable TTL and size limits.
- Task Callbacks: Real-time notifications when tasks complete.
- OpenTelemetry: Optional distributed tracing and metrics.
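To make the priority levels above concrete, here is a minimal sketch of priority-ordered dispatch using only the standard library. It is not FastWorker's implementation: `Priority`, `PriorityTaskQueue`, and the task names are hypothetical, and the only thing taken from the feature list is the four level names.

```python
import heapq
import itertools
from enum import IntEnum


class Priority(IntEnum):
    """Lower value means more urgent, so it sorts first in a min-heap."""
    CRITICAL = 0
    HIGH = 1
    NORMAL = 2
    LOW = 3


class PriorityTaskQueue:
    """Hypothetical queue: pops by priority, FIFO within the same priority."""

    def __init__(self) -> None:
        self._heap: list[tuple[int, int, str]] = []
        self._counter = itertools.count()  # tie-breaker keeps insertion order

    def push(self, task_name: str, priority: Priority = Priority.NORMAL) -> None:
        heapq.heappush(self._heap, (int(priority), next(self._counter), task_name))

    def pop(self) -> str:
        _, _, task_name = heapq.heappop(self._heap)
        return task_name

    def __len__(self) -> int:
        return len(self._heap)


queue = PriorityTaskQueue()
queue.push("send_report", Priority.LOW)
queue.push("add")
queue.push("charge_card", Priority.CRITICAL)
queue.push("resize_image")
order = [queue.pop() for _ in range(len(queue))]
print(order)
```

The counter matters: without it, two tasks at the same priority would be compared by name rather than by arrival order.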
FastWorker is designed for moderate-scale Python applications (1K-10K tasks/min). For extreme scale or complex workflows, see Limitations & Scope.
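For a rough sense of what the 1K-10K tasks/min range means for capacity, the sketch below applies Little's law (concurrency = arrival rate x mean service time). The function name and the 500 ms mean task duration are illustrative assumptions, not FastWorker figures, and the estimate assumes each execution slot handles one task at a time.

```python
def workers_needed(tasks_per_minute: int, mean_task_ms: int) -> int:
    """Estimate the concurrent execution slots needed to keep up with the load."""
    busy_ms_per_minute = tasks_per_minute * mean_task_ms
    # Integer ceiling division: one slot supplies 60,000 ms of work per minute.
    return max(1, -(-busy_ms_per_minute // 60_000))


low = workers_needed(1_000, 500)
high = workers_needed(10_000, 500)
print(low, high)
```

This is a steady-state lower bound; bursty traffic or uneven task durations would need headroom on top of it.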
Quick Start
1. Install
pip install fastworker
2. Define Tasks
# mytasks.py
from fastworker import task
@task
def add(x: int, y: int) -> int:
    return x + y

@task(repeat_interval=300)
def refresh_cache():
    return {"cache": "refreshed"}
3. Run
fastworker control-plane --task-modules mytasks
The dashboard opens at http://127.0.0.1:8080.
Submit tasks:
fastworker submit --task-name add --args 5 3
fastworker submit --task-name add --args 10 20 --non-blocking
FastAPI Integration (v0.3.0)
from fastapi import FastAPI
from fastworker import task
from fastworker.integration.fastapi import FastWorker

app = FastAPI()
fw = FastWorker(app)  # done: lifecycle, discovery, everything

@task
def send_welcome_email(user_id: int, email: str) -> str:
    return f"Welcome email sent to {email}"

@app.post("/users/{user_id}/welcome")
async def welcome_user(user_id: int, email: str):
    # Queue the task by name and return its ID without waiting for the result
    task_id = await fw.delay("send_welcome_email", user_id, email)
    return {"task_id": task_id, "status": "queued"}

@app.get("/health")
async def health():
    return {"status": "healthy", "workers_online": fw.worker_count}
# Terminal 1
fastworker control-plane --task-modules app
# Terminal 2
uvicorn app:app --reload
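The "auto-wires lifecycle" claim is easier to picture with a sketch of what manual wiring generally involves: start a client before the app serves requests and stop it on shutdown, even if something fails in between. The code below is an assumption-laden illustration, not FastWorker's internals. `StubClient` and `task_client_lifespan` are hypothetical names, and the stub only records calls so the example runs without FastAPI or FastWorker installed.

```python
import asyncio
from contextlib import asynccontextmanager


class StubClient:
    """Stand-in for a task client; records calls instead of opening sockets."""

    def __init__(self) -> None:
        self.events: list[str] = []

    async def start(self) -> None:
        self.events.append("start")

    def stop(self) -> None:
        self.events.append("stop")


@asynccontextmanager
async def task_client_lifespan(client: StubClient):
    """Start the client before the app body runs, stop it afterwards."""
    await client.start()
    try:
        yield client
    finally:
        client.stop()  # runs even if the app body raises


async def main() -> list[str]:
    client = StubClient()
    async with task_client_lifespan(client):
        client.events.append("serving")  # stands in for handling requests
    return client.events


events = asyncio.run(main())
print(events)
```

FastAPI accepts an async context manager of this general shape through its `lifespan` parameter; whether `FastWorker(app)` uses that mechanism is not stated in this README.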
Periodic & Cron Tasks (v0.3.0)
from fastworker import task

@task(repeat_interval=60)  # every 60 seconds
def heartbeat():
    ...

@task(cron="*/5 * * * *")  # every 5 minutes
def sync_data():
    ...

@task(cron="0 9 * * 1-5")  # weekdays at 9am
def morning_report():
    ...

@task(repeat_interval=30, repeat_count=100)  # exactly 100 times
def limited_job():
    ...
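To show how the five-field cron expressions above are read (minute, hour, day of month, month, day of week), here is a small matcher written from conventional cron semantics. It is a sketch, not FastWorker's parser: `cron_matches` and `_field_matches` are hypothetical helpers, they handle only `*`, `*/n`, `a-b`, `a-b/n`, and comma lists, and they require all five fields to match, whereas traditional cron treats restricted day-of-month and day-of-week fields as alternatives.

```python
from datetime import datetime


def _field_matches(field: str, value: int, minimum: int) -> bool:
    """Match one cron field supporting '*', '*/n', 'a-b', 'a-b/n', and 'a,b,c'."""
    for part in field.split(","):
        base, _, step_text = part.partition("/")
        step = int(step_text) if step_text else 1
        if base == "*":
            start, end = minimum, value  # open-ended range from the field minimum
        elif "-" in base:
            low, high = base.split("-")
            start, end = int(low), int(high)
        else:
            start = end = int(base)
        if start <= value <= end and (value - start) % step == 0:
            return True
    return False


def cron_matches(expression: str, moment: datetime) -> bool:
    """Return True if `moment` falls on a minute selected by `expression`."""
    minute, hour, day, month, weekday = expression.split()
    cron_weekday = (moment.weekday() + 1) % 7  # cron: 0 = Sunday; Python: 0 = Monday
    return (
        _field_matches(minute, moment.minute, 0)
        and _field_matches(hour, moment.hour, 0)
        and _field_matches(day, moment.day, 1)
        and _field_matches(month, moment.month, 1)
        and _field_matches(weekday, cron_weekday, 0)
    )


monday_9am = datetime(2024, 1, 1, 9, 0)    # 1 January 2024 was a Monday
saturday_9am = datetime(2024, 1, 6, 9, 0)
print(cron_matches("0 9 * * 1-5", monday_9am))
print(cron_matches("0 9 * * 1-5", saturday_9am))
print(cron_matches("*/5 * * * *", datetime(2024, 1, 1, 9, 10)))
```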
Project Structure — Grow From One File
FastWorker scales with your project:
# Level 1: Single file
mytasks.py
# Level 2: Package
tasks/
├── __init__.py
├── emails.py
└── reports.py
# Level 3: Organized
app/
├── tasks/
│ ├── background.py
│ └── scheduled.py
├── services/
└── models/
# Level 4: FastAPI
app/
├── api/
├── tasks/
├── main.py # FastWorker(app)
└── ...
Client Usage
import asyncio

from fastworker import Client


async def main() -> None:
    client = Client()
    await client.start()

    # Non-blocking: returns the task ID immediately
    task_id = await client.delay("add", 5, 3)

    # Blocking: waits for the result
    result = await client.submit_task("add", args=(5, 3))
    print(result.result)  # 8

    # Batch submit
    task_ids = await client.submit_batch([
        {"task_name": "add", "args": (1, 2)},
        {"task_name": "add", "args": (3, 4)},
    ])

    # With callback; "process_data" and `data` are placeholders for your own task and payload
    data = {"rows": [1, 2, 3]}
    task_id = await client.delay_with_callback(
        "process_data", "tcp://127.0.0.1:6000", data,
        callback_data={"source": "api"},
    )

    # Query status
    result = await client.get_task_result(task_id)

    client.stop()


asyncio.run(main())
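After a non-blocking submit, a caller typically needs some way to wait for the result with an upper time limit. The helper below is one possible sketch of that pattern. `wait_for_result` is hypothetical, and it assumes the lookup coroutine returns `None` until the task finishes, which this README does not confirm for `client.get_task_result`. The demo uses a fake lookup so it runs without FastWorker.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional


async def wait_for_result(
    fetch: Callable[[str], Awaitable[Optional[Any]]],
    task_id: str,
    timeout: float = 5.0,
    interval: float = 0.05,
) -> Any:
    """Poll `fetch(task_id)` until it returns a non-None value or time runs out."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while True:
        result = await fetch(task_id)
        if result is not None:
            return result
        if loop.time() >= deadline:
            raise TimeoutError(f"task {task_id} not finished after {timeout}s")
        await asyncio.sleep(interval)


async def demo() -> Any:
    calls = 0

    async def fake_fetch(task_id: str) -> Optional[int]:
        nonlocal calls
        calls += 1
        return 8 if calls >= 3 else None  # "finishes" on the third poll

    return await wait_for_result(fake_fetch, "demo-task", timeout=1.0, interval=0.01)


polled = asyncio.run(demo())
print(polled)
```

If the real lookup signals "pending" in some other way (a status field, an exception), only the `result is not None` check would need to change.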
CLI Reference
# Control plane (with dashboard)
fastworker control-plane --task-modules mytasks
# Subworker (for scaling)
fastworker subworker --worker-id w1 --control-plane-address tcp://127.0.0.1:5555 --task-modules mytasks
# Submit tasks
fastworker submit --task-name add --args 5 3
fastworker submit --task-name add --args 5 3 --non-blocking
fastworker submit --task-name add --args 5 3 --priority critical
fastworker submit --task-name report --args '"Q1"' --countdown 60
# List tasks
fastworker list --task-modules mytasks
fastworker list --task-modules mytasks --list-periodic
fastworker list --task-modules mytasks --tree
# Task management
fastworker status --task-id <uuid>
fastworker cancel --task-id <uuid>
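When the CLI is driven from a script, building the argument list programmatically avoids shell-quoting mistakes. The sketch below uses only the `submit` flags shown above. `build_submit_command` is a hypothetical helper, and encoding each argument as a JSON literal is an assumption inferred from the `'"Q1"'` example, not documented behavior.

```python
import json
import shlex
from typing import Any, Optional


def build_submit_command(
    task_name: str,
    *args: Any,
    priority: Optional[str] = None,
    countdown: Optional[int] = None,
    non_blocking: bool = False,
) -> list[str]:
    """Build an argv list for `fastworker submit` from Python values."""
    command = ["fastworker", "submit", "--task-name", task_name]
    if args:
        # Assumption: each positional argument is passed as a JSON literal.
        command += ["--args", *(json.dumps(arg) for arg in args)]
    if priority is not None:
        command += ["--priority", priority]
    if countdown is not None:
        command += ["--countdown", str(countdown)]
    if non_blocking:
        command.append("--non-blocking")
    return command


command = build_submit_command("report", "Q1", countdown=60)
print(shlex.join(command))
```

The resulting list can be handed to `subprocess.run(command, check=True)` directly, with no shell involved; `shlex.join` is used here only to display the equivalent command line.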
Dashboard
Start the control plane and open http://127.0.0.1:8080.
- Real-time worker status and load metrics
- Queue sizes by priority
- Task history with status and timing
- Cache utilization stats
- Dark mode
- Auto-refresh
fastworker control-plane --gui-host 0.0.0.0 --gui-port 9000 --task-modules mytasks
fastworker control-plane --no-gui --task-modules mytasks # disable dashboard
Extending FastWorker
Clear extension points for custom behavior:
- Task Hooks: @task(before=..., after=...) for per-task middleware
- Event Bus: Subscribe to task.success, task.failure, worker.inactive events
- Custom Serializers: Implement your own serialization format
- Result Backends: Redis, S3, PostgreSQL persistence
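The event-bus extension point follows the usual publish/subscribe pattern, which can be sketched in a few lines. This is a generic illustration, not FastWorker's API. The `EventBus` class, its method names, and the payload fields are hypothetical; only the event names `task.failure` and `task.success` come from the list above.

```python
from collections import defaultdict
from typing import Any, Callable

Handler = Callable[[dict[str, Any]], None]


class EventBus:
    """Hypothetical in-process pub/sub keyed by event name."""

    def __init__(self) -> None:
        self._handlers: dict[str, list[Handler]] = defaultdict(list)

    def subscribe(self, event: str, handler: Handler) -> None:
        self._handlers[event].append(handler)

    def publish(self, event: str, payload: dict[str, Any]) -> int:
        """Call every handler for `event`; return how many were notified."""
        handlers = self._handlers.get(event, [])
        for handler in handlers:
            handler(payload)
        return len(handlers)


bus = EventBus()
failures: list[str] = []
bus.subscribe("task.failure", lambda payload: failures.append(payload["task_id"]))
notified = bus.publish("task.failure", {"task_id": "abc123", "error": "timeout"})
ignored = bus.publish("task.success", {"task_id": "def456"})
print(failures, notified, ignored)
```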
Development
git clone https://github.com/neul-labs/fastworker.git
cd fastworker
uv sync
uv run pytest
uv run ruff check .
Requirements
- Python 3.12+
- pynng >= 0.8.1
- pydantic >= 2.0.0
Documentation
- Index
- Why FastWorker?
- Project Structure
- Architecture
- FastAPI Integration
- Periodic & Cron Tasks
- Management GUI
- Extending FastWorker
- Internals
- Benchmarks
- API Reference
- Telemetry
- Limitations & Scope
License
MIT — see LICENSE.