Skip to main content

No project description provided

Reason this release was yanked:

doesn't work

Project description

MessageFlux

license tests pypi version python compatibility last commit stars

This package is used to create long-running services, that read messages from devices, and handles them. The devices in this package are meant to be composable - meaning that added functionality can come from wrapping devices in other devices.

Requirements

Python 3.7+

Installation

$ pip install messageflux

Extra Requirements (Example)

$ pip install messageflux[rabbitmq]

Example

Create it

  • Create a file main.py with:
from messageflux import SingleMessageDeviceReaderService, InputDevice, ReadResult
from messageflux.iodevices.in_memory_device import InMemoryDeviceManager


class MyExampleService(SingleMessageDeviceReaderService):
    def _handle_single_message(self, input_device: InputDevice, read_result: ReadResult):
        message = read_result.message
        # Do somthing with the message...


input_device_manager = InMemoryDeviceManager()
# write messages to devices here...

my_example_service = MyExampleService(input_device_manager=input_device_manager,
                                      input_device_names=['MY_QUEUE'])

my_example_service.start()  # this blocks indefinitely (until CTRL-C or sigterm)

Run it

python main.py 

Using Multi Processing for concurrency

from messageflux import SingleMessageDeviceReaderService, InputDevice, ReadResult
from messageflux.multiprocessing import get_service_runner, ServiceFactory


class MyExampleService(SingleMessageDeviceReaderService):
    def _handle_single_message(self, input_device: InputDevice, read_result: ReadResult):
        message = read_result.message
        # Do somthing with the message...


class MyServiceFactory(ServiceFactory):

    def create_service(self) -> MyExampleService:
        # we import the devices in 'create_service' so that all the imports will be in the child process.
        # this is only a precaution, but recommended
        from messageflux.iodevices.in_memory_device import InMemoryDeviceManager

        input_device_manager = InMemoryDeviceManager()
        # write messages to devices here...

        my_example_service = MyExampleService(input_device_manager=input_device_manager,
                                              input_device_names=['MY_QUEUE'])
        return my_example_service


service_to_run = get_service_runner(MyServiceFactory(),
                                    instance_count=5)  # this will run 5 child processes

service_to_run.start()  # this starts the child processes and blocks indefinitely (until CTRL-C or sigterm)

Optional Requirements

  • messageflux[rabbitmq] - for using the rabbitmq device
  • messageflux[dev] - for running tests and developing for this package
  • messageflux[all] - all extras required for all devices

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

messageflux-0.0.6.tar.gz (45.2 kB view details)

Uploaded Source

Built Distribution

messageflux-0.0.6-py3-none-any.whl (33.1 kB view details)

Uploaded Python 3

File details

Details for the file messageflux-0.0.6.tar.gz.

File metadata

  • Download URL: messageflux-0.0.6.tar.gz
  • Upload date:
  • Size: 45.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.1 CPython/3.10.7

File hashes

Hashes for messageflux-0.0.6.tar.gz
Algorithm Hash digest
SHA256 d91e22ef1768c33c4f0d0c87b50f159bb29ad44b07dd6c71551de2d3b63a5d53
MD5 289dd3682d7a8df18fde70538e6b0517
BLAKE2b-256 753759d4311813e7d4a5b1fdbbaaab47f8757e92d7211a9d5d7ecd1fc155a1ad

See more details on using hashes here.

File details

Details for the file messageflux-0.0.6-py3-none-any.whl.

File metadata

  • Download URL: messageflux-0.0.6-py3-none-any.whl
  • Upload date:
  • Size: 33.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.1 CPython/3.10.7

File hashes

Hashes for messageflux-0.0.6-py3-none-any.whl
Algorithm Hash digest
SHA256 70d111c7f1179349f5a712f40c0b600732fa1aa6cdf11c1bb10fe85e8d00e9c0
MD5 5506a2f4f23bc4419b1c7cbbb340e0a5
BLAKE2b-256 68bd558655be2c34a732204ba99b330c14bac03688d5512833083a1ddca63220

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page