Lightweight flow tracing and visualization for Python
Project description
penstock
A lightweight Python library for defining, tracing, and visualizing application flows.
What is penstock?
Penstock lets you declaratively define how data flows through your application—from entrypoints like Kafka messages or HTTP requests, through processing steps, to side effects. It provides:
- Correlation IDs for grouping logs and traces across a single operation
- Flow definitions via decorators that double as living documentation
- DAG visualization to see all possible paths through your system at a glance
Penstock's unique value prop: Describe flows that already exist in your code, get correlation IDs automatically for log grouping, and generate static documentation of all possible paths — without changing how your code executes or adding infrastructure.
Installation
pip install penstock
Quick Start
from penstock import flow, entrypoint, step
@flow("bom_sync")
@entrypoint
def handle_kafka_message(msg):
"""Receives BOM update from Kafka."""
bom_id = msg.payload["bom_id"]
return fetch_from_api(bom_id)
@flow("bom_sync")
@step(after="handle_kafka_message")
def fetch_from_api(bom_id):
"""Fetches full BOM data from external API."""
data = api.get_bom(bom_id)
return sync_to_database(data)
@flow("bom_sync")
@step(after="fetch_from_api")
def sync_to_database(data):
"""Syncs BOM parts to local database."""
db.bulk_upsert(data.parts)
Every invocation of handle_kafka_message automatically gets a correlation ID that propagates through the entire flow, available in your logs via penstock.current_flow_id().
Visualize Your Flows
from penstock import generate_dag
# Output as Mermaid diagram
print(generate_dag("bom_sync", format="mermaid"))
# Or render directly to PNG
generate_dag("bom_sync", format="png", output="bom_sync_flow.png")
graph TD
handle_kafka_message --> fetch_from_api
fetch_from_api --> sync_to_database
Features
Correlation IDs
import logging
from penstock import current_flow_id
logger = logging.getLogger(__name__)
@flow("bom_sync")
@step(after="fetch_from_api")
def sync_to_database(data):
logger.info(f"[{current_flow_id()}] Syncing {len(data.parts)} parts")
# All logs in this flow share the same ID for easy filtering
Multiple Entrypoints
@flow("user_update")
@entrypoint
def handle_api_request(request):
"""User updates via REST API."""
return process_user_update(request.data)
@flow("user_update")
@entrypoint
def handle_admin_action(action):
"""Admin updates via internal tool."""
return process_user_update(action.payload)
@flow("user_update")
@step(after=["handle_api_request", "handle_admin_action"])
def process_user_update(data):
"""Common processing for user updates."""
...
Branching Flows
@flow("order_processing")
@step(after="validate_order")
def charge_payment(order):
...
@flow("order_processing")
@step(after="validate_order")
def reserve_inventory(order):
...
@flow("order_processing")
@step(after=["charge_payment", "reserve_inventory"])
def send_confirmation(order):
...
Integrations
Django
# settings.py
MIDDLEWARE = [
"penstock.contrib.django.FlowMiddleware",
...
]
Celery
from penstock.contrib.celery import flow_task
@flow("async_processing")
@flow_task
def process_in_background(data):
# Correlation ID automatically propagates to the worker
...
Logging (structlog)
import structlog
from penstock.contrib.structlog import flow_processor
structlog.configure(
processors=[
flow_processor, # Adds flow_id to all log entries
...
]
)
Design Goals
- Minimal overhead — Decorators are cheap, context propagation uses
contextvars - Zero magic — Your functions remain normal functions, callable without penstock
- Static analysis — DAG is built at import time, not runtime
- Framework agnostic — Works with Django, Flask, FastAPI, Celery, or plain Python
License
MIT
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file penstock-0.1.0.tar.gz.
File metadata
- Download URL: penstock-0.1.0.tar.gz
- Upload date:
- Size: 12.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.7.11
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
591c22eaf27020063ef9c1f8771fb237b1076edb24eabd9eef1e81d9b96c0172
|
|
| MD5 |
f4252564b3f86cd4007a8684d25ca959
|
|
| BLAKE2b-256 |
eed81196841893bb09a3b3128db3ce0e2601af2c1ad65be736095a42e505f105
|
File details
Details for the file penstock-0.1.0-py3-none-any.whl.
File metadata
- Download URL: penstock-0.1.0-py3-none-any.whl
- Upload date:
- Size: 16.1 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.7.11
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
b784d09d4df041f9edf81202abca0922305da3ba73ce87c35a1f573b254e8a28
|
|
| MD5 |
d4ea4856c5c79ff928bd2c615c00386c
|
|
| BLAKE2b-256 |
3c525dc77b9c8c37a70c1f514eafcda5727c19cdc80919c30608695ff9af6c59
|