A Python library for Change Data Capture (CDC) workflows using PySpark and Delta Lake.

pyspark-cdc

A Python library for Change Data Capture (CDC) workflows using PySpark. This project provides tools to capture, optimize, validate, and manage data changes efficiently in distributed environments.

Features

  • Full and incremental data capture.
  • Cron-based scheduling utilities for Delta optimizations (Z-order, vacuum, compaction).
  • No extra dependencies.
  • Stores the watermark in the internal commitInfo.userMetadata field of the Delta commit.
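Delta Lake lets a writer attach a free-form string to a commit (for example with the DataFrameWriter option userMetadata), and DeltaTable.history() returns that string in its userMetadata column next to each commit's version. The pure-Python sketch below shows one way a watermark could round-trip through that field. The JSON layout and the helper names encode_watermark, decode_watermark and latest_watermark are hypothetical and not necessarily pyspark-cdc's actual format; the history rows are plain dicts standing in for rows of DeltaTable.history().

```python
import json
from datetime import datetime
from typing import Any, Iterable, Mapping, Optional


def encode_watermark(column: str, value: Any) -> str:
    """Serialize a watermark into a userMetadata string (hypothetical JSON layout)."""
    if isinstance(value, datetime):
        value = value.isoformat()  # JSON has no datetime type, so store ISO-8601 text
    return json.dumps({"watermark_column": column, "watermark": value})


def decode_watermark(user_metadata: Optional[str]) -> Optional[dict]:
    """Parse userMetadata; return None when it is empty or not in the layout above."""
    if not user_metadata:
        return None
    try:
        payload = json.loads(user_metadata)
    except json.JSONDecodeError:
        return None
    if isinstance(payload, dict) and "watermark" in payload:
        return payload
    return None


def latest_watermark(history: Iterable[Mapping[str, Any]]) -> Optional[Any]:
    """Return the watermark of the newest commit (highest version) that carries one."""
    for row in sorted(history, key=lambda r: r["version"], reverse=True):
        payload = decode_watermark(row.get("userMetadata"))
        if payload is not None:
            return payload["watermark"]
    return None


# Hypothetical history: version 1 is a commit without watermark metadata.
history = [
    {"version": 0, "userMetadata": encode_watermark("UPDATED_AT", datetime(2024, 1, 1))},
    {"version": 1, "userMetadata": None},
    {"version": 2, "userMetadata": encode_watermark("UPDATED_AT", datetime(2024, 1, 2, 8, 30))},
]
print(latest_watermark(history))  # 2024-01-02T08:30:00
```

Scanning back to the newest commit that carries a watermark, rather than reading only the last commit, keeps the watermark available even when other commits without metadata (such as an OPTIMIZE) land in between.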

Installation

You can install the package using pip:

pip install pyspark-cdc

Usage

Suppose you have a table in a PostgreSQL database. The example below reads it over JDBC and uses this module to capture it as a managed Delta table.

from pyspark_cdc import capture

...  # define spark and the variables used below (postgresql_*, catalog, database, table_name)

df = (
    spark.read.format("jdbc")
    .options(
        url=postgresql_jdbc_url,
        dbtable=f"{postgresql_schema}.{postgresql_table}",
        user=postgresql_user,
        password=postgresql_password,
        driver="org.postgresql.Driver",                # the PostgreSQL JDBC driver JAR must be on the Spark classpath
    )
    .load()
)
# quick start
(
    capture(df, spark)
    .table(f"{catalog}.{database}.{table_name}")       # managed table name
    .mode("incremental")
    .format("delta")
    .primary_keys(["ID"])                              # PK
    .watermark_column("UPDATED_AT")                    # Watermark column
    .enable_deletion_detect()                          # detect DELETE operations
    .start()
)
# With more options
(
    capture(df, spark)
    .table(f"{catalog}.{database}.{table_name}")
    .mode("incremental")
    .format("delta")
    .primary_keys(["ID"])
    .watermark_column("UPDATED_AT")
    .partition_by(["COUNTRY", "GENDER"])               # partitioning
    .schedule_zorder("*/3", ["FIRST_NAME", "SURNAME"]) # run Z-order on every 3rd day of the month (1, 4, 7, ...)
    .schedule_vacuum("5,20")                           # run vacuum on the 5th and 20th of every month
    .schedule_compaction("10-25")                      # run compaction every day from the 10th to the 25th of every month
    .enable_deletion_detect()                          # detect hard deletes on the source side
    .table_properties(                                 # extra delta table properties
        {
            "delta.deletedFileRetentionDuration": "interval 3 day",
            "delta.logRetentionDuration": "interval 3 day",
            "delta.appendOnly": "false",
            "delta.enableDeletionVectors": "true",
        }
    )
    .options(                                         # extra DataFrame writer options
        {
            "maxRecordsPerFile": 1000,
        }
    )
    .start()
)

See the samples/ directory and the tests/ folder for more usage examples and test cases.
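Conceptually, incremental mode combines the two settings used above: the watermark column selects source rows that changed since the previous run, and the primary key(s) decide whether each row inserts a new record or replaces an existing one. The sketch below models this with plain Python dicts. It is an illustration under these assumptions, not pyspark-cdc's implementation, which works on Spark DataFrames and Delta tables (where a MERGE keyed on the primary key is the usual equivalent). The function name incremental_upsert is hypothetical, and watermark values are integers for brevity; a datetime column compares the same way.

```python
from typing import Any, Dict, Iterable, List, Optional, Tuple

Row = Dict[str, Any]
Key = Tuple[Any, ...]


def incremental_upsert(
    target: Dict[Key, Row],
    source: Iterable[Row],
    primary_keys: List[str],
    watermark_column: str,
    last_watermark: Optional[Any],
) -> Optional[Any]:
    """Upsert source rows newer than last_watermark into target; return the new watermark."""
    new_watermark = last_watermark
    for row in source:
        value = row[watermark_column]
        if last_watermark is not None and value <= last_watermark:
            continue  # already captured by an earlier run
        key = tuple(row[k] for k in primary_keys)
        target[key] = dict(row)  # insert a new key or overwrite the existing record
        if new_watermark is None or value > new_watermark:
            new_watermark = value
    return new_watermark


# First run: no stored watermark, so every row is captured.
target: Dict[Key, Row] = {}
wm = incremental_upsert(
    target,
    [{"ID": 1, "NAME": "Ann", "UPDATED_AT": 10}, {"ID": 2, "NAME": "Ben", "UPDATED_AT": 20}],
    ["ID"], "UPDATED_AT", None,
)
# Second run: only ID 1 changed after watermark 20, so only it is rewritten.
wm = incremental_upsert(
    target,
    [{"ID": 1, "NAME": "Anna", "UPDATED_AT": 30}, {"ID": 2, "NAME": "Ben", "UPDATED_AT": 20}],
    ["ID"], "UPDATED_AT", wm,
)
print(wm, target[(1,)]["NAME"], len(target))  # 30 Anna 2
```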

Delta Optimize

Note: Schedulers use standard day-of-the-month crontab expressions:

  • *: any value
  • ,: value list separator
  • -: range of values
  • /: step values
  • 1-31: allowed values

⚠️ If you run capture multiple times a day, the optimize schedulers are triggered on each run. To turn them off, use .scheduler_switch("OFF").
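The day-of-month syntax listed above can be made concrete with a small expander. The sketch below, with hypothetical helper names days_for and is_scheduled, turns an expression into the set of matching days and checks the schedules from the usage example. It follows common crontab conventions (for instance, */3 starts at day 1); pyspark-cdc's own parser may treat edge cases differently.

```python
from typing import Set


def days_for(expr: str) -> Set[int]:
    """Expand a crontab day-of-month field into the set of matching days (1-31)."""
    days: Set[int] = set()
    for part in expr.split(","):                  # ",": value list separator
        base, _, step_text = part.partition("/")  # "/": step values
        step = int(step_text) if step_text else 1
        if step < 1:
            raise ValueError(f"step must be positive in {part!r}")
        if base == "*":                           # "*": any value
            start, end = 1, 31
        elif "-" in base:                         # "-": range of values
            low, high = base.split("-")
            start, end = int(low), int(high)
        else:                                     # a single day; "N/step" runs to 31 in this sketch
            start = int(base)
            end = 31 if step_text else start
        if not 1 <= start <= end <= 31:
            raise ValueError(f"days must be within 1-31 in {part!r}")
        days.update(range(start, end + 1, step))
    return days


def is_scheduled(expr: str, day_of_month: int) -> bool:
    """True if a run on this day of the month should trigger the scheduled operation."""
    return day_of_month in days_for(expr)


print(sorted(days_for("*/3")))  # [1, 4, 7, 10, 13, 16, 19, 22, 25, 28, 31]
print(sorted(days_for("5,20")))  # [5, 20]
print(is_scheduled("10-25", 9), is_scheduled("10-25", 10))  # False True
```

Because a check like this looks only at the day of the month, it returns True for every run on a matching day, which is the behavior the warning above describes.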

Typical Scenarios

The following table summarizes common use cases:

| Mode | Primary key | Watermark column | Example usage (excerpt of the builder chain) | Comment |
|---|---|---|---|---|
| Full | Not needed | Not needed | .mode("full").format("delta") | For tables without an auto-incrementing PK or a watermark column. For big tables, adding a watermark column is recommended. |
| Incremental | Single | Yes (datetime) | .mode("incremental").primary_keys(["ID"]).watermark_column("UPDATED_AT").format("delta") | Common case. |
| Incremental | Auto-incrementing | No | .mode("incremental").primary_keys(["ID"]).watermark_column("ID").format("delta") | Uses the auto-incrementing PK as the watermark, but cannot capture UPDATE operations. |
| Incremental | Multi-column | Yes (datetime) | .mode("incremental").primary_keys(["ID", "FIRST_NAME"]).watermark_column("UPDATED_AT").format("delta") | Common case. |
| Incremental | Multi-column | Yes (int) | .mode("incremental").primary_keys(["ID", "FIRST_NAME"]).watermark_column("ID").format("delta") | Multi-column PK that uses the auto-incrementing ID as the watermark, but cannot capture UPDATE operations. |
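Two of the scenarios above use the auto-incrementing ID as the watermark and therefore cannot capture UPDATE operations. The reason is that an updated row keeps its ID, so a filter of the form "ID greater than the last watermark" never selects it again. The small comparison below uses hypothetical data and a hypothetical helper, rows_after, to contrast that with an UPDATED_AT watermark.

```python
from typing import Any, Dict, List


def rows_after(rows: List[Dict[str, Any]], column: str, watermark: Any) -> List[Dict[str, Any]]:
    """Select rows whose watermark column is strictly greater than the stored watermark."""
    return [r for r in rows if r[column] > watermark]


# Source state after a previous run stored watermarks ID = 2 and UPDATED_AT = 20.
source = [
    {"ID": 1, "NAME": "Alice", "UPDATED_AT": 30},  # updated since the last run
    {"ID": 2, "NAME": "Bob", "UPDATED_AT": 20},    # unchanged
    {"ID": 3, "NAME": "Cara", "UPDATED_AT": 25},   # newly inserted
]

by_id = rows_after(source, "ID", 2)
by_updated_at = rows_after(source, "UPDATED_AT", 20)
print([r["ID"] for r in by_id])          # [3]: the update to ID 1 is missed
print([r["ID"] for r in by_updated_at])  # [1, 3]
```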

To capture DELETE operations, call .enable_deletion_detect(). It compares the records on both sides (source and target) by primary key(s).
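Comparing both sides by primary key amounts to a set difference: keys that exist in the captured table but no longer exist in the source were deleted at the source. The pure-Python sketch below, with hypothetical helpers key_set and detect_deletes, shows the idea with a composite key. In Spark the same comparison is commonly written as a left anti join, for example target_df.join(source_df.select(*pks), on=pks, how="left_anti"); this is not necessarily how pyspark-cdc implements it.

```python
from typing import Any, Dict, Iterable, List, Set, Tuple

Row = Dict[str, Any]
Key = Tuple[Any, ...]


def key_set(rows: Iterable[Row], primary_keys: List[str]) -> Set[Key]:
    """Collect primary-key tuples; tuples also cover multi-column keys."""
    return {tuple(row[k] for k in primary_keys) for row in rows}


def detect_deletes(source: Iterable[Row], target: Iterable[Row], primary_keys: List[str]) -> Set[Key]:
    """Keys present in the target but missing from the source were deleted at the source."""
    return key_set(target, primary_keys) - key_set(source, primary_keys)


target = [
    {"ID": 1, "FIRST_NAME": "Ann"},
    {"ID": 2, "FIRST_NAME": "Ben"},
    {"ID": 3, "FIRST_NAME": "Cy"},
]
source = [{"ID": 1, "FIRST_NAME": "Ann"}, {"ID": 3, "FIRST_NAME": "Cy"}]  # ID 2 was hard-deleted
print(detect_deletes(source, target, ["ID", "FIRST_NAME"]))  # {(2, 'Ben')}
```

Because a deleted row leaves nothing behind for a watermark filter to select, this comparison has to look at the full set of source keys, not only the rows newer than the watermark.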

Library Version Matrix

Library versions follow the Databricks Runtime LTS releases.

| Version | JDK | Python | Databricks Runtime | Spark | Delta Lake |
|---|---|---|---|---|---|
| 1.0.0 | 17 | 3.12 | 17 LTS | 4.0.0 | 4.0.0 |

License

This project is licensed under the MIT License. See LICENSE for details.

Contact

For questions or support, open an issue on GitHub.

Download files

Download the file for your platform.

Source Distribution

pyspark_cdc-1.0.0.tar.gz (12.7 kB)

Built Distribution

pyspark_cdc-1.0.0-py3-none-any.whl (14.6 kB)

File details

Details for the file pyspark_cdc-1.0.0.tar.gz.

File metadata

  • Download URL: pyspark_cdc-1.0.0.tar.gz
  • Upload date:
  • Size: 12.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.3.2 CPython/3.12.12 Linux/6.11.0-1018-azure

File hashes

Hashes for pyspark_cdc-1.0.0.tar.gz
| Algorithm | Hash digest |
|---|---|
| SHA256 | b63694ade102563b4d82a4a7a4e1e679484ef411f55219e4bcd332e57ff34ffa |
| MD5 | c16b61f1436d0edbdd50c08eaf608d71 |
| BLAKE2b-256 | bdb541434b944162b4dd2d78ef4b9f2df6f1e712291cf3a0eb5998a97bf2b255 |


File details

Details for the file pyspark_cdc-1.0.0-py3-none-any.whl.

File metadata

  • Download URL: pyspark_cdc-1.0.0-py3-none-any.whl
  • Upload date:
  • Size: 14.6 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.3.2 CPython/3.12.12 Linux/6.11.0-1018-azure

File hashes

Hashes for pyspark_cdc-1.0.0-py3-none-any.whl
| Algorithm | Hash digest |
|---|---|
| SHA256 | 51734a1526d84a7d4ce7acd46e7bdfd505bd0de3fca2543e32a0a48e18d47131 |
| MD5 | e35dee375f95e7904f01fd22275b9a71 |
| BLAKE2b-256 | a0adbfeb42b5a98fc623a294da032440272cbb2c52eaad1a25bb102eaa21d311 |
