Causal Event-Driven Architecture Modeling for Python based on RAPIDE 1.0
PyRapide
    ____        ____              _     __
   / __ \__  __/ __ \____ _____  (_)___/ /__
  / /_/ / / / / /_/ / __ `/ __ \/ / __  / _ \
 / ____/ /_/ / _, _/ /_/ / /_/ / / /_/ /  __/
/_/    \__, /_/ |_|\__,_/ .___/_/\__,_/\___/
      /____/           /_/
⟨e₁⟩ ──causes──▶ ⟨e₂⟩
  │                │
  ▼                ▼
⟨e₃⟩             ⟨e₄⟩ ──causes──▶ ⟨e₅⟩
Causal event-driven architecture modeling for Python, based on the RAPIDE 1.0 specification.
What is PyRapide?
PyRapide is a Python library for modeling, executing, and analyzing causal event-driven architectures. It gives you first-class tools to define system components, connect them through typed event patterns, enforce behavioral constraints, and inspect the full causal history of every event that flows through your system.
PyRapide is based on RAPIDE (Rapid Prototyping for Application Domain-specific Integrated Environments), an architecture description language developed at Stanford University by David Luckham and his team. RAPIDE introduced the idea that the causal relationships between events, not just their order in time, are the fundamental structure worth capturing in a distributed system. PyRapide brings these ideas to modern Python with async-native execution, Pydantic-based events, and a composable pattern algebra.
This matters today because MCP (Model Context Protocol) servers and agentic AI systems generate complex, concurrent event streams where understanding why something happened is as important as what happened. PyRapide lets you define constraints like "every tool call must produce a result," detect anomalies like unexpected causal chains, and trace root causes across multiple interacting servers — all with a clean, declarative Python API.
Quick Start
pip install pyrapide
import asyncio
from pyrapide import (
    Event, Computation, Poset, Pattern,
    interface, action, architecture, connect,
    module, when, get_context, Engine,
    must_match, visualization,
)

# 1. Define interfaces
@interface
class Sender:
    @action
    async def send(self, msg: str) -> None: ...

@interface
class Receiver:
    @action
    async def receive(self, msg: str) -> None: ...

# 2. Define module implementations
@module(implements=Sender)
class SenderModule:
    async def start(self):
        ctx = get_context(self)
        ctx.generate_event("Sender.send", payload={"msg": "hello"})

@module(implements=Receiver)
class ReceiverModule:
    @when(Pattern.match("Sender.send"))
    async def on_send(self, match):
        ctx = get_context(self)
        event = list(match.events)[0]
        ctx.generate_event("Receiver.receive",
                           payload={"msg": event.payload["msg"]},
                           caused_by=list(match.events))

# 3. Define the architecture
@architecture
class MessagingArch:
    sender: Sender
    receiver: Receiver

    def connections(self):
        return [connect(Pattern.match("Sender.send"), "receiver")]

# 4. Run it
async def main():
    arch = MessagingArch()
    engine = Engine()
    engine.bind(arch, "sender", SenderModule)
    engine.bind(arch, "receiver", ReceiverModule)
    comp = await engine.run(arch, timeout=2.0)

    # 5. Inspect the result
    print(visualization.summary(comp))

    # 6. Check a constraint
    constraint = must_match(Pattern.match("Sender.send") >> Pattern.match("Receiver.receive"),
                            message="Every send must cause a receive")
    violations = constraint.check(comp)
    print(f"Violations: {len(violations)}")

asyncio.run(main())
Core Concepts
Events and Causal History
An Event is an immutable, uniquely identified record of something that happened: a tool call, a message, a state change. Events carry a name, payload, source, and timestamp. Crucially, events are linked to their causes, forming a causal history rather than a flat log.
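To make the idea of a causal history concrete, here is a minimal standard-library sketch. It is not PyRapide's Event class (which the project describes as Pydantic-based); `SketchEvent`, its `causes` field, and `causal_history` are hypothetical names used only for illustration.

```python
from __future__ import annotations

import time
import uuid
from dataclasses import dataclass, field


@dataclass(frozen=True)
class SketchEvent:
    """Hypothetical stand-in for an immutable, uniquely identified event."""
    name: str
    payload: tuple = ()                     # immutable (key, value) pairs
    source: str = "unknown"
    causes: tuple[SketchEvent, ...] = ()    # direct causal predecessors
    id: str = field(default_factory=lambda: uuid.uuid4().hex)
    timestamp: float = field(default_factory=time.time)


def causal_history(event):
    """Return the sorted names of every transitive cause of `event`."""
    seen, stack = {}, list(event.causes)
    while stack:
        cause = stack.pop()
        if cause.id not in seen:
            seen[cause.id] = cause.name
            stack.extend(cause.causes)
    return sorted(seen.values())


call = SketchEvent("tool.call", payload=(("tool", "query"),), source="agent")
result = SketchEvent("tool.result", source="server", causes=(call,))
reply = SketchEvent("agent.reply", source="agent", causes=(result,))

print(causal_history(reply))  # ['tool.call', 'tool.result']
```

A flat log would only record that the three events happened in some order; following the `causes` links answers why `agent.reply` happened.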
Posets (Partially Ordered Event Sets)
A Poset is a directed acyclic graph (DAG) of events ordered by causality. Unlike a total-order log, a poset captures that some events are causally independent; they happened concurrently with no causal link. This is the foundational data structure in PyRapide.
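The difference between causal order and concurrency can be shown with a small standard-library sketch. `SketchPoset` and its methods are hypothetical and do not reflect PyRapide's actual Poset API; the five-event history is made up for the example.

```python
class SketchPoset:
    """Hypothetical minimal poset: events plus their direct causes."""

    def __init__(self):
        self._causes = {}  # event -> frozenset of direct causes

    def add(self, event, caused_by=()):
        if event in self._causes:
            raise ValueError(f"duplicate event: {event}")
        missing = [c for c in caused_by if c not in self._causes]
        if missing:
            raise KeyError(f"unknown causes: {missing}")
        # Causes must already exist, so a new edge can never close a cycle.
        self._causes[event] = frozenset(caused_by)

    def ancestors(self, event):
        """All events that causally precede `event`."""
        seen, stack = set(), list(self._causes[event])
        while stack:
            current = stack.pop()
            if current not in seen:
                seen.add(current)
                stack.extend(self._causes[current])
        return seen

    def precedes(self, a, b):
        return a in self.ancestors(b)

    def concurrent(self, a, b):
        """True when neither event is a cause of the other."""
        return a != b and not self.precedes(a, b) and not self.precedes(b, a)


poset = SketchPoset()
poset.add("e1")
poset.add("e2", caused_by=["e1"])
poset.add("e3", caused_by=["e1"])
poset.add("e4", caused_by=["e2"])
poset.add("e5", caused_by=["e4"])

print(poset.precedes("e1", "e5"))    # True: e1 -> e2 -> e4 -> e5
print(poset.concurrent("e3", "e5"))  # True: they share a cause but neither causes the other
```

Requiring every cause to exist before an event is added is one simple way to keep the graph acyclic; a real implementation may check for cycles differently.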
Interfaces and Modules
An @interface declares the event types a component can produce (actions) and consume (requires). A @module implements an interface with concrete logic — lifecycle hooks (start, finish) and reactive handlers (@when). Modules are checked at decoration time for conformance to their interface.
Architectures and Connections
An @architecture composes interfaces into a system. Connections (connect, pipe, agent) route events between components by pattern matching. The Engine orchestrates module lifecycle, event propagation, and connection dispatch.
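As a rough illustration of pattern-based routing, the sketch below delivers events to component inboxes using plain asyncio. It does not use PyRapide's connect, pipe, or agent; the `route` function, the predicate-based connections, and the inbox names are all assumptions made for the example.

```python
import asyncio


async def route(events, connections, inboxes):
    """Deliver each event to every inbox whose connection predicate matches."""
    for event in events:
        for matches, target in connections:
            if matches(event):
                await inboxes[target].put(event)


async def demo():
    inboxes = {"receiver": asyncio.Queue(), "audit": asyncio.Queue()}
    connections = [
        (lambda e: e["name"] == "Sender.send", "receiver"),  # match by name
        (lambda e: True, "audit"),                           # match everything
    ]
    events = [
        {"name": "Sender.send", "msg": "hello"},
        {"name": "Sender.idle"},
    ]
    await route(events, connections, inboxes)
    return {name: queue.qsize() for name, queue in inboxes.items()}


delivered = asyncio.run(demo())
print(delivered)  # {'receiver': 1, 'audit': 2}
```

Keeping connections as data (a predicate plus a target name) separates the wiring from the components, which is the same separation the architecture layer provides.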
Event Patterns
Pattern provides a composable algebra for matching events in a poset. Basic patterns match by name and payload fields. Composite operators combine patterns: >> (causal sequence), & (join), | (independence), or_ (disjunction). Patterns can be guarded with predicates, timed with deadlines, and repeated.
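One way such an algebra can be built is with operator overloading over a causal history. The sketch below is an assumption-laden toy, not PyRapide's Pattern class: `Named`, `Combined`, and the exact semantics chosen here for `>>`, `&`, and `|` are hypothetical, and guards, timing, and repetition are left out.

```python
from itertools import product


def ancestors(history, event):
    """All transitive causes of `event` in {event: (name, causes)}."""
    seen, stack = set(), list(history[event][1])
    while stack:
        current = stack.pop()
        if current not in seen:
            seen.add(current)
            stack.extend(history[current][1])
    return seen


class SketchPattern:
    def __rshift__(self, other):      # a >> b: a causally precedes b
        return Combined(self, other, "sequence")

    def __and__(self, other):         # a & b: both occur
        return Combined(self, other, "join")

    def __or__(self, other):          # a | b: both occur, causally unrelated
        return Combined(self, other, "independent")


class Named(SketchPattern):
    def __init__(self, name):
        self.name = name

    def matches(self, history):
        return [frozenset([e]) for e, (name, _) in history.items() if name == self.name]


class Combined(SketchPattern):
    def __init__(self, left, right, kind):
        self.left, self.right, self.kind = left, right, kind

    def matches(self, history):
        found = []
        pairs = product(self.left.matches(history), self.right.matches(history))
        for left, right in pairs:
            left_first = all(a in ancestors(history, b) for a in left for b in right)
            related = any(
                a in ancestors(history, b) or b in ancestors(history, a)
                for a in left for b in right
            )
            accept = {
                "sequence": left_first,
                "join": True,
                "independent": left.isdisjoint(right) and not related,
            }[self.kind]
            if accept:
                found.append(left | right)
        return found


history = {
    "e1": ("Sender.send", set()),
    "e2": ("Receiver.receive", {"e1"}),
    "e3": ("Log.write", set()),
}


def show(pattern):
    return [sorted(match) for match in pattern.matches(history)]


print(show(Named("Sender.send") >> Named("Receiver.receive")))  # [['e1', 'e2']]
print(show(Named("Receiver.receive") >> Named("Sender.send")))  # []
print(show(Named("Sender.send") | Named("Log.write")))          # [['e1', 'e3']]
```

Because every operator returns another pattern object, expressions nest freely, which is what makes the algebra composable.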
Constraints and Monitoring
Constraints declaratively specify behavioral rules: must_match asserts a pattern must occur, never forbids it. The ConstraintMonitor checks constraints in real time as events arrive. The @constraint decorator enables custom constraint logic with full access to the computation history.
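The two constraint styles can be sketched in a few lines of plain Python. This is only an illustration of the idea: `sketch_never`, `sketch_every_call_answered`, the `Violation` record, and the dictionary-shaped events are hypothetical and are not PyRapide's must_match, never, or ConstraintMonitor.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Violation:
    message: str


def sketch_never(name, message):
    """Build a check that reports one violation per forbidden event."""
    def check(events):
        return [Violation(message) for e in events if e["name"] == name]
    return check


def sketch_every_call_answered(events):
    """Report each tool call that no result event names as its cause."""
    answered = {e.get("caused_by") for e in events if e["name"] == "tool.result"}
    return [
        Violation(f"call {e['id']} produced no result")
        for e in events
        if e["name"] == "tool.call" and e["id"] not in answered
    ]


history = [
    {"id": 1, "name": "tool.call"},
    {"id": 2, "name": "tool.result", "caused_by": 1},
    {"id": 3, "name": "tool.call"},                    # never answered
    {"id": 4, "name": "tool.error", "caused_by": 3},
]

no_errors = sketch_never("tool.error", "No tool errors allowed")
violations = no_errors(history) + sketch_every_call_answered(history)
for violation in violations:
    print(violation.message)
# No tool errors allowed
# call 3 produced no result
```

Both checks run over a completed history here; a real-time monitor would instead re-evaluate them as each event arrives.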
Streaming and MCP Integration
StreamProcessor consumes events from multiple concurrent EventSource instances in real time, applying pattern watches and constraint enforcement with a configurable sliding window. MCPEventAdapter translates MCP protocol events (tool calls, results, errors) into PyRapide events with automatic causal linking via correlation IDs. LLMEventAdapter does the same for LLM request/response cycles.
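Causal linking by correlation ID, combined with a bounded window, might look roughly like the sketch below. The `SketchLinker` class, its field names, and the choice to key open calls by source and correlation ID are assumptions for illustration; the actual adapters may work differently.

```python
from collections import deque


class SketchLinker:
    """Hypothetical linker: attaches each result or error to its originating call."""

    def __init__(self, window=3):
        self.window = deque(maxlen=window)   # keeps only the most recent events
        self._open_calls = {}                # (source, correlation id) -> call event id
        self._next_id = 0

    def ingest(self, source, kind, correlation_id):
        self._next_id += 1
        event = {"id": self._next_id, "source": source, "kind": kind, "caused_by": None}
        key = (source, correlation_id)
        if kind == "call":
            self._open_calls[key] = event["id"]
        else:
            # A result or error is caused by the matching open call, if any.
            event["caused_by"] = self._open_calls.pop(key, None)
        self.window.append(event)
        return event


linker = SketchLinker(window=3)
linker.ingest("db", "call", "a1")                    # id 1
linker.ingest("ai", "call", "a1")                    # id 2, same ID on another server
db_result = linker.ingest("db", "result", "a1")      # id 3
ai_error = linker.ingest("ai", "error", "a1")        # id 4

print(db_result["caused_by"], ai_error["caused_by"])  # 1 2
print([e["id"] for e in linker.window])               # [2, 3, 4]
```

Keying by source as well as correlation ID avoids cross-linking when two servers happen to reuse the same ID. The table of open calls is unbounded in this sketch, so a long-running version would also need to expire unanswered calls.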
MCP Integration Example
import asyncio
from pyrapide import (
    MCPEventAdapter, MCPEventTypes, MCPPatterns,
    StreamProcessor, must_match, never,
)
from pyrapide.integrations.mcp import MockMCPClient

async def monitor_mcp_servers():
    # Set up two MCP server adapters
    db_client = MockMCPClient("database-server")
    ai_client = MockMCPClient("ai-server")
    db_adapter = MCPEventAdapter("database-server", db_client)
    ai_adapter = MCPEventAdapter("ai-server", ai_client)

    # Define constraints
    no_errors = never(MCPPatterns.tool_error(), message="No tool errors allowed")

    # Run the stream processor
    processor = StreamProcessor()
    processor.add_source("db", db_adapter)
    processor.add_source("ai", ai_adapter)
    processor.enforce(no_errors)
    violations = []
    processor.on_violation(lambda v: violations.append(v))

    # Simulate server activity then close
    async def simulate():
        await db_client.connect()
        await db_client.call_tool("query", {"sql": "SELECT * FROM users"})
        await ai_client.connect()
        await ai_client.call_tool_error("generate", error="rate limited")
        await db_client.close()
        await ai_client.close()

    await asyncio.gather(simulate(), processor.run())
    print(f"Events processed: {processor.stats['event_count']}")
    print(f"Violations: {len(violations)}")

asyncio.run(monitor_mcp_servers())
API Reference
All public symbols are available from the top-level pyrapide package:
import pyrapide
print(pyrapide.__version__)  # e.g. "0.2.0"
Main Import Paths
| Category | Imports |
|---|---|
| Core | Event, Poset, Computation, Clock, ClockManager, CausalCycleError |
| Types | interface, action, provides, requires, behavior, is_subtype, conforms_to |
| Patterns | Pattern, BasicPattern, PatternMatch, placeholder |
| Architecture | architecture, connect, pipe, agent |
| Constraints | Constraint, MustMatch, Never, must_match, never, constraint, ConstraintMonitor |
| Executable | module, when |
| Runtime | Engine, StreamProcessor, InMemoryEventSource |
| Integrations | MCPEventAdapter, MCPEventTypes, MCPPatterns, LLMEventAdapter, LLMEventTypes |
| Analysis | queries, visualization, CausalPredictor, AnomalyDetector |
Full API documentation is available via docstrings on every public class and function. Sphinx documentation is planned for a future release.
Architecture
pyrapide/
    core/           Core primitives: Event, Poset, Computation, Clock, exceptions
    types/          RAPIDE type system: interfaces, actions, subtype conformance
    patterns/       Causal event pattern algebra: basic, composite, guarded, timed
    architecture/   Architecture definitions: composition, connections, bindings
    constraints/    Constraint enforcement: monitors, filters, conformance checks
    executable/     Module lifecycle: @module, @when reactive handlers, processes
    runtime/        Execution engine, event scheduling, streaming, serialization
    integrations/   MCP and LLM protocol adapters with mock clients
    analysis/       Queries, visualization (DOT/Mermaid/ASCII), prediction, anomaly detection
    utils/          Internal utilities (ID generation)
The package is organized around the RAPIDE specification's layered concepts: types define component interfaces, architectures compose them, executables implement behavior, and the runtime orchestrates everything. Analysis tools operate on the resulting causal computations.
Development
Setup
git clone https://github.com/ShaneDolphin/pyrapide.git
cd pyrapide
python -m venv .venv
source .venv/bin/activate
pip install -e ".[dev]"
Run Tests
pytest tests/ -v
All tests use pytest-asyncio with asyncio_mode = "auto". The test suite includes unit tests for every module and end-to-end scenario tests.
Type Checking and Linting
mypy pyrapide/ --ignore-missing-imports
ruff check pyrapide/
Both must pass with zero errors.
Based on RAPIDE 1.0
PyRapide is inspired by the RAPIDE architecture description language developed at the Stanford University Program Analysis and Verification Group by David Luckham, John Kenney, Larry Augustin, James Vera, and Doug Bryan.
Key references:
- Luckham, D.C. et al. "Specification and Analysis of System Architecture Using Rapide." IEEE Transactions on Software Engineering, 1995.
- Luckham, D.C. "The Power of Events: An Introduction to Complex Event Processing in Distributed Enterprise Systems." Addison-Wesley, 2002.
- Luckham, D.C. and Vera, J. "An Event-Based Architecture Definition Language." IEEE Transactions on Software Engineering, 1995.
PyRapide is an independent implementation that adapts RAPIDE's core ideas (causal event posets, interface types, architecture composition, and pattern-based constraints) to modern Python with async execution, Pydantic models, and integrations for MCP and LLM protocols.
License
MIT License. See LICENSE for details.
File details
Details for the file pyrapide-0.2.0.tar.gz.
File metadata
- Download URL: pyrapide-0.2.0.tar.gz
- Upload date:
- Size: 128.0 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.3
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 12990663fb72a8037c6383016ed93aaa479e84bc7e7b3ab7a465fa65f765a027 |
| MD5 | b658c68c244250d2e52b31a4e912081a |
| BLAKE2b-256 | 2ba759fb036ef55fe3ba3ffce92af6a8f354d9b7de2dde349900138c18bcfbc7 |
File details
Details for the file pyrapide-0.2.0-py3-none-any.whl.
File metadata
- Download URL: pyrapide-0.2.0-py3-none-any.whl
- Upload date:
- Size: 57.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.3
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 9853bd47eb3d0f96bdcfb7f0587e3021d55e2058577e63913ad21fbb2f0b34ba |
| MD5 | f5751e22db6adbd8f16140342e8b26a6 |
| BLAKE2b-256 | f3379f778af9f7b56f0111814413bdf8161fa7866996d2608f694c167432b462 |