# antaris-pipeline

The unified orchestration engine for the Antaris AI suite.
antaris-pipeline ties together memory, routing, guard, and context into a single, production-ready processing loop. It handles the full lifecycle of every LLM interaction — from input safety scanning through intelligent model selection, context optimization, response generation, and memory persistence — while emitting structured telemetry at every phase.
## 📦 Installation

```bash
pip install antaris-pipeline
```

Version: 4.9.20

Dependencies (auto-installed):

| Package | Role |
|---|---|
| `antaris-memory` | Long-term memory storage, BM25 retrieval, context packets |
| `antaris-router` | Smart model selection, provider routing, fallback chains |
| `antaris-guard` | Input/output safety scanning, threat classification |
| `antaris-context` | Token budgeting, context compression, adaptive optimization |
## 🗂️ Table of Contents
- Quick Start
- Core Exports
- create_pipeline() Factory
- Pipeline Profile Shortcuts
- ProfileType Presets
- PipelineConfig & Sub-Configs
- AntarisPipeline — Main Orchestrator
- The 8-Phase Processing Pipeline
- PipelineResult
- Dry Run Mode
- Memory Retrieval
- Session Lifecycle Hooks
- Custom Token Estimator
- Performance & Intelligence
- AgentPipeline — Agent Lifecycle Wrapper
- Hook System
- Telemetrics & Events
- Cross-Package Intelligence Flows
- Complete Integration Example
## ⚡ Quick Start

```python
from antaris_pipeline import create_pipeline

# Create a balanced pipeline (memory + routing + guard + context)
pipeline = create_pipeline(
    storage_path="./memory_store",
    session_id="my-session",
    agent_name="MyBot",
)

# Process a user message end-to-end
result = pipeline.process(
    input_text="What's the capital of France?",
    model_caller=lambda text: call_your_llm(text),
)

if result.success:
    print(result.output)
else:
    print(f"Error: {result.error}")
```
## 📋 Core Exports

All public symbols are importable directly from `antaris_pipeline`:

```python
from antaris_pipeline import (
    # Core orchestrators
    Pipeline,
    AntarisPipeline,
    PipelineResult,
    create_pipeline,
    # Agent lifecycle wrapper
    AgentPipeline,
    PreTurnResult,
    PostTurnResult,
    # Configuration
    PipelineConfig,
    MemoryConfig,
    RouterConfig,
    GuardConfig,
    ContextConfig,
    TelemetricsConfig,
    # Profile system
    ProfileType,
    create_config,
    PROFILE_PRESETS,
    # Events & telemetrics
    AntarisEvent,
    EventType,
    ConfidenceBasis,
    PerformanceMetrics,
    EventEmitter,
    TelemetricsCollector,
    TelemetricsServer,
    PipelineTelemetry,
    # Hook system
    HookPhase,
    HookContext,
    HookResult,
    HookRegistry,
    PipelineHooks,
    HookCallback,
    # Profile shortcut factories
    balanced_pipeline,
    strict_pipeline,
    cost_optimized_pipeline,
    performance_pipeline,
    debug_pipeline,
    # Event helpers
    memory_event,
    router_event,
    guard_event,
    context_event,
)
```
## 🏭 create_pipeline() Factory

The recommended entry point for creating a fully wired pipeline instance.

### Signature

```python
def create_pipeline(
    storage_path: str,
    memory_config: dict | None = None,
    router_config: dict | None = None,
    guard_config: dict | None = None,
    context_config: dict | None = None,
    pipeline_config: PipelineConfig | None = None,
    session_id: str | None = None,
    agent_name: str | None = None,
) -> AntarisPipeline:
```

### Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| `storage_path` | `str` | — | Path to the memory store directory. Created if it doesn't exist. |
| `memory_config` | `dict \| None` | `None` | Dict of `MemoryConfig` field overrides (e.g. `{"half_life": 7.0}`). |
| `router_config` | `dict \| None` | `None` | Dict of `RouterConfig` field overrides. |
| `guard_config` | `dict \| None` | `None` | Dict of `GuardConfig` field overrides (e.g. `{"sensitivity": "balanced"}`). |
| `context_config` | `dict \| None` | `None` | Dict of `ContextConfig` field overrides (e.g. `{"total_budget": 8000}`). |
| `pipeline_config` | `PipelineConfig \| None` | `None` | Full `PipelineConfig` object. When provided, the individual config dicts are ignored. |
| `session_id` | `str \| None` | `None` | Session identifier. A UUID is auto-generated if not provided. |
| `agent_name` | `str \| None` | `None` | Name of the agent. Used for memory filtering and context restoration. |

### Example

```python
from antaris_pipeline import create_pipeline

pipeline = create_pipeline(
    storage_path="./memory_store",
    memory_config={"half_life": 7.0},
    router_config={},
    guard_config={"sensitivity": "balanced"},
    context_config={"total_budget": 8000, "template": "agent_with_tools"},
    pipeline_config=None,
    session_id="my-session",
    agent_name="MyBot",
)
```
## 🚀 Pipeline Profile Shortcuts

Five pre-configured factory functions for common deployment scenarios. Each wraps `create_pipeline()` with tuned defaults.

```python
from antaris_pipeline import (
    balanced_pipeline,
    strict_pipeline,
    cost_optimized_pipeline,
    performance_pipeline,
    debug_pipeline,
)
```

### `balanced_pipeline(storage_path, **kwargs) -> AntarisPipeline`

General-purpose profile. Moderate safety, standard routing, compression enabled.

```python
pipeline = balanced_pipeline("./store", agent_name="MyBot")
```

### `strict_pipeline(storage_path, **kwargs) -> AntarisPipeline`

High-safety profile. Strict guard sensitivity, conservative routing, output scanning enforced. Ideal for customer-facing or regulated environments.

```python
pipeline = strict_pipeline("./store")
```

### `cost_optimized_pipeline(storage_path, **kwargs) -> AntarisPipeline`

Minimizes cost. Prefers cheaper models, aggressive compression, reduced memory retrieval limits.

```python
pipeline = cost_optimized_pipeline("./store")
```

### `performance_pipeline(storage_path, **kwargs) -> AntarisPipeline`

Optimized for low latency. Faster models preferred, reduced guard overhead, adaptive budgeting enabled.

```python
pipeline = performance_pipeline("./store")
```

### `debug_pipeline(storage_path, **kwargs) -> AntarisPipeline`

Full telemetrics, verbose logging, all hooks active. Use during development to inspect every phase.

```python
pipeline = debug_pipeline("./store")
# Emits a complete JSONL trace to ./telemetrics/
```

### Comparison Table

| Profile | Safety | Cost | Speed | Telemetrics |
|---|---|---|---|---|
| `balanced` | Medium | Medium | Medium | Standard |
| `strict` | High | Higher | Slower | Standard |
| `cost_optimized` | Medium | Low | Medium | Minimal |
| `performance` | Medium | Medium | Fast | Standard |
| `debug` | Medium | Medium | Medium | Full |
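Internally, each shortcut is a thin wrapper that merges its tuned defaults with caller overrides before delegating to `create_pipeline()`. A minimal sketch of that pattern; `fake_create_pipeline` and the strict-profile defaults are illustrative stand-ins, not the library's real values:

```python
# Sketch of the profile-shortcut pattern: merge tuned defaults with
# caller overrides, then delegate to a single underlying factory.
def fake_create_pipeline(storage_path, **kwargs):
    # Stand-in for create_pipeline(); just echoes its arguments.
    return {"storage_path": storage_path, **kwargs}

def make_shortcut(factory, **profile_defaults):
    def shortcut(storage_path, **overrides):
        merged = {**profile_defaults, **overrides}  # caller overrides win
        return factory(storage_path, **merged)
    return shortcut

# Hypothetical tuned defaults for a "strict" profile
strict = make_shortcut(fake_create_pipeline, guard_config={"sensitivity": "strict"})
print(strict("./store", agent_name="MyBot"))
```

Because overrides are merged last, `strict("./store", guard_config={...})` still lets you loosen an individual setting while keeping the rest of the profile.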
## 🎛️ ProfileType Presets

`ProfileType` is an enum. Use `create_config()` to generate a `PipelineConfig` from a named preset. This is useful when you want to start from a preset and customize further.

```python
from antaris_pipeline import ProfileType, create_config

config = create_config(ProfileType.BALANCED)
config = create_config(ProfileType.STRICT_SAFETY)
config = create_config(ProfileType.COST_OPTIMIZED)
config = create_config(ProfileType.PERFORMANCE)
config = create_config(ProfileType.DEBUG)
```

### Available ProfileType Values

| Enum Value | Equivalent Shortcut |
|---|---|
| `ProfileType.BALANCED` | `balanced_pipeline()` |
| `ProfileType.STRICT_SAFETY` | `strict_pipeline()` |
| `ProfileType.COST_OPTIMIZED` | `cost_optimized_pipeline()` |
| `ProfileType.PERFORMANCE` | `performance_pipeline()` |
| `ProfileType.DEBUG` | `debug_pipeline()` |

### PROFILE_PRESETS

`PROFILE_PRESETS` is a dict mapping each `ProfileType` to its default `PipelineConfig`:

```python
from antaris_pipeline import PROFILE_PRESETS, ProfileType

config = PROFILE_PRESETS[ProfileType.BALANCED]
```
## ⚙️ PipelineConfig & Sub-Configs

`PipelineConfig` is the central configuration object. Pass it to `create_pipeline()` or use it directly when constructing `AntarisPipeline`.

### Full Config Example

```python
from antaris_pipeline import (
    PipelineConfig,
    MemoryConfig,
    RouterConfig,
    GuardConfig,
    ContextConfig,
    TelemetricsConfig,
)

config = PipelineConfig(
    memory=MemoryConfig(
        decay_half_life_hours=168,
        search_timeout_ms=5000,
        context_packet_max_tokens=2000,
    ),
    router=RouterConfig(
        track_model_performance=True,
    ),
    guard=GuardConfig(
        enable_output_scanning=True,
        default_policy_strictness=0.5,  # 0.0–1.0
    ),
    context=ContextConfig(
        default_max_tokens=8000,
        enable_compression=True,
        enable_adaptive_budgeting=False,
        model_context_limits={
            "claude-opus-4-6": 200000,
            "gpt-4o": 128000,
        },
    ),
    telemetrics=TelemetricsConfig(
        output_directory="./telemetrics",
        buffer_size=100,
        enable_telemetrics=True,
    ),
)
```
### MemoryConfig

Controls memory storage, retrieval, and decay behaviour.

| Field | Type | Default | Description |
|---|---|---|---|
| `decay_half_life_hours` | `float` | `168` | Half-life for memory relevance decay: a memory's weight halves every this many hours. 168 h = 1 week. |
| `search_timeout_ms` | `int` | `5000` | BM25 search timeout in milliseconds. |
| `context_packet_max_tokens` | `int` | `2000` | Max tokens for the memory context packet injected into prompts. |

```python
memory_cfg = MemoryConfig(
    decay_half_life_hours=72,  # 3-day decay
    search_timeout_ms=3000,
    context_packet_max_tokens=1500,
)
```
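For intuition about `decay_half_life_hours`: a half-life decay means a memory's relevance weight halves every half-life interval. A quick sketch of that arithmetic (the exact weighting formula inside antaris-memory is an assumption here):

```python
def decay_weight(age_hours: float, half_life_hours: float = 168.0) -> float:
    """Exponential decay: weight halves every `half_life_hours`."""
    return 0.5 ** (age_hours / half_life_hours)

# With the default 168h (1 week) half-life:
print(decay_weight(0))    # 1.0  (brand-new memory: full weight)
print(decay_weight(168))  # 0.5  (one week old: half weight)
print(decay_weight(336))  # 0.25 (two weeks old: quarter weight)
```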
### RouterConfig

Controls model selection, provider preferences, and performance tracking.

| Field | Type | Default | Description |
|---|---|---|---|
| `track_model_performance` | `bool` | `True` | Record per-model latency and success rates. Feeds into cross-package intelligence. |

```python
router_cfg = RouterConfig(
    track_model_performance=True,
)
```
### GuardConfig

Controls safety scanning for both input and output.

| Field | Type | Default | Description |
|---|---|---|---|
| `enable_output_scanning` | `bool` | `True` | Whether to scan the model's response (Phase 7). |
| `default_policy_strictness` | `float` | `0.5` | Safety threshold. 0.0 = PERMISSIVE, 0.5 = BALANCED, 1.0 = STRICT. |

```python
guard_cfg = GuardConfig(
    enable_output_scanning=True,
    default_policy_strictness=0.8,  # Lean toward strict
)
```

Strictness mapping:

| Range | Mode |
|---|---|
| 0.0 – 0.33 | PERMISSIVE |
| 0.34 – 0.66 | BALANCED |
| 0.67 – 1.0 | STRICT |
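The mapping can be expressed as a small helper. The thresholds below simply mirror the documented ranges; how antaris-guard treats values exactly on a boundary is an assumption:

```python
def strictness_mode(strictness: float) -> str:
    """Map a 0.0-1.0 strictness value to the documented guard mode."""
    if not 0.0 <= strictness <= 1.0:
        raise ValueError("strictness must be in [0.0, 1.0]")
    if strictness <= 0.33:
        return "PERMISSIVE"
    if strictness <= 0.66:
        return "BALANCED"
    return "STRICT"

print(strictness_mode(0.5))  # BALANCED
print(strictness_mode(0.8))  # STRICT
```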
### ContextConfig

Controls token budget management, compression, and model-specific context limits.

| Field | Type | Default | Description |
|---|---|---|---|
| `default_max_tokens` | `int` | `8000` | Default token budget for context assembly. |
| `enable_compression` | `bool` | `True` | Enable context compression when approaching budget limits. |
| `enable_adaptive_budgeting` | `bool` | `False` | Dynamically adjust token budgets based on request complexity. |
| `model_context_limits` | `dict[str, int]` | `{}` | Per-model context window overrides. Used by the Router → Context intelligence flow. |

```python
context_cfg = ContextConfig(
    default_max_tokens=12000,
    enable_compression=True,
    enable_adaptive_budgeting=True,
    model_context_limits={
        "claude-opus-4-6": 200000,
        "gpt-4o": 128000,
        "gpt-4o-mini": 128000,
    },
)
```
### TelemetricsConfig

Controls telemetry output location, buffering, and the master on/off switch.

| Field | Type | Default | Description |
|---|---|---|---|
| `output_directory` | `str` | `"./telemetrics"` | Directory for JSONL telemetry files. |
| `buffer_size` | `int` | `100` | Number of events to buffer before flushing to disk. |
| `enable_telemetrics` | `bool` | `True` | Master switch for telemetry collection. |

```python
telemetrics_cfg = TelemetricsConfig(
    output_directory="./logs/telemetrics",
    buffer_size=50,
    enable_telemetrics=True,
)
```
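The `buffer_size` semantics can be pictured with a minimal JSONL writer: events accumulate in memory and are appended to disk once the buffer fills. This is an illustrative sketch, not the `TelemetricsCollector` implementation:

```python
import json
import tempfile
from pathlib import Path

class JsonlEventBuffer:
    """Sketch of buffer_size semantics: hold events in memory and
    append them to a JSONL file once the buffer fills."""

    def __init__(self, path: str, buffer_size: int = 100):
        self.path = Path(path)
        self.buffer_size = buffer_size
        self._buffer: list[dict] = []

    def emit(self, event: dict) -> None:
        self._buffer.append(event)
        if len(self._buffer) >= self.buffer_size:
            self.flush()

    def flush(self) -> None:
        # One JSON object per line, appended atomically per flush
        with self.path.open("a", encoding="utf-8") as f:
            for event in self._buffer:
                f.write(json.dumps(event) + "\n")
        self._buffer.clear()

tmp = Path(tempfile.mkdtemp()) / "events.jsonl"
buf = JsonlEventBuffer(str(tmp), buffer_size=2)
buf.emit({"type": "phase_start"})  # held in memory
buf.emit({"type": "phase_end"})    # buffer full -> both lines written
```

A larger buffer reduces disk I/O at the cost of losing more unflushed events on a crash.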
## 🧠 AntarisPipeline — Main Orchestrator

`AntarisPipeline` (also exported as `Pipeline`) is the central class that wires all sub-systems together and runs the 8-phase processing loop.

### Construction

Typically created via `create_pipeline()` or a profile shortcut. Direct construction:

```python
from antaris_pipeline import AntarisPipeline, PipelineConfig

pipeline = AntarisPipeline(
    storage_path="./memory_store",
    config=PipelineConfig(...),
    session_id="my-session",
    agent_name="MyBot",
)
```

### Public Methods

| Method | Description |
|---|---|
| `process(...)` | Run the full 8-phase pipeline. Returns `PipelineResult`. |
| `dry_run(input_text, context_data=None)` | Simulate all phases without calling the model or writing to memory. |
| `memory_retrieval(input_text, context_data=None, limit=10)` | Query memory directly, bypassing the full pipeline. |
| `flush_to_memory(reason="")` | Persist in-session state to memory (for compaction). |
| `get_compaction_summary()` | Get a markdown summary string of the current session state. |
| `on_session_start(summary="", agent_name=None)` | Restore context from memory at the start of a new session. |
| `set_token_estimator(fn)` | Replace the default token counting heuristic. |
| `get_performance_stats()` | Return session-level performance metrics. |
| `get_intelligence_summary()` | Return cross-package intelligence state. |
## 🔄 The 8-Phase Processing Pipeline

`pipeline.process()` executes all 8 phases sequentially for every request.

```python
result = pipeline.process(
    input_text="user message",
    model_caller=lambda text: call_llm(text),
    context_data={"extra": "context"},  # Optional dict merged into context
    dry_run=False,
)
```

### Parameters

| Parameter | Type | Required | Description |
|---|---|---|---|
| `input_text` | `str` | ✅ | The raw user input or prompt. |
| `model_caller` | `Callable[[str], str] \| None` | ✅ (unless `dry_run=True`) | A callable that accepts the assembled prompt and returns the model's response string. |
| `context_data` | `dict \| None` | ❌ | Additional key-value context merged into the context assembly step. |
| `dry_run` | `bool` | ❌ | If `True`, skips model execution and memory writes. Equivalent to calling `dry_run()`. |
### Phase 1 — Guard Input Scan

**What happens:** The raw `input_text` is scanned by antaris-guard for threats, policy violations, and risk patterns.

**If blocked:** `result.success = False`, `result.error` contains the block reason, and all subsequent phases are skipped.

Key outputs stored in `result`:

- `result.guard_decisions[0]` — the input guard decision dict
- Fields: `is_blocked`, `risk_score` (0.0–1.0), `threat_level`, `patterns_matched`

```python
# Guard decisions are always in result.guard_decisions
# Index 0 = input scan, Index 1 = output scan (Phase 7)
input_guard = result.guard_decisions[0]
print(input_guard["risk_score"])    # e.g. 0.12
print(input_guard["is_blocked"])    # False
print(input_guard["threat_level"])  # "LOW"
```
### Phase 2 — Memory Retrieval

**What happens:** BM25 search runs against the memory store using `input_text` as the query. Top results are assembled into a context packet (up to `context_packet_max_tokens`).

Key outputs:

- `result.memory_retrievals` — list of retrieved memory dicts
- Each memory: `{content, score, category, source, timestamp, ...}`

```python
print(f"Retrieved {len(result.memory_retrievals)} memories")
for mem in result.memory_retrievals:
    print(f"  [{mem['category']}] {mem['content'][:80]}")
```
### Phase 3 — Context Building

**What happens:** The router's model selection hint (from a Phase 4 lookahead) sets the token budget ceiling. The memory context packet is injected, any `context_data` is merged, and the full context is optimized/compressed if needed.

Key outputs:

- `result.context_optimizations` — dict with compression ratio, token counts, and the optimizations applied

```python
opts = result.context_optimizations
print(f"Compression ratio: {opts.get('compression_ratio')}")
print(f"Input tokens: {opts.get('input_tokens')}")
print(f"Budget used: {opts.get('budget_utilization')}")
```
### Phase 4 — Smart Routing

**What happens:** antaris-router selects the best model/provider based on task characteristics, cost constraints, model context limits, and historical performance data fed back from memory.

Key outputs:

- `result.routing_decision` — the full routing dict

```python
rd = result.routing_decision
print(f"Model: {rd['selected_model']}")
print(f"Provider: {rd['provider']}")
print(f"Tier: {rd['tier']}")
print(f"Confidence: {rd['confidence']}")
print(f"Est. cost: ${rd['estimated_cost']:.4f}")
print(f"Fallback chain: {rd['fallback_chain']}")
print(f"Reasoning: {rd['reasoning']}")
```
### Phase 5 — Model Execution

**What happens:** The assembled context is passed to `model_caller`. The callable receives the optimized prompt string and must return the response string.

**In dry-run mode:** this phase is skipped and a simulated response is generated instead.

```python
# model_caller receives the assembled, optimized prompt
result = pipeline.process(
    input_text="Summarize this document",
    model_caller=lambda prompt: anthropic_client.messages.create(
        model="claude-opus-4-6",
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}],
    ).content[0].text,
)
```
### Phase 6 — Memory Storage

**What happens:** The conversation turn (input + output) is stored to memory.

Risk-based branching:

- If the input had `risk_score > 0.7` (from Phase 1), the turn is stored as a security fact rather than a conversation entry. This prevents contamination of the conversation history with threat-related content.
- Normal turns are stored as conversation entries with full metadata.
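The branching rule reduces to a threshold check. The 0.7 cutoff comes from the text above; the category names in this sketch are illustrative, not the exact strings antaris-memory stores:

```python
def storage_category(risk_score: float, threshold: float = 0.7) -> str:
    """Decide how a turn is persisted, per the Phase 6 branching rule."""
    # Strictly greater than the threshold triggers the security branch
    return "security_fact" if risk_score > threshold else "conversation_entry"

print(storage_category(0.12))  # conversation_entry
print(storage_category(0.91))  # security_fact
```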
### Phase 7 — Guard Output Scan

**What happens:** If `enable_output_scanning=True`, the model's response is scanned for policy violations or unsafe content.

**If blocked:** `result.success = False` even if the model call succeeded. The output scan result is appended to `result.guard_decisions`.

```python
# Index 1 in guard_decisions is the output scan
if len(result.guard_decisions) > 1:
    output_guard = result.guard_decisions[1]
    print(output_guard["is_blocked"])
```
### Phase 8 — Cross-Package Intelligence Updates
What happens: All five cross-package intelligence flows are updated with data from this request (see Cross-Package Intelligence Flows).
This includes:
- Routing feedback (latency + success) written to memory
- Context pressure counters updated
- Security pattern tracking updated in memory
## 📊 PipelineResult

`process()` always returns a `PipelineResult`. It never raises on pipeline-level failures — errors are captured in `result.error`.

### Fields

| Field | Type | Description |
|---|---|---|
| `success` | `bool` | `True` if the pipeline completed without blocking or error. |
| `output` | `str` | The model's response. Empty string if blocked or errored. |
| `error` | `str \| None` | Error or block reason. `None` on success. |
| `events` | `List[AntarisEvent]` | All telemetry events emitted during this request. |
| `performance` | `dict` | `{"total_latency_ms": float, "dry_run": bool}` |
| `memory_retrievals` | `List[dict]` | Memories retrieved in Phase 2. |
| `routing_decision` | `dict` | Full routing decision from Phase 4. |
| `guard_decisions` | `List[dict]` | Input guard (index 0) and optionally output guard (index 1). |
| `context_optimizations` | `dict` | Context assembly stats from Phase 3. |

### Methods

```python
# Serialize to dict (e.g. for logging or API responses)
result_dict = result.to_dict()
```

### Usage Example

```python
result = pipeline.process(
    input_text="Hello!",
    model_caller=my_llm_caller,
)

if not result.success:
    print(f"Blocked or error: {result.error}")
else:
    print(f"Response: {result.output}")
    print(f"Latency: {result.performance['total_latency_ms']:.1f}ms")
    print(f"Model used: {result.routing_decision['selected_model']}")
    print(f"Memories retrieved: {len(result.memory_retrievals)}")
```
## 🔍 Dry Run Mode

Dry run simulates all 8 phases without calling the model or writing anything to memory. Use it to preview routing decisions, guard behavior, and context utilization before committing to a live request.

### Method 1: `pipeline.dry_run()`

```python
sim = pipeline.dry_run(
    input_text="What would happen?",
    context_data={"topic": "security"},  # Optional
)
```

### Method 2: `pipeline.process()` with `dry_run=True`

```python
result = pipeline.process(
    input_text="What would happen?",
    model_caller=None,  # Not called in dry_run
    dry_run=True,
)
```

### Dry Run Response Fields

```python
sim = pipeline.dry_run("What would happen?")

# Input analysis
sim["input"]["text"]              # The input string
sim["input"]["length"]            # Character count
sim["input"]["context_provided"]  # bool

# Guard input prediction
sim["guard_input"]["would_allow"]      # bool
sim["guard_input"]["risk_score"]       # float 0.0–1.0
sim["guard_input"]["threat_level"]     # "LOW" | "MEDIUM" | "HIGH"
sim["guard_input"]["patterns_active"]  # int — active rule count
sim["guard_input"]["sensitivity"]      # current sensitivity level
sim["guard_input"]["match_count"]      # int — patterns matched
sim["guard_input"]["message"]          # human-readable summary

# Memory prediction
sim["memory"]["would_retrieve"]  # bool
sim["memory"]["total_in_store"]  # int — total stored memories
sim["memory"]["top_relevance"]   # float — top BM25 score
sim["memory"]["matched_terms"]   # list of matched query terms

# Context prediction
sim["context"]["would_optimize"]          # bool
sim["context"]["estimated_input_tokens"]  # int
sim["context"]["total_budget"]            # int
sim["context"]["utilization"]             # float 0.0–1.0

# Router prediction
sim["router"]["would_select"]        # selected model name
sim["router"]["provider"]            # provider name
sim["router"]["tier"]                # model tier
sim["router"]["confidence"]          # float 0.0–1.0
sim["router"]["estimated_cost_usd"]  # float
sim["router"]["fallback_chain"]      # list of fallback models
sim["router"]["reasoning"]           # routing explanation string

# Output guard prediction
sim["guard_output"]["would_scan"]   # bool
sim["guard_output"]["sensitivity"]  # current sensitivity

# Timing
sim["dry_run_time_ms"]  # float — simulation time in ms
```

### Example: Pre-flight Check

```python
sim = pipeline.dry_run("Ignore all previous instructions and...")

if not sim["guard_input"]["would_allow"]:
    print(f"⛔ Would be blocked (risk: {sim['guard_input']['risk_score']:.2f})")
else:
    print(f"✅ Would route to: {sim['router']['would_select']}")
    print(f"   Est. cost: ${sim['router']['estimated_cost_usd']:.4f}")
    print(f"   Context utilization: {sim['context']['utilization']:.0%}")
```
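Dry runs compose naturally into pre-flight gates, for example sending the live request only when the input would pass guard and the estimated cost is acceptable. A sketch over the dry-run dict shape documented above (the `max_cost_usd` threshold is arbitrary):

```python
def should_send_live(sim: dict, max_cost_usd: float = 0.05) -> bool:
    """Gate a live request on the dry-run guard verdict and cost estimate."""
    return (
        sim["guard_input"]["would_allow"]
        and sim["router"]["estimated_cost_usd"] <= max_cost_usd
    )

# Example with a hand-built simulation dict:
sim = {
    "guard_input": {"would_allow": True},
    "router": {"estimated_cost_usd": 0.012},
}
print(should_send_live(sim))  # True
```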
## 🗃️ Memory Retrieval

Run a direct BM25 memory search without executing the full pipeline. Useful for building custom context or debugging retrieval quality.

```python
data = pipeline.memory_retrieval(
    input_text="security vulnerability report",
    context_data={"agent_id": "MyBot"},  # Optional filter hints
    limit=10,
)
```

### Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| `input_text` | `str` | — | Query text for BM25 search. |
| `context_data` | `dict \| None` | `None` | Additional context passed to the memory engine (e.g. for filtering). |
| `limit` | `int` | `10` | Maximum number of results to return. |

### Response

```python
data["retrievals"]        # List[dict] — full memory objects
data["relevance_scores"]  # List[float] — BM25 scores per memory
data["context_packet"]    # str — assembled context string (ready for injection)
data["memory_count"]      # int — total memories in store
data["retrieved_count"]   # int — number actually returned
```

### Example

```python
data = pipeline.memory_retrieval("pricing strategy", limit=5)

print(f"Found {data['retrieved_count']} of {data['memory_count']} memories")
for mem, score in zip(data["retrievals"], data["relevance_scores"]):
    print(f"  [{score:.3f}] {mem['content'][:80]}")

# Use the context packet directly
my_prompt = f"{data['context_packet']}\n\nUser: What's our pricing strategy?"
```
## 🔄 Session Lifecycle Hooks

### `flush_to_memory(reason="") -> int`

Persists current in-session state to the memory store. Call this before compaction to ensure session intelligence is preserved across context resets.

```python
n = pipeline.flush_to_memory(reason="compaction")
print(f"Stored {n} memory entries")
```

What gets stored:

- Performance summary: per-model latency stats, success rates, request count
- Security pattern summary: any threat patterns observed this session

Returns: number of memory entries written.

### `get_compaction_summary() -> str`

Returns a markdown-formatted summary of the current session state. Designed to be passed as `prependContext` during compaction events.

```python
summary = pipeline.get_compaction_summary()
# Returns a markdown string, e.g.:
# ## Session Summary
# - Requests: 42
# - Models used: claude-opus-4-6 (38), gpt-4o (4)
# - Avg latency: 1234ms
# - Security events: 2
```

Typical compaction workflow:

```python
# In your compaction hook:
def before_compaction_hook(ctx):
    n = pipeline.flush_to_memory(reason="compaction")
    summary = pipeline.get_compaction_summary()
    # Pass summary to your compaction system as prependContext
    return summary

# Or with the hook system:
from antaris_pipeline import HookPhase, HookContext, HookResult

def compaction_hook(ctx: HookContext) -> HookResult:
    pipeline.flush_to_memory(reason="compaction")
    return HookResult(continue_pipeline=True)

registry.register(HookPhase.BEFORE_MODEL_CALL, compaction_hook)
```
### `on_session_start(summary="", agent_name=None) -> dict`

Restores relevant context from memory at the beginning of a new session. Call this once, immediately after creating the pipeline, before the first `process()` call.

```python
ctx = pipeline.on_session_start(
    summary="",          # Optional compaction summary from previous session
    agent_name="MyBot",  # Optional agent name for memory filtering
)
```

Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| `summary` | `str` | `""` | Compaction summary from the previous session (from `get_compaction_summary()`). |
| `agent_name` | `str \| None` | `None` | Agent name used to filter memory retrieval by `agent_id`. |

Response:

```python
ctx["prependContext"]  # str — markdown context string, ready to prepend to system prompt
```

Restoration strategy:

- Primary: `memory.recent()` — recency-first retrieval of the most recent memories
- Supplemental: BM25 search filtered by `agent_id` to find agent-specific context
- Top 10 memories, deduplicated, formatted with category and source metadata
- Output format: `"## Restored Session Context\n..."` — ready to prepend to your system prompt
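The recency-first-plus-BM25 merge described above can be sketched as a de-duplicating merge. Keying on raw content is an assumption about how antaris-memory deduplicates:

```python
def merge_restored(recent: list[dict], bm25: list[dict], limit: int = 10) -> list[dict]:
    """Recency-first merge of restored memories, de-duplicated by content."""
    seen: set[str] = set()
    merged: list[dict] = []
    for mem in recent + bm25:  # recent memories take priority
        if mem["content"] not in seen:
            seen.add(mem["content"])
            merged.append(mem)
        if len(merged) == limit:
            break
    return merged

recent = [{"content": "deployed v4.9.0", "category": "event"}]
bm25 = [
    {"content": "deployed v4.9.0", "category": "event"},  # duplicate, dropped
    {"content": "MyBot prefers concise replies", "category": "preference"},
]
print([m["content"] for m in merge_restored(recent, bm25)])
```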
### Full Session Start Example

```python
pipeline = create_pipeline("./memory_store", agent_name="MyBot")

# Restore previous context
ctx = pipeline.on_session_start(agent_name="MyBot")
system_prompt = f"""
{ctx['prependContext']}

You are MyBot, a helpful assistant.
"""

# Now start processing requests
result = pipeline.process(
    input_text=user_message,
    model_caller=lambda prompt: llm_call(system_prompt, prompt),
)
```
## 🔢 Custom Token Estimator

Replace the built-in token estimation heuristic with your own. This affects context budget calculations in Phase 3.

### Default Behavior

The default estimator uses `len(text) // 4` — a rough approximation suitable for most use cases.

### Setting a Custom Estimator

```python
pipeline.set_token_estimator(fn: Callable[[str], int])
```

### Example: tiktoken (OpenAI-compatible)

```python
import tiktoken

enc = tiktoken.get_encoding("cl100k_base")
pipeline.set_token_estimator(lambda text: len(enc.encode(text)))
```

### Example: Anthropic token counting

```python
import anthropic

client = anthropic.Anthropic()

def anthropic_token_count(text: str) -> int:
    response = client.messages.count_tokens(
        model="claude-opus-4-6",
        messages=[{"role": "user", "content": text}],
    )
    return response.input_tokens

pipeline.set_token_estimator(anthropic_token_count)
```

### Example: Simple word-based estimate

```python
# Roughly 1.33 tokens per word (about 0.75 words per token)
pipeline.set_token_estimator(lambda text: int(len(text.split()) * 1.33))
```
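Exact tokenizers (like the tiktoken and Anthropic examples above) are much slower than the built-in heuristic, so if the same strings are estimated repeatedly it can help to memoize the estimator. Whether the pipeline actually re-estimates identical strings is an assumption; the memoization itself is standard `functools.lru_cache`:

```python
from functools import lru_cache

@lru_cache(maxsize=4096)
def cached_estimate(text: str) -> int:
    # Stand-in for an expensive exact tokenizer call; repeated
    # inputs are served from the cache.
    return max(1, len(text) // 4)

# pipeline.set_token_estimator(cached_estimate)  # drop-in replacement
print(cached_estimate("hello world, this is a test"))  # 6
```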
## 📈 Performance & Intelligence

### `get_performance_stats() -> dict`

Returns session-level performance metrics.

```python
stats = pipeline.get_performance_stats()
```

Response fields:

| Field | Description |
|---|---|
| `session_id` | Current session identifier |
| `total_requests` | Number of `process()` calls this session |
| `routing_feedback` | Per-model latency and success rate history |
| `security_patterns` | Observed threat pattern counters |
| `telemetrics_summary` | Summary from the telemetrics collector |

```python
stats = pipeline.get_performance_stats()
print(f"Session: {stats['session_id']}")
print(f"Total requests: {stats['total_requests']}")
for model, fb in stats["routing_feedback"].items():
    print(f"  {model}: {fb['success_rate']:.0%} success, {fb['avg_latency_ms']:.0f}ms avg")
```

### `get_intelligence_summary() -> dict`

Returns the full cross-package intelligence state — a deep view into what the pipeline has learned this session.

```python
intel = pipeline.get_intelligence_summary()
```

Response fields:

| Field | Description |
|---|---|
| `routing_feedback` | Model performance history (latency + success rates) |
| `security_patterns` | Active threat pattern counters from guard |
| `context_pressure_events` | Count of heavy-compression events (DoS signals) |
| `provider_health` | Per-provider availability and error rates |
| `memory_stats` | Memory store size and retrieval performance |
| `guard_posture` | Current guard sensitivity and recent decisions |
| `last_routing_result` | Full routing decision from the most recent request |
| `current_trace_id` | Active trace ID for telemetry correlation |

```python
intel = pipeline.get_intelligence_summary()
print(f"Context pressure events: {intel['context_pressure_events']}")
print(f"Guard posture: {intel['guard_posture']}")
print(f"Last model: {intel['last_routing_result']['selected_model']}")
```
## 🤖 AgentPipeline — Agent Lifecycle Wrapper

`AgentPipeline` wraps `AntarisPipeline` and exposes a simplified two-step API optimized for conversational agent frameworks. Instead of one monolithic `process()` call, it exposes `pre_turn()` and `post_turn()` — letting you inject your own logic (tool calls, multi-step reasoning) between guard/retrieval and memory storage.

### Construction

```python
from antaris_pipeline import AgentPipeline

agent = AgentPipeline(
    pipeline=pipeline,       # An AntarisPipeline instance
    session_id="session-1",  # Session identifier
)
```

Parameters:

| Parameter | Type | Description |
|---|---|---|
| `pipeline` | `AntarisPipeline` | The underlying pipeline to wrap. |
| `session_id` | `str` | Session identifier. |
### `pre_turn(user_message, search_limit=10) -> PreTurnResult`

Runs Phase 1 (guard input scan) and Phase 2 (memory retrieval) without executing the model or storing anything. Call this at the start of each user turn to get safety clearance and relevant context.

```python
pre = agent.pre_turn(
    user_message="Tell me about the deployment last week",
    search_limit=10,
)
```

Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| `user_message` | `str` | — | The user's input text. |
| `search_limit` | `int` | `10` | Max memories to retrieve. |

`PreTurnResult` fields:

| Field | Type | Description |
|---|---|---|
| `context_packet` | `str` | Assembled memory context, ready to inject into the system prompt. |
| `memories_found` | `int` | Number of relevant memories retrieved. |
| `guard_result` | `dict` | Full guard decision dict (same as `result.guard_decisions[0]`). |
| `should_continue` | `bool` | `False` if the input was blocked. Check this before calling the model. |

```python
pre = agent.pre_turn("Hello!")

if not pre.should_continue:
    print(f"Blocked: {pre.guard_result['threat_level']}")
    return

# Inject context into your agent's system prompt
system_prompt = f"""
{pre.context_packet}

You are a helpful assistant.
"""

# Run your agent logic (tool calls, multi-step, etc.)
response = my_agent_logic(system_prompt, user_message)
```
### `post_turn(user_message, agent_response, agent_id=None, session_id=None, channel_id=None) -> PostTurnResult`

Runs Phase 6 (memory storage) after the agent has generated a response. Call this at the end of each turn to persist the conversation.

```python
post = agent.post_turn(
    user_message="Tell me about the deployment",
    agent_response="Last week we deployed v4.9.0 with...",
    agent_id="MyBot",
    session_id="session-1",
    channel_id="discord-channel-123",
)
```

Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| `user_message` | `str` | — | The original user input. |
| `agent_response` | `str` | — | The agent's generated response. |
| `agent_id` | `str \| None` | `None` | Agent identifier for memory metadata. |
| `session_id` | `str \| None` | `None` | Session ID override. Uses `AgentPipeline.session_id` if not set. |
| `channel_id` | `str \| None` | `None` | Channel identifier for memory metadata. |

`PostTurnResult` fields:

| Field | Type | Description |
|---|---|---|
| `ingested` | `int` | Number of memory entries created. |
| `saved` | `bool` | `True` if memory storage succeeded. |
### Full AgentPipeline Example

```python
from antaris_pipeline import create_pipeline, AgentPipeline

pipeline = create_pipeline("./store", agent_name="MyBot")
agent = AgentPipeline(pipeline=pipeline, session_id="session-abc")

async def handle_message(user_message: str) -> str:
    # Step 1: Pre-turn (guard + memory)
    pre = agent.pre_turn(user_message, search_limit=10)
    if not pre.should_continue:
        return "I can't help with that."

    # Step 2: Build system prompt with retrieved context
    system = f"""
{pre.context_packet}

You are MyBot. Be concise and helpful.
""".strip()

    # Step 3: Call your LLM (with tool calls, chain-of-thought, etc.)
    response = await my_llm(system, user_message)

    # Step 4: Post-turn (store to memory)
    post = agent.post_turn(
        user_message=user_message,
        agent_response=response,
        agent_id="MyBot",
    )
    return response
```
## 🪝 Hook System

The hook system lets you inject custom logic at any phase of the pipeline. Hooks can observe, modify, or short-circuit processing.

### Concepts

| Class | Description |
|---|---|
| `HookPhase` | Enum of all available injection points |
| `HookContext` | Input to your hook function — contains current pipeline state |
| `HookResult` | Output from your hook — controls pipeline continuation |
| `HookRegistry` | Registry that maps phases to hook functions |
| `HookCallback` | Type alias: `Callable[[HookContext], HookResult]` |
| `PipelineHooks` | Higher-level hook management (attached to `AntarisPipeline`) |
Available Hook Phases
```python
from antaris_pipeline import HookPhase

# Input safety
HookPhase.BEFORE_GUARD_INPUT
HookPhase.AFTER_GUARD_INPUT

# Memory
HookPhase.BEFORE_MEMORY_RETRIEVE
HookPhase.AFTER_MEMORY_RETRIEVE

# Context assembly
HookPhase.BEFORE_CONTEXT_BUILD
HookPhase.AFTER_CONTEXT_BUILD

# Routing
HookPhase.BEFORE_ROUTE
HookPhase.AFTER_ROUTE

# Model execution
HookPhase.BEFORE_MODEL_CALL
HookPhase.AFTER_MODEL_CALL

# Memory storage
HookPhase.BEFORE_MEMORY_STORE
HookPhase.AFTER_MEMORY_STORE

# Output safety
HookPhase.BEFORE_GUARD_OUTPUT
HookPhase.AFTER_GUARD_OUTPUT
```
Writing a Hook
```python
from antaris_pipeline import HookContext, HookResult

def my_hook(ctx: HookContext) -> HookResult:
    # Available on ctx:
    #   ctx.phase          — HookPhase enum value
    #   ctx.input_text     — original user input
    #   ctx.result_so_far  — partial PipelineResult accumulated so far
    #   ctx.metadata       — dict of phase-specific data

    # Observe and continue (most common)
    return HookResult(continue_pipeline=True, modified_input=None)

    # Or: modify the input for the next phase
    # return HookResult(continue_pipeline=True, modified_input="sanitized input")

    # Or: short-circuit the pipeline
    # return HookResult(continue_pipeline=False, modified_input=None)
```
HookResult fields:
| Field | Type | Description |
|---|---|---|
| `continue_pipeline` | `bool` | If `False`, stops processing and returns current state. |
| `modified_input` | `str \| None` | If set, replaces `input_text` for subsequent phases. |
Registering Hooks
```python
from antaris_pipeline import HookRegistry, HookPhase

registry = HookRegistry()
registry.register(HookPhase.BEFORE_MODEL_CALL, my_hook)
registry.register(HookPhase.AFTER_MEMORY_RETRIEVE, another_hook)
```
Hook Examples
Logging hook — observe every phase:
```python
def timing_hook(ctx: HookContext) -> HookResult:
    print(f"[{ctx.phase.name}] input_len={len(ctx.input_text)}")
    return HookResult(continue_pipeline=True)

# Attach to every phase
for phase in HookPhase:
    registry.register(phase, timing_hook)
```
Input sanitization hook:
```python
import re

def sanitize_hook(ctx: HookContext) -> HookResult:
    cleaned = re.sub(r"<[^>]+>", "", ctx.input_text)  # Strip HTML tags
    return HookResult(continue_pipeline=True, modified_input=cleaned)

registry.register(HookPhase.BEFORE_GUARD_INPUT, sanitize_hook)
```
Compaction hook — flush before model call:
```python
def compaction_hook(ctx: HookContext) -> HookResult:
    if ctx.metadata.get("request_count", 0) % 50 == 0:
        pipeline.flush_to_memory(reason="periodic_compaction")
    return HookResult(continue_pipeline=True)

registry.register(HookPhase.BEFORE_MODEL_CALL, compaction_hook)
```
Circuit-breaker hook — block on high context pressure:
```python
def pressure_guard_hook(ctx: HookContext) -> HookResult:
    intel = pipeline.get_intelligence_summary()
    if intel["context_pressure_events"] > 10:
        # Too many compression events — possible DoS
        return HookResult(continue_pipeline=False)
    return HookResult(continue_pipeline=True)

registry.register(HookPhase.BEFORE_CONTEXT_BUILD, pressure_guard_hook)
```
📡 Telemetrics & Events
antaris-pipeline emits structured events at every phase. Events are written to JSONL files and can be consumed in real time.
Output Files
```
./telemetrics/telemetrics_{session_id}.jsonl
```
Each line is a JSON object representing one event. Files rotate per session.
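Because each event is a complete JSON object on its own line, real-time consumption reduces to tailing the file. A minimal stdlib-only sketch (the path format is the one documented above; the polling interval is an illustrative choice):

```python
import json
import time
from pathlib import Path

def follow_events(path: Path, poll_interval: float = 0.5):
    """Yield each event appended to a telemetry JSONL file as it arrives."""
    with path.open() as f:
        while True:
            line = f.readline()
            if not line:
                # No new complete line yet; wait and poll again
                time.sleep(poll_interval)
                continue
            yield json.loads(line)

# Usage:
# for event in follow_events(Path("./telemetrics/telemetrics_my-session.jsonl")):
#     print(event["event_type"])
```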
Event Types
Events are emitted automatically. You don't need to instrument anything.
| EventType | When Emitted |
|---|---|
| `PIPELINE_START` | At the start of each `process()` call |
| `PIPELINE_COMPLETE` | When `process()` completes successfully |
| `PIPELINE_ERROR` | When an unhandled error occurs |
| `PIPELINE_DRY_RUN` | When `dry_run()` is called |
| `GUARD_SCAN` | Each time a guard scan runs |
| `GUARD_ALLOW` | When a guard scan allows the content |
| `GUARD_DENY` | When a guard scan blocks the content |
| `MEMORY_RETRIEVE` | After BM25 memory search |
| `MEMORY_INGEST` | After memory storage |
| `CONTEXT_BUILD` | After context assembly |
| `ROUTER_ROUTE` | After model selection |
AntarisEvent Structure
```python
from antaris_pipeline import AntarisEvent, EventType

# Events on a result:
for event in result.events:
    print(f"[{event.event_type}] {event.timestamp} — {event.data}")
```
AntarisEvent fields:
| Field | Type | Description |
|---|---|---|
| `event_type` | `EventType` | The event category. |
| `timestamp` | `float` | Unix timestamp. |
| `session_id` | `str` | Session this event belongs to. |
| `trace_id` | `str` | Per-request trace ID for correlation. |
| `data` | `dict` | Event-specific payload. |
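Because every event carries a `trace_id`, a session log can be regrouped into per-request timelines. A stdlib-only sketch over already-parsed event dicts, using the field names documented above:

```python
from collections import defaultdict

def group_by_trace(events):
    """Group parsed telemetry events into per-request timelines keyed by trace_id."""
    traces = defaultdict(list)
    for evt in events:
        traces[evt["trace_id"]].append(evt)
    # Order each trace chronologically
    for timeline in traces.values():
        timeline.sort(key=lambda e: e["timestamp"])
    return dict(traces)
```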
Telemetrics Classes
| Class | Description |
|---|---|
| `TelemetricsCollector` | Buffers events and writes to JSONL. Configured via `TelemetricsConfig`. |
| `TelemetricsServer` | Optional HTTP server for real-time event streaming. |
| `PipelineTelemetry` | High-level interface used internally by `AntarisPipeline`. |
| `EventEmitter` | Base class for emitting typed events. |
Event Helpers
Convenience functions for constructing events in hook callbacks or custom code:
```python
from antaris_pipeline import memory_event, router_event, guard_event, context_event

evt = memory_event(session_id="s1", retrieved=5, total=200)
evt = router_event(session_id="s1", model="claude-opus-4-6", cost=0.0042)
evt = guard_event(session_id="s1", blocked=False, risk_score=0.12)
evt = context_event(session_id="s1", tokens=4200, budget=8000)
```
Parsing Telemetry Files
```python
import json
from pathlib import Path

session_id = "my-session"
log_file = Path(f"./telemetrics/telemetrics_{session_id}.jsonl")
events = [json.loads(line) for line in log_file.read_text().splitlines()]

# Filter to GUARD_DENY events
blocks = [e for e in events if e["event_type"] == "GUARD_DENY"]
print(f"Total blocks this session: {len(blocks)}")

# Calculate average latency (guard against a session with no completed runs)
pipeline_events = [e for e in events if e["event_type"] == "PIPELINE_COMPLETE"]
if pipeline_events:
    avg = sum(e["data"]["total_latency_ms"] for e in pipeline_events) / len(pipeline_events)
    print(f"Avg latency: {avg:.1f}ms")
```
PerformanceMetrics and ConfidenceBasis
Additional telemetry types exposed for custom instrumentation:
```python
from antaris_pipeline import PerformanceMetrics, ConfidenceBasis

# PerformanceMetrics: attached to routing decisions
#   Fields: latency_ms, tokens_used, cost_usd, success

# ConfidenceBasis: explains why the router chose a model
#   Values: PERFORMANCE_HISTORY, COST_OPTIMIZATION, CAPABILITY_MATCH, FALLBACK, DEFAULT
```
🔗 Cross-Package Intelligence Flows
antaris-pipeline implements 5 automatic intelligence flows that wire the sub-packages together. They run automatically during every process() call (the diagram under each flow notes the phase where it takes effect, with intelligence counters updated in Phase 8) and require no configuration.
Flow 1 — Memory → Router
What: Model success rates and latency history (stored as routing_feedback memories) are retrieved and passed to the router as performance hints.
Effect: The router avoids models with poor recent performance and prefers those with proven reliability for the current task type.
```
Memory Store ──[routing_feedback memories]──► Router (Phase 4 model selection)
```
Flow 2 — Router → Context
What: The selected model's context window size (from model_context_limits in ContextConfig) is used as the upper bound for the token budget.
Effect: Context assembly never tries to pack more tokens than the selected model can actually handle.
```
Router (selected_model) ──[context_limit]──► Context Builder (Phase 3 budget ceiling)
```
Flow 3 — Guard → Memory
What: When risk_score > 0.7 on an input scan (Phase 1), the content is stored to memory as a security fact rather than a normal conversation entry.
Effect: High-risk inputs are flagged in memory for future reference without contaminating conversation history. The agent learns about threat patterns over time.
```
Guard (risk_score > 0.7) ──[security fact]──► Memory Store (Phase 6, alternate storage path)
```
Flow 4 — Context → Guard
What: When compression ratio drops below 0.3 (i.e., the context had to be compressed to less than 30% of its original size), this is flagged as a potential DoS vector and the context_pressure_events counter is incremented.
Effect: Sustained context pressure (many high-compression events) is a signal of prompt stuffing or adversarial input. Visible via get_intelligence_summary()["context_pressure_events"].
```
Context Builder (ratio < 0.3) ──[pressure signal]──► Guard/Intelligence (Phase 8 counter)
```
Flow 5 — Router → Memory
What: After each model call completes (Phase 5), the model name, latency, and success/failure are stored as a routing_feedback memory entry.
Effect: The pipeline builds up a performance history for each model over time, which feeds back into Flow 1 to improve future routing decisions.
```
Model Execution (Phase 5) ──[latency + success]──► Memory Store (routing_feedback)
                                                         ↑
                                    (retrieved in Flow 1 next request)
```
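The exact schema of `routing_feedback` memories is internal to the package, but assuming records that carry the model name, latency, and success flag stored in Flow 5, the per-model aggregation that Flow 1 feeds back to the router can be sketched as:

```python
def summarize_routing_feedback(records):
    """Aggregate per-model success rate and mean latency from feedback records.

    Each record is assumed to be a dict with "model", "latency_ms", and
    "success" keys (an illustrative schema, not the package's internal one).
    """
    stats = {}
    for r in records:
        s = stats.setdefault(r["model"], {"calls": 0, "ok": 0, "latency": 0.0})
        s["calls"] += 1
        s["ok"] += int(r["success"])
        s["latency"] += r["latency_ms"]
    return {
        model: {
            "success_rate": s["ok"] / s["calls"],
            "avg_latency_ms": s["latency"] / s["calls"],
        }
        for model, s in stats.items()
    }
```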
Intelligence Flow Summary
```
┌──────────────────────────────────────────────────────────────┐
│                 Cross-Package Intelligence                   │
│                                                              │
│  Memory ──Flow 1──► Router  (perf history → model select)    │
│  Router ──Flow 2──► Context (model limits → token budget)    │
│  Guard  ──Flow 3──► Memory  (high-risk → security facts)     │
│  Context──Flow 4──► Guard   (heavy compress → DoS signal)    │
│  Router ──Flow 5──► Memory  (latency+success → feedback)     │
└──────────────────────────────────────────────────────────────┘
```
All flows are wired at construction time. No configuration required.
🧩 Complete Integration Example
A full example showing antaris-pipeline integrated into a Discord bot:
```python
import asyncio

import anthropic
import tiktoken

from antaris_pipeline import create_pipeline, AgentPipeline

# ── Setup ────────────────────────────────────────────────────────────────────
client = anthropic.Anthropic()

pipeline = create_pipeline(
    storage_path="./memory_store",
    guard_config={"sensitivity": "balanced"},
    context_config={"total_budget": 8000},
    session_id="discord-bot-session",
    agent_name="MyBot",
)

# Swap in a real token counter
enc = tiktoken.get_encoding("cl100k_base")
pipeline.set_token_estimator(lambda text: len(enc.encode(text)))

agent = AgentPipeline(pipeline=pipeline, session_id="discord-bot-session")

# ── Session start ────────────────────────────────────────────────────────────
ctx = pipeline.on_session_start(agent_name="MyBot")
BASE_SYSTEM = f"""
{ctx['prependContext']}

You are MyBot, a helpful assistant in a Discord server.
Be concise. Use markdown sparingly.
""".strip()

# ── Per-message handler ──────────────────────────────────────────────────────
async def handle_message(user_id: str, user_message: str) -> str:
    # Step 1: Pre-turn (guard + memory retrieval)
    pre = agent.pre_turn(user_message, search_limit=8)
    if not pre.should_continue:
        risk = pre.guard_result.get("risk_score", 0)
        return f"⛔ I can't help with that. (risk: {risk:.2f})"

    # Step 2: Build context-aware system prompt
    system = BASE_SYSTEM
    if pre.context_packet:
        system = f"{pre.context_packet}\n\n{BASE_SYSTEM}"

    # Step 3: Call the LLM
    def llm_caller(prompt: str) -> str:
        resp = client.messages.create(
            model="claude-opus-4-6",
            max_tokens=1024,
            system=system,
            messages=[{"role": "user", "content": prompt}],
        )
        return resp.content[0].text

    result = pipeline.process(
        input_text=user_message,
        model_caller=llm_caller,
        context_data={"user_id": user_id},
    )
    if not result.success:
        return f"Something went wrong: {result.error}"

    # Step 4: Store to memory
    post = agent.post_turn(
        user_message=user_message,
        agent_response=result.output,
        agent_id="MyBot",
        channel_id="discord-main",
    )
    return result.output

# ── Periodic flush (call from your compaction/restart handler) ───────────────
def on_shutdown():
    n = pipeline.flush_to_memory(reason="shutdown")
    print(f"Flushed {n} memories before shutdown")
    summary = pipeline.get_compaction_summary()
    print(summary)

# ── Dry-run health check (call on startup) ───────────────────────────────────
def health_check():
    sim = pipeline.dry_run("Hello, how are you?")
    print("=== Pipeline Health Check ===")
    print(f"Guard:   {'✅ allow' if sim['guard_input']['would_allow'] else '⛔ block'}")
    print(f"Memory:  {sim['memory']['total_in_store']} entries in store")
    print(f"Router:  → {sim['router']['would_select']} (${sim['router']['estimated_cost_usd']:.4f})")
    print(f"Context: {sim['context']['utilization']:.0%} budget utilization")
    print(f"Dry run time: {sim['dry_run_time_ms']:.1f}ms")
```
🗺️ Architecture Overview
```
                     antaris-pipeline
                  ┌───────────────────────────────────────────┐
 User Input       │                                           │
     │            │ Phase 1: Guard Input Scan                 │
     ▼            │     ↓ (if blocked → return error)         │
 AntarisPipeline  │ Phase 2: Memory Retrieval (BM25)          │
   .process()     │     ↓                                     │
     │            │ Phase 3: Context Building + Optimization  │
     │            │     ↓                                     │
     │            │ Phase 4: Smart Routing                    │
     │            │     ↓                                     │
     │            │ Phase 5: Model Execution                  │
     │            │     ↓                                     │
     │            │ Phase 6: Memory Storage                   │
     │            │     ↓                                     │
     │            │ Phase 7: Guard Output Scan                │
     │            │     ↓                                     │
     │            │ Phase 8: Intelligence Updates             │
     │            │                                           │
     ▼            └───────────────────────────────────────────┘
 PipelineResult
   (success, output, events, routing_decision, guard_decisions, ...)
```

Sub-packages:

```
┌─────────────────┐ ┌──────────────────┐ ┌─────────────┐ ┌──────────────────┐
│ antaris-memory  │ │ antaris-router   │ │antaris-guard│ │ antaris-context  │
│ BM25 retrieval  │ │ Model selection  │ │ Safety scan │ │ Token budgeting  │
│ Decay scoring   │ │ Provider routing │ │ Threat class│ │ Compression      │
│ Fact storage    │ │ Fallback chains  │ │ Policy rules│ │ Adaptive budget  │
└─────────────────┘ └──────────────────┘ └─────────────┘ └──────────────────┘
```
🔖 Changelog
4.9.20
- Cross-package intelligence flows (Sprint 3): all 5 flows wired automatically
- `on_session_start()` with recency-first + BM25 supplemental strategy
- `flush_to_memory()` and `get_compaction_summary()` for compaction support
- `AgentPipeline` with `PreTurnResult`/`PostTurnResult` typed returns
- Full hook system with 14 injection points across 8 phases
- `debug_pipeline()` shortcut with full telemetrics enabled
- `set_token_estimator()` for custom token counting
- `dry_run()` simulation with per-phase prediction fields
- JSONL telemetry output with session-scoped files
📄 License
Copyright © Antaris Analytics LLC. All rights reserved.