
Datajob

Build and deploy a serverless data pipeline on AWS with no effort.

  • We support Python shell / PySpark Glue jobs.
  • Orchestrate using Step Functions as simply as task1 >> [task2, task3] >> task4 (see the sketch after this list).
  • Let us know what you want to see next.
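
As a minimal sketch of the orchestration syntax above (assuming task1 through task4 are GlueJob instances defined on a datajob_stack, as in the quickstart below):

with StepfunctionsWorkflow(datajob_stack=datajob_stack, name="workflow") as sfn:
    # task1 runs first, task2 and task3 run in parallel, task4 starts once both succeed
    task1 >> [task2, task3] >> task4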

Dependencies are the AWS CDK and the Step Functions SDK for Data Science.

Installation

Datajob can be installed using pip.
Beware that we depend on the AWS CDK CLI!

pip install datajob
npm install -g aws-cdk@1.87.1 # the latest version of datajob depends on this version

Quickstart

We have a simple data pipeline composed of 2 glue jobs orchestrated sequentially using step functions.

import pathlib
from aws_cdk import core

from datajob.datajob_stack import DataJobStack
from datajob.glue.glue_job import GlueJob
from datajob.stepfunctions.stepfunctions_workflow import StepfunctionsWorkflow


current_dir = pathlib.Path(__file__).parent.absolute()

app = core.App()


with DataJobStack(
    scope=app, id="data-pipeline-pkg", project_root=current_dir
) as datajob_stack:

    # define the glue jobs that make up the pipeline
    task1 = GlueJob(
        datajob_stack=datajob_stack, name="task1", job_path="glue_jobs/task1.py"
    )

    task2 = GlueJob(
        datajob_stack=datajob_stack, name="task2", job_path="glue_jobs/task2.py"
    )

    # orchestrate the glue jobs sequentially with Step Functions
    with StepfunctionsWorkflow(datajob_stack=datajob_stack, name="workflow") as step_functions_workflow:
        task1 >> task2

app.synth()

We add the above code to a file called datajob_stack.py in the root of the project.
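
The Glue job scripts referenced by job_path are plain Python scripts. For a python shell job, a placeholder glue_jobs/task1.py could be as simple as the sketch below (its content is an assumption, not taken from the example project):

# glue_jobs/task1.py - a minimal python shell Glue job
print("executing task1")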

Configure CDK

Follow the steps here to configure your credentials.

export AWS_PROFILE=my-profile # e.g. default
# use the aws cli to get your account number
export AWS_ACCOUNT=$(aws sts get-caller-identity --query Account --output text --profile $AWS_PROFILE)
export AWS_DEFAULT_REGION=your-region # e.g. eu-west-1

cdk bootstrap aws://$AWS_ACCOUNT/$AWS_DEFAULT_REGION

Deploy

Datajob will create s3 buckets based on the datajob_stack.id and the stage variable. The stage variable will typically be something like "dev", "stg", "prd", ... but since S3 buckets need to be globally unique, for this example we will use our $AWS_ACCOUNT for the --stage parameter.

export STAGE=$AWS_ACCOUNT

Navigate to the folder containing the datajob_stack.py file and deploy the data pipeline.

cd examples/data_pipeline_with_packaged_project
datajob deploy --config datajob_stack.py --stage $STAGE --package setuppy

Alternatively, use the CDK CLI:

cd examples/data_pipeline_with_packaged_project
python setup.py bdist_wheel
cdk deploy --app "python datajob_stack.py" -c stage=$STAGE

Your glue jobs are deployed and the orchestration is configured.

Run

The step function state machine name is constructed as <datajob_stack.id>-<stage>-<step_functions_workflow.name>.

To run your data pipeline execute:

datajob execute --state-machine data-pipeline-pkg-$STAGE-workflow

The terminal will output a link to the step functions page to follow up on your pipeline run.

Destroy

datajob destroy --config datajob_stack.py --stage $STAGE

Alternatively, use the CDK CLI:

cdk destroy --app "python datajob_stack.py" -c stage=$STAGE

Note: you can pass any CDK arguments to the datajob CLI.

Functionality

Using datajob's S3 data bucket

Dynamically reference the datajob_stack data bucket name in the arguments of your GlueJob via datajob_stack.context.data_bucket_name.

import pathlib

from aws_cdk import core
from datajob.datajob_stack import DataJobStack
from datajob.glue.glue_job import GlueJob
from datajob.stepfunctions.stepfunctions_workflow import StepfunctionsWorkflow

current_dir = str(pathlib.Path(__file__).parent.absolute())

app = core.App()

with DataJobStack(
    scope=app, id="datajob-python-pyspark", project_root=current_dir
) as datajob_stack:

    pyspark_job = GlueJob(
        datajob_stack=datajob_stack,
        name="pyspark-job",
        job_path="glue_job/glue_pyspark_example.py",
        job_type="glueetl",
        glue_version="2.0",  # we only support glue 2.0
        python_version="3",
        worker_type="Standard",  # options are Standard / G.1X / G.2X
        number_of_workers=1,
        arguments={
            "--source": f"s3://{datajob_stack.context.data_bucket_name}/raw/iris_dataset.csv",
            "--destination": f"s3://{datajob_stack.context.data_bucket_name}/target/pyspark_job/iris_dataset.parquet",
        },
    )

    with StepfunctionsWorkflow(datajob_stack=datajob_stack, name="workflow") as sfn:
        pyspark_job >> ...

Deploy to stage my-stage:

datajob deploy --config datajob_stack.py --stage my-stage --package setuppy

datajob_stack.context.data_bucket_name will evaluate to datajob-python-pyspark-my-stage, so the --source argument above resolves to s3://datajob-python-pyspark-my-stage/raw/iris_dataset.csv.

You can find this example here.

Deploy files to deployment bucket

Specify the path to the folder you would like to include in the deployment bucket.

from aws_cdk import core
from datajob.datajob_stack import DataJobStack

app = core.App()

with DataJobStack(
    scope=app, id="some-stack-name", include_folder="path/to/folder/"
) as datajob_stack:

    ...
Package project

Package your project using poetry:

datajob deploy --config datajob_stack.py --package poetry

Package your project using setup.py:

datajob deploy --config datajob_stack.py --package setuppy
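
For the setuppy option, datajob expects a setup.py at the root of your project (see "Datajob in depth" below). A minimal sketch, using a hypothetical project name:

from setuptools import find_packages, setup

setup(
    name="my_data_pipeline",  # hypothetical name, replace with your project's name
    version="0.1.0",
    packages=find_packages(),
)
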
Using Pyspark
import pathlib

from aws_cdk import core
from datajob.datajob_stack import DataJobStack
from datajob.glue.glue_job import GlueJob
from datajob.stepfunctions.stepfunctions_workflow import StepfunctionsWorkflow

current_dir = str(pathlib.Path(__file__).parent.absolute())

app = core.App()

with DataJobStack(
    scope=app, id="datajob-python-pyspark", project_root=current_dir
) as datajob_stack:

    pyspark_job = GlueJob(
        datajob_stack=datajob_stack,
        name="pyspark-job",
        job_path="glue_job/glue_pyspark_example.py",
        job_type="glueetl",
        glue_version="2.0",  # we only support glue 2.0
        python_version="3",
        worker_type="Standard",  # options are Standard / G.1X / G.2X
        number_of_workers=1,
        arguments={
            "--source": f"s3://{datajob_stack.context.data_bucket_name}/raw/iris_dataset.csv",
            "--destination": f"s3://{datajob_stack.context.data_bucket_name}/target/pyspark_job/iris_dataset.parquet",
        },
    )

The full example can be found in examples/data_pipeline_pyspark.
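
The content of glue_job/glue_pyspark_example.py is not shown here; a minimal sketch of such a script, assuming it consumes the --source and --destination arguments defined above, could look like this:

import sys

from awsglue.utils import getResolvedOptions
from pyspark.sql import SparkSession

# resolve the job arguments passed in the GlueJob definition (without the leading --)
args = getResolvedOptions(sys.argv, ["source", "destination"])

spark = SparkSession.builder.appName("pyspark-job").getOrCreate()

# read the raw csv and write it back out as parquet
df = spark.read.csv(args["source"], header=True)
df.write.mode("overwrite").parquet(args["destination"])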

Orchestrate stepfunctions tasks in parallel
# task1 and task2 are orchestrated in parallel.
# task3 will only start when both task1 and task2 have succeeded.
[task1, task2] >> task3
Orchestrate 1 stepfunction task

Use the Ellipsis object to orchestrate a single job via Step Functions.

some_task >> ...

Datajob in depth

The datajob_stack is the instance that will result in a CloudFormation stack. The path in project_root helps datajob_stack locate the root of the project, where the setup.py / poetry pyproject.toml file can be found, as well as the dist/ folder containing the wheel of your project.

import pathlib
from aws_cdk import core

from datajob.datajob_stack import DataJobStack

current_dir = pathlib.Path(__file__).parent.absolute()
app = core.App()

with DataJobStack(
    scope=app, id="data-pipeline-pkg", project_root=current_dir
) as datajob_stack:

    ...

When entering the contextmanager of DataJobStack:

A DataJobContext is initialized to deploy and run a data pipeline on AWS. The following resources are created:

  1. "data bucket"
    • an S3 bucket that you can use to dump ingested data, dump intermediate results and the final output.
    • you can access the data bucket as a Bucket object via datajob_stack.context.data_bucket
    • you can access the data bucket name via datajob_stack.context.data_bucket_name
  2. "deployment bucket"
    • an s3 bucket to deploy code, artifacts, scripts, config, files, ...
    • you can access the deployment bucket as a Bucket object via datajob_stack.context.deployment_bucket
    • you can access the deployment bucket name via datajob_stack.context.deployment_bucket_name

When exiting the context manager, all the resources of our DataJobStack object are created.
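
As a minimal sketch of accessing these context attributes inside the stack (reusing the DataJobStack example from above):

with DataJobStack(
    scope=app, id="data-pipeline-pkg", project_root=current_dir
) as datajob_stack:

    # bucket names are plain strings, handy for building s3 paths or job arguments
    data_bucket_name = datajob_stack.context.data_bucket_name
    deployment_bucket_name = datajob_stack.context.deployment_bucket_name

    # the buckets themselves are available as CDK Bucket objects
    data_bucket = datajob_stack.context.data_bucket
    deployment_bucket = datajob_stack.context.deployment_bucket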

We can write the above example more explicitly, without the context manager:
import pathlib
from aws_cdk import core

from datajob.datajob_stack import DataJobStack
from datajob.glue.glue_job import GlueJob
from datajob.stepfunctions.stepfunctions_workflow import StepfunctionsWorkflow

current_dir = pathlib.Path(__file__).parent.absolute()

app = core.App()

# instantiate the stack; entering the context manager normally does the next step for you
datajob_stack = DataJobStack(scope=app, id="data-pipeline-pkg", project_root=current_dir)
# initialize the datajob context, which creates the data and deployment buckets
datajob_stack.init_datajob_context()

task1 = GlueJob(datajob_stack=datajob_stack, name="task1", job_path="glue_jobs/task1.py")
task2 = GlueJob(datajob_stack=datajob_stack, name="task2", job_path="glue_jobs/task2.py")

with StepfunctionsWorkflow(datajob_stack=datajob_stack, name="workflow") as step_functions_workflow:
    task1 >> task2

# create all the resources of the DataJobStack (normally done when exiting the context manager)
datajob_stack.create_resources()
app.synth()

Ideas

Any suggestions can be shared by starting a discussion.

These are the ideas we find interesting to implement:

  • add a time based trigger to the step functions workflow.
  • add an s3 event trigger to the step functions workflow.
  • add a lambda that copies data from one s3 location to another.
  • add an sns that notifies in case of any failure (slack/email)
  • version your data pipeline.
  • cli command to view the logs / glue jobs / s3 bucket
  • implement sagemaker services
    • processing jobs
    • hyperparameter tuning jobs
    • training jobs
  • implement lambda
  • implement ECS Fargate
  • create a serverless UI that follows up on the different pipelines deployed on possibly different AWS accounts using Datajob

Feedback is much appreciated!
