
Python ETL framework.

Project description

PyDin


PyDin is a Python-based ETL framework that allows you to create, automate and maintain data processes right out of the box.

PyDin originally started as a simple script on a small server, running a dozen jobs as part of a bigger project. But as the project grew, the script evolved and eventually became something else.

Thus, PyDin is a framework from Data Engineers for Data Engineers that follows data engineering best practices and philosophy. It was designed to work in a real production environment with thousands of daily jobs.

More information can be found in the official documentation.

Installation

Just install using pip:

pip install pydin

Interface

The product provides several types of native interfaces: Framework, CLI and API.

Most of the time, developers use the Framework, administrators use the CLI, and advanced business users and third parties use the API.

To use the CLI, which is the most convenient interface, create a special manager.py file with the following code in it:

import pydin


manager = pydin.Manager()

Once the file is saved, you can set an alias for it, as in the example below, and run any command it accepts.

# Set an alias to run the file.
alias pydin='python ~/pydin/manager.py'

# Get the list of available commands.
pydin help

Visit the documentation to learn more about the CLI and other interfaces.

Framework

ETL processes are represented as Pipelines, which can be built using Framework Models.

Here are some code snippets on how these Pipelines can be implemented:

import pydin as pd


# Load data from one database to another.
a = pd.Table(source_name='chinook', table_name='customers', chunk_size=1000)
b = pd.Table(source_name='dwh', table_name='customers')
pipeline = pd.Pipeline(a, b)
pipeline.run()

# Load data by SQL SELECT from one database to another.
select = 'select * from customers where company is not null'
a = pd.Select(source_name='chinook', text=select, chunk_size=1000)
b = pd.Table(source_name='dwh', table_name='companies')
pipeline = pd.Pipeline(a, b)
pipeline.run()

# Load data by SQL INSERT, using a SELECT taken from a file.
path = '~/documents/reports/customers.sql'
n = pd.Insert(source_name='dwh', table_name='customers_report', path=path)
pipeline = pd.Pipeline(n)
pipeline.run()

# Load data by SQL INSERT, using a date filter and adding a key field.
path = '~/documents/history/invoices.sql'
n = pd.Insert(source_name='dwh', table_name='invoices_history',
              select=path, date_field='invoice_date', days_back=1,
              key_field=pd.process_id)
pipeline = pd.Pipeline(n)
pipeline.run()

# Load data from a database into a CSV file, clearing the file beforehand.
select = 'select * from customers where company is not null'
a = pd.Select(source_name='chinook', text=select, chunk_size=10000)
b = pd.CSV(path='customers.csv', delimiter='\t', cleanup=True)
pipeline = pd.Pipeline(a, b)
pipeline.run()

# Load data from database into JSON and XML files at the same time.
a = pd.Table(source_name='dwh', table_name='customers', chunk_size=10000)
b = pd.JSON(path='customers.json', cleanup=True)
c = pd.XML(path='customers.xml', cleanup=True)
pipeline = pd.Pipeline(a, b, c)
pipeline.run()

# Load data from CSV file into database.
a = pd.CSV(path='customers.csv', delimiter='\t', chunk_size=1000)
b = pd.Table(source_name='dwh', table_name='customers')
pipeline = pd.Pipeline(a, b)
pipeline.run()

# Move files from GZ archives into directory.
n = pd.FileManager(path='/archives', mask='.*gz',
                   action='move', dest='/data', unzip=True)
pipeline = pd.Pipeline(n)
pipeline.run()

# Copy files from one directory to another with GZ archiving.
n = pd.FileManager(path='/data', mask='.*csv',
                   action='copy', dest='/backup', zip=True)
pipeline = pd.Pipeline(n)
pipeline.run()

Every Model stands for a specific data object. See the documentation for a description and advanced use of Models.

Automation

The product has a built-in Scheduler for process automation.

Install the Scheduler using the command below and following the instructions step by step:

pydin install

After that, create and configure the Job:

pydin create job

Files of created objects are placed in the same location as manager.py.

The most important file is script.py in the directory jobs/n, where n is the unique Job ID. In this file, you can develop your Pipeline or write any Python code you want to schedule. We usually create one Job per Pipeline.
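
As an illustration, a jobs/n/script.py might contain nothing more than one of the Pipelines shown above; the source and table names below are placeholders reused from the earlier examples, not required values:

import pydin as pd


# Hypothetical job script: load the customers table into the warehouse.
a = pd.Table(source_name='chinook', table_name='customers', chunk_size=1000)
b = pd.Table(source_name='dwh', table_name='customers')
pipeline = pd.Pipeline(a, b)
pipeline.run()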

When you are ready, just start the Scheduler, and your ETL process is automated:

pydin start scheduler

Database

For advanced PyDin features, such as process scheduling and maintenance, a database is required. Two database types are supported at the moment: Oracle and SQLite. Functionality does not depend on which one you choose; you can get all the features using a simple SQLite database.

Transformations

Data transformation is an important part of most ETL processes: you very often need to change something in your data streams. That's why PyDin has a special Mapper that lets you apply any transformation to data records. Feel free to use any Python constructs or built-in Framework tools.

For instance, the following script shows how to rename fields and convert a string to a datetime, as required by the target table.

import pydin as pd
import datetime as dt


def transform(input_data):
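    # Rename source fields to the target column names and cast the invoice date to datetime.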
    output_data = {}
    output_data['invoice_id'] = input_data['InvoiceId']
    output_data['customer_id'] = input_data['CustomerId']
    output_data['invoice_date'] = dt.datetime.fromisoformat(input_data['InvoiceDate'])
    output_data['billing_address'] = input_data['BillingAddress']
    output_data['billing_city'] = input_data['BillingCity']
    output_data['billing_state'] = input_data['BillingState']
    output_data['billing_country'] = input_data['BillingCountry']
    output_data['billing_postal_code'] = input_data['BillingPostalCode']
    output_data['total'] = input_data['Total']
    return output_data


a = pd.Table(source_name='chinook', table_name='invoices', chunk_size=1000)
b = pd.Mapper(func=transform)
c = pd.Table(source_name='dwh', table_name='invoices')
pipeline = pd.Pipeline(a, b, c)
pipeline.run()
