Skip to main content

Zero-copy shared memory IPC library for building complex streaming data pipelines capable of processing large datasets

Project description

MomentumX


MomentumX is a zero-copy shared memory IPC library for building complex streaming data pipelines capable of processing large datasets using Python.


Key Features:

  • High-Throughput, Low Latency
  • Supports streaming and synchronous modes for use within a wide variety of use cases.
  • Bring your own encoding, or use raw binary data.
  • Sane data protections to ensure reliability of data in a cooperative computing environment.
  • Pairs with other high-performance libraries, such as numpy and scipy, to support parallel processing of memory-intensive scientific data.
  • Works on most modern versions of Linux using shared memory (via /dev/shm).
  • Seamlessly integrates into a Docker environment with minimal configuration, and readily enables lightweight container-to-container data sharing.

Examples:

Below are some simplified use cases for common MomentumX workflows. Consult the examples in the examples/ directory for additional details and implementation guidance.

Stream Mode

# Producer Process
import momentumx as mx

# Create a stream with a total capacity of 10MB (1MB x 10)
stream = mx.Producer('my_stream', buffer_size=int(1e6), buffer_count=10, sync=False)

# Obtain the next available buffer for writing
buffer = stream.next_to_send()
buffer.write(b'1') 

buffer.send()
# NOTE: buffer.send() can also be passed an explicit number of bytes as well. 
# Otherwise an internally managed cursor will be used.
# Consumer Process(es)
import momentumx as mx

stream = mx.Consumer('my_stream')

# Receive from my_stream as long as the stream has not ended OR there are unread buffers 
while stream.has_next:

    # Block while waiting to receive buffer 
    # NOTE: Non-blocking receive is possible using blocking=False keyword argument
    buffer = stream.receive()
    
    # If we are here, either the stream ended OR we have a buffer, so check...
    if buffer is not None:

        # We have buffer containing data, so print the entire contents
        print(buffer.read(buffer.data_size))
    
        # See also "Implicit versus Explicit Buffer Release" section below.

Sync Mode

# Producer Process
import momentumx as mx
import threading
import signal

cancel_event = threading.Event()
signal.signal(signal.SIGINT, (lambda _sig, _frm: cancel_event.set()))

# Create a stream with a total capacity of 10MB
stream = mx.Producer(
    'my_stream', 
    buffer_size=int(1e6), 
    buffer_count=10, 
    sync=True
) # NOTE: sync set to True

min_subscribers = 1

while stream.subscriber_count < min_subscribers:
    print("waiting for subscriber(s)")
    if cancel_event.wait(0.5):
        break

print("All expected subscribers are ready")

# Write the series 0-999 to a consumer 
for n in range(0, 1000):
    if stream.subscriber_count == 0:
        cancel_event.wait(0.5)

    # Note: sending strings directly is possible via the send_string call
    elif stream.send_string(str(n)):
        print(f"Sent: {n}")
# Consumer Process(es)
import momentumx as mx

stream = mx.Consumer('my_stream')

while stream.has_next:
    data = stream.receive_string() 

    if data is not None: 
        # Note: receiving strings is possible as well via the receive_string call
        print(f"Received: {data}")

Iterator Syntax

Working with buffers is even easier using iter() builtin:

import momentumx as mx

stream = mx.Consumer(STREAM)

# Iterate over buffers in the stream until stream.receive() returns None
for buffer in iter(stream.receive, None):     
    # Now, buffer is guaranteed to be valid, so no check required -  
    # go ahead and print all the contents again, this time using 
    # the index and slice operators!
    print(buffer[0])                    # print first byte
    print(buffer[1:buffer.data_size])   # print remaining bytes

Numpy Integration

import momentumx as mx
import numpy as np

# Create a stream
stream = mx.Consumer('numpy_stream')

# Receive the next buffer (or if a producer, obtain the next_to_send buffer)
buffer = stream.receive()

# Create a numpy array directly from the memory without any copying
np_buff = np.frombuffer(buffer, dtype=uint8)

Implicit versus Explicit Buffer Release

MomentumX Consumers will, by default, automatically release a buffer under the covers once all references are destroyed. This promotes both usability and data integrity. However, there may be cases where the developer wants to utilize a different strategy and explicity control when buffers are released to the pool of available buffers.

stream = mx.Consumer('my_stream')

buffer = stream.receive()

# Access to buffer is safe!
buffer.read(10)

# Buffer is being returned back to available buffer pool. 
# Be sure you are truly done with your data!
buffer.release() 

# DANGER: DO NOT DO THIS! 
# All operations on a buffer after calling `release` are considered unsafe! 
# All safeguards have been removed and the memory is volatile!
buffer.read(10) 

Isolated Contexts

MomentumX allows for the usage of streams outside of /dev/shm (the default location). Pass the context kwarg pointing to a directory on the filesystem for both the Producer and all Consumer instances to create isolated contexts.

This option is useful if access to /dev/shm is unsuitable.

import momentumx as mx

# Create a producer attached to the context path /my/path
stream = mx.Producer('my_stream', ..., context='/my/path/')
...

# Create Consumer elsewhere attached to the same context of /my/path
stream = mx.Consumer('my_stream', context='/my/path/')

License

Captivation Software, LLC offers MomentumX under an Unlimited Use License to the United States Government, with all other parties subject to the GPL-3.0 License.

Inquiries / Requests

All inquiries and requests may be sent to opensource@captivation.us.

Copyright © 2022-2023 - Captivation Software, LLC.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

MomentumX-2.7.1.tar.gz (51.5 kB view hashes)

Uploaded Source

Built Distributions

MomentumX-2.7.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (225.7 kB view hashes)

Uploaded CPython 3.11 manylinux: glibc 2.17+ x86-64

MomentumX-2.7.1-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl (206.8 kB view hashes)

Uploaded CPython 3.11 manylinux: glibc 2.17+ ARM64

MomentumX-2.7.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (225.8 kB view hashes)

Uploaded CPython 3.10 manylinux: glibc 2.17+ x86-64

MomentumX-2.7.1-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl (206.8 kB view hashes)

Uploaded CPython 3.10 manylinux: glibc 2.17+ ARM64

MomentumX-2.7.1-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (225.8 kB view hashes)

Uploaded CPython 3.9 manylinux: glibc 2.17+ x86-64

MomentumX-2.7.1-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl (206.8 kB view hashes)

Uploaded CPython 3.9 manylinux: glibc 2.17+ ARM64

MomentumX-2.7.1-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (225.5 kB view hashes)

Uploaded CPython 3.8 manylinux: glibc 2.17+ x86-64

MomentumX-2.7.1-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl (206.5 kB view hashes)

Uploaded CPython 3.8 manylinux: glibc 2.17+ ARM64

MomentumX-2.7.1-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (230.6 kB view hashes)

Uploaded CPython 3.7m manylinux: glibc 2.17+ x86-64

MomentumX-2.7.1-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl (214.6 kB view hashes)

Uploaded CPython 3.7m manylinux: glibc 2.17+ ARM64

MomentumX-2.7.1-cp36-cp36m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (230.6 kB view hashes)

Uploaded CPython 3.6m manylinux: glibc 2.17+ x86-64

MomentumX-2.7.1-cp36-cp36m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl (214.4 kB view hashes)

Uploaded CPython 3.6m manylinux: glibc 2.17+ ARM64

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page