A brokerless task queue using nng patterns
FastWorker
A brokerless task queue for Python applications with automatic worker discovery and priority handling.
No Redis. No RabbitMQ. Just Python.
Why FastWorker?
Traditional task queues (Celery + Redis) require deploying and managing 4-6+ separate services:
- Your application
- Redis broker
- Celery workers
- Redis result backend
- Optional: Flower monitoring
- Optional: Redis Sentinel for HA
FastWorker requires just 2-3 Python processes:
- Your application
- FastWorker control plane
- FastWorker workers (optional, for scaling)
That's it. No external dependencies. No Redis to configure, monitor, backup, or secure. Just Python.
Features
- Brokerless Architecture - No Redis, RabbitMQ, or other message brokers required
- Control Plane Architecture - Centralized coordination with distributed subworkers
- Automatic Worker Discovery - Workers find each other automatically on the network
- Priority Queues - Support for critical, high, normal, and low priority tasks
- Result Caching - Task results cached with expiration and memory limits
- Task Completion Callbacks - Receive real-time notifications when tasks complete
- Built-in Reliability - Automatic retries and error handling
- FastAPI Integration - Seamless integration with web applications
- OpenTelemetry Support - Optional distributed tracing and metrics for observability
- Zero Configuration - Works out of the box with sensible defaults
Note: FastWorker is designed for moderate-scale Python applications (1K-10K tasks/min). For extreme scale, multi-language support, or complex workflows, see Limitations & Scope.
Installation
pip install fastworker
Quick Start
1. Define Tasks
# mytasks.py
from fastworker import task

@task
def add(x: int, y: int) -> int:
    """Add two numbers."""
    return x + y

@task
def multiply(x: int, y: int) -> int:
    """Multiply two numbers."""
    return x * y
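To make the role of the decorator concrete, here is a minimal sketch of how a name-based task registry could work, so that a worker can resolve a task name such as "add" to a function. This is not FastWorker's actual implementation; `TASK_REGISTRY`, `register_task`, and `run_task` are hypothetical names used only for illustration.

```python
from typing import Any, Callable, Dict

# Hypothetical registry: maps a task name to the function that implements it.
TASK_REGISTRY: Dict[str, Callable[..., Any]] = {}


def register_task(func: Callable[..., Any]) -> Callable[..., Any]:
    """Record the function under its own name and return it unchanged."""
    TASK_REGISTRY[func.__name__] = func
    return func


def run_task(name: str, *args: Any, **kwargs: Any) -> Any:
    """Look up a task by name and call it, as a worker might on receipt."""
    try:
        func = TASK_REGISTRY[name]
    except KeyError:
        raise LookupError(f"unknown task: {name!r}") from None
    return func(*args, **kwargs)


@register_task
def add(x: int, y: int) -> int:
    """Add two numbers."""
    return x + y
```

Because the decorator returns the function unchanged, a registered task can still be called directly in tests, while `run_task("add", 5, 3)` mirrors a lookup by name.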
2. Start Control Plane
# Terminal 1 - Start the control plane (coordinates and also processes tasks)
fastworker control-plane --worker-id control-plane --task-modules mytasks
3. Start Subworkers (Optional - for scaling)
# Terminal 2 - Start subworker 1
fastworker subworker --worker-id subworker1 --control-plane-address tcp://127.0.0.1:5555 --base-address tcp://127.0.0.1:5561 --task-modules mytasks
# Terminal 3 - Start subworker 2 (optional)
fastworker subworker --worker-id subworker2 --control-plane-address tcp://127.0.0.1:5555 --base-address tcp://127.0.0.1:5565 --task-modules mytasks
4. Submit Tasks
Blocking mode (wait for result):
fastworker submit --task-name add --args 5 3
Non-blocking mode (get task ID immediately):
fastworker submit --task-name add --args 5 3 --non-blocking
# Returns: Task ID: <uuid>
Check task status:
fastworker status --task-id <uuid>
5. Using Python Client
Non-blocking (recommended):
from fastworker import Client
import asyncio

async def main():
    client = Client()
    await client.start()

    # Non-blocking: Returns immediately with task ID
    task_id = await client.delay("add", 5, 3)
    print(f"Task submitted: {task_id}")

    # Check result later
    result = await client.get_task_result(task_id)
    if result:
        print(f"Result: {result.result}")

    client.stop()

asyncio.run(main())
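In the non-blocking example, the result may not be ready the first time it is requested. One way to handle that is to poll until a result appears or a deadline passes. The helper below is a sketch, not part of FastWorker: `wait_for_result` is a hypothetical name, and it assumes that `get_task_result` returns a falsy value (such as `None`) while the task is still pending, as the `if result:` check above suggests.

```python
import asyncio
import time
from typing import Any, Optional


async def wait_for_result(
    client: Any,
    task_id: str,
    timeout: float = 10.0,
    interval: float = 0.1,
) -> Optional[Any]:
    """Poll client.get_task_result(task_id) until it returns a truthy value.

    Assumption: get_task_result returns a falsy value while the task is
    pending. Returns None if the deadline passes before a result arrives.
    """
    deadline = time.monotonic() + timeout
    while True:
        result = await client.get_task_result(task_id)
        if result:
            return result
        if time.monotonic() >= deadline:
            return None
        await asyncio.sleep(interval)
```

Inside `main()`, the single `get_task_result` call could then be replaced with `result = await wait_for_result(client, task_id)`.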
Blocking (when you need the result immediately):
# Run inside an async function, after `await client.start()`
# Blocking: Waits for result
result = await client.submit_task("add", args=(5, 3))
print(f"Result: {result.result}")
Architecture
FastWorker uses a Control Plane Architecture:
- Control Plane Worker: Central coordinator that manages subworkers and also processes tasks
- Subworkers: Additional workers that register with the control plane for load distribution
- Clients: Connect only to the control plane for task submission
Benefits
- Centralized Management: Control plane coordinates all task distribution
- Load Balancing: Tasks automatically distributed to least-loaded subworkers
- High Availability: Control plane processes tasks if no subworkers available
- Result Persistence: Results cached in control plane with expiration
- Scalability: Add subworkers dynamically without reconfiguration
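The load-balancing and fallback behaviour listed above can be sketched in a few lines. This is an illustration under stated assumptions, not FastWorker's scheduler: `pick_worker` and `CONTROL_PLANE_ID` are hypothetical names, and "load" is assumed to mean the number of tasks currently running on each subworker.

```python
from typing import Dict

# Hypothetical identifier for the control plane's own worker.
CONTROL_PLANE_ID = "control-plane"


def pick_worker(active_tasks: Dict[str, int]) -> str:
    """Return the subworker with the fewest active tasks.

    Falls back to the control plane when no subworkers are registered.
    Ties go to the subworker that was registered first.
    """
    if not active_tasks:
        return CONTROL_PLANE_ID
    return min(active_tasks, key=lambda worker_id: active_tasks[worker_id])
```

For example, `pick_worker({"subworker1": 4, "subworker2": 1})` selects `"subworker2"`, and an empty mapping selects the control plane.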
CLI Usage
# Start control plane
fastworker control-plane --worker-id control-plane --task-modules mytasks
# Start subworker
fastworker subworker --worker-id subworker1 --control-plane-address tcp://127.0.0.1:5555 --task-modules mytasks
# Submit task (blocking)
fastworker submit --task-name add --args 5 3
# Submit task (non-blocking)
fastworker submit --task-name add --args 5 3 --non-blocking
# Check task status
fastworker status --task-id <uuid>
# List available tasks
fastworker list --task-modules mytasks
Priority Handling
from fastworker.tasks.models import TaskPriority
# Submit with priority
await client.delay("critical_task", priority=TaskPriority.CRITICAL)
await client.delay("normal_task", priority=TaskPriority.NORMAL)
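To show what priority ordering means in practice, here is a small self-contained sketch built on the standard library's `heapq`. It is not FastWorker's internal queue. `Priority` and `PriorityTaskQueue` are hypothetical stand-ins, and the sketch assumes that more urgent levels run first and that tasks of equal priority run in submission order.

```python
import heapq
import itertools
from enum import IntEnum
from typing import List, Tuple


class Priority(IntEnum):
    """Hypothetical stand-in for the four levels; lower value runs first."""
    CRITICAL = 0
    HIGH = 1
    NORMAL = 2
    LOW = 3


class PriorityTaskQueue:
    """Pops the most urgent task first, FIFO within the same priority."""

    def __init__(self) -> None:
        self._heap: List[Tuple[int, int, str]] = []
        # Monotonic counter used as a tie-breaker to preserve FIFO order.
        self._counter = itertools.count()

    def push(self, task_name: str, priority: Priority = Priority.NORMAL) -> None:
        heapq.heappush(self._heap, (int(priority), next(self._counter), task_name))

    def pop(self) -> str:
        return heapq.heappop(self._heap)[2]

    def __len__(self) -> int:
        return len(self._heap)
```

The counter in the middle of each tuple matters: without it, two tasks of the same priority would be ordered by name rather than by arrival.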
Result Caching
The control plane maintains a result cache with:
- Configurable Size: Default 10,000 results (configurable via --result-cache-size)
- TTL: Default 1 hour (configurable via --result-cache-ttl)
- LRU Eviction: Least recently accessed results evicted when cache is full
- Automatic Cleanup: Expired results cleaned up every minute
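The combination of a size limit, a TTL, LRU eviction, and periodic cleanup can be sketched as follows. This is an illustrative model of the behaviour described above, not the control plane's actual cache; `ResultCache` and its method names are hypothetical, and the injectable `clock` parameter exists only to make the sketch easy to test.

```python
import time
from collections import OrderedDict
from typing import Any, Callable, Optional, Tuple


class ResultCache:
    """In-memory cache with a size limit, per-entry TTL, and LRU eviction."""

    def __init__(
        self,
        max_size: int = 10_000,
        ttl_seconds: float = 3600.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._max_size = max_size
        self._ttl = ttl_seconds
        self._clock = clock
        # Ordered from least recently used (front) to most recently used (back).
        self._entries: "OrderedDict[str, Tuple[float, Any]]" = OrderedDict()

    def put(self, task_id: str, result: Any) -> None:
        self._entries[task_id] = (self._clock() + self._ttl, result)
        self._entries.move_to_end(task_id)
        while len(self._entries) > self._max_size:
            self._entries.popitem(last=False)  # evict least recently used

    def get(self, task_id: str) -> Optional[Any]:
        entry = self._entries.get(task_id)
        if entry is None:
            return None
        expires_at, result = entry
        if self._clock() >= expires_at:
            del self._entries[task_id]  # expired: drop on access
            return None
        self._entries.move_to_end(task_id)  # mark as recently used
        return result

    def cleanup(self) -> int:
        """Remove all expired entries and return how many were removed."""
        now = self._clock()
        expired = [key for key, (exp, _) in self._entries.items() if now >= exp]
        for key in expired:
            del self._entries[key]
        return len(expired)
```

In a long-running process, `cleanup()` would be called on a timer (once a minute, to match the description above), while `get()` also drops expired entries lazily.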
Configuration
Control Plane
fastworker control-plane \
    --worker-id control-plane \
    --base-address tcp://127.0.0.1:5555 \
    --discovery-address tcp://127.0.0.1:5550 \
    --result-cache-size 10000 \
    --result-cache-ttl 3600 \
    --task-modules mytasks
Subworker
fastworker subworker \
    --worker-id subworker1 \
    --control-plane-address tcp://127.0.0.1:5555 \
    --base-address tcp://127.0.0.1:5561 \
    --task-modules mytasks
Client
client = Client(
    discovery_address="tcp://127.0.0.1:5550",
    timeout=60,
    retries=5
)
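This README does not spell out how `timeout` and `retries` interact. One common interpretation is a per-attempt timeout with a bounded number of further attempts, which can be sketched with `asyncio.wait_for`. The helper below is a generic illustration of that pattern under that assumption, not FastWorker's client logic; `call_with_retries` is a hypothetical name.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional


async def call_with_retries(
    operation: Callable[[], Awaitable[Any]],
    timeout: float = 60.0,
    retries: int = 5,
) -> Any:
    """Run operation(), allowing `timeout` seconds per attempt.

    Makes one initial attempt plus up to `retries` further attempts and
    re-raises the last timeout if every attempt times out.
    """
    if retries < 0:
        raise ValueError("retries must be non-negative")
    last_error: Optional[BaseException] = None
    for _attempt in range(retries + 1):
        try:
            # A fresh awaitable is created for every attempt.
            return await asyncio.wait_for(operation(), timeout=timeout)
        except asyncio.TimeoutError as exc:
            last_error = exc
    assert last_error is not None
    raise last_error
```

Passing a zero-argument callable rather than an awaitable matters here, because an awaitable can only be awaited once and each retry needs a new one.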
Development
# Clone repository
git clone https://github.com/dipankar/fastworker.git
cd fastworker
# Install dependencies
poetry install
# Run tests
poetry run pytest
# Format code
poetry run black .
Requirements
- Python 3.12+
- pynng
- pydantic
License
MIT License - see LICENSE file for details.
Documentation
For detailed documentation, see:
- Documentation Index - Complete documentation
- Limitations & Scope - Start here - What FastWorker is and when to use it
- API Reference - Full API documentation
- FastAPI Integration - Web framework integration
- OpenTelemetry Integration - Distributed tracing and metrics
- Configuration Guide - Environment variables and settings
- Troubleshooting - Common issues and solutions
Contributing
Contributions welcome! Please read CONTRIBUTING.md for guidelines.