Skip to main content

Async message bus framework designed for event-driven python monoliths or microservice nodes

Project description

iambus - intelligence async message bus

sounds like a poem

The library is designed for asynchronous event-driven Python projects, mostly for modular monoliths or microservice nodes, has no third-party dependencies.

See more examples on GitHub

Basic usage

The core of the library is the Dispatcher class. It handles three types of messages: events, commands and queries.

Dispatcher initializing

You can use the default object from iambus

from iambus import dispatcher as dp

Note that the event, command and query engines are not enabled by default. Each of them would be instantiated with default routers (if not passed specific class) with the first handler registered to this router.

You can also override default engines, routers and dispatcher and pass your classes to the Dispatcher constructor:

from iambus import Dispatcher, RequestRouter


class CustomRequestRouter(RequestRouter):
    def bind(
        self,
        message: MessageType,
        handler: HandlerType,
        argname: t.Optional[str] = EMPTY,
        response_event: t.Optional[MessageType] = None,
        **initkwargs,
    ) -> PyBusWrappedHandler:
# your implementation here


dp = Dispatcher(
    queries_router_cls=CustomRequestRouter,
)

Handlers` signature

A basic handler is an asynchronous function that takes either one or zero arguments.

async def handler_with_arg(event: EventType):


# do something with event...

async def handler_without_arg():
# do something

Additionally, a handler can accept any number of keyword arguments (how to pass them to the handler is explained below). However, if the parameter expecting the message is strictly positional, you must specify the argname parameter when registering the handler.

A handler can also be a class implementing the HandlerProtocol protocol or an asynchronous __call__ method. The signature rules for the handle or __call__ methods are the same as for a regular function.

class HandlerProtocol(Protocol):

    async def handle(self, message: MessageType, **kwargs) -> Any:
        """Handle message."""

    async def add_event(self, event: EventType) -> None:
        """Add event to emit later."""

    async def dump_events(self) -> list[EventType]:
        """Return list of collected events."""

If your class needs some initialization parameters, you can specify them during handler registration as named argument pairs. These arguments will be passed directly to the class’s __init__ method (make sure there are no strictly positional arguments).

class CustomHandler:
    def __init__(self, repo: RepoType):
        self._repo = repo

    async def __call__(self, cmd: CreateUserCommand) -> User:
        user = await self._repo.create(cmd.data)
        return user

The same behavior applies if the handler is a simple function but takes more than one parameter without default values.

async def create_user_handler(cmd: CreateUserCommand, repo: RepoType):
    user = await repo.create(cmd.data)
    return UserCreated(data=user)


# main.py
user_repo = UserRepoImpl()
dp.commands.bind(CreateUserCommand, handler=create_user_handler, repo=user_repo,
                 response_event=UserCreated)
dp.commands.bind(CreateUserCommand, handler=CustomHandler, repo=user_repo)
# note that this double binding would raise an error in runtime

You may notice that the create_user_handler function returns an event. This allows you to pass new outgoing events to the dispatcher, which will forward them to the appropriate handler. In the case of a class implementing HandlerProtocol, additionally events can be added during processing inside the handle method. You must specify response_event arg to bind or register methods to allow this work. Otherwise strict return value would be returned.

You can get the return value if wait_for_response arg would be set to True. It is useful for queries and commands.

created_user = await dp.handle(CreateUserCommand(), wait_for_response=True)

Handlers binding

We’ve talked a lot about how to declare handlers; now let’s register them.

Currently, there are several ways to do this:

  • Use the bind method of one of the dispatcher’s routers (dp.events, dp.commands, dp.queries).
    dp.events.bind(UserCreated, create_user_handler)
    
  • Use the register decorator from one of the dispatcher’s routers (dp.events, dp.commands, dp.queries).
    @dp.events.register(UserCreated)
    async def create_user_handler(event): ...
    

Dispatcher start

After registering the handlers, you just need to start the dispatcher:

dp.start()

During this operation, the handler map will be finalized, and you won’t be able to register new handlers. Please keep this in mind.

If no handlers registered dispatcher will drop setup.

Sending messages to the Dispatcher

You can send the message by calling handle method of the Dispatcher instance or it's engines.

await dp.handle('on startup')
await dp.events.handle('on startup')

Utils

For simple dependency injection, the library provides two classes: Singleton and Factory. They use the slice-syntax: the name of the resource and the callable that retrieves it (can be either synchronous or asynchronous).

In general, there are more convenient and optimized libraries for this task.

def get_user_repo() -> UserRepo: ...


@dp.queries.register(UserQuery, repo=Singleton["repo": get_user_repo])
async def book_query_handler(query: UserQuery, repo: UserRepo) -> BookQueryResult: ...
The docs are being updated...

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

iambus-0.2.1.tar.gz (24.0 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

iambus-0.2.1-py3-none-any.whl (25.0 kB view details)

Uploaded Python 3

File details

Details for the file iambus-0.2.1.tar.gz.

File metadata

  • Download URL: iambus-0.2.1.tar.gz
  • Upload date:
  • Size: 24.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.5.10

File hashes

Hashes for iambus-0.2.1.tar.gz
Algorithm Hash digest
SHA256 447146ee87b6a67974ac72d1692a5af20b1204fae4852add833c59bd7b302cc9
MD5 3f2d680eefa9bcef692a804701ab8714
BLAKE2b-256 951fc7f34e523ccb04b05b0439dbb89263742c46c1bdc4ffb115a4fc6b753041

See more details on using hashes here.

File details

Details for the file iambus-0.2.1-py3-none-any.whl.

File metadata

  • Download URL: iambus-0.2.1-py3-none-any.whl
  • Upload date:
  • Size: 25.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.5.10

File hashes

Hashes for iambus-0.2.1-py3-none-any.whl
Algorithm Hash digest
SHA256 3a60774f0e31d7a2c48343dedaffec62ae9fa346de20171dda800daa6fedb50d
MD5 463132a0f758b3fb5fd2c0bc08a0dd78
BLAKE2b-256 0281ddb443c651389a455c151db79aff381cab2dcc6e1b59dd104aba801902d0

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page