Skip to main content

A modern, minimal event bus for python inspired by Eclipse Vert.x

Project description

🚌 TinyBus

A modern, async-first Event Bus for Python 3.12+ inspired by Eclipse Vert.x. TinyBus provides a clean, type-safe way to implement event-driven architectures with both request-response and publish-subscribe patterns.

Why

Because I've spent quite a lot of time evaluating event bus libraries but none seemed to support the async Request-Response pattern I needed. Some alternatives like Ethereum's Lahja or lightbus seem too heavy and complex with lots of features I don't need (inter-process communication, queues, RPC...)

The Request-Response Events pattern is quite useful when growing an async backend with lots of internal messages being passed around (using the Actor Model terminology, method calls are considered messages) as it allows to decouple caller and callee very transparently and fits nicely in the asynchronous "mental model".

Installation

uv add tinybus

Quick Example

Here's a simple user service implementation showcasing TinyBus's main features

import uuid

from enum import Enum
from typing import Optional

from pydantic import BaseModel
from tinybus import EventBus, Message

# First, we must define our addresses.
class Address(str, Enum):
    CREATE_USER = "create_user"
    GET_USER = "get_user"

# Then, the clases that hold the data being sent around
class User(BaseModel):
    user_id: uuid.UUID
    username: str
    email: str

class CreateUserRequest(BaseModel):
    username: str
    email: str

class CreateUserResponse(BaseModel):
    user: User

class GetUserResponse(BaseModel):
    user: User

# This service is subscribed to all updates sent to the CREATE_USER and GET_USER
# addresses and will run the appropriate methods when a valid request is sent.
class UserService:
    def __init__(self, event_bus: EventBus):
        self._event_bus = event_bus

        # A fake data layer
        self._users: dict[uuid.UUID, User] = {}
        
        # Register handlers
        # If these methods are not used directly anywhere outside this class, you'll likely
        # want to make them private (self._create_user)
        self._event_bus.consumer(Address.CREATE_USER, self.create_user)
        self._event_bus.consumer(Address.GET_USER, self.get_user)
    
    async def create_user(self, message: Message[CreateUserRequest]) -> CreateUserResponse:
        # A Message is fairly simple, it has a header and a body.
        # Most of the time, you'll only be using the body because it contains the actual data you'll use.
        request = message.body

        # In the real world, you'll use a data layer to create your user (likely an async operation)
        user_id = uuid.uuid4()
        user = User(
            id=user_id,
            username=request.username,
            email=request.email
        )
        self.users[user_id] = user
        
        # We can also publish events without expecting a return value
        await self.event_bus.publish("user.created", user)

        # Return a response that will be received by the Address listener
        return CreateUserResponse(user=user)
    
    # Messages can hold any value, including builtins.
    async def get_user(self, message: Message[uuid.UUID]) -> Optional[GetUserResponse]:
        requested_user_id = message.body
        if found_user := self.users.get(user_id) is not None:
            return GetUserResponse(user=found_user)
        else:
            # You can return None too (no need to wrap it in a custom object)
            return None


async def main():
    # Create an event bus
    event_bus = EventBus()
    
    # Create service
    user_service = UserService(event_bus)
    
    # Register event listener
    @event_bus.on("user.created")
    async def on_user_created(user: User):
        print(f"User created: {user.username}")
    
    # Send a request to the handler for the CREATE_USER address
    response = await event_bus.request(
        Address.CREATE_USER,
        CreateUserRequest(username="john", email="john@example.com")
    )
    
    # Likewise, call the handler of the GET_USER address
    user = await event_bus.request(
        Address.GET_USER,
        response.id
    )
    
    print(f"Retrieved user: {user.username}")

Key Concepts

Request-Response Pattern

TinyBus implements an address-based messaging system where consumers register handlers for specific addresses. When a request is made to an address, the corresponding handler processes it and returns a response

# Register the consumer for a given address.
# Addresses can be strings too although we recommend using Enums for readability
@event_bus.consumer("greeting")
async def handle_greeting(msg: Message[str]) -> str:
    return f"Hello, {msg.body}!"

# Send a request
response = await event_bus.request("greeting", "World")
print(response) # Prints: Hello, World!

Publish-Subscribe Pattern

The event bus also supports event-based communication where multiple listeners can subscribe to events where you do not care about the result or what happens when it is delivered.

# Register listeners using the .on annotation
@event_bus.on("user.created")
async def notify_admin(user: User):
    print(f"New user registered: {user.email}")

# or the .on method directly
async def send_welcome_email(user: User):
    print(f"Sending welcome email to {user.email}")
event_bus.on("user.created", send_welcome_email)

# Publish an event
await event_bus.publish("user.created", user)
# > "New user registered: john@example.com"

Contributing

Contributions are welcome! Please feel free to submit a Pull Request. For major changes, please open an issue first to discuss what you would like to change.

License

TinyBus is MIT licensed. See the LICENSE file for details.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

tinybus-0.2.0.tar.gz (13.2 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

tinybus-0.2.0-py3-none-any.whl (9.9 kB view details)

Uploaded Python 3

File details

Details for the file tinybus-0.2.0.tar.gz.

File metadata

  • Download URL: tinybus-0.2.0.tar.gz
  • Upload date:
  • Size: 13.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.5.6

File hashes

Hashes for tinybus-0.2.0.tar.gz
Algorithm Hash digest
SHA256 79299295de36261b430a0f02496d6fef3153811fb22c7f991ed5699a3881848f
MD5 5f9959675d9198cf1fa58cb070e4cd5f
BLAKE2b-256 9aba4a8f4cb94b5fa3c07f81e64150a7e2dd876d1eb17fd0b6637bda24518259

See more details on using hashes here.

File details

Details for the file tinybus-0.2.0-py3-none-any.whl.

File metadata

  • Download URL: tinybus-0.2.0-py3-none-any.whl
  • Upload date:
  • Size: 9.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.5.6

File hashes

Hashes for tinybus-0.2.0-py3-none-any.whl
Algorithm Hash digest
SHA256 49ee1204b221b8b1065320dd0599c627756bb2ead3ce44c0c11d3f84d5674d3f
MD5 ec5a5d9f8a523680e416b2ab11668939
BLAKE2b-256 1939a830c40bd7964853108778851e8f3c3212951281f9792a9be9efbc550a7d

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page