PyDin
PyDin is a Python-based ETL framework that allows you to create, automate and maintain data processes right out of the box.
This was originally a simple script on a small server, running a dozen jobs in the context of a bigger project. But as the project grew, the script evolved and eventually became something else.
Thus, PyDin is a framework built by Data Engineers for Data Engineers, following established best practices and a clear philosophy. It was designed to work in a real production environment with thousands of daily jobs.
More information can be found in the official documentation.
Installation
Just install using pip:
pip install pydin
Interface
The product provides several types of native interfaces: Framework, CLI and API.
Most of the time, the Framework is used by developers, the CLI by administrators, and the API by advanced business users and third parties.
The CLI is the most convenient interface; using it assumes the creation of a special manager.py file with the following code in it:
import pydin
manager = pydin.Manager()
When the file is saved, you can set an alias for it, as in the example below, and run any command it accepts.
# Set an alias to run the file.
alias pydin='python ~/pydin/manager.py'
# Get the list of available commands.
pydin help
Visit the documentation to learn more about the CLI and other interfaces.
Framework
ETL processes are represented as Pipelines that can be built using Framework Models.
Here are some code snippets on how these Pipelines can be implemented:
import pydin as pd
# Load data from one database to another.
a = pd.Table(source_name='chinook', table_name='customers', chunk_size=1000)
b = pd.Table(source_name='dwh', table_name='customers')
pipeline = pd.Pipeline(a, b)
pipeline.run()
# Load data by SQL SELECT from one database to another.
select = 'select * from customers where company is not null'
a = pd.Select(source_name='chinook', text=select, chunk_size=1000)
b = pd.Table(source_name='dwh', table_name='companies')
pipeline = pd.Pipeline(a, b)
pipeline.run()
# Load data by SQL INSERT, using a SELECT statement read from a file.
path = '~/documents/reports/customers.sql'
n = pd.Insert(source_name='dwh', table_name='customers_report', path=path)
pipeline = pd.Pipeline(n)
pipeline.run()
# Load data by SQL INSERT, using a date filter and adding a key field.
path = '~/documents/history/invoices.sql'
n = pd.Insert(source_name='dwh', table_name='invoices_history',
              select=path, date_field='invoice_date', days_back=1,
              key_field=pd.process_id)
pipeline = pd.Pipeline(n)
pipeline.run()
# Load data from a database into a CSV file, clearing the file first.
select = 'select * from customers where company is not null'
a = pd.Select(source_name='chinook', text=select, chunk_size=10000)
b = pd.CSV(path='customers.csv', delimiter='\t', cleanup=True)
pipeline = pd.Pipeline(a, b)
pipeline.run()
# Load data from a database into JSON and XML files at the same time.
a = pd.Table(source_name='dwh', table_name='customers', chunk_size=10000)
b = pd.JSON(path='customers.json', cleanup=True)
c = pd.XML(path='customers.xml', cleanup=True)
pipeline = pd.Pipeline(a, b, c)
pipeline.run()
# Load data from a CSV file into a database.
a = pd.CSV(path='customers.csv', delimiter='\t', chunk_size=1000)
b = pd.Table(source_name='dwh', table_name='customers')
pipeline = pd.Pipeline(a, b)
pipeline.run()
# Move files from GZ archives into a directory, unpacking them on the way.
n = pd.FileManager(path='/archives', mask='.*gz',
                   action='move', dest='/data', unzip=True)
pipeline = pd.Pipeline(n)
pipeline.run()
# Copy files from one directory to another with GZ archiving.
n = pd.FileManager(path='/data', mask='.*csv',
                   action='copy', dest='/backup', zip=True)
pipeline = pd.Pipeline(n)
pipeline.run()
Every Model stands for a specific data object. See the documentation for a description and advanced use of Models.
Automation
The product has a built-in Scheduler for process automation.
Install the Scheduler using the command below, following the instructions step by step:
pydin install
After that, create and configure the Job:
pydin create job
Files of created objects are placed in the same location as manager.py.
The most important file is script.py in the directory jobs/n, where n is the unique Job ID. In this file you can develop your Pipeline or write any Python code you want to schedule. We usually create one Job per Pipeline.
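For illustration, a minimal script.py could contain one of the Pipelines shown above; the source and table names below are hypothetical placeholders:
import pydin as pd

# A scheduled Pipeline: load the invoices table into the warehouse.
a = pd.Table(source_name='chinook', table_name='invoices', chunk_size=1000)
b = pd.Table(source_name='dwh', table_name='invoices')
pipeline = pd.Pipeline(a, b)
pipeline.run()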
When you are ready, just start the Scheduler, and your ETL process is automated:
pydin start scheduler
Database
For advanced PyDin features, such as process scheduling and maintenance, a database is required. Two database types are supported at the moment: Oracle and SQLite. The choice does not limit functionality: you get all the features even with a simple SQLite database.
Transformations
Data transformation is an important part of most ETL processes: you very often need to change something in your data streams. That's why PyDin provides a special Mapper that lets you apply any transformation to data records. Feel free to use any Python constructs or built-in Framework tools.
For instance, the following script shows how to change field names and convert a string to a datetime, as required by the target table.
import pydin as pd
import datetime as dt
def transform(input_data):
    output_data = {}
    output_data['invoice_id'] = input_data['InvoiceId']
    output_data['customer_id'] = input_data['CustomerId']
    output_data['invoice_date'] = dt.datetime.fromisoformat(input_data['InvoiceDate'])
    output_data['billing_address'] = input_data['BillingAddress']
    output_data['billing_city'] = input_data['BillingCity']
    output_data['billing_state'] = input_data['BillingState']
    output_data['billing_country'] = input_data['BillingCountry']
    output_data['billing_postal_code'] = input_data['BillingPostalCode']
    output_data['total'] = input_data['Total']
    return output_data
a = pd.Table(source_name='chinook', table_name='invoices', chunk_size=1000)
b = pd.Mapper(func=transform)
c = pd.Table(source_name='dwh', table_name='invoices')
pipeline = pd.Pipeline(a, b, c)
pipeline.run()
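Note that transform works on one record at a time, taking and returning a single record as a dictionary, which keeps it independent of the chunk_size configured on the source Model.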