A Python library for Change Data Capture (CDC) workflows using PySpark and Delta Lake.
pyspark-cdc
A Python library for Change Data Capture (CDC) workflows using PySpark. This project provides tools to capture, optimize, validate, and manage data changes efficiently in distributed environments.
Features
- Full and incremental data capture.
- Cron-style, day-of-month scheduling for Delta maintenance (Z-order, VACUUM, compaction).
- No extra dependencies.
- Stores the watermark in the Delta commit metadata (commitInfo.userMetadata).
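Delta Lake lets a writer attach a free-form string to a commit, either with the DataFrameWriter option userMetadata or the Spark configuration spark.databricks.delta.commitInfo.userMetadata, and reports it in the userMetadata column of the table history. The plain-Python sketch below shows one way a watermark could round-trip through that field. The JSON layout and the helper names encode_watermark and latest_watermark are hypothetical and are not pyspark-cdc's actual format; commits whose userMetadata is empty or is not a watermark payload are skipped.

```python
import json
from typing import Any, Iterable, Mapping, Optional


def encode_watermark(column: str, value: Any) -> str:
    """Serialize a watermark into a string for a commit's userMetadata (hypothetical layout)."""
    # default=str converts values json cannot handle natively, e.g. datetime objects.
    return json.dumps({"watermark_column": column, "watermark": value}, default=str)


def latest_watermark(history: Iterable[Mapping[str, Any]]) -> Optional[Any]:
    """Return the watermark stored in the newest commit that carries one, else None."""
    for row in sorted(history, key=lambda r: r["version"], reverse=True):
        meta = row.get("userMetadata")
        if not meta:
            continue  # commit written without user metadata
        try:
            payload = json.loads(meta)
        except json.JSONDecodeError:
            continue  # metadata that is not JSON, written by something else
        if isinstance(payload, dict) and "watermark" in payload:
            return payload["watermark"]
    return None


# Simulated history: versions 1 and 3 carry no metadata.
history = [
    {"version": 0, "userMetadata": encode_watermark("UPDATED_AT", "2024-05-01 00:00:00")},
    {"version": 1, "userMetadata": None},
    {"version": 2, "userMetadata": encode_watermark("UPDATED_AT", "2024-05-02 00:00:00")},
    {"version": 3, "userMetadata": None},
]
print(latest_watermark(history))  # 2024-05-02 00:00:00
```

On a real table, the history rows could come from [r.asDict() for r in DeltaTable.forName(spark, name).history().select("version", "userMetadata").collect()] (with from delta.tables import DeltaTable), and a new watermark could be written with df.write.format("delta").option("userMetadata", encode_watermark(...)).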
Installation
Install the package with pip:
```bash
pip install pyspark-cdc
```
Usage
Suppose there is a table in a PostgreSQL database. The following example reads it over JDBC and captures it as a managed Delta table.
```python
from pyspark.sql import SparkSession
from pyspark_cdc import capture

spark = SparkSession.builder.getOrCreate()  # on Databricks, `spark` already exists

# Define the variables used below:
# postgresql_jdbc_url, postgresql_schema, postgresql_table,
# postgresql_user, postgresql_password, catalog, database, table_name
...

# Read the source table over JDBC.
df = (
    spark.read.format("jdbc")
    .options(
        url=postgresql_jdbc_url,
        dbtable=f"{postgresql_schema}.{postgresql_table}",
        user=postgresql_user,
        password=postgresql_password,
        driver="org.postgresql.Driver",  # the PostgreSQL JDBC driver JAR must be on Spark's classpath
    )
    .load()
)

# Quick start
(
    capture(df, spark)
    .table(f"{catalog}.{database}.{table_name}")  # managed table name
    .mode("incremental")
    .format("delta")
    .primary_keys(["ID"])  # primary key column(s)
    .watermark_column("UPDATED_AT")  # watermark column
    .enable_deletion_detect()  # detect DELETE operations
    .start()
)

# With more options
(
    capture(df, spark)
    .table(f"{catalog}.{database}.{table_name}")
    .mode("incremental")
    .format("delta")
    .primary_keys(["ID"])
    .watermark_column("UPDATED_AT")
    .partition_by(["COUNTRY", "GENDER"])  # partition columns
    .schedule_zorder("*/3", ["FIRST_NAME", "SURNAME"])  # Z-order on days 1, 4, 7, ..., 31 of each month
    .schedule_vacuum("5,20")  # VACUUM on the 5th and 20th of each month
    .schedule_compaction("10-25")  # compaction every day from the 10th to the 25th of each month
    .enable_deletion_detect()  # detect hard deletes on the source side
    .table_properties(  # extra Delta table properties
        {
            "delta.deletedFileRetentionDuration": "interval 3 day",
            "delta.logRetentionDuration": "interval 3 day",
            "delta.appendOnly": "false",
            "delta.enableDeletionVectors": "true",
        }
    )
    .options(  # extra DataFrameWriter options
        {
            "maxRecordsPerFile": 1000,
        }
    )
    .start()
)
```
See the samples/ directory and the tests/ folder for more usage examples and test cases.
Delta Optimize
Note: Schedulers use standard day-of-the-month crontab expressions:
- *: any value
- ,: value list separator
- -: range of values
- /: step values
- 1-31: allowed values
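To make these rules concrete, here is a plain-Python sketch that checks whether a given day of the month matches such an expression. It is not pyspark-cdc's scheduler code: the function name matches_day_of_month is hypothetical, and reading a bare "N/S" as "N-31/S" is an assumption.

```python
def matches_day_of_month(expr: str, day: int) -> bool:
    """Return True if `day` (1-31) matches a crontab day-of-month expression."""
    if not 1 <= day <= 31:
        raise ValueError(f"day must be between 1 and 31, got {day}")
    for part in expr.split(","):  # ',' separates a list of alternatives
        base, _, step_text = part.strip().partition("/")  # '/' introduces a step
        step = int(step_text) if step_text else 1
        if base == "*":  # '*' means any day
            start, end = 1, 31
        elif "-" in base:  # '-' gives an inclusive range
            low, high = base.split("-", 1)
            start, end = int(low), int(high)
        else:  # a single day; assumption: 'N/S' is read as 'N-31/S'
            start = int(base)
            end = 31 if step_text else start
        if step < 1 or not 1 <= start <= end <= 31:
            raise ValueError(f"invalid day-of-month field: {part!r}")
        if start <= day <= end and (day - start) % step == 0:
            return True
    return False


print([d for d in range(1, 32) if matches_day_of_month("*/3", d)])
# [1, 4, 7, 10, 13, 16, 19, 22, 25, 28, 31]
print([d for d in range(1, 32) if matches_day_of_month("5,20", d)])
# [5, 20]
```

Note that "*/3" restarts at day 1 each month, so it means "days 1, 4, 7, ..., 31" rather than strictly every third calendar day.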
⚠️ If capture runs more than once on a scheduled day, the optimize jobs are triggered on every run. To turn the schedulers off, add the following to the capture chain:
```python
.scheduler_switch("OFF")
```
Typical Scenarios
The following table summarizes common use cases:
| Mode | Primary Key | Watermark Column | Example Usage | Comment |
|---|---|---|---|---|
| Full | Not needed | Not needed | ... .mode("full") .format("delta") ... | For tables without an auto-incrementing PK or a watermark column. For large tables, consider adding a watermark column. |
| Incremental | Single | Yes (datetime) | ... .mode("incremental") .primary_keys(["ID"]) .watermark_column("UPDATED_AT") .format("delta") ... | Common case. |
| Incremental | Auto-incrementing PK | No (the PK is the watermark) | ... .mode("incremental") .primary_keys(["ID"]) .watermark_column("ID") .format("delta") ... | Uses the auto-incrementing PK as the watermark, so UPDATE operations are not captured. |
| Incremental | Composite | Yes (datetime) | ... .mode("incremental") .primary_keys(["ID", "FIRST_NAME"]) .watermark_column("UPDATED_AT") .format("delta") ... | Common case. |
| Incremental | Composite | Yes (int) | ... .mode("incremental") .primary_keys(["ID", "FIRST_NAME"]) .watermark_column("ID") .format("delta") ... | Composite PK with the auto-incrementing ID as the watermark, so UPDATE operations are not captured. |
To capture DELETE operations, call enable_deletion_detect(); it compares the records on both sides (source and target) by primary key(s).
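The scenarios above can be modeled in a few lines of plain Python. This is a conceptual sketch under stated assumptions, not pyspark-cdc's implementation: rows are dicts, incremental_merge is a hypothetical name, a source row counts as changed when its watermark is strictly greater than the stored one, and deletion detection compares primary keys between the full source and the target. In Delta itself, such an upsert is typically expressed as a MERGE on the PK columns (for example DeltaTable.merge with whenMatchedUpdateAll, whenNotMatchedInsertAll and, for deletes, whenNotMatchedBySourceDelete).

```python
from typing import Any, Dict, List, Optional, Sequence, Tuple

Row = Dict[str, Any]


def incremental_merge(
    target: List[Row],
    source: List[Row],
    primary_keys: Sequence[str],
    watermark_column: str,
    last_watermark: Optional[Any],
    detect_deletes: bool = False,
) -> Tuple[List[Row], Optional[Any]]:
    """Model one incremental run: upsert changed rows by PK, optionally drop deleted ones."""

    def key(row: Row) -> Tuple[Any, ...]:
        # Composite PKs become tuples, so single- and multi-column keys work alike.
        return tuple(row[k] for k in primary_keys)

    merged: Dict[Tuple[Any, ...], Row] = {key(r): dict(r) for r in target}

    # Only rows whose watermark moved past the stored value count as changes.
    changed = [
        r for r in source
        if last_watermark is None or r[watermark_column] > last_watermark
    ]
    for row in changed:
        merged[key(row)] = dict(row)  # insert new keys, overwrite existing ones

    if detect_deletes:
        # A PK present in the target but absent from the source was hard-deleted.
        source_keys = {key(r) for r in source}
        merged = {k: v for k, v in merged.items() if k in source_keys}

    # Advance the watermark only if something changed.
    new_watermark = max((r[watermark_column] for r in changed), default=last_watermark)
    return list(merged.values()), new_watermark


# Datetime watermark (ISO-8601 strings compare correctly as text):
# ID 1 updated, ID 2 hard-deleted, ID 3 inserted.
target = [{"ID": 1, "NAME": "a", "UPDATED_AT": "2024-05-01"},
          {"ID": 2, "NAME": "b", "UPDATED_AT": "2024-05-01"}]
source = [{"ID": 1, "NAME": "a2", "UPDATED_AT": "2024-05-03"},
          {"ID": 3, "NAME": "c", "UPDATED_AT": "2024-05-02"}]
rows, wm = incremental_merge(target, source, ["ID"], "UPDATED_AT", "2024-05-01", detect_deletes=True)
print(sorted((r["ID"], r["NAME"]) for r in rows), wm)
# [(1, 'a2'), (3, 'c')] 2024-05-03

# Auto-incrementing PK as watermark: the update to ID 1 is missed.
target = [{"ID": 1, "NAME": "a"}, {"ID": 2, "NAME": "b"}]
source = [{"ID": 1, "NAME": "a2"}, {"ID": 2, "NAME": "b"}, {"ID": 3, "NAME": "c"}]
rows, wm = incremental_merge(target, source, ["ID"], "ID", 2)
print(sorted((r["ID"], r["NAME"]) for r in rows), wm)
# [(1, 'a'), (2, 'b'), (3, 'c')] 3
```

The second call illustrates why the auto-incrementing-PK rows in the table cannot capture UPDATE operations: an updated row keeps its ID, so it never passes the watermark filter. Deletion detection, by contrast, needs the full set of source keys, not just the changed rows.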
Library Version Matrix
The library version tracks Databricks Runtime LTS releases:
| Version | JDK | Python | Databricks Runtime | Spark | Delta Lake |
|---|---|---|---|---|---|
| 1.0.0 | 17 | 3.12 | 17 LTS | 4.0.0 | 4.0.0 |
License
This project is licensed under the MIT License. See LICENSE for details.
Contact
For questions or support, open an issue on GitHub.
Download files
File details
Details for the file pyspark_cdc-1.0.0.tar.gz.
File metadata
- Download URL: pyspark_cdc-1.0.0.tar.gz
- Upload date:
- Size: 12.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/2.3.2 CPython/3.12.12 Linux/6.11.0-1018-azure
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | b63694ade102563b4d82a4a7a4e1e679484ef411f55219e4bcd332e57ff34ffa |
| MD5 | c16b61f1436d0edbdd50c08eaf608d71 |
| BLAKE2b-256 | bdb541434b944162b4dd2d78ef4b9f2df6f1e712291cf3a0eb5998a97bf2b255 |
File details
Details for the file pyspark_cdc-1.0.0-py3-none-any.whl.
File metadata
- Download URL: pyspark_cdc-1.0.0-py3-none-any.whl
- Upload date:
- Size: 14.6 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/2.3.2 CPython/3.12.12 Linux/6.11.0-1018-azure
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 51734a1526d84a7d4ce7acd46e7bdfd505bd0de3fca2543e32a0a48e18d47131 |
| MD5 | e35dee375f95e7904f01fd22275b9a71 |
| BLAKE2b-256 | a0adbfeb42b5a98fc623a294da032440272cbb2c52eaad1a25bb102eaa21d311 |