Skip to main content

No project description provided

Project description

MessageFlux

stars license last commit tests Documentation Status pypi version python compatibility downloads

messageflux is a framework for creating long-running services, that read messages from devices, and handles them.

Devices are composable components - meaning that added functionality can come from wrapping devices in other devices.

You can find the full documentation here

Requirements

Python 3.7+

Installation

$ pip install messageflux

Extra Requirements (Example)

$ pip install messageflux[rabbitmq]

Example

Create it

  • Create a file main.py with:
from messageflux import MessageHandlingService, MessageHandlerBase, InputDevice, ReadResult
from messageflux.iodevices.in_memory_device import InMemoryDeviceManager


class MyMessageHandler(MessageHandlerBase):
    def handle_message(self, input_device: InputDevice, read_result: ReadResult):
        message = read_result.message
        print(f'message bytes: {message.bytes}')  # Do somthing with the message...


input_device_manager = InMemoryDeviceManager()
# write messages to devices here...

my_example_service = MessageHandlingService(message_handler=MyMessageHandler(),
                                            input_device_manager=input_device_manager,
                                            input_device_names=['MY_QUEUE'])
if __name__ == '__main__':
    my_example_service.start()  # this blocks indefinitely (until CTRL-C or sigterm)

Run it

python main.py 

Using Multi Processing for concurrency

from messageflux import MessageHandlingService, MessageHandlerBase, InputDevice, ReadResult
from messageflux.multiprocessing import get_service_runner, ServiceFactory


class MyMessageHandler(MessageHandlerBase):
    def handle_message(self, input_device: InputDevice, read_result: ReadResult):
        message = read_result.message
        print(f'message bytes: {message.bytes}')  # Do somthing with the message...


class MyServiceFactory(ServiceFactory):

    def create_service(self) -> MessageHandlingService:
        """
        we import the devices in 'create_service' so that all the imports will be in the child process.
        this is only a precaution, but recommended
        """
        from messageflux.iodevices.in_memory_device import InMemoryDeviceManager

        input_device_manager = InMemoryDeviceManager()
        # write messages to devices here...

        my_example_service = MessageHandlingService(message_handler=MyMessageHandler(),
                                                    input_device_manager=input_device_manager,
                                                    input_device_names=['MY_QUEUE'])
        return my_example_service


if __name__ == '__main__':  # you must do this in multiprocess running
    service_to_run = get_service_runner(service_factory=MyServiceFactory(),
                                        instance_count=5)  # this will run 5 child processes

    service_to_run.start()  # this starts the child processes and blocks indefinitely (until CTRL-C or sigterm)

Optional Requirements

  • messageflux[fastmessage] - for using the FastMessage pipeline handler
  • messageflux[rabbitmq] - for using the rabbitmq device
  • messageflux[objectstorage] - for using the s3 device wrappers
  • messageflux[dev] - for running tests and developing for this package
  • messageflux[all] - all extras required for all devices

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

messageflux-0.3.1.tar.gz (64.3 kB view details)

Uploaded Source

Built Distribution

messageflux-0.3.1-py3-none-any.whl (93.2 kB view details)

Uploaded Python 3

File details

Details for the file messageflux-0.3.1.tar.gz.

File metadata

  • Download URL: messageflux-0.3.1.tar.gz
  • Upload date:
  • Size: 64.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.2 CPython/3.11.4

File hashes

Hashes for messageflux-0.3.1.tar.gz
Algorithm Hash digest
SHA256 3e1d04352f6ecf9837846ed5f39792be2e28541613e13d0babb5b288071d2bdd
MD5 236319c3bfc94e057a09c8b0e333c90a
BLAKE2b-256 ef52badd52fddca71d8e7faa5f1b476e072ce4d8d0aaf9aed55b7b07e4d583f4

See more details on using hashes here.

File details

Details for the file messageflux-0.3.1-py3-none-any.whl.

File metadata

  • Download URL: messageflux-0.3.1-py3-none-any.whl
  • Upload date:
  • Size: 93.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.2 CPython/3.11.4

File hashes

Hashes for messageflux-0.3.1-py3-none-any.whl
Algorithm Hash digest
SHA256 e839a6c981cbd6656458872313fbe5fc10a7e25bba9059a5046e6b689aa4da47
MD5 b1071fef258a18f43593e234c8e9c1f6
BLAKE2b-256 07be4d4e29c3c187373041562ba83a1eccd48c7609a3b07804ede9da5b45ee8c

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page