rustream
Fast Postgres → Parquet sync tool
Bidirectional Postgres sync tool. Reads tables from Postgres and writes Parquet/Iceberg files to local disk or S3, or ingests Parquet/CSV files from local disk or S3 back into Postgres. Supports incremental sync via watermark tracking and upsert-based ingestion.
Installation
From PyPI
pipx install rustream
# or
pip install rustream
From source
git clone https://github.com/kraftaa/rustream.git
cd rustream
cargo build --release
# binary is at target/release/rustream
With maturin (local dev)
pip install maturin
maturin develop --release
# now `rustream` is on your PATH
Usage
Sync (Postgres → Parquet/S3)
# Copy and edit the example config
cp config.example.yaml config.yaml
# Preview what will be synced (no files written)
rustream sync --config config.yaml --dry-run
# Run sync
rustream sync --config config.yaml
# If you need to wipe saved watermarks/cursors (full resync)
rustream sync --config config.yaml --reset-state
# Reset state for one table
rustream reset-state --table users
Ingest (S3/local → Postgres)
# Preview what would be ingested
rustream ingest --config ingest_config.yaml --dry-run
# Run ingest
rustream ingest --config ingest_config.yaml
Enable debug logging with RUST_LOG:
RUST_LOG=rustream=debug rustream sync --config config.yaml
RUST_LOG=rustream=debug rustream ingest --config ingest_config.yaml
Jobs / Worker (experimental)
# create control-plane table in Postgres
rustream init-jobs --control-db-url "$CONTROL_DB_URL"
# enqueue a table sync job
rustream add-job --control-db-url "$CONTROL_DB_URL" --table users --config config.yaml --interval-secs 300 --timeout-secs 900 --max-concurrent-jobs 1
# run the worker loop (polls every 5s; currently executes jobs sequentially)
rustream worker --control-db-url "$CONTROL_DB_URL" --poll-seconds 5 --max-concurrent 4
# force a job to run ASAP
rustream force-job --control-db-url "$CONTROL_DB_URL" --job-id 1
# status API (optional, returns JSON)
rustream status-api --control-db-url "$CONTROL_DB_URL" --bind 0.0.0.0:8080
# control DB URL can also come from env
# RUSTREAM_CONTROL_DB_URL=postgres://user:pass@host:5432/db rustream worker
# status endpoints:
# /jobs (json, optional ?status=pending), /jobs/html (auto-refresh + filter),
# /jobs/summary, /logs?limit=50, /health, /health/worker
# UI buttons: force run, retry failed, reset state
# reset all state (CLI)
rustream reset-state --state-dir .rustream_state
# reset one table
rustream reset-state --table users
# optional: run a data-quality command on each local output table
# (use {path} placeholder for the Parquet directory)
RUSTREAM_DQ_CMD="dq-prof --input {path}" rustream worker --control-db-url "$CONTROL_DB_URL"
Production templates
- Local: examples/production-template/docker-compose.yml
- K8s: examples/production-template/helm
- AWS: examples/production-template/terraform
Configuration
Specific tables (recommended)
postgres:
  host: localhost
  database: mydb
  user: postgres
  password: secret
output:
  type: local
  path: ./output
tables:
  - name: users
    incremental_column: updated_at
    incremental_tiebreaker_column: id
    columns:   # optional: pick specific columns
      - id
      - email
      - created_at
      - updated_at
  - name: orders
    incremental_column: updated_at
    incremental_tiebreaker_column: id
  - name: products   # no incremental_column = full sync every run
  - name: events     # append-only example (no updated_at)
    incremental_column: id
    incremental_column_is_unique: true
All tables (auto-discover)
Omit tables to sync every table in the schema. Use exclude to skip some:
postgres:
  host: localhost
  database: mydb
  user: postgres
output:
  type: local
  path: ./output
# schema: public   # default
exclude:
  - schema_migrations
  - ar_internal_metadata
S3 output
output:
  type: s3
  bucket: my-data-lake
  prefix: raw/postgres
  region: us-east-1
AWS credentials come from environment variables, ~/.aws/credentials, or an IAM role.
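For example, when running outside AWS you can export the standard credential variables before a sync (values below are placeholders):
export AWS_ACCESS_KEY_ID=AKIA...
export AWS_SECRET_ACCESS_KEY=...
rustream sync --config config.yaml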
Iceberg output
output:
  type: s3
  bucket: my-data-lake
  prefix: warehouse
  region: us-east-1
format: iceberg
warehouse: s3://my-data-lake/warehouse
catalog:
  type: filesystem       # or "glue" (requires --features glue)
  # glue_database: my_db # required when type=glue
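If the binary was built with --features glue, the catalog block points at a Glue database instead. A minimal sketch (the database name is illustrative):
catalog:
  type: glue
  glue_database: my_db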
Ingest (S3 → Postgres)
postgres:
  host: localhost
  database: mydb
  user: postgres
  password: secret
ingest:
  input:
    type: s3
    bucket: my-data-lake
    prefix: raw/postgres/
    region: us-east-1
    pattern: "**/*.parquet"
  file_format: parquet   # "parquet" or "csv"
  write_mode: upsert     # "insert" | "upsert" | "truncate_insert"
  batch_size: 5000
  target_schema: public
  tables:
    - file_pattern: "users/*.parquet"
      target_table: users
      key_columns: [id]
      create_if_missing: true
    - file_pattern: "orders/*.parquet"
      target_table: orders
      key_columns: [id]
Ingest from local files
ingest:
  input:
    type: local
    path: ./parquet_files
    pattern: "**/*.parquet"
  file_format: parquet
  write_mode: insert
  batch_size: 5000
If no tables are listed, the target table name is inferred from the parent directory or filename.
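For example (paths and table names are illustrative), a local input might map like this:
./parquet_files/users/part-0001.parquet   -> target table users   (from the parent directory)
./parquet_files/orders.parquet            -> target table orders  (from the filename)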
Config reference (sync)
| Field | Description |
|---|---|
| postgres.host | Postgres host |
| postgres.port | Postgres port (default: 5432) |
| postgres.database | Database name |
| postgres.user | Database user |
| postgres.password | Database password (optional) |
| output.type | local or s3 |
| output.path | Local directory for Parquet files (when type=local) |
| output.bucket | S3 bucket (when type=s3) |
| output.prefix | S3 key prefix (when type=s3) |
| output.region | AWS region (when type=s3, optional) |
| batch_size | Rows per Parquet file (default: 10000) |
| state_dir | Directory for SQLite watermark state (default: .rustream_state) |
| schema | Schema to discover tables from (default: public) |
| exclude | List of table names to skip when using auto-discovery |
| tables[].name | Table name |
| tables[].schema | Schema name (default: public) |
| tables[].columns | Columns to sync (default: all) |
| tables[].incremental_column | Column for watermark-based incremental sync |
| tables[].incremental_tiebreaker_column | Stable cursor column for duplicate-safe incremental paging (required when incremental_column is set; recommended: primary key) |
| tables[].incremental_column_is_unique | Allows watermark-only incremental mode when the incremental column is strictly unique/monotonic (e.g. an append-only id) |
| tables[].partition_by | Partition output files by date, month, or year |
| format | Output format: parquet (default) or iceberg |
| warehouse | Warehouse path for Iceberg (required when format=iceberg) |
| catalog.type | Iceberg catalog: filesystem (default) or glue |
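As a combined sketch of a few of these fields (batch_size, state_dir, partition_by) in one sync config, with illustrative values:
batch_size: 10000
state_dir: .rustream_state
tables:
  - name: orders
    incremental_column: updated_at
    incremental_tiebreaker_column: id
    partition_by: date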
Config reference (ingest)
| Field | Description |
|---|---|
| ingest.input.type | local or s3 |
| ingest.input.path | Local directory (when type=local) |
| ingest.input.bucket | S3 bucket (when type=s3) |
| ingest.input.prefix | S3 key prefix (when type=s3) |
| ingest.input.region | AWS region (when type=s3, optional) |
| ingest.input.pattern | Glob pattern for file matching (default: **/*.parquet) |
| ingest.file_format | parquet (default) or csv |
| ingest.write_mode | insert (default), upsert, or truncate_insert |
| ingest.batch_size | Rows per INSERT statement (default: 5000) |
| ingest.target_schema | Postgres schema for target tables (default: public) |
| ingest.tables[].file_pattern | Glob pattern matching files to this table |
| ingest.tables[].target_table | Postgres table to write to |
| ingest.tables[].key_columns | Primary key columns (required for upsert mode) |
| ingest.tables[].create_if_missing | Auto-CREATE TABLE from the file schema (default: false) |
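For the CSV path, an ingest block might look like this sketch (bucket, prefix, and table names are illustrative):
ingest:
  input:
    type: s3
    bucket: my-data-lake
    prefix: exports/
    region: us-east-1
    pattern: "**/*.csv"
  file_format: csv
  write_mode: truncate_insert
  batch_size: 5000
  tables:
    - file_pattern: "events/*.csv"
      target_table: events
      create_if_missing: true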
Running Integration Tests
Some DB-backed tests are optional and run only when RUSTREAM_IT_DB_URL is set.
Without this env var, those tests return early as no-ops.
export RUSTREAM_IT_DB_URL="host=localhost port=5432 dbname=mydb user=postgres password=secret"
cargo test
Verify Before Push/Tag
Run this locally before pushing or creating a release tag:
cargo fmt -- --check
cargo clippy --all-targets -- -D warnings
cargo test --all-targets
Tagged releases run the same checks in .github/workflows/release.yml before wheels are built and published.
How it works
Sync (Postgres → Parquet)
- Connects to Postgres and introspects each table's schema via information_schema
- Maps Postgres column types to Arrow types automatically
- Reads rows in batches, converting them to Arrow RecordBatches
- Writes each batch as a Snappy-compressed Parquet file
- Tracks the high watermark (max value of incremental_column) and an optional cursor in local SQLite
- Checkpoints incremental progress after each successfully written batch
- On next run, reads rows after the saved (watermark, cursor) position
Tables without an incremental_column do a full sync every run.
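Conceptually, each incremental page is selected by a tuple comparison on (incremental_column, tiebreaker). A hand-run equivalent of one page for the users table from the config above, where the literal timestamp stands in for the saved watermark and 12345 for the saved cursor (illustrative only; the exact SQL rustream generates may differ):
psql "postgres://postgres:secret@localhost/mydb" -c \
  "SELECT id, email, created_at, updated_at
   FROM public.users
   WHERE (updated_at, id) > (TIMESTAMP '2024-01-01 00:00:00', 12345)
   ORDER BY updated_at, id
   LIMIT 10000"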
Ingest (Parquet/CSV → Postgres)
- Discovers files matching the glob pattern from local disk or S3
- Skips files already ingested (tracked in local SQLite)
- Reads each file into Arrow RecordBatches (Parquet or CSV with schema inference)
- Creates the target table if create_if_missing: true (DDL derived from the Arrow schema)
- Writes rows via multi-row parameterized INSERT or INSERT ... ON CONFLICT (upsert)
- Marks each file as ingested in SQLite to avoid reprocessing on next run
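For write_mode: upsert, the conceptual equivalent of one batch is an INSERT ... ON CONFLICT keyed on key_columns (illustrative only; rustream builds a multi-row parameterized statement internally):
psql "postgres://postgres:secret@localhost/mydb" -c \
  "INSERT INTO public.users (id, email)
   VALUES (1, 'a@example.com'), (2, 'b@example.com')
   ON CONFLICT (id) DO UPDATE SET email = EXCLUDED.email"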
Supported Postgres types
| Postgres | Arrow |
|---|---|
| boolean | Boolean |
| smallint | Int16 |
| integer, serial | Int32 |
| bigint, bigserial | Int64 |
| real | Float32 |
| double precision | Float64 |
| numeric / decimal | Utf8 (preserves precision) |
| text, varchar, char | Utf8 |
| bytea | Binary |
| date | Date32 |
| timestamp | Timestamp(Microsecond) |
| timestamptz | Timestamp(Microsecond, UTC) |
| uuid | Utf8 |
| json, jsonb | Utf8 |
| arrays | Utf8 (JSON serialized) |
Publishing
The project uses maturin to package the Rust binary as a Python wheel (the same approach as ruff, uv, and similar tools). The CI workflow in .github/workflows/release.yml builds wheels for Linux, macOS, and Windows, then publishes to PyPI on tagged releases.
To publish manually:
# Build wheels for current platform
maturin build --release
# Upload to PyPI (needs PYPI_API_TOKEN)
maturin publish
License
MIT