datarunner
A simple data workflow runner that helps you write better ETL scripts using reusable code pieces.
Quick Start Tutorial
Install using pip:
pip install datarunner
Then write a few callable steps (functions, classes, etc.), pass them to datarunner.Workflow, and call run():
from datarunner import Workflow, Step


def setup():
    print('Ready to go!')


def extract():
    return 'data'


def transform(data):
    return data + ' using reusable code pieces, like Lego.'


class Load(Step):
    """ Sub-class Step to customize the callable """
    def __init__(self, destination):
        super().__init__()
        self.destination = destination

    def __str__(self):
        return f'Load("{self.destination}")'

    def run(self, data):
        print(f'Loading {data}')


flow = Workflow(setup,
                table_name1=[extract, transform, Load('example')])
flow.run()
It should produce the following output:
setup
Ready to go!

table_name1
--------------------------------------------------------------------------------
extract >> transform >> Load("example")
Loading data using reusable code pieces, like Lego.
If we skip setup, we can also use the >> operator to express the same flow:
flow = Workflow() >> extract >> transform >> Load('example')
flow.run()
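The chaining behavior can be sketched with a minimal, self-contained workflow class. The names below (MiniWorkflow) are illustrative, not datarunner's internals: the idea is simply that >> appends a step and run() threads each step's return value into the next.

```python
class MiniWorkflow:
    """Minimal sketch of >>-style chaining. Illustrative only,
    not datarunner's actual implementation."""

    def __init__(self, *steps):
        self.steps = list(steps)

    def __rshift__(self, step):
        # flow >> step appends the step and returns the flow for chaining.
        self.steps.append(step)
        return self

    def run(self):
        # Call the first step, then feed its result through the rest.
        result = self.steps[0]()
        for step in self.steps[1:]:
            result = step(result)
        return result


def extract():
    return 'data'


def transform(data):
    return data + ' transformed'


flow = MiniWorkflow() >> extract >> transform
print(flow.run())  # data transformed
```

Because __rshift__ returns the workflow itself, chains of any length read left to right, mirroring the pipeline they describe.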
To make the workflow more flexible (e.g. to write to a different dataset), use templates to provide values at run time:
class Load(Step):
    """ Sub-class Step to customize the callable """
    TEMPLATE_ATTRS = ['destination']

    def __init__(self, destination):
        super().__init__()
        self.destination = destination

    def __str__(self):
        return f'Load("{self.destination}")'

    def run(self, data):
        print(f'Loading {data}')
flow = Workflow() >> extract >> transform >> Load('{dataset}.table_name1')
flow.run(dataset='staging')
It produces the following output:
extract >> transform >> Load("staging.table_name1")
Loading data using reusable code pieces, like Lego.
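Conceptually, attributes listed in TEMPLATE_ATTRS are format strings that get filled in with the keyword arguments passed to run(). A hedged, self-contained sketch of that idea (TemplatedStep and render are made-up names, not datarunner's API):

```python
class TemplatedStep:
    """Sketch of TEMPLATE_ATTRS-style substitution: each attribute named in
    TEMPLATE_ATTRS is str.format()-ed with run-time kwargs. Illustrative
    only -- datarunner's real mechanics may differ."""

    TEMPLATE_ATTRS = ['destination']

    def __init__(self, destination):
        self.destination = destination

    def render(self, **kwargs):
        # Substitute {placeholders} in every templated attribute.
        for attr in self.TEMPLATE_ATTRS:
            setattr(self, attr, getattr(self, attr).format(**kwargs))


step = TemplatedStep('{dataset}.table_name1')
step.render(dataset='staging')
print(step.destination)  # staging.table_name1
```

Keeping the placeholder in the attribute until run time is what lets one workflow definition target staging, production, or any other dataset.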
And finally, to test the workflow:
def test_flow():
    assert """
extract >> transform >> Load("{dataset}.table_name1")
""" == str(flow)
Workflow Layout
When writing production workflows, it is recommended to lay out the files in your package like:

my_package/steps/__init__.py   # Generic / common steps
my_package/steps/bigquery.py   # Group of steps for a specific service, like BigQuery
my_package/datasource1.py      # ETL workflow for a single data source, with steps specific to that source
my_package/datasource2.py      # ETL workflow for another data source
Each datasource*.py should define flow = Workflow(…) but not run it; your ETL script then calls flow.run() to execute the workflow. This ensures the workflow is fully constructed at import time and can be tested without being run.
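The import-without-run pattern can be demonstrated with any object that is cheap to construct but has side effects only when run. A self-contained sketch (LazyFlow is a hypothetical stand-in for a Workflow, not part of datarunner):

```python
# datasource1.py (sketch): build the workflow at import time, never run it here.
class LazyFlow:
    """Stand-in for a Workflow: construction is cheap, run() has side effects."""

    def __init__(self, *step_names):
        self.step_names = step_names

    def __str__(self):
        return ' >> '.join(self.step_names)

    def run(self):
        # Side effects (I/O, network, etc.) happen only here.
        print(f'Running {self}')


flow = LazyFlow('extract', 'transform', 'Load("example")')


# test_datasource1.py (sketch): assert on the flow's shape without running it.
def test_flow():
    assert str(flow) == 'extract >> transform >> Load("example")'


test_flow()
```

Because the test only inspects str(flow), it verifies the pipeline's structure without touching any destination the Load step would write to.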