
Description

Pipeline Penguin is a versatile Python library for data quality.

Documentation

Getting Started

How to install

You can use TestPyPI to install the early development build by executing this command:

pip install pipeline-penguin
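
The plain command above resolves against the main PyPI index. To pull the package specifically from TestPyPI, pass the index URL explicitly:

pip install --index-url https://test.pypi.org/simple/ pipeline-penguin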

Core Concepts

Before you start executing validations on your data, you first need to understand a few concepts:

Data Node

Any database your pipeline needs to process. It has a Node Type that identifies the source of the data, such as BigQuery.

Data Premise

Any premise to be validated on a Data Node; for example, the column "name" on the database "employers" must not contain numbers. Data Premises also have a type, called the Premise Type. If the Premise Type is SQL, the premise cannot be validated on a Data Node whose Node Type lacks a SQL engine to process the query.

Connector

The way you access a Data Node to check a Data Premise.

Premise Output

The result of a Data Premise validation on a Data Node.
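
A minimal sketch of consuming one of these results, assuming its attribute names mirror the PremiseOutput constructor arguments used in the custom-premise example further down this page (passed, failed_count and column are assumptions, not confirmed API):

# Hedged sketch: the attribute names below are assumptions mirroring the
# PremiseOutput(...) constructor call in the custom-premise example.
def report(output):
    status = "PASSED" if output.passed else "FAILED"
    print(f"{output.column}: {status} ({output.failed_count} offending rows)")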

Implementing a test

  • Importing and instantiating PP
from pipeline_penguin import PipelinePenguin
pp = PipelinePenguin()
  • Defining the default connector
bq_connector = ConnectorSQLBigQuery('/config/service_account.json')
pp.connectors.define_default(bq_connector)
  • Creating a Data Node
node = pp.nodes.create_node('Node Name', DataNodeBigQuery, project_id='example', dataset_id='example', table_id='example')
pp.nodes.list_nodes()
  • Creating a Data Premise
node.insert_premise('Premise Name', DataPremiseSQLCheckIsNull, "Column Name")
  • Executing a validation
outputs = pp.nodes.run_premises()
  • Checking Logs
log_formatter = OutputFormatterLog()
outputs.format_outputs(log_formatter)
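
Put together, the six steps above form the following runnable script. The import paths for the connector, the null-check premise and the log formatter are assumptions inferred from the package layout visible in the custom-premise example below; check the documentation for the exact modules.

from pipeline_penguin import PipelinePenguin
from pipeline_penguin.data_node.sql.bigquery import DataNodeBigQuery
# The three paths below are assumptions; verify them against the docs.
from pipeline_penguin.connector.sql.bigquery import ConnectorSQLBigQuery
from pipeline_penguin.data_premise.sql import DataPremiseSQLCheckIsNull
from pipeline_penguin.premise_output import OutputFormatterLog

pp = PipelinePenguin()

# Register a default BigQuery connector authenticated with a service account.
pp.connectors.define_default(ConnectorSQLBigQuery('/config/service_account.json'))

# Point a Data Node at the table to validate.
node = pp.nodes.create_node(
    'Node Name', DataNodeBigQuery,
    project_id='example', dataset_id='example', table_id='example',
)

# Attach a premise, run every premise on every node, then log the outputs.
node.insert_premise('Premise Name', DataPremiseSQLCheckIsNull, "Column Name")
outputs = pp.nodes.run_premises()
outputs.format_outputs(OutputFormatterLog())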

Implementing a custom Data Premise

  • Implementing a new DataPremise class
from pipeline_penguin.core.data_premise.sql import DataPremiseSQL
from pipeline_penguin.core.premise_output.premise_output import PremiseOutput
from pipeline_penguin.data_node.sql.bigquery import DataNodeBigQuery

class CheckBanana(DataPremiseSQL):
    def __init__(
        self,
        name: str,
        data_node: DataNodeBigQuery,
        column: str
    ):
        self.query_template = "SELECT {column} AS result FROM `{project}.{dataset}.{table}` WHERE LOWER({column}) = 'banana'"
        super().__init__(name, data_node, column)

    def query_args(self):
        """Method for returning the arguments to be passed on the query template of this
        validation.

        Returns:
            A `dictionary` with the query parameters.
        """
        return {
            "project": self.data_node.project_id,
            "dataset": self.data_node.dataset_id,
            "table": self.data_node.table_id,
            "column": self.column
        }

    def validate(self) -> PremiseOutput:
        """Method for executing the validation over the DataNode.

        Returns:
            PremiseOutput: Object storing the results for this validation.
        """

        query = self.query_template.format(**self.query_args())
        connector = self.data_node.get_connector(self.type)
        data_frame = connector.run(query)

        failed_count = len(data_frame["result"])
        passed = failed_count == 0

        output = PremiseOutput(
            self, self.data_node, self.column, passed, failed_count, data_frame
        )
        return output
  • Testing a DataNode with a custom Data Premise
from pipeline_penguin import PipelinePenguin
from pipeline_penguin.data_node.sql.bigquery import DataNodeBigQuery
# The three paths below are assumptions; verify them against the docs.
from pipeline_penguin.connector.sql.bigquery import ConnectorSQLBigQuery
from pipeline_penguin.data_premise.sql import DataPremiseSQLCheckIsNull
from pipeline_penguin.premise_output import OutputFormatterLog
# Assuming the CheckBanana class above was saved as check_banana.py
from check_banana import CheckBanana

pp = PipelinePenguin()

bq_connector = ConnectorSQLBigQuery('/config/service_account.json')
pp.connectors.define_default(bq_connector)

node = pp.nodes.create_node('Node Name', DataNodeBigQuery, project_id='example', dataset_id='example', table_id='example')

node.insert_premise('Check Null', DataPremiseSQLCheckIsNull, "Column Name")
node.insert_premise('Check Contains Banana', CheckBanana, "Column Name")

outputs = pp.nodes.run_premises()

log_formatter = OutputFormatterLog()
outputs.format_outputs(log_formatter)
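
To sanity-check a query template like the one in CheckBanana, you can render it by hand with the same str.format call that validate() uses. This standalone snippet uses hypothetical table coordinates and a hypothetical column named fruit:

# Standalone illustration of how validate() renders the SQL: str.format
# applied to the template with the dictionary returned by query_args().
# The coordinates and the column name "fruit" are hypothetical.
query_template = (
    "SELECT {column} AS result FROM `{project}.{dataset}.{table}` "
    "WHERE LOWER({column}) = 'banana'"
)
args = {"project": "example", "dataset": "example", "table": "example", "column": "fruit"}
print(query_template.format(**args))
# Prints: SELECT fruit AS result FROM `example.example.example` WHERE LOWER(fruit) = 'banana'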

Collaborate

Installation

pipenv install

Tests

pipenv install --dev

Running tests

pipenv run test

Style format

Running format

pipenv run format

Checking format

pipenv run format-check

Developing documentation

Running local build

pipenv run docs

Building docs

pipenv run build-docs

Support or Contact

Having trouble with PP? Check out our documentation or contact support and we’ll help you sort it out.

DP6 Koopa-Troopa Team e-mail: koopas@dp6.com.br
