PostgreSQL upsert engine using temp tables and automatic conflict resolution
Project description
pgsql_upserter
A powerful, production-ready PostgreSQL upsert utility with automatic schema introspection and intelligent conflict resolution. Perfect for serverless ETL pipelines and data integration workflows.
🚀 Key Features
- Zero Configuration: Automatic schema detection and column matching
- Intelligent Conflict Resolution: Automatically detects primary keys and unique constraints
- Production Tested: Handles deduplication, data validation, and error recovery
- Flexible Input: Supports both direct data (API responses) and CSV files
📦 Installation
pip install pgsql-upserter
Development Setup (uv)
uv sync --all-extras
🎯 Quick Start
Serverless ETL (Recommended)
Perfect for AWS Lambda, Google Cloud Functions, or any API-driven ETL:
from pgsql_upserter import UpsertEngine, create_connection_from_env
# Your API response data (Facebook Ads, Google Ads, etc.)
api_data = [
{
'account_id': '123456789',
'campaign_id': 'camp_001',
'impressions': 1000,
'clicks': 50,
'spend': 25.50,
'date_start': '2025-08-31'
}
# ... more records
]
# One function call does everything!
connection = create_connection_from_env()
result = UpsertEngine.upsert_data(
connection=connection,
data=api_data, # Direct API data
target_table='ads_metrics'
)
print(f"✅ {result.total_affected} rows processed")
print(f"📈 {result.rows_inserted} inserted, {result.rows_updated} updated")
CSV File Processing
# Automatic CSV processing
result = UpsertEngine.upsert_data(
connection=connection,
data='path/to/data.csv', # File path
target_table='ads_metrics'
)
🔧 Environment Setup
Set your PostgreSQL connection via environment variables:
export PGHOST=your-host
export PGPORT=5432
export PGDATABASE=your-db
export PGUSER=your-user
export PGPASSWORD=your-password
Or use a connection string:
export DATABASE_URL=postgresql://user:pass@host:port/dbname
🧠 How It Works
- Schema Introspection: Analyzes your table structure automatically
- Column Matching: Maps your data columns to table columns
- Conflict Detection: Finds primary keys and unique constraints
- Data Deduplication: Removes duplicates using conflict resolution strategy
- Intelligent Upsert: Uses PostgreSQL's native
INSERT...ON CONFLICT
🎯 Perfect For
- API Data Ingestion: Facebook Ads, Google Ads, LinkedIn Ads APIs
- Serverless ETL: AWS Lambda, Google Cloud Functions, Azure Functions
- Data Warehousing: Loading data into analytics databases
- Real-time Sync: Keeping databases in sync with external sources
- Batch Processing: Traditional CSV and file-based workflows
📊 Automatic Conflict Resolution
The library automatically chooses the best upsert strategy:
- Primary Key: Uses table's primary key if available in data
- Unique Constraints: Combines all unique constraints for conflict detection
- Insert Only: Falls back to simple insert if no conflicts possible
🔍 Advanced Usage
Data Processing Before Upsert
from pgsql_upserter import UpsertEngine, read_csv_to_dict_list
# Read and process CSV data
csv_data = read_csv_to_dict_list('data.csv')
# Filter or transform data
filtered_data = [row for row in csv_data if float(row.get('spend', 0)) > 10.0]
# Upsert processed data
result = UpsertEngine.upsert_data(
connection=connection,
data=filtered_data,
target_table='ads_metrics'
)
Custom Connection
import psycopg2
from pgsql_upserter import UpsertEngine
connection = psycopg2.connect(
host="localhost",
database="mydb",
user="user",
password="password"
)
result = UpsertEngine.upsert_data(
connection=connection,
data=your_data,
target_table='your_table',
schema='public' # optional, defaults to 'public'
)
🛡️ Error Handling
The library provides comprehensive error handling and validation:
from pgsql_upserter import UpsertEngine, PgsqlUpserterError
try:
result = UpsertEngine.upsert_data(connection, data, 'my_table')
print(f"Success: {result.total_affected} rows processed")
except PgsqlUpserterError as e:
print(f"Upsert failed: {e}")
✅ Testing & Coverage
Run tests with coverage:
uv run pytest
Current baseline:
- 41 tests passing
- 89.45% total coverage
📌 Project Status
- Current Version: 0.9.4
- Development Status: Pre-release/Beta
- Python Support: 3.11, 3.12, 3.13, 3.14
- Current Status: Actively maintained, no blocking issues, no known performance regressions
Recent highlights:
- Fixed PostgreSQL data type adaptation for
dictandlistvalues - Added PostgreSQL type-aware JSON/array conversion in temp staging
- Standardized testing with pytest and measured coverage baseline
- Adopted uv workflow with hatchling build backend
🛠️ Tooling
- Dependency Management: uv
- Build Backend: hatchling
- Test Command:
uv run pytest - Coverage Artifacts:
.coverage_html/,coverage.json
🗺️ Roadmap
- Performance improvements: Optimize larger batch workloads and connection behavior
- Advanced logging: Improve observability and operational diagnostics
- Coverage hardening: Push targeted branch coverage beyond the current 89.45%
📋 Requirements
- Python 3.11-3.14
- PostgreSQL 12+
- psycopg2-binary
🤝 Contributing
Issues and pull requests are welcome! Please see our contributing guidelines.
Examples
Two runnable scripts are provided in the examples/ folder:
- examples/upsert_engine_demo.py — local execution demo showing both direct-data and CSV-path modes, fully driven by environment variables
- examples/serverless_etl_example.py — AWS Lambda/serverless ETL pattern using simulated Facebook Ads data, with proper JSON body serialization for API Gateway proxy responses
📚 See Also
- ARCHITECTURE.md for technical design and architectural decisions
- CHANGELOG.md for release history and notable changes
📄 License
MIT License - see LICENSE file for details.
🔗 Links
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file pgsql_upserter-0.9.4.tar.gz.
File metadata
- Download URL: pgsql_upserter-0.9.4.tar.gz
- Upload date:
- Size: 111.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.10.11 {"installer":{"name":"uv","version":"0.10.11","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"macOS","version":null,"id":null,"libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
ee60bc2f3a489b096f389a0ee3acf1d7a6993cc81b3d4854ace0ad6aaad2dae9
|
|
| MD5 |
108d6ccad42f36fd6b9ff6a4af124ac5
|
|
| BLAKE2b-256 |
7efb6eadfad9e9b174a95f2fbe81650ec248ee2e2737ecdcf597385595b7512e
|
File details
Details for the file pgsql_upserter-0.9.4-py3-none-any.whl.
File metadata
- Download URL: pgsql_upserter-0.9.4-py3-none-any.whl
- Upload date:
- Size: 21.2 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.10.11 {"installer":{"name":"uv","version":"0.10.11","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"macOS","version":null,"id":null,"libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
7b15786ba4d43eca815223fa8c68ceb69121c2838720e03ff0841481a153342f
|
|
| MD5 |
201913a53172f6e4264904c3d90f3df9
|
|
| BLAKE2b-256 |
1109342a4917ca5379f6fa21edaa9189d81743ba55d1634c50373e47d38603dc
|