Python custom implementation of unit of work pattern
Project description
Unit of Work: A Behavioral Pattern in Software Development Implemented in Python.
Installation
PyUow package is available on PyPI:
$ python -m pip install pyuow
PyUow officially supports Python >= 3.9.
Usage examples
Simple unit usage example:
Definition:
import typing as t
from dataclasses import dataclass
from pyuow.aio import (
ConditionalUnit,
RunUnit,
FinalUnit,
ErrorUnit,
)
from pyuow import (
BaseContext,
Result,
)
@dataclass(frozen=True)
class ExampleParams:
    """Immutable input parameters; passed to the context as ``params``."""

    field: str
@dataclass
class ExampleContext(BaseContext[ExampleParams]):
    """Flow context parameterized by ``ExampleParams``."""

    # Extra value carried next to the params; ExampleConditionalUnit compares it.
    field: str
@dataclass(frozen=True)
class ExampleOutput:
    """Immutable value wrapped in the flow's final ``Result``."""

    field: str
class ExampleConditionalUnit(ConditionalUnit[ExampleContext, ExampleOutput]):
    """Conditional unit whose check is based on ``context.field``."""

    async def condition(
        self, context: ExampleContext, **kwargs: t.Any
    ) -> bool:
        """Return ``True`` only when ``context.field`` is the expected literal."""
        return context.field == "context field value"
class ExampleRunUnit(RunUnit[ExampleContext, ExampleOutput]):
    """Run unit that prints the ``field`` value held in the context params."""

    async def run(self, context: ExampleContext, **kwargs: t.Any) -> None:
        """Print a message containing ``context.params.field``."""
        print(
            f"I'm just running a logic, and displaying: {context.params.field}"
        )
class SuccessUnit(FinalUnit[ExampleContext, ExampleOutput]):
    """Final unit that ends the flow with a successful result."""

    async def finish(
        self, context: ExampleContext, **kwargs: t.Any
    ) -> Result[ExampleOutput]:
        """Return an OK result wrapping ``ExampleOutput(field="success")``."""
        return Result.ok(ExampleOutput(field="success"))
# Chain the units with ``>>`` and build the callable flow.
# The conditional unit is given an ErrorUnit as its ``on_failure`` handler.
flow = (
    ExampleConditionalUnit(
        on_failure=ErrorUnit(exc=Exception("example error")),
    )
    >> ExampleRunUnit()
    >> SuccessUnit()
).build()
Success scenario:
async def main() -> None:
    """Run the flow with a context whose ``field`` matches the condition."""
    params = ExampleParams(field="params field value")
    context = ExampleContext(params=params, field="context field value")
    result = await flow(context)
    # NOTE(review): presumably returns the wrapped output or raises the stored
    # error; confirm against the pyuow ``Result`` API.
    result.get()
Failure scenario:
async def main() -> None:
    """Run the flow with a context whose ``field`` does not match the condition."""
    params = ExampleParams(field="params field value")
    context = ExampleContext(params=params, field="invalid field value")
    result = await flow(context)
    # NOTE(review): presumably raises the error supplied to ErrorUnit in this
    # scenario; confirm against the pyuow ``Result`` API.
    result.get()
Example with a Unit of Work manager
:warning: NoOp stands for "No Operations"; it can be replaced with your own implementation of UoW.
# Lines shown as ``...`` stand for code omitted from this snippet
# (presumably the definitions from the simple example; verify).
...
from pyuow.work.noop import NoOpWorkManager
...
work = NoOpWorkManager()
...
async def main() -> None:
    ...
    # Execute the flow through the work manager instead of calling it directly.
    result = await work.by(flow).do_with(context)
    ...
Example with SqlAlchemy based Unit of Work manager:
from __future__ import annotations
import typing as t
from dataclasses import dataclass, replace
from uuid import UUID, uuid4
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.orm import Mapped
from pyuow.aio import (
ConditionalUnit,
RunUnit,
FinalUnit,
ErrorUnit,
)
from pyuow import (
BaseContext,
Result,
)
from pyuow.contrib.sqlalchemy.persistence.tables import AuditedEntityTable
from pyuow.contrib.sqlalchemy.aio.persistence.repository import (
BaseSqlAlchemyRepositoryFactory,
BaseSqlAlchemyEntityRepository,
)
from pyuow.contrib.sqlalchemy.aio.work import (
SqlAlchemyReadOnlyTransactionManager,
SqlAlchemyTransactionManager,
)
from pyuow.persistence.entities import Entity, AuditedEntity
from pyuow.persistence.aio.repository import BaseEntityRepository
from pyuow.work.aio.transactional import TransactionalWorkManager
# Distinct static type for this entity's identifier; a plain UUID at runtime.
ExampleEntityId = t.NewType("ExampleEntityId", UUID)


