Basion Agent SDK
Python SDK for building AI agents on the Basion platform. Agents register themselves, receive messages through Kafka, and stream responses back to users in real time.
Installation
pip install basion-agent
# With LangGraph support
pip install basion-agent[langgraph]
# With Pydantic AI support
pip install basion-agent[pydantic]
Quick Start
from basion_agent import BasionAgentApp
app = BasionAgentApp(
gateway_url="agent-gateway:8080",
api_key="your-api-key",
)
agent = app.register_me(
name="rare-disease-assistant",
about="Answers questions about rare diseases, symptoms, and treatments",
document="A medical assistant specializing in rare diseases. Can look up conditions, find related diseases, and help patients understand their diagnosis.",
)
@agent.on_message
async def handle(message, sender):
history = await message.conversation.get_history(limit=10)
async with agent.streamer(message) as s:
s.stream(f"You asked: {message.content}\n\n")
s.stream("Let me look into that for you...")
app.run()
CLI
# Run your agent
basion-agent run main:app
# Defaults to :app
basion-agent run main
# Custom app name
basion-agent run myagent:application
# Show version
basion-agent version
Configuration
app = BasionAgentApp(
gateway_url="agent-gateway:8080", # Required
api_key="key", # Required
heartbeat_interval=60, # Heartbeat frequency in seconds
max_concurrent_tasks=100, # Max concurrent message handlers
error_message_template="...", # Error message sent to users on failure
secure=False, # TLS for gRPC and HTTPS for HTTP
enable_remote_logging=False, # Send logs to Loki via gateway
remote_log_level=logging.INFO, # Min log level for remote logging
remote_log_batch_size=100, # Logs per batch
remote_log_flush_interval=5.0, # Seconds between flushes
)
| Environment Variable | Description |
|---|---|
| GATEWAY_URL | Agent Gateway endpoint |
| GATEWAY_API_KEY | API key for authentication |
Agent Registration
agent = app.register_me(
name="rare-disease-assistant", # Unique identifier (used for Kafka routing)
about="Rare disease medical assistant", # Short description for agent selection
document="Full documentation...", # Detailed docs used by the router
representation_name="Dr. Assistant", # Display name (optional)
base_url="http://...", # Base URL for frontend service (optional)
metadata={"specialty": "rare-diseases"}, # Additional metadata (optional)
category_name="medical", # Category in kebab-case (optional)
tag_names=["rare-disease", "genetics"], # Tags in kebab-case (optional)
example_prompts=[ # Example prompts shown to users (optional)
"What is Huntington disease?",
"Find diseases similar to Cystic Fibrosis",
],
is_experimental=False, # Mark as experimental (optional)
related_pages=[ # Related pages (optional)
{"name": "Resources", "endpoint": "/resources"}
],
)
Message Handling
Basic Handler
@agent.on_message
async def handle(message, sender):
async with agent.streamer(message) as s:
s.stream("Hello! How can I help?")
Handlers can be async or synchronous. Sync handlers run in a thread pool automatically.
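The dispatch logic is roughly equivalent to the following sketch (not the SDK's actual implementation; dispatch and sync_handle are illustrative names):

```python
import asyncio

def sync_handle(message, sender):
    # Blocking work is fine here; it runs on a worker thread,
    # so the event loop stays responsive.
    return f"echo: {message}"

async def dispatch(handler, message, sender):
    if asyncio.iscoroutinefunction(handler):
        return await handler(message, sender)
    # Sync handlers are offloaded to the default thread pool.
    return await asyncio.to_thread(handler, message, sender)

result = asyncio.run(dispatch(sync_handle, "hi", "user"))
```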
Sender Filtering
# Handle messages from users only
@agent.on_message(senders=["user"])
async def handle_user(message, sender):
...
# Handle messages from a specific agent
@agent.on_message(senders=["triage-agent"])
async def handle_triage(message, sender):
...
# Exclude a specific sender
@agent.on_message(senders=["~notification-agent"])
async def handle_others(message, sender):
...
Error Handling
If a handler raises an exception, the SDK automatically sends an error message to the user. You can customize or disable this behavior:
# Custom error message
app = BasionAgentApp(
...,
error_message_template="Sorry, something went wrong. Please try again.",
)
# Or disable per-agent
agent.send_error_responses = False
@agent.on_message
async def handle(message, sender):
try:
...
except Exception as e:
async with agent.streamer(message) as s:
s.stream(f"I ran into an issue: {e}")
Message
The message object provides access to content, conversation history, memory, and attachments.
@agent.on_message
async def handle(message, sender):
message.content # Message text
message.conversation_id # Conversation UUID
message.user_id # User UUID
message.metadata # Optional dict (from frontend)
message.schema # Optional dict (for form responses)
message.headers # Raw Kafka headers
message.conversation # Conversation helper
message.memory_v2 # mem0 memory (ingest + search)
message.memory # Deprecated memory (use memory_v2)
# Inter-agent communication
message.hand_off("agent-name") # Forward to another agent
message.hand_off("agent-name", "content") # Forward with custom content
Streaming Responses
Basic Streaming
@agent.on_message
async def handle(message, sender):
# Context manager — auto-finishes on exit
async with agent.streamer(message) as s:
s.stream("Looking up information on Huntington disease...\n\n")
s.stream("Huntington disease is a progressive neurodegenerative disorder...")
# Or manual control
s = agent.streamer(message)
s.stream("Hello!")
await s.finish()
Streamer Options
# Route response to another agent
async with agent.streamer(message, send_to="specialist-agent") as s:
s.stream("Forwarding to the specialist...")
# Set awaiting — next user reply routes back to this agent
async with agent.streamer(message, awaiting=True) as s:
s.stream("What symptoms are you experiencing?")
Non-Persisted Content
Stream chunks that appear in real time but are not saved to the conversation history:
async with agent.streamer(message) as s:
s.stream("Searching knowledge graph...", persist=False, event_type="thinking")
# ... do work ...
s.stream("Here are the results:") # This gets persisted
Message Metadata
Attach metadata to the response message:
async with agent.streamer(message) as s:
s.set_message_metadata({"source": "knowledge_graph", "confidence": 0.92})
s.stream("Based on the knowledge graph, ...")
Response Schema (genui Forms)
Request structured input from users with genui — a declarative form builder. Define form classes with typed fields, send them to the frontend, and parse typed responses back.
Quick Start
from basion_agent.genui import Form, Text, Slider, MultiSelect, Switch, Option
class SymptomReport(Form):
name = Text(label="Full Name", placeholder="John Doe")
severity = Slider(label="Pain Level", min=1, max=10)
symptoms = MultiSelect(label="Symptoms", options=[
Option(value="headache", label="Headache"),
Option(value="fatigue", label="Fatigue"),
Option(value="nausea", label="Nausea"),
])
chronic = Switch(label="Is this chronic?")
Send to frontend:
@agent.on_message
async def handle(message, sender):
# Check if this is a form submission
if message.schema:
data = await SymptomReport.parse(message)
async with agent.streamer(message) as s:
s.stream(f"Thank you, {data.name}. Pain level: {data.severity}/10.")
return
# Ask for structured input
async with agent.streamer(message, awaiting=True) as s:
s.stream("Please describe your symptoms:")
form = SymptomReport(title="Symptom Report", description="Tell us about your symptoms")
s.set_response_schema(form)
How it works:
- Declare a Form subclass with field class attributes
- Instantiate with metadata: form = MyForm(title="...", description="...")
- streamer.set_response_schema(form) sends the schema to the frontend
- When the user submits, await MyForm.parse(message) returns a typed Python object with IDE autocomplete
Field Types
All fields accept required=True (default). All arguments are keyword-only.
| Field | Python Type | Default | Description |
|---|---|---|---|
| Text | str | "" | Single/multiline text input |
| Number | float | 0.0 | Numeric input with min/max/step |
| Select | str | "" | Single-select dropdown |
| MultiSelect | List[str] | [] | Multiple selection |
| Checkbox | bool | False | Single boolean checkbox |
| CheckboxGroup | List[str] | [] | Multiple checkboxes as a group |
| Switch | bool | False | Toggle switch |
| Slider | float | 0.0 | Range slider |
| DatePicker | str | "" | Date selection |
| Hidden | Any | None | Hidden field (not rendered) |
| FileField | AttachmentInfo | None | File upload field |
Text:
name = Text(label="Full Name", placeholder="John Doe", min_length=2, max_length=100)
bio = Text(label="Biography", multiline=True, required=False)
Number:
age = Number(label="Age", min=0, max=150, step=1)
Select / MultiSelect:
color = Select(label="Color", options=[
Option(value="red", label="Red"),
Option(value="blue", label="Blue"),
], placeholder="Pick one...", allow_custom=True)
symptoms = MultiSelect(label="Symptoms", options=[
Option(value="headache", label="Headache"),
Option(value="fatigue", label="Fatigue"),
], min_selections=1, max_selections=5, allow_custom=True)
Checkbox / CheckboxGroup / Switch:
agree = Checkbox(label="I agree to the terms")
languages = CheckboxGroup(label="Languages", options=[
Option(value="en", label="English"),
Option(value="es", label="Spanish"),
], min_selections=1)
notifications = Switch(label="Enable notifications", default=True)
Slider / DatePicker:
pain = Slider(label="Pain Level", min=0, max=10, step=1, default=5)
dob = DatePicker(label="Date of Birth", min_date="1900-01-01", max_date="2026-12-31")
Hidden / FileField:
session_id = Hidden(label="Session", value="abc-123", required=False)
doc = FileField(label="Document", accept=["application/pdf", "image/*"], max_size=10_000_000)
Form
Single-step form container. Metadata is passed via __init__ (all keyword-only):
class Feedback(Form):
comment = Text(label="Comment", multiline=True)
form = Feedback(
title="Feedback",
description="Tell us what you think",
submit_label="Send Feedback",
allow_cancel=True,
cancel_label="Dismiss",
)
streamer.set_response_schema(form)
| Parameter | Type | Default | Description |
|---|---|---|---|
| title | str | required | Form heading |
| description | str | None | Subtitle/description text |
| submit_label | str | "Submit" | Submit button text |
| allow_cancel | bool | False | Show a cancel button |
| cancel_label | str | "Cancel" | Cancel button text |
MultiStepForm
Multi-step wizard. Define Step subclasses with fields, then use them as class attributes:
from basion_agent.genui import MultiStepForm, Step, Text, Number, Slider, Checkbox
class Demographics(Step):
name = Text(label="Full Name")
age = Number(label="Age", min=0, max=150)
class PainAssessment(Step):
pain_level = Slider(label="Pain Level", min=0, max=10)
chronic = Checkbox(label="Is this chronic?")
class PatientIntake(MultiStepForm):
demographics = Demographics(label="Basic Info", description="Your basic details")
pain = PainAssessment(label="Pain Assessment")
form = PatientIntake(title="Patient Intake", description="Please complete all steps")
streamer.set_response_schema(form)
Field names must be unique across all steps. The frontend submits all fields as a flat dict.
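A minimal illustration of the flat-dict contract (step and field names are from the example above; this is not SDK code):

```python
# The frontend flattens every step's fields into a single dict,
# which is why field names must be unique across steps.
step_fields = {
    "demographics": ["name", "age"],
    "pain": ["pain_level", "chronic"],
}

all_names = [name for fields in step_fields.values() for name in fields]
assert len(all_names) == len(set(all_names)), "duplicate field name across steps"

# A submission for the wizard above arrives flat, not nested by step:
submission = {"name": "Ada", "age": 36, "pain_level": 4, "chronic": False}
```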
Confirmation
A yes/no decision card — no input fields:
from basion_agent.genui import Confirmation
confirm = Confirmation(
title="Delete Account",
message="Are you sure? This cannot be undone.",
confirm_label="Delete",
cancel_label="Keep Account",
variant="destructive", # "default", "destructive", "warning", "info"
)
streamer.set_response_schema(confirm)
# Parse the response
result = await Confirmation.parse(message)
result.confirmed # bool — True if confirmed, False if cancelled
Parsing
parse() is an async classmethod. It accepts a Message, a JSON string, or a dict:
data = await MyForm.parse(message) # From Message (most common)
data = await MyForm.parse('{"name": "Alice"}') # From JSON string
data = await MyForm.parse({"name": "Alice"}) # From dict
data.name # str — typed access with IDE autocomplete
data.severity # float
Missing fields get their type's default value. Extra keys are silently ignored.
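These semantics can be sketched in a few lines (not the SDK internals, just the behavior described above: missing keys fall back to the field type's default, unknown keys are dropped):

```python
# Defaults for a hypothetical form with Text, Slider, and MultiSelect fields.
defaults = {"name": "", "severity": 0.0, "symptoms": []}

def parse_like(payload):
    # Keep only declared fields; substitute the default when a key is absent.
    return {key: payload.get(key, default) for key, default in defaults.items()}

data = parse_like({"name": "Alice", "unknown_key": 123})
# "severity" and "symptoms" get defaults; "unknown_key" is ignored.
```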
For FileField, when parsing from a Message, the attachment client is automatically extracted:
class UploadForm(Form):
doc = FileField(label="Document")
data = await UploadForm.parse(message)
data.doc # AttachmentInfo object
Conversation History
@agent.on_message
async def handle(message, sender):
conv = message.conversation
# Get message history
history = await conv.get_history(limit=20)
history = await conv.get_history(role="user", limit=10, offset=0)
# Get conversation metadata
metadata = await conv.get_metadata()
# Get messages where this agent was sender or recipient
agent_history = await conv.get_agent_history(limit=50)
agent_history = await conv.get_agent_history(agent_name="triage-agent")
Memory V2 (mem0)
Long-term memory powered by mem0.ai. Ingest messages and semantically search across a user's history.
@agent.on_message
async def handle(message, sender):
mem = message.memory_v2
# Ingest the user's message
await mem.ingest(role="user", content=message.content)
# Search for relevant past context
results = await mem.search(query="diagnosis history")
for r in results:
memory_text = r.get("memory") # Extracted memory text
async with agent.streamer(message) as s:
if results:
s.stream("Based on what I remember:\n")
for r in results:
s.stream(f"- {r.get('memory')}\n")
s.stream("\n")
s.stream(f"Regarding your question: {message.content}")
# Ingest the assistant's response too
await mem.ingest(role="assistant", content="...")
| Method | Description |
|---|---|
| ingest(role, content) | Ingest a message. Role: 'user', 'assistant', or 'system'. |
| search(query) | Semantic search across the user's memories. Returns a list of dicts. |
Attachments
Download and process file attachments (images, PDFs, etc.).
@agent.on_message
async def handle(message, sender):
if not message.has_attachments():
return
count = message.get_attachment_count()
attachments = message.get_attachments()
async with agent.streamer(message) as s:
s.stream(f"Received {count} file(s).\n\n")
for i, att in enumerate(attachments):
s.stream(f"**{att.filename}** ({att.content_type}, {att.size} bytes)\n")
if att.is_image():
base64_str = await message.get_attachment_base64_at(i)
# Send to vision model...
elif att.is_pdf():
pdf_bytes = await message.get_attachment_bytes_at(i)
# Parse PDF...
# Or download everything at once
all_bytes = await message.get_all_attachment_bytes()
all_base64 = await message.get_all_attachment_base64()
Attachment Methods
| Method | Returns | Description |
|---|---|---|
| has_attachments() | bool | Whether the message has any attachments |
| get_attachment_count() | int | Number of attachments |
| get_attachments() | List[AttachmentInfo] | List of attachment metadata |
| get_attachment_bytes() | bytes | Download first attachment as bytes |
| get_attachment_base64() | str | Download first attachment as base64 |
| get_attachment_buffer() | BytesIO | Download first attachment as BytesIO |
| get_attachment_bytes_at(i) | bytes | Download attachment at index i |
| get_attachment_base64_at(i) | str | Download attachment at index i as base64 |
| get_attachment_buffer_at(i) | BytesIO | Download attachment at index i as BytesIO |
| get_all_attachment_bytes() | List[bytes] | Download all attachments |
| get_all_attachment_base64() | List[str] | Download all as base64 |
| get_all_attachment_buffers() | List[BytesIO] | Download all as BytesIO |
AttachmentInfo Properties
| Property | Type | Example |
|---|---|---|
| filename | str | "genetic_report.pdf" |
| content_type | str | "application/pdf" |
| size | int | 524288 |
| url | str | Download URL |
| file_extension | str | "pdf" |
| file_type | str | "pdf" |
| is_image() | bool | True for image MIME types |
| is_pdf() | bool | True for PDF files |
Structural Streaming
Rich UI components streamed alongside text. Bind a structural component to the streamer with stream_by().
Artifact
Files, images, or embeds with generation progress. Artifact data is persisted to the database.
from basion_agent import Artifact
@agent.on_message
async def handle(message, sender):
async with agent.streamer(message) as s:
artifact = Artifact()
# Show progress
s.stream_by(artifact).generating("Generating genetic pathway diagram...", progress=0.3)
# ... do work ...
s.stream_by(artifact).generating("Rendering...", progress=0.8)
# Complete with result
s.stream_by(artifact).done(
url="https://example.com/pathway-diagram.png",
type="image",
title="HTT Gene Pathway",
description="Huntingtin protein interaction network",
metadata={"width": 1200, "height": 800},
)
# Or signal an error
# s.stream_by(artifact).error("Failed to generate diagram")
s.stream("Here's the pathway diagram for the HTT gene.")
Artifact types: image, iframe, document, video, audio, code, link, file
Surface
Interactive embedded components (iframes, widgets).
from basion_agent import Surface
@agent.on_message
async def handle(message, sender):
async with agent.streamer(message) as s:
surface = Surface()
s.stream_by(surface).generating("Loading appointment scheduler...")
s.stream_by(surface).done(
url="https://cal.com/embed/dr-smith",
type="iframe",
title="Schedule Genetic Counseling",
description="Book a session with a genetic counselor",
)
s.stream("You can schedule your genetic counseling session above.")
TextBlock
Collapsible text blocks with streaming title/body and visual variants. TextBlock events are not persisted.
from basion_agent import TextBlock
@agent.on_message
async def handle(message, sender):
async with agent.streamer(message) as s:
block = TextBlock()
# Set visual variant
s.stream_by(block).set_variant("thinking")
# Stream title (appends)
s.stream_by(block).stream_title("Analyzing ")
s.stream_by(block).stream_title("symptoms...")
# Stream body (appends)
s.stream_by(block).stream_body("Checking symptom database...\n")
s.stream_by(block).stream_body("Cross-referencing with HPO ontology...\n")
s.stream_by(block).stream_body("Matching against known phenotypes...\n")
# Replace title/body entirely
s.stream_by(block).update_title("Analysis Complete")
s.stream_by(block).update_body("Found 3 matching conditions.")
# Mark as done
s.stream_by(block).done()
s.stream("Based on the symptoms, here are possible conditions...")
Variants: thinking, note, warning, error, success
Stepper
Multi-step progress indicators. Stepper events are not persisted.
from basion_agent import Stepper
@agent.on_message
async def handle(message, sender):
async with agent.streamer(message) as s:
stepper = Stepper(steps=[
"Search diseases",
"Analyze phenotypes",
"Find similar conditions",
"Generate report",
])
s.stream_by(stepper).start_step(0)
diseases = await kg.search_diseases(name="Huntington")
s.stream_by(stepper).complete_step(0)
s.stream_by(stepper).start_step(1)
phenotypes = await kg.search_phenotypes(name="chorea")
s.stream_by(stepper).complete_step(1)
s.stream_by(stepper).start_step(2)
similar = await kg.find_similar_diseases("Huntington Disease")
s.stream_by(stepper).complete_step(2)
# Add a step dynamically
s.stream_by(stepper).add_step("Cross-reference")
s.stream_by(stepper).start_step(4)
s.stream_by(stepper).update_step_label(4, "Cross-reference (final)")
s.stream_by(stepper).complete_step(4)
s.stream_by(stepper).start_step(3)
# Or mark a step as failed
# s.stream_by(stepper).fail_step(3, error="Report generation timed out")
s.stream_by(stepper).complete_step(3)
s.stream_by(stepper).done()
s.stream("Here's your rare disease report...")
Generative UI Message Components
Rich UI components that are persisted as the message content. Unlike structural streaming (which is ephemeral side-channel UI) and response schemas (which replace the chat input), genui message components are the actual message — stored in the database and rendered by the frontend as structured cards, accordions, etc.
Overview
| Feature | Method | Persisted? | Renders As |
|---|---|---|---|
| Text streaming | s.stream() | Yes | Markdown text |
| Structural events | s.stream_by() | No (except Artifact) | Stepper, TextBlock, etc. |
| Response schemas | s.set_response_schema() | No | Form replacing chat input |
| Genui components | s.generate_ui() | Yes | Card, Accordion, etc. |
Quick Start
from basion_agent.genui import Card, CardSection, CardButton
@agent.on_message
async def handle(message, sender):
async with agent.streamer(message) as s:
s.generate_ui(Card(
title="Lab Results",
body="Blood work completed successfully.",
variant="success",
sections=[
CardSection(label="Hemoglobin", value="14.2 g/dL"),
CardSection(label="WBC", value="7,500 /uL"),
],
))
The message is stored with is_genui=True and content as a JSON array of component dicts. The frontend parses and renders them as structured UI instead of markdown.
Card
A card component for displaying structured information with optional sections, image, and button.
from basion_agent.genui import Card, CardSection, CardButton
Card(
title="Patient Summary", # Required — card heading
body="Overview of current status.", # Optional — description text
variant="info", # Optional — visual style
sections=[ # Optional — key-value pairs
CardSection(label="Name", value="John Doe"),
CardSection(label="Age", value="42"),
CardSection(label="Status", value="Active"),
],
image="https://example.com/photo.jpg", # Optional — image URL at top
button=CardButton( # Optional — link button at bottom
label="View Full Report",
url="https://example.com/report/123",
),
)
| Parameter | Type | Default | Description |
|---|---|---|---|
| title | str | required | Card heading |
| body | str | "" | Body text / description |
| variant | str | "default" | Visual style: default, success, warning, error, info |
| sections | List[CardSection] | None | Key-value pairs displayed in the card |
| image | str | None | Image URL displayed at the top of the card |
| button | CardButton | None | Link button at the bottom of the card |
CardSection — a key-value pair:
| Parameter | Type | Description |
|---|---|---|
| label | str | Label/key text |
| value | str | Value text |
CardButton — a link button:
| Parameter | Type | Description |
|---|---|---|
| label | str | Button text |
| url | str | URL the button opens |
Accordion
A collapsible accordion component with expandable items.
from basion_agent.genui import Accordion, AccordionItem
Accordion(
title="Frequently Asked Questions", # Required — heading
items=[ # Required — at least one item
AccordionItem(
title="What is this service?",
body="A multi-agent AI assistant for healthcare tasks.",
),
AccordionItem(
title="How do I submit a form?",
body="Fill in the fields and press Submit.",
),
],
)
| Parameter | Type | Default | Description |
|---|---|---|---|
| title | str | required | Accordion heading |
| items | List[AccordionItem] | required | Expandable items (min 1) |
AccordionItem:
| Parameter | Type | Description |
|---|---|---|
| title | str | Item heading (clickable) |
| body | str | Content revealed when expanded |
Multiple Components in One Message
Call generate_ui() multiple times to send several components as a single message:
async with agent.streamer(message) as s:
s.generate_ui(Card(
title="Lab Results",
body="Blood work completed on 2026-02-11.",
variant="success",
sections=[
CardSection(label="Hemoglobin", value="14.2 g/dL (Normal)"),
CardSection(label="WBC", value="7,500 /uL (Normal)"),
],
))
s.generate_ui(Accordion(
title="Detailed Breakdown",
items=[
AccordionItem(title="Complete Blood Count", body="All values within normal range."),
AccordionItem(title="Metabolic Panel", body="Glucose: 95 mg/dL — normal."),
],
))
Both components are stored as a single message with content = [card_dict, accordion_dict].
Mutual Exclusion with stream()
generate_ui() and stream() cannot be mixed in the same message — they are mutually exclusive modes:
# OK — genui only
async with agent.streamer(message) as s:
s.generate_ui(Card(title="Result", body="Done."))
# OK — text only
async with agent.streamer(message) as s:
s.stream("Here are the results in plain text...")
# ERROR — mixing modes raises RuntimeError
async with agent.streamer(message) as s:
s.stream("Some text")
s.generate_ui(Card(title="Oops")) # RuntimeError!
Combining with Structural Events
Structural events (stream_by()) work alongside generate_ui() because they are non-persisted side-channel UI:
from basion_agent import Stepper
from basion_agent.genui import Card, CardSection
async with agent.streamer(message) as s:
stepper = Stepper(steps=["Fetch data", "Analyze", "Build card"])
s.stream_by(stepper).start_step(0)
data = await fetch_data()
s.stream_by(stepper).complete_step(0)
s.stream_by(stepper).start_step(1)
results = analyze(data)
s.stream_by(stepper).complete_step(1)
s.stream_by(stepper).start_step(2)
s.generate_ui(Card(
title="Analysis Complete",
variant="success",
sections=[
CardSection(label="Records Analyzed", value=str(len(results))),
],
))
s.stream_by(stepper).complete_step(2)
s.stream_by(stepper).done()
Raw Dict Fallback
If you need to send a component type not yet supported by the SDK classes, pass a raw dict:
s.generate_ui({"type": "card", "title": "Custom", "body": "Raw dict works too."})
Any object with a to_dict() method or a plain dict is accepted.
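For example, a custom wrapper class works the same as a raw dict ("banner" here is a hypothetical component type, not one the SDK defines):

```python
# Any object exposing to_dict() is serialized the same way as a raw dict.
class Banner:
    def __init__(self, text):
        self.text = text

    def to_dict(self):
        return {"type": "banner", "text": self.text}

component = Banner("Scheduled maintenance tonight")
payload = component.to_dict()
# s.generate_ui(component) would send the same payload as s.generate_ui(payload)
```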
How It Works
- The agent calls s.generate_ui(component); the component is appended to an internal list
- On finish(), all components are serialized as a JSON array and sent as the message content with isGenui=True
- The provider consumer stores the message with is_genui=true in the database
- The frontend receives the Centrifugo event with isGenui: true, parses content as JSON, and renders the components
Knowledge Graph
Query biomedical knowledge graphs for diseases, proteins, phenotypes, drugs, and pathways. Accessed via agent.tools.knowledge_graph.
@agent.on_message
async def handle(message, sender):
kg = agent.tools.knowledge_graph
# Search diseases
diseases = await kg.search_diseases(name="Huntington", limit=5)
diseases = await kg.search_diseases(orphacode="399", limit=5)
diseases = await kg.search_diseases(omim="143100")
disease = await kg.get_disease(disease_id=123)
# Search proteins/genes
proteins = await kg.search_proteins(symbol="HTT", limit=10)
proteins = await kg.search_proteins(ensembl_id="ENSG00000197386")
# Search phenotypes (HPO terms)
phenotypes = await kg.search_phenotypes(name="chorea", limit=10)
phenotypes = await kg.search_phenotypes(hpo_id="HP:0002072")
# Search drugs
drugs = await kg.search_drugs(name="tetrabenazine", limit=5)
# Search pathways
pathways = await kg.search_pathways(name="apoptosis", limit=5)
# Find similar diseases by shared phenotypes
similar = await kg.find_similar_diseases("Huntington Disease", limit=10)
for s in similar:
s.disease_name # "Spinocerebellar Ataxia Type 17"
s.similarity_score # 0.85
s.shared_count # 12
# Find similar diseases by shared genes
by_genes = await kg.find_similar_diseases_by_genes("Huntington Disease", limit=10)
# Get all connections for an entity
edges = await kg.get_entity_network("HTT", "protein")
for e in edges:
e.source_id, e.source_type
e.target_id, e.target_type
e.relation_type
# k-hop graph traversal (BFS subgraph)
subgraph = await kg.k_hop_traversal("HTT", "protein", k=2, limit_edges=100)
# Shortest path between two entities
path = await kg.find_shortest_path(
start_name="HTT", start_type="protein",
end_name="Huntington Disease", end_type="disease",
max_hops=5,
)
for step in path:
step.node_id, step.node_name, step.node_type, step.relation
Entity types: protein, phenotype, disease, pathway, drug, molecular_function, cellular_component, biological_process
Agent Inventory
Query the AI Inventory service to discover active agents and their capabilities. Accessed via agent.tools.agent_inventory.
@agent.on_message
async def handle(message, sender):
inv = agent.tools.agent_inventory
# Get all active agents
agents = await inv.get_active_agents()
for a in agents:
a.id # Agent UUID
a.name # "rare-disease-assistant"
a.representation_name # "Dr. Assistant"
a.about # Short description
a.document # Full documentation
a.example_prompts # ["What is Huntington disease?", ...]
a.categories # [{"id": "...", "name": "medical"}]
a.tags # [{"id": "...", "name": "rare-disease"}]
# Get agents accessible to a specific user (filtered by role/permissions)
user_agents = await inv.get_user_agents(user_id="user-uuid")
| Method | Returns | Description |
|---|---|---|
| get_active_agents() | List[AgentInfo] | All agents with status=active and lifeStatus=active |
| get_user_agents(user_id) | List[AgentInfo] | Active agents accessible to a specific user |
Agent-Initiated (Proactive) Conversations
Agents can proactively start new conversations with users — without waiting for them to message first. Use cases: health check-ins, medication reminders, appointment follow-ups, new research alerts.
How It Works
- The agent creates a conversation via the conversation store API (is_new=True, current_route, and locked_by set atomically)
- The agent streams the first message through the normal Kafka pipeline (router → provider → Centrifugo → user)
- The provider persists the assistant message and unlocks the conversation on done=true
- The user sees a new bold conversation in their sidebar (is_new flag)
- If awaiting=True, the user's reply routes back to the agent's @on_message handler
Basic Usage
@agent.on_message
async def handle_reply(message, sender):
# User replied to the proactive conversation
async with agent.streamer(message) as s:
s.stream("Thanks for responding to the check-in!")
# Trigger from a scheduler, webhook, or API endpoint
async def send_weekly_check_in(user_id: str):
conv_id, streamer = await agent.start_conversation(
user_id=user_id,
title="Weekly Health Check-in",
awaiting=True,
)
async with streamer as s:
s.stream("Hi! Time for your weekly check-in.\n\n")
s.stream("How have you been feeling this week?")
With Response Schema
async def request_symptom_log(user_id: str):
conv_id, streamer = await agent.start_conversation(
user_id=user_id,
title="Daily Symptom Log",
awaiting=True,
response_schema={
"type": "object",
"title": "How are you feeling today?",
"properties": {
"pain_level": {"type": "integer", "minimum": 0, "maximum": 10, "title": "Pain Level"},
"fatigue": {"type": "integer", "minimum": 0, "maximum": 10, "title": "Fatigue Level"},
"notes": {"type": "string", "title": "Additional Notes"},
},
"required": ["pain_level"],
},
)
async with streamer as s:
s.stream("Please fill out today's symptom log:")
With Metadata
conv_id, streamer = await agent.start_conversation(
user_id=user_id,
title="New Research Alert",
awaiting=True,
message_metadata={"alert_type": "research", "paper_id": "PMC12345"},
metadata={"source": "pubmed_monitor", "triggered_at": "2025-01-15T09:00:00Z"},
)
async with streamer as s:
s.stream("A new paper about your condition was published today...")
API Reference
async def start_conversation(
user_id: str,
title: str = "Agent-initiated conversation",
awaiting: bool = False,
response_schema: Optional[Dict[str, Any]] = None,
message_metadata: Optional[Dict[str, Any]] = None,
metadata: Optional[Dict[str, Any]] = None,
) -> Tuple[str, Streamer]:
| Parameter | Type | Default | Description |
|---|---|---|---|
| user_id | str | required | Target user UUID |
| title | str | "Agent-initiated conversation" | Conversation title shown in sidebar |
| awaiting | bool | False | If True, the user's reply routes back to this agent |
| response_schema | dict | None | JSON Schema for structured response (renders a form) |
| message_metadata | dict | None | Metadata attached to the streamed message |
| metadata | dict | None | Metadata stored on the conversation itself |
Returns (conversation_id, streamer). The streamer works identically to agent.streamer().
Inter-Agent Communication
Three patterns for agents to communicate with each other, each suited to different use cases.
Why Inter-Agent Communication?
In a rare disease platform, no single agent can cover everything. A patient might ask a general wellness agent about fatigue, but the underlying cause is Ehlers-Danlos syndrome — something only a genetics specialist would recognize. The general agent needs to either hand off the conversation to the specialist entirely, or call the specialist to get a quick answer and weave it into its own response.
Without inter-agent communication, every agent would need to be an expert in everything, or the user would have to manually switch between agents mid-conversation.
Overview
| Pattern | Method | Blocking? | Streamer? | Use Case |
|---|---|---|---|---|
| Stream-forward | `streamer(msg, send_to=...)` | No | Yes | Relay content to another agent via streaming |
| Hand-off | `message.hand_off(agent)` | No | No | Fire-and-forget forward; single Kafka message |
| Call | `agent.call(agent, conv_id, content)` | Yes (await) | No | Synchronous request → response between agents |
Stream-Forward (send_to)
Route a streamed response to another agent instead of the user. The target agent receives the full streamed content as a single message. Useful when one agent preprocesses or enriches a message before another agent handles it.
Scenario: A triage agent receives a user's question about a rare genetic condition. It packages the question with relevant patient context and forwards it to the genetics specialist, who responds directly to the user.
# Triage agent forwards to genetics specialist with context
@agent.on_message(senders=["user"])
async def handle_user(message, sender):
async with agent.streamer(message, send_to="genetics-specialist") as s:
s.stream(f"Patient (history: Marfan syndrome, age 34) asks: {message.content}")
# Genetics specialist receives enriched message, responds to user
@genetics_agent.on_message(senders=["triage-agent"])
async def handle_from_triage(message, sender):
async with genetics_agent.streamer(message, send_to="user") as s:
s.stream(f"Based on your Marfan syndrome history:\n\n{generate_response(message.content)}")
Hand-Off (message.hand_off)
Forward a message to another agent with a single Kafka message. No streamer, no done flag — the target agent's @on_message fires exactly once. The calling agent can optionally stream to the user before handing off.
Scenario: Out-of-expertise escalation. A patient asks the general wellness agent about joint hypermobility and chronic pain. The wellness agent recognizes this sounds like Ehlers-Danlos syndrome — far outside its expertise. It tells the user what's happening and hands off to the connective tissue specialist, who takes over the conversation entirely.
@wellness_agent.on_message
async def handle(message, sender):
content = message.content.lower()
# Detect rare disease indicators outside our expertise
if any(term in content for term in ["hypermobility", "joint laxity", "stretchy skin", "ehlers-danlos"]):
# Stream a brief explanation to the user before handing off
s = wellness_agent.streamer(message)
s.stream("Your symptoms suggest a connective tissue condition that requires specialist input.\n\n")
s.stream("I'm connecting you with our rare disease specialist who can help.")
# Hand off — the specialist takes over the conversation
message.hand_off("rare-disease-specialist",
f"Patient reports: {message.content}\n\n"
f"Wellness agent assessment: Symptoms consistent with possible Ehlers-Danlos syndrome. "
f"Patient needs specialist evaluation for hypermobility spectrum disorders."
)
return
# Normal wellness handling...
async with wellness_agent.streamer(message) as s:
s.stream(generate_wellness_response(message.content))
Scenario: Multi-step diagnostic pipeline. A general intake agent collects symptoms, hands off to a diagnostic agent, which may further hand off to a condition-specific agent.
@intake_agent.on_message
async def handle(message, sender):
symptoms = extract_symptoms(message.content)
s = intake_agent.streamer(message)
s.stream("Thank you for describing your symptoms. Let me connect you with the right specialist.")
# Hand off with structured context for the diagnostic agent
message.hand_off("diagnostic-agent",
f"Extracted symptoms: {', '.join(symptoms)}\n"
f"Original message: {message.content}"
)
How it works:
- Produces a single Kafka message to `router.inbox` with the original message headers
- Router forwards to the target agent's inbox and updates `current_route` to the target agent
- Target agent's `@on_message` handler fires with the forwarded message
- The calling agent does not send `done=True` — the target agent is responsible for responding to the user
- The conversation lock is not stuck — the target agent responds to the user via the normal streamer flow, which sends `done=true` to `user.inbox`, and the provider unlocks the conversation
def hand_off(self, agent_name: str, content: Optional[str] = None) -> None
| Parameter | Type | Default | Description |
|---|---|---|---|
| `agent_name` | `str` | Required | Target agent name |
| `content` | `str` | `None` | Override content. If `None`, forwards the original `message.content` |
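The routing decision from the wellness example above can be factored into a pure, testable function, keeping the handler itself thin. This is a sketch: the term list and the specialist agent name are placeholders from the earlier scenario, not SDK requirements.

```python
# Sketch: keyword-based hand-off routing as a pure function.
# EDS_TERMS and the target agent name mirror the wellness-agent example;
# both are illustrative placeholders.
EDS_TERMS = ("hypermobility", "joint laxity", "stretchy skin", "ehlers-danlos")

def route_for(content: str) -> "str | None":
    """Return a target agent name if the message should be handed off,
    or None to handle it locally."""
    text = content.lower()
    if any(term in text for term in EDS_TERMS):
        return "rare-disease-specialist"
    return None
```

In the handler, this reduces to `target = route_for(message.content)` followed by `message.hand_off(target)` when a target is found — omitting `content` forwards the original `message.content` unchanged.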
Call (agent.call)
Synchronous agent-to-agent communication. Call another agent and await the response. The calling agent keeps conversation ownership — current_route is not changed.
Scenario: Cross-referencing with a specialist. A patient asks the care coordinator about drug interactions for their rare condition (Gaucher disease). The coordinator needs specific pharmacogenomics information but should remain the patient's primary contact. It calls the pharmacology agent behind the scenes, gets the answer, and incorporates it into its own response.
@coordinator_agent.on_message
async def handle(message, sender):
async with coordinator_agent.streamer(message) as s:
s.stream("Let me check on that for you...\n\n")
try:
# Call the pharmacology agent — patient doesn't see this exchange
drug_info = await coordinator_agent.call(
agent_name="pharmacology-agent",
conversation_id=message.conversation_id,
content=f"Patient has Gaucher disease (Type 1, on ERT). "
f"Question: {message.content}",
timeout=15.0,
)
s.stream(f"Here's what I found about your medication:\n\n{drug_info}")
except asyncio.TimeoutError:
s.stream("I wasn't able to get the pharmacology details right now. "
"Let me note this and follow up with you shortly.")
Scenario: Gathering information from multiple specialists. A rare disease coordinator needs input from both a genetics agent and a clinical trials agent to give the patient a complete answer.
@coordinator_agent.on_message
async def handle(message, sender):
async with coordinator_agent.streamer(message) as s:
s.stream("Looking into this from multiple angles...\n\n")
# Call two specialists in parallel
genetics_task = coordinator_agent.call(
"genetics-agent", message.conversation_id,
f"Patient with suspected Wilson's disease. {message.content}",
)
trials_task = coordinator_agent.call(
"clinical-trials-agent", message.conversation_id,
f"Find active clinical trials for Wilson's disease relevant to: {message.content}",
)
genetics_info, trials_info = await asyncio.gather(
genetics_task, trials_task, return_exceptions=True
)
if not isinstance(genetics_info, Exception):
s.stream(f"**Genetic perspective:**\n{genetics_info}\n\n")
if not isinstance(trials_info, Exception):
s.stream(f"**Clinical trials:**\n{trials_info}\n\n")
s.stream("Is there anything else you'd like to know?")
How it works:
- The calling agent produces a message to `router.inbox` with `isCall=true` and a unique `callId` header
- Router forwards to the target agent's inbox — `current_route` is not updated (the caller keeps ownership)
- The target agent's `@on_message` handler fires — the streamer auto-detects `isCall=true` and routes the response back to the calling agent (not the user)
- The calling agent's message interceptor captures the response chunks, accumulates them, and resolves the `call()` future on `done=true`
- `call()` returns the full accumulated response as a string
- Call messages are not persisted — they are transient, like internal tool calls. The `conversation_id` is only a Kafka partition key. This keeps the conversation history clean for LLMs (no confusing consecutive assistant messages from different agents)
async def call(
self,
agent_name: str,
conversation_id: str,
content: str,
timeout: float = 30.0,
) -> str
| Parameter | Type | Default | Description |
|---|---|---|---|
| `agent_name` | `str` | Required | Target agent to call |
| `conversation_id` | `str` | Required | Conversation ID (used as Kafka partition key only — not persisted) |
| `content` | `str` | Required | Text content to send |
| `timeout` | `float` | `30.0` | Max seconds to wait for a response |
Returns: The target agent's full response as a `str`.
Raises:
- `asyncio.TimeoutError` if the target agent doesn't respond within the timeout
- `RuntimeError` if the app's call infrastructure is not available
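Since both failure modes are exceptions, a thin wrapper can give every specialist call a graceful fallback. This is a sketch under one assumption: `make_call` is a zero-argument coroutine factory you build yourself (e.g. a lambda closing over `agent.call(...)`) — it is not an SDK API.

```python
import asyncio
from typing import Awaitable, Callable

# Sketch: degrade gracefully when a specialist is slow or call
# infrastructure is unavailable. `make_call` is a placeholder for a
# closure over agent.call(...), e.g.
#   lambda: agent.call("pharmacology-agent", conv_id, question, timeout=15.0)
async def call_with_fallback(
    make_call: Callable[[], Awaitable[str]],
    fallback: str,
) -> str:
    try:
        return await make_call()
    except (asyncio.TimeoutError, RuntimeError):
        # Timeout or missing call infrastructure: return canned text
        # so the user-facing stream never fails outright.
        return fallback
```

The coordinator example above would then stream `await call_with_fallback(..., "I couldn't reach the specialist right now.")` instead of wrapping each call in its own try/except.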
Target agent handler: The target agent doesn't need special handling — its normal @on_message handler fires. The SDK's streamer auto-detects the isCall header and routes the response back to the caller.
# Target agent — no special code needed
@pharmacology_agent.on_message
async def handle(message, sender):
async with pharmacology_agent.streamer(message) as s:
s.stream(generate_drug_interaction_report(message.content))
# Response automatically routed back to the calling agent
When to Use Which
| Scenario | Pattern | Example |
|---|---|---|
| Topic is outside your expertise — another agent should take over | `hand_off` | Wellness agent detects Ehlers-Danlos symptoms, hands off to rare disease specialist |
| Need information from a specialist to complete your response | `call` | Care coordinator calls pharmacology agent about Gaucher disease drug interactions |
| Gather input from multiple agents in parallel | `call` + `asyncio.gather` | Coordinator calls genetics + clinical trials agents simultaneously for Wilson's disease |
| Relay enriched context to a specialist who responds to the user | `send_to` | Triage agent adds patient history before forwarding to genetics specialist |
| Multi-step pipeline — each agent processes and passes forward | `hand_off` | Intake → diagnostic → condition-specific agent |
| Fact-check or validate your response before sending to user | `call` | General agent calls medical-qa agent to verify a dosage recommendation |
Conversation Locking and Persistence
Hand-off does not cause stuck locks. When Agent A hands off to Agent B, the conversation lock persists correctly through the transition. Agent B responds to the user via the normal streamer flow, which sends done=true to user.inbox. The provider receives it, persists the message, and unlocks the conversation. The lock is never orphaned — it simply transfers naturally as Agent B takes over.
Call messages are not persisted. Agent-to-agent call messages are transient by design — they never reach user.inbox and the provider never sees them. The conversation_id passed to agent.call() serves only as a Kafka partition key for ordering guarantees. This is intentional: if call messages were persisted, they would pollute the conversation history with confusing consecutive assistant messages from different agents (the call question and the call response) that the user never saw. The calling agent already incorporates the call result into its own streamed response, keeping the LLM's conversation history clean and coherent.
Remote Logging (Loki)
Send agent logs to Loki for centralized monitoring.
import logging
app = BasionAgentApp(
...,
enable_remote_logging=True,
remote_log_level=logging.INFO,
remote_log_batch_size=100,
remote_log_flush_interval=5.0,
)
# Standard Python logging — automatically sent to Loki
logger = logging.getLogger(__name__)
logger.info("Processing symptom query", extra={"user_id": "..."})
Extensions
LangGraph
Persist LangGraph state via the Conversation Store checkpoint API.
from basion_agent import BasionAgentApp
from basion_agent.extensions.langgraph import HTTPCheckpointSaver
from langgraph.graph import StateGraph
app = BasionAgentApp(gateway_url="...", api_key="...")
checkpointer = HTTPCheckpointSaver(app=app)
graph = StateGraph(MyState)
# ... add nodes and edges ...
compiled = graph.compile(checkpointer=checkpointer)
agent = app.register_me(name="langgraph-agent", ...)
@agent.on_message
async def handle(message, sender):
config = {"configurable": {"thread_id": message.conversation_id}}
result = await compiled.ainvoke({"messages": [message.content]}, config)
async with agent.streamer(message) as s:
s.stream(result["messages"][-1])
app.run()
Pydantic AI
Persist Pydantic AI message history via the Conversation Store.
from basion_agent import BasionAgentApp
from basion_agent.extensions.pydantic_ai import PydanticAIMessageStore
from pydantic_ai import Agent as PydanticAgent
app = BasionAgentApp(gateway_url="...", api_key="...")
store = PydanticAIMessageStore(app=app)
llm = PydanticAgent("openai:gpt-4o", system_prompt="You are a rare disease specialist.")
agent = app.register_me(name="pydantic-agent", ...)
@agent.on_message
async def handle(message, sender):
history = await store.load(message.conversation_id)
async with agent.streamer(message) as s:
async with llm.run_stream(message.content, message_history=history) as result:
async for chunk in result.stream_text():
s.stream(chunk)
await store.save(message.conversation_id, result.all_messages())
app.run()
Full Example: Rare Disease Assistant
A complete agent that uses memory, knowledge graph, attachments, and structural streaming.
import json
import logging
from basion_agent import BasionAgentApp, Stepper, TextBlock
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
app = BasionAgentApp(
gateway_url="agent-gateway:8080",
api_key="your-api-key",
enable_remote_logging=True,
)
agent = app.register_me(
name="rare-disease-assistant",
about="Helps patients understand rare diseases, find similar conditions, and track symptoms",
document="""
A comprehensive rare disease assistant that can:
- Look up diseases, genes, and phenotypes in a biomedical knowledge graph
- Find similar diseases by shared symptoms or genes
- Track patient symptoms over time using memory
- Process genetic test reports (PDF attachments)
- Proactively check in on patients
""",
category_name="medical",
tag_names=["rare-disease", "genetics", "patient-support"],
example_prompts=[
"What is Huntington disease?",
"Find diseases similar to Cystic Fibrosis",
"What genes are associated with Marfan syndrome?",
],
)
@agent.on_message(senders=["user"])
async def handle_user(message, sender):
kg = agent.tools.knowledge_graph
mem = message.memory_v2
# Ingest the user's message into long-term memory
if mem:
await mem.ingest(role="user", content=message.content)
# Check for form submissions
if message.schema:
data = json.loads(message.content)
async with agent.streamer(message) as s:
s.stream("Thank you for logging your symptoms.\n\n")
s.stream(f"- Pain level: {data.get('pain_level')}/10\n")
s.stream(f"- Notes: {data.get('notes', 'None')}\n")
return
# Check for attachments (e.g., genetic test PDF)
if message.has_attachments():
async with agent.streamer(message) as s:
for i, att in enumerate(message.get_attachments()):
if att.is_pdf():
pdf_bytes = await message.get_attachment_bytes_at(i)
s.stream(f"Processing **{att.filename}** ({att.size} bytes)...\n")
# Parse and analyze the PDF...
elif att.is_image():
base64_data = await message.get_attachment_base64_at(i)
s.stream(f"Received image **{att.filename}**.\n")
return
# Main message handling with knowledge graph
async with agent.streamer(message) as s:
# Show thinking process
thinking = TextBlock()
s.stream_by(thinking).set_variant("thinking")
s.stream_by(thinking).stream_title("Analyzing query...")
# Recall relevant memories
if mem:
memories = await mem.search(query=message.content)
if memories:
s.stream_by(thinking).stream_body("Found relevant patient history.\n")
# Search for diseases mentioned in the query
stepper = Stepper(steps=["Search diseases", "Find connections", "Compile results"])
s.stream_by(stepper).start_step(0)
diseases = await kg.search_diseases(name=message.content, limit=5)
s.stream_by(stepper).complete_step(0)
if diseases:
s.stream_by(stepper).start_step(1)
disease_name = diseases[0].get("name", "")
similar = await kg.find_similar_diseases(disease_name, limit=5)
s.stream_by(stepper).complete_step(1)
s.stream_by(stepper).start_step(2)
s.stream_by(thinking).done()
s.stream(f"## {disease_name}\n\n")
if similar:
s.stream("### Similar Conditions\n\n")
for sim in similar:
score = int(sim.similarity_score * 100)
s.stream(f"- **{sim.disease_name}** ({score}% similar, "
f"{sim.shared_count} shared phenotypes)\n")
s.stream_by(stepper).complete_step(2)
else:
s.stream_by(thinking).done()
s.stream(f"I couldn't find a disease matching \"{message.content}\". "
f"Try searching for a specific disease name.")
s.stream_by(stepper).complete_step(0)
s.stream_by(stepper).done()
# Proactive check-in (called from a scheduler or API)
async def daily_check_in(user_id: str):
conv_id, streamer = await agent.start_conversation(
user_id=user_id,
title="Daily Symptom Check-in",
awaiting=True,
response_schema={
"type": "object",
"title": "How are you feeling today?",
"properties": {
"pain_level": {"type": "integer", "minimum": 0, "maximum": 10, "title": "Pain Level"},
"notes": {"type": "string", "title": "Any additional notes?"},
},
"required": ["pain_level"],
},
)
async with streamer as s:
s.stream("Good morning! Time for your daily check-in.\n\n")
s.stream("Please fill out the form below:")
app.run()
Architecture
┌─────────────────────────────────────────────────────────────────────────────┐
│ Your Agent Application │
├─────────────────────────────────────────────────────────────────────────────┤
│ BasionAgentApp │
│ ├── register_me() → Agent │
│ │ ├── @on_message(senders=[...]) │
│ │ ├── streamer(message) → Streamer │
│ │ │ ├── stream() / write() │
│ │ │ ├── set_response_schema() / set_message_metadata() │
│ │ │ └── stream_by() → Artifact, Surface, TextBlock, Stepper │
│ │ ├── start_conversation() → (conv_id, Streamer) │
│ │ ├── call(agent, conv_id, content) → str (sync agent-to-agent) │
│ │ └── tools │
│ │ ├── .knowledge_graph → KnowledgeGraphTool │
│ │ └── .agent_inventory → AgentInventoryTool │
│ └── run() │
│ │
│ Message Context: │
│ ├── message.conversation → Conversation (history, metadata) │
│ ├── message.memory_v2 → MemoryV2 (mem0: ingest + search) │
│ ├── message.memory → Memory (deprecated) │
│ ├── message.attachments → List[AttachmentInfo] │
│ └── message.hand_off(agent, content?) → forward to another agent │
└──────────────────────────────┬──────────────────────────────────────────────┘
│ gRPC (Kafka) + HTTP (APIs)
▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ Agent Gateway │
├─────────────────────────────────────────────────────────────────────────────┤
│ gRPC: AgentStream (bidirectional) HTTP: /s/{service}/* proxy │
│ - Auth + Subscribe/Unsubscribe - /s/ai-inventory/* │
│ - Produce/Consume messages - /s/conversation-store/* │
│ - /s/memory/* (mem0) │
│ - /s/knowledge-graph/* │
│ - /s/attachment/* │
│ - /loki/api/v1/push (logging) │
└─────────────────┬───────────────────────────────────┬───────────────────────┘
│ │
▼ ▼
┌──────────────┐ ┌────────────────────┐
│ Kafka │ │ AI Inventory / │
│ {agent}.inbox │ │ Conversation Store │
└──────────────┘ │ Memory / KG │
└────────────────────┘
Message Flow
User → Provider → Kafka: router.inbox → Router → Kafka: {agent}.inbox → Gateway → Agent
Agent → Gateway → Kafka: router.inbox → Router → Kafka: user.inbox → Provider → WebSocket → User
Development
# Install dev dependencies
pip install -e ".[dev]"
# Run tests with coverage
pytest
# Run specific tests
pytest tests/test_agent.py -v
pytest tests/test_streamer.py -v
Dependencies
| Package | Purpose |
|---|---|
| `grpcio` | gRPC communication with Agent Gateway |
| `grpcio-tools` | Protobuf compilation |
| `protobuf` | Message serialization |
| `requests` | Sync HTTP for registration |
| `aiohttp` | Async HTTP for runtime operations |
Optional
| Extra | Install | Purpose |
|---|---|---|
| `langgraph` | `pip install basion-agent[langgraph]` | LangGraph checkpoint integration |
| `pydantic` | `pip install basion-agent[pydantic]` | Pydantic AI message history |