Description

Pipeline Penguin is a versatile Python library for data quality.

Documentation

Getting Started

How to install

You can use PyPI Test to install the early development build by executing this command:

pip install -i https://test.pypi.org/simple/ pipeline-penguin

Core Concepts

Before you start executing validations on your data, you first need to understand a few concepts:

Data Node

Any database your pipeline needs to process. It has a Node Type that identifies the source of the data, such as BigQuery.

Data Premise

Any premise about the data on a Data Node; for example, the column "name" on the database "employers" must not contain numbers. Data Premises also have a type, called a Premise Type. If a Premise Type is SQL, you cannot execute its validation on a Data Node whose Node Type does not have a SQL engine to process the query.

Connector

The way you access a Data Node to check a Data Premise.

Premise Output

The result of a Data Premise validation on a Data Node.
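To make the relationships concrete, here is a minimal, library-independent sketch of how the four concepts fit together. The class and function names below are illustrative stand-ins, not Pipeline Penguin's real API:

```python
from dataclasses import dataclass
from typing import Callable, List

@dataclass
class ToyDataNode:
    """Stands in for a Data Node: a named data source holding rows."""
    name: str
    rows: List[dict]

@dataclass
class ToyPremiseOutput:
    """Stands in for a Premise Output: the result of one validation."""
    premise_name: str
    passed: bool
    failed_count: int

def check_premise(node: ToyDataNode, premise_name: str,
                  predicate: Callable[[dict], bool]) -> ToyPremiseOutput:
    """Stands in for a Data Premise: every row must satisfy the predicate."""
    failures = [row for row in node.rows if not predicate(row)]
    return ToyPremiseOutput(premise_name, len(failures) == 0, len(failures))

# A "name must not contain numbers" premise, like the example above.
node = ToyDataNode("employers", [{"name": "Ana"}, {"name": "Bob1"}])
output = check_premise(node, "name has no digits",
                       lambda row: not any(ch.isdigit() for ch in row["name"]))
print(output.passed, output.failed_count)  # False 1
```

In the real library the Connector plays the role of the data access inside the check, and the Premise Type decides which Connectors can serve a given premise.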

Implementing a test

The snippets below assume the relevant classes (ConnectorSQLBigQuery, DataNodeBigQuery, DataPremiseSQLCheckIsNull, OutputFormatterLog) have already been imported from their pipeline_penguin modules.

  • Importing and instantiating PP
from pipeline_penguin import PipelinePenguin
pp = PipelinePenguin()
  • Defining the default connector
bq_connector = ConnectorSQLBigQuery('/config/service_account.json')
pp.connectors.define_default(bq_connector)
  • Creating a Data Node
node = pp.nodes.create_node('Node Name', DataNodeBigQuery, project_id='example', dataset_id='example', table_id='example')
pp.nodes.list_nodes()
  • Creating a Data Premise
node.insert_premise('Premise Name', DataPremiseSQLCheckIsNull, "Column Name")
  • Executing a validation
outputs = pp.nodes.run_premises()
  • Checking Logs
log_formatter = OutputFormatterLog()
outputs.format_outputs(log_formatter)

Implementing a custom Data Premise

  • Implementing a new DataPremise class
from pipeline_penguin.core.data_premise.sql import DataPremiseSQL
from pipeline_penguin.core.premise_output.premise_output import PremiseOutput
from pipeline_penguin.data_node.sql.bigquery import DataNodeBigQuery

class CheckBanana(DataPremiseSQL):
    def __init__(
        self,
        name: str,
        data_node: DataNodeBigQuery,
        column: str
    ):
        self.query_template = "SELECT {column} AS result FROM `{project}.{dataset}.{table}` WHERE LOWER({column}) = 'banana'"
        super().__init__(name, data_node, column)

    def query_args(self):
        """Method for returning the arguments to be passed on the query template of this
        validation.

        Returns:
            A `dictionary` with the query parameters.
        """
        return {
            "project": self.data_node.project_id,
            "dataset": self.data_node.dataset_id,
            "table": self.data_node.table_id,
            "column": self.column
        }

    def validate(self) -> PremiseOutput:
        """Method for executing the validation over the DataNode.

        Returns:
            PremiseOutput: Object storing the results for this validation.
        """

        query = self.query_template.format(**self.query_args())
        connector = self.data_node.get_connector(self.type)
        data_frame = connector.run(query)

        failed_count = len(data_frame["result"])
        passed = failed_count == 0

        output = PremiseOutput(
            self, self.data_node, self.column, passed, failed_count, data_frame
        )
        return output
  • Testing a DataNode with a custom Data Premise
from pipeline_penguin import PipelinePenguin
from check_banana import CheckBanana  # assuming the class above lives in check_banana.py

pp = PipelinePenguin()

bq_connector = ConnectorSQLBigQuery('/config/service_account.json')
pp.connectors.define_default(bq_connector)

node = pp.nodes.create_node('Node Name', DataNodeBigQuery, project_id='example', dataset_id='example', table_id='example')

node.insert_premise('Check Null', DataPremiseSQLCheckIsNull, "Column Name")
node.insert_premise('Check Contains Banana', CheckBanana, "Column Name")

outputs = pp.nodes.run_premises()

log_formatter = OutputFormatterLog()
outputs.format_outputs(log_formatter)
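The SQL that validate sends to the connector is produced by plain str.format substitution of query_args into query_template, which you can check in isolation. The template mirrors the CheckBanana shape above; the argument values here are dummy stand-ins:

```python
# Template with the same placeholders used by CheckBanana.
query_template = (
    "SELECT {column} AS result FROM `{project}.{dataset}.{table}` "
    "WHERE LOWER({column}) = 'banana'"
)

# What query_args() would return for a node with these (made-up) ids.
query_args = {
    "project": "example",
    "dataset": "example",
    "table": "example",
    "column": "fruit",
}

# The same call validate() performs before handing the SQL to the connector.
query = query_template.format(**query_args)
print(query)
```

Printing the rendered query like this is a quick way to sanity-check a custom premise's template before running it against a live Data Node.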

Collaborate

Installation

pipenv install

Tests

pipenv install --dev

Running tests

pipenv run test

Style format

Running format

pipenv run format

Checking format

pipenv run format-check

Developing documentation

Running local build

pipenv run docs

Building docs

pipenv run build-docs

Support or Contact

Having trouble with PP? Check out our documentation or contact support and we’ll help you sort it out.

DP6 Koopa-Troopa Team e-mail: koopas@dp6.com.br
