Zero-copy shared memory IPC library for building complex streaming data pipelines capable of processing large datasets
MomentumX
MomentumX is a zero-copy shared memory IPC library for building complex streaming data pipelines capable of processing large datasets using Python.
Key Features:
- High-Throughput, Low Latency
- Supports streaming and synchronous modes to cover a wide variety of use cases.
- Bring your own encoding, or use raw binary data.
- Sane data protections to ensure reliability of data in a cooperative computing environment.
- Pairs with other high-performance libraries, such as numpy and scipy, to support parallel processing of memory-intensive scientific data.
- Works on most modern versions of Linux using shared memory (via /dev/shm).
- Seamlessly integrates into a Docker environment with minimal configuration, and readily enables lightweight container-to-container data sharing.
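The zero-copy idea behind the features above can be illustrated with the Python standard library alone. The sketch below does not use MomentumX; it uses `multiprocessing.shared_memory` (Python 3.8+) as a stand-in to show two handles observing the same bytes without a copy. The function name and the 16-byte size are arbitrary choices made for illustration.

```python
from multiprocessing import shared_memory


def demo_zero_copy() -> bytes:
    """Write through one handle to a shared segment and read through another."""
    # "Producer" side: allocate a named 16-byte segment (backed by /dev/shm on Linux).
    writer = shared_memory.SharedMemory(create=True, size=16)
    try:
        # "Consumer" side: attach to the same segment by name; no data is copied.
        reader = shared_memory.SharedMemory(name=writer.name)
        try:
            writer.buf[:5] = b"hello"     # write through the first handle
            return bytes(reader.buf[:5])  # read the same memory through the second
        finally:
            reader.close()
    finally:
        writer.close()
        writer.unlink()  # remove the segment once both handles are closed


if __name__ == "__main__":
    print(demo_zero_copy())
```

In a real pipeline the two handles would live in different processes; keeping them in one process here only makes the sketch easy to run.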
Examples:
Below are some simplified use cases for common MomentumX workflows. Consult the examples in the examples/ directory for additional details and implementation guidance.
Streaming Mode (i.e. lossy)
# Producer Process
import momentumx as mx
# Create a stream with a total capacity of 10MB
stream = mx.Producer('my_stream', buffer_size=int(1e6), buffer_count=10, sync=False)
# Write the series 0-9 repeatedly to a buffer 1000 times
for i in range(0, 1000):
    buffer = stream.next_to_send()
    # Note: writing to buffer via [<index>] and [<start_index>:<stop_index>] is also possible
    buffer.write(f'{i % 10}'.encode('utf8'))
    # Note: call with .send(<num bytes>) to explicitly control the data_size parameter; otherwise the internal cursor is used
    buffer.send()
# Consumer Process(es)
import momentumx as mx
stream = mx.Consumer('my_stream')
while stream.is_alive:
    # Receive from the stream as long as the stream is available
    buffer = stream.receive()
    print(buffer[:buffer.data_size])
    # Calling `buffer.release()` is not required (see "Implicit versus Explicit Buffer Release" section below)
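To make the "lossy" behaviour concrete, here is a minimal sketch that does not use MomentumX at all. It assumes that lossy streaming behaves like a fixed-size ring in which a fast producer overwrites the oldest unread entries; `collections.deque(maxlen=...)` stands in for the buffer pool. The drop policy is an assumption made for illustration, since the text above does not state which data is discarded.

```python
from collections import deque


def simulate_lossy(buffer_count: int, messages: int) -> list:
    """Produce `messages` payloads into a ring of `buffer_count` slots, then drain it."""
    ring = deque(maxlen=buffer_count)  # the oldest entry is discarded when the ring is full
    for i in range(messages):
        ring.append(f"{i % 10}".encode("utf8"))  # same payload as the producer example
    return list(ring)  # what a consumer that only starts reading now could still see


if __name__ == "__main__":
    print(simulate_lossy(buffer_count=3, messages=5))
```

With three slots and five messages, the first two payloads are gone by the time the consumer reads, which is the trade-off a lossy mode accepts in exchange for never blocking the producer.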
Synchronous Mode (i.e. lossless)
# Producer Process
import momentumx as mx
import threading
import signal
cancel_event = threading.Event()
signal.signal(signal.SIGINT, (lambda _sig, _frm: cancel_event.set()))
# Create a stream with a total capacity of 10MB
stream = mx.Producer('my_stream', buffer_size=int(1e6), buffer_count=10, sync=True) # NOTE: sync set to True
min_subscribers = 1
while stream.subscriber_count < min_subscribers:
    print("waiting for subscriber(s)")
    if cancel_event.wait(0.5):
        break
print("All expected subscribers are ready")
# Write the series 0-999 to a consumer
for n in range(0, 1000):
    # Note: sending strings directly is possible via the send_string call
    if stream.subscriber_count == 0:
        cancel_event.wait(0.5)
    elif stream.send_string(str(n)):
        print(f"Sent: {n}")
# Consumer Process(es)
import momentumx as mx
stream = mx.Consumer('my_stream')
while stream.is_alive:
    # Note: receiving strings is possible as well via the receive_string call
    print(f"Received: {stream.receive_string()}")
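The lossless guarantee can be pictured as backpressure: the producer waits whenever every buffer is still in use. The sketch below is a stand-in built on the standard library's `queue.Queue`, not on MomentumX, and it assumes that a bounded, blocking queue is a fair analogy for synchronous mode. The function name and the `None` sentinel are illustrative choices.

```python
import queue
import threading


def simulate_lossless(buffer_count: int, messages: int) -> list:
    """Send `messages` strings through a bounded queue; the producer blocks when it is full."""
    pool = queue.Queue(maxsize=buffer_count)
    received = []

    def consumer() -> None:
        while True:
            item = pool.get()
            if item is None:  # sentinel: the producer has finished
                break
            received.append(item)

    worker = threading.Thread(target=consumer)
    worker.start()
    for n in range(messages):
        pool.put(str(n))  # blocks until a slot is free, so nothing is dropped
    pool.put(None)
    worker.join()
    return received


if __name__ == "__main__":
    print(len(simulate_lossless(buffer_count=2, messages=100)))
```

Every message arrives in order even though only two slots exist, at the cost of the producer running no faster than the consumer.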
Numpy Integration
import momentumx as mx
import numpy as np
# Attach to an existing stream as a consumer
stream = mx.Consumer('numpy_stream')
# Receive the next buffer (or if a producer, obtain the next_to_send buffer)
buffer = stream.receive()
# Create a numpy array directly from the memory without any copying
np_buff = np.frombuffer(buffer, dtype=np.uint8)
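The "without any copying" claim is easy to check with a stand-in buffer. The sketch below uses a plain `bytearray` in place of a MomentumX buffer and a hypothetical helper, `view_valid_bytes`, that limits the view to the valid bytes (mirroring the role of `data_size` in the streaming example). It relies only on `np.frombuffer` and its `count` argument.

```python
import numpy as np


def view_valid_bytes(raw, data_size: int) -> np.ndarray:
    """Return a uint8 view over the first `data_size` bytes of `raw` without copying."""
    return np.frombuffer(raw, dtype=np.uint8, count=data_size)


if __name__ == "__main__":
    # Stand-in for a received buffer: 8 bytes of capacity, 4 bytes of valid data.
    raw = bytearray(b"\x01\x02\x03\x04\x00\x00\x00\x00")
    view = view_valid_bytes(raw, 4)
    raw[0] = 255          # mutate the underlying memory
    print(view.tolist())  # the change is visible through the array
```

Because the array shares memory with its source, it is only valid for as long as the source buffer is.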
Implicit versus Explicit Buffer Release
MomentumX Consumers will, by default, automatically release a buffer under the covers once all references to it are destroyed. This promotes both usability and data integrity. However, there may be cases where the developer wants to use a different strategy and explicitly control when buffers are released to the pool of available buffers.
import momentumx as mx
stream = mx.Consumer('my_stream')
buffer = stream.receive()
# Access to buffer is safe!
buffer.read(10)
# Buffer is being returned back to available buffer pool. Be sure you are truly done with your data!
buffer.release()
# DANGER: DO NOT DO THIS!
# All operations on a buffer after calling `release` are considered unsafe! All safeguards have been removed and the memory is volatile!
buffer.read(10)
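A safe pattern with explicit release is to copy out whatever must outlive the buffer before releasing it. The sketch below shows that pattern with Python's built-in `memoryview`, whose `release()` method is only an analogy for a MomentumX buffer. One difference matters: a released `memoryview` raises `ValueError` on access, whereas the text above warns that a released MomentumX buffer has no such safeguard. Both function names are hypothetical.

```python
def read_then_release(storage: bytearray, n: int) -> bytes:
    """Copy the first `n` bytes out of a view, then release the view."""
    view = memoryview(storage)
    try:
        return bytes(view[:n])  # an independent copy that stays valid after release
    finally:
        view.release()  # analogous to handing the buffer back to the pool


def use_after_release_fails(storage: bytearray) -> bool:
    """Return True if reading a released view is rejected."""
    view = memoryview(storage)
    view.release()
    try:
        view[0]
    except ValueError:
        return True  # memoryview refuses any access after release
    return False


if __name__ == "__main__":
    print(read_then_release(bytearray(b"0123456789abcdef"), 10))
    print(use_after_release_fails(bytearray(b"abc")))
```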
Isolated Contexts
MomentumX allows for the usage of streams outside of /dev/shm (the default location). Pass the context kwarg pointing to a directory on the filesystem for both the Producer and all Consumer instances to create isolated contexts.
This option is useful if access to /dev/shm is unsuitable.
import momentumx as mx
# Create a producer attached to the context path /my/path
stream = mx.Producer('my_stream', ..., context='/my/path/')
...
# Create Consumer elsewhere attached to the same context of /my/path
stream = mx.Consumer('my_stream', context='/my/path/')
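As a loose analogy for sharing through a chosen directory, the sketch below maps one file twice with the standard library's `mmap` module and passes bytes between the two mappings. It does not use MomentumX, and how MomentumX lays out data inside a context directory is not described here; the file name, the helper `share_via_directory`, and the use of `mmap` are all assumptions made for illustration.

```python
import mmap
import os
import tempfile


def share_via_directory(context_dir: str, name: str, payload: bytes) -> bytes:
    """Write `payload` through one mapping of a file and read it back through another."""
    path = os.path.join(context_dir, name)
    # Create the backing file and give it the required size (payload must be non-empty).
    with open(path, "wb") as f:
        f.truncate(len(payload))
    # Open two independent shared mappings of the same file.
    with open(path, "r+b") as wf, open(path, "r+b") as rf:
        with mmap.mmap(wf.fileno(), len(payload)) as writer, \
             mmap.mmap(rf.fileno(), len(payload), access=mmap.ACCESS_READ) as reader:
            writer[:] = payload  # visible to every mapping of this file
            return reader[:]


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as context_dir:
        print(share_via_directory(context_dir, "my_stream", b"hello"))
```

The point of the analogy is that both sides must agree on the same directory, just as the Producer and every Consumer must be given the same context path.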
License
Captivation Software, LLC offers MomentumX under an Unlimited Use License to the United States Government, with all other parties subject to the GPL-3.0 License.
Inquiries / Requests
All inquiries and requests may be sent to opensource@captivation.us.
Copyright © 2022-2023 - Captivation Software, LLC.