
Real-time pipeline 4 analytics

Project description

PipeRT2


PipeRT2 is an infrastructure for data processing that can handle high flow rates.

PipeRT2 lets you design complex dataflows dynamically. By implementing a few simple pipe components, you can dispatch a full dataflow.


Requirements

  • Python 3.6

Components

Routine - The smallest component in the pipe.

Each routine must implement a main_logic method that contains the routine's business logic.

There are three types of routines:

  • SourceRoutine - The first routine in a pipe. Used for generating new data and streaming it through the pipeline.
  • MiddleRoutine - Consumes data from other routines in the pipeline, performs the desired operations on it, and sends the results to the next routine.
  • DestinationRoutine - The last routine of the pipe. Used for storing the results of all the data manipulation.
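To make the division of labour between the three routine types concrete, here is a library-free toy model. The class names and the run_chain driver are hypothetical and are not part of PipeRT2; in PipeRT2 itself the pipe and its flows take care of running routines and moving data between them, rather than a simple loop like this one.

```python
from typing import Any, Dict, List, Optional


class ToySource:
    """Hypothetical stand-in for a SourceRoutine: produces new data."""

    def __init__(self, values: List[str]) -> None:
        self._values = iter(values)

    def main_logic(self) -> Optional[Dict[str, Any]]:
        # Return the next item, or None once the input is exhausted.
        value = next(self._values, None)
        return None if value is None else {"value": value}


class ToyMiddle:
    """Hypothetical stand-in for a MiddleRoutine: transforms data."""

    def main_logic(self, data: Dict[str, Any]) -> Dict[str, Any]:
        return {"value": data["value"].upper()}


class ToyDestination:
    """Hypothetical stand-in for a DestinationRoutine: stores results."""

    def __init__(self) -> None:
        self.stored: List[str] = []

    def main_logic(self, data: Dict[str, Any]) -> None:
        self.stored.append(data["value"])


def run_chain(source: ToySource, middle: ToyMiddle, destination: ToyDestination) -> None:
    # Pull from the source until it is exhausted, passing each item down the chain.
    while True:
        data = source.main_logic()
        if data is None:
            break
        destination.main_logic(middle.main_logic(data))


dest = ToyDestination()
run_chain(ToySource(["a", "b"]), ToyMiddle(), dest)
print(dest.stored)  # ['A', 'B']
```

Note that only the source's main_logic takes no data argument: it creates data, while the middle and destination routines receive it.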

Flow - Contains multiple routines with the same context.

Pipe - Controls the different elements and aspects of the system. Contains all flows and distributes events to all components.
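The containment hierarchy (a pipe owns flows, a flow owns routines, and events travel from the pipe down to every routine) can be sketched as follows. ToyPipe, ToyFlow, and their methods are hypothetical illustrations of the relationships described above, not the PipeRT2 classes, and routines are reduced to plain names that record the events they receive.

```python
from typing import Dict, List


class ToyFlow:
    """Hypothetical flow: a named group of routines sharing a context."""

    def __init__(self, name: str, routine_names: List[str]) -> None:
        self.name = name
        self.routine_names = routine_names
        # Record of which events each routine has received.
        self.received: Dict[str, List[str]] = {r: [] for r in routine_names}

    def notify(self, event_name: str) -> None:
        # Deliver the event to every routine in this flow.
        for routine_name in self.routine_names:
            self.received[routine_name].append(event_name)


class ToyPipe:
    """Hypothetical pipe: owns all flows and fans events out to them."""

    def __init__(self) -> None:
        self.flows: Dict[str, ToyFlow] = {}

    def add_flow(self, flow: ToyFlow) -> None:
        self.flows[flow.name] = flow

    def notify_event(self, event_name: str) -> None:
        # Broadcast: every flow, and therefore every routine, gets the event.
        for flow in self.flows.values():
            flow.notify(event_name)


pipe = ToyPipe()
pipe.add_flow(ToyFlow("flow_a", ["gen", "print"]))
pipe.add_flow(ToyFlow("flow_b", ["other"]))
pipe.notify_event("start")
print(pipe.flows["flow_b"].received)  # {'other': ['start']}
```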

Installation

We publish PipeRT2 as the PipeRT package on PyPI.

Run pip3 install PipeRT to install the official stable version of PipeRT2.
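The distribution name on PyPI (PipeRT) differs from the import name used in the examples (pipert2). A quick way to check that the installation worked is to ask Python whether the pipert2 module can be found. This sketch uses only the standard library (importlib.util.find_spec, available on Python 3.6); the helper name pipert2_installed is hypothetical.

```python
import importlib.util


def pipert2_installed() -> bool:
    # The package is installed as "PipeRT" but imported as "pipert2",
    # matching the `from pipert2 import ...` lines in Getting Started.
    return importlib.util.find_spec("pipert2") is not None


print(pipert2_installed())
```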

Getting Started

As an example, we will create a pipe containing a simple flow made of two very simple routines.

The first step is to create a SourceRoutine, which will be responsible for generating data inside our pipeline. We create a source class that generates data:

from pipert2 import SourceRoutine

class GenerateData(SourceRoutine):

    def main_logic(self) -> dict:
        return {
            "value": "example"
        }

Then we create the destination routine, which stores (in our case, prints) the pipeline's results:

from pipert2 import DestinationRoutine

class PrintResult(DestinationRoutine):

    def main_logic(self, data: dict) -> None:
        print(data["value"])

Now we create a new pipe that contains a flow made of these two routines:

import time

from pipert2 import Pipe
from pipert2.utils.consts.event_names import START_EVENT_NAME, KILL_EVENT_NAME

# Create the pipe.
example_pipe = Pipe()

# Create an instance of each routine.
generate_data_routine = GenerateData()
print_result_routine = PrintResult()

# Create a flow with the required routines.
# True: connect the routines automatically in the order they are given.
example_pipe.create_flow("example_flow", True, generate_data_routine, print_result_routine)

# Build the pipe.
example_pipe.build()

# Run the pipe.
example_pipe.notify_event(START_EVENT_NAME)

# Let the routines run for a while before stopping them.
time.sleep(5)

# Force all of the pipe's routines to stop.
example_pipe.notify_event(KILL_EVENT_NAME)
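Notifying KILL_EVENT_NAME immediately after START_EVENT_NAME leaves the routines almost no time to do any work. The library-free toy below illustrates the start / run / kill lifecycle with threading.Event. The worker function and its loop are hypothetical and stand in for a routine; this is not PipeRT2's own event machinery.

```python
import threading
import time
from typing import List


def worker(start: threading.Event, kill: threading.Event, produced: List[int]) -> None:
    # Hypothetical routine loop: wait for START, then work until KILL.
    start.wait()
    counter = 0
    while not kill.is_set():
        produced.append(counter)
        counter += 1
        time.sleep(0.01)


start_event = threading.Event()
kill_event = threading.Event()
produced: List[int] = []

thread = threading.Thread(target=worker, args=(start_event, kill_event, produced))
thread.start()

start_event.set()   # analogous to notifying START_EVENT_NAME
time.sleep(0.1)     # give the "routine" time to run
kill_event.set()    # analogous to notifying KILL_EVENT_NAME
thread.join()

print(len(produced), "items produced before the kill event")
```

Without the sleep between the two set() calls, the worker could observe the kill flag before producing anything, which mirrors what happens when a pipe is killed right after it is started.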

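To connect routines in a different order, pass False as the second argument of create_flow and wire the routines explicitly with the example_pipe.link function, for example:

from pipert2 import Wire

# False: do not connect the routines automatically.
example_pipe.create_flow("example_flow", False, generate_data_routine, print_result_routine)
example_pipe.link(Wire(source=generate_data_routine, destinations=(print_result_routine,)))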
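A Wire names one source and a tuple of destinations, which suggests that one source can feed several destinations. The toy model below assumes that every item emitted by the source is delivered to each of its destinations. ToyWire and deliver are hypothetical names, routines are reduced to string names with handler functions, and this is not how PipeRT2 moves data internally.

```python
from typing import Callable, Dict, List, NamedTuple, Tuple

Handler = Callable[[dict], None]


class ToyWire(NamedTuple):
    """Hypothetical wire: one source name, one or more destination names."""
    source: str
    destinations: Tuple[str, ...]


def deliver(wires: List[ToyWire], handlers: Dict[str, Handler], source: str, data: dict) -> None:
    # Send data emitted by `source` to every destination wired to it.
    for wire in wires:
        if wire.source == source:
            for destination in wire.destinations:
                handlers[destination](data)


received: Dict[str, List[dict]] = {"printer": [], "logger": []}
handlers = {name: received[name].append for name in received}
wires = [ToyWire(source="generator", destinations=("printer", "logger"))]

deliver(wires, handlers, "generator", {"value": "example"})
print(received["logger"])  # [{'value': 'example'}]
```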

To trigger an event for specific flows or routines, pass a dictionary that maps each flow name to a list of routine names:

  • To trigger all routines in example_flow, pass an empty list:
    example_pipe.notify_event(START_EVENT_NAME, {"example_flow": []})

  • To trigger specific routines in example_flow, list their names:
    example_pipe.notify_event(START_EVENT_NAME, {"example_flow": [generate_data_routine.name, print_result_routine.name]})
    
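The selection rule implied by these examples (an empty list selects every routine in the flow, a non-empty list selects only the named routines, and no dictionary at all selects everything) can be modelled with a small function. resolve_targets and the routine names below are hypothetical; the function only computes which (flow, routine) pairs would receive an event and does not call PipeRT2.

```python
from typing import Dict, List, Optional, Set, Tuple


def resolve_targets(
    flows: Dict[str, List[str]],
    specific: Optional[Dict[str, List[str]]] = None,
) -> Set[Tuple[str, str]]:
    """Return the (flow, routine) pairs an event should reach.

    `flows` maps each flow name to its routine names; `specific` mirrors
    the dictionary passed to notify_event.
    """
    if specific is None:
        # No filter: broadcast to every routine in every flow.
        specific = {name: [] for name in flows}
    targets: Set[Tuple[str, str]] = set()
    for flow_name, routine_names in specific.items():
        # An empty list means "all routines in this flow".
        chosen = routine_names or flows[flow_name]
        targets.update((flow_name, routine) for routine in chosen)
    return targets


flows = {"example_flow": ["generate_data", "print_result"], "other_flow": ["logger"]}
print(sorted(resolve_targets(flows, {"example_flow": []})))
# [('example_flow', 'generate_data'), ('example_flow', 'print_result')]
print(sorted(resolve_targets(flows, {"example_flow": ["print_result"]})))
# [('example_flow', 'print_result')]
```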

Contributing

To contribute, please contact San-Moshe for access to our Jira.

Please follow the conventions used in the project and make sure all checks pass.

The PR title must follow the format [jira_ticket_id] - [Task description].
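If you want to check a PR title locally before opening it, a regular expression is enough. This sketch assumes the square brackets in the format are placeholders rather than literal characters, and that ticket IDs follow the usual Jira key shape (an uppercase project key, a dash, and a number, e.g. PIPE-42); both the pattern and the helper name is_valid_pr_title are hypothetical, not a project-provided check.

```python
import re

# Assumption: "<PROJECT>-<number> - <description>", e.g. "PIPE-42 - Add wire support".
PR_TITLE_PATTERN = re.compile(r"^[A-Z][A-Z0-9]+-\d+ - \S.*$")


def is_valid_pr_title(title: str) -> bool:
    """Return True if the title matches the assumed PR title format."""
    return PR_TITLE_PATTERN.match(title) is not None


print(is_valid_pr_title("PIPE-42 - Add wire support"))  # True
print(is_valid_pr_title("Add wire support"))            # False
```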



Download files

Source distribution: PipeRT-2.2.0.tar.gz (55.3 kB)

Built distribution: PipeRT-2.2.0-py3-none-any.whl (86.2 kB), Python 3
