A library of primitives for building agentic flows.
Project description
Asimov Agents
A Python framework for building AI agent systems with robust task management in the form of a graph execution engine, inference capabilities, and caching.
We support advanced features like State Snapshotting, Middleware, Agent Directed Graph Execution, Open Telemetry Integrations and more.
🔮 Asimov is the foundation of bismuth.sh an end to end AI software developer that can handle many tasks autonomously. Check us out! 🔮
Quickstart
pip install asimov_agents
Checkout these docs which show off two basic examples that should be enough to get you experimenting!
Further documentation greatly appreciated in PRs!
System Overview
Asimov Agents is composed of three main components:
-
Graph Primitives
- Manages task execution flow and dependencies
-
Inference Clients
- Supports multiple LLM providers:
- Anthropic Claude (via API)
- AWS Bedrock
- OpenAI (Including local models)
- Vertex
- OpenRouter
- Features:
- Streaming responses
- Tool/function calling capabilities
- Token usage tracking
- OpenTelemetry instrumentation
- Prompt caching support
- Supports multiple LLM providers:
-
Caching System
- Abstract Cache interface with Redis implementation
- Features:
- Key-value storage with JSON serialization
- Prefix/suffix namespacing
- Pub/sub messaging via mailboxes
- Bulk operations (get_all, clear)
- Async interface
Component Interactions
Task Management
- Tasks are created and tracked using the
Taskclass - Each task has:
- Unique ID
- Type and objective
- Parameters dictionary
- Status tracking
- Result/error storage
Graph System Architecture
-
Module Types
SUBGRAPH: Nodes composes of other nodes.EXECUTOR: Task execution modulesFLOW_CONTROL: Execution flow control modules
-
Node Configuration
node_config = NodeConfig( parallel=True, # Enable parallel execution condition="task.ready", # Conditional execution retry_on_failure=True, # Enable retry mechanism max_retries=3, # Maximum retry attempts max_visits=5, # Maximum node visits inputs=["data"], # Required inputs outputs=["result"] # Expected outputs )
-
Flow Control Features
- Conditional branching based on task state
- Dynamic node addition during execution
- Dependency chain management
- Automatic cleanup of completed nodes
- Execution state tracking and recovery
- LLM directed flow for complex decisisons
-
Snapshot System
- State preservation modes:
NEVER: No snapshotsONCE: Single snapshotALWAYS: Continuous snapshots
- Captures:
- Agent state
- Cache contents
- Task status
- Execution history
- Configurable storage location via
ASIMOV_SNAPSHOT
- State preservation modes:
-
Error Handling
- Automatic retry mechanisms
- Partial completion states
- Failed chain tracking
- Detailed error reporting
- Timeout management
Inference Pipeline
- Messages are formatted with appropriate roles (SYSTEM, USER, ASSISTANT, TOOL_RESULT)
- Inference clients handle:
- Message formatting
- API communication
- Response streaming
- Token accounting
- Error handling
Caching Layer
- Redis cache provides:
- Fast key-value storage
- Message queuing
- Namespace management
- Atomic operations
Agent Primitives
The Asimov Agents framework is built around several core primitives that enable flexible and powerful agent architectures:
Module Types
The framework supports different types of modules through the ModuleType enum:
SUBGRAPH: Nodes composes of other nodes.EXECUTOR: Task execution and action implementationFLOW_CONTROL: Execution flow and routing control
Agent Module
The AgentModule is the base class for all agent components:
class AgentModule:
name: str # Unique module identifier
type: ModuleType # Module type classification
config: ModuleConfig # Module configuration
dependencies: List[str] # Module dependencies
input_mailboxes: List[str] # Input communication channels
output_mailbox: str # Output communication channel
trace: bool # OpenTelemetry tracing flag
Node Configuration
Nodes can be configured with various parameters through NodeConfig:
class NodeConfig:
parallel: bool = False # Enable parallel execution
condition: Optional[str] = None # Execution condition
retry_on_failure: bool = True # Auto-retry on failures
max_retries: int = 3 # Maximum retry attempts
max_visits: int = 5 # Maximum node visits
inputs: List[str] = [] # Required inputs
outputs: List[str] = [] # Expected outputs
Flow Control
Flow control enables dynamic execution paths:
class FlowDecision:
next_node: str # Target node
condition: Optional[str] = None # Jump condition
cleanup_on_jump: bool = False # Cleanup on transition
class FlowControlConfig:
decisions: List[FlowDecision] # Decision rules
default: Optional[str] = None # Default node
cleanup_on_default: bool = True # Cleanup on default
Agent Directed Flow Control
Agent Directed Flow Control is a powerful feature that enables intelligent routing of tasks based on LLM decision making. It allows the system to:
- Dynamically route tasks to specialized modules based on content analysis
- Use example-based learning for routing decisions
- Support multiple voters for consensus-based routing
- Handle fallback cases with error handlers
Example configuration:
flow_control = Node(
name="flow_control",
type=ModuleType.FLOW_CONTROL,
modules=[
AgentDirectedFlowControl(
name="ContentFlowControl",
type=ModuleType.FLOW_CONTROL,
voters=3, # Number of voters for consensus
inference_client=inference_client,
system_description="A system that handles various content creation tasks",
flow_config=AgentDrivenFlowControlConfig(
decisions=[
AgentDrivenFlowDecision(
next_node="blog_writer",
metadata={"description": "Writes blog posts on technical topics"},
examples=[
Example(
message="Write a blog post about AI agents",
choices=[
{"choice": "blog_writer", "description": "Writes blog posts"},
{"choice": "code_writer", "description": "Writes code"}
],
choice="blog_writer",
reasoning="The request is specifically for blog content"
)
]
),
AgentDrivenFlowDecision(
next_node="code_writer",
metadata={"description": "Writes code examples and tutorials"},
examples=[
Example(
message="Create a Python script for data processing",
choices=[
{"choice": "blog_writer", "description": "Writes blog posts"},
{"choice": "code_writer", "description": "Writes code"}
],
choice="code_writer",
reasoning="The request is for code creation"
)
]
)
],
default="error_handler" # Fallback node for unmatched requests
)
)
]
)
Key features:
- Example-based routing decisions with clear reasoning
- Multiple voter support (configurable number of voters) for robust decision making
- Specialized executor modules for different content types (e.g., blog posts, code)
- Metadata-enriched routing configuration for better decision context
- Fallback error handling for unmatched requests
- Cached message passing between nodes using Redis
- Asynchronous execution with semaphore control
- Comprehensive error handling and reporting
For a complete working example of Agent Directed Flow Control, check out the examples/agent_directed_flow.py file which demonstrates a content creation system that intelligently routes tasks between blog writing and code generation modules.
Middleware System
Middleware allows for processing interception:
class Middleware:
async def process(self, data: Dict[str, Any], cache: Cache) -> Dict[str, Any]:
return data # Process or transform data
Execution State
The framework maintains execution state through:
class ExecutionState:
execution_index: int # Current execution position
current_plan: ExecutionPlan # Active execution plan
execution_history: List[ExecutionPlan] # Historical plans
total_iterations: int # Total execution iterations
Snapshot Control
State persistence is managed through SnapshotControl:
NEVER: No snapshots takenONCE: Single snapshot captureALWAYS: Continuous state capture
Setup and Configuration
Redis Cache Setup
cache = RedisCache(
host="localhost", # Redis host
port=6379, # Redis port
db=0, # Database number
password=None, # Optional password
default_prefix="" # Optional key prefix
)
Inference Client Setup
# Anthropic Client
client = AnthropicInferenceClient(
model="claude-3-5-sonnet-20241022",
api_key="your-api-key",
api_url="https://api.anthropic.com/v1/messages"
)
# AWS Bedrock Client
client = BedrockInferenceClient(
model="anthropic.claude-3-5-sonnet-20241022-v2:0",
region_name="us-east-1"
)
There is similar set up for VertexAI and OpenAI
Task and Graph Setup
# Create a task
task = Task(
type="processing",
objective="Process data",
params={"input": "data"}
)
# Create nodes with different module types
executor_node = Node(
name="executor",
type=ModuleType.EXECUTOR,
modules=[ExecutorModule()],
dependencies=["planner"]
)
flow_control = Node(
name="flow_control",
type=ModuleType.FLOW_CONTROL,
modules=[FlowControlModule(
flow_config=FlowControlConfig(
decisions=[
FlowDecision(
next_node="executor",
condition="task.ready == true" # Conditions are small lua scripts that get run based on current state.
)
],
default="planner"
)
)]
)
# Set up the agent
agent = Agent(
cache=RedisCache(),
max_concurrent_tasks=5,
max_total_iterations=100
)
# Add nodes to the agent
agent.add_multiple_nodes([executor_node, flow_control])
# Run the task
await agent.run_task(task)
Advanced Features
Middleware System
class LoggingMiddleware(Middleware):
async def process(self, data: Dict[str, Any], cache: Cache) -> Dict[str, Any]:
print(f"Processing data: {data}")
return data
node = Node(
name="executor",
type=ModuleType.EXECUTOR,
modules=[ExecutorModule()],
config=ModuleConfig(
middlewares=[LoggingMiddleware()],
timeout=30.0
)
)
Execution State Management
- Tracks execution history
- Supports execution plan compilation
- Enables dynamic plan modification
- Provides state restoration capabilities
# Access execution state
current_plan = agent.execution_state.current_plan
execution_history = agent.execution_state.execution_history
total_iterations = agent.execution_state.total_iterations
# Compile execution plans
full_plan = agent.compile_execution_plan()
partial_plan = agent.compile_execution_plan_from("specific_node")
# Restore from snapshot
await agent.run_from_snapshot(snapshot_dir)
OpenTelemetry Integration
- Automatic span creation for nodes
- Execution tracking
- Performance monitoring
- Error tracing
node = Node(
name="traced_node",
type=ModuleType.EXECUTOR,
modules=[ExecutorModule()],
trace=True # Enable OpenTelemetry tracing
)
Performance Considerations
Caching
- Use appropriate key prefixes/suffixes for namespace isolation
- Consider timeout settings for blocking operations
- Monitor Redis memory usage
- Use raw mode when bypassing JSON serialization
Inference
- Token usage is tracked automatically
- Streaming reduces time-to-first-token
- Tool calls support iteration limits
- Prompt caching can improve response times
Task Management
- Tasks support partial failure states
- Use UUIDs for guaranteed uniqueness
- Status transitions are atomic
Development
Running Tests
pytest tests/
Required Dependencies
- Redis server (If using caching)
- Python 3.12+
- See requirements.txt for Python packages
License
ApacheV2
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file asimov_agents-0.6.0.tar.gz.
File metadata
- Download URL: asimov_agents-0.6.0.tar.gz
- Upload date:
- Size: 39.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: python-httpx/0.28.1
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
d2d24465602409d4ac95a3d5cabe09d99e15b4be3854f6a320706860fefb588a
|
|
| MD5 |
2d2b0d41d68758abb14c21dc55224253
|
|
| BLAKE2b-256 |
bc1dfe429ad5df22f419fc70d26b1483223c4255905de7567e1ca2befe3a8316
|
File details
Details for the file asimov_agents-0.6.0-py3-none-any.whl.
File metadata
- Download URL: asimov_agents-0.6.0-py3-none-any.whl
- Upload date:
- Size: 39.6 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: python-httpx/0.28.1
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
bebf42ee8f49619cb07e9133a21ede16c2be8574c13a7755446aaafca6d24088
|
|
| MD5 |
71b00dd9f8f7eb72854eeb7cc88b6c97
|
|
| BLAKE2b-256 |
ef3c66b4ad33e33103f9e7dc88e8330af02077e100d6f2fdfed3946094d3e07a
|