Faster PostgreSQL bulk inserts by COPY to a temp table, then INSERT into the target table.

pgcopyinsert

Fast PostgreSQL bulk loads using a temp table: COPY into TEMPORARY …, then INSERT … SELECT (optionally ON CONFLICT DO NOTHING / DO UPDATE). Built on SQLAlchemy 2 and fullmetalcopy (sync + async COPY).


Why this pattern

  1. Reflect the target table so column names and types match PostgreSQL.
  2. Create a TEMPORARY table with the same columns (no constraints).
  3. COPY CSV bytes into the temp table (fast path).
  4. INSERT … SELECT into the real table (your chosen insert / upsert builder).
  5. DROP the temp table.

This keeps COPY simple and pushes deduplication / upsert rules into normal SQL INSERT logic.

Flow: Reflect target → Create TEMP → COPY to temp → INSERT from temp → DROP temp
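
As a rough illustration of steps 2 to 5, the sketch below builds the kind of SQL the pattern boils down to. It is not the SQL pgcopyinsert actually emits (the library works through SQLAlchemy and fullmetalcopy); the function name copyinsert_sql_sketch and its parameters are hypothetical, and identifiers are not quoted, so it is only suitable for reading, not for untrusted input.

```python
def copyinsert_sql_sketch(target, temp, columns, constraint=None):
    """Return illustrative SQL for the temp-table load pattern (steps 2-5)."""
    cols = ", ".join(columns)
    # Without a named constraint, any unique violation is skipped.
    on_conflict = (
        f"ON CONFLICT ON CONSTRAINT {constraint} DO NOTHING"
        if constraint
        else "ON CONFLICT DO NOTHING"
    )
    return [
        # Step 2: same columns and types, no constraints, no rows.
        f"CREATE TEMPORARY TABLE {temp} AS SELECT * FROM {target} WITH NO DATA",
        # Step 3: fast path, CSV bytes streamed from the client.
        f"COPY {temp} ({cols}) FROM STDIN WITH (FORMAT csv, HEADER true)",
        # Step 4: ordinary INSERT carries the dedup / upsert rule.
        f"INSERT INTO {target} ({cols}) SELECT {cols} FROM {temp} {on_conflict}",
        # Step 5: clean up.
        f"DROP TABLE {temp}",
    ]


statements = copyinsert_sql_sketch(
    "people", "people_load_tmp", ["id", "name"], constraint="people_pkey"
)
for statement in statements:
    print(statement)
```

Step 1 (reflection) has no statement here because it only reads catalog metadata to discover the column list.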

Requirements

  • Python 3.10+
  • PostgreSQL (the server your SQLAlchemy engine points at)
  • Runtime deps: SQLAlchemy ≥ 2.0, fullmetalcopy ≥ 0.2.0 (installed with pip install pgcopyinsert)
  • A DB driver in your environment (psycopg, psycopg2, or asyncpg) matching your SQLAlchemy URL

Install

pip install pgcopyinsert

Extras (drivers + optional stacks):

# Quote the extras so shells such as zsh do not treat the brackets as a glob
pip install "pgcopyinsert[psycopg2]"
pip install "pgcopyinsert[psycopg]"
pip install "pgcopyinsert[asyncpg]"
pip install "pgcopyinsert[psycopg,pandas]"
pip install "pgcopyinsert[asyncpg,polars]"

If pip cannot resolve fullmetalcopy right after a release, upgrade pip (pip install -U pip) and retry.


Quick start

import io

import sqlalchemy as sa

from pgcopyinsert import copyinsert_csv

engine = sa.create_engine("postgresql+psycopg2://user:pass@host/dbname")

with engine.begin() as conn:
    buf = io.BytesIO(b"id,name\n1,Ada\n")
    copyinsert_csv(buf, "people", "people_load_tmp", conn, schema="public", headers=True)

copyinsert_csv expects a binary CSV stream (io.BytesIO or file opened with "rb"). Use engine.begin() so temp DDL, copy, insert, and drop stay in one transaction.
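
When the data starts out as Python rows rather than a file, one way to produce such a binary stream is with the standard library alone. This is a minimal sketch; rows_to_csv_bytes is a hypothetical helper, not part of pgcopyinsert.

```python
import csv
import io


def rows_to_csv_bytes(header, rows):
    """Serialise rows to UTF-8 CSV and return a BytesIO positioned at the start."""
    text = io.StringIO(newline="")
    writer = csv.writer(text, lineterminator="\n")
    writer.writerow(header)
    writer.writerows(rows)  # None is written as an empty field
    return io.BytesIO(text.getvalue().encode("utf-8"))


buf = rows_to_csv_bytes(["id", "name"], [(1, "Ada"), (2, None)])
print(buf.getvalue())
```

Because csv.writer renders None as an empty field, this pairs naturally with an empty-string NULL marker (null=""), assuming that is the marker you pass through to the COPY step.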


Package layout

| Import | What you get |
| --- | --- |
| from pgcopyinsert import copy_from_csv, copyinsert_csv, __version__ | Top-level CSV + copyinsert entrypoints |
| pgcopyinsert.temp | create_temp_table_from_table, create_table_stmt |
| pgcopyinsert.insert | insert_from_table_stmt, insert_from_table_stmt_ocdn, insert_from_table_stmt_ocdu |
| pgcopyinsert.pd | Pandas copyinsert_dataframe |
| pgcopyinsert.pl | Polars copyinsert_polars (copyinsert_dataframe is deprecated) |
| pgcopyinsert.asynchronous.copyinsert | async copyinsert_csv |
| pgcopyinsert.asynchronous.pd / pl | Async Pandas / Polars helpers |

Main APIs

copyinsert_csv (sync)

| Parameter | Description |
| --- | --- |
| csv_file | BytesIO (or compatible binary buffer), positioned at start of CSV |
| table_name | Existing target table name (reflected) |
| temp_name | Name for the TEMPORARY load table |
| connection | SQLAlchemy Connection |
| sep, null, columns, headers, schema | Forwarded to fullmetalcopy copy_from_csv for the temp table |
| insert_function | Callable (temp_table, target_table, constraint) → executable insert (default: DO NOTHING) |
| constraint | PostgreSQL constraint name for ON CONFLICT, when required by your insert builder |
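
To make the insert_function contract concrete, here is a sketch of a custom builder written against the signature described above. It assumes the callable receives two SQLAlchemy Table objects plus the constraint name, in that order; insert_distinct_rows and the stand-in tables are hypothetical and exist only so the generated SQL can be inspected without a database.

```python
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql


def insert_distinct_rows(temp_table, target_table, constraint=None):
    """Hypothetical builder: insert de-duplicated temp rows, skipping conflicts."""
    names = [column.name for column in temp_table.columns]
    source = sa.select(*temp_table.columns).distinct()
    stmt = postgresql.insert(target_table).from_select(names, source)
    # A string constraint renders as ON CONFLICT ON CONSTRAINT <name>.
    return stmt.on_conflict_do_nothing(constraint=constraint)


# Stand-in tables (assumed shapes) used only to compile the statement.
target = sa.Table(
    "people",
    sa.MetaData(),
    sa.Column("id", sa.Integer, primary_key=True),
    sa.Column("name", sa.Text),
)
temp = sa.Table(
    "people_load_tmp",
    sa.MetaData(),
    sa.Column("id", sa.Integer),
    sa.Column("name", sa.Text),
)

stmt = insert_distinct_rows(temp, target, "people_pkey")
sql = str(stmt.compile(dialect=postgresql.dialect()))
print(sql)
```

If the assumption about the call signature holds, such a builder would be passed as insert_function=insert_distinct_rows together with constraint="people_pkey".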

copy_from_csv (sync)

Thin re-export from fullmetalcopy that loads CSV bytes directly into a normal table (no temp-table orchestration). See the fullmetalcopy docs for parameters.

DataFrame helpers

Pandas (pgcopyinsert.pd.copyinsert_dataframe) and Polars (pgcopyinsert.pl.copyinsert_polars) follow the same pattern: they accept the optional constraint, null, schema, sep, and insert_function arguments and delegate to copyinsert_csv or its async equivalent.


ON CONFLICT and constraint

PostgreSQL’s ON CONFLICT often targets a named constraint (for example mytable_pkey). The constraint= string must match pg_constraint.conname (use \d tablename in psql or query the catalog). It is not always the same as a column name.
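
For the "query the catalog" route, a sketch of one possible lookup is shown below. The catalog tables and columns (pg_constraint.conname, contype, conrelid) are standard PostgreSQL; the constant name, the helper constraint_lookup, and the :schema / :table placeholders (chosen to suit sqlalchemy.text) are assumptions made for this example.

```python
# Lists constraints usable as an ON CONFLICT ON CONSTRAINT target.
CONFLICT_CONSTRAINTS_SQL = """
SELECT con.conname, con.contype
FROM pg_catalog.pg_constraint AS con
JOIN pg_catalog.pg_class AS rel ON rel.oid = con.conrelid
JOIN pg_catalog.pg_namespace AS nsp ON nsp.oid = rel.relnamespace
WHERE nsp.nspname = :schema
  AND rel.relname = :table
  AND con.contype IN ('p', 'u', 'x')  -- primary key, unique, exclusion
ORDER BY con.conname
"""


def constraint_lookup(table, schema="public"):
    """Return the query text and bind parameters for one table."""
    return CONFLICT_CONSTRAINTS_SQL.strip(), {"schema": schema, "table": table}


query, params = constraint_lookup("people")
print(params)

# With a live SQLAlchemy connection (not executed here):
#   rows = conn.execute(sa.text(query), params).all()
```

Each returned conname is a candidate for the constraint= argument; exclusion constraints ('x') can only be used with DO NOTHING, not DO UPDATE.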


Examples

CSV sync (SQLAlchemy 2)

import io

import sqlalchemy as sa

from pgcopyinsert import copy_from_csv, copyinsert_csv
from pgcopyinsert.insert import (
    insert_from_table_stmt_ocdn,
    insert_from_table_stmt_ocdu,
)
from pgcopyinsert.temp import create_temp_table_from_table

engine = sa.create_engine("postgresql+psycopg2://scott:tiger@hostname/dbname")

# COPY into an existing table (fullmetalcopy)
with engine.connect() as conn:
    with open("data.csv", "rb") as f:
        buf = io.BytesIO(f.read())
    buf.seek(0)
    copy_from_csv(conn, buf, "staging_table", schema="public", headers=True)
    conn.commit()

# Reflect + build a TEMP table with the same columns (no constraints)
meta = sa.MetaData()
meta.reflect(engine, schema="public")
table = sa.Table("target_table", meta, schema="public")
other_meta = sa.MetaData()
temp_table = create_temp_table_from_table(table, "target_load_tmp", other_meta)

# copyinsert: temp DDL → COPY → INSERT → DROP temp
with engine.connect() as conn:
    with open("data.csv", "rb") as f:
        buf = io.BytesIO(f.read())
    buf.seek(0)
    copyinsert_csv(
        buf,
        "target_table",
        "target_load_tmp",
        conn,
        schema="public",
        insert_function=insert_from_table_stmt_ocdn,
        constraint="target_table_pkey",
    )
    conn.commit()

# Upsert (DO UPDATE)
with engine.connect() as conn:
    with open("data.csv", "rb") as f:
        buf = io.BytesIO(f.read())
    buf.seek(0)
    copyinsert_csv(
        buf,
        "target_table",
        "target_load_tmp",
        conn,
        schema="public",
        insert_function=insert_from_table_stmt_ocdu,
        constraint="target_table_pkey",
    )
    conn.commit()

Single transaction with engine.begin()

import io

import sqlalchemy as sa

from pgcopyinsert import copyinsert_csv

engine = sa.create_engine("postgresql+psycopg2://user:pass@host/db")

with engine.begin() as conn:
    buf = io.BytesIO(b"id\n42\n")
    copyinsert_csv(buf, "my_table", "my_table_tmp", conn, headers=True)

Pandas

import pandas as pd
import sqlalchemy as sa

from pgcopyinsert.insert import insert_from_table_stmt_ocdu
import pgcopyinsert.pd as pci_pd

engine = sa.create_engine("postgresql+psycopg2://user:pass@host/db")
df = pd.DataFrame({"x": range(1000), "y": range(1000)})

with engine.connect() as conn:
    pci_pd.copyinsert_dataframe(
        df,
        "xy_table",
        "xy_table_tmp",
        conn,
        insert_function=insert_from_table_stmt_ocdu,
        constraint="xy_table_pkey",
        null="",
    )
    conn.commit()

Polars (sync)

Prefer copyinsert_polars. The copyinsert_dataframe function in this module is deprecated and emits a DeprecationWarning.

import polars as pl
import sqlalchemy as sa

from pgcopyinsert.insert import insert_from_table_stmt_ocdu
import pgcopyinsert.pl as pci_pl

engine = sa.create_engine("postgresql+psycopg2://user:pass@host/db")
df = pl.DataFrame({"x": range(1000), "y": range(1000)})

with engine.connect() as conn:
    pci_pl.copyinsert_polars(
        df,
        "xy_table",
        "xy_table_tmp",
        conn,
        insert_function=insert_from_table_stmt_ocdu,
        constraint="xy_table_pkey",
    )
    conn.commit()

Async

import io

from sqlalchemy.ext.asyncio import create_async_engine

from pgcopyinsert.asynchronous.copyinsert import copyinsert_csv

async def load_once(dsn: str) -> None:
    engine = create_async_engine(dsn)
    async with engine.connect() as conn:
        buf = io.BytesIO(b"id,name\n1,Ada\n")
        await copyinsert_csv(buf, "people", "people_tmp", conn, schema="public", headers=True)
        await conn.commit()
    await engine.dispose()


# asyncio.run(load_once("postgresql+asyncpg://user:pass@host/db"))

For async DataFrames, use pgcopyinsert.asynchronous.pd and pgcopyinsert.asynchronous.pl.


Development

git clone https://github.com/eddiethedean/pgcopyinsert.git
cd pgcopyinsert
python -m venv .venv && source .venv/bin/activate
pip install -e ".[dev]"
ruff check src tests && ruff format --check src tests
mypy src/pgcopyinsert
pytest

Releasing

  1. Confirm CI is green on main and python -m build succeeds locally.

  2. Ensure pyproject.toml version and CHANGELOG.md match the tag you are about to cut.

  3. Create a signed or annotated tag (example for 0.3.0):

    git tag -a v0.3.0 -m "Release 0.3.0"
    git push origin v0.3.0
    
  4. Build artifacts and upload to PyPI (using Twine):

    python -m build
    twine check dist/*
    twine upload dist/*
    

See CHANGELOG.md for release notes.


Links

License: MIT (see LICENSE).
