Zero-copy shared memory IPC library for building complex streaming data pipelines capable of processing large datasets
MomentumX
MomentumX is a zero-copy shared memory IPC library for building complex streaming data pipelines capable of processing large datasets using Python.
Key Features:
- High-Throughput, Low Latency
- Supports streaming and synchronous modes to cover a wide variety of use cases.
- Bring your own encoding, or use raw binary data.
- Sane data protections to ensure reliability of data in a cooperative computing environment.
- Pairs with other high-performance libraries, such as numpy and scipy, to support parallel processing of memory-intensive scientific data.
- Works on most modern versions of Linux using shared memory (via /dev/shm).
- Seamlessly integrates into a Docker environment with minimal configuration, and readily enables lightweight container-to-container data sharing.
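The shared-memory idea behind the features above can be illustrated with Python's standard library alone. The sketch below does not use MomentumX and says nothing about its internals; it only shows two handles attached to one named block seeing the same bytes without any pipe or socket transfer. It assumes a platform where `multiprocessing.shared_memory` is available (Python 3.8+), and the second handle is opened in the same process purely to keep the example short.

```python
from multiprocessing import shared_memory

# Create a small named shared memory block (standard library, not MomentumX).
shm = shared_memory.SharedMemory(create=True, size=16)
try:
    shm.buf[:5] = b"hello"            # write directly into the shared block

    # Attach a second handle by name, as a separate process would.
    peer = shared_memory.SharedMemory(name=shm.name)
    try:
        first = bytes(peer.buf[:5])   # same memory, read through the second handle
        peer.buf[0] = ord("J")        # a write through one handle...
        second = bytes(shm.buf[:5])   # ...is visible through the other
    finally:
        peer.close()
finally:
    shm.close()
    shm.unlink()                      # remove the named block once finished

print(first, second)
```

In a real pipeline the second handle would live in another process, which is the role the Producer and Consumer objects play in the examples that follow.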
Examples:
Below are some simplified use cases for common MomentumX workflows. Consult the examples in the examples/ directory for additional details and implementation guidance.
Stream Mode
# Producer Process
import momentumx as mx

# Create a stream with a total capacity of 10MB (10 buffers of 1MB each)
stream = mx.Producer('my_stream', buffer_size=int(1e6), buffer_count=10, sync=False)

# Obtain the next available buffer and write a few bytes to it
buffer = stream.next_to_send()
buffer.write(b'1')
# buffer data == b'1'

# alternatively, set via array indexing using python bytearray syntax...
buffer[1] = ord('2')
# buffer data == b'12'

# or also set via python slice operator (two bytes into the two-byte slice [2, 4))
buffer[2:4] = b'34'
# buffer data == b'1234'

buffer.send()
# NOTE: buffer.send() can also be passed an explicit number of bytes.
# Otherwise an internally managed cursor will be used.
# Consumer Process(es)
import momentumx as mx

stream = mx.Consumer('my_stream')

# Receive from my_stream as long as the stream has not ended OR there are unread buffers
while stream.has_next:
    # Block while waiting to receive buffer
    # NOTE: Non-blocking receive is possible using blocking=False keyword argument
    buffer = stream.receive()

    # If we are here, either the stream ended OR we have a buffer, so check...
    if buffer is not None:
        # We have buffer containing data, so print the entire contents
        print(buffer.read(buffer.data_size))

# See also "Implicit versus Explicit Buffer Release" section below.
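The producer example above describes index and slice assignment as following Python's bytearray syntax. Under that syntax, a slice assignment leaves the length unchanged only when the slice and the value have the same number of bytes. The sketch below uses a plain `bytearray` and a `memoryview` as stand-ins; how a MomentumX buffer reacts to a mismatched slice is not stated here, so treat the last two cases as illustrations of standard Python behavior only.

```python
# A plain bytearray standing in for a buffer (assumption: same indexing rules).
buf = bytearray(4)

buf[0:1] = b"1"       # one-byte slice, one-byte value
buf[1] = ord("2")     # an integer index takes an int in range(256)
buf[2:4] = b"34"      # the two-byte slice [2, 4) matches the two-byte value

# Mismatched lengths on a bytearray silently resize it.
resized = bytearray(b"12xx")
resized[2:3] = b"34"  # replaces 1 byte with 2, so the length grows from 4 to 5

# A fixed-size view cannot resize, so the same assignment is rejected.
fixed = memoryview(bytearray(4))
try:
    fixed[2:3] = b"34"
    mismatch_rejected = False
except ValueError:
    mismatch_rejected = True

print(bytes(buf), bytes(resized), mismatch_rejected)
```

Keeping the slice bounds and the value length in agreement avoids depending on either behavior.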
Sync Mode
# Producer Process
import momentumx as mx
import threading
import signal

cancel_event = threading.Event()
signal.signal(signal.SIGINT, (lambda _sig, _frm: cancel_event.set()))

# Create a stream with a total capacity of 10MB
stream = mx.Producer(
    'my_stream',
    buffer_size=int(1e6),
    buffer_count=10,
    sync=True
)  # NOTE: sync set to True

min_subscribers = 1

while stream.subscriber_count < min_subscribers:
    print("waiting for subscriber(s)")
    if cancel_event.wait(0.5):
        break

print("All expected subscribers are ready")

# Write the series 0-999 to a consumer
for n in range(0, 1000):
    if stream.subscriber_count == 0:
        cancel_event.wait(0.5)

    # Note: sending strings directly is possible via the send_string call
    elif stream.send_string(str(n)):
        print(f"Sent: {n}")
# Consumer Process(es)
import momentumx as mx

stream = mx.Consumer('my_stream')

while stream.has_next:
    data = stream.receive_string()

    if data is not None:
        # Note: receiving strings is possible as well via the receive_string call
        print(f"Received: {data}")
Iterator Syntax
Working with buffers is even easier using the iter() builtin:
import momentumx as mx

stream = mx.Consumer('my_stream')

# Iterate over buffers in the stream until stream.receive() returns None
for buffer in iter(stream.receive, None):
    # Now, buffer is guaranteed to be valid, so no check required -
    # go ahead and print all the contents again, this time using
    # the index and slice operators!
    print(buffer[0])                    # print first byte
    print(buffer[1:buffer.data_size])   # print remaining bytes
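The two-argument form of `iter()` used above calls the given function repeatedly and stops as soon as it returns the sentinel value. The sketch below shows that mechanism in isolation with a hypothetical `FakeConsumer` class, which is not part of MomentumX; it only mimics a `receive()` method that returns `None` once nothing is left.

```python
from collections import deque


class FakeConsumer:
    """Hypothetical stand-in: receive() returns queued items, then None."""

    def __init__(self, items):
        self._items = deque(items)

    def receive(self):
        # Hand back the next item, or None when the queue is exhausted.
        return self._items.popleft() if self._items else None


stream = FakeConsumer([b"12", b"34", b"56"])

# iter(callable, sentinel) keeps calling stream.receive until it returns None.
received = [buffer for buffer in iter(stream.receive, None)]
print(received)
```

Because the loop ends on the sentinel, the loop body never sees `None`, which is why the example above needs no explicit check.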
Numpy Integration
import momentumx as mx
import numpy as np

# Attach to an existing stream as a consumer
stream = mx.Consumer('numpy_stream')

# Receive the next buffer (or if a producer, obtain the next_to_send buffer)
buffer = stream.receive()

# Create a numpy array directly from the memory without any copying
np_buff = np.frombuffer(buffer, dtype=np.uint8)
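What "without any copying" means for `np.frombuffer` can be checked without a running stream. The sketch below uses a plain `bytearray` as a stand-in for a received buffer (an assumption for illustration only) and shows that the array and the underlying bytes are the same memory, and that the same bytes can be reinterpreted under a different dtype.

```python
import numpy as np

# A writable bytearray standing in for a received buffer (illustration only).
raw = bytearray(b"\x01\x02\x03\x04")

# Wrap the existing memory; no bytes are copied.
view = np.frombuffer(raw, dtype=np.uint8)

view[0] = 255              # write through the array...
shared = raw[0] == 255     # ...and the underlying bytes change too

# Reinterpret the same four bytes as two little-endian 16-bit integers.
as_u16 = np.frombuffer(raw, dtype="<u2")
print(shared, as_u16.tolist())
```

Note that `np.frombuffer` requires the buffer length to be a multiple of the dtype's item size, so a wider dtype may call for slicing the buffer to a suitable length first.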
Implicit versus Explicit Buffer Release
MomentumX Consumers will, by default, automatically release a buffer under the covers once all references are destroyed. This promotes both usability and data integrity. However, there may be cases where the developer wants to utilize a different strategy and explicitly control when buffers are released to the pool of available buffers.
import momentumx as mx

stream = mx.Consumer('my_stream')

buffer = stream.receive()

# Access to buffer is safe!
buffer.read(10)

# Buffer is being returned back to available buffer pool.
# Be sure you are truly done with your data!
buffer.release()

# DANGER: DO NOT DO THIS!
# All operations on a buffer after calling `release` are considered unsafe!
# All safeguards have been removed and the memory is volatile!
buffer.read(10)
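One way to keep explicit release from leaking into the rest of the code is to confine buffer access to a `with` block that calls `release()` on exit. The `releasing` helper and the `FakeBuffer` class below are hypothetical and not part of MomentumX; the sketch assumes only that a buffer exposes the `read()` and `release()` methods shown above.

```python
from contextlib import contextmanager


@contextmanager
def releasing(buffer):
    """Hypothetical helper: release the buffer when the with-block exits."""
    try:
        yield buffer
    finally:
        buffer.release()   # runs even if the block raises


class FakeBuffer:
    """Stand-in with the same read()/release() shape, for demonstration only."""

    def __init__(self, payload):
        self._payload = payload
        self.released = False

    def read(self, count):
        return self._payload[:count]

    def release(self):
        self.released = True


buffer = FakeBuffer(b"0123456789abcdef")

with releasing(buffer) as active:
    head = active.read(10)   # copy out what is needed while access is safe

print(head, buffer.released)
```

The pattern keeps every read inside the block, so nothing touches the buffer after it has been returned to the pool.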
Isolated Contexts
MomentumX allows for the usage of streams outside of /dev/shm (the default location). Pass the context kwarg pointing to a directory on the filesystem for both the Producer and all Consumer instances to create isolated contexts.
This option is useful if access to /dev/shm is unsuitable.
import momentumx as mx
# Create a producer attached to the context path /my/path
stream = mx.Producer('my_stream', ..., context='/my/path/')
...
# Create Consumer elsewhere attached to the same context of /my/path
stream = mx.Consumer('my_stream', context='/my/path/')
License
Captivation Software, LLC offers MomentumX under an Unlimited Use License to the United States Government, with all other parties subject to the GPL-3.0 License.
Inquiries / Requests
All inquiries and requests may be sent to opensource@captivation.us.
Copyright © 2022-2023 - Captivation Software, LLC.