
Flowrunner is a lightweight package to organize and represent Data Engineering/Science workflows

Reason this release was yanked:

Seems to be failing at notebook level

Project description

flowrunner: A lightweight Data Engineering/Science Flow package


What is it?

flowrunner is a lightweight package to organize and represent Data Engineering/Science workflows. It's designed to integrate with any pre-existing framework like pandas or PySpark.

Main Features

  • Lazy evaluation of the DAG: flowrunner does not execute your DAG until you explicitly run it (see the sketch after this list)
  • Easy syntax to build new Flows
  • Easy data sharing between methods in a Flow using attributes
  • Data store to keep the output of a function (in case it returns something) for later use
  • Param store to easily pass reusable parameters to a Flow
  • Visualizing your flow as a DAG
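
For example, here is a minimal sketch of lazy evaluation. The flow below is hypothetical, but it uses only the decorators and the run call shown in the Usage section:

# lazy_example.py -- hypothetical flow for illustration
from flowrunner import BaseFlow, step, start, end

class LazyFlow(BaseFlow):
    @start
    @step(next=['finish'])
    def begin(self):
        self.value = 1  # attributes are shared between steps

    @end
    @step
    def finish(self):
        print("value:", self.value)

# Nothing has executed so far -- the class only defines the DAG.
# The steps run on this explicit call (mirroring ExampleFlow.run() below).
LazyFlow.run()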

Installing flowrunner

To install flowrunner, the following commands will work.

The source code is hosted at https://github.com/prithvijitguha/flowRunner

pip install flowrunner

Or install from source

pip install git+https://github.com/prithvijitguha/flowrunner@main

Usage

Here is a quick example you can run as is:

# example.py
from flowrunner import BaseFlow, step, start, end

class ExampleFlow(BaseFlow):
    @start
    @step(next=['method2', 'method3'])
    def method1(self):
        self.a = 1

    @step(next=['method4'])
    def method2(self):
        self.a += 1

    @step(next=['method4'])
    def method3(self):
        self.a += 2

    @end
    @step
    def method4(self):
        self.a += 3
        print("output of flow is:", self.a)

You can run the flow with the following command:

$ python -m flowrunner run example.py
output of flow is: 7

The steps share state through self: method1 sets a to 1, method2 and method3 add 1 and 2 respectively, and method4 adds 3, giving 7.

Or in a notebook/script like this:

ExampleFlow.run()

Visualize Flow as a DAG (Directed Acyclic Graph)

ExampleFlow().display()

Your output will look like this:

[image: rendered DAG of ExampleFlow]

Or it can be run from the CLI like this:

python -m flowrunner display example.py

For CLI usage, a file called exampleflow.html is created in the current directory with the same output.

Show your Flow

ExampleFlow().show()
2023-03-08 22:35:24 LAPTOP flowrunner.system.logger[12692] INFO Found flow ExampleFlow
2023-03-08 22:35:24 LAPTOP flowrunner.system.logger[12692] DEBUG Validating flow for ExampleFlow
✅ Validated number of start nodes
✅ Validated start nodes 'next' values
✅ Validated number of middle_nodes
✅ Validated middle_nodes 'next' values
✅ Validated end nodes
✅ Validated end nodes 'next' values
2023-03-08 22:35:24 LAPTOP flowrunner.system.logger[12692] DEBUG Show flow for ExampleFlow
method1

?
  Next=method2, method3


method2

?
  Next=method4


method3

?
  Next=method4

Or through the CLI like below:

python -m flowrunner show example.py

Pandas Example

# -*- coding: utf-8 -*-
import pandas as pd

from flowrunner import BaseFlow, end, start, step


class ExamplePandas(BaseFlow):
    @start
    @step(next=["transformation_function_1", "transformation_function_2"])
    def create_data(self):
        """
        This method we create the dataset we are going use. In real use cases,
        you'll have to read from a source (csv, parquet, etc)

        For this example we create two dataframes for students ranked by marked scored
        for when they attempted the example on 1st January 2023 and 12th March 2023

        After creating the dataset we pass it to the next methods

        - transformation_function_1
        - transformation_function_2
        """
        data1 = {"Name": ["Hermione", "Harry", "Ron"], "marks": [100, 85, 75]}

        data2 = {"Name": ["Hermione", "Ron", "Harry"], "marks": [100, 90, 80]}

        df1 = pd.DataFrame(data1, index=["rank1", "rank2", "rank3"])

        df2 = pd.DataFrame(data2, index=["rank1", "rank2", "rank3"])

        self.input_data_1 = df1
        self.input_data_2 = df2

    @step(next=["append_data"])
    def transformation_function_1(self):
        """
        Here we add a snapshot_date to the input dataframe of 2023-03-12
        """
        transformed_df = self.input_data_1
        transformed_df.insert(1, "snapshot_date", "2023-03-12")
        self.transformed_df_1 = transformed_df

    @step(next=["append_data"])
    def transformation_function_2(self):
        """
        Here we add a snapshot_date to the input dataframe of 2023-01-01
        """
        transformed_df = self.input_data_2
        transformed_df.insert(1, "snapshot_date", "2023-01-01")
        self.transformed_df_2 = transformed_df

    @step(next=["show_data"])
    def append_data(self):
        """
        Here we append the two dataframe together
        """
        self.final_df = pd.concat([self.transformed_df_1, self.transformed_df_2])

    @end
    @step
    def show_data(self):
        """
        Here we show the new final dataframe of aggregated data. However in real use cases. It would
        be more likely to write the data to some final layer/format
        """
        print(self.final_df)
        return self.final_df
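
If you run the flow itself (for example via the CLI, assuming the code above is saved as example_pandas.py), show_data prints the appended dataframe, approximately:

$ python -m flowrunner run example_pandas.py
           Name  snapshot_date  marks
rank1  Hermione     2023-03-12    100
rank2     Harry     2023-03-12     85
rank3       Ron     2023-03-12     75
rank1  Hermione     2023-01-01    100
rank2       Ron     2023-01-01     90
rank3     Harry     2023-01-01     80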

Now when you run ExamplePandas().display() you get the following output:

[image: rendered DAG of ExamplePandas]

PySpark Example

# -*- coding: utf-8 -*-
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

from flowrunner import BaseFlow, end, start, step

spark = SparkSession.builder.getOrCreate()


class ExamplePySpark(BaseFlow):
    @start
    @step(next=["transformation_function_1", "transformation_function_2"])
    def create_data(self):
        """
        This is an example where we use the Spark engine instead of Pandas

        This method we create the dataset we are going use. In real use cases,
        you'll have to read from a source (csv, parquet, etc)

        For this example we create two dataframes for students ranked by marked scored
        for when they attempted the example on 1st January 2023 and 12th March 2023

        After creating the dataset we pass it to the next methods

        - transformation_function_1
        - transformation_function_2
        """

        data1 = [
            ("Hermione", 100),
            ("Harry", 85),
            ("Ron", 75),
        ]

        data2 = [
            ("Hermione", 100),
            ("Harry", 90),
            ("Ron", 80),
        ]

        columns = ["Name", "marks"]

        rdd1 = spark.sparkContext.parallelize(data1)
        rdd2 = spark.sparkContext.parallelize(data2)
        self.df1 = spark.createDataFrame(rdd1).toDF(*columns)
        self.df2 = spark.createDataFrame(rdd2).toDF(*columns)

    @step(next=["append_data"])
    def transformation_function_1(self):
        """
        Here we add a snapshot_date to the input dataframe of 2023-03-12
        """

        self.transformed_df_1 = self.df1.withColumn("snapshot_date", lit("2023-03-12"))

    @step(next=["append_data"])
    def transformation_function_2(self):
        """
        Here we add a snapshot_date to the input dataframe of 2023-01-01
        """
        self.transformed_df_2 = self.df2.withColumn("snapshot_date", lit("2023-01-01"))

    @step(next=["show_data"])
    def append_data(self):
        """
        Here we append the two dataframe together
        """
        self.final_df = self.transformed_df_1.union(self.transformed_df_2)

    @end
    @step
    def show_data(self):
        """
        Here we show the new final dataframe of aggregated data. However in real use cases. It would
        be more likely to write the data to some final layer/format
        """
        self.final_df.show()
        return self.final_df
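
If you run the flow itself (for example via the CLI, assuming the code above is saved as example_pyspark.py), show_data prints the appended dataframe, roughly (row order may vary):

$ python -m flowrunner run example_pyspark.py
+--------+-----+-------------+
|    Name|marks|snapshot_date|
+--------+-----+-------------+
|Hermione|  100|   2023-03-12|
|   Harry|   85|   2023-03-12|
|     Ron|   75|   2023-03-12|
|Hermione|  100|   2023-01-01|
|   Harry|   90|   2023-01-01|
|     Ron|   80|   2023-01-01|
+--------+-----+-------------+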

Now when you run ExamplePySpark().display() you get the following output:

[image: rendered DAG of ExamplePySpark]

Documentation

Check out the latest documentation here: FlowRunner documentation

Contributing

All contributions are welcome :smiley:

If you are interested in contributing, please check out this page: FlowRunner Contribution Page

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

flowrunner-0.2.2.tar.gz (301.4 kB)

Uploaded Source

Built Distribution

flowrunner-0.2.2-py3-none-any.whl (6.6 kB)

Uploaded Python 3

File details

Details for the file flowrunner-0.2.2.tar.gz.

File metadata

  • Download URL: flowrunner-0.2.2.tar.gz
  • Upload date:
  • Size: 301.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.1 CPython/3.11.3

File hashes

Hashes for flowrunner-0.2.2.tar.gz

  • SHA256: 3cbe17a29cc32fe31e7c1778a0cff909eb968e8f3f8e63f7d89df34ba7b4f200
  • MD5: 8bbd34ad2f1cba5f4aaf87a746f05ac9
  • BLAKE2b-256: c17b49b13b2424f79a4cd0bae998438ce226e74742d240295ba53b5654710938

See more details on using hashes here.

File details

Details for the file flowrunner-0.2.2-py3-none-any.whl.

File metadata

  • Download URL: flowrunner-0.2.2-py3-none-any.whl
  • Upload date:
  • Size: 6.6 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.1 CPython/3.11.3

File hashes

Hashes for flowrunner-0.2.2-py3-none-any.whl

  • SHA256: 11a75cc751192b9feb4df86bc03ea78d962a11718b07608a85bcb9818845bb53
  • MD5: 127bfd8dfaaaa31c82b03566ebf00915
  • BLAKE2b-256: 8d41673c2d8b96471afa91709b343608bb75716d781299cfa69d1943f9202d9e

See more details on using hashes here.
