
Basion Agent SDK

Python SDK for building AI agents on the Basion platform. Agents register themselves, receive messages through Kafka, and stream responses back to users in real time.

Installation

pip install basion-agent

# With LangGraph support
pip install basion-agent[langgraph]

# With Pydantic AI support
pip install basion-agent[pydantic]

Quick Start

from basion_agent import BasionAgentApp

app = BasionAgentApp(
    gateway_url="agent-gateway:8080",
    api_key="your-api-key",
)

agent = app.register_me(
    name="rare-disease-assistant",
    about="Answers questions about rare diseases, symptoms, and treatments",
    document="Rare disease specialist. Abilities: disease lookup, symptom analysis, genetic variant interpretation, finding similar conditions. Keywords: rare disease, genetics, orphan disease, diagnosis.",
)

@agent.on_message
async def handle(message, sender):
    history = await message.conversation.get_history(limit=10)

    async with agent.streamer(message) as s:
        s.stream(f"You asked: {message.content}\n\n")
        s.stream("Let me look into that for you...")

app.run()

That's all you need. When you call app.run(), the SDK registers your agent with the gateway, subscribes to {agent-name}.inbox on Kafka, starts a heartbeat loop, and begins consuming messages. Your @on_message handler fires for each incoming message.
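The routing convention is simple enough to sketch. The following is illustrative Python, not the SDK's actual internals (the function and parameter names here are assumptions):

```python
import asyncio

def inbox_topic(agent_name: str) -> str:
    # Routing convention described above: each agent consumes "<name>.inbox"
    return f"{agent_name}.inbox"

async def heartbeat(interval: float, ping) -> None:
    # Periodic liveness signal to the gateway (conceptual sketch only)
    while True:
        ping()
        await asyncio.sleep(interval)
```

So an agent registered as `rare-disease-assistant` consumes from the `rare-disease-assistant.inbox` topic.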

CLI

# Run your agent
basion-agent run main:app

# Defaults to :app
basion-agent run main

# Custom app name
basion-agent run myagent:application

# Show version
basion-agent version
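The `module:attribute` reference follows the same convention as other Python app runners. A minimal sketch of the parsing rule (illustrative, not the CLI's actual code):

```python
def parse_app_ref(ref: str) -> tuple[str, str]:
    # "main:app" -> ("main", "app"); a bare module name defaults to ":app"
    module, _, attr = ref.partition(":")
    return module, attr or "app"
```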

Configuration

app = BasionAgentApp(
    gateway_url="agent-gateway:8080",       # Required
    api_key="key",                           # Required
    heartbeat_interval=60,                   # Heartbeat frequency in seconds
    max_concurrent_tasks=100,                # Max concurrent message handlers
    error_message_template="...",            # Error message sent to users on failure
    secure=False,                            # TLS for gRPC and HTTPS for HTTP
    enable_remote_logging=False,             # Send logs to Loki via gateway
    remote_log_level=logging.INFO,           # Min log level for remote logging
    remote_log_batch_size=100,               # Logs per batch
    remote_log_flush_interval=5.0,           # Seconds between flushes
)
| Environment Variable | Description |
|---|---|
| GATEWAY_URL | Agent Gateway endpoint |
| GATEWAY_API_KEY | API key for authentication |

Agent Registration

agent = app.register_me(
    name="rare-disease-assistant",            # Unique identifier (kebab-case, used for Kafka routing)
    about="Rare disease medical assistant",   # Short description for agent selection (max 500 chars)
    document="""Rare disease specialist. Abilities: disease lookup, symptom analysis,
    genetic variant interpretation, finding similar conditions, drug info.
    Keywords: rare disease, genetics, orphan disease, diagnosis, phenotype, genotype.""",
    representation_name="Dr. Assistant",      # Display name (defaults to name)
    version="1.0.0",                          # Version string
    lifecycle_stage="public",                 # 'dev' | 'test' | 'private-preview' | 'public-preview' | 'public'
    welcome_message="Hello! I specialize in rare diseases. How can I help?",
    detailed_description="A comprehensive rare disease assistant...",
    base_url="http://...",                    # Base URL for frontend service (optional)
    metadata={"specialty": "rare-diseases"},  # Free-form metadata (optional)
    category_names=["my-body", "medical"],    # Categories in kebab-case (auto-created)
    prompts=[                                 # Suggested prompts shown to users
        {"label": "Huntington", "prompt": "What is Huntington disease?"},
        {"label": "Similar diseases", "prompt": "Find diseases similar to Cystic Fibrosis"},
    ],
    waitlist=False,                           # Not publicly available yet (optional)
    related_pages=[                           # Related pages (optional)
        {"name": "Resources", "endpoint": "/resources"}
    ],
    force_update=False,                       # Bypass content hash check (optional)
)
| Parameter | Type | Default | Description |
|---|---|---|---|
| name | str | required | Unique agent name in kebab-case (used for Kafka routing) |
| about | str | required | Short description (max 500 characters, used for agent selection) |
| document | str | required | Used by the router to match user messages to your agent; include keywords describing your agent's abilities so the router can route to it correctly |
| representation_name | str | name | Human-readable display name |
| version | str | None | Version string (e.g., "1.0.0"). Included in content hash — changing version triggers re-registration |
| lifecycle_stage | str | 'dev' | One of: dev, test, private-preview, public-preview, public |
| welcome_message | str | None | Welcome text shown when a user starts a conversation |
| detailed_description | str | None | Extended description beyond about |
| base_url | str | None | Base URL for agent's frontend service (used for iframe artifact URLs) |
| metadata | dict | None | Free-form metadata dict |
| category_names | list[str] | None | Categories in kebab-case (auto-created if they don't exist) |
| prompts | list[dict] | None | List of {"label": "...", "prompt": "..."} objects shown to users |
| waitlist | bool | None | If True, agent is not publicly available |
| related_pages | list[dict] | None | Pages with name and endpoint keys (supports {conversation_id} placeholder) |
| force_update | bool | None | Bypass content hash check and always update |
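The content hash mentioned above lets the gateway skip redundant updates when nothing changed. How exactly the SDK computes it is not documented here; the idea can be sketched like this (illustrative only):

```python
import hashlib
import json

def content_hash(registration: dict) -> str:
    # Illustrative: a stable hash over the registration payload. If it matches
    # what the gateway already has, re-registration can be skipped;
    # force_update=True bypasses this check.
    blob = json.dumps(registration, sort_keys=True).encode("utf-8")
    return hashlib.sha256(blob).hexdigest()
```

Because the hash is computed over sorted keys, field order doesn't matter, but any changed value (such as `version`) produces a new hash.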

Message Handling

Basic Handler

@agent.on_message
async def handle(message, sender):
    async with agent.streamer(message) as s:
        s.stream("Hello! How can I help?")

Handlers can be async or synchronous. Sync handlers run in a thread pool automatically.
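The offload for synchronous handlers works along the lines of `asyncio.to_thread`; a conceptual sketch (not the SDK's actual dispatch code):

```python
import asyncio

def sync_handler(content: str) -> str:
    # A synchronous handler doing blocking work
    return content.upper()

async def dispatch(content: str) -> str:
    # Conceptually, sync handlers run in a worker thread so the event loop
    # keeps consuming messages while the handler blocks
    return await asyncio.to_thread(sync_handler, content)
```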

Sender Filtering

# Handle messages from users only
@agent.on_message(senders=["user"])
async def handle_user(message, sender):
    ...

# Handle messages from a specific agent
@agent.on_message(senders=["triage-agent"])
async def handle_triage(message, sender):
    ...

# Exclude a specific sender
@agent.on_message(senders=["~notification-agent"])
async def handle_others(message, sender):
    ...
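The filter semantics shown above can be summarized in a few lines. This is an illustrative sketch of the matching rule, not the SDK's implementation:

```python
def sender_matches(sender: str, patterns: list[str]) -> bool:
    # Plain names include; a leading "~" excludes. With only exclusions,
    # every other sender matches.
    excluded = {p[1:] for p in patterns if p.startswith("~")}
    included = {p for p in patterns if not p.startswith("~")}
    if sender in excluded:
        return False
    return not included or sender in included
```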

Designing Handlers for Agents vs Users

User messages and agent messages often require fundamentally different handling. A user message goes through your full conversation flow — history loading, memory, LLM calls. But when another agent calls you via agent.call(), it expects a focused, structured answer — not a full conversation experience.

Scenario: A pharmacology agent with both user and agent interfaces.

# User-facing handler — full conversational flow with history, memory, streaming
@pharma_agent.on_message(senders=["user"])
async def handle_user(message, sender):
    history = await message.conversation.get_agent_history(limit=20)
    memories = await message.memory_v2.search(message.content)

    async with pharma_agent.streamer(message) as s:
        s.stream(generate_conversational_response(message.content, history, memories))

# Agent-facing handler — concise, structured response for agent.call()
@pharma_agent.on_message(senders=["coordinator-agent", "wellness-agent"])
async def handle_agent_call(message, sender):
    # No need for history/memory — the calling agent provides all context
    # Return a focused, structured answer
    async with pharma_agent.streamer(message) as s:
        s.stream(lookup_drug_interactions(message.content))

Tip: When an agent calls yours via agent.call(), the response is automatically routed back to the caller — no special code needed. But if you have a dedicated senders handler, you can tailor the response format to what the calling agent expects (e.g., structured data vs conversational prose).

Error Handling

If a handler raises an exception, the SDK automatically sends an error message to the user. You can customize or disable this behavior:

# Custom error message
app = BasionAgentApp(
    ...,
    error_message_template="Sorry, something went wrong. Please try again.",
)

# Or disable per-agent
agent.send_error_responses = False

@agent.on_message
async def handle(message, sender):
    try:
        ...
    except Exception as e:
        async with agent.streamer(message) as s:
            s.stream(f"I ran into an issue: {e}")

Message

The message object provides access to content, conversation history, memory, and attachments.

@agent.on_message
async def handle(message, sender):
    message.content              # Message text
    message.conversation_id      # Conversation UUID
    message.user_id              # User UUID
    message.metadata             # Optional dict (from frontend)
    message.schema               # Optional dict (for form responses)
    message.headers              # Raw Kafka headers
    message.conversation         # Conversation helper
    message.memory_v2            # mem0 memory (ingest + search)
    message.memory               # Deprecated memory (use memory_v2)

    # Inter-agent communication
    message.hand_off("agent-name")              # Forward to another agent
    message.hand_off("agent-name", "content")   # Forward with custom content

Streaming Responses

Basic Streaming

@agent.on_message
async def handle(message, sender):
    # Context manager — auto-finishes on exit
    async with agent.streamer(message) as s:
        s.stream("Looking up information on Huntington disease...\n\n")
        s.stream("Huntington disease is a progressive neurodegenerative disorder...")

    # Or manual control
    s = agent.streamer(message)
    s.stream("Hello!")
    await s.finish()

s.write(content) is available as an alias for s.stream(content).

Streamer Options

# Set awaiting — next user reply routes back to this agent
async with agent.streamer(message, awaiting=True) as s:
    s.stream("What symptoms are you experiencing?")

When to Use awaiting=True

Use awaiting=True when you want to keep the user talking to your agent and bypass the router for the next message. This is powerful but requires care — you need to release the conversation at some point, or the user gets stuck with your agent forever.

Scenario: Multi-turn data collection. A diagnostic agent asks the user a series of questions. Each reply must come back to the same agent, not get rerouted by the router to a different specialist.

@diagnostic_agent.on_message
async def handle(message, sender):
    history = await message.conversation.get_agent_history(limit=10)

    if len(history) == 0:
        # First interaction — ask the first question
        async with diagnostic_agent.streamer(message, awaiting=True) as s:
            s.stream("I'll help assess your symptoms. How long have you been experiencing this?")
        return

    if len(history) < 4:
        # Still collecting — keep awaiting
        async with diagnostic_agent.streamer(message, awaiting=True) as s:
            s.stream(ask_next_question(history, message.content))
        return

    # Collection done — respond WITHOUT awaiting to release back to router
    async with diagnostic_agent.streamer(message) as s:
        s.stream(generate_assessment(history, message.content))

Scenario: Awaiting + hand-off for off-topic messages. You hold the conversation with awaiting=True, but if the user asks something unrelated, you hand off to the guide agent instead of trying to answer it yourself.

@nutrition_agent.on_message
async def handle(message, sender):
    if not is_nutrition_related(message.content):
        # Not our topic — hand off to the guide agent who can reroute properly
        message.hand_off("guide-agent", message.content)
        return

    # On-topic — keep the conversation
    async with nutrition_agent.streamer(message, awaiting=True) as s:
        s.stream(generate_nutrition_advice(message.content))

Tip: Always have an exit path. If you set awaiting=True indefinitely without a way out, the user cannot reach other agents. Common exits: (1) stop setting awaiting after your task is done, (2) hand off to guide-agent when the topic doesn't match yours, or (3) use the agent inventory to find a better agent and hand off.

Non-Persisted Content

Stream chunks that render in real time but are not saved to conversation history:

async with agent.streamer(message) as s:
    s.stream("Searching knowledge graph...", persist=False, event_type="thinking")
    # ... do work ...
    s.stream("Here are the results:")  # This gets persisted

Message Metadata

Attach metadata to the response message:

async with agent.streamer(message) as s:
    s.set_message_metadata({"source": "knowledge_graph", "confidence": 0.92})
    s.stream("Based on the knowledge graph, ...")

Response Schema (genui Forms)

Request structured input from users with genui — a declarative form builder. Define form classes with typed fields, send them to the frontend, and parse typed responses back.

Quick Start

from basion_agent.genui import Form, Text, Slider, MultiSelect, Switch, Option

class SymptomReport(Form):
    name = Text(label="Full Name", placeholder="John Doe")
    severity = Slider(label="Pain Level", min=1, max=10)
    symptoms = MultiSelect(label="Symptoms", options=[
        Option(value="headache", label="Headache"),
        Option(value="fatigue", label="Fatigue"),
        Option(value="nausea", label="Nausea"),
    ])
    chronic = Switch(label="Is this chronic?")

Send to frontend:

@agent.on_message
async def handle(message, sender):
    # Check if this is a form submission
    if message.schema:
        data = await SymptomReport.parse(message)
        async with agent.streamer(message) as s:
            s.stream(f"Thank you, {data.name}. Pain level: {data.severity}/10.")
        return

    # Ask for structured input
    async with agent.streamer(message, awaiting=True) as s:
        s.stream("Please describe your symptoms:")
        form = SymptomReport(title="Symptom Report", description="Tell us about your symptoms")
        s.set_response_schema(form)

How it works:

  1. Declare a Form subclass with field class attributes
  2. Instantiate with metadata: form = MyForm(title="...", description="...")
  3. streamer.set_response_schema(form) sends the schema to the frontend
  4. When the user submits, await MyForm.parse(message) returns a typed Python object with IDE autocomplete

Field Types

All fields accept required=True (default). All arguments are keyword-only.

| Field | Python Type | Default | Description |
|---|---|---|---|
| Text | str | "" | Single/multiline text input |
| Number | float | 0.0 | Numeric input with min/max/step |
| Select | str | "" | Single-select dropdown |
| MultiSelect | List[str] | [] | Multiple selection |
| Checkbox | bool | False | Single boolean checkbox |
| CheckboxGroup | List[str] | [] | Multiple checkboxes as a group |
| Switch | bool | False | Toggle switch |
| Slider | float | 0.0 | Range slider |
| DatePicker | str | "" | Date selection |
| Hidden | Any | None | Hidden field (not rendered) |
| FileField | AttachmentInfo | None | File upload field |

Text:

name = Text(label="Full Name", placeholder="John Doe", min_length=2, max_length=100)
bio = Text(label="Biography", multiline=True, required=False)

Number:

age = Number(label="Age", min=0, max=150, step=1)

Select / MultiSelect:

color = Select(label="Color", options=[
    Option(value="red", label="Red"),
    Option(value="blue", label="Blue"),
], placeholder="Pick one...", allow_custom=True)

symptoms = MultiSelect(label="Symptoms", options=[
    Option(value="headache", label="Headache"),
    Option(value="fatigue", label="Fatigue"),
], min_selections=1, max_selections=5, allow_custom=True)

Checkbox / CheckboxGroup / Switch:

agree = Checkbox(label="I agree to the terms")
languages = CheckboxGroup(label="Languages", options=[
    Option(value="en", label="English"),
    Option(value="es", label="Spanish"),
], min_selections=1)
notifications = Switch(label="Enable notifications", default=True)

Slider / DatePicker:

pain = Slider(label="Pain Level", min=0, max=10, step=1, default=5)
dob = DatePicker(label="Date of Birth", min_date="1900-01-01", max_date="2026-12-31")

Hidden / FileField:

session_id = Hidden(label="Session", value="abc-123", required=False)
doc = FileField(label="Document", accept=["application/pdf", "image/*"], max_size=10_000_000)

Form

Single-step form container. Metadata is passed via __init__ (all keyword-only):

class Feedback(Form):
    comment = Text(label="Comment", multiline=True)

form = Feedback(
    title="Feedback",
    description="Tell us what you think",
    submit_label="Send Feedback",
    allow_cancel=True,
    cancel_label="Dismiss",
)
streamer.set_response_schema(form)
| Parameter | Type | Default | Description |
|---|---|---|---|
| title | str | required | Form heading |
| description | str | None | Subtitle/description text |
| submit_label | str | "Submit" | Submit button text |
| allow_cancel | bool | False | Show a cancel button |
| cancel_label | str | "Cancel" | Cancel button text |

MultiStepForm

Multi-step wizard. Define Step subclasses with fields, then use them as class attributes:

from basion_agent.genui import MultiStepForm, Step, Text, Number, Slider, Checkbox

class Demographics(Step):
    name = Text(label="Full Name")
    age = Number(label="Age", min=0, max=150)

class PainAssessment(Step):
    pain_level = Slider(label="Pain Level", min=0, max=10)
    chronic = Checkbox(label="Is this chronic?")

class PatientIntake(MultiStepForm):
    demographics = Demographics(label="Basic Info", description="Your basic details")
    pain = PainAssessment(label="Pain Assessment")

form = PatientIntake(title="Patient Intake", description="Please complete all steps")
streamer.set_response_schema(form)

Field names must be unique across all steps. The frontend submits all fields as a flat dict.
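The flat submission described above can be sketched as a merge with a uniqueness check. This is illustrative Python showing the constraint, not the SDK's code:

```python
def flatten_steps(steps: dict[str, dict]) -> dict:
    # Merge per-step field values into the flat dict the frontend submits,
    # rejecting duplicate field names across steps.
    flat: dict = {}
    for step_name, fields in steps.items():
        for name, value in fields.items():
            if name in flat:
                raise ValueError(f"duplicate field {name!r} in step {step_name!r}")
            flat[name] = value
    return flat
```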

Confirmation

A yes/no decision card — no input fields:

from basion_agent.genui import Confirmation

confirm = Confirmation(
    title="Delete Account",
    message="Are you sure? This cannot be undone.",
    confirm_label="Delete",
    cancel_label="Keep Account",
    variant="destructive",  # "default", "destructive", "warning", "info"
)
streamer.set_response_schema(confirm)

# Parse the response
result = await Confirmation.parse(message)
result.confirmed  # bool — True if confirmed, False if cancelled

Parsing

parse() is always async and is a classmethod. It accepts Message, JSON string, or dict:

data = await MyForm.parse(message)          # From Message (most common)
data = await MyForm.parse('{"name": "Alice"}')  # From JSON string
data = await MyForm.parse({"name": "Alice"})    # From dict

data.name       # str — typed access with IDE autocomplete
data.severity   # float

Missing fields get their type's default value. Extra keys are silently ignored.
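Those semantics amount to a dict comprehension over the declared fields. An illustrative sketch of what `parse()` does with missing and extra keys (not the SDK's implementation):

```python
def apply_defaults(submission: dict, defaults: dict) -> dict:
    # Missing fields fall back to the field type's default;
    # keys not declared on the form are dropped.
    return {name: submission.get(name, default) for name, default in defaults.items()}
```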

For FileField, when parsing from a Message, the attachment client is automatically extracted:

class UploadForm(Form):
    doc = FileField(label="Document")

data = await UploadForm.parse(message)
data.doc  # AttachmentInfo object

Form Cancellation

When a form has allow_cancel=True, the user can dismiss it and type a plain text message instead. The provider detects this automatically: if a pending_response_schema exists but the message content is not valid JSON, it marks the message with formCancelled: True in metadata.
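The detection rule boils down to a JSON-validity check, which can be sketched like this (illustrative, not the provider's actual code):

```python
import json

def is_form_submission(content: str) -> bool:
    # With a pending response schema, JSON-object content is treated as a
    # submission; anything else means the user cancelled and typed free text.
    try:
        return isinstance(json.loads(content), dict)
    except (json.JSONDecodeError, TypeError):
        return False
```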

@agent.on_message
async def handle(message, sender):
    if message.schema:
        if message.form_cancelled:
            # User cancelled the form and typed plain text
            async with agent.streamer(message) as s:
                s.stream("No problem! How can I help you?")
            return

        # Normal form submission — parse and validate
        data = await SymptomReport.parse(message)
        result = SymptomReport.validate(data)
        if not result.is_valid:
            async with agent.streamer(message) as s:
                s.stream("Some fields are invalid:\n")
                for field_name, error in result.errors.items():
                    s.stream(f"- {field_name}: {error}\n")
            return
        # ... handle valid data

Conversation History

@agent.on_message
async def handle(message, sender):
    conv = message.conversation

    # Get message history
    history = await conv.get_history(limit=20)
    history = await conv.get_history(role="user", limit=10, offset=0)

    # Get conversation metadata
    metadata = await conv.get_metadata()

    # Get messages where this agent was sender or recipient
    agent_history = await conv.get_agent_history(limit=50)
    agent_history = await conv.get_agent_history(agent_name="triage-agent")

Memory V2 (mem0)

Long-term memory powered by mem0.ai. Ingest messages and semantically search across a user's history.

@agent.on_message
async def handle(message, sender):
    mem = message.memory_v2

    # Ingest the user's message
    await mem.ingest(role="user", content=message.content)

    # Search for relevant past context
    results = await mem.search(query="diagnosis history")
    for r in results:
        memory_text = r.get("memory")  # Extracted memory text

    async with agent.streamer(message) as s:
        if results:
            s.stream("Based on what I remember:\n")
            for r in results:
                s.stream(f"- {r.get('memory')}\n")
            s.stream("\n")
        s.stream(f"Regarding your question: {message.content}")

    # Ingest the assistant's response too
    await mem.ingest(role="assistant", content="...")
| Method | Description |
|---|---|
| ingest(role, content) | Ingest a message. Role: 'user', 'assistant', or 'system'. |
| search(query) | Semantic search across the user's memories. Returns a list of dicts. |

Attachments

Download and process file attachments (images, PDFs, etc.).

@agent.on_message
async def handle(message, sender):
    if not message.has_attachments():
        return

    count = message.get_attachment_count()
    attachments = message.get_attachments()

    async with agent.streamer(message) as s:
        s.stream(f"Received {count} file(s).\n\n")

        for i, att in enumerate(attachments):
            s.stream(f"**{att.filename}** ({att.content_type}, {att.size} bytes)\n")

            if att.is_image():
                base64_str = await message.get_attachment_base64_at(i)
                # Send to vision model...

            elif att.is_pdf():
                pdf_bytes = await message.get_attachment_bytes_at(i)
                # Parse PDF...

        # Or download everything at once
        all_bytes = await message.get_all_attachment_bytes()
        all_base64 = await message.get_all_attachment_base64()

Attachment Methods

| Method | Returns | Description |
|---|---|---|
| has_attachments() | bool | Whether the message has any attachments |
| get_attachment_count() | int | Number of attachments |
| get_attachments() | List[AttachmentInfo] | List of attachment metadata |
| get_attachment_bytes() | bytes | Download first attachment as bytes |
| get_attachment_base64() | str | Download first attachment as base64 |
| get_attachment_buffer() | BytesIO | Download first attachment as BytesIO |
| get_attachment_bytes_at(i) | bytes | Download attachment at index i |
| get_attachment_base64_at(i) | str | Download attachment at index i as base64 |
| get_attachment_buffer_at(i) | BytesIO | Download attachment at index i as BytesIO |
| get_all_attachment_bytes() | List[bytes] | Download all attachments |
| get_all_attachment_base64() | List[str] | Download all as base64 |
| get_all_attachment_buffers() | List[BytesIO] | Download all as BytesIO |
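The bytes and base64 accessors carry the same payload; base64 is just an ASCII encoding convenient for APIs (e.g. vision models) that expect strings. A quick self-contained illustration of the relationship:

```python
import base64

# The *_base64 accessors return the same data as *_bytes, base64-encoded.
pdf_bytes = b"%PDF-1.4 example payload"
as_base64 = base64.b64encode(pdf_bytes).decode("ascii")
assert base64.b64decode(as_base64) == pdf_bytes
```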

AttachmentInfo Properties

| Property | Type | Example |
|---|---|---|
| filename | str | "genetic_report.pdf" |
| content_type | str | "application/pdf" |
| size | int | 524288 |
| url | str | Download URL |
| file_extension | str | "pdf" |
| file_type | str | "pdf" |
| is_image() | bool | True for image MIME types |
| is_pdf() | bool | True for PDF files |

Structural Streaming

Rich UI components streamed alongside text. Bind a structural component to the streamer with stream_by().

Artifact

Files, images, or embeds with generation progress. Artifact data is persisted to the database.

from basion_agent import Artifact

@agent.on_message
async def handle(message, sender):
    async with agent.streamer(message) as s:
        artifact = Artifact()

        # Show progress
        s.stream_by(artifact).generating("Generating genetic pathway diagram...", progress=0.3)
        # ... do work ...
        s.stream_by(artifact).generating("Rendering...", progress=0.8)

        # Complete with result
        s.stream_by(artifact).done(
            url="https://example.com/pathway-diagram.png",
            type="image",
            title="HTT Gene Pathway",
            description="Huntingtin protein interaction network",
            metadata={"width": 1200, "height": 800},
        )

        # Or signal an error
        # s.stream_by(artifact).error("Failed to generate diagram")

        s.stream("Here's the pathway diagram for the HTT gene.")

Artifact types: image, iframe, document, video, audio, code, link, file

Surface

Interactive embedded components (iframes, widgets).

from basion_agent import Surface

@agent.on_message
async def handle(message, sender):
    async with agent.streamer(message) as s:
        surface = Surface()
        s.stream_by(surface).generating("Loading appointment scheduler...")
        s.stream_by(surface).done(
            url="https://cal.com/embed/dr-smith",
            type="iframe",
            title="Schedule Genetic Counseling",
            description="Book a session with a genetic counselor",
        )
        s.stream("You can schedule your genetic counseling session above.")

TextBlock

Collapsible text blocks with streaming title/body and visual variants. TextBlock events are not persisted.

from basion_agent import TextBlock

@agent.on_message
async def handle(message, sender):
    async with agent.streamer(message) as s:
        block = TextBlock()

        # Set visual variant
        s.stream_by(block).set_variant("thinking")

        # Stream title (appends)
        s.stream_by(block).stream_title("Analyzing ")
        s.stream_by(block).stream_title("symptoms...")

        # Stream body (appends)
        s.stream_by(block).stream_body("Checking symptom database...\n")
        s.stream_by(block).stream_body("Cross-referencing with HPO ontology...\n")
        s.stream_by(block).stream_body("Matching against known phenotypes...\n")

        # Replace title/body entirely
        s.stream_by(block).update_title("Analysis Complete")
        s.stream_by(block).update_body("Found 3 matching conditions.")

        # Mark as done
        s.stream_by(block).done()

        s.stream("Based on the symptoms, here are possible conditions...")

Variants: thinking, note, warning, error, success

Stepper

Multi-step progress indicators. Stepper events are not persisted.

from basion_agent import Stepper

@agent.on_message
async def handle(message, sender):
    async with agent.streamer(message) as s:
        stepper = Stepper(steps=[
            "Search diseases",
            "Analyze phenotypes",
            "Find similar conditions",
            "Generate report",
        ])

        s.stream_by(stepper).start_step(0)
        diseases = await kg.search_diseases(name="Huntington")
        s.stream_by(stepper).complete_step(0)

        s.stream_by(stepper).start_step(1)
        phenotypes = await kg.search_phenotypes(name="chorea")
        s.stream_by(stepper).complete_step(1)

        s.stream_by(stepper).start_step(2)
        similar = await kg.find_similar_diseases("Huntington Disease")
        s.stream_by(stepper).complete_step(2)

        # Add a step dynamically
        s.stream_by(stepper).add_step("Cross-reference")
        s.stream_by(stepper).start_step(4)
        s.stream_by(stepper).update_step_label(4, "Cross-reference (final)")
        s.stream_by(stepper).complete_step(4)

        s.stream_by(stepper).start_step(3)
        # Or mark a step as failed
        # s.stream_by(stepper).fail_step(3, error="Report generation timed out")
        s.stream_by(stepper).complete_step(3)

        s.stream_by(stepper).done()
        s.stream("Here's your rare disease report...")

Generative UI Message Components

Rich UI components that are persisted as the message content. Unlike structural streaming (which is ephemeral side-channel UI) and response schemas (which replace the chat input), genui message components are the actual message — stored in the database and rendered by the frontend as structured cards, accordions, etc.

Overview

| Feature | Method | Persisted? | Renders As |
|---|---|---|---|
| Text streaming | s.stream() | Yes | Markdown text |
| Structural events | s.stream_by() | No (except Artifact) | Stepper, TextBlock, etc. |
| Response schemas | s.set_response_schema() | No | Form replacing chat input |
| Genui components | s.generate_ui() | Yes | Card, Accordion, etc. |

Quick Start

from basion_agent.genui import Card, CardSection, CardButton

@agent.on_message
async def handle(message, sender):
    async with agent.streamer(message) as s:
        s.generate_ui(Card(
            title="Lab Results",
            body="Blood work completed successfully.",
            variant="success",
            sections=[
                CardSection(label="Hemoglobin", value="14.2 g/dL"),
                CardSection(label="WBC", value="7,500 /uL"),
            ],
        ))

The message is stored with is_genui=True and content as a JSON array of component dicts. The frontend parses and renders them as structured UI instead of markdown.
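That storage format can be sketched as a small serialization step. This is an illustrative approximation (not the SDK's code), consistent with the raw-dict fallback described later, where anything with a `to_dict()` method or a plain dict is accepted:

```python
import json

def serialize_components(components: list) -> str:
    # Objects exposing to_dict() are converted; plain dicts pass through.
    # The result is the JSON array stored as the message content.
    dicts = [c if isinstance(c, dict) else c.to_dict() for c in components]
    return json.dumps(dicts)
```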

Card

A card component for displaying structured information with optional sections, image, and button.

from basion_agent.genui import Card, CardSection, CardButton

Card(
    title="Patient Summary",                    # Required — card heading
    body="Overview of current status.",          # Optional — description text
    variant="info",                             # Optional — visual style
    sections=[                                  # Optional — key-value pairs
        CardSection(label="Name", value="John Doe"),
        CardSection(label="Age", value="42"),
        CardSection(label="Status", value="Active"),
    ],
    image="https://example.com/photo.jpg",      # Optional — image URL at top
    button=CardButton(                          # Optional — link button at bottom
        label="View Full Report",
        url="https://example.com/report/123",
    ),
)
| Parameter | Type | Default | Description |
|---|---|---|---|
| title | str | required | Card heading |
| body | str | "" | Body text / description |
| variant | str | "default" | Visual style: default, success, warning, error, info |
| sections | List[CardSection] | None | Key-value pairs displayed in the card |
| image | str | None | Image URL displayed at the top of the card |
| button | CardButton | None | Link button at the bottom of the card |

CardSection — a key-value pair:

| Parameter | Type | Description |
|---|---|---|
| label | str | Label/key text |
| value | str | Value text |

CardButton — a link button:

| Parameter | Type | Description |
|---|---|---|
| label | str | Button text |
| url | str | URL the button opens |

Accordion

A collapsible accordion component with expandable items.

from basion_agent.genui import Accordion, AccordionItem

Accordion(
    title="Frequently Asked Questions",         # Required — heading
    items=[                                     # Required — at least one item
        AccordionItem(
            title="What is this service?",
            body="A multi-agent AI assistant for healthcare tasks.",
        ),
        AccordionItem(
            title="How do I submit a form?",
            body="Fill in the fields and press Submit.",
        ),
    ],
)
| Parameter | Type | Default | Description |
|---|---|---|---|
| title | str | required | Accordion heading |
| items | List[AccordionItem] | required | Expandable items (min 1) |

AccordionItem:

| Parameter | Type | Description |
|---|---|---|
| title | str | Item heading (clickable) |
| body | str | Content revealed when expanded |

Multiple Components in One Message

Call generate_ui() multiple times to send several components as a single message:

async with agent.streamer(message) as s:
    s.generate_ui(Card(
        title="Lab Results",
        body="Blood work completed on 2026-02-11.",
        variant="success",
        sections=[
            CardSection(label="Hemoglobin", value="14.2 g/dL (Normal)"),
            CardSection(label="WBC", value="7,500 /uL (Normal)"),
        ],
    ))
    s.generate_ui(Accordion(
        title="Detailed Breakdown",
        items=[
            AccordionItem(title="Complete Blood Count", body="All values within normal range."),
            AccordionItem(title="Metabolic Panel", body="Glucose: 95 mg/dL — normal."),
        ],
    ))

Both components are stored as a single message with content = [card_dict, accordion_dict].

Mutual Exclusion with stream()

generate_ui() and stream() cannot be mixed in the same message — they are mutually exclusive modes:

# OK — genui only
async with agent.streamer(message) as s:
    s.generate_ui(Card(title="Result", body="Done."))

# OK — text only
async with agent.streamer(message) as s:
    s.stream("Here are the results in plain text...")

# ERROR — mixing modes raises RuntimeError
async with agent.streamer(message) as s:
    s.stream("Some text")
    s.generate_ui(Card(title="Oops"))  # RuntimeError!

Combining with Structural Events

Structural events (stream_by()) work alongside generate_ui() because they are non-persisted side-channel UI:

from basion_agent import Stepper
from basion_agent.genui import Card, CardSection

async with agent.streamer(message) as s:
    stepper = Stepper(steps=["Fetch data", "Analyze", "Build card"])

    s.stream_by(stepper).start_step(0)
    data = await fetch_data()
    s.stream_by(stepper).complete_step(0)

    s.stream_by(stepper).start_step(1)
    results = analyze(data)
    s.stream_by(stepper).complete_step(1)

    s.stream_by(stepper).start_step(2)
    s.generate_ui(Card(
        title="Analysis Complete",
        variant="success",
        sections=[
            CardSection(label="Records Analyzed", value=str(len(results))),
        ],
    ))
    s.stream_by(stepper).complete_step(2)
    s.stream_by(stepper).done()

Raw Dict Fallback

If you need to send a component type not yet supported by the SDK classes, pass a raw dict:

s.generate_ui({"type": "card", "title": "Custom", "body": "Raw dict works too."})

Any object with a to_dict() method or a plain dict is accepted.
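
For example, a custom component class only needs a to_dict() method. A minimal sketch — the Timeline class and its "timeline" type string are hypothetical, not a component type the SDK or frontend is known to support:

```python
from dataclasses import dataclass, field
from typing import List

@dataclass
class Timeline:
    """Hypothetical custom component; the "timeline" type string is an
    assumption about what the frontend accepts, not an SDK class."""
    title: str
    events: List[str] = field(default_factory=list)

    def to_dict(self) -> dict:
        # Same shape as the raw-dict example above
        return {"type": "timeline", "title": self.title, "events": self.events}

# s.generate_ui(Timeline(title="Treatment History", events=["2024: Diagnosis"]))
```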

How It Works

  1. Agent calls s.generate_ui(component) — component is appended to an internal list
  2. On finish(), all components are serialized as a JSON array and sent as the message content with isGenui=True
  3. Provider consumer stores the message with is_genui=true in the database
  4. Frontend receives the Centrifugo event with isGenui: true, parses content as JSON, and renders the components
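
Step 2 amounts to JSON-serializing the queued components; a rough sketch of that serialization (the function name is illustrative, not an actual SDK internal):

```python
import json

def serialize_components(queued) -> str:
    """Turn queued components into the JSON-array message content.
    Accepts objects with to_dict() or plain dicts, as generate_ui() does."""
    return json.dumps(
        [c.to_dict() if hasattr(c, "to_dict") else c for c in queued]
    )

content = serialize_components([
    {"type": "card", "title": "Result", "body": "Done."},
])
# content is then sent as the message body with isGenui=True
```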

Knowledge Graph

Query biomedical knowledge graphs for diseases, proteins, phenotypes, drugs, and pathways. Accessed via agent.tools.knowledge_graph.

@agent.on_message
async def handle(message, sender):
    kg = agent.tools.knowledge_graph

    # Search diseases
    diseases = await kg.search_diseases(name="Huntington", limit=5)
    diseases = await kg.search_diseases(orphacode="399", limit=5)
    diseases = await kg.search_diseases(omim="143100")
    disease = await kg.get_disease(disease_id=123)

    # Search proteins/genes
    proteins = await kg.search_proteins(symbol="HTT", limit=10)
    proteins = await kg.search_proteins(ensembl_id="ENSG00000197386")

    # Search phenotypes (HPO terms)
    phenotypes = await kg.search_phenotypes(name="chorea", limit=10)
    phenotypes = await kg.search_phenotypes(hpo_id="HP:0002072")

    # Search drugs
    drugs = await kg.search_drugs(name="tetrabenazine", limit=5)

    # Search pathways
    pathways = await kg.search_pathways(name="apoptosis", limit=5)

    # Find similar diseases by shared phenotypes
    similar = await kg.find_similar_diseases("Huntington Disease", limit=10)
    for s in similar:
        s.disease_name        # "Spinocerebellar Ataxia Type 17"
        s.similarity_score    # 0.85
        s.shared_count        # 12

    # Find similar diseases by shared genes
    by_genes = await kg.find_similar_diseases_by_genes("Huntington Disease", limit=10)

    # Get all connections for an entity
    edges = await kg.get_entity_network("HTT", "protein")
    for e in edges:
        e.source_id, e.source_type
        e.target_id, e.target_type
        e.relation_type

    # k-hop graph traversal (BFS subgraph)
    subgraph = await kg.k_hop_traversal("HTT", "protein", k=2, limit_edges=100)

    # Shortest path between two entities
    path = await kg.find_shortest_path(
        start_name="HTT", start_type="protein",
        end_name="Huntington Disease", end_type="disease",
        max_hops=5,
    )
    for step in path:
        step.node_id, step.node_name, step.node_type, step.relation

Entity types: protein, phenotype, disease, pathway, drug, molecular_function, cellular_component, biological_process
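
If you build helper utilities around these APIs, a small client-side guard can catch entity-type typos before a network call (a convenience sketch, not an SDK feature):

```python
# The valid entity types listed above
ENTITY_TYPES = {
    "protein", "phenotype", "disease", "pathway", "drug",
    "molecular_function", "cellular_component", "biological_process",
}

def check_entity_type(entity_type: str) -> str:
    """Fail fast on a typo instead of sending a doomed query."""
    if entity_type not in ENTITY_TYPES:
        raise ValueError(
            f"unknown entity type {entity_type!r}; expected one of {sorted(ENTITY_TYPES)}"
        )
    return entity_type

# edges = await kg.get_entity_network("HTT", check_entity_type("protein"))
```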

Agent Inventory

Query the AI Inventory service to discover active agents and their capabilities. Accessed via agent.tools.agent_inventory.

@agent.on_message
async def handle(message, sender):
    inv = agent.tools.agent_inventory

    # Get all active agents
    agents = await inv.get_active_agents()
    for a in agents:
        a.id                    # Agent UUID
        a.name                  # "rare-disease-assistant"
        a.representation_name   # "Dr. Assistant"
        a.about                 # Short description
        a.document              # Full documentation
        a.example_prompts       # ["What is Huntington disease?", ...]
        a.categories            # [{"id": "...", "name": "medical"}]
        a.tags                  # [{"id": "...", "name": "rare-disease"}]

    # Get agents accessible to a specific user (filtered by role/permissions)
    user_agents = await inv.get_user_agents(user_id="user-uuid")

| Method | Returns | Description |
| --- | --- | --- |
| get_active_agents() | List[AgentInfo] | All agents with status=active and lifeStatus=active |
| get_user_agents(user_id) | List[AgentInfo] | Active agents accessible to a specific user |
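
A common use is feeding the inventory into an LLM routing or delegation prompt. A sketch, assuming only the AgentInfo attributes listed above:

```python
def summarize_agents(agents) -> str:
    """Render inventory results (objects exposing .name and .about,
    per the AgentInfo fields above) as a bullet list for a prompt."""
    return "\n".join(f"- {a.name}: {a.about}" for a in agents)

# agents = await inv.get_active_agents()
# prompt = f"Available specialists:\n{summarize_agents(agents)}"
```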

Agent-Initiated (Proactive) Conversations

Agents can proactively start new conversations with users — without waiting for them to message first. Use cases: health check-ins, medication reminders, appointment follow-ups, new research alerts.

How It Works

  1. Agent creates a conversation via the conversation store API (is_new=True, current_route, locked_by set atomically)
  2. Agent streams the first message through the normal Kafka pipeline (router → provider → Centrifugo → user)
  3. Provider persists the assistant message and unlocks the conversation on done=true
  4. User sees a new bold conversation in their sidebar (is_new flag)
  5. If awaiting=True, the user's reply routes back to the agent's @on_message handler

Basic Usage

@agent.on_message
async def handle_reply(message, sender):
    # User replied to the proactive conversation
    async with agent.streamer(message) as s:
        s.stream("Thanks for responding to the check-in!")

# Trigger from a scheduler, webhook, or API endpoint
async def send_weekly_check_in(user_id: str):
    conv_id, streamer = await agent.start_conversation(
        user_id=user_id,
        title="Weekly Health Check-in",
        awaiting=True,
    )
    async with streamer as s:
        s.stream("Hi! Time for your weekly check-in.\n\n")
        s.stream("How have you been feeling this week?")

With Response Schema

async def request_symptom_log(user_id: str):
    conv_id, streamer = await agent.start_conversation(
        user_id=user_id,
        title="Daily Symptom Log",
        awaiting=True,
        response_schema={
            "type": "object",
            "title": "How are you feeling today?",
            "properties": {
                "pain_level": {"type": "integer", "minimum": 0, "maximum": 10, "title": "Pain Level"},
                "fatigue": {"type": "integer", "minimum": 0, "maximum": 10, "title": "Fatigue Level"},
                "notes": {"type": "string", "title": "Additional Notes"},
            },
            "required": ["pain_level"],
        },
    )
    async with streamer as s:
        s.stream("Please fill out today's symptom log:")
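
The 0-10 integer fields tend to repeat across schemas like this; a tiny builder keeps them consistent (plain JSON Schema construction, nothing SDK-specific):

```python
def scale_field(title: str, lo: int = 0, hi: int = 10) -> dict:
    """Build the 0-10 integer field shape used in the schema above."""
    return {"type": "integer", "minimum": lo, "maximum": hi, "title": title}

schema = {
    "type": "object",
    "title": "How are you feeling today?",
    "properties": {
        "pain_level": scale_field("Pain Level"),
        "fatigue": scale_field("Fatigue Level"),
    },
    "required": ["pain_level"],
}
```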

With Metadata

conv_id, streamer = await agent.start_conversation(
    user_id=user_id,
    title="New Research Alert",
    awaiting=True,
    message_metadata={"alert_type": "research", "paper_id": "PMC12345"},
    metadata={"source": "pubmed_monitor", "triggered_at": "2025-01-15T09:00:00Z"},
)
async with streamer as s:
    s.stream("A new paper about your condition was published today...")

API Reference

async def start_conversation(
    user_id: str,
    title: str = "Agent-initiated conversation",
    awaiting: bool = False,
    response_schema: Optional[Dict[str, Any]] = None,
    message_metadata: Optional[Dict[str, Any]] = None,
    metadata: Optional[Dict[str, Any]] = None,
) -> Tuple[str, Streamer]:

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| user_id | str | Required | Target user UUID |
| title | str | "Agent-initiated conversation" | Conversation title shown in sidebar |
| awaiting | bool | False | If True, user reply routes back to this agent |
| response_schema | dict | None | JSON Schema for structured response (renders a form) |
| message_metadata | dict | None | Metadata attached to the streamed message |
| metadata | dict | None | Metadata stored on the conversation itself |

Returns (conversation_id, streamer). The streamer works identically to agent.streamer().
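
Proactive conversations are typically driven by a scheduler; a minimal asyncio loop is enough. A sketch — the interval and the send_weekly_check_in function from the earlier example are application code, not SDK API:

```python
import asyncio

async def run_periodically(task, interval_seconds: float) -> None:
    """Invoke an async callable on a fixed interval, e.g. weekly check-ins."""
    while True:
        await task()
        await asyncio.sleep(interval_seconds)

# At app startup, alongside app.run():
# asyncio.create_task(run_periodically(
#     lambda: send_weekly_check_in("user-uuid"), 7 * 24 * 3600))
```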

Inter-Agent Communication

Two patterns for agents to communicate with each other, each suited to different use cases.

Why Inter-Agent Communication?

In a rare disease platform, no single agent can cover everything. A patient might ask a general wellness agent about fatigue, but the underlying cause is Ehlers-Danlos syndrome — something only a genetics specialist would recognize. The general agent needs to either hand off the conversation to the specialist entirely, or call the specialist to get a quick answer and weave it into its own response.

Without inter-agent communication, every agent would need to be an expert in everything, or the user would have to manually switch between agents mid-conversation.

Overview

| Pattern | Method | Blocking? | Use Case |
| --- | --- | --- | --- |
| Hand-off | message.hand_off(agent) | No | Fire-and-forget forward; your agent is done, another takes over |
| Call | agent.call(agent, conv_id, content) | Yes (await) | Request → response between agents; your agent stays in control |

Hand-Off (message.hand_off)

Forward a message to another agent with a single Kafka message. No streamer, no done flag — the target agent's @on_message fires exactly once. The calling agent can optionally stream to the user before handing off.

Scenario: Out-of-expertise escalation. A patient asks the general wellness agent about joint hypermobility and chronic pain. The wellness agent recognizes this sounds like Ehlers-Danlos syndrome — far outside its expertise. It tells the user what's happening and hands off to the connective tissue specialist, who takes over the conversation entirely.

@wellness_agent.on_message
async def handle(message, sender):
    content = message.content.lower()

    # Detect rare disease indicators outside our expertise
    if any(term in content for term in ["hypermobility", "joint laxity", "stretchy skin", "ehlers-danlos"]):
        # Stream a brief explanation to the user before handing off
        s = wellness_agent.streamer(message)
        s.stream("Your symptoms suggest a connective tissue condition that requires specialist input.\n\n")
        s.stream("I'm connecting you with our rare disease specialist who can help.")

        # Hand off — the specialist takes over the conversation
        message.hand_off("rare-disease-specialist",
            f"Patient reports: {message.content}\n\n"
            f"Wellness agent assessment: Symptoms consistent with possible Ehlers-Danlos syndrome. "
            f"Patient needs specialist evaluation for hypermobility spectrum disorders."
        )
        return

    # Normal wellness handling...
    async with wellness_agent.streamer(message) as s:
        s.stream(generate_wellness_response(message.content))

Scenario: Agent has completed its task and knows the best next agent. A lab results agent finishes analyzing blood work and hands off to the treatment planning agent, because the natural next step is treatment recommendations — and the treatment agent is the right one for the job.

@lab_results_agent.on_message
async def handle(message, sender):
    async with lab_results_agent.streamer(message) as s:
        analysis = analyze_lab_results(message.content)
        s.stream(f"Here's your lab analysis:\n\n{analysis}\n\n")
        s.stream("Now let me connect you with our treatment planning specialist "
                 "to discuss next steps based on these results.")

    # Our job is done — hand off to the treatment agent with context
    message.hand_off("treatment-planning-agent",
        f"Lab analysis complete. Results summary:\n{analysis}\n\n"
        f"Patient's original message: {message.content}"
    )

When to hand off vs let the router decide: Use hand_off when your agent has domain knowledge about which agent should come next. If you're unsure, hand off to guide-agent (or simply don't set awaiting and let the router handle the next message).

Scenario: Multi-step diagnostic pipeline. A general intake agent collects symptoms, hands off to a diagnostic agent, which may further hand off to a condition-specific agent.

@intake_agent.on_message
async def handle(message, sender):
    symptoms = extract_symptoms(message.content)

    s = intake_agent.streamer(message)
    s.stream("Thank you for describing your symptoms. Let me connect you with the right specialist.")

    # Hand off with structured context for the diagnostic agent
    message.hand_off("diagnostic-agent",
        f"Extracted symptoms: {', '.join(symptoms)}\n"
        f"Original message: {message.content}"
    )

How it works:

  1. Produces a single Kafka message to router.inbox with the original message headers
  2. Router forwards to the target agent's inbox and updates current_route to the target agent
  3. Target agent's @on_message handler fires with the forwarded message
  4. The calling agent does not send done=True — the target agent is responsible for responding to the user
  5. The conversation lock is never orphaned — the target agent responds to the user via the normal streamer flow, which sends done=true to user.inbox, and the provider unlocks the conversation

def hand_off(self, agent_name: str, content: Optional[str] = None) -> None

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| agent_name | str | required | Target agent name |
| content | str | None | Override content. If None, forwards the original message.content |

Call (agent.call)

Synchronous agent-to-agent communication. Call another agent and await the response. The calling agent keeps conversation ownership — current_route is not changed.

Scenario: Cross-referencing with a specialist. A patient asks the care coordinator about drug interactions for their rare condition (Gaucher disease). The coordinator needs specific pharmacogenomics information but should remain the patient's primary contact. It calls the pharmacology agent behind the scenes, gets the answer, and incorporates it into its own response.

import asyncio

@coordinator_agent.on_message
async def handle(message, sender):
    async with coordinator_agent.streamer(message) as s:
        s.stream("Let me check on that for you...\n\n")

        try:
            # Call the pharmacology agent — patient doesn't see this exchange
            drug_info = await coordinator_agent.call(
                agent_name="pharmacology-agent",
                conversation_id=message.conversation_id,
                content=f"Patient has Gaucher disease (Type 1, on ERT). "
                        f"Question: {message.content}",
                timeout=15.0,
            )
            s.stream(f"Here's what I found about your medication:\n\n{drug_info}")
        except asyncio.TimeoutError:
            s.stream("I wasn't able to get the pharmacology details right now. "
                     "Let me note this and follow up with you shortly.")

Scenario: Gathering information from multiple specialists. A rare disease coordinator needs input from both a genetics agent and a clinical trials agent to give the patient a complete answer.

import asyncio

@coordinator_agent.on_message
async def handle(message, sender):
    async with coordinator_agent.streamer(message) as s:
        s.stream("Looking into this from multiple angles...\n\n")

        # Call two specialists in parallel
        genetics_task = coordinator_agent.call(
            "genetics-agent", message.conversation_id,
            f"Patient with suspected Wilson's disease. {message.content}",
        )
        trials_task = coordinator_agent.call(
            "clinical-trials-agent", message.conversation_id,
            f"Find active clinical trials for Wilson's disease relevant to: {message.content}",
        )

        genetics_info, trials_info = await asyncio.gather(
            genetics_task, trials_task, return_exceptions=True
        )

        if not isinstance(genetics_info, Exception):
            s.stream(f"**Genetic perspective:**\n{genetics_info}\n\n")
        if not isinstance(trials_info, Exception):
            s.stream(f"**Clinical trials:**\n{trials_info}\n\n")

        s.stream("Is there anything else you'd like to know?")

How it works:

  1. The calling agent produces a message to router.inbox with isCall=true and a unique callId header
  2. Router forwards to the target agent's inbox — current_route is not updated (the caller keeps ownership)
  3. The target agent's @on_message handler fires — the streamer auto-detects isCall=true and routes the response back to the calling agent (not the user)
  4. The calling agent's message interceptor captures the response chunks, accumulates them, and resolves the call() future on done=true
  5. call() returns the full accumulated response as a string
  6. Call messages are not persisted — they are transient, like internal tool calls. The conversation_id is only a Kafka partition key. This keeps the conversation history clean for LLMs (no confusing consecutive assistant messages from different agents)

async def call(
    self,
    agent_name: str,
    conversation_id: str,
    content: str,
    timeout: float = 30.0,
) -> str

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| agent_name | str | required | Target agent to call |
| conversation_id | str | required | Conversation ID (used as Kafka partition key only — not persisted) |
| content | str | required | Text content to send |
| timeout | float | 30.0 | Max seconds to wait for a response |

Returns: The target agent's full response as a str.

Raises:

  • asyncio.TimeoutError if the target agent doesn't respond within the timeout
  • RuntimeError if the app's call infrastructure is not available
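
Because a slow specialist shouldn't sink the whole reply, it's common to wrap call() with a fallback answer. A sketch — the wrapper is application code, not SDK API:

```python
import asyncio

async def call_with_fallback(agent, name, conversation_id, content,
                             fallback: str, timeout: float = 10.0) -> str:
    """Degrade gracefully when the target agent times out."""
    try:
        return await agent.call(name, conversation_id, content, timeout=timeout)
    except asyncio.TimeoutError:
        return fallback
```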

Target agent handler — basic: The target agent doesn't need special handling — its normal @on_message handler fires. The SDK's streamer auto-detects the isCall header and routes the response back to the caller.

# Target agent — no special code needed
@pharmacology_agent.on_message
async def handle(message, sender):
    async with pharmacology_agent.streamer(message) as s:
        s.stream(generate_drug_interaction_report(message.content))
    # Response automatically routed back to the calling agent

Target agent handler — with mutual understanding. agent.call() works best when the two agents have a mutual understanding. The target agent knows it will be called by specific agents and has a dedicated handler optimized for those calls — structured input, structured output, no conversational overhead. While agent.call() works with any handler, dedicated handlers make the interaction more reliable and efficient.

# ---- Calling agent (coordinator) ----
@coordinator_agent.on_message(senders=["user"])
async def handle_user(message, sender):
    async with coordinator_agent.streamer(message) as s:
        # The gene-lookup-agent expects a specific input format
        # and returns structured data — both sides know the contract
        gene_data = await coordinator_agent.call(
            "gene-lookup-agent",
            message.conversation_id,
            f"LOOKUP gene_symbol=HTT disease=Huntington",
            timeout=10.0,
        )
        s.stream(f"Here's what we know about the gene:\n\n{gene_data}")


# ---- Target agent (gene-lookup) ----
# Dedicated handler for coordinator calls — expects structured input,
# returns focused data. This handler is NOT designed for end users.
@gene_lookup_agent.on_message(senders=["coordinator-agent"])
async def handle_coordinator_call(message, sender):
    parsed = parse_lookup_request(message.content)  # Parse the structured input
    result = await query_knowledge_graph(parsed)

    async with gene_lookup_agent.streamer(message) as s:
        s.stream(format_gene_report(result))

# Separate handler for direct user messages — full conversational flow
@gene_lookup_agent.on_message(senders=["user"])
async def handle_user(message, sender):
    async with gene_lookup_agent.streamer(message) as s:
        # Natural language processing, history, memory, etc.
        s.stream(generate_conversational_response(message.content))

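The parse_lookup_request helper in the example above is application code; one possible implementation of the LOOKUP key=value contract:

```python
def parse_lookup_request(content: str) -> dict:
    """Parse the 'LOOKUP key=value ...' format the two agents agreed on.
    The contract itself is application-defined, not an SDK feature."""
    verb, *pairs = content.split()
    if verb != "LOOKUP":
        raise ValueError(f"unexpected verb: {verb!r}")
    return dict(pair.split("=", 1) for pair in pairs)

# parse_lookup_request("LOOKUP gene_symbol=HTT disease=Huntington")
# returns {"gene_symbol": "HTT", "disease": "Huntington"}
```
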
Tip: You might be tempted to use agent.call() to implement your own routing — fetching the agent list from the inventory, picking the best one, and calling it. This is not recommended. The router already does this with access to the full agent registry and document matching. Use agent.call() for targeted, structured agent-to-agent interactions where both sides know each other, not as a replacement for routing.

Combining Patterns

Real-world agents often combine awaiting, hand_off, and call in a single flow.

Scenario: A symptom checker that collects data (awaiting), consults a specialist (call), and escalates when needed (hand_off).

@symptom_agent.on_message
async def handle(message, sender):
    history = await message.conversation.get_agent_history(limit=20)
    stage = determine_stage(history)

    if stage == "collecting":
        # Still gathering symptoms — keep the user talking to us
        async with symptom_agent.streamer(message, awaiting=True) as s:
            s.stream(ask_next_symptom_question(history, message.content))
        return

    if stage == "analyzing":
        async with symptom_agent.streamer(message) as s:
            s.stream("Let me analyze your symptoms...\n\n")

            # Call the diagnostic agent for analysis — we stay in control
            assessment = await symptom_agent.call(
                "diagnostic-agent",
                message.conversation_id,
                f"Symptoms collected: {summarize_symptoms(history)}",
            )

            if "urgent" in assessment.lower() or "specialist" in assessment.lower():
                # Serious finding — hand off to the appropriate specialist
                s.stream(f"Based on the assessment, I'm connecting you with a specialist.\n\n")
                message.hand_off("rare-disease-specialist",
                    f"Symptom assessment: {assessment}\nFull history available in conversation."
                )
                return

            # Non-urgent — share results and release back to router
            s.stream(f"Here's what I found:\n\n{assessment}")
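
determine_stage above is application logic; a naive sketch based on turn count (the .role attribute on history entries and the threshold are assumptions, not SDK API):

```python
def determine_stage(history) -> str:
    """Keep collecting until a few user turns exist, then analyze.
    The .role attribute and the threshold of 3 are assumptions."""
    user_turns = sum(1 for m in history if getattr(m, "role", None) == "user")
    return "collecting" if user_turns < 3 else "analyzing"
```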

When to Use Which

| Scenario | Pattern | Example |
| --- | --- | --- |
| Topic is outside your expertise — another agent should take over | hand_off | Wellness agent detects Ehlers-Danlos symptoms, hands off to rare disease specialist |
| Your agent has completed its task and knows the best next agent | hand_off | Lab results agent finishes analysis, hands off to treatment planning agent |
| User is talking to you with awaiting but topic doesn't match | hand_off to guide-agent | Nutrition agent with awaiting=True gets a genetics question, hands off to guide |
| Need information from a specialist to complete your response | call | Care coordinator calls pharmacology agent about Gaucher disease drug interactions |
| Two agents have a mutual understanding and structured contract | call + senders handler | Coordinator calls gene-lookup with LOOKUP gene_symbol=HTT, gets structured data back |
| Gather input from multiple agents in parallel | call + asyncio.gather | Coordinator calls genetics + clinical trials agents simultaneously for Wilson's disease |
| Multi-turn data collection — keep user talking to your agent | awaiting=True | Diagnostic agent asks a series of symptom questions before generating assessment |
| Multi-step pipeline — each agent processes and passes forward | hand_off | Intake → diagnostic → condition-specific agent |
| Collect data (awaiting), consult specialist (call), escalate if needed (hand_off) | Combined | Symptom checker: collect → call diagnostic → hand off to specialist if urgent |
| Fact-check or validate your response before sending to user | call | General agent calls medical-qa agent to verify a dosage recommendation |

Conversation Locking and Persistence

Hand-off does not cause stuck locks. When Agent A hands off to Agent B, the conversation lock persists correctly through the transition. Agent B responds to the user via the normal streamer flow, which sends done=true to user.inbox. The provider receives it, persists the message, and unlocks the conversation. The lock is never orphaned — it simply transfers naturally as Agent B takes over.

Call messages are not persisted. Agent-to-agent call messages are transient by design — they never reach user.inbox and the provider never sees them. The conversation_id passed to agent.call() serves only as a Kafka partition key for ordering guarantees. This is intentional: if call messages were persisted, they would pollute the conversation history with confusing consecutive assistant messages from different agents (the call question and the call response) that the user never saw. The calling agent already incorporates the call result into its own streamed response, keeping the LLM's conversation history clean and coherent.

Remote Logging (Loki)

Send agent logs to Loki for centralized monitoring.

import logging

app = BasionAgentApp(
    ...,
    enable_remote_logging=True,
    remote_log_level=logging.INFO,
    remote_log_batch_size=100,
    remote_log_flush_interval=5.0,
)

# Standard Python logging — automatically sent to Loki
logger = logging.getLogger(__name__)
logger.info("Processing symptom query", extra={"user_id": "..."})

Extensions

LangGraph

Persist LangGraph state via the Conversation Store checkpoint API.

from basion_agent import BasionAgentApp
from basion_agent.extensions.langgraph import HTTPCheckpointSaver
from langgraph.graph import StateGraph

app = BasionAgentApp(gateway_url="...", api_key="...")
checkpointer = HTTPCheckpointSaver(app=app)

graph = StateGraph(MyState)
# ... add nodes and edges ...
compiled = graph.compile(checkpointer=checkpointer)

agent = app.register_me(name="langgraph-agent", ...)

@agent.on_message
async def handle(message, sender):
    config = {"configurable": {"thread_id": message.conversation_id}}
    result = await compiled.ainvoke({"messages": [message.content]}, config)

    async with agent.streamer(message) as s:
        s.stream(result["messages"][-1])

app.run()

Pydantic AI

Persist Pydantic AI message history via the Conversation Store.

from basion_agent import BasionAgentApp
from basion_agent.extensions.pydantic_ai import PydanticAIMessageStore
from pydantic_ai import Agent as PydanticAgent

app = BasionAgentApp(gateway_url="...", api_key="...")
store = PydanticAIMessageStore(app=app)
llm = PydanticAgent("openai:gpt-4o", system_prompt="You are a rare disease specialist.")

agent = app.register_me(name="pydantic-agent", ...)

@agent.on_message
async def handle(message, sender):
    history = await store.load(message.conversation_id)

    async with agent.streamer(message) as s:
        async with llm.run_stream(message.content, message_history=history) as result:
            async for chunk in result.stream_text():
                s.stream(chunk)

        await store.save(message.conversation_id, result.all_messages())

app.run()

Full Example: Rare Disease Assistant

A complete agent that uses memory, knowledge graph, attachments, and structural streaming.

import json
import logging
from basion_agent import BasionAgentApp, Stepper, TextBlock

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = BasionAgentApp(
    gateway_url="agent-gateway:8080",
    api_key="your-api-key",
    enable_remote_logging=True,
)

agent = app.register_me(
    name="rare-disease-assistant",
    about="Helps patients understand rare diseases, find similar conditions, and track symptoms",
    document="""
    Rare disease specialist agent.
    Abilities: disease lookup, gene and phenotype search, symptom analysis,
    genetic variant interpretation, finding similar conditions by shared symptoms
    or genes, drug and treatment information, processing genetic test reports.
    Keywords: rare disease, genetics, orphan disease, diagnosis, phenotype,
    genotype, Huntington, Cystic Fibrosis, Marfan syndrome, biomedical,
    knowledge graph, patient support.
    """,
    category_names=["medical"],
    prompts=[
        {"label": "Huntington", "prompt": "What is Huntington disease?"},
        {"label": "Similar diseases", "prompt": "Find diseases similar to Cystic Fibrosis"},
        {"label": "Gene lookup", "prompt": "What genes are associated with Marfan syndrome?"},
    ],
)


@agent.on_message(senders=["user"])
async def handle_user(message, sender):
    kg = agent.tools.knowledge_graph
    mem = message.memory_v2

    # Ingest the user's message into long-term memory
    if mem:
        await mem.ingest(role="user", content=message.content)

    # Check for form submissions
    if message.schema:
        data = json.loads(message.content)
        async with agent.streamer(message) as s:
            s.stream(f"Thank you for logging your symptoms.\n\n")
            s.stream(f"- Pain level: {data.get('pain_level')}/10\n")
            s.stream(f"- Notes: {data.get('notes', 'None')}\n")
        return

    # Check for attachments (e.g., genetic test PDF)
    if message.has_attachments():
        async with agent.streamer(message) as s:
            for i, att in enumerate(message.get_attachments()):
                if att.is_pdf():
                    pdf_bytes = await message.get_attachment_bytes_at(i)
                    s.stream(f"Processing **{att.filename}** ({att.size} bytes)...\n")
                    # Parse and analyze the PDF...
                elif att.is_image():
                    base64_data = await message.get_attachment_base64_at(i)
                    s.stream(f"Received image **{att.filename}**.\n")
        return

    # Main message handling with knowledge graph
    async with agent.streamer(message) as s:
        # Show thinking process
        thinking = TextBlock()
        s.stream_by(thinking).set_variant("thinking")
        s.stream_by(thinking).stream_title("Analyzing query...")

        # Recall relevant memories
        if mem:
            memories = await mem.search(query=message.content)
            if memories:
                s.stream_by(thinking).stream_body("Found relevant patient history.\n")

        # Search for diseases mentioned in the query
        stepper = Stepper(steps=["Search diseases", "Find connections", "Compile results"])

        s.stream_by(stepper).start_step(0)
        diseases = await kg.search_diseases(name=message.content, limit=5)
        s.stream_by(stepper).complete_step(0)

        if diseases:
            s.stream_by(stepper).start_step(1)
            disease_name = diseases[0].get("name", "")
            similar = await kg.find_similar_diseases(disease_name, limit=5)
            s.stream_by(stepper).complete_step(1)

            s.stream_by(stepper).start_step(2)
            s.stream_by(thinking).done()

            s.stream(f"## {disease_name}\n\n")

            if similar:
                s.stream("### Similar Conditions\n\n")
                for sim in similar:
                    score = int(sim.similarity_score * 100)
                    s.stream(f"- **{sim.disease_name}** ({score}% similar, "
                             f"{sim.shared_count} shared phenotypes)\n")

            s.stream_by(stepper).complete_step(2)
        else:
            s.stream_by(thinking).done()
            s.stream(f"I couldn't find a disease matching \"{message.content}\". "
                     f"Try searching for a specific disease name.")

        s.stream_by(stepper).done()


# Proactive check-in (called from a scheduler or API)
async def daily_check_in(user_id: str):
    conv_id, streamer = await agent.start_conversation(
        user_id=user_id,
        title="Daily Symptom Check-in",
        awaiting=True,
        response_schema={
            "type": "object",
            "title": "How are you feeling today?",
            "properties": {
                "pain_level": {"type": "integer", "minimum": 0, "maximum": 10, "title": "Pain Level"},
                "notes": {"type": "string", "title": "Any additional notes?"},
            },
            "required": ["pain_level"],
        },
    )
    async with streamer as s:
        s.stream("Good morning! Time for your daily check-in.\n\n")
        s.stream("Please fill out the form below:")


app.run()
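The `response_schema` passed to `start_conversation` is plain JSON Schema, so a structured reply can be sanity-checked before it is acted on. A minimal sketch, assuming the reply arrives as a dict; `check_reply` is an illustrative helper (not an SDK API) that covers only the `required`, integer-type, and range keywords used above:

```python
def check_reply(reply: dict, schema: dict) -> list[str]:
    """Return a list of problems with `reply` against a tiny JSON Schema subset."""
    problems = []
    # Every field listed in "required" must be present.
    for field in schema.get("required", []):
        if field not in reply:
            problems.append(f"missing required field: {field}")
    # Enforce integer type and minimum/maximum bounds where declared.
    for field, spec in schema.get("properties", {}).items():
        if field not in reply:
            continue
        value = reply[field]
        if spec.get("type") == "integer":
            if not isinstance(value, int):
                problems.append(f"{field} must be an integer")
            elif not spec.get("minimum", value) <= value <= spec.get("maximum", value):
                problems.append(f"{field} out of range")
    return problems

schema = {
    "type": "object",
    "properties": {
        "pain_level": {"type": "integer", "minimum": 0, "maximum": 10},
        "notes": {"type": "string"},
    },
    "required": ["pain_level"],
}

print(check_reply({"pain_level": 3}, schema))   # []
print(check_reply({"notes": "tired"}, schema))  # ['missing required field: pain_level']
```

For full keyword coverage, a dedicated validator such as the `jsonschema` package is the safer choice.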

Architecture

┌─────────────────────────────────────────────────────────────────────────────┐
│                         Your Agent Application                              │
├─────────────────────────────────────────────────────────────────────────────┤
│  BasionAgentApp                                                             │
│  ├── register_me() → Agent                                                  │
│  │   ├── @on_message(senders=[...])                                         │
│  │   ├── streamer(message) → Streamer                                       │
│  │   │   ├── stream() / write()                                             │
│  │   │   ├── set_response_schema() / set_message_metadata()                 │
│  │   │   └── stream_by() → Artifact, Surface, TextBlock, Stepper            │
│  │   ├── start_conversation() → (conv_id, Streamer)                         │
│  │   ├── call(agent, conv_id, content) → str (sync agent-to-agent)          │
│  │   └── tools                                                              │
│  │       ├── .knowledge_graph → KnowledgeGraphTool                          │
│  │       └── .agent_inventory → AgentInventoryTool                          │
│  └── run()                                                                  │
│                                                                             │
│  Message Context:                                                           │
│  ├── message.conversation → Conversation (history, metadata)                │
│  ├── message.memory_v2 → MemoryV2 (mem0: ingest + search)                   │
│  ├── message.memory → Memory (deprecated)                                   │
│  ├── message.attachments → List[AttachmentInfo]                             │
│  └── message.hand_off(agent, content?) → forward to another agent           │
└──────────────────────────────┬──────────────────────────────────────────────┘
                               │ gRPC (Kafka) + HTTP (APIs)
                               ▼
┌─────────────────────────────────────────────────────────────────────────────┐
│                           Agent Gateway                                     │
├─────────────────────────────────────────────────────────────────────────────┤
│  gRPC: AgentStream (bidirectional)      HTTP: /s/{service}/* proxy          │
│  - Auth + Subscribe/Unsubscribe         - /s/ai-inventory/*                 │
│  - Produce/Consume messages             - /s/conversation-store/*           │
│                                         - /s/memory/* (mem0)                │
│                                         - /s/knowledge-graph/*              │
│                                         - /s/attachment/*                   │
│                                         - /loki/api/v1/push (logging)       │
└─────────────────┬───────────────────────────────────┬───────────────────────┘
                  │                                   │
                  ▼                                   ▼
           ┌───────────────┐                  ┌────────────────────┐
           │     Kafka     │                  │   AI Inventory /   │
           │ {agent}.inbox │                  │ Conversation Store │
           └───────────────┘                  │ Memory / KG        │
                                              └────────────────────┘

Message Flow

User → Provider → Kafka: router.inbox → Router → Kafka: {agent}.inbox → Gateway → Agent
Agent → Gateway → Kafka: router.inbox → Router → Kafka: user.inbox → Provider → WebSocket → User
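Both directions in the flow above rely on the same convention: every participant (router, agent, or user) consumes from its own `.inbox` Kafka topic. A sketch of the naming (the `inbox_topic` helper is illustrative, not an SDK function):

```python
def inbox_topic(participant: str) -> str:
    """Kafka inbox topic for a router, agent, or user, per the flow above."""
    return f"{participant}.inbox"

# A user message hops through these topics on its way to an agent:
hops = [inbox_topic("router"), inbox_topic("rare-disease-assistant")]
print(" -> ".join(hops))  # router.inbox -> rare-disease-assistant.inbox
```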

Dependencies

| Package | Purpose |
| --- | --- |
| `grpcio` | gRPC communication with Agent Gateway |
| `grpcio-tools` | Protobuf compilation |
| `protobuf` | Message serialization |
| `requests` | Sync HTTP for registration |
| `aiohttp` | Async HTTP for runtime operations |

Optional

| Extra | Install | Purpose |
| --- | --- | --- |
| `langgraph` | `pip install basion-agent[langgraph]` | LangGraph checkpoint integration |
| `pydantic` | `pip install basion-agent[pydantic]` | Pydantic AI message history |
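Because the LangGraph and Pydantic AI integrations are optional extras, an agent that wants to use them can probe for availability at startup rather than crash on import. A stdlib-only sketch (`has_extra` is an illustrative helper, not part of the SDK):

```python
import importlib.util

def has_extra(module_name: str) -> bool:
    """True if the optional dependency is importable in this environment."""
    return importlib.util.find_spec(module_name) is not None

if has_extra("langgraph"):
    # Safe to import the checkpoint integration here.
    pass
else:
    print("langgraph extra not installed; checkpointing disabled")
```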
