Tools for monitoring and troubleshooting ELT arrangements.
Project description
ELT-Tools-AIO
Database Abstraction
A set of tools to serve as an abstraction layer over many commonly used databases, as long as it's supported by SQLAlchemy. It supports the following operations in an easy-to-use interface:
- asynchronous (non-blocking) operation
- count the number of rows in a table
- find duplicates in a table
- find records missing in target with respect to source
- find records on target which have been hard deleted from source
- execute a sql query against a table
ELT Pair Operations
In Extract-Load-Transform (ELT) operations, a table is extracted and loaded from one database to another with potential transformations after that (for example in a database view). This is akin to database replication, albeit not necessarily all tables nor all columns are transferred. One may also only transfer records from a certain date onwards.
Many common database engineering tasks relate to the source and target pairs. This library assists by implementing these commonly performed operations in a succinct interface such as:
- show a list of common tables between source and target database
- compare counts between source and target tables over a specified time window
- find primary keys of missing records in the target
- fill missing records into the target over a given date range
- find primary keys of orphaned records in the target (i.e. corresponding records from the source database have been deleted)
- remove orphaned records from target (even for large tables)
Configuration and Examples
The library provides two main classes: DataClient
for database abstraction and ELTDBPair
for
ELT operations between database pairs. The user passes configuration dictionaries into these classes.
The configuration describes database credentials, and details of which databases to pair up.
For example, to find duplicate on a particular table:
import asyncio
from os import environ
from elt_tools_aio.client import DataClientFactory
DATABASES = {
'db_key11': {
'engine': 'oltp_engine',
'sql_alchemy_conn_string': environ.get('mysql_db_uri'),
},
'db_key12': {
'engine': 'bigquery_engine',
'dataset_id': 'mydata',
'gcp_project': environ.get('GCP_PROJECT'),
'gcp_credentials': environ.get('GOOGLE_APPLICATION_CREDENTIALS'),
},
}
async def print_duplicate_keys():
factory = DataClientFactory(DATABASES)
client = factory(db_key='db_key11')
customer_duplicates = await client.find_duplicate_keys('customers', 'id')
print(customer_duplicates)
asyncio.run(print_duplicate_keys())
For example, to remove orphaned records on the target table of a particular ELT Pair using a binary search strategy on a large table:
import asyncio
from os import environ
from elt_tools_aio.client import ELTDBPairFactory
DATABASES = {
'db_key11': {
'engine': 'oltp_engine',
'sql_alchemy_conn_string': environ.get('mysql_db_uri'),
},
'db_key12': {
'engine': 'bigquery_engine',
'dataset_id': 'mydata',
'gcp_project': environ.get('GCP_PROJECT'),
'gcp_credentials': environ.get('GOOGLE_APPLICATION_CREDENTIALS'),
},
}
ELT_PAIRS = {
'pair1': {
'source': 'db_key11', 'target': 'db_key12'
},
}
async def remove_orphans():
factory = ELTDBPairFactory(ELT_PAIRS, DATABASES)
elt_pair = factory(pair_key='pair1')
_ = await elt_pair.remove_orphans_from_target_with_binary_search(
'customers',
'id',
timestamp_fields=['created_at']
)
asyncio.run(remove_orphans())
Installation instructions
$ pip install git+ssh://git@github.com/dewaldabrie/elt_tools_aio.git
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Hashes for elt_tools_aio-0.1.0-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | ad42f0ebb53152f1035cac45cff9c7fa49d1baa31b782d629fec96f2f0f837fd |
|
MD5 | b3ff5598743b6e712e820a07476777d9 |
|
BLAKE2b-256 | 2abbda0b03dc3266ab9dbc0075558e4fe2ebb8d665aaf5b77695f57c94ca05ca |