
PostgreSQL CDC library using pgoutput and Python


pypgoutput

A Python package that reads and parses PostgreSQL logical decoding messages and converts them into change data capture (CDC) messages. It is built on psycopg2's logical replication support objects, PostgreSQL's pgoutput plugin, and Pydantic.

Requires Python >= 3.8.

Currently developed against PostgreSQL 12.

Installation

$ pip install pypgoutput

How it works

  • Replication messages are consumed through psycopg2's replication connection (https://www.psycopg.org/docs/extras.html#replication-support-objects).
  • The binary messages produced by pgoutput logical decoding are parsed in the decoders.py module.
  • Parsed messages are converted to change events and yielded from the LogicalReplicationReader.
  • Change events are nested Pydantic models. The schema of the tuple data (before/after) is generated dynamically for the table being processed.
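To give a feel for what the parsing step involves, here is a minimal standalone sketch (not the package's decoders.py) that decodes a pgoutput Begin message. It follows the layout in PostgreSQL's logical replication message format documentation: a 'B' type byte, the final LSN of the transaction, the commit timestamp in microseconds since 2000-01-01, and the transaction ID, all big-endian. The names Begin, decode_begin and BEGIN_FORMAT are hypothetical.

```python
import struct
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone

# PostgreSQL timestamps count microseconds from 2000-01-01 00:00:00 UTC.
PG_EPOCH = datetime(2000, 1, 1, tzinfo=timezone.utc)

# Begin message: Byte1('B'), Int64 final LSN, Int64 commit timestamp, Int32 xid.
# '>' selects big-endian (network byte order) and disables alignment padding,
# so the message is exactly 1 + 8 + 8 + 4 = 21 bytes.
BEGIN_FORMAT = struct.Struct(">cQqI")


@dataclass
class Begin:
    final_lsn: int
    commit_ts: datetime
    xid: int


def decode_begin(payload: bytes) -> Begin:
    """Decode a pgoutput Begin ('B') message from its raw bytes.

    Raises struct.error if the payload is not exactly 21 bytes long.
    """
    msg_type, final_lsn, commit_us, xid = BEGIN_FORMAT.unpack(payload)
    if msg_type != b"B":
        raise ValueError(f"expected a Begin message, got {msg_type!r}")
    commit_ts = PG_EPOCH + timedelta(microseconds=commit_us)
    return Begin(final_lsn=final_lsn, commit_ts=commit_ts, xid=xid)
```

The other message types (Relation, Insert, Update, Delete, Commit and so on) are decoded the same way, by reading fixed-width big-endian fields and null-terminated strings in the documented order.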

Example

First, set up a publication and a logical replication slot in the source database. Logical decoding requires the server setting wal_level = logical.

CREATE PUBLICATION test_pub FOR ALL TABLES;
SELECT * FROM pg_create_logical_replication_slot('test_slot', 'pgoutput');
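pg_create_logical_replication_slot returns the slot name and the LSN at which the slot was created, as a pg_lsn value in the text form X/Y (the upper and lower 32 bits of the position, in hexadecimal). The change events shown further down report lsn as a plain integer, so a small conversion helper is handy when comparing the two. This is a sketch with hypothetical function names; it is not part of pypgoutput.

```python
def lsn_to_int(lsn: str) -> int:
    """Convert a pg_lsn text value such as '0/1670DA0' to an integer."""
    high, low = lsn.split("/")
    # The part before '/' holds the upper 32 bits, the part after the lower 32 bits.
    return (int(high, 16) << 32) | int(low, 16)


def int_to_lsn(value: int) -> str:
    """Convert an integer LSN back to PostgreSQL's 'X/Y' hexadecimal text form."""
    return f"{value >> 32:X}/{value & 0xFFFFFFFF:X}"
```

For example, the integer lsn 23530912 in the first event below corresponds to 0/1670DA0.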

Second, run the script to collect the changes:

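import os

import pypgoutput

# Connection settings are read from the standard libpq environment variables.
HOST = os.environ.get("PGHOST")
PORT = os.environ.get("PGPORT")
DATABASE_NAME = os.environ.get("PGDATABASE")
USER = os.environ.get("PGUSER")
PASSWORD = os.environ.get("PGPASSWORD")

cdc_reader = pypgoutput.LogicalReplicationReader(
    publication_name="test_pub",
    slot_name="test_slot",
    host=HOST,
    database=DATABASE_NAME,
    port=PORT,
    user=USER,
    password=PASSWORD,
)
try:
    # Blocks waiting for changes and yields one change event per modified row.
    for message in cdc_reader:
        print(message.json(indent=2))
finally:
    # The loop keeps running until interrupted (e.g. Ctrl+C), so stop the reader on exit.
    cdc_reader.stop()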

Then, from another session, generate some change messages:

CREATE TABLE public.readme (id integer primary key, created_at timestamptz default now());

INSERT INTO public.readme (id) SELECT data FROM generate_series(1, 3) AS data;

Output:

{
  "op": "I",
  "message_id": "4606b12b-ab41-41e6-9717-7ce92f8a9857",
  "lsn": 23530912,
  "transaction": {
    "tx_id": 499,
    "begin_lsn": 23531416,
    "commit_ts": "2022-01-14T17:22:10.298334+00:00"
  },
  "table_schema": {
    "column_definitions": [
      {
        "name": "id",
        "part_of_pkey": true,
        "type_id": 23,
        "type_name": "integer",
        "optional": false
      },
      {
        "name": "created_at",
        "part_of_pkey": false,
        "type_id": 1184,
        "type_name": "timestamp with time zone",
        "optional": true
      }
    ],
    "db": "test_db",
    "schema_name": "public",
    "table": "readme",
    "relation_id": 16403
  },
  "before": null,
  "after": {
    "id": 1,
    "created_at": "2022-01-14T17:22:10.296740+00:00"
  }
}
{
  "op": "I",
  "message_id": "1ede0643-42b6-4bb1-8a98-8e4ca10c7915",
  "lsn": 23531144,
  "transaction": {
    "tx_id": 499,
    "begin_lsn": 23531416,
    "commit_ts": "2022-01-14T17:22:10.298334+00:00"
  },
  "table_schema": {
    "column_definitions": [
      {
        "name": "id",
        "part_of_pkey": true,
        "type_id": 23,
        "type_name": "integer",
        "optional": false
      },
      {
        "name": "created_at",
        "part_of_pkey": false,
        "type_id": 1184,
        "type_name": "timestamp with time zone",
        "optional": true
      }
    ],
    "db": "test_db",
    "schema_name": "public",
    "table": "readme",
    "relation_id": 16403
  },
  "before": null,
  "after": {
    "id": 2,
    "created_at": "2022-01-14T17:22:10.296740+00:00"
  }
}
{
  "op": "I",
  "message_id": "fb477de5-8281-4102-96ee-649a838d38f2",
  "lsn": 23531280,
  "transaction": {
    "tx_id": 499,
    "begin_lsn": 23531416,
    "commit_ts": "2022-01-14T17:22:10.298334+00:00"
  },
  "table_schema": {
    "column_definitions": [
      {
        "name": "id",
        "part_of_pkey": true,
        "type_id": 23,
        "type_name": "integer",
        "optional": false
      },
      {
        "name": "created_at",
        "part_of_pkey": false,
        "type_id": 1184,
        "type_name": "timestamp with time zone",
        "optional": true
      }
    ],
    "db": "test_db",
    "schema_name": "public",
    "table": "readme",
    "relation_id": 16403
  },
  "before": null,
  "after": {
    "id": 3,
    "created_at": "2022-01-14T17:22:10.296740+00:00"
  }
}
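Because every event carries its table's column definitions, including part_of_pkey, a consumer can derive a row key without querying the catalog. The sketch below is a hypothetical helper, not part of pypgoutput: it takes the JSON string produced by message.json() and returns the primary-key values, reading the "after" image and falling back to "before" when "after" is null. That "before" holds the key columns for deletes is an assumption.

```python
import json


def primary_key(event_json: str) -> dict:
    """Return {column: value} for the primary-key columns of a change event.

    Reads the "after" row image and falls back to "before" when "after"
    is null (assumed to be the case for deletes).
    """
    event = json.loads(event_json)
    row = event["after"] if event["after"] is not None else event["before"]
    if row is None:
        return {}
    # Column definitions flag which columns belong to the primary key.
    pkey_columns = [
        col["name"]
        for col in event["table_schema"]["column_definitions"]
        if col["part_of_pkey"]
    ]
    return {name: row[name] for name in pkey_columns}
```

Inside the reader loop this could be called as primary_key(message.json()); for the second event above it returns {"id": 2}.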

Why use this package?

  • It uses pgoutput, the logical decoding plugin built into PostgreSQL (10 and later). Third-party output plugins may be unavailable on managed database services such as Amazon RDS, or may not be upgradable to a newer version there.
  • pgoutput includes useful metadata such as the relation ID (relid) of each table.

Download files


Source Distribution

pypgoutput-0.0.3.tar.gz (14.3 kB)

Built Distribution

pypgoutput-0.0.3-py3-none-any.whl (13.5 kB)

File details

Details for the file pypgoutput-0.0.3.tar.gz.

File metadata

  • Download URL: pypgoutput-0.0.3.tar.gz
  • Size: 14.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.8.0 pkginfo/1.8.2 readme-renderer/32.0 requests/2.22.0 requests-toolbelt/0.9.1 urllib3/1.26.8 tqdm/4.62.3 importlib-metadata/4.11.1 keyring/18.0.1 rfc3986/2.0.0 colorama/0.4.3 CPython/3.8.10

File hashes

Hashes for pypgoutput-0.0.3.tar.gz
Algorithm Hash digest
SHA256 0866a11ee4938a234bdac58624646760109348614b436612978b3496aae4fddb
MD5 099061b5dcea6320338df49f92ae454e
BLAKE2b-256 10d931efb332fa254a703a9eaf50054af70897834a75a0fd3f40d7e74373215a

File details

Details for the file pypgoutput-0.0.3-py3-none-any.whl.

File metadata

  • Download URL: pypgoutput-0.0.3-py3-none-any.whl
  • Size: 13.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.8.0 pkginfo/1.8.2 readme-renderer/32.0 requests/2.22.0 requests-toolbelt/0.9.1 urllib3/1.26.8 tqdm/4.62.3 importlib-metadata/4.11.1 keyring/18.0.1 rfc3986/2.0.0 colorama/0.4.3 CPython/3.8.10

File hashes

Hashes for pypgoutput-0.0.3-py3-none-any.whl
Algorithm Hash digest
SHA256 8790f83cc15f4e52e9df9fea6a42cfb86d9839ac5c93b16587bd0950873904e0
MD5 28a246dd3fff4526564a22301c6c2afe
BLAKE2b-256 7ae8755a5b68b4b9bd36d962735a5291bddfb3d417e48b3e73dc950c6946e924
