EvolvisHub Data Handler
A robust Change Data Capture (CDC) library for efficient data synchronization across various databases and storage systems.
Features
- Multi-Database Support: Seamlessly sync data between PostgreSQL, MySQL, SQL Server, Oracle, MongoDB, and more
- Cloud Storage Integration: Native support for AWS S3, Google Cloud Storage, and Azure Blob Storage
- File System Support: Handle CSV, JSON, and other file formats
- Watermark Tracking: Efficient incremental sync with configurable watermark columns
- Batch Processing: Optimize performance with configurable batch sizes
- Error Handling: Robust error recovery and logging
- Type Safety: Full type hints and validation with Pydantic (see the sketch after this list)
- Extensible: Easy to add new adapters and data sources
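Configuration is validated with Pydantic before a sync runs. As a rough illustration only, the sketch below shows how a source block like the one in the Quick Start could be modeled with Pydantic; the class and field names here are hypothetical, not the library's actual models.

```python
# Hypothetical sketch of Pydantic-style validation for a source block.
# Model and field names mirror the Quick Start YAML below; they are
# illustrative, not the library's actual classes.
from typing import Optional

from pydantic import BaseModel

class WatermarkSettings(BaseModel):
    column: str
    type: str = "timestamp"
    initial_value: Optional[str] = None

class SourceSettings(BaseModel):
    type: str
    host: str
    port: int
    database: str
    username: str
    password: str
    watermark: WatermarkSettings

source = SourceSettings(
    type="postgresql",
    host="localhost",
    port=5432,
    database="source_db",
    username="source_user",
    password="source_password",
    watermark={"column": "updated_at", "type": "timestamp"},
)
print(source.watermark.column)  # -> updated_at
```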
Installation
```bash
# Install from PyPI
pip install evolvishub-data-handler

# Install with development dependencies
pip install evolvishub-data-handler[dev]

# Install with documentation dependencies
pip install evolvishub-data-handler[docs]
```
Quick Start
- Create a configuration file (e.g., `config.yaml`):
```yaml
source:
  type: postgresql
  host: localhost
  port: 5432
  database: source_db
  username: source_user
  password: source_password
  watermark:
    column: updated_at
    type: timestamp
    initial_value: "1970-01-01 00:00:00"
  # Optional: Custom query for complex data extraction
  query: >
    SELECT id, name, email, updated_at,
           CASE WHEN deleted_at IS NOT NULL THEN 'delete'
                WHEN updated_at > :last_sync THEN 'update'
                ELSE 'insert' END as operation
    FROM users
    WHERE updated_at > :last_sync OR :last_sync IS NULL
    ORDER BY updated_at LIMIT :batch_size

destination:
  type: postgresql
  host: localhost
  port: 5432
  database: dest_db
  username: dest_user
  password: dest_password
  table: users
  watermark:
    column: updated_at
    type: timestamp

sync:
  mode: continuous  # one_time, continuous, or cron
  batch_size: 1000
  interval_seconds: 60  # For continuous mode
  cron_expression: "0 */2 * * *"  # For cron mode (every 2 hours)
  timezone: "UTC"  # Timezone for cron scheduling
  watermark_table: sync_watermark
```
- Use the library in your code:
```python
from evolvishub_data_handler import CDCHandler

# Initialize the handler
handler = CDCHandler("config.yaml")

# Run one-time sync
handler.sync()

# Or run continuous sync
handler.run_continuous()
```
- Or use the command-line interface:
```bash
# One-time sync
evolvishub-cdc run -c config.yaml -m one_time

# Continuous sync
evolvishub-cdc run -c config.yaml -m continuous

# Cron-scheduled sync
evolvishub-cdc run -c config.yaml -m cron --cron "0 */2 * * *"

# With custom logging
evolvishub-cdc run -c config.yaml -l DEBUG --log-file sync.log

# Legacy commands (still supported)
evolvishub-cdc sync -c config.yaml
evolvishub-cdc continuous-sync -c config.yaml
```
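For scripted use of the Python API shown above, the documented calls can be wrapped with whatever logging and retry policy your deployment needs. The sketch below is illustrative: only `CDCHandler("config.yaml")` and `handler.sync()` come from the documentation; the retry wrapper itself is not part of the library.

```python
# Illustrative wrapper around the documented CDCHandler API: run a one-time
# sync with basic logging and a fixed-backoff retry. The wrapper is an
# assumption about deployment style, not part of the library itself.
import logging
import time

from evolvishub_data_handler import CDCHandler

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("cdc-sync")

def run_with_retries(config_path: str, attempts: int = 3, backoff_seconds: int = 30) -> None:
    """Run a one-time sync, retrying on failure with a fixed backoff."""
    handler = CDCHandler(config_path)
    for attempt in range(1, attempts + 1):
        try:
            handler.sync()
            logger.info("Sync completed on attempt %d", attempt)
            return
        except Exception:
            logger.exception("Sync attempt %d failed", attempt)
            if attempt < attempts:
                time.sleep(backoff_seconds)
    raise RuntimeError("All sync attempts failed")

if __name__ == "__main__":
    run_with_retries("config.yaml")
```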
Sync Modes
One-Time Sync
Run a single synchronization cycle and exit.
```yaml
sync:
  mode: one_time
  batch_size: 1000
```
Continuous Sync
Run synchronization continuously at specified intervals.
```yaml
sync:
  mode: continuous
  interval_seconds: 60  # Sync every 60 seconds
  batch_size: 1000
```
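Conceptually, continuous mode repeats a one-time pass and then waits for `interval_seconds`, roughly like the loop below. This is a simplified sketch of the behavior, not the library's internal scheduler.

```python
# Simplified illustration of continuous mode: sync, sleep, repeat.
# Not the library's internal implementation.
import time

from evolvishub_data_handler import CDCHandler

handler = CDCHandler("config.yaml")
interval_seconds = 60  # mirrors sync.interval_seconds in the config above

while True:
    handler.sync()                # one incremental pass from the last watermark
    time.sleep(interval_seconds)  # wait before the next cycle
```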
Cron-Scheduled Sync
Run synchronization based on cron expressions with timezone support.
```yaml
sync:
  mode: cron
  cron_expression: "0 */2 * * *"  # Every 2 hours
  timezone: "America/New_York"
  batch_size: 1000
```
Common Cron Expressions:
"0 9 * * 1-5"- Every weekday at 9 AM"0 */6 * * *"- Every 6 hours"30 2 * * 0"- Every Sunday at 2:30 AM"0 0 1 * *"- First day of every month at midnight"0 8,12,16 * * *"- At 8 AM, 12 PM, and 4 PM every day
Custom Queries
Using Custom SQL Queries
Define complex data extraction logic with custom SQL queries:
```yaml
source:
  type: postgresql
  # ... connection details ...
  query: >
    SELECT
      id, name, email, updated_at,
      CASE
        WHEN deleted_at IS NOT NULL THEN 'delete'
        WHEN updated_at > :last_sync THEN 'update'
        ELSE 'insert'
      END as operation,
      EXTRACT(EPOCH FROM updated_at) as updated_timestamp
    FROM users
    WHERE (updated_at > :last_sync OR :last_sync IS NULL)
      AND status = 'active'
    ORDER BY updated_at
    LIMIT :batch_size
```
Available Parameters:
- `:last_sync` - Last synchronization timestamp
- `:batch_size` - Configured batch size
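To see how a query with these placeholders behaves outside the framework, you can execute it directly with SQLAlchemy's named bindings. The snippet below is a standalone sketch using the Quick Start connection details; it is not the library's internal execution path, and the bound values are only examples.

```python
# Standalone sketch: run a :last_sync / :batch_size parameterized query with
# SQLAlchemy named bindings. Connection details match the Quick Start example;
# this is not how the library executes queries internally.
from datetime import datetime

from sqlalchemy import create_engine, text

engine = create_engine(
    "postgresql+psycopg2://source_user:source_password@localhost:5432/source_db"
)

query = text("""
    SELECT id, name, email, updated_at,
           CASE WHEN deleted_at IS NOT NULL THEN 'delete'
                WHEN updated_at > :last_sync THEN 'update'
                ELSE 'insert' END as operation
    FROM users
    WHERE updated_at > :last_sync OR :last_sync IS NULL
    ORDER BY updated_at
    LIMIT :batch_size
""")

with engine.connect() as conn:
    rows = conn.execute(
        query,
        {"last_sync": datetime(1970, 1, 1), "batch_size": 1000},
    ).fetchall()
    print(f"fetched {len(rows)} rows")
```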
Using Simple SELECT Statements
For simpler cases, use the `select` field:
```yaml
source:
  type: postgresql
  # ... connection details ...
  select: "SELECT id, name, email, updated_at FROM users"
  watermark:
    column: updated_at
    type: timestamp
```
The framework automatically adds `WHERE`, `ORDER BY`, and `LIMIT` clauses based on the watermark configuration.
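As a rough illustration of that expansion, the effective query for the example above would look something like the string built below; the exact SQL the framework generates may differ.

```python
# Rough illustration of how a plain `select` plus a watermark column is
# expanded into an incremental query. The real generated SQL may differ.
def build_incremental_query(select: str, watermark_column: str) -> str:
    return (
        f"{select} "
        f"WHERE {watermark_column} > :last_sync OR :last_sync IS NULL "
        f"ORDER BY {watermark_column} "
        f"LIMIT :batch_size"
    )

print(build_incremental_query(
    "SELECT id, name, email, updated_at FROM users",
    "updated_at",
))
```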
Watermark Storage Options
Database Storage (Default)
Store watermarks in the source or destination database:
```yaml
sync:
  watermark_table: sync_watermark  # Default behavior
```
SQLite Storage
Store watermarks in a separate SQLite database for persistence across restarts:
```yaml
sync:
  watermark_storage:
    type: sqlite
    sqlite_path: "/var/lib/evolvishub/watermarks.db"
    table_name: "sync_watermark"
```
Benefits of SQLite Storage:
- ✅ Persistent across application restarts
- ✅ Independent of source/destination databases
- ✅ Centralized watermark management
- ✅ Error tracking and status monitoring
- ✅ Resume from last successful sync point
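Because the store is a plain SQLite file, it can also be inspected directly, which is handy when debugging a stuck sync. The sketch below uses the path and table name from the config above; the column layout is whatever schema the library created, so each row is printed as-is.

```python
# Inspect the SQLite watermark store from the configuration above.
# SELECT * is used because the column layout is defined by the library.
import sqlite3

conn = sqlite3.connect("/var/lib/evolvishub/watermarks.db")
conn.row_factory = sqlite3.Row
for row in conn.execute("SELECT * FROM sync_watermark"):
    print(dict(row))
conn.close()
```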
File Storage
Store watermarks in a JSON file:
```yaml
sync:
  watermark_storage:
    type: file
    file_path: "/var/lib/evolvishub/watermarks.json"
```
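The file store can be inspected the same way; the sketch below simply loads and prints whatever the library has written (the exact document structure is defined by the library).

```python
# Dump the JSON watermark file from the configuration above.
import json

with open("/var/lib/evolvishub/watermarks.json") as fh:
    print(json.dumps(json.load(fh), indent=2))
```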
Supported Data Sources
Databases
- PostgreSQL
- MySQL
- SQL Server
- Oracle (with TNS name support)
- MongoDB
Cloud Storage
- AWS S3
- Google Cloud Storage
- Azure Blob Storage
File Systems
- CSV files
- JSON files
- Parquet files
Development
Setup
- Clone the repository:

  ```bash
  git clone https://github.com/evolvishub/evolvishub-data-handler.git
  cd evolvishub-data-handler
  ```

- Create a virtual environment:

  ```bash
  make venv
  ```

- Install development dependencies:

  ```bash
  make install
  ```

- Install pre-commit hooks:

  ```bash
  make install-hooks
  ```
Testing
Run the test suite:

```bash
make test
```

Code Quality
Format code:

```bash
make format
```

Run linters:

```bash
make lint
```

Building
Build the package:

```bash
make build
```
Contributing
- Fork the repository
- Create a feature branch
- Commit your changes
- Push to the branch
- Create a Pull Request
License
This project is licensed under the MIT License - see the LICENSE file for details.
Support
- Documentation: https://evolvishub.github.io/evolvishub-data-handler
- Issues: https://github.com/evolvishub/evolvishub-data-handler/issues
- Email: info@evolvishub.com
EvolvisHub Data Handler Adapter
A powerful and flexible data handling adapter for Evolvis AI's data processing pipeline. This tool provides seamless integration with various database systems and implements Change Data Capture (CDC) functionality.
About Evolvis AI
Evolvis AI is a leading provider of AI solutions that helps businesses unlock their data potential. We specialize in:
- Data analysis and decision-making
- Machine learning implementation
- Process optimization
- Predictive maintenance
- Natural language processing
- Custom AI solutions
Our mission is to make artificial intelligence accessible to businesses of all sizes, enabling them to compete in today's data-driven environment. As Forbes highlights: "Organizations that strategically adopt AI will have a significant competitive advantage in today's data-driven market."
Author
Alban Maxhuni, PhD
Email: a.maxhuni@evolvis.ai