datarunner

A simple data workflow runner that helps you write better ETL scripts using reusable code pieces.

Quick Start Tutorial

Install using pip:

pip install datarunner

Then write a few callable steps (functions, classes, etc.), pass them to datarunner.Workflow, and call run():

from datarunner import Workflow, Step


def setup():
    print('Ready to go!')

def extract():
    return 'data'

def transform(data):
    return data + ' using reusable code pieces, like Lego.'

class Load(Step):
    """ Sub-class Step to customize the callable """
    def __init__(self, destination):
        super().__init__()
        self.destination = destination

    def __str__(self):
        return f'Load("{self.destination}")'

    def run(self, data):
        print(f'Loading {data}')

flow = Workflow(setup,
                table_name1=[extract, transform, Load('example')])
flow.run()

It produces the following output:

setup
Ready to go!

table_name1
--------------------------------------------------------------------------------
extract
>> transform
>> Load("example")
Loading data using reusable code pieces, like Lego.
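
Positional arguments to Workflow run as standalone steps, while each keyword argument defines a named group of steps that are chained together and printed under that name, as in the table_name1 section above.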

We can also use the >> operator to express the same flow:

flow = (Workflow()
        >> setup

        << 'table_name1'
        >> extract >> transform >> Load('example'))
flow.run()
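
Here << starts a new named group of steps, matching the table_name1= keyword argument in the previous example, while >> chains steps within the group.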

To make the workflow more flexible (e.g. to write to a different dataset), use templates to provide values at run time: list the attributes to be templated in TEMPLATE_ATTRS, and pass the values as keyword arguments to run():

class Load(Step):
    """ Sub-class Step to customize the callable """
    TEMPLATE_ATTRS = ['destination']
    def __init__(self, destination):
        super().__init__()
        self.destination = destination

    def __str__(self):
        return f'Load("{self.destination}")'

    def run(self, data):
        print(f'Loading {data}')

flow = Workflow() >> extract >> transform >> Load('{dataset}.table_name1')
flow.run(dataset='staging')

It produces the following output:

extract
>> transform
>> Load("staging.table_name1")
Loading data using reusable code pieces, like Lego.

And finally, to test the workflow:

def test_flow():
    assert """\
extract
>> transform
>> Load("{dataset}.table_name1")""" == str(flow)
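
str(flow) renders the steps with template placeholders left unfilled, so the workflow layout can be asserted without running it (and without supplying a dataset value).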

Workflow Layout

A recommended file layout for your ETL package:

my_package/steps/__init__.py            # Generic / common steps
my_package/steps/bigquery.py            # Group of steps for a specific service, like BigQuery.
my_package/datasource1.py               # ETL workflow for a single data source, with steps specific to that source
my_package/datasource2.py               # ETL workflow for another data source

Each datasource*.py should define flow = Workflow(…) but not run it; your ETL script should import the flow and call flow.run(). This ensures the workflow is fully constructed when imported and can be tested without being run, as in the sketch below.
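
For example, a minimal datasource1.py under this layout might look like the following (the step imports are illustrative placeholders, not part of datarunner):

# my_package/datasource1.py
from datarunner import Workflow

from my_package.steps import extract, transform        # hypothetical shared steps
from my_package.steps.bigquery import LoadTable        # hypothetical service-specific step

# Construct the workflow at import time, but do not run it here.
flow = Workflow() >> extract >> transform >> LoadTable('{dataset}.datasource1')

The ETL script then simply imports the workflow and runs it:

# etl.py
from my_package.datasource1 import flow

flow.run(dataset='staging')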
