pydbzengine: Python Debezium Embedded Engine
A Pythonic interface for the Debezium Engine, allowing you to consume database Change Data Capture (CDC) events directly in your Python applications.
Features
- Pure Python Interface: Interact with the powerful Debezium Engine using simple Python classes and methods.
- Pluggable Event Handlers: Easily create custom handlers to process CDC events according to your specific needs.
- Built-in Iceberg Handler: Stream change events directly into Apache Iceberg tables with zero boilerplate.
- Seamless Integration: Designed to work with popular Python data tools like dlt (data load tool).
- All Debezium Connectors: Supports all standard Debezium connectors (PostgreSQL, MySQL, SQL Server, Oracle, etc.).
How it Works
This library acts as a bridge between the Python world and the Java-based Debezium Engine. It uses Pyjnius to manage the JVM and interact with Debezium's Java classes, exposing a clean, Pythonic API so you can focus on your data logic without writing Java code.
Built-in Data Handling Classes
pydbzengine comes with several built-in handlers to stream CDC events directly into common data destinations. You can also easily create your own by extending the BasePythonChangeHandler.
Apache Iceberg Handlers (pydbzengine[iceberg])
The Iceberg handlers are designed to stream CDC events directly into Apache Iceberg tables, enabling you to build a robust data lakehouse architecture. They automatically handle data serialization into the appropriate Arrow/Parquet format and manage Iceberg transactions.
- IcebergChangeHandler: A straightforward handler that appends change data to a source-equivalent Iceberg table using a predefined schema.
  - Use Case: Best for creating a "bronze" layer where you want to capture the raw Debezium event. The `before` and `after` payloads are stored as complete JSON strings.
  - Schema: Uses a fixed schema where complex nested fields (`source`, `before`, `after`) are stored as `StringType`. Because the data is consumed as JSON, all source-system schema changes are absorbed automatically.
  - Automatic Table Creation & Partitioning: It automatically creates a new Iceberg table for each source table and partitions it by day on the `_consumed_at` timestamp for efficient time-series queries.
  - Enriched Metadata: It also adds `_consumed_at`, `_dbz_event_key`, and `_dbz_event_key_hash` columns for enhanced traceability.
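To make the "bronze" row shape concrete, here is a hypothetical sketch of how such a row could be assembled from a Debezium JSON event. The function name `to_bronze_row` and the sample event are illustrative only and are not part of the pydbzengine API; the column names match those described above.

```python
# Hypothetical sketch of the "bronze" row shape described above: nested
# payloads kept as JSON strings, plus metadata columns for traceability.
# to_bronze_row and the sample event are illustrative, not pydbzengine API.
import hashlib
import json
from datetime import datetime, timezone


def to_bronze_row(event_key: str, event_value: str) -> dict:
    payload = json.loads(event_value).get("payload", {})
    return {
        # complex nested fields stored as plain JSON strings (StringType)
        "source": json.dumps(payload.get("source")),
        "before": json.dumps(payload.get("before")),
        "after": json.dumps(payload.get("after")),
        # enriched metadata columns
        "_consumed_at": datetime.now(timezone.utc),
        "_dbz_event_key": event_key,
        "_dbz_event_key_hash": hashlib.sha256(event_key.encode()).hexdigest(),
    }


# Simplified Debezium-style JSON event for illustration
key = '{"id": 1}'
value = json.dumps({"payload": {"op": "c", "before": None,
                                "after": {"id": 1, "name": "alice"},
                                "source": {"table": "users"}}})
row = to_bronze_row(key, value)
print(row["after"])  # the raw after-image survives as a JSON string
```

Because the payloads stay as opaque JSON strings, source-side schema changes never break the table schema; downstream consumers parse the JSON on read.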
- IcebergChangeHandlerV2: A more advanced handler that automatically infers the schema from the Debezium events and creates a well-structured Iceberg table accordingly.
  - Use Case: Ideal for scenarios where you want the pipeline to automatically create tables with native data types that mirror the source. This allows direct querying of the data without needing to parse JSON.
  - Schema and Features:
    - Automatic Schema Inference: It inspects the first batch of records for a given table and infers the schema using PyArrow, preserving native data types (e.g., `LongType`, `TimestampType`).
    - Robust Type Handling: If a field's type cannot be inferred from the initial batch (e.g., it is always `null`), it safely falls back to `StringType` to prevent errors.
    - Automatic Table Creation & Partitioning: It automatically creates a new Iceberg table for each source table and partitions it by day on the `_consumed_at` timestamp for efficient time-series queries.
    - Enriched Metadata: It also adds `_consumed_at`, `_dbz_event_key`, and `_dbz_event_key_hash` columns for enhanced traceability.
dlt (data load tool) Handler (pydbzengine[dlt])
- DltChangeHandler: This handler integrates seamlessly with the `dlt` library. It passes Debezium events to a `dlt` pipeline, allowing you to load the data into any destination `dlt` supports (e.g., DuckDB, BigQuery, Snowflake, Redshift, and more).
  - Use Case: Perfect for users who want to leverage `dlt`'s powerful features for schema inference, data normalization, and loading data into a data warehouse or database.
Base Handler for Custom Logic
- BasePythonChangeHandler: This is the abstract base class for creating your own custom handlers. By extending this class and implementing the `handle_batch` method, you can process change events with your own Python logic.
  - Use Case: Use this for any custom destination or logic not covered by the built-in handlers. Examples include:
    - Sending events to a message queue like Kafka or RabbitMQ.
    - Calling a custom API endpoint for each event.
    - Performing real-time aggregations or analytics.
Installation
Prerequisites
You must have a Java Development Kit (JDK) version 11 or newer installed and available in your system's PATH.
Recommended Installation: From GitHub
Due to the package size (including .jar artifacts), new versions are not published to PyPI. It is recommended to install the package directly from GitHub to get the latest features and fixes.
You can install either the latest development version from the main branch or a specific, stable version from a release tag.
To install the latest development version:
```shell
# For core functionality
pip install git+https://github.com/memiiso/pydbzengine.git

# With extras (e.g., iceberg, dlt)
pip install 'pydbzengine[iceberg]@git+https://github.com/memiiso/pydbzengine.git'
pip install 'pydbzengine[dlt]@git+https://github.com/memiiso/pydbzengine.git'

# To install a specific version from a release tag (e.g., 3.2.0.0):
pip install git+https://github.com/memiiso/pydbzengine.git@3.2.0.0
```
Alternative: From PyPI (May be Outdated)
An older version is available on PyPI. You can install it, but be aware that it may not have the latest updates.
```shell
# For core functionality
pip install pydbzengine

# With extras
pip install 'pydbzengine[iceberg]'
pip install 'pydbzengine[dlt]'
```
How to Use
Consume events with a custom Python consumer
- First, install the package: `pip install pydbzengine[dev]`
- Second, extend `BasePythonChangeHandler` and implement your Python consuming logic. See the example below.
```python
from typing import List

from pydbzengine import ChangeEvent, BasePythonChangeHandler
from pydbzengine import Properties, DebeziumJsonEngine


class PrintChangeHandler(BasePythonChangeHandler):
    """
    A custom change event handler class.

    This class processes batches of Debezium change events received from the
    engine. The `handleJsonBatch` method is where you implement your logic for
    consuming and processing these events. Currently, it prints basic
    information about each event to the console.
    """

    def handleJsonBatch(self, records: List[ChangeEvent]):
        """
        Handles a batch of Debezium change events.

        This method is called by the Debezium engine with a list of ChangeEvent
        objects. Change this method to implement your desired processing logic.
        For example, you might parse the event data, transform it, and load it
        into a database or other destination.

        Args:
            records: A list of ChangeEvent objects representing the changes
                captured by Debezium.
        """
        print(f"Received {len(records)} records")
        for record in records:
            print(f"destination: {record.destination()}")
            print(f"key: {record.key()}")
            print(f"value: {record.value()}")
        print("--------------------------------------")


if __name__ == '__main__':
    props = Properties()
    props.setProperty("name", "engine")
    props.setProperty("snapshot.mode", "initial_only")
    # Add further Debezium connector configuration properties here. For example:
    # props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector")
    # props.setProperty("database.hostname", "your_database_host")
    # props.setProperty("database.port", "3306")

    # Create a DebeziumJsonEngine instance, passing the configuration
    # properties and the custom change event handler.
    engine = DebeziumJsonEngine(properties=props, handler=PrintChangeHandler())

    # Start the Debezium engine to begin consuming and processing change events.
    engine.run()
```
Consume events to Apache Iceberg
```python
from pyiceberg.catalog import load_catalog

from pydbzengine import DebeziumJsonEngine, Properties
from pydbzengine.handlers.iceberg import IcebergChangeHandler

conf = {
    "uri": "http://localhost:8181",
    # "s3.path-style.access": "true",
    "warehouse": "warehouse",
    "s3.endpoint": "http://localhost:9000",
    "s3.access-key-id": "minioadmin",
    "s3.secret-access-key": "minioadmin",
}
catalog = load_catalog(name="rest", **conf)
handler = IcebergChangeHandler(catalog=catalog,
                               destination_namespace=("iceberg", "debezium_cdc_data",))

dbz_props = Properties()
dbz_props.setProperty("name", "engine")
dbz_props.setProperty("snapshot.mode", "always")
# Add further Debezium connector configuration properties here. For example:
# dbz_props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector")

engine = DebeziumJsonEngine(properties=dbz_props, handler=handler)
engine.run()
```
Consume events with dlt
For the full code, please see dlt_consuming.py.
```python
import dlt

from pydbzengine import DebeziumJsonEngine, Properties
from pydbzengine.handlers.dlt import DltChangeHandler
from pydbzengine.helper import Utils

# Create a dlt pipeline and set the destination, in this case DuckDB.
dlt_pipeline = dlt.pipeline(
    pipeline_name="dbz_cdc_events_example",
    destination="duckdb",
    dataset_name="dbz_data",
)
handler = DltChangeHandler(dlt_pipeline=dlt_pipeline)

dbz_props = Properties()
dbz_props.setProperty("name", "engine")
dbz_props.setProperty("snapshot.mode", "always")
# Add further Debezium connector configuration properties here.

engine = DebeziumJsonEngine(properties=dbz_props, handler=handler)

# Run the Debezium engine asynchronously with a timeout.
# This runs for a limited time and then terminates automatically.
Utils.run_engine_async(engine=engine, timeout_sec=60)
```
Contributors
Download files
File details
Details for the file pydbzengine-3.3.1.0.tar.gz.
File metadata
- Download URL: pydbzengine-3.3.1.0.tar.gz
- Upload date:
- Size: 183.6 MB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | `72ef5355ee7725fb51481e9dcd19ae36789e8ca962963fbfa891af58af2bb09f` |
| MD5 | `00da480087b9e2abd4c29f3225212bb8` |
| BLAKE2b-256 | `d069eafaec101b99e0e727fb8436ae06f1a0138f66e53e072527eb37b75774d9` |
File details
Details for the file pydbzengine-3.3.1.0-py3-none-any.whl.
File metadata
- Download URL: pydbzengine-3.3.1.0-py3-none-any.whl
- Upload date:
- Size: 183.7 MB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | `40954df12c89150264736b0f5ac8c280d5951ec690cd5c60d7198c768a44daa8` |
| MD5 | `c3b0e8c2c95e4e7ed2e9ea958d151d83` |
| BLAKE2b-256 | `1e47ae471bb53d0a98493ceb121f7ec559eb9defba99f8f7e2bfd203ec31134f` |