High-level Python API for PostgreSQL/PostGIS/TimescaleDB
Project description
pycopg
High-level Python API for PostgreSQL/PostGIS/TimescaleDB built on psycopg 3.
Simple, powerful, pythonic database operations with sync and async support.
Installation
# Basic installation
pip install pycopg
# With .env file support
pip install pycopg[dotenv]
# With PostGIS support
pip install pycopg[geo]
# Full installation (all optional dependencies)
pip install pycopg[all]
Quick Start
from pycopg import Database, Config
# Connect from environment variables
db = Database.from_env()
# Or with explicit config
db = Database(Config(
host="localhost",
port=5432,
database="mydb",
user="postgres",
password="secret"
))
# Or from URL
db = Database.from_url("postgresql://user:pass@localhost:5432/mydb")
# Create a new database and connect to it
db = Database.create("myapp", user="admin", password="secret")
# Or create using credentials from .env
db = Database.create_from_env("myapp")
Core Features
Database Exploration
db.list_schemas() # ['public', 'app', ...]
db.list_tables("public") # ['users', 'orders', ...]
db.table_info("users") # Column details
db.size() # '256 MB'
db.table_sizes("public") # Size of each table
Query Execution
# Select with parameters
users = db.execute("SELECT * FROM users WHERE active = %s", [True])
# Insert/Update
db.execute("INSERT INTO users (name, email) VALUES (%s, %s)", ["Alice", "alice@example.com"])
# Batch insert (optimized with executemany)
db.execute_many(
"INSERT INTO users (name) VALUES (%s)",
[("Alice",), ("Bob",), ("Charlie",)]
)
# High-performance batch insert (single INSERT with multiple VALUES)
db.insert_batch("users", [
{"name": "Alice", "email": "alice@example.com"},
{"name": "Bob", "email": "bob@example.com"},
{"name": "Charlie", "email": "charlie@example.com"},
])
# Upsert with conflict handling
db.insert_batch("users", rows, on_conflict="(email) DO UPDATE SET name = EXCLUDED.name")
# Ultra-fast bulk insert using COPY protocol (10-100x faster for large datasets)
db.copy_insert("users", rows)
Session Mode (Connection Reuse)
For multiple sequential operations, use session mode to reuse a single connection:
# Without session: each operation opens/closes a connection
db.execute("SELECT 1") # Open, execute, close
db.execute("SELECT 2") # Open, execute, close
# With session: single connection for all operations (much faster)
with db.session() as session:
session.execute("SELECT 1")
session.execute("SELECT 2")
session.insert_batch("users", rows)
# Connection closed automatically at end
# Useful for batch operations
with db.session() as session:
for table in tables:
session.truncate_table(table)
session.insert_batch(table, data[table])
DataFrame Operations
import pandas as pd
# Create table from DataFrame
df = pd.DataFrame({"name": ["Alice", "Bob"], "age": [30, 25]})
db.from_dataframe(df, "users", primary_key="id")
# Read table to DataFrame
users_df = db.to_dataframe("users")
users_df = db.to_dataframe(sql="SELECT * FROM users WHERE age > :min_age", params={"min_age": 25})
Roles & Permissions
# Create users
db.create_role("appuser", password="secret123", login=True)
db.create_role("admin", password="secret", superuser=True)
# Create group roles
db.create_role("readonly", login=False)
db.create_role("analyst", password="secret", in_roles=["readonly"])
# Grant privileges
db.grant("SELECT", "users", "readonly")
db.grant("ALL", "orders", "appuser")
db.grant("SELECT", "ALL TABLES", "readonly", schema="public")
db.grant("USAGE", "myschema", "appuser", object_type="SCHEMA")
# Revoke privileges
db.revoke("INSERT", "users", "readonly")
# Role management
db.grant_role("readonly", "analyst")
db.alter_role("appuser", password="newpassword")
db.list_roles()
db.list_role_grants("appuser")
Backup & Restore
# Full backup (custom format - compressed)
db.pg_dump("backup.dump")
# SQL format
db.pg_dump("backup.sql", format="plain")
# Schema only
db.pg_dump("schema.sql", format="plain", schema_only=True)
# Specific tables
db.pg_dump("users.dump", tables=["users", "profiles"])
# Parallel backup (directory format)
db.pg_dump("backup_dir", format="directory", jobs=4)
# Restore
db.pg_restore("backup.dump")
db.pg_restore("backup.dump", clean=True) # Drop and recreate
db.pg_restore("backup_dir", jobs=4) # Parallel restore
# CSV export/import
db.copy_to_csv("users", "users.csv")
db.copy_from_csv("users", "users.csv")
Async Support
from pycopg import AsyncDatabase
db = AsyncDatabase.from_env()
# Basic queries
users = await db.execute("SELECT * FROM users")
user = await db.fetch_one("SELECT * FROM users WHERE id = %s", [1])
count = await db.fetch_val("SELECT COUNT(*) FROM users")
# Transactions
async with db.transaction() as conn:
await conn.execute("INSERT INTO users (name) VALUES (%s)", ["Alice"])
await conn.execute("UPDATE stats SET count = count + 1")
# Auto-commits on success, rolls back on exception
# Streaming large results
async for row in db.stream("SELECT * FROM large_table", batch_size=1000):
process(row)
# Batch operations
await db.insert_many("users", [
{"name": "Alice", "email": "alice@example.com"},
{"name": "Bob", "email": "bob@example.com"},
])
await db.upsert_many("users", rows, conflict_columns=["email"], update_columns=["name"])
# Pub/Sub with LISTEN/NOTIFY
await db.notify("events", '{"type": "user_created", "id": 1}')
async for payload in db.listen("events"):
event = json.loads(payload)
handle_event(event)
Connection Pooling
For high-performance applications with many concurrent requests:
from pycopg import PooledDatabase, AsyncPooledDatabase
# Sync pool
db = PooledDatabase.from_env(
min_size=5, # Minimum connections
max_size=20, # Maximum connections
max_idle=300, # Close idle connections after 5 minutes
timeout=30, # Wait timeout for connection
)
# Use connections from pool
with db.connection() as conn:
result = conn.execute("SELECT * FROM users")
# Or use simplified API (auto-manages connection)
users = db.execute("SELECT * FROM users WHERE active = %s", [True])
# Monitor pool stats
print(db.stats)
# {'pool_min': 5, 'pool_max': 20, 'pool_size': 8, 'pool_available': 5, ...}
# Resize pool dynamically
db.resize(min_size=10, max_size=50)
# Clean up
db.close()
# Async pool
async with AsyncPooledDatabase.from_env(min_size=5, max_size=20) as db:
users = await db.execute("SELECT * FROM users")
async with db.transaction() as conn:
await conn.execute("INSERT INTO users (name) VALUES (%s)", ["Alice"])
Migrations
Simple SQL-based migrations using numbered files:
migrations/
├── 001_create_users.sql
├── 002_add_email_index.sql
└── 003_create_orders.sql
from pycopg import Database, Migrator
db = Database.from_env()
migrator = Migrator(db, "migrations/")
# Check status
status = migrator.status()
print(f"Applied: {status['applied_count']}, Pending: {status['pending_count']}")
# Run all pending migrations
applied = migrator.migrate()
for m in applied:
print(f"Applied: {m}")
# Run up to specific version
migrator.migrate(target=5)
# Rollback last migration
migrator.rollback()
# Rollback last 3 migrations
migrator.rollback(steps=3)
# Create new migration
path = migrator.create("add_orders_table")
# Creates: migrations/004_add_orders_table.sql
Migration file format (with optional rollback):
-- Migration: create_users
-- Created: 2024-01-15
-- UP
CREATE TABLE users (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
email TEXT UNIQUE,
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_users_email ON users(email);
-- DOWN
DROP TABLE users;
Migrations are tracked in the `schema_migrations` table with version, name, and applied timestamp.
PostGIS Support
import geopandas as gpd
# Ensure PostGIS is installed
db.create_extension("postgis")
# Create spatial table
gdf = gpd.read_file("parcels.geojson")
db.from_geodataframe(gdf, "parcels", spatial_index=True)
# Read spatial data
parcels = db.to_geodataframe("parcels")
# Spatial queries
db.execute("""
SELECT * FROM parcels
WHERE ST_Within(geometry, ST_MakeEnvelope(-122.5, 37.7, -122.3, 37.9, 4326))
""")
TimescaleDB Support
# Ensure TimescaleDB is installed
db.create_extension("timescaledb")
# Create hypertable
db.create_hypertable("events", "timestamp", chunk_time_interval="1 week")
# Enable compression
db.enable_compression("events", segment_by="device_id", order_by="timestamp DESC")
db.add_compression_policy("events", compress_after="30 days")
# Data retention
db.add_retention_policy("logs", drop_after="90 days")
# Query hypertables
db.list_hypertables()
Schema & Table Management
# Schemas
db.create_schema("app")
db.drop_schema("old_schema", cascade=True)
# Tables
db.drop_table("users")
db.truncate_table("logs")
# Indexes
db.create_index("users", "email", unique=True)
db.create_index("products", ["category", "price"])
db.create_index("documents", "content", method="gin")
# Constraints
db.add_primary_key("users", "id")
db.add_foreign_key("orders", "user_id", "users", "id", on_delete="CASCADE")
db.add_unique_constraint("users", "email")
Database Administration
# Create/drop databases
db.create_database("myapp", owner="appuser")
db.drop_database("olddb")
db.database_exists("myapp")
db.list_databases()
# Maintenance
db.vacuum("users", analyze=True)
db.analyze("users")
# Query analysis
plan = db.explain("SELECT * FROM users WHERE email = %s", ["test@example.com"])
print("\n".join(plan))
Environment Variables
pycopg reads configuration from environment variables:
| Variable | Description | Default |
|---|---|---|
| `DATABASE_URL` | Full connection URL | - |
| `DB_HOST` / `PGHOST` | Database host | localhost |
| `DB_PORT` / `PGPORT` | Database port | 5432 |
| `DB_NAME` / `PGDATABASE` | Database name | postgres |
| `DB_USER` / `PGUSER` | Database user | postgres |
| `DB_PASSWORD` / `PGPASSWORD` | Database password | - |
License
MIT License - Copyright (c) 2026 Loc Cosnier loc.cosnier@pm.me
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file pycopg-0.1.0.tar.gz.
File metadata
- Download URL: pycopg-0.1.0.tar.gz
- Upload date:
- Size: 105.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 9e291a706e31f5cde3768882fc7d760d57092e0b2f6c5e705d4f290129bb1274 |
| MD5 | 592cf30c7e4271180f39371d41d0aed9 |
| BLAKE2b-256 | 438013c8023cf8d9d628aa013ee2de710d9caa5e2f9990c7fc84499ebe6c3b01 |
Provenance
The following attestation bundles were made for pycopg-0.1.0.tar.gz:
Publisher: publish.yml on alkimya/pycopg

Statement:
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: pycopg-0.1.0.tar.gz
- Subject digest: 9e291a706e31f5cde3768882fc7d760d57092e0b2f6c5e705d4f290129bb1274
- Sigstore transparency entry: 788783937
- Sigstore integration time: -

Source repository:
- Permalink: alkimya/pycopg@610ed553e0cced40206b1b86b9eda532059d1551
- Branch / Tag: refs/heads/main
- Owner: https://github.com/alkimya
- Access: public

Publication detail:
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: publish.yml@610ed553e0cced40206b1b86b9eda532059d1551
- Trigger Event: workflow_dispatch
File details
Details for the file pycopg-0.1.0-py3-none-any.whl.
File metadata
- Download URL: pycopg-0.1.0-py3-none-any.whl
- Upload date:
- Size: 41.2 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 2fa3df6cdd50a4ce02a2a09baad47e5fedf68069f3313b694fa488eb7c6e14d0 |
| MD5 | 2eac60cce257cd504ade3c1a39f90461 |
| BLAKE2b-256 | 6594f9a71fa7f209b8e50bf918aada29c2e644d70c59a16b70a5a774d6cac975 |
Provenance
The following attestation bundles were made for pycopg-0.1.0-py3-none-any.whl:
Publisher: publish.yml on alkimya/pycopg

Statement:
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: pycopg-0.1.0-py3-none-any.whl
- Subject digest: 2fa3df6cdd50a4ce02a2a09baad47e5fedf68069f3313b694fa488eb7c6e14d0
- Sigstore transparency entry: 788783939
- Sigstore integration time: -

Source repository:
- Permalink: alkimya/pycopg@610ed553e0cced40206b1b86b9eda532059d1551
- Branch / Tag: refs/heads/main
- Owner: https://github.com/alkimya
- Access: public

Publication detail:
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: publish.yml@610ed553e0cced40206b1b86b9eda532059d1551
- Trigger Event: workflow_dispatch