Basion Agent SDK
Python SDK for building AI agents in the Basion AI platform. Provides agent registration, message handling via Kafka (through Agent Gateway), streaming responses, and integrations with LangGraph and Pydantic AI.
Overview
The Basion Agent SDK (basion-agent) enables developers to build AI agents that integrate with the Basion AI Management platform. Agents register themselves, receive messages through Kafka topics, and stream responses back to users.
Key Features
- Agent Registration: Automatic registration with AI Inventory
- Message Handling: Decorator-based handlers with sender filtering
- Response Streaming: Chunked responses via Kafka/Centrifugo
- Structural Streaming: Rich UI components (Artifacts, Surfaces, TextBlocks, Steppers)
- Conversation History: Access to message history via Conversation Store
- Memory: Semantic search over long-term user and conversation memory
- Attachments: Download and process file attachments (images, PDFs, etc.)
- Knowledge Graph: Query biomedical knowledge graphs (diseases, proteins, phenotypes)
- Remote Logging: Send logs to Loki via the gateway for centralized monitoring
- LangGraph Integration: HTTP-based checkpoint saver for LangGraph graphs
- Pydantic AI Integration: Persistent message history for Pydantic AI agents
- CLI: Run agents with basion-agent run main:app
- Error Handling: Automatic error responses to users on handler failures
Architecture
┌─────────────────────────────────────────────────────────────────────────────┐
│ Your Agent Application │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ BasionAgentApp │
│ ├── register_me() → Agent │
│ │ ├── @on_message decorator │
│ │ ├── streamer() → Streamer │
│ │ │ └── stream_by() → Structural (Artifact, Surface, TextBlock, etc) │
│ │ └── tools → Tools │
│ │ └── knowledge_graph → KnowledgeGraphTool │
│ └── run() → Start consume loop │
│ │
│ Message Context: │
│ ├── message.conversation → Conversation (history, metadata) │
│ ├── message.memory → Memory (semantic search) │
│ └── message.attachments → List[AttachmentInfo] (file downloads) │
│ │
│ Extensions: │
│ ├── HTTPCheckpointSaver (LangGraph) │
│ └── PydanticAIMessageStore (Pydantic AI) │
│ │
└──────────────────────────────────┬──────────────────────────────────────────┘
│ gRPC (Kafka) + HTTP (APIs)
▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ Agent Gateway │
├─────────────────────────────────────────────────────────────────────────────┤
│ gRPC: AgentStream (bidirectional) HTTP: /s/{service}/* proxy │
│ - Auth - /s/ai-inventory/* │
│ - Subscribe/Unsubscribe - /s/conversation-store/* │
│ - Produce/Consume messages - /s/ai-memory/* │
│ - /s/attachment/* │
│ - /s/knowledge-graph/* │
│ - /loki/api/v1/push (logging) │
└─────────────────┬───────────────────────────────────┬───────────────────────┘
│ │
▼ ▼
┌──────────────┐ ┌────────────────────┐
│ Kafka │ │ AI Inventory / │
│ {agent}.inbox │ │ Conversation Store │
└──────────────┘ │ AI Memory / KG │
└────────────────────┘
Message Flow
sequenceDiagram
participant User
participant Provider
participant Router
participant Gateway as Agent Gateway
participant Agent as Your Agent
participant ConvStore as Conversation Store
User->>Provider: Send message
Provider->>Router: Kafka: router.inbox
Router->>Gateway: Kafka: {agent}.inbox
Gateway->>Agent: gRPC stream: message
Agent->>ConvStore: Get conversation history
ConvStore-->>Agent: Message history
loop Streaming Response
Agent->>Gateway: gRPC stream: content chunk
Gateway->>Router: Kafka: router.inbox
Router->>Provider: Kafka: user.inbox
Provider->>User: WebSocket: chunk
end
Agent->>Gateway: gRPC stream: done=true
Installation
# Basic installation
pip install basion-agent
# With LangGraph support
pip install basion-agent[langgraph]
# With Pydantic AI support
pip install basion-agent[pydantic]
# Development installation
pip install -e ".[dev]"
Quick Start
from basion_agent import BasionAgentApp
# Initialize the app
app = BasionAgentApp(
gateway_url="agent-gateway:8080",
api_key="your-api-key"
)
# Register an agent
agent = app.register_me(
name="my-assistant",
about="A helpful AI assistant",
document="Answers general questions and provides helpful information.",
representation_name="My Assistant"
)
# Handle messages
@agent.on_message
async def handle_message(message, sender):
# Access conversation history
history = await message.conversation.get_history(limit=10)
# Stream response
async with agent.streamer(message) as s:
s.stream("Hello! ")
s.stream("How can I help you today?")
# Run the agent
app.run()
CLI
Run agents using uvicorn-style import strings:
# Run 'app' from main.py
basion-agent run main:app
# Run 'app' from main.py (defaults to :app)
basion-agent run main
# Run 'application' from myagent.py
basion-agent run myagent:application
# Show version
basion-agent version
Your agent file should define a BasionAgentApp instance:
# main.py
app = BasionAgentApp(gateway_url="...", api_key="...")
agent = app.register_me(name="my-agent", ...)
@agent.on_message
async def handle(message, sender):
...
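The `module:attr` convention used by the CLI can be summarized in a few lines. This is an illustrative sketch of the documented parsing rule (the attribute defaults to `app`), not the SDK's actual CLI code:

```python
def parse_import_string(spec: str, default_attr: str = "app") -> tuple[str, str]:
    """Split an import string like 'main:app' into (module, attribute).

    'main'                -> ('main', 'app')   # attribute defaults to 'app'
    'myagent:application' -> ('myagent', 'application')
    """
    module, _, attr = spec.partition(":")
    return module, attr or default_attr
```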
Configuration
Environment Variables
| Variable | Description | Default |
|---|---|---|
| GATEWAY_URL | Agent Gateway endpoint | Required |
| GATEWAY_API_KEY | API key for authentication | Required |
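The docs do not state whether BasionAgentApp reads these variables itself, so wiring them in explicitly with fail-fast validation is a safe pattern. A minimal sketch (the helper name `load_gateway_config` is ours, not part of the SDK):

```python
import os

def load_gateway_config(env=os.environ) -> dict:
    """Read the required gateway settings, failing fast with a clear error."""
    missing = [k for k in ("GATEWAY_URL", "GATEWAY_API_KEY") if not env.get(k)]
    if missing:
        raise RuntimeError("Missing required environment variables: " + ", ".join(missing))
    return {"gateway_url": env["GATEWAY_URL"], "api_key": env["GATEWAY_API_KEY"]}

# app = BasionAgentApp(**load_gateway_config())
```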
BasionAgentApp Options
app = BasionAgentApp(
gateway_url="agent-gateway:8080", # Gateway endpoint
api_key="key", # Authentication key
heartbeat_interval=60, # Heartbeat frequency (seconds)
max_concurrent_tasks=100, # Max concurrent message handlers
error_message_template="...", # Error message sent to users
secure=False, # Use TLS for gRPC and HTTPS for HTTP
enable_remote_logging=False, # Send logs to Loki via gateway
remote_log_level=logging.INFO, # Min log level for remote logging
remote_log_batch_size=100, # Logs per batch
remote_log_flush_interval=5.0, # Seconds between flushes
)
API Reference
BasionAgentApp
Main application class for initializing and running agents.
app = BasionAgentApp(gateway_url, api_key)
# Register an agent
agent = app.register_me(
name="agent-name", # Unique identifier (used for routing)
about="Short description", # Brief description for agent selection
document="Full docs...", # Detailed documentation
representation_name="Name", # Display name (optional)
metadata={"key": "value"}, # Additional metadata (optional)
category_name="my-category", # Category in kebab-case (optional, auto-created)
tag_names=["tag-1", "tag-2"],# Tags in kebab-case (optional, auto-created)
example_prompts=["Ask me anything"], # Example prompts for users (optional)
is_experimental=False, # Mark as experimental (optional)
force_update=False, # Bypass content hash check (optional)
base_url="http://...", # Base URL for agent's frontend service (optional)
related_pages=[ # Related pages (optional)
{"name": "Docs", "endpoint": "/docs"}
],
)
# Start consuming messages
app.run() # Blocks until shutdown
Agent
Handles message registration and response streaming.
# Register message handler (all senders)
@agent.on_message
async def handle(message, sender):
pass
# Filter by sender
@agent.on_message(senders=["user"])
async def handle_user(message, sender):
pass
# Exclude sender
@agent.on_message(senders=["~other-agent"])
async def handle_not_other(message, sender):
pass
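The filter semantics above (plain names include, a `~` prefix excludes) can be expressed in plain Python. This is an illustration of the documented matching rule, not the SDK's internal implementation:

```python
def sender_matches(sender: str, patterns) -> bool:
    """True if `sender` passes a filter list like the one given to @agent.on_message.

    No patterns      -> accept everyone.
    ['user']         -> accept only 'user'.
    ['~other-agent'] -> accept everyone except 'other-agent'.
    """
    if not patterns:
        return True
    excluded = {p[1:] for p in patterns if p.startswith("~")}
    included = {p for p in patterns if not p.startswith("~")}
    if sender in excluded:
        return False
    return sender in included if included else True
```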
Message
Represents an incoming message with conversation context, memory, and attachments.
@agent.on_message
async def handle(message, sender):
message.content # Message content
message.conversation_id # Conversation ID
message.user_id # User ID
message.metadata # Optional message metadata (dict)
message.schema # Optional message schema (dict)
# Conversation history
history = await message.conversation.get_history(limit=10)
# Memory (semantic search)
results = await message.memory.query_about_user("diagnosis", limit=5)
# Attachments
if message.has_attachments():
count = message.get_attachment_count()
attachments = message.get_attachments()
# Download first attachment
data = await message.get_attachment_bytes()
base64_str = await message.get_attachment_base64()
buffer = await message.get_attachment_buffer()
# Download specific attachment by index
data = await message.get_attachment_bytes_at(1)
# Download all attachments at once
all_bytes = await message.get_all_attachment_bytes()
all_base64 = await message.get_all_attachment_base64()
# Inspect attachment metadata
for att in attachments:
att.filename # "document.pdf"
att.content_type # "application/pdf"
att.size # bytes
att.url # download URL
att.file_extension # "pdf"
att.file_type # "pdf"
att.is_image() # True/False
att.is_pdf() # True/False
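A common pattern is dispatching on attachment type and downloading bytes only when needed. This sketch uses the documented accessors (`get_attachments`, `get_attachment_bytes_at`, `is_image`, `is_pdf`); the helper itself is illustrative:

```python
async def describe_attachments(message) -> list:
    """Build a one-line description per attachment, downloading bytes
    only for images."""
    parts = []
    for i, att in enumerate(message.get_attachments()):
        if att.is_image():
            data = await message.get_attachment_bytes_at(i)
            parts.append(f"{att.filename}: image ({len(data)} bytes downloaded)")
        elif att.is_pdf():
            parts.append(f"{att.filename}: PDF, {att.size} bytes")
        else:
            parts.append(f"{att.filename}: {att.content_type}, {att.size} bytes")
    return parts
```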
Conversation
Access conversation history and metadata.
@agent.on_message
async def handle(message, sender):
conv = message.conversation
# Get message history
history = await conv.get_history(limit=10)
history = await conv.get_history(role="user", limit=20, offset=0)
# Get conversation metadata
metadata = await conv.get_metadata()
# Get messages where this agent was sender or recipient
agent_history = await conv.get_agent_history(limit=50)
agent_history = await conv.get_agent_history(agent_name="other-agent")
Memory
Semantic search over long-term user and conversation memory. Accessed via message.memory.
@agent.on_message
async def handle(message, sender):
mem = message.memory
# Search user's long-term memory
results = await mem.query_about_user(
query="previous diagnosis",
limit=10, # Max results (1-100)
threshold=70, # Similarity threshold 0-100
context_messages=2, # Surrounding messages to include (0-20)
)
for r in results:
r.message.content # Matched message content
r.score # Similarity score
r.context # List of surrounding MemoryMessage objects
# Search conversation memory
results = await mem.query_about_conversation(
query="what was discussed",
limit=5,
)
# Get user summary (aggregated across all conversations)
summary = await mem.get_user_summary()
if summary:
summary.text # Summary text
summary.message_count # Total messages
summary.last_updated # Timestamp
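Memory results are typically folded into an LLM prompt. A small sketch that packs results (using the documented `.score` and `.message.content` fields) into a context string under a character budget; the helper and budget are our own:

```python
def format_memory_context(results, max_chars: int = 1500) -> str:
    """Pack memory search results into a prompt-ready string,
    best matches first, within a character budget."""
    lines, used = [], 0
    for r in sorted(results, key=lambda r: r.score, reverse=True):
        line = f"[score {r.score}] {r.message.content}"
        if used + len(line) > max_chars:
            break
        lines.append(line)
        used += len(line)
    return "\n".join(lines)
```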
Streamer
Streams response chunks back to the user (or another agent).
@agent.on_message
async def handle(message, sender):
# Basic streaming (auto-finishes on exit)
async with agent.streamer(message) as s:
s.stream("Chunk 1...")
s.stream("Chunk 2...")
# Streaming with options
async with agent.streamer(
message,
send_to="user", # or another agent name
awaiting=True, # Set awaiting_route to this agent
) as s:
# Non-persisted content (not saved to DB)
s.stream("Thinking...", persist=False, event_type="thinking")
# Persisted content
s.stream("Here's my response...")
# write() is an alias for stream()
s.write("More content...")
# Set metadata on the message
s.set_message_metadata({"source": "search"})
# Set response schema for forms
s.set_response_schema({
"type": "object",
"properties": {
"name": {"type": "string"}
}
})
# Manual streaming (without context manager)
s = agent.streamer(message)
s.stream("Hello...")
await s.finish()
Structural Streaming
Rich UI components streamed alongside text content. Use s.stream_by() to bind a structural component to the streamer.
Artifact
Artifacts represent files, images, or embeds that are generated and displayed. Artifact data is persisted to the database.
from basion_agent import Artifact
@agent.on_message
async def handle(message, sender):
async with agent.streamer(message) as s:
artifact = Artifact()
# Show progress
s.stream_by(artifact).generating("Creating chart...", progress=0.5)
# Complete with result
s.stream_by(artifact).done(
url="https://example.com/chart.png",
type="image", # image, iframe, document, video, audio, code, link, file
title="Sales Chart",
description="Q4 sales data",
metadata={"width": 800, "height": 600}
)
# Or signal an error
# s.stream_by(artifact).error("Failed to generate chart")
s.stream("Here's your chart!")
Surface
Surfaces are interactive embedded components (iframes, widgets). Similar API to Artifact.
from basion_agent import Surface
@agent.on_message
async def handle(message, sender):
async with agent.streamer(message) as s:
surface = Surface()
s.stream_by(surface).generating("Loading widget...")
s.stream_by(surface).done(
url="https://example.com/calendar",
type="iframe",
title="Calendar Widget",
)
TextBlock
Collapsible text blocks with streaming title/body and visual variants. TextBlock events are not persisted.
from basion_agent import TextBlock
@agent.on_message
async def handle(message, sender):
async with agent.streamer(message) as s:
block = TextBlock()
# Set visual variant: thinking, note, warning, error, success
s.stream_by(block).set_variant("thinking")
# Stream title (appends)
s.stream_by(block).stream_title("Deep ")
s.stream_by(block).stream_title("Analysis...")
# Stream body (appends)
s.stream_by(block).stream_body("Step 1: Checking patterns\n")
s.stream_by(block).stream_body("Step 2: Validating\n")
# Replace title/body entirely
s.stream_by(block).update_title("Analysis Complete")
s.stream_by(block).update_body("All checks passed.")
# Mark as done
s.stream_by(block).done()
s.stream("Analysis finished!")
Stepper
Multi-step progress indicators. Stepper events are not persisted.
from basion_agent import Stepper
@agent.on_message
async def handle(message, sender):
async with agent.streamer(message) as s:
stepper = Stepper(steps=["Fetch", "Process", "Report"])
s.stream_by(stepper).start_step(0)
# ... do work ...
s.stream_by(stepper).complete_step(0)
s.stream_by(stepper).start_step(1)
# ... do work ...
s.stream_by(stepper).complete_step(1)
# Add a step dynamically
s.stream_by(stepper).add_step("Verify")
s.stream_by(stepper).start_step(2)
s.stream_by(stepper).complete_step(2)
s.stream_by(stepper).start_step(3)
# Update label mid-step
s.stream_by(stepper).update_step_label(3, "Verify (Final)")
s.stream_by(stepper).complete_step(3)
# Or signal failure
# s.stream_by(stepper).fail_step(1, error="Timeout")
s.stream_by(stepper).done()
s.stream("All steps complete!")
Knowledge Graph (Tools)
Query biomedical knowledge graphs for diseases, proteins, phenotypes, drugs, and pathways. Accessed via agent.tools.knowledge_graph.
@agent.on_message
async def handle(message, sender):
kg = agent.tools.knowledge_graph
# Search diseases
diseases = await kg.search_diseases(name="Huntington", limit=5)
disease = await kg.get_disease(disease_id=123)
# Search proteins/genes
proteins = await kg.search_proteins(symbol="BRCA1", limit=10)
# Search phenotypes (HPO terms)
phenotypes = await kg.search_phenotypes(name="seizure", hpo_id="HP:0001250")
# Search drugs
drugs = await kg.search_drugs(name="aspirin")
# Search pathways
pathways = await kg.search_pathways(name="apoptosis")
# Find similar diseases (by shared phenotypes)
similar = await kg.find_similar_diseases("Huntington Disease", limit=10)
for s in similar:
s.disease_name # Disease name
s.similarity_score # 0.0 - 1.0
s.shared_count # Number of shared phenotypes
# Find similar diseases (by shared genes)
similar = await kg.find_similar_diseases_by_genes("Huntington Disease")
# Get entity connections
edges = await kg.get_entity_network("BRCA1", "protein")
for e in edges:
e.source_id, e.source_type
e.target_id, e.target_type
e.relation_type
# k-hop graph traversal
subgraph = await kg.k_hop_traversal("BRCA1", "protein", k=2, limit_edges=100)
# Shortest path between entities
path = await kg.find_shortest_path(
start_name="BRCA1", start_type="protein",
end_name="Breast Cancer", end_type="disease",
max_hops=5
)
for step in path:
step.node_id, step.node_name, step.node_type, step.relation
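Path results are often rendered as text for the model or the user. A sketch using the documented step fields (`node_name`, `node_type`, `relation`); it assumes each step's `relation` describes the edge leading into that node, which is our reading, not confirmed by the API:

```python
def format_path(path) -> str:
    """Render a shortest-path result as readable 'A -[rel]-> B' text."""
    parts = []
    for i, step in enumerate(path):
        if i > 0:
            parts.append(f"-[{step.relation}]->")
        parts.append(f"{step.node_name} ({step.node_type})")
    return " ".join(parts)
```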
Remote Logging (Loki)
Send agent logs to Loki via the gateway for centralized monitoring. Logs are batched and sent in the background.
import logging
app = BasionAgentApp(
gateway_url="agent-gateway:8080",
api_key="key",
enable_remote_logging=True, # Enable Loki logging
remote_log_level=logging.INFO, # Min level (default: INFO)
remote_log_batch_size=100, # Logs per batch (default: 100)
remote_log_flush_interval=5.0, # Flush every N seconds (default: 5.0)
)
# Then use standard Python logging - it will be sent to Loki automatically
logger = logging.getLogger(__name__)
logger.info("Agent started", extra={"custom_field": "value"})
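The batch-and-flush behavior controlled by remote_log_batch_size and remote_log_flush_interval can be illustrated with a minimal handler. This is a sketch of the pattern only, not the SDK's actual Loki handler:

```python
import logging
import time

class BatchingHandler(logging.Handler):
    """Buffer formatted records; ship a batch when it fills up or the
    flush interval has elapsed since the last flush."""
    def __init__(self, send, batch_size=100, flush_interval=5.0):
        super().__init__()
        self.send = send                    # callable receiving a list of log lines
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.buffer = []
        self.last_flush = time.monotonic()

    def emit(self, record):
        self.buffer.append(self.format(record))
        stale = time.monotonic() - self.last_flush >= self.flush_interval
        if len(self.buffer) >= self.batch_size or stale:
            self.flush()

    def flush(self):
        if self.buffer:
            self.send(self.buffer)
            self.buffer = []
        self.last_flush = time.monotonic()
```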
Extensions
LangGraph Integration
Use HTTPCheckpointSaver to persist LangGraph state via the Conversation Store checkpoint API.
from basion_agent import BasionAgentApp
from basion_agent.extensions.langgraph import HTTPCheckpointSaver
from langgraph.graph import StateGraph
app = BasionAgentApp(gateway_url="...", api_key="...")
checkpointer = HTTPCheckpointSaver(app=app)
# Define your LangGraph
graph = StateGraph(MyState)
# ... add nodes and edges ...
compiled = graph.compile(checkpointer=checkpointer)
agent = app.register_me(name="langgraph-agent", ...)
@agent.on_message
async def handle(message, sender):
config = {"configurable": {"thread_id": message.conversation_id}}
async with agent.streamer(message) as s:
# Graph state persists across messages via checkpointer
result = await compiled.ainvoke(
{"messages": [message.content]},
config
)
s.stream(result["messages"][-1])
app.run()
Pydantic AI Integration
Use PydanticAIMessageStore to persist Pydantic AI message history.
from basion_agent import BasionAgentApp
from basion_agent.extensions.pydantic_ai import PydanticAIMessageStore
from pydantic_ai import Agent as PydanticAgent
app = BasionAgentApp(gateway_url="...", api_key="...")
store = PydanticAIMessageStore(app=app)
my_llm = PydanticAgent('openai:gpt-4o', system_prompt="You are helpful.")
agent = app.register_me(name="pydantic-agent", ...)
@agent.on_message
async def handle(message, sender):
# Load previous messages
history = await store.load(message.conversation_id)
async with agent.streamer(message) as s:
async with my_llm.run_stream(
message.content,
message_history=history
) as result:
async for chunk in result.stream_text():
s.stream(chunk)
# Save updated history
await store.save(message.conversation_id, result.all_messages())
app.run()
Advanced Usage
Inter-Agent Communication
Agents can send messages to other agents using the send_to parameter.
@agent.on_message(senders=["user"])
async def handle_user(message, sender):
# Forward to specialist agent
async with agent.streamer(message, send_to="specialist-agent") as s:
s.stream("Forwarding your question to the specialist...")
@agent.on_message(senders=["specialist-agent"])
async def handle_specialist(message, sender):
# Respond to user with specialist's answer
async with agent.streamer(message, send_to="user") as s:
s.stream(f"The specialist says: {message.content}")
Dynamic Forms with Response Schema
Request structured input from users using JSON Schema forms.
@agent.on_message
async def handle(message, sender):
async with agent.streamer(message, awaiting=True) as s:
s.stream("Please fill out this form:")
s.set_response_schema({
"type": "object",
"title": "Contact Information",
"properties": {
"name": {"type": "string", "title": "Full Name"},
"email": {"type": "string", "format": "email"},
"message": {"type": "string", "title": "Message"}
},
"required": ["name", "email"]
})
# When user submits form, message.content will be JSON
@agent.on_message
async def handle_form(message, sender):
import json
data = json.loads(message.content)
name = data.get("name")
# Process form data...
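Since form submissions arrive as JSON text, it is worth validating them against the schema's required list before use. A small sketch (the helper and its default required fields mirror the example schema above and are not part of the SDK):

```python
import json

def parse_form_submission(content: str, required=("name", "email")) -> dict:
    """Parse a form reply and fail loudly if required fields are absent."""
    data = json.loads(content)
    missing = [f for f in required if not data.get(f)]
    if missing:
        raise ValueError(f"Missing required fields: {', '.join(missing)}")
    return data
```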
Error Handling
Customize error handling behavior:
app = BasionAgentApp(
gateway_url="...",
api_key="...",
error_message_template="Sorry, something went wrong. Please try again."
)
agent = app.register_me(...)
# Disable automatic error responses
agent.send_error_responses = False
@agent.on_message
async def handle(message, sender):
try:
# Your logic
pass
except Exception as e:
# Custom error handling
async with agent.streamer(message) as s:
s.stream(f"I encountered an issue: {str(e)}")
Project Structure
ai-framework/
├── src/
│ └── basion_agent/
│ ├── __init__.py # Package exports
│ ├── app.py # BasionAgentApp
│ ├── agent.py # Agent class
│ ├── message.py # Message class (attachments, memory)
│ ├── streamer.py # Streamer class (stream_by, structural)
│ ├── conversation.py # Conversation helper (history, metadata)
│ ├── conversation_client.py # HTTP client for Conversation Store
│ ├── conversation_message.py # ConversationMessage dataclass
│ ├── memory.py # Memory context (query_about_user, etc.)
│ ├── memory_client.py # HTTP client for AI Memory
│ ├── attachment_client.py # HTTP client for attachments (download)
│ ├── checkpoint_client.py # HTTP client for checkpoints
│ ├── agent_state_client.py # HTTP client for agent state
│ ├── gateway_client.py # gRPC client for Agent Gateway
│ ├── gateway_pb2.py # Generated protobuf
│ ├── gateway_pb2_grpc.py # Generated gRPC stubs
│ ├── heartbeat.py # Heartbeat manager
│ ├── loki_handler.py # Loki remote log handler
│ ├── cli.py # CLI (basion-agent run)
│ ├── exceptions.py # Custom exceptions
│ ├── structural/
│ │ ├── __init__.py
│ │ ├── base.py # StructuralStreamer base class
│ │ ├── artifact.py # Artifact (image, file, iframe)
│ │ ├── surface.py # Surface (interactive embeds)
│ │ ├── text_block.py # TextBlock (collapsible text)
│ │ └── stepper.py # Stepper (multi-step progress)
│ ├── tools/
│ │ ├── __init__.py
│ │ ├── container.py # Tools container (lazy init)
│ │ └── knowledge_graph.py # Knowledge Graph client
│ └── extensions/
│ ├── __init__.py
│ ├── langgraph.py # LangGraph HTTPCheckpointSaver
│ └── pydantic_ai.py # Pydantic AI MessageStore
├── pyproject.toml
└── README.md
Development
Running Tests
# Install dev dependencies
pip install -e ".[dev]"
# Run tests with coverage
pytest
# Run specific test file
pytest tests/test_agent.py -v
Regenerating Protobuf
If the gateway.proto file changes:
python -m grpc_tools.protoc \
-I../../agent-gateway/proto \
--python_out=src/basion_agent \
--grpc_python_out=src/basion_agent \
../../agent-gateway/proto/gateway.proto
Dependencies
| Package | Purpose |
|---|---|
| grpcio | gRPC communication with Agent Gateway |
| grpcio-tools | Protobuf compilation |
| protobuf | Message serialization |
| requests | Sync HTTP for registration |
| aiohttp | Async HTTP for runtime operations |
Optional Dependencies
| Package | Install Command | Purpose |
|---|---|---|
| langgraph | pip install basion-agent[langgraph] | LangGraph checkpoint integration |
| pydantic-ai | pip install basion-agent[pydantic] | Pydantic AI message history |