antaris-pipeline

Agent pipeline orchestration for the Antaris Analytics Suite

Zero dependencies. Coordinates memory, routing, safety, and context across agent lifecycles.

pip install antaris-pipeline

Architecture

antaris-pipeline orchestrates five Antaris packages:

  • antaris-memory: Persistent conversation memory with temporal decay
  • antaris-guard: Input/output safety scanning and policy enforcement
  • antaris-context: Dynamic context window management and compression
  • antaris-router: Model selection based on task classification
  • antaris-contracts: Schema validation and structured outputs

Two coordination patterns:

  1. AgentPipeline: Simple pre_turn/post_turn workflow for direct integration
  2. AntarisPipeline: Full pipeline orchestration with cross-package optimization

Quick Start

AgentPipeline (Recommended)

from antaris_pipeline import AgentPipeline

pipeline = AgentPipeline(memory=True, guard=True, context=True)
user_input = "What is quantum computing?"

# Before LLM call
pre_result = pipeline.pre_turn(user_input)
if pre_result.blocked:
    raise RuntimeError(pre_result.block_reason)  # or return the reason from your handler

# Prepend recalled context if available
prompt = pre_result.context + user_input if pre_result.context else user_input
response = your_llm_call(prompt)

# After LLM call
post_result = pipeline.post_turn(user_input, response)
print(f"Stored {post_result.stored_memories} memories")

Full Pipeline Orchestration

from antaris_pipeline import create_pipeline, PipelineConfig

config = PipelineConfig(
    memory_enabled=True,
    guard_enabled=True,
    context_enabled=True,
    router_enabled=True
)

pipeline = create_pipeline(
    storage_path="./pipeline_storage",
    pipeline_config=config
)

# Process with full orchestration (run inside an async function)
result = await pipeline.process("Explain neural networks", your_model_function)
print(f"Response: {result.output}")
print(f"Tokens: {result.metrics.total_tokens}")

Core Features

Session Lifecycle Management

pipeline = AgentPipeline(memory=True)

# Start new session
pipeline.session_start("user_123", session_metadata={"context": "support"})

# Process turns
for user_input in conversation:
    pre_result = pipeline.pre_turn(user_input)
    # ... LLM processing ...
    post_result = pipeline.post_turn(user_input, llm_response)

# End session (automatic memory consolidation)
pipeline.session_end()

Pre-turn Processing

AgentPipeline.pre_turn() handles:

  1. Guard input check: Scans for policy violations, PII, harmful content
  2. Memory recall: Retrieves relevant conversation history
  3. Context building: Assembles enriched prompt with recalled memories

pre_result = pipeline.pre_turn("Remember what I told you about my project?")

# Check results
print(f"Blocked: {pre_result.blocked}")
print(f"Context length: {len(pre_result.context or '')}")
print(f"Memories recalled: {pre_result.memory_count}")
print(f"Warnings: {pre_result.warnings}")

Post-turn Processing

AgentPipeline.post_turn() handles:

  1. Guard output check: Validates LLM response safety
  2. Memory ingestion: Stores conversation turn for future recall

post_result = pipeline.post_turn(user_input, llm_response)

# Check results
print(f"Output blocked: {post_result.blocked_output}")
print(f"Safe replacement: {post_result.safe_replacement}")
print(f"Memories stored: {post_result.stored_memories}")

Compaction Recovery

When OpenClaw compacts agent context, the pipeline preserves essential state:

# Before compaction: pipeline stores handoff notes
handoff_data = pipeline.prepare_compaction()

# After compaction: pipeline restores context from notes
pipeline.recover_from_compaction(handoff_data)

The bridge handles this automatically in OpenClaw environments.

Pipeline Telemetry (v4.2.0)

PipelineTelemetry is a structured dataclass attached to every pipeline run, providing per-stage timing breakdowns to help identify bottlenecks.

from antaris_pipeline import PipelineTelemetry

# After pipeline run:
telemetry = agent.last_telemetry
print(telemetry.summary())
# "Pipeline: 145ms total | recall=12ms guard=5ms llm=120ms | 342 tokens"

stage, ms = telemetry.slowest_stage()
print(f"Bottleneck: {stage} at {ms:.1f}ms")

PipelineTelemetry fields:

  • stages — dict[str, float] mapping stage name → elapsed milliseconds
  • total_ms — total wall-clock time for the full pipeline run
  • token_count — tokens processed (input + output combined)
  • summary() — returns a one-line human-readable performance report
  • slowest_stage() — returns tuple[str, float]: name and duration of the slowest stage

Access the telemetry object from the result of any pre_turn / post_turn call or via agent.last_telemetry after a full pipeline run.
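To make the field list above concrete, here is a minimal stand-in sketch of the shape such a dataclass might take. `TelemetrySketch` is illustrative only, not the real `PipelineTelemetry`; it assumes the field names from the list above and reproduces the `summary()` format shown earlier.

```python
from dataclasses import dataclass, field

@dataclass
class TelemetrySketch:
    """Illustrative stand-in for PipelineTelemetry (field names from the list above)."""
    stages: dict[str, float] = field(default_factory=dict)
    total_ms: float = 0.0
    token_count: int = 0

    def slowest_stage(self) -> tuple[str, float]:
        # Pick the stage with the largest elapsed time
        return max(self.stages.items(), key=lambda kv: kv[1])

    def summary(self) -> str:
        parts = " ".join(f"{name}={ms:.0f}ms" for name, ms in self.stages.items())
        return f"Pipeline: {self.total_ms:.0f}ms total | {parts} | {self.token_count} tokens"

t = TelemetrySketch(stages={"recall": 12.0, "guard": 5.0, "llm": 120.0},
                    total_ms=145.0, token_count=342)
print(t.summary())
```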

Telemetry and Observability

from antaris_pipeline import TelemetricsCollector

collector = TelemetricsCollector(
    session_id="prod_session_001",
    output_dir="./telemetry_logs"
)

pipeline = AgentPipeline(
    memory=True, 
    guard=True,
    telemetrics_collector=collector
)

# All pipeline operations automatically emit telemetry events
# Logs stored as JSONL: ./telemetry_logs/telemetrics_prod_session_001.jsonl

Telemetry captures:

  • Performance metrics (latency, tokens, costs)
  • Security events (blocks, policy violations)
  • Memory operations (storage, retrieval, compaction)
  • Error conditions and warnings
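Because the logs are JSONL (one JSON object per line), they can be aggregated with the standard library alone. The event records below are hypothetical; the real field names in the telemetry files may differ.

```python
import json

# Hypothetical event records; real field names in the JSONL logs may differ.
lines = [
    '{"event": "pre_turn", "latency_ms": 18, "blocked": false}',
    '{"event": "guard_scan", "latency_ms": 7, "blocked": true}',
    '{"event": "post_turn", "latency_ms": 22, "blocked": false}',
]

events = [json.loads(line) for line in lines]
total_latency_ms = sum(e["latency_ms"] for e in events)
block_count = sum(1 for e in events if e.get("blocked"))
print(f"{len(events)} events, {total_latency_ms}ms total, {block_count} block(s)")
```

In production you would read the lines from `./telemetry_logs/telemetrics_<session_id>.jsonl` instead of a literal list.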

Configuration Profiles

from antaris_pipeline import PipelineConfig, ProfileType

# Balanced: moderate safety, performance, and cost
config = PipelineConfig.from_profile(ProfileType.BALANCED)

# Strict safety: maximum security scanning
config = PipelineConfig.from_profile(ProfileType.STRICT_SAFETY)  

# Cost optimized: minimal processing, fast models
config = PipelineConfig.from_profile(ProfileType.COST_OPTIMIZED)

# Performance: minimal latency, premium models
config = PipelineConfig.from_profile(ProfileType.PERFORMANCE)

# Debug: full logging, telemetry, dry-run mode
config = PipelineConfig.from_profile(ProfileType.DEBUG)

Dry-run Mode

Test pipeline behavior without API costs:

config = PipelineConfig(dry_run=True)
pipeline = create_pipeline(pipeline_config=config)

# Simulates processing without actual LLM calls
simulation = pipeline.dry_run("Test input")
print(f"Estimated latency: {simulation['total_estimated_time_ms']}ms")
print(f"Would recall: {simulation['memory']['would_retrieve']} memories")

Component Access

Access individual components for fine-grained control:

pipeline = AgentPipeline(memory=True, guard=True)

# Direct memory access
memories = pipeline.memory_system.search("quantum computing", limit=5)

# Direct guard access  
scan_result = pipeline.guard_system.scan_input("Test message")

# Component status
status = pipeline.get_component_status()
print(f"Memory: {status['memory']['status']}")
print(f"Guard: {status['guard']['policy_count']} policies loaded")

OpenClaw Bridge Integration

For OpenClaw agents, antaris-pipeline provides a bridge protocol that handles stdin/stdout NDJSON communication.

Bridge commands:

  • config-check: Validate pipeline configuration
  • session-start: Initialize new session with metadata
  • pre-turn: Process input before LLM call
  • post-turn: Process output after LLM response
  • compaction-recovery: Restore state after context compaction
  • memory-search: Direct memory query operations

The bridge runs as a persistent subprocess, maintaining pipeline state across turns.

Example bridge usage:

echo '{"cmd": "pre-turn", "text": "Hello", "memory_path": "./mem"}' | python3 pipeline_bridge.py
# {"success": true, "context": "Relevant context...", "blocked": false}
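The same framing can be produced from Python. `frame` here is a hypothetical helper, not part of the bridge API; one JSON object per line is the only contract:

```python
import json

def frame(cmd: str, **fields) -> str:
    # NDJSON: one JSON object per line; the bridge reads line-delimited requests
    return json.dumps({"cmd": cmd, **fields}) + "\n"

request = frame("pre-turn", text="Hello", memory_path="./mem")
print(request, end="")

# A response line is parsed the same way, one object per line
response = json.loads('{"success": true, "context": "Relevant context...", "blocked": false}')
```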

Configuration

PipelineConfig Options

from antaris_pipeline import PipelineConfig, MemoryConfig, GuardConfig

config = PipelineConfig(
    # Component toggles
    memory_enabled=True,
    guard_enabled=True, 
    context_enabled=True,
    router_enabled=False,
    
    # Memory settings
    memory_config=MemoryConfig(
        max_memory_mb=1024,
        decay_half_life_hours=168.0,  # 1 week
        min_relevance=0.3
    ),
    
    # Guard settings  
    guard_config=GuardConfig(
        input_scanning=True,
        output_scanning=True,
        policy_strictness=0.7,
        max_scan_time_ms=1000
    ),
    
    # Performance settings
    dry_run=False,
    enable_telemetrics=True,
    max_concurrent_operations=10
)
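The `decay_half_life_hours` and `min_relevance` settings interact: assuming the decay is exponential (a detail not stated here, so treat this as an illustration), a memory's relevance halves every half-life and would be filtered out once it falls below the floor:

```python
def decayed_relevance(base: float, age_hours: float, half_life_hours: float = 168.0) -> float:
    # Exponential half-life decay: relevance halves every half_life_hours
    return base * 0.5 ** (age_hours / half_life_hours)

one_week = decayed_relevance(1.0, 168.0)         # 0.5 after one half-life
three_weeks = decayed_relevance(1.0, 3 * 168.0)  # 0.125, below min_relevance=0.3
print(one_week, three_weeks)
```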

YAML Configuration

# pipeline_config.yaml
profile: balanced

memory:
  enabled: true
  storage_path: "./memory_store"
  max_memory_mb: 2048
  decay_half_life_hours: 168.0
  min_relevance: 0.3

guard:
  enabled: true
  input_scanning: true
  output_scanning: true
  policy_strictness: 0.7

context:
  enabled: true
  max_tokens: 8000
  compression_enabled: true

telemetrics:
  enabled: true
  session_id: "production_v1"
  output_dir: "./logs"

from antaris_pipeline import PipelineConfig

config = PipelineConfig.from_yaml("pipeline_config.yaml")
pipeline = create_pipeline(pipeline_config=config)

Command Line Interface

# Validate configuration
antaris-pipeline config --profile balanced --output config.yaml

# Process single input (dry-run)
antaris-pipeline process "Hello world" --dry-run

# Analyze telemetry logs
antaris-pipeline telemetrics summary ./logs/telemetrics_session123.jsonl

# Start telemetry dashboard  
antaris-pipeline serve --port 8080

Performance Characteristics

Memory operation latencies (single-threaded, M2 MacBook Pro):

  • Store single memory: 1-5ms
  • Search 1000 memories: 15-45ms
  • Compaction (10k memories): 200-500ms

Guard scan latencies:

  • Input scan (typical): 10-50ms
  • Output scan (typical): 15-75ms
  • Policy compilation: 100-300ms (cached)

Context processing:

  • Token counting: 1-5ms
  • Compression (8k → 4k): 50-200ms

Pipeline coordination overhead: 5-15ms per turn.
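Taking the midpoints of the ranges above, a back-of-the-envelope per-turn budget (one memory search over 1000 memories, input and output scans, plus coordination overhead) works out to roughly:

```python
# Midpoints of the latency ranges quoted above, in milliseconds
recall = (15 + 45) / 2        # search 1000 memories
input_scan = (10 + 50) / 2
output_scan = (15 + 75) / 2
coordination = (5 + 15) / 2

budget_ms = recall + input_scan + output_scan + coordination
print(budget_ms)  # 115.0
```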

Error Handling

Pipeline operations return structured results with success indicators:

pre_result = pipeline.pre_turn("Test input")

if not pre_result.success:
    print("Pre-turn failed:")
    for warning in pre_result.warnings:
        print(f"  Warning: {warning}")
    for issue in pre_result.guard_issues:
        print(f"  Guard: {issue}")

# Graceful degradation
if pre_result.memory_count == 0:
    print("Memory retrieval failed, proceeding without context")

Component failures are isolated. If memory fails, guard and context continue. If guard fails, processing continues with warnings logged.

Dependencies

Zero external dependencies. Uses only Python standard library:

  • json for serialization
  • pathlib for file operations
  • logging for error reporting
  • dataclasses for structured types
  • typing for type annotations

The packages that antaris-pipeline coordinates (memory, guard, context, router) have their own dependency requirements.

Testing

# Install with dev dependencies
pip install "antaris-pipeline[dev]"

# Run tests
pytest tests/

# With coverage
pytest --cov=antaris_pipeline tests/

# Test specific components
pytest tests/test_agent_pipeline.py -v

Thread Safety

AgentPipeline is thread-safe for read operations (memory search, guard scans). Write operations (memory storage, session state) use file locking to prevent corruption.

For high-concurrency applications, use separate pipeline instances per worker thread.
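One way to keep a separate instance per worker thread is `threading.local`. The `get_pipeline` helper and its `factory` argument are illustrative, not part of the package:

```python
import threading

_local = threading.local()

def get_pipeline(factory):
    # Lazily create one pipeline per worker thread, then reuse it
    if not hasattr(_local, "pipeline"):
        _local.pipeline = factory()
    return _local.pipeline

# Demo with a stand-in factory; in practice pass
# lambda: AgentPipeline(memory=True, guard=True)
calls = []
def fake_factory():
    calls.append(1)
    return object()

a = get_pipeline(fake_factory)
b = get_pipeline(fake_factory)
print(a is b, len(calls))  # the same thread reuses its single instance
```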

License

Apache 2.0
