Skip to main content

postgres backend implementation for rhosocial-activerecord, providing a robust and optimized postgres database support.

Project description

rhosocial-activerecord-postgres ($\rho_{\mathbf{AR}\text{-postgres}}$)

PyPI version Python Tests Coverage Status License Powered by vistart

rhosocial ActiveRecord Logo

postgres backend implementation for rhosocial-activerecord, providing a robust and optimized postgres database support.

Postgres Backend Implementation Guide

Overview

This postgres backend implementation provides both synchronous and asynchronous support using the psycopg (psycopg3) driver.

Key Features

  • Dual Implementation: Both sync (PostgresBackend) and async (AsyncPostgresBackend)
  • Shared Logic: Common functionality in PostgresBackendMixin
  • Full Transaction Support: Including savepoints and DEFERRABLE mode
  • Rich Type Support: Arrays, JSONB, UUID, ranges, network types, geometry
  • Complete Capability Declaration: CTEs, window functions, JSON operations, etc.
  • Native Driver: Uses psycopg3 directly, no ORM dependencies

Installation

pip install rhosocial-activerecord-postgres

Dependencies:

  • rhosocial-activerecord>=1.0.0,<2.0.0
  • psycopg>=3.2.12

Usage Examples

Synchronous Usage

from rhosocial.activerecord.model import ActiveRecord
from rhosocial.activerecord.backend.impl.postgres import (
    PostgresBackend,
    PostgresConnectionConfig
)

# Configure connection
config = PostgresConnectionConfig(
    host="localhost",
    port=5432,
    database="mydb",
    username="user",
    password="password",
    options={
        "sslmode": "prefer",
        "connect_timeout": 10,
        "application_name": "my_app"
    }
)

# Create backend
backend = PostgresBackend(connection_config=config)

# Configure model
class User(ActiveRecord):
    __table_name__ = "users"
    name: str
    email: str

User.configure(backend)

# Use the model
user = User(name="John", email="john@example.com")
user.save()

# Query with CTEs (postgres 8.4+)
results = User.query().with_cte(
    "active_users",
    User.query().where(is_active=True)
).from_cte("active_users").all()

# Use JSONB operations (postgres 9.4+)
users = User.query().where(
    "metadata->>'role' = ?", ("admin",)
).all()

Asynchronous Usage

import asyncio
from rhosocial.activerecord.async_model import ActiveRecord
from rhosocial.activerecord.backend.impl.postgres import (
    AsyncPostgresBackend,
    PostgresConnectionConfig
)

async def main():
    # Configure connection
    config = PostgresConnectionConfig(
        host="localhost",
        port=5432,
        database="mydb",
        username="user",
        password="password",
    )

    # Create async backend
    backend = ActiveRecord(connection_config=config)
    
    # Connect explicitly for async
    await backend.connect()

    # Configure model
    class User(ActiveRecord):
        __table_name__ = "users"
        name: str
        email: str

    User.configure(backend)

    # Use the model asynchronously
    user = User(name="Jane", email="jane@example.com")
    await user.save()

    # Async queries
    users = await User.query().where(is_active=True).all()

    # Cleanup
    await backend.disconnect()

# Run async code
asyncio.run(main())

Transaction Usage

Synchronous Transactions:

# Get transaction manager
tm = backend.transaction_manager

# Basic transaction
with tm:
    user1 = User(name="Alice")
    user1.save()
    user2 = User(name="Bob")
    user2.save()
# Auto-commit on context exit

# Explicit control
tm.begin()
try:
    user = User(name="Charlie")
    user.save()
    tm.commit()
except Exception:
    tm.rollback()
    raise

# With savepoints
tm.begin()
user1 = User(name="Dave")
user1.save()

savepoint = tm.savepoint()
try:
    user2 = User(name="Eve")
    user2.save()
    tm.release_savepoint(savepoint)
except Exception:
    tm.rollback_savepoint(savepoint)

tm.commit()

