Definitions-based data loader

dapu

Dapu, aka Data Puller, is an ETL tool using the ELT approach. It pulls data from external sources according to YAML/SQL definitions and transforms local data using SQL.

There are "simple" agents doing some steps.

Description

DRAFT!!!

Dapu brings data from an external database into a target PostgreSQL database.

It can be used for data migration from an old database to a new one, or for loading data into a data warehouse.

Restrictions: the target database must be PostgreSQL (the code uses several PG features such as RETURNING id, XML/JSON functions, schema awareness, temporary tables, CREATE TABLE AS, hashes).

The first step of preparing the target database must be done manually (call Your DBA), because it needs the PostgreSQL cluster superuser password and file-system-level access to pg_hba.conf.

Internal components

  • Enabler (Versioner) creates the minimal structure needed for keeping important metadata (keep it up to date by running it after each "pip install --upgrade dapu"; ps. after the first install, before running the enabler, make sure the database connection to the target database is ok)
  • Registrar scans the project directory for new and changed files and saves Task definitions to the Registry, and does versioning on target tables if the definition says so (run it after each deploy of Your definitions)
  • Manager checks which Tasks from the Registry must be executed and puts them as Jobs into the Agenda (run it regularly, every 30 minutes, e.g. at the 25th and 55th minute)
  • Worker executes Jobs by calling Taskers / Jobbers for as long as it can, e.g. 27 minutes (run it regularly, every 30 minutes, e.g. at the 29th and 59th minute; a typical overall schedule is sketched after this list)
  • Jobber runs one Job using Agents according to its definition, and on successful completion asks the Manager to add dependent Tasks and Jobs into the Agenda
  • Cleaner eliminates old records from the Agenda and old logs
  • Generator (many, different?) speeds up writing Task definitions (developer tool)
  • (Gatekeeper will be a utility for granting database users into groups defined earlier, but ... some pattern must exist for project names)
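
To make the moving parts above concrete, here is a rough sketch of when each component is invoked; the entry scripts are the ones shown later in this README, except registrar.py, whose name is an assumption for illustration:

# after "pip install --upgrade dapu": init/versioning (presumably the Enabler)
python version_manager.py finances_dir
# after each deploy of Your definitions (script name assumed for illustration)
python registrar.py finances_dir
# on schedule, e.g. via cron (see Usage below)
python manager.py finances_dir
python worker.py finances_dir
# once a day/week
python cleaner.py finances_dir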

Installation

Prepare Your target database

Probably You already have it (according to the use scenarios). If not, You are just starting a new data warehouse project. So visit Your DBA.

Take the script file v000_roles_and_database.sql with You.

If company policy is that every department must have its own (warehouse) database, then ask for multiple databases (they may, but do not have to, be in the same PostgreSQL cluster).

But keep in mind that since users (called "roles") live in the PostgreSQL cluster (not in individual databases), the same user cannot have different passwords, so create a different admin user for each database. Tedious, but this way You stay on the secure path.
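
How exactly the DBA runs that script is up to them; as a minimal sketch, assuming the cluster superuser is called postgres and reusing the example host from the configuration shown later, it could be as simple as:

psql -h pg16.pgservers.company.com -U postgres -f v000_roles_and_database.sql

With one database per department, the script (adapted with that department's role and database names) would be run once per database.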

Install Python components (packages)

dapu depends on dbpoint.hub (datacontroller) (hopefully it comes along automatically, if I have declared the dependency correctly). The same goes for psycopg2 and PyYAML.

It also depends on other DB-API 2.0 compatible packages for databases (from database vendors), which are not included in datacontroller (to keep its size small). Install those You need (and that are known to datacontroller).

Probably You want some data over an API (WFS, GML), so install requests.

And Excel files (which must reside somewhere accessible with file system tools, e.g. on a mounted device) where human users can update them: openpyxl (xlsx format only).

pip install dapu
pip install requests
pip install openpyxl

If in trouble (as I was), then:

pip install --index-url https://test.pypi.org/simple/ --no-deps dbpoint
pip install psycopg2
pip install PyYAML

PS. If pip doesn't work for You, use python3 -m pip.
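
For example, the same installation through the interpreter:

python3 -m pip install dapu
python3 -m pip install requests
python3 -m pip install openpyxl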

Run init script

Actually this script can be executed on every deploy. If You upgrade dapu, there may be new SQL commands (DDL) which must be applied before the new dapu code can run (without errors).

The init script connects to the database -- for that it needs main_dir. The same code on the same server can serve multiple target databases, so we cannot work with a single environment variable. Instead, all work is main_dir based. This main_dir is the first argument on execution, and the script must acquire the connection details from files inside it. Normally the deploy mechanism prepares the needed file.

For a Windows dev environment, where You don't have any automatic deploy engines... make one (a bat-file with xcopy commands), and You can use conf_prep.py (a small, quite dumb utility which replaces appearances of "${var}" with the var's actual value). So, before calling it, assign the right values to Your vars:

set DAPU_USER=dapu_finances_admin
set DAPU_PWD=password123
set DAPU_HOST=pg16.pgservers.company.com
set DAPU_PORT=5432
set DAPU_DBN=warehouse_finances
python conf_prep.py finances_dir\connections.yaml

assuming that the connections.yaml file includes lines like

- name: main
  driver: pg
  host: ${DAPU_HOST}
  port: ${DAPU_PORT}
  engine: ~
  database: ${DAPU_DBN}
  username: ${DAPU_USER}
  password: ${DAPU_PWD}
  extra:
    application_name: dapu
    connect_timeout: 10 # seconds
    keepalives_idle: 120 # default is 3600 and may not survive intermediaries like HAProxy

Then run the init script:

python version_manager.py finances_dir

Visit network people in case of 1-hour problems

On initial loading, or if the source data really multiplies every day, one piece of loading may need more time. Most cases can be tuned by splitting, but sometimes not (big data movements inside the target database would otherwise be fine, but if the connection is lost the loader cannot mark the work as done).

If the network guys use HAProxy, there may be a problem -- either HAProxy doesn't let keepalives through, or it just ignores them (while letting them through). In both cases HAProxy will close the connection after 60 minutes (the default).
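
A quick way to check whether a long-idle connection survives is to hold one open past the 60-minute mark. This sketch reuses the example host, database, and user from above (any psql-reachable account will do) and sleeps for 70 minutes:

psql "host=pg16.pgservers.company.com port=5432 dbname=warehouse_finances user=dapu_finances_admin keepalives_idle=120" -c "SELECT pg_sleep(4200)"

If the command dies with a lost-connection error before returning, the proxy timeout (or dropped keepalives) is the likely culprit.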

Another case when You must visit the DBA is a well-meaning DBA. Such a person may wish You/the project all the best and install PgBouncer (an extremely useful tool for web information systems, but our loader is not a web system). There can be two mistakes.

  • PgBouncer talks to the outer world over TCP and to Postgres over unix sockets. In this case Postgres thinks TCP keepalives are not needed... and the network connection will be lost. The correct way is to use the same "vocabulary": let PgBouncer communicate with Postgres over TCP as well.
  • For loading to work correctly, the configuration parameter pool_mode must be left at its default (session). Other strategies may work for different systems, but not for this loader (see the check sketched after this list).
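
If You want to check the pool_mode Yourself, PgBouncer has an admin console on its own port (commonly 6432); assuming Your user is listed in its admin_users or stats_users setting, something like this will print the effective configuration:

psql -h pg16.pgservers.company.com -p 6432 -U dapu_finances_admin pgbouncer -c "SHOW CONFIG;"

Look for pool_mode in the output; it should say session.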

Usage

There are 2 main components to execute. Manager will analyze the registry and put some loading jobs into the agenda. Worker will take jobs from the agenda and do the actual loading (it works for 27 minutes or until no jobs are left to do, whichever comes first).

Define cron to execute the manager at xx:15 and xx:45, and the worker at xx:30 and xx:00 (or put the manager to run a little bit before the worker, e.g. xx:25 and xx:55).

Don't put the worker to run more than twice an hour! Actually You can, but then You have to adjust the max running time too. Up to 3 parallel workers are not a sin; they will do what they must do. But in case of problems, it is more difficult to diagnose.

Sometimes there is a third component: cleaner will delete old jobs and do some other sanitizing. Once a day/week is enough.

0,30 * * * * python worker.py finances_dir 
15,45 * * * * python manager.py finances_dir 
3 1 */2 * * python cleaner.py finances_dir

If You have more separate warehouses/targets, repeat those lines with the corresponding directories (maybe adjusting the minutes is also a good idea).
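
For example, with a second (hypothetical) target directory hr_dir and the minutes shifted so the two setups don't start at exactly the same time:

5,35 * * * * python worker.py hr_dir
20,50 * * * * python manager.py hr_dir
3 2 */2 * * python cleaner.py hr_dir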

If using Apache Airflow....

Roadmap

Main versioning must be redone according to the main_dir ideology (and the cleaner too).

Logging must be redone (and errors made clearer).

License

MIT

Project status
