PostgreSQL extractor for MkPipe.

mkpipe-extractor-postgres

PostgreSQL extractor plugin for MkPipe. Reads PostgreSQL tables via JDBC.

Documentation

For more detailed documentation, please visit the GitHub repository.

License

This project is licensed under the Apache 2.0 License - see the LICENSE file for details.


Connection Configuration

connections:
  pg_source:
    variant: postgres
    host: localhost
    port: 5432
    database: mydb
    schema: public
    user: myuser
    password: mypassword
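
Because the extractor reads over JDBC, the host, port, and database values above presumably end up in a JDBC URL. The following is a minimal sketch of that mapping using the standard PostgreSQL JDBC URL format; `build_jdbc_url` is a hypothetical helper for illustration, not part of the plugin's API.

```python
from urllib.parse import quote


def build_jdbc_url(conn: dict) -> str:
    """Build a PostgreSQL JDBC URL from a connection entry (hypothetical helper)."""
    host = conn["host"]
    port = conn.get("port", 5432)  # 5432 is the standard PostgreSQL port
    database = quote(str(conn["database"]), safe="")  # escape unusual characters
    return f"jdbc:postgresql://{host}:{port}/{database}"


# Mirrors the pg_source entry shown above.
pg_source = {
    "variant": "postgres",
    "host": "localhost",
    "port": 5432,
    "database": "mydb",
    "schema": "public",
    "user": "myuser",
    "password": "mypassword",
}

jdbc_url = build_jdbc_url(pg_source)
print(jdbc_url)  # jdbc:postgresql://localhost:5432/mydb
```

The user and password are left out of the URL on purpose; JDBC clients usually accept them as separate connection properties.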

Table Configuration

pipelines:
  - name: pg_to_pg
    source: pg_source
    destination: pg_target
    tables:
      - name: public.users
        target_name: stg_users
        replication_method: full
        fetchsize: 100000

Incremental Replication

      - name: public.users
        target_name: stg_users
        replication_method: incremental
        iterate_column: updated_at
        iterate_column_type: datetime
        partitions_column: id
        partitions_count: 4
        fetchsize: 50000
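
To illustrate what an incremental run might filter on, here is a rough sketch that turns iterate_column, iterate_column_type, and the last stored watermark into a SQL predicate. The function name, the strict `>` comparison, and the `1=1` fallback for a first run are assumptions made for this example; the plugin's actual logic may differ.

```python
from datetime import datetime


def build_incremental_filter(column, column_type, last_value):
    """Return a SQL predicate selecting rows newer than the last watermark.

    Hypothetical helper: the comparison operator and literal formatting
    are assumptions, not the plugin's documented behaviour.
    """
    if last_value is None:
        # No watermark yet, so select everything.
        return "1=1"
    if column_type == "int":
        return f"{column} > {int(last_value)}"
    if column_type == "datetime":
        ts = last_value.isoformat(sep=" ")
        return f"{column} > '{ts}'"
    raise ValueError(f"unsupported iterate_column_type: {column_type}")


print(build_incremental_filter("updated_at", "datetime", datetime(2024, 1, 1)))
# updated_at > '2024-01-01 00:00:00'
```

The column name is interpolated directly, so this sketch only makes sense when it comes from trusted configuration.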

Custom SQL

      - name: public.orders
        target_name: stg_orders
        replication_method: incremental
        iterate_column: created_at
        iterate_column_type: datetime
        custom_query: "SELECT id, user_id, amount FROM public.orders WHERE {query_filter}"

Use {query_filter} as a placeholder. It is replaced with the incremental WHERE clause, or with WHERE 1=1 on full runs. The query can also be loaded from a SQL file:

        custom_query_file: orders.sql   # looks in sql/ directory
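
The placeholder substitution can be pictured as a plain string replacement. The sketch below assumes the placeholder expands to a bare predicate (for example `1=1`), since the example template already contains the WHERE keyword; `render_custom_query` is a hypothetical name, not the plugin's API.

```python
from typing import Optional

PLACEHOLDER = "{query_filter}"


def render_custom_query(template: str, predicate: Optional[str] = None) -> str:
    """Substitute the placeholder with a predicate, or 1=1 when there is none.

    Hypothetical helper. str.replace is used instead of str.format so that
    other braces in the SQL text are left untouched.
    """
    if PLACEHOLDER not in template:
        raise ValueError("custom query must contain {query_filter}")
    return template.replace(PLACEHOLDER, predicate or "1=1")


template = "SELECT id, user_id, amount FROM public.orders WHERE {query_filter}"

print(render_custom_query(template))
# SELECT id, user_id, amount FROM public.orders WHERE 1=1
print(render_custom_query(template, "created_at > '2024-01-01 00:00:00'"))
# SELECT id, user_id, amount FROM public.orders WHERE created_at > '2024-01-01 00:00:00'
```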

Read Parallelism

For large tables, set partitions_column and partitions_count to read in parallel using multiple JDBC connections:

      - name: public.events
        target_name: stg_events
        replication_method: incremental
        iterate_column: created_at
        iterate_column_type: datetime
        partitions_column: id       # numeric column to split on
        partitions_count: 8         # number of parallel JDBC partitions
        fetchsize: 50000

How it works

  • Spark queries the min and max of partitions_column within the time window and divides that range into partitions_count equal slices
  • Each slice is fetched by a separate Spark task via a separate JDBC connection
  • fetchsize controls rows per JDBC round-trip
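
The slicing step above can be sketched as follows. This is a simplified illustration of dividing a min/max range into equal half-open slices; Spark's built-in JDBC partitioning differs in detail, and `slice_bounds` is a hypothetical name introduced for this example.

```python
from typing import List, Tuple


def slice_bounds(lower: int, upper: int, count: int) -> List[Tuple[int, int]]:
    """Split the inclusive range [lower, upper] into `count` half-open slices.

    Each slice is (start, end) with start inclusive and end exclusive.
    Any remainder is spread over the first slices, one value each.
    """
    if count < 1:
        raise ValueError("count must be at least 1")
    if upper < lower:
        raise ValueError("upper must not be smaller than lower")
    total = upper - lower + 1
    bounds = []
    start = lower
    for i in range(count):
        size = total // count + (1 if i < total % count else 0)
        end = start + size
        bounds.append((start, end))
        start = end
    return bounds


# One predicate per slice; each would be read by its own task and connection.
for start, end in slice_bounds(1, 100, 4):
    print(f"id >= {start} AND id < {end}")
# id >= 1 AND id < 26
# id >= 26 AND id < 51
# id >= 51 AND id < 76
# id >= 76 AND id < 101
```

Equal-width slices only balance the work when values are evenly spread, which is why a well-distributed numeric column is the usual choice for partitions_column.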

Performance Notes

  • Partitioning is applied only to incremental replication; full replication reads are not partitioned.
  • partitions_column should be a numeric column with good distribution (e.g. primary key).
  • The PostgreSQL JDBC driver's default fetch size is 0, which loads the entire result set at once. Setting fetchsize to a value between 50000 and 200000 avoids out-of-memory errors on large tables.
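
For orientation, the settings discussed here correspond to standard Spark JDBC data source options (partitionColumn, lowerBound, upperBound, numPartitions, fetchsize). The sketch below builds such an options dictionary; how the plugin actually wires its configuration into Spark is an assumption, and `jdbc_read_options` is a hypothetical helper.

```python
from typing import Dict, Optional, Tuple


def jdbc_read_options(
    url: str,
    table: str,
    user: str,
    password: str,
    fetchsize: int = 100000,
    partition: Optional[Tuple[str, int, int, int]] = None,
) -> Dict[str, str]:
    """Build Spark JDBC reader options (hypothetical helper).

    `partition` is (column, lower_bound, upper_bound, num_partitions).
    """
    opts = {
        "url": url,
        "dbtable": table,
        "user": user,
        "password": password,
        "driver": "org.postgresql.Driver",
        "fetchsize": str(fetchsize),  # rows per JDBC round-trip
    }
    if partition is not None:
        column, lower, upper, count = partition
        opts.update(
            {
                "partitionColumn": column,
                "lowerBound": str(lower),
                "upperBound": str(upper),
                "numPartitions": str(count),
            }
        )
    return opts


opts = jdbc_read_options(
    "jdbc:postgresql://localhost:5432/mydb",
    "public.events",
    "myuser",
    "mypassword",
    fetchsize=50000,
    partition=("id", 1, 1000000, 8),
)
# With PySpark available, these could be passed to the reader as:
#   spark.read.format("jdbc").options(**opts).load()
```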

All Table Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| name | string | required | PostgreSQL table name (include schema: public.users) |
| target_name | string | required | Destination table name |
| replication_method | full / incremental | full | Replication strategy |
| iterate_column | string | - | Column used for incremental watermark |
| iterate_column_type | int / datetime | - | Type of iterate_column |
| partitions_column | string | same as iterate_column | Column to split JDBC reads on |
| partitions_count | int | 10 | Number of parallel JDBC partitions |
| fetchsize | int | 100000 | Rows per JDBC fetch |
| custom_query | string | - | Override SQL with {query_filter} placeholder |
| custom_query_file | string | - | Path to SQL file (relative to sql/ dir) |
| write_partitions | int | - | Coalesce to N partitions before writing |
| tags | list | [] | Tags for selective pipeline execution |
| pass_on_error | bool | false | Skip table on error instead of failing |
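
The defaults listed above can be read as the following resolution step. This is only a sketch of how a table entry might be completed with those defaults; `resolve_table_config` is a hypothetical helper, and the validation it performs is an assumption rather than the plugin's documented behaviour.

```python
from typing import Any, Dict

# Defaults taken from the parameter table above.
DEFAULTS: Dict[str, Any] = {
    "replication_method": "full",
    "partitions_count": 10,
    "fetchsize": 100000,
    "tags": [],
    "pass_on_error": False,
}


def resolve_table_config(table: Dict[str, Any]) -> Dict[str, Any]:
    """Fill in documented defaults for one table entry (hypothetical helper)."""
    for key in ("name", "target_name"):
        if key not in table:
            raise ValueError(f"missing required parameter: {key}")
    resolved = {**DEFAULTS, **table}
    resolved["tags"] = list(resolved["tags"])  # never share the default list
    if resolved["replication_method"] not in ("full", "incremental"):
        raise ValueError("replication_method must be 'full' or 'incremental'")
    # partitions_column falls back to iterate_column when not set.
    resolved.setdefault("partitions_column", resolved.get("iterate_column"))
    return resolved


cfg = resolve_table_config(
    {
        "name": "public.users",
        "target_name": "stg_users",
        "replication_method": "incremental",
        "iterate_column": "updated_at",
        "iterate_column_type": "datetime",
    }
)
print(cfg["partitions_column"], cfg["partitions_count"], cfg["fetchsize"])
# updated_at 10 100000
```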
