
uhttp-workers

Multi-process worker dispatcher built on uhttp-server.

Single dispatcher process handles all connections, N worker processes handle business logic in parallel. Communication via multiprocessing.Queue with efficient select() integration (POSIX only).

Architecture

                                    ┌─────────────┐
                                    │  Worker 0   │
                                    │  Worker 1   │
                 request_queue A    │  Worker 2   │
              ┌────────────────►    │  Worker 3   │  ComputeWorker
              │                     └──────┬──────┘
              │                            │
┌─────────────┤                            │
│             │                     ┌──────┴──────┐
│ Dispatcher  │  request_queue B    │  Worker 0   │
│  (main)     ├────────────────►    │  Worker 1   │  StorageWorker
│             │                     └──────┬──────┘
│  - static   │                            │
│  - sync     │    response_queue          │
│  - auth     │◄───────────────────────────┘
│             │    (shared)
└─────────────┘

Key design decisions:

  • Sockets never leave the dispatcher — only serializable data goes through queues
  • Each worker pool has its own request queue, all pools share one response queue
  • Workers send heartbeats via response queue — dispatcher detects stuck/dead workers
  • Each worker has a private control queue for config updates and stop signals
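The select() integration mentioned above relies on a CPython implementation detail: multiprocessing.Queue exposes the read end of its underlying pipe as _reader, whose file descriptor can be watched alongside sockets. A minimal sketch of the mechanism (assuming CPython on POSIX; this is not the library's code):

```python
import multiprocessing
import select

q = multiprocessing.Queue()
q.put({'hello': 'world'})

# q._reader is the read end of the queue's underlying pipe
# (CPython implementation detail, POSIX only).  Its fileno() can
# join the dispatcher's select() call alongside listening sockets,
# so one loop wakes up for both client data and worker responses.
ready, _, _ = select.select([q._reader.fileno()], [], [], 5.0)
msg = q.get() if ready else None
print(msg)
```

This is why a single dispatcher thread can block in one select() call and still react immediately to worker responses, without polling the queues.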

Installation

pip install uhttp-workers

Quick Start

import uhttp.workers as _workers


class ItemWorker(_workers.Worker):
    def setup(self):
        self.items = {}
        self.next_id = 1

    @_workers.api('/api/items', 'GET')
    def list_items(self, request):
        return {'items': list(self.items.values())}

    @_workers.api('/api/item/{id:int}', 'GET')
    def get_item(self, request):
        item = self.items.get(request.path_params['id'])
        if not item:
            return {'error': 'Not found'}, 404
        return item


VALID_KEYS = {'example-key'}  # replace with real key management


class MyDispatcher(_workers.Dispatcher):
    def do_check(self, client):
        api_key = client.headers.get('x-api-key')
        if api_key not in VALID_KEYS:
            client.respond({'error': 'unauthorized'}, status=401)
            raise _workers.RejectRequest()

    @_workers.sync('/health')
    def health(self, client, path_params):
        client.respond({'status': 'ok'})


def main():
    dispatcher = MyDispatcher(
        port=8080,
        pools=[
            _workers.WorkerPool(
                ItemWorker, num_workers=4,
                routes=['/api/**'],
                timeout=30,
            ),
        ],
    )
    dispatcher.run()


if __name__ == '__main__':
    main()

Multiple Worker Pools

Route different endpoints to different worker pools with independent scaling:

dispatcher = MyDispatcher(
    port=8080,
    pools=[
        _workers.WorkerPool(
            ComputeWorker, num_workers=4,
            routes=['/api/compute/**'],
            timeout=60,
        ),
        _workers.WorkerPool(
            StorageWorker, num_workers=2,
            routes=['/api/items/**', '/api/item/**'],
            timeout=10,
        ),
        _workers.WorkerPool(
            GeneralWorker, num_workers=1,
        ),  # no routes = default/fallback pool
    ],
)

Request routing order:

  1. Static files — served directly by dispatcher
  2. Sync handlers — run in dispatcher process
  3. Worker pools — first pool with matching route prefix, or fallback pool
  4. 404 — no match
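The order above can be sketched in plain Python (illustrative only — the function name and data shapes here are assumptions, not the library's internals):

```python
def route(path, static_prefixes, sync_routes, pools):
    # 1. static files -- served directly by the dispatcher
    for prefix in static_prefixes:
        if path.startswith(prefix):
            return ('static', prefix)
    # 2. sync handlers -- run in the dispatcher process
    if path in sync_routes:
        return ('sync', path)
    # 3. first pool whose route prefix matches; a pool without
    #    routes acts as the fallback
    fallback = None
    for name, routes in pools:
        if routes is None:
            fallback = fallback or name
            continue
        if any(path.startswith(r.rstrip('*')) for r in routes):
            return ('pool', name)
    if fallback is not None:
        return ('pool', fallback)
    # 4. nothing matched
    return ('404', None)

pools = [('compute', ['/api/compute/**']), ('general', None)]
print(route('/api/compute/run', ['/static/'], {'/health'}, pools))
```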

API Handlers

Group related endpoints under a common URL prefix using ApiHandler:

import uhttp.workers as _workers

class UserHandler(_workers.ApiHandler):
    PATTERN = '/api/user'

    @_workers.api('', 'GET')
    def list_users(self, request):
        return {'users': self.worker.db.list_users()}

    @_workers.api('/{id:int}', 'GET')
    def get_user(self, request):
        return self.worker.db.get_user(request.path_params['id'])

    @_workers.api('/{id:int}', 'DELETE')
    def delete_user(self, request):
        self.worker.db.delete_user(request.path_params['id'])
        return {'deleted': request.path_params['id']}

class OrderHandler(_workers.ApiHandler):
    PATTERN = '/api/order'

    @_workers.api('', 'GET')
    def list_orders(self, request):
        return {'orders': []}

    @_workers.api('/{id:int}', 'GET')
    def get_order(self, request):
        return {'id': request.path_params['id']}

class MyWorker(_workers.Worker):
    HANDLERS = [UserHandler, OrderHandler]

    def setup(self):
        self.db = Database(self.kwargs['db_login'])

@api patterns on handlers are automatically prefixed with the handler's PATTERN. Handlers access the worker instance via self.worker.

You can also define @api methods directly on the worker class — useful for simple workers that don't need handler grouping.

Handlers support inheritance — a subclass inherits all routes from its parent, using the subclass PATTERN as prefix.

Static Files

dispatcher = Dispatcher(
    port=8080,
    static_routes={
        '/static/': './static/',
        '/images/': '/var/data/images/',
    },
)

Static files are served directly by the dispatcher process with path traversal protection. Directory requests automatically serve index.html if present.
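Path traversal protection typically means resolving the requested path and rejecting anything that escapes the static root. A sketch of that idea (the safe_join name is hypothetical, not the library's API):

```python
import os

def safe_join(root, url_path):
    # Resolve the requested path and refuse anything that escapes
    # the static root -- e.g. '../etc/passwd' resolves outside and
    # is rejected.  (Illustrative sketch, not the library's code.)
    full = os.path.realpath(os.path.join(root, url_path.lstrip('/')))
    root = os.path.realpath(root)
    if full != root and not full.startswith(root + os.sep):
        return None
    return full
```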

Sync Handlers

Lightweight handlers that run directly in the dispatcher process — no queue overhead. Define them as methods on the dispatcher class with the @sync decorator:

import uhttp.workers as _workers

class MyDispatcher(_workers.Dispatcher):
    @_workers.sync('/health')
    def health(self, client, path_params):
        client.respond({
            'status': 'ok',
            'pools': [pool.status() for pool in self._pools],
        })

    @_workers.sync('/version')
    def version(self, client, path_params):
        client.respond({'version': '1.0.0'})

Use sync handlers for fast, non-blocking responses only — long operations block the entire dispatcher.

Worker Lifecycle

Setup

setup() is called once when a worker process starts. Use it to initialize resources that cannot be shared across processes (database connections, models, etc.):

class MyWorker(_workers.Worker):
    def setup(self):
        self.db = Database(self.kwargs['db_login'])

Extra keyword arguments from WorkerPool(...) are available as self.kwargs.

Configuration Updates

Dispatcher can send configuration to workers at runtime via per-worker control queues:

# dispatcher side
for pool in dispatcher._pools:
    pool.send_config({'rate_limit': 100})

# worker side
class MyWorker(_workers.Worker):
    def on_config(self, config):
        self.rate_limit = config['rate_limit']

Health Monitoring

Workers send heartbeats automatically via the shared response queue. When a worker takes a request, it reports which request_id it is working on. If a worker stops responding:

  • Dead worker (segfault, crash) — detected via is_alive(), restarted immediately
  • Stuck worker (infinite loop in C code) — detected via heartbeat timeout, killed and restarted
  • Too many restarts — pool marked as degraded, returns 503
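The stuck-worker case boils down to comparing each worker's last heartbeat timestamp against stuck_timeout. A sketch of that check (function and variable names are assumptions, not the library's internals):

```python
import time

def find_stuck(last_heartbeat, stuck_timeout=60.0, now=None):
    # last_heartbeat maps worker id -> monotonic timestamp of the
    # last heartbeat seen on the response queue.  Workers silent
    # longer than stuck_timeout are candidates for kill + restart.
    now = time.monotonic() if now is None else now
    return [wid for wid, seen in last_heartbeat.items()
            if now - seen > stuck_timeout]

print(find_stuck({'w0': 100.0, 'w1': 30.0}, stuck_timeout=60.0, now=100.0))
```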

Request Handling

@_workers.api('/process/{id:int}', 'POST')
def process(self, request):
    # request.request_id   — internal ID for dispatcher pairing
    # request.method       — 'POST'
    # request.path         — '/process/42'
    # request.path_params  — {'id': 42}
    # request.query        — {'page': '1'} or None
    # request.data         — dict (JSON), bytes (binary), or None
    # request.headers      — dict
    # request.content_type — 'application/json'

    # return data (status 200)
    return {'result': 'ok'}

    # or return data with an explicit status:
    # return {'error': 'not found'}, 404

URL Patterns

Dispatcher uses prefix matching to route requests to pools:

_workers.WorkerPool(MyWorker, routes=['/api/users/**'])  # matches /api/users/anything
_workers.WorkerPool(MyWorker, routes=['/api/status'])     # exact match only
_workers.WorkerPool(MyWorker)                              # fallback — catches everything else

Workers use full pattern matching with type conversion:

@_workers.api('/user/{id:int}', 'GET')        # id converted to int
@_workers.api('/price/{amount:float}', 'GET') # amount converted to float
@_workers.api('/tag/{name}')                   # name as str, all methods
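One way such '{name:type}' patterns can be compiled to regexes with type conversion is sketched below (an illustrative implementation under assumed semantics, not the library's actual parser):

```python
import re

# Each type maps to a regex fragment and a converter.
_TYPES = {'int': (r'\d+', int),
          'float': (r'\d+(?:\.\d+)?', float),
          'str': (r'[^/]+', str)}

def compile_pattern(pattern):
    convs = {}
    def repl(m):
        name, typ = m.group(1), m.group(2) or 'str'
        regex, conv = _TYPES[typ]
        convs[name] = conv
        return f'(?P<{name}>{regex})'
    regex = re.sub(r'\{(\w+)(?::(\w+))?\}', repl, pattern)
    return re.compile(f'^{regex}$'), convs

def match(pattern, path):
    regex, convs = compile_pattern(pattern)
    m = regex.match(path)
    if not m:
        return None
    return {k: convs[k](v) for k, v in m.groupdict().items()}

print(match('/user/{id:int}', '/user/42'))   # {'id': 42}
```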

Authentication

Override do_check() on the dispatcher — runs before any request is queued:

class MyDispatcher(_workers.Dispatcher):
    def __init__(self, valid_keys, **kwargs):
        super().__init__(**kwargs)
        self.valid_keys = valid_keys

    def do_check(self, client):
        api_key = client.headers.get('x-api-key')
        if api_key not in self.valid_keys:
            client.respond({'error': 'unauthorized'}, status=401)
            raise _workers.RejectRequest()

do_check() is only called for requests going to worker pools — static files and sync handlers are not affected.

Post-Response Hook

Override on_response() on the dispatcher to post-process after a response is sent to the client — e.g., forward data to another worker pool:

class MyDispatcher(_workers.Dispatcher):
    def on_response(self, response, pending):
        if response.status == 200 and pending.pool.name == 'LprWorker':
            storage_pool = self._find_pool('/internal/storage')
            storage_pool.request_queue.put(_workers.Request(
                request_id=-1,
                method='POST',
                path='/internal/storage/save',
                data={
                    'image': pending.client.data,
                    'result': response.data,
                }))

pending is a _PendingRequest with client (original connection) and pool (source pool). Requests with request_id=-1 are ignored by the dispatcher when the worker responds.

Dispatcher Idle Hook

Override on_idle() on the dispatcher for periodic background tasks — called on each select() timeout (every SELECT_TIMEOUT seconds, default 1s):

class MyDispatcher(_workers.Dispatcher):
    def on_idle(self):
        # periodic cleanup, monitoring, etc.
        pass

Workers have their own on_idle() hook, called on each heartbeat_interval timeout.

Graceful Shutdown

On SIGTERM or SIGINT:

  1. Stop accepting new connections
  2. Wait for pending responses (up to shutdown_timeout)
  3. Respond 503 to remaining pending requests
  4. Send stop signal to all workers via control queues
  5. Wait for workers to finish, kill after timeout

Monitoring

class MyDispatcher(_workers.Dispatcher):
    @_workers.sync('/monitor')
    def monitor(self, client, path_params):
        client.respond({
            'pools': [pool.status() for pool in self._pools],
            'pending': len(self._pending),
        })

Pool status includes per-worker info: alive, last seen, current request ID, queue size.

Logging

Workers have a built-in Logger accessible via self.log:

class MyWorker(_workers.Worker):
    @_workers.api('/item/{id:int}', 'GET')
    def get_item(self, request):
        item_id = request.path_params['id']
        # %-style (Python logging compatible)
        self.log.info("Getting item %d", item_id)
        # {}-style (kwargs)
        self.log.info("Getting item {id}", id=item_id)
        return {'id': item_id}

Log messages are sent to the dispatcher via the shared response queue and printed in the dispatcher process — no interleaved output from multiple processes.

Log levels: LOG_DEBUG (10), LOG_INFO (20), LOG_WARNING (30), LOG_ERROR (40), LOG_CRITICAL (50)

Set minimum level per pool:

_workers.WorkerPool(
    MyWorker, num_workers=4,
    log_level=_workers.LOG_INFO,  # default: LOG_WARNING
)

Output format — auto-detected at dispatcher init:

  • Terminal: ANSI colors — bold red (critical), red (error), yellow (warning), dim (debug)
  • systemd: Syslog priority prefixes (<3>, <4>, etc.) — journalctl colors by priority

Override Dispatcher.on_log(name, level, message) to customize output or forward to a logging framework.

Errors are logged automatically:

  • Handler exceptions → ERROR with full traceback (returns 500 to client)
  • setup() crash → CRITICAL with traceback (worker exits and restarts)
  • Worker restart → ERROR with reason (died/stuck)
  • Request timeout → WARNING with request ID and timeout value

Configuration

Dispatcher

Parameter         Default     Description
port              8080        Listen port
address           '0.0.0.0'   Listen address
pools             []          List of WorkerPool instances
static_routes     {}          URL prefix → filesystem path
shutdown_timeout  10          Seconds to wait on shutdown
max_pending       1000        Max pending requests (503 when exceeded)
ssl_context       None        ssl.SSLContext for HTTPS

WorkerPool

Parameter           Default      Description
worker_class        (required)   Worker subclass
num_workers         1            Number of worker processes
routes              None         Prefix patterns (None = fallback pool)
timeout             30           Request timeout in seconds (504 on expiry)
stuck_timeout       60           Heartbeat timeout before kill
heartbeat_interval  1            Seconds between worker heartbeats
log_level           LOG_WARNING  Minimum log level for worker loggers
max_restarts        10           Max restarts per restart_window
restart_window      300          Time window for restart counting (seconds)
queue_warning       100          Log warning when queue size exceeds this (0 = disable)

Extra **kwargs on WorkerPool are passed to worker constructor (accessible as self.kwargs).

Requirements

  • Python >= 3.10
  • POSIX system (Linux, macOS) — uses select() with queue._reader
  • uhttp-server
