Skip to main content

Python custom implementation of unit of work pattern

Project description

pyUoW

Unit of Work: A Behavioral Pattern in Software Development Implemented in Python.

Downloads Build Codecov Formatter Python versions License


Installation

PyUow package is available on PyPI:

$ python -m pip install pyuow

PyUow officially supports Python >= 3.9.

Usage examples

Simple unit usage example:

Definition:

import typing as t
from dataclasses import dataclass

from pyuow.aio import (
   ConditionalUnit,
   RunUnit,
   FinalUnit,
   ErrorUnit,
)
from pyuow import (
    BaseContext,
    Result,
)


@dataclass(frozen=True)
class ExampleParams:
    field: str


@dataclass
class ExampleContext(BaseContext[ExampleParams]):
    field: str


@dataclass(frozen=True)
class ExampleOutput:
    field: str


class ExampleConditionalUnit(ConditionalUnit[ExampleContext, ExampleOutput]):
    async def condition(
        self, context: ExampleContext, **kwargs: t.Any
    ) -> bool:
        return context.field == "context field value"


class ExampleRunUnit(RunUnit[ExampleContext, ExampleOutput]):
    async def run(self, context: ExampleContext, **kwargs: t.Any) -> None:
        print(
            f"I'm just running a logic, and displaying: {context.params.field}"
        )


class SuccessUnit(FinalUnit[ExampleContext, ExampleOutput]):
    async def finish(
        self, context: ExampleContext, **kwargs: t.Any
    ) -> Result[ExampleOutput]:
        return Result.ok(ExampleOutput(field="success"))


flow = (
    ExampleConditionalUnit(
        on_failure=ErrorUnit(exc=Exception("example error"))
    )
    >> ExampleRunUnit()
    >> SuccessUnit()
).build()

Success scenario:

async def main() -> None:
    params = ExampleParams(field="params field value")
    context = ExampleContext(params=params, field="context field value")
    result = await flow(context)
    result.get()

Failure scenario:

async def main() -> None:
    params = ExampleParams(field="params field value")
    context = ExampleContext(params=params, field="invalid field value")
    result = await flow(context)
    result.get()

Example with Unit of Work manager example

:warning: NoOp - No Operations, can be replaced with your own implementation of UoW.

...
from pyuow.work.noop import NoOpWorkManager

...

work = NoOpWorkManager()

...

async def main() -> None:
    ...
    result = await work.by(flow).do_with(context)
    ...

Example with SqlAlchemy based Unit of Work manager:

from __future__ import annotations

import typing as t
from dataclasses import dataclass, replace
from uuid import UUID, uuid4

from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.orm import Mapped

from pyuow.aio import (
   ConditionalUnit,
   RunUnit,
   FinalUnit,
   ErrorUnit,
)
from pyuow import (
   BaseContext,
   Result,
)

from pyuow.contrib.sqlalchemy.persistence.tables import AuditedEntityTable
from pyuow.contrib.sqlalchemy.aio.persistence.repository import (
    BaseSqlAlchemyRepositoryFactory,
    BaseSqlAlchemyEntityRepository,
)
from pyuow.contrib.sqlalchemy.aio.work import (
    SqlAlchemyReadOnlyTransactionManager,
    SqlAlchemyTransactionManager,
)
from pyuow.persistence.entities import Entity, AuditedEntity
from pyuow.persistence.aio.repository import BaseEntityRepository
from pyuow.work.aio.transactional import TransactionalWorkManager

ExampleEntityId = t.NewType("ExampleEntityId", UUID)


@dataclass(frozen=True, kw_only=True)
class ExampleAuditedEntity(AuditedEntity[ExampleEntityId]):
    field: str

    def change_field(self, value: str) -> ExampleAuditedEntity:
        return replace(self, field=value)


class ExampleEntityTable(AuditedEntityTable):
    __tablename__ = "example_entities"

    field: Mapped[str]


class ExampleEntityRepository(
    BaseSqlAlchemyEntityRepository[
        ExampleEntityId, ExampleAuditedEntity, ExampleEntityTable
    ]
):
    @staticmethod
    def to_entity(record: ExampleEntityTable) -> ExampleAuditedEntity:
        return ExampleAuditedEntity(
            id=record.id,
            field=record.field,
            created_date=record.created_date,
            updated_date=record.updated_date,
        )

    @staticmethod
    def to_record(entity: ExampleAuditedEntity) -> ExampleEntityTable:
        return ExampleEntityTable(
            id=entity.id,
            field=entity.field,
            created_date=entity.created_date,
            updated_date=entity.updated_date,
        )


class ExampleRepositoryFactory(BaseSqlAlchemyRepositoryFactory):
    @property
    def repositories(self) -> t.Mapping[
        t.Type[Entity[t.Any]],
        BaseEntityRepository[t.Any, t.Any],
    ]:
        return {
            ExampleAuditedEntity: ExampleEntityRepository(
                ExampleEntityTable,
                self._transaction_manager,
                self._readonly_transaction_manager,
            ),
        }

    def example_entity_repository(self) -> ExampleEntityRepository:
        return t.cast(
            ExampleEntityRepository,
            repositories.repo_for(ExampleAuditedEntity),
        )


@dataclass(frozen=True)
class ExampleParams:
    field: str


@dataclass
class ExampleContext(BaseContext[ExampleParams]):
    field: str


@dataclass(frozen=True)
class ExampleOutput:
    field: str


class ExampleConditionalUnit(ConditionalUnit[ExampleContext, ExampleOutput]):
    async def condition(
        self, context: ExampleContext, **kwargs: t.Any
    ) -> bool:
        return context.field == "context field value"


