No project description provided
Project description
MessageFlux
messageflux is a framework for creating long-running services, that read messages from devices, and handles them.
Devices are composable components - meaning that added functionality can come from wrapping devices in other devices.
You can find the full documentation here
Requirements
Python 3.7+
Installation
$ pip install messageflux
Extra Requirements (Example)
$ pip install messageflux[rabbitmq]
Example
Create it
- Create a file
main.py
with:
from messageflux import MessageHandlingService, MessageHandlerBase, InputDevice, ReadResult
from messageflux.iodevices.in_memory_device import InMemoryDeviceManager
class MyMessageHandler(MessageHandlerBase):
def handle_message(self, input_device: InputDevice, read_result: ReadResult):
message = read_result.message
print(f'message bytes: {message.bytes}') # Do somthing with the message...
input_device_manager = InMemoryDeviceManager()
# write messages to devices here...
my_example_service = MessageHandlingService(message_handler=MyMessageHandler(),
input_device_manager=input_device_manager,
input_device_names=['MY_QUEUE'])
if __name__ == '__main__':
my_example_service.start() # this blocks indefinitely (until CTRL-C or sigterm)
Run it
python main.py
Using Multi Processing for concurrency
from messageflux import MessageHandlingService, MessageHandlerBase, InputDevice, ReadResult
from messageflux.multiprocessing import get_service_runner, ServiceFactory
class MyMessageHandler(MessageHandlerBase):
def handle_message(self, input_device: InputDevice, read_result: ReadResult):
message = read_result.message
print(f'message bytes: {message.bytes}') # Do somthing with the message...
class MyServiceFactory(ServiceFactory):
def create_service(self) -> MessageHandlingService:
"""
we import the devices in 'create_service' so that all the imports will be in the child process.
this is only a precaution, but recommended
"""
from messageflux.iodevices.in_memory_device import InMemoryDeviceManager
input_device_manager = InMemoryDeviceManager()
# write messages to devices here...
my_example_service = MessageHandlingService(message_handler=MyMessageHandler(),
input_device_manager=input_device_manager,
input_device_names=['MY_QUEUE'])
return my_example_service
if __name__ == '__main__': # you must do this in multiprocess running
service_to_run = get_service_runner(service_factory=MyServiceFactory(),
instance_count=5) # this will run 5 child processes
service_to_run.start() # this starts the child processes and blocks indefinitely (until CTRL-C or sigterm)
Optional Requirements
messageflux[fastmessage]
- for using the FastMessage pipeline handlermessageflux[rabbitmq]
- for using the rabbitmq devicemessageflux[objectstorage]
- for using the s3 device wrappersmessageflux[dev]
- for running tests and developing for this packagemessageflux[all]
- all extras required for all devices
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
messageflux-0.2.1.tar.gz
(74.9 kB
view details)
Built Distribution
messageflux-0.2.1-py3-none-any.whl
(103.4 kB
view details)
File details
Details for the file messageflux-0.2.1.tar.gz
.
File metadata
- Download URL: messageflux-0.2.1.tar.gz
- Upload date:
- Size: 74.9 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.11.3
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 8465c8eba3050e246c169ce7e57a72a85615c42ca94d021c106689b6c2b8b528 |
|
MD5 | f13a046e4a9ab51029284596b21d9afc |
|
BLAKE2b-256 | e74dc66890257b40a576a52271eeaf82b661fb7faf909dca8214198fbe8e17f3 |
File details
Details for the file messageflux-0.2.1-py3-none-any.whl
.
File metadata
- Download URL: messageflux-0.2.1-py3-none-any.whl
- Upload date:
- Size: 103.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.11.3
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 7dd170f7d5a2ded0aca4ab575917dd83d7126daab6696aec71bdcf924c7acf39 |
|
MD5 | 4c634b7946abc26b7da11684d43fe12e |
|
BLAKE2b-256 | 4a11dc7f45e247b0d99202094cc4bf48c7868428d499ec215b300a7db1d5057b |