Python Client for NebulaStream
NebulaStream Python Client
NebulaStream's Python client is the first data science environment for NebulaStream. It allows users to:
- use operators to execute queries in NebulaStream efficiently
- perform data preparation tasks on data streams
- train machine learning models with ease
- integrate any Python library
Example
As an example, we want to perform a data exploration task. First, we need to connect to NebulaStream and select the data stream "vehicles". This data stream contains data from all vehicles in Berlin that are registered with NebulaStream, including their current position, their speed, the total distance driven, and the vehicle type.
>>> import nespy as nes
>>> c = nes.Connection('127.0.0.1', 8081)
>>> vehicles = c.get_logical_stream("vehicles")
>>> vehicles
type | id | lat | lon | timestamp | speed in km/h | route (in meters) |
---|---|---|---|---|---|---|
6 | 1 | 52.513126 | 13.328347 | 1 | 51 | 27.000 |
4 | 2 | 52.513109 | 13.326388 | 1 | 25 | 5.35 |
6 | 3 | 52.512763 | 13.325291 | 1 | 62 | 10.000 |
9 | 4 | 52.512677 | 13.325964 | 1 | 51 | 10.000 |
3 | 5 | 52.512259 | 13.322778 | 1 | 29 | 1.89 |
Next, we want to find the average speed of all cars in Berlin over the last five minutes. To do so, we keep only cars (type == 6) and apply a sliding window to them. We set the window size to five minutes and the slide to one second. This means that we compute the average speed over the last five minutes every second.
>>> vehicles = vehicles[vehicles["type"] == 6]
>>> vehicles.sliding(size=5, size_unit="min", slide=1, slide_unit="sec").avg(on="value", name="avg_speed")
start | end | avg_speed |
---|---|---|
0 | 5 | 24 |
5 | 10 | 26 |
10 | 15 | 25 |
If we analyze this data stream for an entire day, we can find out how the speed changes throughout the day. We can also filter for other vehicle types, e.g., underground trains, buses, or trucks, and compare them to each other.
Additionally, we can, for example, apply DBSCAN before using the window on each cluster. This way we find out which areas are prone to traffic jams at a specific time of day. We can write the DBSCAN step by integrating other libraries. For this, we have to write a function like the following:
>>> def my_dbscan_function(current_tuple):
...     # here comes the code for dbscan
...     return current_tuple
Note:
Currently, this function is the last function to be processed in the Python pipeline. For this reason, we also have to implement the window functionality by hand inside it. Furthermore, we have to keep in mind that this function can only process one tuple at a time.
We use .process(my_dbscan_function) to apply DBSCAN and the window to our data stream:
>>> vehicles.process(my_dbscan_function)
cluster | start | stop | avg_speed |
---|---|---|---|
1 | 0 | 5 | 52 |
2 | 0 | 5 | 25 |
2 | 0 | 5 | 17 |
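The note above says that a function passed to .process() sees one tuple at a time and has to do its own windowing. The following is a minimal sketch of that pattern in plain Python, without nespy: a closure keeps a buffer of recent tuples and returns the average speed over the last `window_sec` seconds. The tuple layout (a dict with "timestamp" and "speed" keys) and the helper name `make_windowed_avg` are assumptions made for illustration and are not part of the nespy API.

```python
from collections import deque


def make_windowed_avg(window_sec):
    """Return a per-tuple function that keeps its own sliding window."""
    buffer = deque()  # (timestamp, speed) pairs that are still inside the window

    def process_one(current_tuple):
        ts = current_tuple["timestamp"]
        buffer.append((ts, current_tuple["speed"]))
        # Evict tuples that have fallen out of the window.
        while buffer and buffer[0][0] <= ts - window_sec:
            buffer.popleft()
        avg = sum(speed for _, speed in buffer) / len(buffer)
        return {"timestamp": ts, "avg_speed": avg}

    return process_one


# Hypothetical usage: a five-minute window fed with three tuples.
my_function = make_windowed_avg(window_sec=300)
for t, v in [(0, 50), (100, 60), (400, 30)]:
    result = my_function({"timestamp": t, "speed": v})
```

Because the state lives inside the closure, the function can be handed to a per-tuple hook while still producing window aggregates.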
Requirements
OS: Ubuntu 20.04+
Jupyter Notebook
Python 3+
Installation
- Install the Python client with:
pip install nespy
- Run NebulaStream. There are two options: a) start NebulaStream locally in CLion, or b) use Docker.
a) Run NebulaStream with CLion
- Build NebulaStream. To build NebulaStream please follow this instruction.
- Start the Coordinator. If no parameters are configured the Coordinator starts with the default values. To configure the Coordinator please take a look here.
- Start the Worker. Just like the Coordinator the Worker starts with default values unless the parameters are configured. To configure the Worker please take a look here and to configure the sources take a look here.
b) Run the NebulaStream docker image
- Install docker (https://docs.docker.com/get-docker/)
- Pull the latest docker image with:
docker pull nebulastream/nes-executable-image:latest
or a specific version with:
docker pull nebulastream/nes-executable-image:<version_number>
- Run the latest docker image with:
docker run --network host --name nes nebulastream/nes-executable-image:latest
or
docker run --network host --name nes nebulastream/nes-executable-image:<version_number>
The following additional parameters are optional (source):
Coordinator
parameters | default value | description |
---|---|---|
-serverIp | 127.0.0.1 | |
-coordinatorPort | 4000 | Set the RPC Server port of the Coordinator |
-restPort | 8081 | Set the port exposed for REST endpoints |
--numberOfSlots | number of processors on the host | Set the number of computing slots in the Coordinator |
--help | | Display help message |
Worker
parameters | default value | description |
---|---|---|
--coordinatorIp | 127.0.0.1 | Set the server IP of the Coordinator to which the Worker should connect |
--coordinatorPort | 0 | Set the RPC server port of the Coordinator to which the Worker connects |
--rpcPort | 0 | Set the RPC server port of the Worker |
--dataPort | 0 | Set the data port of the Worker |
--sourceType | | Set the type of the source (must be either CSVSource or DefaultSource) |
--sourceConfig | | Set the configuration of the source (e.g., path to the CSV file) |
--sourceFrequency | | Set the sampling frequency of the source |
--physicalStreamName | | Set the physical name of the stream |
--numberOfBuffersToProduce | | Set the number of buffers to produce |
--logicalStreamName | | Set the name of the logical stream to which this stream is added |
--parentId | | Set the parentId of the Worker node |
--localWorkerIp | 127.0.0.1 | Set the IP of the Worker |
--numberOfSlots | number of processors on the host | Set the number of computing slots in the Worker |
--help | | Display help message |
- Open Jupyter Notebook, import the Python client with "import nespy as nes", and get started.
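Before opening a connection from the notebook, it can help to confirm that the Coordinator's REST port is reachable. The sketch below uses only the Python standard library. The default host and port are the Coordinator defaults listed above, and the helper name `rest_port_is_open` is hypothetical and not part of nespy.

```python
import socket


def rest_port_is_open(host="127.0.0.1", port=8081, timeout=1.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable hosts.
        return False


if __name__ == "__main__":
    print(rest_port_is_open())
```

A result of False usually means that the Coordinator is not running or that it was started with a different -restPort value.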
License
Documentation
- Retrieving all Registered Logical Streams
- Retrieving all Physical Streams for a Logical Stream
- Selecting a Specific Logical Stream
- Add Logical Stream
- Update Logical Stream
- Delete Logical Stream
- Stop a Running Query
- Retrieving All Registered Queries
- Retrieving All Queries with a Status
- Retrieve a Specific Query with a Status
- Retrieving Execution Plan
- Retrieving Query Plan
- Retrieve Topology
1. Connect with NebulaStream
Connection is a class that manages the overall connection to NebulaStream.
Connection(host, port)
Example:
>>> c = nes.Connection('127.0.0.1', 8081)
2. Managing Data Streams
After setting up the connection with NebulaStream, the user can call different functions to gather information about data streams and to manage them.
Retrieving all Registered Logical Streams
Connection.get_all_logical_streams()
Example:
>>> c.get_all_logical_streams()
Retrieving all Physical Streams for a Logical Stream
Connection.get_all_physical_streams(logical_stream_name)
Example:
>>> c.get_all_physical_streams("default_logical")
Selecting a Specific Logical Stream
Connection.get_logical_stream(position_or_name)
Example:
>>> data_stream = c.get_logical_stream(0)
or
>>> data_stream = c.get_logical_stream("default_logical")
>>> data_stream
id | value |
---|---|
0 | 1 |
1 | 1 |
2 | 1 |
Add Logical Stream
Connection.add_logical_stream(logical_stream_name, schema)
Example:
>>> schema = "Schema::create()->addField(\"test\",INT32);"
>>> c.add_logical_stream("new_logical_stream", schema)
{Success: True}
Update Logical Stream
Connection.update_logical_stream(logical_stream_name, schema)
Example:
>>> schema = "Schema::create()->addField(\"test\",INT32);"
>>> c.update_logical_stream("default_logical", schema)
{Success: True}
Delete Logical Stream
Connection.delete_logical_stream(logical_stream_name)
Example:
>>> c.delete_logical_stream("default_logical")
{Success: True}
Process Data Streams
Filter
data_stream[filter_predicate]
Examples:
>>> sensor[sensor["temperature"] > 3]
>>> sensor[sensor["temperature"] < 3]
>>> sensor[sensor["temperature"] <= 3]
>>> sensor[sensor["temperature"] >= 3]
>>> sensor[sensor["temperature"] == 3]
>>> sensor[sensor["temperature"] != 3]
>>> sensor[sensor["temperature"] > sensor["humidity"]]
>>> sensor[(sensor["temperature"] > 3) & (sensor["temperature"] < 15)]
>>> sensor[(sensor["temperature"] < 3) | (sensor["temperature"] == 10)]
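The combined predicates above wrap each comparison in parentheses. This is required because `&` and `|` bind more tightly than comparison operators in Python. The sketch below shows one way such predicate expressions can be built with operator overloading. The classes `Attr` and `Pred` are hypothetical stand-ins written for illustration and do not describe how nespy implements filters.

```python
class Pred:
    """A predicate over a single row (a dict)."""

    def __init__(self, fn):
        self.fn = fn

    def __and__(self, other):
        return Pred(lambda row: self.fn(row) and other.fn(row))

    def __or__(self, other):
        return Pred(lambda row: self.fn(row) or other.fn(row))

    def __call__(self, row):
        return self.fn(row)


def _value(operand, row):
    """Look up another attribute in the row, or use a constant as-is."""
    return row[operand.name] if isinstance(operand, Attr) else operand


class Attr:
    """Hypothetical stand-in for an attribute such as sensor["temperature"]."""

    def __init__(self, name):
        self.name = name

    def __gt__(self, other):
        return Pred(lambda row: row[self.name] > _value(other, row))

    def __lt__(self, other):
        return Pred(lambda row: row[self.name] < _value(other, row))

    def __eq__(self, other):
        return Pred(lambda row: row[self.name] == _value(other, row))


temperature = Attr("temperature")
rows = [{"temperature": 2}, {"temperature": 10}, {"temperature": 20}]

# Each comparison is parenthesized so that & combines two predicates.
in_range = (temperature > 3) & (temperature < 15)
kept = [row for row in rows if in_range(row)]
```

Without the parentheses, Python would first try to evaluate `3 & temperature`, which fails for these classes with a TypeError.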
Map
data_stream[attribute_name] = new_value_for_this_attribute
Example:
>>> sensor["temperature"] = sensor["temperature"] + 0.01
>>> sensor["temperature"] = sensor["temperature"] - 0.01
>>> sensor["temperature"] = sensor["temperature"] * 0.01
>>> sensor["temperature"] = sensor["temperature"] / 0.01
>>> sensor["temperature"] = (sensor["sensor_a"] + sensor["sensor_b"]) / 2
>>> sensor["temperature"] = 0.01 * sensor["temperature"]
Select (in SQL)/Projection (in relational algebra)
For a single selection, write the name of the desired attribute as a string inside the brackets. For a multiple selection, write a list of the attribute names (as strings) that you want to project.
data_stream[attribute_name]
data_stream[[attribute_name1, attribute_name2, attribute_name3]]
Example:
>>> cars["car_id"]
>>> cars[["car_id"]]
>>> cars[["car_id", "speed", "lat", "lon"]]
Rename
Rename only works if the operator before rename was a projection. It gives a temporary (not persisted!) name for the attribute.
data_stream.rename("new_name")
Example:
>>> cars[["car_id"]]
>>> cars.rename("id")
Windows
Windows break an infinite data stream into finite data sets. As a result, we are able to compute aggregations over these finite data sets. The following aggregation functions are supported:
- sum(on, name)
- count(on, name)
- min(on, name)
- max(on, name)
Warning: Do not try to rename the columns yet. This functionality isn't supported by NebulaStream yet! The avg function will be supported by NebulaStream soon. Please be patient :)
parameter | required | description | default |
---|---|---|---|
on | required | the attribute to aggregate over | - |
name | optional | name of the new column (not supported yet!) | aggregation_attribute, e.g., sum_value |
In this section, we explain how to use keyed and non-keyed windows in the Python client.
Keyed Windows vs. Global Windows
Keyed Windows
A key for a window can be any attribute of your data stream. With keyed windows, we can run multiple different windows and window aggregations on one data stream. To do so, a keyed window splits a single data stream into multiple data streams, which enables parallel computation.
Global Windows
Global windows are non-keyed windows. These windows do not split a data stream into multiple data streams and therefore cannot compute multiple different aggregation functions in parallel.
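The difference between the two window kinds can be illustrated in plain Python on the tuples of a single, already closed window. This is a sketch of the semantics only. The attribute names "shop" and "sales" are made up for the example, and nothing here calls nespy.

```python
from collections import defaultdict

# Tuples that fall into one window.
window = [
    {"shop": "a", "sales": 3},
    {"shop": "b", "sales": 5},
    {"shop": "a", "sales": 4},
]

# Global window: one aggregate over all tuples in the window.
global_sum = sum(row["sales"] for row in window)

# Keyed window: one aggregate per distinct value of the key attribute.
keyed_sum = defaultdict(int)
for row in window:
    keyed_sum[row["shop"]] += row["sales"]
```

Each key's aggregate depends only on the tuples with that key, which is what makes it possible to compute keyed windows in parallel.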
Tumbling Windows
DataStream.tumbling(on, event, event_unit, size, size_unit, lateness, lateness_unit)
parameter | required | description | default value |
---|---|---|---|
on | optional | key for the window; can be any attribute of the data stream. If not set, the window is a global window | None |
event | required | the attribute of the data stream that holds the event time | 'timestamp' |
event_unit | optional | unit of the event time; one of 'min', 'sec', 'ms', or 'count' | 'sec' |
size | required | size of the window | 0 (meaning no window) |
size_unit | required | unit of the parameter size; one of 'min', 'sec', 'ms', or 'count' | 'sec' |
lateness | optional | the allowed lateness of a tuple | 0 |
lateness_unit | optional | unit of the lateness; one of 'min', 'sec', 'ms', or 'count' | 'sec' |
Example:
>>> # keyed window
>>> shop.tumbling(on="value", size=10, size_unit="sec").sum(on="sales")
>>> # global window
>>> shop.tumbling(size=10, size_unit="sec").sum(on="sales")
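To make the semantics of a global tumbling window concrete, here is a plain-Python sketch that sums an attribute per non-overlapping window over a finite list of tuples. The helper `tumbling_sum` is hypothetical. It also assumes that windows are half-open intervals [start, start + size) aligned at zero, which may differ from how NebulaStream aligns its windows.

```python
from collections import defaultdict


def tumbling_sum(tuples, size, on, event="timestamp"):
    """Sum attribute `on` per non-overlapping window of length `size`."""
    sums = defaultdict(int)
    for row in tuples:
        # Start of the single window that contains this tuple.
        start = (row[event] // size) * size
        sums[(start, start + size)] += row[on]
    return dict(sorted(sums.items()))


shop = [
    {"timestamp": 1, "sales": 2},
    {"timestamp": 9, "sales": 3},
    {"timestamp": 10, "sales": 4},
    {"timestamp": 25, "sales": 1},
]
windows = tumbling_sum(shop, size=10, on="sales")
```

Every tuple contributes to exactly one window, so the window sums add up to the total of all sales.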
Sliding Windows
DataStream.sliding(on, event, event_unit, size=0, size_unit, slide, slide_unit, lateness, lateness_unit)
parameter | required | description | default value |
---|---|---|---|
on | optional | key for the window; can be any attribute of the data stream. If not set, the window is a global window | None |
event | required | the attribute of the data stream that holds the event time | 'timestamp' |
event_unit | optional | unit of the event time; one of 'min', 'sec', 'ms', or 'count' | 'sec' |
size | required | size of the window | 0 (meaning no window) |
size_unit | required | unit of the parameter size; one of 'min', 'sec', 'ms', or 'count' | 'sec' |
slide | required | the slide value, i.e., the update rate of the window | 0 (meaning no slide) |
slide_unit | required | unit of the slide value; one of 'min', 'sec', 'ms', or 'count' | 'sec' |
lateness | optional | the allowed lateness of a tuple | 0 |
lateness_unit | optional | unit of the lateness; one of 'min', 'sec', 'ms', or 'count' | 'sec' |
Example:
>>> # keyed window
>>> shop.sliding(on="sales", size=10, size_unit="sec", slide=5, slide_unit="sec").sum(on="sales")
>>> # global window
>>> shop.sliding(size=10, size_unit="sec", slide=5, slide_unit="sec").sum(on="sales")
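With a sliding window, the slide is smaller than the size, so windows overlap and one tuple can fall into several of them. The plain-Python sketch below shows this for a finite list of tuples. The helper `sliding_sum` is hypothetical. It assumes half-open windows [start, start + size) whose starts are multiples of `slide`, and it drops windows that would start before zero. NebulaStream may handle both points differently.

```python
def sliding_sum(tuples, size, slide, on, event="timestamp"):
    """Sum attribute `on` per overlapping window [start, start + size)."""
    sums = {}
    for row in tuples:
        ts = row[event]
        # Begin with the latest window start that is <= ts, then step
        # back by `slide` for as long as the window still covers ts.
        start = (ts // slide) * slide
        while start > ts - size and start >= 0:
            key = (start, start + size)
            sums[key] = sums.get(key, 0) + row[on]
            start -= slide
    return dict(sorted(sums.items()))


shop = [
    {"timestamp": 1, "sales": 2},
    {"timestamp": 7, "sales": 3},
    {"timestamp": 12, "sales": 4},
]
windows = sliding_sum(shop, size=10, slide=5, on="sales")
```

Here the tuple at timestamp 7 is counted in both the window (0, 10) and the window (5, 15).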
Include other Python libraries
Stream Processing
>>> def my_function(data):
...     # do something with a single tuple
...     return data
...
>>> data_stream.process(my_function)
Batch Processing with DataFrames
This function enables users to compute in batches over DataFrames. If on_time_over is set to True, the batch function is triggered every time a tuple is added to the batch. Otherwise, the batch function is only triggered once the timeframe has elapsed. Currently, only 'sec', 'min', and 'h' are supported as the timeframe_unit.
>>> def my_function(data):
...     # do something with a DataFrame
...     return data
...
>>> # We set the size of the buffer to 1 hour and only call the batch function my_function once after every hour.
>>> data_stream.batch(my_function, timeframe=1, timeframe_unit='h', on_time_over=True)
>>> data_stream.batch(my_function, timeframe=1, timeframe_unit='h', on_time_over=False)
Union
Union works the same as in relational algebra and SQL. The schemas of both data streams have to be the same.
DataStream.union(other_stream)
Example:
>>> bus = c.get_logical_stream("bus")
>>> cars = c.get_logical_stream("car")
>>> bus.union(cars)
Join
Join works just like JOIN in SQL or the theta join in relational algebra. If both data streams have an attribute with the same name, we can set the parameter "on". Otherwise, we have to set the parameters "left_on" and "right_on".
DataStream.join(other_stream, on, left_on, right_on)
Example:
>>> position = c.get_logical_stream("car_position")
>>> details = c.get_logical_stream("car_details")
>>> position.join(details, left_on="id", right_on="car_id")
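The effect of left_on and right_on can be sketched on two finite lists of tuples in plain Python. The function below is an inner equi-join via a hash index. It is a simplification for illustration: it only matches on equality, ignores time and windowing entirely, and is not the nespy implementation. The sample attributes "lat" and "type" are made up.

```python
from collections import defaultdict


def join(left, right, left_on, right_on):
    """Inner equi-join of two lists of dicts using a hash index on `right`."""
    index = defaultdict(list)
    for right_row in right:
        index[right_row[right_on]].append(right_row)
    joined = []
    for left_row in left:
        # Combine the left row with every right row that has a matching key.
        for right_row in index.get(left_row[left_on], []):
            joined.append({**left_row, **right_row})
    return joined


position = [{"id": 1, "lat": 52.51}, {"id": 2, "lat": 52.52}]
details = [{"car_id": 1, "type": 6}, {"car_id": 3, "type": 4}]
result = join(position, details, left_on="id", right_on="car_id")
```

Only the car with id 1 appears in both inputs, so it is the only row in the result.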
Manage Queries
Stop a Running Query
DataStream.stop_query()
Example:
>>> data_stream.stop_query()
Reset All Operators of a Query
This function resets all operators saved for this variable and also stops the running query. Therefore, the user can reuse this variable for another query.
DataStream.reset_operators()
Example:
>>> data_stream.reset_operators()
Retrieving All Registered Queries
Connection.get_all_queries()
Example:
>>> c.get_all_queries()
{
    "edges": [
        {
            "source": "UNDEFINED(OP-2)",
            "target": "SINK(OP-3)"
        },
        {
            "source": "SOURCE(OP-1)",
            "target": "UNDEFINED(OP-2)"
        }
    ],
    "nodes": [
        {
            "id": "3",
            "name": "SINK(OP-3)",
            "nodeType": "SINK"
        },
        {
            "id": "2",
            "name": "UNDEFINED(OP-2)",
            "nodeType": "UNDEFINED"
        },
        {
            "id": "1",
            "name": "SOURCE(OP-1)",
            "nodeType": "SOURCE"
        }
    ]
}
Retrieving All Queries with a Status
Connection.get_queries_with_status(status)
Example:
>>> registered = c.get_queries_with_status("Registered")
>>> scheduling = c.get_queries_with_status("Scheduling")
>>> running = c.get_queries_with_status("Running")
>>> marked_for_stopped = c.get_queries_with_status("MarkedForStop")
>>> stopped = c.get_queries_with_status("Stopped")
>>> failed = c.get_queries_with_status("Failed")
>>> running
[null, "Query::from(\"default_logical\").project(Attribute(\"id\")).sink(PrintSinkDescriptor::create());"]
Retrieve a Specific Query with a Status
Connection.get_queries_with_status_at_pos(status, pos)
Example:
>>> c.get_queries_with_status_at_pos("Running", 1)
Query::from("default_logical").project(Attribute("id")).sink(PrintSinkDescriptor::create());
Retrieving Execution Plan
Connection.get_query_execution_plan(queryId)
Example:
>>> data_stream = c.get_logical_stream("default_logical")
>>> data_stream["id"]
id |
---|
0 |
1 |
2 |
>>> data_stream.get_query_execution_plan()
{
    "executionNodes": [
        {
            "ScheduledQueries": [
                {
                    "queryId": 1,
                    "querySubPlans": [
                        {
                            "operator": "SINK(5)\n PROJECTION(2, schema=default_logical$id:INTEGER )\n SOURCE(1,default_logical)\n",
                            "querySubPlanId": 1
                        }
                    ]
                }
            ],
            "executionNodeId": 2,
            "topologyNodeId": 2,
            "topologyNodeIpAddress": "127.0.0.1"
        },
        {
            "ScheduledQueries": [
                {
                    "queryId": 1,
                    "querySubPlans": [
                        {
                            "operator": "SINK(3)\n SOURCE(4,)\n",
                            "querySubPlanId": 2
                        }
                    ]
                }
            ],
            "executionNodeId": 1,
            "topologyNodeId": 1,
            "topologyNodeIpAddress": "127.0.0.1"
        }
    ]
}
Retrieving Query Plan
Connection.get_query_plan(queryId)
Example:
>>> data_stream = c.get_logical_stream("default_logical")
>>> data_stream["id"]
id |
---|
0 |
1 |
2 |
>>> data_stream.get_query_plan()
{
    "edges": [
        {
            "source": "UNDEFINED(OP-2)",
            "target": "SINK(OP-3)"
        },
        {
            "source": "SOURCE(OP-1)",
            "target": "UNDEFINED(OP-2)"
        }
    ],
    "nodes": [
        {
            "id": "3",
            "name": "SINK(OP-3)",
            "nodeType": "SINK"
        },
        {
            "id": "2",
            "name": "UNDEFINED(OP-2)",
            "nodeType": "UNDEFINED"
        },
        {
            "id": "1",
            "name": "SOURCE(OP-1)",
            "nodeType": "SOURCE"
        }
    ]
}
Retrieve Topology
Connection.get_nes_topology()
Example:
>>> c.get_nes_topology()
{
    "edges": [
        {
            "source": 2,
            "target": 1
        }
    ],
    "nodes": [
        {
            "available_resources": 65535,
            "id": 1,
            "ip_address": "127.0.0.1"
        },
        {
            "available_resources": 8,
            "id": 2,
            "ip_address": "127.0.0.1"
        }
    ]
}
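The topology is a small graph of nodes and directed edges, so it is easy to post-process in Python. The sketch below assumes that the result is available as a Python dict (for example after json.loads) with exactly the structure shown above. It collects, for every node, the ids of the nodes that have an edge pointing at it. The variable names are chosen for illustration.

```python
# Sample data copied from the example output above.
topology = {
    "edges": [{"source": 2, "target": 1}],
    "nodes": [
        {"available_resources": 65535, "id": 1, "ip_address": "127.0.0.1"},
        {"available_resources": 8, "id": 2, "ip_address": "127.0.0.1"},
    ],
}

# Map each node id to the ids of the nodes with an edge pointing at it.
incoming = {node["id"]: [] for node in topology["nodes"]}
for edge in topology["edges"]:
    incoming[edge["target"]].append(edge["source"])

# Nodes that no other node points at.
leaves = [node_id for node_id, sources in incoming.items() if not sources]
```

In the sample data, node 2 points at node 1, so node 1 has one incoming edge and node 2 has none.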