A Python library for developing great data pipelines
ploomber
See the documentation for details; the source code is on GitHub.
ploomber is an expressive workflow management library that provides incremental builds, testing and debugging tools to accelerate DS/ML pipeline development.
Compatible with Python 3.5 and higher.
At a glance
Airflow-like syntax to declare pipelines as code
Interactive. Once a pipeline is declared, you can build it right away, inspect it, check its status, generate HTML reports, and use it from a Python session or a Jupyter notebook
Fast experimentation. Modify your source code and ploomber will figure out which tasks to run and which ones to skip based on source code changes
Test-driven. Use on_finish hooks to check data assumptions (e.g. verify that an output data frame has no NAs or that an "age" column contains only positive numbers)
SQL-friendly. SQL scripts can be parametrized for simplicity and consistency using jinja2
Notebook-friendly. Generate reports in Jupyter notebook format (or HTML) by executing parametrized notebooks using papermill
Switch configurations. Automatically switch pipeline configuration based on the current environment, for example, make all pipeline output files go to /data/project/{{user}}, where {{user}} will be automatically replaced depending on the current user
Reduce boilerplate code by using the Tasks library, for example, dump SQL tables or upload a local file to a database
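The SQL templating mentioned above is powered by jinja2. Independently of ploomber's API, the core idea can be sketched with jinja2 directly (the schema, table, and parameter names below are hypothetical, chosen only for illustration):

```python
from jinja2 import Template

# a parametrized SQL script: placeholders are filled in at render time
# (analytics.sales and year are made-up names for this sketch)
template = Template("SELECT * FROM {{schema}}.sales WHERE year >= {{year}}")

sql = template.render(schema='analytics', year=2019)
print(sql)  # SELECT * FROM analytics.sales WHERE year >= 2019
```

Keeping the query as a template means the same script can run against different schemas or date ranges without editing the SQL itself.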
Install
If you want to try out everything ploomber has to offer:
pip install "ploomber[all]"
Note that installing everything will attempt to install pygraphviz, which depends on graphviz; you must install graphviz first:
# if you are using conda (recommended)
conda install graphviz
# if you are using Homebrew
brew install graphviz
# for other systems, see: https://www.graphviz.org/download/
If you want to start with the minimal amount of dependencies:
pip install ploomber
Example
from ploomber import DAG
from ploomber.products import File
from ploomber.tasks import PythonCallable, SQLDump
from ploomber.clients import SQLAlchemyClient

import pandas as pd

# tmp_dir (a pathlib.Path) and uri (a database connection string) are
# assumed to be defined

dag = DAG()

# the first task dumps data from the db to the local filesystem
task_dump = SQLDump('SELECT * FROM example',
                    File(tmp_dir / 'example.csv'),
                    dag,
                    name='dump',
                    client=SQLAlchemyClient(uri),
                    chunksize=None)


def _add_one(upstream, product):
    """Add one to column a
    """
    df = pd.read_csv(str(upstream['dump']))
    df['a'] = df['a'] + 1
    df.to_csv(str(product), index=False)


def on_finish(task):
    df = pd.read_csv(str(task.product))
    assert not df['a'].isna().sum()


# we convert the Python function to a Task
task_add_one = PythonCallable(_add_one,
                              File(tmp_dir / 'add_one.csv'),
                              dag,
                              name='add_one')

# verify there are no NAs in column a
task_add_one.on_finish = on_finish

# declare how tasks relate to each other
task_dump >> task_add_one

# run the pipeline - incremental builds: ploomber will keep track of each
# task's source code and will only execute outdated tasks in the next run
dag.build()

# a DAG also serves as a tool to interact with your pipeline, for example,
# status will return a summary table
dag.status()

# start a debugging session (only works if task is a PythonCallable)
dag['add_one'].debug()
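Incremental builds work by tracking each task's source and skipping tasks whose source has not changed. A minimal sketch of that underlying idea (not ploomber's actual implementation) is comparing a hash of the current source against the hash stored after the last successful run:

```python
import hashlib


def is_outdated(source: str, stored_hash: str) -> bool:
    """Return True if the task's source changed since the last run,
    meaning the task (and its downstream tasks) must run again."""
    current = hashlib.md5(source.encode()).hexdigest()
    return current != stored_hash


# hash saved after the previous run
h = hashlib.md5(b"SELECT * FROM example").hexdigest()

print(is_outdated("SELECT * FROM example", h))  # False: skip the task
print(is_outdated("SELECT 1 FROM example", h))  # True: re-run the task
```

In practice ploomber also considers upstream dependencies: a task is re-run if its own source changed or if any task it depends on was re-run.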
CHANGELOG
0.3.2 (2020-04-07)
Faster Product status checking, now performed at rendering time
New products: GenericProduct and GenericSQLRelation for Products that do not have a specific implementation (e.g. you can use Hive with the DBAPI client + GenericSQLRelation)
Improved DAG build reports, subselect columns, transform to pandas.DataFrame and dict
Parallel executor now returns build reports, just like the Serial executor
0.3.1 (2020-04-01)
DAG parallel executor
Interact with pipelines from the command line (entry module)
Bug fixes
Refactored access to Product.metadata
0.3 (2020-03-20)
New Quickstart and User Guide section in documentation
DAG rendering and build now continue until no more tasks can render/build (instead of failing at the first exception)
New @with_env and @load_env decorators for managing environments
Env expansion ({{user}} expands to the current user; {{git}} and {{version}} are also available)
Task.name is now optional when Task is initialized with a source that has __name__ attribute (Python functions) or a name attribute (like Placeholders returned from SourceLoader)
New Task.on_render hook
Bug fixes
A lot of new tests
Now compatible with Python 3.5 and higher
0.2.1 (2020-02-20)
Adds integration with pdb via PythonCallable.debug
Env.start now accepts a filename to look for
Improvements to data_frame_validator
0.2 (2020-02-13)
Simplifies installation
Deletes BashCommand, use ShellScript
More examples added
Refactored env module
Renames SQLStore to SourceLoader
Improvements to SQLStore
Improved documentation
Renamed PostgresCopy to PostgresCopyFrom
SQLUpload and PostgresCopy now have the same API
A few fixes to PostgresCopy (#1, #2)
0.1
First release