PostgreSQL partition lifecycle management with extensible hooks
Project description
pg-partsmith
PostgreSQL partition lifecycle management with extensible hooks.
A single library that covers the full PostgreSQL partition lifecycle: creating partitions ahead of time, detaching expired ones, and dropping orphans — with a middleware system for injecting custom logic at each step.
Features
- Async-first — built on
asyncioand SQLAlchemy async engine - Full lifecycle — create ahead, detach expired, drop orphans in one call
- Extensible hooks — 6 hook points (
before/aftercreate, detach, drop) - Multiple strategies — daily, weekly, monthly, yearly + fully custom
- Distributed locking — PostgreSQL advisory locks (built-in) or Redis
- Schema-aware — multi-schema support, independent of
search_path - Safe by default — refuses to drop tables not managed by this library
- Type-safe — full mypy compliance with Pydantic models
- Well-tested — 90%+ coverage with real PostgreSQL via testcontainers
Installation
pip install pg-partsmith
# With Redis distributed locks
pip install "pg-partsmith[redis-locks]"
Requirements: Python 3.11+, PostgreSQL 15+
Quick start
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
from pg_partsmith import (
MonthPeriodCalculator,
PartitionGranularity,
PartitionStrategy,
PartitionType,
TablePartitionConfig,
)
from pg_partsmith.aio import (
PartitionLifecycleService,
PartitionMaintainer,
PostgresAdvisoryLockManager,
PostgresMetadataProvider,
PostgresPartitionRepository,
)
engine = create_async_engine("postgresql+asyncpg://user:pass@host/db")
config = TablePartitionConfig(
schema="public",
table_name="events",
partition_type=PartitionType.RANGE,
partition_strategy=PartitionStrategy.TIME_BASED,
partition_column="created_at",
granularity=PartitionGranularity.MONTH,
create_ahead_count=3, # current month + next 2
retention_count=12,
)
async def run_maintenance(engine: AsyncEngine) -> None:
service = PartitionLifecycleService(
repo=PostgresPartitionRepository(engine),
metadata=PostgresMetadataProvider(engine),
locks=PostgresAdvisoryLockManager(engine),
period_calculator=MonthPeriodCalculator(),
)
maintainer = PartitionMaintainer(service)
result = await maintainer.run_maintenance_safe(config)
if result.success:
print(
f"created={result.created_count} "
f"detached={result.detached_count} "
f"dropped={result.dropped_count}"
)
else:
print(f"error={result.error}")
Transaction semantics — every DDL operation (CREATE, ATTACH, DETACH, DROP) runs in its own connection and commits immediately. Use
AsyncEngine, notAsyncSession.
Cancellation semantics —
run_maintenance_safe()(andmaintain_partitions()) always returnsMaintenanceResult, including onasyncio.CancelledError.
Multi-schema databases
If your database uses multiple schemas, set schema in TablePartitionConfig. The library
schema-qualifies all catalog queries, DDL statements, and lock namespaces — behaviour
becomes independent of search_path.
Orphan partitions
After detach, the repository writes a COMMENT marker on the detached table.
Only marker-tagged tables are eligible for dropping, making cleanup safe even if the
database contains similarly named tables not managed by this library.
Set marker_prefix explicitly on both PostgresPartitionRepository and
PostgresMetadataProvider to ensure consistent orphan marker recognition across deployments.
DEFAULT partition reconciliation
When creating a new partition, if the DEFAULT partition contains rows belonging to the new range, pg-partsmith automatically:
- Detects the conflict (
CheckViolationError 23514) - Moves conflicting rows from DEFAULT to the new partition
- Retries
ATTACH PARTITION
The reconciliation is atomic and logged at INFO level.
TIMESTAMPTZ boundary semantics
For TIMESTAMP WITH TIME ZONE partition keys, PostgresPartitionRepository runs
SET LOCAL TimeZone='UTC' before ATTACH PARTITION (ddl_timezone="UTC" default).
Set ddl_timezone=None to disable this enforcement.
Safe drops
drop_partition() refuses to drop tables not tagged as orphans. To override:
repo = PostgresPartitionRepository(engine, drop_allow_unmanaged=True)
An attempt to drop an unmanaged table raises UnmanagedPartitionDropError.
Hooks (middleware)
from pg_partsmith.aio import BasePartitionLifecycleHooks, PartitionLifecycleService
from pg_partsmith.entities import PartitionInfo, TablePartitionConfig
class KafkaNotifyHooks(BasePartitionLifecycleHooks):
def __init__(self, producer: KafkaProducer) -> None:
self._producer = producer
async def after_create(self, config: TablePartitionConfig, partition: PartitionInfo) -> None:
await self._producer.send("partition.created", {"name": partition.name})
async def before_drop(self, table_name: str, partition_name: str) -> None:
await export_to_cold_storage(table_name, partition_name)
service = PartitionLifecycleService(
repo=repo,
metadata=metadata,
locks=locks,
period_calculator=calculator,
hooks=[KafkaNotifyHooks(producer)],
)
Hook points
| Method | When |
|---|---|
before_create(config, partition_name, from_value, to_value) |
Before partition is created |
after_create(config, partition) |
After creation |
before_detach(table_name, partition) |
Before detach |
after_detach(table_name, partition_name) |
After successful detach |
before_drop(table_name, partition_name) |
Before drop — last chance to read data |
after_drop(table_name, partition_name) |
After drop |
before_* exceptions abort the operation. after_* exceptions are logged but do not affect result.success.
Extensibility
from pg_partsmith.aio import PostgresPartitionRepository
class AuditedPartitionRepository(PostgresPartitionRepository):
async def drop_partition(self, partition_name: str) -> None:
await self._audit_log.record("drop", partition_name)
await super().drop_partition(partition_name)
Lock managers
PostgreSQL advisory locks (default)
from pg_partsmith.aio import PostgresAdvisoryLockManager
locks = PostgresAdvisoryLockManager(engine, prefix="myapp")
Pool sizing — advisory locks hold a dedicated connection for the duration of maintenance. Ensure your pool has spare capacity, or use a separate
AsyncEnginefor the lock manager (a pool of 1 will deadlock).
Redis distributed locks
pip install "pg-partsmith[redis-locks]"
from redis.asyncio import Redis
from pg_partsmith.aio import RedisDistributedLockManager
locks = RedisDistributedLockManager(
redis_client=Redis.from_url("redis://localhost"),
prefix="myapp:partitioner",
ttl_seconds=300,
)
Period strategies
| Class | Granularity | Example |
|---|---|---|
DayPeriodCalculator |
Daily | events__2024_01_15 |
WeekPeriodCalculator |
ISO weekly | events__2024_w03 |
MonthPeriodCalculator |
Monthly | events__2024_01 |
YearPeriodCalculator |
Yearly | events__2024 |
from pg_partsmith.strategies import BasePeriodCalculator
from pg_partsmith.entities import Period
class QuarterPeriodCalculator(BasePeriodCalculator):
def current_period(self) -> Period: ...
def format_partition_name(self, table_name: str, period: Period) -> str: ...
def parse_partition_name(self, partition_name: str) -> Period | None: ...
def get_boundaries(self, period: Period) -> tuple[str, str]: ...
Scheduler integration
from pg_partsmith.aio import maintain_partitions
scheduler.add_job(
maintain_partitions,
"cron",
hour=2,
kwargs={"maintainer": maintainer, "config": config},
)
API reference
pg_partsmith
Entities — Period, PartitionInfo, TablePartitionConfig, MaintenanceResult, MaintenanceIssueStep
Enums — PartitionType, PartitionGranularity, PartitionStrategy
Exceptions — PartitionError, PartitionAlreadyExistsError, PartitionNotFoundError, PartitionAttachedError, PartitionDetachInProgressError, InvalidPartitionConfigError, LockAcquisitionError
Protocols — PeriodCalculator
Strategies — BasePeriodCalculator, DayPeriodCalculator, WeekPeriodCalculator, MonthPeriodCalculator, YearPeriodCalculator
pg_partsmith.aio
Protocols — PartitionRepository, PartitionMetadataProvider, LockManager
Hooks — BasePartitionLifecycleHooks
Service — PartitionLifecycleService
PostgreSQL — PostgresPartitionRepository, PostgresMetadataProvider
Lock managers — PostgresAdvisoryLockManager, RedisDistributedLockManager
Orchestration — PartitionMaintainer, maintain_partitions
Development
make install # uv sync --group dev
make check # ruff + mypy
make test-unit # unit tests (no Docker)
make test-integration # integration tests (Docker required)
make test # all tests with coverage
make docs-serve # local docs preview
See CONTRIBUTING.md for the full guide.
License
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file pg_partsmith-0.1.0.tar.gz.
File metadata
- Download URL: pg_partsmith-0.1.0.tar.gz
- Upload date:
- Size: 46.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: uv/0.11.11 {"installer":{"name":"uv","version":"0.11.11","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
49eb0d75f3e8f7c0d6d8b6a48891920a494bb8783d362dab6089a7a31195a657
|
|
| MD5 |
ffb021c07dadadf21c8578c977c40c49
|
|
| BLAKE2b-256 |
c5e0a30a42d6a043df39ac26a48c9b9c0b90e6c5b4ec8db6c3417fb3d75bc2dd
|
File details
Details for the file pg_partsmith-0.1.0-py3-none-any.whl.
File metadata
- Download URL: pg_partsmith-0.1.0-py3-none-any.whl
- Upload date:
- Size: 67.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: uv/0.11.11 {"installer":{"name":"uv","version":"0.11.11","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
1514f5f11c0a989d3702f6484e02a773725bdeae3ead970257da1f3d566249a1
|
|
| MD5 |
c8d4f59f432c0afd8d8dc59790dbfb14
|
|
| BLAKE2b-256 |
7ef219822e2fb488a0d36194b0dae22f3adf087486a611a8dbede162897b8b8f
|