PostgreSQL CDC library using pgoutput and Python

pypgoutput

Python package to read, parse and convert PostgreSQL logical decoding messages to change data capture messages. Built using psycopg2's logical replication support objects and PostgreSQL's pgoutput plugin.

Requires Python >= 3.8.

Example

First, set up a publication and a replication slot in the database:

CREATE PUBLICATION test_pub FOR ALL TABLES;
SELECT * FROM pg_create_logical_replication_slot('test_slot', 'pgoutput');

Second, run the script to collect the changes:

import os
import pypgoutput

# Connection settings are read from the standard libpq environment variables.
HOST = os.environ.get("PGHOST")
PORT = os.environ.get("PGPORT")
DATABASE_NAME = os.environ.get("PGDATABASE")
USER = os.environ.get("PGUSER")
PASSWORD = os.environ.get("PGPASSWORD")

cdc_reader = pypgoutput.LogicalReplicationReader(
    publication_name="test_pub",
    slot_name="test_slot",
    host=HOST,
    database=DATABASE_NAME,
    port=PORT,
    user=USER,
    password=PASSWORD,
)
try:
    # Iterating yields one change message at a time.
    for message in cdc_reader:
        print(message.json(indent=2))
finally:
    # Stop the reader even if the loop is interrupted.
    cdc_reader.stop()
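
A script that should exit after a fixed number of messages needs its own stopping rule, since the loop above has none. The following is a minimal sketch that relies only on what the example shows: the reader can be iterated and has a stop() method. The names read_n and FakeReader are hypothetical and are not part of pypgoutput; the fake reader stands in for a real LogicalReplicationReader so the snippet runs without a database.

```python
import itertools


def read_n(reader, limit):
    """Collect at most `limit` messages, then always stop the reader."""
    try:
        # islice stops pulling from the reader once `limit` items were taken.
        return list(itertools.islice(reader, limit))
    finally:
        reader.stop()


class FakeReader:
    """Hypothetical stand-in for LogicalReplicationReader (no database needed)."""

    def __init__(self):
        self.stopped = False
        self._counter = itertools.count(1)

    def __iter__(self):
        return self

    def __next__(self):
        # Produces an endless stream of dict-shaped "insert" messages.
        return {"op": "I", "after": {"id": next(self._counter)}}

    def stop(self):
        self.stopped = True


reader = FakeReader()
messages = read_n(reader, 3)
print([m["after"]["id"] for m in messages])  # prints [1, 2, 3]
```

With a real reader, read_n would presumably wait until `limit` changes have actually arrived, so it suits tests and one-off scripts more than long-running consumers.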

Third, generate some change messages:

CREATE TABLE public.readme (id integer primary key, created_at timestamptz default now());

INSERT INTO public.readme (id) SELECT data FROM generate_series(1, 3) AS data;

Output:

{
  "op": "I",
  "message_id": "4606b12b-ab41-41e6-9717-7ce92f8a9857",
  "lsn": 23530912,
  "transaction": {
    "tx_id": 499,
    "begin_lsn": 23531416,
    "commit_ts": "2022-01-14T17:22:10.298334+00:00"
  },
  "table_schema": {
    "column_definitions": [
      {
        "name": "id",
        "part_of_pkey": true,
        "type_id": 23,
        "type_name": "integer",
        "optional": false
      },
      {
        "name": "created_at",
        "part_of_pkey": false,
        "type_id": 1184,
        "type_name": "timestamp with time zone",
        "optional": true
      }
    ],
    "db": "test_db",
    "schema_name": "public",
    "table": "readme",
    "relation_id": 16403
  },
  "before": null,
  "after": {
    "id": 1,
    "created_at": "2022-01-14T17:22:10.296740+00:00"
  }
}
{
  "op": "I",
  "message_id": "1ede0643-42b6-4bb1-8a98-8e4ca10c7915",
  "lsn": 23531144,
  "transaction": {
    "tx_id": 499,
    "begin_lsn": 23531416,
    "commit_ts": "2022-01-14T17:22:10.298334+00:00"
  },
  "table_schema": {
    "column_definitions": [
      {
        "name": "id",
        "part_of_pkey": true,
        "type_id": 23,
        "type_name": "integer",
        "optional": false
      },
      {
        "name": "created_at",
        "part_of_pkey": false,
        "type_id": 1184,
        "type_name": "timestamp with time zone",
        "optional": true
      }
    ],
    "db": "test_db",
    "schema_name": "public",
    "table": "readme",
    "relation_id": 16403
  },
  "before": null,
  "after": {
    "id": 2,
    "created_at": "2022-01-14T17:22:10.296740+00:00"
  }
}
{
  "op": "I",
  "message_id": "fb477de5-8281-4102-96ee-649a838d38f2",
  "lsn": 23531280,
  "transaction": {
    "tx_id": 499,
    "begin_lsn": 23531416,
    "commit_ts": "2022-01-14T17:22:10.298334+00:00"
  },
  "table_schema": {
    "column_definitions": [
      {
        "name": "id",
        "part_of_pkey": true,
        "type_id": 23,
        "type_name": "integer",
        "optional": false
      },
      {
        "name": "created_at",
        "part_of_pkey": false,
        "type_id": 1184,
        "type_name": "timestamp with time zone",
        "optional": true
      }
    ],
    "db": "test_db",
    "schema_name": "public",
    "table": "readme",
    "relation_id": 16403
  },
  "before": null,
  "after": {
    "id": 3,
    "created_at": "2022-01-14T17:22:10.296740+00:00"
  }
}
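
One way to consume messages shaped like the output above is sketched below. It parses a trimmed copy of the first message, indexes the inserted row by its primary key (taken from the part_of_pkey flags), and renders the integer lsn in PostgreSQL's X/Y hexadecimal notation. The helpers format_lsn, primary_key and apply_insert are hypothetical names, not part of pypgoutput, and the sketch assumes that lsn is the usual 64-bit WAL position. Only the "I" operation shown in this README is handled.

```python
import json


def format_lsn(lsn):
    """Render an integer LSN in PostgreSQL's 'X/Y' hexadecimal text form."""
    return f"{lsn >> 32:X}/{lsn & 0xFFFFFFFF:X}"


def primary_key(message, row):
    """Build a tuple of primary-key values using the part_of_pkey flags."""
    columns = message["table_schema"]["column_definitions"]
    return tuple(row[c["name"]] for c in columns if c["part_of_pkey"])


def apply_insert(replica, message):
    """Store an inserted row in a dict shaped {(schema, table): {pkey: row}}."""
    if message["op"] != "I":
        # Other operation codes are not shown in this README, so they are
        # rejected here instead of being guessed at.
        raise ValueError(f"unsupported op: {message['op']!r}")
    schema = message["table_schema"]
    table = replica.setdefault((schema["schema_name"], schema["table"]), {})
    table[primary_key(message, message["after"])] = message["after"]


# Trimmed copy of the first message from the output above.
raw = """
{
  "op": "I",
  "lsn": 23530912,
  "table_schema": {
    "column_definitions": [
      {"name": "id", "part_of_pkey": true},
      {"name": "created_at", "part_of_pkey": false}
    ],
    "schema_name": "public",
    "table": "readme"
  },
  "before": null,
  "after": {"id": 1, "created_at": "2022-01-14T17:22:10.296740+00:00"}
}
"""

message = json.loads(raw)
replica = {}
apply_insert(replica, message)

print(format_lsn(message["lsn"]))  # prints 0/1670DA0
print(replica[("public", "readme")][(1,)]["created_at"])
```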

Why use this package?

  • It relies on the built-in pgoutput plugin. Other logical decoding plugins are often unavailable on managed database services such as RDS, or cannot be updated to a new version there.
  • The pgoutput plugin includes useful metadata such as the relation ID (relid) of each changed table.
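
To illustrate the second point: the relation ID is the table's OID, which stays the same when a table is renamed, so a consumer can key its bookkeeping on it instead of on the table name. The following is a minimal sketch, assuming messages have already been parsed into dicts with the field names shown in the output above. The helper count_by_relation and the renamed table in the sample data are hypothetical.

```python
from collections import Counter


def count_by_relation(messages):
    """Count messages per relation_id and keep the latest name seen for each."""
    counts = Counter()
    names = {}
    for message in messages:
        schema = message["table_schema"]
        relid = schema["relation_id"]
        counts[relid] += 1
        names[relid] = f'{schema["schema_name"]}.{schema["table"]}'
    return counts, names


def make_message(table, relation_id):
    """Build a minimal illustrative message with only the fields used here."""
    return {
        "op": "I",
        "table_schema": {
            "schema_name": "public",
            "table": table,
            "relation_id": relation_id,
        },
    }


# Illustrative data: the same relation appears under two names, as it would
# after a hypothetical ALTER TABLE ... RENAME.
messages = [
    make_message("readme", 16403),
    make_message("readme", 16403),
    make_message("readme_renamed", 16403),
]

counts, names = count_by_relation(messages)
print(counts[16403])  # prints 3
print(names[16403])   # prints public.readme_renamed
```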
