# antaris-pipeline

Agent pipeline orchestration for the Antaris Analytics Suite. Zero dependencies. Coordinates memory, routing, safety, and context across agent lifecycles.

```bash
pip install antaris-pipeline
```
## Architecture

antaris-pipeline orchestrates five Antaris packages:

- `antaris-memory`: Persistent conversation memory with temporal decay
- `antaris-guard`: Input/output safety scanning and policy enforcement
- `antaris-context`: Dynamic context window management and compression
- `antaris-router`: Model selection based on task classification
- `antaris-contracts`: Schema validation and structured outputs

Two coordination patterns are available:

- `AgentPipeline`: Simple `pre_turn`/`post_turn` workflow for direct integration
- `AntarisPipeline`: Full pipeline orchestration with cross-package optimization
## Quick Start

### AgentPipeline (Recommended)

```python
from antaris_pipeline import AgentPipeline

pipeline = AgentPipeline(memory=True, guard=True, context=True)

def run_turn(user_input: str) -> str:
    # Before the LLM call
    pre_result = pipeline.pre_turn(user_input)
    if pre_result.blocked:
        return pre_result.block_reason

    # Add recalled context if available
    prompt = pre_result.context + user_input if pre_result.context else user_input
    response = your_llm_call(prompt)

    # After the LLM call
    post_result = pipeline.post_turn(user_input, response)
    print(f"Stored {post_result.stored_memories} memories")
    return response
```
### Full Pipeline Orchestration

```python
from antaris_pipeline import create_pipeline, PipelineConfig

config = PipelineConfig(
    memory_enabled=True,
    guard_enabled=True,
    context_enabled=True,
    router_enabled=True,
)

pipeline = create_pipeline(
    storage_path="./pipeline_storage",
    pipeline_config=config,
)

# Process with full orchestration (inside an async function)
result = await pipeline.process("Explain neural networks", your_model_function)

print(f"Response: {result.output}")
print(f"Tokens: {result.metrics.total_tokens}")
```
## Core Features

### Session Lifecycle Management

```python
pipeline = AgentPipeline(memory=True)

# Start a new session
pipeline.session_start("user_123", session_metadata={"context": "support"})

# Process turns
for user_input in conversation:
    pre_result = pipeline.pre_turn(user_input)
    # ... LLM processing ...
    post_result = pipeline.post_turn(user_input, llm_response)

# End the session (automatic memory consolidation)
pipeline.session_end()
```
### Pre-turn Processing

`AgentPipeline.pre_turn()` handles:

- Guard input check: scans for policy violations, PII, and harmful content
- Memory recall: retrieves relevant conversation history
- Context building: assembles an enriched prompt from recalled memories

```python
pre_result = pipeline.pre_turn("Remember what I told you about my project?")

# Check results
print(f"Blocked: {pre_result.blocked}")
print(f"Context length: {len(pre_result.context or '')}")
print(f"Memories recalled: {pre_result.memory_count}")
print(f"Warnings: {pre_result.warnings}")
```
### Post-turn Processing

`AgentPipeline.post_turn()` handles:

- Guard output check: validates the safety of the LLM response
- Memory ingestion: stores the conversation turn for future recall

```python
post_result = pipeline.post_turn(user_input, llm_response)

# Check results
print(f"Output blocked: {post_result.blocked_output}")
print(f"Safe replacement: {post_result.safe_replacement}")
print(f"Memories stored: {post_result.stored_memories}")
```
### Compaction Recovery

When OpenClaw compacts agent context, the pipeline preserves essential state:

```python
# Before compaction: the pipeline stores handoff notes
handoff_data = pipeline.prepare_compaction()

# After compaction: the pipeline restores context from those notes
pipeline.recover_from_compaction(handoff_data)
```

The bridge handles this automatically in OpenClaw environments.
### Pipeline Telemetry (v4.2.0)

`PipelineTelemetry` is a structured dataclass attached to every pipeline run, providing per-stage timing breakdowns to help identify bottlenecks.

```python
from antaris_pipeline import PipelineTelemetry

# After a pipeline run:
telemetry = agent.last_telemetry
print(telemetry.summary())
# "Pipeline: 145ms total | recall=12ms guard=5ms llm=120ms | 342 tokens"

stage, ms = telemetry.slowest_stage()
print(f"Bottleneck: {stage} at {ms:.1f}ms")
```

`PipelineTelemetry` fields:

- `stages` — `dict[str, float]` mapping stage name → elapsed milliseconds
- `total_ms` — total wall-clock time for the full pipeline run
- `token_count` — tokens processed (input + output combined)
- `summary()` — returns a one-line, human-readable performance report
- `slowest_stage()` → `tuple[str, float]` — name and duration of the slowest stage

Access the telemetry object from the result of any `pre_turn`/`post_turn` call, or via `agent.last_telemetry` after a full pipeline run.
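As a rough sketch of the shape described above (not the package's actual implementation), a minimal dataclass with the same fields could look like this:

```python
from dataclasses import dataclass, field

@dataclass
class TelemetrySketch:
    """Minimal stand-in for PipelineTelemetry's documented shape."""
    stages: dict[str, float] = field(default_factory=dict)  # stage name -> ms
    total_ms: float = 0.0
    token_count: int = 0

    def slowest_stage(self) -> tuple[str, float]:
        # Pick the stage with the largest elapsed time
        return max(self.stages.items(), key=lambda kv: kv[1])

    def summary(self) -> str:
        per_stage = " ".join(f"{k}={v:.0f}ms" for k, v in self.stages.items())
        return f"Pipeline: {self.total_ms:.0f}ms total | {per_stage} | {self.token_count} tokens"

t = TelemetrySketch(stages={"recall": 12.0, "guard": 5.0, "llm": 120.0},
                    total_ms=145.0, token_count=342)
print(t.summary())        # Pipeline: 145ms total | recall=12ms guard=5ms llm=120ms | 342 tokens
print(t.slowest_stage())  # ('llm', 120.0)
```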
## Telemetry and Observability

```python
from antaris_pipeline import TelemetricsCollector

collector = TelemetricsCollector(
    session_id="prod_session_001",
    output_dir="./telemetry_logs",
)

pipeline = AgentPipeline(
    memory=True,
    guard=True,
    telemetrics_collector=collector,
)

# All pipeline operations automatically emit telemetry events.
# Logs are stored as JSONL: ./telemetry_logs/telemetrics_prod_session_001.jsonl
```

Telemetry captures:

- Performance metrics (latency, tokens, costs)
- Security events (blocks, policy violations)
- Memory operations (storage, retrieval, compaction)
- Error conditions and warnings
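Because the logs are plain JSONL, they can be post-processed with the standard library alone. A sketch of tallying events from such a log (the field names `event` and `latency_ms` are illustrative assumptions, not the package's documented schema):

```python
import json
from collections import Counter

def summarize_events(lines):
    """Tally event types and total latency from JSONL telemetry lines.

    Field names ("event", "latency_ms") are assumed for illustration.
    """
    counts = Counter()
    total_latency = 0.0
    for line in lines:
        line = line.strip()
        if not line:
            continue
        record = json.loads(line)
        counts[record.get("event", "unknown")] += 1
        total_latency += record.get("latency_ms", 0.0)
    return counts, total_latency

sample = [
    '{"event": "memory_recall", "latency_ms": 12.4}',
    '{"event": "guard_scan", "latency_ms": 5.1}',
    '{"event": "guard_scan", "latency_ms": 4.9}',
]
counts, latency = summarize_events(sample)
print(counts)            # Counter({'guard_scan': 2, 'memory_recall': 1})
print(round(latency, 1)) # 22.4
```

In real use, the `lines` iterable would come from opening the session's `.jsonl` file.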
## Configuration Profiles

```python
from antaris_pipeline import PipelineConfig, ProfileType

# Balanced: moderate safety, performance, and cost
config = PipelineConfig.from_profile(ProfileType.BALANCED)

# Strict safety: maximum security scanning
config = PipelineConfig.from_profile(ProfileType.STRICT_SAFETY)

# Cost optimized: minimal processing, fast models
config = PipelineConfig.from_profile(ProfileType.COST_OPTIMIZED)

# Performance: minimal latency, premium models
config = PipelineConfig.from_profile(ProfileType.PERFORMANCE)

# Debug: full logging, telemetry, dry-run mode
config = PipelineConfig.from_profile(ProfileType.DEBUG)
```
## Dry-run Mode

Test pipeline behavior without incurring API costs:

```python
config = PipelineConfig(dry_run=True)
pipeline = create_pipeline(pipeline_config=config)

# Simulates processing without actual LLM calls
simulation = pipeline.dry_run("Test input")

print(f"Estimated latency: {simulation['total_estimated_time_ms']}ms")
print(f"Would recall: {simulation['memory']['would_retrieve']} memories")
```
## Component Access

Access individual components for fine-grained control:

```python
pipeline = AgentPipeline(memory=True, guard=True)

# Direct memory access
memories = pipeline.memory_system.search("quantum computing", limit=5)

# Direct guard access
scan_result = pipeline.guard_system.scan_input("Test message")

# Component status
status = pipeline.get_component_status()
print(f"Memory: {status['memory']['status']}")
print(f"Guard: {status['guard']['policy_count']} policies loaded")
```
## OpenClaw Bridge Integration

For OpenClaw agents, antaris-pipeline provides a bridge protocol that handles stdin/stdout NDJSON communication.

Bridge commands:

- `config-check`: validate pipeline configuration
- `session-start`: initialize a new session with metadata
- `pre-turn`: process input before the LLM call
- `post-turn`: process output after the LLM response
- `compaction-recovery`: restore state after context compaction
- `memory-search`: direct memory query operations

The bridge runs as a persistent subprocess, maintaining pipeline state across turns.

Example bridge usage:

```bash
echo '{"cmd": "pre-turn", "text": "Hello", "memory_path": "./mem"}' | python3 pipeline_bridge.py
# {"success": true, "context": "Relevant context...", "blocked": false}
```
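NDJSON framing is simply one JSON object per line. A minimal sketch of encoding a bridge request and decoding a response (the helper names here are illustrative, and the field names follow the example above):

```python
import json

def encode_request(cmd: str, **fields) -> bytes:
    """Serialize one bridge request as a single NDJSON line."""
    return (json.dumps({"cmd": cmd, **fields}) + "\n").encode("utf-8")

def decode_response(line: bytes) -> dict:
    """Parse one NDJSON response line from the bridge."""
    return json.loads(line.decode("utf-8"))

req = encode_request("pre-turn", text="Hello", memory_path="./mem")
print(req)  # b'{"cmd": "pre-turn", "text": "Hello", "memory_path": "./mem"}\n'

resp = decode_response(b'{"success": true, "blocked": false}\n')
print(resp["success"])  # True
```

In practice these lines would be written to the bridge subprocess's stdin and read back from its stdout, one message per line.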
## Configuration

### PipelineConfig Options

```python
from antaris_pipeline import PipelineConfig, MemoryConfig, GuardConfig

config = PipelineConfig(
    # Component toggles
    memory_enabled=True,
    guard_enabled=True,
    context_enabled=True,
    router_enabled=False,

    # Memory settings
    memory_config=MemoryConfig(
        max_memory_mb=1024,
        decay_half_life_hours=168.0,  # 1 week
        min_relevance=0.3,
    ),

    # Guard settings
    guard_config=GuardConfig(
        input_scanning=True,
        output_scanning=True,
        policy_strictness=0.7,
        max_scan_time_ms=1000,
    ),

    # Performance settings
    dry_run=False,
    enable_telemetrics=True,
    max_concurrent_operations=10,
)
```
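The `decay_half_life_hours` setting suggests standard exponential half-life decay. As an illustration of how such a weight might behave (the exact formula antaris-memory uses is not documented here, so treat this as an assumption):

```python
def decay_weight(age_hours: float, half_life_hours: float = 168.0) -> float:
    """Exponential decay: the weight halves every `half_life_hours`."""
    return 0.5 ** (age_hours / half_life_hours)

print(decay_weight(0))    # 1.0   (fresh memory, full weight)
print(decay_weight(168))  # 0.5   (one week old at the default half-life)
print(decay_weight(336))  # 0.25  (two weeks old)
```

Under this model, a memory would fall below the `min_relevance=0.3` threshold after roughly 1.7 half-lives, i.e. about 12 days with the one-week default.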
### YAML Configuration

```yaml
# pipeline_config.yaml
profile: balanced

memory:
  enabled: true
  storage_path: "./memory_store"
  max_memory_mb: 2048
  decay_half_life_hours: 168.0
  min_relevance: 0.3

guard:
  enabled: true
  input_scanning: true
  output_scanning: true
  policy_strictness: 0.7

context:
  enabled: true
  max_tokens: 8000
  compression_enabled: true

telemetrics:
  enabled: true
  session_id: "production_v1"
  output_dir: "./logs"
```

```python
from antaris_pipeline import PipelineConfig

config = PipelineConfig.from_yaml("pipeline_config.yaml")
pipeline = create_pipeline(pipeline_config=config)
```
## Command Line Interface

```bash
# Validate configuration
antaris-pipeline config --profile balanced --output config.yaml

# Process a single input (dry-run)
antaris-pipeline process "Hello world" --dry-run

# Analyze telemetry logs
antaris-pipeline telemetrics summary ./logs/telemetrics_session123.jsonl

# Start the telemetry dashboard
antaris-pipeline serve --port 8080
```
## Performance Characteristics
Memory operation latencies (single-threaded, M2 MacBook Pro):
- Store single memory: 1-5ms
- Search 1000 memories: 15-45ms
- Compaction (10k memories): 200-500ms
Guard scan latencies:
- Input scan (typical): 10-50ms
- Output scan (typical): 15-75ms
- Policy compilation: 100-300ms (cached)
Context processing:
- Token counting: 1-5ms
- Compression (8k → 4k): 50-200ms
Pipeline coordination overhead: 5-15ms per turn.
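Figures like these can be spot-checked in your own environment with `time.perf_counter`. A sketch with a stub standing in for a pipeline stage (the stub is hypothetical; substitute a real call such as `pipeline.pre_turn`):

```python
import time

def timed(fn, *args, **kwargs):
    """Run fn and return (result, elapsed milliseconds)."""
    start = time.perf_counter()
    result = fn(*args, **kwargs)
    elapsed_ms = (time.perf_counter() - start) * 1000.0
    return result, elapsed_ms

# Stub standing in for a pipeline stage such as pre_turn
def stub_stage(text):
    return text.upper()

result, ms = timed(stub_stage, "hello")
print(result)           # HELLO
print(f"{ms:.2f}ms")    # elapsed time for the stage
```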
## Error Handling

Pipeline operations return structured results with success indicators:

```python
pre_result = pipeline.pre_turn("Test input")

if not pre_result.success:
    print("Pre-turn failed:")
    for warning in pre_result.warnings:
        print(f"  Warning: {warning}")
    for issue in pre_result.guard_issues:
        print(f"  Guard: {issue}")

# Graceful degradation
if pre_result.memory_count == 0:
    print("Memory retrieval failed, proceeding without context")
```

Component failures are isolated: if memory fails, guard and context continue; if guard fails, processing continues with warnings logged.
## Dependencies

Zero external dependencies. antaris-pipeline uses only the Python standard library:

- `json` for serialization
- `pathlib` for file operations
- `logging` for error reporting
- `dataclasses` for structured types
- `typing` for type annotations

The packages that antaris-pipeline coordinates (memory, guard, context, router) have their own dependency requirements.
## Testing

```bash
# Install with dev dependencies
pip install "antaris-pipeline[dev]"

# Run tests
pytest tests/

# With coverage
pytest --cov=antaris_pipeline tests/

# Test specific components
pytest tests/test_agent_pipeline.py -v
```
## Thread Safety

`AgentPipeline` is thread-safe for read operations (memory search, guard scans). Write operations (memory storage, session state) use file locking to prevent corruption.

For high-concurrency applications, use separate pipeline instances per worker thread.
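One way to follow that advice is a `threading.local` holding one instance per worker thread. A sketch with a stub factory in place of `AgentPipeline(...)` so the pattern runs standalone:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

_local = threading.local()

def make_pipeline():
    """Stub factory; real code would construct AgentPipeline(...) here."""
    return object()

def get_pipeline():
    # Lazily create one pipeline per worker thread
    if not hasattr(_local, "pipeline"):
        _local.pipeline = make_pipeline()
    return _local.pipeline

def worker(_):
    return id(get_pipeline())

with ThreadPoolExecutor(max_workers=4) as pool:
    ids = set(pool.map(worker, range(16)))

# At most one pipeline instance was created per worker thread
print(len(ids) <= 4)  # True
```

Each worker thread then reuses its own pipeline across turns, so no file lock contention arises between writers sharing one instance.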
## License

Apache 2.0
## Related Packages

- `antaris-memory` - Persistent conversation memory
- `antaris-guard` - Input/output safety scanning
- `antaris-context` - Context window management
- `antaris-router` - Adaptive model routing
- `antaris-contracts` - Schema validation