
pg-bulk-ingest

A Python utility function for ingesting data into a SQLAlchemy-defined PostgreSQL table, automatically migrating it as needed, while allowing concurrent reads as much as possible.

Allowing concurrent writes is not an aim of this function. This is designed for cases where PostgreSQL is used as a data warehouse, and the only writes to the table are from pg-bulk-ingest. It is assumed that there is only one pg-bulk-ingest running against a given table at any one time.

Installation

pg-bulk-ingest can be installed from PyPI using pip. psycopg2 or psycopg (Psycopg 3) must also be explicitly installed.

pip install pg-bulk-ingest psycopg
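
Or, to use psycopg2 instead:

pip install pg-bulk-ingest psycopg2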

Usage

Data ingest into a table is done through the function ingest. This function:

  • creates the table if necessary
  • migrates any existing table if necessary, minimising locking
  • inserts the incoming data into the table in batches, where each batch is ingested in its own transaction
  • if the table has a primary key, performs an "upsert", matching on this primary key
  • handles "high-watermarking" to carry on from where a previous ingest finished or errored
  • optionally deletes all existing rows before ingestion
  • optionally calls a callback just before each batch is visible to other database clients

Full example:

import sqlalchemy as sa
from pg_bulk_ingest import HighWatermark, Visibility, Delete, ingest

# Running PostgreSQL locally should allow the below to run
# docker run --rm -it -e POSTGRES_HOST_AUTH_METHOD=trust -p 5432:5432 postgres
engine = sa.create_engine('postgresql+psycopg://postgres@127.0.0.1:5432/')

# A SQLAlchemy Metadata of a single table definition
metadata = sa.MetaData()
my_table = sa.Table(
    "my_table",
    metadata,
    sa.Column("id", sa.INTEGER, primary_key=True),
    sa.Column("value", sa.VARCHAR(16), nullable=False),
    schema="my_schema",
)

# A function that yields batches of data, where each batch is a tuple of
# (high watermark, batch metadata, data rows).
# The batches must all be strictly _after_ the high watermark passed into the function.
# Each high watermark must be JSON-encodable.
# Each row must have its SQLAlchemy table associated with it.
def batches(high_watermark):
    if high_watermark is None or high_watermark < '2015-01-01':
        yield '2015-01-01', 'Any batch metadata', (
            (my_table, (3, 'a')),
            (my_table, (4, 'b')),
            (my_table, (5, 'c')),
        )
    if high_watermark is None or high_watermark < '2015-01-02':
        yield '2015-01-02', 'Any other batch metadata', (
            (my_table, (6, 'd')),
            (my_table, (7, 'e')),
            (my_table, (8, 'f')),
        )

def on_before_visible(conn, batch_metadata):
    # Can perform validation or update metadata table(s) just before data
    # becomes visible to other database clients
    # conn: a SQLAlchemy connection in the same transaction as this batch
    # batch_metadata: the metadata for the most recent batch from the batches
    # function
    pass

with engine.connect() as conn:
    ingest(
        conn, metadata, batches,
        on_before_visible=on_before_visible,
        high_watermark=HighWatermark.LATEST,     # Carry on from where left off
        visibility=Visibility.AFTER_EACH_BATCH,  # Changes are visible after each batch
        delete=Delete.OFF,                       # Don't delete any existing rows
    )

API

The API is a single function ingest, together with classes of string constants: HighWatermark, Visibility, and Delete. The constants are known strings rather than opaque identifiers to allow the strings to be easily passed from dynamic/non-Python environments.
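
Since the constants are plain strings, string literals can be passed in their place. For example, assuming conn, metadata, and batches as defined in Usage above, the following two calls are equivalent:

ingest(conn, metadata, batches, high_watermark=HighWatermark.LATEST)
ingest(conn, metadata, batches, high_watermark='__LATEST__')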


ingest(conn, metadata, batches, on_before_visible=lambda conn, latest_batch_metadata: None, high_watermark=HighWatermark.LATEST, visibility=Visibility.AFTER_EACH_BATCH, delete=Delete.OFF)

Ingests data into a table

  • conn - A SQLAlchemy connection not in a transaction, i.e. one started by engine.connect() rather than engine.begin().

  • metadata - A SQLAlchemy metadata of a single table.

  • batches - A function that takes a high watermark, returning an iterable that yields data batches that are strictly after this high watermark. See Usage above for an example.

  • on_before_visible (optional) - A function that takes a SQLAlchemy connection in a transaction and batch metadata, called just before data becomes visible to other database clients. See Usage above for an example.

  • high_watermark (optional) - A member of the HighWatermark class, or a JSON-encodable value.

    If this is HighWatermark.LATEST, then the most recent high watermark that has been returned from a previous ingest's batches function, whose corresponding batch has been successfully ingested, is passed into the batches function. If there has been no previous ingest, None will be passed.

    If this is a JSON-encodable value other than HighWatermark.LATEST, then this value is passed directly to the batches function. This can be used to override any previous high watermark (see the sketch after this list). Existing data in the target table is not deleted unless specified by the delete parameter.

  • visibility (optional) - A member of the Visibility class, controlling when ingests become visible to other clients.

  • delete (optional) - A member of the Delete class, controlling whether existing rows are deleted.
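
For example, to ignore any stored high watermark and re-ingest from a fixed point, clearing existing rows first — a minimal sketch that assumes the engine, metadata, and batches from Usage above:

with engine.connect() as conn:
    ingest(
        conn, metadata, batches,
        high_watermark='2015-01-01',  # Passed directly to the batches function
        delete=Delete.ALL,            # Delete all existing rows before ingesting
    )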


HighWatermark

A class of constants to indicate to the ingest function how it should use any previously stored high watermark. Its single member is:

  • LATEST - use the most recent high watermark, passing it to the batches function. This is the string __LATEST__.

Visibility

A class of constants to indicate when data changes are visible to other database clients. Schema changes become visible before the first batch.

  • AFTER_EACH_BATCH - data changes are visible to other database clients after each batch. This is the string __AFTER_EACH_BATCH__.

Delete

A class of constants that controls how existing data in the table is deleted.

  • OFF

    There is no deleting of existing data. This is the string __OFF__.

  • ALL

    All existing data in the table is deleted. This is the string __ALL__.

Data types

The SQLAlchemy "CamelCase" data types are not supported in table definitions. Instead, you must use the "UPPERCASE" data types. These are non-abstracted database-level types, which is required to support automatic migrations: the real database type is needed to compare the live table with the definition passed into the ingest function.

Also not supported is the sqlalchemy.JSON type. Instead use sa.dialects.postgresql.JSON or sa.dialects.postgresql.JSONB.
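
For example, a table definition using only database-level types — a minimal sketch, where the column names are illustrative:

from sqlalchemy.dialects.postgresql import JSONB

sa.Table(
    "my_table",
    metadata,
    sa.Column("id", sa.INTEGER, primary_key=True),  # sa.INTEGER, not sa.Integer
    sa.Column("data", JSONB, nullable=False),       # JSONB, not sa.JSON
    schema="my_schema",
)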

Indexes

Indexes can be added by either of two mechanisms:

  1. Setting index=True on a column.

    sa.Table(
        "my_table",
        metadata,
        sa.Column("id", sa.INTEGER, primary_key=True),
        sa.Column("value", sa.VARCHAR(16), nullable=False, index=True),
        schema="my_schema",
    )
    
  2. Passing sqlalchemy.Index objects after the column list when defining the table. The name of each index should be None, which allows SQLAlchemy to give it a name unlikely to conflict with other indexes.

    sa.Table(
        "my_table",
        metadata,
        sa.Column("id", sa.INTEGER, primary_key=True),
        sa.Column("value", sa.VARCHAR(16), nullable=False),
        sa.Index(None, "value"),
        schema="my_schema",
    )
    

Under the hood

  • Ingestion is done exclusively with COPY FROM.
  • Ingestion is transactional: each batch is ingested completely or not at all.
  • The table is migrated to match the definition, using techniques to avoid exclusively locking the table to allow parallel SELECT queries.
  • If the table has a primary key, then an "upsert" is performed. Data is ingested into an intermediate table, and an INSERT ... ON CONFLICT(...) DO UPDATE is performed to copy rows from this intermediate table to the existing table. This doesn't involve an ACCESS EXCLUSIVE lock on the live table, so SELECTs can continue in parallel.
  • Migrations usually require an ACCESS EXCLUSIVE lock on the live table. However, if there is no known technique for a migration without a long-running lock, then an intermediate table is created matching the required definition, existing data is copied into this table, and it replaces the live table before the first batch. This replacement requires an ACCESS EXCLUSIVE lock, but only for a short time. Backends that hold locks that block migrations are forcibly terminated after a delay.
  • The high watermark is stored on the table as a COMMENT, JSON-encoded. For example if the most recent high watermark is the string 2014-07-31, then the comment would be {"pg-bulk-ingest": {"high-watermark": "2014-07-31"}}.
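
As an illustration, the stored high watermark can be inspected directly using PostgreSQL's obj_description function — a minimal sketch assuming the engine and the my_schema.my_table table from Usage above:

import json

with engine.connect() as conn:
    comment = conn.execute(sa.text(
        "SELECT obj_description('my_schema.my_table'::regclass, 'pg_class')"
    )).scalar()
    # The comment is JSON of the form {"pg-bulk-ingest": {"high-watermark": ...}}
    print(json.loads(comment)['pg-bulk-ingest']['high-watermark'])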

Compatibility

  • Python >= 3.7.1 (tested on 3.7.1, 3.8.0, 3.9.0, 3.10.0, and 3.11.0)
  • psycopg2 >= 2.9.2 or Psycopg 3 >= 3.1.4
  • SQLAlchemy >= 1.4.24 (tested on 1.4.24 and 2.0.0)
  • PostgreSQL >= 9.6 (tested on 9.6, 10.0, 11.0, 12.0, 13.0, 14.0, and 15.0)

Note that SQLAlchemy < 2 does not support Psycopg 3, and for SQLAlchemy < 2 future=True must be passed to create_engine.
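
For example, creating an engine under SQLAlchemy 1.4 with psycopg2:

engine = sa.create_engine('postgresql+psycopg2://postgres@127.0.0.1:5432/', future=True)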
