Object-oriented library to create data pipelines
Project description
Open Data Pipeline
Object-oriented library to quickly deploy and test data pipelines and models
Installing Library
pip install ez-data-pipeline
General Contribution Guidelines
Create a feature branch and try to merge it by the end of the day to avoid conflicts. Every major commit should have associated test cases, so follow a TDD workflow.
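If you follow TDD, a new processor or importer can start life as a small pytest case. Below is a minimal sketch; the processor is a toy defined just for the test, and only Processor, ImportData, and AssetClass come from the library:

import pandas as pd
from ez_data_pipeline.Foundation.default_pipeline.Iproccess import Processor
from ez_data_pipeline.Foundation.utils import ImportData, AssetClass

class AddOneProcessor(Processor):
    # Toy processor used only in this test: adds 1 to every numeric value
    def process(self, import_data: ImportData) -> ImportData:
        import_data.pd_data = import_data.pd_data + 1
        return import_data

def test_add_one_processor_increments_every_value():
    data = ImportData(pd.DataFrame({"Close": [1.0, 2.0]}), AssetClass.Stock)
    result = AddOneProcessor().process(data)
    assert list(result.pd_data["Close"]) == [2.0, 3.0]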
Building your first pipeline
Required Imports
import pandas as pd
import yfinance
from ez_data_pipeline.Foundation.default_pipeline.Iimporter import IImporter
from ez_data_pipeline.Foundation.default_pipeline.Imodel import Imodel
from ez_data_pipeline.Foundation.default_pipeline.Ipipeline import ModelPipeline
from ez_data_pipeline.Foundation.default_pipeline.Iproccess import ProcessPipeline, Processor
from ez_data_pipeline.Foundation.default_pipeline.IproccessMethod import ProcessingMethod
from ez_data_pipeline.Foundation.default_pipeline.Isave import Isave
from ez_data_pipeline.Foundation.utils import ImportData, AssetClass
Creating the Importer Class
class yf_Import(IImporter):
    def __init__(self, ticker: str):
        self.ticker = ticker
        self.data_ticker = yfinance.Ticker(self.ticker)
        self.data = self.data_ticker.history("max")

    def _import(self) -> pd.DataFrame:
        return self.data

    def find_asset_class(self) -> str:
        return self.data_ticker.info['quoteType']

    def return_data(self) -> ImportData:
        data_dict = {"EQUITY": AssetClass.Stock, "ETF": AssetClass.ETF}
        yf_asset_type = self.find_asset_class()
        return ImportData(self._import(), data_dict[yf_asset_type])

    def __str__(self):
        return f"TICKER: {str(self.data)}"

    def __repr__(self):
        return f"yf_Import({self.ticker})"
Process Pipeline
class ProcessPipeline(ProcessPipeline):
    # Add processor to list
    class AddProcessor(Processor):
        def process(self, import_data: ImportData) -> ImportData:
            import_data.pd_data = import_data.pd_data + 1
            return import_data

    # Add processor to list
    class RemoveRowProcessor(Processor):
        def process(self, import_data: ImportData) -> ImportData:
            import_data.pd_data = import_data.pd_data.drop("High", axis=1)  # drops the "High" column
            return import_data

    class MakeRandomOperation(Processor):
        def process(self, import_data: ImportData) -> ImportData:
            import_data.pd_data = import_data.pd_data + 2
            return import_data
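Each processor takes an ImportData, transforms its pd_data, and returns it, so a single step can also be run by hand while debugging. This is just a sketch; the nested classes are instantiated directly here instead of being run by the pipeline:

data = yf_Import("SPY").return_data()
for processor in (ProcessPipeline.AddProcessor(),
                  ProcessPipeline.RemoveRowProcessor(),
                  ProcessPipeline.MakeRandomOperation()):
    data = processor.process(data)
print(data.pd_data.columns)  # "High" is gone; the remaining numeric columns were shifted by +3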
Creating Saving Class
class CSVSave(Isave):
    def save(self):
        print("saved")
Creating Process Method
This step isolates the pre-processing stage of model development so it can be tested on its own before being promoted to a production environment.
pm = ProcessingMethod(yf_Import, ProcessPipeline, CSVSave, "SPY")
Creating Model
class LinearModel(Imodel):
    """Write your cool Model here"""

    def run_model(self) -> pd.DataFrame:
        """Run your cool Model here"""
        return self.processed_data
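The example model simply passes the processed data through. A slightly more useful run_model could, for instance, fit a linear trend to the closing price. This assumes self.processed_data is the processed DataFrame produced by the pipeline above and still has a Close column:

import numpy as np

class TrendModel(Imodel):
    """Illustrative model: fits Close ~ a*t + b over the processed history."""

    def run_model(self) -> pd.DataFrame:
        df = self.processed_data.copy()
        t = np.arange(len(df))
        slope, intercept = np.polyfit(t, df["Close"].to_numpy(), 1)
        df["Trend"] = slope * t + intercept  # fitted linear trend line
        return df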
Creating Full Pipeline
test_pipeline = ModelPipeline(data_model=LinearModel, data_processing=pm)
test_pipeline.run_pipeline()
print(test_pipeline.result)
Full Example
import pandas as pd
import yfinance
from ez_data_pipeline.Foundation.default_pipeline.Iimporter import IImporter
from ez_data_pipeline.Foundation.default_pipeline.Imodel import Imodel
from ez_data_pipeline.Foundation.default_pipeline.Ipipeline import ModelPipeline
from ez_data_pipeline.Foundation.default_pipeline.Iproccess import ProcessPipeline, Processor
from ez_data_pipeline.Foundation.default_pipeline.IproccessMethod import ProcessingMethod
from ez_data_pipeline.Foundation.default_pipeline.Isave import Isave
from ez_data_pipeline.Foundation.utils import ImportData, AssetClass
# Creating import method
class yf_Import(IImporter):
    def __init__(self, ticker: str):
        self.ticker = ticker
        self.data_ticker = yfinance.Ticker(self.ticker)
        self.data = self.data_ticker.history("max")

    def _import(self) -> pd.DataFrame:
        return self.data

    def find_asset_class(self) -> str:
        return self.data_ticker.info['quoteType']

    def return_data(self) -> ImportData:
        data_dict = {"EQUITY": AssetClass.Stock, "ETF": AssetClass.ETF}
        yf_asset_type = self.find_asset_class()
        return ImportData(self._import(), data_dict[yf_asset_type])

    def __str__(self):
        return f"TICKER: {str(self.data)}"

    def __repr__(self):
        return f"yf_Import({self.ticker})"
# Define the process pipeline
class ProcessPipeline(ProcessPipeline):
    # Add processor to list
    class AddProcessor(Processor):
        def process(self, import_data: ImportData) -> ImportData:
            import_data.pd_data = import_data.pd_data + 1
            return import_data

    # Add processor to list
    class RemoveRowProcessor(Processor):
        def process(self, import_data: ImportData) -> ImportData:
            import_data.pd_data = import_data.pd_data.drop("High", axis=1)  # drops the "High" column
            return import_data

    class MakeRandomOperation(Processor):
        def process(self, import_data: ImportData) -> ImportData:
            import_data.pd_data = import_data.pd_data + 2
            return import_data
# Define Save class
class CSVSave(Isave):
    def save(self):
        print("saved")
class LinearModel(Imodel):
    """Write your cool Model here"""

    def run_model(self) -> pd.DataFrame:
        """Run your cool Model here"""
        return self.processed_data
pm = ProcessingMethod(yf_Import, ProcessPipeline, CSVSave, "SPY")
test_pipeline = ModelPipeline(data_model=LinearModel, data_processing=pm)
test_pipeline.run_pipeline()
print(test_pipeline.result)
Project details
Download files
Source Distributions
No source distribution files are available for this release.
Built Distribution
File details
Details for the file ez_data_pipeline-0.6-py3-none-any.whl.
File metadata
- Download URL: ez_data_pipeline-0.6-py3-none-any.whl
- Upload date:
- Size: 7.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.2 CPython/3.11.3
File hashes
Algorithm | Hash digest
---|---
SHA256 | a42759f81b194c9d470ab7b2b5461fceeec610db79c56c33e1dcc65acee472ee
MD5 | 6f1187a42291599e2ab9d98664bd769b
BLAKE2b-256 | 8e6fa959f1eeeaabbf0b6f57ef475c6bb7449c6483b4c8b3c3220a56c8f938b2