datarunner

A simple data workflow runner that helps you write better ETL scripts using reusable code pieces.

Quick Start Tutorial

Install using pip:

pip install datarunner

Then write a few callable steps (functions, classes, etc.), pass them to datarunner.Workflow, and call run():

from datarunner import Workflow, Step


def setup():
    print('Ready to go!')

def extract():
    return 'data'

def transform(data):
    return data + ' using reusable code pieces, like Lego.'

class Load(Step):
    """ Sub-class Step to customize the callable """
    def __init__(self, destination):
        super().__init__()
        self.destination = destination

    def __str__(self):
        return f'Load("{self.destination}")'

    def run(self, data):
        print(f'Loading {data}')

flow = Workflow(setup,
                table_name1=[extract, transform, Load('example')])
flow.run()

It should produce the following output:

setup
Ready to go!

table_name1
--------------------------------------------------------------------------------
extract
>> transform
>> Load("example")
Loading data using reusable code pieces, like Lego.
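
The output above shows each step's return value being handed to the next step. As a rough illustration of that idea only, here is a minimal sketch in plain Python. It is not datarunner's implementation: run_chain, load, and the loaded list are hypothetical names made up for this example, and it assumes the first step takes no arguments.

```python
def run_chain(steps):
    """Call each step in order, feeding the previous result to the next one."""
    result = None
    for index, step in enumerate(steps):
        # Assumption: the first step takes no input; later steps get the prior output.
        result = step() if index == 0 else step(result)
    return result


def extract():
    return 'data'


def transform(data):
    return data + ' using reusable code pieces, like Lego.'


loaded = []  # hypothetical in-memory destination, so the sketch has no side effects


def load(data):
    loaded.append(data)
    return data


final = run_chain([extract, transform, load])
```

Because every step is just a callable, any of them can be swapped out or reused in another chain without touching the others.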

If we skip setup, we can also use the >> operator to express the same flow:

flow = Workflow() >> extract >> transform >> Load('example')
flow.run()
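
For readers curious how a >> chain like this can work in Python, the sketch below uses the standard __rshift__ hook. MiniFlow is a hypothetical stand-in written for this example; it makes no claim about how datarunner.Workflow is actually built.

```python
class MiniFlow:
    """Hypothetical stand-in for a workflow, only to illustrate `>>` chaining."""

    def __init__(self):
        self.steps = []

    def __rshift__(self, step):
        # `flow >> step` appends the step and returns the flow itself, so the
        # next `>>` (evaluated left to right) keeps extending the same chain.
        self.steps.append(step)
        return self

    def __str__(self):
        # Use the function name when there is one, else the step's own str().
        names = [getattr(step, '__name__', str(step)) for step in self.steps]
        return '\n>> '.join(names)

    def run(self):
        result = None
        for index, step in enumerate(self.steps):
            result = step() if index == 0 else step(result)
        return result


def extract():
    return 'data'


def transform(data):
    return data.upper()


mini = MiniFlow() >> extract >> transform
```

Here str(mini) lists the steps in order and mini.run() executes them, mirroring the shape of the example above.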

We can take this a step further by using templates to supply values at run time:

class Load(Step):
    """ Sub-class Step to customize the callable """
    TEMPLATE_ATTRS = ['destination']

    def __init__(self, destination):
        super().__init__()
        self.destination = destination

    def __str__(self):
        return f'Load("{self.destination}")'

    def run(self, data):
        print(f'Loading {data}')

flow = Workflow() >> extract >> transform >> Load('{dataset}.table_name1')
flow.run(dataset='staging')

It produces the following output:

extract
>> transform
>> Load("staging.table_name1")
Loading data using reusable code pieces, like Lego.
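
One plausible way to picture the substitution shown above is str.format applied to each attribute named in TEMPLATE_ATTRS. The sketch below assumes exactly that. TemplatedStep and render_templates are hypothetical names for illustration, and datarunner's real mechanism may differ.

```python
class TemplatedStep:
    """Hypothetical step, used only to illustrate template expansion."""
    TEMPLATE_ATTRS = ['destination']

    def __init__(self, destination):
        self.destination = destination

    def __str__(self):
        return f'Load("{self.destination}")'


def render_templates(step, **values):
    """Fill every attribute listed in TEMPLATE_ATTRS using str.format."""
    for name in getattr(step, 'TEMPLATE_ATTRS', []):
        template = getattr(step, name)
        # Assumption: placeholders use str.format syntax, e.g. '{dataset}'.
        setattr(step, name, template.format(**values))
    return step


step = TemplatedStep('{dataset}.table_name1')
before = str(step)   # placeholder is still present
render_templates(step, dataset='staging')
after = str(step)    # placeholder has been replaced
```

Keeping the placeholder in the step until run time is what lets the same workflow definition target different datasets.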

Finally, to test the workflow, compare its string representation with the expected layout:

def test_flow():
   assert """
   extract
   >> transform
   >> Load("{dataset}.table_name1")
   """ == str(flow)
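
String comparisons like this are sensitive to leading whitespace and newlines. As a hedged sketch, textwrap.dedent from the standard library lets the expected text stay indented inside the test body. The describe function is a hypothetical stand-in for str(flow), and the exact whitespace that datarunner emits is an assumption here.

```python
import textwrap


def describe(names):
    # Hypothetical stand-in for str(flow): one step per line, joined by '>> '.
    return '\n>> '.join(names)


def test_layout():
    # dedent strips the common indentation, so the expected text can be
    # indented along with the surrounding test code.
    expected = textwrap.dedent("""\
        extract
        >> transform
        >> Load("{dataset}.table_name1")""")
    actual = describe(['extract', 'transform', 'Load("{dataset}.table_name1")'])
    assert expected == actual
```

Because the assertion runs before flow.run() substitutes anything, the expected text still contains the {dataset} placeholder.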

Files for datarunner, version 1.1.3

Filename                            Size    File type  Python version
datarunner-1.1.3-py3-none-any.whl   4.7 kB  Wheel      py3
datarunner-1.1.3.tar.gz             7.7 kB  Source     None
