
Stepflow Python SDK

Python SDK for building Stepflow components and workflows.

Installation

# Install from PyPI with uv
uv add stepflow-py

Configuration

The SDK is configured via environment variables:

Variable | Description | Default
STEPFLOW_SERVICE_NAME | Service name for observability | stepflow-python
STEPFLOW_LOG_LEVEL | Log level (DEBUG, INFO, WARNING, ERROR) | INFO
STEPFLOW_LOG_DESTINATION | Log destination (stderr, file, otlp, comma-separated) | otlp if OTLP endpoint set, else stderr
STEPFLOW_LOG_FILE | File path for file logging | -
STEPFLOW_OTLP_ENDPOINT | OTLP endpoint (e.g., http://localhost:4317) | -
STEPFLOW_TRACE_ENABLED | Enable distributed tracing | true

Example with OTLP (logs and traces go to OTLP by default):

STEPFLOW_SERVICE_NAME=my-python-components \
STEPFLOW_OTLP_ENDPOINT=http://localhost:4317 \
uv run stepflow_py

Example with OTLP + stderr logging:

STEPFLOW_SERVICE_NAME=my-components \
STEPFLOW_LOG_DESTINATION=stderr,otlp \
STEPFLOW_OTLP_ENDPOINT=http://localhost:4317 \
uv run stepflow_py

Example with only stderr logging (no OTLP):

STEPFLOW_SERVICE_NAME=my-components \
STEPFLOW_LOG_LEVEL=DEBUG \
uv run stepflow_py
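
The default for STEPFLOW_LOG_DESTINATION depends on whether an OTLP endpoint is set, which is easy to misread from the table. The sketch below restates that rule in plain Python. The function name resolve_log_destinations is hypothetical and this is not the SDK's own code; it only mirrors the documented behavior.

```python
import os
from typing import Mapping, Optional


def resolve_log_destinations(env: Optional[Mapping[str, str]] = None) -> list:
    """Return the log destinations implied by the documented defaults.

    Hypothetical helper: it restates the table above and is not part of stepflow_py.
    """
    env = os.environ if env is None else env

    # An explicit, comma-separated value always wins.
    explicit = env.get("STEPFLOW_LOG_DESTINATION", "")
    if explicit.strip():
        return [part.strip() for part in explicit.split(",") if part.strip()]

    # Otherwise: otlp when an OTLP endpoint is configured, else stderr.
    if env.get("STEPFLOW_OTLP_ENDPOINT"):
        return ["otlp"]
    return ["stderr"]
```

Passing a plain dict instead of the real environment makes it possible to check each of the three examples above without exporting any variables.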

Usage

Creating a Component Server

from stepflow_py import StepflowStdioServer, StepflowContext
import msgspec

# Define input/output types
class MyInput(msgspec.Struct):
    message: str
    count: int

class MyOutput(msgspec.Struct):
    result: str

# Create server
server = StepflowStdioServer()

# Register a component
@server.component
def my_component(input: MyInput) -> MyOutput:
    return MyOutput(result=f"Processed: {input.message} x{input.count}")

# Component with context (for blob operations)
@server.component
async def component_with_context(input: MyInput, context: StepflowContext) -> MyOutput:
    # Store data as a blob
    blob_id = await context.put_blob({"processed": input.message})
    return MyOutput(result=f"Stored blob: {blob_id}")

# Run the server
if __name__ == "__main__":
    server.run()
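
Components that take a context can be awkward to exercise without a running Stepflow runtime. One possible approach is an in-memory stand-in that exposes the same two awaitable methods used above, put_blob and get_blob. FakeContext and store_message are hypothetical names, and the SHA-256 blob ID scheme is an assumption made for the sketch; the real runtime may assign IDs differently.

```python
import asyncio
import hashlib
import json


class FakeContext:
    """In-memory stand-in with awaitable put_blob/get_blob (hypothetical, for tests only)."""

    def __init__(self) -> None:
        self._blobs = {}

    async def put_blob(self, data):
        # Assumption: derive the ID from the JSON content; the real runtime may differ.
        encoded = json.dumps(data, sort_keys=True).encode("utf-8")
        blob_id = hashlib.sha256(encoded).hexdigest()
        self._blobs[blob_id] = encoded
        return blob_id

    async def get_blob(self, blob_id):
        return json.loads(self._blobs[blob_id])


async def store_message(message: str, context) -> str:
    # Same logic as component_with_context, without the SDK types.
    blob_id = await context.put_blob({"processed": message})
    return f"Stored blob: {blob_id}"


async def main():
    context = FakeContext()
    result = await store_message("hello", context)
    blob_id = result[len("Stored blob: "):]
    return await context.get_blob(blob_id)


round_tripped = asyncio.run(main())
```

Keeping the component body free of anything but the two context calls is what makes this kind of substitution straightforward.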

Using the Context API

The StepflowContext provides bidirectional communication with the Stepflow runtime:

import logging

from stepflow_py import StepflowContext

logger = logging.getLogger(__name__)

# Illustrative helper; in practice these calls live inside a component.
async def use_context(context: StepflowContext) -> None:
    # Store JSON data as a blob
    blob_id = await context.put_blob({"key": "value"})

    # Retrieve blob data
    data = await context.get_blob(blob_id)

    # Logging uses Python's standard logging with automatic diagnostic context
    # (flow_id, run_id, step_id, etc. are included automatically).
    logger.info("Processing data")

Development

Running Tests

uv run pytest

Type Checking

uv run mypy src/

Protocol Generation

This SDK uses auto-generated protocol types from the JSON schema. To regenerate the protocol types when the schema changes:

uv run python generate.py

The script regenerates the types and applies the fixes needed for msgspec compatibility.

Project Structure

  • src/stepflow_py/ - Main SDK code
    • generated_protocol.py - Auto-generated protocol types from JSON schema
    • protocol.py - Hybrid protocol layer with envelope patterns for efficient deserialization
    • server.py - Component server implementation
    • context.py - Runtime context API
    • exceptions.py - SDK-specific exceptions
  • tests/ - Test suite

Architecture

The SDK uses a hybrid approach for protocol handling:

  1. Generated Types (generated_protocol.py) - Auto-generated from the Stepflow JSON schema using datamodel-code-generator
  2. Protocol Layer (protocol.py) - Combines generated types with manual envelope patterns for two-stage deserialization using msgspec.Raw
  3. Server Implementation - Uses the protocol layer for efficient JSON-RPC message handling

This design provides:

  • Type Safety - All protocol messages use properly typed structs
  • Schema Consistency - Generated types match the Rust protocol exactly
  • Performance - Two-stage deserialization with msgspec.Raw defers decoding the payload until the envelope has been inspected
  • Maintainability - Protocol changes can be regenerated automatically

License

Licensed under the Apache License, Version 2.0.
