Fast Postgres → Parquet sync tool

rustream

Bidirectional Postgres sync tool. Reads tables from Postgres and writes Parquet/Iceberg files to local disk or S3, or ingests Parquet/CSV files from local disk or S3 back into Postgres. Supports incremental sync via watermark tracking and upsert-based ingestion.

Installation

From PyPI

pipx install rustream
# or
pip install rustream

From source

git clone https://github.com/kraftaa/rustream.git
cd rustream
cargo build --release
# binary is at target/release/rustream

With maturin (local dev)

pip install maturin
maturin develop --release
# now `rustream` is on your PATH

Usage

Sync (Postgres → Parquet/S3)

# Copy and edit the example config
cp config.example.yaml config.yaml

# Preview what will be synced (no files written)
rustream sync --config config.yaml --dry-run

# Run sync
rustream sync --config config.yaml

# If you need to wipe saved watermarks/cursors (full resync)
rustream sync --config config.yaml --reset-state

# Reset state for one table
rustream reset-state --table users

Ingest (S3/local → Postgres)

# Preview what would be ingested
rustream ingest --config ingest_config.yaml --dry-run

# Run ingest
rustream ingest --config ingest_config.yaml

Enable debug logging with RUST_LOG:

RUST_LOG=rustream=debug rustream sync --config config.yaml
RUST_LOG=rustream=debug rustream ingest --config ingest_config.yaml

Jobs / Worker (experimental)

# create control-plane table in Postgres
rustream init-jobs --control-db-url "$CONTROL_DB_URL"

# enqueue a table sync job
rustream add-job --control-db-url "$CONTROL_DB_URL" --table users --config config.yaml --interval-secs 300 --timeout-secs 900 --max-concurrent-jobs 1

# run the worker loop (polls every 5s; currently executes jobs sequentially)
rustream worker --control-db-url "$CONTROL_DB_URL" --poll-seconds 5 --max-concurrent 4

# force a job to run ASAP
rustream force-job --control-db-url "$CONTROL_DB_URL" --job-id 1

# status API (optional, returns JSON)
rustream status-api --control-db-url "$CONTROL_DB_URL" --bind 0.0.0.0:8080

# control DB URL can also come from env
# RUSTREAM_CONTROL_DB_URL=postgres://user:pass@host:5432/db rustream worker
# status endpoints:
#   /jobs (json, optional ?status=pending), /jobs/html (auto-refresh + filter),
#   /jobs/summary, /logs?limit=50, /health, /health/worker
#   UI buttons: force run, retry failed, reset state

# reset all state (CLI)
rustream reset-state --state-dir .rustream_state

# reset one table
rustream reset-state --table users

# optional: run a data-quality command on each local output table
# (use {path} placeholder for the Parquet directory)
RUSTREAM_DQ_CMD="dq-prof --input {path}" rustream worker --control-db-url "$CONTROL_DB_URL"

Production templates

  • Local: examples/production-template/docker-compose.yml
  • K8s: examples/production-template/helm
  • AWS: examples/production-template/terraform

Configuration

Specific tables (recommended)

postgres:
  host: localhost
  database: mydb
  user: postgres
  password: secret

output:
  type: local
  path: ./output

tables:
  - name: users
    incremental_column: updated_at
    incremental_tiebreaker_column: id
    columns:          # optional: pick specific columns
      - id
      - email
      - created_at
      - updated_at

  - name: orders
    incremental_column: updated_at
    incremental_tiebreaker_column: id

  - name: products    # no incremental_column = full sync every run

  - name: events      # append-only example (no updated_at)
    incremental_column: id
    incremental_column_is_unique: true

All tables (auto-discover)

Omit the tables list to sync every table in the schema. Use exclude to skip specific tables:

postgres:
  host: localhost
  database: mydb
  user: postgres

output:
  type: local
  path: ./output

# schema: public    # default
exclude:
  - schema_migrations
  - ar_internal_metadata
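Conceptually, the auto-discover path is a query against information_schema followed by the exclude filter. A minimal Python sketch of that idea (illustrative only; function names and query shape are assumptions, not rustream's actual code):

```python
def discovery_query(schema: str = "public") -> str:
    """Build the kind of information_schema query used to enumerate tables."""
    return (
        "SELECT table_name FROM information_schema.tables "
        f"WHERE table_schema = '{schema}' AND table_type = 'BASE TABLE'"
    )

def filter_tables(discovered: list[str], exclude: list[str]) -> list[str]:
    """Apply the config's exclude list to the discovered table names."""
    skip = set(exclude)
    return [t for t in discovered if t not in skip]
```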

S3 output

output:
  type: s3
  bucket: my-data-lake
  prefix: raw/postgres
  region: us-east-1

AWS credentials come from environment variables, ~/.aws/credentials, or IAM role.

Iceberg output

output:
  type: s3
  bucket: my-data-lake
  prefix: warehouse
  region: us-east-1

format: iceberg
warehouse: s3://my-data-lake/warehouse
catalog:
  type: filesystem    # or "glue" (requires --features glue)
  # glue_database: my_db  # required when type=glue

Ingest (S3 → Postgres)

postgres:
  host: localhost
  database: mydb
  user: postgres
  password: secret

ingest:
  input:
    type: s3
    bucket: my-data-lake
    prefix: raw/postgres/
    region: us-east-1
    pattern: "**/*.parquet"

  file_format: parquet       # "parquet" or "csv"
  write_mode: upsert         # "insert" | "upsert" | "truncate_insert"
  batch_size: 5000
  target_schema: public

  tables:
    - file_pattern: "users/*.parquet"
      target_table: users
      key_columns: [id]
      create_if_missing: true

    - file_pattern: "orders/*.parquet"
      target_table: orders
      key_columns: [id]

Ingest from local files

ingest:
  input:
    type: local
    path: ./parquet_files
    pattern: "**/*.parquet"

  file_format: parquet
  write_mode: insert
  batch_size: 5000

If no tables are listed, the target table name is inferred from the parent directory or filename.
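The inference rule above can be sketched in a few lines of Python (an illustration of the documented behavior, not rustream's actual implementation):

```python
from pathlib import Path

def infer_target_table(file_path: str) -> str:
    """Infer a target table name from a file path: prefer the parent
    directory name, else fall back to the file stem."""
    p = Path(file_path)
    if p.parent.name:
        return p.parent.name
    return p.stem
```

So `parquet_files/users/part-0001.parquet` maps to the `users` table, while a bare `orders.parquet` maps to `orders`.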

Config reference (sync)

postgres.host: Postgres host
postgres.port: Postgres port (default: 5432)
postgres.database: Database name
postgres.user: Database user
postgres.password: Database password (optional)
output.type: local or s3
output.path: Local directory for Parquet files (when type=local)
output.bucket: S3 bucket (when type=s3)
output.prefix: S3 key prefix (when type=s3)
output.region: AWS region (when type=s3, optional)
batch_size: Rows per Parquet file (default: 10000)
state_dir: Directory for SQLite watermark state (default: .rustream_state)
schema: Schema to discover tables from (default: public)
exclude: List of table names to skip when using auto-discovery
tables[].name: Table name
tables[].schema: Schema name (default: public)
tables[].columns: Columns to sync (default: all)
tables[].incremental_column: Column for watermark-based incremental sync
tables[].incremental_tiebreaker_column: Stable cursor column for duplicate-safe incremental paging (required when incremental_column is set; recommended: primary key)
tables[].incremental_column_is_unique: Allows watermark-only incremental mode when the incremental column is strictly unique/monotonic (e.g. an append-only id)
tables[].partition_by: Partition output files by date, month, or year
format: Output format: parquet (default) or iceberg
warehouse: Warehouse path for Iceberg (required when format=iceberg)
catalog.type: Iceberg catalog: filesystem (default) or glue

Config reference (ingest)

ingest.input.type: local or s3
ingest.input.path: Local directory (when type=local)
ingest.input.bucket: S3 bucket (when type=s3)
ingest.input.prefix: S3 key prefix (when type=s3)
ingest.input.region: AWS region (when type=s3, optional)
ingest.input.pattern: Glob pattern for file matching (default: **/*.parquet)
ingest.file_format: parquet (default) or csv
ingest.write_mode: insert (default), upsert, or truncate_insert
ingest.batch_size: Rows per INSERT statement (default: 5000)
ingest.target_schema: Postgres schema for target tables (default: public)
ingest.tables[].file_pattern: Glob pattern to match files to this table
ingest.tables[].target_table: Postgres table to write to
ingest.tables[].key_columns: Primary key columns (required for upsert mode)
ingest.tables[].create_if_missing: Auto-CREATE TABLE from the file schema (default: false)

Running Integration Tests

Some DB-backed tests are optional and run only when RUSTREAM_IT_DB_URL is set. Without it, those tests return early as no-ops.

export RUSTREAM_IT_DB_URL="host=localhost port=5432 dbname=mydb user=postgres password=secret"
cargo test

Verify Before Push/Tag

Run this locally before pushing or creating a release tag:

cargo fmt -- --check
cargo clippy --all-targets -- -D warnings
cargo test --all-targets

Tagged releases run the same checks in .github/workflows/release.yml before wheels are built and published.

How it works

Sync (Postgres → Parquet)

  1. Connects to Postgres and introspects each table's schema via information_schema
  2. Maps Postgres column types to Arrow types automatically
  3. Reads rows in batches, converting to Arrow RecordBatches
  4. Writes each batch as a Snappy-compressed Parquet file
  5. Tracks the high watermark (max value of incremental_column) and optional cursor in local SQLite
  6. Checkpoints incremental progress after each successfully written batch
  7. On next run, reads rows after the saved (watermark, cursor) position

Tables without incremental_column do a full sync every run.
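Steps 5–7 amount to keyset pagination over the (watermark, tiebreaker) pair. A hedged Python sketch of the kind of query this produces (the function name, placeholder style, and quoting are assumptions; rustream's real SQL may differ):

```python
def incremental_query(table, columns, wm_col, tb_col, watermark, cursor, limit=10000):
    """Build a keyset-pagination query that resumes strictly after the
    saved (watermark, cursor) position and pages in a stable order."""
    cols = ", ".join(columns)
    return (
        f"SELECT {cols} FROM {table} "
        f"WHERE ({wm_col}, {tb_col}) > ({watermark!r}, {cursor!r}) "
        f"ORDER BY {wm_col}, {tb_col} LIMIT {limit}"
    )
```

The row-tuple comparison is why the tiebreaker matters: two rows sharing the same updated_at are still ordered deterministically by id, so a batch boundary between them never skips or repeats rows.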

Ingest (Parquet/CSV → Postgres)

  1. Discovers files matching the glob pattern from local disk or S3
  2. Skips files already ingested (tracked in local SQLite)
  3. Reads each file into Arrow RecordBatches (Parquet or CSV with schema inference)
  4. Creates the target table if create_if_missing: true (DDL from Arrow schema)
  5. Writes rows via multi-row parameterized INSERT or INSERT...ON CONFLICT (upsert)
  6. Marks each file as ingested in SQLite to avoid reprocessing on next run
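Step 5's upsert path boils down to generating INSERT ... ON CONFLICT statements. A sketch of that shape (illustrative only; rustream's actual placeholder style and identifier quoting may differ):

```python
def upsert_sql(table, columns, key_columns):
    """Build an INSERT ... ON CONFLICT DO UPDATE statement that upserts
    one row, updating every non-key column from the incoming values."""
    cols = ", ".join(columns)
    placeholders = ", ".join(f"${i + 1}" for i in range(len(columns)))
    keys = ", ".join(key_columns)
    updates = ", ".join(
        f"{c} = EXCLUDED.{c}" for c in columns if c not in key_columns
    )
    return (
        f"INSERT INTO {table} ({cols}) VALUES ({placeholders}) "
        f"ON CONFLICT ({keys}) DO UPDATE SET {updates}"
    )
```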

Supported Postgres types

Postgres type → Arrow type
boolean → Boolean
smallint → Int16
integer, serial → Int32
bigint, bigserial → Int64
real → Float32
double precision → Float64
numeric / decimal → Utf8 (preserves precision)
text, varchar, char → Utf8
bytea → Binary
date → Date32
timestamp → Timestamp(Microsecond)
timestamptz → Timestamp(Microsecond, UTC)
uuid → Utf8
json, jsonb → Utf8
arrays → Utf8 (JSON-serialized)
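The table above can be expressed as a simple lookup. This dict is illustrative (not rustream's code), and the Utf8 fallback for unlisted types is an assumption based on the string-typed rows above:

```python
PG_TO_ARROW = {
    "boolean": "Boolean",
    "smallint": "Int16",
    "integer": "Int32",
    "bigint": "Int64",
    "real": "Float32",
    "double precision": "Float64",
    "numeric": "Utf8",  # kept as text to preserve precision
    "text": "Utf8",
    "bytea": "Binary",
    "date": "Date32",
    "timestamp": "Timestamp(Microsecond)",
    "timestamptz": "Timestamp(Microsecond, UTC)",
    "uuid": "Utf8",
    "jsonb": "Utf8",
}

def arrow_type(pg_type: str) -> str:
    # Unknown types fall back to Utf8 (assumed; mirrors the JSON/array rows).
    return PG_TO_ARROW.get(pg_type, "Utf8")
```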

Publishing

The project uses maturin to package the Rust binary as a Python wheel (the same approach as ruff, uv, etc.). The CI workflow in .github/workflows/release.yml builds wheels for Linux, macOS, and Windows, then publishes to PyPI on tagged releases.

To publish manually:

# Build wheels for current platform
maturin build --release

# Upload to PyPI (needs PYPI_API_TOKEN)
maturin publish

License

MIT
