C8 Source for extracting data from PostgreSQL.

c8-source-postgres

License: MIT

Singer tap that extracts data from a PostgreSQL database and produces JSON-formatted data following the Singer spec.

How to use it

The recommended way to run this tap is with Macrometa data connectors. To run this Singer tap independently, read on.

Install and Run

First, make sure Python 3 is installed on your system, or follow the installation instructions for Mac or Ubuntu.

It's recommended to use a virtualenv:

  python3 -m venv venv
  . venv/bin/activate
  pip install c8-source-postgres

or

  make venv

Create a config.json

{
  "host": "localhost",
  "port": 5432,
  "user": "postgres",
  "password": "secret",
  "dbname": "db"
}

These are the same basic configuration properties used by the PostgreSQL command-line client (psql).

Full list of options in config.json:

| Property | Type | Required? | Description |
|---|---|---|---|
| host | String | Yes | PostgreSQL host |
| port | Integer | Yes | PostgreSQL port |
| user | String | Yes | PostgreSQL user |
| password | String | Yes | PostgreSQL password |
| dbname | String | Yes | PostgreSQL database name |
| filter_schemas | String | No | Comma-separated schema names. Only these schemas are scanned, which improves the performance of data extraction. (Default: None) |
| ssl | String | No | If set to "true", use SSL via the postgres sslmode require option. If the server does not accept SSL connections or the client certificate is not recognized, the connection will fail. (Default: None) |
| logical_poll_total_seconds | Integer | No | Stop running the tap when no data has been received from the WAL for this number of seconds. (Default: 10800) |
| break_at_end_lsn | Boolean | No | Stop running the tap if the newly received LSN is after the max LSN that was detected when the tap started. (Default: true) |
| max_run_seconds | Integer | No | Stop running the tap after this number of seconds. (Default: 43200) |
| debug_lsn | String | No | If set to "true", add the _sdc_lsn property to the Singer messages to debug the postgres LSN position in the WAL stream. (Default: None) |
| tap_id | String | No | ID of the pipeline/tap. (Default: None) |
| itersize | Integer | No | Size of the PG cursor iterator when doing INCREMENTAL or FULL_TABLE. (Default: 20000) |
| default_replication_method | String | No | Default replication method to use when none is provided in the catalog. (Values: LOG_BASED, INCREMENTAL or FULL_TABLE) (Default: None) |
| use_secondary | Boolean | No | Use a database replica for INCREMENTAL and FULL_TABLE replication. (Default: False) |
| secondary_host | String | No | PostgreSQL replica host (required if use_secondary is True) |
| secondary_port | Integer | No | PostgreSQL replica port (required if use_secondary is True) |
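
The rules in the options list can be checked before starting the tap. The following is a minimal sketch, not the tap's own validation: the function name check_config and the sample dictionary are hypothetical, and only the property names, the allowed replication methods and the use_secondary requirement come from the list above.

```python
REQUIRED = ("host", "port", "user", "password", "dbname")
REPLICATION_METHODS = ("LOG_BASED", "INCREMENTAL", "FULL_TABLE")


def check_config(config):
    """Return a list of problems found in a config dict (empty if none)."""
    problems = [
        f"missing required property: {key}" for key in REQUIRED if key not in config
    ]

    # secondary_host and secondary_port are only required with use_secondary.
    if config.get("use_secondary"):
        for key in ("secondary_host", "secondary_port"):
            if key not in config:
                problems.append(f"{key} is required when use_secondary is true")

    # The default replication method is optional but limited to three values.
    method = config.get("default_replication_method")
    if method is not None and method not in REPLICATION_METHODS:
        problems.append(f"unknown default_replication_method: {method}")
    return problems


# Hypothetical sample: the basic config plus use_secondary without a replica.
sample = {
    "host": "localhost",
    "port": 5432,
    "user": "postgres",
    "password": "secret",
    "dbname": "db",
    "use_secondary": True,
}
for problem in check_config(sample):
    print(problem)
```

An empty list means the dictionary satisfies the rules listed above; the tap itself may apply further checks.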

Run the tap in Discovery Mode

c8-source-postgres --config config.json --discover                # Should dump a Catalog to stdout
c8-source-postgres --config config.json --discover > catalog.json # Capture the Catalog

Add Metadata to the Catalog

Each entry under the Catalog's "streams" key will need the following metadata:

{
  "streams": [
    {
      "stream_name": "my_topic",
      "metadata": [{
        "breadcrumb": [],
        "metadata": {
          "selected": true,
          "replication-method": "LOG_BASED"
        }
      }]
    }
  ]
}

The replication method can be one of FULL_TABLE, INCREMENTAL or LOG_BASED.

Note: Log-based replication requires a few adjustments in the source PostgreSQL database; see the log-based replication requirements further down.
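
Editing a captured catalog by hand is error-prone, so the metadata can also be added with a short script. This is a sketch under stated assumptions: the helper name select_streams is hypothetical, and streams are matched on the "stream_name" key because that is what the example above shows; a catalog produced by discovery may identify streams by a different key.

```python
import copy

ALLOWED_METHODS = {"FULL_TABLE", "INCREMENTAL", "LOG_BASED"}


def select_streams(catalog, methods):
    """Return a copy of catalog with the given streams selected.

    methods maps a stream name to the replication method to set on it.
    """
    updated = copy.deepcopy(catalog)
    for stream in updated.get("streams", []):
        method = methods.get(stream.get("stream_name"))
        if method is None:
            continue  # stream not requested, leave it untouched
        if method not in ALLOWED_METHODS:
            raise ValueError(f"unknown replication method: {method}")
        settings = {"selected": True, "replication-method": method}
        # The stream-level entry is the one with an empty breadcrumb.
        for entry in stream.setdefault("metadata", []):
            if entry.get("breadcrumb") == []:
                entry.setdefault("metadata", {}).update(settings)
                break
        else:
            stream["metadata"].append({"breadcrumb": [], "metadata": settings})
    return updated


catalog = {"streams": [{"stream_name": "my_topic", "metadata": []}]}
print(select_streams(catalog, {"my_topic": "LOG_BASED"}))
```

The function works on a copy, so the catalog loaded from catalog.json stays unchanged until the result is written back.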

Run the tap in Sync Mode

c8-source-postgres --config config.json --catalog catalog.json

The tap will write bookmarks to stdout which can be captured and passed as an optional --state state.json parameter to the tap for the next sync.
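
One way to capture the bookmarks is to keep the value of the last STATE message the tap prints. The sketch below assumes the output follows the Singer spec, where each line is a JSON object and state messages have "type": "STATE" with the bookmarks under "value". The function name last_state and the sample lines are hypothetical.

```python
import json


def last_state(lines):
    """Return the value of the last STATE message in the tap output, or None."""
    state = None
    for line in lines:
        line = line.strip()
        if not line:
            continue
        try:
            message = json.loads(line)
        except json.JSONDecodeError:
            continue  # skip anything that is not a JSON message
        if isinstance(message, dict) and message.get("type") == "STATE":
            state = message.get("value")
    return state


# Hypothetical tap output; real bookmark contents depend on the tap.
output = [
    '{"type": "RECORD", "stream": "my_topic", "record": {"id": 1}}',
    '{"type": "STATE", "value": {"bookmarks": {"my_topic": {"version": 1}}}}',
]
print(json.dumps(last_state(output)))
```

Writing the returned value to state.json gives a file that can be passed with --state on the next run.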

Log Based replication requirements

  • A PostgreSQL database running version 9.4.x or greater. To avoid a critical PostgreSQL bug, use at least one of the following minor versions:

    • PostgreSQL 12.0
    • PostgreSQL 11.2
    • PostgreSQL 10.7
    • PostgreSQL 9.6.12
    • PostgreSQL 9.5.16
    • PostgreSQL 9.4.21
  • A connection to the master instance. Log-based replication will only work by connecting to the master instance.

  • wal2json plugin: To use Log Based for your PostgreSQL integration, you must install the wal2json plugin. The wal2json plugin outputs JSON objects for logical decoding, which the tap then uses to perform Log-based Replication. Steps for installing the plugin vary depending on your operating system. Instructions for each operating system type are in the wal2json GitHub repository.

  • postgres config file: Locate the database configuration file (usually postgresql.conf) and define the parameters as follows:

    wal_level=logical
    max_replication_slots=5
    max_wal_senders=5
    

    Restart your PostgreSQL service to ensure the changes take effect.

    Note: For max_replication_slots and max_wal_senders, we’re defaulting to a value of 5. This should be sufficient unless you have a large number of read replicas connected to the master instance.

  • Existing replication slot: Log based replication requires a dedicated logical replication slot. In PostgreSQL, a logical replication slot represents a stream of database changes that can then be replayed to a client in the order they were made on the original server. Each slot streams a sequence of changes from a single database.

    Log in to the master instance as a superuser and, using the wal2json plugin, create a logical replication slot:

      SELECT *
      FROM pg_create_logical_replication_slot('macrometa_<database_name>', 'wal2json');
    

    Note: Replication slots are specific to a given database in a cluster. If you want to connect multiple databases - whether in one integration or several - you’ll need to create a replication slot for each database.
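
Two of the requirements above lend themselves to a quick scripted check: the minimum minor version per release series, and the per-database slot name. The following is a sketch rather than part of the tap. The function names are hypothetical, versions newer than 12 are assumed to be acceptable, and the slot name check relies on PostgreSQL allowing only lower-case letters, numbers and underscores in replication slot names. The returned statement uses a %s placeholder in the style of Python PostgreSQL drivers; executing it is left to the caller.

```python
import re

# Minimum minor versions from the list above, keyed by release series.
MIN_VERSIONS = {
    (9, 4): (9, 4, 21),
    (9, 5): (9, 5, 16),
    (9, 6): (9, 6, 12),
    (10,): (10, 7),
    (11,): (11, 2),
    (12,): (12, 0),
}


def meets_minimum(version_text):
    """Check a version string such as '9.6.12' or '11.2' against the list."""
    match = re.match(r"\d+(\.\d+)*", version_text.strip())
    if match is None:
        raise ValueError(f"not a version string: {version_text!r}")
    version = tuple(int(part) for part in match.group(0).split("."))
    # Before version 10 the series is major.minor, afterwards just major.
    series = version[:2] if version[0] < 10 else version[:1]
    minimum = MIN_VERSIONS.get(series)
    if minimum is None:
        # Assumption: releases newer than 12 are fine, older than 9.4 are not.
        return version[0] > 12
    return version >= minimum


def slot_statement(database_name):
    """Build the slot-creation statement and its parameters for one database."""
    slot = f"macrometa_{database_name}"
    if not re.fullmatch(r"[a-z0-9_]+", slot):
        raise ValueError(f"invalid replication slot name: {slot}")
    sql = "SELECT * FROM pg_create_logical_replication_slot(%s, 'wal2json');"
    return sql, (slot,)


print(meets_minimum("9.6.11"), meets_minimum("11.2"))
print(slot_statement("db"))
```

Because slots are per database, slot_statement would be called once for each database that is replicated.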

To run tests:

  1. Install the Python test dependencies in a virtual env:
  make venv
  2. Provide a PostgreSQL database to run the tests against and export its credentials:
  export C8_SOURCE_POSTGRES_HOST=<postgres-host>
  export C8_SOURCE_POSTGRES_PORT=<postgres-port>
  export C8_SOURCE_POSTGRES_SECONDARY_HOST=<postgres-replica-host>
  export C8_SOURCE_POSTGRES_SECONDARY_PORT=<postgres-replica-port>
  export C8_SOURCE_POSTGRES_USER=<postgres-user>
  export C8_SOURCE_POSTGRES_PASSWORD=<postgres-password>

You can use the local docker-compose to spin up a test database by running make start_db.

Test objects will be created in the PostgreSQL database.

  3. Run the tests:
  make test
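
Before running the tests it can help to confirm that the credentials are exported. This is a small sketch with a hypothetical helper name; it assumes all six variables listed above are needed, which may not hold if the replica settings are optional in your setup.

```python
import os

TEST_ENV_VARS = (
    "C8_SOURCE_POSTGRES_HOST",
    "C8_SOURCE_POSTGRES_PORT",
    "C8_SOURCE_POSTGRES_SECONDARY_HOST",
    "C8_SOURCE_POSTGRES_SECONDARY_PORT",
    "C8_SOURCE_POSTGRES_USER",
    "C8_SOURCE_POSTGRES_PASSWORD",
)


def missing_test_env(environ=None):
    """Return the names of test variables that are unset or empty."""
    environ = os.environ if environ is None else environ
    return [name for name in TEST_ENV_VARS if not environ.get(name)]


print(missing_test_env())
```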

To run pylint:

  1. Install python dependencies and run python linter
  make venv
  make pylint
