Glide: Easy ETL

Introduction

Glide is an easy-to-use data pipelining tool inspired by Consecution and Apache Storm Topologies.

Like those libraries, Glide is:

  • A simple, reusable approach to building robust ETL pipelines
  • A system for wiring together processing nodes to form a directed acyclic graph (DAG)

Glide also has:

  • An expanding suite of built-in nodes and pipelines that extract, transform, and load data from/to any combination of:
    • SQL databases (SQLite, DBAPI, and SQLAlchemy support)
    • Local or remote files (CSVs, Excel, and raw file support)
    • URLs (JSON endpoints, file downloads, APIs, etc.)
    • HTML Tables
    • Emails
  • Extensions for Pandas, Dask, Celery, Redis Queue and more
  • A variety of node and DAG parallel/distributed processing strategies
  • A simple decorator to generate a command line interface from a pipeline in ~one line of code
  • Flexible pipeline templating

Glide is not a task orchestration and/or dependency management tool like Airflow. Use Glide to define your easily developed/contained/reusable/testable data processing pipelines and then rely on a tool like Airflow to do what it's good at, namely scheduling and complex task dependency management.

Installation

⚠️ Warning: This project is in an alpha state and is rapidly changing. It should not be used in production.

$ pip install glide

Primer

You are encouraged to take a deeper look at the docs, but in short:

1. A Node is a part of a pipeline with a run method that typically accepts data from upstream nodes and pushes data to downstream nodes. For example:

class MyNode(Node):
    def run(self, data):
        # Some node-specific code here
        self.push(data)

2. A Glider is a pipeline of Node objects wired together in a DAG. It accepts input data in its consume method. For example:

glider = Glider(
    MyExtractNode("extract")
    | MyTransformNode("transform")
    | MyLoadNode("load")
)
glider.consume(data)

If a node's run method has additional parameters, they are populated from the node's context. More info on creating nodes and populating runtime context can be found here.
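
As a minimal sketch of this (the MultiplyNode class and its multiplier parameter are hypothetical, for illustration only):

class MultiplyNode(Node):
    def run(self, data, multiplier=1):
        # "multiplier" is populated from this node's context
        self.push([x * multiplier for x in data])

# Context can be set at construction time...
glider = Glider(MultiplyNode("multiply", multiplier=2))
# ...or supplied per-run in consume, keyed by node name
glider.consume([range(5)], multiply=dict(multiplier=3))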

Examples

The following examples serve to quickly illustrate some core features and built-in nodes. There is much more Glide can do that is not shown here. Everything below assumes you have used the following shortcut to import all necessary node and pipeline classes:

from glide import *

Example: CSV Extract, Transform, and Load

Extract data from a CSV, lowercase all strings with a transform function, and load the result into an output CSV:

def lower_rows(data):
    for row in data:
        for k, v in row.items():
            row[k] = v.lower() if isinstance(v, str) else v
    return data

glider = Glider(
    CSVExtract("extract")
    | Func("transform", func=lower_rows)
    | CSVLoad("load")
)
glider.consume(
    ["/path/to/infile.csv"],
    extract=dict(chunksize=100),
    load=dict(outfile="/path/to/outfile.csv"),
)

Example: SQL Extract and Load

Read from one table, write to another:

conn = get_my_sqlalchemy_conn()
sql = "select * from in_table limit 10"

glider = Glider(
    SQLExtract("extract")
    | SQLLoad("load"),
    global_state=dict(conn=conn) # conn is automagically passed to any nodes that accept a "conn" argument
)
glider.consume(
    [sql],
    load=dict(table="out_table")
)

Example: SQL Transactions

Start a transaction before writing to a database, rollback on failure:

glider = Glider(
    SQLExtract("extract")
    | SQLTransaction("tx")
    | SQLLoad("load", rollback=True),
    global_state=dict(conn=conn)
)
glider.consume(...)

Example: DataFrames

The Pandas extension allows you to operate with DataFrames:

def lower(s):
    return s.lower() if isinstance(s, str) else s

glider = Glider(
    DataFrameCSVExtract("extract")
    | DataFrameApplyMap("transform", func=lower)
    | DataFrameCSVLoad("load", index=False, mode="a")
)
glider.consume(...)

See the extension tests and code for more examples and documentation.

Example: URL Extraction

Extract data from each URL in the list of requests and load to a URL endpoint:

glider = Glider(URLExtract("extract") | URLLoad("load"))
reqs = [
    "https://jsonplaceholder.typicode.com/todos/1",
    "https://jsonplaceholder.typicode.com/todos/2",
]
glider.consume(
    reqs,
    extract=dict(data_type="json"),
    load=dict(
        url="https://jsonplaceholder.typicode.com/todos",
        data_param="json",
        headers={"Content-type": "application/json; charset=UTF-8"},
    ),
)

Example: JSON Converters

Load JSON from a string:

glider = Glider(URLExtract("extract") | JSONLoads("json"))
reqs = ["https://jsonplaceholder.typicode.com/todos/1"]
glider.consume(reqs, extract=dict(data_type="text"))

Or dump it to a string with the JSONDumps node.
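
For example, a minimal sketch (assuming JSONDumps sits in a pipeline the same way JSONLoads does):

glider = Glider(URLExtract("extract") | JSONDumps("dumps") | Print("print"))
glider.consume(reqs, extract=dict(data_type="json"))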

Example: Filters

Filter the propagation of data based on the result of a function:

def data_check(node, data):
    # do some check on data, return True/False to control filtering
    return True

glider = Glider(
    MyExtract("extract")
    | Filter("filter", func=data_check)
    | MyLoad("load")
)

Example: IterPush

Push each row of an input iterable individually:

glider = Glider(
    CSVExtract("extract", nrows=20)
    | IterPush("iter")
    | Print("load")
)

Example: SplitPush

Split an iterable before pushing:

glider = Glider(SplitPush("push", split_count=2) | Print("print"))
glider.consume([range(4)])

Example: SplitByNode

Split an iterable evenly among downstream nodes:

glider = Glider(SplitByNode("push") | [Print("print1"), Print("print2")])
glider.consume([range(4)])

Example: Window Processing

Push a sliding window of the data:

glider = Glider(
    CSVExtract("extract", nrows=5)
    | WindowPush("window", size=3)
    | MyWindowCalcNode("calc")
)

Example: Date Windows

Generate a set of datetime windows and push them downstream:

import datetime

today = datetime.date.today()
glider = Glider(DateTimeWindowPush("windows") | PrettyPrint("print"))
glider.consume(
    None,
    windows=dict(
        start_date=today - datetime.timedelta(days=3), end_date=today, num_windows=2
    )
)

Or use DateWindowPush for date objects. Note that None is passed as the first arg to consume because the top node (DateTimeWindowPush) is a subclass of NoInputNode, which takes no input data and generates data to push on its own.
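
A minimal sketch of the date variant, assuming DateWindowPush accepts the same window context:

glider = Glider(DateWindowPush("windows") | PrettyPrint("print"))
glider.consume(
    None,
    windows=dict(
        start_date=today - datetime.timedelta(days=3), end_date=today, num_windows=2
    )
)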

Example: Parallel Transformation

Call a function in parallel processes on equal splits of data from a CSV:

glider = Glider(
    CSVExtract("extract")
    | ProcessPoolSubmit("transform", push_type=PushTypes.Result)
    | CSVLoad("load")
)
glider.consume(
    ["infile.csv"],
    transform=dict(func=lower_rows),
    load=dict(outfile="outfile.csv"),
)

We passed push_type=PushTypes.Result to force ProcessPoolSubmit to fetch and combine the asynchronous results before pushing to the downstream node. The default is to just pass the asynchronous task/futures objects forward, so the following would be equivalent:

glider = Glider(
    CSVExtract("extract")
    | ProcessPoolSubmit("transform")
    | FuturesReduce("reduce")
    | Flatten("flatten")
    | CSVLoad("load")
)

The FuturesReduce node waits for the results from each futures object, and then Flatten will combine each subresult back together into a single result to be loaded in the final CSVLoad node.

Example: Parallel Pipelines via ParaGlider

Completely parallelize a pipeline using a ParaGlider (who said ETL isn't fun?!?). Split processing of the inputs (two files in this case) over the pool, with each process running the entire pipeline on part of the consumed data:

glider = ProcessPoolParaGlider(
    CSVExtract('extract')
    | Print('load')
)
glider.consume(
    ["/path/to/infile1.csv", "/path/to/infile2.csv"],
    extract=dict(nrows=50)
)

Example: Parallel Branching

Branch into parallel execution in the middle of the DAG utilizing a parallel push node:

glider = Glider(
    CSVExtract("extract", nrows=60)
    | ProcessPoolPush("push", split=True)
    | [Print("load1"), Print("load2"), Print("load3")]
)
glider.consume(["/path/to/infile.csv"])

The above example will extract 60 rows from a CSV and then push equal slices to the logging nodes in parallel processes. Using split=False (default) would have passed the entire 60 rows to each logging node in parallel processes.

Once you branch off into processes with a parallel push node, there is no way to reduce/join the pipeline back into the original process and resume single-process operation. The entire remainder of the pipeline is executed in each subprocess. However, that is possible with threads, as shown in the next example.

Example: Thread Reducers

glider = Glider(
    CSVExtract("extract", nrows=60)
    | ThreadPoolPush("push", split=True)
    | [Print("load1"), Print("load2"), Print("load3")]
    | ThreadReduce("reduce")
    | Print("loadall")
)
glider.consume(["/path/to/infile.csv"])

The above code will split the data and push to the first 3 logging nodes in multiple threads. The ThreadReduce node won't push until all of the previous nodes have finished, and then the final logging node will print all of the results.

Example: Templated Nodes and Pipelines

Drop replacement nodes into an existing pipeline. Any node can be replaced by name:

glider = Glider(
    PlaceholderNode("extract")
    | CSVLoad("load")
)
glider["extract"] = CSVExtract("extract")
glider.consume(...)

Or reuse an existing structure of nodes with a NodeTemplate:

nodes = NodeTemplate(
    CSVExtract("extract")
    | CSVLoad("load")
)
glider = Glider(nodes()) # Copy of nodes created with each call

Or reuse an existing pipeline structure with GliderTemplate:

template = GliderTemplate(
    CSVExtract("extract")
    | CSVLoad("load")
)
glider = template() # Copy of pipeline created with each call

Example: Data Integrity Checks

You can use the AssertFunc node to assert that some condition of the data is met:

glider = Glider(
    CSVExtract("extract", chunksize=10, nrows=20)
    | AssertFunc("length_check", func=lambda node, data: len(data) == 10)
    | CSVLoad("load") 
)

The func callable must accept two parameters, a reference to the node object and the data passed into that node. Any truthy value returned will pass the assertion test.

Similarly, you can do a SQL-based check with AssertSQL, in this case simply verifying the number of rows inserted:

glider = Glider(
    SQLExtract("extract")
    | SQLLoad("load")
    | AssertSQL("sql_check")
)

sql = "select * from in_table limit 10"
assert_sql = "select (select count(*) as x from out_table) == 10 as assert"

glider.consume(
    [sql],
    extract=dict(conn=in_conn),
    load=dict(conn=out_conn, table="out_table"),
    sql_check=dict(conn=out_conn, sql=assert_sql)
)

This looks for a truthy value in the assert column of the result to pass the assertion. You can also use the data_check option of AssertSQL to instead compare the SQL result to the return value of a function applied to the data:

glider = ...

sql = "select * from in_table limit 10"
assert_sql = "select count(*) as assert from out_table"

glider.consume(
    [sql],
    extract=dict(conn=in_conn),
    load=dict(conn=out_conn, table="out_table", push_data=True),
    sql_check=dict(
        conn=out_conn,
        sql=assert_sql,
        data_check=lambda node, data: len(data)
    )
)

Note that we also added push_data=True to the SQLLoad node to have it push the data instead of a table name.

Example: Debugging

To enable debug logging for Glide, change the log level of the "glide" logger:

import logging
logging.getLogger("glide").setLevel(logging.DEBUG)

Glide will then print debug information about data passed through your pipeline.

You can also pass _log=True to the init method of any node to enable logging of processed data:

glider = Glider(
    SQLExtract("extract", _log=True)
    ...
)

Additionally, you can pass _debug=True to the init method of any node to cause the node to drop into PDB right before calling run, assuming you aren't executing the pipeline in a subprocess:

glider = Glider(
    SQLExtract("extract", _debug=True)
    ...
)

Finally, there are a variety of print nodes you can place in your pipeline for general logging or debugging, such as Print, PrettyPrint, LenPrint, ReprPrint, and FormatPrint. See the node documentation for more info.
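
For example, a minimal sketch that reports the size of each chunk as it flows through (placement of the print node is arbitrary):

glider = Glider(
    CSVExtract("extract", chunksize=10)
    | LenPrint("size")  # prints the length of each data chunk
    | CSVLoad("load")
)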

Example: Profiling Pipelines

Insert a Profile node somewhere in your pipeline to get profiler information for all downstream nodes:

glider = Glider(
    Profile("profile")
    ...
)

Example: Complex Pipelines

If the hierarchy of nodes you want to form is not achievable with the | operator, you can use the add_downstream Node method to form more complex graphs. More info can be found here.
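
As a rough sketch, assuming add_downstream simply wires the calling node to the given downstream node, a diamond-shaped DAG (which | alone cannot express, since the final node needs multiple parents) might be formed like this:

extract = CSVExtract("extract")
branch1 = Print("branch1")
branch2 = Print("branch2")
load = LenPrint("load")

# Fan out from extract, then converge both branches on the same load node
extract.add_downstream(branch1)
extract.add_downstream(branch2)
branch1.add_downstream(load)
branch2.add_downstream(load)

glider = Glider(extract)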

Example: Plotting Pipeline DAGs

If you have the Graphviz package installed, you can generate a plot of your pipelines by simply doing the following:

glider = Glider(...)
glider.plot("/path/to/filename.png")

CLI Generation

With Glide you can create parameterized command line scripts from any pipeline with a simple decorator:

glider = Glider(
    SQLExtract("extract")
    | SQLLoad("load")
)

@glider.cli()
def main(glide_data, node_contexts):
    glider.consume(glide_data, **node_contexts)

if __name__ == "__main__":
    main()

The script arguments, their types, and whether they are required are all inferred by inspecting the run arguments on the nodes of the pipeline and prefixing the argument names with the node name. Please see the full documentation here for more details.
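
For illustration only (the exact flag spelling is an assumption based on the prefixing rule above), invoking the generated script might look roughly like:

$ python my_script.py "select * from in_table" --load_table out_table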

Extensions

To install all extensions and dev dependencies:

$ pip install glide[complete]

You can also just install Glide plus a specific extension:

$ pip install glide[dask]
$ pip install glide[celery]
$ pip install glide[rq]
$ pip install glide[swifter]

To access installed extensions, import from the glide.extensions submodules as necessary. Review the documentation and tests for current extensions for help getting started.
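
For example, a hedged sketch of enabling the Dask extension (the exact submodule path is an assumption; check the extension docs):

from glide.extensions.dask import *  # hypothetical path, shown for illustration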

Pandas

Note that the Pandas extension is supported by default with all Glide installs.

See the extension docs here for node/pipeline reference information. See the tests here for some additional examples.

Dask - Experimental

See the extension docs here for node/pipeline reference information. See the tests here for some additional examples.

Celery - Experimental

See the extension docs here for node/pipeline reference information. See the tests here for some additional examples.

Redis Queue - Experimental

See the extension docs here for node/pipeline reference information. See the tests here for some additional examples.

Swifter - Experimental

See the extension docs here for node/pipeline reference information. See the tests here for some additional examples.

Documentation

More thorough documentation can be found here. You can supplement your knowledge by perusing the tests directory or the module reference.

How to Contribute

See the CONTRIBUTING guide.
