Skip to main content

Utility library for ZMQ data streaming

Project description

Build Status Build status

Overview

mflow facilitates the handling of ZMQ data streams. It provides basic accounting and statistics on messages received/send as well as an easy way to handle different types of messages within a stream.

Right now mflow comes with following message type support:

  • array-1.0
  • bsr_m-1.0
  • dheader-1.0 (Dectris Eiger)
  • dimage-1.0 (Dectris Eiger)
  • dseries_end-1.0 (Dectris Eiger)

Installation

Pip

The mflow package is available on https://pypi.python.org and can be installed via pip

pip install mflow

Anaconda

The mflow package is available on anaconda.org and can be installed as follows:

conda install -c https://conda.anaconda.org/paulscherrerinstitute mflow

Usage

Connect/Create stream:

stream = mflow.connect(address, conn_type=mflow.CONNECT, mode=mflow.PULL, receive_timeout=None, queue_size=100)

Receive a message:

message = stream.receive(self, handler=None)

The returned message object contains the current receiving statistics in message.statistics and the actual message data in message.data.

If there should be no dynamic resolution of the message handler an explicit handler can be specified to handle the incoming message.

Disconnecting stream:

stream.disconnect()

Sending message (ensure that you specified the correct mode!):

stream.send('message content', send_more=True)

Register multiple custom (htype) handlers:

def receive_function(receiver):
      header = receiver.next(as_json=True)

      data = []
      while receiver.has_more():
          segment = receiver.next() or None
          data.append(segment)

      res = {
          "header": header,
          "data": data
      }
      return res


my_handlers = dict()
my_handlers['my_htype-1.0'] = receive_function
# ... register more handlers ...

# set handlers
stream.handlers = my_handlers

Note: Handlers need to be registered before calling receive().

Example:

import mflow
stream = mflow.connect('tcp://sf-lc:9999')

# Receive "loop"
message  = stream.receive()
print(message.statistics.messages_received)

stream.disconnect()

Advanced

Register Additional Handlers

Manually register more handlers that are not provided by this package (after creating the stream)

stream.handlers['id'] = myhandler

Merge Streams

mflow provides a simple class to merge two ore more streams. The default implementation merges the messages round robin, i.e. you will receive message 1 from stream 1 then message 1 from stream 2, then message 2 from stream 1 ...

import mflow
stream_one = mflow.connect('tcp://source1:7777')
stream_two = mflow.connect('tcp://source2:7779')

import mflow.utils
stream = mflow.utils.Merge(stream_one, stream_two)

message = stream.receive()

stream.disconnect()

Command Line

The Anaconda mflow package comes with several command line tools useful for testing streaming.

m_stats

Show statistics for incoming streams. Useful for measure the maximum throughput for a given stream on a link.

usage: m_stats [-h] source

Stream statistic utility

positional arguments:
  source      Source address - format "tcp://<address>:<port>"

optional arguments:
  -h, --help  show this help message and exit

m_generate

Generate a random stream. This is useful, together with m_stats to measure possible throughput.

usage: m_generate [-h] [-a ADDRESS] [-s SIZE]

Stream generation utility

optional arguments:
  -h, --help            show this help message and exit
  -a ADDRESS, --address ADDRESS
                        Address - format "tcp://<address>:<port>"
  -s SIZE, --size SIZE  Size of data to send (MB)"

m_dump

Dump an incoming stream to disk or screen. While dumping into files, m_dump saves all sub-messages into individual files. The option -s can be used if you are only interested in the first n submessages (e.g. header)

usage: m_dump [-h] [-s SKIP] source [folder]

Stream dump utility

positional arguments:
  source                Source address - format "tcp://<address>:<port>"
  folder                Destination folder

optional arguments:
  -h, --help            show this help message and exit
  -s SKIP, --skip SKIP  Skip sub-messages starting from this number (including
                        number)

m_replay

Replay a recorded (via m_dump) stream.

usage: m_replay [-h] [-a ADDRESS] folder

Stream replay utility

positional arguments:
  folder                Destination folder

optional arguments:
  -h, --help            show this help message and exit
  -a ADDRESS, --address ADDRESS
                        Address - format "tcp://<address>:<port>" (default:
                        "tcp://*:9999")

m_split

Split an incoming stream into multiple streams. Currently only the PUSH/PULL scheme is supported.

usage: m_split [-h] [-c CONFIG] [source] [streams [streams ...]]

Stream dump utility

positional arguments:
  source                Source address - format "tcp://<address>:<port>"
  streams               Streams to generate - "tcp://<address>:<port>"

optional arguments:
  -h, --help            show this help message and exit
  -c CONFIG, --config CONFIG
                        Configuration file

The -c / --config option accepts a configuration file as follows:

{
    "source": {
        "address": "tcp://localhost:7777",
        "mode": "PULL",
        "queue_size": 10
    },
    "streams": [
        {
            "address": "tcp://*:8888",
            "mode": "PUSH"
        }
    ]
}

If an address is specified in the format of 'tcp://*:' the splitter will do a bind on that address and opens the specified port. If there is a hostname given, the splitter tries to connect to the address. Supported modes are PULL/SUB for the source and PUSH/PUB for outgoing streams.

The default value for mode (if omitted) is PULL for the source and PUSH for output streams. The default queue size (if omitted) is 100 for both source and output streams.

Output streams can be reduced by applying a modulo. This can be done by specifying the modulo attribute as follows:

{
    "source": {
        "address": "tcp://localhost:7777",
        "mode": "PULL",
        "queue_size": 10
    },
    "streams": [
        {
            "address": "tcp://*:8888",
            "mode": "PUSH",
            "modulo": 1000
        }
    ]
}

Such a configuration will result in that only every 1000 message is send out to the output stream.


# Development

## PyPi
Upload package to pypi.python.org

```bash
python setup.py sdist upload

Anaconda

To build the anaconda package do:

conda build conda_recipe

Afterwards the package can be uploaded to anaconda.org via

anaconda upload <path_to.tar.bz2_file>

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distributions

No source distribution files available for this release.See tutorial on generating distribution archives.

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

psi_mflow-1.0.0-py3-none-any.whl (32.7 kB view details)

Uploaded Python 3

File details

Details for the file psi_mflow-1.0.0-py3-none-any.whl.

File metadata

  • Download URL: psi_mflow-1.0.0-py3-none-any.whl
  • Upload date:
  • Size: 32.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.1 CPython/3.9.18

File hashes

Hashes for psi_mflow-1.0.0-py3-none-any.whl
Algorithm Hash digest
SHA256 e7e6e888c1588150872b2f76286b652753715acf5fe8af0372c14fadf8f71776
MD5 c88254437b875029e9c838b40c8b5d77
BLAKE2b-256 08d9cbb3018eab380e49e5664985551bc0a6e0ad4061592be27323cac0af6f06

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page