Glide: Easy ETL
Introduction
Glide was inspired by and uses a similar syntax to Consecution, which is an easy-to-use pipeline abstraction tool inspired by Apache Storm Topologies. Like those libraries, Glide is:
- A simple, reusable approach to building robust ETL pipelines
- A system for wiring together processing nodes to form a DAG
Glide also has:
- An expanding suite of built-in nodes and pipelines that extract, transform, and load data from/to any combination of:
  - SQL databases (SQLite, DBAPI, and SQLAlchemy support)
  - URLs
  - Local or remote files, including:
    - CSVs
    - Excel files (including multi-sheet support)
    - Raw/generic files
  - Emails
- Built-in nodes for Pandas DataFrame-based pipelines, including optional support for DataFrame transformation via Dask or Swifter
- A variety of node and DAG parallel processing strategies via concurrent.futures Executors or optional Dask support
- A simple decorator to generate a command line interface from a pipeline in ~one line of code
- The ability to control node contexts via defaults and/or simple runtime overrides
Installation
⚠️ Warning: This project is still in an alpha state and should probably not be used in production.
$ pip install glide
Examples
The following examples serve as a quickstart to illustrate some core features and built-in nodes. More complete documentation is in progress and can be viewed here.
Glider is the main pipeline class. It takes a DAG of Nodes as input and
then accepts data to process in its consume method. In most cases consume
will iterate over its input as-is, passing each item to the DAG to be
processed.
Note: Some inputs, such as Pandas objects, strings, file objects, dicts, and callables are automatically wrapped in a list to prevent them from being broken up, as iteration is often inefficient or nonsensical in those cases.
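For example, a list input is iterated item by item, while a single string is treated as one item. A minimal sketch, assuming glider is an already-built pipeline:

glider.consume(["/path/a.csv", "/path/b.csv"])  # a list is iterated: each path is one item
glider.consume("/path/a.csv")  # a single string is auto-wrapped, not iterated character by character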
The examples below assume you have used the following (taboo) shortcut to import all necessary node and pipeline classes:
from glide import *
The names of the built-in classes aim to be explicit and therefore can end up a bit longer given the many combinations of ways to process data with Glide. As a convention, nodes prefixed with "Row" expect to operate on plain old python iterables, while nodes prefixed with "DataFrame" propagate Pandas DataFrames.
Let's build some pipelines to explore Glide further...
Example: Read a CSV
Here is a trivial example that reads a CSV and passes all rows to a PrettyPrinter
node in a single push to be pretty-printed:
glider = Glider(
    RowCSVExtractor("extract")
    | PrettyPrinter("load")
)
glider.consume(["/path/to/file.csv"])
Example: DataFrame Transformation
Here is a slightly more realistic example applying a transformation to a DataFrame read from a CSV, in this case lowercasing all strings before loading into an output CSV:
def lower(s):
    return s.lower() if isinstance(s, str) else s

glider = Glider(
    DataFrameCSVExtractor("extract")
    | DataFrameApplyMapTransformer("transform")
    | DataFrameCSVLoader("load", index=False, mode="a")
)
glider.consume(
    ["/path/to/infile.csv"],
    extract=dict(chunksize=100),
    transform=dict(func=lower),
    load=dict(outfile="/path/to/outfile.csv"),
)
Node Context
The above example also demonstrates two separate ways to pass context to nodes:
- Passing kwargs when instantiating the node. This becomes a default context for the node any time it is used/reused.
- Passing kwargs to consume that are node_name->node_context pairs. This context lasts only for the consume call.
Note: Further details can be found in the node creation documentation.
Also Note: Many of the provided nodes pass their context to well-documented functions, such as DataFrame.to_csv in the case of DataFrameCSVLoader. Review the documentation/code for each node for more detail on how args are processed and which are required.
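To illustrate both mechanisms together, here is a minimal sketch reusing the nodes from the example above: the chunksize set at init time acts as a default, and the consume kwargs override it for that call only:

glider = Glider(
    DataFrameCSVExtractor("extract", chunksize=100)  # default context for this node
    | DataFrameCSVLoader("load", index=False, mode="a")
)
# Override the default chunksize for this consume call only
glider.consume(
    ["/path/to/infile.csv"],
    extract=dict(chunksize=500),
    load=dict(outfile="/path/to/outfile.csv"),
)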
Example: Parallel DataFrame Transformation
Let's do the same thing with the data split across parallel processes using a
ProcessPoolExecutor at the transformation step. Note that we instead use a
DataFrameProcessPoolTransformer and adjust the func argument to the
transformer, since it operates on a chunk of the DataFrame instead of being fed
individual elements from the DataFrame as applymap does under the hood in
the previous example:
def lower(s):
    return s.lower() if isinstance(s, str) else s

def df_lower(df):
    # Apply the transformation to an entire DataFrame chunk
    return df.applymap(lower)

glider = Glider(
    DataFrameCSVExtractor("extract")
    | DataFrameProcessPoolTransformer("transform")
    | DataFrameCSVLoader("load", index=False, mode="a")
)
glider.consume(
    ["infile.csv"],
    transform=dict(func=df_lower),
    load=dict(outfile="outfile.csv"),
)
Note: there are transformer nodes for using Swifter and Dask as well if you install those extensions.
Example: Placeholder Nodes
You can also easily drop replacement nodes into a templated pipeline. In this
case we use a PlaceholderNode for the extract node in the pipeline
definition and then replace that with a DataFrameCSVExtractor. The result is
a pipeline that can extract a CSV from one file, perform some custom
transformation on the DataFrame, and then load it to another CSV.
glider = Glider(
    PlaceholderNode("extract")
    | MyTransformer("transform")
    | DataFrameCSVLoader("load", index=False, mode="a")
)
glider["extract"] = DataFrameCSVExtractor("extract")
glider.consume(
    ["/path/to/infile.csv"],
    extract=dict(chunksize=100),
    load=dict(outfile="/path/to/outfile.csv"),
)
Note: Any node can be replaced by name. PlaceholderNode is just a convenience.
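For example, you could swap the load node of the pipeline above for a printer by name, without ever having used a placeholder (a minimal sketch):

# Replace the "load" node to inspect output instead of writing it
glider["load"] = PrettyPrinter("load")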
Example: Global State
A Glider can also have a shared context that can be used to populate node
arguments via its optional global_state argument:
conn = get_my_sqlalchemy_conn()
sql = "select * from in_table limit 10"
glider = Glider(
    DataFrameSQLExtractor("extract")
    | DataFrameSQLLoader("load", if_exists="replace", index=False),
    global_state=dict(conn=conn)  # conn will automagically be passed to any nodes that require it
)
glider.consume(
    [sql],
    load=dict(table="out_table")
)
Example: Parallel Pipelines via ParaGlider
Glide also has support for completely parallelizing pipelines using a
ParaGlider (who said ETL isn't fun?!?) instead of a Glider. The following
code will create a process pool and split processing of the inputs over the
pool, with each process running the entire pipeline on part of the consumed
data:
glider = ProcessPoolParaGlider(
    RowCSVExtractor("extract")
    | Printer("load")
)
glider.consume(
    ["/path/to/infile1.csv", "/path/to/infile2.csv"],
    extract=dict(nrows=50)
)
Example: Parallel Branching
If you don't want to execute the entire pipeline in parallel, you can also branch into parallel execution in the middle of the DAG utilizing a parallel push node as in the following example:
glider = Glider(
    RowCSVExtractor("extract", nrows=60)
    | ProcessPoolPush("push", split=True)
    | [Printer("load1"), Printer("load2"), Printer("load3")]
)
glider.consume(["/path/to/infile.csv"])
The above example will extract 60 rows from a CSV and then push equal slices
to the logging nodes in parallel processes. Using split=False (default)
would have passed the entire 60 rows to each logging node in parallel
processes.
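For comparison, a sketch of the same pipeline without splitting, where each Printer receives all 60 rows:

glider = Glider(
    RowCSVExtractor("extract", nrows=60)
    | ProcessPoolPush("push")  # split defaults to False
    | [Printer("load1"), Printer("load2"), Printer("load3")]
)
glider.consume(["/path/to/infile.csv"])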
Note: Once you branch off into processes there is currently no way to reduce/join the pipeline back into the original process and resume single-process operation on the multiprocessed results. However, that can be achieved with threads if necessary as shown in the next example.
Example: Thread Reducers
glider = Glider(
    RowCSVExtractor("extract", nrows=60)
    | ThreadPoolPush("push", split=True)
    | [Printer("load1"), Printer("load2"), Printer("load3")]
    | ThreadReducer("reducer")
    | Printer("loadall")
)
glider.consume(["/path/to/infile.csv"])
The above code will split the data and push to the first 3 logging nodes in
multiple threads. The ThreadReducer won't push until all of the previous
nodes have finished, and then the final logging node will print all of the
results.
Summary of Parallel Processing
At this point it's worth summarizing the various ways you can attempt parallel processing using Glide:
- Method 1: Parallelization within nodes, such as DataFrameProcessPoolTransformer
- Method 2: Completely parallel pipelines via ParaGliders (each process executes the entire pipeline)
- Method 3: Branched parallelism using parallel push nodes such as ProcessPoolPush or ThreadPoolPush
Each has its own utility and/or quirks. Method 1 is perhaps the most straightforward since you return to single process operation after the node is done doing whatever it needed to do in parallel, though the shuffling of data to/from subprocesses is not without cost. Method 2 may be useful and easy to understand in certain cases as well. Method 3 can lead to more complex/confusing flows and should likely only be used towards the end of pipelines to branch the output in parallel, such as if writing to several databases in parallel as a final step.
Note: combining the approaches may not work and has not been tested.
Also Note: standard limitations apply regarding what types of data can be serialized and passed to a parallel process.
Debug Logging
To enable debug logging for Glide change the log level of the "glide" logger:
import logging
logging.getLogger("glide").setLevel(logging.DEBUG)
Glide will then print debug information about data passed through your
pipeline. There are also a variety of print nodes you can place in your
pipeline for general logging or debugging, such as Printer, PrettyPrinter,
LenPrinter, ReprPrinter, and FormatPrinter. See the node documentation
for more info.
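For example, you could drop a print node between stages to inspect data mid-pipeline. A minimal sketch, assuming LenPrinter reports the length of the data it receives:

glider = Glider(
    RowCSVExtractor("extract")
    | LenPrinter("debug")  # inspect how much data flows through
    | PrettyPrinter("load")
)
glider.consume(["/path/to/file.csv"])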
Creating Nodes
Creating nodes is quite simple. You must inherit from the Glide Node class
and you must define a run method that takes at least one positional argument
for the data being pushed to it. The run method should call
self.push(data) with the data it wants to push downstream.
Here is an example of a simple transformer node:
from glide import Node

class ExampleTransformer(Node):
    def run(self, data):
        # Do something to the data here
        self.push(data)
Earlier we mentioned node context. This comes into play when run is called
on the node, as the required and optional parts of the context are inferred from
the positional and keyword args of run. Take for example:
class MyNode(Node):
    def run(self, data, conn, chunksize=None, **kwargs):
        # Some node-specific code here
        self.push(data)
All nodes expect their first positional arg to be the data going through the
pipeline. This node also requires a conn argument, and has an optional
chunksize argument. These values can be filled in from the following inputs
in priority order, with earlier methods overriding those further down the
list:
1. Context args passed to consume for the particular node:
conn = get_my_db_conn()
glider.consume(
    data,
    my_node=dict(conn=conn, chunksize=100)
)
2. Default context set on the node at init time:
conn = get_my_db_conn()
glider = Glider(
    MyNode("my_node", conn=conn, chunksize=100)
)
3. Global pipeline state passed via global_state. This only works for populating positional args currently:
conn = get_my_db_conn()
glider = Glider(
    MyNode("my_node"),
    global_state=dict(conn=conn)
)
CLI Generation
Glide allows creating parameterized command line scripts from any pipeline
with a simple decorator using the Glider.cli method. Consider the following simple
SQL extract and load pipeline:
glider = Glider(
    RowSQLExtractor("extract")
    | RowSQLLoader("load")
)
You can create a command line script from the glider object as follows:
@glider.cli()
def main(data, node_contexts):
    glider.consume(data, **node_contexts)

if __name__ == "__main__":
    main()
The script arguments, their types, and whether they are required are all
inferred by inspecting the run arguments on the nodes of the pipeline and
prefixing the node name. For example, RowSQLLoader requires a conn and a
table argument, as well as having a few optional arguments. Since the node
is named "load", the CLI will automatically generate required args called
--load_conn and --load_table.
By default, the first positional argument(s) expected on the CLI is used to
populate the data argument. Let's ignore the fact that you can't pass a real
database connection object on the command line for a second and see how you
would run this script:
$ python my_script.py "select * from input_table limit 10" \
    --extract_conn foo \
    --load_conn bar \
    --load_table output_table
To pass multiple inputs to data you would simply use space-separated
positional arguments:
$ python my_script.py "sql query 1" "sql query 2" \
    --extract_conn foo \
    --load_conn bar \
    --load_table output_table
One way to populate the conn argument of pipeline nodes is to define it in
the global_state or in the node initialization calls. In either case it is
no longer considered a required command line argument, so the following
would work:
glider = Glider(
    RowSQLExtractor("extract")
    | RowSQLLoader("load"),
    global_state=dict(conn=get_my_db_conn())
)
$ python my_script.py "select * from input_table limit 10" \
    --load_table output_table
Blacklisting Args
In the previous example there is no longer any need for the node-specific connection arguments to show up on the command line at all. You can blacklist an arg from ever being added to the CLI as follows:
@glider.cli(blacklist=["conn"])
def main(data, node_contexts):
glider.consume(data, **node_contexts)
Or, if you just wanted to blacklist an argument that appears in multiple nodes
from a single node (such as the conn argument required in both the extract
and load nodes in this example), you could be more explicit and prefix the
node name:
@glider.cli(blacklist=["load_conn"])
def main(data, node_contexts):
glider.consume(data, **node_contexts)
That would remove load_conn from the CLI but not extract_conn.
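With that blacklist in place, and assuming the load node's conn is supplied some other way (such as via global_state), the invocation might look like:

$ python my_script.py "select * from input_table limit 10" \
    --extract_conn foo \
    --load_table output_table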
Custom Arguments
You can also override or add any argument you want using the Arg class which
takes the standard argparse arguments:
from glide.core import Glider, Arg

glider = ...

@glider.cli(Arg("--load_table", required=False, default="output_table"))
def main(data, node_contexts):
    glider.consume(data, **node_contexts)
And now, assuming you had used the Glider with conn passed in the
global_state, you could simply do:
$ python my_script.py "select * from input_table limit 10"
You can override the data positional argument in this way too if you want to
change the type/requirements:
@glider.cli(Arg("data", type=str, default="some default sql query"))
def main(data, node_contexts):
glider.consume(data, **node_contexts)
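With that override in place, data is parsed as a single plain string rather than a list of inputs, so an invocation might look like:

$ python my_script.py "select * from other_table limit 5"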
Parent CLIs
If you want to inherit or share arguments you can accomplish that using the
Parent and Arg decorators together. These use
climax under the hood, which
builds on argparse. For example, the following script inherits a --dry_run
boolean CLI flag:
from glide.core import Parent, Arg

@Parent()
@Arg("--dry_run", action="store_true")
def parent_cli():
    pass

@glider.cli(parents=[parent_cli])
def main(data, node_contexts, dry_run=False):
    if dry_run:
        something_else()
    else:
        glider.consume(data, **node_contexts)
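The inherited flag can then be passed like any other argument (assuming the remaining node args are satisfied as in the earlier examples):

$ python my_script.py "select * from input_table limit 10" --dry_run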
Argument Injection and Clean Up
The script decorator also has the ability to inject values into arguments based on the result of a function, and call clean up functions for the various injected arguments. The following example shows two useful cases:
def get_data():
    # do something to populate data iterable
    return data

@glider.cli(
    Arg("--load_table", required=False, default="output_table"),
    inject=dict(data=get_data, conn=get_my_db_conn),
    clean=dict(conn=lambda x: x.close()),
)
def main(data, node_contexts, **kwargs):
    glider.consume(data, **node_contexts)
Here we use the inject decorator argument and pass a dictionary that maps
injected argument names to callables that return the value. We inject a data
arg and a conn arg and neither are necessary for the command line. This
automatically blacklists those args from the command line as well. Since we
added the load_table arg and gave it a default as well, we can now simply
run:
$ python my_script.py
Note: Injected args are also passed to the wrapped function as keyword args.
Also Note: If an injected argument name is mapped to a non-callable via inject, the value will be used as is. The main difference is those values are interpreted as soon as the module is loaded (when the decorator is init'd). If that is not desirable, pass a callable as shown above, which will only be executed once the decorated function is actually called.
The clean decorator argument takes a dictionary that maps argument names to
callables that accept the argument value to perform some clean up. In this
case, it closes the database connection after the wrapped method is complete.
Extensions
Installing Extensions
To install all extensions and dev dependencies:
$ pip install glide[complete]
To just install Glide plus a specific extension, such as Dask:
$ pip install glide[dask]
To access installed extensions import from the glide.extensions submodules as necessary.
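For example, a sketch of importing the Dask extension (the exact module path is an assumption; check the glide.extensions package for the actual layout):

from glide.extensions import dask  # hypothetical import path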
Creating Extensions
New extensions are welcome! To add an extension:
- Review the examples of other extensions in glide.extensions
- Add tests for your extensions and don't forget to add support in setup.py
- Review and follow the steps in How to Contribute
Here are some current ideas for extensions/endpoints in case you need inspiration:
- NoSQL databases
- Google Analytics
- Google Ads
- Facebook Ads
- HTML Tables
- Salesforce
You get the idea.
Documentation
Documentation is a work in progress. Most of the built-in nodes and pipelines are fairly self-explanatory so you can supplement your knowledge by perusing the tests directory.
How to Contribute
- Check for open issues or open a new issue to start a discussion around a feature idea or a bug.
- Fork the repository on GitHub to start making your changes to the master branch (or branch off of it).
- Write a test which shows that the bug was fixed or that the feature works as expected.
- Send a pull request. Add yourself to AUTHORS.
In order to run tests you will need to set a GLIDE_CONFIG_FILE environment variable that points to a .ini file containing information shown in tests/sample_config.ini.
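For example (the config path here is hypothetical):

$ export GLIDE_CONFIG_FILE=/path/to/my_config.ini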