Real-time pipeline 4 analytics
PipeRT2
PipeRT2 is an infrastructure for data processing that can handle a high flow rate.
With PipeRT2 you can design a complex dataflow dynamically. With a simple implementation of the pipe's components, a full dataflow can be dispatched.
Table of contents
- Requirements
- Components
- Installation
- Getting Started
- Advanced
- The Events Mechanism
- Custom Events
- Using The Cockpit
- Running via RPC CLI
- Running via API
- Synchroniser
- Constant FPS
- Unlink Routines
- FAQ
- Contributing
Requirements
- Python 3.6
Components
Routine - The smallest component in the pipe.
- Each routine has to implement a main_logic function that contains the business logic of the routine.
Flow - Contains multiple routines with the same context.
Each flow within the pipe runs as a separate process. Utilizing this correctly will improve the pipe's performance.
For example, if you have a pipe that includes multiple CPU-heavy operations, it is better to separate them into different routines within different flows. Doing so will maximize your pipe's performance.
Pipe - Controls the different elements and aspects of the system. Contains all flows and distributes events to all components.
Installation
We publish PipeRT2 as the PipeRT package on PyPI.
Run pip3 install PipeRT to install the official stable version of PipeRT2.
Getting Started
As an example, we're going to create a pipe which contains a simple flow with very simple routines.
The first step is to create an FPSRoutine, which will be responsible for generating data inside our pipeline. We create the source class that generates data:
from pipert2 import FPSRoutine


class GenerateData(FPSRoutine):
    def main_logic(self) -> dict:
        return {
            "value": "example"
        }
Then we create the destination routine (which is also an FPSRoutine) to store (in our case, print) the pipeline's result:
from pipert2 import FPSRoutine


class PrintResult(FPSRoutine):
    def main_logic(self, data: dict) -> None:
        print(data["value"])
Now we create a new pipe that contains a flow made of those two routines:
from pipert2 import Pipe, QueueNetwork
from pipert2.utils.consts.event_names import START_EVENT_NAME, KILL_EVENT_NAME
# Creating the pipe.
example_pipe = Pipe()
# Create an instance of each routine.
generate_data_routine = GenerateData()
print_result_routine = PrintResult()
# Create a flow with the required routines.
example_pipe.create_flow("example_flow", True, generate_data_routine, print_result_routine)
# Build the pipe.
example_pipe.build()
# Run the pipe.
example_pipe.notify_event(START_EVENT_NAME)
# Force all the pipe's routines to stop.
example_pipe.notify_event(KILL_EVENT_NAME)
To connect routines in a different order, we use the example_pipe.link function, for example:
example_pipe.create_flow("example_flow", False, generate_data_routine, print_result_routine)
example_pipe.link(Wire(source=generate_data_routine, destinations=(print_result_routine,)))
To trigger an event for a specific flow or routine, we add a dictionary of the required flows and routines:
- For example, to trigger all routines in example_flow: example_pipe.notify_event(START_EVENT_NAME, {"example_flow": []})
- For example, to trigger specific routines in example_flow: example_pipe.notify_event(START_EVENT_NAME, {"example_flow": [generate_data_routine.name, print_result_routine.name]})
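The targeting dictionary can be read as a small selection rule. The sketch below is not PipeRT2 code; it is a stand-alone illustration in which the function name resolve_targets and the flows mapping are hypothetical. It assumes, as the two examples above suggest, that an empty list selects every routine in the flow and that omitting the dictionary selects everything.

```python
from typing import Dict, List, Optional


def resolve_targets(
    flows: Dict[str, List[str]],
    spec: Optional[Dict[str, List[str]]] = None,
) -> Dict[str, List[str]]:
    """Return the routines, per flow, that an event would reach."""
    if spec is None:
        # No dictionary given: every routine in every flow is notified.
        return {flow: list(routines) for flow, routines in flows.items()}
    selected = {}
    for flow, wanted in spec.items():
        available = flows[flow]
        if not wanted:
            # An empty list means "all routines in this flow".
            selected[flow] = list(available)
        else:
            selected[flow] = [name for name in available if name in wanted]
    return selected


# Hypothetical flow layout used only for this illustration.
flows = {"example_flow": ["generate_data", "print_result"], "other_flow": ["save"]}
print(resolve_targets(flows, {"example_flow": []}))
print(resolve_targets(flows, {"example_flow": ["print_result"]}))
```

Flows that are not named in the dictionary are left out of the result, which matches the idea of notifying only the listed flows.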
Custom Data Types
Instead of using the Data class to pass arguments throughout the pipe's routines, you can create a custom class that inherits from Data and carries your own parameters.
For example:
class Example(Data):
    def __init__(self):
        self.custom_param = "custom param"


class SrcRoutine(FPSRoutine):
    def main_logic(self) -> Example:
        return Example()


class MidRoutine(FPSRoutine):
    def main_logic(self, example: Example) -> Example:
        print(example.custom_param)  # output -> "custom param"
        example.custom_param = "change"
        return example


class DstRoutine(FPSRoutine):
    def main_logic(self, example):
        print(example.custom_param)  # output -> "change"
Advanced
The Routine
When inheriting from the base routine class, there are 3 main functions to extend.
The first: main_logic
- The main_logic function acts as the core of the routine. Each routine has to implement this method in order for it to work.
- The main_logic function runs each time new data is received from another routine. A routine that generates data will have its main_logic executed whenever possible.
- The main_logic function can serve a few purposes according to the routine's needs:
  - It can receive data from another routine which is linked to the current routine
  - It can generate new data and send it to another routine
  - It can get data and return nothing (e.g. saving to a file, sending to a remote API, etc.)
def main_logic(self, data: Data = None) -> Optional[Data]:
The second: setup
The setup function of a routine runs right before the routine starts working.
The setup function should be used to set a starting state for your routine.
For example: opening a file, setting up a socket for a stream, resetting attributes of the routine, etc.
The third: cleanup
The cleanup function acts as the counterpart to setup.
The cleanup function should be used to release any resources still held by the routine.
For example: releasing a file reader, closing a socket, etc.
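To make the order of the three functions concrete, here is a minimal stand-alone sketch. It does not use PipeRT2: ToyRoutine and run are hypothetical names, and the driver loop is an assumption about the general shape of the lifecycle (setup once, main_logic per item, cleanup once), not the library's actual scheduler.

```python
from typing import Iterable, List, Optional


class ToyRoutine:
    """Hypothetical stand-in for a routine; not the PipeRT2 base class."""

    def __init__(self) -> None:
        self.calls: List[str] = []
        self.total: Optional[int] = None

    def setup(self) -> None:
        # Set the starting state before any data is handled.
        self.calls.append("setup")
        self.total = 0

    def main_logic(self, data: int) -> int:
        # Core logic, run once per received item.
        self.calls.append("main_logic")
        self.total += data
        return self.total

    def cleanup(self) -> None:
        # Release whatever setup acquired.
        self.calls.append("cleanup")
        self.total = None


def run(routine: ToyRoutine, items: Iterable[int]) -> List[int]:
    """Drive the routine through setup, main_logic per item, then cleanup."""
    routine.setup()
    try:
        return [routine.main_logic(item) for item in items]
    finally:
        # In this sketch, cleanup runs even if main_logic raises.
        routine.cleanup()


routine = ToyRoutine()
results = run(routine, [1, 2, 3])
print(results)        # [1, 3, 6]
print(routine.calls)  # ['setup', 'main_logic', 'main_logic', 'main_logic', 'cleanup']
```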
The Events Mechanism
Events within the pipe can change its behaviour in real time.
Events can be triggered from Pipe or Routine objects using the notify_event function, with the following syntax:
# Notifies all of the flows within the pipe with the given event.
example_pipe.notify_event(<Event_name>)
# Notifies a specific flow or flows with the given event.
example_pipe.notify_event(<Event_name>, {<Flow_name1>: [], <Flow_name2>: []...})
# Notifies only specified routines with the given event.
example_pipe.notify_event(<Event_name>, {<Flow_name1>: [<routine_name1>, <routine_name2>...]})
# Same applies for routine except
class SomeRoutine(FPSRoutine):
    ...

    def SomeFunc(self):
        # In order to notify event within the routine
        self.notify_event(<Event_name>, {<Flow_name1>: [<routine_name1>, <routine_name2>...]})
        # Same syntax used in notify_event of the pipe

# Or alternatively
some_routine = SomeRoutine()
some_routine.notify_event(<Event_name>, {<Flow_name1>: [<routine_name1>, <routine_name2>...]})
The pipe package has a few built-in events already implemented:
- STOP_EVENT_NAME: Stops the specified routines.
- KILL_EVENT_NAME: Force stops the specified routines.
- START_EVENT_NAME: Starts the specified routines.
Custom Events
When writing your routines, you can implement your own events to issue custom behaviour.
Here is an example routine that has a custom event:
class SomeRoutine(FPSRoutine):
    def __init__(self, name):
        super().__init__(name)
        self.cap = None

    # This event causes the routine to set its opencv reader.
    @events("CUSTOM_EVENT_NAME")
    def some_func(self):
        # Some logic
        pass
To call the new event, notify_event is used just like for any other event:
from pipert2 import Pipe
from pipert2.utils.consts.event_names import START_EVENT_NAME
# Creating the pipe.
example_pipe = Pipe()
# Create an instance of each routine.
some_routine = SomeRoutine("some_routine")
print_result_routine = PrintResult()
# Create a flow with the required routines.
example_pipe.create_flow("example_flow", True, some_routine, print_result_routine)
# Notify the custom event
example_pipe.notify_event("CUSTOM_EVENT_NAME", {"example_flow": ["some_routine"]}, example_param1="some_value1", example_param2="some_value2"...)
# Start the pipe
example_pipe.notify_event(START_EVENT_NAME)
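For readers curious how a decorator can tie an event name to a method, the following is a toy reimplementation. It is not PipeRT2's implementation: the events decorator, ToyRoutine, and the SET_THRESHOLD event are defined locally for illustration, and the dispatch logic is an assumption about one possible design.

```python
from typing import Any, Callable, Dict


def events(event_name: str) -> Callable:
    """Toy decorator: tag a method with the event name it handles."""
    def mark(func: Callable) -> Callable:
        func._event_name = event_name
        return func
    return mark


class ToyRoutine:
    """Hypothetical routine that dispatches events to tagged methods."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.threshold = 0
        # Collect every method tagged by @events into a lookup table.
        self._handlers: Dict[str, Callable] = {}
        for attr in dir(self):
            member = getattr(self, attr)
            tag = getattr(member, "_event_name", None)
            if tag is not None:
                self._handlers[tag] = member

    def notify_event(self, event_name: str, **params: Any) -> bool:
        """Run the handler for event_name; return False if there is none."""
        handler = self._handlers.get(event_name)
        if handler is None:
            return False
        handler(**params)
        return True

    @events("SET_THRESHOLD")
    def set_threshold(self, value: int) -> None:
        self.threshold = value


routine = ToyRoutine("some_routine")
handled = routine.notify_event("SET_THRESHOLD", value=5)
print(handled, routine.threshold)              # True 5
print(routine.notify_event("UNKNOWN_EVENT"))   # False
```

The keyword arguments passed to notify_event reach the handler as parameters, which mirrors how example_param1 and example_param2 are passed in the call above.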
Using The Cockpit
Installation
To install the Cockpit pipeline integration along with its dependencies, use the following command:
pip install PipeRT[cockpit]
OR (if the command above doesn't work):
pip install 'PipeRT[cockpit]'
Usage
(Before you get started, make sure you have an instance of the cockpit up and running. For more information visit the PipeRT-Cockpit repository)
In order for the pipe to be able to communicate with the cockpit a few things must be done.
First, create a .env file with the following contents:
SOCKET_LOGGER_URL="<cockpit url here (usually http://localhost:3000 if on the same system)>/api/socketio"
After that, replace your pipe's default logger with the socket logger like so:
from pipert2 import Pipe
from pipert2.utils.socketio_logger.socket_logger import get_socket_logger
# logger level indicates what logs will be sent, if logging.INFO is provided info logs and above will be sent and so on.
example_pipe = Pipe(logger=get_socket_logger("<desired base name here>", <logger level>))
And that's it!
After that your pipe will send its logs to the cockpit!
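The effect of the logger level can be seen with the standard logging module alone. The sketch below does not contact a cockpit: CollectingHandler is a hypothetical stand-in for the socket logger's handler, and the fallback URL is only an assumed default. Note that os.environ does not read a .env file by itself, so the variable has to be loaded into the environment by some other means.

```python
import logging
import os


class CollectingHandler(logging.Handler):
    """Stand-in for a remote log sink: it only stores formatted messages."""

    def __init__(self) -> None:
        super().__init__()
        self.messages = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(f"{record.levelname}:{record.getMessage()}")


# Assumed fallback value, used only when the variable is not set.
url = os.environ.get("SOCKET_LOGGER_URL", "http://localhost:3000/api/socketio")

logger = logging.getLogger("example_pipe_demo")
logger.setLevel(logging.INFO)  # INFO and above pass, DEBUG is dropped
logger.propagate = False
sink = CollectingHandler()
logger.addHandler(sink)

logger.debug("not forwarded")
logger.info("pipe started")
logger.error("routine crashed")
print(sink.messages)  # ['INFO:pipe started', 'ERROR:routine crashed']
```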
Running via RPC CLI
Installation
To install the pipeline RPC Server along with its dependencies, use the following command:
pip install PipeRT[rpc]
OR (if the command above doesn't work):
pip install 'PipeRT[rpc]'
Usage
First, in order to use this capability, install the optional package via pip install PipeRT[rpc].
The next step is running the RPC server:
rpc_pipe = Pipe()
rpc_server = RPCPipeWrapper(rpc_pipe)
rpc_server.run_rpc_server(endpoint="<end_point>")
You can easily connect to the RPC server via Python or the CLI by following the example on ZeroRPC's page.
In order to execute pipe events you need to run the execute function of the server.
Arguments to pipe events are passed in JSON format:
- for example via Python:
client.execute('start')  # no arguments example
client.execute('join', '{"to_kill":true}')  # including arguments example
- for example via CLI:
zerorpc tcp://0.0.0.0:1234 execute start
zerorpc tcp://0.0.0.0:1234 execute join '{"to_kill":true}'
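Writing the JSON argument string by hand is error-prone, so it can be generated with the standard json module. This is a small sketch; build_execute_args is a hypothetical helper, not part of PipeRT2 or ZeroRPC.

```python
import json
from typing import Any, Dict, Optional, Tuple


def build_execute_args(
    event_name: str, params: Optional[Dict[str, Any]] = None
) -> Tuple[str, ...]:
    """Return the positional arguments for an execute call."""
    if not params:
        return (event_name,)
    # Compact separators reproduce the '{"to_kill":true}' form shown above.
    return (event_name, json.dumps(params, separators=(",", ":")))


print(build_execute_args("start"))                    # ('start',)
print(build_execute_args("join", {"to_kill": True}))  # ('join', '{"to_kill":true}')
```

With a connected client, the result could then be unpacked as client.execute(*build_execute_args("join", {"to_kill": True})).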
Running via API
Installation
To install the pipeline REST api along with its dependencies, use the following command:
pip install PipeRT[api]
OR (if the command above doesn't work):
pip install 'PipeRT[api]'
Usage
After creating a pipeline, wrap it in an APIWrapper with your host and port, and run it:
pipe = Pipe()
...
api_wrapper = APIWrapper(host="<host>", port=<port>, pipe=pipe)
api_wrapper.run()
In order to execute pipe events you need to send HTTP GET requests to the your_host:your_port address.
- To start the pipe, use route: your_host:your_port/start
- To pause the pipe, use route: your_host:your_port/pause
- To kill the pipe and kill the API server, use route: your_host:your_port/kill
- To start or stop a specific routine, use route:
  - Start: <your_host>:<your_port>/routines/<routine_name>/events/<start>/execute
  - Stop: <your_host>:<your_port>/routines/<routine_name>/events/<stop>/execute
- To trigger a custom event for all of the routines, use route: <your_host>:<your_port>/routines/events/<event_name>/execute
- To trigger a custom event on a specific routine, use route: <your_host>:<your_port>/routines/<routine_name>/events/<custom_event>/execute
- To pass additional arguments, add them to the body of the request as JSON:
{
  "extra_args": {
    "param1": value, "param2": value
  }
}
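The routes above follow a regular pattern, so they can be assembled programmatically. The helpers below are hypothetical (event_url and event_body are not PipeRT2 functions), the http:// scheme and port 5000 are assumptions, and nothing is sent over the network; the sketch only builds the URL and the JSON body.

```python
import json
from typing import Any, Optional
from urllib.parse import quote


def event_url(
    host: str, port: int, event_name: str, routine_name: Optional[str] = None
) -> str:
    """Build the execute route for an event, optionally for one routine."""
    base = f"http://{host}:{port}/routines"  # the http:// scheme is an assumption
    if routine_name is not None:
        base += f"/{quote(routine_name, safe='')}"
    return f"{base}/events/{quote(event_name, safe='')}/execute"


def event_body(**extra_args: Any) -> str:
    """Wrap additional arguments in the extra_args envelope."""
    return json.dumps({"extra_args": extra_args})


print(event_url("localhost", 5000, "start", "generate_data"))
# http://localhost:5000/routines/generate_data/events/start/execute
print(event_url("localhost", 5000, "CUSTOM_EVENT_NAME"))
# http://localhost:5000/routines/events/CUSTOM_EVENT_NAME/execute
print(event_body(param1=1, param2="a"))
# {"extra_args": {"param1": 1, "param2": "a"}}
```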
Synchroniser
The pipe has a synchronising mechanism which is used to synchronise the routines' FPS. This mechanism forces routines to rest if their FPS is significantly higher than that of the bottleneck routines. It saves resources, and should not affect the number of the processed routines.
The best example of a case where the synchronising mechanism is useful is when fast routines are followed by routines with a lower FPS.
To activate this mechanism, create the pipe with the auto_pacing_mechanism parameter set to True, for example:
pipe = Pipe(auto_pacing_mechanism=True)
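The idea of resting fast routines can be illustrated with simple arithmetic. The function below is a simplified sketch and not the algorithm PipeRT2 uses: rest_times is a hypothetical name, the FPS figures are made up, and the "significantly higher" threshold mentioned above is ignored.

```python
from typing import Dict


def rest_times(fps_by_routine: Dict[str, float]) -> Dict[str, float]:
    """Seconds each routine could rest per iteration to match the slowest one."""
    bottleneck_fps = min(fps_by_routine.values())
    bottleneck_period = 1.0 / bottleneck_fps
    return {
        name: round(bottleneck_period - 1.0 / fps, 6)
        for name, fps in fps_by_routine.items()
    }


# Made-up measurements: the "detect" routine is the bottleneck.
measured = {"generate": 100.0, "detect": 10.0, "print": 50.0}
print(rest_times(measured))
# {'generate': 0.09, 'detect': 0.0, 'print': 0.08}
```

A routine running at 100 FPS next to a 10 FPS bottleneck would spend 0.09 of every 0.1 seconds resting, which is where the saved resources come from.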
Constant FPS
How to set it?
When initializing a routine, call the set_const_fps function with the required FPS.
class Example(DestinationRoutine):
    def __init__(self, required_fps):
        super().__init__()
        self.set_const_fps(required_fps)
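What a constant FPS means in practice is that each iteration is padded with sleep until a fixed period has passed. The loop below is a generic sketch of that technique, not PipeRT2's implementation; run_at_const_fps is a hypothetical name, and the clock and sleep parameters exist only so the example can be run deterministically with a fake clock.

```python
import time
from typing import Callable, List


def run_at_const_fps(
    step: Callable[[], None],
    fps: float,
    iterations: int,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> List[float]:
    """Call step at most fps times per second; return the sleep durations used."""
    period = 1.0 / fps
    sleeps = []
    for _ in range(iterations):
        started = clock()
        step()
        elapsed = clock() - started
        # Sleep only for what is left of the period; never a negative time.
        remaining = max(0.0, period - elapsed)
        sleeps.append(remaining)
        sleep(remaining)
    return sleeps


# Fake clock: each step pretends to take 0.25 s, and sleeping is a no-op.
fake_now = [0.0]


def fake_step() -> None:
    fake_now[0] += 0.25


durations = run_at_const_fps(
    fake_step, fps=2, iterations=3, clock=lambda: fake_now[0], sleep=lambda s: None
)
print(durations)  # [0.25, 0.25, 0.25]
```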
Unlink Routines
To disconnect one routine from another, and stop the output of its results to that specific routine, use the unlink event.
To unlink them use:
- Python code - pipe.notify_event(UNLINK, specific_routine="<source_routine_name>", unlink_routine_name="<destination_routine_name>")
- API - Send an HTTP POST request to <your_host>:<your_port>/routines/<source_routine_name>/events/unlink/execute with the parameters:
{
  "extra_args": {
    "unlink_routine_name": <destination_routine_name>
  }
}
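The effect of unlinking can be pictured as removing one destination from a source's fan-out list. The toy below uses only the standard library; ToySource and its methods are hypothetical and say nothing about how PipeRT2 stores its links internally.

```python
from queue import Queue
from typing import Any, Dict


class ToySource:
    """Hypothetical source that fans results out to named destination queues."""

    def __init__(self) -> None:
        self.destinations: Dict[str, Queue] = {}

    def link(self, routine_name: str, inbox: Queue) -> None:
        self.destinations[routine_name] = inbox

    def unlink(self, unlink_routine_name: str) -> None:
        # Stop sending results to that one destination; others are untouched.
        self.destinations.pop(unlink_routine_name, None)

    def send(self, result: Any) -> None:
        for inbox in self.destinations.values():
            inbox.put(result)


printer_inbox, saver_inbox = Queue(), Queue()
source = ToySource()
source.link("printer", printer_inbox)
source.link("saver", saver_inbox)
source.send("first")
source.unlink("saver")
source.send("second")
print(printer_inbox.qsize(), saver_inbox.qsize())  # 2 1
```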
FAQ
Q: What will happen when nothing is returned from the main logic?
A: A function that does not return anything returns None.
We detect when None is returned and just ignore it.
So in short, nothing will be sent to the next routine in line.
Q: What happens if an exception is raised within the Pipe (main_logic, setup, cleanup)?
A: setup and cleanup methods - The routine thread will crash.
It will cause the routine to stop working until you stop and start it again.
main_logic method - The crash is reported to the user through the routine's logger.
The crash won't affect the routine's execution, because it will just take the next data in line from the message handler and execute the main logic on it.
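The two answers above describe a loop that skips None results and survives exceptions in the main logic. The following is a minimal sketch of such a loop under those stated behaviours; process_messages and halve_even are hypothetical names, and the real message handler is replaced by a plain list.

```python
import logging
from typing import Any, Callable, Iterable, List, Optional

logger = logging.getLogger("toy_routine")


def process_messages(
    main_logic: Callable[[Any], Optional[Any]],
    messages: Iterable[Any],
) -> List[Any]:
    """Run main_logic on each message and collect what would be sent onward."""
    outgoing = []
    for message in messages:
        try:
            result = main_logic(message)
        except Exception:
            # Report the failure and move on to the next message.
            logger.exception("main_logic failed on %r", message)
            continue
        if result is None:
            # Nothing returned: nothing is sent to the next routine.
            continue
        outgoing.append(result)
    return outgoing


def halve_even(number: int) -> Optional[int]:
    """Example logic: fails on negatives, returns None for odd numbers."""
    if number < 0:
        raise ValueError("negative input")
    return number // 2 if number % 2 == 0 else None


print(process_messages(halve_even, [4, 3, -1, 10]))  # [2, 5]
```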
Q: Why and how to use data transmitters?
A: The user can decide not to transport the data of a message through the message broker and choose a different approach instead, for example shared memory or the local file system.
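As an illustration of the shared memory approach, the sketch below places a payload in a shared memory block and passes only a small handle (name and size) as the message. It uses the standard multiprocessing.shared_memory module, which needs Python 3.8 or newer, and it is not PipeRT2's data transmitter API; read_payload and the message layout are assumptions made for this example.

```python
from multiprocessing import shared_memory

payload = b"large frame bytes"

# Sender side: put the payload in shared memory and keep the block open.
block = shared_memory.SharedMemory(create=True, size=len(payload))
block.buf[:len(payload)] = payload

# Only this small handle would travel through the message broker.
# The size is sent explicitly because the block may be rounded up to a page.
message = {"shm_name": block.name, "size": len(payload)}


def read_payload(message: dict) -> bytes:
    """Receiver side: attach by name, copy the bytes out, then detach."""
    view = shared_memory.SharedMemory(name=message["shm_name"])
    try:
        return bytes(view.buf[:message["size"]])
    finally:
        view.close()


received = read_payload(message)

# Sender side again: free the block once the receiver is done with it.
block.close()
block.unlink()
print(received)  # b'large frame bytes'
```

The design point is that the broker only carries a few bytes per message, while the large payload is copied once into memory that both sides can reach.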
Contributing
To contribute, please contact San-Moshe for access to our Jira.
Please follow the conventions used in the project and make sure all checks pass.
The PR name needs to be in the format of [jira_ticket_id] - [Task description]
File details
Details for the file PipeRT-2.4.0.tar.gz.
File metadata
- Download URL: PipeRT-2.4.0.tar.gz
- Upload date:
- Size: 74.3 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.8.0 pkginfo/1.8.2 readme-renderer/32.0 requests/2.27.1 requests-toolbelt/0.9.1 urllib3/1.26.8 tqdm/4.63.0 importlib-metadata/4.11.2 keyring/23.5.0 rfc3986/2.0.0 colorama/0.4.4 CPython/3.9.10
File hashes
Algorithm | Hash digest
---|---
SHA256 | a77c1edc0ea93ef80fdac0d463260b93cababa31773858bfcef6234fb789ecee
MD5 | 7a75b03e5df80f0f6f272aa6f58a7be6
BLAKE2b-256 | ccf5f6a106d6244ec665e6c9d9def0c19f855c2b447e976a7d706b2a0b246f94
File details
Details for the file PipeRT-2.4.0-py3-none-any.whl.
File metadata
- Download URL: PipeRT-2.4.0-py3-none-any.whl
- Upload date:
- Size: 110.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.8.0 pkginfo/1.8.2 readme-renderer/32.0 requests/2.27.1 requests-toolbelt/0.9.1 urllib3/1.26.8 tqdm/4.63.0 importlib-metadata/4.11.2 keyring/23.5.0 rfc3986/2.0.0 colorama/0.4.4 CPython/3.9.10
File hashes
Algorithm | Hash digest
---|---
SHA256 | 27270642940fe20972bdfad1e5c79b2e558423368e88ef6d325e973ecf6b20ff
MD5 | 9dd7d6cf176c4ea17c9614e36f21a8d4
BLAKE2b-256 | 0a6fd852d44c1b7c7ea27d3ebfdc05160b7255adbaeaa8467e9b904aa3c0740b