DataMux
DataMux is a library for streaming data into real-time analytics pipelines. It provides three modes:
- Proxy Mode: to interface and proxy live data from sensors
- Replay Mode: to replay stored data from datasets
- Simulate Mode: to stream guided/unguided mock data for testing
Installation
First, install datamux as a pip package.
pip install streaminghub-datamux
Initialization
Next, configure where datamux should look for data and metadata. We use $HOME/streaminghub by default. This configuration will be stored at $HOME/.streaminghubrc.
python -m streaminghub_datamux init --data_dir="$HOME/streaminghub" --meta_dir="$HOME/streaminghub"
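A script can check that initialization has been run before it touches datamux. The following is a minimal sketch that relies only on the default location stated above ($HOME/.streaminghubrc). The helper name `config_path` is hypothetical, and the sketch does not read or interpret the file's contents.

```python
from pathlib import Path


def config_path(home=None):
    """Return the path where the init command stores its configuration.

    `home` is an optional override used for illustration and testing;
    by default the current user's home directory is used.
    """
    base = Path(home) if home is not None else Path.home()
    return base / ".streaminghubrc"


if __name__ == "__main__":
    path = config_path()
    if path.exists():
        print(f"found datamux configuration at {path}")
    else:
        print(f"no configuration at {path}; run the init command first")
```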
Usage
Import and Setup
In your Python script, first import datamux as follows.
# import datamux
import streaminghub_datamux as dm
Next, instantiate the Datamux API. Here, you have two options:
# Option A - Local API (runs locally)
api = dm.API()
# Option B - Remote API (runs over a remote datamux server)
api = dm.RemoteAPI(rpc_name="<rpc>", codec_name="<codec>")
await api.connect(server_host="<host>", server_port=<port>)
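The API calls in this README use `await`, so in a plain script they need to run inside a coroutine. Below is a minimal sketch of that wrapping with `asyncio.run`. `StubAPI` is a hypothetical stand-in so the example runs without datamux installed, and its return value is made up; in real code you would pass `dm.API()` (or a connected `dm.RemoteAPI`) instead.

```python
import asyncio


class StubAPI:
    """Hypothetical stand-in for dm.API(); not part of datamux."""

    async def list_collections(self):
        # placeholder data; the real call returns datamux collection objects
        return ["demo-collection"]


async def main(api):
    # every `await api....` call belongs inside a coroutine like this one
    collections = await api.list_collections()
    return collections


# asyncio.run() creates an event loop, runs main() to completion, then closes the loop
result = asyncio.run(main(StubAPI()))
print(result)
```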
Replay Recordings from Collections
import asyncio
# list all collections (each collection provides one or more streams)
collections = await api.list_collections()
# list all recordings (i.e., streams) in a collection, by id
streams = await api.list_collection_streams(collection_id="<id>")
# sample attributes of a stream (found in stream.attrs)
attrs = {"subject": "A", "session": "1", "task": "1"}
# queue to append received data
sink = asyncio.Queue()
# start gathering data into the queue
# (arguments in order: collection id, stream id, attrs, sink)
ack = await api.replay_collection_stream("<id>", "<id>", attrs, sink)
# each request is assigned a unique ID for later reference
assert ack.randseq is not None
# simply await the queue to read data
while True:
    item = await sink.get()
    # check for end-of-stream
    if item == util.END_OF_STREAM:
        break
# once done, stop the task to avoid wasting resources
await api.stop_task(ack.randseq)
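The read-until-end-of-stream loop above can be tried out without datamux by substituting a mock producer. This is only a sketch of the pattern: `END_OF_STREAM`, `fake_replay`, and `drain` are hypothetical names defined here, and the sentinel is a local object rather than the one datamux provides.

```python
import asyncio

# hypothetical sentinel for this sketch; datamux ships its own end-of-stream marker
END_OF_STREAM = object()


async def fake_replay(sink, samples):
    """Stand-in producer: push each sample into the queue, then the sentinel."""
    for sample in samples:
        await sink.put(sample)
    await sink.put(END_OF_STREAM)


async def drain(sink):
    """Collect items from the queue until the sentinel is seen."""
    received = []
    while True:
        item = await sink.get()
        if item is END_OF_STREAM:
            break
        received.append(item)
    return received


async def main():
    sink = asyncio.Queue()
    samples = [{"t": 0.0, "x": 1}, {"t": 0.1, "x": 2}]
    # run the producer concurrently, as a replay task would
    producer = asyncio.create_task(fake_replay(sink, samples))
    received = await drain(sink)
    await producer
    return received


records = asyncio.run(main())
print(records)
```

Swapping `fake_replay` for a real replay request keeps the consumer side unchanged, which makes the loop easy to unit-test in isolation.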
Proxy Live Streams from Devices
import asyncio
# list all nodes (each node provides one or more streams)
nodes = await api.list_live_nodes()
# list all devices (i.e., streams) in a node, by id
streams = await api.list_live_streams(node_id="<id>")
# sample attributes of a stream (found in stream.attrs)
attrs = {"subject": "A", "session": "1", "task": "1"}
# queue to append received data
sink = asyncio.Queue()
# start gathering data into the queue
# (arguments in order: node id, stream id, attrs, sink)
ack = await api.proxy_live_stream("<id>", "<id>", attrs, sink)
# each request is assigned a unique ID for later reference
assert ack.randseq is not None
# simply await the queue to read data
while True:
    item = await sink.get()
    # check for end-of-stream
    if item == util.END_OF_STREAM:
        break
# once done, stop the task to avoid wasting resources
await api.stop_task(ack.randseq)
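A live device may go silent without ever sending an end-of-stream marker, in which case a bare `await sink.get()` would wait indefinitely. One possible guard, sketched below, wraps each read in `asyncio.wait_for`. This is an illustration rather than datamux behavior: `END_OF_STREAM` and `read_with_timeout` are hypothetical names defined here, and the queue is filled by hand in place of a real device.

```python
import asyncio

# hypothetical sentinel for this sketch; datamux ships its own end-of-stream marker
END_OF_STREAM = object()


async def read_with_timeout(sink, timeout):
    """Collect items until the sentinel arrives or no item shows up within `timeout` seconds."""
    received = []
    while True:
        try:
            item = await asyncio.wait_for(sink.get(), timeout=timeout)
        except asyncio.TimeoutError:
            # the source went quiet; stop reading instead of blocking forever
            break
        if item is END_OF_STREAM:
            break
        received.append(item)
    return received


async def main():
    sink = asyncio.Queue()
    # stand-in for a device that emits two samples and then goes silent
    await sink.put("sample-1")
    await sink.put("sample-2")
    return await read_with_timeout(sink, timeout=0.05)


samples = asyncio.run(main())
print(samples)
```

Whichever way the loop exits, the task should still be stopped with `api.stop_task(ack.randseq)` as shown above.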
Start a Remote API
You can start a remote API using the command below.
python -m streaminghub_datamux serve -H "<host_name>" -p <port> -r <rpc_name> -c <codec_name>
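If the server needs to be launched from another Python program (for example, in a test fixture), the same command can be assembled as an argument list. This is a sketch only: `build_serve_command` is a hypothetical helper, it uses just the flags shown in the command above, and the RPC and codec names remain placeholders to fill in.

```python
import sys


def build_serve_command(host, port, rpc_name, codec_name):
    """Assemble the `serve` command line as a list suitable for subprocess."""
    return [
        sys.executable, "-m", "streaminghub_datamux", "serve",
        "-H", host,
        "-p", str(port),
        "-r", rpc_name,
        "-c", codec_name,
    ]


# placeholders kept as-is; substitute real values before launching
command = build_serve_command("<host_name>", 8000, "<rpc_name>", "<codec_name>")
print(command)
```

The resulting list can be passed to `subprocess.Popen(command)` to start the server in the background; the port `8000` here is an arbitrary example value.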
For Developers
# clone streaminghub from git
git clone https://github.com/nirdslab/streaminghub.git
# cd into streaminghub/ directory
cd streaminghub/
# install the streaminghub_datamux/ folder as a pip package
python -m pip install -e streaminghub_datamux/