# pline Python library

AWS Data Pipeline Wrapper for `boto3`. Construct a Data Pipeline using Python objects.

Last updated: `0.4.2`

## Installation

    pip install pline

## Overview

The payload `boto3` requires for a pipeline definition is somewhat complex. This library
provides the tools to model your pipeline using Python objects and transform those objects
into the data structure `boto3` expects.

    import pline

    my_activity = pline.activities.ShellCommandActivity(
        name='MyActivity', id='Activity_adbc1234')
    my_activity.command = "echo $1 $2"
    my_activity.scriptArgument = ['hello', 'world']

    # Resulting payload:
    { 'id'     : 'Activity_adbc1234',
      'name'   : 'MyActivity',
      'fields' : [ {'key': 'command', 'stringValue': 'echo $1 $2'},
                   {'key': 'type', 'stringValue': 'ShellCommandActivity'},
                   {'key': 'scriptArgument', 'stringValue': 'hello'},
                   {'key': 'scriptArgument', 'stringValue': 'world'} ]}
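
As a rough illustration of the flattening shown above, the sketch below turns a plain attribute mapping into a `fields` list. The helper `to_fields` is hypothetical and not part of `pline`; it only mirrors the shape of the example payload (the field order may differ from what `pline` emits).

```python
def to_fields(type_name, attributes):
    """Flatten attributes into a list of {'key', 'stringValue'} entries.

    Hypothetical helper for illustration; not part of pline.
    """
    fields = [{'key': 'type', 'stringValue': type_name}]
    for key, value in attributes.items():
        # A list value becomes one entry per element, all sharing the same key.
        values = value if isinstance(value, (list, tuple)) else [value]
        for item in values:
            fields.append({'key': key, 'stringValue': str(item)})
    return fields


activity = {
    'id': 'Activity_adbc1234',
    'name': 'MyActivity',
    'fields': to_fields('ShellCommandActivity', {
        'command': 'echo $1 $2',
        'scriptArgument': ['hello', 'world'],
    }),
}
```

Repeating the key for each list element is what lets a multi-valued attribute such as `scriptArgument` fit a payload whose entries each hold a single string.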

#### Data Pipeline Objects

Every object in a pipeline is a descendant of the `DataPipelineObject` class. Each object
owns three key attributes:

* `name`
* `id`
* `fields`

The `name` and `id` attributes must be set at initialization time, but `fields` is
handled internally by the object and should not be accessed directly.

Setting an object's attribute can be done via the initialization call or after the fact:

    node = pline.data_nodes.S3DataNode(
        id='MyDataNode1', name='MyDataNode1', workerGroup='TestGroup')
    # => <S3DataNode name: "MyDataNode1", id: "MyDataNode1">
    node.directoryPath = 's3://bucket/pipeline/'
    print(node.workerGroup)
    # => 'TestGroup'
    print(node.directoryPath)
    # => 's3://bucket/pipeline/'

`Pipeline` instances handle the conversion of pipeline objects to a payload, but objects can
be viewed in `boto`-friendly format by converting them to a `dict`:

    { 'name'   : 'MyDataNode1',
      'id'     : 'MyDataNode1',
      'fields' : [
        { 'key' : 'type', 'stringValue' : 'S3DataNode' },
        { 'key' : 'directoryPath', 'stringValue' : 's3://bucket/pipeline/' },
        { 'key' : 'workerGroup', 'stringValue' : 'TestGroup' }, ] }
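
One way such behavior could be implemented is sketched below: attribute assignments are captured as fields, and iterating the object yields key/value pairs so that `dict(obj)` produces the payload. `SketchObject` is a hypothetical stand-in written for this illustration, not `pline`'s actual `DataPipelineObject`.

```python
class SketchObject(object):
    """Hypothetical stand-in for a pipeline object; not pline's real class."""

    def __init__(self, id, name, **attributes):
        # Bypass our own __setattr__ for the three core attributes.
        object.__setattr__(self, 'id', id)
        object.__setattr__(self, 'name', name)
        object.__setattr__(self, '_attrs', {'type': type(self).__name__})
        for key, value in attributes.items():
            setattr(self, key, value)

    def __setattr__(self, key, value):
        # Any attribute set through normal assignment is recorded as a field.
        self._attrs[key] = value

    def __getattr__(self, key):
        # Only called when normal lookup fails, i.e. for field names.
        try:
            return object.__getattribute__(self, '_attrs')[key]
        except KeyError:
            raise AttributeError(key)

    def __iter__(self):
        # Yielding (key, value) pairs lets dict(obj) build the payload.
        yield 'id', self.id
        yield 'name', self.name
        yield 'fields', [{'key': k, 'stringValue': str(v)}
                         for k, v in self._attrs.items()]


class S3DataNode(SketchObject):
    pass


node = S3DataNode(id='MyDataNode1', name='MyDataNode1', workerGroup='TestGroup')
node.directoryPath = 's3://bucket/pipeline/'
payload = dict(node)
```

Keeping the fields in a private mapping is what allows the object to be used with ordinary attribute syntax while the payload format stays an internal detail.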

#### Data Pipeline Parameters

As of `0.2.0`, `pline` supports passing parameters to data pipelines. Parameters can be added to the
pipeline and passed into `DataPipelineObject` instances.

    my_param = pline.parameters.String(
        id = 'MyParam1',
        value = 'Here is the value I am using',
        description = 'This value is extremely important',
        watermark = 'Choose a value between 0 and 99.')
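
In the `boto3` payload, a parameter's definition and its value are sent as separate lists (`parameterObjects` and `parameterValues`). The sketch below shows that split for the parameter above. The helper `parameter_payload` is hypothetical, and how `pline` performs the conversion internally is an assumption.

```python
def parameter_payload(param_id, type_name, value=None, **attributes):
    """Split one parameter into its definition and its value.

    Hypothetical helper; pline's own conversion may differ.
    """
    attrs = [{'key': 'type', 'stringValue': type_name}]
    attrs += [{'key': k, 'stringValue': str(v)}
              for k, v in sorted(attributes.items())]
    parameter_object = {'id': param_id, 'attributes': attrs}
    # The value travels separately from the definition.
    parameter_value = None
    if value is not None:
        parameter_value = {'id': param_id, 'stringValue': str(value)}
    return parameter_object, parameter_value


param_object, param_value = parameter_payload(
    'MyParam1', 'String',
    value='Here is the value I am using',
    description='This value is extremely important',
    watermark='Choose a value between 0 and 99.')
```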

#### Typed Data Pipeline Objects/Parameters

Most objects in a data pipeline are typed -- that is, they are given a `type` attribute on initialization
that is added to the `fields` attribute. By default, the type is taken from the name of the class (which
corresponds to the type given by AWS' specs).

Custom classes can override this behavior by defining a `TYPE_NAME` class-level attribute:

    class MyCustomS3DataNode(pline.S3DataNode):
        TYPE_NAME = 'S3DataNode'
        # ...

    class MyCustomParam(pline.AwsS3ObjectKey):
        TYPE_NAME = 'AwsS3ObjectKey'
        # ...
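
The lookup described above can be sketched in a few lines. `TypedBase` and `type_name` are hypothetical names chosen for this illustration; the real resolution logic in `pline` may be structured differently.

```python
class TypedBase(object):
    """Hypothetical base class showing one way the lookup could work."""

    @classmethod
    def type_name(cls):
        # Prefer an explicit TYPE_NAME, else fall back to the class name.
        return getattr(cls, 'TYPE_NAME', cls.__name__)


class S3DataNode(TypedBase):
    pass


class MyCustomS3DataNode(S3DataNode):
    TYPE_NAME = 'S3DataNode'


class Untagged(S3DataNode):
    pass
```

Under this sketch, `Untagged.type_name()` returns `'Untagged'`, which is not a type AWS knows about. That is the situation `TYPE_NAME` is meant to prevent when subclassing.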

## Example Pipeline

#### Create a pipeline object

    pipeline = pline.Pipeline(
        name = 'MyPipeline',
        unique_id = 'MyPipeline1',
        desc = 'An example pipeline description',
        region = 'us-west-2' )

#### Connect (optional)

The pipeline will connect to AWS automatically if you have your AWS credentials set at
the environmental level. If you want to connect using a specific configuration:

        aws_access_key_id = 'my_access_key',
        aws_secret_access_key = 'my_secret_key' )
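
For comparison, the same choice between implicit and explicit credentials can be made when building a `boto3` client directly. The helper `client_kwargs` is hypothetical; the keyword names it produces (`region_name`, `aws_access_key_id`, `aws_secret_access_key`) are the ones `boto3.client` accepts.

```python
def client_kwargs(region, access_key=None, secret_key=None):
    """Build keyword arguments for boto3.client('datapipeline', ...)."""
    kwargs = {'region_name': region}
    # Leave credentials out entirely so boto3 falls back to its default
    # chain (environment variables, config files, instance role).
    if access_key and secret_key:
        kwargs['aws_access_key_id'] = access_key
        kwargs['aws_secret_access_key'] = secret_key
    return kwargs


explicit = client_kwargs('us-west-2', 'my_access_key', 'my_secret_key')
implicit = client_kwargs('us-west-2')
# With boto3 installed: client = boto3.client('datapipeline', **explicit)
```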

#### Create a schedule object

    schedule = pline.Schedule(
        id = 'Schedule1',
        name = 'Schedule',
        period = '1 day',
        startAt = pline.keywords.startAt.FIRST_ACTIVATION_DATE_TIME,
        occurrences = 1 )

#### Create the default pipeline definition

The pipeline object has a helper method to create this object with sensible defaults:

    definition = pipeline.definition( schedule,
        pipelineLogUri = "s3://bucket/pipeline/log" )
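
In AWS Data Pipeline, the default definition is an object with the id `Default` whose fields are inherited by the other objects. The sketch below builds such an object by hand. The function `default_definition` is hypothetical, and the specific default values are assumptions based on common AWS settings, not necessarily what `pline` emits.

```python
def default_definition(schedule_id, **overrides):
    """Build a 'Default' pipeline object (illustrative values only)."""
    settings = {
        # Assumed defaults; pline's actual choices may differ.
        'scheduleType': 'cron',
        'failureAndRerunMode': 'CASCADE',
        'role': 'DataPipelineDefaultRole',
        'resourceRole': 'DataPipelineDefaultResourceRole',
    }
    settings.update(overrides)
    fields = [{'key': k, 'stringValue': v}
              for k, v in sorted(settings.items())]
    # The schedule is another pipeline object, so it is linked by reference.
    fields.append({'key': 'schedule', 'refValue': schedule_id})
    return {'id': 'Default', 'name': 'Default', 'fields': fields}


definition = default_definition(
    'Schedule1', pipelineLogUri='s3://bucket/pipeline/log')
```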

#### Create an EC2 resource

This will be the machine running the tasks.

    resource = pline.resources.Ec2Resource(
        id = 'Resource1',
        name = 'Resource',
        role = 'DataPipelineDefaultRole',
        resourceRole = 'DataPipelineDefaultResourceRole',
        schedule = schedule )

#### Create an activity

    activity = pline.activities.ShellCommandActivity(
        id = 'MyActivity1',
        name = 'MyActivity',
        runsOn = resource,
        schedule = schedule,
        command = 'echo hello world' )
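
Fields such as `runsOn` and `schedule` point at other pipeline objects. In the `boto3` payload, such links use `refValue` with the target's id instead of `stringValue`. The sketch below shows that distinction; `field_for` and `Stub` are hypothetical names, and the way `pline` detects references is an assumption.

```python
def field_for(key, value):
    """Return one field entry; objects with an `id` become references.

    Hypothetical helper; not part of pline.
    """
    if hasattr(value, 'id'):
        return {'key': key, 'refValue': value.id}
    return {'key': key, 'stringValue': str(value)}


class Stub(object):
    """Minimal stand-in for a pipeline object that only carries an id."""

    def __init__(self, id):
        self.id = id


resource = Stub('Resource1')
schedule = Stub('Schedule1')

fields = [
    field_for('runsOn', resource),
    field_for('schedule', schedule),
    field_for('command', 'echo hello world'),
]
```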

#### Create a parameterized activity and its parameter

    param = pline.parameters.String(
        id = 'myShellCmd',
        value = 'grep -rc "GET" ${INPUT1_STAGING_DIR}/* > ${OUTPUT1_STAGING_DIR}/output.txt',
        description = 'Shell command to run' )

    param_activity = pline.activities.ShellCommandActivity(
        id = 'MyParamActivity1',
        name = 'MyParamActivity1',
        runsOn = resource,
        schedule = schedule,
        command = param )
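
AWS Data Pipeline refers to a parameter inside a field with the expression syntax `#{parameterId}`, while the parameter's value is supplied separately. The sketch below shows what the `command` field could look like once a parameter is assigned to it. `Param` and `command_field` are hypothetical, and whether `pline` renders the field exactly this way is an assumption.

```python
class Param(object):
    """Hypothetical stand-in for a pipeline parameter."""

    def __init__(self, id, value):
        self.id = id
        self.value = value


def command_field(command):
    # A parameter is referenced by expression rather than inlined.
    if isinstance(command, Param):
        return {'key': 'command', 'stringValue': '#{%s}' % command.id}
    return {'key': 'command', 'stringValue': command}


param = Param(
    'myShellCmd',
    'grep -rc "GET" ${INPUT1_STAGING_DIR}/* > ${OUTPUT1_STAGING_DIR}/output.txt')

field = command_field(param)
# The actual command text is sent separately as a parameter value.
value_entry = {'id': param.id, 'stringValue': param.value}
```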

#### Add the objects to the pipeline

    pipeline.add(schedule, definition, resource, activity, param_activity)

#### Add the parameters to the pipeline


#### View the pipeline definition payload

    print(pipeline.payload())

#### Validate the pipeline definition
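
Full validation is performed by AWS. As a rough local pre-check before sending anything, the sketch below scans a list of payload objects for references to ids that were never added. The function `dangling_references` is hypothetical and not part of `pline`.

```python
def dangling_references(pipeline_objects):
    """Return (object id, field key, missing id) for every broken refValue."""
    known = {obj['id'] for obj in pipeline_objects}
    problems = []
    for obj in pipeline_objects:
        for field in obj.get('fields', []):
            ref = field.get('refValue')
            if ref is not None and ref not in known:
                problems.append((obj['id'], field['key'], ref))
    return problems


objects = [
    {'id': 'Schedule1', 'name': 'Schedule', 'fields': []},
    {'id': 'MyActivity1', 'name': 'MyActivity', 'fields': [
        {'key': 'schedule', 'refValue': 'Schedule1'},
        {'key': 'runsOn', 'refValue': 'Resource1'},  # never added to the list
    ]},
]
problems = dangling_references(objects)
```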


#### Create the pipeline in AWS

This will send the request to create a pipeline through `boto3`.
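
At the `boto3` level, creating a usable pipeline involves `create_pipeline`, `put_pipeline_definition` and `activate_pipeline` on the `datapipeline` client. The sketch below chains those calls. The `deploy` function and `FakeClient` are hypothetical; the fake only exists so the flow can be run without AWS access, and how `pline` wraps these calls is not shown here.

```python
def deploy(client, name, unique_id, pipeline_objects, description=''):
    """Create, define and activate a pipeline; returns the pipeline id.

    `client` is expected to behave like boto3.client('datapipeline').
    """
    created = client.create_pipeline(
        name=name, uniqueId=unique_id, description=description)
    pipeline_id = created['pipelineId']

    result = client.put_pipeline_definition(
        pipelineId=pipeline_id, pipelineObjects=pipeline_objects)
    if result.get('errored'):
        # Surface the validation errors AWS reported for the definition.
        raise ValueError(result.get('validationErrors'))

    client.activate_pipeline(pipelineId=pipeline_id)
    return pipeline_id


class FakeClient(object):
    """Records calls so the flow can be exercised without AWS access."""

    def __init__(self):
        self.calls = []

    def create_pipeline(self, **kwargs):
        self.calls.append('create_pipeline')
        return {'pipelineId': 'df-EXAMPLE'}

    def put_pipeline_definition(self, **kwargs):
        self.calls.append('put_pipeline_definition')
        return {'errored': False, 'validationErrors': [],
                'validationWarnings': []}

    def activate_pipeline(self, **kwargs):
        self.calls.append('activate_pipeline')
        return {}


fake = FakeClient()
pipeline_id = deploy(fake, 'MyPipeline', 'MyPipeline1', [])
```

Passing the client in as an argument keeps the function testable and leaves region and credential handling to the caller.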


#### Adding new objects to the pipeline

Sometimes you may want to add an object to the pipeline after it has been created:

    # Add an alert
    sns_alarm = pline.actions.SnsAlarm(
        name = 'SnsAlarm',
        id = 'SnsAlarm1',
        topicArn = 'arn:aws:sns:us-east-1:12345678abcd:my-arn',
        role = 'DataPipelineDefaultRole' )

    # Associate it with the activity
    activity.onFailure = sns_alarm

    # Add it to the pipeline

Update the pipeline on AWS and activate it


## ShellCommand helper

The `ShellCommand` class can be used to compose chained commands:

    cmd = pline.utils.ShellCommand(
        'docker start registry',
        'sleep 3',
        'docker pull localhost:5000/my_docker',
        'docker stop registry' )
    # => docker start registry;\
    #    sleep 3;\
    #    docker pull localhost:5000/my_docker;\
    #    docker stop registry

    cmd.append('echo all done')
    # => docker start registry;\
    #    sleep 3;\
    #    docker pull localhost:5000/my_docker;\
    #    docker stop registry;\
    #    echo all done

    activity.command = cmd
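
The chaining shown above amounts to joining the commands with `;` and a line continuation. A minimal sketch of that idea follows. `ChainedCommand` is a hypothetical stand-in, not the real `pline.utils.ShellCommand`.

```python
class ChainedCommand(object):
    """Hypothetical stand-in for pline.utils.ShellCommand."""

    def __init__(self, *commands):
        self.commands = list(commands)

    def append(self, command):
        self.commands.append(command)

    def __str__(self):
        # ';' ends each command; the trailing backslash continues the line.
        return ';\\\n'.join(self.commands)


cmd = ChainedCommand('docker start registry', 'sleep 3')
cmd.append('echo all done')
text = str(cmd)
```

Note that `;` runs each command regardless of whether the previous one succeeded. Joining with `&&` instead would stop the chain at the first failure.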
