
EvolvisHub Data Handler



A robust Change Data Capture (CDC) library for efficient data synchronization across various databases and storage systems.

Features

  • Multi-Database Support: Seamlessly sync data between PostgreSQL, MySQL, SQL Server, Oracle, MongoDB, and more
  • Cloud Storage Integration: Native support for AWS S3, Google Cloud Storage, and Azure Blob Storage
  • File System Support: Handle CSV, JSON, and other file formats
  • Watermark Tracking: Efficient incremental sync with configurable watermark columns
  • Batch Processing: Optimize performance with configurable batch sizes
  • Error Handling: Robust error recovery and logging
  • Type Safety: Full type hints and validation with Pydantic
  • Extensible: Easy to add new adapters and data sources

Installation

# Install from PyPI
pip install evolvishub-data-handler

# Install with development dependencies (quote the extras so shells like zsh don't expand the brackets)
pip install "evolvishub-data-handler[dev]"

# Install with documentation dependencies
pip install "evolvishub-data-handler[docs]"

Quick Start

  1. Create a configuration file (e.g., config.yaml):
source:
  type: postgresql
  host: localhost
  port: 5432
  database: source_db
  username: source_user
  password: source_password
  watermark:
    column: updated_at
    type: timestamp
    initial_value: "1970-01-01 00:00:00"
  # Optional: Custom query for complex data extraction
  query: >
    SELECT id, name, email, updated_at,
           CASE WHEN deleted_at IS NOT NULL THEN 'delete'
                WHEN updated_at > :last_sync THEN 'update'
                ELSE 'insert' END as operation
    FROM users
    WHERE updated_at > :last_sync OR :last_sync IS NULL
    ORDER BY updated_at LIMIT :batch_size

destination:
  type: postgresql
  host: localhost
  port: 5432
  database: dest_db
  username: dest_user
  password: dest_password
  table: users
  watermark:
    column: updated_at
    type: timestamp

sync:
  mode: continuous  # one_time, continuous, or cron
  batch_size: 1000
  interval_seconds: 60  # For continuous mode
  cron_expression: "0 */2 * * *"  # For cron mode (every 2 hours)
  timezone: "UTC"  # Timezone for cron scheduling
  watermark_table: sync_watermark
  2. Use the library in your code:
from evolvishub_data_handler import CDCHandler

# Initialize the handler
handler = CDCHandler("config.yaml")

# Run one-time sync
handler.sync()

# Or run continuous sync
handler.run_continuous()
  3. Or use the command-line interface:
# One-time sync
evolvishub-cdc run -c config.yaml -m one_time

# Continuous sync
evolvishub-cdc run -c config.yaml -m continuous

# Cron-scheduled sync
evolvishub-cdc run -c config.yaml -m cron --cron "0 */2 * * *"

# With custom logging
evolvishub-cdc run -c config.yaml -l DEBUG --log-file sync.log

# Legacy commands (still supported)
evolvishub-cdc sync -c config.yaml
evolvishub-cdc continuous-sync -c config.yaml

Sync Modes

One-Time Sync

Run a single synchronization cycle and exit.

sync:
  mode: one_time
  batch_size: 1000

Continuous Sync

Run synchronization continuously at specified intervals.

sync:
  mode: continuous
  interval_seconds: 60  # Sync every 60 seconds
  batch_size: 1000
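Conceptually, continuous mode is a sync/sleep loop. The sketch below shows that shape; the `sync_once` callable and the `max_cycles` cap are hypothetical additions for illustration, not the library's actual loop, which runs until interrupted:

```python
import time

def run_continuous(sync_once, interval_seconds=60, max_cycles=None):
    """Repeatedly call sync_once(), sleeping between cycles.

    sync_once: callable performing one batch sync.
    max_cycles: optional cap (useful for tests); None means run indefinitely.
    """
    cycles = 0
    while max_cycles is None or cycles < max_cycles:
        try:
            sync_once()
        except Exception as exc:  # keep the loop alive on transient errors
            print(f"sync failed, will retry next cycle: {exc}")
        cycles += 1
        if max_cycles is None or cycles < max_cycles:
            time.sleep(interval_seconds)
```

Passing `max_cycles` makes the loop testable; a production loop would leave it unbounded and rely on signal handling for shutdown.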

Cron-Scheduled Sync

Run synchronization based on cron expressions with timezone support.

sync:
  mode: cron
  cron_expression: "0 */2 * * *"  # Every 2 hours
  timezone: "America/New_York"
  batch_size: 1000

Common Cron Expressions:

  • "0 9 * * 1-5" - Every weekday at 9 AM
  • "0 */6 * * *" - Every 6 hours
  • "30 2 * * 0" - Every Sunday at 2:30 AM
  • "0 0 1 * *" - First day of every month at midnight
  • "0 8,12,16 * * *" - At 8 AM, 12 PM, and 4 PM every day

Custom Queries

Using Custom SQL Queries

Define complex data extraction logic with custom SQL queries:

source:
  type: postgresql
  # ... connection details ...
  query: >
    SELECT
      id, name, email, updated_at,
      CASE
        WHEN deleted_at IS NOT NULL THEN 'delete'
        WHEN updated_at > :last_sync THEN 'update'
        ELSE 'insert'
      END as operation,
      EXTRACT(EPOCH FROM updated_at) as updated_timestamp
    FROM users
    WHERE (updated_at > :last_sync OR :last_sync IS NULL)
      AND status = 'active'
    ORDER BY updated_at
    LIMIT :batch_size

Available Parameters:

  • :last_sync - Last synchronization timestamp
  • :batch_size - Configured batch size
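These placeholders are ordinary named bind parameters. Python's built-in `sqlite3` module happens to use the same `:name` syntax, so a self-contained sketch can show how such a query binds; the table and data here are invented for illustration:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER, name TEXT, updated_at TEXT)")
conn.executemany(
    "INSERT INTO users VALUES (?, ?, ?)",
    [(1, "ada", "2024-01-01"), (2, "bob", "2024-03-01"), (3, "eve", "2024-06-01")],
)

# :last_sync and :batch_size are supplied as a dict of named parameters
rows = conn.execute(
    """
    SELECT id, name, updated_at FROM users
    WHERE updated_at > :last_sync OR :last_sync IS NULL
    ORDER BY updated_at
    LIMIT :batch_size
    """,
    {"last_sync": "2024-02-01", "batch_size": 10},
).fetchall()
# rows now holds only the records changed after the watermark
```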

Using Simple SELECT Statements

For simpler cases, use the select field:

source:
  type: postgresql
  # ... connection details ...
  select: "SELECT id, name, email, updated_at FROM users"
  watermark:
    column: updated_at
    type: timestamp

The framework automatically adds WHERE, ORDER BY, and LIMIT clauses based on watermark configuration.
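One plausible way such a wrapper could assemble the effective statement from the `select` plus the watermark column is sketched below. This is an illustration, not the library's actual code, and it assumes the select carries no `WHERE` clause of its own:

```python
def build_incremental_query(select: str, watermark_column: str) -> str:
    """Wrap a plain SELECT with watermark filtering, ordering, and a batch limit.

    Assumes `select` has no WHERE/ORDER BY/LIMIT of its own; a real
    implementation would need to handle those cases.
    """
    return (
        f"{select.rstrip().rstrip(';')} "
        f"WHERE ({watermark_column} > :last_sync OR :last_sync IS NULL) "
        f"ORDER BY {watermark_column} "
        f"LIMIT :batch_size"
    )

query = build_incremental_query(
    "SELECT id, name, email, updated_at FROM users", "updated_at"
)
```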

Watermark Storage Options

Database Storage (Default)

Store watermarks in the source or destination database:

sync:
  watermark_table: sync_watermark  # Default behavior

SQLite Storage

Store watermarks in a separate SQLite database for persistence across restarts:

sync:
  watermark_storage:
    type: sqlite
    sqlite_path: "/var/lib/evolvishub/watermarks.db"
    table_name: "sync_watermark"

Benefits of SQLite Storage:

  • ✅ Persistent across application restarts
  • ✅ Independent of source/destination databases
  • ✅ Centralized watermark management
  • ✅ Error tracking and status monitoring
  • ✅ Resume from last successful sync point
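A minimal sketch of what such a store could look like, using only the standard library; the class name and schema are invented for illustration and are not the library's actual implementation:

```python
import sqlite3

class SqliteWatermarkStore:
    """Persistent per-source watermark store (hypothetical schema)."""

    def __init__(self, path: str, table: str = "sync_watermark"):
        self.conn = sqlite3.connect(path)
        self.table = table
        self.conn.execute(
            f"CREATE TABLE IF NOT EXISTS {table} ("
            "  source TEXT PRIMARY KEY,"
            "  last_value TEXT,"
            "  status TEXT,"
            "  updated_at TEXT DEFAULT CURRENT_TIMESTAMP)"
        )

    def save(self, source: str, value: str, status: str = "success") -> None:
        # Upsert so each source keeps exactly one row
        self.conn.execute(
            f"INSERT INTO {self.table} (source, last_value, status) VALUES (?, ?, ?) "
            "ON CONFLICT(source) DO UPDATE SET last_value = excluded.last_value, "
            "status = excluded.status",
            (source, value, status),
        )
        self.conn.commit()

    def load(self, source: str):
        # Returns None for a source that has never synced
        row = self.conn.execute(
            f"SELECT last_value FROM {self.table} WHERE source = ?", (source,)
        ).fetchone()
        return row[0] if row else None
```

Because the store lives outside both the source and destination databases, a restarted process can call `load()` and resume from the last committed watermark.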

File Storage

Store watermarks in a JSON file:

sync:
  watermark_storage:
    type: file
    file_path: "/var/lib/evolvishub/watermarks.json"
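File-backed storage can be as simple as a JSON mapping from source name to last watermark. A sketch with hypothetical helper functions; the write-then-rename step is a common safeguard so a crash mid-write cannot leave a half-written file:

```python
import json
import os
import tempfile

def load_watermarks(path: str) -> dict:
    """Return the watermark mapping, or an empty dict if the file doesn't exist yet."""
    try:
        with open(path) as f:
            return json.load(f)
    except FileNotFoundError:
        return {}

def save_watermarks(path: str, watermarks: dict) -> None:
    """Write atomically: dump to a temp file, then rename over the target."""
    fd, tmp = tempfile.mkstemp(dir=os.path.dirname(path) or ".")
    with os.fdopen(fd, "w") as f:
        json.dump(watermarks, f, indent=2)
    os.replace(tmp, path)  # atomic on POSIX when tmp and path share a filesystem
```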

Supported Data Sources

Databases

  • PostgreSQL
  • MySQL
  • SQL Server
  • Oracle (with TNS name support)
  • MongoDB

Cloud Storage

  • AWS S3
  • Google Cloud Storage
  • Azure Blob Storage

File Systems

  • CSV files
  • JSON files
  • Parquet files

Development

Setup

  1. Clone the repository:
git clone https://github.com/evolvishub/evolvishub-data-handler.git
cd evolvishub-data-handler
  2. Create a virtual environment:
make venv
  3. Install development dependencies:
make install
  4. Install pre-commit hooks:
make install-hooks

Testing

Run the test suite:

make test

Code Quality

Format code:

make format

Run linters:

make lint

Building

Build the package:

make build

Contributing

  1. Fork the repository
  2. Create a feature branch
  3. Commit your changes
  4. Push to the branch
  5. Create a Pull Request

License

This project is licensed under the MIT License - see the LICENSE file for details.

Support

EvolvisHub Data Handler Adapter

A powerful and flexible data handling adapter for Evolvis AI's data processing pipeline. This tool provides seamless integration with various database systems and implements Change Data Capture (CDC) functionality.

About Evolvis AI

Evolvis AI is a leading provider of AI solutions that helps businesses unlock their data potential. We specialize in:

  • Data analysis and decision-making
  • Machine learning implementation
  • Process optimization
  • Predictive maintenance
  • Natural language processing
  • Custom AI solutions

Our mission is to make artificial intelligence accessible to businesses of all sizes, enabling them to compete in today's data-driven environment. As Forbes highlights: "Organizations that strategically adopt AI will have a significant competitive advantage in today's data-driven market."

Author

Alban Maxhuni, PhD
Email: a.maxhuni@evolvis.ai
