
Object-oriented library to create data pipelines

Project description

Open Data Pipeline

Object-oriented library to quickly deploy and test data pipelines and models

Installing the Library

pip install ez-data-pipeline

General Contribution Guidelines

Create a feature branch for your work and try to merge it back by the end of the day to avoid conflicts. Every major commit should have associated test cases, so make sure you use test-driven development (TDD).
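As a hedged illustration of the kind of test case expected with each commit (pytest is assumed here, and add_offset is a hypothetical helper, not part of the library):

# test_example.py -- minimal pytest sketch; names are hypothetical
import pandas as pd


def add_offset(df: pd.DataFrame, offset: int) -> pd.DataFrame:
    """Hypothetical helper under test: shift every value by a constant."""
    return df + offset


def test_add_offset():
    df = pd.DataFrame({"Close": [1.0, 2.0, 3.0]})
    result = add_offset(df, 1)
    assert result["Close"].tolist() == [2.0, 3.0, 4.0]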

Building Your First Pipeline

Required Imports

import pandas as pd
import yfinance
from ez_data_pipeline.Foundation.default_pipeline.Iimporter import IImporter
from ez_data_pipeline.Foundation.default_pipeline.Imodel import Imodel
from ez_data_pipeline.Foundation.default_pipeline.Ipipeline import ModelPipeline
from ez_data_pipeline.Foundation.default_pipeline.Iproccess import ProcessPipeline, Processor
from ez_data_pipeline.Foundation.default_pipeline.IproccessMethod import ProcessingMethod
from ez_data_pipeline.Foundation.default_pipeline.Isave import Isave
from ez_data_pipeline.Foundation.utils import ImportData, AssetClass 

Creating the Importer Class

class yf_Import(IImporter):

    def __init__(self, ticker: str):
        self.ticker = ticker
        self.data_ticker = yfinance.Ticker(self.ticker)
        self.data = self.data_ticker.history("max")

    def _import(self) -> pd.DataFrame:
        return self.data

    def find_asset_class(self) -> str:
        return self.data_ticker.info['quoteType']

    def return_data(self) -> ImportData:
        data_dict = {"EQUITY": AssetClass.Stock, "ETF": AssetClass.ETF}
        yf_asset_type = self.find_asset_class()
        return ImportData(self._import(), data_dict[yf_asset_type])

    def __str__(self):
        return f"TICKER: {str(self.data)}"

    def __repr__(self):
        return f"yf_Import({self.ticker})"

Process Pipeline

class ProcessPipeline(ProcessPipeline):
    # Adds 1 to every value in the imported DataFrame
    class AddProcessor(Processor):
        def process(self, import_data: ImportData) -> ImportData:
            import_data.pd_data = import_data.pd_data + 1
            return import_data

    # Drops the 'High' column (a column, not a row, despite the class name)
    class RemoveRowProcessor(Processor):
        def process(self, import_data: ImportData) -> ImportData:
            import_data.pd_data = import_data.pd_data.drop("High", axis=1)
            return import_data

    # Adds 2 to every value (a placeholder for an arbitrary transformation)
    class MakeRandomOperation(Processor):
        def process(self, import_data: ImportData) -> ImportData:
            import_data.pd_data = import_data.pd_data + 2
            return import_data
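To see what each processor does, you can run them by hand on a small hand-built ImportData. How the library discovers and orders the nested Processor classes is not documented on this page, so the sequence below is only illustrative, and it assumes Processor subclasses can be instantiated with no arguments:

toy = ImportData(pd.DataFrame({"High": [10.0], "Low": [5.0]}), AssetClass.Stock)
toy = ProcessPipeline.AddProcessor().process(toy)         # every value + 1
toy = ProcessPipeline.RemoveRowProcessor().process(toy)   # drops the "High" column
toy = ProcessPipeline.MakeRandomOperation().process(toy)  # every value + 2
print(toy.pd_data)                                        # only "Low" remains, value 8.0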

Creating the Saving Class

class CSVSave(Isave):
    def save(self):
        # Placeholder: a real implementation would persist the processed data
        print("saved")
        

Creating the Process Method

The ProcessingMethod object isolates the pre-processing stage of model development so it can be tested on its own before moving to a production environment.

pm = ProcessingMethod(yf_Import, ProcessPipeline, CSVSave, "SPY")
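Because ProcessingMethod takes the importer class rather than an instance, you can pass a stub importer that returns a canned DataFrame and exercise the processing stage without any network calls. StubImporter below is a hypothetical test double, not part of the library:

class StubImporter(IImporter):
    """Hypothetical offline importer: returns a fixed DataFrame for tests."""
    def __init__(self, ticker: str):
        self.ticker = ticker
        self.data = pd.DataFrame({"High": [10.0, 11.0], "Low": [5.0, 6.0]})

    def _import(self) -> pd.DataFrame:
        return self.data

    def find_asset_class(self) -> str:
        return "EQUITY"

    def return_data(self) -> ImportData:
        return ImportData(self._import(), AssetClass.Stock)


pm_test = ProcessingMethod(StubImporter, ProcessPipeline, CSVSave, "TEST")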

Creating the Model

class LinearModel(Imodel):
    """Write you cool Model here"""
    def run_model(self) -> pd.DataFrame:
        """Run your cool Model here"""
        return self.processed_data
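run_model above simply returns the processed data unchanged; self.processed_data appears to be the DataFrame produced by the processing stage. Here is a slightly richer, hedged sketch (it assumes the processed frame still has the "Close" column that yfinance provides):

class MovingAverageModel(Imodel):
    """Hedged example model: add a 20-day rolling mean of the Close column."""
    def run_model(self) -> pd.DataFrame:
        df = self.processed_data.copy()
        df["Close_MA20"] = df["Close"].rolling(window=20).mean()
        return df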

Creating the Full Pipeline

test_pipeline = ModelPipeline(data_model=LinearModel, data_processing=pm)
test_pipeline.run_pipeline()

print(test_pipeline.result)

Full Example

import pandas as pd
import yfinance
from ez_data_pipeline.Foundation.default_pipeline.Iimporter import IImporter
from ez_data_pipeline.Foundation.default_pipeline.Imodel import Imodel
from ez_data_pipeline.Foundation.default_pipeline.Ipipeline import ModelPipeline
from ez_data_pipeline.Foundation.default_pipeline.Iproccess import ProcessPipeline, Processor
from ez_data_pipeline.Foundation.default_pipeline.IproccessMethod import ProcessingMethod
from ez_data_pipeline.Foundation.default_pipeline.Isave import Isave
from ez_data_pipeline.Foundation.utils import ImportData, AssetClass


# Creating import method
class yf_Import(IImporter):

    def __init__(self, ticker: str):
        self.ticker = ticker
        self.data_ticker = yfinance.Ticker(self.ticker)
        self.data = self.data_ticker.history("max")

    def _import(self) -> pd.DataFrame:
        return self.data

    def find_asset_class(self) -> str:
        return self.data_ticker.info['quoteType']

    def return_data(self) -> ImportData:
        data_dict = {"EQUITY": AssetClass.Stock, "ETF": AssetClass.ETF}
        yf_asset_type = self.find_asset_class()
        return ImportData(self._import(), data_dict[yf_asset_type])

    def __str__(self):
        return f"TICKER: {str(self.data)}"

    def __repr__(self):
        return f"yf_Import({self.ticker})"


# Define the process pipeline
class ProcessPipeline(ProcessPipeline):
    # Adds 1 to every value in the imported DataFrame
    class AddProcessor(Processor):
        def process(self, import_data: ImportData) -> ImportData:
            import_data.pd_data = import_data.pd_data + 1
            return import_data

    # Drops the 'High' column (a column, not a row, despite the class name)
    class RemoveRowProcessor(Processor):
        def process(self, import_data: ImportData) -> ImportData:
            import_data.pd_data = import_data.pd_data.drop("High", axis=1)
            return import_data

    # Adds 2 to every value (a placeholder for an arbitrary transformation)
    class MakeRandomOperation(Processor):
        def process(self, import_data: ImportData) -> ImportData:
            import_data.pd_data = import_data.pd_data + 2
            return import_data


# Define Save class
class CSVSave(Isave):
    def save(self):
        # Placeholder: a real implementation would persist the processed data
        print("saved")


class LinearModel(Imodel):
    """Write you cool Model here"""
    def run_model(self) -> pd.DataFrame:
        """Run your cool Model here"""
        return self.processed_data


pm = ProcessingMethod(yf_Import, ProcessPipeline, CSVSave, "SPY")

test_pipeline = ModelPipeline(data_model=LinearModel, data_processing=pm)
test_pipeline.run_pipeline()

print(test_pipeline.result)

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distributions

No source distribution files available for this release. See the tutorial on generating distribution archives.

Built Distribution

ez_data_pipeline-0.6-py3-none-any.whl (7.0 kB)

Uploaded: Python 3

File details

Details for the file ez_data_pipeline-0.6-py3-none-any.whl.

File metadata

File hashes

Hashes for ez_data_pipeline-0.6-py3-none-any.whl
SHA256: a42759f81b194c9d470ab7b2b5461fceeec610db79c56c33e1dcc65acee472ee
MD5: 6f1187a42291599e2ab9d98664bd769b
BLAKE2b-256: 8e6fa959f1eeeaabbf0b6f57ef475c6bb7449c6483b4c8b3c3220a56c8f938b2

See more details on using hashes here.
