
A modern, minimal event bus for Python inspired by Eclipse Vert.x


🚌 TinyBus

A modern, async-first Event Bus for Python 3.12+ inspired by Eclipse Vert.x. TinyBus provides a clean, type-safe way to implement event-driven architectures with both request-response and publish-subscribe patterns.

Why

I spent quite a lot of time evaluating event bus libraries, but none seemed to support the async Request-Response pattern I needed. Alternatives such as Ethereum's Lahja or lightbus seemed too heavy and complex, with many features I don't need (inter-process communication, queues, RPC...).

The Request-Response pattern is quite useful when growing an async backend with many internal messages being passed around (in Actor Model terminology, method calls are considered messages): it decouples caller and callee transparently and fits nicely into the asynchronous "mental model".

Installation

uv add tinybus

Quick Example

Here's a simple user service implementation showcasing TinyBus's main features:

import uuid

from enum import Enum
from typing import Optional

from pydantic import BaseModel
from tinybus import EventBus, Message

# First, we must define our addresses.
class Address(str, Enum):
    CREATE_USER = "create_user"
    GET_USER = "get_user"

# Then, the classes that hold the data being sent around
class User(BaseModel):
    user_id: uuid.UUID
    username: str
    email: str

class CreateUserRequest(BaseModel):
    username: str
    email: str

class CreateUserResponse(BaseModel):
    user: User

class GetUserResponse(BaseModel):
    user: User

# This service consumes every request sent to the CREATE_USER and GET_USER
# addresses and runs the matching handler when a valid request arrives.
class UserService:
    def __init__(self, event_bus: EventBus):
        self._event_bus = event_bus

        # A fake data layer
        self._users: dict[uuid.UUID, User] = {}
        
        # Register handlers
        # If these methods are not used directly anywhere outside this class, you'll likely
        # want to make them private (self._create_user)
        self._event_bus.consumer(Address.CREATE_USER, self.create_user)
        self._event_bus.consumer(Address.GET_USER, self.get_user)
    
    async def create_user(self, message: Message[CreateUserRequest]) -> CreateUserResponse:
        # A Message is fairly simple, it has a header and a body.
        # Most of the time, you'll only be using the body because it contains the actual data you'll use.
        request = message.body

        # In the real world, you'll use a data layer to create your user (likely an async operation)
        user_id = uuid.uuid4()
        user = User(
            user_id=user_id,  # the User model names this field user_id, not id
            username=request.username,
            email=request.email
        )
        self._users[user_id] = user

        # We can also publish events without expecting a return value
        await self._event_bus.publish("user.created", user)

        # Return a response; the caller of event_bus.request() receives it
        return CreateUserResponse(user=user)
    
    # Messages can hold any value, including builtins.
    async def get_user(self, message: Message[uuid.UUID]) -> Optional[GetUserResponse]:
        requested_user_id = message.body
        # Parenthesize the walrus so found_user holds the User, not the comparison result
        if (found_user := self._users.get(requested_user_id)) is not None:
            return GetUserResponse(user=found_user)
        else:
            # You can return None too (no need to wrap it in a custom object)
            return None


async def main():
    # Create an event bus
    event_bus = EventBus()
    
    # Create service
    user_service = UserService(event_bus)
    
    # Register event listener
    @event_bus.on("user.created")
    async def on_user_created(user: User):
        print(f"User created: {user.username}")
    
    # Send a request to the handler for the CREATE_USER address
    response = await event_bus.request(
        Address.CREATE_USER,
        CreateUserRequest(username="john", email="john@example.com")
    )
    
    # Likewise, call the handler of the GET_USER address
    get_response = await event_bus.request(
        Address.GET_USER,
        response.user.user_id
    )

    print(f"Retrieved user: {get_response.user.username}")

# Run the example
if __name__ == "__main__":
    import asyncio

    asyncio.run(main())

Key Concepts

Request-Response Pattern

TinyBus implements an address-based messaging system where consumers register handlers for specific addresses. When a request is made to an address, the corresponding handler processes it and returns a response.

# Register the consumer for a given address.
# Addresses can also be plain strings, although we recommend Enums for readability
@event_bus.consumer("greeting")
async def handle_greeting(msg: Message[str]) -> str:
    return f"Hello, {msg.body}!"

# Send a request
response = await event_bus.request("greeting", "World")
print(response) # Prints: Hello, World!
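
To make the mechanism concrete, here is a minimal, self-contained sketch of address-based request-response dispatch using only the standard library. It is not TinyBus's implementation: `ToyBus`, `ToyMessage`, and the `headers` field are hypothetical names chosen for illustration, and the sketch assumes exactly one handler per address.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable, Generic, TypeVar

T = TypeVar("T")


@dataclass
class ToyMessage(Generic[T]):
    """Hypothetical stand-in for a bus message: a body plus optional headers."""
    body: T
    headers: dict[str, str] = field(default_factory=dict)


Handler = Callable[[ToyMessage[Any]], Awaitable[Any]]


class ToyBus:
    """Illustrative only: maps each address to a single async handler."""

    def __init__(self) -> None:
        self._consumers: dict[str, Handler] = {}

    def consumer(self, address: str) -> Callable[[Handler], Handler]:
        # Returns a decorator that registers the handler for this address.
        def register(handler: Handler) -> Handler:
            if address in self._consumers:
                raise ValueError(f"address already has a consumer: {address}")
            self._consumers[address] = handler
            return handler
        return register

    async def request(self, address: str, body: Any) -> Any:
        # Look up the handler, wrap the payload, and await the handler's reply.
        handler = self._consumers.get(address)
        if handler is None:
            raise LookupError(f"no consumer registered for address: {address}")
        return await handler(ToyMessage(body=body))


bus = ToyBus()


@bus.consumer("greeting")
async def handle_greeting(msg: ToyMessage[str]) -> str:
    return f"Hello, {msg.body}!"


async def demo() -> str:
    return await bus.request("greeting", "World")


greeting = asyncio.run(demo())
print(greeting)
```

The caller only knows the address and the payload, never the handler itself, which is what keeps caller and callee decoupled.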

Publish-Subscribe Pattern

The event bus also supports event-based communication: multiple listeners can subscribe to an event, and the publisher does not care about their results or about what happens once the event is delivered.

# Register listeners using the .on decorator
@event_bus.on("user.created")
async def notify_admin(user: User):
    print(f"New user registered: {user.email}")

# or the .on method directly
async def send_welcome_email(user: User):
    print(f"Sending welcome email to {user.email}")
event_bus.on("user.created", send_welcome_email)

# Publish an event
await event_bus.publish("user.created", user)
# > "New user registered: john@example.com"
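
For comparison with request-response, the fan-out behaviour can be sketched with the standard library alone. This is an illustration under assumptions, not TinyBus's code: `ToyEvents` is a hypothetical class, and running listeners concurrently with `asyncio.gather` is one possible delivery strategy that TinyBus may or may not use.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional

Listener = Callable[[Any], Awaitable[None]]


class ToyEvents:
    """Illustrative only: each event name maps to a list of async listeners."""

    def __init__(self) -> None:
        self._listeners: dict[str, list[Listener]] = {}

    def on(self, event: str, listener: Optional[Listener] = None):
        def register(fn: Listener) -> Listener:
            self._listeners.setdefault(event, []).append(fn)
            return fn

        # Called as events.on("x", fn): register immediately.
        # Called as @events.on("x"): return the decorator.
        return register(listener) if listener is not None else register

    async def publish(self, event: str, payload: Any) -> None:
        listeners = self._listeners.get(event, [])
        # Run every listener; the publisher discards whatever they return.
        await asyncio.gather(*(fn(payload) for fn in listeners))


events = ToyEvents()
received: list[str] = []


@events.on("user.created")
async def notify_admin(email: str) -> None:
    received.append(f"admin notified about {email}")


async def send_welcome_email(email: str) -> None:
    received.append(f"welcome email sent to {email}")


events.on("user.created", send_welcome_email)

asyncio.run(events.publish("user.created", "john@example.com"))
print(sorted(received))
```

Unlike a request, publishing to an event with no listeners is not an error in this sketch: nothing runs and `publish` simply returns.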

Contributing

Contributions are welcome! Please feel free to submit a Pull Request. For major changes, please open an issue first to discuss what you would like to change.

License

TinyBus is MIT licensed. See the LICENSE file for details.
