Skip to main content

Context-driven data pipeline with budget enforcement and pluggable context slicing

Project description

Relay

Agent-agent context passing, done right.

Relay is a lightweight, open source Python middleware library for passing context reliably between AI agents in a multi-agent pipeline. Works with any LLM provider or framework — LangChain, OpenAI, Anthropic, LiteLLM, or your own agents.


The Problem

One hallucinating agent silently corrupts the shared context, and every downstream agent inherits the damage. Existing orchestration tools treat the context window as a mutable blob with no version control.

The Solution

Relay treats context like a ledger: append-only, signed at every step, and reversible.


Features

  • Agent Runners — Universal adapter layer for any LLM provider or framework (v0.3)
  • Context Broker — Normalizes, timestamps, and cryptographically signs context envelopes
  • Handoff Validator — Detects contradictions and triggers rollback on corruption
  • Snapshot Store — Persists immutable checkpoints for automatic rollback
  • Budget Enforcer — Hard token cap enforcement before every agent call
  • Slicer — Pluggable context slicing strategies (recency, relevance, structural)
  • Manifest Boundaries — Agent manifests define read/write permissions with hash verification

Installation

pip install relay-middleware

Or from source:

git clone https://github.com/kridaydave/Relay.git
cd Relay
pip install -e .

Optional: install tiktoken for precise token counting:

pip install relay-middleware[tiktoken]

The Aha Moment

Without Relay (manual, error-prone):

# Agent 1 produces output
agent1_output = {"entities": ["Apple", "2024 revenue"], "summary": "Apple grew"}

# Manual serialization — easy to lose data, corrupt context
context = json.dumps(agent1_output)

# Agent 2 receives corrupted context
agent2_input = f"Given: {context}\nAnalyze this."

With Relay (automatic, verified):

from relay.core_pipeline import CoreRelayPipeline

pipeline = CoreRelayPipeline(
    signing_secret="your-secret-key",
    token_budget=8000
)

# Agent 1 — creates signed envelope
result = pipeline.execute_step({"entities": ["Apple"], "revenue": "2024"})
envelope1 = result.value  # signed, immutable

# Agent 2 — validator detects contradiction
# If Agent 2 accidentally drops "entities", rollback triggers automatically
result = pipeline.execute_step({"summary": "growth"})  # contradiction!

What happens on contradiction:

# Validator detects: critical key "entities" disappeared
# Relay automatically rolls back to last clean snapshot

result = pipeline.rollback()
restored_envelope = result.value
# Now you have the clean envelope from step 1

Budget & Slicing (v0.2)

Enforce token limits and slice context intelligently:

from relay.core_pipeline import CoreRelayPipeline
from relay.budget import TiktokenCounter
from relay.slicer import AgentManifest, RecencySlicePacker

# Create manifest defining agent permissions
manifest = AgentManifest(
    agent_id="agent-1",
    task_description="Analyze entities and summarize findings",
    reads=frozenset({"entities", "summary"}),
    writes=frozenset({"analysis"}),
    max_tokens=4000
)

# Initialize pipeline with budget enforcement and slicer
pipeline = CoreRelayPipeline(
    signing_secret="your-secret",
    token_budget=8000,
    token_counter=TiktokenCounter(),
    slice_packer=RecencySlicePacker()
)

# Execute step with manifest validation
result = pipeline.execute_step_with_manifest(
    agent_output={"analysis": "growth at 5%"},
    manifest=manifest
)

The budget enforcer checks projected token cost before each call. The slicer selects context based on strategy. Manifest boundaries validate write permissions.


Agent Runners (v0.3)

Use the adapter registry to plug any LLM provider into Relay without touching Relay internals:

import asyncio
from relay.runners import AdapterRegistry, RawSDKAdapter, AgentManifest

registry = AdapterRegistry()

# Register any callable — sync or async
def openai_callable(messages):
    return openai.chat.completions.create(model="gpt-4", messages=messages)

registry.register("openai", RawSDKAdapter(callable=openai_callable))

# Or use bundled adapters
from relay.runners import LocalModelAdapter
registry.register("ollama", LocalModelAdapter(base_url="http://localhost:11434", model="llama3"))
from relay.core_pipeline import CoreRelayPipeline

pipeline = CoreRelayPipeline(
    signing_secret="your-secret",
    token_budget=8000,
    registry=registry,
)

manifest = AgentManifest(
    agent_id="openai",
    task_description="Analyze entities and summarize findings",
    reads=frozenset({"entities", "summary"}),
    writes=frozenset({"analysis"}),
    max_tokens=4000,
)

async def run():
    # First step: seed the pipeline
    pipeline.execute_step({"entities": ["Apple"], "summary": "revenue up"})

    # Execute via adapter — no LLM calls in Relay, only normalisation
    result = await pipeline.execute_step_with_runner("openai", manifest)
    # result is Success(SignedEnvelope) or Failure(ErrorCode.*)

All adapters are lazy-loaded. Install only what you need:

pip install relay-middleware[langchain]   # LangChain Runnable
pip install relay-middleware[crewai]      # CrewAI Agent
pip install relay-middleware[autogen]     # AutoGen AssistantAgent
pip install relay-middleware[local]       # Ollama / vLLM / OpenAI-compatible
pip install relay-middleware[all]         # everything

