PostgreSQL upsert engine using temp tables and automatic conflict resolution for high-performance ETL

pgsql_upserter

A powerful, production-ready PostgreSQL upsert utility with automatic schema introspection and intelligent conflict resolution. Perfect for serverless ETL pipelines and data integration workflows.

🚀 Key Features

  • Zero Configuration: Automatic schema detection and column matching
  • Intelligent Conflict Resolution: Automatically detects primary keys and unique constraints
  • Production Tested: Handles deduplication, data validation, and error recovery
  • Flexible Input: Supports both direct data (API responses) and CSV files

📦 Installation

pip install pgsql-upserter

🎯 Quick Start

Serverless ETL (Recommended)

Perfect for AWS Lambda, Google Cloud Functions, or any API-driven ETL:

from pgsql_upserter import execute_upsert_workflow, create_connection_from_env

# Your API response data (Facebook Ads, Google Ads, etc.)
api_data = [
    {
        'account_id': '123456789',
        'campaign_id': 'camp_001', 
        'impressions': 1000,
        'clicks': 50,
        'spend': 25.50,
        'date_start': '2025-08-31'
    }
    # ... more records
]

# One function call does everything!
connection = create_connection_from_env()
result = execute_upsert_workflow(
    connection=connection,
    data=api_data,  # Direct API data
    target_table='ads_metrics'
)

print(f"✅ {result.total_affected} rows processed")
print(f"📈 {result.rows_inserted} inserted, {result.rows_updated} updated")

CSV File Processing

# Automatic CSV processing
result = execute_upsert_workflow(
    connection=connection,
    data='path/to/data.csv',  # File path
    target_table='ads_metrics'
)

🔧 Environment Setup

Set your PostgreSQL connection via environment variables:

export PGHOST=your-host
export PGPORT=5432
export PGDATABASE=your-db
export PGUSER=your-user
export PGPASSWORD=your-password

Or use a connection string:

export DATABASE_URL=postgresql://user:pass@host:port/dbname
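`create_connection_from_env` presumably resolves these variables in a similar order. As a minimal sketch of that lookup logic (the helper `build_dsn_from_env` below is hypothetical, not part of the library's API): a `DATABASE_URL` connection string, if set, takes precedence over the individual `PG*` variables.

```python
import os

def build_dsn_from_env() -> str:
    """Illustrative helper: assemble a libpq-style DSN from the
    environment variables shown above. DATABASE_URL wins if present."""
    url = os.environ.get("DATABASE_URL")
    if url:
        return url
    # Fall back to the individual PG* variables, with common defaults.
    return (
        f"host={os.environ.get('PGHOST', 'localhost')} "
        f"port={os.environ.get('PGPORT', '5432')} "
        f"dbname={os.environ.get('PGDATABASE', '')} "
        f"user={os.environ.get('PGUSER', '')} "
        f"password={os.environ.get('PGPASSWORD', '')}"
    )
```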

🧠 How It Works

  1. Schema Introspection: Analyzes your table structure automatically
  2. Column Matching: Maps your data columns to table columns
  3. Conflict Detection: Finds primary keys and unique constraints
  4. Data Deduplication: Removes duplicates using conflict resolution strategy
  5. Intelligent Upsert: Uses PostgreSQL's native INSERT...ON CONFLICT
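Steps 4 and 5 can be sketched as SQL generation. The statement below is illustrative only, assuming a staging table named `<table>_staging` (the library's actual temp-table naming and SQL may differ): `DISTINCT ON` over the conflict columns handles deduplication, and `INSERT ... ON CONFLICT DO UPDATE` performs the merge.

```python
def build_upsert_sql(table: str, columns: list[str], conflict_cols: list[str]) -> str:
    """Sketch of the kind of statement step 5 would run: merge rows
    from a staging table into the target, deduplicating on the
    conflict columns and updating the remaining columns on conflict."""
    col_list = ", ".join(columns)
    conflict_list = ", ".join(conflict_cols)
    # Non-conflict columns are overwritten from the incoming row.
    updates = ", ".join(
        f"{c} = EXCLUDED.{c}" for c in columns if c not in conflict_cols
    )
    return (
        f"INSERT INTO {table} ({col_list})\n"
        f"SELECT DISTINCT ON ({conflict_list}) {col_list} FROM {table}_staging\n"
        f"ON CONFLICT ({conflict_list}) DO UPDATE SET {updates};"
    )

sql = build_upsert_sql(
    "ads_metrics",
    ["account_id", "campaign_id", "clicks"],
    ["account_id", "campaign_id"],
)
```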

🎯 Perfect For

  • API Data Ingestion: Facebook Ads, Google Ads, LinkedIn Ads APIs
  • Serverless ETL: AWS Lambda, Google Cloud Functions, Azure Functions
  • Data Warehousing: Loading data into analytics databases
  • Real-time Sync: Keeping databases in sync with external sources
  • Batch Processing: Traditional CSV and file-based workflows

📊 Automatic Conflict Resolution

The library automatically chooses the best upsert strategy:

  1. Primary Key: Uses the table's primary key when all of its columns are present in the data
  2. Unique Constraints: Combines the table's unique constraints for conflict detection
  3. Insert Only: Falls back to a plain insert when no conflict target is possible

🔍 Advanced Usage

Data Processing Before Upsert

from pgsql_upserter import UpsertResult

# Read and process CSV data
csv_data = UpsertResult.read_csv_to_dict_list('data.csv')

# Filter or transform data
filtered_data = [row for row in csv_data if float(row.get('spend', 0)) > 10.0]

# Upsert processed data
result = execute_upsert_workflow(
    connection=connection,
    data=filtered_data,
    target_table='ads_metrics'
)

Custom Connection

import psycopg2
from pgsql_upserter import execute_upsert_workflow

connection = psycopg2.connect(
    host="localhost",
    database="mydb",
    user="user",
    password="password"
)

result = execute_upsert_workflow(
    connection=connection,
    data=your_data,
    target_table='your_table',
    schema_name='public'  # optional, defaults to 'public'
)

🛡️ Error Handling

The library provides comprehensive error handling and validation:

from pgsql_upserter import execute_upsert_workflow, PgsqlUpserterError

try:
    result = execute_upsert_workflow(connection, data, 'my_table')
    print(f"Success: {result.total_affected} rows processed")
except PgsqlUpserterError as e:
    print(f"Upsert failed: {e}")

📋 Requirements

  • Python 3.11+
  • PostgreSQL 12+
  • psycopg2-binary

🤝 Contributing

Issues and pull requests are welcome! Please see our contributing guidelines.

📄 License

MIT License - see LICENSE file for details.
