Definitions-based data loader
dapu
Dapu (aka Data Puller) is an ETL tool using the ELT approach: it pulls data from external sources according to YAML/SQL definitions and transforms local data using SQL.
The individual steps are carried out by "simple" agents.
Description
DRAFT!!!
Brings data from an external database into a target PostgreSQL database.
Can be used for data migration from an old database to a new one, or for data loading into a data warehouse.
Restrictions: the target database must be a PostgreSQL database (the code uses several PostgreSQL features like the RETURNING clause, XML/JSON functions, schema awareness, temporary tables, CREATE TABLE AS, hashes).
The first step of preparing the target database must be done manually (call Your DBA), because it needs the PostgreSQL cluster superuser password and file-system level access to pg_hba.conf.
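For orientation only (the exact entry is up to Your DBA), a pg_hba.conf line allowing the future dapu admin role to connect might look like this; the role name, database name and address range are illustrative placeholders taken from the configuration example further below:

host    warehouse_finances    dapu_finances_admin    10.0.0.0/8    scram-sha-256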
Internal components
- Enabler (Versioner) makes the minimal structure needed for keeping important metadata. Keep it up to date by running it after each "pip install --upgrade dapu" (PS. after the first install, make sure the database connection to the target database is OK before running the enabler)
- Registrar scans the project directory for new and changed files and saves the definitions of Tasks into the Registry; does versioning on target tables if the definition says so (run it after each deploy of Your definitions)
- Manager checks which Tasks from the Registry must be executed and puts them as Jobs into the Agenda (run it regularly, every 30 minutes, e.g. at the 25th and 55th minute)
- Worker executes Jobs by calling Taskers / Jobbers for as long as it can, e.g. 27 minutes (run it regularly, every 30 minutes, e.g. at the 29th and 59th minute); a typical run cycle is sketched after this list
- Jobber runs one Job using Agents according to the definition, and at a successful end asks Manager to add dependent Tasks and Jobs into the Agenda
- Cleaner eliminates old records from the Agenda and old logs
- Generator (many, different?) speeds up writing the definitions for Tasks (developer tool)
- (Gatekeeper will be a utility for granting database users into groups defined earlier, but... some pattern must exist for project names)
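To tie these together, a typical cycle uses the commands that appear later in this document (finances_dir is the example main_dir used below; exact entry points may differ in Your installation):

python version_manager.py finances_dir    # Enabler/Versioner: after install/upgrade and on deploy
python manager.py finances_dir            # Manager: fills the Agenda from the Registry
python worker.py finances_dir             # Worker: executes Jobs for up to ~27 minutes
python cleaner.py finances_dir            # Cleaner: occasional housekeeping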
Installation
Prepare Your target database
Probably You already have it (according to the use scenarios above). If not, You are just starting a new data warehouse project. Either way, visit Your DBA.
Take the script file v000_roles_and_database.sql with You.
If company policy is that every department must have its own (warehouse) database, then ask for several databases (they may, but do not have to, be in the same PostgreSQL cluster).
But keep in mind that users (called "roles") live at the PostgreSQL cluster level, not inside individual databases, so the same user cannot have different passwords; therefore make a different admin user for each database. Tedious, but this way You stay on the secure path.
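For orientation only (the actual statements are in v000_roles_and_database.sql and should be reviewed with Your DBA), the per-database preparation boils down to statements of this shape, reusing the illustrative names from the configuration example below:

CREATE ROLE dapu_finances_admin LOGIN PASSWORD 'password123';
CREATE DATABASE warehouse_finances OWNER dapu_finances_admin;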
Install python components (packages)
dapu depends on dbpoint.hub (datacontroller), and it hopefully comes along automatically if I describe the dependency correctly. The same goes for psycopg2 and PyYAML.
It also depends on other DB-API 2.0 compatible packages for databases (from database vendors), which are not included in datacontroller (to keep its size small). Install the ones You need (and that are known to datacontroller).
Probably You want some data over an API (e.g. WFS GML), so install requests.
And for Excel files (which must reside somewhere accessible using file-system tools, like a mounted device, where human users can update them), install openpyxl (xlsx format only).
pip install dapu
pip install requests
pip install openpyxl
If You run into trouble (as I did), then:
pip install --index-url https://test.pypi.org/simple/ --no-deps dbpoint
pip install psycopg2
pip install PyYAML
PS. If pip doesn't work for You, use python3 -m pip.
Run init script
Actually this script can be executed on every deploy. If You upgrade dapu, there may be new SQL commands (DDL) which must be applied before the new dapu code can run (without errors).
The init script connects to the database, and for that it needs main_dir. The same code on the same server can serve multiple target databases, so we cannot work with one environment variable; instead, all work is main_dir based. This main_dir is given as the first argument on execution, and the knowledge needed for connecting must come from files inside it. Normally the deploy mechanism prepares the needed file.
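To make the assumption explicit: main_dir is just a directory holding the connection file and the definitions, roughly like this (only connections.yaml is a name actually used below; the rest is illustrative):

finances_dir/
    connections.yaml    # how to reach the target database (see the example below)
    ...                 # Task definition files (YAML/SQL) that the Registrar scans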
For a Windows dev environment, where You don't have any automatic deploy engine, make one (a bat file with xcopy commands). You can also use conf_prep.py, a small and quite dumb utility which replaces appearances of "${var}" with the actual value of var (a sketch of the idea follows after the YAML example). So, before calling it, assign the right values to Your variables:
set DAPU_USER=dapu_finances_admin
set DAPU_PWD=password123
set DAPU_HOST=pg16.pgservers.company.com
set DAPU_PORT=5432
set DAPU_DBN=warehouse_finances
python conf_prep.py finances_dir\connections.yaml
assuming that the connections.yaml file includes lines like:
- name: main
  driver: pg
  host: ${DAPU_HOST}
  port: ${DAPU_PORT}
  engine: ~
  database: ${DAPU_DBN}
  username: ${DAPU_USER}
  password: ${DAPU_PWD}
  extra:
    application_name: dapu
    connect_timeout: 10 # seconds
    keepalives_idle: 120 # default is 3600 and may not survive
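conf_prep.py comes with the project; purely to illustrate the idea (this is not its actual code), such a substitution utility is essentially:

import os
import re
import sys

def expand_env_placeholders(path: str) -> None:
    """Replace every ${VAR} in the file with the value of the environment variable VAR."""
    with open(path, encoding="utf-8") as fh:
        text = fh.read()
    # leave unknown placeholders untouched so missing values are easy to spot
    text = re.sub(r"\$\{(\w+)\}", lambda m: os.environ.get(m.group(1), m.group(0)), text)
    with open(path, "w", encoding="utf-8") as fh:
        fh.write(text)

if __name__ == "__main__":
    expand_env_placeholders(sys.argv[1])  # e.g. python conf_prep.py finances_dir\connections.yaml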
With the connections file prepared, run the init script:
python version_manager.py finances_dir
Visit network people in case of 1-hour problems
On initial loading, or if the source data really multiplies every day, one piece of loading may need more time. Most cases can be tuned by splitting, but sometimes not (big data movements inside the target database would be fine on their own, but if the connection is lost, the loader cannot mark the work as done).
If the network people use HAProxy, there may be a problem: either HAProxy doesn't let keepalives through, or it just ignores them (while letting them through). In both cases HAProxy will close the connection after 60 minutes (the default).
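One common mitigation, to be agreed with the network people, is to raise HAProxy's idle timeouts well above the longest expected load; as an illustration only (section names and values depend on their setup):

# haproxy.cfg fragment (illustrative)
defaults
    timeout client  180m
    timeout server  180m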
Another case where You must visit the DBA is the well-meaning DBA. Such a person may wish You/the project all the best and install pgbouncer (an extremely useful tool for web information systems, but our loader is not a web system). Two mistakes are possible here (see the illustrative pgbouncer.ini fragment after this list).
- pgbouncer talks to the outer world over TCP and to PostgreSQL over unix sockets. In that case PostgreSQL thinks there is no need to produce TCP keepalives... and the network connection will be lost. The correct way is to use the same "vocabulary": let pgbouncer communicate with PostgreSQL over TCP as well.
- For the loading to work correctly, the configuration parameter pool_mode must be left at its default (session). Other strategies may work for other systems, but not for this loader.
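In pgbouncer.ini terms, the two points above look roughly like this (database name and address are illustrative):

; pgbouncer.ini (illustrative)
[databases]
warehouse_finances = host=127.0.0.1 port=5432   ; TCP to PostgreSQL, not a unix socket
[pgbouncer]
pool_mode = session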
Usage
There are 2 main components to execute. Manager analyzes the registry and puts loading jobs into the agenda. Worker takes jobs from the agenda and performs the actual loading (it works for 27 minutes or until no jobs are left to do, whichever comes first).
Define cron to execute the manager at xx:15 and xx:45, and the worker at xx:30 and xx:00 (or put the manager to run a little before the worker, e.g. at xx:25 and xx:55).
Don't put the worker to run more than twice an hour! Actually You can, but then You have to adjust the max running time too. Up to 3 parallel workers are not a sin; they will do what they must do. But in case of problems it is harder to diagnose.
Sometimes there is a third component: cleaner deletes old jobs and does some other sanitizing. Once a day or once a week is enough. Example crontab entries:
0,30 * * * * python worker.py finances_dir
15,45 * * * * python manager.py finances_dir
3 1 */2 * * python cleaner.py finances_dir
If You have more separate warehouses/targets, repeat those lines with the corresponding directories (adjusting the minutes is probably also a good idea).
If using Apache Airflow, the same scripts can be scheduled as tasks instead of cron entries.
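A minimal sketch of that, assuming Airflow 2.x with BashOperator; the dag_id, paths and schedule are placeholders mirroring the cron example above, and the worker and cleaner would get their own DAGs with their own schedules:

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="dapu_finances_manager",
    schedule_interval="15,45 * * * *",  # same slots as the manager cron line above
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    BashOperator(
        task_id="run_manager",
        bash_command="python /srv/dapu/manager.py /srv/dapu/finances_dir",  # paths are illustrative
    )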
Roadmap
Main versioning must be redone according to the main_dir ideology (and the cleaner too).
Logging must be redone (and errors made clearer).
License
MIT
Project status