A Python library for developing great data pipelines

ploomber

ploomber is an expressive workflow management library that provides incremental builds, testing and debugging tools to accelerate DS/ML pipeline development.

Install

If you want to try out everything ploomber has to offer:

pip install "ploomber[all]"

Note that installing everything will attempt to install pygraphviz, which depends on graphviz, so you have to install graphviz first:

# if you are using conda (recommended)
conda install graphviz
# if you are using homebrew
brew install graphviz
# for other systems, see: https://www.graphviz.org/download/

If you want to start with the minimal set of dependencies:

pip install ploomber

Example

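from pathlib import Path
import tempfile

import pandas as pd

from ploomber import DAG
from ploomber.products import File
from ploomber.tasks import PythonCallable, SQLDump
from ploomber.clients import SQLAlchemyClient

# assumed setup (not part of the original example): a scratch directory and
# a SQLAlchemy URI for a database that contains a table named "example"
tmp_dir = Path(tempfile.mkdtemp())
uri = 'sqlite:///example.db'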

dag = DAG()

# the first task dumps data from the db to the local filesystem
task_dump = SQLDump('SELECT * FROM example',
                    File(tmp_dir / 'example.csv'),
                    dag,
                    name='dump',
                    client=SQLAlchemyClient(uri),
                    chunksize=None)

def _add_one(upstream, product):
    """Add one to column a
    """
    df = pd.read_csv(str(upstream['dump']))
    df['a'] = df['a'] + 1
    df.to_csv(str(product), index=False)

def on_finish(task):
    df = pd.read_csv(str(task.product))
    assert not df['a'].isna().sum()

# we convert the Python function to a Task
task_add_one = PythonCallable(_add_one,
                              File(tmp_dir / 'add_one.csv'),
                              dag,
                              name='add_one')
# verify there are no NAs in column a
task_add_one.on_finish = on_finish

# declare how tasks relate to each other
task_dump >> task_add_one

# run the pipeline - incremental builds: ploomber will keep track of each
# task's source code and will only execute outdated tasks in the next run
dag.build()

# a DAG also serves as a tool to interact with your pipeline, for example,
# status will return a summary table
dag.status()

# start a debugging session (only works if task is a PythonCallable)
dag['add_one'].debug()
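
The two ideas the example relies on, declaring dependencies with >> and skipping tasks whose source code has not changed, can be illustrated with a small standalone sketch. This is not ploomber's implementation: MiniTask, fingerprint and build are hypothetical names, the code uses only the standard library, and it makes two simplifying assumptions. It hashes compiled bytecode as a stand-in for source code, and it re-runs a task whenever one of its upstream tasks ran.

```python
import hashlib


class MiniTask:
    """Hypothetical stand-in for a pipeline task (not ploomber's Task)."""

    def __init__(self, fn, name):
        self.fn = fn
        self.name = name
        self.upstream = []

    def __rshift__(self, other):
        # "a >> b" records that b depends on a; returning b allows chaining
        other.upstream.append(self)
        return other

    def fingerprint(self):
        # assumption: bytecode plus constants is a good enough proxy for
        # "the source code of this task" in a toy example
        code = self.fn.__code__
        payload = code.co_code + repr(code.co_consts).encode()
        return hashlib.sha256(payload).hexdigest()


def build(tasks, cache):
    """Run tasks in the given (already sorted) order, skipping up-to-date ones.

    Returns the names of the tasks that were executed.
    """
    executed = []
    for task in tasks:
        changed = cache.get(task.name) != task.fingerprint()
        upstream_ran = any(up.name in executed for up in task.upstream)
        if changed or upstream_ran:
            task.fn()
            cache[task.name] = task.fingerprint()
            executed.append(task.name)
    return executed


def dump_v1():
    return 'dump'


def add_one_v1():
    return 'add one'


def add_one_v2():
    return 'add one, edited'


dump = MiniTask(dump_v1, name='dump')
add_one = MiniTask(add_one_v1, name='add_one')
dump >> add_one

cache = {}
first_run = build([dump, add_one], cache)   # nothing cached: both tasks run
second_run = build([dump, add_one], cache)  # nothing changed: nothing runs

# editing a task changes its fingerprint, so only that task is outdated
add_one.fn = add_one_v2
third_run = build([dump, add_one], cache)
```

In this sketch the cache is an in-memory dict; a real tool would need to persist that metadata between runs for the second build to be incremental.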

CHANGELOG

0.2.1 (2020-02-20)

  • Adds integration with pdb via PythonCallable.debug
  • Env.start now accepts a filename to look for
  • Improvements to data_frame_validator

0.2 (2020-02-13)

  • Simplifies installation
  • Deletes BashCommand, use ShellScript
  • More examples added
  • Refactored env module
  • Renames SQLStore to SourceLoader
  • Improvements to SQLStore
  • Improved documentation
  • Renamed PostgresCopy to PostgresCopyFrom
  • SQLUpload and PostgresCopy now have the same API
  • A few fixes to PostgresCopy (#1, #2)

0.1

  • First release
