
Python-native workflow engine with Human-in-the-Loop support. Inspired by Mastra (TypeScript).


Pymastra

Python-native workflow engine with Human-in-the-Loop support

Build AI-powered workflows in which humans stay in control: define complex pipelines in Python, pause execution for human review, and integrate with LLMs and external services.

Python 3.11+ • License: MIT

🎯 Why Pymastra?

Building AI workflows is complex. You need:

  • Sequential execution with error handling
  • Human review at critical points (approvals, validations)
  • LLM integration without vendor lock-in
  • Persistent storage for reliability
  • Monitoring & observability for production
  • Security hardening for sensitive operations

Pymastra solves all of this with a simple, Python-native API.

✨ Status: Production Ready (v0.2.0)

Phase | Status | Features
0 | ✅ Complete | Core workflow engine, HITL, in-memory storage
1 | ✅ Complete | LLM integration, FastAPI server, web dashboard
2 | ✅ Complete | SQLite/PostgreSQL, webhooks, observability, security

1,000+ lines of documentation, 10 production examples, 100% type coverage, and 66%+ test coverage.


🚀 Quick Start (2 minutes)

Installation

# Quote the extras so shells such as zsh don't expand the brackets

# Minimal (workflow engine only)
pip install pymastra

# With LLM support (OpenAI + Claude)
pip install "pymastra[llm]"

# With REST API server
pip install "pymastra[server]"

# Complete installation (all features)
pip install "pymastra[all]"

Your First Workflow

import asyncio
from pymastra import Workflow, Step, StepContext
from pydantic import BaseModel

# Define input and output schemas
class DocumentRequest(BaseModel):
    content: str
    confidence_threshold: float = 0.8

class ClassificationResult(BaseModel):
    category: str
    confidence: float

# Define a workflow step
async def classify_document(ctx: StepContext) -> ClassificationResult:
    """Classify document and request approval if low confidence."""
    doc = ctx.input

    # Your classification logic here
    category = "Technical"  # Replace with actual logic
    confidence = 0.92

    # Pause for human approval if confidence is low
    if confidence < doc.confidence_threshold:
        await ctx.suspend({
            "message": "Low confidence classification - requires approval",
            "suggested_category": category,
            "confidence": confidence,
            "document_preview": doc.content[:500]
        })
        # After resume, use human feedback if provided
        if ctx.resume_input and ctx.resume_input.get("approved"):
            category = ctx.resume_input.get("category", category)

    return ClassificationResult(category=category, confidence=confidence)

# Build the workflow
workflow = Workflow(
    name="document_classifier",
    description="Classify documents with human-in-the-loop approval",
    trigger_schema=DocumentRequest
)

workflow.step(Step(
    id="classify",
    description="Classify document",
    input_schema=DocumentRequest,
    output_schema=ClassificationResult,
    execute=classify_document
)).commit()

# Execute the workflow
async def main():
    # Start execution
    run = await workflow.execute(DocumentRequest(
        content="Technical documentation about distributed systems...",
        confidence_threshold=0.95  # higher than the 0.92 confidence, so the step suspends
    ))

    print(f"Status: {run.status}")  # Output: "suspended"
    print(f"Run ID: {run.run_id}")

    if run.status == "suspended":
        # Human reviews the suspension payload
        print("Waiting for human approval...")

        # After approval (e.g., from web UI or API)
        final_result = await run.resume({
            "approved": True,
            "category": "Technical"
        })

        print(f"Final status: {final_result.status}")  # "completed"
        print(f"Output: {final_result.outputs}")

# Run it
asyncio.run(main())

📚 Core Features

1. Workflow as Code

Define complex workflows with simple Python syntax:

# Sequential execution
wf.step(step1).then(step2).then(step3).commit()

# With branching
wf.step(decision_step).then(
    if_approved=approval_step,
    if_rejected=rejection_step
).commit()

Benefits:

  • Full IDE support and autocomplete
  • Type safety with Pydantic validation
  • Easy debugging and testing
  • Version control friendly
  • No YAML/config file overhead
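To make the chaining model concrete, here is a minimal, stdlib-only sketch of what `.step(...).then(...)` amounts to: steps run in order and each one can read earlier outputs by step id, much like `ctx.get_output()` in Pymastra steps. `MiniWorkflow` and `MiniContext` are hypothetical stand-ins for illustration, not Pymastra's classes.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable


# Hypothetical stand-ins for illustration only; not Pymastra's classes.
@dataclass
class MiniContext:
    input: Any
    outputs: dict[str, Any] = field(default_factory=dict)

    def get_output(self, step_id: str) -> Any:
        # Read the recorded output of an earlier step by its id
        return self.outputs[step_id]


StepFn = Callable[[MiniContext], Awaitable[Any]]


class MiniWorkflow:
    def __init__(self) -> None:
        self._steps: list[tuple[str, StepFn]] = []

    def step(self, step_id: str, fn: StepFn) -> "MiniWorkflow":
        self._steps.append((step_id, fn))
        return self  # returning self enables fluent chaining

    then = step  # in this sketch, .then() simply appends the next sequential step

    async def execute(self, data: Any) -> dict[str, Any]:
        ctx = MiniContext(input=data)
        for step_id, fn in self._steps:
            # Each step starts only after the previous one has finished
            ctx.outputs[step_id] = await fn(ctx)
        return ctx.outputs


async def fetch(ctx: MiniContext) -> dict[str, int]:
    return {"count": ctx.input["count"]}


async def double(ctx: MiniContext) -> dict[str, int]:
    return {"processed": ctx.get_output("fetch")["count"] * 2}


wf = MiniWorkflow().step("fetch", fetch).then("double", double)
outputs = asyncio.run(wf.execute({"count": 100}))
print(outputs)  # {'fetch': {'count': 100}, 'double': {'processed': 200}}
```

Keeping every step's output keyed by id is what lets a later step depend on any earlier one, not just its immediate predecessor.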

2. Human-in-the-Loop (HITL)

Pause workflows at any point for human input:

# Request approval
await ctx.suspend({
    "request_type": "approval",
    "message": "Please review and approve",
    "data": analysis_result
})

# Resume with human input
await run.resume({
    "approved": True,
    "feedback": "Changes look good"
})

Use Cases:

  • Approval workflows (documents, contracts, spending)
  • Quality checks (content moderation, validation)
  • Human feedback loops (training data, corrections)
  • Escalations (edge cases, exceptions)
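One way to picture suspend/resume inside a single process is a step that awaits a future until a human supplies input. The sketch below is a conceptual, stdlib-only illustration; `Run`, `suspend()` and `resume()` here are hypothetical and do not describe how Pymastra persists suspended runs (a persistent engine must also survive process restarts, which this sketch does not).

```python
import asyncio
from typing import Any


class Run:
    """Hypothetical in-memory run that can pause for human input."""

    def __init__(self) -> None:
        self.status = "running"
        self.suspend_payload: dict[str, Any] | None = None
        self._resume_future: asyncio.Future[dict[str, Any]] | None = None

    async def suspend(self, payload: dict[str, Any]) -> dict[str, Any]:
        # Record what the reviewer needs to see, then wait for resume()
        self.status = "suspended"
        self.suspend_payload = payload
        self._resume_future = asyncio.get_running_loop().create_future()
        human_input = await self._resume_future
        self.status = "running"
        return human_input

    def resume(self, human_input: dict[str, Any]) -> None:
        if self.status != "suspended" or self._resume_future is None:
            raise RuntimeError("run is not suspended")
        self._resume_future.set_result(human_input)


async def review_step(run: Run) -> str:
    decision = await run.suspend({"message": "Please review and approve"})
    return "approved" if decision.get("approved") else "rejected"


async def main() -> tuple[str, str]:
    run = Run()
    task = asyncio.create_task(review_step(run))
    await asyncio.sleep(0)  # let the step run until it suspends
    status_while_waiting = run.status
    run.resume({"approved": True, "feedback": "Changes look good"})
    return status_while_waiting, await task


print(asyncio.run(main()))  # ('suspended', 'approved')
```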

3. LLM Integration

Unified interface for OpenAI and Anthropic Claude:

from pymastra import create_llm_step, LLMProvider, LLMConfig

# Create an LLM step
llm_step = create_llm_step(
    id="analyze",
    description="Analyze document sentiment",
    system_prompt="You are a sentiment analysis expert...",
    provider=LLMProvider.ANTHROPIC,
    model="claude-3-sonnet-20240229",
    temperature=0.7
)

# Easy integration in workflows
workflow.step(llm_step).then(next_step).commit()

Supported:

  • ✅ OpenAI (GPT-4, GPT-3.5-turbo)
  • ✅ Anthropic (Claude 3 Opus, Sonnet, Haiku)
  • ✅ Token counting and cost estimation
  • ✅ Streaming responses
  • ✅ Temperature, top_p, max_tokens configuration
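Token-based cost estimation is simple arithmetic: token counts multiplied by the provider's per-token prices, summed over input and output. The sketch below assumes prices quoted per million tokens; `estimate_cost_usd` is a hypothetical helper, not Pymastra's cost API, and the rates in the usage line are placeholders, not real OpenAI or Anthropic pricing.

```python
def estimate_cost_usd(
    input_tokens: int,
    output_tokens: int,
    input_price_per_mtok: float,
    output_price_per_mtok: float,
) -> float:
    """Estimate request cost from token counts and per-million-token prices."""
    return (
        input_tokens * input_price_per_mtok
        + output_tokens * output_price_per_mtok
    ) / 1_000_000


# Placeholder rates for illustration only; check your provider's current price list.
cost = estimate_cost_usd(2_000, 500, input_price_per_mtok=3.0, output_price_per_mtok=15.0)
print(f"${cost:.4f}")  # $0.0135
```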

4. Tool/Function Calling

Create AI agents that can call functions:

from pymastra import tool, create_tool_calling_step, LLMProvider

@tool
def add(a: float, b: float) -> float:
    """Add two numbers together."""
    return a + b

@tool
def multiply(a: float, b: float) -> float:
    """Multiply two numbers."""
    return a * b

# Create a math agent
agent_step = create_tool_calling_step(
    id="math_agent",
    description="Solve math problems using available tools",
    system_prompt="You are a helpful math assistant. Solve the problem step by step.",
    tools=[add, multiply],
    provider=LLMProvider.ANTHROPIC,
)

workflow.step(agent_step).commit()
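Decorators like `@tool` typically turn a function's signature and docstring into a JSON-Schema-style description that the LLM can choose to call. The sketch below shows that idea with `inspect`; `simple_tool` and its output layout are assumptions for illustration, not Pymastra's `tool` implementation or the exact OpenAI/Anthropic wire format.

```python
import inspect
from typing import Any, Callable

# Map Python annotations to JSON Schema type names (only a few basics handled here)
_JSON_TYPES = {int: "integer", float: "number", str: "string", bool: "boolean"}


def simple_tool(fn: Callable[..., Any]) -> Callable[..., Any]:
    """Hypothetical decorator: attach a JSON-Schema-style spec to a function."""
    sig = inspect.signature(fn)
    properties = {
        name: {"type": _JSON_TYPES.get(param.annotation, "string")}
        for name, param in sig.parameters.items()
    }
    fn.tool_spec = {  # type: ignore[attr-defined]
        "name": fn.__name__,
        "description": inspect.getdoc(fn) or "",
        "parameters": {
            "type": "object",
            "properties": properties,
            # Parameters without default values are marked as required
            "required": [
                name for name, p in sig.parameters.items()
                if p.default is inspect.Parameter.empty
            ],
        },
    }
    return fn  # the function itself stays directly callable


@simple_tool
def add(a: float, b: float) -> float:
    """Add two numbers together."""
    return a + b


print(add.tool_spec["parameters"]["properties"])  # {'a': {'type': 'number'}, 'b': {'type': 'number'}}
print(add(2, 3))  # 5
```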

5. Persistent Storage

Support for multiple storage backends:

from pymastra import SQLiteStorage, PostgresStorage
from pymastra.server import WorkflowServer

# SQLite (self-hosted, development)
storage = SQLiteStorage(path="./data/workflows.db")

# PostgreSQL (production, multi-user)
storage = PostgresStorage(
    url="postgresql://user:pass@localhost/pymastra"
)

# Use with server
server = WorkflowServer(storage=storage)

Features:

  • Query API with filtering and pagination
  • Automatic indexing for performance
  • Migration between backends
  • ~1MB per 1000 workflow runs
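To illustrate what filtering and pagination over stored runs means at the database level, here is a minimal sketch using Python's built-in `sqlite3`. The `runs` table layout, the index and the helper functions are hypothetical; Pymastra's `SQLiteStorage` schema may differ.

```python
import json
import sqlite3
from typing import Any


def open_store(path: str = ":memory:") -> sqlite3.Connection:
    """Open (or create) a hypothetical `runs` table for workflow run records."""
    conn = sqlite3.connect(path)
    conn.execute(
        """CREATE TABLE IF NOT EXISTS runs (
               run_id   TEXT PRIMARY KEY,
               workflow TEXT NOT NULL,
               status   TEXT NOT NULL,
               outputs  TEXT NOT NULL
           )"""
    )
    # Index the column the query below filters on
    conn.execute("CREATE INDEX IF NOT EXISTS idx_runs_status ON runs (status)")
    return conn


def save_run(conn: sqlite3.Connection, run_id: str, workflow: str,
             status: str, outputs: dict[str, Any]) -> None:
    # Upsert so a resumed run overwrites its earlier (suspended) record
    conn.execute(
        "INSERT OR REPLACE INTO runs (run_id, workflow, status, outputs) VALUES (?, ?, ?, ?)",
        (run_id, workflow, status, json.dumps(outputs)),
    )
    conn.commit()


def list_runs(conn: sqlite3.Connection, status: str,
              limit: int = 10, offset: int = 0) -> list[str]:
    """Filter by status and page through run ids in a stable order."""
    rows = conn.execute(
        "SELECT run_id FROM runs WHERE status = ? ORDER BY run_id LIMIT ? OFFSET ?",
        (status, limit, offset),
    ).fetchall()
    return [row[0] for row in rows]


conn = open_store()
for i in range(5):
    save_run(conn, f"run-{i}", "classifier", "suspended" if i % 2 else "completed", {})
print(list_runs(conn, "completed", limit=2))            # ['run-0', 'run-2']
print(list_runs(conn, "completed", limit=2, offset=2))  # ['run-4']
```

A stable `ORDER BY` matters for pagination: without it, `LIMIT`/`OFFSET` pages can overlap or skip rows.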

6. REST API Server

Production-ready FastAPI server:

import os

from pymastra.server import WorkflowServer
from pymastra import SQLiteStorage

# Create server with security
server = WorkflowServer(
    storage=SQLiteStorage(path="./data/workflows.db"),
    auth_enabled=True,
    api_key=os.environ.get("PYMASTRA_API_KEY"),
    rate_limit=100,  # requests per minute
    cors_origins=["https://yourdomain.com"],
    debug=False
)

# Register workflows (classification_workflow is a Workflow defined elsewhere)
server.register_workflow("classifier", classification_workflow)

# Expose the ASGI app so uvicorn can import it
app = server.get_app()

# Start with uvicorn (assuming this file is saved as server.py):
# uvicorn server:app --port 8000

Endpoints:

  • POST /workflows/execute - Start workflow
  • GET /runs/{run_id} - Check status
  • GET /runs/{run_id}/suspend_payload - Get suspension data
  • POST /runs/{run_id}/resume - Resume with input
  • GET /dashboard - Web approval dashboard
  • GET /health - Health check

7. Web Dashboard

Beautiful approval dashboard included:

  • Real-time workflow monitoring
  • One-click approval/rejection
  • Suspension context display
  • Auto-refresh
  • Mobile responsive

Access at: http://localhost:8000/dashboard

8. Webhooks & Integrations

Trigger external services on workflow events:

from pymastra import Webhook, EventType, get_webhook_registry

# Create webhook
webhook = Webhook(
    id="slack-notifications",
    url="https://hooks.slack.com/services/YOUR/WEBHOOK",
    events=[EventType.WORKFLOW_COMPLETED, EventType.WORKFLOW_FAILED],
    secret="webhook-secret-for-hmac",
)

# Register
registry = get_webhook_registry()
registry.register(webhook)

# Features:
# ✅ Event filtering
# ✅ Automatic retries (exponential backoff)
# ✅ HMAC-SHA256 signatures
# ✅ Custom headers
# ✅ Failure tracking
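On the receiving side, an HMAC-SHA256 signature is checked by recomputing the digest over the raw request body with the shared secret and comparing the two in constant time. Which header carries the signature, whether it is hex-encoded or prefixed (for example `sha256=`), and the event body shown are assumptions here; check Pymastra's webhook documentation for the exact format.

```python
import hashlib
import hmac


def sign(secret: str, body: bytes) -> str:
    """Hex-encoded HMAC-SHA256 of the raw body (assumed signing scheme)."""
    return hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()


def verify(secret: str, body: bytes, received_signature: str) -> bool:
    # compare_digest avoids leaking how many leading characters matched
    return hmac.compare_digest(sign(secret, body).encode(), received_signature.encode())


# Hypothetical payload; sign the exact bytes received, before any JSON parsing
body = b'{"event": "workflow.completed", "run_id": "run-123"}'
sig = sign("webhook-secret-for-hmac", body)
print(verify("webhook-secret-for-hmac", body, sig))         # True
print(verify("webhook-secret-for-hmac", body + b" ", sig))  # False
```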

9. Observability & Monitoring

Built-in observability:

from pymastra import get_metrics, get_event_logger, TraceContext

# Metrics
metrics = get_metrics()
print(f"Completed: {metrics.completed_count}")
print(f"Failed: {metrics.failed_count}")
print(f"Avg time: {metrics.avg_execution_time_ms}ms")

# Event logging
logger = get_event_logger()
for event in logger.events:
    print(f"{event.timestamp} - {event.type}: {event.data}")

# Tracing
trace = TraceContext()
print(f"Spans: {trace.spans}")
print(f"Total duration: {trace.total_duration_ms}ms")

Covered Events:

  • WORKFLOW_START, WORKFLOW_COMPLETED, WORKFLOW_FAILED
  • STEP_START, STEP_COMPLETED, STEP_FAILED
  • TOOL_CALLED, TOOL_COMPLETED
  • ERROR_* events
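Conceptually, a trace is a list of timed spans. The stdlib-only sketch below records one span per step with `time.perf_counter()`; `MiniTrace` and `Span` are hypothetical and only mirror the `spans` / `total_duration_ms` idea, not `TraceContext`'s real implementation.

```python
import time
from contextlib import contextmanager
from dataclasses import dataclass, field
from typing import Iterator


@dataclass
class Span:
    name: str
    duration_ms: float


@dataclass
class MiniTrace:
    spans: list[Span] = field(default_factory=list)

    @contextmanager
    def span(self, name: str) -> Iterator[None]:
        start = time.perf_counter()
        try:
            yield
        finally:
            # Record the span even if the wrapped step raised
            self.spans.append(Span(name, (time.perf_counter() - start) * 1000))

    @property
    def total_duration_ms(self) -> float:
        return sum(s.duration_ms for s in self.spans)


trace = MiniTrace()
with trace.span("fetch"):
    time.sleep(0.01)
with trace.span("process"):
    time.sleep(0.02)
print([s.name for s in trace.spans], f"{trace.total_duration_ms:.0f}ms")
```

Summing span durations equals wall-clock time only for sequential steps; spans that overlap (parallel steps) would need start and end timestamps instead.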

10. Security

Production-hardened security:

import os

from pymastra.server import WorkflowServer

server = WorkflowServer(
    # API key authentication
    auth_enabled=True,
    api_key=os.environ.get("PYMASTRA_API_KEY"),

    # Rate limiting
    rate_limit=100,  # requests/minute

    # CORS restriction
    cors_origins=["https://yourdomain.com"],

    # Error sanitization
    debug=False,  # Hide stack traces
)

Protections:

  • ✅ Bearer token authentication on all API endpoints
  • ✅ Rate limiting to prevent abuse
  • ✅ SSRF protection for webhook URLs
  • ✅ HMAC-SHA256 webhook signatures
  • ✅ Error message sanitization
  • ✅ CORS middleware with origin whitelisting
  • ✅ Environment-based secrets (no hardcoding)

💡 Common Use Cases

1. Document Processing with Approval

Upload Document
    ↓
Extract Text
    ↓
Classify Category (AI)
    ↓
[SUSPEND] Human Review
    ↓
Process Based on Category
    ↓
Send Confirmation

Example: Legal document classification, contract routing

2. Customer Support Workflow

Receive Ticket
    ↓
Analyze Sentiment (AI)
    ↓
Extract Keywords (LLM)
    ↓
[SUSPEND] Assign to Team
    ↓
Generate Response (AI)
    ↓
[SUSPEND] Manager Approval
    ↓
Send to Customer

3. Content Moderation

Receive Content
    ↓
Scan for Policy Violations (AI)
    ↓
[IF FLAGGED] Suspend for Review
    ↓
[SUSPEND] Human Moderation
    ↓
Take Action (Approve/Reject/Escalate)
    ↓
Log Result

4. Data Validation & Enrichment

Receive Raw Data
    ↓
Validate Schema
    ↓
Enrich with External APIs
    ↓
[IF INCOMPLETE] Suspend for Manual Entry
    ↓
Transform to Final Format
    ↓
Store in Database

5. AI Agent with Human Oversight

User Request
    ↓
AI Agent (with tools)
    ↓
Execute Tool Calls
    ↓
[IF HIGH-RISK] Suspend for Approval
    ↓
Proceed or Modify
    ↓
Return Result

📦 Installation & Setup

Option 1: Pip Install (Easiest)

# For different use cases:

# Just the core engine
pip install pymastra

# Want to use LLMs?
pip install "pymastra[llm]"

# Want the REST API server?
pip install "pymastra[server]"

# Want persistent storage?
pip install "pymastra[sqlite]"      # SQLite
pip install "pymastra[postgres]"    # PostgreSQL

# Want everything?
pip install "pymastra[all]"

Option 2: From Source (Development)

# Clone the repository
git clone https://github.com/akashs101199/pymastra.git
cd pymastra

# Create virtual environment
python3 -m venv venv
source venv/bin/activate  # Windows: venv\Scripts\activate

# Install with development tools
pip install -e ".[dev]"

# Run tests
pytest tests/ -v

# Type check
mypy pymastra/ --strict

# Lint
ruff check pymastra/

Configuration

Create a .env file for configuration:

# API Security
PYMASTRA_API_KEY=your-secret-key

# Database
DATABASE_URL=sqlite:///./data/workflows.db
# Or PostgreSQL:
# DATABASE_URL=postgresql://user:password@localhost/pymastra

# LLM Providers
OPENAI_API_KEY=sk-...
ANTHROPIC_API_KEY=sk-ant-...

# Webhooks
WEBHOOK_SECRET=your-webhook-secret

# Server
PYMASTRA_DEBUG=false
PYMASTRA_PORT=8000
PYMASTRA_RATE_LIMIT=100

🔍 Detailed Examples

Example 1: Simple Sequential Workflow

import asyncio

from pymastra import Workflow, Step, StepContext

# Step 1: Fetch data
async def fetch_data(ctx: StepContext):
    data = {"count": 100}
    return data

# Step 2: Process data (reads the "fetch" step's output)
async def process_data(ctx: StepContext):
    prev = ctx.get_output("fetch")
    return {"processed": prev["count"] * 2}

# Step 3: Store results (reads the "process" step's output)
async def store_results(ctx: StepContext):
    processed = ctx.get_output("process")
    return {"stored": True, "value": processed["processed"]}

# Build the workflow: chain steps with .then() so they run in order
wf = Workflow(name="data_pipeline")
(
    wf.step(Step(id="fetch", execute=fetch_data))
    .then(Step(id="process", execute=process_data))
    .then(Step(id="store", execute=store_results))
    .commit()
)

# Execute (await needs an async context, so wrap it in asyncio.run)
async def main():
    run = await wf.execute({})
    print(f"Output: {run.outputs}")  # expected to include {"stored": True, "value": 200}

asyncio.run(main())

Example 2: LLM-Powered Sentiment Analysis

import asyncio

from pydantic import BaseModel
from pymastra import Workflow, create_llm_step, LLMProvider

# Input schema
class TextInput(BaseModel):
    text: str

# Create workflow
wf = Workflow(name="sentiment_analyzer")

# Step 1: Analyze sentiment with Claude (needs ANTHROPIC_API_KEY, see Configuration)
llm_step = create_llm_step(
    id="analyze",
    description="Analyze sentiment of text",
    system_prompt="""Analyze the sentiment of the provided text.
    Respond with JSON: {"sentiment": "positive|negative|neutral", "confidence": 0-1, "reasoning": "..."}""",
    provider=LLMProvider.ANTHROPIC,
    model="claude-3-sonnet-20240229",
)

wf.step(llm_step).commit()

# Execute
async def main():
    run = await wf.execute(TextInput(text="I love this product!"))
    print(run.outputs)  # e.g. {"sentiment": "positive", "confidence": 0.95, ...}

asyncio.run(main())

Example 3: Approval Workflow with Retry

from pymastra import Workflow, Step, StepContext, RetryConfig

async def submit_order(ctx: StepContext):
    order = ctx.input
    # external_api is a placeholder for your own client; the call may fail
    # on network errors, which is what the retry policy below is for
    await external_api.submit(order)
    return {"order_id": "12345", "status": "submitted"}

async def confirm_approval(ctx: StepContext):
    order_id = ctx.get_output("submit")["order_id"]
    await ctx.suspend({
        "message": "Order submitted. Awaiting approval.",
        "order_id": order_id
    })
    # Resumed with the approval decision; guard against an empty resume payload
    decision = ctx.resume_input or {}
    if decision.get("approved"):
        return {"status": "approved"}
    return {"status": "rejected", "reason": decision.get("reason")}

wf = Workflow(name="approval_workflow")
wf.step(Step(
    id="submit",
    execute=submit_order,
    retry=RetryConfig(max_retries=3, backoff="exponential")
)).then(Step(
    id="approve",
    execute=confirm_approval
)).commit()

Example 4: REST API Server

import os

import uvicorn

from pymastra.server import WorkflowServer
from pymastra import SQLiteStorage

def main():
    # Create storage
    storage = SQLiteStorage(path="./data/workflows.db")

    # Create server with security
    server = WorkflowServer(
        storage=storage,
        auth_enabled=True,
        api_key=os.environ.get("PYMASTRA_API_KEY", "dev-key"),  # fallback for local development only
        rate_limit=100,
        cors_origins=["http://localhost:3000"],
        debug=False
    )

    # Register your workflows (classification_workflow is defined elsewhere)
    server.register_workflow("classifier", classification_workflow)

    # uvicorn.run() starts its own event loop, so call it from synchronous code.
    # Multiple workers require an import string (e.g. "server:app"), not an app object.
    uvicorn.run(
        server.get_app(),
        host="0.0.0.0",
        port=8000
    )

if __name__ == "__main__":
    main()

API Usage:

# Execute workflow
curl -X POST http://localhost:8000/workflows/execute \
  -H "Authorization: Bearer YOUR_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{"workflow_id": "classifier", "data": {"text": "..."}}'

# Check status
curl http://localhost:8000/runs/run-123 \
  -H "Authorization: Bearer YOUR_API_KEY"

# Resume with approval
curl -X POST http://localhost:8000/runs/run-123/resume \
  -H "Authorization: Bearer YOUR_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{"human_input": {"approved": true}}'

📊 Performance & Metrics

Benchmarks (v0.2.0)

Metric | Result
Single-step workflow | < 5 ms
3-step workflow | ~15 ms
Concurrent throughput | 2,000+ workflows/sec
SQLite storage per 1,000 runs | ~1 MB
Type coverage | 100%
Test coverage | 66%+

Run benchmarks yourself:

python benchmarks/run_benchmarks.py

📖 Documentation

Document | Purpose | Read Time
GETTING_STARTED.md | 10-minute tutorial | 10 min
DEBUGGING.md | Error handling guide | 15 min
PERSISTENCE.md | Storage backends | 10 min
DEPLOYMENT.md | Production setup | 20 min
SECURITY.md | Security policies | 10 min
CONTRIBUTING.md | How to contribute | 5 min
.github/CI_CD.md | CI/CD pipeline | 10 min
CHANGELOG.md | Version history | 5 min

📚 Examples

10 production-ready examples in examples/:

Example | Purpose | Lines of code
document_classification.py | Document classifier with HITL | 150
ticket_routing.py | Support ticket router | 200
llm_classification.py | Simple LLM integration | 80
agent_with_tools.py | Math agent with function calling | 120
server_example.py | FastAPI server setup | 100
server_with_dashboard.py | Dashboard UI example | 150
sqlite_persistence.py | Persistent storage | 180
observability_demo.py | Metrics & tracing | 120
webhooks_demo.py | Webhook integration | 140
production_server.py | Production-ready template | 250

Run any example:

python examples/document_classification.py

๐Ÿ—๏ธ Architecture

┌──────────────────────────────────────────────────────────────┐
│                   Pymastra Workflow Engine                   │
├──────────────────────────────────────────────────────────────┤
│                                                              │
│  ┌────────────────────────────────────────────────────────┐  │
│  │  Workflow API (Python-native)                          │  │
│  │  - Define steps, chains, conditions                    │  │
│  │  - Type-safe with Pydantic v2                          │  │
│  │  - Full IDE support & autocomplete                     │  │
│  └───────────────────────────┬────────────────────────────┘  │
│                              │                               │
│  ┌───────────────────────────▼────────────────────────────┐  │
│  │  Step Execution Engine (Async)                         │  │
│  │  - Parallel/sequential execution                       │  │
│  │  - Error handling & retries                            │  │
│  │  - Human-in-the-Loop suspend/resume                    │  │
│  └───────────────────────────┬────────────────────────────┘  │
│                              │                               │
│  ┌───────────────────────────▼────────────────────────────┐  │
│  │  Storage Layer (Abstract)                              │  │
│  │  ├─ MemoryStorage (dev/test)                           │  │
│  │  ├─ SQLiteStorage (self-hosted)                        │  │
│  │  └─ PostgresStorage (production)                       │  │
│  └───────────────────────────┬────────────────────────────┘  │
│                              │                               │
│  ┌──────────┬────────────────┼──────────────┬─────────┐      │
│  ▼          ▼                ▼              ▼         ▼      │
│  LLM        Webhooks         Observability  Tools     Events │
│  APIs       Registry         (Metrics,      Registry  Logger │
│             Delivery         Tracing)       Function         │
│                                             Calling          │
│                                                              │
│  ┌────────────────────────────────────────────────────────┐  │
│  │  REST API Server (FastAPI)                             │  │
│  │  - CRUD operations on workflows                        │  │
│  │  - Web dashboard for approvals                         │  │
│  │  - WebSocket for real-time updates                     │  │
│  │  - Security: Auth, rate limiting, CORS                 │  │
│  └────────────────────────────────────────────────────────┘  │
│                                                              │
└──────────────────────────────────────────────────────────────┘

🔐 Security Features

Authentication & Authorization

  • ✅ API key authentication (Bearer tokens)
  • ✅ Environment variable configuration
  • ✅ 401 Unauthorized for invalid keys
  • ✅ No hardcoded secrets

Rate Limiting & DoS Protection

  • ✅ Configurable rate limits (default: 100 req/min)
  • ✅ Per-endpoint limiting
  • ✅ IP-based tracking
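A per-IP limit such as 100 requests per minute can be enforced with a sliding window of request timestamps per client. The sketch below is an in-memory, single-process illustration (a multi-worker deployment would need shared state, for example Redis); `SlidingWindowLimiter` is hypothetical and not necessarily the algorithm Pymastra uses.

```python
import time
from collections import defaultdict, deque
from typing import Callable


class SlidingWindowLimiter:
    """Allow at most `limit` requests per `window_s` seconds for each client key."""

    def __init__(self, limit: int = 100, window_s: float = 60.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.limit = limit
        self.window_s = window_s
        self.clock = clock
        self._hits: dict[str, deque[float]] = defaultdict(deque)

    def allow(self, client_ip: str) -> bool:
        now = self.clock()
        hits = self._hits[client_ip]
        # Drop timestamps that have slid out of the window
        while hits and now - hits[0] >= self.window_s:
            hits.popleft()
        if len(hits) >= self.limit:
            return False  # a server would typically answer 429 Too Many Requests
        hits.append(now)
        return True


# Deterministic demo with a fake clock: 3 requests per 60 s
fake_now = [0.0]
limiter = SlidingWindowLimiter(limit=3, window_s=60, clock=lambda: fake_now[0])
print([limiter.allow("10.0.0.1") for _ in range(4)])  # [True, True, True, False]
fake_now[0] = 61.0
print(limiter.allow("10.0.0.1"))  # True
```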

Data Protection

  • ✅ SSRF protection for webhook URLs
  • ✅ HMAC-SHA256 webhook signatures
  • ✅ Error message sanitization
  • ✅ Stack trace hiding in production
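SSRF protection for webhook URLs generally means refusing to deliver to loopback, private, link-local or otherwise internal addresses. The sketch below resolves the host with `socket.getaddrinfo` and rejects any non-global address; `is_safe_webhook_url` is hypothetical, and a production check must also guard against DNS rebinding (for example by connecting to the exact address that was validated).

```python
import ipaddress
import socket
from urllib.parse import urlparse


def is_safe_webhook_url(url: str) -> bool:
    """Reject non-HTTP(S) URLs and hosts that resolve to non-public addresses."""
    parsed = urlparse(url)
    if parsed.scheme not in {"http", "https"} or not parsed.hostname:
        return False
    try:
        port = parsed.port or (443 if parsed.scheme == "https" else 80)
        infos = socket.getaddrinfo(parsed.hostname, port, proto=socket.IPPROTO_TCP)
    except (socket.gaierror, ValueError):
        return False  # unresolvable hosts and invalid ports are rejected too
    for *_, sockaddr in infos:
        # Every resolved address must be publicly routable
        if not ipaddress.ip_address(sockaddr[0]).is_global:
            return False
    return True


print(is_safe_webhook_url("http://127.0.0.1:8000/admin"))    # False
print(is_safe_webhook_url("http://169.254.169.254/latest"))  # False
```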

Access Control

  • ✅ CORS middleware with origin whitelisting
  • Role-based access (planned)
  • ✅ Audit logging of all operations

See SECURITY.md for the detailed policy.


๐Ÿค Contributing

We welcome contributions! See CONTRIBUTING.md for:

  • Development setup
  • Code style guidelines (ruff, mypy)
  • Testing requirements
  • Pull request process
  • Commit message format

Quick Setup for Contributors

git clone https://github.com/akashs101199/pymastra.git
cd pymastra
python3 -m venv venv
source venv/bin/activate
pip install -e ".[dev]"

# Run tests
pytest tests/ -v

# Check code quality
mypy pymastra/ --strict
ruff check pymastra/
ruff format pymastra/

โ“ FAQ

Q: How is Pymastra different from Airflow/Prefect/Dagster?

A: Those are DAG-based schedulers for data pipelines. Pymastra is:

  • In-process, on-demand execution (no scheduler required)
  • Human-in-the-Loop native (suspend/resume)
  • Python-native (no YAML config)
  • LLM-native (easy AI integration)
  • Lightweight (start in seconds, not minutes)

Use Pymastra for: AI workflows, approval processes, interactive pipelines. Use Airflow for: batch processing, scheduled jobs, data engineering.

Q: Can I use this in production?

A: Yes! v0.2.0 is production-ready with:

  • 100% type coverage (mypy strict)
  • 66%+ test coverage
  • API authentication & rate limiting
  • SSRF protection
  • Error sanitization
  • Comprehensive docs & examples

Q: What LLMs are supported?

A: Currently:

  • ✅ OpenAI (GPT-4, GPT-3.5-turbo)
  • ✅ Anthropic Claude (3 Opus, Sonnet, Haiku)

Adding more providers is on the roadmap.

Q: How do I handle retries?

A: Use RetryConfig:

from pymastra import Step, RetryConfig

Step(
    id="flaky_step",
    execute=my_function,  # your async step function
    retry=RetryConfig(
        max_retries=3,
        backoff="exponential",  # or "fixed"
        retry_on=[ConnectionError, TimeoutError]
    )
)

Q: Can I run workflows in parallel?

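A: Yes. Because execution is async, independent runs can be awaited concurrently:

import asyncio

async def run_all():
    return await asyncio.gather(
        workflow.execute(input1),
        workflow.execute(input2),
        workflow.execute(input3),
    )

runs = asyncio.run(run_all())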

Q: How do I monitor workflows?

A: Multiple options:

  1. Metrics: get_metrics() for counters and timings
  2. Events: get_event_logger() for structured logs
  3. Tracing: TraceContext for execution timeline
  4. Webhooks: Webhook for external integrations
  5. Dashboard: Web UI at /dashboard
  6. API: /runs endpoint for querying

Q: Is there a SaaS version?

A: Not yet, but it's on the roadmap for v1.0. For now:

  • Self-hosted: Use SQLiteStorage or PostgresStorage
  • Docker: See DEPLOYMENT.md for Docker setup
  • Cloud: Deploy with AWS/GCP/Azure using provided templates

Q: How much does it cost?

A: Pymastra is free and open source (MIT license).

  • No usage fees
  • No vendor lock-in
  • Pay only for infrastructure (LLM APIs, hosting)

📞 Support & Community


🗺️ Roadmap

v0.3.0 (Q3 2026)

  • Advanced AI agents with long-context memory
  • Caching layer for LLM responses
  • Function composition and reusability
  • Enhanced dashboard UI
  • GraphQL API (alternative to REST)

v1.0.0 (Q4 2026)

  • Stable API guarantee (backward compatible)
  • Multi-tenant support
  • Enterprise features (SAML, audit logs, etc.)
  • SaaS platform
  • Additional LLM providers (Gemini, Llama, etc.)

📈 Stats

  • Lines of code: 5,000+
  • Lines of documentation: 3,000+
  • Test coverage: 66%+
  • Type coverage: 100%
  • Examples: 10 production-ready
  • Dependencies: Minimal (pydantic, anyio)
  • Documentation pages: 8+ comprehensive guides

📄 License

MIT License - see LICENSE

Copyright (c) 2026 Akash Shanmuganatha

Free to use in personal and commercial projects.


🙏 Credits

Built with Claude Code - AI-native development platform

Special thanks to:

  • Pydantic for type safety
  • FastAPI for REST API framework
  • Anthropic and OpenAI for LLM APIs
  • The Python community for async/await

โญ Show Your Support

If Pymastra helps you build great workflows:

  1. โญ Star the repository on GitHub
  2. ๐Ÿ“ข Share with your team
  3. ๐Ÿ› Report issues you find
  4. ๐Ÿ’ก Suggest features
  5. ๐Ÿค Contribute improvements

🚀 Get Started Now!

# Install
pip install "pymastra[all]"

# Read the guide
cat GETTING_STARTED.md

# Run an example
python examples/document_classification.py

# Start building!

Questions? Check GETTING_STARTED.md or open an issue.

Found a security issue? See SECURITY.md for responsible disclosure.


Built for developers who want to build AI workflows the right way.

โญ Star on GitHub โ€ข ๐Ÿ“– Read Docs โ€ข ๐Ÿ› Report Issue



Download files

Download the file for your platform.

Source Distribution

pymastra-0.5.0.tar.gz (201.5 kB)

Uploaded Source

Built Distribution


pymastra-0.5.0-py3-none-any.whl (74.8 kB)

Uploaded Python 3

File details

Details for the file pymastra-0.5.0.tar.gz.

File metadata

  • Download URL: pymastra-0.5.0.tar.gz
  • Upload date:
  • Size: 201.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for pymastra-0.5.0.tar.gz
Algorithm Hash digest
SHA256 0e333ad35945734818ddec579bf310fdc725bf6562f56d183d3dfcfef47bdc99
MD5 21025cc73d14fc4a2a3ba0968ee96fdb
BLAKE2b-256 05dc442d4658892b81e12052e4dba21234b9695cba7760c377311a378feb2174


Provenance

The following attestation bundles were made for pymastra-0.5.0.tar.gz:

Publisher: ci.yml on akashs101199/pymastra

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file pymastra-0.5.0-py3-none-any.whl.

File metadata

  • Download URL: pymastra-0.5.0-py3-none-any.whl
  • Upload date:
  • Size: 74.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for pymastra-0.5.0-py3-none-any.whl
Algorithm Hash digest
SHA256 c1a06b4fb9d478906e0383d68e4d291cc9a746b595b6a68bd0c250e21d344989
MD5 fe133e8b8679a97effb4804c1b53ae85
BLAKE2b-256 99355a8c6460c0293eacf934d8805f1eff61fad874e947d6ef155da683eb12ef


Provenance

The following attestation bundles were made for pymastra-0.5.0-py3-none-any.whl:

Publisher: ci.yml on akashs101199/pymastra

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.
