PostgreSQL extractor for mkpipe.
mkpipe-extractor-postgres
PostgreSQL extractor plugin for MkPipe. Reads PostgreSQL tables via JDBC.
Documentation
For more detailed documentation, please visit the GitHub repository.
License
This project is licensed under the Apache 2.0 License - see the LICENSE file for details.
Connection Configuration
connections:
  pg_source:
    variant: postgres
    host: localhost
    port: 5432
    database: mydb
    schema: public
    user: myuser
    password: mypassword
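As an illustration of how these fields could map onto a JDBC connection string, here is a minimal Python sketch. The helper name `build_jdbc_url` is hypothetical and not part of mkpipe. Passing the schema through the PgJDBC `currentSchema` URL parameter is an assumption about how the plugin might use it.

```python
from urllib.parse import quote, urlencode


def build_jdbc_url(conn: dict) -> str:
    """Build a PostgreSQL JDBC URL from a connection entry (hypothetical helper)."""
    host = conn.get("host", "localhost")
    port = conn.get("port", 5432)
    # Percent-encode the database name in case it contains special characters.
    database = quote(str(conn["database"]), safe="")
    url = f"jdbc:postgresql://{host}:{port}/{database}"
    # Assumption: the schema is forwarded as the PgJDBC currentSchema parameter.
    if conn.get("schema"):
        url += "?" + urlencode({"currentSchema": conn["schema"]})
    return url


# Mirrors the pg_source entry from the configuration above.
pg_source = {
    "variant": "postgres",
    "host": "localhost",
    "port": 5432,
    "database": "mydb",
    "schema": "public",
    "user": "myuser",
    "password": "mypassword",
}
jdbc_url = build_jdbc_url(pg_source)
print(jdbc_url)
```

The user and password are deliberately kept out of the URL in this sketch. JDBC clients usually accept them as separate connection properties.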
Table Configuration
pipelines:
  - name: pg_to_pg
    source: pg_source
    destination: pg_target
    tables:
      - name: public.users
        target_name: stg_users
        replication_method: full
        fetchsize: 100000
Incremental Replication
- name: public.users
  target_name: stg_users
  replication_method: incremental
  iterate_column: updated_at
  iterate_column_type: datetime
  partitions_column: id
  partitions_count: 4
  fetchsize: 50000
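To make the role of iterate_column and iterate_column_type concrete, the following Python sketch builds a filter condition from a stored watermark. It is a guess at the general technique, not the plugin's actual code. The function name `incremental_condition`, the strict `>` comparison, and the `1=1` fallback for a first run are all assumptions.

```python
from datetime import datetime
from typing import Optional, Union

Watermark = Union[int, datetime]


def incremental_condition(
    column: str, column_type: str, last_value: Optional[Watermark]
) -> str:
    """Return a SQL condition selecting rows newer than the stored watermark.

    Hypothetical helper: the column name is assumed to come from trusted
    pipeline configuration, not from user input.
    """
    if last_value is None:
        # No watermark yet (first run): match every row.
        return "1=1"
    if column_type == "int":
        return f"{column} > {int(last_value)}"
    if column_type == "datetime":
        if not isinstance(last_value, datetime):
            raise TypeError("datetime watermark expected")
        stamp = last_value.strftime("%Y-%m-%d %H:%M:%S.%f")
        return f"{column} > TIMESTAMP '{stamp}'"
    raise ValueError(f"unsupported iterate_column_type: {column_type}")


first_run = incremental_condition("updated_at", "datetime", None)
next_run = incremental_condition(
    "updated_at", "datetime", datetime(2024, 1, 2, 3, 4, 5)
)
print(first_run)
print(next_run)
```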
Custom SQL
- name: public.orders
  target_name: stg_orders
  replication_method: incremental
  iterate_column: created_at
  iterate_column_type: datetime
  custom_query: "SELECT id, user_id, amount FROM public.orders WHERE {query_filter}"
Use {query_filter} as a placeholder. It is replaced with the incremental WHERE clause, or with WHERE 1=1 on full runs. The query can also be loaded from a SQL file:
custom_query_file: orders.sql # looks in sql/ directory
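The placeholder substitution can be sketched in a few lines of Python. The function name `render_custom_query` is hypothetical. Because the example query already contains the WHERE keyword, this sketch assumes the placeholder expands to the bare condition only. The plugin's real expansion may differ.

```python
def render_custom_query(custom_query: str, condition: str = "1=1") -> str:
    """Substitute the {query_filter} placeholder (hypothetical helper).

    Assumption: `condition` is a bare SQL condition without the WHERE keyword.
    """
    placeholder = "{query_filter}"
    if placeholder not in custom_query:
        raise ValueError("custom_query must contain {query_filter}")
    # str.replace is used instead of str.format so that any other braces
    # in the SQL text are left untouched.
    return custom_query.replace(placeholder, condition)


template = "SELECT id, user_id, amount FROM public.orders WHERE {query_filter}"
full_sql = render_custom_query(template)
incremental_sql = render_custom_query(
    template, "created_at > '2024-01-01 00:00:00'"
)
print(full_sql)
print(incremental_sql)
```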
Read Parallelism
For large tables, set partitions_column and partitions_count to read in parallel using multiple JDBC connections:
- name: public.events
  target_name: stg_events
  replication_method: incremental
  iterate_column: created_at
  iterate_column_type: datetime
  partitions_column: id       # numeric column to split on
  partitions_count: 8         # number of parallel JDBC partitions
  fetchsize: 50000
How it works
- Spark queries min/max of partitions_column within the time window and divides the range into partitions_count equal slices
- Each slice is fetched by a separate Spark task via a separate JDBC connection
- fetchsize controls rows per JDBC round-trip
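The slicing step described above can be illustrated with a small Python sketch. It shows the general idea of cutting a min/max range into near-equal slices. It is not Spark's exact stride computation, and the function name `split_range` is hypothetical.

```python
from typing import List, Tuple


def split_range(lower: int, upper: int, partitions_count: int) -> List[Tuple[int, int]]:
    """Split the inclusive range [lower, upper] into half-open slices [lo, hi)."""
    if partitions_count < 1:
        raise ValueError("partitions_count must be at least 1")
    if upper < lower:
        raise ValueError("upper must not be smaller than lower")
    total = upper - lower + 1
    # Never create more slices than there are distinct values.
    count = min(partitions_count, total)
    slices = []
    start = lower
    for i in range(count):
        # Spread the remainder over the first slices so sizes differ by at most one.
        size = total // count + (1 if i < total % count else 0)
        slices.append((start, start + size))
        start += size
    return slices


# Example: min(id) = 1, max(id) = 1000, partitions_count = 4.
slices = split_range(1, 1000, 4)
predicates = [f"id >= {lo} AND id < {hi}" for lo, hi in slices]
for predicate in predicates:
    print(predicate)
```

Each predicate would correspond to one Spark task reading over its own JDBC connection.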
Performance Notes
- Full replication: partitioning is not applied (only incremental).
- partitions_column should be a numeric column with good distribution (e.g. primary key).
- The PostgreSQL JDBC driver's default fetch size is 0, which loads all rows at once. Setting fetchsize to a value between 50000 and 200000 avoids out-of-memory errors on large tables.
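For orientation, the table parameters correspond closely to the standard Spark JDBC reader options (`partitionColumn`, `lowerBound`, `upperBound`, `numPartitions`, `fetchsize`). The Python sketch below only builds the options dictionary. The function name `jdbc_read_options` and the exact mapping are assumptions, not the plugin's code.

```python
from typing import Dict


def jdbc_read_options(
    url: str,
    table: str,
    replication_method: str = "full",
    partitions_column: str = "id",
    lower: int = 0,
    upper: int = 0,
    partitions_count: int = 10,
    fetchsize: int = 100000,
) -> Dict[str, str]:
    """Build Spark JDBC reader options from table settings (hypothetical helper)."""
    options = {
        "url": url,
        "dbtable": table,
        "driver": "org.postgresql.Driver",
        # Rows fetched per round-trip; the driver default of 0 fetches everything.
        "fetchsize": str(fetchsize),
    }
    # Per the notes above, partitioning applies only to incremental runs.
    if replication_method == "incremental":
        options.update(
            {
                "partitionColumn": partitions_column,
                "lowerBound": str(lower),
                "upperBound": str(upper),
                "numPartitions": str(partitions_count),
            }
        )
    return options


opts = jdbc_read_options(
    "jdbc:postgresql://localhost:5432/mydb",
    "public.events",
    replication_method="incremental",
    lower=1,
    upper=1000,
    partitions_count=8,
    fetchsize=50000,
)
full_opts = jdbc_read_options("jdbc:postgresql://localhost:5432/mydb", "public.users")
print(opts)
```

With a Spark session available, such a dictionary could be passed as `spark.read.format("jdbc").options(**opts).load()`.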
All Table Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| name | string | required | PostgreSQL table name (include schema: public.users) |
| target_name | string | required | Destination table name |
| replication_method | full / incremental | full | Replication strategy |
| iterate_column | string | - | Column used for incremental watermark |
| iterate_column_type | int / datetime | - | Type of iterate_column |
| partitions_column | string | same as iterate_column | Column to split JDBC reads on |
| partitions_count | int | 10 | Number of parallel JDBC partitions |
| fetchsize | int | 100000 | Rows per JDBC fetch |
| custom_query | string | - | Override SQL with {query_filter} placeholder |
| custom_query_file | string | - | Path to SQL file (relative to sql/ dir) |
| write_partitions | int | - | Coalesce to N partitions before writing |
| tags | list | [] | Tags for selective pipeline execution |
| pass_on_error | bool | false | Skip table on error instead of failing |
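The defaults in the table can be expressed as a short Python sketch that fills in missing values for one table entry. The function name `resolve_table_config` is hypothetical. Requiring iterate_column and iterate_column_type for incremental tables is an assumption inferred from the table, not confirmed plugin behaviour.

```python
from typing import Any, Dict

# Defaults taken from the parameter table above.
DEFAULTS: Dict[str, Any] = {
    "replication_method": "full",
    "partitions_count": 10,
    "fetchsize": 100000,
    "tags": [],
    "pass_on_error": False,
}


def resolve_table_config(table: Dict[str, Any]) -> Dict[str, Any]:
    """Apply documented defaults to one table entry (hypothetical helper)."""
    for key in ("name", "target_name"):
        if key not in table:
            raise ValueError(f"missing required parameter: {key}")
    resolved = {**DEFAULTS, **table}
    # Copy the list so entries never share the module-level default.
    resolved["tags"] = list(resolved["tags"])
    method = resolved["replication_method"]
    if method not in ("full", "incremental"):
        raise ValueError(f"unknown replication_method: {method}")
    if method == "incremental":
        # Assumption: incremental runs need a watermark column and its type.
        if not resolved.get("iterate_column"):
            raise ValueError("incremental replication requires iterate_column")
        if resolved.get("iterate_column_type") not in ("int", "datetime"):
            raise ValueError("iterate_column_type must be int or datetime")
    # partitions_column falls back to iterate_column when not set.
    resolved.setdefault("partitions_column", resolved.get("iterate_column"))
    return resolved


full_table = resolve_table_config({"name": "public.users", "target_name": "stg_users"})
incremental_table = resolve_table_config(
    {
        "name": "public.events",
        "target_name": "stg_events",
        "replication_method": "incremental",
        "iterate_column": "created_at",
        "iterate_column_type": "datetime",
    }
)
print(full_table)
print(incremental_table)
```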
File details
Details for the file mkpipe_extractor_postgres-0.5.0.tar.gz.
File metadata
- Download URL: mkpipe_extractor_postgres-0.5.0.tar.gz
- Upload date:
- Size: 8.0 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.14
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | b843388ea156d04cb6cd0ebc78518e2b0c3c312196d8f8de32b88924c01aef03 |
| MD5 | 7863b9264324e5628b6061b631323275 |
| BLAKE2b-256 | 7a0c292e0b8817dbc671d6257244dee8fcf4f8ec302504c3afe5976202e88bb5 |
File details
Details for the file mkpipe_extractor_postgres-0.5.0-py3-none-any.whl.
File metadata
- Download URL: mkpipe_extractor_postgres-0.5.0-py3-none-any.whl
- Upload date:
- Size: 8.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.14
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | f0f9e757b636a02335f3fd19ae0ba77de73a27e5d7c864bd1f8d0dd13aff84b2 |
| MD5 | 9fcaa8fb0fcf91b612a0ff4f056aec48 |
| BLAKE2b-256 | a66aae344d205cb472fb648dd810e3e8251d5011d85e35c0c457db9fbd54f9fb |