
Singer.io target for loading data to PostgreSQL - PipelineWise compatible


pipelinewise-target-postgres


Singer target that loads data into PostgreSQL following the Singer spec.

This is a PipelineWise compatible target connector.

How to use it

The recommended way to run this target is from PipelineWise. When running it from PipelineWise you don't need to configure this target with JSON files, and most things are automated. Please check the related documentation at Target Postgres.

If you want to run this Singer Target independently please read further.

Install

First, make sure Python 3 is installed on your system or follow these installation instructions for Mac or Ubuntu.

It's recommended to use a virtualenv:

make venv

To run

Run it like any other target that follows the Singer specification:

some-singer-tap | target-postgres --config [config.json]

It reads incoming messages from STDIN and uses the properties in config.json to load data into Postgres.

Note: To avoid version conflicts, run taps and targets in separate virtual environments.
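To try the pipe without a real tap, a stand-in script can print a few Singer messages to STDOUT. The following is a minimal sketch, not part of this project: the stream name `users`, its fields, and the file name `fake_tap.py` are hypothetical, and the messages only follow the general SCHEMA / RECORD / STATE shape described by the Singer spec.

```python
import json
import sys


def build_messages():
    """Return a tiny Singer stream: one SCHEMA, two RECORDs, one STATE."""
    schema = {
        "type": "SCHEMA",
        "stream": "users",  # hypothetical stream name
        "schema": {
            "type": "object",
            "properties": {
                "id": {"type": "integer"},
                "name": {"type": ["null", "string"]},
            },
        },
        "key_properties": ["id"],
    }
    records = [
        {"type": "RECORD", "stream": "users", "record": {"id": 1, "name": "Ada"}},
        {"type": "RECORD", "stream": "users", "record": {"id": 2, "name": "Grace"}},
    ]
    state = {"type": "STATE", "value": {"bookmarks": {"users": {"id": 2}}}}
    return [schema, *records, state]


def emit(messages, out=sys.stdout):
    """Write one JSON message per line to the given stream."""
    for message in messages:
        out.write(json.dumps(message) + "\n")
    out.flush()


if __name__ == "__main__":
    emit(build_messages())
```

Saved as `fake_tap.py`, it could take the place of `some-singer-tap` in the command above: `python fake_tap.py | target-postgres --config config.json`.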

Spin up a PG DB

Use the provided docker-compose file to spin up a PostgreSQL database.

docker-compose up -d --build db

Configuration settings

Running the target connector requires a config.json file. An example with the minimal settings:

{
    "host": "localhost",
    "port": 5432,
    "user": "my_user",
    "password": "secret",
    "dbname": "target_db",
    "default_target_schema": "public"
}
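Before starting a run it can help to confirm that config.json contains the connection properties this README marks as required. The sketch below is an illustration only and is not how the target itself validates its configuration; `missing_keys` and `load_config` are hypothetical helper names.

```python
import json

# Properties marked as required in this README's option list.
REQUIRED_KEYS = ("host", "port", "user", "password", "dbname")


def missing_keys(config):
    """Return the required keys that are absent from the config dict."""
    return [key for key in REQUIRED_KEYS if key not in config]


def load_config(path):
    """Load a config file and fail early if a required key is missing."""
    with open(path, encoding="utf-8") as handle:
        config = json.load(handle)
    missing = missing_keys(config)
    if missing:
        raise ValueError(f"config is missing required keys: {missing}")
    return config
```

Calling `load_config("config.json")` returns the parsed dictionary, or raises `ValueError` naming the keys that still need to be added.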

Full list of options in config.json:

| Property | Type | Required? | Description |
|---|---|---|---|
| host | String | Yes | PostgreSQL host |
| port | Integer | Yes | PostgreSQL port |
| user | String | Yes | PostgreSQL user |
| password | String | Yes | PostgreSQL password |
| dbname | String | Yes | PostgreSQL database name |
| batch_size_rows | Integer | | (Default: 100000) Maximum number of rows in each batch. At the end of each batch, the rows in the batch are loaded into Postgres. |
| flush_all_streams | Boolean | | (Default: False) Flush and load every stream into Postgres when one batch is full. Warning: This may cause the COPY command to load files with a low number of records. |
| parallelism | Integer | | (Default: 0) The number of threads used to flush tables. 0 will create a thread for each stream, up to max_parallelism. -1 will create a thread for each CPU core. Any other positive number will create that number of threads, up to max_parallelism. |
| max_parallelism | Integer | | (Default: 16) Maximum number of parallel threads to use when flushing tables. |
| default_target_schema | String | | Name of the schema where the tables will be created. If schema_mapping is not defined then every stream sent by the tap is loaded into this schema. |
| default_target_schema_select_permission | String | | Grant USAGE privilege on newly created schemas and grant SELECT privilege on newly created |
| schema_mapping | Object | | Useful if you want to load multiple streams from one tap into multiple Postgres schemas. If the tap sends the stream_id in <schema_name>-<table_name> format then this option overrides the default_target_schema value. Note that with schema_mapping you can also override the default_target_schema_select_permission value to grant SELECT permissions to different groups per schema, and optionally create indices automatically for the replicated tables. Note: This is an experimental feature, and it is recommended to use it via PipelineWise YAML files, which generate the object mapping in the right JSON format. For further info check a PipelineWise YAML Example. |
| add_metadata_columns | Boolean | | (Default: False) Metadata columns add extra row-level information about data ingestion (e.g. when the row was read from the source, when it was inserted or deleted in Postgres, etc.). Metadata columns are created automatically by adding extra columns to the tables with the column prefix _SDC_. The column names follow the Stitch naming conventions documented at https://www.stitchdata.com/docs/data-structure/integration-schemas#sdc-columns. Enabling metadata columns will flag deleted rows by setting the _SDC_DELETED_AT metadata column. Without the add_metadata_columns option, rows deleted by Singer taps will not be recognisable in Postgres. |
| hard_delete | Boolean | | (Default: False) When the hard_delete option is true, DELETE SQL commands are performed in Postgres to delete rows from tables. This is achieved by continuously checking the _SDC_DELETED_AT metadata column sent by the Singer tap. Because deleting rows requires metadata columns, the hard_delete option automatically enables the add_metadata_columns option as well. |
| data_flattening_max_level | Integer | | (Default: 0) Object type RECORD items from taps can be transformed into flattened columns by creating columns automatically. When the value is 0 (default), flattening is turned off. |
| primary_key_required | Boolean | | (Default: True) Log-based and incremental replication on tables with no Primary Key causes duplicates when merging UPDATE events. When set to true, loading stops if no Primary Key is defined. |
| validate_records | Boolean | | (Default: False) Validate every single record message against the corresponding JSON schema. This option is disabled by default, and invalid RECORD messages will fail only at load time in Postgres. Enabling this option detects invalid records earlier but could degrade performance. |
| temp_dir | String | | (Default: platform-dependent) Directory for temporary CSV files with RECORD messages. |
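The interaction between parallelism and max_parallelism is easier to follow as code. The function below is a sketch of the rules exactly as the option descriptions state them, not the target's actual implementation; `resolve_thread_count` and `n_streams` are hypothetical names, and leaving the -1 case uncapped is an assumption drawn from the wording, which mentions the cap only for the other cases.

```python
import os


def resolve_thread_count(parallelism, max_parallelism, n_streams):
    """Map the parallelism setting to a thread count, as the option text describes."""
    if parallelism == -1:
        # One thread per CPU core (assumed not capped by max_parallelism).
        return os.cpu_count() or 1
    # 0 means one thread per stream; any other positive value is used as given.
    wanted = n_streams if parallelism == 0 else parallelism
    # Both cases are limited by max_parallelism.
    return min(wanted, max_parallelism)
```

With the defaults (parallelism 0, max_parallelism 16), flushing 3 streams would use 3 threads under this reading, and flushing 40 streams would use 16.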

To run tests:

  1. Define the environment variables required to run the tests
  export TARGET_POSTGRES_HOST=<postgres-host>
  export TARGET_POSTGRES_PORT=<postgres-port>
  export TARGET_POSTGRES_USER=<postgres-user>
  export TARGET_POSTGRES_PASSWORD=<postgres-password>
  export TARGET_POSTGRES_DBNAME=<postgres-dbname>
  export TARGET_POSTGRES_SCHEMA=<postgres-schema>

PS: You can run make env to export pre-defined environment variables

  2. Install python dependencies in a virtual env and run unit and integration tests
  make venv
  3. To run unit tests:
  make unit_test
  4. To run integration tests:
  make integration_test
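Tests that depend on a live database tend to fail with unclear errors when one of the variables from step 1 is unset. A small pre-flight check like the one below can catch that first; it is a sketch and not part of the project's Makefile or test suite, and `unset_vars` is a hypothetical helper name.

```python
import os

# The variables listed in step 1 above.
REQUIRED_VARS = (
    "TARGET_POSTGRES_HOST",
    "TARGET_POSTGRES_PORT",
    "TARGET_POSTGRES_USER",
    "TARGET_POSTGRES_PASSWORD",
    "TARGET_POSTGRES_DBNAME",
    "TARGET_POSTGRES_SCHEMA",
)


def unset_vars(environ=None):
    """Return the required variables that are missing or empty."""
    environ = os.environ if environ is None else environ
    return [name for name in REQUIRED_VARS if not environ.get(name)]
```

Calling `unset_vars()` before running the tests returns an empty list when everything is exported, and otherwise the names that still need a value.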

To run pylint:

  1. Install python dependencies and run python linter
 make venv pylint

License

Apache License Version 2.0

See LICENSE for the full text.