# With isolation level and deferrable mode
tm.set_isolation_level(IsolationLevel.SERIALIZABLE)
tm.set_deferrable(True)
with tm:
    # Deferrable serializable transaction
    pass

Asynchronous Transactions:

async def async_transaction_example():
    # Get async transaction manager
    tm = backend.transaction_manager

    # Basic async transaction
    async with tm:
        user1 = User(name="Alice")
        await user1.save()
        user2 = User(name="Bob")
        await user2.save()
    # Auto-commit on context exit

    # Explicit control
    await tm.begin()
    try:
        user = User(name="Charlie")
        await user.save()
        await tm.commit()
    except Exception:
        await tm.rollback()
        raise

    # With savepoints
    await tm.begin()
    user1 = User(name="Dave")
    await user1.save()

    savepoint = await tm.savepoint()
    try:
        user2 = User(name="Eve")
        await user2.save()
        await tm.release_savepoint(savepoint)
    except Exception:
        await tm.rollback_savepoint(savepoint)

    await tm.commit()

Postgres-Specific Features

Array Types:

from rhosocial.activerecord.model import ActiveRecord

class Article(ActiveRecord):
    __table_name__ = "articles"
    title: str
    tags: list  # Will use postgres array type

article = Article(
    title="postgres Arrays",
    tags=["database", "postgres", "arrays"]
)
article.save()

# Query arrays
articles = Article.query().where(
    "? = ANY(tags)", ("postgres",)
).all()

JSONB Operations:

from rhosocial.activerecord.model import ActiveRecord

class Product(ActiveRecord):
    __table_name__ = "products"
    name: str
    attributes: dict  # Will use JSONB type

product = Product(
    name="Laptop",
    attributes={
        "brand": "Dell",
        "specs": {
            "cpu": "Intel i7",
            "ram": "16GB"
        }
    }
)
product.save()

# Query JSONB
products = Product.query().where(
    "attributes->>'brand' = ?", ("Dell",)
).all()

# JSONB contains
products = Product.query().where(
    "attributes @> ?", ('{"brand": "Dell"}',)
).all()

Range Types:

from datetime import date
from rhosocial.activerecord.model import ActiveRecord

class Booking(ActiveRecord):
    __table_name__ = "bookings"
    room_id: int
    date_range: tuple  # Will use DATERANGE type

booking = Booking(
    room_id=101,
    date_range=(date(2024, 1, 1), date(2024, 1, 7))
)
booking.save()

# Query ranges
bookings = Booking.query().where(
    "date_range @> ?", (date(2024, 1, 3),)
).all()

Configuration Options

Connection Options

config = PostgresConnectionConfig(
    host="localhost",
    port=5432,
    database="mydb",
    username="user",
    password="password",
    options={
        # SSL/TLS
        "sslmode": "prefer",  # disable, allow, prefer, require, verify-ca, verify-full
        
        # Connection timeout
        "connect_timeout": 10,
        
        # Application identification
        "application_name": "my_app",
        
        # Client encoding
        "client_encoding": "UTF8",
        
        # Connection pooling (if supported)
        "pool_min_size": 1,
        "pool_max_size": 10,
        "pool_timeout": 30.0,
    }
)

Postgres Version Compatibility

Feature Minimum Version Notes
Basic operations 8.0+ Core functionality
CTEs 8.4+ WITH clauses
Window functions 8.4+ ROW_NUMBER, RANK, etc.
RETURNING clause 8.2+ INSERT/UPDATE/DELETE RETURNING
JSON type 9.2+ Basic JSON support
JSONB type 9.4+ Binary JSON, better performance
UPSERT (ON CONFLICT) 9.5+ INSERT ... ON CONFLICT

Recommended: Postgres 12+ for optimal feature support and performance.

Architecture Notes

Backend Structure

PostgresBackendMixin (Shared Logic)
    ├── Configuration validation
    ├── Version parsing
    ├── Capability initialization
    ├── Type converter registration
    └── Error mapping

