Python Client for NebulaStream

NebulaStream Python Client

NebulaStream's Python client is the first data science environment for NebulaStream. It allows users to:

  • use operators to execute queries in NebulaStream efficiently
  • perform data preparation tasks on data streams
  • train machine learning models with ease
  • integrate any Python library


As an example, we want to perform a data exploration task. First, we need to connect to NebulaStream and select the data stream "vehicles". This data stream contains data from all vehicles in Berlin that are registered with NebulaStream, including their current position, their speed, the total distance driven, and the vehicle type.

>>> import nespy as nes

>>> c = nes.Connection('', 8081)
>>> vehicles = c.get_logical_stream("vehicles")
>>> vehicles
type id lat lon timestamp speed in km/h route (in meters)
6 1 52.513126 13.328347 1 51 27.000
4 2 52.513109 13.326388 1 25 5.35
6 3 52.512763 13.325291 1 62 10.000
9 4 52.512677 13.325964 1 51 10.000
3 5 52.512259 13.322778 1 29 1.89

Next, we want to find the average speed of all cars in Berlin over the last five minutes. To do so, we keep only cars (type = 6) and apply a sliding window to them. We set the window size to five minutes and the slide to one second. This means that every second we compute the average speed over the last five minutes.

>>> vehicles = vehicles[vehicles["type"] == 6] 
>>> vehicles.sliding(size=5, size_unit="min", slide=1, slide_unit="sec").avg(on="value", name="avg_speed")
start end avg_speed
0 5 24
5 10 26
10 15 25
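
To make the window semantics concrete, here is a minimal plain-Python sketch of a time-based sliding average. It does not use nespy and is not how NebulaStream evaluates windows internally; the function name sliding_avg and the (timestamp, speed) tuple layout are assumptions made for illustration only.

```python
def sliding_avg(events, size, slide):
    """Average the values of (timestamp, value) events per sliding window.

    Windows are [start, start + size) and a new window starts every `slide`
    time units, beginning at 0. Windows without events are skipped.
    """
    if not events:
        return []
    last_ts = max(ts for ts, _ in events)
    results = []
    start = 0
    while start <= last_ts:
        end = start + size
        values = [v for ts, v in events if start <= ts < end]
        if values:
            results.append((start, end, sum(values) / len(values)))
        start += slide
    return results


# Hypothetical (timestamp in seconds, speed in km/h) tuples.
events = [(0, 20), (1, 30), (2, 40), (3, 50)]
for window in sliding_avg(events, size=2, slide=1):
    print(window)
```

Because the slide (1) is smaller than the size (2), consecutive windows overlap and each event contributes to more than one average.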

If we analyze this data stream for an entire day, we can find out how the speed changes throughout the day. We can also filter for other vehicle types, e.g., underground trains, buses, and trucks, and compare them to each other.

Additionally, we can, for example, apply DBSCAN before using the window on each cluster. This way we find out which areas are prone to traffic jams at a specific time of day. We can implement the DBSCAN algorithm by integrating other libraries. For this, we have to write a function like the following:

>>> def my_dbscan_function(current_tuple):
...     # here comes the code for dbscan
...     return current_tuple


Currently, this function is the last function to be processed in the Python pipeline. For this reason, we have to add the window functionality by hand inside this function as well. Furthermore, we have to keep in mind that this function can only process one tuple at a time.

We use .process(my_dbscan_function) to apply DBSCAN and the window to our data stream:

>>> vehicles.process(my_dbscan_function)
cluster start stop avg_speed
1 0 5 52
2 0 5 25
2 0 5 17
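
Since such a function sees only one tuple at a time, any window state has to live inside it. The following is a minimal sketch of that idea in plain Python, not a nespy API: the factory make_windowed_avg, the dict layout of a tuple, and the attribute names "timestamp" and "speed" are all assumptions, and tuples are assumed to arrive in timestamp order.

```python
from collections import deque


def make_windowed_avg(size_sec):
    """Return a per-tuple function that keeps its own sliding window state."""
    buffer = deque()  # (timestamp, speed) pairs still inside the window

    def process_tuple(current_tuple):
        ts, speed = current_tuple["timestamp"], current_tuple["speed"]
        buffer.append((ts, speed))
        # Evict tuples that fell out of the window (ts - size_sec, ts].
        while buffer and buffer[0][0] <= ts - size_sec:
            buffer.popleft()
        avg = sum(s for _, s in buffer) / len(buffer)
        return {"timestamp": ts, "avg_speed": avg}

    return process_tuple


my_function = make_windowed_avg(size_sec=300)
print(my_function({"timestamp": 0, "speed": 50}))
print(my_function({"timestamp": 100, "speed": 30}))
print(my_function({"timestamp": 350, "speed": 10}))
```

The closure keeps the buffer alive between calls, so a function built this way could be handed to .process() in place of a stateless one.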


OS: Ubuntu 20.04+
Jupyter Notebook
Python 3+


  1. Install the Python client with:
pip install nespy
  2. Run NebulaStream. There are two options: a) start NebulaStream locally in CLion or b) use Docker.

a) Run NebulaStream with CLion

  1. Build NebulaStream. To build NebulaStream please follow this instruction.
  2. Start the Coordinator. If no parameters are configured the Coordinator starts with the default values. To configure the Coordinator please take a look here.
  3. Start the Worker. Just like the Coordinator the Worker starts with default values unless the parameters are configured. To configure the Worker please take a look here and to configure the sources take a look here.

b) Run the NebulaStream docker image

  1. Install Docker.
  2. Pull the latest docker image with:
docker pull nebulastream/nes-executable-image:latest

or a specific version with:

docker pull nebulastream/nes-executable-image:<version_number>
  3. Run the latest docker image with:
docker run --network host --name nes nebulastream/nes-executable-image:latest

or a specific version with:

docker run --network host --name nes nebulastream/nes-executable-image:<version_number>

The following additional parameters are optional (source):

Coordinator parameters:

parameter          default value           description
--coordinatorPort  4000                    Set the RPC server port of the Coordinator
--restPort         8081                    Set the port exposed for REST endpoints
--numberOfSlots    #processor in the host  Set the number of computing slots in the Coordinator
--help             -                       Display help message

Worker parameters:

parameter                   default value           description
--coordinatorIp             -                       Set the server IP of the Coordinator to which the Worker should connect
--coordinatorPort           0                       Set the RPC server port of the Coordinator to which the Worker connects
--rpcPort                   0                       Set the RPC server port of the Worker
--dataPort                  0                       Set the data port of the Worker
--sourceType                -                       Set the type of the source (must be either CSVSource or DefaultSource)
--sourceConfig              -                       Set the configuration of the source (e.g., path to the CSV file)
--sourceFrequency           -                       Set the sampling frequency of the source
--physicalStreamName        -                       Set the physical name of the stream
--numberOfBuffersToProduce  -                       Set the number of buffers to produce
--logicalStreamName         -                       Set the logical stream name to which this stream is added
--parentId                  -                       Set the parentId of the Worker node
--localWorkerIp             -                       Set the IP of the Worker
--numberOfSlots             #processor in the host  Set the number of computing slots in the Worker
--help                      -                       Display help message
  3. Open Jupyter Notebook, import the Python client with "import nespy as nes", and get started.


Apache License 2.0


1. Connect with NebulaStream

2. Manage Data Streams

3. Process Data Streams

4. Manage Queries

1. Connect with NebulaStream

Connection is a class that manages the overall connection to NebulaStream.

Connection(host, port)


>>> c = nes.Connection('', 8081)

2. Managing Data Streams

After setting up the connection with NebulaStream the user can call different functions to gather information about the data stream itself and manage data streams.

Retrieving all Registered Logical Streams



>>> c.get_all_logical_streams()

Retrieving all Physical Streams for a Logical Stream



>>> c.get_all_physical_streams("default_logical")

Selecting a Specific Logical Stream



>>> data_stream = c.get_logical_stream(0)


>>> data_stream = c.get_logical_stream("default_logical")
>>> data_stream
id value
0 1
1 1
2 1

Add Logical Stream

Connection.add_logical_stream(logical_stream_name, schema)


>>> schema = "Schema::create()->addField(\"test\",INT32);"
>>> c.add_logical_stream("new_logical_stream", schema)
{Success: True}
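
Writing the schema string by hand is error-prone because of the escaped quotes. As a hedged convenience, the sketch below assembles it from (name, type) pairs. The helper build_schema is hypothetical and not part of nespy; chaining several addField calls for more than one field is an assumption about the schema syntax, and only the single-field form appears in this document.

```python
def build_schema(fields):
    """Build a schema string from a list of (name, type) pairs."""
    if not fields:
        raise ValueError("a schema needs at least one field")
    parts = ["Schema::create()"]
    for name, data_type in fields:
        # Assumption: further fields are appended as chained addField calls.
        parts.append(f'->addField("{name}",{data_type})')
    return "".join(parts) + ";"


print(build_schema([("test", "INT32")]))
# Schema::create()->addField("test",INT32);
```

The resulting string could then be passed as the schema argument of add_logical_stream or update_logical_stream.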

Update Logical Stream

Connection.update_logical_stream(logical_stream_name, schema)


>>> schema = "Schema::create()->addField(\"test\",INT32);"
>>> c.update_logical_stream("default_logical", schema)
{Success: True}

Delete Logical Stream



>>> c.delete_logical_stream("default_logical")
{Success: True}

3. Process Data Streams




>>> sensor[sensor["temperature"] > 3]
>>> sensor[sensor["temperature"] < 3]
>>> sensor[sensor["temperature"] <= 3]
>>> sensor[sensor["temperature"] >= 3]
>>> sensor[sensor["temperature"] == 3]
>>> sensor[sensor["temperature"] != 3]
>>> sensor[sensor["temperature"] > sensor["humidity"]]
>>> sensor[(sensor["temperature"] > 3) & (sensor["temperature"] < 15)]
>>> sensor[(sensor["temperature"] < 3) | (sensor["temperature"] == 10)]
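
The parentheses in the combined filters are required by Python itself: & and | bind more tightly than comparison operators, so temperature > 3 & temperature < 15 would be parsed as temperature > (3 & temperature) < 15. The sketch below shows how operator overloading can turn such expressions into a predicate. It illustrates the general technique only; the classes Column and Predicate and the generated text format are hypothetical and are not nespy's implementation.

```python
class Column:
    """Hypothetical column reference that records comparisons as text."""

    def __init__(self, name):
        self.name = name

    def _compare(self, op, other):
        rhs = other.name if isinstance(other, Column) else repr(other)
        return Predicate(f"{self.name} {op} {rhs}")

    def __gt__(self, other):
        return self._compare(">", other)

    def __lt__(self, other):
        return self._compare("<", other)

    def __eq__(self, other):
        return self._compare("==", other)


class Predicate:
    """Hypothetical predicate that can be combined with & and |."""

    def __init__(self, text):
        self.text = text

    def __and__(self, other):
        return Predicate(f"({self.text}) && ({other.text})")

    def __or__(self, other):
        return Predicate(f"({self.text}) || ({other.text})")


temperature = Column("temperature")
combined = (temperature > 3) & (temperature < 15)
print(combined.text)
# (temperature > 3) && (temperature < 15)
```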


data_stream[attribute_name] = new_value_for_this_attribute


>>> sensor["temperature"] = sensor["temperature"] + 0.01
>>> sensor["temperature"] = sensor["temperature"] - 0.01
>>> sensor["temperature"] = sensor["temperature"] * 0.01
>>> sensor["temperature"] = sensor["temperature"] / 0.01
>>> sensor["temperature"] = (sensor["sensor_a"] + sensor["sensor_b"]) / 2
>>> sensor["temperature"] = 0.01 * sensor["temperature"]

Select (in SQL)/Projection (in relational algebra)

For a single selection, write the name of the desired attribute as a string inside the brackets. For a multiple selection, pass a list of the attribute names (as strings) that you want to project.

data_stream[[attribute_name1, attribute_name2, attribute_name3]]


>>> cars["car_id"]
>>> cars[["car_id"]]
>>> cars[["car_id", "speed", "lat", "lon"]]




>>> cars[["car_id"]]
>>> cars.rename("id")


Windows break an infinite data stream into finite data sets. As a result, we can compute aggregations over these finite data sets. The following aggregation functions are supported:

  • sum(on, name)
  • div(on, name)
  • count(on, name)
  • avg(on, name)
  • min(on, name)
  • max(on, name)
parameter  required  default                                  description
on         required  -                                        declares which attribute to aggregate over
name       optional  aggregation_attribute, e.g., sum_value   name of the new column

In this section, we will explain how to use keyed and non-keyed windows in our python client.

Keyed Windows vs. Global Windows
Keyed Windows

A key for a window can be any attribute of your data stream. With keyed windows we can run multiple different windows and window aggregations on one data stream. A keyed window splits a single data stream into multiple data streams to enable parallel computation.

Global Windows

Global windows are non-keyed windows. These windows do not split a data stream into multiple data streams and therefore cannot compute multiple different aggregation functions in parallel.

Tumbling Windows
DataStream.tumbling(on, event, event_unit, size, size_unit, lateness, lateness_unit)
parameter      required  default value          description
on             optional  None                   key for the window, can be any attribute of the data stream; if not set, this window is a global window
event          required  'timestamp'            decides which attribute in the data stream is the event time
event_unit     optional  'sec'                  unit for the event, can only be 'min', 'sec', 'ms', or 'count'
size           required  0 (meaning no window)  size of the window
size_unit      required  'sec'                  unit of the parameter size, can only be 'min', 'sec', 'ms', or 'count'
lateness       optional  0                      the allowed lateness of a tuple
lateness_unit  optional  'sec'                  unit of the lateness, can only be 'min', 'sec', 'ms', or 'count'


>>> # keyed window
>>> shop.tumbling(on="value", size=10, size_unit="sec").sum(on="sales")

>>> # global window
>>> shop.tumbling(size=10, size_unit="sec").sum(on="sales")
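
The difference between a keyed and a global tumbling window can be sketched in a few lines of plain Python. This is an illustration of the semantics only, not the nespy or NebulaStream implementation; the function tumbling_sum, the dict layout of a tuple, and the attribute "store" are assumptions.

```python
from collections import defaultdict


def tumbling_sum(rows, size, value, key=None, event="timestamp"):
    """Sum `value` per tumbling window of length `size`, optionally per key."""
    totals = defaultdict(int)
    for row in rows:
        # Each timestamp belongs to exactly one window [start, start + size).
        start = (row[event] // size) * size
        group = row[key] if key is not None else None
        totals[(group, start)] += row[value]
    return dict(totals)


# Hypothetical tuples of a "shop" stream.
rows = [
    {"timestamp": 1, "store": "a", "sales": 5},
    {"timestamp": 4, "store": "b", "sales": 7},
    {"timestamp": 12, "store": "a", "sales": 1},
]
print(tumbling_sum(rows, size=10, value="sales"))               # global window
print(tumbling_sum(rows, size=10, value="sales", key="store"))  # keyed window
```

The global call yields one sum per window, while the keyed call yields one sum per (key, window) pair, which is what allows the keyed groups to be processed independently.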

Sliding Windows
DataStream.sliding(on, event, event_unit, size, size_unit, slide, slide_unit, lateness, lateness_unit)
parameter      required  default value          description
on             optional  None                   key for the window, can be any attribute of the data stream; if not set, this window is a global window
event          required  'timestamp'            decides which attribute in the data stream is the event time
event_unit     optional  'sec'                  unit for the event, can only be 'min', 'sec', 'ms', or 'count'
size           required  0 (meaning no window)  size of the window
size_unit      required  'sec'                  unit of the parameter size, can only be 'min', 'sec', 'ms', or 'count'
slide          required  0 (meaning no slide)   the slide value or the update rate of the window
slide_unit     required  'sec'                  unit of the slide value, can only be 'min', 'sec', 'ms', or 'count'
lateness       optional  0                      the allowed lateness of a tuple
lateness_unit  optional  'sec'                  unit of the lateness, can only be 'min', 'sec', 'ms', or 'count'


>>> # keyed window
>>> shop.sliding(on="sales", size=10, size_unit="sec", slide=5, slide_unit="sec").sum(on="sales")

>>> # global window
>>> shop.sliding(size=10, size_unit="sec", slide=5, slide_unit="sec").sum(on="sales")

Include other Python libraries

Stream Processing
>>> def my_function(data):
...     # do something with a single tuple
...     return data
>>> data_stream.process(my_function)

Batch Processing with DataFrames

This function enables users to compute in batches over DataFrames. If on_time_over is set to BatchMode.ON_CHANGE, the batch function is triggered every time a tuple is added to the batch. With BatchMode.ON_TIME_OVER, the batch function is triggered only once the timeframe has elapsed. Currently, only 'sec', 'min', and 'h' are supported as the timeframe_unit.

>>> def my_function(data):
...     # do something with a DataFrame
...     return data
>>> # We set the size of the buffer to 1 hour and only call the batch function my_function once after every hour.
>>> data_stream.batch(my_function, timeframe=1, timeframe_unit='h', on_time_over=BatchMode.ON_TIME_OVER)
>>> data_stream.batch(my_function, timeframe=1, timeframe_unit='h', on_time_over=BatchMode.ON_CHANGE)
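
The two trigger modes can be sketched with a small buffer class. This is one plausible reading of the behavior, not the nespy implementation: the names Mode and Batcher are hypothetical stand-ins, the batch is a plain list rather than a DataFrame, and the timeframe is measured from the first tuple of each batch.

```python
import enum


class Mode(enum.Enum):
    """Stand-in for the two trigger modes; not the nespy BatchMode class."""
    ON_TIME_OVER = 1
    ON_CHANGE = 2


class Batcher:
    def __init__(self, func, timeframe_sec, mode):
        self.func = func
        self.timeframe_sec = timeframe_sec
        self.mode = mode
        self.batch = []
        self.batch_start = None

    def add(self, timestamp, row):
        """Add one tuple; return the function result if it was triggered."""
        if self.batch_start is None:
            self.batch_start = timestamp
        result = None
        if timestamp - self.batch_start >= self.timeframe_sec:
            # Timeframe is over: hand the finished batch to the function.
            if self.mode is Mode.ON_TIME_OVER:
                result = self.func(list(self.batch))
            self.batch, self.batch_start = [], timestamp
        self.batch.append(row)
        if self.mode is Mode.ON_CHANGE:
            # Trigger on every added tuple with the batch collected so far.
            result = self.func(list(self.batch))
        return result


batcher = Batcher(len, timeframe_sec=3600, mode=Mode.ON_TIME_OVER)
print([batcher.add(ts, {"value": 1}) for ts in (0, 1800, 3600)])
# [None, None, 2]
```

With Mode.ON_CHANGE the same three calls would trigger the function each time, on batches of size 1, 2, and 1.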


Union works the same as in relational algebra and SQL. The schemas of both data streams have to be the same.



>>> bus = c.get_logical_stream("bus")
>>> cars = c.get_logical_stream("car")
>>> bus.union(cars)
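
The schema requirement can be illustrated with a small plain-Python sketch. It is not the nespy implementation; representing a stream as a (schema, rows) pair and the attribute names used here are assumptions.

```python
def union(left, right):
    """Union two streams given as (schema, rows) pairs with equal schemas."""
    left_schema, left_rows = left
    right_schema, right_rows = right
    if left_schema != right_schema:
        raise ValueError(f"schema mismatch: {left_schema} vs {right_schema}")
    return left_schema, left_rows + right_rows


# Hypothetical schemas and tuples.
bus = (("id", "speed"), [(1, 30)])
cars = (("id", "speed"), [(7, 50), (8, 45)])
print(union(bus, cars))
# (('id', 'speed'), [(1, 30), (7, 50), (8, 45)])
```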


Join works just like JOIN in SQL or the theta join in relational algebra. If both of the data streams have an attribute with the same name, we can set the parameter "on". Otherwise, we have to set the parameter "left_on" and "right_on".

DataStream.join(other_stream, on, left_on, right_on)


>>> position = c.get_logical_stream("car_position")
>>> details = c.get_logical_stream("car_details")
>>> position.join(details, left_on="id", right_on="car_id")
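
How the three parameters interact can be shown with a minimal inner equi-join over two finite lists of dicts. This is a sketch under stated assumptions, not the nespy implementation: the free function join and the sample attributes are hypothetical, and a join over unbounded streams would additionally need a window to bound the tuples being compared.

```python
def join(left, right, on=None, left_on=None, right_on=None):
    """Inner equi-join of two lists of dicts (nested-loop version)."""
    if on is not None:
        left_on = right_on = on
    if left_on is None or right_on is None:
        raise ValueError("set either 'on' or both 'left_on' and 'right_on'")
    joined = []
    for l_row in left:
        for r_row in right:
            if l_row[left_on] == r_row[right_on]:
                # On a name clash the right-hand value wins in this sketch.
                joined.append({**l_row, **r_row})
    return joined


position = [{"id": 1, "lat": 52.51}, {"id": 2, "lat": 52.52}]
details = [{"car_id": 1, "type": 6}]
print(join(position, details, left_on="id", right_on="car_id"))
# [{'id': 1, 'lat': 52.51, 'car_id': 1, 'type': 6}]
```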

4. Manage Queries

Stop a Running Query



>>> data_stream.stop_query()

Reset All Operators of a Query

This function resets all operators saved for this variable and also stops the running query. Therefore, the user can reuse this variable for another query.



>>> data_stream.reset_operators()

Retrieving All Registered Queries



>>> c.get_all_queries()
            "edges": [
                {
                    "source": "UNDEFINED(OP-2)",
                    "target": "SINK(OP-3)"
                },
                {
                    "source": "SOURCE(OP-1)",
                    "target": "UNDEFINED(OP-2)"
                }
            ],
            "nodes": [
                {
                    "id": "3",
                    "name": "SINK(OP-3)",
                    "nodeType": "SINK"
                },
                {
                    "id": "2",
                    "name": "UNDEFINED(OP-2)",
                    "nodeType": "UNDEFINED"
                },
                {
                    "id": "1",
                    "name": "SOURCE(OP-1)",
                    "nodeType": "SOURCE"
                }
            ]

Retrieving All Queries with a Status



>>> registered = c.get_queries_with_status("Registered")
>>> scheduling = c.get_queries_with_status("Scheduling")
>>> running = c.get_queries_with_status("Running")
>>> marked_for_stopped = c.get_queries_with_status("MarkedForStop")
>>> stopped = c.get_queries_with_status("Stopped")
>>> failed = c.get_queries_with_status("Failed")
>>> running
[null, "Query::from(\"default_logical\").project(Attribute(\"id\")).sink(PrintSinkDescriptor::create());"]

Retrieve a Specific Query with a Status

Connection.get_queries_with_status_at_pos(status, pos)


>>> c.get_queries_with_status_at_pos("Running", 1)

Retrieving Execution Plan



>>> data_stream = c.get_logical_stream("default_logical")
>>> data_stream["id"] 
>>> data_stream.get_query_execution_plan()
            "executionNodes": [
                {
                    "ScheduledQueries": [
                        {
                            "queryId": 1,
                            "querySubPlans": [
                                {
                                    "operator": "SINK(5)\n  PROJECTION(2, schema=default_logical$id:INTEGER )\n    ",
                                    "querySubPlanId": 1
                                }
                            ]
                        }
                    ],
                    "executionNodeId": 2,
                    "topologyNodeId": 2,
                    "topologyNodeIpAddress": ""
                },
                {
                    "ScheduledQueries": [
                        {
                            "queryId": 1,
                            "querySubPlans": [
                                {
                                    "operator": "SINK(3)\n  SOURCE(4,)\n",
                                    "querySubPlanId": 2
                                }
                            ]
                        }
                    ],
                    "executionNodeId": 1,
                    "topologyNodeId": 1,
                    "topologyNodeIpAddress": ""
                }
            ]

Retrieving Query Plan



>>> data_stream = c.get_logical_stream("default_logical")
>>> data_stream["id"] 
>>> data_stream.get_query_plan(queryId)
            "edges": [
                {
                    "source": "UNDEFINED(OP-2)",
                    "target": "SINK(OP-3)"
                },
                {
                    "source": "SOURCE(OP-1)",
                    "target": "UNDEFINED(OP-2)"
                }
            ],
            "nodes": [
                {
                    "id": "3",
                    "name": "SINK(OP-3)",
                    "nodeType": "SINK"
                },
                {
                    "id": "2",
                    "name": "UNDEFINED(OP-2)",
                    "nodeType": "UNDEFINED"
                },
                {
                    "id": "1",
                    "name": "SOURCE(OP-1)",
                    "nodeType": "SOURCE"
                }
            ]

Retrieve Topology



>>> c.get_nes_topology()
{
    "edges": [
        {
            "source": 2,
            "target": 1
        }
    ],
    "nodes": [
        {
            "available_resources": 65535,
            "id": 1,
            "ip_address": ""
        },
        {
            "available_resources": 8,
            "id": 2,
            "ip_address": ""
        }
    ]
}

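A topology response of this shape can be turned into a simple adjacency map for inspection. The sketch below assumes the response is JSON with "edges" and "nodes" lists as in the example output; whether get_nes_topology returns a string or an already parsed object is not stated here, so the example starts from a JSON string. The helper children_by_parent is hypothetical.

```python
import json

# Sample response, using the values from the example output above.
topology_json = """
{
    "edges": [{"source": 2, "target": 1}],
    "nodes": [
        {"available_resources": 65535, "id": 1, "ip_address": ""},
        {"available_resources": 8, "id": 2, "ip_address": ""}
    ]
}
"""


def children_by_parent(topology):
    """Map each target node id to the ids of the nodes pointing at it."""
    children = {node["id"]: [] for node in topology["nodes"]}
    for edge in topology["edges"]:
        children[edge["target"]].append(edge["source"])
    return children


topology = json.loads(topology_json)
print(children_by_parent(topology))
# {1: [2], 2: []}
```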
