Real-time pipeline for analytics
PipeRT2
PipeRT2 is an infrastructure for data processing, built to handle a high data flow rate.
PipeRT2 lets you design complex dataflows dynamically: by implementing a few simple pipe components, you can dispatch a complete dataflow.
Requirements
- Python 3.6
Components
Routine - The smallest component in the pipe. Each routine must implement a `main_logic` function that contains the routine's business logic.
There are three types of routines:
- SourceRoutine - The first routine in a pipe. Generates new data and streams it through the pipeline.
- MiddleRoutine - Consumes data from other routines in the pipeline, performs the desired operations on it, and sends the results to the next routine.
- DestinationRoutine - The last routine in a pipe. Stores the final results of all the data manipulation.
Flow - Contains multiple routines that share the same context.
Pipe - Controls the different elements and aspects of the system: it contains all flows and distributes events to all components.
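To make the contract between the three routine types concrete, here is a toy model in plain Python. It does not use PipeRT2: `Routine`, `run_flow_once`, `Counter`, `Double` and `Collect` are hypothetical stand-ins that only mimic the rules described above (every routine implements `main_logic`, a source produces data, a middle routine transforms it, a destination consumes it).

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, List


class Routine(ABC):
    """Hypothetical base class (not PipeRT2's): subclasses must implement main_logic."""

    @property
    def name(self) -> str:
        return type(self).__name__


class SourceRoutine(Routine):
    @abstractmethod
    def main_logic(self) -> Dict[str, Any]:
        """Generate new data; takes no input."""


class MiddleRoutine(Routine):
    @abstractmethod
    def main_logic(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Transform incoming data and return the result for the next routine."""


class DestinationRoutine(Routine):
    @abstractmethod
    def main_logic(self, data: Dict[str, Any]) -> None:
        """Consume the final result (store it, print it, ...)."""


def run_flow_once(source: SourceRoutine,
                  middles: List[MiddleRoutine],
                  destination: DestinationRoutine) -> None:
    """Push one item through source -> middles -> destination, in order."""
    data = source.main_logic()
    for routine in middles:
        data = routine.main_logic(data)
    destination.main_logic(data)


# Example routines built on the toy base classes.
class Counter(SourceRoutine):
    def __init__(self) -> None:
        self.count = 0

    def main_logic(self) -> Dict[str, Any]:
        self.count += 1
        return {"value": self.count}


class Double(MiddleRoutine):
    def main_logic(self, data: Dict[str, Any]) -> Dict[str, Any]:
        return {"value": data["value"] * 2}


class Collect(DestinationRoutine):
    def __init__(self) -> None:
        self.results: List[int] = []

    def main_logic(self, data: Dict[str, Any]) -> None:
        self.results.append(data["value"])


source, sink = Counter(), Collect()
for _ in range(3):
    run_flow_once(source, [Double()], sink)
print(sink.results)  # [2, 4, 6]
```

Using abstract methods means a routine that forgets to implement `main_logic` fails as soon as it is instantiated, rather than when data first reaches it.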
Installation
PipeRT2 is published on PyPI as the `PipeRT` package.
Run `pip3 install PipeRT` to install the official stable version of PipeRT2.
Getting Started
In this example, we create a pipe containing a simple flow built from two very simple routines.
The first step is to create a `SourceRoutine`, which is responsible for generating the data inside our pipeline:
```python
from pipert2 import SourceRoutine


class GenerateData(SourceRoutine):
    def main_logic(self) -> dict:
        # The returned dict is passed on to the next routine in the flow.
        return {
            "value": "example"
        }
```
Next, we create the destination routine, which stores the pipeline's result (in our case, it simply prints it):
```python
from pipert2 import DestinationRoutine


class PrintResult(DestinationRoutine):
    def main_logic(self, data: dict) -> None:
        # Receives the dict produced upstream and prints its "value" field.
        print(data["value"])
```
Now we create a new pipe containing a flow made of these two routines:
```python
import time

from pipert2 import Pipe
from pipert2.utils.consts.event_names import START_EVENT_NAME, KILL_EVENT_NAME

# Create the pipe.
example_pipe = Pipe()

# Create an instance of each routine.
generate_data_routine = GenerateData()
print_result_routine = PrintResult()

# Create a flow from the routines; True links them automatically in the given order.
example_pipe.create_flow("example_flow", True, generate_data_routine, print_result_routine)

# Build the pipe.
example_pipe.build()

# Start the pipe.
example_pipe.notify_event(START_EVENT_NAME)

# Let the routines run for a while before stopping them.
time.sleep(5)

# Force all of the pipe's routines to stop.
example_pipe.notify_event(KILL_EVENT_NAME)
```
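Because START and KILL are delivered as events, the routines keep working between the two calls. The sketch below models that lifecycle with only the standard library: a hypothetical `ToyPipe` moves data from a source to a destination in a background thread after `start()` and stops on `kill()`. It illustrates the idea only and is not PipeRT2's actual scheduling mechanism.

```python
import threading
import time
from typing import Callable, Dict, List


class ToyPipe:
    """Hypothetical stand-in for a pipe: loops source -> destination until killed."""

    def __init__(self,
                 source: Callable[[], Dict[str, str]],
                 destination: Callable[[Dict[str, str]], None]) -> None:
        self._source = source
        self._destination = destination
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def _run(self) -> None:
        # Keep moving data until the stop flag is set (the "kill" event).
        while not self._stop.is_set():
            self._destination(self._source())
            time.sleep(0.01)  # crude pacing so the loop does not spin a CPU core

    def start(self) -> None:
        self._thread.start()

    def kill(self, timeout: float = 1.0) -> None:
        # Signal the loop to exit, then wait for the worker thread to finish.
        self._stop.set()
        self._thread.join(timeout)


received: List[Dict[str, str]] = []
pipe = ToyPipe(source=lambda: {"value": "example"}, destination=received.append)
pipe.start()
time.sleep(0.1)  # let the pipe run briefly, like waiting between START and KILL
pipe.kill()
print(f"{len(received)} items received before kill")
```

Without the pause between `start()` and `kill()`, the stop flag could be set before any item is processed, which is why the example above waits before sending KILL.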
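To connect routines in a different order, pass `False` as the second argument of `create_flow` and connect the routines explicitly with `example_pipe.link`, for example:
```python
from pipert2 import Wire

example_pipe.create_flow("example_flow", False, generate_data_routine, print_result_routine)
# Send the source routine's output to every routine in the destinations tuple.
example_pipe.link(Wire(source=generate_data_routine, destinations=(print_result_routine,)))
```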
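`Wire` takes one source and a tuple of destinations, so a single output can fan out to several consumers. The standard-library sketch below models that idea; `ToyWire` and `route` are hypothetical names, and giving each destination its own copy of the data is a design choice of this sketch, not documented PipeRT2 behavior.

```python
import copy
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Tuple

Handler = Callable[[Dict[str, Any]], None]


@dataclass(frozen=True)
class ToyWire:
    """Hypothetical model of a wire: one source name, several destination handlers."""
    source: str
    destinations: Tuple[Handler, ...]


def route(wires: List[ToyWire], source: str, data: Dict[str, Any]) -> int:
    """Deliver `data` from `source` to every destination wired to it.

    Each destination gets its own deep copy so one consumer cannot
    mutate what another one sees. Returns the number of deliveries.
    """
    delivered = 0
    for wire in wires:
        if wire.source != source:
            continue
        for destination in wire.destinations:
            destination(copy.deepcopy(data))
            delivered += 1
    return delivered


printed: List[str] = []
stored: List[Dict[str, Any]] = []
wires = [ToyWire(source="generate_data",
                 destinations=(lambda d: printed.append(d["value"]), stored.append))]
count = route(wires, "generate_data", {"value": "example"})
```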
To trigger an event for specific flows or routines, pass a dictionary that maps each flow name to a list of routine names:
- To trigger all routines in `example_flow`, pass an empty list: `example_pipe.notify_event(START_EVENT_NAME, {"example_flow": []})`
- To trigger specific routines in `example_flow`, list their names: `example_pipe.notify_event(START_EVENT_NAME, {"example_flow": [generate_data_routine.name, print_result_routine.name]})`
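The targeting dictionary follows a simple rule: an empty list means every routine in that flow, and a non-empty list names specific routines. A small resolver makes the rule explicit. `resolve_targets` and the `FLOWS` layout are hypothetical; the sketch models only the documented convention (plus "no dictionary means the whole pipe"), not PipeRT2's internal event handling.

```python
from typing import Dict, List, Optional, Set, Tuple

# Hypothetical layout: flow name -> names of the routines in that flow.
FLOWS: Dict[str, List[str]] = {
    "example_flow": ["GenerateData", "PrintResult"],
    "other_flow": ["Reader", "Writer"],
}


def resolve_targets(flows: Dict[str, List[str]],
                    specific: Optional[Dict[str, List[str]]] = None
                    ) -> Set[Tuple[str, str]]:
    """Return the (flow, routine) pairs an event should reach.

    - specific is None: every routine in every flow.
    - {"flow": []}: every routine in that flow.
    - {"flow": [names...]}: only the named routines in that flow.
    """
    if specific is None:
        return {(flow, r) for flow, routines in flows.items() for r in routines}

    targets: Set[Tuple[str, str]] = set()
    for flow, names in specific.items():
        if flow not in flows:
            raise KeyError(f"unknown flow: {flow!r}")
        # An empty list selects the whole flow.
        chosen = names if names else flows[flow]
        for name in chosen:
            if name not in flows[flow]:
                raise KeyError(f"unknown routine {name!r} in flow {flow!r}")
            targets.add((flow, name))
    return targets


print(sorted(resolve_targets(FLOWS, {"example_flow": ["PrintResult"]})))
```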
Contributing
To contribute, contact San-Moshe for access to our Jira.
Please follow the conventions used in the project and make sure all checks pass.
PR titles must follow the format `[jira_ticket_id] - [Task description]`.
File details
Details for the file PipeRT-2.1.2.tar.gz.
File metadata
- Download URL: PipeRT-2.1.2.tar.gz
- Upload date:
- Size: 47.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.4.2 importlib_metadata/4.8.1 pkginfo/1.7.1 requests/2.26.0 requests-toolbelt/0.9.1 tqdm/4.62.3 CPython/3.9.7
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 61ef6eaf05d6eb230143224f2835c49ea65de140a94a6d3d0e284191701e5c0a |
| MD5 | 723a616734d891959ef7e4517eac1653 |
| BLAKE2b-256 | 5083bb4cde502e1c560d44f885d3d04c800cbe62278bc08649ecf6348711a14c |
File details
Details for the file PipeRT-2.1.2-py3-none-any.whl.
File metadata
- Download URL: PipeRT-2.1.2-py3-none-any.whl
- Upload date:
- Size: 72.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.4.2 importlib_metadata/4.8.1 pkginfo/1.7.1 requests/2.26.0 requests-toolbelt/0.9.1 tqdm/4.62.3 CPython/3.9.7
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 0909bb0cc3ffebc2d7cccdd4a968555f537eb18bde58f236fb998ec7b730b093 |
| MD5 | 4a3515a584d892a0ecf7dec0f6f7d0d2 |
| BLAKE2b-256 | 374dbaebf41ba104d3b85e880c8c827a69c4dee311148c67476445c6feb70743 |
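The published SHA256 digests let you check that a downloaded file is intact. A minimal sketch using Python's standard `hashlib`; `file_sha256` and `verify` are hypothetical helper names, and the path is wherever you saved the download.

```python
import hashlib
from pathlib import Path


def file_sha256(path: Path, chunk_size: int = 65536) -> str:
    """Compute a file's SHA256 hex digest, reading in chunks to bound memory use."""
    digest = hashlib.sha256()
    with open(path, "rb") as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def verify(path: Path, expected_sha256: str) -> bool:
    """True if the file's SHA256 matches the published digest (case-insensitive)."""
    return file_sha256(path) == expected_sha256.strip().lower()
```

For example, `verify(Path("PipeRT-2.1.2.tar.gz"), "61ef6eaf05d6eb230143224f2835c49ea65de140a94a6d3d0e284191701e5c0a")` returns `True` only if the local source distribution matches the SHA256 digest listed above.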