Lightweight flow tracing and visualization for Python

penstock

A lightweight Python library for defining, tracing, and visualizing application flows.

What is penstock?

Penstock lets you declaratively define how data flows through your application—from entrypoints like Kafka messages or HTTP requests, through processing steps, to side effects. It provides:

  • Correlation IDs for grouping logs and traces across a single operation
  • Flow definitions via decorators that double as living documentation
  • DAG visualization to see all possible paths through your system at a glance

What sets penstock apart: you describe flows that already exist in your code, get correlation IDs automatically for log grouping, and generate static documentation of all possible paths, without changing how your code executes or adding infrastructure.

Installation

pip install penstock

Quick Start

from penstock import flow, entrypoint, step

# `api` and `db` below stand in for your own API client and database layer.

@flow("bom_sync")
@entrypoint
def handle_kafka_message(msg):
    """Receives BOM update from Kafka."""
    bom_id = msg.payload["bom_id"]
    return fetch_from_api(bom_id)

@flow("bom_sync")
@step(after="handle_kafka_message")
def fetch_from_api(bom_id):
    """Fetches full BOM data from external API."""
    data = api.get_bom(bom_id)
    return sync_to_database(data)

@flow("bom_sync")
@step(after="fetch_from_api")
def sync_to_database(data):
    """Syncs BOM parts to local database."""
    db.bulk_upsert(data.parts)

Every invocation of handle_kafka_message automatically gets a correlation ID that propagates through the entire flow, available in your logs via penstock.current_flow_id().
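
How an ID can follow a call chain without being passed as an argument is easiest to see with the standard-library contextvars module, which the Design Goals name as the propagation mechanism. The following is a minimal sketch of that idea, not penstock's actual implementation; `_flow_id`, `get_flow_id`, and `start_flow` are hypothetical names.

```python
import contextvars
import uuid

# Hypothetical stand-in for penstock's internal state; not its real API.
_flow_id = contextvars.ContextVar("flow_id", default=None)


def get_flow_id():
    """Return the correlation ID of the flow currently running, or None."""
    return _flow_id.get()


def start_flow(func, *args, **kwargs):
    """Run func with a fresh correlation ID visible to everything it calls."""
    token = _flow_id.set(uuid.uuid4().hex)
    try:
        return func(*args, **kwargs)
    finally:
        # Restore the previous value so IDs never leak between invocations.
        _flow_id.reset(token)


def inner_step():
    return get_flow_id()


def outer_step():
    # The nested call sees the same ID without it being passed as an argument.
    return get_flow_id(), inner_step()
```

Calling `start_flow(outer_step)` returns the same ID twice, and a second call produces a different one. Resetting the token in `finally` is what keeps one invocation's ID from bleeding into the next.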

Visualize Your Flows

from penstock import generate_dag

# Output as Mermaid diagram
print(generate_dag("bom_sync", format="mermaid"))

# Or render directly to PNG
generate_dag("bom_sync", format="png", output="bom_sync_flow.png")

Mermaid output for bom_sync:

graph TD
    handle_kafka_message --> fetch_from_api
    fetch_from_api --> sync_to_database
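
The Mermaid text itself is simple to produce from a list of edges. As a rough illustration of the output format only (the helper `to_mermaid` and the `bom_sync_edges` list are hypothetical, and this says nothing about how generate_dag is implemented):

```python
def to_mermaid(edges):
    """Render (source, target) pairs as a top-down Mermaid graph."""
    lines = ["graph TD"]
    for source, target in edges:
        lines.append(f"    {source} --> {target}")
    return "\n".join(lines)


# Edges of the bom_sync flow from the Quick Start.
bom_sync_edges = [
    ("handle_kafka_message", "fetch_from_api"),
    ("fetch_from_api", "sync_to_database"),
]
print(to_mermaid(bom_sync_edges))
```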

Features

Correlation IDs

import logging
from penstock import current_flow_id, flow, step

logger = logging.getLogger(__name__)

@flow("bom_sync")
@step(after="fetch_from_api")
def sync_to_database(data):
    logger.info("[%s] Syncing %d parts", current_flow_id(), len(data.parts))
    # All logs in this flow share the same ID for easy filtering
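
To avoid repeating the ID in every message, a standard logging.Filter can attach it to each record so the formatter prints it. This is a sketch using only the standard library; `_flow_id` is a hypothetical stand-in for whatever current_flow_id() reads, and `FlowIdFilter` is not part of penstock.

```python
import contextvars
import io
import logging

# Hypothetical stand-in for penstock.current_flow_id(); not the real API.
_flow_id = contextvars.ContextVar("flow_id", default="-")


class FlowIdFilter(logging.Filter):
    """Copy the current flow ID onto every record passing through."""

    def filter(self, record):
        record.flow_id = _flow_id.get()
        return True  # never drop the record, only annotate it


stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.addFilter(FlowIdFilter())
handler.setFormatter(logging.Formatter("[%(flow_id)s] %(message)s"))

logger = logging.getLogger("flow_id_demo")
logger.addHandler(handler)
logger.setLevel(logging.INFO)
logger.propagate = False

_flow_id.set("abc123")
logger.info("Syncing %d parts", 3)
```

Attaching the filter to the handler rather than to a single logger means records from any logger routed through that handler get the field.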

Multiple Entrypoints

@flow("user_update")
@entrypoint
def handle_api_request(request):
    """User updates via REST API."""
    return process_user_update(request.data)

@flow("user_update")
@entrypoint
def handle_admin_action(action):
    """Admin updates via internal tool."""
    return process_user_update(action.payload)

@flow("user_update")
@step(after=["handle_api_request", "handle_admin_action"])
def process_user_update(data):
    """Common processing for user updates."""
    ...

Branching Flows

# validate_order is assumed to be an @entrypoint defined elsewhere in this flow.
@flow("order_processing")
@step(after="validate_order")
def charge_payment(order):
    ...

@flow("order_processing")
@step(after="validate_order")
def reserve_inventory(order):
    ...

@flow("order_processing")
@step(after=["charge_payment", "reserve_inventory"])
def send_confirmation(order):
    ...
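
Given the `after` relationships of a flow like this one, every entrypoint-to-leaf path can be listed with a short depth-first walk. The sketch below works on a plain dictionary rather than on penstock's own data structures; `all_paths` and the `order_processing` mapping are hypothetical, and the flow is assumed to be acyclic with every referenced step present.

```python
def all_paths(parents):
    """List every entrypoint-to-leaf path in a flow.

    parents maps each step name to the names it runs after;
    entrypoints map to an empty list.
    """
    children = {name: [] for name in parents}
    for name, after in parents.items():
        for parent in after:
            children[parent].append(name)

    def walk(node, path):
        path = path + [node]
        if not children[node]:
            yield path  # reached a leaf: one complete path
        for child in children[node]:
            yield from walk(child, path)

    for name, after in parents.items():
        if not after:  # no predecessors, so this is an entrypoint
            yield from walk(name, [])


order_processing = {
    "validate_order": [],
    "charge_payment": ["validate_order"],
    "reserve_inventory": ["validate_order"],
    "send_confirmation": ["charge_payment", "reserve_inventory"],
}
for path in all_paths(order_processing):
    print(" -> ".join(path))
```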

Integrations

Django

# settings.py
MIDDLEWARE = [
    "penstock.contrib.django.FlowMiddleware",
    ...
]

Celery

from penstock import flow
from penstock.contrib.celery import flow_task

@flow("async_processing")
@flow_task
def process_in_background(data):
    # Correlation ID automatically propagates to the worker
    ...

Logging (structlog)

import structlog
from penstock.contrib.structlog import flow_processor

structlog.configure(
    processors=[
        flow_processor,  # Adds flow_id to all log entries
        ...
    ]
)

Design Goals

  1. Minimal overhead: decorators are cheap, and context propagation uses contextvars
  2. Zero magic: your functions remain normal functions, callable without penstock
  3. Static analysis: the DAG is built at import time, when the decorators run, not while a flow executes
  4. Framework agnostic: works with Django, Flask, FastAPI, Celery, or plain Python

License

MIT
