Skip to main content

Universal Plan Intermediate Representation - Formal verification, synthesis, and optimization for distributed systems

Project description

UPIR: Universal Plan Intermediate Representation

Formal verification, automatic synthesis, and continuous optimization of distributed system architectures.

License Python Code style: black


Table of Contents


Overview

UPIR (Universal Plan Intermediate Representation) is an open-source framework for formally specifying, verifying, synthesizing, and optimizing distributed system architectures. It bridges the gap between high-level architectural requirements and production-ready implementations.

What UPIR Does

  • Formal Verification: Prove that your architecture satisfies correctness properties using SMT solvers
  • Automatic Synthesis: Generate implementation code from architectural specifications using CEGIS
  • Continuous Optimization: Learn from production metrics to improve architectures using reinforcement learning
  • Pattern Management: Extract and reuse proven architectural patterns
  • Incremental Verification: Cache proofs for faster iteration

Why UPIR?

Designing distributed systems is hard. Traditional approaches rely on:

  • Manual design prone to errors
  • Ad-hoc validation that misses edge cases
  • Trial-and-error optimization that wastes resources
  • Reinventing patterns instead of reusing proven solutions

UPIR automates these processes using formal methods, program synthesis, and machine learning.


Attribution

This is a clean-room implementation based solely on public sources:

Primary Source: Automated Synthesis and Verification of Distributed Systems Using UPIR by Subhadip Mitra, published at TD Commons under CC BY 4.0 license.

Additional references listed in SOURCES.md.

Author: Subhadip Mitra

License: Apache 2.0

Project Status: Personal open source project, no affiliations


Key Features

1. Formal Specifications with Temporal Logic

Define requirements using Linear Temporal Logic (LTL):

from upir.core.temporal import TemporalOperator, TemporalProperty

# ALWAYS: Data consistency must always hold
always_consistent = TemporalProperty(
    operator=TemporalOperator.ALWAYS,
    predicate="data_consistent"
)

# WITHIN: Respond within 100ms
low_latency = TemporalProperty(
    operator=TemporalOperator.WITHIN,
    predicate="respond",
    time_bound=100  # milliseconds
)

2. SMT-Based Verification

Verify architectures satisfy specifications:

from upir.verification.verifier import Verifier
from upir.verification.solver import VerificationStatus

verifier = Verifier()
results = verifier.verify_specification(upir)

for result in results:
    if result.status == VerificationStatus.PROVED:
        print(f"✓ Verified: {result.property.predicate}")
    else:
        print(f"✗ Failed: {result.property.predicate}")
        print(f"  Counterexample: {result.counterexample}")

3. Program Synthesis with CEGIS

Generate code from specifications:

from upir.synthesis.cegis import Synthesizer

synthesizer = Synthesizer(max_iterations=10)
sketch = synthesizer.generate_sketch(specification)
result = synthesizer.synthesize(upir, sketch)

if result.status.value == "SUCCESS":
    print(result.implementation)  # Generated code!

4. Reinforcement Learning Optimization

Optimize architectures based on production metrics:

from upir.learning.learner import ArchitectureLearner

learner = ArchitectureLearner()
metrics = {"latency_p99": 150, "throughput_qps": 5000}
optimized_upir = learner.learn_from_metrics(upir, metrics)

5. Pattern Library

Discover and reuse architectural patterns:

from upir.patterns.library import PatternLibrary

library = PatternLibrary()
matches = library.match_architecture(upir, threshold=0.8)

for pattern, score in matches:
    print(f"{pattern.name}: {score:.1%} match")

Installation

Prerequisites

  • Python 3.11 or higher
  • pip or poetry for package management

From Source

# Clone the repository
git clone https://github.com/bassrehab/upir.git
cd upir

# Create virtual environment
python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate

# Install dependencies
pip install -e .

# Install development dependencies (optional)
pip install -e ".[dev]"

Using pip (when published)

pip install upir

Verify Installation

python -c "import upir; print('UPIR installed successfully')"

Quick Start

Here's a complete example of designing a streaming pipeline with UPIR:

