MessageFlux
MessageFlux is a framework for creating long-running services that read messages from devices and handle them.
Devices are composable components: functionality can be added by wrapping one device in another.
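To make the idea of wrapping devices concrete, here is a minimal, library-independent sketch. The classes QueueDevice, UppercasingDevice and CountingDevice and the read_message method are hypothetical names invented for this illustration; they are not part of the messageflux API. The sketch only shows the general pattern: each wrapper exposes the same interface as the device it wraps and adds one behaviour.

```python
from collections import deque
from typing import Deque, Iterable, Optional


class QueueDevice:
    """Hypothetical innermost device: hands out raw byte messages from a queue."""

    def __init__(self, messages: Iterable[bytes]):
        self._messages: Deque[bytes] = deque(messages)

    def read_message(self) -> Optional[bytes]:
        # Return the next message, or None when the queue is empty.
        return self._messages.popleft() if self._messages else None


class UppercasingDevice:
    """Hypothetical wrapper: same interface, transforms each message it passes on."""

    def __init__(self, inner):
        self._inner = inner

    def read_message(self) -> Optional[bytes]:
        message = self._inner.read_message()
        return None if message is None else message.upper()


class CountingDevice:
    """Hypothetical wrapper: counts messages without touching their content."""

    def __init__(self, inner):
        self._inner = inner
        self.count = 0

    def read_message(self) -> Optional[bytes]:
        message = self._inner.read_message()
        if message is not None:
            self.count += 1
        return message


# Compose behaviour by nesting: counting around uppercasing around the queue.
device = CountingDevice(UppercasingDevice(QueueDevice([b"hello", b"world"])))

# Read until the device reports that it is empty (None is the sentinel).
received = list(iter(device.read_message, None))
print(received, device.count)  # [b'HELLO', b'WORLD'] 2
```

Because every layer only depends on the read_message interface, wrappers can be stacked in any order or left out without changing the code that consumes the outermost device.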
You can find the full documentation here
Requirements
Python 3.7+
Installation
$ pip install messageflux
Extra Requirements (Example)
$ pip install messageflux[rabbitmq]
Example
Create it
- Create a file main.py with:
from messageflux import MessageHandlingService, MessageHandlerBase, InputDevice, ReadResult
from messageflux.iodevices.in_memory_device import InMemoryDeviceManager


class MyMessageHandler(MessageHandlerBase):
    def handle_message(self, input_device: InputDevice, read_result: ReadResult):
        message = read_result.message
        print(f'message bytes: {message.bytes}')  # Do something with the message...


input_device_manager = InMemoryDeviceManager()
# write messages to devices here...

my_example_service = MessageHandlingService(message_handler=MyMessageHandler(),
                                            input_device_manager=input_device_manager,
                                            input_device_names=['MY_QUEUE'])

if __name__ == '__main__':
    my_example_service.start()  # this blocks indefinitely (until CTRL-C or sigterm)
Run it
python main.py
Using Multi Processing for concurrency
from messageflux import MessageHandlingService, MessageHandlerBase, InputDevice, ReadResult
from messageflux.multiprocessing import get_service_runner, ServiceFactory


class MyMessageHandler(MessageHandlerBase):
    def handle_message(self, input_device: InputDevice, read_result: ReadResult):
        message = read_result.message
        print(f'message bytes: {message.bytes}')  # Do something with the message...


class MyServiceFactory(ServiceFactory):
    def create_service(self) -> MessageHandlingService:
        """
        We import the devices in 'create_service' so that all the imports happen in the child process.
        This is only a precaution, but it is recommended.
        """
        from messageflux.iodevices.in_memory_device import InMemoryDeviceManager

        input_device_manager = InMemoryDeviceManager()
        # write messages to devices here...

        my_example_service = MessageHandlingService(message_handler=MyMessageHandler(),
                                                    input_device_manager=input_device_manager,
                                                    input_device_names=['MY_QUEUE'])
        return my_example_service


if __name__ == '__main__':  # you must do this when running with multiple processes
    service_to_run = get_service_runner(service_factory=MyServiceFactory(),
                                        instance_count=5)  # this will run 5 child processes
    service_to_run.start()  # this starts the child processes and blocks indefinitely (until CTRL-C or sigterm)
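The advice to import devices inside create_service can be illustrated without messageflux. The sketch below is an assumption-laden stand-in: LazyFactory and its create method are hypothetical names, and the standard-library module colorsys plays the role of a heavy device module. It shows only that a function-level import runs when the method is called, not when the factory object is constructed, so the process that builds the factory does not have to load the module itself.

```python
import sys


class LazyFactory:
    """Hypothetical factory: a small object that holds no heavy state."""

    def create(self):
        # Deferred import: executed only in the process that calls create().
        import colorsys  # stand-in for a heavy device module

        return colorsys.rgb_to_hsv(1.0, 0.0, 0.0)


factory = LazyFactory()  # constructing the factory imports nothing extra
loaded_before = "colorsys" in sys.modules  # depends on what else has run
result = factory.create()  # the import happens here
loaded_after = "colorsys" in sys.modules  # now guaranteed to be loaded

print(loaded_before, loaded_after, result)
```

In a multi-process setup the same reasoning would apply to whichever process ends up calling the factory method, which is why the parent process can stay free of device-specific imports.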
Optional Requirements
- messageflux[rabbitmq] - for using the rabbitmq device
- messageflux[objectstorage] - for using the s3 device wrappers
- messageflux[dev] - for running tests and developing for this package
- messageflux[all] - all extras required for all devices