Skip to main content

Toto Microservice SDK - Python framework for building microservices

Project description

Toto Microservice SDK - Python

The Toto Microservice SDK is a framework for building cloud-agnostic microservices.
This is the Python SDK documentation.

Table of Contents

  1. Installation
  2. Overview
  3. Usage

Other:

1. Installation

pip install totoms

2. Overview

Everything starts with TotoMicroservice and the TotoMicroserviceConfiguration.
TotoMicroservice is the main orchestrator that coordinates your entire microservice. It initializes and manages:

  • API Controller & API Endpoints: FastAPI-based REST API setup with automatic endpoint registration
  • Message Bus & Message Handlers: Event-driven communication via Pub/Sub and Queues. Registration and routing of event handlers to appropriate topics.
  • Secrets Management: Automatic loading of secrets from your cloud provider
  • Service Lifecycle: Initialization, startup, and shutdown management

The configuration is declarative. The goal is to make it very simple to configure a full microservice, with a syntax that will look like this:

  TotoMicroserviceConfiguration(
        service_name="toto-ms-tome-scraper",
        base_path="/tomescraper",
        environment=TotoEnvironment(
            hyperscaler=os.getenv("HYPERSCALER", "aws").lower(),
            hyperscaler_configuration=determine_envrionment()
        ),
        custom_config=TomeScraperConfig,
        api_configuration=APIConfiguration(
            api_endpoints=[
                APIEndpoint(method="POST", path="/blogs", delegate=extract_blog_content),
                APIEndpoint(method="POST", path="/test/refresher", delegate=test_refresher),
                APIEndpoint(method="POST", path="/test/pubsub", delegate=test_pubsub),
            ]
        ),
        message_bus_configuration=MessageBusConfig(
            topics=[
                MessageBusTopicConfig(logical_name="tometopics", secret="tome_topics_topic_name")
            ], 
            message_handlers=[
                MessageBusHandlerConfig(handler_class=TopicRefreshedEventHandler)
            ]
        ),
    )

The TotoMicroserviceConfiguration object specifies:

  • Service Metadata: Service name and base path for API endpoints
  • Environment: Cloud provider (AWS, GCP, Azure) information
  • API Configuration: REST endpoints with their handlers
  • Message Bus Configuration: Topics to subscribe to and message handlers
  • Custom Configuration: Your application-specific settings

The microservice initialization is async and returns a fully configured instance ready to start:

microservice = await TotoMicroservice.init(get_microservice_config())
await microservice.start(port=8080)

3. Usage

3.1. The Toto Microservice Configuration

Go to the Toto Microservice Configuration API Reference page.

3.2. Create and Register APIs

Your microservice exposes REST API endpoints using FastAPI.
Endpoints are defined when creating the microservice configuration and are automatically set up with the API controller.

Create a Toto Delegate

Every endpoint needs to be managed by a Toto Delegate.
Toto Delegates are identified through the annotation @toto_delegate.

This is how you define a Toto Delegate.
The following example shows a delegate that extracts the content of a blog.

@toto_delegate
async def extract_blog_content(request: Request, user_context: UserContext, exec_context: ExecutionContext): 

    # Extract and parse the body from FastAPI Request
    body = await request.json()

    blog_url = data.get("url")
    
    # Process the blog
    result = await process_blog(blog_url)
    
    # Return anything you'd like to
    return {"status": "success", "data": result}

Register your delegate

You can now register your endpoints in the microservice configuration:

def get_microservice_config() -> TotoMicroserviceConfiguration:
    
    return TotoMicroserviceConfiguration(
        service_name="my-service",
        base_path="/myservice",
        api_configuration=APIConfiguration(
            api_endpoints=[
                APIEndpoint(method="POST", path="/blogs", delegate=extract_blog_content),
            ]
        ),
    )

The microservice will start a FastAPI application with all registered endpoints available at the specified base path.


3.2. Use a Message Bus

