Basion Agent SDK

Python SDK for building AI agents on the Basion platform. Agents register themselves, receive messages through Kafka, and stream responses back to users in real time.

Installation

pip install basion-agent

# With LangGraph support (quotes keep zsh from globbing the brackets)
pip install "basion-agent[langgraph]"

# With Pydantic AI support
pip install "basion-agent[pydantic]"

Quick Start

from basion_agent import BasionAgentApp

app = BasionAgentApp(
    gateway_url="agent-gateway:8080",
    api_key="your-api-key",
)

agent = app.register_me(
    name="rare-disease-assistant",
    about="Answers questions about rare diseases, symptoms, and treatments",
    document="A medical assistant specializing in rare diseases. Can look up conditions, find related diseases, and help patients understand their diagnosis.",
)

@agent.on_message
async def handle(message, sender):
    history = await message.conversation.get_history(limit=10)

    async with agent.streamer(message) as s:
        s.stream(f"You asked: {message.content}\n\n")
        s.stream("Let me look into that for you...")

app.run()

CLI

# Run your agent
basion-agent run main:app

# Defaults to :app
basion-agent run main

# Custom app name
basion-agent run myagent:application

# Show version
basion-agent version
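The `module:attr` target follows the same convention as other Python app runners (uvicorn-style). An illustrative parse of the spec string, not the CLI's actual code:

```python
def parse_spec(spec: str) -> tuple[str, str]:
    # "main" -> ("main", "app"); "myagent:application" -> ("myagent", "application")
    module, _, attr = spec.partition(":")
    return module, attr or "app"  # bare module name defaults to :app

print(parse_spec("main"))
print(parse_spec("myagent:application"))
```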

Configuration

app = BasionAgentApp(
    gateway_url="agent-gateway:8080",       # Required
    api_key="key",                           # Required
    heartbeat_interval=60,                   # Heartbeat frequency in seconds
    max_concurrent_tasks=100,                # Max concurrent message handlers
    error_message_template="...",            # Error message sent to users on failure
    secure=False,                            # TLS for gRPC and HTTPS for HTTP
    enable_remote_logging=False,             # Send logs to Loki via gateway
    remote_log_level=logging.INFO,           # Min log level for remote logging
    remote_log_batch_size=100,               # Logs per batch
    remote_log_flush_interval=5.0,           # Seconds between flushes
)
Environment Variable | Description
GATEWAY_URL          | Agent Gateway endpoint
GATEWAY_API_KEY      | API key for authentication
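In containerized deployments these values usually come from the environment. A minimal pattern using plain `os.environ` (the constructor call is shown commented; defaults here are illustrative):

```python
import os

# Read the variables from the table above, with illustrative fallbacks
config = {
    "gateway_url": os.environ.get("GATEWAY_URL", "agent-gateway:8080"),
    "api_key": os.environ.get("GATEWAY_API_KEY", ""),
}

# from basion_agent import BasionAgentApp
# app = BasionAgentApp(**config)  # pass straight to the constructor
```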

Agent Registration

agent = app.register_me(
    name="rare-disease-assistant",            # Unique identifier (used for Kafka routing)
    about="Rare disease medical assistant",   # Short description for agent selection
    document="Full documentation...",         # Detailed docs used by the router
    representation_name="Dr. Assistant",      # Display name (optional)
    base_url="http://...",                    # Base URL for frontend service (optional)
    metadata={"specialty": "rare-diseases"},  # Additional metadata (optional)
    category_name="medical",                  # Category in kebab-case (optional)
    tag_names=["rare-disease", "genetics"],   # Tags in kebab-case (optional)
    example_prompts=[                         # Example prompts shown to users (optional)
        "What is Huntington disease?",
        "Find diseases similar to Cystic Fibrosis",
    ],
    is_experimental=False,                    # Mark as experimental (optional)
    related_pages=[                           # Related pages (optional)
        {"name": "Resources", "endpoint": "/resources"}
    ],
)

Message Handling

Basic Handler

@agent.on_message
async def handle(message, sender):
    async with agent.streamer(message) as s:
        s.stream("Hello! How can I help?")

Handlers can be async or synchronous. Sync handlers run in a thread pool automatically.
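The dispatch behavior is comparable to `asyncio.to_thread`: a blocking function runs in a worker thread while the event loop stays responsive. A standalone sketch of that mechanism (not SDK code):

```python
import asyncio
import time

def blocking_lookup(query: str) -> str:
    # Simulates blocking I/O, e.g. a synchronous database call
    time.sleep(0.01)
    return f"result for {query}"

async def main() -> str:
    # The event loop keeps running while the lookup blocks a worker thread,
    # comparable to how the SDK offloads sync handlers
    return await asyncio.to_thread(blocking_lookup, "chorea")

print(asyncio.run(main()))
```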

Sender Filtering

# Handle messages from users only
@agent.on_message(senders=["user"])
async def handle_user(message, sender):
    ...

# Handle messages from a specific agent
@agent.on_message(senders=["triage-agent"])
async def handle_triage(message, sender):
    ...

# Exclude a specific sender
@agent.on_message(senders=["~notification-agent"])
async def handle_others(message, sender):
    ...

Error Handling

If a handler raises an exception, the SDK automatically sends an error message to the user. You can customize or disable this behavior:

# Custom error message
app = BasionAgentApp(
    ...,
    error_message_template="Sorry, something went wrong. Please try again.",
)

# Or disable per-agent
agent.send_error_responses = False

@agent.on_message
async def handle(message, sender):
    try:
        ...
    except Exception as e:
        async with agent.streamer(message) as s:
            s.stream(f"I ran into an issue: {e}")

Message

The message object provides access to content, conversation history, memory, and attachments.

@agent.on_message
async def handle(message, sender):
    message.content              # Message text
    message.conversation_id      # Conversation UUID
    message.user_id              # User UUID
    message.metadata             # Optional dict (from frontend)
    message.schema               # Optional dict (for form responses)
    message.headers              # Raw Kafka headers
    message.conversation         # Conversation helper
    message.memory_v2            # mem0 memory (ingest + search)
    message.memory               # Deprecated memory (use memory_v2)

Streaming Responses

Basic Streaming

@agent.on_message
async def handle(message, sender):
    # Context manager — auto-finishes on exit
    async with agent.streamer(message) as s:
        s.stream("Looking up information on Huntington disease...\n\n")
        s.stream("Huntington disease is a progressive neurodegenerative disorder...")

    # Or manual control
    s = agent.streamer(message)
    s.stream("Hello!")
    await s.finish()

Streamer Options

# Route response to another agent
async with agent.streamer(message, send_to="specialist-agent") as s:
    s.stream("Forwarding to the specialist...")

# Set awaiting — next user reply routes back to this agent
async with agent.streamer(message, awaiting=True) as s:
    s.stream("What symptoms are you experiencing?")

Non-Persisted Content

Chunks that appear in real time but are not saved to conversation history:

async with agent.streamer(message) as s:
    s.stream("Searching knowledge graph...", persist=False, event_type="thinking")
    # ... do work ...
    s.stream("Here are the results:")  # This gets persisted

Message Metadata

Attach metadata to the response message:

async with agent.streamer(message) as s:
    s.set_message_metadata({"source": "knowledge_graph", "confidence": 0.92})
    s.stream("Based on the knowledge graph, ...")

Response Schema (Forms)

Request structured input from users with JSON Schema. The frontend renders a form, and the user's submission arrives as JSON in the next message.

@agent.on_message
async def handle(message, sender):
    # Check if this is a form submission
    if message.schema:
        import json
        data = json.loads(message.content)
        severity = data["severity"]
        async with agent.streamer(message) as s:
            s.stream(f"Thank you. You reported severity level {severity}.")
        return

    # Ask for structured input
    async with agent.streamer(message, awaiting=True) as s:
        s.stream("Please describe your symptoms:")
        s.set_response_schema({
            "type": "object",
            "title": "Symptom Report",
            "properties": {
                "severity": {
                    "type": "integer",
                    "minimum": 1,
                    "maximum": 10,
                    "title": "Pain Severity (1-10)",
                },
                "location": {
                    "type": "string",
                    "title": "Where do you feel pain?",
                },
                "duration": {
                    "type": "string",
                    "title": "How long have you had these symptoms?",
                },
            },
            "required": ["severity"],
        })
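When the submission arrives, `message.content` carries the JSON object. A minimal sanity check of a payload against the schema's `required` list, in plain Python with no extra library (hypothetical payload):

```python
import json

schema = {
    "type": "object",
    "required": ["severity"],
    "properties": {"severity": {"type": "integer"}},
}

# Stand-in for message.content on the follow-up message
payload = json.loads('{"severity": 7, "location": "knee"}')

missing = [k for k in schema["required"] if k not in payload]
assert not missing, f"missing fields: {missing}"
print(payload["severity"])
```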

Conversation History

@agent.on_message
async def handle(message, sender):
    conv = message.conversation

    # Get message history
    history = await conv.get_history(limit=20)
    history = await conv.get_history(role="user", limit=10, offset=0)

    # Get conversation metadata
    metadata = await conv.get_metadata()

    # Get messages where this agent was sender or recipient
    agent_history = await conv.get_agent_history(limit=50)
    agent_history = await conv.get_agent_history(agent_name="triage-agent")

Memory V2 (mem0)

Long-term memory powered by mem0.ai. Ingest messages and semantically search across a user's history.

@agent.on_message
async def handle(message, sender):
    mem = message.memory_v2

    # Ingest the user's message
    await mem.ingest(role="user", content=message.content)

    # Search for relevant past context
    results = await mem.search(query="diagnosis history")
    for r in results:
        memory_text = r.get("memory")  # Extracted memory text

    async with agent.streamer(message) as s:
        if results:
            s.stream("Based on what I remember:\n")
            for r in results:
                s.stream(f"- {r.get('memory')}\n")
            s.stream("\n")
        s.stream(f"Regarding your question: {message.content}")

    # Ingest the assistant's response too
    await mem.ingest(role="assistant", content="...")
Method                | Description
ingest(role, content) | Ingest a message. Role: 'user', 'assistant', or 'system'.
search(query)         | Semantic search across the user's memories. Returns a list of dicts.
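Since `search()` returns plain dicts, folding the hits into prompt context is straightforward. A sketch assuming the `memory` key shown above, with hypothetical results:

```python
# Stand-in for the list returned by mem.search(...)
results = [
    {"memory": "Patient reported chorea in 2023"},
    {"memory": "Family history of Huntington disease"},
]

# Bullet list suitable for prepending to an LLM prompt or a streamed reply
context = "\n".join(f"- {r.get('memory')}" for r in results)
print(context)
```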

Attachments

Download and process file attachments (images, PDFs, etc.).

@agent.on_message
async def handle(message, sender):
    if not message.has_attachments():
        return

    count = message.get_attachment_count()
    attachments = message.get_attachments()

    async with agent.streamer(message) as s:
        s.stream(f"Received {count} file(s).\n\n")

        for i, att in enumerate(attachments):
            s.stream(f"**{att.filename}** ({att.content_type}, {att.size} bytes)\n")

            if att.is_image():
                base64_str = await message.get_attachment_base64_at(i)
                # Send to vision model...

            elif att.is_pdf():
                pdf_bytes = await message.get_attachment_bytes_at(i)
                # Parse PDF...

        # Or download everything at once
        all_bytes = await message.get_all_attachment_bytes()
        all_base64 = await message.get_all_attachment_base64()

Attachment Methods

Method                       | Returns              | Description
has_attachments()            | bool                 | Whether the message has any attachments
get_attachment_count()       | int                  | Number of attachments
get_attachments()            | List[AttachmentInfo] | List of attachment metadata
get_attachment_bytes()       | bytes                | Download first attachment as bytes
get_attachment_base64()      | str                  | Download first attachment as base64
get_attachment_buffer()      | BytesIO              | Download first attachment as BytesIO
get_attachment_bytes_at(i)   | bytes                | Download attachment at index i
get_attachment_base64_at(i)  | str                  | Download attachment at index i as base64
get_attachment_buffer_at(i)  | BytesIO              | Download attachment at index i as BytesIO
get_all_attachment_bytes()   | List[bytes]          | Download all attachments
get_all_attachment_base64()  | List[str]            | Download all as base64
get_all_attachment_buffers() | List[BytesIO]        | Download all as BytesIO

AttachmentInfo Properties

Property       | Type | Example
filename       | str  | "genetic_report.pdf"
content_type   | str  | "application/pdf"
size           | int  | 524288
url            | str  | Download URL
file_extension | str  | "pdf"
file_type      | str  | "pdf"
is_image()     | bool | True for image MIME types
is_pdf()       | bool | True for PDF files
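The `*_base64*` helpers return standard base64 strings, so the stdlib `base64` module turns them back into raw bytes for a parser. A sketch with dummy data standing in for a downloaded attachment:

```python
import base64

# Stand-in for the string returned by get_attachment_base64()
b64_str = base64.b64encode(b"%PDF-1.4 dummy").decode("ascii")

# Decode back to raw bytes before handing off to a PDF parser
raw = base64.b64decode(b64_str)
assert raw.startswith(b"%PDF")
```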

Structural Streaming

Rich UI components streamed alongside text. Bind a structural component to the streamer with stream_by().

Artifact

Files, images, or embeds with generation progress. Artifact data is persisted to the database.

from basion_agent import Artifact

@agent.on_message
async def handle(message, sender):
    async with agent.streamer(message) as s:
        artifact = Artifact()

        # Show progress
        s.stream_by(artifact).generating("Generating genetic pathway diagram...", progress=0.3)
        # ... do work ...
        s.stream_by(artifact).generating("Rendering...", progress=0.8)

        # Complete with result
        s.stream_by(artifact).done(
            url="https://example.com/pathway-diagram.png",
            type="image",
            title="HTT Gene Pathway",
            description="Huntingtin protein interaction network",
            metadata={"width": 1200, "height": 800},
        )

        # Or signal an error
        # s.stream_by(artifact).error("Failed to generate diagram")

        s.stream("Here's the pathway diagram for the HTT gene.")

Artifact types: image, iframe, document, video, audio, code, link, file

Surface

Interactive embedded components (iframes, widgets).

from basion_agent import Surface

@agent.on_message
async def handle(message, sender):
    async with agent.streamer(message) as s:
        surface = Surface()
        s.stream_by(surface).generating("Loading appointment scheduler...")
        s.stream_by(surface).done(
            url="https://cal.com/embed/dr-smith",
            type="iframe",
            title="Schedule Genetic Counseling",
            description="Book a session with a genetic counselor",
        )
        s.stream("You can schedule your genetic counseling session above.")

TextBlock

Collapsible text blocks with streaming title/body and visual variants. TextBlock events are not persisted.

from basion_agent import TextBlock

@agent.on_message
async def handle(message, sender):
    async with agent.streamer(message) as s:
        block = TextBlock()

        # Set visual variant
        s.stream_by(block).set_variant("thinking")

        # Stream title (appends)
        s.stream_by(block).stream_title("Analyzing ")
        s.stream_by(block).stream_title("symptoms...")

        # Stream body (appends)
        s.stream_by(block).stream_body("Checking symptom database...\n")
        s.stream_by(block).stream_body("Cross-referencing with HPO ontology...\n")
        s.stream_by(block).stream_body("Matching against known phenotypes...\n")

        # Replace title/body entirely
        s.stream_by(block).update_title("Analysis Complete")
        s.stream_by(block).update_body("Found 3 matching conditions.")

        # Mark as done
        s.stream_by(block).done()

        s.stream("Based on the symptoms, here are possible conditions...")

Variants: thinking, note, warning, error, success

Stepper

Multi-step progress indicators. Stepper events are not persisted.

from basion_agent import Stepper

@agent.on_message
async def handle(message, sender):
    async with agent.streamer(message) as s:
        stepper = Stepper(steps=[
            "Search diseases",
            "Analyze phenotypes",
            "Find similar conditions",
            "Generate report",
        ])

        s.stream_by(stepper).start_step(0)
        diseases = await kg.search_diseases(name="Huntington")
        s.stream_by(stepper).complete_step(0)

        s.stream_by(stepper).start_step(1)
        phenotypes = await kg.search_phenotypes(name="chorea")
        s.stream_by(stepper).complete_step(1)

        s.stream_by(stepper).start_step(2)
        similar = await kg.find_similar_diseases("Huntington Disease")
        s.stream_by(stepper).complete_step(2)

        # Add a step dynamically
        s.stream_by(stepper).add_step("Cross-reference")
        s.stream_by(stepper).start_step(4)
        s.stream_by(stepper).update_step_label(4, "Cross-reference (final)")
        s.stream_by(stepper).complete_step(4)

        s.stream_by(stepper).start_step(3)
        # Or mark a step as failed
        # s.stream_by(stepper).fail_step(3, error="Report generation timed out")
        s.stream_by(stepper).complete_step(3)

        s.stream_by(stepper).done()
        s.stream("Here's your rare disease report...")

Knowledge Graph

Query biomedical knowledge graphs for diseases, proteins, phenotypes, drugs, and pathways. Accessed via agent.tools.knowledge_graph.

@agent.on_message
async def handle(message, sender):
    kg = agent.tools.knowledge_graph

    # Search diseases
    diseases = await kg.search_diseases(name="Huntington", limit=5)
    diseases = await kg.search_diseases(orphacode="399", limit=5)
    diseases = await kg.search_diseases(omim="143100")
    disease = await kg.get_disease(disease_id=123)

    # Search proteins/genes
    proteins = await kg.search_proteins(symbol="HTT", limit=10)
    proteins = await kg.search_proteins(ensembl_id="ENSG00000197386")

    # Search phenotypes (HPO terms)
    phenotypes = await kg.search_phenotypes(name="chorea", limit=10)
    phenotypes = await kg.search_phenotypes(hpo_id="HP:0002072")

    # Search drugs
    drugs = await kg.search_drugs(name="tetrabenazine", limit=5)

    # Search pathways
    pathways = await kg.search_pathways(name="apoptosis", limit=5)

    # Find similar diseases by shared phenotypes
    similar = await kg.find_similar_diseases("Huntington Disease", limit=10)
    for s in similar:
        s.disease_name        # "Spinocerebellar Ataxia Type 17"
        s.similarity_score    # 0.85
        s.shared_count        # 12

    # Find similar diseases by shared genes
    by_genes = await kg.find_similar_diseases_by_genes("Huntington Disease", limit=10)

    # Get all connections for an entity
    edges = await kg.get_entity_network("HTT", "protein")
    for e in edges:
        e.source_id, e.source_type
        e.target_id, e.target_type
        e.relation_type

    # k-hop graph traversal (BFS subgraph)
    subgraph = await kg.k_hop_traversal("HTT", "protein", k=2, limit_edges=100)

    # Shortest path between two entities
    path = await kg.find_shortest_path(
        start_name="HTT", start_type="protein",
        end_name="Huntington Disease", end_type="disease",
        max_hops=5,
    )
    for step in path:
        step.node_id, step.node_name, step.node_type, step.relation

Entity types: protein, phenotype, disease, pathway, drug, molecular_function, cellular_component, biological_process

Agent Inventory

Query the AI Inventory service to discover active agents and their capabilities. Accessed via agent.tools.agent_inventory.

@agent.on_message
async def handle(message, sender):
    inv = agent.tools.agent_inventory

    # Get all active agents
    agents = await inv.get_active_agents()
    for a in agents:
        a.id                    # Agent UUID
        a.name                  # "rare-disease-assistant"
        a.representation_name   # "Dr. Assistant"
        a.about                 # Short description
        a.document              # Full documentation
        a.example_prompts       # ["What is Huntington disease?", ...]
        a.categories            # [{"id": "...", "name": "medical"}]
        a.tags                  # [{"id": "...", "name": "rare-disease"}]

    # Get agents accessible to a specific user (filtered by role/permissions)
    user_agents = await inv.get_user_agents(user_id="user-uuid")
Method                   | Returns         | Description
get_active_agents()      | List[AgentInfo] | All agents with status=active and lifeStatus=active
get_user_agents(user_id) | List[AgentInfo] | Active agents accessible to a specific user

Agent-Initiated (Proactive) Conversations

Agents can proactively start new conversations with users — without waiting for them to message first. Use cases: health check-ins, medication reminders, appointment follow-ups, new research alerts.

How It Works

  1. Agent creates a conversation via the conversation store API (is_new=True, current_route, locked_by set atomically)
  2. Agent streams the first message through the normal Kafka pipeline (router → provider → Centrifugo → user)
  3. Provider persists the assistant message and unlocks the conversation on done=true
  4. User sees a new bold conversation in their sidebar (is_new flag)
  5. If awaiting=True, the user's reply routes back to the agent's @on_message handler

Basic Usage

@agent.on_message
async def handle_reply(message, sender):
    # User replied to the proactive conversation
    async with agent.streamer(message) as s:
        s.stream("Thanks for responding to the check-in!")

# Trigger from a scheduler, webhook, or API endpoint
async def send_weekly_check_in(user_id: str):
    conv_id, streamer = await agent.start_conversation(
        user_id=user_id,
        title="Weekly Health Check-in",
        awaiting=True,
    )
    async with streamer as s:
        s.stream("Hi! Time for your weekly check-in.\n\n")
        s.stream("How have you been feeling this week?")

With Response Schema

async def request_symptom_log(user_id: str):
    conv_id, streamer = await agent.start_conversation(
        user_id=user_id,
        title="Daily Symptom Log",
        awaiting=True,
        response_schema={
            "type": "object",
            "title": "How are you feeling today?",
            "properties": {
                "pain_level": {"type": "integer", "minimum": 0, "maximum": 10, "title": "Pain Level"},
                "fatigue": {"type": "integer", "minimum": 0, "maximum": 10, "title": "Fatigue Level"},
                "notes": {"type": "string", "title": "Additional Notes"},
            },
            "required": ["pain_level"],
        },
    )
    async with streamer as s:
        s.stream("Please fill out today's symptom log:")

With Metadata

conv_id, streamer = await agent.start_conversation(
    user_id=user_id,
    title="New Research Alert",
    awaiting=True,
    message_metadata={"alert_type": "research", "paper_id": "PMC12345"},
    metadata={"source": "pubmed_monitor", "triggered_at": "2025-01-15T09:00:00Z"},
)
async with streamer as s:
    s.stream("A new paper about your condition was published today...")

API Reference

async def start_conversation(
    user_id: str,
    title: str = "Agent-initiated conversation",
    awaiting: bool = False,
    response_schema: Optional[Dict[str, Any]] = None,
    message_metadata: Optional[Dict[str, Any]] = None,
    metadata: Optional[Dict[str, Any]] = None,
) -> Tuple[str, Streamer]:
Parameter        | Type | Default                        | Description
user_id          | str  | Required                       | Target user UUID
title            | str  | "Agent-initiated conversation" | Conversation title shown in sidebar
awaiting         | bool | False                          | If True, user reply routes back to this agent
response_schema  | dict | None                           | JSON Schema for structured response (renders a form)
message_metadata | dict | None                           | Metadata attached to the streamed message
metadata         | dict | None                           | Metadata stored on the conversation itself

Returns (conversation_id, streamer). The streamer works identically to agent.streamer().

Inter-Agent Communication

Agents can send messages to other agents using send_to.

# Triage agent forwards to specialist
@agent.on_message(senders=["user"])
async def handle_user(message, sender):
    async with agent.streamer(message, send_to="genetics-specialist") as s:
        s.stream(f"Patient asks: {message.content}")

# Receive specialist response, relay to user
@agent.on_message(senders=["genetics-specialist"])
async def handle_specialist(message, sender):
    async with agent.streamer(message, send_to="user") as s:
        s.stream(f"The genetics specialist says:\n\n{message.content}")

Remote Logging (Loki)

Send agent logs to Loki for centralized monitoring.

import logging

app = BasionAgentApp(
    ...,
    enable_remote_logging=True,
    remote_log_level=logging.INFO,
    remote_log_batch_size=100,
    remote_log_flush_interval=5.0,
)

# Standard Python logging — automatically sent to Loki
logger = logging.getLogger(__name__)
logger.info("Processing symptom query", extra={"user_id": "..."})

Extensions

LangGraph

Persist LangGraph state via the Conversation Store checkpoint API.

from basion_agent import BasionAgentApp
from basion_agent.extensions.langgraph import HTTPCheckpointSaver
from langgraph.graph import StateGraph

app = BasionAgentApp(gateway_url="...", api_key="...")
checkpointer = HTTPCheckpointSaver(app=app)

graph = StateGraph(MyState)
# ... add nodes and edges ...
compiled = graph.compile(checkpointer=checkpointer)

agent = app.register_me(name="langgraph-agent", ...)

@agent.on_message
async def handle(message, sender):
    config = {"configurable": {"thread_id": message.conversation_id}}
    result = await compiled.ainvoke({"messages": [message.content]}, config)

    async with agent.streamer(message) as s:
        s.stream(result["messages"][-1])

app.run()

Pydantic AI

Persist Pydantic AI message history via the Conversation Store.

from basion_agent import BasionAgentApp
from basion_agent.extensions.pydantic_ai import PydanticAIMessageStore
from pydantic_ai import Agent as PydanticAgent

app = BasionAgentApp(gateway_url="...", api_key="...")
store = PydanticAIMessageStore(app=app)
llm = PydanticAgent("openai:gpt-4o", system_prompt="You are a rare disease specialist.")

agent = app.register_me(name="pydantic-agent", ...)

@agent.on_message
async def handle(message, sender):
    history = await store.load(message.conversation_id)

    async with agent.streamer(message) as s:
        async with llm.run_stream(message.content, message_history=history) as result:
            async for chunk in result.stream_text():
                s.stream(chunk)

        await store.save(message.conversation_id, result.all_messages())

app.run()

Full Example: Rare Disease Assistant

A complete agent that uses memory, knowledge graph, attachments, and structural streaming.

import json
import logging
from basion_agent import BasionAgentApp, Stepper, TextBlock

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = BasionAgentApp(
    gateway_url="agent-gateway:8080",
    api_key="your-api-key",
    enable_remote_logging=True,
)

agent = app.register_me(
    name="rare-disease-assistant",
    about="Helps patients understand rare diseases, find similar conditions, and track symptoms",
    document="""
    A comprehensive rare disease assistant that can:
    - Look up diseases, genes, and phenotypes in a biomedical knowledge graph
    - Find similar diseases by shared symptoms or genes
    - Track patient symptoms over time using memory
    - Process genetic test reports (PDF attachments)
    - Proactively check in on patients
    """,
    category_name="medical",
    tag_names=["rare-disease", "genetics", "patient-support"],
    example_prompts=[
        "What is Huntington disease?",
        "Find diseases similar to Cystic Fibrosis",
        "What genes are associated with Marfan syndrome?",
    ],
)


@agent.on_message(senders=["user"])
async def handle_user(message, sender):
    kg = agent.tools.knowledge_graph
    mem = message.memory_v2

    # Ingest the user's message into long-term memory
    if mem:
        await mem.ingest(role="user", content=message.content)

    # Check for form submissions
    if message.schema:
        data = json.loads(message.content)
        async with agent.streamer(message) as s:
            s.stream("Thank you for logging your symptoms.\n\n")
            s.stream(f"- Pain level: {data.get('pain_level')}/10\n")
            s.stream(f"- Notes: {data.get('notes', 'None')}\n")
        return

    # Check for attachments (e.g., genetic test PDF)
    if message.has_attachments():
        async with agent.streamer(message) as s:
            for i, att in enumerate(message.get_attachments()):
                if att.is_pdf():
                    pdf_bytes = await message.get_attachment_bytes_at(i)
                    s.stream(f"Processing **{att.filename}** ({att.size} bytes)...\n")
                    # Parse and analyze the PDF...
                elif att.is_image():
                    base64_data = await message.get_attachment_base64_at(i)
                    s.stream(f"Received image **{att.filename}**.\n")
        return

    # Main message handling with knowledge graph
    async with agent.streamer(message) as s:
        # Show thinking process
        thinking = TextBlock()
        s.stream_by(thinking).set_variant("thinking")
        s.stream_by(thinking).stream_title("Analyzing query...")

        # Recall relevant memories
        if mem:
            memories = await mem.search(query=message.content)
            if memories:
                s.stream_by(thinking).stream_body("Found relevant patient history.\n")

        # Search for diseases mentioned in the query
        stepper = Stepper(steps=["Search diseases", "Find connections", "Compile results"])

        s.stream_by(stepper).start_step(0)
        diseases = await kg.search_diseases(name=message.content, limit=5)
        s.stream_by(stepper).complete_step(0)

        if diseases:
            s.stream_by(stepper).start_step(1)
            disease_name = diseases[0].get("name", "")
            similar = await kg.find_similar_diseases(disease_name, limit=5)
            s.stream_by(stepper).complete_step(1)

            s.stream_by(stepper).start_step(2)
            s.stream_by(thinking).done()

            s.stream(f"## {disease_name}\n\n")

            if similar:
                s.stream("### Similar Conditions\n\n")
                for sim in similar:
                    score = int(sim.similarity_score * 100)
                    s.stream(f"- **{sim.disease_name}** ({score}% similar, "
                             f"{sim.shared_count} shared phenotypes)\n")

            s.stream_by(stepper).complete_step(2)
        else:
            s.stream_by(thinking).done()
            s.stream(f"I couldn't find a disease matching \"{message.content}\". "
                     f"Try searching for a specific disease name.")
            s.stream_by(stepper).complete_step(0)

        s.stream_by(stepper).done()


# Proactive check-in (called from a scheduler or API)
async def daily_check_in(user_id: str):
    conv_id, streamer = await agent.start_conversation(
        user_id=user_id,
        title="Daily Symptom Check-in",
        awaiting=True,
        response_schema={
            "type": "object",
            "title": "How are you feeling today?",
            "properties": {
                "pain_level": {"type": "integer", "minimum": 0, "maximum": 10, "title": "Pain Level"},
                "notes": {"type": "string", "title": "Any additional notes?"},
            },
            "required": ["pain_level"],
        },
    )
    async with streamer as s:
        s.stream("Good morning! Time for your daily check-in.\n\n")
        s.stream("Please fill out the form below:")


app.run()

Architecture

┌─────────────────────────────────────────────────────────────────────────────┐
│                         Your Agent Application                              │
├─────────────────────────────────────────────────────────────────────────────┤
│  BasionAgentApp                                                             │
│  ├── register_me() → Agent                                                  │
│  │   ├── @on_message(senders=[...])                                         │
│  │   ├── streamer(message) → Streamer                                       │
│  │   │   ├── stream() / write()                                             │
│  │   │   ├── set_response_schema() / set_message_metadata()                 │
│  │   │   └── stream_by() → Artifact, Surface, TextBlock, Stepper            │
│  │   ├── start_conversation() → (conv_id, Streamer)                         │
│  │   └── tools                                                              │
│  │       ├── .knowledge_graph → KnowledgeGraphTool                          │
│  │       └── .agent_inventory → AgentInventoryTool                          │
│  └── run()                                                                  │
│                                                                             │
│  Message Context:                                                           │
│  ├── message.conversation → Conversation (history, metadata)                │
│  ├── message.memory_v2 → MemoryV2 (mem0: ingest + search)                   │
│  ├── message.memory → Memory (deprecated)                                   │
│  └── message.attachments → List[AttachmentInfo]                             │
└──────────────────────────────┬──────────────────────────────────────────────┘
                               │ gRPC (Kafka) + HTTP (APIs)
                               ▼
┌─────────────────────────────────────────────────────────────────────────────┐
│                           Agent Gateway                                     │
├─────────────────────────────────────────────────────────────────────────────┤
│  gRPC: AgentStream (bidirectional)      HTTP: /s/{service}/* proxy          │
│  - Auth + Subscribe/Unsubscribe         - /s/ai-inventory/*                 │
│  - Produce/Consume messages             - /s/conversation-store/*           │
│                                         - /s/memory/* (mem0)                │
│                                         - /s/knowledge-graph/*              │
│                                         - /s/attachment/*                   │
│                                         - /loki/api/v1/push (logging)       │
└─────────────────┬───────────────────────────────────┬───────────────────────┘
                  │                                   │
                  ▼                                   ▼
           ┌──────────────┐                  ┌────────────────────┐
           │    Kafka      │                  │   AI Inventory /   │
           │ {agent}.inbox │                  │ Conversation Store │
           └──────────────┘                  │ Memory / KG        │
                                             └────────────────────┘
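As the diagram shows, the gateway proxies each backing service under a `/s/{service}/` path prefix, and the `secure` configuration flag switches between HTTP and HTTPS. A small helper sketching that URL convention (the concrete routes behind each prefix are not documented here, so the example path is illustrative):

```python
def service_url(gateway: str, service: str, path: str, secure: bool = False) -> str:
    """Build a URL for a service proxied by the Agent Gateway under /s/{service}/."""
    scheme = "https" if secure else "http"
    return f"{scheme}://{gateway}/s/{service}/{path.lstrip('/')}"

# e.g. service_url("agent-gateway:8080", "knowledge-graph", "search")
```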

Message Flow

User → Provider → Kafka: router.inbox → Router → Kafka: {agent}.inbox → Gateway → Agent
Agent → Gateway → Kafka: router.inbox → Router → Kafka: user.inbox → Provider → WebSocket → User
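The topic names in both directions follow a `{target}.inbox` convention, with `router.inbox` as the common first hop. A hypothetical illustration of the hops, assuming that naming pattern:

```python
ROUTER_INBOX = "router.inbox"

def hops(target: str) -> list[str]:
    """Kafka topics a message traverses on its way to `target`
    (an agent name, or "user" for the return path)."""
    return [ROUTER_INBOX, f"{target}.inbox"]
```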

Development

# Install dev dependencies
pip install -e ".[dev]"

# Run tests with coverage
pytest

# Run specific tests
pytest tests/test_agent.py -v
pytest tests/test_streamer.py -v

Dependencies

Package       Purpose
grpcio        gRPC communication with Agent Gateway
grpcio-tools  Protobuf compilation
protobuf      Message serialization
requests      Sync HTTP for registration
aiohttp       Async HTTP for runtime operations

Optional

Extra      Install                              Purpose
langgraph  pip install basion-agent[langgraph]  LangGraph checkpoint integration
pydantic   pip install basion-agent[pydantic]   Pydantic AI message history
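Whether an optional extra is installed can be detected at runtime with the standard library, so an agent can degrade gracefully when an integration is missing (a generic sketch, not SDK API):

```python
import importlib.util

def has_module(name: str) -> bool:
    """True if a top-level module is importable,
    e.g. has_module("langgraph") after installing the extra."""
    return importlib.util.find_spec(name) is not None
```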


