
A Python-based message bus system designed for message handling and crafted to integrate with dependency injection frameworks.


py-knot

This repository offers a message bus system tailored for integration with python-dependency-injector. It manages command and query dispatching and supports dependency injection, which helps keep application code decoupled, organized, and maintainable.

Installation

The source code is currently hosted on GitHub at: https://github.com/Retr0327/py-knot

Binary installers for the latest released version are available at the Python Package Index (PyPI).

  • pip

    pip install pyknot
    
  • poetry

    poetry add pyknot
    

Usage

1. Fundamental Implementation

  1. Define messages:

    Create specific command or query messages by extending Command or Query classes.

    from dataclasses import dataclass
    from knot import Command
    
    ReturnType = dict[str, int]
    
    
    @dataclass(slots=True)
    class TestCommand(Command[ReturnType]):
        a: int
        b: int
    
  2. Define handlers:

    Implement CommandHandler or QueryHandler for handling defined messages.

    from dataclasses import dataclass
    from knot import Command, CommandHandler
    
    ReturnType = dict[str, int]
    
    
    @dataclass(slots=True)
    class TestCommand(Command[ReturnType]):
        a: int
        b: int
    
    
    class TestCommandHandler(CommandHandler[TestCommand]):
        def handle(self, message: TestCommand) -> ReturnType:
            ... # do something
    
  3. Register handlers:

    Collect the handlers with the register_handlers function and pass the result to MessageBus.

    from dataclasses import dataclass
    from knot import (
        Command,
        CommandHandler,
        MessageBus,
        register_handlers,
    )
    
    ReturnType = dict[str, int]
    
    
    @dataclass(slots=True)
    class TestCommand(Command[ReturnType]):
        a: int
        b: int
    
    
    class TestCommandHandler(CommandHandler[TestCommand]):
        def handle(self, message: TestCommand) -> ReturnType:
            ... # do something
    
    
    messages = register_handlers((TestCommandHandler,))
    message_bus = MessageBus(messages=messages)
    
  4. Dispatch messages:

    Use MessageBus to dispatch messages within your application.

    from dataclasses import dataclass
    from knot import (
        Command,
        CommandHandler,
        MessageBus,
        register_handlers,
    )
    
    ReturnType = dict[str, int]
    
    
    @dataclass(slots=True)
    class TestCommand(Command[ReturnType]):
        a: int
        b: int
    
    
    class TestCommandHandler(CommandHandler[TestCommand]):
        def handle(self, message: TestCommand) -> ReturnType:
            ... # do something
    
    
    messages = register_handlers((TestCommandHandler,))
    message_bus = MessageBus(messages=messages)
    message_bus.dispatch(TestCommand(a=1, b=2))
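
The four steps above follow a common pattern: a message type is mapped to a handler, and the bus looks up the handler when a message is dispatched. The sketch below illustrates that pattern using only the standard library. It is not py-knot's implementation; `TinyBus`, `AddNumbers`, and `AddNumbersHandler` are hypothetical names invented for this illustration.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class AddNumbers:
    """Hypothetical message; plays the role of TestCommand."""

    a: int
    b: int


class AddNumbersHandler:
    """Hypothetical handler; plays the role of TestCommandHandler."""

    def handle(self, message: AddNumbers) -> dict[str, int]:
        return {"total": message.a + message.b}


class TinyBus:
    """Illustrative stand-in for a message bus, not py-knot's MessageBus."""

    def __init__(self, handlers: dict[type, Callable[[], Any]]) -> None:
        # Maps each message type to a zero-argument callable that builds its handler.
        self._handlers = handlers

    def dispatch(self, message: Any) -> Any:
        try:
            factory = self._handlers[type(message)]
        except KeyError:
            name = type(message).__name__
            raise LookupError(f"no handler registered for {name}") from None
        # Build the handler and let it process the message.
        return factory().handle(message)


bus = TinyBus({AddNumbers: AddNumbersHandler})
result = bus.dispatch(AddNumbers(a=1, b=2))
print(result)
```

Keying the lookup on the message type is what lets callers depend only on the message and the bus, never on a concrete handler.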
    

2. Integration with Dependency Injection

Please make sure you have installed python-dependency-injector.

  1. Set Up Dependency Injection Container

    from dataclasses import dataclass
    from dependency_injector import (
        containers,
        providers,
    )
    
    from knot import (
        Command,
        CommandHandler,
        MessageBus,
        register_handlers,
    )
    from knot.plugins.di import handlers_to_factories
    
    ReturnType = dict[str, int]
    
    
    @dataclass(slots=True)
    class TestCommand(Command[ReturnType]):
        a: int
        b: int
    
    
    class TestCommandHandler(CommandHandler[TestCommand]):
        def handle(self, message: TestCommand) -> ReturnType:
            return message.as_dict()
    
    
    messages = register_handlers((TestCommandHandler,))
    
    
    class MessageBusContainer(containers.DeclarativeContainer):
        message_bus = providers.Singleton(
            MessageBus,
            messages=providers.Dict(handlers_to_factories(messages)),
        )
    

    In this container, you'll manage your dependencies, including the message bus. The key function here is handlers_to_factories, transforming the handlers into factory providers that are compatible with the dependency injection framework.

  2. Wire Dependencies and Dispatch Command

    Wire the container, then inject the message bus wherever you need it and dispatch commands through it.

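    # Continues the module from step 1, which defines MessageBus, TestCommand,
    # and MessageBusContainer.
    from dependency_injector.wiring import inject, Provide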
    
    @inject
    def test(
        message_bus: MessageBus = Provide["message_bus"],
    ):
        return message_bus.dispatch(TestCommand(a=1, b=2))
    
    if __name__ == "__main__":
        container = MessageBusContainer()
        container.wire(modules=[__name__])
        test()
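
The idea of turning handlers into factories, described in step 1, can be pictured without any DI framework. The sketch below is a conceptual illustration only, under the assumption that a "factory" is a zero-argument callable that builds a handler with its dependencies already supplied. It does not reproduce `handlers_to_factories` or dependency_injector's providers; `to_factories`, `Greet`, and `GreetHandler` are hypothetical names.

```python
from dataclasses import dataclass
from functools import partial
from typing import Any, Callable


@dataclass
class Greet:
    """Hypothetical message."""

    name: str


class GreetHandler:
    """Hypothetical handler with a constructor dependency."""

    def __init__(self, greeting: str) -> None:
        self.greeting = greeting

    def handle(self, message: Greet) -> str:
        return f"{self.greeting}, {message.name}!"


def to_factories(
    handlers: dict[type, type], **dependencies: Any
) -> dict[type, Callable[[], Any]]:
    """Hypothetical helper: wrap each handler class in a zero-argument factory."""
    return {
        message_type: partial(handler_cls, **dependencies)
        for message_type, handler_cls in handlers.items()
    }


factories = to_factories({Greet: GreetHandler}, greeting="Hello")

# Each call builds a fresh handler with its dependencies already supplied.
first = factories[Greet]()
second = factories[Greet]()
reply = first.handle(Greet(name="Ada"))
print(reply, first is not second)
```

Deferring construction to a factory means the bus never needs to know what a handler's constructor requires; in a real container that knowledge lives in the provider configuration.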
    

3. Extension as a Celery Plugin

Please ensure that your application is already configured to work with Celery.

py-knot can be extended to work with Celery so that messages are dispatched to a task queue. This extension is useful for applications that need asynchronous processing or distributed task execution.

  1. Extend MessageBus for Celery Integration

    Change:

    from knot import MessageBus
    

    to

    from knot.plugins.celery import MessageBus
    
  2. Wire queue_dispatcher_module to the DI Container

    Wiring queue_dispatcher_module to the DI container makes the dispatch_queue method accessible as a dependency.

    from dataclasses import dataclass
    
    from dependency_injector import (
        containers,
        providers,
    )
    from dependency_injector.wiring import (
        Provide,
        inject,
    )
    
    from knot import (
        Command,
        CommandHandler,
        register_handlers,
    )
    from knot.plugins.celery import (
        MessageBus,
        queue_dispatcher_module,
    )
    from knot.plugins.di import handlers_to_factories
    
    ReturnType = dict[str, int]
    
    
    @dataclass(slots=True)
    class TestCommand(Command[ReturnType]):
        a: int
        b: int
    
    
    class TestCommandHandler(CommandHandler[TestCommand]):
        def handle(self, message: TestCommand) -> ReturnType:
            return message.as_dict()
    
    
    messages = register_handlers((TestCommandHandler,))
    
    
    class MessageBusContainer(containers.DeclarativeContainer):
        message_bus = providers.Singleton(
            MessageBus,
            messages=providers.Dict(handlers_to_factories(messages)),
        )
    
    
    @inject
    def test(
        message_bus: MessageBus = Provide["message_bus"],
    ):
        return message_bus.dispatch_queue(TestCommand(a=1, b=2))
    
    
    if __name__ == "__main__":
        container = MessageBusContainer()
        container.wire(modules=[__name__, queue_dispatcher_module])
        test()
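
To picture what dispatching to a task queue means, the standard-library sketch below enqueues messages and lets a background worker handle them later. It is an analogy only, assuming that queued dispatch returns immediately while a worker runs the handler; it uses neither Celery nor py-knot, and `enqueue`, `Resize`, and `handle_resize` are hypothetical names.

```python
import queue
import threading
from dataclasses import dataclass


@dataclass
class Resize:
    """Hypothetical message."""

    width: int
    height: int


def handle_resize(message: Resize) -> int:
    """Hypothetical handler: returns the pixel count."""
    return message.width * message.height


tasks: queue.Queue = queue.Queue()
results: list[int] = []


def worker() -> None:
    # Pull messages until the None sentinel arrives, handling each in turn.
    while True:
        message = tasks.get()
        if message is None:
            break
        results.append(handle_resize(message))


def enqueue(message: Resize) -> None:
    """Put the message on the queue and return without running the handler."""
    tasks.put(message)


thread = threading.Thread(target=worker)
thread.start()
enqueue(Resize(width=4, height=3))
enqueue(Resize(width=2, height=5))
tasks.put(None)  # sentinel: tell the worker to stop
thread.join()
print(results)
```

With Celery the queue is an external broker and the worker is a separate process, so messages generally need to be serializable; the in-process thread here only stands in for that arrangement.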
    

Contact Me

If you have any suggestions or questions, please do not hesitate to email me at retr0327.dev@gmail.com.

