Skip to main content

Shared utilities for Investify services

Project description

investify-utils

Shared utilities for Investify services.

Installation

# Sync PostgreSQL client (psycopg3 + SQLAlchemy + pangres)
pip install investify-utils[postgres]

# Async PostgreSQL client (asyncpg + SQLAlchemy)
pip install investify-utils[postgres-async]

# Both clients
pip install investify-utils[postgres-all]

PostgreSQL Clients

Sync Client (PostgresClient)

Uses psycopg3 + SQLAlchemy for connection pooling, pangres for upsert.

from investify_utils.postgres import PostgresClient

client = PostgresClient(
    host="localhost",
    port=5432,
    username="user",
    password="pass",
    database="db",
    # SQLAlchemy pool options
    pool_size=5,
    pool_recycle=3600,
)

# Read
df = client.read("SELECT * FROM table")

# Insert (uses COPY protocol)
client.insert(df, table="my_table", schema="public")

# Upsert (DataFrame index = primary keys)
df = df.set_index(["id"])
client.upsert(df, table="my_table", schema="public")

# Execute DDL/DML
client.execute("DELETE FROM table WHERE id = 1")

# Close (important for Celery workers)
client.close()

Async Client (AsyncPostgresClient)

Uses asyncpg + SQLAlchemy async engine. Read-only.

from investify_utils.postgres import AsyncPostgresClient

client = AsyncPostgresClient(
    host="localhost",
    port=5432,
    username="user",
    password="pass",
    database="db",
    # SQLAlchemy pool options
    pool_size=5,
    pool_recycle=3600,
)

# In async context
df = await client.read("SELECT * FROM table")
await client.close()

# Or as context manager
async with AsyncPostgresClient(...) as client:
    df = await client.read("SELECT * FROM table")

Celery Integration

Both clients use lazy initialization, safe for Celery prefork workers.

# instances/postgres_core.py
from investify_utils.postgres import PostgresClient
from config import settings

postgres_core = PostgresClient(**settings.postgres.model_dump())
# celery_app.py
from celery.signals import worker_process_init

@worker_process_init.connect
def init_worker(**_kwargs):
    # Reset engine for each worker (optional with lazy init)
    from instances.postgres_core import postgres_core
    postgres_core.close()

Development

uv sync
uv run ruff check investify_utils/

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

investify_utils-2.0.0a2.tar.gz (4.5 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

investify_utils-2.0.0a2-py3-none-any.whl (6.7 kB view details)

Uploaded Python 3

File details

Details for the file investify_utils-2.0.0a2.tar.gz.

File metadata

  • Download URL: investify_utils-2.0.0a2.tar.gz
  • Upload date:
  • Size: 4.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.9.2

File hashes

Hashes for investify_utils-2.0.0a2.tar.gz
Algorithm Hash digest
SHA256 e37e6ba012663bd72181ae899861107059b089f06943736d388a91f14f0763bd
MD5 5cace17a2ef2ab22b95bee231a9ba11b
BLAKE2b-256 1b79288f3ba95d0a32d4832f11c568f6a345e1b78c33a26dd4205f113ce6b756

See more details on using hashes here.

File details

Details for the file investify_utils-2.0.0a2-py3-none-any.whl.

File metadata

File hashes

Hashes for investify_utils-2.0.0a2-py3-none-any.whl
Algorithm Hash digest
SHA256 6967f29819975c00bf3d9d6dc4cbee5457f6db8a4be3bf19a14eee17dd0a0048
MD5 343e774227b61ea9e17c5b01e8caa988
BLAKE2b-256 c7076e5cddcadbb23240ee6eab66fc03e0351a5b0a09abadc9337cf49f0b23c0

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page