A brokerless task queue using nng patterns

FastWorker

A brokerless task queue for Python applications with automatic worker discovery and priority handling.

No Redis. No RabbitMQ. Just Python.

Why FastWorker?

A traditional task queue setup (Celery + Redis) means deploying and managing four to six or more separate components:

  • Your application
  • Redis broker
  • Celery workers
  • Redis result backend
  • Optional: Flower monitoring
  • Optional: Redis Sentinel for HA

FastWorker requires just 2-3 Python processes:

  • Your application
  • FastWorker control plane
  • FastWorker workers (optional, for scaling)

That's it. No external services: no Redis to configure, monitor, back up, or secure. Just Python.

Features

  • Brokerless Architecture - No Redis, RabbitMQ, or other message brokers required
  • Control Plane Architecture - Centralized coordination with distributed subworkers
  • Automatic Worker Discovery - Workers find each other automatically on the network
  • Priority Queues - Support for critical, high, normal, and low priority tasks
  • Result Caching - Task results cached with expiration and memory limits
  • Task Completion Callbacks - Receive real-time notifications when tasks complete
  • Built-in Reliability - Automatic retries and error handling
  • FastAPI Integration - Seamless integration with web applications
  • OpenTelemetry Support - Optional distributed tracing and metrics for observability
  • Zero Configuration - Works out of the box with sensible defaults

Note: FastWorker is designed for moderate-scale Python applications (1K-10K tasks/min). For extreme scale, multi-language support, or complex workflows, see Limitations & Scope.
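
For a rough sense of what the 1K-10K tasks/min range implies for worker count, a back-of-envelope estimate can help. The sketch below assumes each worker runs one task at a time and uses an illustrative 0.5 s average task duration; `workers_needed` is a hypothetical helper, and none of these figures are FastWorker measurements.

```python
import math

def workers_needed(tasks_per_min: float, avg_task_seconds: float) -> int:
    """Estimate concurrent workers, assuming one task at a time per worker."""
    # Seconds of work arriving per second of wall-clock time.
    busy_seconds_per_second = tasks_per_min / 60 * avg_task_seconds
    return max(1, math.ceil(busy_seconds_per_second))

for rate in (1_000, 10_000):
    print(f"{rate} tasks/min -> {workers_needed(rate, avg_task_seconds=0.5)} workers")
```

Real capacity also depends on task variance, network latency, and serialization cost, so treat the result as a lower bound.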

Installation

pip install fastworker

Quick Start

1. Define Tasks

# mytasks.py
from fastworker import task

@task
def add(x: int, y: int) -> int:
    """Add two numbers."""
    return x + y

@task
def multiply(x: int, y: int) -> int:
    """Multiply two numbers."""
    return x * y
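
For intuition on how a decorator like `@task` can make functions addressable by name (as `--task-name add` does in the later steps), here is a minimal sketch of a name-to-function registry. `TASK_REGISTRY`, `register_task`, and `run_task` are hypothetical names used for illustration; FastWorker's own registry may work differently.

```python
from typing import Any, Callable, Dict

# Hypothetical registry for illustration; not FastWorker's internals.
TASK_REGISTRY: Dict[str, Callable[..., Any]] = {}

def register_task(func: Callable[..., Any]) -> Callable[..., Any]:
    """Record the function under its own name and return it unchanged."""
    TASK_REGISTRY[func.__name__] = func
    return func

def run_task(name: str, *args: Any, **kwargs: Any) -> Any:
    """Look up a registered task by name and call it."""
    try:
        func = TASK_REGISTRY[name]
    except KeyError:
        raise LookupError(f"Unknown task: {name!r}") from None
    return func(*args, **kwargs)

@register_task
def add(x: int, y: int) -> int:
    return x + y

print(run_task("add", 5, 3))
```

Because the decorator returns the function unchanged, decorated tasks can still be called directly in unit tests.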

2. Start Control Plane

# Terminal 1 - Start the control plane (coordinates and also processes tasks)
fastworker control-plane --worker-id control-plane --task-modules mytasks

3. Start Subworkers (Optional - for scaling)

# Terminal 2 - Start subworker 1
fastworker subworker --worker-id subworker1 --control-plane-address tcp://127.0.0.1:5555 --base-address tcp://127.0.0.1:5561 --task-modules mytasks

# Terminal 3 - Start subworker 2 (optional)
fastworker subworker --worker-id subworker2 --control-plane-address tcp://127.0.0.1:5555 --base-address tcp://127.0.0.1:5565 --task-modules mytasks

4. Submit Tasks

Blocking mode (wait for result):

fastworker submit --task-name add --args 5 3

Non-blocking mode (get task ID immediately):

fastworker submit --task-name add --args 5 3 --non-blocking
# Returns: Task ID: <uuid>

Check task status:

fastworker status --task-id <uuid>

5. Using Python Client

Non-blocking (recommended):

from fastworker import Client
import asyncio

async def main():
    client = Client()
    await client.start()

    # Non-blocking: Returns immediately with task ID
    task_id = await client.delay("add", 5, 3)
    print(f"Task submitted: {task_id}")

    # Check result later
    result = await client.get_task_result(task_id)
    if result:
        print(f"Result: {result.result}")

    client.stop()

asyncio.run(main())
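
Since `get_task_result` may be called before the task has finished, callers often want to poll until a result appears. The helper below is a sketch that assumes `get_task_result` returns `None` while no result is available (the `if result:` check above suggests something similar, but the exact contract is an assumption). `wait_for_result`, `poll_interval`, and `max_wait` are hypothetical names, and `FakeClient` stands in for a real client so the snippet runs on its own.

```python
import asyncio
import time
from typing import Any, Optional

async def wait_for_result(client: Any, task_id: str,
                          poll_interval: float = 0.1,
                          max_wait: float = 5.0) -> Optional[Any]:
    """Poll client.get_task_result until it returns a value or max_wait elapses."""
    deadline = time.monotonic() + max_wait
    while True:
        result = await client.get_task_result(task_id)
        if result is not None:
            return result
        if time.monotonic() >= deadline:
            return None  # gave up waiting
        await asyncio.sleep(poll_interval)

class FakeClient:
    """Stand-in for a real client: the result appears on the third poll."""
    def __init__(self) -> None:
        self.calls = 0

    async def get_task_result(self, task_id: str) -> Optional[int]:
        self.calls += 1
        return 8 if self.calls >= 3 else None

fake = FakeClient()
print(asyncio.run(wait_for_result(fake, "demo-id", poll_interval=0.01)))
```

With a real client, the same helper would be awaited in place of the single `get_task_result` call.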

Blocking (when you need the result immediately):

# Blocking: waits for the result.
# Run this inside an async function, after `await client.start()` as shown above.
result = await client.submit_task("add", args=(5, 3))
print(f"Result: {result.result}")

Architecture

FastWorker uses a Control Plane Architecture:

  • Control Plane Worker: Central coordinator that manages subworkers and also processes tasks
  • Subworkers: Additional workers that register with the control plane for load distribution
  • Clients: Connect only to the control plane for task submission

Benefits

  • Centralized Management: Control plane coordinates all task distribution
  • Load Balancing: Tasks automatically distributed to least-loaded subworkers
  • High Availability: Control plane processes tasks if no subworkers available
  • Result Persistence: Results cached in control plane with expiration
  • Scalability: Add subworkers dynamically without reconfiguration
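
The load-balancing and fallback behaviour listed above can be pictured with a small sketch: pick the subworker with the fewest in-flight tasks, and fall back to the control plane when no subworkers are registered. `pick_worker` and the load dictionary are illustrative assumptions; the source does not say how FastWorker measures load.

```python
from typing import Dict

def pick_worker(active_tasks: Dict[str, int],
                control_plane_id: str = "control-plane") -> str:
    """Return the least-loaded subworker ID, or the control plane if there are none."""
    if not active_tasks:
        return control_plane_id
    # Lowest in-flight count wins; on a tie, the first registered worker is chosen.
    return min(active_tasks, key=active_tasks.get)

print(pick_worker({"subworker1": 4, "subworker2": 1}))
print(pick_worker({}))
```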

CLI Usage

# Start control plane
fastworker control-plane --worker-id control-plane --task-modules mytasks

# Start subworker
fastworker subworker --worker-id subworker1 --control-plane-address tcp://127.0.0.1:5555 --task-modules mytasks

# Submit task (blocking)
fastworker submit --task-name add --args 5 3

# Submit task (non-blocking)
fastworker submit --task-name add --args 5 3 --non-blocking

# Check task status
fastworker status --task-id <uuid>

# List available tasks
fastworker list --task-modules mytasks

Priority Handling

from fastworker.tasks.models import TaskPriority

# Submit with priority.
# Assumes `client` is a started Client and this runs inside an async function.
await client.delay("critical_task", priority=TaskPriority.CRITICAL)
await client.delay("normal_task", priority=TaskPriority.NORMAL)
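
To illustrate what priority ordering means in practice, the following sketch drains a queue so that critical tasks come out first and tasks of equal priority keep their submission order. It uses a local `Priority` enum and the standard-library `heapq`, not FastWorker's `TaskPriority` or its actual scheduler; the numeric values and task names are assumptions.

```python
import heapq
import itertools
from enum import IntEnum
from typing import List, Tuple

class Priority(IntEnum):
    # Lower value is served first. Values are illustrative.
    CRITICAL = 0
    HIGH = 1
    NORMAL = 2
    LOW = 3

class PriorityTaskQueue:
    def __init__(self) -> None:
        self._heap: List[Tuple[int, int, str]] = []
        self._counter = itertools.count()  # tie-breaker that preserves FIFO order

    def put(self, task_name: str, priority: Priority = Priority.NORMAL) -> None:
        heapq.heappush(self._heap, (int(priority), next(self._counter), task_name))

    def get(self) -> str:
        return heapq.heappop(self._heap)[2]

    def __len__(self) -> int:
        return len(self._heap)

queue = PriorityTaskQueue()
queue.put("send_newsletter", Priority.LOW)
queue.put("resize_image")
queue.put("charge_card", Priority.CRITICAL)
queue.put("generate_report")

order = [queue.get() for _ in range(len(queue))]
print(order)
```

The monotonically increasing counter keeps two `NORMAL` tasks in the order they were submitted, which a bare `(priority, name)` tuple would not guarantee.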

Result Caching

The control plane maintains a result cache with:

  • Configurable Size: Default 10,000 results (configurable via --result-cache-size)
  • TTL: Default 1 hour (configurable via --result-cache-ttl)
  • LRU Eviction: Least recently accessed results evicted when cache is full
  • Automatic Cleanup: Expired results cleaned up every minute
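
The combination of a size limit, a TTL, LRU eviction, and periodic cleanup can be sketched with an `OrderedDict`. This is an illustrative model of the behaviour listed above, not FastWorker's implementation; `ResultCache` and its method names are hypothetical, and the injectable `clock` exists only to make the sketch easy to test.

```python
import time
from collections import OrderedDict
from typing import Any, Callable, Optional, Tuple

class ResultCache:
    """Bounded cache with per-entry TTL and least-recently-accessed eviction."""

    def __init__(self, max_size: int = 10_000, ttl_seconds: float = 3600.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.max_size = max_size
        self.ttl_seconds = ttl_seconds
        self._clock = clock
        self._entries: "OrderedDict[str, Tuple[float, Any]]" = OrderedDict()

    def put(self, task_id: str, result: Any) -> None:
        self._entries[task_id] = (self._clock() + self.ttl_seconds, result)
        self._entries.move_to_end(task_id)
        while len(self._entries) > self.max_size:
            self._entries.popitem(last=False)  # evict least recently accessed

    def get(self, task_id: str) -> Optional[Any]:
        entry = self._entries.get(task_id)
        if entry is None:
            return None
        expires_at, result = entry
        if self._clock() >= expires_at:
            del self._entries[task_id]  # expired: drop on access
            return None
        self._entries.move_to_end(task_id)  # mark as recently accessed
        return result

    def cleanup(self) -> int:
        """Drop all expired entries and return how many were removed."""
        now = self._clock()
        expired = [k for k, (exp, _) in self._entries.items() if now >= exp]
        for key in expired:
            del self._entries[key]
        return len(expired)

cache = ResultCache(max_size=2, ttl_seconds=60)
cache.put("a", 1)
cache.put("b", 2)
cache.get("a")      # "a" is now the most recently accessed entry
cache.put("c", 3)   # cache is full, so "b" is evicted
print(cache.get("b"), cache.get("a"), cache.get("c"))
```

In a long-running process, `cleanup()` would be called on a timer (the list above mentions once a minute) so expired results do not linger until their next lookup.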

Configuration

Control Plane

fastworker control-plane \
  --worker-id control-plane \
  --base-address tcp://127.0.0.1:5555 \
  --discovery-address tcp://127.0.0.1:5550 \
  --result-cache-size 10000 \
  --result-cache-ttl 3600 \
  --task-modules mytasks

Subworker

fastworker subworker \
  --worker-id subworker1 \
  --control-plane-address tcp://127.0.0.1:5555 \
  --base-address tcp://127.0.0.1:5561 \
  --task-modules mytasks

Client

from fastworker import Client

client = Client(
    discovery_address="tcp://127.0.0.1:5550",
    timeout=60,
    retries=5
)
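
The `timeout` and `retries` parameters suggest a per-attempt time limit combined with a bounded number of attempts. How FastWorker applies them internally is not stated here; the sketch below shows one common interpretation using `asyncio.wait_for`. `call_with_retries` and the flaky operation are hypothetical, and treating `retries` as the total number of attempts is an assumption.

```python
import asyncio
from typing import Any, Awaitable, Callable

async def call_with_retries(operation: Callable[[], Awaitable[Any]],
                            timeout: float, retries: int) -> Any:
    """Run operation up to `retries` times, giving each attempt `timeout` seconds."""
    last_error: Exception = RuntimeError("retries must be at least 1")
    for _ in range(retries):
        try:
            return await asyncio.wait_for(operation(), timeout=timeout)
        except (asyncio.TimeoutError, ConnectionError) as exc:
            last_error = exc  # remember the failure and try again
    raise last_error

attempts = 0

async def flaky() -> str:
    """Fail twice with a connection error, then succeed."""
    global attempts
    attempts += 1
    if attempts < 3:
        raise ConnectionError("control plane not reachable yet")
    return "ok"

print(asyncio.run(call_with_retries(flaky, timeout=1.0, retries=5)))
```

A production version would usually add a delay with backoff between attempts so a struggling control plane is not hammered.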

Development

# Clone repository
git clone https://github.com/dipankar/fastworker.git
cd fastworker

# Install dependencies
poetry install

# Run tests
poetry run pytest

# Format code
poetry run black .

Requirements

  • Python 3.12+
  • pynng
  • pydantic

License

MIT License - see LICENSE file for details.

Documentation

For detailed documentation, see:

Contributing

Contributions welcome! Please read CONTRIBUTING.md for guidelines.