from upir.core.architecture import Architecture
from upir.core.specification import FormalSpecification
from upir.core.temporal import TemporalOperator, TemporalProperty
from upir.core.upir import UPIR
from upir.verification.verifier import Verifier
from upir.verification.solver import VerificationStatus

# 1. Define formal specification
spec = FormalSpecification(
    properties=[
        TemporalProperty(
            operator=TemporalOperator.WITHIN,
            predicate="process_event",
            time_bound=100  # 100ms latency requirement
        )
    ],
    invariants=[
        TemporalProperty(
            operator=TemporalOperator.ALWAYS,
            predicate="data_consistent"
        )
    ],
    constraints={
        "latency_p99": {"max": 100.0},
        "monthly_cost": {"max": 5000.0}
    }
)

# 2. Create architecture
components = [
    {"id": "pubsub", "name": "Event Source", "type": "pubsub_source"},
    {"id": "beam", "name": "Processor", "type": "streaming_processor"},
    {"id": "bq", "name": "Database", "type": "database"}
]

connections = [
    {"from": "pubsub", "to": "beam"},
    {"from": "beam", "to": "bq"}
]

arch = Architecture(components=components, connections=connections)

# 3. Create UPIR instance
upir = UPIR(
    id="streaming-pipeline",
    name="Real-time Event Processing",
    description="Streaming pipeline for event analytics",
    architecture=arch,
    specification=spec
)

# 4. Verify specification
verifier = Verifier()
results = verifier.verify_specification(upir)

print(f"Verified {len(results)} properties")
for result in results:
    print(f"  {result.property.predicate}: {result.status.value}")

Output:

Verified 2 properties
  process_event: PROVED
  data_consistent: PROVED

See examples/streaming_example.py for a complete workflow including synthesis, optimization, and pattern extraction.


Core Concepts

1. UPIR Instance

A UPIR represents a complete distributed system design:

UPIR
├── Architecture (components + connections)
├── Specification (properties + constraints)
├── Evidence (supporting documentation)
├── Reasoning (design decisions)
└── Metadata (versioning, ownership)

Key components:

  • Architecture: Physical structure (microservices, databases, queues)
  • Specification: Formal requirements (latency, consistency, availability)
  • Evidence: Documentation, test results, performance data
  • Reasoning: Design rationale and trade-offs

2. Temporal Properties

Express requirements using Linear Temporal Logic (LTL):

Operator Meaning Example
ALWAYS Property holds at all times □ data_consistent
EVENTUALLY Property holds at some future time ◇ all_events_processed
WITHIN Property holds within time bound ◇_{≤100ms} respond
UNTIL Property holds until another holds processing □ complete

Implementation:

# Safety property: Always maintain consistency
TemporalProperty(
    operator=TemporalOperator.ALWAYS,
    predicate="data_consistent"
)

# Liveness property: Eventually complete processing
TemporalProperty(
    operator=TemporalOperator.EVENTUALLY,
    predicate="processing_complete",
    time_bound=60000  # Within 60 seconds
)

3. Formal Verification

UPIR uses SMT (Satisfiability Modulo Theories) solvers to prove correctness:

  1. Encode architecture and specification as logical formulas
  2. Solve using Z3 SMT solver
  3. Verify if properties hold for all possible executions
  4. Generate counterexamples if verification fails

Verification results:

  • PROVED: Property definitely holds ✓
  • DISPROVED: Counterexample found ✗
  • UNKNOWN: Solver couldn't determine
  • TIMEOUT: Exceeded time limit

4. Program Synthesis (CEGIS)

Counterexample-Guided Inductive Synthesis:

1. Generate program sketch with holes
   └─> def process(data): return data.window(size=???)

2. Fill holes with candidate values
   └─> def process(data): return data.window(size=60)

3. Verify against specification
   └─> Verify latency ≤ 100ms

4. If verification fails, use counterexample to refine
   └─> Counterexample: size=60 causes 150ms latency

5. Repeat until successful or max iterations
   └─> Try size=30 → Verified! ✓

