PyDin

Python ETL framework.

PyDin is a Python-based ETL framework that allows you to create, automate and maintain data processes right out of the box.

This was originally a simple script on a small server used to run a dozen jobs in the context of a bigger project. But as the project grew, the script evolved and eventually became something else.

Thus, PyDin is a framework from Data Engineers to Data Engineers that follows established best practices and a clear philosophy. It was designed to work in a real production environment with thousands of daily jobs.

More information can be found in the official documentation.

Installation

Just install using pip:

pip install pydin

Interface

The product provides several native interfaces: Framework, CLI and API.

Most of the time, developers use the Framework, administrators use the CLI, and advanced business users and third parties use the API.

To use the CLI, the most convenient interface, create a special manager.py file with the following code in it:

import pydin


manager = pydin.Manager()

Once the file is saved, you can set an alias for it, as in the example below, and run any command it supports.

# Set an alias to run the file.
alias pydin='python ~/pydin/manager.py'

# Get the list of available commands.
pydin help

Visit the documentation to learn more about the CLI and other interfaces.

Framework

ETL processes are represented as Pipelines, which are built from Framework Models.

Here are some code snippets showing how these Pipelines can be implemented:

import pydin as pd


# Load data from one database to another.
a = pd.Table(source_name='chinook', table_name='customers', chunk_size=1000)
b = pd.Table(source_name='dwh', table_name='customers')
pipeline = pd.Pipeline(a, b)
pipeline.run()

# Load data by SQL SELECT from one database to another.
select = 'select * from customers where company is not null'
a = pd.Select(source_name='chinook', text=select, chunk_size=1000)
b = pd.Table(source_name='dwh', table_name='companies')
pipeline = pd.Pipeline(a, b)
pipeline.run()

# Load data by SQL INSERT, using a SELECT statement taken from a file.
path = '~/documents/reports/customers.sql'
n = pd.Insert(source_name='dwh', table_name='customers_report', path=path)
pipeline = pd.Pipeline(n)
pipeline.run()

# Load data by SQL INSERT, using a date filter and adding a key field.
path = '~/documents/history/invoices.sql'
n = pd.Insert(source_name='dwh', table_name='invoices_history',
              select=path, date_field='invoice_date', days_back=1,
              key_field=pd.process_id)
pipeline = pd.Pipeline(n)
pipeline.run()

# Load data from a database into a CSV file, clearing the file first.
select = 'select * from customers where company is not null'
a = pd.Select(source_name='chinook', text=select, chunk_size=10000)
b = pd.CSV(path='customers.csv', delimiter='\t', cleanup=True)
pipeline = pd.Pipeline(a, b)
pipeline.run()

# Load data from a database into JSON and XML files at the same time.
a = pd.Table(source_name='dwh', table_name='customers', chunk_size=10000)
b = pd.JSON(path='customers.json', cleanup=True)
c = pd.XML(path='customers.xml', cleanup=True)
pipeline = pd.Pipeline(a, b, c)
pipeline.run()

# Load data from a CSV file into a database.
a = pd.CSV(path='customers.csv', delimiter='\t', chunk_size=1000)
b = pd.Table(source_name='dwh', table_name='customers')
pipeline = pd.Pipeline(a, b)
pipeline.run()

# Move files from GZ archives into a directory.
n = pd.FileManager(path='/archives', mask='.*gz',
                   action='move', dest='/data', unzip=True)
pipeline = pd.Pipeline(n)
pipeline.run()

# Copy files from one directory to another with GZ archiving.
n = pd.FileManager(path='/data', mask='.*csv',
                   action='copy', dest='/backup', zip=True)
pipeline = pd.Pipeline(n)
pipeline.run()

Every Model represents a specific data object. See the documentation for a description and advanced use of Models.
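Because each Model is an interchangeable endpoint, file Models should pair directly with each other as well. Here is a minimal sketch of a file-to-file Pipeline, assuming that the CSV and JSON Models can be combined the same way as in the database examples above:

import pydin as pd


# Convert a tab-delimited CSV file into a JSON file.
a = pd.CSV(path='customers.csv', delimiter='\t', chunk_size=1000)
b = pd.JSON(path='customers.json', cleanup=True)
pipeline = pd.Pipeline(a, b)
pipeline.run()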

Automation

The product has a built-in Scheduler for process automation.

Install the Scheduler using the command below, following the instructions step by step:

pydin install

After that, create and configure the Job:

pydin create job

Files of created objects are placed in the same location as manager.py.

The most important file is script.py, located in the directory jobs/n, where n is the unique Job ID. In this file you can develop your Pipeline or write any Python code you want to schedule. We usually create one Job per Pipeline, as in the sketch below.
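For illustration, here is a minimal sketch of what such a script.py might contain; the actual file generated by pydin create job may differ, and the chinook and dwh sources are simply the examples used above:

import pydin as pd


# jobs/1/script.py: one Job running one Pipeline.
a = pd.Table(source_name='chinook', table_name='invoices', chunk_size=1000)
b = pd.Table(source_name='dwh', table_name='invoices')
pipeline = pd.Pipeline(a, b)
pipeline.run()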

When you are ready, just start the Scheduler, and your ETL process is automated:

pydin start scheduler

Database

For advanced PyDin features, such as process scheduling and maintenance, a database is required. Two database types are supported at the moment: Oracle and SQLite. Functionality does not depend on which one you choose: all the features are available even with a simple SQLite database.

Transformations

Data transformation is an important part of most ETL processes: you very often need to change something in your data streams. That's why PyDin has a special Mapper that lets you apply any transformation to data records. Feel free to use any Python constructs or built-in Framework tools.

For instance, the following script shows how to rename fields and convert a string to a datetime, as required by the target table.

import pydin as pd
import datetime as dt


def transform(input_data):
    # Rename source fields and parse the invoice date for the target table.
    output_data = {}
    output_data['invoice_id'] = input_data['InvoiceId']
    output_data['customer_id'] = input_data['CustomerId']
    output_data['invoice_date'] = dt.datetime.fromisoformat(input_data['InvoiceDate'])
    output_data['billing_address'] = input_data['BillingAddress']
    output_data['billing_city'] = input_data['BillingCity']
    output_data['billing_state'] = input_data['BillingState']
    output_data['billing_country'] = input_data['BillingCountry']
    output_data['billing_postal_code'] = input_data['BillingPostalCode']
    output_data['total'] = input_data['Total']
    return output_data


a = pd.Table(source_name='chinook', table_name='invoices', chunk_size=1000)
b = pd.Mapper(func=transform)
c = pd.Table(source_name='dwh', table_name='invoices')
pipeline = pd.Pipeline(a, b, c)
pipeline.run()
