A Python library for Change Data Capture (CDC) workflows using PySpark and Delta Lake.

pyspark-cdc

A Python library for Change Data Capture (CDC) workflows using PySpark. This project provides tools to capture, optimize, validate, and manage data changes efficiently in distributed environments.

Features

  • Full and incremental data capture.
  • Cron-based scheduling utilities for Delta optimizations.
  • No extra dependencies.
  • Stores the watermark in Delta Lake's internal commitInfo.userMetadata.
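
For context, Delta Lake records an optional userMetadata string with each commit, and DESCRIBE HISTORY lists commits newest-first with that column. The sketch below shows one way a watermark could be recovered from such history rows. The JSON payload and its "watermark" key are assumptions made for illustration; the format pyspark-cdc actually stores is not documented here, and latest_watermark is a hypothetical helper, not part of the library.

```python
import json
from typing import Iterable, Mapping, Optional


def latest_watermark(history_rows: Iterable[Mapping[str, object]]) -> Optional[str]:
    """Return the watermark from the newest commit that carries one.

    `history_rows` is expected newest-first, e.g. dicts built with
    `row.asDict()` from `spark.sql("DESCRIBE HISTORY my_table").collect()`.
    The JSON layout {"watermark": ...} is hypothetical.
    """
    for row in history_rows:
        raw = row.get("userMetadata")
        if not raw:
            continue  # maintenance commits may carry no metadata
        try:
            payload = json.loads(raw)
        except (TypeError, ValueError):
            continue  # metadata written by another tool; ignore it
        if isinstance(payload, dict) and "watermark" in payload:
            return str(payload["watermark"])
    return None


# Stand-in rows, so the sketch runs without a Spark session.
history = [
    {"version": 3, "operation": "OPTIMIZE", "userMetadata": None},
    {"version": 2, "operation": "MERGE",
     "userMetadata": '{"watermark": "2024-05-02T10:00:00"}'},
    {"version": 1, "operation": "WRITE",
     "userMetadata": '{"watermark": "2024-05-01T09:00:00"}'},
]
print(latest_watermark(history))
```

Keeping the watermark in the commit log means no separate state table is needed, which fits the "no extra dependencies" goal above.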

Installation

You can install the package using pip:

pip install pyspark-cdc

Usage

Assume there is a table in a PostgreSQL database. Use this module to capture it as a managed Delta table.

from pyspark_cdc import capture

... # necessary variables

df = (
    spark.read.format("jdbc")
    .options(
        url=f"{postgresql_jdbc_url}",
        dbtable=f"{postgresql_schema}.{postgresql_table}",
        user=f"{postgresql_user}",
        password=f"{postgresql_password}",
        driver="org.postgresql.Driver",                # Ensure JDBC Driver JAR is in Spark.
    )
    .load()
)
# quick start
(
    capture(df, spark)
    .table(f"{catalog}.{database}.{table_name}")       # managed table name
    .mode("incremental")
    .format("delta")
    .primary_keys(["ID"])                              # PK
    .watermark_column("UPDATED_AT")                    # Watermark column
    .enable_deletion_detect()                          # detect DELETE operations
    .start()
)
# with more options
(
    capture(df, spark)
    .table(f"{catalog}.{database}.{table_name}")
    .mode("incremental")
    .format("delta")
    .primary_keys(["ID"])
    .watermark_column("UPDATED_AT")
    .partition_by(["COUNTRY", "GENDER"])               # partitioning
    .schedule_zorder("*/3", ["FIRST_NAME", "SURNAME"]) # run z-order every 3 days
    .schedule_vacuum("5,20")                           # run vacuum on 5th and 20th every month
    .schedule_compaction("10-25")                      # run compaction every day from the 10th to the 25th of every month
    .enable_deletion_detect()                          # detect hard delete operations in source side
    .table_properties(                                 # extra delta table properties
        {
            "delta.deletedFileRetentionDuration": "interval 3 day",
            "delta.logRetentionDuration": "interval 3 day",
            "delta.appendOnly": "false",
            "delta.enableDeletionVectors": "true",
        }
    )
    .options(                                         # extra DataFrame writer options
        {
            "maxRecordsPerFile": 1000,
        }
    )
    .start()
)

See the samples/ directory and the tests/ folder for more usage examples and test cases.

Delta Optimize

Note: Schedulers use standard day-of-the-month crontab expressions:

  • *: any value
  • ,: value list separator
  • -: range of values
  • /: step values
  • 1-31: allowed values
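
To make the expression syntax concrete, here is a minimal sketch of how a day-of-month field can be expanded into the set of days it matches. It is an independent illustration of standard crontab semantics, not the library's own parser; expand_days and runs_today are hypothetical names.

```python
def expand_days(expr: str, lo: int = 1, hi: int = 31) -> set[int]:
    """Expand a day-of-month crontab field into the set of matching days."""
    days: set[int] = set()
    for part in expr.split(","):           # "," separates list items
        rng, _, step_txt = part.partition("/")
        step = int(step_txt) if step_txt else 1
        if step < 1:
            raise ValueError(f"invalid step in {part!r}")
        if rng == "*":                     # "*" covers the whole range
            start, end = lo, hi
        elif "-" in rng:                   # "a-b" is an inclusive range
            a, b = rng.split("-", 1)
            start, end = int(a), int(b)
        else:                              # a single day
            start = end = int(rng)
        if not (lo <= start <= end <= hi):
            raise ValueError(f"day out of range in {part!r}")
        days.update(range(start, end + 1, step))
    return days


def runs_today(expr: str, day_of_month: int) -> bool:
    """True if a job scheduled with `expr` would run on the given day."""
    return day_of_month in expand_days(expr)


print(sorted(expand_days("5,20")))
print(sorted(expand_days("*/3")))
print(runs_today("10-25", 12))
```

Under these semantics a step expression such as */3 restarts at day 1 each month, so the gap between the last run of one month and the first run of the next can be shorter than three days.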

⚠️ If you run capture multiple times a day, the optimize schedulers are triggered on every run. You can turn them off with .scheduler_switch("OFF").

Typical Scenarios

The following table summarizes common use cases:

| Mode | Primary Key | Watermark Column | Example Usage | Comment |
| --- | --- | --- | --- | --- |
| Full | Not needed | Not needed | ... .mode("full").format("delta") ... | No auto-increment PK or watermark is available. For big tables, it is better to add a watermark column. |
| Incremental | Single | Yes (datetime) | ... .mode("incremental").primary_keys(["ID"]).watermark_column("UPDATED_AT").format("delta") ... | Common case. |
| Incremental | Auto-increment PK | No | ... .mode("incremental").primary_keys(["ID"]).watermark_column("ID").format("delta") ... | Uses the auto-increment PK as the watermark, but cannot capture UPDATE operations. |
| Incremental | Multi | Yes (datetime) | ... .mode("incremental").primary_keys(["ID", "FIRST_NAME"]).watermark_column("UPDATED_AT").format("delta") ... | Common case. |
| Incremental | Multi | Yes (int) | ... .mode("incremental").primary_keys(["ID", "FIRST_NAME"]).watermark_column("ID").format("delta") ... | Multi-column PK. Uses the auto-increment PK column as the watermark, but cannot capture UPDATE operations. |
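
The limitation noted for auto-increment watermarks can be seen in a small pure-Python sketch of watermark filtering. It assumes an incremental run keeps rows whose watermark is strictly greater than the last stored value; the library's actual comparison is not documented here, and incremental_batch is a hypothetical helper.

```python
from typing import Any


def incremental_batch(
    rows: list[dict[str, Any]], column: str, last_value: Any
) -> list[dict[str, Any]]:
    """Keep only rows whose watermark is strictly greater than the stored one."""
    if last_value is None:
        return list(rows)  # first run: nothing stored yet, take everything
    return [r for r in rows if r[column] > last_value]


source = [
    {"ID": 1, "NAME": "Ann", "UPDATED_AT": "2024-05-01"},
    {"ID": 2, "NAME": "Bob", "UPDATED_AT": "2024-05-03"},  # updated after the last run
    {"ID": 3, "NAME": "Cy", "UPDATED_AT": "2024-05-04"},   # inserted after the last run
]

# Datetime watermark: the updated row 2 and the new row 3 are both picked up.
by_time = incremental_batch(source, "UPDATED_AT", "2024-05-02")

# Auto-increment ID watermark (last captured ID = 2): only the insert is seen;
# the update to row 2 is missed because its ID did not change.
by_id = incremental_batch(source, "ID", 2)

print([r["ID"] for r in by_time])
print([r["ID"] for r in by_id])
```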

To capture DELETE operations, use enable_deletion_detect(). It compares the records on the source and target sides based on the primary key(s).
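
A key-based comparison of this kind can be sketched in plain Python as a set difference over primary-key tuples. This is only an illustration of the idea, not the library's implementation; deleted_keys is a hypothetical helper, and in Spark the same result could be obtained with a left anti join of the target keys against the source keys.

```python
from typing import Any, Iterable, Mapping, Sequence


def deleted_keys(
    source_rows: Iterable[Mapping[str, Any]],
    target_rows: Iterable[Mapping[str, Any]],
    primary_keys: Sequence[str],
) -> set[tuple]:
    """Return PK tuples present in the target but no longer in the source."""
    source_keys = {tuple(row[k] for k in primary_keys) for row in source_rows}
    target_keys = {tuple(row[k] for k in primary_keys) for row in target_rows}
    return target_keys - source_keys


# The source has lost row 2 since the last capture; the target still holds it.
source_side = [
    {"ID": 1, "FIRST_NAME": "Ann"},
    {"ID": 3, "FIRST_NAME": "Cy"},
]
target_side = [
    {"ID": 1, "FIRST_NAME": "Ann"},
    {"ID": 2, "FIRST_NAME": "Bob"},
    {"ID": 3, "FIRST_NAME": "Cy"},
]
print(deleted_keys(source_side, target_side, ["ID", "FIRST_NAME"]))
```

Because only key columns are compared, the check needs the full set of source keys on each run, even when the data itself is captured incrementally.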

Library Version Matrix

The version of this module follows the Databricks Runtime LTS releases.

| Version | JDK | Python | Databricks Runtime | Spark | Delta Lake |
| --- | --- | --- | --- | --- | --- |
| v1.0.0 | 17 | 3.12 | 17 LTS | 4.0.0 | 4.0.0 |

License

This project is licensed under the MIT License. See LICENSE for details.

Contact

For questions or support, open an issue on GitHub.
