
pycopg

High-level Python API for PostgreSQL/PostGIS/TimescaleDB built on psycopg 3.

Simple, powerful, pythonic database operations with sync and async support.

Python 3.11+ · License: MIT

Installation

# Basic installation
pip install pycopg

# With .env file support
pip install pycopg[dotenv]

# With PostGIS support
pip install pycopg[geo]

# Full installation (all optional dependencies)
pip install pycopg[all]

Quick Start

from pycopg import Database, Config

# Connect from environment variables
db = Database.from_env()

# Or with explicit config
db = Database(Config(
    host="localhost",
    port=5432,
    database="mydb",
    user="postgres",
    password="secret"
))

# Or from URL
db = Database.from_url("postgresql://user:pass@localhost:5432/mydb")

# Create a new database and connect to it
db = Database.create("myapp", user="admin", password="secret")

# Or create using credentials from .env
db = Database.create_from_env("myapp")

Core Features

Database Exploration

db.list_schemas()           # ['public', 'app', ...]
db.list_tables("public")    # ['users', 'orders', ...]
db.table_info("users")      # Column details
db.list_columns("users")    # ['id', 'name', 'email']
db.columns_with_types("users") # [('id', 'integer'), ('name', 'text')]
db.size()                   # '256 MB'
db.table_sizes("public")    # Size of each table

Query Execution

# Select with parameters
users = db.execute("SELECT * FROM users WHERE active = %s", [True])

# Insert/Update
db.execute("INSERT INTO users (name, email) VALUES (%s, %s)", ["Alice", "alice@example.com"])

# Batch insert (optimized with executemany)
db.execute_many(
    "INSERT INTO users (name) VALUES (%s)",
    [("Alice",), ("Bob",), ("Charlie",)]
)

# High-performance batch insert (single INSERT with multiple VALUES)
db.insert_batch("users", [
    {"name": "Alice", "email": "alice@example.com"},
    {"name": "Bob", "email": "bob@example.com"},
    {"name": "Charlie", "email": "charlie@example.com"},
])

# Upsert with conflict handling
db.insert_batch("users", rows, on_conflict="(email) DO UPDATE SET name = EXCLUDED.name")

# Ultra-fast bulk insert using COPY protocol (10-100x faster for large datasets)
db.copy_insert("users", rows)

Session Mode (Connection Reuse)

For multiple sequential operations, use session mode to reuse a single connection:

# Without session: each operation opens/closes a connection
db.execute("SELECT 1")  # Open, execute, close
db.execute("SELECT 2")  # Open, execute, close

# With session: single connection for all operations (much faster)
with db.session() as session:
    session.execute("SELECT 1")
    session.execute("SELECT 2")
    session.insert_batch("users", rows)
    # Connection closed automatically at end

# Useful for batch operations
with db.session() as session:
    for table in tables:
        session.truncate_table(table)
        session.insert_batch(table, data[table])

DataFrame Operations

import pandas as pd

# Create table from DataFrame
df = pd.DataFrame({"name": ["Alice", "Bob"], "age": [30, 25]})
db.from_dataframe(df, "users", primary_key="id")

# Read table to DataFrame
users_df = db.to_dataframe("users")
users_df = db.to_dataframe(sql="SELECT * FROM users WHERE age > :min_age", params={"min_age": 25})

Roles & Permissions

# Create users
db.create_role("appuser", password="secret123", login=True)
db.create_role("admin", password="secret", superuser=True)

# Create group roles
db.create_role("readonly", login=False)
db.create_role("analyst", password="secret", in_roles=["readonly"])

# Grant privileges
db.grant("SELECT", "users", "readonly")
db.grant("ALL", "orders", "appuser")
db.grant("SELECT", "ALL TABLES", "readonly", schema="public")
db.grant("USAGE", "myschema", "appuser", object_type="SCHEMA")

# Revoke privileges
db.revoke("INSERT", "users", "readonly")

# Role management
db.grant_role("readonly", "analyst")
db.alter_role("appuser", password="newpassword")
db.list_roles()
db.list_role_grants("appuser")

Backup & Restore

# Full backup (custom format - compressed)
db.pg_dump("backup.dump")

# SQL format
db.pg_dump("backup.sql", format="plain")

# Schema only
db.pg_dump("schema.sql", format="plain", schema_only=True)

# Specific tables
db.pg_dump("users.dump", tables=["users", "profiles"])

# Parallel backup (directory format)
db.pg_dump("backup_dir", format="directory", jobs=4)

# Restore
db.pg_restore("backup.dump")
db.pg_restore("backup.dump", clean=True)  # Drop and recreate
db.pg_restore("backup_dir", jobs=4)       # Parallel restore

# CSV export/import
db.copy_to_csv("users", "users.csv")
db.copy_from_csv("users", "users.csv")

Async Support

import json

from pycopg import AsyncDatabase

db = AsyncDatabase.from_env()

# Basic queries
users = await db.execute("SELECT * FROM users")
user = await db.fetch_one("SELECT * FROM users WHERE id = %s", [1])
count = await db.fetch_val("SELECT COUNT(*) FROM users")

# Transactions
async with db.transaction() as conn:
    await conn.execute("INSERT INTO users (name) VALUES (%s)", ["Alice"])
    await conn.execute("UPDATE stats SET count = count + 1")
    # Auto-commits on success, rolls back on exception

# Streaming large results
async for row in db.stream("SELECT * FROM large_table", batch_size=1000):
    process(row)

# Batch operations
await db.insert_many("users", [
    {"name": "Alice", "email": "alice@example.com"},
    {"name": "Bob", "email": "bob@example.com"},
])

await db.upsert_many("users", rows, conflict_columns=["email"], update_columns=["name"])

# Pub/Sub with LISTEN/NOTIFY
await db.notify("events", '{"type": "user_created", "id": 1}')

async for payload in db.listen("events"):
    event = json.loads(payload)
    handle_event(event)

Async DataFrame Operations (NEW in 0.3.0)

import pandas as pd
import geopandas as gpd

# Read table to DataFrame
df = await db.to_dataframe("users")
df = await db.to_dataframe(sql="SELECT * FROM users WHERE age > :min", params={"min": 18})

# Insert from DataFrame
df = pd.DataFrame({"name": ["Alice", "Bob"], "age": [30, 25]})
await db.from_dataframe(df, "users_backup", primary_key="id")

# Spatial data
gdf = await db.to_geodataframe("parcels")
await db.from_geodataframe(gdf, "parcels_copy", spatial_index=True)

Async Admin Operations (NEW in 0.3.0)

# Maintenance
await db.vacuum("users", analyze=True)
await db.analyze("orders")

# Query analysis
plan = await db.explain("SELECT * FROM users WHERE email = %s", ["test@example.com"])

# Indexes
await db.create_index("users", "email", unique=True)
await db.drop_index("idx_users_email")

# Tables
await db.create_table("logs", {"id": "SERIAL PRIMARY KEY", "message": "TEXT"})
await db.drop_table("temp_data")

Async Backup Operations (NEW in 0.3.0)

# Full backup
await db.pg_dump("backup.dump")
await db.pg_dump("backup.sql", format="plain")

# Restore
await db.pg_restore("backup.dump")

# CSV export/import
await db.copy_to_csv("users", "users.csv")
await db.copy_from_csv("users", "users.csv")

Async Role Management (NEW in 0.3.0)

# Create roles
await db.create_role("analyst", password="secret", login=True)
await db.create_role("readonly", login=False)

# Grant/revoke privileges
await db.grant("SELECT", "users", "readonly")
await db.grant("ALL", "orders", "analyst")
await db.revoke("INSERT", "users", "readonly")

# Role membership
await db.grant_role("readonly", "analyst")

Async PostGIS & TimescaleDB (NEW in 0.3.0)

# PostGIS: Spatial indexes
await db.create_spatial_index("parcels", "geometry")

# TimescaleDB: Hypertables
await db.create_hypertable("events", "timestamp", chunk_time_interval="1 week")

# TimescaleDB: Compression
await db.enable_compression("events", segment_by="device_id", order_by="timestamp DESC")
await db.add_compression_policy("events", compress_after="30 days")

# TimescaleDB: Retention
await db.add_retention_policy("logs", drop_after="90 days")

Connection Pooling

For high-performance applications with many concurrent requests:

from pycopg import PooledDatabase, AsyncPooledDatabase

# Sync pool
db = PooledDatabase.from_env(
    min_size=5,      # Minimum connections
    max_size=20,     # Maximum connections
    max_idle=300,    # Close idle connections after 5 minutes
    timeout=30,      # Wait timeout for connection
)

# Use connections from pool
with db.connection() as conn:
    result = conn.execute("SELECT * FROM users")

# Or use simplified API (auto-manages connection)
users = db.execute("SELECT * FROM users WHERE active = %s", [True])

# Monitor pool stats
print(db.stats)
# {'pool_min': 5, 'pool_max': 20, 'pool_size': 8, 'pool_available': 5, ...}

# Resize pool dynamically
db.resize(min_size=10, max_size=50)

# Clean up
db.close()

# Async pool
async with AsyncPooledDatabase.from_env(min_size=5, max_size=20) as db:
    users = await db.execute("SELECT * FROM users")

    async with db.transaction() as conn:
        await conn.execute("INSERT INTO users (name) VALUES (%s)", ["Alice"])

Resilience

pycopg includes automatic resilience features to handle transient failures and prevent runaway queries.

Automatic Retry with Backoff

Connection failures are automatically retried with exponential backoff:

from pycopg import Database, AsyncDatabase

# Retry automatically enabled on connect()
db = Database.from_env()
# On connection failure: up to 3 attempts (initial + 2 retries) with exponential backoff (1-10s)

# Same for async
db = AsyncDatabase.from_env()
# Automatically retries on OperationalError (connection failures only)

Details:

  • 3 attempts in total (the initial try plus 2 retries)
  • Exponential backoff delays: 1s, 2.7s, 7.4s (see the sketch below)
  • Only retries OperationalError (connection failures, not SQL errors)
  • Applies to connect() method only (pools have built-in reconnection)
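
The delay schedule above matches a base-e exponential backoff capped at 10 seconds. The snippet below only illustrates that arithmetic; it is not pycopg's internal retry code:

import math

# Illustration only: reproduce the documented delays (1s, 2.7s, 7.4s),
# i.e. e**attempt seconds, capped at 10 seconds
delays = [min(10.0, math.e ** attempt) for attempt in range(3)]
print([round(d, 1) for d in delays])  # [1.0, 2.7, 7.4]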

Statement Timeout

Prevent runaway queries from consuming resources:

from pycopg import Database, Config

# Configure timeout for all queries
config = Config.from_env()
config.statement_timeout = 30000  # 30 seconds (value is in milliseconds)
db = Database(config)

# Queries exceeding 30s will be cancelled automatically

# Or with URL
db = Database.from_url(
    "postgresql://user:pass@localhost:5432/mydb",
    statement_timeout=30000
)

Recommended values (applied in the sketch below):

  • Web API endpoints: 5000-10000 ms (5-10 s)
  • Background jobs: 60000-300000 ms (1-5 minutes)
  • Data warehousing: 600000 ms or more (10+ minutes)
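
As a sketch, those values can be applied through the Config object shown earlier; the numbers below come straight from the list above and are not library defaults:

from pycopg import Config, Database

# Web API endpoint: fail fast on slow queries (10 s)
api_config = Config.from_env()
api_config.statement_timeout = 10000
api_db = Database(api_config)

# Background job: allow longer-running statements (5 minutes)
job_config = Config.from_env()
job_config.statement_timeout = 300000
job_db = Database(job_config)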

Configurable Batch Size

Optimize memory usage and performance for bulk inserts:

# Default batch size is 1000
db.insert_batch("users", large_dataset)

# For memory-constrained environments or very large rows
db.insert_batch("users", large_dataset, batch_size=500)

# For small rows and high performance
db.insert_batch("users", large_dataset, batch_size=5000)

When to adjust:

  • Large rows (many columns, JSONB, TEXT): decrease to 100-500
  • Small rows (few columns, simple types): increase to 2000-5000
  • Memory errors: decrease batch size
  • Performance tuning: benchmark different values (see the sketch below)
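
One way to settle on a value is a quick benchmark. The sketch below assumes a disposable staging table (users_staging is hypothetical) and simply times insert_batch at a few candidate sizes:

import time

candidate_sizes = [500, 1000, 2000, 5000]

for size in candidate_sizes:
    db.truncate_table("users_staging")  # hypothetical throwaway copy of the target table
    start = time.perf_counter()
    db.insert_batch("users_staging", large_dataset, batch_size=size)
    print(f"batch_size={size}: {time.perf_counter() - start:.2f}s")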

Migrations

Simple SQL-based migrations using numbered files:

migrations/
├── 001_create_users.sql
├── 002_add_email_index.sql
└── 003_create_orders.sql

from pycopg import Database, Migrator

db = Database.from_env()
migrator = Migrator(db, "migrations/")

# Check status
status = migrator.status()
print(f"Applied: {status['applied_count']}, Pending: {status['pending_count']}")

# Run all pending migrations
applied = migrator.migrate()
for m in applied:
    print(f"Applied: {m}")

# Run up to specific version
migrator.migrate(target=5)

# Rollback last migration
migrator.rollback()

# Rollback last 3 migrations
migrator.rollback(steps=3)

# Create new migration
path = migrator.create("add_orders_table")
# Creates: migrations/004_add_orders_table.sql

Migration file format (with optional rollback):

-- Migration: create_users
-- Created: 2024-01-15

-- UP
CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    name TEXT NOT NULL,
    email TEXT UNIQUE,
    created_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE INDEX idx_users_email ON users(email);

-- DOWN
DROP TABLE users;

Migrations are tracked in a schema_migrations table that records each migration's version, name, and applied timestamp.
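
To inspect that bookkeeping directly, a plain query against the tracking table works; the timestamp column name used below (applied_at) is an assumption, since only version, name, and an applied timestamp are documented:

# Column names beyond version/name are assumed, not documented
rows = db.execute(
    "SELECT version, name, applied_at FROM schema_migrations ORDER BY version"
)
for row in rows:
    print(row)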

PostGIS Support

import geopandas as gpd

# Ensure PostGIS is installed
db.create_extension("postgis")

# Create spatial table
gdf = gpd.read_file("parcels.geojson")
db.from_geodataframe(gdf, "parcels", spatial_index=True)

# Read spatial data
parcels = db.to_geodataframe("parcels")

# Spatial queries
db.execute("""
    SELECT * FROM parcels
    WHERE ST_Within(geometry, ST_MakeEnvelope(-122.5, 37.7, -122.3, 37.9, 4326))
""")

TimescaleDB Support

# Ensure TimescaleDB is installed
db.create_extension("timescaledb")

# Create hypertable
db.create_hypertable("events", "timestamp", chunk_time_interval="1 week")

# Enable compression
db.enable_compression("events", segment_by="device_id", order_by="timestamp DESC")
db.add_compression_policy("events", compress_after="30 days")

# Data retention
db.add_retention_policy("logs", drop_after="90 days")

# Query hypertables
db.list_hypertables()

Schema & Table Management

# Schemas
db.create_schema("app")
db.drop_schema("old_schema", cascade=True)

# Tables
db.drop_table("users")
db.truncate_table("logs")

# Indexes
db.create_index("users", "email", unique=True)
db.create_index("products", ["category", "price"])
db.create_index("documents", "content", method="gin")

# Constraints
db.add_primary_key("users", "id")
db.add_foreign_key("orders", "user_id", "users", "id", on_delete="CASCADE")
db.add_unique_constraint("users", "email")

Database Administration

# Create/drop databases
db.create_database("myapp", owner="appuser")
db.drop_database("olddb")
db.database_exists("myapp")
db.list_databases()

# Maintenance
db.vacuum("users", analyze=True)
db.analyze("users")

# Query analysis
plan = db.explain("SELECT * FROM users WHERE email = %s", ["test@example.com"])
print("\n".join(plan))

Environment Variables

pycopg reads configuration from environment variables:

Variable                    Description           Default
DATABASE_URL                Full connection URL   -
DB_HOST / PGHOST            Database host         localhost
DB_PORT / PGPORT            Database port         5432
DB_NAME / PGDATABASE        Database name         postgres
DB_USER / PGUSER            Database user         postgres
DB_PASSWORD / PGPASSWORD    Database password     -
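
For example, a minimal setup using the variables above; the .env loading assumes the pycopg[dotenv] extra, and precedence between DATABASE_URL and the individual variables is not documented, so only one style is set here:

# .env (requires pycopg[dotenv])
# DB_HOST=localhost
# DB_PORT=5432
# DB_NAME=mydb
# DB_USER=postgres
# DB_PASSWORD=secret

from pycopg import Database

db = Database.from_env()
print(db.list_tables("public"))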

License

MIT License - Copyright (c) 2026 Loc Cosnier loc.cosnier@pm.me
