Drakkar
Poll -> Execute -> Sink
Kafka subprocess orchestration framework with pluggable output sinks. Consumes messages from Kafka, runs CPU-intensive external binaries in a managed subprocess pool, and delivers results to any combination of Kafka, PostgreSQL, MongoDB, Redis, HTTP, and filesystem.
Workers are the Drakkars, executors are the Vikings.
What it does
Kafka source topic
        |
        v
[ Drakkar Worker ]
        |
        +-- poll messages (per-partition pipelines)
        +-- arrange() -> executor tasks (user hook)
        +-- run external binary via subprocess pool
        +-- on_task_complete() -> sink payloads (user hook)
        +-- on_message_complete() -> aggregate per source message (user hook, optional)
        +-- deliver to configured sinks (Kafka, Postgres, Mongo, Redis, HTTP, files)
        +-- commit offsets (watermark-based, only after all sinks confirm)
        |
        v
Configured sinks (any combination)
- Per-partition independent pipelines with offset watermark tracking
- Pluggable sinks -- configure any combination of Kafka, PostgreSQL, MongoDB, Redis, HTTP, filesystem
- Dead letter queue -- failed deliveries go to a DLQ Kafka topic with error metadata
- Cooperative-sticky rebalancing -- non-revoked partitions continue without interruption
- Backpressure via Kafka pause/resume -- memory stays bounded regardless of consumer lag
- Subprocess executor pool with semaphore-based concurrency limiting
- Typed message models -- define Pydantic schemas for input/output, get auto-deserialization
- Cache (optional) -- self.cache key/value store with memory + write-behind SQLite + eventually-consistent peer sync across workers (docs)
- Built-in debug UI (FastAPI) with executor timeline, partition lag, message tracing
- Flight recorder -- SQLite event log with retention and rotation
- Prometheus metrics -- pipeline, executor, and per-sink metrics
- Structured JSON logging -- ECS-compatible, ready for Elastic
Quick start
uv init my-processor && cd my-processor
uv add py-drakkar
1. Define your message models
# models.py
from pydantic import BaseModel
class InputMessage(BaseModel):
    request_id: str
    data: str
    priority: int = 1


class ProcessedResult(BaseModel):
    request_id: str
    result: str
    processed: bool


class ResultSummary(BaseModel):
    request_id: str
    status: str
2. Implement your handler
# handler.py
import structlog
from prometheus_client import Counter
from drakkar import (
    BaseDrakkarHandler, CollectResult, DeliveryAction, DeliveryError,
    ErrorAction, ExecutorTask, KafkaPayload, PostgresPayload,
    RedisPayload, make_task_id,
)
from models import InputMessage, ProcessedResult, ResultSummary

logger = structlog.get_logger()

# custom Prometheus metric (user-defined)
items_processed = Counter('app_items_processed_total', 'Total processed items')


class MyHandler(BaseDrakkarHandler[InputMessage, ProcessedResult]):

    async def arrange(self, messages, pending):
        tasks = []
        for msg in messages:
            # msg.payload is an InputMessage instance (auto-deserialized)
            tasks.append(ExecutorTask(
                task_id=make_task_id("proc"),
                args=["--input", msg.payload.data],
                source_offsets=[msg.offset],
                metadata={"request_id": msg.payload.request_id},
            ))
        return tasks

    async def on_task_complete(self, result):
        output = ProcessedResult(
            request_id=result.task.metadata["request_id"],
            result=result.stdout.strip(),
            processed=result.exit_code == 0,
        )
        summary = ResultSummary(
            request_id=output.request_id,
            status="done" if output.processed else "failed",
        )

        # custom Prometheus metric
        items_processed.inc()

        # async structured logging
        await logger.ainfo(
            "item_processed",
            category="handler",
            request_id=output.request_id,
            processed=output.processed,
        )

        # route to sinks based on business logic
        sinks = CollectResult(
            kafka=[KafkaPayload(data=output, key=output.request_id.encode())],
            postgres=[PostgresPayload(table="results", data=summary)],
        )

        # conditional: cache successful results in Redis
        if output.processed:
            sinks.redis.append(
                RedisPayload(key=f"result:{output.request_id}", data=summary, ttl=3600)
            )

        return sinks

    async def on_error(self, task, error):
        await logger.awarning(
            "task_failed", category="handler",
            task_id=task.task_id, exit_code=error.exit_code,
        )
        return ErrorAction.RETRY

    async def on_delivery_error(self, error: DeliveryError):
        await logger.awarning(
            "delivery_failed", category="handler",
            sink=error.sink_name, error=error.error,
        )
        # retry transient failures, DLQ for permanent ones
        if error.sink_type in ("http", "redis"):
            return DeliveryAction.RETRY
        return DeliveryAction.DLQ
3. Configure
# drakkar.yaml
kafka:
  brokers: "localhost:9092"
  source_topic: "input-events"
  consumer_group: "my-workers"

executor:
  binary_path: "/usr/local/bin/my-processor"
  max_executors: 8
  task_timeout_seconds: 120
  window_size: 20

sinks:
  kafka:
    results:
      topic: "output-results"
  postgres:
    main:
      dsn: "postgresql://user:pass@localhost:5432/mydb"
  redis:
    cache:
      url: "redis://localhost:6379/0"
      key_prefix: "app:"

dlq:
  topic: ""  # auto-derived: input-events_dlq

metrics:
  port: 9090

debug:
  port: 8080
4. Run
# main.py
from drakkar import DrakkarApp
from handler import MyHandler
app = DrakkarApp(
    handler=MyHandler(),
    config_path="drakkar.yaml",
)
app.run()
Worker name is read from the WORKER_ID environment variable by default (configurable via worker_name_env in config).
Handler hooks
| Hook | When | Purpose |
|---|---|---|
| on_startup(config) | Before components start | Modify config (e.g., auto-detect CPU count) |
| on_ready(config, db_pool) | After sinks connected | Initialize state from DB, run migrations |
| arrange(messages, pending) | Window of messages received | Transform messages into executor tasks |
| collect(result) | Each task completes | Process result into sink payloads |
| on_window_complete(results, messages) | All tasks in a window done | Aggregate results across a window |
| on_error(task, error) | Task fails | Return RETRY, SKIP, or replacement tasks |
| on_delivery_error(error) | Sink delivery fails | Return DLQ (default), RETRY, or SKIP |
| on_assign(partitions) | Partitions assigned | Initialize per-partition state |
| on_revoke(partitions) | Partitions revoked | Cleanup per-partition state |
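A minimal sketch of the two lifecycle hooks at the top of the table. The attribute path config.executor.max_executors mirrors the YAML keys from the quick start and is an assumption about the config object's shape; the pool usage follows the same asyncpg-style acquire/fetch pattern used in the periodic-task example below.

import os

from drakkar import BaseDrakkarHandler


class MyHandler(BaseDrakkarHandler):
    async def on_startup(self, config):
        # Assumption: config exposes the YAML sections as attributes and can be
        # mutated in place before components start.
        config.executor.max_executors = os.cpu_count() or 4

    async def on_ready(self, config, db_pool):
        # db_pool is the connected Postgres pool; warm up reference data once.
        async with db_pool.acquire() as conn:
            self.lookup = await conn.fetch("SELECT * FROM lookup")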
Periodic tasks
Use the @periodic decorator to schedule recurring background coroutines on the handler. They run in the same async loop alongside the poll loop, start after on_ready(), and are cancelled on shutdown. Overlapping runs are prevented -- the next interval starts only after the current invocation finishes.
from drakkar import BaseDrakkarHandler, periodic
class MyHandler(BaseDrakkarHandler):

    async def on_ready(self, config, db_pool):
        self.db_pool = db_pool

    @periodic(seconds=60)
    async def refresh_cache(self):
        async with self.db_pool.acquire() as conn:
            self.cache = await conn.fetch("SELECT * FROM lookup")

    @periodic(seconds=30, on_error="stop")
    async def health_ping(self):
        await http_post("https://health.example.com/ping")
| Parameter | Type | Default | Description |
|---|---|---|---|
| seconds | float | required | Interval between runs |
| on_error | "continue" \| "stop" | "continue" | "continue" logs and retries next interval; "stop" logs and cancels the task |
Sinks
Configure any combination in the sinks: section. Each type supports multiple named instances.
| Sink | Payload | Serialization |
|---|---|---|
| KafkaPayload | data: BaseModel, key: bytes | data.model_dump_json().encode() -> value |
| PostgresPayload | data: BaseModel, table: str | data.model_dump() -> column mapping |
| MongoPayload | data: BaseModel, collection: str | data.model_dump() -> BSON document |
| HttpPayload | data: BaseModel | data.model_dump_json() -> POST body |
| RedisPayload | data: BaseModel, key: str, ttl: int? | data.model_dump_json() -> string value |
| FilePayload | data: BaseModel, path: str | data.model_dump_json() + "\n" -> JSONL line |
Routing: if you have multiple sinks of the same type, set sink="name" on the payload. With a single sink per type, the framework routes automatically.
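For instance, with two named Kafka sink instances the routing might look like the sketch below; the instance names "results" and "audit" are illustrative, not part of the quick-start config.

from drakkar import CollectResult, KafkaPayload

# Inside on_task_complete(): sink= selects which named instance under
# sinks.kafka receives the payload. "results" and "audit" are hypothetical
# instance names; output and summary are the models built earlier in the hook.
sinks = CollectResult(
    kafka=[
        KafkaPayload(data=output, key=output.request_id.encode(), sink="results"),
        KafkaPayload(data=summary, key=output.request_id.encode(), sink="audit"),
    ],
)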
Error handling: on delivery failure, on_delivery_error() is called. Default action: write to DLQ. The DLQ topic is auto-derived as {source_topic}_dlq.
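To inspect dead-lettered messages during development, any plain Kafka consumer works. A minimal sketch using confluent-kafka follows; the client library is an arbitrary choice and the exact layout of the error metadata (headers vs. body) is not specified here, so the sketch simply prints both.

from confluent_kafka import Consumer

# Read the auto-derived DLQ topic for the quick-start source topic.
consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "dlq-inspector",
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["input-events_dlq"])  # {source_topic}_dlq

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None or msg.error():
            continue
        print("headers:", msg.headers())
        print("value:", msg.value())
finally:
    consumer.close()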
Typed messages
Define Pydantic models for your input/output and use them as type parameters:
class MyHandler(BaseDrakkarHandler[InputModel, OutputModel]):
    async def arrange(self, messages, pending):
        for msg in messages:
            msg.payload  # InputModel instance, auto-deserialized
            msg.value    # raw bytes, always available as fallback
Non-generic BaseDrakkarHandler (no type params) works too -- you get raw bytes in msg.value.
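A minimal sketch of the untyped variant, decoding JSON by hand; the json.loads call and field names are illustrative, since without type parameters the framework only hands you raw bytes in msg.value.

import json

from drakkar import BaseDrakkarHandler, ExecutorTask, make_task_id


class RawHandler(BaseDrakkarHandler):
    async def arrange(self, messages, pending):
        tasks = []
        for msg in messages:
            body = json.loads(msg.value)  # raw bytes, decoded manually
            tasks.append(ExecutorTask(
                task_id=make_task_id("raw"),
                args=["--input", body["data"]],
                source_offsets=[msg.offset],
            ))
        return tasks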
Scaling
Run multiple instances with the same consumer_group. Kafka's cooperative-sticky rebalancing distributes partitions across workers.
WORKER_ID=worker-1 python main.py
WORKER_ID=worker-2 python main.py
Configuration
All config fields support environment variable override with DRAKKAR_ prefix and __ for nesting:
DRAKKAR_KAFKA__BROKERS=kafka:9092
DRAKKAR_EXECUTOR__MAX_EXECUTORS=16
DRAKKAR_DEBUG__PORT=8081
Observability
Debug UI
Enabled by default at :8080. Pages:
- / -- dashboard with partition tiles, pool utilization, event counters
- /partitions -- per-partition stats
- /live -- tabbed live view: Arrange, Executors (timeline), Collect, Trace
- /history -- filterable event browser with partition and event type toggles
- /trace/{partition}/{offset} -- full lifecycle of a single message
- /task/{task_id} -- task detail with PID, duration, CLI command, stdout/stderr
Prometheus metrics
Exposed at :9090/metrics. Key metrics:
- drakkar_messages_consumed_total{partition}
- drakkar_executor_tasks_total{status}, drakkar_executor_duration_seconds
- drakkar_sink_payloads_delivered_total{sink_type, sink_name}
- drakkar_sink_deliver_errors_total{sink_type, sink_name}
- drakkar_sink_deliver_duration_seconds{sink_type, sink_name}
- drakkar_sink_dlq_messages_total
- drakkar_backpressure_active, drakkar_total_queued
- drakkar_offset_lag{partition}, drakkar_assigned_partitions
- drakkar_handler_duration_seconds{hook}
- drakkar_worker_info (worker_id, version, consumer_group)
Structured logging
JSON to stderr, ECS-compatible. Every log line includes service_name, worker_id, consumer_group, category, and timestamp.
Use structlog.get_logger() for async logging in your handlers:
import structlog
logger = structlog.get_logger()
# in any async hook
await logger.ainfo("my_event", category="handler", custom_field="value")
Integration test
A full docker-compose example lives in integration/ with all 6 sink types:
cd integration
docker compose up --build
Services and web UIs:
| URL | Service |
|---|---|
| http://localhost:8081 | Worker 1 debug UI |
| http://localhost:8082 | Worker 2 debug UI |
| http://localhost:8083 | Worker 3 debug UI |
| http://localhost:8088 | Kafka UI |
| http://localhost:8089 | MongoDB Express |
| http://localhost:8087 | Redis Commander |
The integration scenario:
- 3 workers consuming from a 50-partition topic
- Each result goes to Kafka + Postgres + MongoDB + Redis (always)
- High-match results (>20) trigger an HTTP webhook
- Very high-match results (>50) write to a JSONL file
- 5% simulated executor failures with retry via on_error()
- Failed deliveries route to DLQ or retry based on sink type
Debug databases are stored in integration/shared/ with per-worker filenames and automatic timestamping (e.g. worker-1-2026-03-23__14_55_00.db). A {worker}-live.db symlink points to the current database while running. Files persist across restarts and are rotated automatically. See integration/shared/README.md for details.
Development
uv sync --extra=dev
uv run pytest --cov=drakkar
uvx ruff check drakkar/ tests/
uv run ty check drakkar/
License
MIT