
Real-time pipeline 4 analytics


PipeRT2


PipeRT2 is an infrastructure for data processing that can handle a high flow rate.

A complex dataflow can be designed dynamically using PipeRT2. With a simple implementation of the pipe's components, a full dataflow can be dispatched.


Requirements

  • Python 3.6

Components

Routine - The smallest component in the pipe.

Each routine has to implement a main_logic function that contains the business logic of the routine.

There are three types of routines:

  • SourceRoutine - The first routine in a pipe. Used for generating new data and streaming it through the pipeline.
  • MiddleRoutine - Consumes data from other routines in the pipeline. Performs the desired operations on the data it receives and sends the results to the next routine.
  • DestinationRoutine - The last routine of the pipe. Used for storing the results of all data manipulation.
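
The three roles can be pictured without the library at all. The sketch below uses plain Python stand-ins, not PipeRT2 classes; `generate`, `transform`, and `store` are hypothetical names. It only shows how data moves from a source through a middle step into a destination. The middle step's shape (a dict in, a dict out) is an assumption made by analogy with the source and destination routines in Getting Started.

```python
from typing import Dict, List


def generate() -> Dict[str, str]:
    """Source role: produce a new message without any input."""
    return {"value": "example"}


def transform(data: Dict[str, str]) -> Dict[str, str]:
    """Middle role: consume a message and return a modified copy."""
    return {"value": data["value"].upper()}


def store(data: Dict[str, str], sink: List[str]) -> None:
    """Destination role: keep the final result (here, in a list)."""
    sink.append(data["value"])


results: List[str] = []
for _ in range(3):
    # Each message passes through source -> middle -> destination.
    store(transform(generate()), results)

print(results)
```

In PipeRT2 itself each of these steps would live in the main_logic function of the corresponding routine class.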

Flow - Contains multiple routines with the same context.

Pipe - Controls the different elements and aspects of the system. Contains all flows and distributes events to all components.

Installation

We publish PipeRT2 as the PipeRT package on PyPI.

Run pip3 install PipeRT to install the official stable version of PipeRT2.

Getting Started

As an example, we'll create a pipe that contains a simple flow with very simple routines.

The first step is to create a SourceRoutine, which is responsible for generating data inside our pipeline. We create a source class that generates data:

from pipert2 import SourceRoutine

class GenerateData(SourceRoutine):

    def main_logic(self) -> dict:
        return {
            "value": "example"
        }

Then we create the destination routine to store (in our case print) the pipeline's result:

from pipert2 import DestinationRoutine

class PrintResult(DestinationRoutine):

    def main_logic(self, data: dict) -> None:
        print(data["value"])

Now we create a new pipe that contains a flow made of those two routines:

from pipert2 import Pipe, QueueNetwork
from pipert2.utils.consts.event_names import START_EVENT_NAME, KILL_EVENT_NAME

# Creating the pipe.
example_pipe = Pipe()

# Create an instance of each routine.
generate_data_routine = GenerateData()
print_result_routine = PrintResult()

# Create a flow with the required routines.
example_pipe.create_flow("example_flow", True, generate_data_routine, print_result_routine)

# Build the pipe.
example_pipe.build()

# Run the pipe.
example_pipe.notify_event(START_EVENT_NAME)

# Force all the pipe's routines to stop.
example_pipe.notify_event(KILL_EVENT_NAME)
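
In a real program the kill event would normally be sent only after the pipe has had some time to run. The stand-alone sketch below illustrates that general start/stop pattern with plain `threading`. It is not a description of PipeRT2 internals, and the `Worker` class is hypothetical: a loop keeps calling main_logic until a stop signal arrives.

```python
import threading
import time
from typing import List


class Worker:
    """Hypothetical stand-in for a routine that runs until told to stop."""

    def __init__(self) -> None:
        self.outputs: List[str] = []
        self._stop_event = threading.Event()
        self._thread = threading.Thread(target=self._run)

    def main_logic(self) -> str:
        return "example"

    def _run(self) -> None:
        while True:
            self.outputs.append(self.main_logic())
            # Wait briefly; wait() returns True once the stop event is set.
            if self._stop_event.wait(0.01):
                break

    def start(self) -> None:
        self._thread.start()

    def kill(self) -> None:
        self._stop_event.set()
        self._thread.join()


worker = Worker()
worker.start()
time.sleep(0.1)  # let the worker run for a moment before stopping it
worker.kill()
print(len(worker.outputs) >= 1)
```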

To connect routines in a different order, use the example_pipe.link function. Note that the second argument to create_flow is False here, and the routines are wired explicitly:

# Assumption: Wire is importable from the top-level package, like Pipe.
from pipert2 import Wire

example_pipe.create_flow("example_flow", False, generate_data_routine, print_result_routine)
example_pipe.link(Wire(source=generate_data_routine, destinations=(print_result_routine,)))
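
One way to think about explicit wiring is as a routing table from each source to its destinations. The sketch below is a library-independent illustration only; `WireSketch` and `build_routes` are hypothetical names and do not reflect how PipeRT2 stores its wires.

```python
from collections import defaultdict
from typing import Dict, List, NamedTuple, Tuple


class WireSketch(NamedTuple):
    """Hypothetical stand-in: one source name fanning out to destination names."""
    source: str
    destinations: Tuple[str, ...]


def build_routes(wires: List[WireSketch]) -> Dict[str, List[str]]:
    """Collect all wires into a source -> destinations mapping."""
    routes: Dict[str, List[str]] = defaultdict(list)
    for wire in wires:
        routes[wire.source].extend(wire.destinations)
    return dict(routes)


routes = build_routes([
    WireSketch("generate_data", ("double", "print_result")),
    WireSketch("double", ("print_result",)),
])
print(routes)
```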

To trigger an event for specific flows or routines, pass a dictionary that maps each flow name to a list of routine names:

  • To trigger all routines in example_flow, pass an empty list:
    example_pipe.notify_event(START_EVENT_NAME, {"example_flow": []})

  • To trigger specific routines in example_flow, list their names:
    example_pipe.notify_event(START_EVENT_NAME, {"example_flow": [generate_data_routine.name, print_result_routine.name]})
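
The convention in the two examples above (an empty list selects every routine in the flow) can be sketched in plain Python. The `FLOWS` registry and `resolve_targets` function are hypothetical and exist only to illustrate the dictionary's meaning. Treating a missing dictionary as "all flows" is an assumption based on the unfiltered notify_event call shown earlier.

```python
from typing import Dict, List, Optional

# Hypothetical registry: flow name -> names of the routines it contains.
FLOWS: Dict[str, List[str]] = {
    "example_flow": ["generate_data", "print_result"],
    "other_flow": ["read", "write"],
}


def resolve_targets(
    specific: Optional[Dict[str, List[str]]] = None
) -> Dict[str, List[str]]:
    """Return the routines an event would reach under this convention."""
    if specific is None:
        # No filter given: every routine in every flow (assumed).
        return {flow: list(names) for flow, names in FLOWS.items()}
    targets: Dict[str, List[str]] = {}
    for flow, names in specific.items():
        # An empty list means "all routines in this flow".
        targets[flow] = list(names) if names else list(FLOWS[flow])
    return targets


print(resolve_targets({"example_flow": []}))
print(resolve_targets({"example_flow": ["print_result"]}))
```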
    

Contributing

To contribute, please contact San-Moshe for access to our Jira.

Please follow the conventions used in the project and make sure all checks pass.

The PR name needs to be in the format [jira_ticket_id] - [Task description]