The Message Bus enables event-driven communication between microservices.
It supports both PUSH (webhook-based from cloud Pub/Sub) and PULL (polling) delivery models, depending on your cloud provider and configuration.

3.2.1. React to Messages

Message handlers are the primary way to react to events.

Create a Message Handler

Create a handler by subclassing TotoMessageHandler and implementing the required methods:

from totoms import TotoMessageHandler, ProcessingResponse, ProcessingStatus, TotoMessage

class TopicRefreshedEventHandler(TotoMessageHandler):
    
    def get_handled_message_type(self) -> str:
        """Return the message type this handler processes."""
        return "topicRefreshed"
    
    async def process_message(self, message: TotoMessage) -> ProcessingResponse:
        """Process incoming messages of type 'topicRefreshed'."""
        # Access message metadata
        correlation_id = message.cid
        message_id = message.id
        
        # Extract event data
        topic_name = message.data.get("name")
        blog_url = message.data.get("blogURL")
        user = message.data.get("user")
        
        # Your handler has access to context
        self.logger.log(correlation_id, f"Processing topic refresh for: {topic_name}")
        
        # Perform your business logic
        await refresh_topic(topic_name, blog_url, user)
        
        # Return success or failure
        return ProcessingResponse(status=ProcessingStatus.SUCCESS)
Register a Message Handler

Register your message handlers in the microservice configuration.

IMPORTANT NOTE:

  • When using PubSub infrastructure, you need to register topics.
    Topics are registered by giving them:
    • A logical_name which is the name that will be used in the application to reference the topic.
    • A secret which is the name of the secret that contains the implementation-specific resource identifier of the topic (e.g. ARN on AWS or fully-qualified Topic Name on GCP)
from totoms import MessageBusHandlerConfig

def get_microservice_config() -> TotoMicroserviceConfiguration:
    """Create configuration with message handlers."""
    return TotoMicroserviceConfiguration(
        service_name="my-service",
        message_bus_configuration=MessageBusConfig(
            topics=[
                MessageBusTopicConfig(
                    logical_name="topic-events", 
                    secret="topic_events_topic_name"  # Secret name in your secrets manager
                )
            ],
            message_handlers=[
                MessageBusHandlerConfig(handler_class=TopicRefreshedEventHandler),
                MessageBusHandlerConfig(handler_class=AnotherEventHandler),
            ]
        ),
    )

When the microservice starts, it automatically subscribes to the configured topics and routes incoming messages to the appropriate handlers based on their message type.

3.2.2. Publish Messages

You can always publish messages to topics.

NOTE:

  • In the Message Destination, the topic is the logical name of the topic (see above).
from totoms import TotoMessage, MessageDestination

async def publish_topic_update(microservice, topic_id: str, topic_name: str):
    """Publish a topic update event."""
    message = TotoMessage(
        type="topicUpdated",
        cid="correlation-id-123",
        id=topic_id,
        data={"name": topic_name, "timestamp": datetime.now().isoformat()}
    )
    
    destination = MessageDestination(topic="topic-events")

    await microservice.message_bus.publish_message(destination, message)
Getting access to the Message Bus.

There are different ways to get access to the Message Bus instance:

  • Through the TotoMicroservice singleton:
    TotoMicroservice.get_instance().message_bus

  • Through an existing instance of TotoMicroservice (in the example above)

  • In a TotoMessageHandler you will have message_bus as an instance variable:
    self.message_bus

  • In a toto_delegate, you will have it part of ExecutionContext and can use like this:
    exec_context.message_bus


3.3. Load Secrets

The SDK handles secret loading from your cloud provider automatically. Access secrets through the configuration or use the SecretsManager directly:

from totoms import SecretsManager

secrets = SecretsManager()

# Load a secret by name
api_key = secrets.get_secret("api-key")
database_url = secrets.get_secret("database-url")

Secrets are typically stored as environment variable names or secret manager references, depending on your deployment environment.


