Basion Agent SDK

Python SDK for building AI agents in the Basion AI platform. Provides agent registration, message handling via Kafka (through Agent Gateway), streaming responses, and integrations with LangGraph and Pydantic AI.

Overview

The Basion Agent SDK (basion-agent) enables developers to build AI agents that integrate with the Basion AI Management platform. Agents register themselves, receive messages through Kafka topics, and stream responses back to users.

Key Features

  • Agent Registration: Automatic registration with AI Inventory
  • Message Handling: Decorator-based handlers with sender filtering
  • Response Streaming: Chunked responses via Kafka/Centrifugo
  • Structural Streaming: Rich UI components (Artifacts, Surfaces, TextBlocks, Steppers)
  • Conversation History: Access to message history via Conversation Store
  • Memory V2 (mem0): Long-term memory via mem0.ai Cloud — ingest and semantic search
  • Memory (deprecated): Semantic search over long-term user and conversation memory
  • Attachments: Download and process file attachments (images, PDFs, etc.)
  • Knowledge Graph: Query biomedical knowledge graphs (diseases, proteins, phenotypes)
  • Remote Logging: Send logs to Loki via the gateway for centralized monitoring
  • LangGraph Integration: HTTP-based checkpoint saver for LangGraph graphs
  • Pydantic AI Integration: Persistent message history for Pydantic AI agents
  • CLI: Run agents with basion-agent run main:app
  • Error Handling: Automatic error responses to users on handler failures

Architecture

┌─────────────────────────────────────────────────────────────────────────────┐
│                           Your Agent Application                            │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│    BasionAgentApp                                                           │
│    ├── register_me() → Agent                                                │
│    │   ├── @on_message decorator                                            │
│    │   ├── streamer() → Streamer                                            │
│    │   │   └── stream_by() → Structural (Artifact, Surface, TextBlock, etc) │
│    │   └── tools → Tools                                                    │
│    │       └── knowledge_graph → KnowledgeGraphTool                         │
│    └── run() → Start consume loop                                           │
│                                                                             │
│    Message Context:                                                         │
│    ├── message.conversation → Conversation (history, metadata)              │
│    ├── message.memory_v2 → MemoryV2 (mem0 ingest + search)                  │
│    ├── message.memory → Memory (semantic search) [deprecated]               │
│    └── message.attachments → List[AttachmentInfo] (file downloads)          │
│                                                                             │
│    Extensions:                                                              │
│    ├── HTTPCheckpointSaver (LangGraph)                                      │
│    └── PydanticAIMessageStore (Pydantic AI)                                 │
│                                                                             │
└──────────────────────────────────┬──────────────────────────────────────────┘
                                   │ gRPC (Kafka) + HTTP (APIs)
                                   ▼
┌─────────────────────────────────────────────────────────────────────────────┐
│                           Agent Gateway                                     │
├─────────────────────────────────────────────────────────────────────────────┤
│  gRPC: AgentStream (bidirectional)      HTTP: /s/{service}/* proxy          │
│  - Auth                                 - /s/ai-inventory/*                 │
│  - Subscribe/Unsubscribe                - /s/conversation-store/*           │
│  - Produce/Consume messages             - /s/ai-memory/*                    │
│                                         - /s/attachment/*                   │
│                                         - /s/memory/* (mem0)                │
│                                         - /s/knowledge-graph/*              │
│                                         - /loki/api/v1/push (logging)       │
└─────────────────┬───────────────────────────────────┬───────────────────────┘
                  │                                   │
                  ▼                                   ▼
           ┌────────────────┐                ┌────────────────────┐
           │     Kafka      │                │   AI Inventory /   │
           │ {agent}.inbox  │                │ Conversation Store │
           └────────────────┘                │ AI Memory / KG     │
                                             └────────────────────┘

Message Flow

sequenceDiagram
    participant User
    participant Provider
    participant Router
    participant Gateway as Agent Gateway
    participant Agent as Your Agent
    participant ConvStore as Conversation Store

    User->>Provider: Send message
    Provider->>Router: Kafka: router.inbox
    Router->>Gateway: Kafka: {agent}.inbox
    Gateway->>Agent: gRPC stream: message

    Agent->>ConvStore: Get conversation history
    ConvStore-->>Agent: Message history

    loop Streaming Response
        Agent->>Gateway: gRPC stream: content chunk
        Gateway->>Router: Kafka: router.inbox
        Router->>Provider: Kafka: user.inbox
        Provider->>User: WebSocket: chunk
    end

    Agent->>Gateway: gRPC stream: done=true

Installation

# Basic installation
pip install basion-agent

# With LangGraph support
pip install basion-agent[langgraph]

# With Pydantic AI support
pip install basion-agent[pydantic]

# Development installation
pip install -e ".[dev]"

Quick Start

from basion_agent import BasionAgentApp

# Initialize the app
app = BasionAgentApp(
    gateway_url="agent-gateway:8080",
    api_key="your-api-key"
)

# Register an agent
agent = app.register_me(
    name="my-assistant",
    about="A helpful AI assistant",
    document="Answers general questions and provides helpful information.",
    representation_name="My Assistant"
)

# Handle messages
@agent.on_message
async def handle_message(message, sender):
    # Access conversation history
    history = await message.conversation.get_history(limit=10)

    # Stream response
    async with agent.streamer(message) as s:
        s.stream("Hello! ")
        s.stream("How can I help you today?")

# Run the agent
app.run()

CLI

Run agents using uvicorn-style import strings:

# Run 'app' from main.py
basion-agent run main:app

# Run 'app' from main.py (defaults to :app)
basion-agent run main

# Run 'application' from myagent.py
basion-agent run myagent:application

# Show version
basion-agent version

Your agent file should define a BasionAgentApp instance:

# main.py
app = BasionAgentApp(gateway_url="...", api_key="...")
agent = app.register_me(name="my-agent", ...)

@agent.on_message
async def handle(message, sender):
    ...
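The `module:attr` strings are resolved much like uvicorn's import strings. A minimal sketch of that resolution, for illustration only (this is not the SDK's actual CLI code):

```python
import importlib


def resolve_app(import_string: str, default_attr: str = "app"):
    """Resolve a uvicorn-style 'module:attr' string to an object (sketch).

    'main:app' imports main.py and returns its `app` attribute;
    a bare 'main' falls back to the default attribute name 'app'.
    """
    module_name, _, attr = import_string.partition(":")
    module = importlib.import_module(module_name)  # module must be on sys.path
    return getattr(module, attr or default_attr)
```

With a `main.py` defining `app`, `resolve_app("main:app")` and `resolve_app("main")` both return that instance.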

Configuration

Environment Variables

Variable          Description                  Default
GATEWAY_URL       Agent Gateway endpoint       Required
GATEWAY_API_KEY   API key for authentication   Required
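Since both variables are required, it can help to fail fast at startup if either is missing. A small illustrative helper (the SDK itself takes these values as constructor arguments; `load_gateway_config` is not part of the SDK):

```python
import os


def load_gateway_config(env=os.environ):
    """Collect required gateway settings, raising early if one is missing."""
    try:
        return {
            "gateway_url": env["GATEWAY_URL"],
            "api_key": env["GATEWAY_API_KEY"],
        }
    except KeyError as missing:
        raise RuntimeError(f"missing required environment variable: {missing}")


# app = BasionAgentApp(**load_gateway_config())
```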

BasionAgentApp Options

app = BasionAgentApp(
    gateway_url="agent-gateway:8080",    # Gateway endpoint
    api_key="key",                        # Authentication key
    heartbeat_interval=60,                # Heartbeat frequency (seconds)
    max_concurrent_tasks=100,             # Max concurrent message handlers
    error_message_template="...",         # Error message sent to users
    secure=False,                         # Use TLS for gRPC and HTTPS for HTTP
    enable_remote_logging=False,          # Send logs to Loki via gateway
    remote_log_level=logging.INFO,        # Min log level for remote logging
    remote_log_batch_size=100,            # Logs per batch
    remote_log_flush_interval=5.0,        # Seconds between flushes
)

API Reference

BasionAgentApp

Main application class for initializing and running agents.

app = BasionAgentApp(gateway_url, api_key)

# Register an agent
agent = app.register_me(
    name="agent-name",           # Unique identifier (used for routing)
    about="Short description",   # Brief description for agent selection
    document="Full docs...",     # Detailed documentation
    representation_name="Name",  # Display name (optional)
    metadata={"key": "value"},   # Additional metadata (optional)
    category_name="my-category", # Category in kebab-case (optional, auto-created)
    tag_names=["tag-1", "tag-2"],# Tags in kebab-case (optional, auto-created)
    example_prompts=["Ask me anything"],  # Example prompts for users (optional)
    is_experimental=False,       # Mark as experimental (optional)
    force_update=False,          # Bypass content hash check (optional)
    base_url="http://...",       # Base URL for agent's frontend service (optional)
    related_pages=[              # Related pages (optional)
        {"name": "Docs", "endpoint": "/docs"}
    ],
)

# Start consuming messages
app.run()  # Blocks until shutdown

Agent

Handles message registration and response streaming.

# Register message handler (all senders)
@agent.on_message
async def handle(message, sender):
    pass

# Filter by sender
@agent.on_message(senders=["user"])
async def handle_user(message, sender):
    pass

# Exclude sender
@agent.on_message(senders=["~other-agent"])
async def handle_not_other(message, sender):
    pass
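The filter semantics above (plain names form an allow-list, a leading `~` excludes a sender, no filters match everyone) can be sketched in a few lines. This illustrates the matching rules, not the SDK's internal implementation:

```python
def matches_sender(filters, sender):
    """Illustrate @on_message sender filtering semantics (sketch only)."""
    if not filters:
        return True  # no filters: accept all senders
    excludes = {f[1:] for f in filters if f.startswith("~")}
    includes = {f for f in filters if not f.startswith("~")}
    if sender in excludes:
        return False  # explicitly excluded
    # with only exclusions, everyone else matches; otherwise require membership
    return not includes or sender in includes
```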

Message

Represents an incoming message with conversation context, memory, and attachments.

@agent.on_message
async def handle(message, sender):
    message.content             # Message content
    message.conversation_id     # Conversation ID
    message.user_id             # User ID
    message.metadata            # Optional message metadata (dict)
    message.schema              # Optional message schema (dict)

    # Conversation history
    history = await message.conversation.get_history(limit=10)

    # Memory V2 (mem0) — ingest and search long-term memory
    if message.memory_v2:
        await message.memory_v2.ingest(role="user", content=message.content)
        results = await message.memory_v2.search(query="diagnosis")

    # Memory (deprecated — use memory_v2 instead)
    results = await message.memory.query_about_user("diagnosis", limit=5)

    # Attachments
    if message.has_attachments():
        count = message.get_attachment_count()
        attachments = message.get_attachments()

        # Download first attachment
        data = await message.get_attachment_bytes()
        base64_str = await message.get_attachment_base64()
        buffer = await message.get_attachment_buffer()

        # Download specific attachment by index
        data = await message.get_attachment_bytes_at(1)

        # Download all attachments at once
        all_bytes = await message.get_all_attachment_bytes()
        all_base64 = await message.get_all_attachment_base64()

        # Inspect attachment metadata
        for att in attachments:
            att.filename        # "document.pdf"
            att.content_type    # "application/pdf"
            att.size            # bytes
            att.url             # download URL
            att.file_extension  # "pdf"
            att.file_type       # "pdf"
            att.is_image()      # True/False
            att.is_pdf()        # True/False

Conversation

Access conversation history and metadata.

@agent.on_message
async def handle(message, sender):
    conv = message.conversation

    # Get message history
    history = await conv.get_history(limit=10)
    history = await conv.get_history(role="user", limit=20, offset=0)

    # Get conversation metadata
    metadata = await conv.get_metadata()

    # Get messages where this agent was sender or recipient
    agent_history = await conv.get_agent_history(limit=50)
    agent_history = await conv.get_agent_history(agent_name="other-agent")

Memory V2 (mem0)

Long-term memory powered by mem0.ai. Lets agents ingest messages and run semantic search across a user's history. Accessed via message.memory_v2.

@agent.on_message
async def handle(message, sender):
    mem = message.memory_v2

    # Ingest the user's message into mem0
    await mem.ingest(role="user", content=message.content)

    # Search for relevant memories
    results = await mem.search(query="previous diagnosis")
    for r in results:
        r.get("memory")  # Extracted memory text

    # Ingest assistant response too
    response = "Here is my answer..."
    await mem.ingest(role="assistant", content=response)

Methods:

Method                 Description
ingest(role, content)  Ingest a message into mem0. Role is 'user', 'assistant', or 'system'.
search(query)          Semantic search across the user's mem0 memories. Returns a list of results.

How it works: The MemoryV2Client sends HTTP requests to the memory service (/s/memory/* via the gateway proxy), which wraps the mem0.ai Cloud API. Messages are ingested per-user and per-conversation. Search returns semantically relevant memories extracted by mem0.
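To make the proxying concrete, here is a sketch of the kind of request MemoryV2Client could send through the gateway. The exact path under `/s/memory/*` and the payload field names are assumptions for illustration, not the service's documented contract:

```python
def build_ingest_request(gateway_url, user_id, conversation_id, role, content):
    """Sketch of an ingest request routed through the gateway proxy.

    The '/s/memory/ingest' path and the payload shape are hypothetical.
    """
    return {
        "url": f"http://{gateway_url}/s/memory/ingest",
        "json": {
            "user_id": user_id,              # memories are scoped per user
            "conversation_id": conversation_id,  # and per conversation
            "messages": [{"role": role, "content": content}],
        },
    }
```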

Memory (Deprecated)

Note: message.memory is deprecated. Use message.memory_v2 (mem0) instead.

Semantic search over long-term user and conversation memory. Accessed via message.memory.

@agent.on_message
async def handle(message, sender):
    mem = message.memory

    # Search user's long-term memory
    results = await mem.query_about_user(
        query="previous diagnosis",
        limit=10,           # Max results (1-100)
        threshold=70,       # Similarity threshold 0-100
        context_messages=2, # Surrounding messages to include (0-20)
    )
    for r in results:
        r.message.content   # Matched message content
        r.score             # Similarity score
        r.context           # List of surrounding MemoryMessage objects

    # Search conversation memory
    results = await mem.query_about_conversation(
        query="what was discussed",
        limit=5,
    )

    # Get user summary (aggregated across all conversations)
    summary = await mem.get_user_summary()
    if summary:
        summary.text           # Summary text
        summary.message_count  # Total messages
        summary.last_updated   # Timestamp

Streamer

Streams response chunks back to the user (or another agent).

@agent.on_message
async def handle(message, sender):
    # Basic streaming (auto-finishes on exit)
    async with agent.streamer(message) as s:
        s.stream("Chunk 1...")
        s.stream("Chunk 2...")

    # Streaming with options
    async with agent.streamer(
        message,
        send_to="user",      # or another agent name
        awaiting=True,        # Set awaiting_route to this agent
    ) as s:
        # Non-persisted content (not saved to DB)
        s.stream("Thinking...", persist=False, event_type="thinking")

        # Persisted content
        s.stream("Here's my response...")

        # write() is an alias for stream()
        s.write("More content...")

        # Set metadata on the message
        s.set_message_metadata({"source": "search"})

        # Set response schema for forms
        s.set_response_schema({
            "type": "object",
            "properties": {
                "name": {"type": "string"}
            }
        })

    # Manual streaming (without context manager)
    s = agent.streamer(message)
    s.stream("Hello...")
    await s.finish()

Structural Streaming

Rich UI components streamed alongside text content. Use s.stream_by() to bind a structural component to the streamer.

Artifact

Artifacts represent files, images, or embeds that are generated and displayed. Artifact data is persisted to the database.

from basion_agent import Artifact

@agent.on_message
async def handle(message, sender):
    async with agent.streamer(message) as s:
        artifact = Artifact()

        # Show progress
        s.stream_by(artifact).generating("Creating chart...", progress=0.5)

        # Complete with result
        s.stream_by(artifact).done(
            url="https://example.com/chart.png",
            type="image",        # image, iframe, document, video, audio, code, link, file
            title="Sales Chart",
            description="Q4 sales data",
            metadata={"width": 800, "height": 600}
        )

        # Or signal an error
        # s.stream_by(artifact).error("Failed to generate chart")

        s.stream("Here's your chart!")

Surface

Surfaces are interactive embedded components (iframes, widgets). Similar API to Artifact.

from basion_agent import Surface

@agent.on_message
async def handle(message, sender):
    async with agent.streamer(message) as s:
        surface = Surface()
        s.stream_by(surface).generating("Loading widget...")
        s.stream_by(surface).done(
            url="https://example.com/calendar",
            type="iframe",
            title="Calendar Widget",
        )

TextBlock

Collapsible text blocks with streaming title/body and visual variants. TextBlock events are not persisted.

from basion_agent import TextBlock

@agent.on_message
async def handle(message, sender):
    async with agent.streamer(message) as s:
        block = TextBlock()

        # Set visual variant: thinking, note, warning, error, success
        s.stream_by(block).set_variant("thinking")

        # Stream title (appends)
        s.stream_by(block).stream_title("Deep ")
        s.stream_by(block).stream_title("Analysis...")

        # Stream body (appends)
        s.stream_by(block).stream_body("Step 1: Checking patterns\n")
        s.stream_by(block).stream_body("Step 2: Validating\n")

        # Replace title/body entirely
        s.stream_by(block).update_title("Analysis Complete")
        s.stream_by(block).update_body("All checks passed.")

        # Mark as done
        s.stream_by(block).done()

        s.stream("Analysis finished!")

Stepper

Multi-step progress indicators. Stepper events are not persisted.

from basion_agent import Stepper

@agent.on_message
async def handle(message, sender):
    async with agent.streamer(message) as s:
        stepper = Stepper(steps=["Fetch", "Process", "Report"])

        s.stream_by(stepper).start_step(0)
        # ... do work ...
        s.stream_by(stepper).complete_step(0)

        s.stream_by(stepper).start_step(1)
        # ... do work ...
        s.stream_by(stepper).complete_step(1)

        # Add a step dynamically
        s.stream_by(stepper).add_step("Verify")

        s.stream_by(stepper).start_step(2)
        s.stream_by(stepper).complete_step(2)

        s.stream_by(stepper).start_step(3)
        # Update label mid-step
        s.stream_by(stepper).update_step_label(3, "Verify (Final)")
        s.stream_by(stepper).complete_step(3)

        # Or signal failure
        # s.stream_by(stepper).fail_step(1, error="Timeout")

        s.stream_by(stepper).done()
        s.stream("All steps complete!")

Knowledge Graph (Tools)

Query biomedical knowledge graphs for diseases, proteins, phenotypes, drugs, and pathways. Accessed via agent.tools.knowledge_graph.

@agent.on_message
async def handle(message, sender):
    kg = agent.tools.knowledge_graph

    # Search diseases
    diseases = await kg.search_diseases(name="Huntington", limit=5)
    disease = await kg.get_disease(disease_id=123)

    # Search proteins/genes
    proteins = await kg.search_proteins(symbol="BRCA1", limit=10)

    # Search phenotypes (HPO terms)
    phenotypes = await kg.search_phenotypes(name="seizure", hpo_id="HP:0001250")

    # Search drugs
    drugs = await kg.search_drugs(name="aspirin")

    # Search pathways
    pathways = await kg.search_pathways(name="apoptosis")

    # Find similar diseases (by shared phenotypes)
    similar = await kg.find_similar_diseases("Huntington Disease", limit=10)
    for s in similar:
        s.disease_name       # Disease name
        s.similarity_score   # 0.0 - 1.0
        s.shared_count       # Number of shared phenotypes

    # Find similar diseases (by shared genes)
    similar = await kg.find_similar_diseases_by_genes("Huntington Disease")

    # Get entity connections
    edges = await kg.get_entity_network("BRCA1", "protein")
    for e in edges:
        e.source_id, e.source_type
        e.target_id, e.target_type
        e.relation_type

    # k-hop graph traversal
    subgraph = await kg.k_hop_traversal("BRCA1", "protein", k=2, limit_edges=100)

    # Shortest path between entities
    path = await kg.find_shortest_path(
        start_name="BRCA1", start_type="protein",
        end_name="Breast Cancer", end_type="disease",
        max_hops=5
    )
    for step in path:
        step.node_id, step.node_name, step.node_type, step.relation

Remote Logging (Loki)

Send agent logs to Loki via the gateway for centralized monitoring. Logs are batched and sent in the background.

import logging

app = BasionAgentApp(
    gateway_url="agent-gateway:8080",
    api_key="key",
    enable_remote_logging=True,       # Enable Loki logging
    remote_log_level=logging.INFO,    # Min level (default: INFO)
    remote_log_batch_size=100,        # Logs per batch (default: 100)
    remote_log_flush_interval=5.0,    # Flush every N seconds (default: 5.0)
)

# Then use standard Python logging - it will be sent to Loki automatically
logger = logging.getLogger(__name__)
logger.info("Agent started", extra={"custom_field": "value"})

Extensions

LangGraph Integration

Use HTTPCheckpointSaver to persist LangGraph state via the Conversation Store checkpoint API.

from basion_agent import BasionAgentApp
from basion_agent.extensions.langgraph import HTTPCheckpointSaver
from langgraph.graph import StateGraph

app = BasionAgentApp(gateway_url="...", api_key="...")
checkpointer = HTTPCheckpointSaver(app=app)

# Define your LangGraph
graph = StateGraph(MyState)
# ... add nodes and edges ...
compiled = graph.compile(checkpointer=checkpointer)

agent = app.register_me(name="langgraph-agent", ...)

@agent.on_message
async def handle(message, sender):
    config = {"configurable": {"thread_id": message.conversation_id}}

    async with agent.streamer(message) as s:
        # Graph state persists across messages via checkpointer
        result = await compiled.ainvoke(
            {"messages": [message.content]},
            config
        )
        s.stream(result["messages"][-1])

app.run()

Pydantic AI Integration

Use PydanticAIMessageStore to persist Pydantic AI message history.

from basion_agent import BasionAgentApp
from basion_agent.extensions.pydantic_ai import PydanticAIMessageStore
from pydantic_ai import Agent as PydanticAgent

app = BasionAgentApp(gateway_url="...", api_key="...")
store = PydanticAIMessageStore(app=app)

my_llm = PydanticAgent('openai:gpt-4o', system_prompt="You are helpful.")

agent = app.register_me(name="pydantic-agent", ...)

@agent.on_message
async def handle(message, sender):
    # Load previous messages
    history = await store.load(message.conversation_id)

    async with agent.streamer(message) as s:
        async with my_llm.run_stream(
            message.content,
            message_history=history
        ) as result:
            async for chunk in result.stream_text():
                s.stream(chunk)

        # Save updated history
        await store.save(message.conversation_id, result.all_messages())

app.run()

Advanced Usage

Agent-Initiated (Proactive) Conversations

Agents can proactively start new conversations with users — without waiting for the user to message first. This enables outreach scenarios like health check-ins, reminders, onboarding follow-ups, and scheduled notifications.

How It Works

sequenceDiagram
    participant Agent as Your Agent
    participant ConvStore as Conversation Store
    participant Gateway as Agent Gateway
    participant Router
    participant Provider
    participant User

    Agent->>ConvStore: POST /conversations/ (is_new=true, locked_by=agent)
    ConvStore-->>Agent: conversation_id

    loop Streaming First Message
        Agent->>Gateway: gRPC stream: content chunk
        Gateway->>Router: Kafka: router.inbox
        Router->>Provider: Kafka: user.inbox
        Provider->>User: WebSocket: chunk
    end

    Agent->>Gateway: gRPC stream: done=true
    Provider->>ConvStore: Persist message, unlock conversation

    Note over User: User sees new conversation in sidebar (bold)
    User->>Provider: Reply
    Provider->>Router: Kafka: router.inbox
    Router->>Agent: Kafka: {agent}.inbox (via awaiting_route)
    Note over Agent: Normal @on_message handler runs

  1. Agent creates conversation via the conversation store API with is_new=True, current_route, and locked_by set atomically.
  2. Agent streams the first message through the normal Kafka pipeline (router → user.inbox → provider → Centrifugo → user).
  3. Provider persists the assistant message and unlocks the conversation on done=true.
  4. User sees a new bold conversation in their sidebar (the is_new flag triggers bold rendering in the frontend).
  5. User replies — if awaiting=True was set, the reply is routed to the agent's @on_message handler via awaiting_route.

Basic Usage

agent = app.register_me(name="outreach-agent", ...)

@agent.on_message
async def handle_reply(message, sender):
    # User replied to our proactive conversation
    async with agent.streamer(message) as s:
        s.stream("Thanks for responding to my check-in!")

# Start a proactive conversation (e.g., from a scheduler, webhook, or API endpoint)
async def send_check_in(user_id: str):
    conv_id, streamer = await agent.start_conversation(
        user_id=user_id,
        title="Weekly Health Check-in",
        awaiting=True,
    )
    async with streamer as s:
        s.stream("Hi! It's time for your weekly check-in. How are you feeling today?")
    # When the user replies, handle_reply() runs

API Reference

async def start_conversation(
    user_id: str,
    title: str = "Agent-initiated conversation",
    awaiting: bool = False,
    response_schema: Optional[Dict[str, Any]] = None,
    message_metadata: Optional[Dict[str, Any]] = None,
    metadata: Optional[Dict[str, Any]] = None,
) -> Tuple[str, Streamer]:

Parameter         Type  Default                         Description
user_id           str   Required                        Target user UUID
title             str   "Agent-initiated conversation"  Conversation title shown in sidebar
awaiting          bool  False                           If True, sets awaiting_route so the user's reply routes back to this agent
response_schema   dict  None                            JSON Schema for a structured user response (renders a form)
message_metadata  dict  None                            Metadata attached to the streamed message
metadata          dict  None                            Metadata stored on the conversation itself

Returns: Tuple[str, Streamer], i.e. a (conversation_id, streamer) pair.

With Response Schema (Form)

async def request_symptom_report(user_id: str):
    conv_id, streamer = await agent.start_conversation(
        user_id=user_id,
        title="Symptom Report",
        response_schema={
            "type": "object",
            "title": "How are you feeling?",
            "properties": {
                "pain_level": {"type": "integer", "minimum": 0, "maximum": 10, "title": "Pain Level"},
                "notes": {"type": "string", "title": "Additional Notes"},
            },
            "required": ["pain_level"],
        },
    )
    async with streamer as s:
        s.stream("Please fill out this quick symptom report:")
    # User sees a form; their submission arrives in @on_message as JSON

With Message Metadata

conv_id, streamer = await agent.start_conversation(
    user_id=user_id,
    title="Appointment Reminder",
    message_metadata={"reminder_type": "appointment", "appointment_id": "abc-123"},
    metadata={"source": "scheduler", "scheduled_at": "2024-01-15T09:00:00Z"},
)
async with streamer as s:
    s.stream("Reminder: You have an appointment tomorrow at 10 AM.")

Key Behaviors

  • is_new=True is always set on agent-initiated conversations, causing them to appear bold in the user's sidebar until opened.
  • awaiting defaults to False. Set awaiting=True explicitly if you want the user's reply to route back to this agent's @on_message handler.
  • locked_by is set at creation to prevent race conditions, then automatically cleared by the provider when done=true is received.
  • current_route is preserved by the router when it detects an agent-to-user response (the router's isAgentToUserResponse check).
  • The streamer returned by start_conversation() works identically to agent.streamer(message) — supports stream(), write(), set_response_schema(), set_message_metadata(), stream_by() for structural elements, and the async with context manager.

Inter-Agent Communication

Agents can send messages to other agents using the send_to parameter.

@agent.on_message(senders=["user"])
async def handle_user(message, sender):
    # Forward to specialist agent
    async with agent.streamer(message, send_to="specialist-agent") as s:
        s.stream("Forwarding your question to the specialist...")

@agent.on_message(senders=["specialist-agent"])
async def handle_specialist(message, sender):
    # Respond to user with specialist's answer
    async with agent.streamer(message, send_to="user") as s:
        s.stream(f"The specialist says: {message.content}")

Dynamic Forms with Response Schema

Request structured input from users using JSON Schema forms.

@agent.on_message
async def handle(message, sender):
    async with agent.streamer(message, awaiting=True) as s:
        s.stream("Please fill out this form:")
        s.set_response_schema({
            "type": "object",
            "title": "Contact Information",
            "properties": {
                "name": {"type": "string", "title": "Full Name"},
                "email": {"type": "string", "format": "email"},
                "message": {"type": "string", "title": "Message"}
            },
            "required": ["name", "email"]
        })

# When user submits form, message.content will be JSON
@agent.on_message
async def handle_form(message, sender):
    import json
    data = json.loads(message.content)
    name = data.get("name")
    # Process form data...

Error Handling

Customize error handling behavior:

app = BasionAgentApp(
    gateway_url="...",
    api_key="...",
    error_message_template="Sorry, something went wrong. Please try again."
)

agent = app.register_me(...)

# Disable automatic error responses
agent.send_error_responses = False

@agent.on_message
async def handle(message, sender):
    try:
        # Your logic
        pass
    except Exception as e:
        # Custom error handling
        async with agent.streamer(message) as s:
            s.stream(f"I encountered an issue: {str(e)}")

Project Structure

ai-framework/
├── src/
│   └── basion_agent/
│       ├── __init__.py              # Package exports
│       ├── app.py                   # BasionAgentApp
│       ├── agent.py                 # Agent class
│       ├── message.py               # Message class (attachments, memory)
│       ├── streamer.py              # Streamer class (stream_by, structural)
│       ├── conversation.py          # Conversation helper (history, metadata)
│       ├── conversation_client.py   # HTTP client for Conversation Store
│       ├── conversation_message.py  # ConversationMessage dataclass
│       ├── memory_v2.py              # Memory V2 context (mem0 ingest + search)
│       ├── memory_v2_client.py      # HTTP client for mem0 memory service
│       ├── memory.py                # Memory context (deprecated)
│       ├── memory_client.py         # HTTP client for AI Memory (deprecated)
│       ├── attachment_client.py     # HTTP client for attachments (download)
│       ├── checkpoint_client.py     # HTTP client for checkpoints
│       ├── agent_state_client.py    # HTTP client for agent state
│       ├── gateway_client.py        # gRPC client for Agent Gateway
│       ├── gateway_pb2.py           # Generated protobuf
│       ├── gateway_pb2_grpc.py      # Generated gRPC stubs
│       ├── heartbeat.py             # Heartbeat manager
│       ├── loki_handler.py          # Loki remote log handler
│       ├── cli.py                   # CLI (basion-agent run)
│       ├── exceptions.py            # Custom exceptions
│       ├── structural/
│       │   ├── __init__.py
│       │   ├── base.py              # StructuralStreamer base class
│       │   ├── artifact.py          # Artifact (image, file, iframe)
│       │   ├── surface.py           # Surface (interactive embeds)
│       │   ├── text_block.py        # TextBlock (collapsible text)
│       │   └── stepper.py           # Stepper (multi-step progress)
│       ├── tools/
│       │   ├── __init__.py
│       │   ├── container.py         # Tools container (lazy init)
│       │   └── knowledge_graph.py   # Knowledge Graph client
│       └── extensions/
│           ├── __init__.py
│           ├── langgraph.py         # LangGraph HTTPCheckpointSaver
│           └── pydantic_ai.py       # Pydantic AI MessageStore
├── pyproject.toml
└── README.md

Development

Running Tests

# Install dev dependencies
pip install -e ".[dev]"

# Run tests with coverage
pytest

# Run specific test file
pytest tests/test_agent.py -v

Regenerating Protobuf

If the gateway.proto file changes:

python -m grpc_tools.protoc \
    -I../../agent-gateway/proto \
    --python_out=src/basion_agent \
    --grpc_python_out=src/basion_agent \
    ../../agent-gateway/proto/gateway.proto

Dependencies

Package       Purpose
grpcio        gRPC communication with Agent Gateway
grpcio-tools  Protobuf compilation
protobuf      Message serialization
requests      Sync HTTP for registration
aiohttp       Async HTTP for runtime operations

Optional Dependencies

Package      Install Command                      Purpose
langgraph    pip install basion-agent[langgraph]  LangGraph checkpoint integration
pydantic-ai  pip install basion-agent[pydantic]   Pydantic AI message history