class ExampleRunUnit(RunUnit[ExampleContext, ExampleOutput]):
    def __init__(
        self, *, example_entity_repository: ExampleEntityRepository
    ) -> None:
        super().__init__()
        self._example_entity_repository = example_entity_repository

    async def run(self, context: ExampleContext, **kwargs: t.Any) -> None:
        entity = ExampleAuditedEntity(
            id=ExampleEntityId(str(uuid4())), field=context.params.field
        )
        await self._example_entity_repository.add(entity)


class SuccessUnit(FinalUnit[ExampleContext, ExampleOutput]):
    async def finish(
        self, context: ExampleContext, **kwargs: t.Any
    ) -> Result[ExampleOutput]:
        return Result.ok(ExampleOutput(field="success"))


engine = create_async_engine("postgresql://postgres:postgres@db:5432/postgres")

transaction_manager = SqlAlchemyTransactionManager(engine)
readonly_transaction_manager = SqlAlchemyReadOnlyTransactionManager(engine)

repositories = ExampleRepositoryFactory(
    transaction_manager=transaction_manager,
    readonly_transaction_manager=readonly_transaction_manager,
)

work = TransactionalWorkManager(transaction_manager=transaction_manager)

flow = (
    ExampleConditionalUnit(
        on_failure=ErrorUnit(exc=Exception("example error"))
    )
    >> ExampleRunUnit(
        example_entity_repository=repositories.example_entity_repository()
    )
    >> SuccessUnit()
).build()


async def main() -> None:
    params = ExampleParams(field="params field value")
    context = ExampleContext(params=params, field="context field value")
    result = await work.by(flow).do_with(context)
    result.get()

Async compatibility

This package provides robust support for both asynchronous (async) and synchronous (sync) versions of code execution, catering to diverse development needs. The package follows the convention where each module with async code has an aio/ folder in the same directory, allowing you to easily import the async version.

For example:

# Async code imports
from pyuow.aio import (
   ConditionalUnit,
   RunUnit,
   FinalUnit,
   ErrorUnit,
)
from pyuow import (
   BaseContext,
   Result,
)
from pyuow.persistence.aio.repository.base import (
    BaseEntityRepository
)

# Sync code imports
from pyuow import (
   BaseContext,
   Result,
   ConditionalUnit,
   RunUnit,
   FinalUnit,
   ErrorUnit,
)
from pyuow.persistence.repository.base import (
   BaseEntityRepository
)

Same with contributing modules:

# Async code imports
from pyuow.contrib.sqlalchemy.aio.work.impl import (
   SqlAlchemyTransaction,
   SqlAlchemyReadOnlyTransactionManager,
)

# Sync code imports
from pyuow.contrib.sqlalchemy.work.impl import (
   SqlAlchemyTransaction,
   SqlAlchemyReadOnlyTransactionManager,
)

For Contributors

This project is managed with poetry. All python dependencies have to be specified inside pyproject.toml file. Don't use pip directly, as the installed dependencies will be overridden by poetry during next poetry install run.

  1. Install poetry globally:

    curl -sSL https://install.python-poetry.org | python -
    

    Optionally you can specify POETRY_HOME to install poetry to a custom directory:

    curl -sSL https://raw.githubusercontent.com/python-poetry/poetry/master/install-poetry.py | POETRY_HOME=`pwd`/.poetry python -
    

    Follow the steps in the command's output to add poetry to PATH.

  2. Install dependencies to virtualenv:

    poetry env use python
    poetry shell
    poetry install
    

Commands

Run tests

make tests

Pre-commit hooks

The repo contains configuration for pre-commit hooks that are run automatically before git commit command. Inspect .pre-commit-config.yaml to learn which hooks are installed.

To enable hooks, just type once:

pre-commit install

Then changes staged for committing will be automatically fixed and styled.

To execute manually, run at any time:

pre-commit run

Project Layout


pyuow
├── pyuow                               # library sources
└── tests                               # tests package (structure is mirrored from src)

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pyuow-0.2.0.tar.gz (16.4 kB view details)

Uploaded Source

Built Distribution

pyuow-0.2.0-py3-none-any.whl (32.3 kB view details)

Uploaded Python 3

File details

Details for the file pyuow-0.2.0.tar.gz.

File metadata

  • Download URL: pyuow-0.2.0.tar.gz
  • Upload date:
  • Size: 16.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.8.3 CPython/3.11.9 Linux/6.5.0-1025-azure

File hashes

Hashes for pyuow-0.2.0.tar.gz
Algorithm Hash digest
SHA256 1c1e6e2e1dd3262b97ce3ee529ed1fd9ed9bddae90a711ce33a7510e52de0374
MD5 efb8d0072fcb5602e01736b2becc3f30
BLAKE2b-256 91a4cfd07642ac7e753567c356fc22092c69d5ecbccaf05644f171b96801fe48

See more details on using hashes here.

File details

Details for the file pyuow-0.2.0-py3-none-any.whl.

File metadata

  • Download URL: pyuow-0.2.0-py3-none-any.whl
  • Upload date:
  • Size: 32.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.8.3 CPython/3.11.9 Linux/6.5.0-1025-azure

File hashes

Hashes for pyuow-0.2.0-py3-none-any.whl
Algorithm Hash digest
SHA256 0530e64041737e6cc0f8b82b66ffa7ed8b32dacdba7427bca6c4e9d779626172
MD5 d216c8099b19a776974958ce56b52d31
BLAKE2b-256 35b951f2cc8308d7dfacadababeab3933f9710731f84906635318d08e0d3f361

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page