3.4. Custom Configurations

You can define your own custom configurations by extending the TotoControllerConfig base class.

An example:

from totoms.model.TotoConfig import TotoControllerConfig
from typing import Optional, Dict

class TomeScraperConfig(TotoControllerConfig):
    """Custom configuration for the Toto Tome Scraper service."""
    
    def get_mongo_secret_names(self) -> Optional[Dict[str, str]]:
        """Return MongoDB secret names if service uses MongoDB."""
        return None

What you can do with a Custom Configuration:

  1. Load Secrets
    You can do that by overriding the load() async method and using self.secrets_manager.get_secret, "your-secret-name") to load secrets.

3.5. Using Cloud Storage

The SDK provides a cloud-agnostic storage abstraction through the CloudStorage interface.
This allows you to interact with cloud storage (AWS S3, Google Cloud Storage, Azure Blob Storage) without worrying about provider-specific APIs.

Getting a CloudStorage Instance

The easiest way to get a CloudStorage instance is through the ExecutionContext in your delegates:

from fastapi import Request
from totoms import toto_delegate, ExecutionContext, UserContext, ValidationError

@toto_delegate
async def handle_file_upload(request: Request, user_context: UserContext, exec_context: ExecutionContext):
    
    bucket_name = request.query_params.get("bucket")
    
    if not bucket_name:
        raise ValidationError("Missing 'bucket' query parameter.")
    
    # Get a CloudStorage instance for the specified bucket
    storage = exec_context.get_storage(bucket_name)
    
    # Now you can use the storage instance
    # ... (see examples below)

The get_storage() method automatically creates the appropriate storage implementation based on your configured cloud provider (hyperscaler).

Uploading Files

# Create a local file
local_file_path = './hello.txt'

with open(local_file_path, 'w') as f:
    f.write('Hello, Toto!')

# Upload to cloud storage
destination_path = 'my-folder/hello.txt'
storage.upload_file(local_file_path, destination_path)

exec_context.logger.log(exec_context.cid, f"Uploaded file to {destination_path}")

Downloading Files

# Download a file from cloud storage
source_path = 'my-folder/hello.txt'
local_destination = './downloaded_hello.txt'

storage.download_file(source_path, local_destination)

exec_context.logger.log(exec_context.cid, f"Downloaded file to {local_destination}")

Listing Files

# List all files with a specific prefix
files = storage.list_files('my-folder/')

exec_context.logger.log(exec_context.cid, f"Files in bucket: {files}")

for file_path in files:
    exec_context.logger.log(exec_context.cid, f"  - {file_path}")

Getting File Content

# Read file content directly without downloading
file_path = 'my-folder/hello.txt'
content = storage.get_file_content(file_path)

exec_context.logger.log(exec_context.cid, f"File content: {content}")

Deleting Files

# Delete a file from cloud storage
file_path = 'my-folder/hello.txt'
storage.delete_file(file_path)

exec_context.logger.log(exec_context.cid, f"Deleted file: {file_path}")

3.6. Publish an Agent

The SDK supports publishing Gale Agents — AI agents that can participate in conversations brokered by Gale Broker.
An agent receives a user message, processes it (potentially using an LLM), and returns a response. It can also publish intermediate messages back to the conversation in real time while processing.

Create a Conversational Agent

Extend GaleConversationalAgent and implement two required methods:

  • get_manifest() — returns the agent's identity information
  • on_message() — handles an incoming conversation message and returns the response
import uuid
from totoms import GaleConversationalAgent, AgentConversationMessage, AgentManifest
from totoms.gale.model.AgentConversationMessage import StreamInfo

