Toto Microservice SDK - Python framework for building microservices
Toto Microservice SDK - Python
The Toto Microservice SDK is a framework for building cloud-agnostic microservices.
This is the Python SDK documentation.
1. Installation
pip install totoms
2. Overview
Everything starts with TotoMicroservice and the TotoMicroserviceConfiguration.
TotoMicroservice is the main orchestrator that coordinates your entire microservice. It initializes and manages:
- API Controller & API Endpoints: FastAPI-based REST API setup with automatic endpoint registration
- Message Bus & Message Handlers: Event-driven communication via Pub/Sub and Queues. Registration and routing of event handlers to appropriate topics.
- Secrets Management: Automatic loading of secrets from your cloud provider
- Service Lifecycle: Initialization, startup, and shutdown management
The configuration is declarative. The goal is to make it very simple to configure a full microservice, with a syntax that will look like this:
TotoMicroserviceConfiguration(
    service_name="toto-ms-tome-scraper",
    base_path="/tomescraper",
    environment=TotoEnvironment(
        hyperscaler=os.getenv("HYPERSCALER", "aws").lower(),
        hyperscaler_configuration=determine_environment()
    ),
    custom_config=TomeScraperConfig,
    api_configuration=APIConfiguration(
        api_endpoints=[
            APIEndpoint(method="POST", path="/blogs", delegate=extract_blog_content),
            APIEndpoint(method="POST", path="/test/refresher", delegate=test_refresher),
            APIEndpoint(method="POST", path="/test/pubsub", delegate=test_pubsub),
        ]
    ),
    message_bus_configuration=MessageBusConfig(
        topics=[
            MessageBusTopicConfig(logical_name="tometopics", secret="tome_topics_topic_name")
        ],
        message_handlers=[
            MessageBusHandlerConfig(handler_class=TopicRefreshedEventHandler)
        ]
    ),
)
The TotoMicroserviceConfiguration object specifies:
- Service Metadata: Service name and base path for API endpoints
- Environment: Cloud provider (AWS, GCP, Azure) information
- API Configuration: REST endpoints with their handlers
- Message Bus Configuration: Topics to subscribe to and message handlers
- Custom Configuration: Your application-specific settings
The microservice initialization is async and returns a fully configured instance ready to start:
microservice = await TotoMicroservice.init(get_microservice_config())
await microservice.start(port=8080)
3. Usage
3.1. The Toto Microservice Configuration
Go to the Toto Microservice Configuration API Reference page.
3.2. Create and Register APIs
Your microservice exposes REST API endpoints using FastAPI.
Endpoints are defined when creating the microservice configuration and are automatically set up with the API controller.
Create a Toto Delegate
Every endpoint needs to be managed by a Toto Delegate.
Toto Delegates are identified through the @toto_delegate decorator.
The following example shows a delegate that extracts the content of a blog.
@toto_delegate
async def extract_blog_content(request: Request, user_context: UserContext, exec_context: ExecutionContext):
    # Extract and parse the body from the FastAPI Request
    body = await request.json()
    blog_url = body.get("url")
    # Process the blog
    result = await process_blog(blog_url)
    # Return anything you'd like to
    return {"status": "success", "data": result}
Register your delegate
You can now register your endpoints in the microservice configuration:
def get_microservice_config() -> TotoMicroserviceConfiguration:
    return TotoMicroserviceConfiguration(
        service_name="my-service",
        base_path="/myservice",
        api_configuration=APIConfiguration(
            api_endpoints=[
                APIEndpoint(method="POST", path="/blogs", delegate=extract_blog_content),
            ]
        ),
    )
The microservice will start a FastAPI application with all registered endpoints available at the specified base path.
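As a rough illustration of how a base path and an endpoint path might combine, the sketch below joins the two with exactly one slash between them. The helper `full_route` is hypothetical and not part of the SDK, and the assumption that the SDK simply prefixes `base_path` to each endpoint `path` is inferred from the sentence above rather than confirmed.

```python
def full_route(base_path: str, endpoint_path: str) -> str:
    """Join a service base path and an endpoint path with single slashes.

    Hypothetical helper: it only illustrates the assumed prefixing behaviour.
    """
    parts = [part.strip("/") for part in (base_path, endpoint_path)]
    return "/" + "/".join(part for part in parts if part)


# With the configuration above, the delegate would be expected at this route.
print(full_route("/myservice", "/blogs"))
```

Under that assumption, a client would send its POST request to the joined path on whichever port the microservice was started on.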
3.2. Use a Message Bus
The Message Bus enables event-driven communication between microservices.
It supports both PUSH (webhook-based from cloud Pub/Sub) and PULL (polling) delivery models, depending on your cloud provider and configuration.
3.2.1. React to Messages
Message handlers are the primary way to react to events.
Create a Message Handler
Create a handler by subclassing TotoMessageHandler and implementing the required methods:
from totoms import TotoMessageHandler, ProcessingResponse, ProcessingStatus, TotoMessage

class TopicRefreshedEventHandler(TotoMessageHandler):

    def get_handled_message_type(self) -> str:
        """Return the message type this handler processes."""
        return "topicRefreshed"

    async def process_message(self, message: TotoMessage) -> ProcessingResponse:
        """Process incoming messages of type 'topicRefreshed'."""
        # Access message metadata
        correlation_id = message.cid
        message_id = message.id
        # Extract event data
        topic_name = message.data.get("name")
        blog_url = message.data.get("blogURL")
        user = message.data.get("user")
        # Your handler has access to context
        self.logger.log(correlation_id, f"Processing topic refresh for: {topic_name}")
        # Perform your business logic
        await refresh_topic(topic_name, blog_url, user)
        # Return success or failure
        return ProcessingResponse(status=ProcessingStatus.SUCCESS)
Register a Message Handler
Register your message handlers in the microservice configuration.
IMPORTANT NOTE:
- When using PubSub infrastructure, you need to register topics. Topics are registered by giving them:
  - A logical_name, which is the name that will be used in the application to reference the topic.
  - A secret, which is the name of the secret that contains the implementation-specific resource identifier of the topic (e.g. ARN on AWS or fully-qualified Topic Name on GCP).
from totoms import MessageBusHandlerConfig

def get_microservice_config() -> TotoMicroserviceConfiguration:
    """Create configuration with message handlers."""
    return TotoMicroserviceConfiguration(
        service_name="my-service",
        message_bus_configuration=MessageBusConfig(
            topics=[
                MessageBusTopicConfig(
                    logical_name="topic-events",
                    secret="topic_events_topic_name"  # Secret name in your secrets manager
                )
            ],
            message_handlers=[
                MessageBusHandlerConfig(handler_class=TopicRefreshedEventHandler),
                MessageBusHandlerConfig(handler_class=AnotherEventHandler),
            ]
        ),
    )
When the microservice starts, it automatically subscribes to the configured topics and routes incoming messages to the appropriate handlers based on their message type.
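The routing idea described above can be sketched with the standard library alone. This is not the SDK's implementation: `Message`, `Router`, and the `"IGNORED"` result are hypothetical stand-ins chosen for illustration, and how the SDK treats messages with no matching handler is an assumption.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Awaitable, Callable, Dict


@dataclass
class Message:
    """Hypothetical stand-in for TotoMessage with only the fields used here."""
    type: str
    cid: str
    data: dict = field(default_factory=dict)


Handler = Callable[[Message], Awaitable[str]]


class Router:
    """Maps each message type to the single handler registered for it."""

    def __init__(self) -> None:
        self._handlers: Dict[str, Handler] = {}

    def register(self, message_type: str, handler: Handler) -> None:
        self._handlers[message_type] = handler

    async def dispatch(self, message: Message) -> str:
        handler = self._handlers.get(message.type)
        if handler is None:
            # Assumption: unknown message types are skipped rather than raised.
            return "IGNORED"
        return await handler(message)


async def on_topic_refreshed(message: Message) -> str:
    return f"refreshed {message.data.get('name')}"


router = Router()
router.register("topicRefreshed", on_topic_refreshed)

handled = asyncio.run(router.dispatch(Message("topicRefreshed", "cid-1", {"name": "python"})))
ignored = asyncio.run(router.dispatch(Message("somethingElse", "cid-2")))
print(handled, ignored)
```

Keying the lookup on the message type mirrors what `get_handled_message_type()` declares in a handler class, which is why each handler only ever sees the events it asked for.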
3.2.2. Publish Messages
You can always publish messages to topics.
NOTE:
- In the Message Destination, the topic is the logical name of the topic (see above).
from datetime import datetime
from totoms import TotoMessage, MessageDestination

async def publish_topic_update(microservice, topic_id: str, topic_name: str):
    """Publish a topic update event."""
    message = TotoMessage(
        type="topicUpdated",
        cid="correlation-id-123",
        id=topic_id,
        data={"name": topic_name, "timestamp": datetime.now().isoformat()}
    )
    destination = MessageDestination(topic="topic-events")
    await microservice.message_bus.publish_message(destination, message)
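Because the destination uses a logical topic name, something has to map that name to the provider-specific identifier stored in a secret. The sketch below shows one way such a lookup could work. `resolve_topics` and the fake secret store are hypothetical, the ARN is a made-up placeholder, and the SDK's actual resolution logic may differ.

```python
from typing import Callable, Dict


def resolve_topics(
    topic_secrets: Dict[str, str],
    get_secret: Callable[[str], str],
) -> Dict[str, str]:
    """Return {logical_name: provider resource id}, reading each id from a secret."""
    return {logical: get_secret(secret) for logical, secret in topic_secrets.items()}


# Fake secret store for illustration only; the ARN is a placeholder value.
fake_secrets = {
    "topic_events_topic_name": "arn:aws:sns:eu-west-1:000000000000:topic-events",
}

resolved = resolve_topics(
    {"topic-events": "topic_events_topic_name"},
    fake_secrets.__getitem__,
)
print(resolved["topic-events"])
```

Keeping the provider identifier behind a secret means application code only ever refers to `topic-events`, so the same code can run against a different cloud provider by changing the secret value.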
Getting access to the Message Bus.
There are different ways to get access to the Message Bus instance:
- Through the TotoMicroservice singleton: TotoMicroservice.get_instance().message_bus
- Through an existing instance of TotoMicroservice (as in the publish_topic_update example)
- In a TotoMessageHandler, message_bus is available as an instance variable: self.message_bus
- In a toto_delegate, it is part of ExecutionContext: exec_context.message_bus
3.3. Load Secrets
The SDK handles secret loading from your cloud provider automatically. Access secrets through the configuration or use the SecretsManager directly:
from totoms import SecretsManager
secrets = SecretsManager()
# Load a secret by name
api_key = secrets.get_secret("api-key")
database_url = secrets.get_secret("database-url")
Secrets are typically stored as environment variable names or secret manager references, depending on your deployment environment.
3.4. Custom Configurations
You can define your own custom configurations by extending the TotoControllerConfig base class.
An example:
from totoms.model.TotoConfig import TotoControllerConfig
from typing import Optional, Dict

class TomeScraperConfig(TotoControllerConfig):
    """Custom configuration for the Toto Tome Scraper service."""

    def get_mongo_secret_names(self) -> Optional[Dict[str, str]]:
        """Return MongoDB secret names if service uses MongoDB."""
        return None
What you can do with a Custom Configuration:
- Load Secrets
You can do that by overriding the load() async method and using self.secrets_manager.get_secret("your-secret-name") to load secrets.
3.5. Using Cloud Storage
The SDK provides a cloud-agnostic storage abstraction through the CloudStorage interface.
This allows you to interact with cloud storage (AWS S3, Google Cloud Storage, Azure Blob Storage) without worrying about provider-specific APIs.
Getting a CloudStorage Instance
The easiest way to get a CloudStorage instance is through the ExecutionContext in your delegates:
from fastapi import Request
from totoms import toto_delegate, ExecutionContext, UserContext, ValidationError

@toto_delegate
async def handle_file_upload(request: Request, user_context: UserContext, exec_context: ExecutionContext):
    bucket_name = request.query_params.get("bucket")
    if not bucket_name:
        raise ValidationError("Missing 'bucket' query parameter.")
    # Get a CloudStorage instance for the specified bucket
    storage = exec_context.get_storage(bucket_name)
    # Now you can use the storage instance
    # ... (see examples below)
The get_storage() method automatically creates the appropriate storage implementation based on your configured cloud provider (hyperscaler).
Uploading Files
# Create a local file
local_file_path = './hello.txt'
with open(local_file_path, 'w') as f:
    f.write('Hello, Toto!')
# Upload to cloud storage
destination_path = 'my-folder/hello.txt'
storage.upload_file(local_file_path, destination_path)
exec_context.logger.log(exec_context.cid, f"Uploaded file to {destination_path}")
Downloading Files
# Download a file from cloud storage
source_path = 'my-folder/hello.txt'
local_destination = './downloaded_hello.txt'
storage.download_file(source_path, local_destination)
exec_context.logger.log(exec_context.cid, f"Downloaded file to {local_destination}")
Listing Files
# List all files with a specific prefix
files = storage.list_files('my-folder/')
exec_context.logger.log(exec_context.cid, f"Files in bucket: {files}")
for file_path in files:
    exec_context.logger.log(exec_context.cid, f" - {file_path}")
Getting File Content
# Read file content directly without downloading
file_path = 'my-folder/hello.txt'
content = storage.get_file_content(file_path)
exec_context.logger.log(exec_context.cid, f"File content: {content}")
Deleting Files
# Delete a file from cloud storage
file_path = 'my-folder/hello.txt'
storage.delete_file(file_path)
exec_context.logger.log(exec_context.cid, f"Deleted file: {file_path}")
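For unit tests it can help to swap the real storage for an in-memory stand-in that exposes the same five method names used in the examples above. The class below is a sketch under assumptions: `InMemoryStorage` is hypothetical, it does not subclass the SDK's CloudStorage interface, and the parameter names and the `bytes` return type of `get_file_content` are guesses rather than the documented signatures.

```python
import tempfile
from pathlib import Path
from typing import Dict, List


class InMemoryStorage:
    """Test double that keeps uploaded objects in a dictionary."""

    def __init__(self) -> None:
        self._objects: Dict[str, bytes] = {}

    def upload_file(self, local_path: str, destination_path: str) -> None:
        self._objects[destination_path] = Path(local_path).read_bytes()

    def download_file(self, source_path: str, local_destination: str) -> None:
        Path(local_destination).write_bytes(self._objects[source_path])

    def list_files(self, prefix: str = "") -> List[str]:
        return sorted(key for key in self._objects if key.startswith(prefix))

    def get_file_content(self, file_path: str) -> bytes:
        # Assumption: content comes back as raw bytes.
        return self._objects[file_path]

    def delete_file(self, file_path: str) -> None:
        self._objects.pop(file_path, None)


storage = InMemoryStorage()
with tempfile.TemporaryDirectory() as tmp:
    source = Path(tmp) / "hello.txt"
    source.write_text("Hello, Toto!")
    storage.upload_file(str(source), "my-folder/hello.txt")

    target = Path(tmp) / "downloaded_hello.txt"
    storage.download_file("my-folder/hello.txt", str(target))
    round_trip = target.read_text()

listed = storage.list_files("my-folder/")
storage.delete_file("my-folder/hello.txt")
remaining = storage.list_files("my-folder/")
```

A delegate that receives its storage object as a parameter can then be exercised in tests without touching a real bucket.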
3.6. Publish an Agent
The SDK supports publishing Gale Agents — AI agents that can participate in conversations brokered by Gale Broker.
An agent receives a user message, processes it (potentially using an LLM), and returns a response. It can also publish intermediate messages back to the conversation in real time while processing.
Create a Conversational Agent
Extend GaleConversationalAgent and implement two required methods:
- get_manifest(): returns the agent's identity information
- on_message(): handles an incoming conversation message and returns the response
import uuid
from totoms import GaleConversationalAgent, AgentConversationMessage, AgentManifest
from totoms.gale.model.AgentConversationMessage import StreamInfo

class MyAgent(GaleConversationalAgent):

    def get_manifest(self) -> AgentManifest:
        return AgentManifest(
            agent_type="conversational",
            agent_id="my-agent",  # Unique agent identifier
            human_friendly_name="My Agent",  # Display name shown to users
        )

    async def on_message(self, message: AgentConversationMessage) -> AgentConversationMessage:
        stream_id = str(uuid.uuid4())
        # Optionally publish an intermediate message to the conversation
        # while you are still processing, to give the user early feedback.
        await self.publish_message(AgentConversationMessage(
            conversation_id=message.conversation_id,
            message_id=str(uuid.uuid4()),
            agent_id=message.agent_id,
            message="Got your message, working on it!",
            actor="agent",
            stream=StreamInfo(stream_id=stream_id, sequence_number=1, last=False),
        ))
        # ... run your LLM / business logic here ...
        result = "Here is my answer!"
        # Return the final response
        return AgentConversationMessage(
            conversation_id=message.conversation_id,
            message_id=message.message_id,
            agent_id=message.agent_id,
            message=result,
            actor="agent",
            stream=StreamInfo(stream_id=stream_id, sequence_number=2, last=True),
        )
Key points:
- publish_message() lets the agent stream intermediate messages to the client while still processing. This is useful to acknowledge receipt or provide progress updates before the final answer is ready.
- The stream field is optional but recommended for conversational agents. Set last=True only on the final message.
- actor should always be "agent" for messages sent by the agent.
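The stream bookkeeping described in these key points can be sketched in isolation. `StreamInfoSketch` below is a hypothetical stand-in that mirrors only the three fields used in the agent example, and `number_stream` is an illustrative helper, not an SDK function. Numbering from 1 follows the example above and is otherwise an assumption.

```python
from dataclasses import dataclass
from typing import Iterator, List, Tuple


@dataclass(frozen=True)
class StreamInfoSketch:
    """Stand-in with the same three fields as StreamInfo in the agent example."""
    stream_id: str
    sequence_number: int
    last: bool


def number_stream(
    stream_id: str, texts: List[str]
) -> Iterator[Tuple[str, StreamInfoSketch]]:
    """Pair each text with stream metadata; only the final text gets last=True."""
    total = len(texts)
    for index, text in enumerate(texts, start=1):
        yield text, StreamInfoSketch(stream_id, index, last=(index == total))


parts = list(
    number_stream("stream-1", ["Got your message, working on it!", "Here is my answer!"])
)
for text, info in parts:
    print(info.sequence_number, info.last, text)
```

Deriving `last` from the position in the list avoids the easy mistake of marking an intermediate message as final when more progress updates are added later.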
Register the Agent
Add an agents_configuration block to your TotoMicroserviceConfiguration:
from totoms import TotoMicroservice, TotoMicroserviceConfiguration, AgentsConfiguration
from myservice.agent.my_agent import MyAgent
import asyncio

def get_microservice_config() -> TotoMicroserviceConfiguration:
    return TotoMicroserviceConfiguration(
        service_name="my-microservice",
        base_path="/myservice",
        environment=...,
        custom_config=MyConfig,
        agents_configuration=AgentsConfiguration(
            agents=[
                MyAgent,
            ]
        ),
    )

async def main():
    microservice = await TotoMicroservice.init(get_microservice_config())
    await microservice.start(port=8080)

asyncio.run(main())
Multiple agents can be registered in the agents list.
Required Environment Variables
To integrate with Gale Broker the following environment variables must be set:
| Variable | Description |
|---|---|
| GALE_BROKER_URL | The base URL of the Gale Broker service (e.g. http://gale-broker:8080/galebroker). Used to register the agent on startup and to publish messages back to conversations. |
| SERVICE_BASE_URL | The publicly reachable base URL of this microservice, including the base path if any (e.g. https://myservice.example.com/basepath). Gale Broker uses this to know where to forward incoming conversation messages for the agent. |
The SDK will raise an error at startup if either variable is not set when any agent is configured.
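A deployment script can run a similar check before the service starts, so a missing variable is reported together with the others instead of surfacing at startup. The sketch below uses only the standard library. `missing_agent_vars` is a hypothetical helper, and treating an empty value as missing is an assumption about what the SDK accepts.

```python
import os
from typing import List, Mapping

# The two variable names come from the table above.
REQUIRED_AGENT_VARS = ("GALE_BROKER_URL", "SERVICE_BASE_URL")


def missing_agent_vars(env: Mapping[str, str] = os.environ) -> List[str]:
    """Return the required variable names that are unset or empty in env."""
    return [name for name in REQUIRED_AGENT_VARS if not env.get(name)]


# Example with an explicit mapping instead of the real process environment.
example_env = {"GALE_BROKER_URL": "http://gale-broker:8080/galebroker"}
print(missing_agent_vars(example_env))
```

Passing the environment in as a mapping keeps the check easy to test without modifying `os.environ`.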