Unified orchestration pipeline for Antaris Analytics Suite

antaris-pipeline

The unified orchestration engine for the Antaris AI suite.

antaris-pipeline ties together memory, routing, guard, and context into a single, production-ready processing loop. It handles the full lifecycle of every LLM interaction — from input safety scanning through intelligent model selection, context optimization, response generation, and memory persistence — while emitting structured telemetry at every phase.


📦 Installation

pip install antaris-pipeline

Version: 4.9.20

Dependencies (auto-installed):

| Package | Role |
|---|---|
| antaris-memory | Long-term memory storage, BM25 retrieval, context packets |
| antaris-router | Smart model selection, provider routing, fallback chains |
| antaris-guard | Input/output safety scanning, threat classification |
| antaris-context | Token budgeting, context compression, adaptive optimization |

🗂️ Table of Contents

  1. Quick Start
  2. Core Exports
  3. create_pipeline() Factory
  4. Pipeline Profile Shortcuts
  5. ProfileType Presets
  6. PipelineConfig & Sub-Configs
  7. AntarisPipeline — Main Orchestrator
  8. The 8-Phase Processing Pipeline
  9. PipelineResult
  10. Dry Run Mode
  11. Memory Retrieval
  12. Session Lifecycle Hooks
  13. Custom Token Estimator
  14. Performance & Intelligence
  15. AgentPipeline — Agent Lifecycle Wrapper
  16. Hook System
  17. Telemetrics & Events
  18. Cross-Package Intelligence Flows
  19. Complete Integration Example

⚡ Quick Start

from antaris_pipeline import create_pipeline

# Create a balanced pipeline (memory + routing + guard + context)
pipeline = create_pipeline(
    storage_path="./memory_store",
    session_id="my-session",
    agent_name="MyBot",
)

# Process a user message end-to-end
result = pipeline.process(
    input_text="What's the capital of France?",
    model_caller=lambda text: call_your_llm(text),
)

if result.success:
    print(result.output)
else:
    print(f"Error: {result.error}")

📋 Core Exports

All public symbols are importable directly from antaris_pipeline:

from antaris_pipeline import (
    # Core orchestrators
    Pipeline,
    AntarisPipeline,
    PipelineResult,
    create_pipeline,

    # Agent lifecycle wrapper
    AgentPipeline,
    PreTurnResult,
    PostTurnResult,

    # Configuration
    PipelineConfig,
    MemoryConfig,
    RouterConfig,
    GuardConfig,
    ContextConfig,
    TelemetricsConfig,

    # Profile system
    ProfileType,
    create_config,
    PROFILE_PRESETS,

    # Events & telemetrics
    AntarisEvent,
    EventType,
    ConfidenceBasis,
    PerformanceMetrics,
    EventEmitter,
    TelemetricsCollector,
    TelemetricsServer,
    PipelineTelemetry,

    # Hook system
    HookPhase,
    HookContext,
    HookResult,
    HookRegistry,
    PipelineHooks,
    HookCallback,

    # Profile shortcut factories
    balanced_pipeline,
    strict_pipeline,
    cost_optimized_pipeline,
    performance_pipeline,
    debug_pipeline,

    # Event helpers
    memory_event,
    router_event,
    guard_event,
    context_event,
)

🏭 create_pipeline() Factory

The recommended entry point for creating a fully-wired pipeline instance.

Signature

def create_pipeline(
    storage_path: str,
    memory_config: dict | None = None,
    router_config: dict | None = None,
    guard_config: dict | None = None,
    context_config: dict | None = None,
    pipeline_config: PipelineConfig | None = None,
    session_id: str | None = None,
    agent_name: str | None = None,
) -> AntarisPipeline:

Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| storage_path | str | (required) | Path to the memory store directory. Created if it doesn't exist. |
| memory_config | dict \| None | None | Dict of MemoryConfig field overrides (e.g. {"half_life": 7.0}). |
| router_config | dict \| None | None | Dict of RouterConfig field overrides. |
| guard_config | dict \| None | None | Dict of GuardConfig field overrides (e.g. {"sensitivity": "balanced"}). |
| context_config | dict \| None | None | Dict of ContextConfig field overrides (e.g. {"total_budget": 8000}). |
| pipeline_config | PipelineConfig \| None | None | Full PipelineConfig object. When provided, the individual config dicts are ignored. |
| session_id | str \| None | None | Session identifier. Auto-generated UUID if not provided. |
| agent_name | str \| None | None | Name of the agent. Used for memory filtering and context restoration. |

Example

from antaris_pipeline import create_pipeline

pipeline = create_pipeline(
    storage_path="./memory_store",
    memory_config={"half_life": 7.0},
    router_config={},
    guard_config={"sensitivity": "balanced"},
    context_config={"total_budget": 8000, "template": "agent_with_tools"},
    pipeline_config=None,
    session_id="my-session",
    agent_name="MyBot",
)

🚀 Pipeline Profile Shortcuts

Five pre-configured factory functions for common deployment scenarios. Each wraps create_pipeline() with tuned defaults.

from antaris_pipeline import (
    balanced_pipeline,
    strict_pipeline,
    cost_optimized_pipeline,
    performance_pipeline,
    debug_pipeline,
)

balanced_pipeline(storage_path, **kwargs) -> AntarisPipeline

General-purpose profile. Moderate safety, standard routing, compression enabled.

pipeline = balanced_pipeline("./store", agent_name="MyBot")

strict_pipeline(storage_path, **kwargs) -> AntarisPipeline

High-safety profile. Strict guard sensitivity, conservative routing, output scanning enforced. Ideal for customer-facing or regulated environments.

pipeline = strict_pipeline("./store")

cost_optimized_pipeline(storage_path, **kwargs) -> AntarisPipeline

Minimizes cost. Prefers cheaper models, aggressive compression, reduced memory retrieval limits.

pipeline = cost_optimized_pipeline("./store")

performance_pipeline(storage_path, **kwargs) -> AntarisPipeline

Optimized for low latency. Faster models preferred, reduced guard overhead, adaptive budgeting enabled.

pipeline = performance_pipeline("./store")

debug_pipeline(storage_path, **kwargs) -> AntarisPipeline

Full telemetrics, verbose logging, all hooks active. Use during development to inspect every phase.

pipeline = debug_pipeline("./store")
# Emits complete JSONL trace to ./telemetrics/

Comparison Table

| Profile | Safety | Cost | Speed | Telemetrics |
|---|---|---|---|---|
| balanced | Medium | Medium | Medium | Standard |
| strict | High | Higher | Slower | Standard |
| cost_optimized | Medium | Low | Medium | Minimal |
| performance | Medium | Medium | Fast | Standard |
| debug | Medium | Medium | Medium | Full |

🎛️ ProfileType Presets

ProfileType is an enum. Use create_config() to generate a PipelineConfig from a named preset. This is useful when you want to start from a preset and customize further.

from antaris_pipeline import ProfileType, create_config

config = create_config(ProfileType.BALANCED)
config = create_config(ProfileType.STRICT_SAFETY)
config = create_config(ProfileType.COST_OPTIMIZED)
config = create_config(ProfileType.PERFORMANCE)
config = create_config(ProfileType.DEBUG)

Available ProfileType Values

| Enum Value | Equivalent Shortcut |
|---|---|
| ProfileType.BALANCED | balanced_pipeline() |
| ProfileType.STRICT_SAFETY | strict_pipeline() |
| ProfileType.COST_OPTIMIZED | cost_optimized_pipeline() |
| ProfileType.PERFORMANCE | performance_pipeline() |
| ProfileType.DEBUG | debug_pipeline() |

PROFILE_PRESETS

PROFILE_PRESETS is a dict mapping each ProfileType to its default PipelineConfig:

from antaris_pipeline import PROFILE_PRESETS, ProfileType

config = PROFILE_PRESETS[ProfileType.BALANCED]

⚙️ PipelineConfig & Sub-Configs

PipelineConfig is the central configuration object. Pass it to create_pipeline() or use it directly when constructing AntarisPipeline.

Full Config Example

from antaris_pipeline import (
    PipelineConfig,
    MemoryConfig,
    RouterConfig,
    GuardConfig,
    ContextConfig,
    TelemetricsConfig,
)

config = PipelineConfig(
    memory=MemoryConfig(
        decay_half_life_hours=168,
        search_timeout_ms=5000,
        context_packet_max_tokens=2000,
    ),
    router=RouterConfig(
        track_model_performance=True,
    ),
    guard=GuardConfig(
        enable_output_scanning=True,
        default_policy_strictness=0.5,   # 0.0–1.0
    ),
    context=ContextConfig(
        default_max_tokens=8000,
        enable_compression=True,
        enable_adaptive_budgeting=False,
        model_context_limits={
            "claude-opus-4-6": 200000,
            "gpt-4o": 128000,
        },
    ),
    telemetrics=TelemetricsConfig(
        output_directory="./telemetrics",
        buffer_size=100,
        enable_telemetrics=True,
    ),
)

MemoryConfig

Controls memory storage, retrieval, and decay behavior.

| Field | Type | Default | Description |
|---|---|---|---|
| decay_half_life_hours | float | 168 | Memory decay half-life: a memory's relevance weight halves every this many hours. 168 h = 1 week. |
| search_timeout_ms | int | 5000 | BM25 search timeout in milliseconds. |
| context_packet_max_tokens | int | 2000 | Max tokens for the memory context packet injected into prompts. |

memory_cfg = MemoryConfig(
    decay_half_life_hours=72,        # 3-day decay
    search_timeout_ms=3000,
    context_packet_max_tokens=1500,
)

RouterConfig

Controls model selection, provider preferences, and performance tracking.

| Field | Type | Default | Description |
|---|---|---|---|
| track_model_performance | bool | True | Record per-model latency and success rates. Feeds into cross-package intelligence. |

router_cfg = RouterConfig(
    track_model_performance=True,
)

GuardConfig

Controls safety scanning for both input and output.

| Field | Type | Default | Description |
|---|---|---|---|
| enable_output_scanning | bool | True | Whether to scan the model's response (Phase 7). |
| default_policy_strictness | float | 0.5 | Safety threshold. 0.0 = PERMISSIVE, 0.5 = BALANCED, 1.0 = STRICT. |

guard_cfg = GuardConfig(
    enable_output_scanning=True,
    default_policy_strictness=0.8,   # Lean toward strict
)

Strictness mapping:

| Range | Mode |
|---|---|
| 0.0 – 0.33 | PERMISSIVE |
| 0.34 – 0.66 | BALANCED |
| 0.67 – 1.0 | STRICT |
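For illustration only (this helper is not part of antaris-guard's public API), the strictness-to-mode mapping can be written as:

```python
def strictness_mode(strictness: float) -> str:
    """Map a default_policy_strictness value to its guard mode."""
    if not 0.0 <= strictness <= 1.0:
        raise ValueError("strictness must be in [0.0, 1.0]")
    if strictness <= 0.33:
        return "PERMISSIVE"
    if strictness <= 0.66:
        return "BALANCED"
    return "STRICT"
```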

ContextConfig

Controls token budget management, compression, and model-specific context limits.

| Field | Type | Default | Description |
|---|---|---|---|
| default_max_tokens | int | 8000 | Default token budget for context assembly. |
| enable_compression | bool | True | Enable context compression when approaching budget limits. |
| enable_adaptive_budgeting | bool | False | Dynamically adjust token budgets based on request complexity. |
| model_context_limits | dict[str, int] | {} | Per-model context window overrides. Used by the Router → Context intelligence flow. |

context_cfg = ContextConfig(
    default_max_tokens=12000,
    enable_compression=True,
    enable_adaptive_budgeting=True,
    model_context_limits={
        "claude-opus-4-6": 200000,
        "gpt-4o": 128000,
        "gpt-4o-mini": 128000,
    },
)

TelemetricsConfig

Controls telemetry output location, buffering, and enable/disable.

| Field | Type | Default | Description |
|---|---|---|---|
| output_directory | str | "./telemetrics" | Directory for JSONL telemetry files. |
| buffer_size | int | 100 | Number of events to buffer before flushing to disk. |
| enable_telemetrics | bool | True | Master switch for telemetry collection. |

telemetrics_cfg = TelemetricsConfig(
    output_directory="./logs/telemetrics",
    buffer_size=50,
    enable_telemetrics=True,
)

🧠 AntarisPipeline — Main Orchestrator

AntarisPipeline (also exported as Pipeline) is the central class that wires all sub-systems together and runs the 8-phase processing loop.

Construction

Typically created via create_pipeline() or a profile shortcut. Direct construction:

from antaris_pipeline import AntarisPipeline, PipelineConfig

pipeline = AntarisPipeline(
    storage_path="./memory_store",
    config=PipelineConfig(...),
    session_id="my-session",
    agent_name="MyBot",
)

Public Methods

| Method | Description |
|---|---|
| process(...) | Run the full 8-phase pipeline. Returns PipelineResult. |
| dry_run(input_text, context_data=None) | Simulate all phases without calling the model or writing to memory. |
| memory_retrieval(input_text, context_data=None, limit=10) | Query memory directly, bypassing the full pipeline. |
| flush_to_memory(reason="") | Persist in-session state to memory (for compaction). |
| get_compaction_summary() | Get a markdown summary string of the current session state. |
| on_session_start(summary="", agent_name=None) | Restore context from memory at the start of a new session. |
| set_token_estimator(fn) | Replace the default token counting heuristic. |
| get_performance_stats() | Return session-level performance metrics. |
| get_intelligence_summary() | Return cross-package intelligence state. |

🔄 The 8-Phase Processing Pipeline

pipeline.process() executes all 8 phases sequentially for every request.

result = pipeline.process(
    input_text="user message",
    model_caller=lambda text: call_llm(text),
    context_data={"extra": "context"},   # Optional dict merged into context
    dry_run=False,
)

Parameters

| Parameter | Type | Required | Description |
|---|---|---|---|
| input_text | str | ✅ | The raw user input or prompt. |
| model_caller | Callable[[str], str] \| None | ✅ (unless dry_run=True) | A callable that accepts the assembled prompt and returns the model's response string. |
| context_data | dict \| None | optional | Additional key-value context merged into the context assembly step. |
| dry_run | bool | optional | If True, skips model execution and memory writes. Equivalent to calling dry_run(). |

Phase 1 — Guard Input Scan

What happens: The raw input_text is scanned by antaris-guard for threats, policy violations, and risk patterns.

If blocked: result.success = False, result.error contains the block reason, and all subsequent phases are skipped.

Key outputs stored in result:

  • result.guard_decisions[0] — the input guard decision dict
  • Fields: is_blocked, risk_score (0.0–1.0), threat_level, patterns_matched

# Guard decisions are always in result.guard_decisions
# Index 0 = input scan, Index 1 = output scan (Phase 7)
input_guard = result.guard_decisions[0]
print(input_guard["risk_score"])      # e.g. 0.12
print(input_guard["is_blocked"])      # False
print(input_guard["threat_level"])    # "LOW"

Phase 2 — Memory Retrieval

What happens: BM25 search runs against the memory store using input_text as the query. Top results are assembled into a context packet (up to context_packet_max_tokens).

Key outputs:

  • result.memory_retrievals — list of retrieved memory dicts
  • Each memory: {content, score, category, source, timestamp, ...}

print(f"Retrieved {len(result.memory_retrievals)} memories")
for mem in result.memory_retrievals:
    print(f"  [{mem['category']}] {mem['content'][:80]}")

Phase 3 — Context Building

What happens: The router's model selection hint (from Phase 4 lookahead) sets the token budget ceiling. Memory context packet is injected. Any context_data is merged. The full context is optimized/compressed if needed.

Key outputs:

  • result.context_optimizations — dict with compression ratio, token counts, optimization applied

opts = result.context_optimizations
print(f"Compression ratio: {opts.get('compression_ratio')}")
print(f"Input tokens: {opts.get('input_tokens')}")
print(f"Budget used: {opts.get('budget_utilization')}")

Phase 4 — Smart Routing

What happens: antaris-router selects the best model/provider based on: task characteristics, cost constraints, model context limits, and historical performance data fed back from memory.

Key outputs:

  • result.routing_decision — full routing dict

rd = result.routing_decision
print(f"Model: {rd['selected_model']}")
print(f"Provider: {rd['provider']}")
print(f"Tier: {rd['tier']}")
print(f"Confidence: {rd['confidence']}")
print(f"Est. cost: ${rd['estimated_cost']:.4f}")
print(f"Fallback chain: {rd['fallback_chain']}")
print(f"Reasoning: {rd['reasoning']}")

Phase 5 — Model Execution

What happens: The assembled context is passed to model_caller. The callable receives the optimized prompt string and must return the response string.

In dry_run mode: This phase is skipped. A simulated response is generated instead.

# model_caller receives the assembled, optimized prompt
result = pipeline.process(
    input_text="Summarize this document",
    model_caller=lambda prompt: anthropic_client.messages.create(
        model="claude-opus-4-6",
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}],
    ).content[0].text,
)

Phase 6 — Memory Storage

What happens: The conversation turn (input + output) is stored to memory.

Risk-based branching:

  • If the input had risk_score > 0.7 (from Phase 1), the turn is stored as a security fact rather than a conversation entry. This prevents contamination of the conversation history with threat-related content.
  • Normal turns are stored as conversation entries with full metadata.
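The branch can be sketched as follows. This is an illustration of the documented behavior, not the library's internal code, and the entry shapes (`kind`, `input`, `output`) are hypothetical:

```python
RISK_THRESHOLD = 0.7  # documented cutoff for the Phase 1 risk_score

def store_turn(risk_score: float, user_text: str, response: str, memory: list) -> str:
    """Sketch of Phase 6 branching; `memory` stands in for the real store."""
    if risk_score > RISK_THRESHOLD:
        # High-risk input: recorded as a security fact so threat content
        # never re-enters normal conversation retrieval.
        memory.append({"kind": "security_fact", "input": user_text})
        return "security_fact"
    # Normal turn: input + output stored as a conversation entry.
    memory.append({"kind": "conversation", "input": user_text, "output": response})
    return "conversation"
```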

Phase 7 — Guard Output Scan

What happens: If enable_output_scanning=True, the model's response is scanned for policy violations or unsafe content.

If blocked: result.success = False even if the model call succeeded. The output scan result is appended to result.guard_decisions.

# Index 1 in guard_decisions is the output scan
if len(result.guard_decisions) > 1:
    output_guard = result.guard_decisions[1]
    print(output_guard["is_blocked"])

Phase 8 — Cross-Package Intelligence Updates

What happens: All five cross-package intelligence flows are updated with data from this request (see Cross-Package Intelligence Flows).

This includes:

  • Routing feedback (latency + success) written to memory
  • Context pressure counters updated
  • Security pattern tracking updated in memory
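To make the feedback loop concrete, here is a self-contained sketch of a per-model running-average update. The dict shape mirrors the routing_feedback fields reported by get_performance_stats(), but the update rule itself is an assumption, not the library's actual code:

```python
def update_routing_feedback(feedback: dict, model: str,
                            latency_ms: float, success: bool) -> dict:
    """Fold one request's outcome into per-model stats (hypothetical shape)."""
    fb = feedback.setdefault(
        model, {"requests": 0, "successes": 0, "avg_latency_ms": 0.0}
    )
    fb["requests"] += 1
    fb["successes"] += int(success)
    # Incremental mean: keeps a running average without storing history.
    fb["avg_latency_ms"] += (latency_ms - fb["avg_latency_ms"]) / fb["requests"]
    fb["success_rate"] = fb["successes"] / fb["requests"]
    return feedback
```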

📊 PipelineResult

process() always returns a PipelineResult. It never raises on pipeline-level failures — errors are captured in result.error.

Fields

| Field | Type | Description |
|---|---|---|
| success | bool | True if the pipeline completed without blocking or error. |
| output | str | The model's response. Empty string if blocked or errored. |
| error | str \| None | Error or block reason. None on success. |
| events | List[AntarisEvent] | All telemetry events emitted during this request. |
| performance | dict | {"total_latency_ms": float, "dry_run": bool} |
| memory_retrievals | List[dict] | Memories retrieved in Phase 2. |
| routing_decision | dict | Full routing decision from Phase 4. |
| guard_decisions | List[dict] | Input guard (index 0) and optionally output guard (index 1). |
| context_optimizations | dict | Context assembly stats from Phase 3. |

Methods

# Serialize to dict (e.g. for logging or API responses)
result_dict = result.to_dict()

Usage Example

result = pipeline.process(
    input_text="Hello!",
    model_caller=my_llm_caller,
)

if not result.success:
    print(f"Blocked or error: {result.error}")
else:
    print(f"Response: {result.output}")
    print(f"Latency: {result.performance['total_latency_ms']:.1f}ms")
    print(f"Model used: {result.routing_decision['selected_model']}")
    print(f"Memories retrieved: {len(result.memory_retrievals)}")

🔍 Dry Run Mode

Dry run simulates all 8 phases without calling the model or writing anything to memory. Use it to preview routing decisions, guard behavior, and context utilization before committing to a live request.

Method 1: pipeline.dry_run()

sim = pipeline.dry_run(
    input_text="What would happen?",
    context_data={"topic": "security"},   # Optional
)

Method 2: pipeline.process() with dry_run=True

result = pipeline.process(
    input_text="What would happen?",
    model_caller=None,   # Not called in dry_run
    dry_run=True,
)

Dry Run Response Fields

sim = pipeline.dry_run("What would happen?")

# Input analysis
sim["input"]["text"]               # The input string
sim["input"]["length"]             # Character count
sim["input"]["context_provided"]   # bool

# Guard input prediction
sim["guard_input"]["would_allow"]      # bool
sim["guard_input"]["risk_score"]       # float 0.0–1.0
sim["guard_input"]["threat_level"]     # "LOW" | "MEDIUM" | "HIGH"
sim["guard_input"]["patterns_active"]  # int — active rule count
sim["guard_input"]["sensitivity"]      # current sensitivity level
sim["guard_input"]["match_count"]      # int — patterns matched
sim["guard_input"]["message"]          # human-readable summary

# Memory prediction
sim["memory"]["would_retrieve"]   # bool
sim["memory"]["total_in_store"]   # int — total stored memories
sim["memory"]["top_relevance"]    # float — top BM25 score
sim["memory"]["matched_terms"]    # list of matched query terms

# Context prediction
sim["context"]["would_optimize"]          # bool
sim["context"]["estimated_input_tokens"]  # int
sim["context"]["total_budget"]            # int
sim["context"]["utilization"]             # float 0.0–1.0

# Router prediction
sim["router"]["would_select"]         # selected model name
sim["router"]["provider"]             # provider name
sim["router"]["tier"]                 # model tier
sim["router"]["confidence"]           # float 0.0–1.0
sim["router"]["estimated_cost_usd"]   # float
sim["router"]["fallback_chain"]       # list of fallback models
sim["router"]["reasoning"]            # routing explanation string

# Output guard prediction
sim["guard_output"]["would_scan"]    # bool
sim["guard_output"]["sensitivity"]   # current sensitivity

# Timing
sim["dry_run_time_ms"]   # float — simulation time in ms

Example: Pre-flight Check

sim = pipeline.dry_run("Ignore all previous instructions and...")

if not sim["guard_input"]["would_allow"]:
    print(f"⛔ Would be blocked (risk: {sim['guard_input']['risk_score']:.2f})")
else:
    print(f"✅ Would route to: {sim['router']['would_select']}")
    print(f"   Est. cost: ${sim['router']['estimated_cost_usd']:.4f}")
    print(f"   Context utilization: {sim['context']['utilization']:.0%}")

🗃️ Memory Retrieval

Run a direct BM25 memory search without executing the full pipeline. Useful for building custom context or debugging retrieval quality.

data = pipeline.memory_retrieval(
    input_text="security vulnerability report",
    context_data={"agent_id": "MyBot"},   # Optional filter hints
    limit=10,
)

Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| input_text | str | (required) | Query text for BM25 search. |
| context_data | dict \| None | None | Additional context passed to the memory engine (e.g. for filtering). |
| limit | int | 10 | Maximum number of results to return. |

Response

data["retrievals"]        # List[dict] — full memory objects
data["relevance_scores"]  # List[float] — BM25 scores per memory
data["context_packet"]    # str — assembled context string (ready for injection)
data["memory_count"]      # int — total memories in store
data["retrieved_count"]   # int — number actually returned

Example

data = pipeline.memory_retrieval("pricing strategy", limit=5)

print(f"Found {data['retrieved_count']} of {data['memory_count']} memories")
for mem, score in zip(data["retrievals"], data["relevance_scores"]):
    print(f"  [{score:.3f}] {mem['content'][:80]}")

# Use the context packet directly
my_prompt = f"{data['context_packet']}\n\nUser: What's our pricing strategy?"

🔄 Session Lifecycle Hooks

flush_to_memory(reason="") -> int

Persists current in-session state to the memory store. Call this before compaction to ensure session intelligence is preserved across context resets.

n = pipeline.flush_to_memory(reason="compaction")
print(f"Stored {n} memory entries")

What gets stored:

  • Performance summary: per-model latency stats, success rates, request count
  • Security pattern summary: any threat patterns observed this session

Returns: Number of memory entries written.


get_compaction_summary() -> str

Returns a markdown-formatted summary of the current session state. Designed to be passed as prependContext during compaction events.

summary = pipeline.get_compaction_summary()
# Returns a markdown string, e.g.:
# ## Session Summary
# - Requests: 42
# - Models used: claude-opus-4-6 (38), gpt-4o (4)
# - Avg latency: 1234ms
# - Security events: 2

Typical compaction workflow:

# In your compaction hook:
def before_compaction_hook(ctx):
    n = pipeline.flush_to_memory(reason="compaction")
    summary = pipeline.get_compaction_summary()
    # Pass summary to your compaction system as prependContext
    return summary

# Or with the hook system:
from antaris_pipeline import HookPhase, HookContext, HookResult

def compaction_hook(ctx: HookContext) -> HookResult:
    pipeline.flush_to_memory(reason="compaction")
    return HookResult(continue_pipeline=True)

# `registry` is the pipeline's HookRegistry (see the Hook System section)
registry.register(HookPhase.BEFORE_MODEL_CALL, compaction_hook)

on_session_start(summary="", agent_name=None) -> dict

Restores relevant context from memory at the beginning of a new session. Call this once, immediately after creating the pipeline, before the first process() call.

ctx = pipeline.on_session_start(
    summary="",           # Optional compaction summary from previous session
    agent_name="MyBot",   # Optional agent name for memory filtering
)

Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| summary | str | "" | Compaction summary from the previous session (from get_compaction_summary()). |
| agent_name | str \| None | None | Agent name used to filter memory retrieval by agent_id. |

Response

ctx["prependContext"]   # str — markdown context string, ready to prepend to system prompt

Restoration Strategy

  1. Primary: memory.recent() — recency-first retrieval of the most recent memories
  2. Supplemental: BM25 search filtered by agent_id to find agent-specific context
  3. Top 10 memories, deduplicated, formatted with category and source metadata
  4. Output format: "## Restored Session Context\n..." — ready to prepend to your system prompt

Full Session Start Example

pipeline = create_pipeline("./memory_store", agent_name="MyBot")

# Restore previous context
ctx = pipeline.on_session_start(agent_name="MyBot")

system_prompt = f"""
{ctx['prependContext']}

You are MyBot, a helpful assistant.
"""

# Now start processing requests
result = pipeline.process(
    input_text=user_message,
    model_caller=lambda prompt: llm_call(system_prompt, prompt),
)

🔢 Custom Token Estimator

Replace the built-in token estimation heuristic with your own. This affects context budget calculations in Phase 3.

Default Behavior

The default estimator uses: len(text) // 4 — a rough approximation suitable for most use cases.
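Written out as a function, the documented default is equivalent to:

```python
def default_token_estimate(text: str) -> int:
    # ~4 characters per token, matching the documented heuristic.
    return len(text) // 4
```

Passing this function to set_token_estimator() restores the stock behavior.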

Setting a Custom Estimator

pipeline.set_token_estimator(fn: Callable[[str], int])

Example: tiktoken (OpenAI-compatible)

import tiktoken

enc = tiktoken.get_encoding("cl100k_base")
pipeline.set_token_estimator(lambda text: len(enc.encode(text)))

Example: Anthropic token counting

import anthropic

client = anthropic.Anthropic()

def anthropic_token_count(text: str) -> int:
    response = client.messages.count_tokens(
        model="claude-opus-4-6",
        messages=[{"role": "user", "content": text}],
    )
    return response.input_tokens

pipeline.set_token_estimator(anthropic_token_count)

Example: Simple word-based estimate

# Roughly 0.75 tokens per word
pipeline.set_token_estimator(lambda text: int(len(text.split()) * 1.33))

📈 Performance & Intelligence

get_performance_stats() -> dict

Returns session-level performance metrics.

stats = pipeline.get_performance_stats()

Response fields:

| Field | Description |
|---|---|
| session_id | Current session identifier |
| total_requests | Number of process() calls this session |
| routing_feedback | Per-model latency and success rate history |
| security_patterns | Observed threat pattern counters |
| telemetrics_summary | Summary from the telemetrics collector |

stats = pipeline.get_performance_stats()
print(f"Session: {stats['session_id']}")
print(f"Total requests: {stats['total_requests']}")
for model, fb in stats["routing_feedback"].items():
    print(f"  {model}: {fb['success_rate']:.0%} success, {fb['avg_latency_ms']:.0f}ms avg")

get_intelligence_summary() -> dict

Returns the full cross-package intelligence state — a deep view into what the pipeline has learned this session.

intel = pipeline.get_intelligence_summary()

Response fields:

| Field | Description |
|---|---|
| routing_feedback | Model performance history (latency + success rates) |
| security_patterns | Active threat pattern counters from guard |
| context_pressure_events | Count of heavy-compression events (DoS signals) |
| provider_health | Per-provider availability and error rates |
| memory_stats | Memory store size and retrieval performance |
| guard_posture | Current guard sensitivity and recent decisions |
| last_routing_result | Full routing decision from the most recent request |
| current_trace_id | Active trace ID for telemetry correlation |

intel = pipeline.get_intelligence_summary()
print(f"Context pressure events: {intel['context_pressure_events']}")
print(f"Guard posture: {intel['guard_posture']}")
print(f"Last model: {intel['last_routing_result']['selected_model']}")

🤖 AgentPipeline — Agent Lifecycle Wrapper

AgentPipeline wraps AntarisPipeline and exposes a simplified two-step API optimized for conversational agent frameworks. Instead of one monolithic process() call, it exposes pre_turn() and post_turn() — letting you inject your own logic (tool calls, multi-step reasoning) between guard/retrieval and memory storage.

Construction

from antaris_pipeline import AgentPipeline

agent = AgentPipeline(
    pipeline=pipeline,       # An AntarisPipeline instance
    session_id="session-1",  # Session identifier
)

Parameters

| Parameter | Type | Description |
|---|---|---|
| pipeline | AntarisPipeline | The underlying pipeline to wrap. |
| session_id | str | Session identifier. |

pre_turn(user_message, search_limit=10) -> PreTurnResult

Runs Phase 1 (guard input scan) and Phase 2 (memory retrieval) without executing the model or storing anything. Call this at the start of each user turn to get safety clearance and relevant context.

pre = agent.pre_turn(
    user_message="Tell me about the deployment last week",
    search_limit=10,
)

Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| user_message | str | (required) | The user's input text. |
| search_limit | int | 10 | Max memories to retrieve. |

PreTurnResult fields:

| Field | Type | Description |
|---|---|---|
| context_packet | str | Assembled memory context, ready to inject into the system prompt. |
| memories_found | int | Number of relevant memories retrieved. |
| guard_result | dict | Full guard decision dict (same as result.guard_decisions[0]). |
| should_continue | bool | False if the input was blocked. Check this before calling the model. |

user_message = "Hello!"
pre = agent.pre_turn(user_message)

if not pre.should_continue:
    print(f"Blocked: {pre.guard_result['threat_level']}")
    return

# Inject context into your agent's system prompt
system_prompt = f"""
{pre.context_packet}
You are a helpful assistant.
"""

# Run your agent logic (tool calls, multi-step, etc.)
response = my_agent_logic(system_prompt, user_message)

post_turn(user_message, agent_response, agent_id=None, session_id=None, channel_id=None) -> PostTurnResult

Runs Phase 6 (memory storage) after the agent has generated a response. Call this at the end of each turn to persist the conversation.

post = agent.post_turn(
    user_message="Tell me about the deployment",
    agent_response="Last week we deployed v4.9.0 with...",
    agent_id="MyBot",
    session_id="session-1",
    channel_id="discord-channel-123",
)

Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| user_message | str | (required) | The original user input. |
| agent_response | str | (required) | The agent's generated response. |
| agent_id | str \| None | None | Agent identifier for memory metadata. |
| session_id | str \| None | None | Session ID override. Uses AgentPipeline.session_id if not set. |
| channel_id | str \| None | None | Channel identifier for memory metadata. |

PostTurnResult fields:

| Field | Type | Description |
|---|---|---|
| ingested | int | Number of memory entries created. |
| saved | bool | True if memory storage succeeded. |

Full AgentPipeline Example

from antaris_pipeline import create_pipeline, AgentPipeline

pipeline = create_pipeline("./store", agent_name="MyBot")
agent = AgentPipeline(pipeline=pipeline, session_id="session-abc")

async def handle_message(user_message: str) -> str:
    # Step 1: Pre-turn (guard + memory)
    pre = agent.pre_turn(user_message, search_limit=10)

    if not pre.should_continue:
        return "I can't help with that."

    # Step 2: Build system prompt with retrieved context
    system = f"""
{pre.context_packet}
You are MyBot. Be concise and helpful.
    """.strip()

    # Step 3: Call your LLM (with tool calls, chain-of-thought, etc.)
    response = await my_llm(system, user_message)

    # Step 4: Post-turn (store to memory)
    post = agent.post_turn(
        user_message=user_message,
        agent_response=response,
        agent_id="MyBot",
    )

    return response

🪝 Hook System

The hook system lets you inject custom logic at any phase of the pipeline. Hooks can observe, modify, or short-circuit processing.

Concepts

| Class | Description |
| --- | --- |
| HookPhase | Enum of all available injection points |
| HookContext | Input to your hook function — contains current pipeline state |
| HookResult | Output from your hook — controls pipeline continuation |
| HookRegistry | Registry that maps phases to hook functions |
| HookCallback | Type alias: Callable[[HookContext], HookResult] |
| PipelineHooks | Higher-level hook management (attached to AntarisPipeline) |

Available Hook Phases

from antaris_pipeline import HookPhase

# Input safety
HookPhase.BEFORE_GUARD_INPUT
HookPhase.AFTER_GUARD_INPUT

# Memory
HookPhase.BEFORE_MEMORY_RETRIEVE
HookPhase.AFTER_MEMORY_RETRIEVE

# Context assembly
HookPhase.BEFORE_CONTEXT_BUILD
HookPhase.AFTER_CONTEXT_BUILD

# Routing
HookPhase.BEFORE_ROUTE
HookPhase.AFTER_ROUTE

# Model execution
HookPhase.BEFORE_MODEL_CALL
HookPhase.AFTER_MODEL_CALL

# Memory storage
HookPhase.BEFORE_MEMORY_STORE
HookPhase.AFTER_MEMORY_STORE

# Output safety
HookPhase.BEFORE_GUARD_OUTPUT
HookPhase.AFTER_GUARD_OUTPUT

Writing a Hook

from antaris_pipeline import HookContext, HookResult

def my_hook(ctx: HookContext) -> HookResult:
    # Available on ctx:
    # ctx.phase          — HookPhase enum value
    # ctx.input_text     — original user input
    # ctx.result_so_far  — partial PipelineResult accumulated so far
    # ctx.metadata       — dict of phase-specific data

    # Observe and continue (most common):
    return HookResult(continue_pipeline=True, modified_input=None)

    # Or modify the input for the next phase:
    #   return HookResult(continue_pipeline=True, modified_input="sanitized input")

    # Or short-circuit the pipeline:
    #   return HookResult(continue_pipeline=False, modified_input=None)

HookResult fields:

| Field | Type | Description |
| --- | --- | --- |
| continue_pipeline | bool | If False, stops processing and returns current state. |
| modified_input | str \| None | If set, replaces input_text for subsequent phases. |

Registering Hooks

from antaris_pipeline import HookRegistry, HookPhase

registry = HookRegistry()
registry.register(HookPhase.BEFORE_MODEL_CALL, my_hook)
registry.register(HookPhase.AFTER_MEMORY_RETRIEVE, another_hook)

Hook Examples

Logging hook — observe every phase:

def logging_hook(ctx: HookContext) -> HookResult:
    print(f"[{ctx.phase.name}] input_len={len(ctx.input_text)}")
    return HookResult(continue_pipeline=True)

for phase in HookPhase:
    registry.register(phase, logging_hook)

Input sanitization hook:

import re

def sanitize_hook(ctx: HookContext) -> HookResult:
    cleaned = re.sub(r"<[^>]+>", "", ctx.input_text)  # Strip HTML
    return HookResult(continue_pipeline=True, modified_input=cleaned)

registry.register(HookPhase.BEFORE_GUARD_INPUT, sanitize_hook)

Compaction hook — flush before model call:

def compaction_hook(ctx: HookContext) -> HookResult:
    count = ctx.metadata.get("request_count", 0)
    if count and count % 50 == 0:  # avoid firing when the counter is absent (0 % 50 == 0)
        pipeline.flush_to_memory(reason="periodic_compaction")
    return HookResult(continue_pipeline=True)

registry.register(HookPhase.BEFORE_MODEL_CALL, compaction_hook)

Circuit-breaker hook — block on high context pressure:

def pressure_guard_hook(ctx: HookContext) -> HookResult:
    intel = pipeline.get_intelligence_summary()
    if intel["context_pressure_events"] > 10:
        # Too many compression events — possible DoS
        return HookResult(continue_pipeline=False)
    return HookResult(continue_pipeline=True)

registry.register(HookPhase.BEFORE_CONTEXT_BUILD, pressure_guard_hook)

📡 Telemetrics & Events

antaris-pipeline emits structured events at every phase. Events are written to JSONL files and can be consumed in real time.

Output Files

./telemetrics/telemetrics_{session_id}.jsonl

Each line is a JSON object representing one event. Files rotate per session.


Event Types

Events are emitted automatically. You don't need to instrument anything.

| EventType | When Emitted |
| --- | --- |
| PIPELINE_START | At the start of each process() call |
| PIPELINE_COMPLETE | When process() completes successfully |
| PIPELINE_ERROR | When an unhandled error occurs |
| PIPELINE_DRY_RUN | When dry_run() is called |
| GUARD_SCAN | Each time a guard scan runs |
| GUARD_ALLOW | When a guard scan allows the content |
| GUARD_DENY | When a guard scan blocks the content |
| MEMORY_RETRIEVE | After BM25 memory search |
| MEMORY_INGEST | After memory storage |
| CONTEXT_BUILD | After context assembly |
| ROUTER_ROUTE | After model selection |

AntarisEvent Structure

from antaris_pipeline import AntarisEvent, EventType

# Events on a result:
for event in result.events:
    print(f"[{event.event_type}] {event.timestamp} {event.data}")

AntarisEvent fields:

| Field | Type | Description |
| --- | --- | --- |
| event_type | EventType | The event category. |
| timestamp | float | Unix timestamp. |
| session_id | str | Session this event belongs to. |
| trace_id | str | Per-request trace ID for correlation. |
| data | dict | Event-specific payload. |
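
Because every event carries a trace_id, you can reconstruct the per-request phase sequence from a parsed event stream. A small helper (the sample events are synthetic; field names follow the table above):

```python
from collections import defaultdict

def group_by_trace(events: list[dict]) -> dict[str, list[dict]]:
    """Group parsed telemetry events by trace_id, ordered by timestamp."""
    traces: dict[str, list[dict]] = defaultdict(list)
    for evt in sorted(events, key=lambda e: e["timestamp"]):
        traces[evt["trace_id"]].append(evt)
    return dict(traces)

events = [
    {"event_type": "PIPELINE_START", "timestamp": 1.0, "trace_id": "t1", "data": {}},
    {"event_type": "GUARD_SCAN", "timestamp": 1.1, "trace_id": "t1", "data": {}},
    {"event_type": "PIPELINE_START", "timestamp": 2.0, "trace_id": "t2", "data": {}},
]
traces = group_by_trace(events)
```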

Telemetrics Classes

| Class | Description |
| --- | --- |
| TelemetricsCollector | Buffers events and writes to JSONL. Configured via TelemetricsConfig. |
| TelemetricsServer | Optional HTTP server for real-time event streaming. |
| PipelineTelemetry | High-level interface used internally by AntarisPipeline. |
| EventEmitter | Base class for emitting typed events. |

Event Helpers

Convenience functions for constructing events in hook callbacks or custom code:

from antaris_pipeline import memory_event, router_event, guard_event, context_event

evt = memory_event(session_id="s1", retrieved=5, total=200)
evt = router_event(session_id="s1", model="claude-opus-4-6", cost=0.0042)
evt = guard_event(session_id="s1", blocked=False, risk_score=0.12)
evt = context_event(session_id="s1", tokens=4200, budget=8000)

Parsing Telemetry Files

import json
from pathlib import Path

session_id = "my-session"
log_file = Path(f"./telemetrics/telemetrics_{session_id}.jsonl")

events = [json.loads(line) for line in log_file.read_text().splitlines()]

# Filter to GUARD_DENY events
blocks = [e for e in events if e["event_type"] == "GUARD_DENY"]
print(f"Total blocks this session: {len(blocks)}")

# Calculate average latency (guard against sessions with no completed runs)
pipeline_events = [e for e in events if e["event_type"] == "PIPELINE_COMPLETE"]
if pipeline_events:
    avg_latency = sum(e["data"]["total_latency_ms"] for e in pipeline_events) / len(pipeline_events)
    print(f"Avg latency: {avg_latency:.1f}ms")
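
An event-type histogram over the same file is a quick way to spot anomalies, such as an unusual GUARD_DENY rate. A sketch using only the stdlib (the sample lines are synthetic):

```python
import json
from collections import Counter

def event_histogram(lines: list[str]) -> Counter:
    """Count events by type from raw JSONL lines, skipping blanks."""
    return Counter(json.loads(line)["event_type"] for line in lines if line.strip())

lines = [
    '{"event_type": "PIPELINE_START"}',
    '{"event_type": "GUARD_ALLOW"}',
    '{"event_type": "PIPELINE_START"}',
]
counts = event_histogram(lines)
```

In practice you would pass `log_file.read_text().splitlines()` from the snippet above.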

PerformanceMetrics and ConfidenceBasis

Additional telemetry types exposed for custom instrumentation:

from antaris_pipeline import PerformanceMetrics, ConfidenceBasis

# PerformanceMetrics: attached to routing decisions
# Fields: latency_ms, tokens_used, cost_usd, success

# ConfidenceBasis: explains why the router chose a model
# Values: PERFORMANCE_HISTORY, COST_OPTIMIZATION, CAPABILITY_MATCH, FALLBACK, DEFAULT

🔗 Cross-Package Intelligence Flows

antaris-pipeline implements 5 automatic intelligence flows that wire the sub-packages together. Their bookkeeping runs in Phase 8 (Intelligence Updates) of every process() call, their effects surface in the earlier phases noted below, and they require no configuration.


Flow 1 — Memory → Router

What: Model success rates and latency history (stored as routing_feedback memories) are retrieved and passed to the router as performance hints.

Effect: The router avoids models with poor recent performance and prefers those with proven reliability for the current task type.

Memory Store ──[routing_feedback memories]──► Router (Phase 4 model selection)

Flow 2 — Router → Context

What: The selected model's context window size (from model_context_limits in ContextConfig) is used as the upper bound for the token budget.

Effect: Context assembly never tries to pack more tokens than the selected model can actually handle.

Router (selected_model) ──[context_limit]──► Context Builder (Phase 3 budget ceiling)

Flow 3 — Guard → Memory

What: When risk_score > 0.7 on an input scan (Phase 1), the content is stored to memory as a security fact rather than a normal conversation entry.

Effect: High-risk inputs are flagged in memory for future reference without contaminating conversation history. The agent learns about threat patterns over time.

Guard (risk_score > 0.7) ──[security fact]──► Memory Store (Phase 6, alternate storage path)

Flow 4 — Context → Guard

What: When compression ratio drops below 0.3 (i.e., the context had to be compressed to less than 30% of its original size), this is flagged as a potential DoS vector and the context_pressure_events counter is incremented.

Effect: Sustained context pressure (many high-compression events) is a signal of prompt stuffing or adversarial input. Visible via get_intelligence_summary()["context_pressure_events"].

Context Builder (ratio < 0.3) ──[pressure signal]──► Guard/Intelligence (Phase 8 counter)
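
The threshold check behind Flow 4 reduces to a one-liner. This is a hypothetical reconstruction of the signal for clarity, not the package's internal code:

```python
def is_context_pressure(original_tokens: int, compressed_tokens: int,
                        threshold: float = 0.3) -> bool:
    """Flag heavy compression: True when the surviving context is
    smaller than `threshold` of its original token count."""
    if original_tokens <= 0:
        return False  # nothing to compress, no pressure signal
    return compressed_tokens / original_tokens < threshold
```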

Flow 5 — Router → Memory

What: After each model call completes (Phase 5), the model name, latency, and success/failure are stored as a routing_feedback memory entry.

Effect: The pipeline builds up a performance history for each model over time, which feeds back into Flow 1 to improve future routing decisions.

Model Execution (Phase 5) ──[latency + success]──► Memory Store (routing_feedback)
                                                    ↑
                               (retrieved in Flow 1 next request)
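
To make the Flow 5 → Flow 1 loop concrete, here is a sketch of aggregating routing_feedback entries into the per-model stats the router could consume. The entry shape ({"model", "latency_ms", "success"}) is illustrative, not the package's actual storage schema:

```python
from collections import defaultdict

def summarize_feedback(entries: list[dict]) -> dict[str, dict]:
    """Aggregate routing feedback into per-model success rate and mean latency."""
    buckets: dict[str, list[dict]] = defaultdict(list)
    for entry in entries:
        buckets[entry["model"]].append(entry)
    summary = {}
    for model, rows in buckets.items():
        summary[model] = {
            "success_rate": sum(r["success"] for r in rows) / len(rows),
            "avg_latency_ms": sum(r["latency_ms"] for r in rows) / len(rows),
        }
    return summary

entries = [
    {"model": "fast-model", "latency_ms": 300, "success": True},
    {"model": "fast-model", "latency_ms": 500, "success": False},
    {"model": "big-model", "latency_ms": 1200, "success": True},
]
stats = summarize_feedback(entries)
```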

Intelligence Flow Summary

┌──────────────────────────────────────────────────────────────┐
│                  Cross-Package Intelligence                   │
│                                                              │
│  Memory ──Flow 1──► Router  (perf history → model select)   │
│  Router ──Flow 2──► Context (model limits → token budget)    │
│  Guard  ──Flow 3──► Memory  (high-risk → security facts)     │
│  Context──Flow 4──► Guard   (heavy compress → DoS signal)    │
│  Router ──Flow 5──► Memory  (latency+success → feedback)     │
└──────────────────────────────────────────────────────────────┘

All flows are wired at construction time. No configuration required.


🧩 Complete Integration Example

A full example showing antaris-pipeline integrated into a Discord bot:

import asyncio
from antaris_pipeline import create_pipeline, AgentPipeline
import anthropic

# ── Setup ────────────────────────────────────────────────────────────────────

client = anthropic.Anthropic()

pipeline = create_pipeline(
    storage_path="./memory_store",
    guard_config={"sensitivity": "balanced"},
    context_config={"total_budget": 8000},
    session_id="discord-bot-session",
    agent_name="MyBot",
)

# Swap in a real token counter
import tiktoken
enc = tiktoken.get_encoding("cl100k_base")
pipeline.set_token_estimator(lambda text: len(enc.encode(text)))

agent = AgentPipeline(pipeline=pipeline, session_id="discord-bot-session")

# ── Session start ─────────────────────────────────────────────────────────────

ctx = pipeline.on_session_start(agent_name="MyBot")
BASE_SYSTEM = f"""
{ctx['prependContext']}

You are MyBot, a helpful assistant in a Discord server.
Be concise. Use markdown sparingly.
""".strip()


# ── Per-message handler ───────────────────────────────────────────────────────

async def handle_message(user_id: str, user_message: str) -> str:
    # Step 1: Pre-turn (guard + memory retrieval)
    pre = agent.pre_turn(user_message, search_limit=8)

    if not pre.should_continue:
        risk = pre.guard_result.get("risk_score", 0)
        return f"⛔ I can't help with that. (risk: {risk:.2f})"

    # Step 2: Build context-aware system prompt
    system = BASE_SYSTEM
    if pre.context_packet:
        system = f"{pre.context_packet}\n\n{BASE_SYSTEM}"

    # Step 3: Call the LLM
    def llm_caller(prompt: str) -> str:
        resp = client.messages.create(
            model="claude-opus-4-6",
            max_tokens=1024,
            system=system,
            messages=[{"role": "user", "content": prompt}],
        )
        return resp.content[0].text

    result = pipeline.process(
        input_text=user_message,
        model_caller=llm_caller,
        context_data={"user_id": user_id},
    )

    if not result.success:
        return f"Something went wrong: {result.error}"

    # Step 4: Store to memory
    post = agent.post_turn(
        user_message=user_message,
        agent_response=result.output,
        agent_id="MyBot",
        channel_id="discord-main",
    )

    return result.output


# ── Periodic flush (call from your compaction/restart handler) ────────────────

def on_shutdown():
    n = pipeline.flush_to_memory(reason="shutdown")
    print(f"Flushed {n} memories before shutdown")
    summary = pipeline.get_compaction_summary()
    print(summary)


# ── Dry-run health check (call on startup) ────────────────────────────────────

def health_check():
    sim = pipeline.dry_run("Hello, how are you?")
    print("=== Pipeline Health Check ===")
    print(f"Guard: {'✅ allow' if sim['guard_input']['would_allow'] else '⛔ block'}")
    print(f"Memory: {sim['memory']['total_in_store']} entries in store")
    print(f"Router: → {sim['router']['would_select']} (${sim['router']['estimated_cost_usd']:.4f})")
    print(f"Context: {sim['context']['utilization']:.0%} budget utilization")
    print(f"Dry run time: {sim['dry_run_time_ms']:.1f}ms")

🗺️ Architecture Overview

                        antaris-pipeline
                        ┌───────────────────────────────────────────┐
  User Input            │                                           │
       │                │  Phase 1: Guard Input Scan                │
       ▼                │       ↓ (if blocked → return error)       │
  AntarisPipeline       │  Phase 2: Memory Retrieval (BM25)         │
  .process()            │       ↓                                   │
       │                │  Phase 3: Context Building + Optimization │
       │                │       ↓                                   │
       │                │  Phase 4: Smart Routing                   │
       │                │       ↓                                   │
       │                │  Phase 5: Model Execution                 │
       │                │       ↓                                   │
       │                │  Phase 6: Memory Storage                  │
       │                │       ↓                                   │
       │                │  Phase 7: Guard Output Scan               │
       │                │       ↓                                   │
       │                │  Phase 8: Intelligence Updates            │
       │                │                                           │
       ▼                └───────────────────────────────────────────┘
  PipelineResult
  (success, output, events, routing_decision, guard_decisions, ...)

  Sub-packages:
  ┌─────────────────┐  ┌──────────────────┐  ┌─────────────┐  ┌──────────────────┐
  │ antaris-memory  │  │  antaris-router  │  │antaris-guard│  │ antaris-context  │
  │ BM25 retrieval  │  │ Model selection  │  │ Safety scan │  │ Token budgeting  │
  │ Decay scoring   │  │ Provider routing │  │ Threat class│  │ Compression      │
  │ Fact storage    │  │ Fallback chains  │  │ Policy rules│  │ Adaptive budget  │
  └─────────────────┘  └──────────────────┘  └─────────────┘  └──────────────────┘

🔖 Changelog

4.9.20

  • Cross-package intelligence flows (Sprint 3): all 5 flows wired automatically
  • on_session_start() with recency-first + BM25 supplemental strategy
  • flush_to_memory() and get_compaction_summary() for compaction support
  • AgentPipeline with PreTurnResult / PostTurnResult typed returns
  • Full hook system with 14 injection points across 8 phases
  • debug_pipeline() shortcut with full telemetrics enabled
  • set_token_estimator() for custom token counting
  • dry_run() simulation with per-phase prediction fields
  • JSONL telemetry output with session-scoped files

📄 License

Copyright © Antaris Analytics LLC. All rights reserved.
