NebulaStream Python Client

NebulaStream's Python client is the first data science environment for NebulaStream. It allows users to...

  • use operators to execute queries in NebulaStream efficiently
  • perform data preparation tasks on data streams
  • train machine learning models with ease
  • integrate any Python library

Example

As an example, we want to perform a data exploration task. First, we need to connect to NebulaStream and select the data stream "vehicles". This data stream contains data from all vehicles registered with NebulaStream in Berlin, including their current position, their speed, the total driven route, and the vehicle type.

>>> import nespy as nes

>>> c = nes.Connection('127.0.0.1', 8081)
>>> vehicles = c.get_logical_stream("vehicles")
>>> vehicles
type  id        lat        lon  timestamp  speed in km/h  route (in meters)
   6   1  52.513126  13.328347          1             51             27.000
   4   2  52.513109  13.326388          1             25               5.35
   6   3  52.512763  13.325291          1             62             10.000
   9   4  52.512677  13.325964          1             51             10.000
   3   5  52.512259  13.322778          1             29               1.89

Now we want to find out the average speed over the last five minutes of all cars in Berlin. Therefore, we filter for cars (type = 6) and apply a sliding window to them. We set the window size to five minutes and the slide to one second. This means that we compute the average speed over the last five minutes every second.

>>> vehicles = vehicles[vehicles["type"] == 6]
>>> vehicles.sliding(size=5, size_unit="min", slide=1, slide_unit="sec").avg(on="speed", name="avg_speed")
start  end  avg_speed
    0    5         24
    5   10         26
   10   15         25

If we analyze this data stream for an entire day, we can find out how the speed changes throughout the day. We can also filter other vehicle types, e.g., underground, bus, or trucks, and compare them to each other.

Additionally, we can, for example, apply DBSCAN and then compute the window on each cluster. This way we find out which areas are prone to traffic jams at a specific time of day. We can implement DBSCAN by integrating other libraries. For this, we have to write a function like the following:

>>> def my_dbscan_function(current_tuple):
>>>     # here comes the code for DBSCAN
>>>     return current_tuple

Note:

Currently, this function has to be the last operator in the Python pipeline. For this reason, we also have to add the window functionality by hand inside this function. Furthermore, we have to keep in mind that this function can only process one tuple at a time.
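
The following is only a minimal sketch of what such a function could look like. It assumes that scikit-learn and NumPy are installed and that each tuple arrives as a dict-like record with "lat", "lon", and "timestamp" fields; this representation and the DBSCAN parameters are assumptions for illustration, not part of the nespy API.

>>> import numpy as np
>>> from sklearn.cluster import DBSCAN
>>>
>>> window = []          # manually maintained window buffer
>>> WINDOW_SIZE = 300    # five minutes, assuming the timestamp is in seconds
>>>
>>> def my_dbscan_function(current_tuple):
>>>     window.append(current_tuple)
>>>     # drop tuples that fell out of the five-minute window
>>>     while current_tuple["timestamp"] - window[0]["timestamp"] > WINDOW_SIZE:
>>>         window.pop(0)
>>>     # cluster the positions currently in the window
>>>     positions = np.array([[t["lat"], t["lon"]] for t in window])
>>>     labels = DBSCAN(eps=0.001, min_samples=5).fit_predict(positions)
>>>     current_tuple["cluster"] = int(labels[-1])
>>>     return current_tuple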

We use .process(my_dbscan_function) to apply DBSCAN and the window to our data stream:

>>> vehicles.process(my_dbscan_function)
cluster  start  stop  avg_speed
      1      0     5         52
      2      0     5         25
      2      0     5         17

Requirements

OS: Ubuntu 20.04+
Jupyter Notebook
Python 3+

Installation

  1. Install the Python client with:
pip install nespy
  2. Run NebulaStream. There are two options: a) you start NebulaStream locally in CLion or b) you use Docker.

a) Run NebulaStream with CLion

  1. Build NebulaStream. To build NebulaStream, please follow these instructions.
  2. Start the Coordinator. If no parameters are configured, the Coordinator starts with the default values. To configure the Coordinator, please take a look here.
  3. Start the Worker. Just like the Coordinator, the Worker starts with default values unless the parameters are configured. To configure the Worker, please take a look here, and to configure the sources, take a look here.

b) Run the NebulaStream docker image

  1. Install docker (https://docs.docker.com/get-docker/)
  2. Pull the latest docker image with:
docker pull nebulastream/nes-executable-image:latest

or a specific version with:

docker pull nebulastream/nes-executable-image:<version_number>
  3. Run the latest docker image with:
docker run --network host --name nes nebulastream/nes-executable-image:latest

or

docker run --network host --name nes nebulastream/nes-executable-image:<version_number> 

The following additional parameters are optional (source):

Coordinator

  • -serverIp (default: 127.0.0.1): Set the server IP of the Coordinator
  • -coordinatorPort (default: 4000): Set the RPC server port of the Coordinator
  • -restPort (default: 8081): Set the port exposed for REST endpoints
  • --numberOfSlots (default: number of processors on the host): Set the number of computing slots in the Coordinator
  • --help: Display the help message

Worker

  • --coordinatorIp (default: 127.0.0.1): Set the server IP of the Coordinator to which the Worker should connect
  • --coordinatorPort (default: 0): Set the RPC server port of the Coordinator to which the Worker connects
  • --rpcPort (default: 0): Set the RPC server port of the Worker
  • --dataPort (default: 0): Set the data port of the Worker
  • --sourceType: Set the type of the source (must be either CSVSource or DefaultSource)
  • --sourceConfig: Set the configuration of the source (e.g., path to the CSV file)
  • --sourceFrequency: Set the sampling frequency of the source
  • --physicalStreamName: Set the physical name of the stream
  • --numberOfBuffersToProduce: Set the number of buffers to produce
  • --logicalStreamName: Set the logical stream to which this stream is added
  • --parentId: Set the parentId of the Worker node
  • --localWorkerIp (default: 127.0.0.1): Set the IP of the Worker
  • --numberOfSlots (default: number of processors on the host): Set the number of computing slots in the Worker
  • --help: Display the help message
  3. Open Jupyter Notebook, import the Python client with "import nespy as nes", and get started.

License

Apache License 2.0

Documentation

1. Connect with NebulaStream

2. Manage Data Streams

3. Process Data Streams

4. Plotting Data Streams with Nespy

5. Manage Queries

1. Connect with NebulaStream

Connection is a class that manages the overall connection to NebulaStream.

Connection(host, port)

Example:

>>> c = nes.Connection('127.0.0.1', 8081)

2. Managing Data Streams

After setting up the connection with NebulaStream, the user can call different functions to gather information about the data streams and to manage them.

Retrieving all Registered Logical Streams

Connection.get_all_logical_streams()

Example:

>>> c.get_all_logical_streams()

Retrieving all Physical Streams for a Logical Stream

Connection.get_all_physical_streams(logical_stream_name)

Example:

>>> c.get_all_physical_streams("default_logical")

Selecting a Specific Logical Stream

Connection.get_logical_stream(position_or_name)

Example:

>>> data_stream = c.get_logical_stream(0)

or

>>> data_stream = c.get_logical_stream("default_logical")
>>> data_stream
id value
0 1
1 1
2 1

Add Logical Stream

Connection.add_logical_stream(logical_stream_name, schema)

Example:

>>> schema = "Schema::create()->addField(\"test\",INT32);"
>>> c.add_logical_stream("new_logical_stream", schema)
{Success: True}

Update Logical Stream

Connection.update_logical_stream(logical_stream_name, schema)

Example:

>>> schema = "Schema::create()->addField(\"test\",INT32);"
>>> c.update_logical_stream("default_logical", schema)
{Success: True}

Delete Logical Stream

Connection.delete_logical_stream(logical_stream_name)

Example:

>>> c.delete_logical_stream("default_logical")
{Success: True}

3. Process Data Streams

Filter

data_stream[filter_predicate]

Examples:

>>> sensor[sensor["temperature"] > 3]
>>> sensor[sensor["temperature"] < 3]
>>> sensor[sensor["temperature"] <= 3]
>>> sensor[sensor["temperature"] >= 3]
>>> sensor[sensor["temperature"] == 3]
>>> sensor[sensor["temperature"] != 3]
>>> sensor[sensor["temperature"] > sensor["humidity"]
>>> sensor[(sensor["temperature"] > 3) & (sensor["temperature"] < 15)]
>>> sensor[(sensor["temperature"] < 3) | (sensor["temperature"] == 10)]

Map

data_stream[attribute_name] = new_value_for_this_attribute

Example:

>>> sensor["temperature"] = sensor["temperature"] + 0.01
>>> sensor["temperature"] = sensor["temperature"] - 0.01
>>> sensor["temperature"] = sensor["temperature"] * 0.01
>>> sensor["temperature"] = sensor["temperature"] / 0.01
>>> sensor["temperature"] = (sensor["sensor_a"] + sensor["sensor_b"]) / 2
>>> sensor["temperature"] = 0.01 * sensor["temperature"]

Select (in SQL)/Projection (in relational algebra)

For a single projection, you only have to write the name of the desired attribute as a string inside the brackets. For a projection of multiple attributes, you write a list of the attribute names (as strings) that you want to project.

data_stream[data_stream[attribute_name]]
data_stream[[attribute_name1, attribute_name2, attribute_name3]]

Example:

>>> cars[cars["car_id"]]
>>> cars[["car_id"]]
>>> cars[["car_id", "speed", "lat", "lon"]]

Rename

Rename only works if the operator before rename was a projection. It assigns a temporary (not persisted!) name to the attribute.

data_stream.rename({"old_name": "new_name"})

Example:

>>> cars[["car_id"]]
>>> cars.rename({"car_id": "id"})

Windows

Windows allow us to break an infinite data stream into finite data sets. As a result, we are able to compute aggregations over these finite data sets. The following aggregation functions are supported:

  • sum(on, name)
  • count(on, name)
  • min(on, name)
  • max(on, name)

Warning: Do not try to rename the columns yet. This functionality isn't supported by NebulaStream yet! The avg function will be supported by NebulaStream soon! Please be patient :)

  • on (required): the attribute to aggregate over
  • name (optional, default: aggregation_attribute, e.g., sum_value): name of the new column (not supported yet!)
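
As a quick illustration, an aggregation is attached to a window call like this; the window call itself is explained below, and the attribute names of this sensor stream are assumptions for illustration only:

>>> sensor.tumbling(on="id", size=10, size_unit="sec").max(on="temperature")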

In this section, we will explain how to use keyed and non-keyed windows in our Python client.

Keyed Windows vs. Global Windows
Keyed Windows

A key for a window can be any attribute of your data stream. With keyed windows, we can run multiple different windows and window aggregations on one data stream. Therefore, a keyed window splits up a single data stream into multiple data streams to enable parallel computation.

Global Windows

Global windows are non-keyed windows. These windows do not split a data stream into multiple data streams and, therefore, cannot compute multiple different aggregation functions in parallel.

Tumbling Windows
DataStream.tumbling(on, event, event_unit, size, size_unit, lateness, lateness_unit)
  • on (optional, default None): key for the window; can be any attribute of the data stream; if not set, the window is a global window
  • event (required, default 'timestamp'): the attribute of the data stream that holds the event time
  • event_unit (optional, default 'sec'): unit of the event time; can only be 'min', 'sec', 'ms', or 'count'
  • size (required, default 0, meaning no window): size of the window
  • size_unit (required, default 'sec'): unit of the size parameter; can only be 'min', 'sec', 'ms', or 'count'
  • lateness (optional, default 0): the allowed lateness of a tuple
  • lateness_unit (optional, default 'sec'): unit of the lateness; can only be 'min', 'sec', 'ms', or 'count'

Example:

>>> # keyed window
>>> shop.tumbling(on="value", size=10, size_unit="sec").sum(on="sales")

>>> # global window
>>> shop.tumbling(size=10, size_unit="sec").sum(on="sales")

Sliding Windows
DataStream.sliding(on, event, event_unit, size, size_unit, slide, slide_unit, lateness, lateness_unit)
  • on (optional, default None): key for the window; can be any attribute of the data stream; if not set, the window is a global window
  • event (required, default 'timestamp'): the attribute of the data stream that holds the event time
  • event_unit (optional, default 'sec'): unit of the event time; can only be 'min', 'sec', 'ms', or 'count'
  • size (required, default 0, meaning no window): size of the window
  • size_unit (required, default 'sec'): unit of the size parameter; can only be 'min', 'sec', 'ms', or 'count'
  • slide (required, default 0, meaning no slide): the slide value, i.e., the update rate of the window
  • slide_unit (required, default 'sec'): unit of the slide value; can only be 'min', 'sec', 'ms', or 'count'
  • lateness (optional, default 0): the allowed lateness of a tuple
  • lateness_unit (optional, default 'sec'): unit of the lateness; can only be 'min', 'sec', 'ms', or 'count'

Example:

>>> # keyed window
>>> shop.sliding(on="sales", size=10, size_unit="sec", slide=5, slide_unit="sec").sum(on="sales")

>>> # global window
>>> shop.sliding(size=10, size_unit="sec", slide=5, slide_unit="sec").sum(on="sales")

Include other Python libraries

Stream Processing
>>> def my_function(data):
>>>     #do something with a single tuple
>>>     return data
>>> data_stream.process(my_function)

Batch Processing with DataFrames

This function enables users to compute in batches over DataFrames. If on_time_over is set to True, the batch function is triggered every time a tuple is added to the batch. Otherwise, we only trigger the batch function once the timeframe is over. Currently, we only support 'sec', 'min', and 'h' as the timeframe_unit.

>>> def my_function(data):
>>>     #do something with a DataFrame
>>>     return data
>>>     
>>> # We set the timeframe to one hour; on_time_over controls when my_function is called (see above).
>>> data_stream.batch(my_function, timeframe=1, timeframe_unit='h', on_time_over=True)
>>> data_stream.batch(my_function, timeframe=1, timeframe_unit='h', on_time_over=False)
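
To make this more concrete, here is a sketch of a possible batch function. It assumes that the batch arrives as a pandas-like DataFrame with a "speed" column; both the DataFrame behaviour and the column name are assumptions for illustration.

>>> def hourly_average(df):
>>>     # df is assumed to hold all tuples collected during the one-hour batch
>>>     df["avg_speed"] = df["speed"].mean()
>>>     return df
>>>
>>> # following the description above, on_time_over=False triggers the function only once the hour is over
>>> data_stream.batch(hourly_average, timeframe=1, timeframe_unit='h', on_time_over=False)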

Union

Union works the same as in relational algebra and SQL. The schemas of both data streams have to be the same.

DataStream.union(other_stream)

Example:

>>> bus = c.get_logical_stream("bus")
>>> cars = c.get_logical_stream("car")
>>> bus.union(cars)

Join

Join works just like JOIN in SQL or the theta join in relational algebra. If both of the data streams have an attribute with the same name, we can set the parameter "on". Otherwise, we have to set the parameter "left_on" and "right_on".

DataStream.join(other_stream, on, left_on, right_on)

Example:

>>> position = c.get_logical_stream("car_position")
>>> details = c.get_logical_stream("car_details")
>>> position.join(details, left_on="id", right_on="car_id")
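
If both streams happened to store the key under the same attribute name, for example "car_id" in both (a hypothetical schema), the join could be written with on instead:

>>> position.join(details, on="car_id")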

Sinks

Nespy offers different types of sinks:

  • ZMQ Sink
  • Kafka Sink
  • File Sink
  • Print Sink

The default sink is ZMQ.

To select another sink, use:

DataStream.set_sink(sink, host, port, topic, brokers, timeout, path)
  • sink (required): can only be "zmq", "kafka", "file", or "print"
  • host (required for the ZMQ Sink): host for the ZMQ Sink
  • port (required for the ZMQ Sink): port for the ZMQ Sink
  • topic (required for the Kafka Sink): topic for the Kafka Sink
  • brokers (required for the Kafka Sink): brokers for the Kafka Sink
  • timeout (required for the Kafka Sink): timeout for the Kafka Sink
  • path (required for the File Sink): path to where the result should be saved
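
For illustration only, calls could look like the following; the sink types and parameter names come from the table above, but the concrete values are made up:

>>> # write the results to a CSV file
>>> data_stream.set_sink("file", path="/tmp/results.csv")
>>> # or publish them to a Kafka topic
>>> data_stream.set_sink("kafka", topic="vehicle_results", brokers="localhost:9092", timeout=10000)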

4. Plotting Data Streams with Nespy

TBD

5. Manage Queries

Stop a Running Query

DataStream.stop_query()

Example:

>>> data_stream.stop_query()

Reset All Operators of a Query

This function resets all operators saved for this variable and also stops the running query. Therefore, the user can reuse this variable for another query.

DataStream.reset_operators()

Example:

>>> data_stream.reset_operators()
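
After the reset, the same variable can be reused for a new query, for example (using the id and value attributes of the default_logical stream shown earlier):

>>> data_stream[data_stream["value"] > 3]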

Retrieving All Registered Queries

Connection.get_all_queries()

Example:

>>> c.get_all_queries()
{
            "edges": [
                {
                    "source": "UNDEFINED(OP-2)",
                    "target": "SINK(OP-3)"
                },
                {
                    "source": "SOURCE(OP-1)",
                    "target": "UNDEFINED(OP-2)"
                }
            ],
            "nodes": [
                {
                    "id": "3",
                    "name": "SINK(OP-3)",
                    "nodeType": "SINK"
                },
                {
                    "id": "2",
                    "name": "UNDEFINED(OP-2)",
                    "nodeType": "UNDEFINED"
                },
                {
                    "id": "1",
                    "name": "SOURCE(OP-1)",
                    "nodeType": "SOURCE"
                }
            ]
        }

Retrieving All Queries with a Status

Connection.get_queries_with_status(status)

Example:

>>> registered = c.get_queries_with_status("Registered")
>>> scheduling = c.get_queries_with_status("Scheduling")
>>> running = c.get_queries_with_status("Running")
>>> marked_for_stopped = c.get_queries_with_status("MarkedForStop")
>>> stopped = c.get_queries_with_status("Stopped")
>>> failed = c.get_queries_with_status("Failed")
>>> running
[null, "Query::from(\"default_logical\").project(Attribute(\"id\")).sink(PrintSinkDescriptor::create());"

Retrieve a Specific Query with a Status

Connection.get_queries_with_status_at_pos(status, pos)

Example:

>>> c.get_queries_with_status_at_pos("Running", 1)
Query::from("default_logical").project(Attribute("id")).sink(PrintSinkDescriptor::create());

Retrieving Execution Plan

Connection.get_query_execution_plan(queryId)

Example:

>>> data_stream = c.get_logical_stream("default_logical")
>>> data_stream["id"] 
id
0
1
2
>>> data_stream.get_query_execution_plan()
{
            "executionNodes": [
                {
                    "ScheduledQueries": [
                        {
                            "queryId": 1,
                            "querySubPlans": [
                                {
                                    "operator": "SINK(5)\n  PROJECTION(2, schema=default_logical$id:INTEGER )\n    "
                                                "SOURCE(1,default_logical)\n",
                                    "querySubPlanId": 1
                                }
                            ]
                        }
                    ],
                    "executionNodeId": 2,
                    "topologyNodeId": 2,
                    "topologyNodeIpAddress": "127.0.0.1"
                },
                {
                    "ScheduledQueries": [
                        {
                            "queryId": 1,
                            "querySubPlans": [
                                {
                                    "operator": "SINK(3)\n  SOURCE(4,)\n",
                                    "querySubPlanId": 2
                                }
                            ]
                        }
                    ],
                    "executionNodeId": 1,
                    "topologyNodeId": 1,
                    "topologyNodeIpAddress": "127.0.0.1"
                }
            ]
        }

Retrieving Query Plan

Connection.get_query_plan(queryId)

Example:

>>> data_stream = c.get_logical_stream("default_logical")
>>> data_stream["id"] 
id
0
1
2
>>> data_stream.get_query_plan(queryId)
{
            "edges": [
                {
                    "source": "UNDEFINED(OP-2)",
                    "target": "SINK(OP-3)"
                },
                {
                    "source": "SOURCE(OP-1)",
                    "target": "UNDEFINED(OP-2)"
                }
            ],
            "nodes": [
                {
                    "id": "3",
                    "name": "SINK(OP-3)",
                    "nodeType": "SINK"
                },
                {
                    "id": "2",
                    "name": "UNDEFINED(OP-2)",
                    "nodeType": "UNDEFINED"
                },
                {
                    "id": "1",
                    "name": "SOURCE(OP-1)",
                    "nodeType": "SOURCE"
                }
            ]
        }

Retrieve Topology

Connection.get_nes_topology()

Example:

>>> c.get_nes_topology()
{
    "edges": [
        {
            "source": 2,
            "target": 1
        }
    ],
    "nodes": [
        {
            "available_resources": 65535,
            "id": 1,
            "ip_address": "127.0.0.1"
        },
        {
            "available_resources": 8,
            "id": 2,
            "ip_address": "127.0.0.1"
        }
    ]
}
