Python Debezium Embedded Engine
pydbzengine
A Python module for running the Debezium Engine from Python and consuming database change data capture (CDC) events.
Java integration uses Pyjnius, a Python library for accessing Java classes.
Installation
Install from PyPI:
pip install pydbzengine
# install from github:
pip install https://github.com/memiiso/pydbzengine/archive/master.zip --upgrade --user
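Because the Java integration goes through Pyjnius, a Java runtime has to be discoverable in addition to the Python package. The following is a minimal sketch of a pre-flight check; the function name `check_environment` is hypothetical and not part of pydbzengine, and looking for `java` on the `PATH` or a `JAVA_HOME` variable is only a heuristic for whether a JVM can be found.

```python
import importlib.util
import os
import shutil


def check_environment() -> dict:
    """Report whether the pieces pydbzengine relies on appear to be present."""
    return {
        # True when the import system can locate the package (it is not imported).
        "pydbzengine_installed": importlib.util.find_spec("pydbzengine") is not None,
        # Heuristics for a usable Java runtime; either one may be enough.
        "java_on_path": shutil.which("java") is not None,
        "java_home_set": bool(os.environ.get("JAVA_HOME")),
    }


if __name__ == "__main__":
    for name, ok in check_environment().items():
        print(f"{name}: {'ok' if ok else 'missing'}")
```

If `pydbzengine_installed` is reported as missing, re-run one of the `pip install` commands above in the same interpreter environment.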
How to Use
First install the package with its development extras: pip install pydbzengine[dev]
from typing import List

from pydbzengine import ChangeEvent, BasePythonChangeHandler
from pydbzengine import Properties, DebeziumJsonEngine


class PrintChangeHandler(BasePythonChangeHandler):
    """
    A custom change event handler class.

    This class processes batches of Debezium change events received from the engine.
    The `handleJsonBatch` method is where you implement your logic for consuming
    and processing these events. Currently, it prints basic information about
    each event to the console.
    """

    def handleJsonBatch(self, records: List[ChangeEvent]):
        """
        Handles a batch of Debezium change events.

        This method is called by the Debezium engine with a list of ChangeEvent objects.
        Change this method to implement your desired processing logic. For example,
        you might parse the event data, transform it, and load it into a database or
        other destination.

        Args:
            records: A list of ChangeEvent objects representing the changes captured by Debezium.
        """
        print(f"Received {len(records)} records")
        for record in records:
            print(f"destination: {record.destination()}")
            print(f"key: {record.key()}")
            print(f"value: {record.value()}")
        print("--------------------------------------")


if __name__ == '__main__':
    props = Properties()
    props.setProperty("name", "engine")
    props.setProperty("snapshot.mode", "initial_only")
    # Add further Debezium connector configuration properties here. For example:
    # props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector")
    # props.setProperty("database.hostname", "your_database_host")
    # props.setProperty("database.port", "3306")

    # Create a DebeziumJsonEngine instance, passing the configuration properties and the custom change event handler.
    engine = DebeziumJsonEngine(properties=props, handler=PrintChangeHandler())
    # Start the Debezium engine to begin consuming and processing change events.
    engine.run()
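The handler above prints each value as-is. As a sketch of the "parse the event data" step mentioned in its docstring, the helper below pulls the operation and row image out of one event value. It assumes that `record.value()` yields a JSON string in the standard Debezium envelope (`op`, `before`, `after`, `source`), optionally wrapped in `schema`/`payload` when schemas are enabled in the converter. The name `summarize_event` and the sample event are illustrative only and not part of pydbzengine.

```python
import json
from typing import Any, Dict, Optional

# Debezium operation codes used in the change event envelope.
OPERATIONS = {"c": "create", "u": "update", "d": "delete", "r": "read (snapshot)"}


def summarize_event(value_json: Optional[str]) -> Optional[Dict[str, Any]]:
    """Extract the operation and row image from one Debezium JSON value."""
    if not value_json:
        # An empty value is typically a tombstone emitted after a delete.
        return None
    event = json.loads(value_json)
    # With schemas enabled, the event is wrapped as {"schema": ..., "payload": ...}.
    payload = event.get("payload", event)
    op = payload.get("op")
    # Deletes carry the row in "before"; other operations carry it in "after".
    row = payload.get("before") if op == "d" else payload.get("after")
    return {
        "operation": OPERATIONS.get(op, op),
        "row": row,
        "source_table": (payload.get("source") or {}).get("table"),
    }


# Hypothetical sample event, for illustration only.
sample = json.dumps({
    "payload": {
        "op": "c",
        "before": None,
        "after": {"id": 1, "name": "alice"},
        "source": {"table": "customers"},
    }
})
print(summarize_event(sample))
```

Inside `handleJsonBatch`, a call such as `summarize_event(record.value())` could replace the `print` of the raw value, provided the value really is a JSON string in your converter configuration.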
How to consume events with dlt
For the full code please see dlt_consuming.py
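A batch handed to the handler can mix events from several destinations, and a loader usually wants rows grouped per target table. The sketch below shows one way to do that grouping before passing data on; it is not taken from dlt_consuming.py. The helper name `group_rows_by_destination` is hypothetical, and it assumes each event is reduced to a `(destination, value_json)` pair where the value is a JSON string.

```python
import json
from collections import defaultdict
from typing import Any, Dict, Iterable, List, Tuple


def group_rows_by_destination(
    events: Iterable[Tuple[str, str]],
) -> Dict[str, List[Dict[str, Any]]]:
    """Group (destination, value_json) pairs into per-destination row lists."""
    grouped: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
    for destination, value_json in events:
        if not value_json:
            continue  # skip events that carry no value
        grouped[destination].append(json.loads(value_json))
    return dict(grouped)
```

In a handler, the pairs could be built with `((r.destination(), r.value()) for r in records)`, and each resulting list handed to a loader, for example a dlt `pipeline.run(rows, table_name=...)` call.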
File details
Details for the file pydbzengine-3.0.7.1.tar.gz.
File metadata
- Download URL: pydbzengine-3.0.7.1.tar.gz
- Upload date:
- Size: 43.6 MB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.12.8
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 151e4e970cec0126765a2d9c9ba4c5be91ca446f885d1874a8f15c10d603ddc7 |
| MD5 | 67b8f694bedd620fa1cef1ef089a8bd9 |
| BLAKE2b-256 | cb6300565bbe2ca6634f66c63d5395a3256b85f2c5c9b5ab7aeecb5c44be323e |
File details
Details for the file pydbzengine-3.0.7.1-py3-none-any.whl.
File metadata
- Download URL: pydbzengine-3.0.7.1-py3-none-any.whl
- Upload date:
- Size: 43.6 MB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.12.8
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | fcb457031e259c3b65c0def797500b2a66acb4d3fca1496b9f7a78ecd41dc013 |
| MD5 | 968ebec70d208673ab3cac115f2d3e3d |
| BLAKE2b-256 | 96aae8930cae5bc2b5127dd75a75ab98959b6dad8e8b9e7c72567bfcb8ad2fdd |