Decoupling logic
Decouple
Say NO to monolithic architecture.
Decouple a complex system into separate modules that communicate through a mediator.
pip install decouple
Register event handler
Dispatch events
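The two features above are the core of the mediator pattern. As a rough illustration, here is a minimal, self-contained sketch of that pattern in plain Python. `TinyMediator`, `subscribe`, `publish`, and `Ping` are hypothetical names chosen for this sketch; they are not the API of `decouple`, and the library's internals may differ.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Callable, DefaultDict, List


@dataclass
class Ping:
    """Hypothetical event carrying a single field."""
    value: int = 0


class TinyMediator:
    """Illustrative mediator: maps event types to handler callables."""

    def __init__(self) -> None:
        self._handlers: DefaultDict[type, List[Callable]] = defaultdict(list)

    def subscribe(self, event_type: type, handler: Callable) -> None:
        # Register a handler for one event type.
        self._handlers[event_type].append(handler)

    def publish(self, event: object) -> None:
        # Dispatch the event to every handler registered for its exact type.
        for handler in self._handlers[type(event)]:
            handler(event)


received: List[int] = []
mediator = TinyMediator()
mediator.subscribe(Ping, lambda event: received.append(event.value))
mediator.publish(Ping(value=7))
```

The publisher only knows the mediator and the event type, never the subscriber, which is what keeps the modules decoupled.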
Example
Simple usage
The code for this example is here
- Write ModuleA - publisher
from dataclasses import dataclass

from decouple import Module, Event, Mediator


class ModuleA(Module):
    def __init__(self, mediator: Mediator = Mediator()):
        super().__init__(mediator)

    def start(self):
        self.pub(StartEvent(a=7))


@dataclass
class StartEvent(Event):
    a: int = 0  # must be serializable
- Write ModuleB - subscriber
from decouple import Module

from examples.simple.module_a import StartEvent


class ModuleB(Module):
    def __init__(self):
        super().__init__()
        # subscribe handle_start to StartEvent
        self.sub(StartEvent, self.handle_start)

    def handle_start(self, event: StartEvent):
        print(f'field a:{event.a}')
- Compose both modules to the whole
from examples.simple.module_a import ModuleA
from examples.simple.module_b import ModuleB
module_a = ModuleA()
module_a.add(ModuleB())
module_a.start()
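The comment on `StartEvent` says event fields must be serializable. The source does not say which format the library uses, so the sketch below assumes JSON as a stand-in and checks that a dataclass's fields survive `json.dumps`. `DemoEvent`, `BadEvent`, and `is_json_serializable` are hypothetical names and do not depend on `decouple`.

```python
import json
from dataclasses import asdict, dataclass, field


@dataclass
class DemoEvent:
    a: int = 0  # plain int: JSON-friendly


@dataclass
class BadEvent:
    # A set has no JSON representation, so this event fails the check.
    tags: set = field(default_factory=lambda: {1, 2})


def is_json_serializable(event) -> bool:
    """Return True if every field of the dataclass instance can be dumped to JSON."""
    try:
        json.dumps(asdict(event))
    except TypeError:
        return False
    return True
```

A check like this could run in a unit test for each event class, so that a non-serializable field is caught before the event is ever published.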
Priorities
Manual control
from decouple import Module

from examples.simple.module_a import StartEvent


class ModuleB(Module):
    def __init__(self):
        super().__init__()
        # handlers with a higher priority are triggered earlier
        self.sub(StartEvent, self.handle_b, priority=0)
        self.sub(StartEvent, self.handle_a, priority=100)

    def handle_a(self, event: StartEvent):
        # will be triggered first
        print(f'field a:{event.a}')

    def handle_b(self, event: StartEvent):
        # will be triggered second
        print(f'event.uuid:{event.uuid}, event.timestamp:{event.timestamp}')
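To make the ordering rule concrete, here is a self-contained sketch of priority-ordered dispatch: handlers are kept sorted so that the highest priority runs first. `PriorityMediator` and its methods are hypothetical names for illustration only; this is not how `decouple` is necessarily implemented.

```python
from collections import defaultdict
from typing import Callable, DefaultDict, List, Tuple


class PriorityMediator:
    """Illustrative only: calls handlers in descending priority order."""

    def __init__(self) -> None:
        self._handlers: DefaultDict[type, List[Tuple[int, Callable]]] = defaultdict(list)

    def subscribe(self, event_type: type, handler: Callable, priority: int = 0) -> None:
        entries = self._handlers[event_type]
        entries.append((priority, handler))
        # The sort is stable, so handlers with equal priority keep registration order.
        entries.sort(key=lambda entry: entry[0], reverse=True)

    def publish(self, event: object) -> None:
        # Iterate over a copy so a handler may subscribe during dispatch.
        for _, handler in list(self._handlers[type(event)]):
            handler(event)


calls: List[str] = []
mediator = PriorityMediator()
mediator.subscribe(str, lambda event: calls.append("low"), priority=0)
mediator.subscribe(str, lambda event: calls.append("high"), priority=100)
mediator.publish("start")
```

Although the low-priority handler is registered first, the handler registered with `priority=100` is called before it.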
Default priority depends on registration order
from decouple import Module

from examples.simple.module_a import StartEvent


class ModuleB(Module):
    def __init__(self):
        super().__init__()
        # the default priority increases by 1 with each registration for the same event
        self.sub(StartEvent, self.handle_b)  # priority=1
        self.sub(StartEvent, self.handle_a)  # priority=2

    def handle_a(self, event: StartEvent):
        # will be triggered second
        print(f'field a:{event.a}')

    def handle_b(self, event: StartEvent):
        # will be triggered first
        print(f'event.uuid:{event.uuid}, event.timestamp:{event.timestamp}')
Feedback
I will be glad to read your feedback in issues and pull requests.