A data pipeline framework.
Normandy
v0.2 by Epsilon DataLabs
Normandy is a Python framework for data pipelines whose main objective is to standardize your team's code and provide a data treatment methodology flexible enough to fit your team's needs.
Installing Normandy
Normandy is available via pip:
pip install normandy
Features
With Normandy you can define your data pipeline system and, within it, several data flows, each one using or producing different kinds of data.
First, let's explain the basic Normandy terminology:
- Flow: a complete data process (ETL). You may have any number of flows in a Normandy project, and each one may or may not share code or data with the others.
- Step: a Normandy flow is composed of any number of steps, each containing one or more processes. Steps act as an ordering guideline: processes from one step cannot start before the processes of the previous step have ended.
- Process: the lowest level of granularity in Normandy. Each process belongs to a step, and all processes of a step run in parallel, which means they can share the input data, but you must make sure that each process has different outputs.
So, Normandy lets you:
- Create multiple data flows and share code between them, avoiding repeated code.
- Run processes in parallel within a step, allowing you, for example, to read from several sources or produce several outputs from the same data set simultaneously.
- Follow a standardized form of coding.
- Keep a clean and understandable data pipeline structure.
Creating a Normandy project
Normandy offers a simple file structure; you can create it using the create-project command, passing the path where the project should be created.
normandy --create-project -file-path write/project/path
If the given path does not exist, Normandy will create it.
This command will also create a template of the pipeline_conf.yml file used to configure Normandy's behavior.
How to use Normandy
Normandy's behavior is configured in the pipeline_conf.yml file, which has two sections:
- flows: this section defines every aspect of each flow you want to use.
- confs: this section holds general configurations such as environments.
Defining Flows
Each flow has two options that need to be set: tags and steps.
In the tags section you must specify the tags the flow will run with; tags let you select exactly which flows you want to run with each command.
If several flows have the same tag, all flows with that tag will run when the tag is selected, and they will run in the order in which they are defined in the pipeline_conf.yml file.
At least one flow should have the tag default; this flow will run when no tags are selected.
To set the steps, it is enough to list them together with the processes they should run.
A configured flow should look like this:
my_flow:
  tags:
    - default
    - sr1
  steps:
    read:
      - read_file
    process:
      - process_data
    finish:
      - prints
Defining Steps
A step is an individual process or group of processes in a pipeline. Steps are used to enforce order: every process in a step must end before the processes of the next step start.
A step defined in the pipeline_conf.yml file is a reference to a subfolder with the same name inside the pipeline folder. For example, the flow defined previously has three steps: read, process and finish, so the pipeline folder should look like this:
- pipeline
  - read
    - read_file.py
  - process
    - process_data.py
  - finish
    - prints.py
Steps have no further configuration except the processes within them. Note that you may have several processes (Python files) in each step folder, but a process that is not listed in the step definition will not run. This allows you to create several flows using the same step, while the step itself may differ between flows if different processes are called.
Defining Processes
Processes are the smallest unit of Normandy pipelines, and this is where your code lives.
You may define a process in the pipeline_conf.yml file in two ways:
- As a list: use this method if you do not want to add any configuration beyond the defaults. You must use the list form even if the step has just one process.
- As a dictionary: with this method you can make more in-depth configurations for a specific process. The allowed configurations are:
  - avoid_tags: with this setting you can tell Normandy not to run the process if a specific tag has been used. This is very useful when you have several very similar flows: you can define just one flow and use the tags to shape it.
  - error_tolerance: if your process may fail without affecting the rest of the data flow, you can activate this feature so that, if the process fails, the flow still runs.
Note that if you want to use one of these special setups on one process of a step, you must specify all the other processes of that step as dictionaries too, even if no special setup is used for one of them; in that case the setting ignore is used.
A complete example:
my_flow:
  tags:
    - default
    - salarians
  steps:
    read:
      - read_file
      - read_database
    process:
      main_processing:
        avoid_tags:
          - hammerhead
      side_processing:
        avoid_tags:
          - Shepard
        error_tolerance: True
    finish:
      write_data:
        - ignore
      backup_writing:
        error_tolerance: True
Pipeline Configuration
The main objective of this section is to configure your environment settings, but it is also important to declare the project's full path here.
You are free to add any configuration you need under the environment section of the configuration file; you just have to use the keyword "envs". For example, if you want to specify a read and a write path for each environment, do as follows:
confs:
  path: your/project/path
  envs:
    dev:
      read:
        raw_dev
      write:
        processed_dev
    prod:
      read:
        raw
      write:
        processed
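As a rough sketch of how this maps to code: inside a process (explained in the next section), the get_env_confs() method exposes these entries, assuming it returns only the entries of the environment selected at run time. The process below is illustrative only:

def process(pipe, log):
    # With the dev environment, env_confs would be
    # {"read": "raw_dev", "write": "processed_dev"};
    # running with -env prod would give {"read": "raw", "write": "processed"}.
    env_confs = pipe.get_env_confs()
    log.info(f"Reading from {env_confs['read']}, writing to {env_confs['write']}")
    return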
Writing a process
As mentioned before, each process listed in the pipeline_conf.yml file references a Python file inside the corresponding step folder. This file must define a process function that takes two parameters: pipe and log.
pipe is a Normandy pipeline object; at this stage its only function is to expose the environment configurations defined in pipeline_conf.yml. To get them, use the get_env_confs() method, which returns a dictionary with those configurations.
log is a Normandy logger object whose main purpose is to make logging from your code easy.
Process snippet:
from normandy.engine.variables_storage import variables_storage

def process(pipe, log):
    # Configurations
    read_path = pipe.get_env_confs()["read"]
    var_str = variables_storage()
    log.info("Configuration ready")
    # Code your process here
    # ...
    return
The Normandy logger
Normandy provides a fully configured logger.
Each flow execution gets a separate log folder inside the log folder of the project root directory. The folder is named {flow_name}_{execution_datetime}, and inside it several files are created, one for each Normandy object, so that you can easily read either in-depth logs or just a general one. The main flow log contains the logs of all its child objects; that is, the main log holds the flow logs, the step logs and even the process logs.
The Normandy logger has three basic methods:
- info
- warning
- error
Each one receives a single parameter: the message you want to log.
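A minimal sketch of how these methods might be used inside a process (the messages and the error handling below are illustrative, not part of Normandy itself):

def process(pipe, log):
    log.info("Process started")
    try:
        # ... your transformation code here ...
        result_rows = 0  # illustrative placeholder
        if result_rows == 0:
            log.warning("The process produced no rows")
    except Exception as err:
        log.error(f"Process failed: {err}")
        raise
    return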
The Normandy Variable Storage
Normandy also provides a variable storage object to let you share data between processes.
It is very easy to use, with just two methods:
- get: lets you use a previously stored variable.
- update: lets you store a new variable, or overwrite it if it already exists.
A usage example:
import pandas as pd
from normandy.engine.variables_storage import variables_storage

def process(pipe, log):
    # Get confs
    read_path = pipe.get_env_confs()["read"]
    var_str = variables_storage()
    log.info("Configuration ready")
    main_df = pd.read_csv(f"{read_path}/sample_data.csv")
    log.info("Data read")
    var_str.update("main_df", main_df)
    log.info("Data stored")
    return
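A process in a later step can then retrieve the stored DataFrame with get. A brief sketch, assuming get takes the same name that was passed to update (the write path and output file name below are illustrative):

from normandy.engine.variables_storage import variables_storage

def process(pipe, log):
    # Retrieve the DataFrame stored by the previous step
    var_str = variables_storage()
    main_df = var_str.get("main_df")
    log.info("Data retrieved")
    # Write it out using the environment's write path
    write_path = pipe.get_env_confs()["write"]
    main_df.to_csv(f"{write_path}/sample_data_processed.csv", index=False)
    log.info("Data written")
    return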
How to run it
To run the Normandy pipeline, use the run-pipeline command from the project directory as shown below:
normandy --run-pipeline
With this command the default flow will run in the defined dev environment.
To specify tags, use the tags parameter:
normandy --run-pipeline -tags my_tag -tags sr2
Finally, to specify the environment, use the env parameter:
normandy --run-pipeline -env prod