
Lightweight Python data sync library - CDC and dump replication to cloud warehouses

Project description

PyReplicatorX

Lightweight, open-source Python library for syncing data from databases to cloud warehouses. Think of it as a Python-native alternative to Airbyte — no JVM, no heavy infrastructure, just pip install and go.

Features

  • CDC (Change Data Capture) — Stream real-time changes from PostgreSQL (WAL2JSON) and MySQL (binlog)
  • Full & Partial Dumps — One-shot table dumps with optional date-range filtering
  • Cloud Warehouse Destinations — Load into Redshift, Snowflake, or BigQuery
  • Parquet Staging — Efficient columnar format staged on S3 or GCS before loading
  • Schema Evolution — Auto-detect new columns and evolve destination schemas
  • Checkpointing — Resume from where you left off after restarts (SQLite-based)
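The checkpointing behavior can be illustrated with a minimal SQLite-backed store. This is a sketch only — the table name, columns, and `CheckpointStore` class here are hypothetical, not the library's actual schema:

```python
import sqlite3

class CheckpointStore:
    """Minimal sketch of a SQLite-backed checkpoint store (hypothetical schema)."""

    def __init__(self, path=":memory:"):
        self.conn = sqlite3.connect(path)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS checkpoints ("
            "pipeline TEXT PRIMARY KEY, position TEXT)"
        )

    def save(self, pipeline, position):
        # Upsert the latest replication position (e.g. a WAL LSN or binlog offset).
        self.conn.execute(
            "INSERT INTO checkpoints (pipeline, position) VALUES (?, ?) "
            "ON CONFLICT(pipeline) DO UPDATE SET position = excluded.position",
            (pipeline, position),
        )
        self.conn.commit()

    def load(self, pipeline):
        row = self.conn.execute(
            "SELECT position FROM checkpoints WHERE pipeline = ?", (pipeline,)
        ).fetchone()
        return row[0] if row else None

store = CheckpointStore()
store.save("my_sync", "0/16B3748")   # e.g. a PostgreSQL WAL LSN
store.save("my_sync", "0/16C0000")   # a later position overwrites the old one
print(store.load("my_sync"))
```

On restart, the pipeline would read the stored position and resume streaming from there instead of from the beginning.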

Architecture

Source (CDC/Dump) → Queue → Processor → Stager (Parquet → S3/GCS) → Destination (COPY INTO)
     │                                       │                            │
     └── Checkpoint Store (SQLite)           └── Schema Manager ──────────┘

All sources emit a common NormalizedRecord format. The pipeline batches records per table, writes Parquet files with Snappy compression, uploads to cloud storage, then issues native COPY/Load commands to the destination warehouse.
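The record format and per-table batching described above can be sketched as follows. The field names on `NormalizedRecord` are illustrative assumptions, not the library's actual definition:

```python
from dataclasses import dataclass
from collections import defaultdict
from typing import Any

@dataclass
class NormalizedRecord:
    # Illustrative shape only; the library's actual fields may differ.
    table: str            # fully qualified table name, e.g. "public.users"
    op: str               # "insert", "update", or "delete"
    data: dict[str, Any]  # column -> value after the change

def batch_by_table(records):
    """Group a stream of records into per-table batches, as the pipeline does
    before writing one Parquet file per table."""
    batches = defaultdict(list)
    for rec in records:
        batches[rec.table].append(rec)
    return dict(batches)

stream = [
    NormalizedRecord("public.users", "insert", {"id": 1, "name": "ada"}),
    NormalizedRecord("public.orders", "insert", {"id": 10, "user_id": 1}),
    NormalizedRecord("public.users", "update", {"id": 1, "name": "ada l."}),
]
batches = batch_by_table(stream)
print({t: len(b) for t, b in batches.items()})  # {'public.users': 2, 'public.orders': 1}
```

Each batch would then be written as a Snappy-compressed Parquet file, uploaded to staging, and loaded with the destination's native bulk-load command.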

Supported Connectors

Source       CDC   Full Dump   Partial Dump
PostgreSQL   Yes   Yes         Yes
MySQL        Yes   Yes         Yes

Destination   Load Method                           Schema Evolution
Redshift      COPY FROM S3, FORMAT AS PARQUET       ALTER TABLE ADD COLUMN
Snowflake     COPY INTO with MATCH_BY_COLUMN_NAME   Native (auto)
BigQuery      GCS Load Job                          ALLOW_FIELD_ADDITION
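For Redshift, the load step boils down to a COPY statement over the staged Parquet files. The helper below is illustrative — it shows the shape of the generated SQL under assumed inputs, not the library's actual code:

```python
def redshift_copy_sql(schema, table, s3_uri, iam_role):
    """Build the COPY statement Redshift uses to load staged Parquet files
    (illustrative; the library's actual SQL generation may differ)."""
    return (
        f'COPY "{schema}"."{table}"\n'
        f"FROM '{s3_uri}'\n"
        f"IAM_ROLE '{iam_role}'\n"
        f"FORMAT AS PARQUET"
    )

sql = redshift_copy_sql(
    "public", "users",
    "s3://my-staging-bucket/pyreplicatorx/staging/users/",
    "arn:aws:iam::123456789:role/RedshiftCopyRole",
)
print(sql)
```

Snowflake and BigQuery follow the same pattern with their own bulk-load commands (COPY INTO and a GCS load job, respectively).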

Quick Start

Install

# Core (PostgreSQL source + Redshift destination + S3 staging)
pip install pyreplicatorx

# With MySQL support
pip install "pyreplicatorx[mysql]"

# With Snowflake destination
pip install "pyreplicatorx[snowflake]"

# With BigQuery destination
pip install "pyreplicatorx[bigquery]"

# Everything (the quotes prevent shells such as zsh from globbing the brackets)
pip install "pyreplicatorx[all]"

Configure

Create a config file (e.g., config.json):

{
  "pipeline_name": "my_sync",
  "source": {
    "engine": "postgres",
    "host": "localhost",
    "port": 5432,
    "user": "postgres",
    "password": "MY_DB_PASSWORD_ENV_VAR",
    "database": "mydb",
    "sslmode": "prefer",
    "slot_name": "pyreplicatorx_slot",
    "tables": ["public.users", "public.orders"]
  },
  "destination": {
    "engine": "redshift",
    "host": "my-cluster.region.redshift.amazonaws.com",
    "port": 5439,
    "user": "admin",
    "password": "MY_REDSHIFT_PASSWORD_ENV_VAR",
    "database": "analytics",
    "schema": "public",
    "iam_role": "arn:aws:iam::123456789:role/RedshiftCopyRole"
  },
  "staging": {
    "type": "s3",
    "bucket": "my-staging-bucket",
    "prefix": "pyreplicatorx/staging",
    "region": "us-east-1"
  },
  "settings": {
    "batch_size": 1000,
    "batch_timeout_seconds": 30
  }
}

Passwords reference environment variable names (e.g., MY_DB_PASSWORD_ENV_VAR) that are resolved at runtime from .env files or the shell environment.
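That resolution step amounts to a lookup of the configured name in the process environment. A minimal sketch (the function name is hypothetical; the library may layer .env-file loading on top):

```python
import os

def resolve_password(env_var_name):
    """Treat the config 'password' field as an environment-variable name and
    resolve it at runtime (sketch of the behavior described above)."""
    value = os.environ.get(env_var_name)
    if value is None:
        raise KeyError(f"environment variable {env_var_name!r} is not set")
    return value

os.environ["MY_DB_PASSWORD_ENV_VAR"] = "s3cret"    # normally set in .env or the shell
print(resolve_password("MY_DB_PASSWORD_ENV_VAR"))  # s3cret
```

This keeps secrets out of the config file, so config.json can be committed to version control safely.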

Run

# Stream CDC changes (long-running)
pyreplicatorx cdc -f config.json

# Dump tables (one-shot)
pyreplicatorx dump -f config.json

# Validate config without running
pyreplicatorx validate -f config.json

Dump Configuration

Full and partial dumps are configured in the source.dump_tables array:

{
  "source": {
    "dump_tables": [
      "public.users",
      {
        "table": "public.events",
        "date_column": "created_at",
        "start_date": "2025-01-01",
        "end_date": "2025-12-31"
      }
    ]
  }
}
  • String entry — Full table dump
  • Object entry — Partial dump filtered by date column
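The two entry shapes can be normalized into a common (table, filter) form. This parser is illustrative — `parse_dump_entry` and the filter-dict keys are assumptions, not the library's internals:

```python
def parse_dump_entry(entry):
    """Normalize a dump_tables entry: a string means a full dump (no filter),
    an object means a partial dump filtered by date column (illustrative)."""
    if isinstance(entry, str):
        return entry, None
    filt = {
        "column": entry["date_column"],
        "start": entry.get("start_date"),
        "end": entry.get("end_date"),
    }
    return entry["table"], filt

entries = [
    "public.users",
    {"table": "public.events", "date_column": "created_at",
     "start_date": "2025-01-01", "end_date": "2025-12-31"},
]
for table, filt in map(parse_dump_entry, entries):
    print(table, filt)
```

A filter of `None` would translate to `SELECT * FROM table`, while a date filter adds a `WHERE created_at BETWEEN ... AND ...` clause to the dump query.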

Development

# Clone and install in editable mode
git clone https://github.com/your-org/pyreplicatorx.git
cd pyreplicatorx
pip install -e ".[dev]"

# Start test PostgreSQL (port 5433)
docker compose -f tests/test_dbs/postgres/docker-compose.yml up -d

# Format
black --line-length 200 pyreplicatorx/

# Lint
ruff check pyreplicatorx/

# Test
pytest

License

MIT

Project details


Download files

Download the file for your platform.

Source Distribution

pyreplicatorx-0.1.0.tar.gz (30.0 kB)

Built Distribution


pyreplicatorx-0.1.0-py3-none-any.whl (39.5 kB)

File details

Details for the file pyreplicatorx-0.1.0.tar.gz.

File metadata

  • Download URL: pyreplicatorx-0.1.0.tar.gz
  • Upload date:
  • Size: 30.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.0

File hashes

Hashes for pyreplicatorx-0.1.0.tar.gz

Algorithm     Hash digest
SHA256        61ceee60b245a1fe93e22bc0692866e96f7392dd26b1fd059659314344de8ba9
MD5           4bedf03bac5e0c4bb967a8c267cc9f87
BLAKE2b-256   f436efc4d4c84e36e4d2a821f5d6fafe047bef6e56e66a94691bdf23066cd859


File details

Details for the file pyreplicatorx-0.1.0-py3-none-any.whl.

File metadata

  • Download URL: pyreplicatorx-0.1.0-py3-none-any.whl
  • Upload date:
  • Size: 39.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.0

File hashes

Hashes for pyreplicatorx-0.1.0-py3-none-any.whl

Algorithm     Hash digest
SHA256        a409cadbfddc2290409100e3e6a19d0c312cab5811992c3ace3071a309d5884c
MD5           0e541be1708ec574fd73f224e8048a71
BLAKE2b-256   bd04d29d6a3114a7503fd72793607a7ea2b4815f0515d4185663468b69baaad6