PostgresBackend (Sync)           AsyncPostgresBackend (Async)
    ├── Inherits Mixin                 ├── Inherits Mixin
    ├── Inherits StorageBackend        ├── Inherits AsyncStorageBackend
    ├── Sync connection management     ├── Async connection management
    ├── Sync query execution           ├── Async query execution
    └── Sync transaction manager       └── Async transaction manager

Transaction Structure

PostgresTransactionMixin (Shared Logic)
    ├── Isolation level mapping
    ├── Savepoint name generation
    ├── SQL statement building
    └── Deferrable mode support

PostgresTransactionManager       AsyncPostgresTransactionManager
    ├── Inherits Mixin                 ├── Inherits Mixin
    ├── Inherits TransactionManager    ├── Inherits AsyncTransactionManager
    ├── Sync transaction operations    ├── Async transaction operations
    └── Sync constraint management     └── Async constraint management

Testing

The backend includes comprehensive test coverage for both sync and async implementations:

  • Connection lifecycle tests
  • CRUD operation tests
  • Transaction tests (with savepoints)
  • Type conversion tests
  • Capability declaration verification
  • Error handling tests
  • Concurrent operation tests

Run sync tests:

pytest tests/rhosocial/activerecord_test/feature/backend/postgres/test_backend_sync.py

Run async tests:

pytest tests/rhosocial/activerecord_test/feature/backend/postgres/test_backend_async.py

Migration from Old Implementation

If you have an existing postgres backend implementation, here's how to migrate:

Old code:

from rhosocial.activerecord.backend.impl.postgres import PostgresBackend

backend = PostgresBackend(...)

New code (no changes needed for sync):

from rhosocial.activerecord.backend.impl.postgres import PostgresBackend

backend = PostgresBackend(...)  # Same API

New async support:

from rhosocial.activerecord.backend.impl.postgres import AsyncPostgresBackend

backend = AsyncPostgresBackend(...)
await backend.connect()

Known Limitations

  1. Connection Pooling: Basic implementation, consider using external pooling (pgbouncer) for production
  2. Async Context: Async backend requires explicit await backend.connect() call
  3. Type Converters: Some postgres-specific types may need custom converters

Contributing

Contributions are welcome! Please ensure:

  • Tests pass for both sync and async implementations
  • Code follows project style guidelines
  • Documentation is updated
  • Changelog fragments are created

License

license

Copyright © 2025 vistart

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

rhosocial_activerecord_postgres-1.0.0.dev1.tar.gz (43.8 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

File details

Details for the file rhosocial_activerecord_postgres-1.0.0.dev1.tar.gz.

File metadata

File hashes

Hashes for rhosocial_activerecord_postgres-1.0.0.dev1.tar.gz
Algorithm Hash digest
SHA256 bb59c74b368f1dd64c93c54f3e5d6195c5e4314916d68b70a8082a34fe66015b
MD5 7d76c16d22ad6479bac97e0dcfa8f171
BLAKE2b-256 f91a241abd92f3736ba240a1ffa228b2b0254cfeec355768b7506d13db9393ba

See more details on using hashes here.

Provenance

The following attestation bundles were made for rhosocial_activerecord_postgres-1.0.0.dev1.tar.gz:

Publisher: publish.yml on rhosocial/python-activerecord-postgres

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file rhosocial_activerecord_postgres-1.0.0.dev1-py3-none-any.whl.

File metadata

File hashes

Hashes for rhosocial_activerecord_postgres-1.0.0.dev1-py3-none-any.whl
Algorithm Hash digest
SHA256 6e72db1b75238ef54bc76181ce5b11264f4c9d2c7aa963e50196d9c94931e0d7
MD5 6ff1333ea0cfa18430dad353dc396664
BLAKE2b-256 8326b67da6b29b008cd41df6a9e5503f6e5346a6bcd45b031158a2f960542972

See more details on using hashes here.

Provenance

The following attestation bundles were made for rhosocial_activerecord_postgres-1.0.0.dev1-py3-none-any.whl:

Publisher: publish.yml on rhosocial/python-activerecord-postgres

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page