Skip to main content

Tools for building data pipelines with Django

Project description

django-pipelines

Tools to help you build data pipelines in Django.

https://github.com/wildfish/django-pipelines

https://github.com/wildfish/django-pipelines/actions/workflows/build.yml/badge.svg https://coveralls.io/repos/wildfish/django-pipelines/badge.svg?branch=master&service=github

Features

  • Data pipeline management with configurable runners (celery support included), reporters and management UI.

  • Supports Django 3.2 to 4.2, on Python 3.9+.

See the full documentation for details of how django-pipelines works.

Quickstart

This is a quickstart guide for creating a simple pipeline.

Project Setup

We recommend using a virtual environment such as pyenv to manage your dependencies and Python versions. From this point we assume you have a environment setup, activated & pip installed.

Create a new Django project:

# Create the project directory
mkdir demo
cd demo

# Install
pip install django-pipelines

# Set up a new project
django-admin startproject demo .
cd demo
django-admin startapp mypipeline

Settings

Firstly we need to add pipelines and the new app demo.mypipeline to INSTALLED_APPS in demo/settings.py:

INSTALLED_APPS = [
    ...
    "django.contrib.humanize",
    "pipelines",
    "demo.mypipeline",
]

Migrate

Finally we sync the database by running:

cd ..
python manage.py migrate

Creating a Pipeline

Now the project is setup we are ready to start building our first pipeline. The first thing we need to do is setup a pipeline definition.

Create a new file demo/mypipeline/pipelines.py

Note: It is important that you name the file pipelines.py as this is used by the registry to discover your pipelines:

from pipelines.base import Pipeline, Task


class PrintWelcomeMessageTask(Task):
    title = "Print Welcome Message"

    def run(self, pipeline_id, run_id, cleaned_data):
        print("Hello World!")


class FirstPipeline(Pipeline):
    print_message = PrintWelcomeMessageTask()

    class Meta:
        title = "First pipeline"

Here we are creating a pipeline FirstPipeline with one task print_message. Pipelines can have multiple tasks and we will show this in the next section.

All Pipelines must inherit from the base class Pipeline.

We have also defined our first task: PrintWelcomeMessageTask. Here we are just printing “Hello World!” but can make this as complex as you require. All tasks should inherit from Task and have a run method defined.

And we’re done.

Running the pipeline

The quickest way to run a pipeline for testing is by running the management command:

python manage.py pipelines

After running this you should see a list of pipelines, numbered, and a prompt of ‘Which pipelines would you like to start?’:

Pipelines:
1). demo.mypipeline.pipelines.FirstPipeline:

Which pipelines would you like to start?

Type 1 to run FirstPipeline. You will now be shown a list of tasks the pipeline will run along with a prompt for how you wish to run it:

demo.mypipeline.pipelines.FirstPipeline will run the following tasks:
1). print_message: Print Message

Run [r], Run eager [e] or Cancel [c]?

You have the option to run the pipeline in eager mode or against the configured runner. Since we have not set a custom up we will run it in Eager mode by typing e. You should then see:

Hello World!
Pipeline Completed

As you can see the Hello World! message has been printed to the console just as we defined in the Task.

Adding another Task

To complete this example we will add another task to our pipeline. Update demo/mypipeline/pipelines.py:

import time
from pipelines.base import Pipeline, Task


class PrintWelcomeMessageTask(Task):
    title = "Print Welcome Message"

    def run(self, pipeline_id, run_id, cleaned_data):
        print("3 Times Table:")


class PrintNumbersTask(Task):
    title = "Print 3 times table"

    def run(self, pipeline_id, run_id, cleaned_data):
        for i in range(1, 13):
            print(f"{i} x 3 = {i*3}")
            time.sleep(0.5)


class FirstPipeline(Pipeline):
    print_message = PrintWelcomeMessageTask()
    numbers_task = PrintNumbersTask(config={"parents": ["print_message"]})

    class Meta:
        title = "First pipeline"

We have now defined a second task PrintNumbersTask which prints the 3 times table from 1-12. We also include a slight delay (0.5 seconds) between iterations so you can see it better when running the pipeline.

We have added this new task to our pipeline and specified that this should run after print_message by adding a config variable to the task instance config={"parents": ["print_message"]}. Adding this allows us to define the exact order each task should be ran.

If we again run the pipeline using the management command we should now see:

Demo Command

Monitoring Pipelines

To wire up the pipeline monitoring views we need to add them to the urls file. In demo/urls.py add:

from django.contrib import admin
from django.urls import include, path

urlpatterns = [
    path('admin/', admin.site.urls),
    path('pipelines/', include('pipelines.urls')),
]

In order to view these pages you need to be logged in as a staff user. So first lets create a superuser for our site:

python manage.py createsuperuser

After completing this you will have an account setup with superuser access, so lets run our site:

python manage.py runserver

We need to login before we can view any of the pipeline pages so navigate to http://127.0.0.1:8000/admin/ and login using the details you entered when creating the superuser.

Once logged in you can access the pipeline monitoring pages at: http://127.0.0.1:8000/pipelines/

Pipeline List

This list shows the each pipeline, how many tasks are associated with each, how may times it ran, including which have passed and which have failed, when it was last ran, and the average amount of time it took to run. You also have the option to run the pipeline.

If you click on the runs cell you will be taken to the pipeline execution page.

Pipeline Execution List

This shows details of each execution of the pipeline including: the pipeline and task count, when it was started, how long it took to run and what was the status of the pipeline.

To see further details of a particular execution, click on the pipeline name.

Pipeline Result Details

This details view shows a breakdown of each task which was ran including: the task name, status, and when it started finished and duration. You also have the option to rerun the individual task.

The logs recorded during the pipeline run is also shown on this page. This can be useful if you are trying to debug why a pipeline is not running correctly.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

django-pipelines-0.1.1.tar.gz (600.8 kB view details)

Uploaded Source

Built Distribution

django_pipelines-0.1.1-py3-none-any.whl (67.4 kB view details)

Uploaded Python 3

File details

Details for the file django-pipelines-0.1.1.tar.gz.

File metadata

  • Download URL: django-pipelines-0.1.1.tar.gz
  • Upload date:
  • Size: 600.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.2 CPython/3.9.16

File hashes

Hashes for django-pipelines-0.1.1.tar.gz
Algorithm Hash digest
SHA256 2d0e6fc1ef5c19412e4809c139c59df86b69384d16bc36674b66d8c40808fea2
MD5 1cbadd98e52519b6151b7d3da76795aa
BLAKE2b-256 477aaa8cd7e55a5a0e12a45d3a2e7d796597dc326563b331b6f0ad66465c9cbe

See more details on using hashes here.

File details

Details for the file django_pipelines-0.1.1-py3-none-any.whl.

File metadata

File hashes

Hashes for django_pipelines-0.1.1-py3-none-any.whl
Algorithm Hash digest
SHA256 3ca17fe0798033c773aec09e188d9098255371ed2d4eee44c0a1b983df2b219c
MD5 1728004b879d4da9536e62abfb92fe7e
BLAKE2b-256 28874b51bb3dd6174255d37eda89c04569fa79fc6151772f6d43f5c441091d03

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page