5. Reinforcement Learning Optimization

Use Proximal Policy Optimization (PPO) to learn optimal architectures:

State: Current architecture features
Action: Modify component (parallelism, type, connections)
Reward: Performance improvement + constraint satisfaction
Policy: Learn which modifications work best

Optimization loop:

for iteration in range(num_iterations):
    # 1. Encode current architecture
    state = learner.encode_state(upir)

    # 2. Select optimization action (via PPO)
    action, _, _ = learner.ppo.select_action(state)

    # 3. Apply modification
    optimized_upir = learner.decode_action(action, upir)

    # 4. Measure new metrics
    new_metrics = simulate_deployment(optimized_upir)

    # 5. Compute reward
    reward = learner.compute_reward(new_metrics, spec)

    # 6. Update policy
    learner._update_policy()

6. Pattern Library

Discover, store, and reuse architectural patterns:

library = PatternLibrary()

# Built-in patterns:
# - Streaming ETL Pipeline
# - Batch Processing Pipeline
# - Request-Response API
# - Event-Driven Microservices
# - Lambda Architecture
# - CQRS, Event Sourcing, etc.

# Match your architecture
matches = library.match_architecture(upir, threshold=0.8)

# Use best match as starting point
best_pattern, score = matches[0]
template = best_pattern.template

Usage Examples

Example 1: Verify Latency Requirements

from upir.core.specification import FormalSpecification
from upir.core.temporal import TemporalOperator, TemporalProperty

# Define latency requirement
spec = FormalSpecification(
    properties=[
        TemporalProperty(
            operator=TemporalOperator.WITHIN,
            predicate="api_response",
            time_bound=50  # 50ms
        )
    ]
)

# Create architecture with API gateway
components = [
    {"id": "lb", "type": "load_balancer", "latency_ms": 5},
    {"id": "api", "type": "api_server", "latency_ms": 20},
    {"id": "cache", "type": "cache", "latency_ms": 2},
    {"id": "db", "type": "database", "latency_ms": 15}
]

# Total latency: 5 + 20 + 2 + 15 = 42ms < 50ms ✓

Example 2: Synthesize Batch Processing Pipeline

from upir.synthesis.cegis import Synthesizer

# Specification for batch job
spec = FormalSpecification(
    properties=[
        TemporalProperty(
            operator=TemporalOperator.EVENTUALLY,
            predicate="batch_complete",
            time_bound=3600000  # 1 hour
        )
    ],
    constraints={
        "throughput_records_per_sec": {"min": 10000}
    }
)

# Synthesize implementation
synthesizer = Synthesizer()
sketch = synthesizer.generate_sketch(spec)
result = synthesizer.synthesize(upir, sketch)

# Get generated code
if result.status.value == "SUCCESS":
    with open("batch_pipeline.py", "w") as f:
        f.write(result.implementation)

Example 3: Optimize for Cost

from upir.learning.learner import ArchitectureLearner

learner = ArchitectureLearner()

# Current metrics
current_metrics = {
    "latency_p99": 45,
    "throughput_qps": 15000,
    "monthly_cost": 8000  # Over budget!
}

# Optimize
for i in range(10):  # 10 optimization iterations
    optimized_upir = learner.learn_from_metrics(
        upir,
        current_metrics,
        previous_metrics
    )

    # Simulate new metrics
    current_metrics = simulate(optimized_upir)

    if current_metrics["monthly_cost"] <= 5000:
        print(f"✓ Optimized in {i+1} iterations")
        break

Example 4: Pattern Matching

from upir.patterns.library import PatternLibrary

library = PatternLibrary()

# Find patterns for your architecture
matches = library.match_architecture(upir, threshold=0.7)

print("Similar patterns:")
for pattern, score in matches[:5]:
    print(f"\n{pattern.name} ({score:.1%} match)")
    print(f"  Success rate: {pattern.success_rate:.1%}")
    print(f"  Avg latency: {pattern.average_performance.get('latency_p99', 'N/A')}ms")
    print(f"  Instances: {len(pattern.instances)}")

