
Real-time pipeline 4 analytics


PipeRT2


PipeRT2 is an infrastructure for data processing that can handle a high flow rate.

PipeRT2 lets you design a complex dataflow dynamically. With a simple implementation of the pipe's components, a full dataflow can be dispatched.


Requirements

  • Python 3.6

Components

Routine - The smallest component in the pipe.

Each routine has to implement a main_logic function that contains the business logic of the routine.

There are three types of routines:

  • SourceRoutine - The first routine in a pipe. Used for generating new data and streaming it through the pipeline.
  • MiddleRoutine - Consumes data from other routines in the pipeline. Performs the desired operations on the data it receives and sends the results to the next routine.
  • DestinationRoutine - The last routine of the pipe. Used for storing the results from all data manipulation.
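The examples later in this document implement a SourceRoutine and a DestinationRoutine but not a MiddleRoutine. The following is a minimal sketch of one, under two assumptions that the text above does not confirm: that MiddleRoutine can be imported from the top-level pipert2 package like the other routine types, and that its main_logic receives the previous routine's dictionary and returns the dictionary for the next routine. The class name UppercaseValue is hypothetical, and the fallback class exists only so the snippet runs without the package installed.

```python
try:
    # Assumption: MiddleRoutine is exported next to SourceRoutine.
    from pipert2 import MiddleRoutine
except ImportError:
    # Stand-in so the sketch still runs without PipeRT installed.
    class MiddleRoutine:
        pass


class UppercaseValue(MiddleRoutine):
    """Hypothetical middle routine that upper-cases the "value" field."""

    def main_logic(self, data: dict) -> dict:
        # Assumed contract: take the dict produced by the previous routine
        # and return the dict that is handed to the next routine.
        return {"value": data["value"].upper()}
```

Given the input {"value": "example"}, this main_logic returns {"value": "EXAMPLE"}.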

Flow - Contains multiple routines with the same context.

Pipe - Controls the different elements and aspects of the system. Contains all flows and distributes events to all components.
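To make the relationship between these components more concrete, here is a toy model written with only the Python standard library. It is not PipeRT2's implementation and uses none of its API; it only illustrates the general idea of a source handing dictionaries to a destination through a queue. The function name run_toy_pipeline and the sentinel-based shutdown are choices made for this sketch.

```python
import queue
import threading


def run_toy_pipeline(items):
    """Move items from a source thread to a destination thread via a queue."""
    channel = queue.Queue()
    results = []
    end_of_stream = object()  # marker telling the destination to stop

    def source():
        # Plays the role of a source routine: generates new data.
        for item in items:
            channel.put({"value": item})
        channel.put(end_of_stream)

    def destination():
        # Plays the role of a destination routine: stores the results.
        while True:
            message = channel.get()
            if message is end_of_stream:
                break
            results.append(message["value"])

    workers = [threading.Thread(target=source),
               threading.Thread(target=destination)]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    return results
```

With a single producer and a FIFO queue, run_toy_pipeline(["a", "b"]) returns the items in the order they were generated.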

Installation

We publish PipeRT2 as the PipeRT package on PyPI.

Run pip3 install PipeRT to install the official stable version of PipeRT2.
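One way to confirm that the installation succeeded is to ask Python for the installed distribution's version. The sketch below uses importlib.metadata from the standard library, which requires Python 3.8 or later (newer than the Python 3.6 listed under Requirements). The helper name installed_version is hypothetical.

```python
from importlib import metadata  # standard library, Python 3.8+
from typing import Optional


def installed_version(distribution: str = "PipeRT") -> Optional[str]:
    """Return the installed version of a distribution, or None if absent."""
    try:
        return metadata.version(distribution)
    except metadata.PackageNotFoundError:
        return None


# Prints the version string if PipeRT is installed, otherwise None.
print(installed_version())
```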

Getting Started

As an example, we're going to create a pipe that contains a simple flow with very simple routines.

The first step is to create a SourceRoutine, which is responsible for generating data inside our pipeline. We create a source class that generates data:

from pipert2 import SourceRoutine

class GenerateData(SourceRoutine):

    def main_logic(self) -> dict:
        return {
            "value": "example"
        }

Then we create the destination routine to store (in our case print) the pipeline's result:

from pipert2 import DestinationRoutine

class PrintResult(DestinationRoutine):

    def main_logic(self, data: dict) -> None:
        print(data["value"])

Now we create a new pipe that contains a flow made of those two routines:

from pipert2 import Pipe
from pipert2.utils.consts.event_names import START_EVENT_NAME, KILL_EVENT_NAME

# Creating the pipe.
example_pipe = Pipe()

# Create an instance of each routine.
generate_data_routine = GenerateData()
print_result_routine = PrintResult()

# Create a flow with the required routines.
example_pipe.create_flow("example_flow", True, generate_data_routine, print_result_routine)

# Build the pipe.
example_pipe.build()

# Run the pipe.
example_pipe.notify_event(START_EVENT_NAME)

# Force all of the pipe's routines to stop.
example_pipe.notify_event(KILL_EVENT_NAME)
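The script above sends the kill event immediately after the start event. If the goal is to let the pipe work for a while before stopping it, a small helper such as the following could be used. This is a sketch that relies only on the notify_event method shown above; it assumes notify_event returns without blocking, which the text does not state. The helper name run_for is hypothetical.

```python
import time


def run_for(pipe, seconds: float, start_event: str, stop_event: str) -> None:
    """Start `pipe`, let it work for `seconds`, then stop it."""
    pipe.notify_event(start_event)
    try:
        time.sleep(seconds)
    finally:
        # Always send the stop event, even if the wait is interrupted.
        pipe.notify_event(stop_event)
```

With the objects defined above, the call would be run_for(example_pipe, 5, START_EVENT_NAME, KILL_EVENT_NAME).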

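To connect routines in a different order, create the flow with False as its second argument (instead of True, as above) and connect them explicitly with the example_pipe.link function, for example:

# Wire is assumed to be importable from the top-level pipert2 package.
from pipert2 import Wire

example_pipe.create_flow("example_flow", False, generate_data_routine, print_result_routine)
example_pipe.link(Wire(source=generate_data_routine, destinations=(print_result_routine,)))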

To trigger an event for specific flows or routines, pass a dictionary that maps each flow name to a list of routine names:

  • To trigger all routines in example_flow, pass an empty list:
    example_pipe.notify_event(START_EVENT_NAME, {"example_flow": []})

  • To trigger specific routines in example_flow, list their names:
    example_pipe.notify_event(START_EVENT_NAME, {"example_flow": [generate_data_routine.name, print_result_routine.name]})  
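When several flows are involved, the target dictionary can be built from routine objects instead of being written by hand. The helper below is a sketch and not part of PipeRT2; it assumes only that each routine exposes a name attribute, as the example above does. The helper name event_targets is hypothetical.

```python
from typing import Dict, Iterable, List


def event_targets(flows: Dict[str, Iterable]) -> Dict[str, List[str]]:
    """Map each flow name to the names of the routines to notify.

    An empty iterable produces an empty list, which the examples above
    use to address every routine in the flow.
    """
    return {
        flow_name: [routine.name for routine in routines]
        for flow_name, routines in flows.items()
    }
```

The result can be passed as the second argument of notify_event, for example example_pipe.notify_event(START_EVENT_NAME, event_targets({"example_flow": [generate_data_routine]})).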
    

Contributing

To contribute, please contact San-Moshe for access to our Jira.

Please follow the conventions used in the project and make sure all checks pass.

The PR name must follow the format [jira_ticket_id] - [Task description].

