Zero-copy shared memory IPC library for building complex streaming data pipelines capable of processing large datasets
Project description
MomentumX
MomentumX is a zero-copy shared memory IPC library for building complex streaming data pipelines capable of processing large datasets using Python.
Key Features:
- High-Throughput, Low Latency
- Supports streaming and synchronous modes for use within a wide variety of use cases.
- Bring your own encoding, or use raw binary data.
- Small footprint with zero dependencies.
- Sane data protections to ensure reliability of data in a cooperative computing environment.
- Pairs with other high-performance libraries, such as numpy and scipy, to support parallel processing of memory-intensive scientific data.
- Works on most modern versions of Linux using shared memory (via
/dev/shm
). - Seamlessly integrates into a Docker environment with minimal configuration, and readily enables lightweight container-to-container data sharing.
Examples:
Below are some simplified use cases for common MomentumX workflows. Consult the examples in the examples/
directory for additional details and implementation guidance.
Streaming Mode (e.g. lossy)
# Producer Process
import momentumx as mx
# Create a stream with a total capacity of 10MB
stream = mx.Producer('my_stream', buffer_size=int(1e6), buffer_count=10, sync=False)
# Write the series 0-9 repeatedly to a buffer 1000 times
for i in range(0, 1000):
buffer = stream.next_to_send()
buffer.write(f'{i % 10}'.encode('utf8')) # Note: writing to buffer via [<index>] and [<start_index>:<stop_index>] is also possible
buffer.send() # Note: call with .send(<num bytes>) if you want to explicitly control the data_size parameter, otherwise internal cursor will be used
# Consumer Process(es)
import momentumx as mx
stream = mx.Consumer('my_stream')
while stream.is_alive:
# Receive from the stream as long as the stream is available
buffer = stream.receive()
print(buffer[:buffer.data_size])
Syncronous Mode (e.g. lossless)
# Producer Process
import momentumx as mx
import threading
import signal
cancel_event = threading.Event()
signal.signal(signal.SIGINT, (lambda _sig, _frm: cancel_event.set()))
# Create a stream with a total capacity of 10MB
stream = mx.Producer('my_stream', buffer_size=int(1e6), buffer_count=10, sync=True) # NOTE: sync set to True
min_subscribers = 1
while stream.subscriber_count < min_subscribers:
print("waiting for subscriber(s)")
if cancel_event.wait(0.5):
break
print("All expected subscribers are ready")
# Write the series 0-999 to a consumer
for n in range(0, 1000):
if stream.subscriber_count == 0:
cancel_event.wait(0.5)
# Note: sending strings directly is possible via the send_string call
elif stream.send_string(str(n)):
print(f"Sent: {n}")
# Consumer Process(es)
import momentumx as mx
stream = mx.Consumer('my_stream')
while stream.is_alive:
# Note: receiving strings is possible as well via the receive_string call
print(f"Received: {stream.receive_string()}")
Numpy Integration
import momentumx as mx
import numpy as np
# Create a stream
stream = mx.Consumer('numpy_stream')
# Receive the next buffer (or if a producer, obtain the next_to_send buffer)
buffer = stream.receive()
# Create a numpy array directly from the memory without any copying
np_buff = np.frombuffer(buffer, dtype=uint8)
License
Captivation Software, LLC offers MomentumX under an Unlimited Use License to the United States Government, with all other parties subject to the GPL-3.0 License.
Inquiries / Requests
All inquiries and requests may be sent to opensource@captivation.us.
Copyright © 2022-2023 - Captivation Software, LLC.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distributions
Hashes for MomentumX-2.3.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | fc2f6277b8f1135447fc13c1c61a9d88d13b315deaf55d8a52258aba849333eb |
|
MD5 | 3b6b9b5ae45500ed9c7c690f2412df5e |
|
BLAKE2b-256 | f676f1b8deb57b5c481d0813ac64bc57330db432c0a85baabc6191317aa63700 |
Hashes for MomentumX-2.3.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | bd2297a4a3584647ec52386a3091d317031ceaf013c6a46d5467d8046caae3dd |
|
MD5 | dedfcb27397888e51232a831bb21cc4c |
|
BLAKE2b-256 | cf6d83842955a2aebc381224942bbf591501deb5d6043702e6ccc05f1f686160 |
Hashes for MomentumX-2.3.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | bb17b862d44ca913d924093aa9a9a9046595927c31e4362c7a0fde0dbd2db5f2 |
|
MD5 | a3c4d535937e1a4807b2064fd2086c6d |
|
BLAKE2b-256 | 29be5ded710ce5bb93097b5d391957e33fc3fae82893d214764d64279de3ed04 |
Hashes for MomentumX-2.3.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | 5d02b14de81b1e56e88612e37d5971fcaa242e70a1a3937ab7e4bfb2a06c9b5e |
|
MD5 | 388fa8cbb31a9d2c05c5119d094c77b8 |
|
BLAKE2b-256 | 5e366b5d236432846e88c1bf21abf5baf8a131380c1238276515955e8931caa7 |
Hashes for MomentumX-2.3.0-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | 7968bcdaf66d15e5b6ae2418c480d3b59d6e0e87b08c08a0bce69e70b81b6a7b |
|
MD5 | 014b48b0ecd45defaecd4490054e1244 |
|
BLAKE2b-256 | 113bb7cd99339112e041b275da58e66e23308e82c8c331c8ab6111d6152d681e |
Hashes for MomentumX-2.3.0-cp36-cp36m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | 21b5469df0dd0475162322788a726f3157d5c6d75325521cca605c636800587f |
|
MD5 | c604a248b7037f1eed8f9e6387c8101a |
|
BLAKE2b-256 | 0246b2d5dcb6096a8b3eec2c728d55ab550161196245a6ebfba02fc8b3706223 |