A Python-based message bus system designed for message handling and crafted to integrate with dependency injection frameworks.
Project description
py-knot
This repository offers a message bus system tailored for integration with python-dependency-injector . It efficiently manages command and query dispatching and supports dependency injection, enhancing application decoupling, organization, and maintainability.
Installation
The source code is currently hosted on GitHub at: https://github.com/Retr0327/py-knot
Binary installers for the latest released version are available at the Python Package Index (PyPI).
-
pip
pip install pyknot
-
poetry
poetry add pyknot
Usage
1. Fundamental Implementation
-
Define messages:
Create specific command or query messages by extending
Command
orQuery
classes.from dataclasses import dataclass from knot import Command ReturnType = dict[str, int] @dataclass(slots=True) class TestCommand(Command[ReturnType]): a: int b: int
-
Define handlers:
Implement
CommandHandler
orQueryHandler
for handling defined messages.from dataclasses import dataclass from knot import Command, CommandHandler ReturnType = dict[str, int] @dataclass(slots=True) class TestCommand(Command[ReturnType]): a: int b: int class TestCommandHandler(CommandHandler[TestCommand]): def handle(self, message: TestCommand) -> ReturnType: ... # do something
-
Register handlers:
Register handlers to
MessageBus
by usingregister_handler
method.from dataclasses import dataclass from knot import ( Command, CommandHandler, MessageBus, register_handlers, ) ReturnType = dict[str, int] @dataclass(slots=True) class TestCommand(Command[ReturnType]): a: int b: int class TestCommandHandler(CommandHandler[TestCommand]): def handle(self, message: TestCommand) -> ReturnType: ... # do something messages = register_handlers((TestCommandHandler,)) message_bus = MessageBus(messages=messages)
-
Dispatch Messages:
Utilize
MessageBus
to dispatch messages within your applicationfrom dataclasses import dataclass from knot import ( Command, CommandHandler, MessageBus, register_handlers, ) ReturnType = dict[str, int] @dataclass(slots=True) class TestCommand(Command[ReturnType]): a: int b: int class TestCommandHandler(CommandHandler[TestCommand]): def handle(self, message: TestCommand) -> ReturnType: ... # do something messages = register_handlers((TestCommandHandler,)) message_bus = MessageBus(messages=messages) message_bus.dispatch(TestCommand(a=1, b=2))
2. Integration with Dependency Injection
please make sure you have installed python-dependency-injector
-
Set Up Dependency Injection Container
from dataclasses import dataclass from dependency_injector import ( containers, providers, ) from knot import ( Command, CommandHandler, MessageBus, register_handlers, ) from knot.plugins.di import handlers_to_factories ReturnType = dict[str, int] @dataclass(slots=True) class TestCommand(Command[ReturnType]): a: int b: int class TestCommandHandler(CommandHandler[TestCommand]): def handle(self, message: TestCommand) -> ReturnType: return message.as_dict() messages = register_handlers((TestCommandHandler,)) class MessageBusContainer(containers.DeclarativeContainer): message_bus = providers.Singleton( MessageBus, messages=providers.Dict(handlers_to_factories(messages)), )
In this container, you'll manage your dependencies, including the message bus. The key function here is
handlers_to_factories
, transforming the handlers into factory providers that are compatible with the dependency injection framework. -
Wire Dependencies and Dispatch Command
Wire the dependencies and dispatch commands through the message bus, and inject the message bus to whereever you need it.
from dependency_injector.wiring import inject, Provide @inject def test( message_bus: MessageBus = Provide["message_bus"], ): return message_bus.dispatch(TestCommand(a=1, b=2)) if __name__ == "__main__": container = MessageBusContainer() container.wire(modules=[__name__]) test()
3. Extension as a Celery Plugin
please ensure that your application is already configured to work with Celery
py-knot
can be extended to work seamlessly with Celery, enabling the dispatching of messages to a task queue. This extension is particularly useful for applications that require asynchronous processing or distributed task execution.
-
Extend
MessageBus
for Celery IntegrationChange:
from knot import MessageBus
to
from knot.plugins.celery import MessageBus
-
Wire
queue_dispatcher_module
to the DI ContainerWiring
queue_dispatcher_module
to the DI container makes thedispatch_queue
method accessible as a dependency.from dataclasses import dataclass from dependency_injector import ( containers, providers, ) from dependency_injector.wiring import ( Provide, inject, ) from knot import ( Command, CommandHandler, register_handlers, ) from knot.plugins.celery import ( MessageBus, queue_dispatcher_module, ) from knot.plugins.di import handlers_to_factories ReturnType = dict[str, int] @dataclass(slots=True) class TestCommand(Command[ReturnType]): a: int b: int class TestCommandHandler(CommandHandler[TestCommand]): def handle(self, message: TestCommand) -> ReturnType: return message.as_dict() messages = register_handlers((TestCommandHandler,)) class MessageBusContainer(containers.DeclarativeContainer): message_bus = providers.Singleton( MessageBus, messages=providers.Dict(handlers_to_factories(messages)), ) @inject def test( message_bus: MessageBus = Provide["message_bus"], ): return message_bus.dispatch_queue(TestCommand(a="a", b="b")) if __name__ == "__main__": container = MessageBusContainer() container.wire(modules=[__name__, queue_dispatcher_module]) test()
Contact Me
If you have any suggestion or question, please do not hesitate to email me at retr0327.dev@gmail.com.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
File details
Details for the file pyknot-1.0.0.tar.gz
.
File metadata
- Download URL: pyknot-1.0.0.tar.gz
- Upload date:
- Size: 7.8 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.6.1 CPython/3.11.5 Darwin/23.1.0
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | cd0fbbdceaec21a0522b461594ec15c9826070845bfc307ec6fc2cd1d77731bd |
|
MD5 | f4d5514a0f3f29215e4d1ccf68086ce9 |
|
BLAKE2b-256 | 00ee01aea06dfdd7f80232a9629d11cfd75fa214ba6b2e917a3e6c7462966f0f |
File details
Details for the file pyknot-1.0.0-py3-none-any.whl
.
File metadata
- Download URL: pyknot-1.0.0-py3-none-any.whl
- Upload date:
- Size: 8.1 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.6.1 CPython/3.11.5 Darwin/23.1.0
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 13a5070582e22fbf4d4ceab00cea576c4ba6f2ae7cbd70cdac9e792cb6840ca0 |
|
MD5 | 8b4e71b78a9a3c2558bf9e9804fd024a |
|
BLAKE2b-256 | 4666895b51dfb4812f99bdb9cc7a0751a42b6f4d02c4deae43b90f5a28c062d5 |