# NOTE: ``kw_only=True`` is only accepted by ``dataclass`` on Python 3.10+.
@dataclass(frozen=True, kw_only=True)
class ExampleAuditedEntity(AuditedEntity[ExampleEntityId]):
    """Immutable audited entity identified by an ``ExampleEntityId``."""

    field: str

    def change_field(self, value: str) -> ExampleAuditedEntity:
        """Return a copy of this entity with ``field`` set to *value*."""
        # The dataclass is frozen, so a modified copy is built via ``replace``.
        return replace(self, field=value)
class ExampleEntityTable(AuditedEntityTable):
    """SQLAlchemy table mapping for records in ``example_entities``."""

    __tablename__ = "example_entities"

    field: Mapped[str]
class ExampleEntityRepository(
    BaseSqlAlchemyEntityRepository[
        ExampleEntityId, ExampleAuditedEntity, ExampleEntityTable
    ]
):
    """Repository converting between entities and ``ExampleEntityTable`` rows."""

    @staticmethod
    def to_entity(record: ExampleEntityTable) -> ExampleAuditedEntity:
        """Build an ``ExampleAuditedEntity`` from *record*, field by field."""
        return ExampleAuditedEntity(
            id=record.id,
            field=record.field,
            created_date=record.created_date,
            updated_date=record.updated_date,
        )

    @staticmethod
    def to_record(entity: ExampleAuditedEntity) -> ExampleEntityTable:
        """Build an ``ExampleEntityTable`` row from *entity*, field by field."""
        return ExampleEntityTable(
            id=entity.id,
            field=entity.field,
            created_date=entity.created_date,
            updated_date=entity.updated_date,
        )
class ExampleRepositoryFactory(BaseSqlAlchemyRepositoryFactory):
    """Factory providing the repositories used by the example flow."""

    @property
    def repositories(self) -> t.Mapping[
        t.Type[Entity[t.Any]],
        BaseEntityRepository[t.Any, t.Any],
    ]:
        """Map each entity type to a repository using this factory's managers."""
        return {
            ExampleAuditedEntity: ExampleEntityRepository(
                ExampleEntityTable,
                self._transaction_manager,
                self._readonly_transaction_manager,
            ),
        }

    def example_entity_repository(self) -> ExampleEntityRepository:
        """Return the repository registered for ``ExampleAuditedEntity``."""
        # Resolve through this instance, not a module-level factory variable,
        # so the method works for any instance and at any point in the module.
        return t.cast(
            ExampleEntityRepository,
            self.repo_for(ExampleAuditedEntity),
        )
@dataclass(frozen=True)
class ExampleParams:
    """Immutable input parameters; passed to the context as ``params``."""

    field: str
@dataclass
class ExampleContext(BaseContext[ExampleParams]):
    """Flow context parameterized by ``ExampleParams``."""

    # Extra value carried next to the params; ExampleConditionalUnit compares it.
    field: str
@dataclass(frozen=True)
class ExampleOutput:
    """Immutable value wrapped in the flow's final ``Result``."""

    field: str
class ExampleConditionalUnit(ConditionalUnit[ExampleContext, ExampleOutput]):
    """Conditional unit whose check is based on ``context.field``."""

    async def condition(
        self, context: ExampleContext, **kwargs: t.Any
    ) -> bool:
        """Return ``True`` only when ``context.field`` is the expected literal."""
        return context.field == "context field value"
class ExampleRunUnit(RunUnit[ExampleContext, ExampleOutput]):
    """Run unit that stores a new ``ExampleAuditedEntity`` via the repository."""

    def __init__(
        self, *, example_entity_repository: ExampleEntityRepository
    ) -> None:
        """Keep a reference to the repository used by ``run``."""
        super().__init__()
        self._example_entity_repository = example_entity_repository

    async def run(self, context: ExampleContext, **kwargs: t.Any) -> None:
        """Create an entity from ``context.params.field`` and add it."""
        # ExampleEntityId is a NewType over UUID, so wrap the UUID object
        # itself rather than its string form.
        entity = ExampleAuditedEntity(
            id=ExampleEntityId(uuid4()), field=context.params.field
        )
        await self._example_entity_repository.add(entity)
class SuccessUnit(FinalUnit[ExampleContext, ExampleOutput]):
    """Final unit that ends the flow with a successful result."""

    async def finish(
        self, context: ExampleContext, **kwargs: t.Any
    ) -> Result[ExampleOutput]:
        """Return an OK result wrapping ``ExampleOutput(field="success")``."""
        return Result.ok(ExampleOutput(field="success"))
