Faster PostgreSQL bulk inserts by COPY to a temp table, then INSERT into the target table.
pgcopyinsert
Fast PostgreSQL bulk loads using a temp table: COPY into TEMPORARY …, then INSERT … SELECT (optionally ON CONFLICT DO NOTHING / DO UPDATE). Built on SQLAlchemy 2 and fullmetalcopy (sync + async COPY).
Contents
- Why this pattern
- Requirements
- Install
- Quick start
- Package layout
- Main APIs
- ON CONFLICT and constraint
- Examples
- Development
- Releasing
- Links
Why this pattern
- Reflect the target table so column names and types match PostgreSQL.
- Create a TEMPORARY table with the same columns (no constraints).
- COPY CSV bytes into the temp table (fast path).
- INSERT … SELECT into the real table (your chosen insert / upsert builder).
- DROP the temp table.
This keeps COPY simple and pushes deduplication / upsert rules into normal SQL INSERT logic.
flowchart LR
reflect[Reflect_target]
temp[Create_TEMP]
copy[COPY_to_temp]
ins[INSERT_from_temp]
drop[DROP_temp]
reflect --> temp --> copy --> ins --> drop
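The steps above correspond roughly to the SQL sequence sketched below. This is an illustration only, not the statements pgcopyinsert actually emits: the helper name `copy_insert_sql`, the choice of `CREATE TEMPORARY TABLE … AS SELECT … WITH NO DATA` to get a constraint-free copy of the columns, and the table names are all assumptions.

```python
def copy_insert_sql(target, temp, columns, on_conflict=""):
    """Return the SQL statements behind the temp-table load pattern, in order.

    Hypothetical helper for illustration only. Identifiers are interpolated
    without quoting, so pass trusted names.
    """
    cols = ", ".join(columns)
    insert = f"INSERT INTO {target} ({cols}) SELECT {cols} FROM {temp}"
    if on_conflict:
        insert += f" {on_conflict}"
    return [
        # Same column names and types as the target, but no constraints and no rows.
        f"CREATE TEMPORARY TABLE {temp} AS SELECT * FROM {target} WITH NO DATA",
        # Fast path: stream CSV bytes into the unconstrained temp table.
        f"COPY {temp} ({cols}) FROM STDIN WITH (FORMAT csv, HEADER true)",
        # Deduplication / upsert rules live in ordinary INSERT ... SELECT logic.
        insert,
        f"DROP TABLE {temp}",
    ]


statements = copy_insert_sql(
    "people", "people_load_tmp", ["id", "name"], on_conflict="ON CONFLICT DO NOTHING"
)
for sql in statements:
    print(sql)
```

Because the temp table carries no unique or foreign-key constraints, the COPY step cannot fail on duplicates; any conflict handling happens only in the INSERT step.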
Requirements
- Python 3.10+
- PostgreSQL (server your SQLAlchemy engine points at)
- Runtime deps: SQLAlchemy ≥ 2.0, fullmetalcopy ≥ 0.2.0 (installed with pip install pgcopyinsert)
- A DB driver in your environment (psycopg, psycopg2, or asyncpg) matching your SQLAlchemy URL
Install
pip install pgcopyinsert
Extras (drivers + optional stacks):
pip install pgcopyinsert[psycopg2]
pip install pgcopyinsert[psycopg]
pip install pgcopyinsert[asyncpg]
pip install pgcopyinsert[psycopg,pandas]
pip install pgcopyinsert[asyncpg,polars]
If pip cannot resolve fullmetalcopy right after a release, upgrade pip (pip install -U pip) and retry.
Quick start
import io
import sqlalchemy as sa
from pgcopyinsert import copyinsert_csv
engine = sa.create_engine("postgresql+psycopg2://user:pass@host/dbname")
with engine.begin() as conn:
    buf = io.BytesIO(b"id,name\n1,Ada\n")
    copyinsert_csv(buf, "people", "people_load_tmp", conn, schema="public", headers=True)
copyinsert_csv expects a binary CSV stream (io.BytesIO or file opened with "rb"). Use engine.begin() so temp DDL, copy, insert, and drop stay in one transaction.
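If your data starts as Python rows rather than a file, one way to produce such a binary stream is with the standard library. This is a minimal sketch; the helper name `rows_to_csv_buffer` is hypothetical and not part of pgcopyinsert.

```python
import csv
import io


def rows_to_csv_buffer(header, rows):
    """Serialize rows to UTF-8 CSV bytes and return a BytesIO positioned at the start."""
    text = io.StringIO(newline="")
    writer = csv.writer(text, lineterminator="\n")
    writer.writerow(header)
    writer.writerows(rows)
    # A BytesIO built from bytes starts at offset 0, so no seek is needed.
    return io.BytesIO(text.getvalue().encode("utf-8"))


buf = rows_to_csv_buffer(["id", "name"], [(1, "Ada"), (2, "Grace, Jr.")])
print(buf.getvalue())
```

The csv module quotes fields containing the separator, so values with commas survive the round trip. The resulting buffer can be passed as the first argument to copyinsert_csv with headers=True, as in the Quick start.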
Package layout
| Import | What you get |
|---|---|
| from pgcopyinsert import copy_from_csv, copyinsert_csv, __version__ | Top-level CSV + copyinsert entrypoints |
| pgcopyinsert.temp | create_temp_table_from_table, create_table_stmt |
| pgcopyinsert.insert | insert_from_table_stmt, insert_from_table_stmt_ocdn, insert_from_table_stmt_ocdu |
| pgcopyinsert.pd | Pandas copyinsert_dataframe |
| pgcopyinsert.pl | Polars copyinsert_polars (copyinsert_dataframe is deprecated) |
| pgcopyinsert.asynchronous.copyinsert | async copyinsert_csv |
| pgcopyinsert.asynchronous.pd / pl | Async Pandas / Polars helpers |
Main APIs
copyinsert_csv (sync)
| Parameter | Description |
|---|---|
| csv_file | BytesIO (or compatible binary buffer), positioned at start of CSV |
| table_name | Existing target table name (reflected) |
| temp_name | Name for the TEMPORARY load table |
| connection | SQLAlchemy Connection |
| sep, null, columns, headers, schema | Forwarded to fullmetalcopy copy_from_csv for the temp table |
| insert_function | Callable (temp_table, target_table, constraint) → executable insert (default: DO NOTHING) |
| constraint | PostgreSQL constraint name for ON CONFLICT, when required by your insert builder |
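A custom insert_function can be written with SQLAlchemy's PostgreSQL insert construct. The sketch below assumes only the (temp_table, target_table, constraint) call shape listed in the table. The builder name `insert_skip_conflicts` and the two table definitions are hypothetical, and the statement is compiled offline rather than executed, so no database is needed to try it.

```python
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
from sqlalchemy.dialects.postgresql import insert as pg_insert


def insert_skip_conflicts(temp_table, target_table, constraint=None):
    """Hypothetical builder: INSERT ... SELECT from temp, skipping conflicting rows."""
    names = [column.name for column in temp_table.columns]
    select_temp = sa.select(*temp_table.columns)
    stmt = pg_insert(target_table).from_select(names, select_temp)
    # With constraint=None this renders a bare ON CONFLICT DO NOTHING.
    return stmt.on_conflict_do_nothing(constraint=constraint)


# Stand-in table definitions; in real use these would come from reflection.
people = sa.Table(
    "people",
    sa.MetaData(),
    sa.Column("id", sa.Integer, primary_key=True),
    sa.Column("name", sa.Text),
)
people_tmp = sa.Table(
    "people_tmp",
    sa.MetaData(),
    sa.Column("id", sa.Integer),
    sa.Column("name", sa.Text),
)

stmt = insert_skip_conflicts(people_tmp, people, "people_pkey")
sql = str(stmt.compile(dialect=postgresql.dialect()))
print(sql)
```

Printing the compiled SQL before running a load is a cheap way to confirm that the conflict target is the constraint you intended.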
copy_from_csv (sync)
Thin re-export from fullmetalcopy: load CSV bytes directly into a normal table (no temp-table orchestration). See fullmetalcopy docs for parameters.
DataFrame helpers
Pandas (pgcopyinsert.pd.copyinsert_dataframe) and Polars (pgcopyinsert.pl.copyinsert_polars) follow the same pattern: they accept optional constraint, null, schema, sep, and insert_function arguments and delegate to copyinsert_csv or its async equivalent.
ON CONFLICT and constraint
PostgreSQL’s ON CONFLICT often targets a named constraint (for example mytable_pkey). The constraint= string must match pg_constraint.conname (use \d tablename in psql or query the catalog). It is not always the same as a column name.
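To look up candidate constraint names from Python instead of psql, a catalog query along these lines can help. This is a sketch under assumptions: the helper name `list_conflict_constraints` is hypothetical, and it expects a DB-API cursor that accepts pyformat parameters (as psycopg and psycopg2 do), not an asyncpg connection.

```python
CONSTRAINT_QUERY = """
SELECT conname, contype
FROM pg_catalog.pg_constraint
WHERE conrelid = %(table)s::regclass
  AND contype IN ('p', 'u', 'x')
ORDER BY conname
"""


def list_conflict_constraints(cursor, qualified_table):
    """Return (name, type) rows for primary-key, unique, and exclusion constraints.

    contype is 'p' (primary key), 'u' (unique), or 'x' (exclusion).
    """
    cursor.execute(CONSTRAINT_QUERY, {"table": qualified_table})
    return cursor.fetchall()
```

Called as `list_conflict_constraints(cur, "public.people")`, the first element of each returned row is a value you can pass as constraint=.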
Examples
CSV sync (SQLAlchemy 2)
import io
import sqlalchemy as sa
from pgcopyinsert import copy_from_csv, copyinsert_csv
from pgcopyinsert.insert import (
insert_from_table_stmt_ocdn,
insert_from_table_stmt_ocdu,
)
from pgcopyinsert.temp import create_temp_table_from_table
engine = sa.create_engine("postgresql+psycopg2://scott:tiger@hostname/dbname")
# COPY into an existing table (fullmetalcopy)
with engine.connect() as conn:
    with open("data.csv", "rb") as f:
        buf = io.BytesIO(f.read())
    buf.seek(0)
    copy_from_csv(conn, buf, "staging_table", schema="public", headers=True)
    conn.commit()
# Reflect + build a TEMP table with the same columns (no constraints)
meta = sa.MetaData()
meta.reflect(engine, schema="public")
table = sa.Table("target_table", meta, schema="public")
other_meta = sa.MetaData()
temp_table = create_temp_table_from_table(table, "target_load_tmp", other_meta)
# copyinsert: temp DDL → COPY → INSERT → DROP temp
with engine.connect() as conn:
    with open("data.csv", "rb") as f:
        buf = io.BytesIO(f.read())
    buf.seek(0)
    copyinsert_csv(
        buf,
        "target_table",
        "target_load_tmp",
        conn,
        schema="public",
        insert_function=insert_from_table_stmt_ocdn,
        constraint="target_table_pkey",
    )
    conn.commit()
# Upsert (DO UPDATE)
with engine.connect() as conn:
    with open("data.csv", "rb") as f:
        buf = io.BytesIO(f.read())
    buf.seek(0)
    copyinsert_csv(
        buf,
        "target_table",
        "target_load_tmp",
        conn,
        schema="public",
        insert_function=insert_from_table_stmt_ocdu,
        constraint="target_table_pkey",
    )
    conn.commit()
Single transaction with engine.begin()
import io
import sqlalchemy as sa
from pgcopyinsert import copyinsert_csv
engine = sa.create_engine("postgresql+psycopg2://user:pass@host/db")
with engine.begin() as conn:
    buf = io.BytesIO(b"id\n42\n")
    copyinsert_csv(buf, "my_table", "my_table_tmp", conn, headers=True)
Pandas
import pandas as pd
import sqlalchemy as sa
from pgcopyinsert.insert import insert_from_table_stmt_ocdu
import pgcopyinsert.pd as pci_pd
engine = sa.create_engine("postgresql+psycopg2://user:pass@host/db")
df = pd.DataFrame({"x": range(1000), "y": range(1000)})
with engine.connect() as conn:
    pci_pd.copyinsert_dataframe(
        df,
        "xy_table",
        "xy_table_tmp",
        conn,
        insert_function=insert_from_table_stmt_ocdu,
        constraint="xy_table_pkey",
        null="",
    )
    conn.commit()
Polars (sync)
Prefer copyinsert_polars. copyinsert_dataframe in this module is deprecated and will emit DeprecationWarning.
import polars as pl
import sqlalchemy as sa
from pgcopyinsert.insert import insert_from_table_stmt_ocdu
import pgcopyinsert.pl as pci_pl
engine = sa.create_engine("postgresql+psycopg2://user:pass@host/db")
df = pl.DataFrame({"x": range(1000), "y": range(1000)})
with engine.connect() as conn:
    pci_pl.copyinsert_polars(
        df,
        "xy_table",
        "xy_table_tmp",
        conn,
        insert_function=insert_from_table_stmt_ocdu,
        constraint="xy_table_pkey",
    )
    conn.commit()
Async
import io
from sqlalchemy.ext.asyncio import create_async_engine
from pgcopyinsert.asynchronous.copyinsert import copyinsert_csv
async def load_once(dsn: str) -> None:
    engine = create_async_engine(dsn)
    async with engine.connect() as conn:
        buf = io.BytesIO(b"id,name\n1,Ada\n")
        await copyinsert_csv(buf, "people", "people_tmp", conn, schema="public", headers=True)
        await conn.commit()
    await engine.dispose()
# asyncio.run(load_once("postgresql+asyncpg://user:pass@host/db"))
For async DataFrames, use pgcopyinsert.asynchronous.pd and pgcopyinsert.asynchronous.pl.
Development
git clone https://github.com/eddiethedean/pgcopyinsert.git
cd pgcopyinsert
python -m venv .venv && source .venv/bin/activate
pip install -e ".[dev]"
ruff check src tests && ruff format --check src tests
mypy src/pgcopyinsert
pytest
Releasing
- Confirm CI is green on main and python -m build succeeds locally.
- Ensure pyproject.toml version and CHANGELOG.md match the tag you are about to cut.
- Create a signed or annotated tag (example for 0.3.0):
  git tag -a v0.3.0 -m "Release 0.3.0"
  git push origin v0.3.0
- Build artifacts and upload to PyPI (using Twine):
  python -m build
  twine check dist/*
  twine upload dist/*
See CHANGELOG.md for release notes.
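The version check in the second step can be scripted. The sketch below is not part of the project's tooling: the function name `release_is_consistent` is hypothetical, and it assumes a static `version = "…"` line in pyproject.toml and a changelog that names each release in a Markdown heading.

```python
import re


def release_is_consistent(pyproject_text, changelog_text, tag):
    """Return True if the tag, pyproject version, and changelog name the same version."""
    version = tag.removeprefix("v")
    # Assumes a static 'version = "x.y.z"' line in pyproject.toml.
    match = re.search(r'^version\s*=\s*"([^"]+)"', pyproject_text, flags=re.MULTILINE)
    in_pyproject = match is not None and match.group(1) == version
    # Assumes the changelog mentions the version in a heading line ("## 0.3.0", "## [0.3.0]").
    heading = re.search(
        rf"^#+.*\b{re.escape(version)}\b", changelog_text, flags=re.MULTILINE
    )
    return in_pyproject and heading is not None


pyproject = '[project]\nname = "pgcopyinsert"\nversion = "0.3.0"\n'
changelog = "# Changelog\n\n## 0.3.0\n- Release notes go here.\n"
print(release_is_consistent(pyproject, changelog, "v0.3.0"))
```

Running a check like this before `git tag` catches a forgotten version bump while it is still cheap to fix.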
Links
- Repository: github.com/eddiethedean/pgcopyinsert
- Changelog: CHANGELOG.md
- PyPI: pypi.org/project/pgcopyinsert
License: MIT (see LICENSE).