repositron
A typed, generic repository base for SQLAlchemy 2.0. Full CRUD, zero per-table boilerplate.
Declare a model (and optionally a DTO and write payloads), inherit one generic
class, and get get / first / list / list_paginated / count / exists
/ create / update / delete with no per-table boilerplate.
Every method is fully typed off the generic parameters, so your editor knows
that repo.list() returns list[UserDTO]. The primary-key type is one of those
parameters (int by default), so repo.get(id) is checked against it; declare
str or uuid.UUID for tables keyed that way.
class UserRepository(Repository[User, UserDTO, UserCreate, UserUpdate]):
    field_mapping = {"full_name": "name"}

repo.list(is_active=True, order_by=User.created_at.desc())  # -> list[UserDTO]
repo.update(1, UserUpdate(full_name="Ada"))  # only full_name; others untouched
Install
uv add repositron # or: pip install repositron
Requires Python 3.13+ and sqlalchemy>=2.0. That is the only dependency; the
dataclass path pulls in nothing else.
The before / after
Every SQLAlchemy project rewrites the same layer: a class per table wrapping
session.query(...), the same get / list / count, the same pagination
math, the same "turn the ORM row into something light to return". It is
mechanical and easy to get subtly wrong.
Before: hand-written, per table
class UserRepository:
    def __init__(self, session: Session) -> None:
        self.session = session

    def get(self, id: int) -> UserDTO | None:
        row = self.session.query(User).filter(User.id == id).first()
        if row is None:
            return None
        return UserDTO(id=row.id, name=row.full_name, email=row.email)

    def list(self, *, is_active: bool | None = None) -> list[UserDTO]:
        query = self.session.query(User)
        if is_active is not None:
            query = query.filter(User.is_active == is_active)
        return [
            UserDTO(id=r.id, name=r.full_name, email=r.email)
            for r in query.order_by(User.created_at.desc()).all()
        ]

    def list_paginated(self, offset: int, limit: int = 20) -> tuple[list[UserDTO], int]:
        query = self.session.query(User).order_by(User.created_at.desc())
        total = query.order_by(None).count()
        rows = query.offset(offset).limit(limit).all()
        return [UserDTO(id=r.id, name=r.full_name, email=r.email) for r in rows], total

    def count(self, *, is_active: bool | None = None) -> int:
        query = self.session.query(User.id)
        if is_active is not None:
            query = query.filter(User.is_active == is_active)
        return query.count()

    def create(self, full_name: str, email: str) -> int:
        user = User(full_name=full_name, email=email)
        self.session.add(user)
        self.session.flush()
        return user.id

    def update(self, id: int, *, full_name: str | None = None, email: str | None = None) -> bool:
        user = self.session.query(User).filter(User.id == id).first()
        if user is None:
            return False
        if full_name is not None:  # but how do you set a column to NULL on purpose?
            user.full_name = full_name
        if email is not None:
            user.email = email
        self.session.flush()
        return True

    # ...and delete, and first, and the same again for the next ten tables.
After: declare it once
from dataclasses import dataclass
from repositron import Repository, UNSET, UnsetType

@dataclass(frozen=True, slots=True)
class UserDTO:  # light, detached, serializes straight to JSON
    id: int
    name: str  # renamed from the model column `full_name`
    email: str

@dataclass
class UserCreate:
    full_name: str
    email: str

@dataclass
class UserUpdate:
    full_name: str | UnsetType = UNSET  # absent = leave alone
    email: str | None | UnsetType = UNSET  # absent = leave alone; None = SET NULL

class UserRepository(Repository[User, UserDTO, UserCreate, UserUpdate]):
    field_mapping = {"full_name": "name"}
That is the whole repository. Every method above comes for free, typed against
UserDTO:
repo = UserRepository(session)
repo.get(1) # -> UserDTO | None
repo.list(is_active=True, order_by=User.created_at.desc())
repo.list_paginated(0, 20, order_by=User.created_at.desc()) # -> PaginatedResult[UserDTO]
repo.count(is_active=True)
repo.create(UserCreate(full_name="Ada Lovelace", email="ada@example.com")) # -> int (new id)
repo.update(1, UserUpdate(full_name="Ada L.")) # email left untouched
repo.delete(1)
The sugar
Two ways to filter, in one call
Equality is keyed by attribute name. Anything else is a plain SQLAlchemy expression. They combine.
repo.list(
    is_active=True,                  # equality
    extra_filters=[User.age > 18],   # any expression: >, IN, LIKE, OR, ...
    order_by=[User.created_at.desc(), User.id],
)
# WHERE is_active = true AND age > 18 ORDER BY created_at DESC, id
A filter value of None means IS NULL; UNSET skips the filter entirely, so
you can pass through optional query params without branching.
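The None / UNSET rule for filters can be pictured with a small stand-alone sketch. This is not repositron's implementation: the `UNSET` stand-in, the `build_where` and `find_ids` helpers, and the `users` table are all hypothetical, and the sketch uses only the standard library's sqlite3 rather than SQLAlchemy.

```python
import sqlite3


class _Unset:
    """Hypothetical stand-in for a 'no value supplied' sentinel."""

    def __repr__(self) -> str:
        return "UNSET"


UNSET = _Unset()


def build_where(**filters: object) -> tuple[str, list[object]]:
    """Turn keyword filters into a WHERE clause plus bound parameters.

    Column names are interpolated, so they must come from trusted code.
    """
    clauses: list[str] = []
    params: list[object] = []
    for column, value in filters.items():
        if value is UNSET:      # sentinel: skip this filter entirely
            continue
        if value is None:       # None becomes IS NULL, never "= NULL"
            clauses.append(f"{column} IS NULL")
        else:                   # anything else is a bound equality test
            clauses.append(f"{column} = ?")
            params.append(value)
    return (" AND ".join(clauses) or "1 = 1"), params


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE users (id INTEGER PRIMARY KEY, is_active INTEGER, email TEXT)"
)
conn.executemany(
    "INSERT INTO users (is_active, email) VALUES (?, ?)",
    [(1, "ada@example.com"), (1, None), (0, None)],
)


def find_ids(**filters: object) -> list[int]:
    """Run the generated WHERE clause and return matching ids in order."""
    where, params = build_where(**filters)
    rows = conn.execute(f"SELECT id FROM users WHERE {where} ORDER BY id", params)
    return [row[0] for row in rows]
```

With this shape, an optional query parameter can default to the sentinel and be forwarded unconditionally: `find_ids(is_active=1, email=UNSET)` ignores the email column, while `find_ids(is_active=1, email=None)` matches only rows whose email is NULL.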
Partial updates that can actually write NULL
UNSET and None are different on purpose. UNSET says "don't touch this
column"; None says "set it to NULL". The hand-written if x is not None
pattern can't express the second one.
repo.update(1, UserUpdate(full_name="Ada")) # email stays whatever it was
repo.update(1, UserUpdate(email=None)) # email IS NULL now
Column projection: load only what you need
Index the repo with a narrow DTO and it selects only those columns, returning that shape, for the duration of the call. The injected repository is untouched.
@dataclass(frozen=True, slots=True)
class UserIdEmail:
    id: int
    email: str

repo[UserIdEmail].list(is_active=True) # SELECT id, email -> list[UserIdEmail]
repo[UserIdEmail].first(id=5)
Pagination is ordered, or it raises
Pagination over an unstable order silently drops and repeats rows across pages,
so list_paginated requires order_by. Forgetting it is a ValueError, not a
heisenbug in production.
repo.list_paginated(0, 20) # ValueError
repo.list_paginated(0, 20, order_by=User.id) # PaginatedResult(items=[...], total=...)
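The guard itself is simple to picture. The following is a stand-alone sketch over an in-memory list, not repositron's code: `Page` and `paginate` are hypothetical names, and a real repository would push the ordering, offset, and limit into SQL.

```python
from dataclasses import dataclass
from typing import Callable, Generic, Optional, Sequence, TypeVar

T = TypeVar("T")


@dataclass(frozen=True)
class Page(Generic[T]):
    items: list[T]
    total: int


def paginate(
    rows: Sequence[T],
    offset: int,
    limit: int,
    *,
    order_key: Optional[Callable[[T], object]] = None,
) -> Page[T]:
    """Return one page of rows, refusing to run without an explicit order."""
    if order_key is None:
        # Fail loudly instead of paging over an arbitrary order.
        raise ValueError("pagination requires an explicit order")
    ordered = sorted(rows, key=order_key)
    return Page(items=ordered[offset:offset + limit], total=len(ordered))


rows = [3, 1, 5, 2, 4]
first = paginate(rows, 0, 2, order_key=lambda r: r)
second = paginate(rows, 2, 2, order_key=lambda r: r)
third = paginate(rows, 4, 2, order_key=lambda r: r)
```

Because every page is cut from the same total order, concatenating the pages yields each row exactly once.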
Commit when you need it, not by default
Writes flush so the caller keeps the transaction boundary. When the row has
to be visible past this transaction, commit at the instance level or per call.
repo = ItemRepository(session, autocommit=True) # every write commits
repo.create(payload)
repo = ItemRepository(session) # flush-only (default)
item_id = repo.create(payload, commit=True) # but commit this one,
worker.enqueue(item_id) # so the worker process sees it
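Why the worker needs the commit can be shown with two plain database connections. This sketch uses the standard library's sqlite3 and a temporary file instead of SQLAlchemy; the `items` table and the `count_items` helper are hypothetical. A SQLAlchemy flush likewise sends SQL inside the still-open transaction, so the same visibility rule applies.

```python
import os
import sqlite3
import tempfile


def count_items(conn: sqlite3.Connection) -> int:
    """Count rows, consuming the cursor fully so no read lock lingers."""
    return conn.execute("SELECT COUNT(*) FROM items").fetchall()[0][0]


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "demo.db")

    writer = sqlite3.connect(path)
    writer.execute("CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT)")
    writer.commit()

    worker = sqlite3.connect(path)  # stands in for a separate worker process

    # The INSERT runs inside the writer's open transaction (like a flush).
    writer.execute("INSERT INTO items (name) VALUES ('widget')")
    seen_by_writer = count_items(writer)       # the writer sees its own row
    seen_before_commit = count_items(worker)   # the worker does not

    writer.commit()
    seen_after_commit = count_items(worker)    # now the row is visible

    worker.close()
    writer.close()
```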
Hooks: extend a write or a read without overriding
Tag a method with @on and the base runs it inside its own create / update /
delete / hydration. Set a derived column before the flush, or enrich the
returned DTO with a value no column holds, with no override and no
self.session plumbing.
from dataclasses import replace
from datetime import UTC, datetime
from repositron import Repository, on

class ArticleRepository(Repository[Article, ArticleDTO, ArticleCreate, ArticleUpdate]):
    @on("create", mode="before")
    def stamp(self, model, payload):  # runs before the insert flushes
        model.published_at = datetime.now(UTC)

    @on("hydrate", mode="after")
    def add_count(self, model, dto):  # enrich every read
        return replace(dto, comment_count=self._count(model.id))
Hooks compose: stack @on to run one method on several events, or put a shared
one (timestamps, audit) in a mixin and every repository inherits it.
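One way such a hook mechanism could work is a decorator that tags methods plus a base class that collects the tags when a subclass is defined. The sketch below is a guess at the general technique, not repositron's internals: this local `on`, `HookedBase`, `TimestampMixin`, and `ArticleRepo` are hypothetical, and records are plain dicts rather than ORM models.

```python
from collections import defaultdict


def on(event: str, *, mode: str = "before"):
    """Tag a method with an (event, mode) pair; stacking adds more pairs."""

    def tag(func):
        func._hook_keys = getattr(func, "_hook_keys", ()) + ((event, mode),)
        return func

    return tag


class HookedBase:
    _hooks: dict = {}

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        hooks = defaultdict(list)
        # Walk the MRO base-first so hooks defined on mixins are collected too.
        for klass in reversed(cls.__mro__):
            for name, attr in vars(klass).items():
                for key in getattr(attr, "_hook_keys", ()):
                    if name not in hooks[key]:
                        hooks[key].append(name)
        cls._hooks = dict(hooks)

    def _run(self, event: str, mode: str, *args) -> None:
        for name in self._hooks.get((event, mode), []):
            getattr(self, name)(*args)

    def create(self, record: dict) -> dict:
        self._run("create", "before", record)
        return record  # a real repository would add and flush here

    def update(self, record: dict, changes: dict) -> dict:
        self._run("update", "before", record)
        record.update(changes)
        return record


class TimestampMixin:
    @on("create")
    @on("update")
    def stamp(self, record: dict) -> None:
        record["touched"] = record.get("touched", 0) + 1


class ArticleRepo(TimestampMixin, HookedBase):
    @on("create")
    def slugify(self, record: dict) -> None:
        record["slug"] = record["title"].lower().replace(" ", "-")


repo = ArticleRepo()
created = repo.create({"title": "Hello World"})
```

Here the mixin's `stamp` runs on both events because the decorator is stacked, and `ArticleRepo` picks it up purely through inheritance.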
Three shapes of DTO
The DTO is an optimization, not a mandate. Pick the one that fits, and pay only for what you use.
1. Dataclass DTO (the recommendation)
Lightest, detached from the session, and FastAPI serializes it natively, so the
same object is your repository return value and your response_model. No third
hand-written schema.
@app.get("/users")
def list_users(repo: Annotated[UserRepository, Depends(get_repo)]) -> list[UserDTO]:
    return repo.list(order_by=User.created_at.desc())
2. No DTO at all (model as DTO)
Leave the DTO parameter off and the repository returns the model itself. No
hydration, no dict round-trip. Works the same whatever the key type; here the
table is keyed by uuid.UUID:
import uuid
from repositron import Repository

class AccountRepository(
    Repository[Account, Account, AccountCreate, AccountUpdate, uuid.UUID]
):
    pass

repo.get(uuid.uuid4())       # -> Account | None
repo.list(status="active")   # -> list[Account]
3. Pydantic DTO
If you already have a Pydantic response schema, it is the DTO. repositron
detects Pydantic and hydrates through model_validate.
class UserOut(BaseModel):
    model_config = ConfigDict(from_attributes=True)
    id: int
    name: str

class UserRepository(Repository[User, UserOut]): ...

repo.list()  # -> list[UserOut], ready for HTTP
Public API
from repositron import (
    Repository,          # full CRUD generic base
    ReadOnlyRepository,  # read-only generic base
    PaginatedResult,     # {items, total} container
    OrderBy,             # order_by argument type
    UNSET, UnsetType,    # partial-update sentinel
)
Type parameters: Repository[ModelT, DTOT=ModelT, CreateT=object, UpdateT=object, PKT=int].
ModelT is required; everything else has a default, so Repository[Account]
is a valid read/write repository returning Account with an int key. For a
str or uuid key, declare PKT in the last slot
(Repository[Session, SessionDTO, SessionCreate, SessionUpdate, str]).
| Class attribute | Purpose | Default |
|---|---|---|
| field_mapping | {model_field: dto_field} for renamed fields | {} |
| pk_column | primary-key column, by name or column reference | "id" |
Design notes
- The session is the caller's. The repository never opens or closes it, and by default only flushes, so transaction boundaries stay in the app. Opt into committing with Repository(session, autocommit=True) or per write with repo.create(payload, commit=True); on failure it rolls back and re-raises.
- One source of truth per field name: declare a rename once in field_mapping and it applies to both hydration and projection.
- Ordering is never implicit. list / first default to unordered; pagination refuses to run without an order.
- UNSET is one canonical singleton, compared by identity. There is no per-project override.
License
MIT. See LICENSE.