ETL helper for moving tabular data between MSSQL, PostgreSQL, MySQL, and local files
Project description
SyncDB
Python ETL helper for moving tabular data between Microsoft SQL Server, PostgreSQL, MySQL, and local files (CSV, Parquet, Excel, Pickle), with automatic schema creation, schema evolution, and batch progress reporting.
Table of Contents
- What SyncDB Does
- Installation
- Quick Start
- Core Concepts
- Connecting to Databases
- Transfer Modes (
append,insert_only,upsert,full_refresh,append_staging,snapshot,soft_delete) - Syncing Tables
- Filtering Data
- Schema Evolution
- Working with Files
- Advanced Features
- Progress Reporting
- Reading Sync Results
- Complete Examples
- API Reference
- Supported File Formats
- Running Tests
- Roadmap
What SyncDB Does
SyncDB copies data from a source (database table or file) to a destination (database table or file). It handles:
- Creating the destination table if it does not exist
- Adding or dropping columns when the schema changes
- Chunking large tables into batches so you never load millions of rows into memory at once
- Translating data types between engines (e.g. PostgreSQL
booleanto MSSQLbit) - Showing a live progress bar while data moves
- Returning structured sync results and optionally printing a final summary table
A 5-minute transfer job looks like this:
from syncdb import DatabaseConfig, SyncDB
src = DatabaseConfig(engine="mssql", connection_string="...")
dst = DatabaseConfig(engine="postgresql", connection_string="...")
sync = SyncDB(source=src, target=dst)
sync.sync_tables({
"orders": {
"source": "dbo.orders",
"destination": "public.orders",
"mode": "append",
"primary_key": ["order_id"],
}
})
That is all you need. SyncDB creates the public.orders table if it does not exist, maps every column type, and streams the data in batches.
Installation
After the package is published to PyPI:
pip install Qubdi-SyncDB
Install only the database connectors and file formats you actually need:
pip install "Qubdi-SyncDB[mssql]" # MSSQL / SQL Server
pip install "Qubdi-SyncDB[postgres]" # PostgreSQL
pip install "Qubdi-SyncDB[mysql]" # MySQL / MariaDB
pip install "Qubdi-SyncDB[files]" # Parquet + Excel (requires pandas)
pip install "Qubdi-SyncDB[all]" # Everything
For local development from this repository:
pip install -e .
Or with extras:
pip install -e ".[mssql]" # MSSQL / SQL Server
pip install -e ".[postgres]" # PostgreSQL
pip install -e ".[mysql]" # MySQL / MariaDB
pip install -e ".[files]" # Parquet + Excel (requires pandas)
pip install -e ".[all]" # Everything
CSV and Pickle work without any extras; they use Python's standard library.
The distribution name is Qubdi-SyncDB. The Python import name stays lowercase:
from syncdb import DatabaseConfig, SyncDB
Quick Start
Copy a table from SQL Server to PostgreSQL
from syncdb import DatabaseConfig, SyncDB
source = DatabaseConfig(
engine="mssql",
connection_string=(
"Driver={ODBC Driver 17 for SQL Server};"
"Server=localhost,1433;Database=sales_db;"
"UID=admin;PWD=secret;TrustServerCertificate=yes;"
),
)
target = DatabaseConfig(
engine="postgresql",
connection_string="postgresql://admin:secret@localhost:5432/sales_db",
)
sync = SyncDB(source=source, target=target, batch_size=10_000)
sync.sync_tables({
"orders": {
"source": "dbo.orders",
"destination": "public.orders",
"mode": "append",
"primary_key": ["order_id"],
"order_by": ["order_id"],
}
})
# SyncDB prints a summary automatically:
#
# SyncDB summary (standard)
# +-----------------+--------+--------------+---------+---------+-------+
# | table | mode | rows written | batches | created | time |
# +-----------------+--------+--------------+---------+---------+-------+
# | public.orders | append | 52,341 | 11 | no | 3.2s |
# +-----------------+--------+--------------+---------+---------+-------+
# total: 52,341 rows in 11 batches across 1 tables in 3.2s
Export a query result to a Parquet file
# Pass a SQL string directly
sync.export_query_to_file(
query="SELECT * FROM dbo.orders WHERE status = 'shipped'",
output_path="shipped_orders.parquet",
)
# Or point to a .sql file on disk
sync.export_query_to_file(
query="queries/shipped_orders.sql",
output_path="shipped_orders.parquet",
)
Load a file into a database table
sync.import_file_to_table(
input_path="shipped_orders.parquet",
destination="public.shipped_orders",
fresh_insert=True, # truncate before inserting
)
Core Concepts
Batching
SyncDB never loads an entire table into memory. It reads batch_size rows at a time from the source, writes them to the target, then reads the next batch. The default is 5,000 rows. Raise it for fast networks with plenty of RAM; lower it for slow connections or wide rows.
batch_size accepts either an integer count or a percentage string. A percentage is resolved against the total row count before the first batch is read — useful when you want each batch to represent a fixed share of the table regardless of its size.
sync = SyncDB(source=src, target=dst, batch_size=50_000) # fixed count
sync = SyncDB(source=src, target=dst, batch_size="10%") # 10% of total rows per batch
When a percentage is given but the total row count cannot be determined (for example, due to missing SELECT COUNT(*) permission), SyncDB falls back to the default of 5,000 rows.
batch_size can be overridden at three levels. More specific settings always win:
| Level | How to set | Applies to |
|---|---|---|
| Global (SyncDB constructor) | SyncDB(..., batch_size=5000) |
All tables, all calls |
Per-call (sync_tables / sync_schema) |
sync.sync_tables(tables, batch_size=10_000) |
All tables in that single call |
| Per-table (table spec) | {"batch_size": 500} inside the spec dict |
That one table only |
# Override for one sync_tables call — all tables in this call use 10 000
results = sync.sync_tables(tables, batch_size=10_000)
# Override for one schema sync call
results = sync.sync_schema("dbo", "public", batch_size="5%")
# Override for a single table inside the spec dict
results = sync.sync_tables({
"wide_table": {
"source": "dbo.wide_table",
"destination": "public.wide_table",
"batch_size": 500, # overrides the global and call-level setting
},
"small_table": {
"source": "dbo.small_table",
"destination": "public.small_table",
# no batch_size → uses the call-level or global default
},
})
Automatic Table Creation
If the destination table does not exist, SyncDB creates it automatically by reading the source schema. You do not need to write any CREATE TABLE statements.
Automatic Schema Management
When you run a sync on an existing table and the source schema has changed, SyncDB can:
- Add new columns that appear in the source but not the target (always on)
- Drop extra columns from the target that are no longer in the source (opt-in via
drop_extra_columns=True)
Existing column types are never altered — this protects manually added columns and audit fields.
Dry Run
Pass dry_run=True to see what SyncDB would do without writing any data. Schema changes are still reported but not applied.
sync = SyncDB(source=src, target=dst, dry_run=True)
results = sync.sync_tables({"orders": {"source": "dbo.orders", "destination": "public.orders"}})
# SyncDB prints a summary of what would change. To also inspect results in code:
r = results[0]
# r.columns_added, r.columns_dropped, r.table_created are populated even in dry_run mode
Connecting to Databases
DatabaseConfig describes a single database connection. You can use either a connection string or individual parameters.
Option 1: Connection String
from syncdb import DatabaseConfig
# SQL Server / MSSQL
mssql = DatabaseConfig(
engine="mssql",
connection_string=(
"Driver={ODBC Driver 17 for SQL Server};"
"Server=db.example.com,1433;Database=mydb;"
"UID=sa;PWD=Password123;TrustServerCertificate=yes;"
),
)
# PostgreSQL
pg = DatabaseConfig(
engine="postgresql",
connection_string="postgresql://user:password@db.example.com:5432/mydb",
)
# MySQL
mysql = DatabaseConfig(
engine="mysql",
connection_string="mysql://user:password@db.example.com:3306/mydb",
)
Option 2: Individual Parameters
pg = DatabaseConfig(
engine="postgresql",
host="db.example.com",
port=5432,
database="mydb",
user="admin",
password="secret",
connect_timeout=60,
)
mssql = DatabaseConfig(
engine="mssql",
host="db.example.com",
database="mydb",
user="sa",
password="Password123",
)
Engine Name Aliases
SyncDB accepts several spellings for each engine:
| You write | Resolved to |
|---|---|
"mssql", "sqlserver", "sql_server" |
"mssql" |
"postgresql", "postgres", "pg" |
"postgresql" |
"mysql" |
"mysql" |
Default Ports and Schemas
| Engine | Default Port | Default Schema |
|---|---|---|
| MSSQL | 1433 | dbo |
| PostgreSQL | 5432 | public |
| MySQL | 3306 | (uses database name) |
DatabaseConfig Parameters
| Parameter | Description | Default |
|---|---|---|
engine |
Database engine (see aliases above) | required |
connection_string |
Full DSN or URL | None |
host |
Server hostname | None |
port |
Server port | engine default |
database |
Database name | None |
user |
Login username | None |
password |
Login password | None |
default_schema |
Schema prefix for unqualified table names | engine default |
connect_timeout |
Seconds before a connection attempt fails | 30 |
pool_min / pool_max |
Connection pool size bounds | 1 / 5 |
options |
Extra driver-specific keyword arguments | {} |
Transfer Modes
The mode key inside a table spec controls how SyncDB handles existing rows in the target.
Quick reference
| Mode | Touches existing rows? | Deletes from target? | Best for |
|---|---|---|---|
append |
Yes — upsert by PK | Per-batch delete before insert | Incremental loads with updates |
insert_only |
No | Never | Append-only event/log tables |
upsert |
Yes — upsert by PK | Per-batch delete before insert | Explicit append + dedup mode |
full_refresh |
Replaces everything | Truncate once at start | Small lookup tables |
append_staging |
Replaces everything | Staging table + final replace | Safer full-table reloads |
snapshot |
No | Never | Historical snapshots with _synced_at |
soft_delete |
Yes | Marks missing rows | Targets with deleted_at lifecycle |
append — Upsert by Primary Key (default)
For each batch, SyncDB deletes any target rows whose primary keys appear in that batch, then inserts the batch. Updated source rows replace stale target rows without duplicating them.
Use this when you want to keep adding new rows and keep existing rows up to date (equivalent to Airbyte's Incremental | Append + Dedup).
"orders": {
"source": "dbo.orders",
"destination": "public.orders",
"mode": "append",
"primary_key": ["order_id"],
}
insert_only — Pure Append, Never Touch Existing Rows
Inserts every source row without checking for duplicates. Existing target rows are never deleted or updated.
Use this for immutable event logs, audit trails, or any table where every source row is a new fact (equivalent to Airbyte's Incremental | Append).
"page_views": {
"source": "dbo.page_views",
"destination": "public.page_views",
"mode": "insert_only",
}
upsert — Explicit Append + Dedup
Uses the same portable delete-then-insert primary-key behavior as append, but lets jobs state that they want upsert semantics directly.
"orders": {
"source": "dbo.orders",
"destination": "public.orders",
"mode": "upsert",
"primary_key": ["order_id"],
}
full_refresh — Truncate and Reload
Truncates the target table once, then inserts all source rows. The target table is empty at the start of each run (equivalent to Airbyte's Full Refresh | Overwrite).
Use this for small lookup/reference tables where a complete reload every run is fine.
"product_categories": {
"source": "dbo.product_categories",
"destination": "public.product_categories",
"mode": "full_refresh",
}
append_staging — Load Through a Staging Table
Bulk-loads all rows into a staging table, then replaces the live table contents from that staging table. This keeps the live table untouched while the source is being read and staging rows are inserted.
Connector-native transactional rename/swap is still a future optimization; the current implementation is portable across supported engines.
snapshot — Append a Historical Copy
Appends every source row and adds _synced_at to show when that snapshot was captured.
"customers": {
"source": "dbo.customers",
"destination": "public.customers_history",
"mode": "snapshot",
}
soft_delete — Mark Missing Rows
Upserts rows that still exist in the source and sets deleted_at on target rows whose primary key is no longer present in the source.
"orders": {
"source": "dbo.orders",
"destination": "public.orders",
"mode": "soft_delete",
"primary_key": ["order_id"],
}
Syncing Tables
sync_tables accepts a dictionary where each key is a logical name for the operation and each value is a table specification.
results = sync.sync_tables({
"customers": {
"source": "dbo.customers", # source table (schema.table or table)
"destination": "public.customers", # target table
"mode": "append", # transfer mode
"primary_key": ["customer_id"], # override auto-detected PKs
"order_by": ["customer_id"], # deterministic read order
"filter": {"where": "is_active = ?", "params": [1]},
},
"orders": {
"source": "dbo.orders",
"destination": "public.orders",
"mode": "full_refresh",
},
})
You can sync many tables in a single call. SyncDB opens the source and target connections once and reuses them for all tables.
Table Spec Fields
| Key | Required | Description |
|---|---|---|
source |
yes | Source table name: "schema.table" or "table" |
destination |
yes | Target table name: "schema.table" or "table" |
mode |
no | Transfer mode: "append", "insert_only", "upsert", "full_refresh", "append_staging", "snapshot", "soft_delete". Default: "append" |
batch_size |
no | Override batch size for this table only — integer or percentage string. Takes precedence over the call-level and global batch_size |
primary_key |
no | Override PK columns. Auto-detected from source schema when omitted |
order_by |
no | Column(s) to sort source reads for deterministic batching |
filter |
no | Restrict which source rows are read (see Filtering Data) |
rename |
no | Source-to-target column rename map |
type_overrides |
no | Target type overrides by target column name |
transform |
no | Callable that receives and returns each batch as list[dict] |
incremental_column |
no | Cursor column for persisted high-watermark filtering |
watermark_store |
no | JSON file path for persisted watermark values |
watermark_key |
no | Override the default storage key for the watermark (default: "source->destination:column") |
expect |
no | Data quality checks: min_rows, not_null, unique, range |
on_batch |
no | Callback called after each written batch with the current TableSyncResult |
Filtering Data
Use the filter key to copy only a subset of source rows.
Parameterized filter (recommended)
Pass a dict with where (the SQL expression) and params (the values). The ? placeholders prevent SQL injection.
# Only active customers
"filter": {"where": "is_active = ?", "params": [1]}
# Orders from a specific date range
"filter": {"where": "created_at >= ? AND created_at < ?", "params": ["2024-01-01", "2025-01-01"]}
# Orders for specific customers
"filter": {"where": "customer_id IN (?, ?, ?)", "params": [101, 202, 303]}
Plain string filter
Pass a raw SQL expression string. Use only when the values are literals you control.
"filter": "status = 'shipped' AND region = 'US'"
Note: SyncDB validates WHERE clauses and rejects dangerous tokens like
;,--,/*,xp_, andsp_. Parameterized filters are always safer.
Full example with filter
results = sync.sync_tables({
"recent_orders": {
"source": "dbo.orders",
"destination": "public.orders",
"mode": "append",
"primary_key": ["order_id"],
"order_by": ["order_id"],
"filter": {
"where": "created_at >= ? AND status != ?",
"params": ["2024-01-01", "cancelled"],
},
}
})
Schema Evolution
SyncDB automatically keeps the target schema in sync with the source. Here is what happens on each run:
| Situation | What SyncDB does |
|---|---|
| Target table does not exist | Creates it with matching columns and primary key |
| Source has a new column | Adds the column to target |
| Source dropped a column | Drops column from target (only if drop_extra_columns=True) |
| Column type changed in source | Does nothing — type changes are never applied automatically |
Example: Adding columns automatically
Suppose you add a loyalty_tier column to dbo.customers in MSSQL. On the next sync, SyncDB detects it is missing from public.customers and adds it before copying data.
results = sync.sync_tables({
"customers": {
"source": "dbo.customers",
"destination": "public.customers",
"mode": "append",
}
})
# SyncDB prints a summary automatically. To inspect the result in code:
added = results[0].columns_added # ['loyalty_tier']
Example: Dropping extra columns
sync = SyncDB(source=src, target=dst, drop_extra_columns=True)
When drop_extra_columns=True, columns that exist in the target but not in the source are dropped. Leave it False (the default) to protect audit columns or computed columns you add manually.
Working with Files
SyncDB can export query results to local files and import files into database tables.
Export: Database → File
# Export a query string to Parquet
sync.export_query_to_file(
query="SELECT * FROM dbo.orders WHERE status = 'shipped'",
output_path="exports/shipped_orders.parquet",
)
# Or point to a .sql file — its contents are read and executed
sync.export_query_to_file(
query="queries/shipped_orders.sql",
output_path="exports/shipped_orders.parquet",
)
# Export to CSV
sync.export_query_to_file(
query="SELECT customer_id, email FROM dbo.customers",
output_path="customers.csv",
)
# Export to Excel
sync.export_query_to_file(
query="SELECT * FROM dbo.summary",
output_path="summary.xlsx",
)
# With query parameters (prevents SQL injection)
sync.export_query_to_file(
query="SELECT * FROM dbo.orders WHERE region = ? AND year = ?",
params=["US", 2024],
output_path="us_orders_2024.parquet",
)
Output parent directories are created automatically — no need to mkdir beforehand.
Import: File → Database
# Load a Parquet file into PostgreSQL (append by default)
sync.import_file_to_table(
input_path="exports/shipped_orders.parquet",
destination="public.shipped_orders",
)
# Truncate first, then load
sync.import_file_to_table(
input_path="customers.csv",
destination="public.customers",
fresh_insert=True,
)
If the target table does not exist, SyncDB creates it using column types inferred from the first row of data.
File-only operations (no database)
You can use FileTransfer directly to convert between file formats:
from syncdb import FileTransfer
ft = FileTransfer()
# Read any supported format
rows = ft.read("data.csv") # list of dicts
rows = ft.read("data.parquet")
rows = ft.read("data.xlsx")
# Write any supported format
ft.write(rows, "output.parquet")
ft.write(rows, "output.csv")
ft.write(rows, "output.xlsx")
# Convert CSV → Parquet in two lines
rows = ft.read("data.csv")
ft.write(rows, "data.parquet")
Advanced Features
Incremental High-Watermark Sync
Use incremental_column to track the maximum value of a cursor column (for example updated_at) between runs. SyncDB saves the high-water mark automatically after each sync — no manual filter key needed on subsequent runs.
sync.sync_tables({
"orders": {
"source": "dbo.orders",
"destination": "public.orders",
"mode": "append",
"primary_key": ["order_id"],
"incremental_column": "updated_at", # SyncDB remembers the max value
"watermark_store": "watermarks.json", # persisted between runs
}
})
| Key | Description |
|---|---|
incremental_column |
Column whose maximum value is saved as the cursor |
watermark_store |
Path to a JSON file where values are saved. Defaults to .syncdb_watermarks.json |
watermark_key |
Override the storage key used inside the JSON file. Defaults to "source->destination:column" |
On the first run there is no saved value, so all rows are copied. From the second run onward only rows with a value greater than the saved mark are read.
Row Transforms
Pass a Python callable to transform to modify each batch before it is written — useful for masking PII, unit conversion, or enrichment without a separate pipeline step.
def mask_pii(rows):
for r in rows:
r["email"] = "***@***.***"
r["phone"] = "***"
return rows
sync.sync_tables({
"customers": {
"source": "dbo.customers",
"destination": "public.customers",
"transform": mask_pii,
}
})
The function receives each batch as a list[dict] and must return a list[dict].
Column Renaming
Use rename to map source column names to different target column names. This prevents a renamed column from being treated as a dropped column + new column, which would lose data.
"orders": {
"source": "dbo.orders",
"destination": "public.orders",
"rename": {"cust_id": "customer_id", "ord_dt": "order_date"},
}
Column Type Overrides
Use type_overrides to specify the exact target column type when the automatic type mapping is not appropriate for your workload.
"orders": {
"source": "dbo.orders",
"destination": "public.orders",
"type_overrides": {"price": "numeric(18,4)", "notes": "text", "flags": "jsonb"},
}
Data Quality Checks
Use expect to assert data conditions after each table sync. SyncDB raises ValueError and populates result.expectations_failed when a check fails.
"orders": {
"source": "dbo.orders",
"destination": "public.orders",
"expect": {
"min_rows": 1000,
"not_null": ["order_id", "customer_id"],
"unique": ["order_id"],
"range": {
"total_amount": {"min": 0},
"discount_pct": {"min": 0, "max": 100},
},
},
}
| Check | Description |
|---|---|
min_rows |
Fail if the target table has fewer than this many rows after the sync |
not_null |
List of column names that must contain no null values |
unique |
List of column names (or lists of names) that must have no duplicate values |
range |
Dict of {column: {min: value, max: value}} — fail if any value falls outside the bounds |
Per-Batch Callbacks
Supply an on_batch callable to be called after every batch. The function receives the current TableSyncResult snapshot — useful for custom metrics, rate limiting, or mid-sync alerting.
def emit_metric(result_so_far):
statsd.gauge("etl.rows_written", result_so_far.rows_written)
sync.sync_tables({
"orders": {
"source": "dbo.orders",
"destination": "public.orders",
"on_batch": emit_metric,
}
})
Retry on Transient Errors
Set retry_count and retry_delay_seconds on the SyncDB constructor to automatically retry failed batch writes with exponential backoff.
sync = SyncDB(
source=src,
target=dst,
retry_count=3, # retry up to 3 times per batch
retry_delay_seconds=2.0, # initial delay; doubles after each attempt (2s, 4s, 8s)
)
Progress Reporting
SyncDB prints a progress bar as data moves. Three modes are available:
| Mode | Behavior | Best for |
|---|---|---|
ProgressMode.multi_line |
New line per batch (default) | CI logs, log files |
ProgressMode.one_line |
Overwrites the same line per table, then commits it | Interactive terminals |
ProgressMode.none |
Silent | Scheduled jobs, custom logging |
from syncdb import ProgressMode, SyncDB
# Interactive terminal — animated progress on one line
sync = SyncDB(source=src, target=dst, progress_mode=ProgressMode.one_line)
# CI pipeline — each batch on its own line
sync = SyncDB(source=src, target=dst, progress_mode=ProgressMode.multi_line)
# No output at all
sync = SyncDB(source=src, target=dst, progress_mode=ProgressMode.none)
# String values also accepted
sync = SyncDB(source=src, target=dst, progress_mode="one_line")
When a total row count is available (SELECT COUNT(*) succeeds), the bar shows a filled progress bar, percentage, row count, and elapsed time. When the count query fails due to permissions, it falls back to displaying the running row count and elapsed time.
public.orders [=============>.......................] 40% 4,000 / 10,000 1.2s
public.customers [====================] 100% 8,200 / 8,200 0.9s
In one_line mode each table keeps its own output row — completed tables remain visible as new tables begin. Elapsed time resets at the start of each table.
Reading Sync Results
sync_tables returns a list of TableSyncResult objects — one per table in the spec. The easiest way to see results is the verbose parameter:
# prints a formatted summary table automatically when the sync finishes
sync = SyncDB(source=src, target=dst, verbose="standard")
results = sync.sync_tables({
"orders": {"source": "dbo.orders", "destination": "public.orders"},
"customers": {"source": "dbo.customers", "destination": "public.customers"},
})
# output:
#
# SyncDB summary (standard)
# +------------------+--------+--------------+---------+---------+------+
# | table | mode | rows written | batches | created | time |
# +------------------+--------+--------------+---------+---------+------+
# | public.orders | append | 52,341 | 11 | no | 3.2s |
# | public.customers | append | 8,200 | 2 | yes | 0.9s |
# +------------------+--------+--------------+---------+---------+------+
# total: 60,541 rows in 13 batches across 2 tables in 4.1s
Use verbose="detailed" for a full row with every TableSyncResult field, including schema changes, watermark values, and quality-check results.
verbose= |
Output |
|---|---|
"standard" |
One line per table — destination, mode, rows written, batches, created flag, and a totals row |
"detailed" |
Full table with all TableSyncResult fields including schema changes, watermark, and check results |
None |
Silent — return results only, print nothing |
The returned list is also useful for programmatic checks — for example, wiring results into a monitoring system or raising an alert when new tables are created:
results = sync.sync_tables({
"orders": {"source": "dbo.orders", "destination": "public.orders"},
"customers": {"source": "dbo.customers", "destination": "public.customers"},
})
for r in results:
if r.table_created:
alert(f"New table created: {r.destination}")
if r.columns_added:
notify_schema_change(r.destination, r.columns_added)
All TableSyncResult fields
| Field | Type | Description |
|---|---|---|
name |
str |
Logical name from the table spec dict key |
source |
str |
Source table as specified |
destination |
str |
Destination table as specified |
mode |
str |
Transfer mode used |
rows_read |
int |
Total rows read from source |
rows_written |
int |
Total rows written to target |
batches |
int |
Number of batches processed |
table_created |
bool |
True if the target table was created fresh |
schema_created |
bool |
True if the target schema was created |
columns_added |
list[str] |
Column names added to target |
columns_dropped |
list[str] |
Column names dropped from target |
rows_soft_deleted |
int |
Rows marked deleted_at in soft_delete mode |
expectations_failed |
list[str] |
Failure messages from expect checks (empty if all passed) |
watermark_value |
Any |
Highest watermark value seen in this sync run |
dry_run |
bool |
True if this was a dry run |
duration_seconds |
float |
Wall-clock seconds from start to finish for this table |
Complete Examples
Example 1: Nightly incremental load from SQL Server to PostgreSQL
from syncdb import DatabaseConfig, ProgressMode, SyncDB
src = DatabaseConfig(
engine="mssql",
connection_string=(
"Driver={ODBC Driver 17 for SQL Server};"
"Server=prod-sql.internal,1433;Database=operations;"
"UID=etl_user;PWD=etl_pass;TrustServerCertificate=yes;"
),
)
dst = DatabaseConfig(
engine="postgresql",
host="analytics-db.internal",
database="warehouse",
user="loader",
password="loader_pass",
)
sync = SyncDB(
source=src,
target=dst,
batch_size=20_000,
progress_mode=ProgressMode.one_line,
)
results = sync.sync_tables({
"orders": {
"source": "dbo.orders",
"destination": "public.orders",
"mode": "append",
"primary_key": ["order_id"],
"order_by": ["order_id"],
"filter": {"where": "updated_at >= ?", "params": ["2024-12-01"]},
},
"order_lines": {
"source": "dbo.order_lines",
"destination": "public.order_lines",
"mode": "append",
"primary_key": ["line_id"],
"order_by": ["line_id"],
},
"customers": {
"source": "dbo.customers",
"destination": "public.customers",
"mode": "full_refresh", # small table, reload completely each night
},
})
# SyncDB prints a summary table automatically
Example 2: Preview schema changes before applying them
from syncdb import DatabaseConfig, SyncDB
src = DatabaseConfig(engine="mssql", connection_string="...")
dst = DatabaseConfig(engine="postgresql", connection_string="...")
# Dry run — see what would change without touching any data
sync = SyncDB(source=src, target=dst, dry_run=True, drop_extra_columns=True)
results = sync.sync_tables({
"products": {
"source": "dbo.products",
"destination": "public.products",
}
})
# SyncDB prints a [DRY RUN] summary automatically.
# Inspect results in code if needed:
r = results[0]
# r.table_created, r.columns_added, r.columns_dropped are populated
Example 3: Export filtered data to Parquet, then reload into a second database
from syncdb import DatabaseConfig, SyncDB
mssql_cfg = DatabaseConfig(engine="mssql", connection_string="...")
pg_cfg = DatabaseConfig(engine="postgresql", connection_string="...")
# Step 1: export from MSSQL to a local Parquet file
sync_export = SyncDB(source=mssql_cfg)
sync_export.export_query_to_file(
query="SELECT * FROM dbo.daily_report WHERE report_date = ?",
params=["2024-12-31"],
output_path="daily_report_2024_12_31.parquet",
)
# Step 2: load the file into PostgreSQL
sync_load = SyncDB(target=pg_cfg)
sync_load.import_file_to_table(
input_path="daily_report_2024_12_31.parquet",
destination="public.daily_report",
fresh_insert=False, # append to existing rows
)
Example 4: MySQL to PostgreSQL with column drop
from syncdb import DatabaseConfig, SyncDB
mysql_cfg = DatabaseConfig(
engine="mysql",
host="mysql.internal",
database="app_db",
user="reader",
password="reader_pass",
)
pg_cfg = DatabaseConfig(
engine="postgresql",
connection_string="postgresql://writer:pass@pg.internal:5432/warehouse",
)
sync = SyncDB(
source=mysql_cfg,
target=pg_cfg,
drop_extra_columns=True, # remove stale columns from target
batch_size=5_000,
)
sync.sync_tables({
"users": {
"source": "users", # unqualified — uses config.database
"destination": "public.users",
"mode": "append",
"primary_key": ["user_id"],
},
"events": {
"source": "events",
"destination": "public.events",
"mode": "append",
"primary_key": ["event_id"],
"order_by": ["event_id"],
"filter": "status != 'deleted'",
},
})
Example 5: Convert file formats without a database
from syncdb import FileTransfer
ft = FileTransfer()
# CSV → Parquet
rows = ft.read("data.csv")
ft.write(rows, "data.parquet")
# Parquet → Excel
rows = ft.read("data.parquet")
ft.write(rows, "data.xlsx")
# Filter rows in Python before writing
rows = ft.read("orders.parquet")
active = [r for r in rows if r["status"] == "active"]
ft.write(active, "active_orders.csv")
Example 6: Routing results to a custom logger
Pass verbose=None to suppress the built-in summary table and progress_mode=ProgressMode.none to suppress the progress bar. Then use the returned results list to feed your own logger or monitoring system.
import logging
from syncdb import DatabaseConfig, ProgressMode, SyncDB
logging.basicConfig(level=logging.INFO)
log = logging.getLogger("etl")
src = DatabaseConfig(engine="mssql", connection_string="...")
dst = DatabaseConfig(engine="postgresql", connection_string="...")
sync = SyncDB(
source=src,
target=dst,
verbose=None, # suppress built-in summary
progress_mode=ProgressMode.none, # suppress progress bar
)
results = sync.sync_tables({
"invoices": {
"source": "dbo.invoices",
"destination": "public.invoices",
"mode": "append",
"primary_key": ["invoice_id"],
}
})
for r in results:
log.info(
"sync complete table=%s rows_written=%d batches=%d",
r.destination, r.rows_written, r.batches,
)
API Reference
SyncDB
SyncDB(
source: DatabaseConfig | None = None,
target: DatabaseConfig | None = None,
batch_size: int | str = 5000,
progress_mode: ProgressMode | str = ProgressMode.multi_line,
dry_run: bool = False,
drop_extra_columns: bool = False,
verbose: str | None = "standard",
verbose_stream: TextIO | None = None,
retry_count: int = 0,
retry_delay_seconds: float = 1.0,
)
| Parameter | Description | Default |
|---|---|---|
source |
Source database config | None |
target |
Target database config | None |
batch_size |
Rows per batch — integer count (10_000) or percentage of total rows ("10%") |
5000 |
progress_mode |
Progress display mode | MULTI_LINE |
dry_run |
Report changes without writing data | False |
drop_extra_columns |
Drop target columns not in source | False |
verbose |
Automatic summary after sync: "standard", "detailed", or None to silence |
"standard" |
verbose_stream |
Output stream for the summary table | sys.stdout |
retry_count |
Retry failed batch writes this many times | 0 |
retry_delay_seconds |
Initial retry delay; doubles after each retry | 1.0 |
SyncDB.sync_tables(tables, batch_size)
sync.sync_tables(
tables: dict[str, dict],
batch_size: int | str | None = None,
) -> list[TableSyncResult]
Syncs one or more tables. Opens connections once, reuses them for all tables, always closes both on completion or error.
batch_size overrides the instance-level default for every table in this call. A "batch_size" key inside an individual table spec takes precedence over this argument.
SyncDB.sync_schema(source_schema, destination_schema, exclude, mode, batch_size, table_prefix, table_suffix, **table_defaults)
sync.sync_schema(
source_schema="dbo",
destination_schema="public",
exclude=["tmp_*", "audit_log"],
mode="append",
batch_size=10_000, # optional — overrides instance default for this call
table_prefix="raw_", # prepend to every destination table name
table_suffix="_v2", # append to every destination table name
expect={"not_null": ["id"]}, # extra kwargs are copied into every table spec
)
Discovers source tables through the connector and builds a sync_tables spec automatically. batch_size overrides the instance-level default for every table in this schema sync. Extra keyword arguments are copied into every generated table spec.
table_prefix and table_suffix are applied only to destination table names. Both default to "" so existing calls are unaffected.
| Parameter | Description | Default |
|---|---|---|
table_prefix |
String prepended to every destination table name | "" |
table_suffix |
String appended to every destination table name | "" |
# dbo.customers → public.raw_customers
# dbo.orders → public.raw_orders
sync.sync_schema("dbo", "public", table_prefix="raw_", mode="full_refresh")
# dbo.customers → public.customers_20250101
sync.sync_schema("dbo", "public", table_suffix="_20250101", mode="snapshot")
# dbo.customers → public.raw_customers_backup
sync.sync_schema("dbo", "public", table_prefix="raw_", table_suffix="_backup")
SyncDB.run_config_file(path)
from syncdb import SyncDB
results = SyncDB.run_config_file("syncdb.json")
Loads a .json, .yaml, or .yml job file with source, target, settings, and tables sections, then runs sync_tables.
SyncDB.export_query_to_file(query, output_path, params, file_format)
sync.export_query_to_file(
query: str | Path, # SQL string or path to a .sql file
output_path: str | Path,
params: list | None = None,
file_format: str | None = None, # inferred from extension when omitted
) -> int # rows written
SyncDB.import_file_to_table(input_path, destination, file_format, fresh_insert)
sync.import_file_to_table(
input_path: str | Path,
destination: str,
file_format: str | None = None, # inferred from extension when omitted
fresh_insert: bool = False, # truncate before inserting
) -> int # rows inserted
Supported File Formats
| Format | Extension | Extra dependency | Notes |
|---|---|---|---|
| CSV | .csv |
none | All values are strings; no type inference |
| Parquet | .parquet |
pandas, pyarrow |
Preserves types; best for large datasets |
| Excel | .xlsx, .xls |
pandas, openpyxl |
Readable by humans; slow for large files |
| Pickle | .pickle |
none | Python-only; not portable across languages |
File format can be inferred from the file extension or passed explicitly:
sync.export_query_to_file(
query="SELECT * FROM dbo.data",
output_path="output.dat",
file_format="parquet", # override extension-based detection
)
Running Tests
Unit tests (no database required):
pytest
To inspect tests with colored start/end sections and a final summary:
pytest Tests/Library/Components/sync --syncdb-live-output
For the detailed version, use --syncdb-live-output-detail. This keeps the same
colored test sections and also prints SyncDB batch progress bars plus per-sync
summary tables:
pytest Tests/Library/DatabaseToDatabase --syncdb-live-output-detail
The live report hides raw paths and groups each test by workflow, scope, and
database direction, such as Database to Database, Table Sync, and
PGSQL -> MySQL.
Available database-to-database scenario ids are:
postgresql_to_mysql
postgresql_to_mssql
mysql_to_postgresql
mysql_to_mssql
mssql_to_postgresql
mssql_to_mysql
Run all enabled database directions with:
pytest Tests/Library/DatabaseToDatabase --syncdb-live-output
Several integration tests intentionally run the same sync two or three times to verify repeat-run behavior after the first successful run.
On Windows you can use the helper script:
.\run_tests.ps1 sync -live
.\run_tests.ps1 db -detail
Integration tests against real databases require Docker:
cd Tests/DataBase
docker compose up -d --build
cd ../..
pytest Tests/Library/DatabaseToDatabase
Database-to-database integration tests are shared across database pairs. By
default they generate every PGSQL/MSSQL/MySQL source-to-target direction. Set
SYNCDB_LIVE_SCENARIOS=postgresql_to_mysql or a comma-separated list of scenario
ids from Tests/Library/DatabaseToDatabase/parameters.py to narrow the pairs.
See Tests/DataBase/README.md for details on test database setup.
Roadmap
In Progress
- Connector-native staging swaps — upgrade
append_stagingfrom the current portable truncate-and-copy strategy to an engine-specific transactional rename/swap where the engine supports it. - Connector-native upsert — replace the portable delete-then-insert upsert with engine-specific implementations: PostgreSQL
ON CONFLICT, MySQLON DUPLICATE KEY UPDATE, and MSSQLMERGE.
Planned
Parallel table sync — sync independent tables concurrently using a thread pool to cut total wall-clock time.
sync = SyncDB(source=src, target=dst, max_workers=4)
CLI command — a syncdb run <config.yaml> console entry point so sync jobs can be run from scheduled tasks without writing Python code.
syncdb run syncdb.yaml
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file qubdi_syncdb-0.1.0.tar.gz.
File metadata
- Download URL: qubdi_syncdb-0.1.0.tar.gz
- Upload date:
- Size: 78.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.5
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
253f030b94a05c63c322a9281d8275a7babc2729d59b2f5963bbd9858bdbc7fb
|
|
| MD5 |
80fa66d9fb14a621cedead66f8a8f532
|
|
| BLAKE2b-256 |
7ce18cbfe6b198762a2240c4a5aeda82eeb6681cb4d7d82e6acb6664e69e3c28
|
File details
Details for the file qubdi_syncdb-0.1.0-py3-none-any.whl.
File metadata
- Download URL: qubdi_syncdb-0.1.0-py3-none-any.whl
- Upload date:
- Size: 65.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.13.5
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
40ebb6cfa49d218d2b9ee17824e289b07bd244b75ab950d719c9421b72906433
|
|
| MD5 |
09849e884245913dc59e8b30fbd4cd45
|
|
| BLAKE2b-256 |
d18c0376e6c7a1df46700e114726ab7a348217647d6950c7f13d53508aecc093
|