
EvolvisHub Data Handler


A powerful and flexible data synchronization framework for Change Data Capture (CDC) operations with advanced scheduling, custom queries, and persistent watermark management.

Latest Version: 0.1.2 - Now with Oracle support, cron scheduling, SQLite watermarks, and custom queries!

🚀 Features

Database & Storage Support

  • Databases: PostgreSQL, MySQL, SQLite, Oracle (with TNS support), MongoDB, SQL Server
  • Cloud Storage: AWS S3, Google Cloud Storage, Azure Blob Storage
  • File Formats: CSV, JSON, Parquet

Advanced Sync Modes

  • One-time Sync: Run once and exit
  • Continuous Sync: Real-time synchronization with configurable intervals
  • Cron Scheduling: Complex scheduling with timezone support and cron expressions

Custom Query Support

  • Custom SQL Queries: Complex business logic with parameter substitution (:last_sync, :batch_size)
  • Simple SELECT: Framework automatically adds WHERE, ORDER BY, LIMIT clauses
  • Database-specific Syntax: Native SQL features for each database type

Persistent Watermark Storage

  • SQLite Storage: Independent watermark persistence across restarts
  • File Storage: JSON-based watermark storage
  • Database Storage: Traditional database-based watermarks
  • Error Tracking: Status monitoring and resume from last successful sync

Enterprise Features

  • Configurable: YAML and INI configuration files with validation
  • CLI Interface: Comprehensive command-line tools with logging
  • Extensible: Plugin-based adapter system for custom data sources
  • Production Ready: Error handling, retry logic, and monitoring

Installation

# Install from PyPI
pip install evolvishub-data-handler

# Install with development dependencies
pip install evolvishub-data-handler[dev]

# Install with documentation dependencies
pip install evolvishub-data-handler[docs]

Quick Start

  1. Create a configuration file (e.g., config.yaml):
source:
  type: postgresql
  host: localhost
  port: 5432
  database: source_db
  username: source_user
  password: source_password
  watermark:
    column: updated_at
    type: timestamp
    initial_value: "2024-01-01 00:00:00"
  # Custom query with business logic
  query: >
    SELECT id, name, email, updated_at,
           CASE WHEN deleted_at IS NOT NULL THEN 'delete'
                WHEN updated_at > :last_sync THEN 'update'
                ELSE 'insert' END as operation
    FROM users
    WHERE updated_at > :last_sync OR :last_sync IS NULL
    ORDER BY updated_at LIMIT :batch_size

destination:
  type: postgresql
  host: localhost
  port: 5432
  database: dest_db
  username: dest_user
  password: dest_password
  table: users

sync:
  mode: cron  # one_time, continuous, or cron
  cron_expression: "0 */2 * * *"  # Every 2 hours
  timezone: "UTC"
  batch_size: 1000
  # SQLite watermark storage for persistence
  watermark_storage:
    type: sqlite
    sqlite_path: "/var/lib/evolvishub/watermarks.db"
    table_name: "sync_watermark"
  error_retry_attempts: 3
  error_retry_delay: 5
  2. Use the library in your code:
from evolvishub_data_handler import CDCHandler

# Initialize the handler
handler = CDCHandler("config.yaml")

# Run one-time sync
handler.sync()

# Or run continuous sync
handler.run_continuous()
  3. Run synchronization using the CLI:
# One-time sync
evolvishub-cdc run -c config.yaml -m one_time

# Continuous sync
evolvishub-cdc run -c config.yaml -m continuous

# Cron-scheduled sync
evolvishub-cdc run -c config.yaml -m cron

# Override cron expression from command line
evolvishub-cdc run -c config.yaml --cron "0 */4 * * *"

# With custom logging
evolvishub-cdc run -c config.yaml -l DEBUG --log-file sync.log

# Legacy commands (still supported)
evolvishub-cdc sync -c config.yaml
evolvishub-cdc continuous-sync -c config.yaml

🔥 What's New in v0.1.2

Advanced Sync Modes

  • Cron Scheduling: Schedule syncs with complex cron expressions and timezone support
  • Enhanced CLI: New unified run command with mode selection and override options
  • Flexible Timing: One-time, continuous, or scheduled synchronization

Custom Query Support

  • Parameter Substitution: Use :last_sync and :batch_size in custom queries
  • Business Logic: Implement complex data transformations in SQL
  • Database-Specific: Leverage native SQL features for each database

SQLite Watermark Storage

  • Persistence: Watermarks survive application restarts and database maintenance
  • Independence: No dependency on source/destination database availability
  • Error Tracking: Monitor sync status and resume from failures

Oracle Database Support

  • Complete Implementation: Full Oracle adapter with TNS name support
  • Enterprise Ready: Connection pooling, encoding options, and Oracle-specific features
  • Native SQL: Support for Oracle's TO_TIMESTAMP, FETCH FIRST, MERGE statements

Sync Modes

One-Time Sync

Run a single synchronization cycle and exit.

sync:
  mode: one_time
  batch_size: 1000

Continuous Sync

Run synchronization continuously at specified intervals.

sync:
  mode: continuous
  interval_seconds: 60  # Sync every 60 seconds
  batch_size: 1000

Cron-Scheduled Sync

Run synchronization based on cron expressions with timezone support.

sync:
  mode: cron
  cron_expression: "0 */2 * * *"  # Every 2 hours
  timezone: "America/New_York"
  batch_size: 1000

Common Cron Expressions:

  • "0 9 * * 1-5" - Every weekday at 9 AM
  • "0 */6 * * *" - Every 6 hours
  • "30 2 * * 0" - Every Sunday at 2:30 AM
  • "0 0 1 * *" - First day of every month at midnight
  • "0 8,12,16 * * *" - At 8 AM, 12 PM, and 4 PM every day
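A malformed expression (most often six fields instead of five) is easy to catch before deploying a config. The snippet below is a standalone structural sanity check, not part of this library; for authoritative validation use a dedicated parser such as the croniter package or crontab.guru.

```python
import re

# Shape check for standard 5-field cron expressions: numbers, "*", ranges (a-b),
# steps (*/n, a-b/n), and comma lists. It validates structure, not value ranges.
FIELD = r"(\*|\d+)(-\d+)?(/\d+)?"
TOKEN = rf"{FIELD}(,{FIELD})*"
CRON_RE = re.compile(rf"^{TOKEN}( {TOKEN}){{4}}$")

def looks_like_cron(expr: str) -> bool:
    """Return True if expr has the shape of a standard 5-field cron string."""
    return CRON_RE.match(expr.strip()) is not None
```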

Custom Queries

Using Custom SQL Queries

Define complex data extraction logic with custom SQL queries:

source:
  type: postgresql
  # ... connection details ...
  query: >
    SELECT
      id, name, email, updated_at,
      CASE
        WHEN deleted_at IS NOT NULL THEN 'delete'
        WHEN updated_at > :last_sync THEN 'update'
        ELSE 'insert'
      END as operation,
      EXTRACT(EPOCH FROM updated_at) as updated_timestamp
    FROM users
    WHERE (updated_at > :last_sync OR :last_sync IS NULL)
      AND status = 'active'
    ORDER BY updated_at
    LIMIT :batch_size

Available Parameters:

  • :last_sync - Last synchronization timestamp
  • :batch_size - Configured batch size
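The substitution behaves like ordinary named-parameter binding. As an illustration using Python's stdlib sqlite3 driver, which natively supports the same :name placeholder style (the library's own database adapters handle binding per database; the table here is a stand-in so the example is self-contained):

```python
import sqlite3

# Stand-in table so the example runs on its own.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER, name TEXT, updated_at TEXT)")
conn.executemany(
    "INSERT INTO users VALUES (?, ?, ?)",
    [(1, "alice", "2024-01-15"), (2, "bob", "2024-03-01")],
)

# The same :last_sync / :batch_size placeholders, bound as named parameters.
query = """
    SELECT id, name, updated_at FROM users
    WHERE updated_at > :last_sync OR :last_sync IS NULL
    ORDER BY updated_at
    LIMIT :batch_size
"""
rows = conn.execute(query, {"last_sync": "2024-02-01", "batch_size": 100}).fetchall()
# rows now contains only the record updated after last_sync
```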

Using Simple SELECT Statements

For simpler cases, use the select field:

source:
  type: postgresql
  # ... connection details ...
  select: "SELECT id, name, email, updated_at FROM users"
  watermark:
    column: updated_at
    type: timestamp

The framework automatically adds WHERE, ORDER BY, and LIMIT clauses based on watermark configuration.
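The augmentation step can be pictured with a small sketch. This is illustrative only; the framework's actual SQL generation is internal and database-specific, and the function name here is hypothetical:

```python
def augment_select(select_sql, watermark_column, last_sync):
    """Append watermark filtering to a bare SELECT (illustrative sketch only)."""
    sql = select_sql.rstrip().rstrip(";")
    if last_sync is not None:
        # First run (no watermark yet) skips the WHERE clause and reads everything.
        sql += f"\nWHERE {watermark_column} > :last_sync"
    sql += f"\nORDER BY {watermark_column}\nLIMIT :batch_size"
    return sql
```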

Watermark Storage Options

Database Storage (Default)

Store watermarks in the source or destination database:

sync:
  watermark_table: sync_watermark  # Default behavior

SQLite Storage

Store watermarks in a separate SQLite database for persistence across restarts:

sync:
  watermark_storage:
    type: sqlite
    sqlite_path: "/var/lib/evolvishub/watermarks.db"
    table_name: "sync_watermark"

Benefits of SQLite Storage:

  • ✅ Persistent across application restarts
  • ✅ Independent of source/destination databases
  • ✅ Centralized watermark management
  • ✅ Error tracking and status monitoring
  • ✅ Resume from last successful sync point
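As a sketch of how such a store can work, a single SQLite table with one upserted row per source is enough. The column layout below is a guess for illustration, not the library's actual schema (only the sync_watermark table name appears in the config):

```python
import sqlite3

# In production the config points at a file path; :memory: keeps the sketch self-contained.
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE IF NOT EXISTS sync_watermark (
        source TEXT PRIMARY KEY,
        last_sync TEXT NOT NULL,
        status TEXT NOT NULL DEFAULT 'success'
    )
""")

def save_watermark(conn, source, last_sync, status="success"):
    """Upsert the watermark row for a source (requires SQLite >= 3.24)."""
    conn.execute(
        "INSERT INTO sync_watermark (source, last_sync, status) VALUES (?, ?, ?) "
        "ON CONFLICT(source) DO UPDATE SET last_sync = excluded.last_sync, "
        "status = excluded.status",
        (source, last_sync, status),
    )
    conn.commit()

save_watermark(conn, "users", "2024-06-01 12:00:00")
```

Because the table lives in its own file, the next run can read the row back and resume even if the source database was restarted or unreachable in between.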

File Storage

Store watermarks in a JSON file:

sync:
  watermark_storage:
    type: file
    file_path: "/var/lib/evolvishub/watermarks.json"
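A minimal sketch of JSON-file watermark persistence (function names and file layout are illustrative, not the library's internals). Writing to a temporary file and atomically renaming it avoids corrupting the watermark file if the process dies mid-write:

```python
import json
import os
import tempfile

def save_watermark(path, source, last_sync):
    """Persist one source's watermark into a shared JSON file."""
    data = {}
    if os.path.exists(path):
        with open(path) as f:
            data = json.load(f)
    data[source] = {"last_sync": last_sync}
    tmp = path + ".tmp"
    with open(tmp, "w") as f:
        json.dump(data, f, indent=2)
    os.replace(tmp, path)  # atomic rename: readers never see a half-written file

def load_watermark(path, source):
    """Return the stored watermark for a source, or None if absent."""
    if not os.path.exists(path):
        return None
    with open(path) as f:
        return json.load(f).get(source, {}).get("last_sync")

path = os.path.join(tempfile.mkdtemp(), "watermarks.json")
save_watermark(path, "users", "2024-06-01 12:00:00")
```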

Supported Data Sources

Databases

  • PostgreSQL: Full support with advanced features
  • MySQL: Complete implementation with connection pooling
  • SQL Server: Native SQL Server adapter
  • Oracle: Enterprise support with TNS names and connection pooling
  • MongoDB: Document database synchronization
  • SQLite: Lightweight database support

Cloud Storage

  • AWS S3
  • Google Cloud Storage
  • Azure Blob Storage

File Systems

  • CSV files
  • JSON files
  • Parquet files

📋 Configuration Examples

Oracle Database with TNS

source:
  type: oracle
  database: "PROD_DB"  # TNS name
  username: readonly_user
  password: secure_password
  table: ORDERS
  watermark:
    column: ORDER_DATE
    type: timestamp
    initial_value: "2024-01-01 00:00:00"
  # Oracle-specific query
  query: >
    SELECT ORDER_ID, CUSTOMER_ID, TOTAL_AMOUNT, ORDER_DATE
    FROM ORDERS
    WHERE ORDER_DATE > TO_TIMESTAMP(:last_sync, 'YYYY-MM-DD HH24:MI:SS')
    ORDER BY ORDER_DATE
    FETCH FIRST :batch_size ROWS ONLY

sync:
  mode: cron
  cron_expression: "0 */6 * * *"  # Every 6 hours
  timezone: "America/New_York"
  watermark_storage:
    type: sqlite
    sqlite_path: "/var/lib/evolvishub/oracle_watermarks.db"

Advanced PostgreSQL with Custom Logic

source:
  type: postgresql
  host: postgres-primary.company.com
  database: production
  username: etl_user
  password: secure_password
  query: >
    SELECT
      u.id, u.name, u.email, u.updated_at,
      p.department, p.role,
      CASE
        WHEN u.deleted_at IS NOT NULL THEN 'delete'
        WHEN u.updated_at > :last_sync THEN 'update'
        ELSE 'insert'
      END as operation,
      EXTRACT(EPOCH FROM u.updated_at) as updated_timestamp
    FROM users u
    LEFT JOIN user_profiles p ON u.id = p.user_id
    WHERE u.updated_at > :last_sync OR :last_sync IS NULL
    ORDER BY u.updated_at
    LIMIT :batch_size

sync:
  mode: continuous
  interval_seconds: 30
  watermark_storage:
    type: sqlite
    sqlite_path: "/opt/evolvishub/watermarks.db"

Multi-Database with File Storage

source:
  type: mysql
  host: mysql-server.company.com
  database: sales
  username: readonly_user
  password: secure_password
  select: "SELECT id, customer_name, order_total, created_at FROM sales"
  watermark:
    column: created_at
    type: timestamp

destination:
  type: file
  file_path: "/data/exports/sales_export.json"

sync:
  mode: cron
  cron_expression: "0 2 * * *"  # Daily at 2 AM
  watermark_storage:
    type: file
    file_path: "/var/lib/evolvishub/sales_watermarks.json"


Optional Dependencies

For specific database support, install additional packages:

# Oracle support
pip install oracledb

# PostgreSQL support
pip install psycopg2-binary

# MySQL support
pip install pymysql

# MongoDB support
pip install pymongo

# Cloud storage support
pip install boto3 google-cloud-storage azure-storage-blob

🖥️ CLI Reference

Main Commands

# Unified run command (recommended)
evolvishub-cdc run -c config.yaml [OPTIONS]

# Legacy commands (still supported)
evolvishub-cdc sync -c config.yaml
evolvishub-cdc continuous-sync -c config.yaml

Run Command Options

# Sync modes
evolvishub-cdc run -c config.yaml -m one_time     # One-time sync
evolvishub-cdc run -c config.yaml -m continuous  # Continuous sync
evolvishub-cdc run -c config.yaml -m cron        # Cron-scheduled sync

# Override cron expression
evolvishub-cdc run -c config.yaml --cron "0 */4 * * *"

# Logging options
evolvishub-cdc run -c config.yaml -l DEBUG                    # Set log level
evolvishub-cdc run -c config.yaml --log-file sync.log         # Log to file
evolvishub-cdc run -c config.yaml -l INFO --log-file app.log  # Both

# Help
evolvishub-cdc --help
evolvishub-cdc run --help

Common Cron Expressions

  • "0 */2 * * *" - Every 2 hours
  • "0 9 * * 1-5" - Weekdays at 9 AM
  • "30 2 * * 0" - Sundays at 2:30 AM
  • "0 0 1 * *" - First day of the month at midnight
  • "*/15 * * * *" - Every 15 minutes

Development

Setup

  1. Clone the repository:
git clone https://github.com/evolvishub/evolvishub-data-handler.git
cd evolvishub-data-handler
  2. Create a virtual environment:
make venv
  3. Install development dependencies:
make install
  4. Install pre-commit hooks:
make install-hooks

Testing

Run the test suite:

make test

Code Quality

Format code:

make format

Run linters:

make lint

Building

Build the package:

make build

Contributing

  1. Fork the repository
  2. Create a feature branch
  3. Commit your changes
  4. Push to the branch
  5. Create a Pull Request

License

This project is licensed under the MIT License - see the LICENSE file for details.

Support


🔧 Troubleshooting

Common Issues

Oracle Connection Errors

# Install Oracle client
pip install oracledb

# For TNS name issues, check tnsnames.ora
export TNS_ADMIN=/path/to/tns/admin

Cron Expression Validation

# Test cron expressions online: https://crontab.guru/
# Common mistake: Using 6 fields instead of 5
# Correct: "0 */2 * * *" (every 2 hours)
# Wrong: "0 0 */2 * * *" (6 fields)

Watermark Storage Issues

# Check SQLite file permissions
ls -la /var/lib/evolvishub/watermarks.db

# Verify directory exists and is writable
mkdir -p /var/lib/evolvishub
chmod 755 /var/lib/evolvishub

Configuration Validation

# Test configuration loading
from evolvishub_data_handler.config_loader import load_config
config = load_config("config.yaml")
print("Configuration is valid!")

Getting Help

  • 📖 Documentation: Check the examples/ directory for configuration samples
  • 🐛 Issues: Report bugs on GitHub Issues
  • 💬 Discussions: Ask questions in GitHub Discussions
  • 📧 Support: Contact a.maxhuni@evolvis.ai

About Evolvis AI

Evolvis AI is a leading provider of AI solutions that helps businesses unlock their data potential. We specialize in:

  • Data analysis and decision-making
  • Machine learning implementation
  • Process optimization
  • Predictive maintenance
  • Natural language processing
  • Custom AI solutions

Our mission is to make artificial intelligence accessible to businesses of all sizes, enabling them to compete in today's data-driven environment. As Forbes highlights: "Organizations that strategically adopt AI will have a significant competitive advantage in today's data-driven market."

Author

Alban Maxhuni, PhD
Email: a.maxhuni@evolvis.ai