# Search patterns by criteria
streaming_patterns = library.search_patterns({
    "component_types": ["streaming_processor"],
    "min_success_rate": 0.85
})

Architecture

UPIR is organized into five main modules:

upir/
├── core/           # Core data structures
│   ├── upir.py           # Main UPIR class
│   ├── architecture.py   # Component & connection definitions
│   ├── specification.py  # Formal specifications
│   ├── temporal.py       # Temporal logic operators
│   ├── evidence.py       # Evidence & reasoning
│   └── ...
├── verification/   # Formal verification
│   ├── verifier.py       # Main verifier (incremental)
│   └── solver.py         # SMT solver interface (Z3)
├── synthesis/      # Program synthesis
│   ├── cegis.py          # CEGIS synthesizer
│   └── sketch.py         # Program sketches
├── learning/       # RL optimization
│   ├── learner.py        # Architecture learner
│   └── ppo.py            # PPO algorithm
└── patterns/       # Pattern management
    ├── pattern.py        # Pattern dataclass
    ├── extractor.py      # Pattern extraction
    └── library.py        # Pattern library

Module Dependencies:

core ←─── verification
  ↑         ↑
  │         │
  └─── synthesis
  │         ↑
  │         │
  └─── learning
  │         ↑
  │         │
  └─── patterns

API Documentation

Core Classes

UPIR

Main class representing a complete system design.

class UPIR:
    """Universal Plan Intermediate Representation."""

    def __init__(
        self,
        id: str,
        name: str,
        description: str,
        architecture: Optional[Architecture] = None,
        specification: Optional[FormalSpecification] = None,
        **kwargs
    ):
        """Create a UPIR instance."""

Key Methods:

  • to_dict(): Serialize to dictionary
  • from_dict(data): Deserialize from dictionary
  • compute_hash(): Compute content hash
  • validate(): Validate consistency

FormalSpecification

Captures requirements.

class FormalSpecification:
    """Formal specification with temporal properties."""

    properties: List[TemporalProperty]    # Liveness properties
    invariants: List[TemporalProperty]    # Safety properties
    constraints: Dict[str, Dict[str, Any]]  # Resource constraints
    assumptions: List[str]                # Environmental assumptions

Verification API

Verifier

Main verification interface.

class Verifier:
    """SMT-based verifier with incremental verification."""

    def verify_specification(
        self,
        upir: UPIR
    ) -> List[VerificationResult]:
        """Verify all properties in specification."""

    def verify_property(
        self,
        property: TemporalProperty,
        architecture: Architecture
    ) -> VerificationResult:
        """Verify single property."""

Synthesis API

Synthesizer

CEGIS-based synthesizer.

class Synthesizer:
    """Counterexample-guided inductive synthesis."""

    def generate_sketch(
        self,
        spec: FormalSpecification
    ) -> ProgramSketch:
        """Generate program sketch with holes."""

    def synthesize(
        self,
        upir: UPIR,
        sketch: ProgramSketch
    ) -> CEGISResult:
        """Synthesize implementation from sketch."""

Learning API

ArchitectureLearner

RL-based optimizer.

class ArchitectureLearner:
    """Learn optimal architectures using PPO."""

    def learn_from_metrics(
        self,
        upir: UPIR,
        metrics: Dict[str, float],
        previous_metrics: Optional[Dict[str, float]] = None
    ) -> UPIR:
        """Optimize architecture based on metrics."""

Patterns API

PatternLibrary

Pattern storage and matching.

class PatternLibrary:
    """Library of architectural patterns."""

    def match_architecture(
        self,
        upir: UPIR,
        threshold: float = 0.8
    ) -> List[Tuple[Pattern, float]]:
        """Find patterns matching architecture."""

    def search_patterns(
        self,
        query: Dict[str, Any]
    ) -> List[Pattern]:
        """Search by component types, success rate, etc."""

Examples

The examples/ directory contains complete demonstrations:

streaming_example.py