# The asyncio engine needs an async DBAPI driver named in the URL (asyncpg
# here, which must be installed). A bare "postgresql://" URL selects the
# synchronous default driver, which create_async_engine rejects.
engine = create_async_engine(
    "postgresql+asyncpg://postgres:postgres@db:5432/postgres"
)
transaction_manager = SqlAlchemyTransactionManager(engine)
readonly_transaction_manager = SqlAlchemyReadOnlyTransactionManager(engine)
# The factory shares both managers with every repository it creates.
repositories = ExampleRepositoryFactory(
    transaction_manager=transaction_manager,
    readonly_transaction_manager=readonly_transaction_manager,
)
work = TransactionalWorkManager(transaction_manager=transaction_manager)
# Chain the units with ``>>`` and build the callable flow.
# The run unit receives its repository from the factory.
flow = (
    ExampleConditionalUnit(
        on_failure=ErrorUnit(exc=Exception("example error")),
    )
    >> ExampleRunUnit(
        example_entity_repository=repositories.example_entity_repository(),
    )
    >> SuccessUnit()
).build()
async def main() -> None:
    """Run the flow through the transactional work manager."""
    params = ExampleParams(field="params field value")
    context = ExampleContext(params=params, field="context field value")
    result = await work.by(flow).do_with(context)
    # NOTE(review): presumably returns the wrapped output or raises the stored
    # error; confirm against the pyuow ``Result`` API.
    result.get()
Async compatibility
This package provides robust support for both asynchronous (async) and synchronous (sync) versions of code execution, catering to diverse development needs. The package follows the convention where each module with async code has an aio/ folder in the same directory, allowing you to easily import the async version.
For example:
# Async code imports
from pyuow.aio import (
ConditionalUnit,
RunUnit,
FinalUnit,
ErrorUnit,
)
from pyuow import (
BaseContext,
Result,
)
from pyuow.persistence.aio.repository.base import (
BaseEntityRepository
)
# Sync code imports
from pyuow import (
BaseContext,
Result,
ConditionalUnit,
RunUnit,
FinalUnit,
ErrorUnit,
)
from pyuow.persistence.repository.base import (
BaseEntityRepository
)
Same with contributing modules:
# Async code imports
from pyuow.contrib.sqlalchemy.aio.work.impl import (
SqlAlchemyTransaction,
SqlAlchemyReadOnlyTransactionManager,
)
# Sync code imports
from pyuow.contrib.sqlalchemy.work.impl import (
SqlAlchemyTransaction,
SqlAlchemyReadOnlyTransactionManager,
)
For Contributors
This project is managed with poetry. All Python dependencies have to be specified inside the pyproject.toml file. Don't use pip directly, as the installed dependencies will be overridden by poetry during the next poetry install run.
- Install poetry globally:
  curl -sSL https://install.python-poetry.org | python -
  Optionally you can specify POETRY_HOME to install poetry to a custom directory:
  curl -sSL https://raw.githubusercontent.com/python-poetry/poetry/master/install-poetry.py | POETRY_HOME=`pwd`/.poetry python -
  Follow the steps in the command's output to add poetry to PATH.
- Install dependencies to virtualenv:
  poetry env use python
  poetry shell
  poetry install
Commands
Run tests
make tests
Pre-commit hooks
The repo contains configuration for pre-commit hooks that are run automatically before the git commit command. Inspect .pre-commit-config.yaml to learn which hooks are installed.
To enable hooks, just type once:
pre-commit install
Then changes staged for committing will be automatically fixed and styled.
To execute manually, run at any time:
pre-commit run
Project Layout
pyuow
├── pyuow # library sources
└── tests # tests package (structure is mirrored from src)
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
File details
Details for the file pyuow-0.2.0.tar.gz.
File metadata
- Download URL: pyuow-0.2.0.tar.gz
- Upload date:
- Size: 16.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.8.3 CPython/3.11.9 Linux/6.5.0-1025-azure
File hashes
Algorithm | Hash digest
---|---
SHA256 | 1c1e6e2e1dd3262b97ce3ee529ed1fd9ed9bddae90a711ce33a7510e52de0374
MD5 | efb8d0072fcb5602e01736b2becc3f30
BLAKE2b-256 | 91a4cfd07642ac7e753567c356fc22092c69d5ecbccaf05644f171b96801fe48
File details
Details for the file pyuow-0.2.0-py3-none-any.whl.
File metadata
- Download URL: pyuow-0.2.0-py3-none-any.whl
- Upload date:
- Size: 32.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.8.3 CPython/3.11.9 Linux/6.5.0-1025-azure
File hashes
Algorithm | Hash digest
---|---
SHA256 | 0530e64041737e6cc0f8b82b66ffa7ed8b32dacdba7427bca6c4e9d779626172
MD5 | d216c8099b19a776974958ce56b52d31
BLAKE2b-256 | 35b951f2cc8308d7dfacadababeab3933f9710731f84906635318d08e0d3f361