High-level Python API for PostgreSQL/PostGIS/TimescaleDB
Project description
pycopg
High-level Python API for PostgreSQL/PostGIS/TimescaleDB built on psycopg 3.
Simple, powerful, pythonic database operations with sync and async support.
Installation
# Basic installation
pip install pycopg
# With .env file support
pip install pycopg[dotenv]
# With PostGIS support
pip install pycopg[geo]
# Full installation (all optional dependencies)
pip install pycopg[all]
Quick Start
from pycopg import Database, Config
# Connect from environment variables
db = Database.from_env()
# Or with explicit config
db = Database(Config(
host="localhost",
port=5432,
database="mydb",
user="postgres",
password="secret"
))
# Or from URL
db = Database.from_url("postgresql://user:pass@localhost:5432/mydb")
# Create a new database and connect to it
db = Database.create("myapp", user="admin", password="secret")
# Or create using credentials from .env
db = Database.create_from_env("myapp")
Core Features
Database Exploration
db.list_schemas() # ['public', 'app', ...]
db.list_tables("public") # ['users', 'orders', ...]
db.table_info("users") # Column details
db.list_columns("users") # ['id', 'name', 'email']
db.columns_with_types("users") # [('id', 'integer'), ('name', 'text')]
db.size() # '256 MB'
db.table_sizes("public") # Size of each table
Query Execution
# Select with parameters
users = db.execute("SELECT * FROM users WHERE active = %s", [True])
# Insert/Update
db.execute("INSERT INTO users (name, email) VALUES (%s, %s)", ["Alice", "alice@example.com"])
# Batch insert (optimized with executemany)
db.execute_many(
"INSERT INTO users (name) VALUES (%s)",
[("Alice",), ("Bob",), ("Charlie",)]
)
# High-performance batch insert (single INSERT with multiple VALUES)
db.insert_batch("users", [
{"name": "Alice", "email": "alice@example.com"},
{"name": "Bob", "email": "bob@example.com"},
{"name": "Charlie", "email": "charlie@example.com"},
])
# Upsert with conflict handling
db.insert_batch("users", rows, on_conflict="(email) DO UPDATE SET name = EXCLUDED.name")
# Ultra-fast bulk insert using COPY protocol (10-100x faster for large datasets)
db.copy_insert("users", rows)
Session Mode (Connection Reuse)
For multiple sequential operations, use session mode to reuse a single connection:
# Without session: each operation opens/closes a connection
db.execute("SELECT 1") # Open, execute, close
db.execute("SELECT 2") # Open, execute, close
# With session: single connection for all operations (much faster)
with db.session() as session:
session.execute("SELECT 1")
session.execute("SELECT 2")
session.insert_batch("users", rows)
# Connection closed automatically at end
# Useful for batch operations
with db.session() as session:
for table in tables:
session.truncate_table(table)
session.insert_batch(table, data[table])
DataFrame Operations
import pandas as pd
# Create table from DataFrame
df = pd.DataFrame({"name": ["Alice", "Bob"], "age": [30, 25]})
db.from_dataframe(df, "users", primary_key="id")
# Read table to DataFrame
users_df = db.to_dataframe("users")
users_df = db.to_dataframe(sql="SELECT * FROM users WHERE age > :min_age", params={"min_age": 25})
Roles & Permissions
# Create users
db.create_role("appuser", password="secret123", login=True)
db.create_role("admin", password="secret", superuser=True)
# Create group roles
db.create_role("readonly", login=False)
db.create_role("analyst", password="secret", in_roles=["readonly"])
# Grant privileges
db.grant("SELECT", "users", "readonly")
db.grant("ALL", "orders", "appuser")
db.grant("SELECT", "ALL TABLES", "readonly", schema="public")
db.grant("USAGE", "myschema", "appuser", object_type="SCHEMA")
# Revoke privileges
db.revoke("INSERT", "users", "readonly")
# Role management
db.grant_role("readonly", "analyst")
db.alter_role("appuser", password="newpassword")
db.list_roles()
db.list_role_grants("appuser")
Backup & Restore
# Full backup (custom format - compressed)
db.pg_dump("backup.dump")
# SQL format
db.pg_dump("backup.sql", format="plain")
# Schema only
db.pg_dump("schema.sql", format="plain", schema_only=True)
# Specific tables
db.pg_dump("users.dump", tables=["users", "profiles"])
# Parallel backup (directory format)
db.pg_dump("backup_dir", format="directory", jobs=4)
# Restore
db.pg_restore("backup.dump")
db.pg_restore("backup.dump", clean=True) # Drop and recreate
db.pg_restore("backup_dir", jobs=4) # Parallel restore
# CSV export/import
db.copy_to_csv("users", "users.csv")
db.copy_from_csv("users", "users.csv")
Async Support
from pycopg import AsyncDatabase
db = AsyncDatabase.from_env()
# Basic queries
users = await db.execute("SELECT * FROM users")
user = await db.fetch_one("SELECT * FROM users WHERE id = %s", [1])
count = await db.fetch_val("SELECT COUNT(*) FROM users")
# Transactions
async with db.transaction() as conn:
await conn.execute("INSERT INTO users (name) VALUES (%s)", ["Alice"])
await conn.execute("UPDATE stats SET count = count + 1")
# Auto-commits on success, rolls back on exception
# Streaming large results
async for row in db.stream("SELECT * FROM large_table", batch_size=1000):
process(row)
# Batch operations
await db.insert_many("users", [
{"name": "Alice", "email": "alice@example.com"},
{"name": "Bob", "email": "bob@example.com"},
])
await db.upsert_many("users", rows, conflict_columns=["email"], update_columns=["name"])
# Pub/Sub with LISTEN/NOTIFY
await db.notify("events", '{"type": "user_created", "id": 1}')
async for payload in db.listen("events"):
event = json.loads(payload)
handle_event(event)
Async DataFrame Operations (NEW in 0.3.0)
import pandas as pd
import geopandas as gpd
# Read table to DataFrame
df = await db.to_dataframe("users")
df = await db.to_dataframe(sql="SELECT * FROM users WHERE age > :min", params={"min": 18})
# Insert from DataFrame
df = pd.DataFrame({"name": ["Alice", "Bob"], "age": [30, 25]})
await db.from_dataframe(df, "users_backup", primary_key="id")
# Spatial data
gdf = await db.to_geodataframe("parcels")
await db.from_geodataframe(gdf, "parcels_copy", spatial_index=True)
Async Admin Operations (NEW in 0.3.0)
# Maintenance
await db.vacuum("users", analyze=True)
await db.analyze("orders")
# Query analysis
plan = await db.explain("SELECT * FROM users WHERE email = %s", ["test@example.com"])
# Indexes
await db.create_index("users", "email", unique=True)
await db.drop_index("idx_users_email")
# Tables
await db.create_table("logs", {"id": "SERIAL PRIMARY KEY", "message": "TEXT"})
await db.drop_table("temp_data")
Async Backup Operations (NEW in 0.3.0)
# Full backup
await db.pg_dump("backup.dump")
await db.pg_dump("backup.sql", format="plain")
# Restore
await db.pg_restore("backup.dump")
# CSV export/import
await db.copy_to_csv("users", "users.csv")
await db.copy_from_csv("users", "users.csv")
Async Role Management (NEW in 0.3.0)
# Create roles
await db.create_role("analyst", password="secret", login=True)
await db.create_role("readonly", login=False)
# Grant/revoke privileges
await db.grant("SELECT", "users", "readonly")
await db.grant("ALL", "orders", "analyst")
await db.revoke("INSERT", "users", "readonly")
# Role membership
await db.grant_role("readonly", "analyst")
Async PostGIS & TimescaleDB (NEW in 0.3.0)
# PostGIS: Spatial indexes
await db.create_spatial_index("parcels", "geometry")
# TimescaleDB: Hypertables
await db.create_hypertable("events", "timestamp", chunk_time_interval="1 week")
# TimescaleDB: Compression
await db.enable_compression("events", segment_by="device_id", order_by="timestamp DESC")
await db.add_compression_policy("events", compress_after="30 days")
# TimescaleDB: Retention
await db.add_retention_policy("logs", drop_after="90 days")
Connection Pooling
For high-performance applications with many concurrent requests:
from pycopg import PooledDatabase, AsyncPooledDatabase
# Sync pool
db = PooledDatabase.from_env(
min_size=5, # Minimum connections
max_size=20, # Maximum connections
max_idle=300, # Close idle connections after 5 minutes
timeout=30, # Wait timeout for connection
)
# Use connections from pool
with db.connection() as conn:
result = conn.execute("SELECT * FROM users")
# Or use simplified API (auto-manages connection)
users = db.execute("SELECT * FROM users WHERE active = %s", [True])
# Monitor pool stats
print(db.stats)
# {'pool_min': 5, 'pool_max': 20, 'pool_size': 8, 'pool_available': 5, ...}
# Resize pool dynamically
db.resize(min_size=10, max_size=50)
# Clean up
db.close()
# Async pool
async with AsyncPooledDatabase.from_env(min_size=5, max_size=20) as db:
users = await db.execute("SELECT * FROM users")
async with db.transaction() as conn:
await conn.execute("INSERT INTO users (name) VALUES (%s)", ["Alice"])
Resilience
pycopg includes automatic resilience features to handle transient failures and prevent runaway queries.
Automatic Retry with Backoff
Connection failures are automatically retried with exponential backoff:
from pycopg import Database, AsyncDatabase
# Retry automatically enabled on connect()
db = Database.from_env()
# On connection failure: retries 3 times with exponential backoff (1-10s)
# Same for async
db = AsyncDatabase.from_env()
# Automatically retries on OperationalError (connection failures only)
Details:
- 3 retry attempts (initial + 2 retries)
- Exponential backoff: 1s, 2.7s, 7.4s
- Only retries OperationalError (connection failures, not SQL errors)
- Applies to the connect() method only (pools have built-in reconnection)
Statement Timeout
Prevent runaway queries from consuming resources:
from pycopg import Database, Config
# Configure timeout for all queries
config = Config.from_env()
config.statement_timeout = 30000 # 30 seconds (milliseconds)
db = Database(config)
# Queries exceeding 30s will be cancelled automatically
# Or with URL
db = Database.from_url(
"postgresql://user:pass@localhost:5432/mydb",
statement_timeout=30000
)
Recommended values:
- Web API endpoints: 5000-10000ms (5-10s)
- Background jobs: 60000-300000ms (1-5 minutes)
- Data warehousing: 600000+ms (10+ minutes)
Configurable Batch Size
Optimize memory usage and performance for bulk inserts:
# Default batch size is 1000
db.insert_batch("users", large_dataset)
# For memory-constrained environments or very large rows
db.insert_batch("users", large_dataset, batch_size=500)
# For small rows and high performance
db.insert_batch("users", large_dataset, batch_size=5000)
When to adjust:
- Large rows (many columns, JSONB, TEXT): decrease to 100-500
- Small rows (few columns, simple types): increase to 2000-5000
- Memory errors: decrease batch size
- Performance tuning: benchmark different values
Migrations
Simple SQL-based migrations using numbered files:
migrations/
├── 001_create_users.sql
├── 002_add_email_index.sql
└── 003_create_orders.sql
from pycopg import Database, Migrator
db = Database.from_env()
migrator = Migrator(db, "migrations/")
# Check status
status = migrator.status()
print(f"Applied: {status['applied_count']}, Pending: {status['pending_count']}")
# Run all pending migrations
applied = migrator.migrate()
for m in applied:
print(f"Applied: {m}")
# Run up to specific version
migrator.migrate(target=5)
# Rollback last migration
migrator.rollback()
# Rollback last 3 migrations
migrator.rollback(steps=3)
# Create new migration
path = migrator.create("add_orders_table")
# Creates: migrations/004_add_orders_table.sql
Migration file format (with optional rollback):
-- Migration: create_users
-- Created: 2024-01-15
-- UP
CREATE TABLE users (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
email TEXT UNIQUE,
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_users_email ON users(email);
-- DOWN
DROP TABLE users;
Migrations are tracked in the schema_migrations table with version, name, and applied timestamp.
PostGIS Support
import geopandas as gpd
# Ensure PostGIS is installed
db.create_extension("postgis")
# Create spatial table
gdf = gpd.read_file("parcels.geojson")
db.from_geodataframe(gdf, "parcels", spatial_index=True)
# Read spatial data
parcels = db.to_geodataframe("parcels")
# Spatial queries
db.execute("""
SELECT * FROM parcels
WHERE ST_Within(geometry, ST_MakeEnvelope(-122.5, 37.7, -122.3, 37.9, 4326))
""")
TimescaleDB Support
# Ensure TimescaleDB is installed
db.create_extension("timescaledb")
# Create hypertable
db.create_hypertable("events", "timestamp", chunk_time_interval="1 week")
# Enable compression
db.enable_compression("events", segment_by="device_id", order_by="timestamp DESC")
db.add_compression_policy("events", compress_after="30 days")
# Data retention
db.add_retention_policy("logs", drop_after="90 days")
# Query hypertables
db.list_hypertables()
Schema & Table Management
# Schemas
db.create_schema("app")
db.drop_schema("old_schema", cascade=True)
# Tables
db.drop_table("users")
db.truncate_table("logs")
# Indexes
db.create_index("users", "email", unique=True)
db.create_index("products", ["category", "price"])
db.create_index("documents", "content", method="gin")
# Constraints
db.add_primary_key("users", "id")
db.add_foreign_key("orders", "user_id", "users", "id", on_delete="CASCADE")
db.add_unique_constraint("users", "email")
Database Administration
# Create/drop databases
db.create_database("myapp", owner="appuser")
db.drop_database("olddb")
db.database_exists("myapp")
db.list_databases()
# Maintenance
db.vacuum("users", analyze=True)
db.analyze("users")
# Query analysis
plan = db.explain("SELECT * FROM users WHERE email = %s", ["test@example.com"])
print("\n".join(plan))
Environment Variables
pycopg reads configuration from environment variables:
| Variable | Description | Default |
|---|---|---|
| DATABASE_URL | Full connection URL | - |
| DB_HOST / PGHOST | Database host | localhost |
| DB_PORT / PGPORT | Database port | 5432 |
| DB_NAME / PGDATABASE | Database name | postgres |
| DB_USER / PGUSER | Database user | postgres |
| DB_PASSWORD / PGPASSWORD | Database password | - |
License
MIT License - Copyright (c) 2026 Loc Cosnier loc.cosnier@pm.me
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file pycopg-0.3.0.tar.gz.
File metadata
- Download URL: pycopg-0.3.0.tar.gz
- Upload date:
- Size: 343.9 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | d99617972fc682b96e6de09731896292b536a6dd4f2379806f088ce5bd7db970 |
| MD5 | dddbed2da2b1ced51238e3179603bb9e |
| BLAKE2b-256 | 3c9f1f489aee171ac997e9418c2d85f517d01e4dcc0f0176b65b7eb535013043 |
Provenance
The following attestation bundles were made for pycopg-0.3.0.tar.gz:
Publisher:
publish.yml on alkimya/pycopg
Statement:
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: pycopg-0.3.0.tar.gz
- Subject digest: d99617972fc682b96e6de09731896292b536a6dd4f2379806f088ce5bd7db970
- Sigstore transparency entry: 942290336
- Sigstore integration time:
- Permalink: alkimya/pycopg@d810726de51f44a32c66c00333b0a17224b03a08
- Branch / Tag: refs/tags/v0.3.0
- Owner: https://github.com/alkimya
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: publish.yml@d810726de51f44a32c66c00333b0a17224b03a08
- Trigger Event: release
-
Statement type:
File details
Details for the file pycopg-0.3.0-py3-none-any.whl.
File metadata
- Download URL: pycopg-0.3.0-py3-none-any.whl
- Upload date:
- Size: 53.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 12d6b6218e1e43ff941db05ead6a83016b49d296590b2ffc4046b21d7b8d6d43 |
| MD5 | 7b235d4dcff31c739c551f70f76e7570 |
| BLAKE2b-256 | 8e4d6a6e3ad8f6cc96c2bedd5ae2c2440d0b0f5ec25a886558454c1826467875 |
Provenance
The following attestation bundles were made for pycopg-0.3.0-py3-none-any.whl:
Publisher:
publish.yml on alkimya/pycopg
Statement:
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: pycopg-0.3.0-py3-none-any.whl
- Subject digest: 12d6b6218e1e43ff941db05ead6a83016b49d296590b2ffc4046b21d7b8d6d43
- Sigstore transparency entry: 942290337
- Sigstore integration time:
- Permalink: alkimya/pycopg@d810726de51f44a32c66c00333b0a17224b03a08
- Branch / Tag: refs/tags/v0.3.0
- Owner: https://github.com/alkimya
- Access: public
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: publish.yml@d810726de51f44a32c66c00333b0a17224b03a08
- Trigger Event: release
-
Statement type: