# mflow

Utility library for ZMQ data streaming.
# Overview
mflow facilitates the handling of ZMQ data streams. It provides basic accounting and statistics on messages received/sent, as well as an easy way to handle different message types within a stream.
Right now mflow comes with support for the following message types:
- array-1.0
- bsr_m-1.0
- dheader-1.0 (Dectris Eiger)
- dimage-1.0 (Dectris Eiger)
- dseries_end-1.0 (Dectris Eiger)
# Installation

## Pip

The mflow package is available on https://pypi.python.org and can be installed via pip:

```bash
pip install mflow
```
## Anaconda

The mflow package is available on anaconda.org and can be installed as follows:

```bash
conda install -c https://conda.anaconda.org/paulscherrerinstitute mflow
```
# Usage

Connect/create a stream:

```python
stream = mflow.connect(address, conn_type=mflow.CONNECT, mode=mflow.PULL, receive_timeout=None, queue_size=100)
```
Receive a message:

```python
message = stream.receive(handler=None)
```

The returned message object contains the current receive statistics in `message.statistics` and the actual message data in `message.data`.

If the message handler should not be resolved dynamically, an explicit handler can be passed to handle the incoming message.
Disconnect a stream:

```python
stream.disconnect()
```
Send a message (ensure that you specified the correct mode!):

```python
stream.send('message content', send_more=True)
```
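To illustrate the `send_more` flag, here is a sketch of a multipart send. `_RecordingStream` is a hypothetical stand-in that only records calls, so the snippet runs without a ZMQ endpoint; with a real mflow stream the `send()` calls would be the same.

```python
class _RecordingStream:
    """Hypothetical stand-in for an mflow stream; records send() calls."""
    def __init__(self):
        self.sent = []

    def send(self, data, send_more=False):
        self.sent.append((data, send_more))


rec = _RecordingStream()

# Every part except the last is sent with send_more=True, signalling
# that further sub-messages of the same logical message follow.
parts = [b'{"htype": "my_htype-1.0"}', b'\x00\x01\x02\x03']
for part in parts[:-1]:
    rec.send(part, send_more=True)    # more sub-messages follow
rec.send(parts[-1], send_more=False)  # final sub-message

print([flag for _, flag in rec.sent])  # -> [True, False]
```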
Register multiple custom (htype) handlers:

```python
def receive_function(receiver):
    header = receiver.next(as_json=True)
    data = []
    while receiver.has_more():
        segment = receiver.next() or None
        data.append(segment)
    res = {
        "header": header,
        "data": data
    }
    return res

my_handlers = dict()
my_handlers['my_htype-1.0'] = receive_function
# ... register more handlers ...

# Set handlers
stream.handlers = my_handlers
```
Note: Handlers need to be registered before calling receive().
Example:

```python
import mflow

stream = mflow.connect('tcp://sf-lc:9999')

# Receive "loop"
message = stream.receive()
print(message.statistics.messages_received)

stream.disconnect()
```
# Advanced

## Register Additional Handlers

Handlers that are not provided by this package can be registered manually (after creating the stream):

```python
stream.handlers['id'] = myhandler
```
## Merge Streams

mflow provides a simple class to merge two or more streams. The default implementation merges the messages round robin, i.e. you will receive message 1 from stream 1, then message 1 from stream 2, then message 2 from stream 1, and so on.
```python
import mflow
import mflow.utils

stream_one = mflow.connect('tcp://source1:7777')
stream_two = mflow.connect('tcp://source2:7779')

stream = mflow.utils.Merge(stream_one, stream_two)

message = stream.receive()
stream.disconnect()
```
# Command Line

The Anaconda mflow package comes with several command line tools useful for testing streaming.
## m_stats

Show statistics for incoming streams. Useful for measuring the maximum throughput of a given stream on a link.

```
usage: m_stats [-h] source

Stream statistic utility

positional arguments:
  source      Source address - format "tcp://<address>:<port>"

optional arguments:
  -h, --help  show this help message and exit
```
## m_generate

Generate a random stream. Together with m_stats, this is useful for measuring the possible throughput.

```
usage: m_generate [-h] [-a ADDRESS] [-s SIZE]

Stream generation utility

optional arguments:
  -h, --help            show this help message and exit
  -a ADDRESS, --address ADDRESS
                        Address - format "tcp://<address>:<port>"
  -s SIZE, --size SIZE  Size of data to send (MB)
```
## m_dump

Dump an incoming stream to disk or screen. When dumping into files, m_dump saves each sub-message into an individual file.

The option -s can be used if you are only interested in the first n sub-messages (e.g. the header).

```
usage: m_dump [-h] [-s SKIP] source [folder]

Stream dump utility

positional arguments:
  source                Source address - format "tcp://<address>:<port>"
  folder                Destination folder

optional arguments:
  -h, --help            show this help message and exit
  -s SKIP, --skip SKIP  Skip sub-messages starting from this number (including
                        this number)
```
## m_replay

Replay a stream recorded via m_dump.

```
usage: m_replay [-h] [-a ADDRESS] folder

Stream replay utility

positional arguments:
  folder                Destination folder

optional arguments:
  -h, --help            show this help message and exit
  -a ADDRESS, --address ADDRESS
                        Address - format "tcp://<address>:<port>" (default:
                        "tcp://*:9999")
```
## m_split

Split an incoming stream into multiple streams. Currently only the PUSH/PULL scheme is supported.

```
usage: m_split [-h] [-c CONFIG] [source] [streams [streams ...]]

Stream dump utility

positional arguments:
  source                Source address - format "tcp://<address>:<port>"
  streams               Streams to generate - "tcp://<address>:<port>"

optional arguments:
  -h, --help            show this help message and exit
  -c CONFIG, --config CONFIG
                        Configuration file
```
The -c / --config option accepts a configuration file as follows:

```json
{
    "source": {
        "address": "tcp://localhost:7777",
        "mode": "PULL",
        "queue_size": 10
    },
    "streams": [
        {
            "address": "tcp://*:8888",
            "mode": "PUSH"
        }
    ]
}
```
If an address is specified in the format 'tcp://*:<port>', the splitter binds to that address and opens the specified port. If a hostname is given, the splitter tries to connect to that address. Supported modes are PULL/SUB for the source and PUSH/PUB for outgoing streams.

If mode is omitted, it defaults to PULL for the source and PUSH for output streams. If queue_size is omitted, it defaults to 100 for both source and output streams.
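Relying on these defaults, a minimal configuration might look like this (addresses are placeholders):

```json
{
    "source": {
        "address": "tcp://localhost:7777"
    },
    "streams": [
        { "address": "tcp://*:8888" }
    ]
}
```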
Output streams can be reduced by applying a modulo. This can be done by specifying the modulo attribute as follows:
```json
{
    "source": {
        "address": "tcp://localhost:7777",
        "mode": "PULL",
        "queue_size": 10
    },
    "streams": [
        {
            "address": "tcp://*:8888",
            "mode": "PUSH",
            "modulo": 1000
        }
    ]
}
```
With such a configuration, only every 1000th message is sent out to the output stream.
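To make the modulo reduction concrete, the following sketch shows which messages a modulo of 1000 lets through. This is plain Python, not the splitter's actual implementation, and it assumes a zero-based message counter:

```python
def forwarded(n_messages, modulo):
    """Indices of messages forwarded to an output stream with the given modulo.

    Illustration only: assumes the splitter forwards a message whenever its
    zero-based counter is divisible by the configured modulo.
    """
    return [i for i in range(n_messages) if i % modulo == 0]


print(forwarded(3500, 1000))  # -> [0, 1000, 2000, 3000]
```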
# Development

## PyPi

Upload the package to pypi.python.org:

```bash
python setup.py sdist upload
```
## Anaconda

To build the anaconda package run:

```bash
conda build conda_recipe
```

Afterwards the package can be uploaded to anaconda.org via:

```bash
anaconda upload <path_to.tar.bz2_file>
```