A convention over configuration workflow orchestrator. Develop locally (Jupyter or your favorite editor), deploy to Airflow or Kubernetes.
Ploomber is the simplest way to build reliable data pipelines for Data Science and Machine Learning. Provide your source code in a standard form, and Ploomber automatically constructs the pipeline for you. Tasks can be Python functions, Jupyter notebooks, Python/R/shell scripts, or SQL scripts.
Once your pipeline is constructed, you'll be equipped with lots of development features to experiment faster. When you're ready, deploy to Airflow or Kubernetes (using Argo) without code changes.
Here's what a pipeline task looks like:
**Function**

```python
def clean_users(product, upstream):
    # runs 'get_users' before this task and
    # passes the output location
    df = pd.read_csv(upstream['get_users'])

    # your code here...

    # save output using the provided
    # product variable
    df.to_csv(product)
```

**Jupyter notebook or Python script**

```python
# + tags=["parameters"]
# run 'clean_users' and 'clean_activity'
# before this task
upstream = ['clean_users', 'clean_activity']
# -

# a new code cell is injected here with
# the output location of this task (product)
# and dependencies: 'clean_users', 'clean_activity'

# your code here...

# save output using the provided product variable
Path(product).write_bytes(pickle.dumps(model))
```

**SQL script**

```sql
-- {{product}} is replaced by the table name
CREATE TABLE {{product}} AS
/*
runs 'raw_data' before this task and replaces
{{upstream['raw_data']}} with the table name
*/
SELECT * FROM {{upstream['raw_data']}}
```

**Pipeline declaration**

```yaml
tasks:
  # script
  - source: scripts/get_users.py
    product: output/users-raw.csv

  # function
  - source: functions.clean_users
    product: output/users-clean.csv

  # notebook
  - source: notebooks/model-template.ipynb
    product:
      model: output/model.pickle
      nb: output/model-evaluation.html
```
To run your pipeline, call `ploomber build`. For full flexibility, you can use the Python API directly, as sketched below.
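Here's a minimal sketch of what the Python API looks like (file paths and task names are illustrative, mirroring the spec example above; it assumes `ploomber` and `pandas` are installed):

```python
# a minimal sketch of the Python API: two tasks wired explicitly,
# mirroring the 'get_users' -> 'clean_users' example above
from pathlib import Path

import pandas as pd

from ploomber import DAG
from ploomber.products import File
from ploomber.tasks import PythonCallable


def get_users(product):
    # toy extraction step: write raw data to the product location
    pd.DataFrame({'id': [1, 2], 'age': [20, 30]}).to_csv(str(product),
                                                         index=False)


def clean_users(product, upstream):
    # read the output of 'get_users', transform it, and save it
    df = pd.read_csv(str(upstream['get_users']))
    df.to_csv(str(product), index=False)


Path('output').mkdir(exist_ok=True)

dag = DAG()
get = PythonCallable(get_users, File('output/users-raw.csv'),
                     dag=dag, name='get_users')
clean = PythonCallable(clean_users, File('output/users-clean.csv'),
                       dag=dag, name='clean_users')
clean.set_upstream(get)  # run 'get_users' before 'clean_users'

dag.build()
```

With the Spec API, the dependency is inferred from the `upstream` variable instead of being declared with `set_upstream`.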
Main features
- Jupyter integration. When you open your notebooks, Ploomber automatically injects a new cell with the location of your input files, as inferred from your `upstream` variable. If you open a Python or R script, it is converted to a notebook on the fly.
- Incremental builds. Speed up execution by skipping tasks whose source code hasn't changed.
- Parallelization. Run tasks in parallel to speed up computations.
- Pipeline testing. Run tests upon task execution to verify that the output data has the right properties (e.g. values within expected range); see the sketch after this list.
- Pipeline inspection. Start an interactive session with `ploomber interact` to debug your pipeline. Call `dag['task_name'].debug()` to start a debugging session.
- Deployment to Kubernetes and Airflow. You can develop and execute locally. Once you are ready to deploy, export to Kubernetes or Airflow.
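To make the pipeline-testing feature concrete, here's a minimal, self-contained sketch using the Python API; the task, file name, and check are made up for illustration, and it assumes hooks can receive the `product` argument:

```python
# a minimal sketch of pipeline testing via an on_finish hook; the hook
# runs right after the task executes, and raising an exception fails
# the build (task/file names here are illustrative)
import pandas as pd

from ploomber import DAG
from ploomber.products import File
from ploomber.tasks import PythonCallable


def clean_users(product):
    # toy task: write the "clean" data to the product location
    pd.DataFrame({'age': [20, 30]}).to_csv(str(product), index=False)


def check_ages(product):
    # verify the output has values within the expected range
    df = pd.read_csv(str(product))
    assert df['age'].between(0, 120).all(), 'age out of expected range'


dag = DAG()
task = PythonCallable(clean_users, File('users-clean.csv'),
                      dag=dag, name='clean_users')
task.on_finish = check_ages  # executed right after the task runs

dag.build()
```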
Try it out!
You can try one of the hosted options (no installation required), or run an example locally:
```sh
# clone the sample projects
git clone https://github.com/ploomber/projects

# move to the machine learning pipeline example
cd projects/spec-api-python

# install dependencies...
# 1) if you have conda installed
conda env create -f environment.yml
conda activate spec-api-python
# 2) if you don't have conda
pip install ploomber pandas scikit-learn pyarrow sklearn-evaluation

# open README.ipynb or execute the following commands in the terminal...

# create output folder
mkdir output

# run the pipeline
ploomber build
```
When execution finishes, you'll see the output in the `output/` folder.
More examples are available in the projects repository: https://github.com/ploomber/projects
Installation
```sh
pip install ploomber
```
Compatible with Python 3.6 and higher.
How does Ploomber compare to X?
Ploomber has two goals:
- Provide an excellent development experience for Data Science/Machine learning projects, which require a lot of experimentation/iteration: incremental builds and Jupyter integration are a fundamental part of this.
- Integrate with deployment tools (Airflow and Argo) to streamline deployment.
For a complete comparison, read our survey on workflow management tools.
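As a quick illustration of the first goal, incremental builds make repeated runs cheap; here's a minimal sketch (assuming a `pipeline.yaml` like the one above and that its dependencies are installed):

```python
# a minimal sketch of incremental builds: the second call skips every
# task whose source code (and upstream dependencies) didn't change
from ploomber.spec import DAGSpec

dag = DAGSpec('pipeline.yaml').to_dag()
dag.build()  # first build: runs every task
dag.build()  # second build: skips up-to-date tasks
```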
CHANGELOG
0.9.5 (2021-03-07)
- Changes a lot of error messages for clarity
- Clearer `__repr__` for `Placeholder`, `File`, and `MetaProduct`
- Default placeholders can be used in `pipeline.yaml` without defining `env.yaml`
- Better formatting for displaying DAG build and render errors
- Spec API initializes task spec as `SQLDump` if product has suffix `.csv` or `.parquet`
- Coloring CLI error traceback
- Spec API skips `SourceLoader` if passing an absolute path
- `DAG.clients` validates keys (using `DAGClients`)
- `params` available as hook argument
- Rewritten Spec API documentation
0.9.4 (2021-02-15)
- Better display of errors when building or rendering a DAG (layout and colors)
- `File` implements the `os.PathLike` interface (this works now: `pandas.read_parquet(File('file.parquet'))`)
- Several error messages refactored for clarity
- Adds `DAGSpec.find()` to automatically find `pipeline.yaml`
0.9.3 (2021-02-13)
- Adds `OnlineDAG` to convert `DAG` objects for in-memory inference
- Spec API (`pipeline.yaml`) supports DAG-level and Task-level `serializer` and `unserializer`
- CLI looks for `src/{pkg}/pipeline.yaml` if `pipeline.yaml` doesn't exist
- Adds `{{cwd}}` placeholder for `env.yaml` that expands to the current working directory
0.9.2 (2021-02-11)
- Support for Python 3.9
- `SQLAlchemyClient` now accepts an argument to pass custom parameters to `sqlalchemy.create_engine`
- Temporarily pins papermill version due to an incompatibility with jupytext and nbformat (jupytext does not support cell ids yet)
- Adds `--on-finish/-of` to `ploomber task` to execute the `on_finish` hook
- DAGs with R notebooks can render even if the ir kernel is not installed
0.9.1 (2021-02-01)
- `File` now supports a `client` argument to upload products to cloud storage
- Adds `GCloudStorageClient`
- Fixes error that caused jupyter to fail to initialize the dag when adding a function to a module already included in the YAML spec
- Fixes IPython namespace errors when using `ploomber interact`
- Adds `ploomber.testing.sql.assert_no_duplicates_in_column` to check for record duplicates and optionally show duplicates statistics
- Deprecates a few internal methods: `Table.save`, `DAG.to_dict()`, `Task.to_dict()`
- Improvements to SQL static analyzer to warn when relations created by a SQL script do not match `Product`
- A few changes to `Metadata` (internal API) to cover some edge cases
- Warning when `Product` metadata is corrupted
- Adds new `meta.import_tasks_from` option in YAML specs to import tasks from another file
0.9 (2021-01-18)
- Deprecates `ploomber new` and `ploomber add`
- Adds `ploomber scaffold`
- Jupyter plugin now exports functions as notebooks using `jupyter_functions_as_notebooks` in `pipeline.yaml`
0.8.6 (2021-01-08)
- `ploomber add` generates template tasks and functions if they don't exist
- Jupyter plugin now shows PythonCallable tasks as notebooks
0.8.5 (2020-12-14)
- Documentation tutorials re-organization and CSS fixes
- Improvements to the `InMemoryDAG` API
- Minor bug fixes
- `File.__repr__` shows a relative path whenever possible
0.8.4 (2020-11-21)
- Adds support for passing glob-like patterns in `ploomber build` (via `DAGSpec.from_directory`)
0.8.3 (2020-11-15)
- Full Windows compatibility
- Adds documentation to show how to customize notebook output using `nbconvert`
- Improvements to introductory tutorials
- Adds `--debug/-d` option to `ploomber build` to drop a debugger if an exception happens
- Ensuring all dag-level, task-level and product-level clients are closed after `dag.build()` is done
- Minor bug fixes
0.8.2 (2020-10-31)
- Removes `matplotlib` from dependencies, now using `IPython.display` for inline plotting
- Fixes bug that caused custom args to `{PythonCallable, NotebookRunner}.develop(args="--arg=value")` not to be sent correctly to the subprocess
- `NotebookRunner` (initialized from ipynb) only considers the actual code as its source, ignores the rest of the JSON contents
- Fixes bug when `EnvDict` was initialized from another `EnvDict`
- `PythonCallableSource` can be initialized with dotted paths
- `DAGSpec` loads `env.yaml` when initialized with a YAML spec and there is an `env.yaml` file in the spec parent folder
- `DAGSpec` converts relative paths in sources to be relative to the project's root folder
- Adds `lazy_import` to `DAGSpec`, to avoid importing `PythonCallable` sources (passes the dotted paths as strings instead)
0.8.1 (2020-10-18)
- `ploomber interact` allows switching DAG parameters, just like `ploomber build`
- Adds `PythonCallable.develop()` to develop Python functions interactively
- `NotebookRunner.develop()` now also works with Jupyter Lab
0.8 (2020-10-15)
- Dropping support for Python 3.5
- Removes `DAGSpec.from_file`, loading from a file is now handled directly by the `DAGSpec` constructor
- Performance improvements, DAG does not fetch metadata when it doesn't need to
- Factory functions: Bool parameters with default values are now represented as flags when called from the CLI
- CLI arguments to replace values from `env.yaml` are now built with double hyphens instead of double underscores
- `NotebookRunner` creates parent folders for output file if they don't exist
- Bug fixes
0.7.5 (2020-10-02)
- NotebookRunner.develop accepts passing arguments to jupyter notebook
- Spec API now supports PythonCallable (by passing a dotted path)
- Upstream dependencies of PythonCallables can be inferred via the `extract_upstream` option in the Spec API
- Faster `DAG.render(force=True)` (avoid checking metadata when possible)
- Faster notebook rendering when using the extension thanks to the improvement above
- `data_frame_validator` improvement: `validate_schema` can now validate optional columns dtypes
- Bug fixes
0.7.4 (2020-09-14)
- Improved `__repr__` methods in PythonCallableSource and NotebookSource
- Improved output layout for tables
- Support for nbconvert>=6
- Docstrings are parsed from notebooks and displayed in DAG status table (#242)
- Jupyter extension now works for DAGs defined via directories (via `ENTRY_POINT` env variable)
- Adds Jupyter integration guide to documentation
- Several bug fixes
0.7.3 (2020-08-19)
- Improved support for R notebooks (`.Rmd`)
- New section for the `testing.sql` module in the documentation
0.7.2 (2020-08-17)
- New guides: parametrized pipelines, SQL templating, pipeline testing and debugging
- `NotebookRunner.debug(kind='pm')` for post-mortem debugging
- Fixes bug in Jupyter extension when the pipeline has a task whose source is not a file (e.g. SQLDump)
- Fixes a bug in the CLI custom arg parser that caused dynamic params not to show up
- `DAGSpec` now supports `SourceLoader`
- Docstring (from dotted path entry point) is shown in the CLI summary
- Customized sphinx build to execute guides from notebooks
0.7.1 (2020-08-06)
- Support for R
- Adding section on R pipeline to the documentation
- Construct pipeline from a directory (no need to write a `pipeline.yaml` file)
- Improved error messages when DAG fails to initialize (jupyter notebook app)
- Bug fixes
- CLI accepts factory function parameters as positional arguments; types are inferred using type hints and displayed when calling `--help`
- CLI accepts env variables (if any), displayed when calling `--help`
0.7 (2020-07-30)
- Simplified CLI (breaking changes)
- Refactors internal API for notebook conversion, adds tests for common formats
- Metadata is deleted when saving a script from the Jupyter notebook app to make sure the task runs in the next pipeline build
- SQLAlchemyClient now supports custom tokens to split source
0.6.3 (2020-07-24)
- Adding `--log` option to CLI commands
- Fixes a bug that caused the `dag` variable not to be exposed during interactive sessions
- Fixes `ploomber task` forced run
- Adds SQL pipeline tutorial to get started docs
- Minor CSS changes to docs
0.6.2 (2020-07-22)
- Support for `env.yaml` in `pipeline.yaml`
- Improved CLI. Adds `plot`, `report` and `task` commands
0.6.1 (2020-07-20)
- Changes `pipeline.yaml` default (`extract_product: True`)
- Documentation re-design
- Simplified `ploomber new` generated files
- Ability to define `product` in SQL scripts
- Products are resolved to absolute paths to avoid ambiguity
- Bug fixes
0.6 (2020-07-08)
- Adds Jupyter notebook extension to inject parameters when opening a task
- Improved CLI: `ploomber new`, `ploomber add` and `ploomber entry`
- Spec API documentation additions
- Support for `on_finish`, `on_failure` and `on_render` hooks in spec API
- Improved validation for DAG specs
- Several bug fixes
0.5.1 (2020-06-30)
- Reduces the number of required dependencies
- A new option in DBAPIClient to split source with a custom separator
0.5 (2020-06-27)
- Adds CLI
- New spec API to instantiate DAGs using YAML files
- NotebookRunner.debug() for debugging and .develop() for interactive development
- Bug fixes
0.4.1 (2020-05-19)
- PythonCallable.debug() now works in Jupyter notebooks
0.4.0 (2020-05-18)
- PythonCallable.debug() now uses IPython debugger by default
- Improvements to Task.build() public API
- Moves hook triggering logic to Task to simplify executors implementation
- Adds DAGBuildEarlyStop exception to signal DAG execution stop
- New option in Serial executor to turn warnings and exceptions capture off
- Adds Product.prepare_metadata hook
- Implements hot reload for notebooks and python callables
- General clean ups for old `__str__` and `__repr__` in several modules
- Refactored ploomber.sources module and ploomber.placeholders (previously ploomber.templates)
- Adds NotebookRunner.debug() and NotebookRunner.develop()
- NotebookRunner: now has an option to run static analysis on render
- Adds documentation for DAG-level hooks
- Bug fixes
0.3.5 (2020-05-03)
- Bug fixes #88, #89, #90, #84, #91
- Modifies Env API: Env() is now Env.load(), Env.start() is now Env()
- New advanced Env guide added to docs
- Env can now be used with a context manager
- Improved DAGConfigurator API
- Deletes logger configuration in executors constructors, logging is available via DAGConfigurator
0.3.4 (2020-04-25)
- Dependencies cleanup
- Removed numpydoc as a dependency, now optional
- A few bug fixes: #79, #71
- All warnings are captured and shown at the end (Serial executor)
- Moves differ parameter from DAG constructor to DAGConfigurator
0.3.3 (2020-04-23)
- Cleaned up some modules, deprecated some rarely used functionality
- Improves documentation aimed to developers looking to extend ploomber
- Introduces DAGConfigurator for advanced DAG configuration [Experimental API]
- Adds task to upload files to S3 (ploomber.tasks.UploadToS3), requires boto3
- Adds DAG-level on_finish and on_failure hooks
- Support for enabling logging in entry points (via `--logging`)
- Support for starting an interactive session using entry points (via `python -i -m`)
- Improved support for database drivers that can only send one query at a time
- Improved repr for SQLAlchemyClient, shows URI (but hides password)
- PythonCallable now validates signature against params at render time
- Bug fixes
0.3.2 (2020-04-07)
- Faster Product status checking, now performed at rendering time
- New products: GenericProduct and GenericSQLRelation for Products that do not have a specific implementation (e.g. you can use Hive with the DBAPI client + GenericSQLRelation)
- Improved DAG build reports, subselect columns, transform to pandas.DataFrame and dict
- Parallel executor now returns build reports, just like the Serial executor
0.3.1 (2020-04-01)
- DAG parallel executor
- Interact with pipelines from the command line (entry module)
- Bug fixes
- Refactored access to Product.metadata
0.3 (2020-03-20)
- New Quickstart and User Guide section in documentation
- DAG rendering and build now continue until no more tasks can render/build (instead of failing at the first exception)
- New `@with_env` and `@load_env` decorators for managing environments
- Env expansion (`{{user}}` expands to the current user; `{{git}}` and `{{version}}` also available)
- `Task.name` is now optional when Task is initialized with a source that has a `__name__` attribute (Python functions) or a name attribute (like Placeholders returned from SourceLoader)
- New Task.on_render hook
- Bug fixes
- A lot of new tests
- Now compatible with Python 3.5 and higher
0.2.1 (2020-02-20)
- Adds integration with pdb via PythonCallable.debug
- Env.start now accepts a filename to look for
- Improvements to data_frame_validator
0.2 (2020-02-13)
- Simplifies installation
- Deletes BashCommand, use ShellScript
- More examples added
- Refactored env module
- Renames SQLStore to SourceLoader
- Improvements to SQLStore
- Improved documentation
- Renamed PostgresCopy to PostgresCopyFrom
- SQLUpload and PostgresCopy now have the same API
- A few fixes to PostgresCopy (#1, #2)
0.1
- First release