uhttp-workers

Multi-process worker dispatcher built on uhttp-server.
A single dispatcher process handles all connections; N worker processes handle the business logic in parallel. Communication runs over multiprocessing.Queue with efficient select() integration (POSIX only).
Architecture
```
                                 ┌─────────────┐
                                 │  Worker 0   │
              request_queue A    │  Worker 1   │
  ┌────────────────────────────► │  Worker 2   │  ComputeWorker
  │                              │  Worker 3   │
  │                              └──────┬──────┘
┌─┴───────────┐                         │
│  Dispatcher │  request_queue B        │
│   (main)    ├─────► ┌─────────────┐   │
│             │       │  Worker 0   │   │
│  - static   │       │  Worker 1   │   │  StorageWorker
│  - sync     │       └──────┬──────┘   │
│  - auth     │              │          │
│             │◄─────────────┴──────────┘
└─────────────┘   response_queue (shared)
```
Key design decisions:
- Sockets never leave the dispatcher — only serializable data goes through queues
- Each worker pool has its own request queue, all pools share one response queue
- Workers send heartbeats via response queue — dispatcher detects stuck/dead workers
- Each worker has a private control queue for config updates and stop signals
Installation
pip install uhttp-workers
Quick Start
import uhttp.workers as _workers


class ItemWorker(_workers.Worker):
    def setup(self):
        self.items = {}
        self.next_id = 1

    @_workers.api('/api/items', 'GET')
    def list_items(self, request):
        return {'items': list(self.items.values())}

    @_workers.api('/api/item/{id:int}', 'GET')
    def get_item(self, request):
        item = self.items.get(request.path_params['id'])
        if not item:
            return {'error': 'Not found'}, 404
        return item


class MyDispatcher(_workers.Dispatcher):
    def do_check(self, client):
        api_key = client.headers.get('x-api-key')
        if api_key not in VALID_KEYS:
            client.respond({'error': 'unauthorized'}, status=401)
            raise _workers.RejectRequest()

    @_workers.sync('/health')
    def health(self, client, path_params):
        client.respond({'status': 'ok'})


def main():
    dispatcher = MyDispatcher(
        port=8080,
        pools=[
            _workers.WorkerPool(
                ItemWorker, num_workers=4,
                routes=['/api/**'],
                timeout=30,
            ),
        ],
    )
    dispatcher.run()


if __name__ == '__main__':
    main()
Multiple Worker Pools
Route different endpoints to different worker pools with independent scaling:
dispatcher = MyDispatcher(
    port=8080,
    pools=[
        _workers.WorkerPool(
            ComputeWorker, num_workers=4,
            routes=['/api/compute/**'],
            timeout=60,
        ),
        _workers.WorkerPool(
            StorageWorker, num_workers=2,
            routes=['/api/items/**', '/api/item/**'],
            timeout=10,
        ),
        _workers.WorkerPool(
            GeneralWorker, num_workers=1,
        ),  # no routes = default/fallback pool
    ],
)
Request routing order:
1. Static files — served directly by the dispatcher
2. Sync handlers — run in the dispatcher process
3. Worker pools — first pool with a matching route prefix, or the fallback pool
4. 404 — no match
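The pool-selection step can be sketched as follows. This is a hypothetical helper with illustrative pool dicts; the library's internal matcher may differ:

```python
def choose_pool(path, pools):
    """Return the first pool whose route prefix matches the path,
    else the fallback pool (routes=None), else None (-> 404)."""
    fallback = None
    for pool in pools:
        if pool['routes'] is None:
            fallback = fallback or pool   # first fallback wins
            continue
        for route in pool['routes']:
            if route.endswith('/**'):
                # '/api/compute/**' -> prefix '/api/compute/'
                if path.startswith(route[:-2]):
                    return pool
            elif path == route:           # exact match when no '**'
                return pool
    return fallback

pools = [
    {'name': 'compute', 'routes': ['/api/compute/**']},
    {'name': 'storage', 'routes': ['/api/items/**', '/api/item/**']},
    {'name': 'general', 'routes': None},
]
```

Because pools are scanned in order, putting more specific routes first keeps behavior predictable.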
API Handlers
Group related endpoints under a common URL prefix using ApiHandler:
import uhttp.workers as _workers


class UserHandler(_workers.ApiHandler):
    PATTERN = '/api/user'

    @_workers.api('', 'GET')
    def list_users(self, request):
        return {'users': self.worker.db.list_users()}

    @_workers.api('/{id:int}', 'GET')
    def get_user(self, request):
        return self.worker.db.get_user(request.path_params['id'])

    @_workers.api('/{id:int}', 'DELETE')
    def delete_user(self, request):
        self.worker.db.delete_user(request.path_params['id'])
        return {'deleted': request.path_params['id']}


class OrderHandler(_workers.ApiHandler):
    PATTERN = '/api/order'

    @_workers.api('', 'GET')
    def list_orders(self, request):
        return {'orders': []}

    @_workers.api('/{id:int}', 'GET')
    def get_order(self, request):
        return {'id': request.path_params['id']}


class MyWorker(_workers.Worker):
    HANDLERS = [UserHandler, OrderHandler]

    def setup(self):
        self.db = Database(self.kwargs['db_login'])
@api patterns on handlers are automatically prefixed with the handler's PATTERN. Handlers access the worker instance via self.worker.
You can also define @api methods directly on the worker class — useful for simple workers that don't need handler grouping.
Handlers support inheritance — a subclass inherits all routes from its parent, using the subclass PATTERN as prefix.
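One plausible way to implement the PATTERN prefixing and route inheritance described above — a self-contained sketch, not the library's actual internals:

```python
def api(pattern, method='*'):
    # Decorator that tags a method with its route; a real implementation
    # would also handle registration, this sketch only records metadata.
    def decorator(func):
        func._route = (pattern, method)
        return func
    return decorator

class ApiHandler:
    PATTERN = ''

    @classmethod
    def routes(cls):
        # Walk the class, including inherited members, and prefix each
        # @api pattern with *this* class's PATTERN.
        found = []
        for name in dir(cls):
            route = getattr(getattr(cls, name), '_route', None)
            if route:
                pattern, http_method = route
                found.append((cls.PATTERN + pattern, http_method, name))
        return sorted(found)

class UserHandler(ApiHandler):
    PATTERN = '/api/user'

    @api('', 'GET')
    def list_users(self, request): ...

    @api('/{id:int}', 'GET')
    def get_user(self, request): ...

class AdminUserHandler(UserHandler):
    # Inherits both routes, re-prefixed with the subclass PATTERN.
    PATTERN = '/api/admin/user'
```

Because `routes()` reads `cls.PATTERN` at lookup time, a subclass automatically re-prefixes every inherited route.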
Static Files
dispatcher = _workers.Dispatcher(
    port=8080,
    static_routes={
        '/static/': './static/',
        '/images/': '/var/data/images/',
    },
)
Static files are served directly by the dispatcher process with path traversal protection. Directory requests automatically serve index.html if present.
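The traversal check amounts to resolving the requested path and refusing anything that escapes the configured root. A minimal standalone sketch of that kind of check (not the library's code):

```python
import os

def resolve_static(root, url_path):
    # Resolve symlinks and '..' components, then require the result to
    # stay inside the root directory.
    root = os.path.realpath(root)
    candidate = os.path.realpath(os.path.join(root, url_path.lstrip('/')))
    if candidate != root and not candidate.startswith(root + os.sep):
        return None  # traversal attempt -> treat as not found
    return candidate

safe = resolve_static('/var/www/static', 'css/site.css')
blocked = resolve_static('/var/www/static', '../../etc/passwd')  # → None
```

Checking the realpath (rather than the raw string) also defeats symlink tricks inside the static tree.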
Sync Handlers
Lightweight handlers that run directly in the dispatcher process — no queue overhead. Define them as methods on the dispatcher class with the @sync decorator:
import uhttp.workers as _workers


class MyDispatcher(_workers.Dispatcher):
    @_workers.sync('/health')
    def health(self, client, path_params):
        client.respond({
            'status': 'ok',
            'pools': [pool.status() for pool in self._pools],
        })

    @_workers.sync('/version')
    def version(self, client, path_params):
        client.respond({'version': '1.0.0'})
Use sync handlers for fast, non-blocking responses only — long operations block the entire dispatcher.
Worker Lifecycle
Setup
setup() is called once when a worker process starts. Use it to initialize resources that cannot be shared across processes (database connections, models, etc.):
class MyWorker(_workers.Worker):
    def setup(self):
        self.db = Database(self.kwargs['db_login'])
Extra keyword arguments from WorkerPool(...) are available as self.kwargs.
Configuration Updates
Dispatcher can send configuration to workers at runtime via per-worker control queues:
# dispatcher side
for pool in dispatcher._pools:
    pool.send_config({'rate_limit': 100})

# worker side
class MyWorker(_workers.Worker):
    def on_config(self, config):
        self.rate_limit = config['rate_limit']
Health Monitoring
Workers send heartbeats automatically via the shared response queue. When a worker takes a request, it reports which request_id it is working on. If a worker stops responding:
- Dead worker (segfault, crash) — detected via is_alive(), restarted immediately
- Stuck worker (infinite loop in C code) — detected via heartbeat timeout, killed and restarted
- Too many restarts — pool marked as degraded, returns 503
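The detection logic reduces to two checks per worker. A simplified sketch with plain dicts standing in for the real worker records (field names here are illustrative):

```python
def classify_worker(worker, now, stuck_timeout=60):
    """Return 'dead', 'stuck', or 'ok' for one worker record."""
    if not worker['alive']:                      # process exited/crashed
        return 'dead'                            # -> restart immediately
    if now - worker['last_heartbeat'] > stuck_timeout:
        return 'stuck'                           # -> kill, then restart
    return 'ok'

states = [
    classify_worker({'alive': False, 'last_heartbeat': 100}, now=101),
    classify_worker({'alive': True, 'last_heartbeat': 0}, now=120),
    classify_worker({'alive': True, 'last_heartbeat': 119}, now=120),
]
# states == ['dead', 'stuck', 'ok']
```

The heartbeat check is what catches workers that are alive as a process but stuck in non-Python code where signals and exceptions cannot reach.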
Request Handling
@_workers.api('/process/{id:int}', 'POST')
def process(self, request):
    # request.request_id   — internal ID for dispatcher pairing
    # request.method       — 'POST'
    # request.path         — '/process/42'
    # request.path_params  — {'id': 42}
    # request.query        — {'page': '1'} or None
    # request.data         — dict (JSON), bytes (binary), or None
    # request.headers      — dict
    # request.cookies      — dict (lazy-parsed from Cookie header)
    # request.content_type — 'application/json'

    # return data (status 200)
    return {'result': 'ok'}
    # return data with status
    return {'error': 'not found'}, 404
    # defer response — worker continues accepting requests
    return _workers.DEFERRED
Deferred Responses
Return DEFERRED to skip immediate response. The worker stays in the select loop, accepts new requests, and sends the response later via request.respond():
class MyWorker(_workers.Worker):
    def setup(self):
        self._jobs = {}

    @_workers.api('/process', 'POST')
    def process(self, request):
        job_id = start_background_work(request.data)
        self._jobs[job_id] = request
        return _workers.DEFERRED

    def on_work_done(self, job_id, result):
        request = self._jobs.pop(job_id)
        request.respond(data={'result': result})
Note: deferred requests are still subject to dispatcher timeout — call self.keep_alive() periodically to prevent 504.
Keep Alive
Call self.keep_alive() during long operations to reset both the request timeout and stuck worker detection:
@_workers.api('/export', 'POST')
def export(self, request):
    for chunk in generate_large_export():
        self.keep_alive()
    return {'status': 'done'}
Server-Sent Events (SSE)
Stream events to clients using the same API as uhttp.server.Client:
class MyWorker(_workers.Worker):
    def setup(self):
        self._subscribers = {}

    @_workers.api('/events', 'GET')
    def events(self, request):
        request.response_stream()  # sends headers, keeps connection open
        self._subscribers[request.request_id] = request
        return _workers.DEFERRED

    def notify(self, data):
        for req in self._subscribers.values():
            req.send_event(data=data, event='update')

    def on_disconnect(self, request_id):
        self._subscribers.pop(request_id, None)
Available streaming methods on Request:
| Method | Description |
|---|---|
| response_stream(content_type, headers, cookies) | Start streaming (default: text/event-stream) |
| send_event(data, event, event_id, retry) | Send SSE event |
| send_chunk(data) | Send raw data chunk |
| response_stream_end() | End stream and close connection |
Streaming requests are excluded from dispatcher timeout expiration. When the client disconnects, the dispatcher notifies the worker via on_disconnect(request_id).
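On the wire, an SSE event is standard framing per the HTML spec: optional event/id/retry fields, one or more data: lines, and a blank line terminating the event. A standalone formatter sketch (not the library's own code):

```python
def format_sse(data, event=None, event_id=None, retry=None):
    # Build one SSE event: optional fields, then data: lines, then the
    # blank line that terminates the event.
    lines = []
    if event is not None:
        lines.append(f'event: {event}')
    if event_id is not None:
        lines.append(f'id: {event_id}')
    if retry is not None:
        lines.append(f'retry: {retry}')
    for part in str(data).split('\n'):
        lines.append(f'data: {part}')   # multi-line data -> one line each
    return '\n'.join(lines) + '\n\n'

frame = format_sse('{"n": 1}', event='update')
# frame == 'event: update\ndata: {"n": 1}\n\n'
```

The browser's EventSource reassembles multiple data: lines back into a single payload joined by newlines.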
Flow Control
Workers can stop accepting new requests when busy. Requests stay in the shared pool queue for other workers to pick up:
class MyWorker(_workers.Worker):
    @_workers.api('/events', 'GET')
    def subscribe(self, request):
        request.response_stream()
        self._subscribers[request.request_id] = request
        if len(self._subscribers) >= 100:
            self.pause()
        return _workers.DEFERRED

    def on_disconnect(self, request_id):
        self._subscribers.pop(request_id, None)
        if not self._accepting and len(self._subscribers) < 100:
            self.resume()
pause() excludes the request queue from select() — the worker continues processing control messages, custom fd events, and on_idle(). resume() re-enables request acceptance.
URL Patterns
Dispatcher uses prefix matching to route requests to pools:
_workers.WorkerPool(MyWorker, routes=['/api/users/**']) # matches /api/users/anything
_workers.WorkerPool(MyWorker, routes=['/api/status']) # exact match only
_workers.WorkerPool(MyWorker) # fallback — catches everything else
Workers use full pattern matching with type conversion:
@_workers.api('/user/{id:int}', 'GET') # id converted to int
@_workers.api('/price/{amount:float}', 'GET') # amount converted to float
@_workers.api('/tag/{name}') # name as str, all methods
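The {name:type} patterns can be compiled to regular expressions with converters attached. A simplified sketch of that approach (it ignores escaping of regex metacharacters in the literal parts, and is not the library's actual matcher):

```python
import re

CONVERTERS = {'int': (r'-?\d+', int),
              'float': (r'-?\d+(?:\.\d+)?', float),
              'str': (r'[^/]+', str)}

def compile_pattern(pattern):
    converters = {}

    def replace(match):
        # '{id:int}' -> named regex group plus a converter to apply later
        name, kind = match.group(1), match.group(2) or 'str'
        part, conv = CONVERTERS[kind]
        converters[name] = conv
        return f'(?P<{name}>{part})'

    regex = re.sub(r'\{(\w+)(?::(\w+))?\}', replace, pattern)
    compiled = re.compile('^' + regex + '$')

    def match(path):
        m = compiled.match(path)
        if not m:
            return None
        return {k: converters[k](v) for k, v in m.groupdict().items()}
    return match

match = compile_pattern('/user/{id:int}')
params = match('/user/42')   # {'id': 42}
```

A successful match returns the converted parameters; a type mismatch (e.g. '/user/abc') simply fails to match, yielding None.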
Authentication
Override do_check() on the dispatcher — runs before any request is queued:
class MyDispatcher(_workers.Dispatcher):
    def __init__(self, valid_keys, **kwargs):
        super().__init__(**kwargs)
        self.valid_keys = valid_keys

    def do_check(self, client):
        api_key = client.headers.get('x-api-key')
        if api_key not in self.valid_keys:
            client.respond({'error': 'unauthorized'}, status=401)
            raise _workers.RejectRequest()
do_check() is only called for requests going to worker pools — static files and sync handlers are not affected.
Worker-Level Validation
Override do_check() on the worker — runs before routing to handler:
class MyWorker(_workers.Worker):
    def do_check(self, request):
        token = request.cookies.get('session')
        if not token:
            return {'error': 'unauthorized'}, 401
Return a (data, status) tuple to reject the request, or None to continue. You can also raise RejectRequest:
def do_check(self, request):
    if not self.validate_token(request.cookies.get('session')):
        raise _workers.RejectRequest(
            data={'error': 'forbidden'}, status=403)
RejectRequest accepts optional data (default: {'error': 'Rejected'}) and status (default: 403).
Error Handling
Override on_request_error() on the worker to customize error handling when a request handler raises an exception:
class MyWorker(_workers.Worker):
    def on_request_error(self, request, err):
        if isinstance(err, DatabaseError):
            self.db.reconnect()
        return super().on_request_error(request, err)
Default behavior logs the error with traceback and returns 500.
Post-Response Hook
Override on_response() on the dispatcher to post-process after a response is sent to the client — e.g., forward data to another worker pool:
class MyDispatcher(_workers.Dispatcher):
    def on_response(self, response, pending):
        if response.status == 200 and pending.pool.name == 'LprWorker':
            storage_pool = self._find_pool('/internal/storage')
            storage_pool.request_queue.put(_workers.Request(
                request_id=-1,
                method='POST',
                path='/internal/storage/save',
                data={
                    'image': pending.client.data,
                    'result': response.data,
                }))
pending is a _PendingRequest with client (original connection) and pool (source pool).
Requests with request_id=-1 are ignored by the dispatcher when the worker responds.
Dispatcher Idle Hook
Override on_idle() on the dispatcher for periodic background tasks — called on each select() timeout (every SELECT_TIMEOUT seconds, default 1s):
class MyDispatcher(_workers.Dispatcher):
    def on_idle(self):
        # periodic cleanup, monitoring, etc.
        pass
Workers have their own on_idle() hook, called on each heartbeat_interval timeout.
Graceful Shutdown
On SIGTERM or SIGINT:
1. Stop accepting new connections
2. Wait for pending responses (up to shutdown_timeout)
3. Respond 503 to remaining pending requests
4. Send stop signal to all workers via control queues
5. Wait for workers to finish, kill after timeout
Monitoring
class MyDispatcher(_workers.Dispatcher):
    @_workers.sync('/monitor')
    def monitor(self, client, path_params):
        client.respond({
            'pools': [pool.status() for pool in self._pools],
            'pending': len(self._pending),
        })
Pool status includes per-worker info: alive, last seen, current request ID, queue size.
Logging
Workers have a built-in Logger accessible via self.log:
class MyWorker(_workers.Worker):
    @_workers.api('/item/{id:int}', 'GET')
    def get_item(self, request):
        item_id = request.path_params['id']
        # %-style (Python logging compatible)
        self.log.info("Getting item %d", item_id)
        # {}-style (kwargs)
        self.log.info("Getting item {id}", id=item_id)
        return {'id': item_id}
Log messages are sent to the dispatcher via the shared response queue and printed in the dispatcher process — no interleaved output from multiple processes.
Log levels: LOG_DEBUG (10), LOG_INFO (20), LOG_WARNING (30), LOG_ERROR (40), LOG_CRITICAL (50)
Check current level with is_* properties to skip expensive formatting:
if self.log.is_debug:
    self.log.debug("Details: %s", expensive_computation())
Available: is_debug, is_info, is_warning, is_error.
Set minimum level per pool:
_workers.WorkerPool(
    MyWorker, num_workers=4,
    log_level=_workers.LOG_INFO,  # default: LOG_WARNING
)
Output format — auto-detected at dispatcher init:
- Terminal: ANSI colors — bold red (critical), red (error), yellow (warning), dim (debug)
- systemd: syslog priority prefixes (<3>, <4>, etc.) — journalctl colors by priority
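One plausible way to auto-detect the output mode — a hedged sketch; the `JOURNAL_STREAM` check is a common convention for systemd services, not necessarily what uhttp-workers does internally:

```python
import os
import sys

def pick_log_format():
    # systemd sets $JOURNAL_STREAM for services whose stdout/stderr are
    # connected to the journal; a TTY means a human is watching.
    if os.environ.get('JOURNAL_STREAM'):
        return 'syslog-prefix'   # emit <3>, <4>, ... priority prefixes
    if sys.stderr.isatty():
        return 'ansi-colors'
    return 'plain'
```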
Override Dispatcher.on_log(name, level, message) to customize output or forward to a logging framework.
Errors are logged automatically:
- Handler exceptions → ERROR with full traceback (returns 500 to client)
- setup() crash → CRITICAL with traceback (worker exits and restarts)
- Worker restart → ERROR with reason (died/stuck)
- Request timeout → WARNING with request ID and timeout value
Configuration
Dispatcher
| Parameter | Default | Description |
|---|---|---|
| port | 8080 | Listen port |
| address | '0.0.0.0' | Listen address |
| pools | [] | List of WorkerPool instances |
| static_routes | {} | URL prefix → filesystem path |
| shutdown_timeout | 10 | Seconds to wait on shutdown |
| max_pending | 1000 | Max pending requests (503 when exceeded) |
| ssl_context | None | ssl.SSLContext for HTTPS |
WorkerPool
| Parameter | Default | Description |
|---|---|---|
| worker_class | — | Worker subclass |
| num_workers | 1 | Number of worker processes |
| routes | None | Prefix patterns (None = fallback pool) |
| timeout | 30 | Request timeout in seconds (504) |
| stuck_timeout | 60 | Heartbeat timeout before kill |
| heartbeat_interval | 1 | Seconds between worker heartbeats |
| log_level | LOG_WARNING | Minimum log level for worker loggers |
| max_restarts | 10 | Max restarts per restart_window |
| restart_window | 300 | Time window for restart counting (seconds) |
| queue_warning | 100 | Log warning when queue size exceeds this (0 = disable) |
Extra **kwargs on WorkerPool are passed to worker constructor (accessible as self.kwargs).
Requirements
- Python >= 3.10
- POSIX system (Linux, macOS) — uses select() with queue._reader
- uhttp-server