
etlhelper

etlhelper is a Python library to simplify data transfer between databases.

Overview

etlhelper makes it easy to run a SQL query via Python and return the results. It is built upon the DBAPI2 specification and takes care of importing drivers, formatting connection strings and cursor management. This reduces the amount of boilerplate code required to query a relational database with Python.

Features

  • setup_oracle_client script installs Oracle Instant Client on Linux systems
  • DbParams objects provide a consistent way to connect to different database types (currently Oracle, PostgreSQL, SQLite and MS SQL Server)
  • get_rows, iter_rows, fetchone and other functions for querying databases
  • execute and executemany functions to insert data
  • copy_rows to transfer data from one database to another
  • Support for parameterised queries and in-flight transformation of data
  • Output results as namedtuple or dictionary
  • Timestamped log messages for tracking long-running data transfers
  • Helpful error messages display the failed query SQL

These tools can create easy-to-understand, lightweight, versionable and testable Extract-Transform-Load (ETL) workflows. etlhelper is not a tool for coordinating ETL jobs (use Apache Airflow), for converting GIS data formats (use ogr2ogr or fiona), for translating between SQL dialects or providing Object Relation Mapping (use SQLAlchemy). However, it can be used in conjunction with each of these.

The documentation below explains how the main features are used. See the individual function docstrings for full details of parameters and options.

For a high level introduction to etlhelper, see the FOSS4GUK 2019 presentation Open Source Spatial ETL with Python and Apache Airflow: video (20 mins), slides.

Documentation

Installation

Python packages

pip install etlhelper

Database driver packages are not included by default and should be specified in square brackets. Options are oracle (installs cx_Oracle), mssql (installs pyodbc) and postgres (installs psycopg2). Multiple values can be separated by commas.

pip install etlhelper[oracle,postgres]

The sqlite3 driver is included within Python's Standard Library.

Database driver dependencies

Some database drivers have additional dependencies. On Linux, these can be installed via the system package manager.

cx_Oracle (for Oracle):

  • sudo apt install libaio1 (Debian/Ubuntu) or sudo dnf install libaio (CentOS, RHEL, Fedora)

pyodbc (for MS SQL Server):

  • Follow the instructions on the Microsoft website to install the Microsoft ODBC Driver for SQL Server for your operating system.

Oracle Instant Client

Oracle Instant Client libraries are required to connect to Oracle databases. On Linux, etlhelper provides a script to download and unzip them from the Oracle website. Once the drivers are installed, their location must be added to the LD_LIBRARY_PATH environment variable before they can be used. setup_oracle_client writes a file that can then be "sourced" to do this for the current shell. These two steps can be executed in a single command as:

source $(setup_oracle_client)

This command must be run in each new shell session. See setup_oracle_client --help for further command line flags, including specifying an alternative URL or filesystem path for the zipfile location.

Connect to databases

DbParams

Database connection details are defined by DbParams objects. Connections are made via their connect functions (see below). DbParams objects are created as follows or from environment variables using the from_environment() function. The class initialisation function checks that the correct attributes have been provided for a given dbtype.

from etlhelper import DbParams

ORACLEDB = DbParams(dbtype='ORACLE', host="localhost", port=1521,
                    dbname="mydata", user="oracle_user")

POSTGRESDB = DbParams(dbtype='PG', host="localhost", port=5432,
                      dbname="mydata", user="postgres_user")

SQLITEDB = DbParams(dbtype='SQLITE', filename='/path/to/file.db')

MSSQLDB = DbParams(dbtype='MSSQL', host="localhost", port=1433,
                   dbname="mydata", user="mssql_user",
                   odbc_driver="ODBC Driver 17 for SQL Server")
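
Connection details can also be read from environment variables with from_environment(). A brief sketch, assuming from_environment() accepts a prefix keyword and that the variable names below follow that prefix (both are illustrative; see the function docstring for specifics):

import os
from etlhelper import DbParams

# Hypothetical environment variables; the prefix passed to from_environment()
# selects which variables are read.
os.environ['ETLHelper_dbtype'] = 'PG'
os.environ['ETLHelper_host'] = 'localhost'
os.environ['ETLHelper_port'] = '5432'
os.environ['ETLHelper_dbname'] = 'mydata'
os.environ['ETLHelper_user'] = 'postgres_user'

ENVDB = DbParams.from_environment(prefix='ETLHelper_')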

DbParams objects have a function to check if a given database can be reached over the network. This does not require a username or password.

if not ORACLEDB.is_reachable():
    raise ETLHelperError("Network problems")

Other methods/properties are get_connection_string, get_sqlalchemy_connection_string, paramstyle and copy. See function docstrings for details.
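
For example (a brief sketch; this assumes paramstyle is a read-only property, as used later in the Parameters section, and that copy() takes no arguments):

print(ORACLEDB.paramstyle)       # e.g. 'named' for cx_Oracle

ORACLEDB_COPY = ORACLEDB.copy()  # independent copy of the connection details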

connect function

The DbParams.connect() function returns a DBAPI2 connection as provided by the underlying driver. Using context-manager syntax as below ensures that the connection is closed after use.

with SQLITEDB.connect() as conn1:
    with POSTGRESDB.connect('PGPASSWORD') as conn2:
        do_something()

A standalone connect function provides backwards-compatibility with previous releases of etlhelper:

from etlhelper import connect
conn3 = connect(ORACLEDB, 'ORACLE_PASSWORD')

Both versions accept additional keyword arguments that are passed to the connect function of the underlying driver. For example, the following sets the character encoding used by cx_Oracle to ensure that values are returned as UTF-8:

conn4 = connect(ORACLEDB, 'ORACLE_PASSWORD', encoding="UTF-8", nencoding="UTF8")

Passing these arguments is a useful fix when special characters appear scrambled in the returned data.

Disabling fast_executemany for SQL Server and other pyODBC connections

By default, an etlhelper pyODBC connection uses a cursor with its fast_executemany attribute set to True. This setting improves the performance of executemany when performing bulk inserts to a SQL Server database. However, it overrides the default behaviour of pyODBC and there are some limitations in doing this. Importantly, it is only recommended for applications that use Microsoft's ODBC Driver for SQL Server. See pyODBC fast_executemany.

Using fast_executemany may raise a MemoryError if the query involves columns of the deprecated TEXT and NTEXT types. Under these circumstances, etlhelper falls back to fast_executemany=False and outputs a warning. See Inserting into SQL server with fast_executemany results in MemoryError.

If required, the fast_executemany attribute can be set to False via the connect function:

conn5 = connect(MSSQLDB, 'MSSQL_PASSWORD', fast_executemany=False)

This keyword argument is used by etlhelper; any further keyword arguments are passed to the connect function of the underlying driver.

Passwords

Database passwords must be specified via an environment variable. This reduces the temptation to store them within scripts. This can be done on the command line via:

  • export ORACLE_PASSWORD=some-secret-password on Linux
  • set ORACLE_PASSWORD=some-secret-password on Windows

Or in a Python terminal via:

import os
os.environ['ORACLE_PASSWORD'] = 'some-secret-password'

No password is required for SQLite databases.

Transfer data

Get rows

The get_rows function returns a list of named tuples containing data as native Python objects.

from my_databases import ORACLEDB
from etlhelper import get_rows

sql = "SELECT * FROM src"

with ORACLEDB.connect("ORA_PASSWORD") as conn:
    get_rows(sql, conn)

returns

[Row(id=1, value=1.234, simple_text='text', utf8_text='Öæ°\nz',
     day=datetime.date(2018, 12, 7),
     date_time=datetime.datetime(2018, 12, 7, 13, 1, 59)),
 Row(id=2, value=2.234, simple_text='text', utf8_text='Öæ°\nz',
     day=datetime.date(2018, 12, 8),
     date_time=datetime.datetime(2018, 12, 8, 13, 1, 59)),
 Row(id=3, value=2.234, simple_text='text', utf8_text='Öæ°\nz',
     day=datetime.date(2018, 12, 9),
     date_time=datetime.datetime(2018, 12, 9, 13, 1, 59))]

Data are accessible via index (row[4]) or name (row.day).

Other functions are provided to select data. fetchone, fetchmany and fetchall are equivalent to the cursor methods specified in the DBAPI v2.0. dump_rows passes each row to a function (default is print).
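
For example, a brief sketch (assuming fetchone takes the same query and connection arguments as get_rows, and that dump_rows accepts the row-handling function as its third argument):

from etlhelper import fetchone, dump_rows

sql = "SELECT * FROM src"

with ORACLEDB.connect("ORA_PASSWORD") as conn:
    # Return only the first row of the result set
    first_row = fetchone(sql, conn)

    # Pass each row to a function; print is the default
    dump_rows(sql, conn, lambda row: print(row.id))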

iter_rows

It is recommended to use iter_rows for looping over large result sets. It is a generator function that only yields data as requested. This ensures that the data are not all loaded into memory at once.

with ORACLEDB.connect("ORA_PASSWORD") as conn:
    for row in iter_rows(sql, conn):
        do_something(row)

Parameters

Variables can be inserted into queries by passing them as parameters. These "bind variables" are sanitised by the underlying drivers to prevent SQL injection attacks. The required paramstyle can be checked with MY_DB.paramstyle. A tuple is used for positional placeholders, or a dictionary for named placeholders.

select_sql = "SELECT * FROM src WHERE id = :id"

with ORACLEDB.connect("ORA_PASSWORD") as conn:
    get_rows(select_sql, conn, parameters={'id': 1})
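
With a driver that uses positional placeholders (e.g. the qmark style used by sqlite3), a tuple is passed instead. A brief sketch:

select_sql = "SELECT * FROM src WHERE id = ?"  # qmark style (sqlite3)

with SQLITEDB.connect() as conn:
    get_rows(select_sql, conn, parameters=(1,))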

Row factories

Row factories control the output format of returned rows. To return each row as a dictionary, use the following:

from etlhelper import get_rows
from etlhelper.row_factories import dict_row_factory

sql = "SELECT * FROM my_table"

with ORACLEDB.connect('ORACLE_PASSWORD') as conn:
    for row in get_rows(sql, conn, row_factory=dict_row_factory):
        print(row['id'])

The dict_row_factory is useful when data are to be serialised to JSON/YAML, or when modifying individual fields with a transform function (see below). When using dict_row_factory with copy_rows, it is necessary to use named placeholders for the INSERT query (e.g. %(id)s instead of %s for PostgreSQL, :id instead of :1 for Oracle).
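
For example, a sketch of named placeholders paired with dict_row_factory in a copy_rows call (assuming copy_rows accepts the row_factory keyword, and that src_conn and dest_conn are open connections as in the Copy rows section below):

from etlhelper import copy_rows
from etlhelper.row_factories import dict_row_factory

select_sql = "SELECT id, name FROM src"
# Named placeholders match the dictionary keys produced by dict_row_factory
insert_sql = "INSERT INTO dest (id, name) VALUES (%(id)s, %(name)s)"

copy_rows(select_sql, src_conn, insert_sql, dest_conn,
          row_factory=dict_row_factory)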

Insert rows

execute can be used to insert a single row or to execute other single statements e.g. "CREATE TABLE ...". The executemany function is used to insert multiple rows of data. Large datasets are broken into chunks and inserted in batches to reduce the number of queries.

from etlhelper import executemany

rows = [(1, 'value'), (2, 'another value')]
insert_sql = "INSERT INTO some_table (col1, col2) VALUES (%s, %s)"

with POSTGRESDB.connect('PGPASSWORD') as conn:
    executemany(insert_sql, conn, rows)

The commit_chunks flag defaults to True. This ensures that an error during a large data transfer doesn't require all the records to be sent again. Some work may be required to determine which records remain to be sent. Setting commit_chunks to False will roll back the entire transfer in case of an error.
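
For example, to roll back the whole transfer on error (a sketch, assuming executemany accepts commit_chunks as a keyword argument):

with POSTGRESDB.connect('PGPASSWORD') as conn:
    executemany(insert_sql, conn, rows, commit_chunks=False)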

Copy rows

The copy_rows function takes the results from a SELECT query and applies them as parameters to an INSERT query. The source and destination tables must already exist.

from my_databases import POSTGRESDB, ORACLEDB
from etlhelper import copy_rows

select_sql = "SELECT id, name FROM src"
insert_sql = """INSERT INTO dest (id, name)
                VALUES (%s, %s)"""

src_conn = ORACLEDB.connect("ORA_PASSWORD")
dest_conn = POSTGRESDB.connect("PG_PASSWORD")

copy_rows(select_sql, src_conn, insert_sql, dest_conn)

parameters can be passed to the SELECT query as before and the commit_chunks flag can be set.
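
For example (a sketch, assuming copy_rows forwards parameters to the SELECT query and accepts the commit_chunks keyword):

select_sql = "SELECT id, name FROM src WHERE id > :min_id"

copy_rows(select_sql, src_conn, insert_sql, dest_conn,
          parameters={'min_id': 0}, commit_chunks=False)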

Transform

Data can be transformed in-flight by applying a transform function. This is any Python callable (e.g. function) that takes an iterator (e.g. list) and returns another iterator. Transform functions are applied to data as they are read from the database and can be used with get_rows-type methods and with copy_rows.

The following code demonstrates that the returned chunk can have a different number of rows, and rows of a different length, than the input. When used with copy_rows, the INSERT query must contain the correct placeholders for the transform result. Extra data can result from a calculation, a call to a webservice or a query against another database.

import random

def my_transform(chunk):
    # Append a random integer (0-9); drop rows where it is less than 5

    new_chunk = []
    for row in chunk:  # each row is a namedtuple
        extra_value = random.randrange(10)
        if extra_value >= 5:
            new_chunk.append((*row, extra_value))

    return new_chunk

copy_rows(select_sql, src_conn, insert_sql, dest_conn,
          transform=my_transform)

It can be easier to modify individual columns when using the dict_row_factory (see above).

from etlhelper.row_factories import dict_row_factory

def my_transform(chunk):
    # Add prefix to id, remove newlines, set lower case email addresses

    new_chunk = []
    for row in chunk:  # each row is a dictionary
        row['id'] += 1000
        row['description'] = row['description'].replace('\n', ' ')
        row['email'] = row['email'].lower()
        new_chunk.append(row)

    return new_chunk

get_rows(select_sql, src_conn, row_factory=dict_row_factory,
         transform=my_transform)

The iter_chunks and iter_rows functions that are used internally return generators. Each chunk or row of data is only accessed when it is required. The transform function can also be written to return a generator instead of a list. Data transformation can then be performed via memory-efficient iterator-chains.
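
For example, the first transform above could be written as a generator:

import random

def my_generator_transform(chunk):
    # Yield modified rows one at a time instead of building a list in memory
    for row in chunk:  # each row is a namedtuple
        extra_value = random.randrange(10)
        if extra_value >= 5:
            yield (*row, extra_value)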

Recipes

The following recipes demonstrate how etlhelper can be used.

Debug SQL and monitor progress with logging

ETL Helper provides a custom logging handler. Time-stamped messages indicating the number of rows processed can be enabled by setting the log level to INFO. Setting the level to DEBUG provides information on the query that was run, example data and the database connection.

import logging
from etlhelper import logger

logger.setLevel(logging.INFO)

Output from a call to copy_rows will look like:

2019-10-07 15:06:22,411 iter_chunks: Fetching rows
2019-10-07 15:06:22,413 executemany: 1 rows processed
2019-10-07 15:06:22,416 executemany: 2 rows processed
2019-10-07 15:06:22,419 executemany: 3 rows processed
2019-10-07 15:06:22,420 iter_chunks: 3 rows returned
2019-10-07 15:06:22,420 executemany: 3 rows processed in total

Note: errors on database connections output messages that include login credentials in clear text.

Database to database copy ETL script template

The following is a template for an ETL script. It copies all sensor readings from the previous day from an Oracle source to a PostgreSQL destination.

# copy_readings.py

import datetime as dt
from textwrap import dedent

from etlhelper import copy_rows, execute
from my_databases import ORACLEDB, POSTGRESDB

CREATE_SQL = dedent("""
    CREATE TABLE IF NOT EXISTS sensordata.readings
    (
      sensor_data_id bigint PRIMARY KEY,
      measure_id bigint,
      time_stamp timestamp without time zone,
      meas_value double precision
    )
    """).strip()

DELETE_SQL = dedent("""
    DELETE FROM sensordata.readings
    WHERE time_stamp BETWEEN %(startdate)s AND %(enddate)s
    """).strip()

SELECT_SQL = dedent("""
    SELECT id, measure_id, time_stamp, reading
    FROM sensor_data
    WHERE time_stamp BETWEEN :startdate AND :enddate
    ORDER BY time_stamp
    """).strip()

INSERT_SQL = dedent("""
    INSERT INTO sensordata.readings (sensor_data_id, measure_id, time_stamp,
      meas_value)
    VALUES (%s, %s, %s, %s)
    """).strip()


def copy_readings(startdate, enddate):
    params = {'startdate': startdate, 'enddate': enddate}

    with ORACLEDB.connect("ORA_PASSWORD") as src_conn:
        with POSTGRESDB.connect("PG_PASSWORD") as dest_conn:
            execute(CREATE_SQL, dest_conn)
            execute(DELETE_SQL, dest_conn, parameters=params)
            copy_rows(SELECT_SQL, src_conn,
                      INSERT_SQL, dest_conn,
                      parameters=params)


if __name__ == "__main__":
    # Copy data from 00:00:00 yesterday to 00:00:00 today
    today = dt.datetime.combine(dt.date.today(), dt.time.min)
    yesterday = today - dt.timedelta(1)

    copy_readings(yesterday, today)

It is valuable to create idempotent scripts to ensure that they can be rerun without problems. In this example, the "CREATE TABLE IF NOT EXISTS" command can be called repeatedly. The DELETE_SQL command clears existing data prior to insertion to prevent duplicate key errors. SQL syntax such as "INSERT OR UPDATE", "UPSERT" or "INSERT ... ON CONFLICT" may be more efficient, but the exact commands depend on the target database type.
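
For example, on PostgreSQL an "INSERT ... ON CONFLICT" statement could replace the separate DELETE and INSERT steps (a sketch; the syntax differs on other databases):

UPSERT_SQL = dedent("""
    INSERT INTO sensordata.readings (sensor_data_id, measure_id, time_stamp,
      meas_value)
    VALUES (%s, %s, %s, %s)
    ON CONFLICT (sensor_data_id) DO UPDATE SET
      measure_id = EXCLUDED.measure_id,
      time_stamp = EXCLUDED.time_stamp,
      meas_value = EXCLUDED.meas_value
    """).strip()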

Calling ETL Helper scripts from Apache Airflow

The following is an Apache Airflow DAG that uses the copy_readings function defined in the script above. The Airflow scheduler will create tasks for each day since 1 August 2019 and call copy_readings with the appropriate start and end times.

# readings_dag.py

import datetime as dt
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import copy_readings


def copy_readings_with_args(**kwargs):
    # Set arguments for copy_readings from context
    start = kwargs.get('prev_execution_date')
    end = kwargs.get('execution_date')
    copy_readings.copy_readings(start, end)

dag = DAG('readings',
          schedule_interval=dt.timedelta(days=1),
          start_date=dt.datetime(2019, 8, 1),
          catchup=True)

t1 = PythonOperator(
    task_id='copy_readings',
    python_callable=copy_readings_with_args,
    provide_context=True,
    dag=dag)

Spatial ETL

No specific drivers are required for spatial data if they are transferred as Well Known Text.

select_sql_oracle = """
    SELECT
      id,
      SDO_UTIL.TO_WKTGEOMETRY(geom)
    FROM src
    """

insert_sql_postgis = """
    INSERT INTO dest (id, geom) VALUES (
      %s,
      ST_Transform(ST_GeomFromText(%s), 27700)
    )
    """

Other spatial operations e.g. coordinate transforms, intersections and buffering can be carried out in the SQL. Transform functions can manipulate geometries using the Shapely library.
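
For example, a sketch of a transform function that buffers each geometry with Shapely before insertion (assuming the dict_row_factory and a 'geom' column holding WKT):

from shapely import wkt

def buffer_geoms(chunk):
    # Parse each WKT geometry, buffer it by 100 units, and return it as WKT
    new_chunk = []
    for row in chunk:  # each row is a dictionary
        geom = wkt.loads(row['geom'])
        row['geom'] = geom.buffer(100).wkt
        new_chunk.append(row)
    return new_chunk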

Database to API / NoSQL copy ETL script template

etlhelper can be combined with Python's Requests library to create an ETL for posting data from a database into an HTTP API. The API could be a NoSQL document store (e.g. ElasticSearch, Cassandra) or some other web service.

This example transfers data from Oracle to ElasticSearch. It uses iter_rows to fetch data from the database without loading it all into memory at once. A custom transform function creates a nested dictionary structure from each row of data. This is "dumped" into JSON and posted to the API via Requests.

# copy_samples.py

import datetime as dt
import json
import logging

import requests

from etlhelper import iter_rows
from etlhelper import logger as etl_logger

from db import ORACLE_DB

logger = logging.getLogger("copy_samples")

SELECT_SAMPLES = """
    SELECT CODE, DESCRIPTION
    FROM samples
    WHERE date_updated BETWEEN :startdate AND :enddate
    ORDER BY date_updated
    """
BASE_URL = "http://localhost:9200/"
HEADERS = {'Content-Type': 'application/json'}


def copy_samples(startdate, enddate):
    """Read samples from Oracle and post to REST API."""
    logger.info("Copying samples with timestamps from %s to %s",
                startdate.isoformat(), enddate.isoformat())

    row_count = 0
    with ORACLE_DB.connect('ORACLE_PASSWORD') as conn:
        # Iterate over rows in memory-safe way.  Transform function converts
        # rows to nested dictionaries suitable for json.dumps().
        for item in iter_rows(SELECT_SAMPLES, conn,
                              parameters={"startdate": startdate,
                                          "enddate": enddate},
                              transform=transform_samples):

            # Post data to API
            logger.debug(item)
            response = requests.post(BASE_URL + 'samples/_doc', headers=HEADERS,
                                     data=json.dumps(item))

            # Check for failed rows
            try:
                response.raise_for_status()
                logger.debug("<%s>: %s\n", response.status_code, response.text)
            except requests.HTTPError:
                logger.error(response.json())

            # Log message for each 5000 rows processed
            row_count += 1
            if row_count % 5000 == 0:
                logger.info("%s items transferred", row_count)

    logger.info("Transfer complete")


def transform_samples(chunk):
    """Transform rows to dictionaries suitable for posting to API"""
    new_chunk = []
    for row in chunk:
        new_row = {
            'sample_code': row.CODE,
            'description': row.DESCRIPTION,
            'metadata': {
                'source': 'ORACLE_DB',  # fixed value
                'transferred_at': dt.datetime.now().isoformat()  # dynamic value
                }
            }
        logger.debug(new_row)
        new_chunk.append(new_row)
    return new_chunk


if __name__ == "__main__":
    # Configure logging
    handler = logging.StreamHandler()
    formatter = logging.Formatter('%(asctime)s %(levelname)-8s %(message)s')
    handler.setFormatter(formatter)
    logger.setLevel(logging.DEBUG)
    logger.addHandler(handler)
    logger.propagate = False
    etl_logger.setLevel(logging.INFO)

    # Copy data from 00:00:00 yesterday to 00:00:00 today
    today = dt.datetime.combine(dt.date.today(), dt.time.min)
    yesterday = today - dt.timedelta(1)

    copy_samples(yesterday, today)

In this example, failed rows are logged and the process continues. To fail the whole job, simply re-raise the HTTPError after logging it. This example posts one record at a time. If the API supports bulk uploads, it may be faster to use iter_chunks and post records in batches.
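
A sketch of batch posting with iter_chunks, assuming it accepts the same arguments as iter_rows and that the target API exposes a hypothetical bulk endpoint:

from etlhelper import iter_chunks

def copy_samples_in_batches(startdate, enddate):
    """Post samples to a bulk endpoint, one chunk at a time."""
    with ORACLE_DB.connect('ORACLE_PASSWORD') as conn:
        for chunk in iter_chunks(SELECT_SAMPLES, conn,
                                 parameters={"startdate": startdate,
                                             "enddate": enddate},
                                 transform=transform_samples):
            # chunk is a list of dictionaries ready for serialisation
            response = requests.post(BASE_URL + 'samples/bulk', headers=HEADERS,
                                     data=json.dumps(chunk))
            response.raise_for_status()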

Export data to CSV

The Pandas library can connect to databases via SQLAlchemy. It has powerful tools for manipulating tabular data. ETL Helper makes it easy to prepare the SQLAlchemy connection.

import pandas as pd
from sqlalchemy import create_engine

from my_databases import ORACLEDB

engine = create_engine(ORACLEDB.get_sqlalchemy_connection_string("ORACLE_PASSWORD"))

sql = "SELECT * FROM my_table"
df = pd.read_sql(sql, engine)
df.to_csv('my_data.csv', header=True, index=False, float_format='%.3f')

Development

Maintainers

ETL Helper was created by, and is maintained by, British Geological Survey Informatics.

Development status

The code is still under active development and breaking changes are possible. Users should pin the version in their dependency lists and watch the repository for new releases. See CONTRIBUTING.md for details on how to contribute.

Licence

ETL Helper is distributed under the LGPL v3.0 licence. Copyright: © BGS / UKRI 2019
