repka

Python repository pattern implementation - isolate db manipulation from domain models.
Installation
Via pip:

```shell
pip install repka
```

Via poetry:

```shell
poetry add repka
```
Usage
repka.api.BaseRepository
`BaseRepository` is used to execute SQL queries (via aiopg & sqlalchemy) and to convert SQL rows to/from pydantic models:
```python
import sqlalchemy as sa

from repka.api import AiopgRepository, IdModel
from repka.utils import create_async_db_connection

# Define a pydantic model.
# It should inherit repka.api.IdModel
# to set id on entity insert, to update the entity by id and more.
# IdModel inherits pydantic.BaseModel and defines an int id field.
class Task(IdModel):
    title: str

# Define a sqlalchemy table with the same columns as the model
metadata = sa.MetaData()
tasks_table = sa.Table(
    "tasks", metadata,
    sa.Column("id", sa.Integer, primary_key=True, autoincrement=True),
    sa.Column("title", sa.String),
)

# Define a repository.
# You should inherit repka.api.AiopgRepository and
# set the sqlalchemy table via the `table` property.
# Rows returned by sqlalchemy are passed to the model as kwargs.
class TaskRepo(AiopgRepository[Task]):
    table = tasks_table

# To use the repository, instantiate it with an async sqlalchemy connection
db_url = "postgresql://postgres@localhost/test"
async with create_async_db_connection(db_url) as conn:
    repo = TaskRepo(conn)

    # Now you can use the repo.
    # Here we select the first task with a matching title.
    task = await repo.first(tasks_table.c.title == "My first task")
```
BaseRepository methods
`T` means the generic type passed to `BaseRepository` (e.g. `BaseRepository[Task]` means that the type of `T` is `Task`).
Select methods
- `repo.first(*filters: BinaryExpression, orders: Optional[Columns])` - get the first entity matching sqlalchemy {filters} and {orders}; if no entity matches {filters}, `None` is returned.
  - Example of {filters}: `table.c.title == 'test task'` - equal to the SQL where clause `where title = 'test task'`
  - Example of {orders}: `table.c.title` - equal to the SQL order-by clause `order by title`
- `repo.get_by_ids(entity_ids: List[int])` - get all entities whose id is in {entity_ids} (same as SQL `where id in ({entity_ids})`)
- `repo.get_by_id(entity_id: int)` - get the entity with id = {entity_id}
- `repo.get_or_create(filters: Optional[List[BinaryExpression]], defaults: Optional[Dict])` - get the entity that matches {filters}; if no entity is found, create a new one with {defaults}; return a tuple of the entity and an entity-existence flag
- `repo.get_all(filters: Optional[List[BinaryExpression]], orders: Optional[Columns])` - return all entities matching {filters} and {orders}
- `repo.get_all_ids(filters: Optional[List[BinaryExpression]], orders: Optional[Columns])` - return the ids of entities matching {filters} and {orders}
- `repo.exists(*filters: BinaryExpression)` - check that an entity matching {filters} exists, using a SQL `count` statement
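The `get_or_create` contract above can be sketched in plain Python. This is a hypothetical in-memory illustration, not repka's implementation, and it assumes the returned flag reports whether a new entity was created (rows are plain dicts, `predicate` stands in for sqlalchemy filters):

```python
from typing import Callable, Dict, List, Optional, Tuple

def get_or_create(
    rows: List[Dict],
    predicate: Callable[[Dict], bool],
    defaults: Optional[Dict] = None,
) -> Tuple[Dict, bool]:
    # Return the first matching row, or create one from defaults.
    for row in rows:
        if predicate(row):
            return row, False  # existing entity, nothing created
    entity = dict(defaults or {})
    rows.append(entity)
    return entity, True

tasks = [{"title": "old"}]
entity, created = get_or_create(tasks, lambda r: r["title"] == "old")
assert created is False
entity, created = get_or_create(tasks, lambda r: r["title"] == "new", {"title": "new"})
assert created is True
```

The tuple-with-flag return lets callers distinguish "found" from "created" without a second query.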
Insert methods
- `repo.insert(entity: T)` - insert the entity into the db table and set the id field on the entity
- `repo.insert_many(entities: List[T])` - insert multiple entities and set their ids, in a single transaction
Update methods
- `repo.update(entity: T)` - update the entity in the db
- `repo.update_partial(entity: T, **updated_values)` - update entity fields via kwargs and update those fields in the db
- `repo.update_many(entities: List[T])` - update multiple entities in a single transaction
Delete methods
- `repo.delete(*filters: BinaryExpression)` - delete entities matching {filters} via a SQL `delete` statement; to delete all entities, pass `None` as an arg: `repo.delete(None)`
- `repo.delete_by_id(entity_id: int)` - delete the entity with id = {entity_id}
- `repo.delete_by_ids(entity_ids: List[int])` - delete entities whose id is in {entity_ids}
Other methods & properties
- `repo.serialize(entity: T)` - convert {entity} to a dict (used e.g. in the `insert` and `update` methods)
- `repo.deserialize(**kwargs)` - convert {kwargs} to an entity (used e.g. in the `first` and `get_all` methods)
- `repo.execute_in_transaction()` - context manager that allows executing multiple queries in a transaction.

  Example: delete all old entities and insert a new one in a single transaction:

  ```python
  async with repo.execute_in_transaction():
      await repo.delete()
      await repo.insert(Task(title="New task"))
  ```

- `repo.ignore_default` - list of entity fields that will be ignored on insert and set after insert if they equal the default field value; useful for auto-increment / default fields like dates or sequence numbers
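The `serialize`/`deserialize` pair above is a round-trip between entities and row dicts. A minimal sketch of that round-trip using stdlib dataclasses (repka's entities are pydantic models; the `Task` class and helper functions here are illustrative only):

```python
from dataclasses import asdict, dataclass
from typing import Optional

@dataclass
class Task:
    title: str
    id: Optional[int] = None

def serialize(entity: Task) -> dict:
    # entity -> row dict, as used on insert/update
    return asdict(entity)

def deserialize(**kwargs) -> Task:
    # row dict (sql-row kwargs) -> entity, as used by first/get_all
    return Task(**kwargs)

row = serialize(Task(title="demo", id=1))
assert deserialize(**row) == Task(title="demo", id=1)
```

Overriding such hooks is the natural place to handle fields that need custom conversion between the db and the model.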
ContextVar support
You can create lazy-connection repositories with context vars:

```python
from contextvars import ContextVar

from repka.utils import create_async_db_connection

# Create a context var and instantiate the repository
db_connection = ContextVar("db_connection")
repo = TaskRepo(db_connection)

# Now you should set the context var somewhere (e.g. in middleware)
# and start using the repository
async with create_async_db_connection(db_url) as conn:
    db_connection.set(conn)
    await repo.insert(Task(title="New task"))
```
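The lazy-binding idea behind this can be shown with the stdlib alone. The `LazyRepo` class below is a hypothetical sketch, not repka's code: the repository stores either a connection or a `ContextVar` and resolves the actual connection only when a query runs (a plain string stands in for the connection):

```python
from contextvars import ContextVar

db_connection: ContextVar[str] = ContextVar("db_connection")

class LazyRepo:
    def __init__(self, conn_or_var) -> None:
        self._conn_or_var = conn_or_var

    @property
    def connection(self):
        # Resolve lazily: a ContextVar is read at query time,
        # so each task/request sees the connection set for it.
        if isinstance(self._conn_or_var, ContextVar):
            return self._conn_or_var.get()  # LookupError if never set
        return self._conn_or_var

repo = LazyRepo(db_connection)   # safe to build before any connection exists
db_connection.set("fake-connection")
assert repo.connection == "fake-connection"
```

This is why the repository can be created at import time while the connection is supplied later, per request, in middleware.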
Other sqlalchemy repositories
The following repositories have the same API as `AiopgRepository` (select methods, insert methods, etc.).

- `repka.api.FakeRepo` - a repository that uses lists instead of database tables; can be used as a mock. This repository is implemented only partially, because implementing sqlalchemy features (like filters or orders) is hard and pointless for python lists.
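The idea of a list-backed fake can be sketched in a few lines. `FakeTaskRepo` below is a hypothetical illustration of the pattern, not repka's `FakeRepo`: ids are assigned on insert and lookups scan the list:

```python
from typing import Dict, List, Optional

class FakeTaskRepo:
    def __init__(self) -> None:
        self._rows: List[Dict] = []
        self._next_id = 1

    def insert(self, row: Dict) -> Dict:
        # Mimic the db assigning an auto-increment id on insert
        row["id"] = self._next_id
        self._next_id += 1
        self._rows.append(row)
        return row

    def get_by_id(self, entity_id: int) -> Optional[Dict]:
        return next((r for r in self._rows if r["id"] == entity_id), None)

repo = FakeTaskRepo()
repo.insert({"title": "demo"})
assert repo.get_by_id(1) == {"id": 1, "title": "demo"}
```

Because the fake keeps the repository interface, tests can swap it in for the db-backed repo without touching the code under test.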
repka.json_.DictJsonRepo
This kind of repository is used to save/load JSON objects to/from a file:

```python
from repka.json_ import DictJsonRepo

songs = [
    {"artist": "Pig Destroyer", "title": "Thumbsucker"},
    {"artist": "Da Menace", "title": "Bag of Funk"},
]

repo = DictJsonRepo()
repo.write(songs, "songs.json")
assert repo.read("songs.json") == songs
```
DictJsonRepo methods
- `repo.read(filename: str)` - read the json file named {filename} and return its content as a json primitive (list, dict, str, etc.)
- `repo.write(data: T, filename: str)` - serialize the json primitive {data} and save it to the file named {filename}
- `repo.read_or_write_default(filename: str, default_factory: Callable[[], T])` - check whether the file named {filename} exists; if it does, read and return its content; otherwise execute {default_factory} and write the result to the file named {filename}.

  Example: read data from `test.json`, or create `test.json` with `[{"field": "value"}]` if no such file exists:

  ```python
  repo = DictJsonRepo()
  repo.read_or_write_default("test.json", lambda: [{"field": "value"}])
  ```

DictJsonRepo constructor

- `DictJsonRepo(directory: str)` - set the directory where files will be read / written; if not set, the current working directory is used.

  Example: read files from the `data/` dir:

  ```python
  repo = DictJsonRepo("data")
  repo.read("test.json")  # will read "./data/test.json"
  ```
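The `read_or_write_default` behavior is easy to reproduce with the stdlib, which also clarifies its contract. The helper below is a sketch, not repka's implementation:

```python
import json
import tempfile
from pathlib import Path
from typing import Callable

def read_or_write_default(path: Path, default_factory: Callable[[], object]) -> object:
    # Read the file if it exists; otherwise materialize the default,
    # persist it, and return it.
    if path.exists():
        return json.loads(path.read_text())
    data = default_factory()
    path.write_text(json.dumps(data))
    return data

with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "test.json"
    first = read_or_write_default(path, lambda: [{"field": "value"}])   # writes the default
    second = read_or_write_default(path, lambda: [{"field": "other"}])  # reads the existing file
    assert first == second == [{"field": "value"}]
```

Note that the second call ignores its factory entirely: once the file exists, its content wins.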
Development and contribution
Dependencies
Install production and development dependencies via poetry:

```shell
poetry install
```
Tests
To run tests:

- Set up the database url via the `DB_URL` environment variable (e.g. via a .env file). WARNING: every test run will drop all tables from the db
- Run tests via `pytest`
Contribution
- Create a fork/branch for the new feature/fix/whatever
- [Optional] Install pre-commit hooks: `pre-commit install` (for a manual pre-commit run use `pre-commit run -a`)
- When you are done, create a pull request and wait for approval
Deploy
To deploy a new version, increment the version via bump2version and publish it to PyPI via poetry:

```shell
bump2version major/minor/patch
poetry publish --build
```

Don't forget to fill in CHANGELOG.md before the release.