Aegeantic
A framework for building LLM agents with versioned context, persistent storage, streaming execution, and multi-agent orchestration.
Features
- Streaming-First: Real-time event streams for LLM generation, pattern detection, and tool execution
- 21 Event Types: Complete observability across LLM, tools, patterns, validation, retry, rate limiting, context health, and graph orchestration
- Structured Logging: Built-in logger with contextual metadata for debugging and monitoring
- Persistent Context: Versioned key-value store backed by RocksDB with automatic timestamps and iteration tracking
- Pattern Extraction: Regex-based extraction with streaming support, incremental detection, and malformed pattern handling
- Multi-Mode Tools: Execute tools in PROCESS, THREAD, or ASYNC modes with timeout handling and streaming output
- Validation System: Extensible validation registry supporting simple validators, JSON Schema, or custom formats
- Resilience: Retry logic with exponential backoff and token bucket rate limiting
- Tool Verification: Human-in-the-loop tool approval with timeout handling and rejection tracking
- Multi-Agent Patterns: Chain, Supervisor-Worker, Parallel, and Debate patterns for orchestrating multiple agents
- Graph Orchestration: DAG-based workflow execution with dynamic scheduling, failure strategies, and cycle detection
- Flexible Storage: RocksDB for persistence or in-memory storage for testing and ephemeral use
- Flexible Logic Flows: Conditional loops with pattern-based, regex-based, or context-based conditions
- Type-Safe: Full type hints throughout
Installation
From PyPI (Recommended)
pip install aegeantic
For Development
git clone https://github.com/LOGQS/agentic_framework.git
cd agentic_framework
pip install -e .
Requirements: Python 3.9+ and rocksdict >= 0.3.0
Quick Start
from agentic import (
    StorageConfig, RocksDBStorage, InMemoryStorage, IterationManager, ContextManager,
    PatternRegistry, create_default_pattern_set,
    ToolRegistry, create_tool,
    AgentConfig, Agent, AgentRunner,
    ProcessingMode, create_message_prompt_builder
)
# Initialize storage and context
# Option 1: RocksDB for persistent storage
config = StorageConfig(base_dir="./context", db_name_prefix="my_agent")
storage = RocksDBStorage(config)
storage.initialize()
# Option 2: In-memory storage for testing or ephemeral use
# storage = InMemoryStorage()
# storage.initialize()
iteration = IterationManager(storage)
context = ContextManager(storage, iteration)
# Register patterns
patterns = PatternRegistry(storage)
patterns.register_pattern_set(create_default_pattern_set())
# Register tools
tools = ToolRegistry()
def search_web(inputs: dict) -> dict:
    return {"results": f"Search results for: {inputs.get('query', '')}"}

tools.register(create_tool(
    name="search_web",
    func=search_web,
    input_schema={
        "validator": "simple",
        "required": ["query"],
        "fields": {
            "query": {"type": "str"}
        }
    },
    timeout_seconds=30.0,
    processing_mode=ProcessingMode.THREAD
))
# Create LLM provider (must implement LLMProvider protocol)
class MyLLMProvider:
    def generate(self, prompt, **kwargs) -> str:
        # Your LLM API call here
        pass

    async def stream(self, prompt, **kwargs):
        # Stream chunks for real-time execution
        text = self.generate(prompt, **kwargs)
        yield text
provider = MyLLMProvider()
# Configure and run agent
agent_config = AgentConfig(
    agent_id="assistant",
    tools_allowed=["search_web"],
    input_mapping=[
        {"context_key": "literal:You are a helpful assistant.", "role": "system", "order": 0}
    ],
    output_mapping=[("last_output", "set_latest")],
    pattern_set="default",
    prompt_builder=create_message_prompt_builder()
)
agent = Agent(agent_config, context, patterns, tools, provider)
runner = AgentRunner(agent)
# Batch execution
result = runner.step("Tell me about agentic systems")
print(f"Status: {result.status}, Response: {result.segments.response}")
# Streaming execution
import asyncio
from agentic import LLMChunkEvent, ToolStartEvent, StepCompleteEvent
async def stream_example():
    async for event in runner.step_stream("Your prompt"):
        if isinstance(event, LLMChunkEvent):
            print(event.chunk, end="", flush=True)
        elif isinstance(event, ToolStartEvent):
            print(f"\n[Tool: {event.tool_name}]")
        elif isinstance(event, StepCompleteEvent):
            return event.result

asyncio.run(stream_example())
Architecture
agentic/
├── core.py # Enums (ProcessingMode, SegmentType, AgentStatus) & data classes
├── validation.py # Format-agnostic validation system with extensible validators
├── events.py # 21 event types (LLM, tools, patterns, retry, rate limit, health, graph)
├── storage.py # RocksDBStorage & InMemoryStorage with automatic path resolution
├── context.py # IterationManager & ContextManager with versioning and history
├── patterns.py # PatternExtractor (batch) & StreamingPatternExtractor (incremental)
├── tools.py # Tool execution with multi-mode support, timeouts, and streaming
├── agent.py # Agent & AgentRunner with dual-mode execution (step/step_stream)
├── logic.py # LogicRunner for conditional loops with context health monitoring
├── resilience.py # Retry logic with backoff and token bucket rate limiting
├── multi_agent.py # Multi-agent orchestration patterns (Chain, Supervisor, Parallel, Debate)
├── graph.py # GraphRunner for DAG-based workflow orchestration
└── logging_util.py # Structured logging with contextual metadata for debugging
Key Components
Storage (storage.py):
- `RocksDBStorage`: Persistent backend via `rocksdict`
  - Automatic DB ID generation: `agentic_<hash>_<timestamp>_<uuid>`
  - CRUD operations: `get()`, `put()`, `delete()`, `iterate()`
- `InMemoryStorage`: Ephemeral storage for testing and development
  - Same interface as `RocksDBStorage` for drop-in compatibility
  - Faster performance, no disk I/O
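As an illustration of the ID scheme above, an identifier of that shape could be produced roughly like this (a sketch; `make_db_id` is a hypothetical helper, and the framework's actual hash input and formatting may differ):

```python
import hashlib
import time
import uuid

def make_db_id(db_name_prefix: str) -> str:
    """Sketch of an ID shaped like agentic_<hash>_<timestamp>_<uuid>."""
    # A short, stable hash of the prefix keeps IDs readable but distinct.
    digest = hashlib.sha256(db_name_prefix.encode()).hexdigest()[:8]
    return f"agentic_{digest}_{int(time.time())}_{uuid.uuid4().hex}"

db_id = make_db_id("my_agent")
```

The timestamp and UUID components make every database directory unique even when the same prefix is reused across runs.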
Context (context.py):
- `IterationManager`: Global iteration counter with `get()`, `next()`, `register_event()`
- `ContextManager`: Versioned store with `set()`, `update()`, `get()`, `delete()`, `list_keys()`, `get_history()`, `clear()`
- `ContextRecord`: `{value, iteration, timestamp, version}`
- `set()` creates a new version; `update()` overwrites the current version
- Tombstone deletion for soft deletes with history preservation
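The set/update/tombstone semantics described above can be sketched with a toy in-memory model (illustrative only; the real `ContextManager` persists records with iteration and timestamp metadata to storage):

```python
class MiniVersionedStore:
    """Toy model of set()-creates-version / update()-overwrites semantics."""

    def __init__(self):
        self._history = {}  # key -> list of values; a None entry is a tombstone

    def set(self, key, value):
        self._history.setdefault(key, []).append(value)  # new version

    def update(self, key, value):
        versions = self._history.setdefault(key, [None])
        versions[-1] = value  # overwrite the current version in place

    def get(self, key, version=None):
        versions = self._history.get(key)
        if not versions:
            return None
        # Tombstones (None) read back as "missing" while history is preserved.
        return versions[-1] if version is None else versions[version - 1]

    def delete(self, key):
        self._history.setdefault(key, []).append(None)  # tombstone, keeps history

store = MiniVersionedStore()
store.set("k", b"v1")      # version 1
store.set("k", b"v2")      # version 2
store.update("k", b"v2b")  # still version 2, value overwritten
store.delete("k")          # tombstone; get() now returns None
```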
Patterns (patterns.py):
- `Pattern`: `{name, start_tag, end_tag, segment_type, greedy}`
- `PatternSet`: Collection of patterns with configuration
- `PatternRegistry`: Persistent pattern set storage in RocksDB
- `PatternExtractor`: Batch regex-based extraction
- `StreamingPatternExtractor`: Incremental chunk processing with malformed pattern handling
- Built-in pattern sets: `default`, `json_tools`, `xml_tools`, `backtick_tools`
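Incremental detection of the kind `StreamingPatternExtractor` performs can be sketched as buffered tag scanning (a simplified illustration; the real extractor also handles malformed patterns, regex-based patterns, and buffer limits):

```python
def extract_incremental(chunks, start_tag="<thought>", end_tag="</thought>"):
    """Yield complete tag bodies as soon as the closing tag arrives."""
    buffer = ""
    for chunk in chunks:
        buffer += chunk
        while True:
            start = buffer.find(start_tag)
            if start == -1:
                break
            end = buffer.find(end_tag, start + len(start_tag))
            if end == -1:
                break  # pattern still open; wait for more chunks
            yield buffer[start + len(start_tag):end]
            buffer = buffer[end + len(end_tag):]

# Tags may be split across chunk boundaries; the buffer absorbs that.
segments = list(extract_incremental(["<tho", "ught>plan ", "steps</thought> done"]))
```

A pattern opened but never closed simply stays buffered, which is why a buffer size cap matters for long streams.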
Tools (tools.py):
- `Tool`: Executable with `run()` (batch) and `run_stream()` (streaming); supports async callables
- `ToolDefinition`: Metadata with `input_schema`, `output_schema`, `timeout_seconds`, `processing_mode`
- `ToolRegistry`: `register()`, `get()`, `exists()`, `list()`, `get_definitions()`, `unregister()`
- Multi-mode execution: PROCESS, THREAD, ASYNC (inherits from parent if not set)
- Automatic timeout enforcement and validation integration
Agent (agent.py):
- `LLMProvider`: Protocol defining `generate()` and `stream()` methods accepting `PromptType` (Any)
- `Agent`: Container for config, context, patterns, tools, provider
- `AgentRunner`: `step()` (batch) and `step_stream()` (streaming)
- `AgentConfig`: Configuration with options including `incremental_context_writes`, `stream_pattern_content`, `on_tool_detected`, `concurrent_tool_execution`, `max_partial_buffer_size`, `prompt_builder`, `tool_verification_timeout`, `tool_verification_on_timeout`
- `PromptObject`: Structured prompt with `system`, `messages`, and `metadata` fields
- `create_message_prompt_builder()`: Reference implementation for building a `PromptObject` from `input_mapping`
- `AgentStepResult`: Contains a `tool_decisions` list tracking the complete tool lifecycle (verification + execution)
- `ToolExecutionDecision`: Rich decision object with `accepted`, `rejection_reason`, `verification_duration_ms`, `executed`, and `result` fields
Logic (logic.py):
- `LogicRunner`: Conditional loops with `run()` (batch) and `run_stream()` (streaming)
- `LogicCondition`: Pattern/regex/context-based conditions with evaluation points (`auto`, `llm_chunk`, `llm_complete`, `tool_detected`, `tool_finished`, `step_complete`, `any_event`)
- `LogicConfig`: Loop configuration with `max_iterations`, `stop_conditions`, `loop_until_conditions`, `break_on_error`, `context_health_checks`
- `ContextHealthCheck`: Monitors context size, version count, and growth rate with configurable thresholds and actions
- Helper functions: `loop_n_times()`, `loop_until_pattern()`, `loop_until_regex()`, `stop_on_error()`
Validation (validation.py):
- `ValidatorRegistry`: Extensible registry for any validation format
- `ValidationError`: Structured error with field, message, and value
- Built-in validators: `simple_validator` (type checking, constraints), `passthrough_validator`
- Support for JSON Schema, XML Schema, Protocol Buffers, or custom validators
Events (events.py):
21 event types: LLMChunkEvent, LLMCompleteEvent, PatternStartEvent, PatternContentEvent, PatternEndEvent, StatusEvent, ToolStartEvent, ToolDecisionEvent, ToolOutputEvent, ToolEndEvent, ToolValidationEvent, ContextWriteEvent, ErrorEvent, StepCompleteEvent, RetryEvent, RateLimitEvent, ContextHealthEvent, GraphStartEvent, GraphNodeStartEvent, GraphNodeCompleteEvent, GraphCompleteEvent
Logging (logging_util.py):
- Structured logger with contextual metadata
- Integration with Python's standard logging module
- Automatic context enrichment (agent_id, iteration, step_id, etc.)
- Supports JSON-formatted output for log aggregation systems
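The event-name-plus-JSON-metadata style described above can be reproduced with the standard `logging` module. This is an assumed sketch of such a formatter, not the framework's own implementation:

```python
import io
import json
import logging

class ContextFormatter(logging.Formatter):
    """Append non-standard record attributes (passed via extra=) as JSON."""

    # Attribute names every LogRecord carries by default.
    _STANDARD = set(vars(logging.LogRecord("", 0, "", 0, "", (), None)))

    def format(self, record):
        base = super().format(record)
        # Anything not in the standard set came from the extra= dict.
        ctx = {k: v for k, v in vars(record).items()
               if k not in self._STANDARD and k != "message"}
        return f"{base} - {json.dumps(ctx, sort_keys=True)}" if ctx else base

stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(ContextFormatter("%(name)s - %(levelname)s - %(message)s"))
logger = logging.getLogger("agentic.demo")
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)

logger.debug("agent.step.start", extra={"agent_id": "assistant", "iteration": 1})
line = stream.getvalue().strip()
```

Capturing the output through a `StringIO` handler as shown also makes the format easy to assert in tests.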
Resilience (resilience.py):
- `RetryConfig`: Exponential/linear/constant backoff with jitter and configurable retry exceptions
- `RateLimitConfig`: Per-second/minute/hour limits with burst capacity
- `RateLimiter`: Token bucket implementation with `acquire()`, `try_acquire()`, `tokens_available()`
- `retry_stream()`: Universal retry wrapper for any async iterator with RetryEvent emission
- `rate_limited_stream()`: Universal rate-limiting wrapper with RateLimitEvent emission
- `resilient_stream()`: Combined retry + rate limiting for any operation
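As a sketch of the delay schedule exponential backoff with jitter produces (the formula here is the textbook one and is assumed, not lifted from `resilience.py`):

```python
import random

def backoff_delay(attempt: int, base_delay: float = 1.0,
                  max_delay: float = 60.0, jitter: bool = False) -> float:
    """Delay before retry `attempt` (1-based): base * 2^(attempt-1), capped."""
    delay = min(base_delay * (2 ** (attempt - 1)), max_delay)
    if jitter:
        # Randomize within +/-50% to spread out retries from many clients.
        delay *= random.uniform(0.5, 1.5)
    return delay

schedule = [backoff_delay(n) for n in range(1, 8)]
# Without jitter: 1, 2, 4, 8, 16, 32, then capped at 60
```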
Multi-Agent (multi_agent.py):
- `AgentChain`: Sequential execution with configurable output passing (`response`, `full_context`, `tool_results`, `custom`)
- `AgentChainConfig`: Configure pass mode, context templates, and custom transform functions
- `SupervisorPattern`: A supervisor agent delegates tasks to specialized worker agents with delegation detection
- `SupervisorConfig`: Configure delegation pattern name, worker/task keys, and max rounds
- `ParallelPattern`: Parallel agent execution with result merging (concat, agent-based, voting)
- `ParallelConfig`: Configure merge strategy, templates, and timeout
- `DebatePattern`: Multi-round debate between agents with consensus detection
- `DebateConfig`: Configure max rounds, consensus detector, and prompt templates
Graph (graph.py):
- `GraphRunner`: DAG-based workflow orchestration with dynamic scheduling and failure handling
- `GraphNode`: Configurable nodes supporting AgentRunner, LogicRunner, or custom callables
- `GraphConfig`: Graph execution settings, including concurrency limits and failure strategies
- `GraphNodeStatus`: Node execution states (PENDING, RUNNING, COMPLETED, FAILED, SKIPPED)
- `GraphStartEvent`, `GraphNodeStartEvent`, `GraphNodeCompleteEvent`, `GraphCompleteEvent`: Graph-level events
- DFS-based cycle detection, indegree scheduling, and per-node retry/rate limiting
- Failure strategies: fail_fast, allow_independent, always_run
- Optional state persistence to context
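The indegree scheduling and cycle detection mentioned above can be sketched with Kahn's algorithm (a generic sketch, not the `GraphRunner` internals; nodes whose dependencies are all satisfied form "waves" that could run concurrently):

```python
from collections import deque

def topological_waves(deps):
    """deps: node -> list of prerequisite nodes.
    Returns waves of nodes whose prerequisites are satisfied;
    raises ValueError if the graph contains a cycle."""
    indegree = {node: len(prereqs) for node, prereqs in deps.items()}
    dependents = {node: [] for node in deps}
    for node, prereqs in deps.items():
        for p in prereqs:
            dependents[p].append(node)

    ready = deque(sorted(n for n, d in indegree.items() if d == 0))
    waves, done = [], 0
    while ready:
        wave = sorted(ready)       # everything currently runnable
        ready.clear()
        waves.append(wave)
        for node in wave:
            done += 1
            for child in dependents[node]:
                indegree[child] -= 1
                if indegree[child] == 0:
                    ready.append(child)
    if done != len(deps):
        raise ValueError("cycle detected")  # unreachable nodes imply a cycle
    return waves

waves = topological_waves({
    "fetch_data": [], "validate": ["fetch_data"],
    "process": ["validate"], "analyze": ["process"],
})
```

Independent branches land in the same wave, which is where a `max_concurrency` cap would apply.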
Advanced Usage
Context Versioning
# Set creates new version
context.set("key", b"value1") # version 1
context.set("key", b"value2") # version 2
# Update overwrites current version (for streaming/incremental writes)
context.update("key", b"partial_value") # version 2 (overwrites)
# Get latest or specific version
latest = context.get("key")
v1 = context.get("key", version=1)
# History (returns ContextRecord objects)
history = context.get_history("key", max_versions=100)
# Delete (creates tombstone, preserves history)
context.delete("key")
assert context.get("key") is None
# List all keys with optional prefix
all_keys = context.list_keys()
llm_keys = context.list_keys(prefix="llm_output:")
Custom Patterns
from agentic import Pattern, PatternSet, SegmentType
custom = PatternSet(
    name="custom",
    patterns=[
        Pattern("thought", "<thought>", "</thought>", SegmentType.REASONING, greedy=False),
        Pattern("action", "<action>", "</action>", SegmentType.TOOL, greedy=False)
    ],
    default_response_behavior="all_remaining"
)
patterns.register_pattern_set(custom)
Logic Control
from agentic import LogicConfig, LogicCondition, LogicRunner
logic_config = LogicConfig(
    logic_id="main_loop",
    max_iterations=10,
    stop_conditions=[
        LogicCondition(
            pattern_set="default",
            pattern_name="DONE",
            match_type="regex",
            target="response",
            evaluation_point="llm_complete"
        )
    ],
    loop_until_conditions=[
        LogicCondition(
            pattern_set="default",
            pattern_name="complete",
            match_type="contains",
            target="response",
            evaluation_point="auto"  # auto-detects best evaluation point
        )
    ],
    break_on_error=True
)
logic = LogicRunner(runner, context, patterns, logic_config)
results = logic.run("Analyze this problem")
# Streaming
async for event in logic.run_stream("Analyze this problem"):
    if isinstance(event, StepCompleteEvent):
        print(f"Iteration {event.result.iteration} complete")
Tool Validation
from agentic import ValidatorRegistry, ValidationError
# Built-in simple validator
tools.register(create_tool(
    name="calculate",
    func=calculate_fn,
    input_schema={
        "validator": "simple",
        "required": ["x", "y"],
        "fields": {
            "x": {"type": "float", "min": 0},
            "y": {"type": "float", "min": 0},
            "op": {"type": "str", "pattern": "^[+\\-*/]$"}
        },
        "allow_extra_fields": False
    }
))
# Custom validator
from typing import Any

def my_validator(value: Any, schema: dict) -> tuple[bool, list[ValidationError]]:
    errors = []
    # Your validation logic
    if not isinstance(value, dict):
        errors.append(ValidationError("_root", "Expected dict"))
    return len(errors) == 0, errors
registry = ValidatorRegistry()
registry.register("my_format", my_validator)
# Use in tool registry
tools_with_validation = ToolRegistry(validator_registry=registry)
Retry and Rate Limiting
from agentic import RetryConfig, RateLimitConfig, RateLimiter, resilient_stream
# Retry configuration
retry_config = RetryConfig(
    max_attempts=3,
    backoff="exponential",  # or "linear", "constant"
    base_delay=1.0,
    max_delay=60.0,
    jitter=True,
    retry_on=(TimeoutError, ConnectionError)
)

# Rate limiting
rate_config = RateLimitConfig(
    requests_per_second=10,
    requests_per_minute=100,
    burst_size=20
)
limiter = RateLimiter(rate_config)
# Combined resilient stream
async def my_llm_call():
    async for chunk in provider.stream(prompt):
        yield chunk

async for item in resilient_stream(
    my_llm_call,
    retry_config=retry_config,
    rate_limiter=limiter,
    operation_name="gpt-4",
    operation_type="llm"
):
    if isinstance(item, RetryEvent):
        print(f"Retrying after {item.next_delay_seconds}s")
    elif isinstance(item, RateLimitEvent):
        print(f"Rate limit: {item.tokens_remaining} tokens left")
    else:
        print(item, end="")
Graph Orchestration
Build complex multi-agent workflows as directed acyclic graphs:
from agentic import GraphRunner, GraphNode, GraphConfig, GraphNodeStatus
# Create graph with concurrency control
graph_config = GraphConfig(
    graph_id="data_pipeline",
    max_concurrency=4,
    failure_strategy="allow_independent",  # or "fail_fast", "always_run"
    persist_state=True
)
graph = GraphRunner(graph_config, context)
# Add nodes with dependencies
graph.add_node(GraphNode("fetch_data", AgentRunner(fetch_agent)))
graph.add_node(GraphNode("validate", AgentRunner(validate_agent)), ["fetch_data"])
graph.add_node(GraphNode("process", AgentRunner(process_agent)), ["validate"])
graph.add_node(
    GraphNode(
        "analyze",
        AgentRunner(analyze_agent),
        output_key="analysis_result",  # Store result in context
        retry_config=RetryConfig(max_attempts=3)  # Node-level retry
    ),
    ["process"]
)
# Execute graph with streaming events
async for event in graph.run_stream():
    if isinstance(event, GraphNodeStartEvent):
        print(f"Starting node: {event.node_id}")
    elif isinstance(event, GraphNodeCompleteEvent):
        print(f"Node {event.node_id} completed with status: {event.status.value}")
    elif isinstance(event, GraphCompleteEvent):
        print(f"Graph finished: {event.status}")
        print(f"Stats: {event.stats}")
# Or batch execution
final_statuses = graph.run()
Graph Visualization
Export graph structure for debugging and documentation:
from agentic import GraphRunner, GraphConfig, GraphNode, to_mermaid, to_dot
# Build your graph
graph = GraphRunner(GraphConfig(graph_id="pipeline"), context)
graph.add_node(GraphNode("fetch", fetch_agent))
graph.add_node(GraphNode("process", process_agent), ["fetch"])
graph.add_node(GraphNode("analyze", analyze_agent), ["process"])
# Export to Mermaid (for GitHub, documentation)
mermaid = to_mermaid(graph)
print(mermaid)
# Output:
# flowchart TD
# fetch["fetch"]
# process["process"]
# analyze["analyze"]
# fetch --> process
# process --> analyze
# Export to Graphviz DOT (for rendering)
dot = to_dot(graph, include_metadata=True)
# Save and render: dot -Tpng graph.dot -o graph.png
# Include metadata (node types, flags, output keys)
mermaid_detailed = to_mermaid(graph, include_metadata=True)
Visualization utilities are:
- Read-only: Zero effect on graph execution
- Stateless: Pure functions, no side effects
- Optional: Only imported when needed
Advanced graph features:
# Cleanup nodes that run even on failure
graph.add_node(
    GraphNode(
        "cleanup",
        cleanup_agent,
        run_on_failure=True  # Runs even if upstream failed
    ),
    ["analyze"]
)
# Custom callable nodes
async def merge_results(ctx: ContextManager) -> AsyncIterator[BaseEvent]:
    data1 = ctx.get("output1")
    data2 = ctx.get("output2")
    merged = f"{data1}\n{data2}"
    ctx.set("merged", merged)
    yield StatusEvent(AgentStatus.OK, "Merged results")
graph.add_node(GraphNode("merge", merge_results), ["process1", "process2"])
# Parallel branches with synchronized merge
graph.add_node(GraphNode("branch1", agent1))
graph.add_node(GraphNode("branch2", agent2))
graph.add_node(GraphNode("merge", merger_agent), ["branch1", "branch2"])
Multi-Agent Patterns
from agentic import (
    AgentChain, AgentChainConfig,
    SupervisorPattern, SupervisorConfig,
    ParallelPattern, ParallelConfig,
    DebatePattern, DebateConfig
)
# Sequential chain
chain = AgentChain(
    agents=[
        ("researcher", research_agent),
        ("writer", writing_agent),
        ("editor", editing_agent)
    ],
    config=AgentChainConfig(
        pass_mode="response",  # or "full_context", "tool_results", "custom"
        prepend_context=True,
        context_template="Previous agent ({agent_id}) output:\n{output}\n\n"
    )
)

async for event in chain.execute("Write article about AI"):
    if isinstance(event, StepCompleteEvent):
        print(f"Agent completed: {event.result.segments.response}")
# Supervisor-Worker pattern
supervisor = SupervisorPattern(
    supervisor=coordinator_agent,
    workers={
        "research": research_agent,
        "coding": coding_agent,
        "testing": testing_agent
    },
    config=SupervisorConfig(
        delegation_pattern_name="delegate",
        worker_key="to",
        task_key="task",
        max_delegation_rounds=10
    )
)

async for event in supervisor.execute("Build a web scraper"):
    # Supervisor delegates to specialized workers
    if isinstance(event, StatusEvent):
        print(event.message)
# Parallel execution with merging
parallel = ParallelPattern(
    agents={
        "optimist": optimist_agent,
        "pessimist": pessimist_agent,
        "realist": realist_agent
    },
    merger=synthesis_agent,
    config=ParallelConfig(
        merge_strategy="agent",  # or "concat", "voting"
        merge_template="Synthesize these perspectives:\n\n{perspectives}",
        timeout_seconds=120.0
    )
)

async for event in parallel.execute_and_merge("Analyze market trends"):
    # All agents run in parallel, results merged
    pass
# Debate pattern
debate = DebatePattern(
    agents={
        "pro": pro_agent,
        "con": con_agent
    },
    moderator=moderator_agent,
    config=DebateConfig(
        max_rounds=5,
        consensus_detector=None  # Uses default similarity check
    )
)

async for event in debate.converge("Should we use microservices?"):
    if isinstance(event, StatusEvent):
        print(f"Debate: {event.message}")
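A similarity-based consensus check of the kind hinted at by `consensus_detector=None` could look like this (hypothetical; the framework's default detector may differ, which is exactly why the detector is pluggable):

```python
from difflib import SequenceMatcher

def similarity_consensus(position_a: str, position_b: str,
                         threshold: float = 0.8) -> bool:
    """Declare consensus when the two final positions are nearly identical text."""
    ratio = SequenceMatcher(None, position_a.lower(), position_b.lower()).ratio()
    return ratio >= threshold

agree = similarity_consensus("Adopt microservices for the new API",
                             "adopt microservices for the new API")
```

A custom detector passed as `consensus_detector` could instead ask a moderator agent to judge agreement.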
Structured Logging
The framework includes built-in structured logging for debugging and monitoring.
from agentic.logging_util import get_logger
import logging
# Configure logging (optional - framework uses default Python logging)
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# The framework automatically logs key events with rich context
# Log entries include: agent_id, iteration, step_id, tool_name, etc.
# Example log output:
# 2025-11-15 10:30:15 - agentic.agent - DEBUG - agent.step.start - {"agent_id": "assistant", "iteration": 1, "step_id": "abc123"}
# 2025-11-15 10:30:16 - agentic.agent - DEBUG - agent.llm.complete - {"agent_id": "assistant", "iteration": 1, "output_length": 250, "tools_detected": 1}
# 2025-11-15 10:30:16 - agentic.agent - DEBUG - tool.verification.start - {"tool_name": "search_web", "call_id": "def456"}
# 2025-11-15 10:30:18 - agentic.agent - DEBUG - agent.step.complete - {"agent_id": "assistant", "status": "TOOL_EXECUTED", "tools_executed": 1}
# Use the logger in custom tools or extensions
logger = get_logger(__name__)
def my_custom_tool(inputs: dict) -> dict:
    logger.info("tool.custom.start", extra={
        "query": inputs.get("query"),
        "user_id": inputs.get("user_id")
    })
    # Your tool logic
    return {"result": "success"}
Context Health Monitoring
from agentic import ContextHealthCheck, ContextHealthEvent
logic_config = LogicConfig(
    logic_id="monitored_loop",
    max_iterations=100,
    context_health_checks=[
        ContextHealthCheck(
            check_type="size",
            key_pattern="llm_output:*",  # Glob pattern
            threshold=1_000_000,  # 1MB
            action="warn",  # or "stop"
            evaluation_point="step_complete",
            max_versions_limit=10000  # Safety limit for version_count checks
        ),
        ContextHealthCheck(
            check_type="version_count",
            key_pattern="*",
            threshold=1000,
            action="stop"
        )
    ]
)
logic = LogicRunner(runner, context, patterns, logic_config)
# Health events emitted during execution
async for event in logic.run_stream("Your task"):
    if isinstance(event, ContextHealthEvent):
        print(f"Health issue: {event.check_type} at {event.key}")
        print(f"Current: {event.current_value}, Threshold: {event.threshold}")
        print(f"Action: {event.recommended_action}")
Tool Verification and Approval
The framework provides comprehensive tool verification with timeout handling and decision tracking.
from agentic import ToolCall, ToolDecisionEvent, AgentStatus, ToolExecutionDecision
def approve_tool(tool_call: ToolCall) -> bool:
    """Callback for human-in-the-loop tool approval."""
    print(f"Tool '{tool_call.name}' detected with args: {tool_call.arguments}")
    return input("Execute? (y/n): ").lower() == 'y'
agent_config = AgentConfig(
    agent_id="assistant",
    tools_allowed=["search_web", "calculate"],
    on_tool_detected=approve_tool,  # None = auto-approve all
    tool_verification_timeout=30.0,  # Timeout in seconds for the verification callback
    tool_verification_on_timeout="reject",  # "accept" or "reject" when the timeout occurs
    concurrent_tool_execution=False  # Set to True to execute tools during LLM streaming
)
# Streaming: Listen for tool decision events in real-time
async for event in runner.step_stream("Your prompt"):
    if isinstance(event, ToolDecisionEvent):
        print(f"Tool: {event.tool_name}")
        print(f"Accepted: {event.accepted}")
        if not event.accepted:
            print(f"Rejection reason: {event.rejection_reason}")
        print(f"Verification took: {event.verification_duration_ms}ms")
    elif isinstance(event, StatusEvent):
        if event.status == AgentStatus.WAITING_FOR_VERIFICATION:
            print(f"Waiting for approval: {event.message}")
        elif event.status == AgentStatus.TOOLS_REJECTED:
            print("All tools were rejected")
# Batch: Access tool decisions from result
result = runner.step("Your prompt")
# Inspect all tool decisions (both accepted and rejected)
for decision in result.tool_decisions:
    print(f"Tool: {decision.tool_call.name}")
    print(f"  Accepted: {decision.accepted}")
    print(f"  Verification required: {decision.verification_required}")
    print(f"  Verification duration: {decision.verification_duration_ms}ms")
    if not decision.accepted:
        print(f"  Rejection reason: {decision.rejection_reason}")
    if decision.executed and decision.result:
        print(f"  Execution success: {decision.result.success}")
        if decision.result.success:
            print(f"  Output: {decision.result.output}")
        else:
            print(f"  Error: {decision.result.error_message}")
# Concurrent tool execution
# When enabled, tools are executed as soon as pattern is detected during LLM streaming
# Otherwise, tools execute after LLM completes
agent_config_concurrent = AgentConfig(
    agent_id="assistant",
    tools_allowed=["search_web"],
    concurrent_tool_execution=True,  # Execute tools during streaming
    stream_pattern_content=True  # Stream pattern content as it's detected
)
Testing
pytest # Run all tests
pytest --cov=agentic # With coverage
pytest -v # Verbose
pytest -m asyncio # Async tests only
550+ tests with >90% coverage across all modules. See tests/README.md for details.
Security Considerations
Tool Execution:
- Tools execute with the same permissions as the Python process
- Always review tool implementations before registering them
- Use sandboxing (Docker, separate processes) for untrusted code
- The framework provides APIs but does not restrict tool behavior by design
Validation:
- Use input validation on all tools to prevent injection attacks
- Built-in `simple_validator` checks types and constraints
- Custom validators can implement format-specific security checks
Buffer Limits:
- `StreamingPatternExtractor` enforces a 10MB buffer limit by default
- Configure via `max_buffer_size` in `AgentConfig` to prevent memory exhaustion
- Partial buffer tracking in `LogicRunner` uses the same limit
Context Health:
- Monitor context size and version count to prevent resource exhaustion
- Use `ContextHealthCheck` with `action="stop"` to halt execution on a threshold breach
- The default history limit of 100 versions prevents unbounded growth
Rate Limiting:
- Apply rate limiting to external API calls to prevent quota exhaustion
- Token bucket implementation prevents burst attacks
- Combine with retry logic for resilient operation
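The token bucket mechanism works roughly as follows (a generic sketch with an explicit clock for determinism, not the framework's `RateLimiter`):

```python
class TokenBucket:
    """Tokens refill at `rate` per second up to `burst`; each call costs one."""

    def __init__(self, rate: float, burst: int):
        self.rate, self.burst = rate, burst
        self.tokens = float(burst)  # start full: an initial burst is allowed
        self.last = 0.0

    def try_acquire(self, now: float) -> bool:
        # Refill based on elapsed time, then spend a token if one is available.
        self.tokens = min(self.burst, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False  # caller should back off

bucket = TokenBucket(rate=10.0, burst=2)
burst_results = [bucket.try_acquire(now=0.0) for _ in range(3)]
# Two tokens absorb the burst; the third simultaneous call is throttled
```

Because refill is proportional to elapsed time, sustained throughput converges to `rate` regardless of burst size, which is what blunts burst attacks.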
Best Practices:
- Validate tool inputs before execution using the validation system
- Use timeouts on all tool calls (enforced by framework)
- Review LLM outputs before executing extracted tools
- Use the `on_tool_detected` callback for human-in-the-loop approval
- Apply retry logic only to idempotent operations
- Monitor context health in long-running loops
Design Principles
- Streaming First: Batch mode wraps streaming for consistency
- Persistence First: All state stored in RocksDB with versioning
- Versioning by Default: Context auto-versioned with full history and tombstone deletion
- Type Safety: Full type hints throughout codebase
- Single Responsibility: Focused modules with clear boundaries
- Extensibility: Pluggable validation, custom patterns, user-defined tools
- Resilience: Built-in retry, rate limiting, and context health monitoring
- Minimal Dependencies: Only rocksdict required (Python wrapper for RocksDB)
License
MIT