class MyAgent(GaleConversationalAgent):

    def get_manifest(self) -> AgentManifest:
        return AgentManifest(
            agent_type="conversational",
            agent_id="my-agent",                    # Unique agent identifier
            human_friendly_name="My Agent",         # Display name shown to users
        )

    async def on_message(self, message: AgentConversationMessage) -> AgentConversationMessage:

        stream_id = str(uuid.uuid4())

        # Optionally publish an intermediate message to the conversation
        # while you are still processing, to give the user early feedback.
        await self.publish_message(AgentConversationMessage(
            conversation_id=message.conversation_id,
            message_id=str(uuid.uuid4()),
            agent_id=message.agent_id,
            message="Got your message, working on it!",
            actor="agent",
            stream=StreamInfo(stream_id=stream_id, sequence_number=1, last=False),
        ))

        # ... run your LLM / business logic here ...
        result = "Here is my answer!"

        # Return the final response
        return AgentConversationMessage(
            conversation_id=message.conversation_id,
            message_id=message.message_id,
            agent_id=message.agent_id,
            message=result,
            actor="agent",
            stream=StreamInfo(stream_id=stream_id, sequence_number=2, last=True),
        )

Key points:

  • publish_message() lets the agent stream intermediate messages to the client while still processing. This is useful to acknowledge receipt or provide progress updates before the final answer is ready.
  • The stream field is optional but recommended for conversational agents. Set last=True only on the final message.
  • actor should always be "agent" for messages sent by the agent.

Register the Agent

Add an agents_configuration block to your TotoMicroserviceConfiguration:

from totoms import TotoMicroservice, TotoMicroserviceConfiguration, AgentsConfiguration
from myservice.agent.my_agent import MyAgent
import asyncio

def get_microservice_config() -> TotoMicroserviceConfiguration:
    return TotoMicroserviceConfiguration(
        service_name="my-microservice",
        base_path="/myservice",
        environment=...,
        custom_config=MyConfig,
        agents_configuration=AgentsConfiguration(
            agents=[
                MyAgent,
            ]
        ),
    )

async def main():
    microservice = await TotoMicroservice.init(get_microservice_config())
    await microservice.start(port=8080)

asyncio.run(main())

Multiple agents can be registered in the agents list.

Required Environment Variables

To integrate with Gale Broker the following environment variables must be set:

Variable Description
GALE_BROKER_URL The base URL of the Gale Broker service (e.g. http://gale-broker:8080/galebroker). Used to register the agent on startup and to publish messages back to conversations.
SERVICE_BASE_URL The publicly reachable base URL of this microservice, including the base path if any (e.g. https://myservice.example.com/basepath). Gale Broker uses this to know where to forward incoming conversation messages for the agent.

The SDK will raise an error at startup if either variable is not set when any agent is configured.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

totoms-1.4.0.tar.gz (83.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

totoms-1.4.0-py3-none-any.whl (103.9 kB view details)

Uploaded Python 3

File details

Details for the file totoms-1.4.0.tar.gz.

File metadata

  • Download URL: totoms-1.4.0.tar.gz
  • Upload date:
  • Size: 83.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.10.5

File hashes

Hashes for totoms-1.4.0.tar.gz
Algorithm Hash digest
SHA256 e082ca13a7859ce7ac8048cb62762965d19f5ea2fa097f1c6ba651fc7dc50a52
MD5 a9a4c21e3aae2b1ee324852403cd61da
BLAKE2b-256 5d0de792b0a0875a7c86af2083d6e98ad4da959b86ad1e1f02ec6f157bc20858

See more details on using hashes here.

File details

Details for the file totoms-1.4.0-py3-none-any.whl.

File metadata

  • Download URL: totoms-1.4.0-py3-none-any.whl
  • Upload date:
  • Size: 103.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.10.5

File hashes

Hashes for totoms-1.4.0-py3-none-any.whl
Algorithm Hash digest
SHA256 18a4207c6f5ecd4bab1a877f71e04e4eb1ecec01647da4623361202627ae0780
MD5 1d348c5dd26a41e9d263797ef1f25362
BLAKE2b-256 9c1547d635490a15637f0d9a34ae97c37234c010511901f7a5fec9d9c4384ab2

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page