Causal Event-Driven Architecture Modeling for Python based on RAPIDE 1.0

PyRapide

    ____        ____              _     __
   / __ \__  __/ __ \____ _____  (_)___/ /__
  / /_/ / / / / /_/ / __ `/ __ \/ / __  / _ \
 / ____/ /_/ / _, _/ /_/ / /_/ / / /_/ /  __/
/_/    \__, /_/ |_|\__,_/ .___/_/\__,_/\___/
      /____/           /_/
        ⟨e₁⟩ ──causes──▶ ⟨e₂⟩
         │                 │
         ▼                 ▼
        ⟨e₃⟩             ⟨e₄⟩ ──causes──▶ ⟨e₅⟩

Causal event-driven architecture modeling for Python, based on the RAPIDE 1.0 specification.


What is PyRapide?

PyRapide is a Python library for modeling, executing, and analyzing causal event-driven architectures. It gives you first-class tools to define system components, connect them through typed event patterns, enforce behavioral constraints, and inspect the full causal history of every event that flows through your system.

PyRapide is based on RAPIDE (Rapid Prototyping for Application Domain-specific Integrated Environments), an architecture description language developed at Stanford University by David Luckham and his team. RAPIDE introduced the idea that the causal relationships between events, not just their order in time, are the fundamental structure worth capturing in a distributed system. PyRapide brings these ideas to modern Python with async-native execution, Pydantic-based events, and a composable pattern algebra.

This matters today because MCP (Model Context Protocol) servers and agentic AI systems generate complex, concurrent event streams where understanding why something happened is as important as what happened. PyRapide lets you define constraints like "every tool call must produce a result," detect anomalies like unexpected causal chains, and trace root causes across multiple interacting servers — all with a clean, declarative Python API.


Quick Start

pip install pyrapide

import asyncio
from pyrapide import (
    Event, Computation, Poset, Pattern,
    interface, action, architecture, connect,
    module, when, get_context, Engine,
    must_match, visualization,
)

# 1. Define interfaces
@interface
class Sender:
    @action
    async def send(self, msg: str) -> None: ...

@interface
class Receiver:
    @action
    async def receive(self, msg: str) -> None: ...

# 2. Define module implementations
@module(implements=Sender)
class SenderModule:
    async def start(self):
        ctx = get_context(self)
        ctx.generate_event("Sender.send", payload={"msg": "hello"})

@module(implements=Receiver)
class ReceiverModule:
    @when(Pattern.match("Sender.send"))
    async def on_send(self, match):
        ctx = get_context(self)
        event = list(match.events)[0]
        ctx.generate_event("Receiver.receive",
                           payload={"msg": event.payload["msg"]},
                           caused_by=list(match.events))

# 3. Define the architecture
@architecture
class MessagingArch:
    sender: Sender
    receiver: Receiver
    def connections(self):
        return [connect(Pattern.match("Sender.send"), "receiver")]

# 4. Run it
async def main():
    arch = MessagingArch()
    engine = Engine()
    engine.bind(arch, "sender", SenderModule)
    engine.bind(arch, "receiver", ReceiverModule)
    comp = await engine.run(arch, timeout=2.0)

    # 5. Inspect the result
    print(visualization.summary(comp))

    # 6. Check a constraint
    constraint = must_match(Pattern.match("Sender.send") >> Pattern.match("Receiver.receive"),
                            message="Every send must cause a receive")
    violations = constraint.check(comp)
    print(f"Violations: {len(violations)}")

asyncio.run(main())

Core Concepts

Events and Causal History

An Event is an immutable, uniquely identified record of something that happened: a tool call, a message, a state change. Events carry a name, payload, source, and timestamp. Crucially, events are linked to their causes, forming a causal history rather than a flat log.
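
To make the idea of a causal history concrete, here is a minimal standalone sketch using only the standard library. It is not PyRapide's Event class: SketchEvent, its fields, and causal_history are hypothetical names chosen for illustration, and the real library's attributes may differ.

```python
from dataclasses import dataclass, field
from itertools import count

_next_id = count(1)


@dataclass(frozen=True)
class SketchEvent:
    """Hypothetical stand-in for an event record (not PyRapide's Event class)."""
    name: str
    payload: dict = field(default_factory=dict)
    caused_by: tuple = ()  # direct causes, themselves SketchEvent instances
    id: int = field(default_factory=lambda: next(_next_id))


def causal_history(event):
    """Return every transitive cause of `event`, nearest causes first."""
    seen, history, frontier = set(), [], list(event.caused_by)
    while frontier:
        cause = frontier.pop(0)
        if cause.id in seen:
            continue
        seen.add(cause.id)
        history.append(cause)
        frontier.extend(cause.caused_by)
    return history


call = SketchEvent("tool.call", {"tool": "query"})
result = SketchEvent("tool.result", {"rows": 3}, caused_by=(call,))
reply = SketchEvent("agent.reply", {"text": "3 users"}, caused_by=(result,))

names = [e.name for e in causal_history(reply)]
print(names)
```

A flat log would only record that the three events happened in some order. Following caused_by links instead answers why the reply was produced.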

Posets (Partially Ordered Event Sets)

A Poset is a directed acyclic graph (DAG) of events ordered by causality. Unlike a total-order log, a poset captures that some events are causally independent; they happened concurrently with no causal link. This is the foundational data structure in PyRapide.
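
The difference between "happened before" and "caused" can be sketched with a small DAG. The class below is a hypothetical illustration, not PyRapide's Poset; the method names add, precedes, and independent are assumptions. It mirrors the five-event diagram at the top of this page.

```python
class SketchPoset:
    """Hypothetical minimal poset: event id -> set of direct cause ids."""

    def __init__(self):
        self._causes = {}

    def add(self, event_id, caused_by=()):
        # Causes must already exist, so a cycle can never be introduced.
        missing = [c for c in caused_by if c not in self._causes]
        if missing:
            raise KeyError(f"unknown causes: {missing}")
        self._causes[event_id] = set(caused_by)

    def ancestors(self, event_id):
        found, stack = set(), list(self._causes[event_id])
        while stack:
            cause = stack.pop()
            if cause not in found:
                found.add(cause)
                stack.extend(self._causes[cause])
        return found

    def precedes(self, a, b):
        """True when a is a (transitive) cause of b."""
        return a in self.ancestors(b)

    def independent(self, a, b):
        """True when neither event is a cause of the other."""
        return a != b and not self.precedes(a, b) and not self.precedes(b, a)


poset = SketchPoset()
poset.add("e1")
poset.add("e2", ["e1"])
poset.add("e3", ["e1"])
poset.add("e4", ["e2"])
poset.add("e5", ["e4"])

print(poset.precedes("e1", "e5"))      # e1 is a transitive cause of e5
print(poset.independent("e3", "e4"))   # no causal path in either direction
```

In a total-order log, e3 and e4 would be forced into some sequence. Here they stay unordered, which is the information a poset preserves.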

Interfaces and Modules

An @interface declares the event types a component can produce (actions) and consume (requires). A @module implements an interface with concrete logic — lifecycle hooks (start, finish) and reactive handlers (@when). Modules are checked at decoration time for conformance to their interface.

Architectures and Connections

An @architecture composes interfaces into a system. Connections (connect, pipe, agent) route events between components by pattern matching. The Engine orchestrates module lifecycle, event propagation, and connection dispatch.
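
As a rough mental model of connection dispatch, the sketch below routes events to component inboxes by predicate. It uses only asyncio; SketchRouter and its methods are hypothetical and say nothing about how PyRapide's Engine is actually implemented.

```python
import asyncio


class SketchRouter:
    """Hypothetical connection table of (predicate, inbox) pairs."""

    def __init__(self):
        self._routes = []

    def connect(self, predicate, inbox):
        self._routes.append((predicate, inbox))

    async def publish(self, event):
        # Deliver the event to every inbox whose predicate accepts it.
        delivered = 0
        for predicate, inbox in self._routes:
            if predicate(event):
                await inbox.put(event)
                delivered += 1
        return delivered


async def demo():
    receiver_inbox = asyncio.Queue()
    audit_inbox = asyncio.Queue()
    router = SketchRouter()
    router.connect(lambda e: e["name"] == "Sender.send", receiver_inbox)
    router.connect(lambda e: True, audit_inbox)

    await router.publish({"name": "Sender.send", "msg": "hello"})
    await router.publish({"name": "Sender.ping"})
    return receiver_inbox.qsize(), audit_inbox.qsize()


counts = asyncio.run(demo())
print(counts)
```

The receiver only sees the event its connection matches, while the catch-all connection sees both.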

Event Patterns

Pattern provides a composable algebra for matching events in a poset. Basic patterns match by name and payload fields. Composite operators combine patterns: >> (causal sequence), & (join), | (independence), or_ (disjunction). Patterns can be guarded with predicates, timed with deadlines, and repeated.
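
One way to picture a composable pattern algebra is with operator overloading over a tiny trace. The sketch below is standalone and hypothetical: the class P, the trace layout, and the exact meaning given to >> and | are simplifying assumptions, and PyRapide's own Pattern semantics may differ in detail.

```python
from dataclasses import dataclass
from typing import Callable


def ancestors(trace, eid):
    """All transitive causes of event `eid` in the trace."""
    found, stack = set(), list(trace[eid][1])
    while stack:
        cause = stack.pop()
        if cause not in found:
            found.add(cause)
            stack.extend(trace[cause][1])
    return found


def independent(trace, a, b):
    """True when neither event is a cause of the other."""
    return a != b and a not in ancestors(trace, b) and b not in ancestors(trace, a)


@dataclass(frozen=True)
class P:
    """Hypothetical pattern: maps a trace to a list of matches (tuples of ids)."""
    find: Callable

    @staticmethod
    def match(name):
        return P(lambda t: [(eid,) for eid, (n, _) in t.items() if n == name])

    def __rshift__(self, other):
        # Causal sequence: the left match must causally precede the right one.
        return P(lambda t: [l + r for l in self.find(t) for r in other.find(t)
                            if l[-1] in ancestors(t, r[0])])

    def __or__(self, other):
        # Independence: no event on one side is causally related to the other.
        return P(lambda t: [l + r for l in self.find(t) for r in other.find(t)
                            if all(independent(t, a, b) for a in l for b in r)])


# Trace layout (an assumption): event id -> (name, set of direct cause ids).
trace = {
    1: ("Sender.send", set()),
    2: ("Receiver.receive", {1}),
    3: ("Logger.write", set()),
}

seq = (P.match("Sender.send") >> P.match("Receiver.receive")).find(trace)
par = (P.match("Receiver.receive") | P.match("Logger.write")).find(trace)
print(seq, par)
```

Because each operator returns another pattern, larger expressions can be built from smaller ones without special cases.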

Constraints and Monitoring

Constraints declaratively specify behavioral rules: must_match asserts a pattern must occur, never forbids it. The ConstraintMonitor checks constraints in real time as events arrive. The @constraint decorator enables custom constraint logic with full access to the computation history.
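
The two constraint kinds behave differently under real-time monitoring: a "never" rule can be flagged the moment the forbidden event arrives, while a "must" rule can only be declared violated once the computation ends. The sketch below illustrates that asymmetry with plain event names instead of patterns. SketchMonitor is a hypothetical class, not the library's ConstraintMonitor.

```python
class SketchMonitor:
    """Hypothetical monitor over event names (real constraints use patterns)."""

    def __init__(self, forbidden, required):
        self.forbidden = dict(forbidden)  # event name -> violation message
        self.required = dict(required)    # event name -> violation message
        self.seen = set()
        self.violations = []

    def observe(self, name):
        # "never" rules are decidable as soon as the event shows up.
        self.seen.add(name)
        if name in self.forbidden:
            self.violations.append(self.forbidden[name])

    def close(self):
        # "must" rules are only decidable once no more events can arrive.
        for name, message in self.required.items():
            if name not in self.seen:
                self.violations.append(message)
        return self.violations


monitor = SketchMonitor(
    forbidden={"tool.error": "No tool errors allowed"},
    required={"tool.result": "Every tool call must produce a result"},
)
for name in ["tool.call", "tool.error"]:
    monitor.observe(name)

immediate = list(monitor.violations)  # known while the stream is still open
final = monitor.close()               # known only after the stream ends
print(immediate, final)
```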

Streaming and MCP Integration

StreamProcessor consumes events from multiple concurrent EventSource instances in real time, applying pattern watches and constraint enforcement with a configurable sliding window. MCPEventAdapter translates MCP protocol events (tool calls, results, errors) into PyRapide events with automatic causal linking via correlation IDs. LLMEventAdapter does the same for LLM request/response cycles.
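
Causal linking via correlation IDs combined with a sliding window can be sketched as follows. This is a standalone illustration under stated assumptions: CorrelationLinker, the event dictionaries, and the eviction policy are hypothetical and are not taken from MCPEventAdapter or StreamProcessor.

```python
from collections import deque


class CorrelationLinker:
    """Hypothetical core of an adapter: link results to calls by correlation id."""

    def __init__(self, window=100):
        self.window = deque(maxlen=window)  # most recent events only
        self._calls = {}                    # correlation id -> call event

    def ingest(self, kind, correlation_id, data=None):
        # A result or error is caused by the call sharing its correlation id.
        cause = None if kind == "call" else self._calls.get(correlation_id)
        event = {
            "kind": kind,
            "cid": correlation_id,
            "data": data,
            "caused_by": [cause] if cause is not None else [],
        }
        # Forget calls that are about to slide out of the window.
        if len(self.window) == self.window.maxlen:
            evicted = self.window[0]
            if evicted["kind"] == "call":
                self._calls.pop(evicted["cid"], None)
        if kind == "call":
            self._calls[correlation_id] = event
        self.window.append(event)
        return event


linker = CorrelationLinker(window=3)
call = linker.ingest("call", "req-1", {"tool": "query"})
other = linker.ingest("call", "req-2", {"tool": "generate"})
err = linker.ingest("error", "req-2", "rate limited")
res = linker.ingest("result", "req-1", {"rows": 3})
late = linker.ingest("result", "req-1", {"rows": 3})  # its call has left the window
print(len(res["caused_by"]), len(late["caused_by"]))
```

The window size is a trade-off: a larger window links late responses correctly but keeps more events in memory.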


MCP Integration Example

import asyncio
from pyrapide import (
    MCPEventAdapter, MCPEventTypes, MCPPatterns,
    StreamProcessor, must_match, never,
)
from pyrapide.integrations.mcp import MockMCPClient

async def monitor_mcp_servers():
    # Set up two MCP server adapters
    db_client = MockMCPClient("database-server")
    ai_client = MockMCPClient("ai-server")
    db_adapter = MCPEventAdapter("database-server", db_client)
    ai_adapter = MCPEventAdapter("ai-server", ai_client)

    # Define constraints
    no_errors = never(MCPPatterns.tool_error(), message="No tool errors allowed")

    # Run the stream processor
    processor = StreamProcessor()
    processor.add_source("db", db_adapter)
    processor.add_source("ai", ai_adapter)
    processor.enforce(no_errors)

    violations = []
    processor.on_violation(lambda v: violations.append(v))

    # Simulate server activity then close
    async def simulate():
        await db_client.connect()
        await db_client.call_tool("query", {"sql": "SELECT * FROM users"})
        await ai_client.connect()
        await ai_client.call_tool_error("generate", error="rate limited")
        await db_client.close()
        await ai_client.close()

    await asyncio.gather(simulate(), processor.run())
    print(f"Events processed: {processor.stats['event_count']}")
    print(f"Violations: {len(violations)}")

asyncio.run(monitor_mcp_servers())

API Reference

All public symbols are available from the top-level pyrapide package:

import pyrapide
print(pyrapide.__version__)  # e.g. "0.2.0"

Main Import Paths

Category       Imports
Core           Event, Poset, Computation, Clock, ClockManager, CausalCycleError
Types          interface, action, provides, requires, behavior, is_subtype, conforms_to
Patterns       Pattern, BasicPattern, PatternMatch, placeholder
Architecture   architecture, connect, pipe, agent
Constraints    Constraint, MustMatch, Never, must_match, never, constraint, ConstraintMonitor
Executable     module, when
Runtime        Engine, StreamProcessor, InMemoryEventSource
Integrations   MCPEventAdapter, MCPEventTypes, MCPPatterns, LLMEventAdapter, LLMEventTypes
Analysis       queries, visualization, CausalPredictor, AnomalyDetector

Full API documentation is available via docstrings on every public class and function. Sphinx documentation is planned for a future release.


Architecture

pyrapide/
  core/           Core primitives: Event, Poset, Computation, Clock, exceptions
  types/          RAPIDE type system: interfaces, actions, subtype conformance
  patterns/       Causal event pattern algebra: basic, composite, guarded, timed
  architecture/   Architecture definitions: composition, connections, bindings
  constraints/    Constraint enforcement: monitors, filters, conformance checks
  executable/     Module lifecycle: @module, @when reactive handlers, processes
  runtime/        Execution engine, event scheduling, streaming, serialization
  integrations/   MCP and LLM protocol adapters with mock clients
  analysis/       Queries, visualization (DOT/Mermaid/ASCII), prediction, anomaly detection
  utils/          Internal utilities (ID generation)

The package is organized around the RAPIDE specification's layered concepts: types define component interfaces, architectures compose them, executables implement behavior, and the runtime orchestrates everything. Analysis tools operate on the resulting causal computations.


Development

Setup

git clone https://github.com/ShaneDolphin/pyrapide.git
cd pyrapide
python -m venv .venv
source .venv/bin/activate
pip install -e ".[dev]"

Run Tests

pytest tests/ -v

All tests use pytest-asyncio with asyncio_mode = "auto". The test suite includes unit tests for every module and end-to-end scenario tests.

Type Checking and Linting

mypy pyrapide/ --ignore-missing-imports
ruff check pyrapide/

Both must pass with zero errors.


Based on RAPIDE 1.0

PyRapide is inspired by the RAPIDE architecture description language developed at the Stanford University Program Analysis and Verification Group by David Luckham, John Kenney, Larry Augustin, James Vera, and Doug Bryan.

Key references:

  • Luckham, D.C. et al. "Specification and Analysis of System Architecture Using Rapide." IEEE Transactions on Software Engineering, 1995.
  • Luckham, D.C. "The Power of Events: An Introduction to Complex Event Processing in Distributed Enterprise Systems." Addison-Wesley, 2002.
  • Luckham, D.C. and Vera, J. "An Event-Based Architecture Definition Language." IEEE Transactions on Software Engineering, 1995.

PyRapide is an independent implementation that adapts RAPIDE's core ideas (causal event posets, interface types, architecture composition, and pattern-based constraints) to modern Python with async execution, Pydantic models, and integrations for MCP and LLM protocols.


License

MIT License. See LICENSE for details.
