A framework for building and running simple data engineering pipelines in Python.
Project description
PyDuct
A framework for building and running simple data engineering pipelines in Python.
In Data Science or Data Engineering you constantly hear term “data pipeline”. But there are so many meanings to this term and people often are refering to very specific tools or packages depending on their own background/needs. There are pipelines for pretty much everything and in Python alone I can think of Luigi, Airflow, scikit-learn pipelines, and Pandas pipes just off the top of my head - this article does a good job of helping you understand what is out there.
It can be quite confusing especially if you want a simple and agnostic pipeline that you can customize for your specific needs with no bells and whistles or lock-ins to libraries etc. That is where PyDuct comes in. It is for the simple data engineer who just wants to get stuff done in an ordered and repeatable way.
PyDuct is a simple data pipeline that automates a chain of transformations performed on some data.
PyDuct data pipelines are a great way of introducing automation, reproducibility, structure, and flow to your data engineering projects.
PyDuct was made by Robert Johnson and Alexander Kozlov and Mohammadreza Khanarmuei
What is it?
The PyDuct transformation pipelines use user defined transformation functions linked together into a TransformationPipe. The key feature of PyDuct is that the datasource passed in can be almost anything that you desire - e.g. a pandas dataframe, a geopandas dataframe, and iris datacube, a numppy array, so long as your transformation steps read and write the same object PyDuct will work for you.
Install
pip install pyduct
How to use
The TransformationPipe class accepts a list of transformation functions,'steps', to be applied sequentially. Each step contains a name and a function, applied to the input DataObject and will return a transformed DataObject. There is also a third argument in a step that is an optional dictionary of parameters to be passed to your step transformation functions.
In order to use PyDuct you need two things - a DataObject and a set of transformation steps
DataObject
In this very simplified example we will use a geopandas.GeoDataFrame as our input DataObject. To do this we will load an example data set from Kaggle on the global distribution of Volcano Eruptions: https://www.kaggle.com/datasets/texasdave/volcano-eruptions that we have stored in the repo for this package as 'volcano_data_2010.csv'
import pandas
import geopandas
Load the data and put it into a geopandas dataframe:
df1 = pandas.read_csv('../test_data/volcano_data_2010.csv')
# Keep only relevant columns
df = df1.loc[:, ("Year", "Name", "Country", "Latitude", "Longitude", "Type")]
# Create point geometries
geometry = geopandas.points_from_xy(df.Longitude, df.Latitude)
geo_df = geopandas.GeoDataFrame(df[['Year','Name','Country', 'Latitude', 'Longitude', 'Type']], geometry=geometry)
geo_df.head()
.dataframe tbody tr th {
vertical-align: top;
}
.dataframe thead th {
text-align: right;
}
</style>
Year | Name | Country | Latitude | Longitude | Type | geometry | |
---|---|---|---|---|---|---|---|
0 | 2010 | Tungurahua | Ecuador | -1.467 | -78.442 | Stratovolcano | POINT (-78.44200 -1.46700) |
1 | 2010 | Eyjafjallajokull | Iceland | 63.630 | -19.620 | Stratovolcano | POINT (-19.62000 63.63000) |
2 | 2010 | Pacaya | Guatemala | 14.381 | -90.601 | Complex volcano | POINT (-90.60100 14.38100) |
3 | 2010 | Sarigan | United States | 16.708 | 145.780 | Stratovolcano | POINT (145.78000 16.70800) |
4 | 2010 | Karangetang [Api Siau] | Indonesia | 2.780 | 125.480 | Stratovolcano | POINT (125.48000 2.78000) |
Steps
Just as an example of something to do we will define only one transformation steps to spatially subset to the Australian region. Yes, i know that this is an unrealistic example but it is just here to show you how to implement pipelines.
We must now write our transformation function - keep in mind that the function must take our DataObject as an input and return a transformed DataObject as a return... in this example that is a geopandas.GeoDataFrame
from pyproj import crs
from shapely.geometry import Polygon, MultiPolygon, box, Point
def spatialCrop(gdf: geopandas.GeoDataFrame, **kwargs):
"""
This function will apply a sptial limit to a GeoDataFrame based on user-defined limits.
----------
parameters:
gdf (geopandas.GeoDataFrame): an input GeoDataFrame
kwargs (dict): parameters,
- boundingBox (list): an iterable (lon_min, lat_min, lon_max, lat_max) of the specified region.
Output:
transformed_gdf (gdp.GeoDataFrame): GeoDataFrame that is spatially limited to the boundingBox.
"""
if "boundingBox" not in kwargs:
return gdf
boundingBox = kwargs["boundingBox"]
# just an example so we are doing naughty things with the CRS... look away here...
coord_system = crs.crs.CRS('WGS 84')
bounding = geopandas.GeoDataFrame(
{
'limit': ['bounding box'],
'geometry': [
box(boundingBox[0], boundingBox[1], boundingBox[2],
boundingBox[3])
]
},
crs=coord_system)
limited_gdf = geopandas.tools.sjoin(gdf,
bounding,
op='intersects',
how='left')
limited_gdf = limited_gdf[limited_gdf['limit'] == 'bounding box']
limited_gdf = limited_gdf.drop(columns=['index_right', 'limit'])
return limited_gdf
Define a PyDuct Pipe
Now that we have a step or function and some data we can now define our transformation pipeline:
pipe = TransformationPipe(steps=[
('refine region', spatialCrop, {"boundingBox": [80, -50, 180, 0]})
])
Evaluate your PyDuct Pipe
This where things get interesting... we can now call evaluate
on our pipe and watch the magic happen:
Input data:
geo_df
.dataframe tbody tr th {
vertical-align: top;
}
.dataframe thead th {
text-align: right;
}
</style>
Year | Name | Country | Latitude | Longitude | Type | geometry | |
---|---|---|---|---|---|---|---|
0 | 2010 | Tungurahua | Ecuador | -1.467 | -78.442 | Stratovolcano | POINT (-78.44200 -1.46700) |
1 | 2010 | Eyjafjallajokull | Iceland | 63.630 | -19.620 | Stratovolcano | POINT (-19.62000 63.63000) |
2 | 2010 | Pacaya | Guatemala | 14.381 | -90.601 | Complex volcano | POINT (-90.60100 14.38100) |
3 | 2010 | Sarigan | United States | 16.708 | 145.780 | Stratovolcano | POINT (145.78000 16.70800) |
4 | 2010 | Karangetang [Api Siau] | Indonesia | 2.780 | 125.480 | Stratovolcano | POINT (125.48000 2.78000) |
... | ... | ... | ... | ... | ... | ... | ... |
58 | 2018 | Kilauea | United States | 19.425 | -155.292 | Shield volcano | POINT (-155.29200 19.42500) |
59 | 2018 | Kadovar | Papua New Guinea | -3.620 | 144.620 | Stratovolcano | POINT (144.62000 -3.62000) |
60 | 2018 | Ijen | Indonesia | -8.058 | 114.242 | Stratovolcano | POINT (114.24200 -8.05800) |
61 | 2018 | Kilauea | United States | 19.425 | -155.292 | Shield volcano | POINT (-155.29200 19.42500) |
62 | 2018 | Aoba | Vanuatu | -15.400 | 167.830 | Shield volcano | POINT (167.83000 -15.40000) |
63 rows × 7 columns
Evaluation:
transformed_geo_df = pipe.evaluate(geo_df)
Transformed data:
transformed_geo_df
.dataframe tbody tr th {
vertical-align: top;
}
.dataframe thead th {
text-align: right;
}
</style>
Year | Name | Country | Latitude | Longitude | Type | geometry | |
---|---|---|---|---|---|---|---|
6 | 2010 | Merapi | Indonesia | -7.542 | 110.442 | Stratovolcano | POINT (110.44200 -7.54200) |
8 | 2010 | Tengger Caldera | Indonesia | -7.942 | 112.950 | Stratovolcano | POINT (112.95000 -7.94200) |
9 | 2011 | Merapi | Indonesia | -7.542 | 110.442 | Stratovolcano | POINT (110.44200 -7.54200) |
22 | 2013 | Merapi | Indonesia | -7.542 | 110.442 | Stratovolcano | POINT (110.44200 -7.54200) |
23 | 2013 | Paluweh | Indonesia | -8.320 | 121.708 | Stratovolcano | POINT (121.70800 -8.32000) |
25 | 2013 | Paluweh | Indonesia | -8.320 | 121.708 | Stratovolcano | POINT (121.70800 -8.32000) |
29 | 2013 | Okataina | New Zealand | -38.120 | 176.500 | Lava dome | POINT (176.50000 -38.12000) |
31 | 2014 | Kelut | Indonesia | -7.930 | 112.308 | Stratovolcano | POINT (112.30800 -7.93000) |
39 | 2015 | Manam | Papua New Guinea | -4.100 | 145.061 | Stratovolcano | POINT (145.06100 -4.10000) |
41 | 2015 | Okataina | New Zealand | -38.120 | 176.500 | Lava dome | POINT (176.50000 -38.12000) |
45 | 2016 | Rinjani | Indonesia | -8.420 | 116.470 | Stratovolcano | POINT (116.47000 -8.42000) |
50 | 2017 | Dieng Volc Complex | Indonesia | -7.200 | 109.920 | Complex volcano | POINT (109.92000 -7.20000) |
52 | 2017 | Aoba | Vanuatu | -15.400 | 167.830 | Shield volcano | POINT (167.83000 -15.40000) |
53 | 2017 | Merapi | Indonesia | -7.542 | 110.442 | Stratovolcano | POINT (110.44200 -7.54200) |
55 | 2018 | Kadovar | Papua New Guinea | -3.620 | 144.620 | Stratovolcano | POINT (144.62000 -3.62000) |
59 | 2018 | Kadovar | Papua New Guinea | -3.620 | 144.620 | Stratovolcano | POINT (144.62000 -3.62000) |
60 | 2018 | Ijen | Indonesia | -8.058 | 114.242 | Stratovolcano | POINT (114.24200 -8.05800) |
62 | 2018 | Aoba | Vanuatu | -15.400 | 167.830 | Shield volcano | POINT (167.83000 -15.40000) |
The power of this work is in its reproducibility and scalablilty.
Credits
- Logo art from "Vecteezy.com"
- Demo data from "Kaggle.com"
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.