# Architecture Handlers

Handlers for all parts of the Maverick Architecture.
A minimal, extensible base class for building handler components that accept inputs, optionally perform network requests, and publish results via a thread-safe queue. The same class exposes async wrappers, so only one implementation needs to be maintained.
## Features

- Threaded processing loop that reads from an input queue and writes to a result queue
- Pluggable callback: override `generate_results(item, extra)` or provide `generate_results_callback` in the config
- Template methods for easy adaptation: `validate_item`, `preprocess_item`, `postprocess_result`
- Optional type checking / custom validation
- Dropping policy when queues are full (with warnings & counters)
- Runtime timestamps and counters (processed/dropped/errors/network errors)
- Async wrappers (`async_feed`, `async_get_result`, `async_generate`, `async_start`, `async_cleanup`) built on the same sync core
- Hook methods for instrumentation: before/after feed, generate, network call, result retrieval, errors
- Non-blocking external callbacks: a dedicated callback worker thread runs user callbacks off the hot path (see "Callbacks & non-blocking execution")
## Install / Use

Install the package from PyPI and import the base class:

```shell
pip install scs-architecture-handlers
```

```python
from base_handler import ArchitectureHandler
```
## Releasing to PyPI via GitLab CI

- Create a Git tag and push it; the CI pipeline builds and uploads the package:

```shell
git tag v0.1.0
git push origin v0.1.0
```

- In the GitLab CI/CD settings, define a masked variable `PYPI_TOKEN` with your PyPI token. The deploy job uses Twine with `__token__` as the username and `TWINE_PASSWORD=$PYPI_TOKEN`.
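A deploy job matching this description could look roughly like the following sketch (the job name, image, and build commands are assumptions; adapt them to your pipeline):

```yaml
# Hypothetical .gitlab-ci.yml deploy job; names and stages are illustrative.
deploy-pypi:
  stage: deploy
  image: python:3.11
  rules:
    - if: $CI_COMMIT_TAG            # run only on tag pushes (e.g. v0.1.0)
  script:
    - pip install build twine
    - python -m build               # produces dist/*.tar.gz and dist/*.whl
    - TWINE_USERNAME=__token__ TWINE_PASSWORD=$PYPI_TOKEN twine upload dist/*
```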
## Three common setups
Below are focused, copy‑pasteable examples for direct generation, callbacks & non‑blocking execution, and async generation.
### 1) Direct synchronous generation

Subclass `ArchitectureHandler` and override `generate_results`.

```python
from base_handler import ArchitectureHandler

class EchoHandler(ArchitectureHandler):
    def generate_results(self, item, extra):
        # Your core logic here (ML inference, transformation, etc.)
        return {"echo": item, "meta": extra}

h = EchoHandler(run_as_thread=False)  # no queues/threads needed for direct generate
out = h.generate("hello", tag=1)
print(out)  # {"echo": "hello", "meta": {"tag": 1}}
h.cleanup()
```
### 2) Callbacks & non-blocking execution

Run slow user callbacks without blocking the main processing path by scheduling them on the handler's dedicated callback thread via `_emit_async_callback`. You still pass your user callback through `generate_results_callback`.

```python
import time
from base_handler import ArchitectureHandler

# 1) Define a user callback (could be slow/heavy)
def on_result(item, extra):
    # Simulate heavy work (db write, network, etc.)
    time.sleep(0.1)
    print("callback got:", item)

# 2) Subclass and schedule the callback asynchronously from generate_results
class NonBlockingHandler(ArchitectureHandler):
    def generate_results(self, item, extra):
        result = {"value": str(item).upper(), "extra": extra}
        # Schedule user callback without blocking this call
        self._emit_async_callback(result, extra)
        return result

# 3) Use it -- callbacks run on a dedicated thread and won't stall generate()
h = NonBlockingHandler(
    run_as_thread=False,
    disable_thread=False,  # enable the sidecar callback thread
    generate_results_callback=on_result,
    callback_queue_size=8,  # tune callback backpressure as needed
)

start = time.time()
for i in range(5):
    out = h.generate(f"msg-{i}")  # returns immediately
    assert out["value"] == f"MSG-{i}"  # core logic result is available right away
elapsed = time.time() - start
print(f"generated 5 items in {elapsed:.3f}s (callbacks running in background)")

# Give callbacks a moment to drain (optional)
time.sleep(0.5)
h.cleanup()
```
Notes

- If the callback queue fills (bounded by `callback_queue_size`), new callback tasks are dropped and `dropped_count` increases; processing continues.
- For producer-style handlers that generate items on their own (e.g., microphone/STT), use `_emit_async_callback` inside that production path to keep the producer responsive.
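The drop-when-full behavior can be sketched in plain Python. The following is an illustration of the mechanism using stdlib `queue` and `threading`, not the library's actual implementation:

```python
import queue
import threading

class CallbackWorker:
    """Minimal sketch: a bounded queue plus one worker thread for user callbacks."""

    def __init__(self, callback, queue_size=8):
        self.callback = callback
        self.tasks = queue.Queue(maxsize=queue_size)
        self.dropped_count = 0
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self):
        while True:
            task = self.tasks.get()
            if task is None:  # sentinel: shut down
                break
            result, extra = task
            self.callback(result, extra)

    def emit(self, result, extra):
        try:
            self.tasks.put_nowait((result, extra))  # never blocks the producer
        except queue.Full:
            self.dropped_count += 1  # drop instead of stalling the hot path

    def close(self):
        self.tasks.put(None)
        self._thread.join()
```

The key design point is `put_nowait`: the producer pays a constant, bounded cost per emit, and a slow consumer can only cause drops, never backpressure on the hot path.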
### 3) Async generation

All public sync methods have async wrappers that run on threads under the hood.

```python
import asyncio
from base_handler import ArchitectureHandler

class EchoHandler(ArchitectureHandler):
    def generate_results(self, item, extra):
        return {"echo": item}

async def main():
    h = EchoHandler(run_as_thread=False)
    out = await h.async_generate("async-hello")
    print(out)  # {"echo": "async-hello"}
    await h.async_cleanup()

asyncio.run(main())
```
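The wrapping pattern itself is straightforward. A self-contained sketch (not the library's code) of how a blocking sync core can be exposed as an async method:

```python
import asyncio

class SyncCore:
    """Stand-alone sketch of the sync-core / async-wrapper pattern."""

    def generate(self, item):
        # Blocking core logic (ML inference, transformation, etc.)
        return {"echo": item}

    async def async_generate(self, item):
        # Run the sync core on a worker thread so the event loop stays free
        return await asyncio.to_thread(self.generate, item)

async def main():
    core = SyncCore()
    # Several wrapped calls can now run concurrently without blocking the loop
    return await asyncio.gather(*(core.async_generate(f"m{i}") for i in range(3)))

print(asyncio.run(main()))
```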
## Threaded pipeline example (optional)

Use the internal worker thread to consume items from the input queue and push processed results to the result queue:

```python
from base_handler import ArchitectureHandler

class Doubler(ArchitectureHandler):
    def generate_results(self, item, extra):
        return item * 2

h = Doubler(run_as_thread=True, disable_thread=False)
h.feed(10)
print(h.get_result(timeout=1.0))  # 20
h.cleanup()
```
## Callbacks & non-blocking execution

By default, providing `generate_results_callback` makes `generate()` call the callback inline and return its result immediately. For producer-style handlers (for example, microphone-based STT) or advanced subclasses, you can emit callbacks asynchronously so slow callbacks don't block processing.

- The base class exposes a dedicated, bounded callback queue and a single callback worker thread.
- Subclasses can schedule work onto this thread using the protected helper `_emit_async_callback(result, extra)`.
- If the callback queue is full, the task is dropped (and `dropped_count` is incremented) but processing continues.
Example (inside a subclass):

```python
class ProducerHandler(ArchitectureHandler):
    def generate_results(self, item, extra):
        result = heavy_compute(item)
        self._emit_async_callback(result, extra)  # schedule user callback non-blocking
        return result
```
If you rely on async callbacks, you can tune capacity via `callback_queue_size`.
## Configuration keys

- `host`, `port`, `auth`: optional network settings
- `run_as_thread`: start internal worker threads (default True)
- `disable_thread`: force-disable threading even if `run_as_thread` is True
- `max_queue_size`: capacity for the input queue (default 128)
- `result_queue_size`: capacity for the result queue (default = `max_queue_size`)
- `verbose`: enable more logs
- `expected_type`: type or tuple of types for basic validation (used by the default `validate_item`)
- `generate_results_callback`: alternative to subclassing `generate_results`
- `network_timeout`, `client_id_prefix`: optional settings
- `callback_queue_size`: capacity for the async callback queue (default 128); used when scheduling callbacks with `_emit_async_callback`
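How the queue-size defaults interact can be sketched as follows (a hypothetical helper, not the library's code; the names mirror the config keys above):

```python
# Hypothetical sketch of how the documented queue-size defaults could be resolved.
def resolve_queue_sizes(config):
    max_queue_size = config.get("max_queue_size", 128)
    # result_queue_size falls back to max_queue_size when unset
    result_queue_size = config.get("result_queue_size", max_queue_size)
    # the callback queue has its own independent default
    callback_queue_size = config.get("callback_queue_size", 128)
    return max_queue_size, result_queue_size, callback_queue_size

print(resolve_queue_sizes({"max_queue_size": 32}))  # (32, 32, 128)
```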
## Extension Points

To adapt the handler (e.g., Face Recognition, Gesture Generation), override these methods:

| Method | Purpose |
|---|---|
| `validate_item(item, extra)` | Return False to drop invalid items early (shape/content/type). |
| `preprocess_item(item, extra)` | Normalize or transform raw input before `generate_results`. |
| `generate_results(item, extra)` | Core computation (required unless a callback is supplied). |
| `postprocess_result(item, extra, result)` | Final transformation (e.g., map indices to labels). |
| `_prepare_request_payload(item, extra)` | Build a structured payload for network I/O. |
| `_perform_request(payload)` | Implement the actual network request (HTTP, gRPC, etc.). Call via `perform_request`. |
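The order in which these template methods are applied can be illustrated with a self-contained sketch (a simplified stand-in for the base class, not its real implementation):

```python
# Simplified stand-in showing the template-method order, not the real base class.
class MiniHandler:
    def validate_item(self, item, extra):
        return True

    def preprocess_item(self, item, extra):
        return item

    def generate_results(self, item, extra):
        raise NotImplementedError

    def postprocess_result(self, item, extra, result):
        return result

    def generate(self, item, extra=None):
        if not self.validate_item(item, extra):
            return None  # dropped early
        item = self.preprocess_item(item, extra)
        result = self.generate_results(item, extra)
        return self.postprocess_result(item, extra, result)

class LabelHandler(MiniHandler):
    def validate_item(self, item, extra):
        return isinstance(item, int)

    def preprocess_item(self, item, extra):
        return item % 3  # normalize to a class index

    def generate_results(self, item, extra):
        return {"index": item}

    def postprocess_result(self, item, extra, result):
        labels = ["red", "green", "blue"]
        return labels[result["index"]]  # map index to label

print(LabelHandler().generate(4))      # green
print(LabelHandler().generate("bad"))  # None (dropped by validate_item)
```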
Instrumentation / Hooks (optional overrides):

- `_hook_before_feed`, `_hook_after_feed`
- `_hook_before_generate`, `_hook_after_generate`
- `_hook_before_network_call`, `_hook_after_network_call`
- `_hook_after_result`
- `_hook_on_error` (generation errors)
- `_hook_network_error` (network failures)

Counters available via `stats()`:

- `processed_count`, `dropped_count`, `error_count`, `network_error_count`
## Retrieving Stats

```python
stats = handler.stats()
print(stats["processed_count"], stats["dropped_count"])  # etc.
```
## Dependencies

On Debian/Ubuntu you may need:

```shell
sudo apt-get update
sudo apt-get install -y portaudio19-dev
```
## License
MIT