A Change Data Capture (CDC) library for data synchronization
EvolvisHub Data Handler
A powerful and flexible data synchronization framework for Change Data Capture (CDC) operations with advanced scheduling, custom queries, and persistent watermark management.
Latest Version: 0.1.2 - Now with Oracle support, cron scheduling, SQLite watermarks, and custom queries!
🚀 Features
Database & Storage Support
- Databases: PostgreSQL, MySQL, SQLite, Oracle (with TNS support), MongoDB, SQL Server
- Cloud Storage: AWS S3, Google Cloud Storage, Azure Blob Storage
- File Formats: CSV, JSON, Parquet
Advanced Sync Modes
- One-time Sync: Run once and exit
- Continuous Sync: Real-time synchronization with configurable intervals
- Cron Scheduling: Complex scheduling with timezone support and cron expressions
Custom Query Support
- Custom SQL Queries: Complex business logic with parameter substitution (:last_sync, :batch_size)
- Simple SELECT: Framework automatically adds WHERE, ORDER BY, LIMIT clauses
- Database-specific Syntax: Native SQL features for each database type
Persistent Watermark Storage
- SQLite Storage: Independent watermark persistence across restarts
- File Storage: JSON-based watermark storage
- Database Storage: Traditional database-based watermarks
- Error Tracking: Status monitoring and resume from last successful sync
Enterprise Features
- Configurable: YAML and INI configuration files with validation
- CLI Interface: Comprehensive command-line tools with logging
- Extensible: Plugin-based adapter system for custom data sources
- Production Ready: Error handling, retry logic, and monitoring
Installation
# Install from PyPI
pip install evolvishub-data-handler
# Install with development dependencies
pip install evolvishub-data-handler[dev]
# Install with documentation dependencies
pip install evolvishub-data-handler[docs]
Quick Start
- Create a configuration file (e.g., config.yaml):
source:
  type: postgresql
  host: localhost
  port: 5432
  database: source_db
  username: source_user
  password: source_password
  watermark:
    column: updated_at
    type: timestamp
    initial_value: "2024-01-01 00:00:00"
  # Custom query with business logic
  query: >
    SELECT id, name, email, updated_at,
    CASE WHEN deleted_at IS NOT NULL THEN 'delete'
    WHEN updated_at > :last_sync THEN 'update'
    ELSE 'insert' END as operation
    FROM users
    WHERE updated_at > :last_sync OR :last_sync IS NULL
    ORDER BY updated_at LIMIT :batch_size
destination:
  type: postgresql
  host: localhost
  port: 5432
  database: dest_db
  username: dest_user
  password: dest_password
  table: users
sync:
  mode: cron  # one_time, continuous, or cron
  cron_expression: "0 */2 * * *"  # Every 2 hours
  timezone: "UTC"
  batch_size: 1000
  # SQLite watermark storage for persistence
  watermark_storage:
    type: sqlite
    sqlite_path: "/var/lib/evolvishub/watermarks.db"
    table_name: "sync_watermark"
  error_retry_attempts: 3
  error_retry_delay: 5
- Use the library in your code:
from evolvishub_data_handler import CDCHandler
# Initialize the handler
handler = CDCHandler("config.yaml")
# Run one-time sync
handler.sync()
# Or run continuous sync
handler.run_continuous()
- Run synchronization using the CLI:
# One-time sync
evolvishub-cdc run -c config.yaml -m one_time
# Continuous sync
evolvishub-cdc run -c config.yaml -m continuous
# Cron-scheduled sync
evolvishub-cdc run -c config.yaml -m cron
# Override cron expression from command line
evolvishub-cdc run -c config.yaml --cron "0 */4 * * *"
# With custom logging
evolvishub-cdc run -c config.yaml -l DEBUG --log-file sync.log
# Legacy commands (still supported)
evolvishub-cdc sync -c config.yaml
evolvishub-cdc continuous-sync -c config.yaml
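The `error_retry_attempts` and `error_retry_delay` settings in the configuration above suggest a fixed-delay retry around each sync cycle. The following is a minimal sketch of that pattern, assuming the delay is measured in seconds. `run_with_retry` is a hypothetical helper written for illustration and is not part of the library's API.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def run_with_retry(
    operation: Callable[[], T],
    attempts: int = 3,
    delay_seconds: float = 5.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call operation until it succeeds or the attempts are used up."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return operation()
        except Exception:
            # On the final attempt, let the real error reach the caller.
            if attempt == attempts:
                raise
            sleep(delay_seconds)
    raise AssertionError("unreachable")
```

The `sleep` argument exists only so the delay can be replaced in tests. In normal use it can be left at its default.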
🔥 What's New in v0.1.1
Advanced Sync Modes
- Cron Scheduling: Schedule syncs with complex cron expressions and timezone support
- Enhanced CLI: New unified run command with mode selection and override options
- Flexible Timing: One-time, continuous, or scheduled synchronization
Custom Query Support
- Parameter Substitution: Use :last_sync and :batch_size in custom queries
- Business Logic: Implement complex data transformations in SQL
- Database-Specific: Leverage native SQL features for each database
SQLite Watermark Storage
- Persistence: Watermarks survive application restarts and database maintenance
- Independence: No dependency on source/destination database availability
- Error Tracking: Monitor sync status and resume from failures
Oracle Database Support
- Complete Implementation: Full Oracle adapter with TNS name support
- Enterprise Ready: Connection pooling, encoding options, and Oracle-specific features
- Native SQL: Support for Oracle's TO_TIMESTAMP, FETCH FIRST, MERGE statements
Sync Modes
One-Time Sync
Run a single synchronization cycle and exit.
sync:
  mode: one_time
  batch_size: 1000
Continuous Sync
Run synchronization continuously at specified intervals.
sync:
  mode: continuous
  interval_seconds: 60  # Sync every 60 seconds
  batch_size: 1000
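Continuous mode amounts to running one sync cycle, waiting `interval_seconds`, and repeating. The following is a minimal sketch of such a loop, assuming a caller-supplied `task` callable. `run_every` is a hypothetical helper and does not show how the library's `run_continuous` is implemented.

```python
import threading
from typing import Callable


def run_every(
    task: Callable[[], None],
    interval_seconds: float,
    stop: threading.Event,
) -> int:
    """Run task, wait interval_seconds, and repeat until stop is set.

    Returns the number of completed cycles.
    """
    cycles = 0
    while not stop.is_set():
        task()
        cycles += 1
        # Event.wait returns early once stop is set, so shutdown is prompt.
        stop.wait(interval_seconds)
    return cycles
```

Waiting on a `threading.Event` rather than calling `time.sleep` lets another thread or a signal handler end the loop without waiting out a full interval.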
Cron-Scheduled Sync
Run synchronization based on cron expressions with timezone support.
sync:
  mode: cron
  cron_expression: "0 */2 * * *"  # Every 2 hours
  timezone: "America/New_York"
  batch_size: 1000
Common Cron Expressions:
- "0 9 * * 1-5" - Every weekday at 9 AM
- "0 */6 * * *" - Every 6 hours
- "30 2 * * 0" - Every Sunday at 2:30 AM
- "0 0 1 * *" - First day of every month at midnight
- "0 8,12,16 * * *" - At 8 AM, 12 PM, and 4 PM every day
Custom Queries
Using Custom SQL Queries
Define complex data extraction logic with custom SQL queries:
source:
  type: postgresql
  # ... connection details ...
  query: >
    SELECT
      id, name, email, updated_at,
      CASE
        WHEN deleted_at IS NOT NULL THEN 'delete'
        WHEN updated_at > :last_sync THEN 'update'
        ELSE 'insert'
      END as operation,
      EXTRACT(EPOCH FROM updated_at) as updated_timestamp
    FROM users
    WHERE (updated_at > :last_sync OR :last_sync IS NULL)
      AND status = 'active'
    ORDER BY updated_at
    LIMIT :batch_size
Available Parameters:
- :last_sync - Last synchronization timestamp
- :batch_size - Configured batch size
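To see how the two named parameters behave, the following sketch binds `:last_sync` and `:batch_size` against an in-memory SQLite database using Python's standard `sqlite3` module. The table contents and the `fetch_batch` helper are made up for illustration. The library's own binding code may differ.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, updated_at TEXT)"
)
conn.executemany(
    "INSERT INTO users (id, name, updated_at) VALUES (?, ?, ?)",
    [
        (1, "ada", "2024-01-01 08:00:00"),
        (2, "bob", "2024-01-02 08:00:00"),
        (3, "cy", "2024-01-03 08:00:00"),
    ],
)

QUERY = """
    SELECT id, name, updated_at
    FROM users
    WHERE updated_at > :last_sync OR :last_sync IS NULL
    ORDER BY updated_at
    LIMIT :batch_size
"""


def fetch_batch(last_sync, batch_size):
    """Bind the two named parameters and return the matching rows."""
    params = {"last_sync": last_sync, "batch_size": batch_size}
    return conn.execute(QUERY, params).fetchall()
```

Passing `None` for `last_sync` exercises the `:last_sync IS NULL` branch, which is what makes a first run return rows before any watermark exists.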
Using Simple SELECT Statements
For simpler cases, use the select field:
source:
  type: postgresql
  # ... connection details ...
  select: "SELECT id, name, email, updated_at FROM users"
  watermark:
    column: updated_at
    type: timestamp
The framework automatically adds WHERE, ORDER BY, and LIMIT clauses based on watermark configuration.
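One way such a rewrite could work is to wrap the plain SELECT in a subquery and attach the incremental clauses to the outer query. This sketch is an assumption about the general technique, not the library's actual implementation, which may append the clauses directly. `build_incremental_query` is a hypothetical name.

```python
def build_incremental_query(select: str, watermark_column: str) -> str:
    """Wrap a plain SELECT so it only returns rows newer than :last_sync."""
    if not watermark_column.replace("_", "").isalnum():
        # The column name is interpolated into SQL, so reject anything unusual.
        raise ValueError(f"unsafe column name: {watermark_column!r}")
    base = select.strip().rstrip(";")
    return (
        f"SELECT * FROM ({base}) AS src "
        f"WHERE src.{watermark_column} > :last_sync OR :last_sync IS NULL "
        f"ORDER BY src.{watermark_column} "
        f"LIMIT :batch_size"
    )
```

Wrapping avoids having to parse the user's SQL to find out whether it already contains a WHERE clause. The LIMIT syntax shown suits PostgreSQL, MySQL, and SQLite. Oracle and SQL Server would need their own row-limiting syntax.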
Watermark Storage Options
Database Storage (Default)
Store watermarks in the source or destination database:
sync:
  watermark_table: sync_watermark  # Default behavior
SQLite Storage
Store watermarks in a separate SQLite database for persistence across restarts:
sync:
  watermark_storage:
    type: sqlite
    sqlite_path: "/var/lib/evolvishub/watermarks.db"
    table_name: "sync_watermark"
Benefits of SQLite Storage:
- ✅ Persistent across application restarts
- ✅ Independent of source/destination databases
- ✅ Centralized watermark management
- ✅ Error tracking and status monitoring
- ✅ Resume from last successful sync point
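The benefits listed above follow from keeping one small row per sync in a local file. The following is a minimal sketch of such a store built on the standard `sqlite3` module. The class name, the three-column schema, and the `success`/`error` status values are assumptions made for illustration and do not describe the library's actual table layout.

```python
import sqlite3
from typing import Optional


class SQLiteWatermarkStore:
    """Keep one watermark row per sync name in a local SQLite file."""

    def __init__(self, path: str, table_name: str = "sync_watermark") -> None:
        if not table_name.replace("_", "").isalnum():
            raise ValueError(f"unsafe table name: {table_name!r}")
        self.table = table_name
        self.conn = sqlite3.connect(path)
        self.conn.execute(
            f"CREATE TABLE IF NOT EXISTS {self.table} ("
            "name TEXT PRIMARY KEY, value TEXT, status TEXT NOT NULL)"
        )
        self.conn.commit()

    def get(self, name: str) -> Optional[str]:
        """Return the last successful watermark, or None if never synced."""
        row = self.conn.execute(
            f"SELECT value FROM {self.table} WHERE name = ?", (name,)
        ).fetchone()
        return row[0] if row else None

    def status(self, name: str) -> Optional[str]:
        """Return the status recorded by the most recent run, if any."""
        row = self.conn.execute(
            f"SELECT status FROM {self.table} WHERE name = ?", (name,)
        ).fetchone()
        return row[0] if row else None

    def mark_success(self, name: str, value: str) -> None:
        """Advance the watermark after a batch has been written."""
        self.conn.execute(
            f"INSERT INTO {self.table} (name, value, status) "
            "VALUES (?, ?, 'success') "
            "ON CONFLICT(name) DO UPDATE SET "
            "value = excluded.value, status = 'success'",
            (name, value),
        )
        self.conn.commit()

    def mark_error(self, name: str) -> None:
        """Record a failure without moving the watermark."""
        self.conn.execute(
            f"INSERT INTO {self.table} (name, value, status) "
            "VALUES (?, NULL, 'error') "
            "ON CONFLICT(name) DO UPDATE SET status = 'error'",
            (name,),
        )
        self.conn.commit()
```

Because `mark_error` leaves `value` untouched, the next run resumes from the last successful point instead of skipping the failed batch.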
File Storage
Store watermarks in a JSON file:
sync:
  watermark_storage:
    type: file
    file_path: "/var/lib/evolvishub/watermarks.json"
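A JSON watermark file is simple to read, but a crash in the middle of a write can leave it truncated. The sketch below shows one common safeguard: write to a temporary file and rename it into place. The function names and the flat `{name: value}` layout are assumptions for illustration, not the library's file format.

```python
import json
import os
import tempfile
from typing import Optional


def load_watermark(path: str, name: str) -> Optional[str]:
    """Return the stored watermark for name, or None if absent."""
    try:
        with open(path, encoding="utf-8") as handle:
            return json.load(handle).get(name)
    except FileNotFoundError:
        return None


def save_watermark(path: str, name: str, value: str) -> None:
    """Write the updated watermarks to a temp file, then rename it."""
    try:
        with open(path, encoding="utf-8") as handle:
            data = json.load(handle)
    except FileNotFoundError:
        data = {}
    data[name] = value
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as handle:
            json.dump(data, handle, indent=2)
        # os.replace is atomic when both paths are on the same filesystem.
        os.replace(tmp_path, path)
    except BaseException:
        os.unlink(tmp_path)
        raise
```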
Supported Data Sources
Databases
- PostgreSQL: Full support with advanced features
- MySQL: Complete implementation with connection pooling
- SQL Server: Native SQL Server adapter
- Oracle: Enterprise support with TNS names and connection pooling
- MongoDB: Document database synchronization
- SQLite: Lightweight database support
Cloud Storage
- AWS S3
- Google Cloud Storage
- Azure Blob Storage
File Systems
- CSV files
- JSON files
- Parquet files
📋 Configuration Examples
Oracle Database with TNS
source:
  type: oracle
  database: "PROD_DB"  # TNS name
  username: readonly_user
  password: secure_password
  table: ORDERS
  watermark:
    column: ORDER_DATE
    type: timestamp
    initial_value: "2024-01-01 00:00:00"
  # Oracle-specific query
  query: >
    SELECT ORDER_ID, CUSTOMER_ID, TOTAL_AMOUNT, ORDER_DATE
    FROM ORDERS
    WHERE ORDER_DATE > TO_TIMESTAMP(:last_sync, 'YYYY-MM-DD HH24:MI:SS')
    ORDER BY ORDER_DATE
    FETCH FIRST :batch_size ROWS ONLY
sync:
  mode: cron
  cron_expression: "0 */6 * * *"  # Every 6 hours
  timezone: "America/New_York"
  watermark_storage:
    type: sqlite
    sqlite_path: "/var/lib/evolvishub/oracle_watermarks.db"
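The query above passes `:last_sync` through `TO_TIMESTAMP` with the mask `'YYYY-MM-DD HH24:MI:SS'`, so the bound value has to be a string in exactly that layout. The following sketch shows the matching Python format string. The helper names are hypothetical, and how the library itself serializes the watermark is not shown in this document.

```python
from datetime import datetime

# strftime equivalent of the Oracle mask 'YYYY-MM-DD HH24:MI:SS'.
ORACLE_MASK_STRFTIME = "%Y-%m-%d %H:%M:%S"


def to_oracle_param(moment: datetime) -> str:
    """Format a datetime as the string TO_TIMESTAMP expects."""
    return moment.strftime(ORACLE_MASK_STRFTIME)


def from_oracle_param(text: str) -> datetime:
    """Parse a stored watermark string back into a datetime."""
    return datetime.strptime(text, ORACLE_MASK_STRFTIME)
```

Unlike the PostgreSQL examples, this query has no `:last_sync IS NULL` branch, so a null watermark would match no rows. The `initial_value` in the configuration presumably supplies the first bound value.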
Advanced PostgreSQL with Custom Logic
source:
  type: postgresql
  host: postgres-primary.company.com
  database: production
  username: etl_user
  password: secure_password
  query: >
    SELECT
      u.id, u.name, u.email, u.updated_at,
      p.department, p.role,
      CASE
        WHEN u.deleted_at IS NOT NULL THEN 'delete'
        WHEN u.updated_at > :last_sync THEN 'update'
        ELSE 'insert'
      END as operation,
      EXTRACT(EPOCH FROM u.updated_at) as updated_timestamp
    FROM users u
    LEFT JOIN user_profiles p ON u.id = p.user_id
    WHERE u.updated_at > :last_sync OR :last_sync IS NULL
    ORDER BY u.updated_at
    LIMIT :batch_size
sync:
  mode: continuous
  interval_seconds: 30
  watermark_storage:
    type: sqlite
    sqlite_path: "/opt/evolvishub/watermarks.db"
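The `operation` column produced by the CASE expression tells a consumer what to do with each row. The following is a minimal sketch of how such rows might be applied to a destination table, using SQLite as a stand-in. `apply_changes` and the three-column `users` table are hypothetical, and the library's own write path is not documented here.

```python
import sqlite3


def apply_changes(conn: sqlite3.Connection, rows: list[dict]) -> dict:
    """Apply insert, update, and delete rows and return a count of each."""
    counts = {"insert": 0, "update": 0, "delete": 0}
    for row in rows:
        operation = row["operation"]
        if operation == "delete":
            conn.execute("DELETE FROM users WHERE id = ?", (row["id"],))
        elif operation in ("insert", "update"):
            # An upsert makes replaying the same batch harmless.
            conn.execute(
                "INSERT INTO users (id, name, email) "
                "VALUES (:id, :name, :email) "
                "ON CONFLICT(id) DO UPDATE SET "
                "name = excluded.name, email = excluded.email",
                row,
            )
        else:
            raise ValueError(f"unknown operation: {operation!r}")
        counts[operation] += 1
    conn.commit()
    return counts
```

Treating inserts and updates as the same upsert keeps the batch idempotent, which matters when a failed sync is retried from the previous watermark.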
Multi-Database with File Storage
source:
  type: mysql
  host: mysql-server.company.com
  database: sales
  username: readonly_user
  password: secure_password
  select: "SELECT id, customer_name, order_total, created_at FROM sales"
  watermark:
    column: created_at
    type: timestamp
destination:
  type: file
  file_path: "/data/exports/sales_export.json"
sync:
  mode: cron
  cron_expression: "0 2 * * *"  # Daily at 2 AM
  watermark_storage:
    type: file
    file_path: "/var/lib/evolvishub/sales_watermarks.json"
Installation
pip install evolvishub-data-handler
Optional Dependencies
For specific database support, install additional packages:
# Oracle support
pip install oracledb
# PostgreSQL support
pip install psycopg2-binary
# MySQL support
pip install pymysql
# MongoDB support
pip install pymongo
# Cloud storage support
pip install boto3 google-cloud-storage azure-storage-blob
🖥️ CLI Reference
Main Commands
# Unified run command (recommended)
evolvishub-cdc run -c config.yaml [OPTIONS]
# Legacy commands (still supported)
evolvishub-cdc sync -c config.yaml
evolvishub-cdc continuous-sync -c config.yaml
Run Command Options
# Sync modes
evolvishub-cdc run -c config.yaml -m one_time # One-time sync
evolvishub-cdc run -c config.yaml -m continuous # Continuous sync
evolvishub-cdc run -c config.yaml -m cron # Cron-scheduled sync
# Override cron expression
evolvishub-cdc run -c config.yaml --cron "0 */4 * * *"
# Logging options
evolvishub-cdc run -c config.yaml -l DEBUG # Set log level
evolvishub-cdc run -c config.yaml --log-file sync.log # Log to file
evolvishub-cdc run -c config.yaml -l INFO --log-file app.log # Both
# Help
evolvishub-cdc --help
evolvishub-cdc run --help
Common Cron Expressions
| Expression | Description |
|---|---|
| "0 */2 * * *" | Every 2 hours |
| "0 9 * * 1-5" | Weekdays at 9 AM |
| "30 2 * * 0" | Sundays at 2:30 AM |
| "0 0 1 * *" | First day of month |
| "*/15 * * * *" | Every 15 minutes |
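To check what a five-field expression like those above selects, the following sketch expands each field and tests a given moment against it. It is a simplified illustration, not the scheduler the library uses. It handles only numbers, `*`, ranges, lists, and `/step`, and requires every field to match, whereas standard cron applies special rules when both day-of-month and day-of-week are restricted.

```python
from datetime import datetime

# (minimum, maximum) allowed values for each of the five cron fields.
FIELD_RANGES = [(0, 59), (0, 23), (1, 31), (1, 12), (0, 6)]


def expand_field(field: str, low: int, high: int) -> set[int]:
    """Expand one field such as '*/2', '1-5' or '8,12,16' into a set."""
    values: set[int] = set()
    for part in field.split(","):
        base, _, step_text = part.partition("/")
        step = int(step_text) if step_text else 1
        if step < 1:
            raise ValueError(f"invalid step in {part!r}")
        if base == "*":
            start, end = low, high
        elif "-" in base:
            start_text, end_text = base.split("-", 1)
            start, end = int(start_text), int(end_text)
        else:
            start = end = int(base)
        if not (low <= start <= end <= high):
            raise ValueError(f"{part!r} is outside {low}-{high}")
        values.update(range(start, end + 1, step))
    return values


def cron_matches(expression: str, moment: datetime) -> bool:
    """Return True if moment falls on a minute selected by expression."""
    fields = expression.split()
    if len(fields) != 5:
        raise ValueError(f"expected 5 fields, got {len(fields)}")
    minute, hour, day, month, weekday = (
        expand_field(f, lo, hi) for f, (lo, hi) in zip(fields, FIELD_RANGES)
    )
    # Cron counts Sunday as 0; datetime.weekday() counts Monday as 0.
    cron_weekday = (moment.weekday() + 1) % 7
    return (
        moment.minute in minute
        and moment.hour in hour
        and moment.day in day
        and moment.month in month
        and cron_weekday in weekday
    )
```

The `moment` passed in should already be expressed in the timezone configured for the sync, since the function compares wall-clock fields only.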
Development
Setup
- Clone the repository:
git clone https://github.com/evolvishub/evolvishub-data-handler.git
cd evolvishub-data-handler
- Create a virtual environment:
make venv
- Install development dependencies:
make install
- Install pre-commit hooks:
make install-hooks
Testing
Run the test suite:
make test
Code Quality
Format code:
make format
Run linters:
make lint
Building
Build the package:
make build
Contributing
- Fork the repository
- Create a feature branch
- Commit your changes
- Push to the branch
- Create a Pull Request
License
This project is licensed under the MIT License - see the LICENSE file for details.
Support
- Documentation: https://evolvishub.github.io/evolvishub-data-handler
- Issues: https://github.com/evolvishub/evolvishub-data-handler/issues
- Email: info@evolvishub.com
EvolvisHub Data Handler Adapter
A powerful and flexible data handling adapter for Evolvis AI's data processing pipeline. This tool provides seamless integration with various database systems and implements Change Data Capture (CDC) functionality.
🔧 Troubleshooting
Common Issues
Oracle Connection Errors
# Install the Oracle driver (python-oracledb)
pip install oracledb
# For TNS name issues, check tnsnames.ora
export TNS_ADMIN=/path/to/tns/admin
Cron Expression Validation
# Test cron expressions online: https://crontab.guru/
# Common mistake: Using 6 fields instead of 5
# Correct: "0 */2 * * *" (every 2 hours)
# Wrong: "0 0 */2 * * *" (6 fields)
Watermark Storage Issues
# Check SQLite file permissions
ls -la /var/lib/evolvishub/watermarks.db
# Verify directory exists and is writable
mkdir -p /var/lib/evolvishub
chmod 755 /var/lib/evolvishub
Configuration Validation
# Test configuration loading
from evolvishub_data_handler.config_loader import load_config
config = load_config("config.yaml")
print("Configuration is valid!")
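A quick pre-flight check on the parsed `sync` section can catch common mistakes before a run starts. The rules in this sketch are assumptions inferred from the examples in this document (the three mode names, `interval_seconds` for continuous mode, a five-field `cron_expression` for cron mode). They are not the library's own validation logic, and `check_sync_section` is a hypothetical helper.

```python
VALID_MODES = {"one_time", "continuous", "cron"}


def check_sync_section(sync: dict) -> list[str]:
    """Return a list of problems found in a parsed `sync` mapping."""
    problems = []
    mode = sync.get("mode")
    if mode not in VALID_MODES:
        problems.append(
            f"mode must be one of {sorted(VALID_MODES)}, got {mode!r}"
        )
    # Assumption: continuous mode has no usable default interval.
    if mode == "continuous" and "interval_seconds" not in sync:
        problems.append("continuous mode needs interval_seconds")
    if mode == "cron":
        expression = str(sync.get("cron_expression", ""))
        if len(expression.split()) != 5:
            problems.append("cron mode needs a 5-field cron_expression")
    batch_size = sync.get("batch_size", 1000)
    if not isinstance(batch_size, int) or batch_size < 1:
        problems.append("batch_size must be a positive integer")
    return problems
```

An empty list means none of these checks failed. It does not guarantee that the library will accept the file.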
Getting Help
- 📖 Documentation: Check the examples/ directory for configuration samples
- 🐛 Issues: Report bugs on GitHub Issues
- 💬 Discussions: Ask questions in GitHub Discussions
- 📧 Support: Contact a.maxhuni@evolvis.ai
About Evolvis AI
Evolvis AI is a leading provider of AI solutions that helps businesses unlock their data potential. We specialize in:
- Data analysis and decision-making
- Machine learning implementation
- Process optimization
- Predictive maintenance
- Natural language processing
- Custom AI solutions
Our mission is to make artificial intelligence accessible to businesses of all sizes, enabling them to compete in today's data-driven environment. As Forbes highlights: "Organizations that strategically adopt AI will have a significant competitive advantage in today's data-driven market."
Author
Alban Maxhuni, PhD
Email: a.maxhuni@evolvis.ai