A library of primitives for building agentic flows.
Asimov Agents
A Python framework for building AI agent systems with robust task management, inference capabilities, and caching.
🔮 Asimov is the foundation of bismuth.sh, an in-terminal coding agent that can handle many tasks autonomously. Check us out! 🔮
Quickstart
Check out these docs, which walk through two basic examples that should be enough to get you experimenting!
Further documentation is greatly appreciated in PRs!
System Overview
Asimov Agents is composed of three main components:
- Task Graph System
  - Manages task execution flow and dependencies
  - Supports different task states (WAITING, EXECUTING, COMPLETE, FAILED, PARTIAL)
  - Uses Pydantic models for robust data validation
  - Unique task identification via UUIDs
- Inference Clients
  - Supports multiple LLM providers:
    - Anthropic Claude (via API)
    - AWS Bedrock
    - OpenAI (including local models)
    - Vertex
  - Features:
    - Streaming responses
    - Tool/function calling capabilities
    - Token usage tracking
    - OpenTelemetry instrumentation
    - Prompt caching support
- Caching System
  - Abstract Cache interface with Redis implementation
  - Features:
    - Key-value storage with JSON serialization
    - Prefix/suffix namespacing
    - Pub/sub messaging via mailboxes
    - Bulk operations (get_all, clear)
    - Async interface
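The cache interface described above can be sketched with a dict-backed stand-in. The method names mirror the bullets (JSON serialization, prefix namespacing, `get_all`, `clear`, async methods), but this is an illustration only, not the library's actual Redis-backed class:

```python
import asyncio
import json


class InMemoryCache:
    """Dict-backed sketch of the abstract Cache interface.

    Illustrative only: the real implementation is backed by Redis.
    """

    def __init__(self, default_prefix: str = ""):
        self._prefix = default_prefix
        self._store: dict[str, str] = {}

    def _key(self, key: str) -> str:
        # Prefix namespacing: every key is stored under the configured prefix.
        return f"{self._prefix}{key}"

    async def set(self, key: str, value) -> None:
        # Values are stored JSON-serialized, as in the real cache.
        self._store[self._key(key)] = json.dumps(value)

    async def get(self, key: str, default=None):
        raw = self._store.get(self._key(key))
        return default if raw is None else json.loads(raw)

    async def get_all(self) -> dict:
        return {k: json.loads(v) for k, v in self._store.items()}

    async def clear(self) -> None:
        self._store.clear()


async def demo():
    cache = InMemoryCache(default_prefix="agent:")
    await cache.set("state", {"step": 1})
    return await cache.get("state")


print(asyncio.run(demo()))  # {'step': 1}
```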
Component Interactions
Task Management
- Tasks are created and tracked using the Task class
- Each task has:
  - Unique ID
  - Type and objective
  - Parameters dictionary
  - Status tracking
  - Result/error storage
Graph System Architecture
- Module Types
  - SUBGRAPH: Nodes composed of other nodes
  - EXECUTOR: Task execution modules
  - FLOW_CONTROL: Execution flow control modules
- Node Configuration

```python
node_config = NodeConfig(
    parallel=True,           # Enable parallel execution
    condition="task.ready",  # Conditional execution
    retry_on_failure=True,   # Enable retry mechanism
    max_retries=3,           # Maximum retry attempts
    max_visits=5,            # Maximum node visits
    inputs=["data"],         # Required inputs
    outputs=["result"],      # Expected outputs
)
```
- Flow Control Features
  - Conditional branching based on task state
  - Dynamic node addition during execution
  - Dependency chain management
  - Automatic cleanup of completed nodes
  - Execution state tracking and recovery
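As a rough illustration of how such decision rules resolve to a next node: the real framework evaluates string conditions as Lua scripts against current state, so a plain Python predicate stands in here, and `Decision`/`route` are hypothetical names:

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class Decision:
    """Sketch of a flow decision: a target node plus an optional condition."""
    next_node: str
    condition: Optional[Callable[[dict], bool]] = None  # stand-in for a Lua condition


def route(state: dict, decisions: list[Decision], default: str) -> str:
    """Return the first node whose condition matches, else the default node."""
    for d in decisions:
        if d.condition is None or d.condition(state):
            return d.next_node
    return default


decisions = [Decision("executor", lambda s: bool(s.get("task_ready")))]
print(route({"task_ready": True}, decisions, "planner"))  # executor
```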
- Snapshot System
  - State preservation modes:
    - NEVER: No snapshots
    - ONCE: Single snapshot
    - ALWAYS: Continuous snapshots
  - Captures:
    - Agent state
    - Cache contents
    - Task status
    - Execution history
  - Configurable storage location via ASIMOV_SNAPSHOT
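For example, the snapshot location can be pointed at a writable directory through the environment variable named above; the exact path format shown here is an assumption, not documented behavior:

```shell
# ASIMOV_SNAPSHOT comes from the docs above; the path is only a placeholder.
export ASIMOV_SNAPSHOT=/var/lib/asimov/snapshots
```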
- Error Handling
  - Automatic retry mechanisms
  - Partial completion states
  - Failed chain tracking
  - Detailed error reporting
  - Timeout management
Inference Pipeline
- Messages are formatted with appropriate roles (SYSTEM, USER, ASSISTANT, TOOL_RESULT)
- Inference clients handle:
  - Message formatting
  - API communication
  - Response streaming
  - Token accounting
  - Error handling
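A minimal sketch of role-tagged message formatting, assuming the common chat-API dict shape. Only the four role names come from the docs above; the `ChatRole` class and `make_message` helper are illustrative, not the library's API:

```python
from enum import Enum


class ChatRole(Enum):
    # Role names mirror the docs; this enum itself is illustrative.
    SYSTEM = "system"
    USER = "user"
    ASSISTANT = "assistant"
    TOOL_RESULT = "tool_result"


def make_message(role: ChatRole, content: str) -> dict:
    """Shape a message the way most chat-completion APIs expect."""
    return {"role": role.value, "content": content}


messages = [
    make_message(ChatRole.SYSTEM, "You are a planning agent."),
    make_message(ChatRole.USER, "Process data"),
]
print(messages[0])  # {'role': 'system', 'content': 'You are a planning agent.'}
```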
Caching Layer
- Redis cache provides:
  - Fast key-value storage
  - Message queuing
  - Namespace management
  - Atomic operations
Agent Primitives
The Asimov Agents framework is built around several core primitives that enable flexible and powerful agent architectures:
Module Types
The framework supports different types of modules through the ModuleType enum:

- SUBGRAPH: Nodes composed of other nodes
- EXECUTOR: Task execution and action implementation
- FLOW_CONTROL: Execution flow and routing control
Agent Module
The AgentModule is the base class for all agent components:

```python
class AgentModule:
    name: str                   # Unique module identifier
    type: ModuleType            # Module type classification
    config: ModuleConfig        # Module configuration
    dependencies: List[str]     # Module dependencies
    input_mailboxes: List[str]  # Input communication channels
    output_mailbox: str         # Output communication channel
    trace: bool                 # OpenTelemetry tracing flag
```
Node Configuration
Nodes can be configured with various parameters through NodeConfig:

```python
class NodeConfig:
    parallel: bool = False           # Enable parallel execution
    condition: Optional[str] = None  # Execution condition
    retry_on_failure: bool = True    # Auto-retry on failures
    max_retries: int = 3             # Maximum retry attempts
    max_visits: int = 5              # Maximum node visits
    inputs: List[str] = []           # Required inputs
    outputs: List[str] = []          # Expected outputs
```
Flow Control
Flow control enables dynamic execution paths:
```python
class FlowDecision:
    next_node: str                   # Target node
    condition: Optional[str] = None  # Jump condition
    cleanup_on_jump: bool = False    # Cleanup on transition


class FlowControlConfig:
    decisions: List[FlowDecision]    # Decision rules
    default: Optional[str] = None    # Default node
    cleanup_on_default: bool = True  # Cleanup on default
```
Middleware System
Middleware allows for processing interception:
```python
class Middleware:
    async def process(self, data: Dict[str, Any], cache: Cache) -> Dict[str, Any]:
        return data  # Process or transform data
```
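Middlewares compose into a chain, each receiving the previous one's output. A framework-independent sketch of that idea, written against the `process()` signature shown above (`run_chain` and the two middleware classes are hypothetical, not part of the library):

```python
import asyncio
from typing import Any, Dict


class UppercaseMiddleware:
    """Example transform: uppercase a field before the module sees it."""

    async def process(self, data: Dict[str, Any], cache: Any) -> Dict[str, Any]:
        return {**data, "text": data["text"].upper()}


class TagMiddleware:
    """Example transform: annotate the payload."""

    async def process(self, data: Dict[str, Any], cache: Any) -> Dict[str, Any]:
        return {**data, "tagged": True}


async def run_chain(middlewares, data, cache=None):
    # Each middleware receives the previous middleware's output.
    for mw in middlewares:
        data = await mw.process(data, cache)
    return data


result = asyncio.run(
    run_chain([UppercaseMiddleware(), TagMiddleware()], {"text": "hi"})
)
print(result)  # {'text': 'HI', 'tagged': True}
```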
Execution State
The framework maintains execution state through:
```python
class ExecutionState:
    execution_index: int                    # Current execution position
    current_plan: ExecutionPlan             # Active execution plan
    execution_history: List[ExecutionPlan]  # Historical plans
    total_iterations: int                   # Total execution iterations
```
Snapshot Control
State persistence is managed through SnapshotControl:

- NEVER: No snapshots taken
- ONCE: Single snapshot capture
- ALWAYS: Continuous state capture
Setup and Configuration
Redis Cache Setup
```python
cache = RedisCache(
    host="localhost",   # Redis host
    port=6379,          # Redis port
    db=0,               # Database number
    password=None,      # Optional password
    default_prefix="",  # Optional key prefix
)
```
Inference Client Setup
```python
# Anthropic Client
client = AnthropicInferenceClient(
    model="claude-3",
    api_key="your-api-key",
    api_url="https://api.anthropic.com/v1/messages",
)

# AWS Bedrock Client
client = BedrockInferenceClient(
    model="anthropic.claude-3",
    region_name="us-east-1",
)
```
Task and Graph Setup
```python
# Create a task
task = Task(
    type="processing",
    objective="Process data",
    params={"input": "data"},
)

# Create nodes with different module types
executor_node = Node(
    name="executor",
    type=ModuleType.EXECUTOR,
    modules=[ExecutorModule()],
    dependencies=["planner"],
)

flow_control = Node(
    name="flow_control",
    type=ModuleType.FLOW_CONTROL,
    modules=[FlowControlModule(
        flow_config=FlowControlConfig(
            decisions=[
                FlowDecision(
                    next_node="executor",
                    # Conditions are small Lua scripts evaluated against current state.
                    condition="task.ready == true",
                )
            ],
            default="planner",
        )
    )],
)

# Set up the agent
agent = Agent(
    cache=RedisCache(),
    max_concurrent_tasks=5,
    max_total_iterations=100,
)

# Add nodes to the agent
agent.add_multiple_nodes([executor_node, flow_control])

# Run the task
await agent.run_task(task)
```
Advanced Features
Middleware System
```python
class LoggingMiddleware(Middleware):
    async def process(self, data: Dict[str, Any], cache: Cache) -> Dict[str, Any]:
        print(f"Processing data: {data}")
        return data


node = Node(
    name="executor",
    type=ModuleType.EXECUTOR,
    modules=[ExecutorModule()],
    config=ModuleConfig(
        middlewares=[LoggingMiddleware()],
        timeout=30.0,
    ),
)
```
Execution State Management
- Tracks execution history
- Supports execution plan compilation
- Enables dynamic plan modification
- Provides state restoration capabilities
```python
# Access execution state
current_plan = agent.execution_state.current_plan
execution_history = agent.execution_state.execution_history
total_iterations = agent.execution_state.total_iterations

# Compile execution plans
full_plan = agent.compile_execution_plan()
partial_plan = agent.compile_execution_plan_from("specific_node")

# Restore from snapshot
await agent.run_from_snapshot(snapshot_dir)
```
OpenTelemetry Integration
- Automatic span creation for nodes
- Execution tracking
- Performance monitoring
- Error tracing
```python
node = Node(
    name="traced_node",
    type=ModuleType.EXECUTOR,
    modules=[ExecutorModule()],
    trace=True,  # Enable OpenTelemetry tracing
)
```
Performance Considerations
Caching
- Use appropriate key prefixes/suffixes for namespace isolation
- Consider timeout settings for blocking operations
- Monitor Redis memory usage
- Use raw mode when bypassing JSON serialization
Inference
- Token usage is tracked automatically
- Streaming reduces time-to-first-token
- Tool calls support iteration limits
- Prompt caching can improve response times
Task Management
- Tasks support partial failure states
- Use UUIDs for guaranteed uniqueness
- Status transitions are atomic
Development
Running Tests
```shell
pytest tests/
```
Required Dependencies
- Redis server
- Python 3.7+
- See requirements.txt for Python packages
License
Apache 2.0