Skip to main content

Causal Event-Driven Architecture Modeling for Python based on RAPIDE 1.0

Project description

PyRapide

    ____        ____              _     __
   / __ \__  __/ __ \____ _____  (_)___/ /__
  / /_/ / / / / /_/ / __ `/ __ \/ / __  / _ \
 / ____/ /_/ / _, _/ /_/ / /_/ / / /_/ /  __/
/_/    \__, /_/ |_|\__,_/ .___/_/\__,_/\___/
      /____/           /_/
        ⟨e₁⟩ ──causes──▶ ⟨e₂⟩
         │                 │
         ▼                 ▼
        ⟨e₃⟩             ⟨e₄⟩ ──causes──▶ ⟨e₅⟩

Causal event-driven architecture modeling for Python, based on the RAPIDE 1.0 specification.


What is PyRapide?

PyRapide is a Python library for modeling, executing, and analyzing causal event-driven architectures. It gives you first-class tools to define system components, connect them through typed event patterns, enforce behavioral constraints, and inspect the full causal history of every event that flows through your system.

PyRapide is based on RAPIDE (Rapid Prototyping for Application Domain-specific Integrated Environments), an architecture description language developed at Stanford University by David Luckham and his team. RAPIDE introduced the idea that the causal relationships between events, not just their order in time, are the fundamental structure worth capturing in a distributed system. PyRapide brings these ideas to modern Python with async-native execution, Pydantic-based events, and a composable pattern algebra.

This matters today because MCP (Model Context Protocol) servers and agentic AI systems generate complex, concurrent event streams where understanding why something happened is as important as what happened. PyRapide lets you define constraints like "every tool call must produce a result," detect anomalies like unexpected causal chains, and trace root causes across multiple interacting servers — all with a clean, declarative Python API.

PyRapide also includes drop-in adapters for 8 popular agentic frameworks — AutoGen, LangGraph, CrewAI, LlamaIndex, Agno, OpenAI Swarm, MetaGPT, and FlowiseAI — so you can add causal traceability to an existing project in 1-3 lines of code.


Quick Start

pip install pyrapide
import asyncio
from pyrapide import (
    Event, Computation, Poset, Pattern,
    interface, action, architecture, connect,
    module, when, get_context, Engine,
    must_match, visualization,
)

# 1. Define interfaces
@interface
class Sender:
    @action
    async def send(self, msg: str) -> None: ...

@interface
class Receiver:
    @action
    async def receive(self, msg: str) -> None: ...

# 2. Define module implementations
@module(implements=Sender)
class SenderModule:
    async def start(self):
        ctx = get_context(self)
        ctx.generate_event("Sender.send", payload={"msg": "hello"})

@module(implements=Receiver)
class ReceiverModule:
    @when(Pattern.match("Sender.send"))
    async def on_send(self, match):
        ctx = get_context(self)
        event = list(match.events)[0]
        ctx.generate_event("Receiver.receive",
                           payload={"msg": event.payload["msg"]},
                           caused_by=list(match.events))

# 3. Define the architecture
@architecture
class MessagingArch:
    sender: Sender
    receiver: Receiver
    def connections(self):
        return [connect(Pattern.match("Sender.send"), "receiver")]

# 4. Run it
async def main():
    arch = MessagingArch()
    engine = Engine()
    engine.bind(arch, "sender", SenderModule)
    engine.bind(arch, "receiver", ReceiverModule)
    comp = await engine.run(arch, timeout=2.0)

    # 5. Inspect the result
    print(visualization.summary(comp))

    # 6. Check a constraint
    constraint = must_match(Pattern.match("Sender.send") >> Pattern.match("Receiver.receive"),
                            message="Every send must cause a receive")
    violations = constraint.check(comp)
    print(f"Violations: {len(violations)}")

asyncio.run(main())

Core Concepts

Events and Causal History

An Event is an immutable, uniquely identified record of something that happened: a tool call, a message, a state change. Events carry a name, payload, source, and timestamp. Crucially, events are linked to their causes, forming a causal history rather than a flat log.

Posets (Partially Ordered Event Sets)

A Poset is a directed acyclic graph (DAG) of events ordered by causality. Unlike a total-order log, a poset captures that some events are causally independent; they happened concurrently with no causal link. This is the foundational data structure in PyRapide.

Interfaces and Modules

An @interface declares the event types a component can produce (actions) and consume (requires). A @module implements an interface with concrete logic — lifecycle hooks (start, finish) and reactive handlers (@when). Modules are checked at decoration time for conformance to their interface.

Architectures and Connections

An @architecture composes interfaces into a system. Connections (connect, pipe, agent) route events between components by pattern matching. The Engine orchestrates module lifecycle, event propagation, and connection dispatch.

Event Patterns

Pattern provides a composable algebra for matching events in a poset. Basic patterns match by name and payload fields. Composite operators combine patterns: >> (causal sequence), & (join), | (independence), or_ (disjunction). Patterns can be guarded with predicates, timed with deadlines, and repeated.

Constraints and Monitoring

Constraints declaratively specify behavioral rules: must_match asserts a pattern must occur, never forbids it. The ConstraintMonitor checks constraints in real time as events arrive. The @constraint decorator enables custom constraint logic with full access to the computation history.

Streaming and MCP Integration

StreamProcessor consumes events from multiple concurrent EventSource instances in real time, applying pattern watches and constraint enforcement with a configurable sliding window. MCPEventAdapter translates MCP protocol events (tool calls, results, errors) into PyRapide events with automatic causal linking via correlation IDs. LLMEventAdapter does the same for LLM request/response cycles.


MCP Integration Example

import asyncio
from pyrapide import (
    MCPEventAdapter, MCPEventTypes, MCPPatterns,
    StreamProcessor, must_match, never,
)
from pyrapide.integrations.mcp import MockMCPClient

async def monitor_mcp_servers():
    # Set up two MCP server adapters
    db_client = MockMCPClient("database-server")
    ai_client = MockMCPClient("ai-server")
    db_adapter = MCPEventAdapter("database-server", db_client)
    ai_adapter = MCPEventAdapter("ai-server", ai_client)

    # Define constraints
    no_errors = never(MCPPatterns.tool_error(), message="No tool errors allowed")

    # Run the stream processor
    processor = StreamProcessor()
    processor.add_source("db", db_adapter)
    processor.add_source("ai", ai_adapter)
    processor.enforce(no_errors)

    violations = []
    processor.on_violation(lambda v: violations.append(v))

    # Simulate server activity then close
    async def simulate():
        await db_client.connect()
        await db_client.call_tool("query", {"sql": "SELECT * FROM users"})
        await ai_client.connect()
        await ai_client.call_tool_error("generate", error="rate limited")
        await db_client.close()
        await ai_client.close()

    await asyncio.gather(simulate(), processor.run())
    print(f"Events processed: {processor.stats['event_count']}")
    print(f"Violations: {len(violations)}")

asyncio.run(monitor_mcp_servers())

Framework Adapters

PyRapide includes adapter templates for 8 agentic AI frameworks, each translating framework-native events into a unified event taxonomy with automatic causal linking. Adding PyRapide to an existing project typically takes 1-3 lines of code.

Framework Adapter Integration Code
AutoGen AutoGenAdapter adapter = AutoGenAdapter("app", runtime=runtime)
LangGraph LangGraphAdapter graph.invoke(input, config={"callbacks": [adapter.callback_handler]})
CrewAI CrewAIAdapter Crew(..., step_callback=adapter.on_step, task_callback=adapter.on_task)
LlamaIndex LlamaIndexAdapter adapter = LlamaIndexAdapter("app"); adapter.attach()
Agno AgnoAdapter adapter = AgnoAdapter("app", agent=agent)
OpenAI Swarm SwarmAdapter client = adapter.create_instrumented_client()
MetaGPT MetaGPTAdapter adapter = MetaGPTAdapter("app").patch()
FlowiseAI FlowiseAdapter adapter = FlowiseAdapter("app", base_url=url, chatflow_id=id)

Example: LangGraph + PyRapide

import asyncio
from pyrapide.agent_templates.langgraph import LangGraphAdapter
from pyrapide.agent_templates.core import AgentPatterns
from pyrapide import StreamProcessor, never, queries, visualization

# Create adapter and processor
adapter = LangGraphAdapter("my-graph")
processor = StreamProcessor()
processor.add_source("langgraph", adapter)
processor.enforce(never(AgentPatterns.tool_error(), name="no_tool_errors"))

# Run your LangGraph app (pass the callback handler)
result = graph.invoke(
    {"messages": [("user", "What is the weather?")]},
    config={"callbacks": [adapter.callback_handler]},
)

# Analyze the causal event history
comp = adapter.computation
print(visualization.summary(comp))

# Trace root causes of any errors
for err in comp.events_by_name("agent.tool.error"):
    roots = queries.root_causes(comp, err)
    print(f"Error caused by: {[r.name for r in roots]}")

All adapters normalize events into a unified taxonomy (agent.tool.call, agent.llm.request, agent.handoff, etc.) and share the same pattern library (AgentPatterns). See pyrapide-agent-documentation.md for full documentation, or the INTEGRATION.md file in each adapter's directory for framework-specific guides.


API Reference

All public symbols are available from the top-level pyrapide package:

import pyrapide
print(pyrapide.__version__)  # "0.3.0"

Main Import Paths

Category Imports
Core Event, Poset, Computation, Clock, ClockManager, CausalCycleError
Types interface, action, provides, requires, behavior, is_subtype, conforms_to
Patterns Pattern, BasicPattern, PatternMatch, placeholder
Architecture architecture, connect, pipe, agent
Constraints Constraint, MustMatch, Never, must_match, never, constraint, ConstraintMonitor
Executable module, when
Runtime Engine, StreamProcessor, InMemoryEventSource
Integrations MCPEventAdapter, MCPEventTypes, MCPPatterns, LLMEventAdapter, LLMEventTypes
Analysis queries, visualization, CausalPredictor, AnomalyDetector
Agent Templates AgentEventTypes, AgentPatterns, BaseAgentAdapter, CausalTracker, ResultBridge (from agent_templates.core)

Full API documentation is available via docstrings on every public class and function. Sphinx documentation is planned for a future release.


Architecture

pyrapide/
  core/           Core primitives: Event, Poset, Computation, Clock, exceptions
  types/          RAPIDE type system: interfaces, actions, subtype conformance
  patterns/       Causal event pattern algebra: basic, composite, guarded, timed
  architecture/   Architecture definitions: composition, connections, bindings
  constraints/    Constraint enforcement: monitors, filters, conformance checks
  executable/     Module lifecycle: @module, @when reactive handlers, processes
  runtime/        Execution engine, event scheduling, streaming, serialization
  integrations/   MCP and LLM protocol adapters with mock clients
  analysis/       Queries, visualization (DOT/Mermaid/ASCII), prediction, anomaly detection
  utils/          Internal utilities (ID generation)
  agent_templates/
    core/         Shared: AgentEventTypes, BaseAgentAdapter, CausalTracker, AgentPatterns
    autogen/      Microsoft AutoGen adapter (InterventionHandler)
    langgraph/    LangGraph / LangChain adapter (BaseCallbackHandler)
    crewai/       CrewAI adapter (tool hooks + callbacks)
    llamaindex/   LlamaIndex adapter (Dispatcher event handler)
    agno/         Agno adapter (pre/post hooks + RunEvent stream)
    swarm/        OpenAI Swarm adapter (InstrumentedSwarm subclass)
    metagpt/      MetaGPT adapter (monkey-patch wrappers)
    flowise/      FlowiseAI adapter (REST API + SSE consumer)
    tests/        Core adapter tests (no framework dependencies required)

The package is organized around the RAPIDE specification's layered concepts: types define component interfaces, architectures compose them, executables implement behavior, and the runtime orchestrates everything. The agent_templates subpackage provides framework-specific adapters that translate agentic framework events into PyRapide's unified taxonomy, each with an INTEGRATION.md guide, an adapter.py implementation, and a runnable template.py example.


Development

Setup

git clone https://github.com/ShaneDolphin/pyrapide.git
cd PyRapide
python -m venv .venv
source .venv/bin/activate
pip install -e ".[dev]"

Run Tests

pytest tests/ -v

All tests use pytest-asyncio with asyncio_mode = "auto". The test suite includes unit tests for every module and end-to-end scenario tests.

Type Checking and Linting

mypy pyrapide/ --ignore-missing-imports
ruff check pyrapide/

Both must pass with zero errors.


Based on RAPIDE 1.0

PyRapide is inspired by the RAPIDE architecture description language developed at the Stanford University Program Analysis and Verification Group by David Luckham, John Kenney, Larry Augustin, James Vera, and Doug Bryan.

Key references:

  • Luckham, D.C. et al. "Specification and Analysis of System Architecture Using Rapide." IEEE Transactions on Software Engineering, 1995.
  • Luckham, D.C. "The Power of Events: An Introduction to Complex Event Processing in Distributed Enterprise Systems." Addison-Wesley, 2002.
  • Luckham, D.C. and Vera, J. "An Event-Based Architecture Definition Language." IEEE Transactions on Software Engineering, 1995.

PyRapide is an independent implementation that adapts RAPIDE's core ideas (causal event posets, interface types, architecture composition, and pattern-based constraints) to modern Python with async execution, Pydantic models, and integrations for MCP and LLM protocols.


License

MIT License. See LICENSE for details.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pyrapide-0.3.0.tar.gz (168.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

pyrapide-0.3.0-py3-none-any.whl (140.8 kB view details)

Uploaded Python 3

File details

Details for the file pyrapide-0.3.0.tar.gz.

File metadata

  • Download URL: pyrapide-0.3.0.tar.gz
  • Upload date:
  • Size: 168.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.9.6

File hashes

Hashes for pyrapide-0.3.0.tar.gz
Algorithm Hash digest
SHA256 003d76da30bbf5d04beb6fbc225b143f111429b5a3288890ce2118ed3a22e630
MD5 a3f9a7ef86af42466cd64779e064055b
BLAKE2b-256 66759c4849681804e6a5c9d2cd8cd6c63081800d0899ddd1903119a67bb98140

See more details on using hashes here.

File details

Details for the file pyrapide-0.3.0-py3-none-any.whl.

File metadata

  • Download URL: pyrapide-0.3.0-py3-none-any.whl
  • Upload date:
  • Size: 140.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.9.6

File hashes

Hashes for pyrapide-0.3.0-py3-none-any.whl
Algorithm Hash digest
SHA256 f605cde66e0434eef499ffabc5d94ef4c61576a8bf1070e51fab954122b5af78
MD5 6e313faac581568f4ce64370bbb50788
BLAKE2b-256 fe215e0a9d56c5626eeb13b119d98e642e5dbd483a1fccddf9df2779a5da0e85

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page