Complete workflow for real-time event processing pipeline:

  1. Define formal specification with temporal properties
  2. Create UPIR with Pub/Sub → Beam → BigQuery architecture
  3. Verify specification (all properties proved)
  4. Synthesize Apache Beam implementation
  5. Simulate production metrics
  6. Optimize using RL (3 iterations: 95ms → 79ms latency)
  7. Extract and save pattern for reuse

Run:

PYTHONPATH=. python examples/streaming_example.py

Contributing

We welcome contributions! Please see CONTRIBUTING.md for guidelines.

Development Setup

# Clone repository
git clone https://github.com/bassrehab/upir.git
cd upir

# Create development environment
python -m venv venv
source venv/bin/activate

# Install in editable mode with dev dependencies
pip install -e ".[dev]"

# Run tests
pytest tests/ -v

# Check coverage
pytest tests/ --cov=upir --cov-report=html

# Format code
black upir/ tests/
isort upir/ tests/

License

This project is licensed under the Apache License 2.0 - see the LICENSE file for details.

Summary

  • Free to use for commercial and non-commercial purposes
  • Modify and distribute with attribution
  • No warranty provided

References

Primary Source

Mitra, Subhadip. "Automated Synthesis and Verification of Distributed Systems Using UPIR." TD Commons, Defensive Publications Series, 8852 (2025). https://www.tdcommons.org/dpubs_series/8852/ Licensed under CC BY 4.0

Academic Papers

Verification & Synthesis:

  • Solar-Lezama, Armando, et al. "Program synthesis by sketching." (2008)
  • Pnueli, Amir. "The temporal logic of programs." (1977)
  • De Moura, Leonardo, and Nikolaj Bjørner. "Z3: An efficient SMT solver." (2008)

Reinforcement Learning:

  • Schulman, John, et al. "Proximal policy optimization algorithms." (2017)

Distributed Systems:

  • Lamport, Leslie. "The part-time parliament." (1998) - Paxos
  • Chang, Fay, et al. "Bigtable: A distributed storage system." (2008)
  • Dean, Jeffrey, and Sanjay Ghemawat. "MapReduce: simplified data processing." (2008)

Tools & Frameworks


Contact & Support


Acknowledgments

  • TD Commons for publishing the foundational disclosure
  • Apache Software Foundation for Beam and other tools
  • Z3 Team at Microsoft Research for the SMT solver
  • Open source community for libraries and inspiration

Built with ❤️ for the distributed systems community

⭐ Star on GitHub | 🐛 Report Bug | 💡 Request Feature

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

upir-0.1.0.tar.gz (78.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

upir-0.1.0-py3-none-any.whl (79.5 kB view details)

Uploaded Python 3

File details

Details for the file upir-0.1.0.tar.gz.

File metadata

  • Download URL: upir-0.1.0.tar.gz
  • Upload date:
  • Size: 78.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for upir-0.1.0.tar.gz
Algorithm Hash digest
SHA256 6437e8bd6434ce6af841e2c07c7205dda5c5cbd3f6d0d45504bf5ea2d16d9a70
MD5 fe24b97c6e898ddf1665e3a353411d17
BLAKE2b-256 a7286364f347320f0641a4947de595def26619c99d24542e0a935c7a1aec92d7

See more details on using hashes here.

Provenance

The following attestation bundles were made for upir-0.1.0.tar.gz:

Publisher: publish.yml on bassrehab/upir

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file upir-0.1.0-py3-none-any.whl.

File metadata

  • Download URL: upir-0.1.0-py3-none-any.whl
  • Upload date:
  • Size: 79.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for upir-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 8a6197eeb2a3d09f4ccf0897266782286ad2d152fc1b293bcc45be4e1f2cc48c
MD5 0faca238692a86d5ebe9869b52c60887
BLAKE2b-256 77dce137cbfc25ba39386e342713a48e2e2829a0187d3033186fd0a9f5c15687

See more details on using hashes here.

Provenance

The following attestation bundles were made for upir-0.1.0-py3-none-any.whl:

Publisher: publish.yml on bassrehab/upir

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page