Parallel Fork-Join (v0.4)

Run multiple agents concurrently against the same context, then merge their outputs:

import asyncio
from relay.core_pipeline import CoreRelayPipeline
from relay.parallel import ForkSpec, JoinStrategy
from relay.slicer import AgentManifest
from relay.runners import AdapterRegistry, LocalModelAdapter

registry = AdapterRegistry()
registry.register("analyst", LocalModelAdapter(base_url="http://localhost:11434", model="llama3"))
registry.register("researcher", LocalModelAdapter(base_url="http://localhost:11434", model="llama3"))

pipeline = CoreRelayPipeline(
    signing_secret="your-secret-key",
    token_budget=8000,
    registry=registry,
)

manifest_a = AgentManifest("analyst", "Analyze data", frozenset({"input"}), frozenset({"analysis"}), 4000)
manifest_b = AgentManifest("researcher", "Research context", frozenset({"input"}), frozenset({"research"}), 4000)

# Step 1: seed pipeline
pipeline.execute_step({"input": "Q3 earnings report"})

# Step 2: parallel fork — both agents run concurrently
result = asyncio.run(pipeline.execute_parallel_step(
    fork_specs=[ForkSpec("analyst", manifest_a), ForkSpec("researcher", manifest_b)],
    join_strategy=JoinStrategy.UNION,
))
# Success: merged payload has {"text": "...", "analysis": "...", "research": "..."}

Three join strategies:

Strategy Behavior Use Case
UNION Merge all passing outputs; conflict → rollback Independent sections of context
VOTE Accept highest-confidence output; discard failures Redundant agents, best-of-N
FIRST_WINS Accept first passing output; cancel remaining tasks Latency-critical fallbacks
# VOTE: pick the best analysis
result = asyncio.run(pipeline.execute_parallel_step(
    fork_specs=[ForkSpec("gpt4", manifest_a), ForkSpec("claude", manifest_b)],
    join_strategy=JoinStrategy.VOTE,
))

# FIRST_WINS: fast agent wins, slow agents cancelled
result = asyncio.run(pipeline.execute_parallel_step(
    fork_specs=[ForkSpec("fast-model", manifest), ForkSpec("backup-model", manifest)],
    join_strategy=JoinStrategy.FIRST_WINS,
))

Forks share the same base payload (immutable snapshot). No LLM calls during parallel step — Relay orchestrates, adapters execute. If all forks fail, state is unchanged (no rollback needed).


How It Works

Agent 1 → [Sign Envelope] → Agent 2 → [Validate] → Agent 3
                              ↓
                         [Snapshot]
                              ↓
                    [Rollback if dirty]

Every handoff is signed and validated. If corruption is detected, Relay silently rolls back to the last clean checkpoint.


Context Envelope

Every context move between agents is wrapped in a signed, immutable envelope:

{
  "relay_version": "0.3.0",
  "pipeline_id": "uuid-v4",
  "step": 2,
  "timestamp": "2026-05-04T10:22:00Z",
  "token_budget_used": 1840,
  "token_budget_total": 8000,
  "payload": {...},
  "manifest_hash": "sha256:abc123...",
  "signature": "sha256:def456..."
}

Error Handling

Relay uses Result types instead of exceptions:

from relay.types import Success, Failure, Result

result = pipeline.execute_step({"task": "work"})
if isinstance(result, Success):
    envelope = result.value
elif isinstance(result, Failure):
    print(f"Error: {result.reason} (code: {result.code})")

Testing

pytest tests/unit -v

Quality gates:

  • mypy --strict passes
  • 80% test coverage

  • Every public function has a test

License

MIT License - see LICENSE file


Resources

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

relay_middleware-0.4.1.tar.gz (40.6 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

relay_middleware-0.4.1-py3-none-any.whl (48.4 kB view details)

Uploaded Python 3

File details

Details for the file relay_middleware-0.4.1.tar.gz.

File metadata

  • Download URL: relay_middleware-0.4.1.tar.gz
  • Upload date:
  • Size: 40.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.14.2

File hashes

Hashes for relay_middleware-0.4.1.tar.gz
Algorithm Hash digest
SHA256 8d81f4101a660c1ceafe63426a22d122264eb36a376deca6b7bcd420c5de1cd7
MD5 2b26cc98ca4a4700bfb39dc53ecd2858
BLAKE2b-256 f35ec3b8606abd2bdb81b3794b897be63186bed4e7b1ae9bccc94fd1edc0daae

See more details on using hashes here.

File details

Details for the file relay_middleware-0.4.1-py3-none-any.whl.

File metadata

File hashes

Hashes for relay_middleware-0.4.1-py3-none-any.whl
Algorithm Hash digest
SHA256 f90930951f8653873eacf0fd6875f08f6663529afc1a2c72c472ee366ea79ac5
MD5 3d26f783f691963e48a6d9fe63e63ffc
BLAKE2b-256 6d1148acafc2f2f46cbfee8d29cc5db2e74dc1ae81a6b40a9566c5f21b62412a

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page