Skip to main content

Python custom implementation of unit of work pattern

Project description

pyUoW

Unit of Work: A Behavioral Pattern in Software Development Implemented in Python.

Downloads Build Codecov Formatter Python versions License


Table of Contents

  1. Installation
  2. Usage examples
  3. Async compatibility
  4. Contributors guide

Installation

PyUow package is available on PyPI:

$ python -m pip install pyuow

PyUow officially supports Python >= 3.9.

PyPi link -


Usage examples

Simple unit usage example:

Definition:

import typing as t
from dataclasses import dataclass

from pyuow.aio import (
   ConditionalUnit,
   RunUnit,
   FinalUnit,
   ErrorUnit,
)
from pyuow import (
    BaseContext,
    Result,
)


@dataclass(frozen=True)
class ExampleParams:
    field: str


@dataclass
class ExampleContext(BaseContext[ExampleParams]):
    field: str


@dataclass(frozen=True)
class ExampleOutput:
    field: str


class ExampleConditionalUnit(ConditionalUnit[ExampleContext, ExampleOutput]):
    async def condition(
        self, context: ExampleContext, **kwargs: t.Any
    ) -> bool:
        return context.field == "context field value"


class ExampleRunUnit(RunUnit[ExampleContext, ExampleOutput]):
    async def run(self, context: ExampleContext, **kwargs: t.Any) -> None:
        print(
            f"I'm just running a logic, and displaying: {context.params.field}"
        )


class SuccessUnit(FinalUnit[ExampleContext, ExampleOutput]):
    async def finish(
        self, context: ExampleContext, **kwargs: t.Any
    ) -> Result[ExampleOutput]:
        return Result.ok(ExampleOutput(field="success"))


flow = (
    ExampleConditionalUnit(
        on_failure=ErrorUnit(exc=Exception("example error"))
    )
    >> ExampleRunUnit()
    >> SuccessUnit()
).build()

Success scenario:

async def main() -> None:
    params = ExampleParams(field="params field value")
    context = ExampleContext(params=params, field="context field value")
    result = await flow(context)
    result.get()

Failure scenario:

async def main() -> None:
    params = ExampleParams(field="params field value")
    context = ExampleContext(params=params, field="invalid field value")
    result = await flow(context)
    result.get()

Example with Unit of Work manager

:warning: NoOp - No Operations, can be replaced with your own implementation of UoW.

...
from pyuow.work.noop import NoOpWorkManager

...

work = NoOpWorkManager()

...

async def main() -> None:
    ...
    result = await work.by(flow).do_with(context)
    ...

Example with SqlAlchemy based Unit of Work manager:

from __future__ import annotations

import typing as t
from dataclasses import dataclass, replace
from uuid import UUID, uuid4

from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.orm import Mapped

from pyuow.aio import (
   ConditionalUnit,
   RunUnit,
   FinalUnit,
   ErrorUnit,
)
from pyuow import (
   BaseContext,
   Result,
)

from pyuow.contrib.sqlalchemy.tables import AuditedEntityTable
from pyuow.contrib.sqlalchemy.aio.repository import (
   BaseSqlAlchemyRepositoryFactory,
   BaseSqlAlchemyEntityRepository,
)
from pyuow.contrib.sqlalchemy.aio.work import (
   SqlAlchemyReadOnlyTransactionManager,
   SqlAlchemyTransactionManager,
)
from pyuow.persistence.entities import Entity, AuditedEntity
from pyuow.repository.aio import BaseEntityRepository
from pyuow.work.aio.transactional import TransactionalWorkManager

ExampleEntityId = t.NewType("ExampleEntityId", UUID)


@dataclass(frozen=True, kw_only=True)
class ExampleAuditedEntity(AuditedEntity[ExampleEntityId]):
   field: str

   def change_field(self, value: str) -> ExampleAuditedEntity:
      return replace(self, field=value)


class ExampleEntityTable(AuditedEntityTable):
   __tablename__ = "example_entities"

   field: Mapped[str]


class ExampleEntityRepository(
   BaseSqlAlchemyEntityRepository[
      ExampleEntityId, ExampleAuditedEntity, ExampleEntityTable
   ]
):
   @staticmethod
   def to_entity(record: ExampleEntityTable) -> ExampleAuditedEntity:
      return ExampleAuditedEntity(
         id=record.id,
         field=record.field,
         created_date=record.created_date,
         updated_date=record.updated_date,
      )

   @staticmethod
   def to_record(entity: ExampleAuditedEntity) -> ExampleEntityTable:
      return ExampleEntityTable(
         id=entity.id,
         field=entity.field,
         created_date=entity.created_date,
         updated_date=entity.updated_date,
      )


class ExampleRepositoryFactory(BaseSqlAlchemyRepositoryFactory):
   @property
   def repositories(self) -> t.Mapping[
      t.Type[Entity[t.Any]],
      BaseEntityRepository[t.Any, t.Any],
   ]:
      return {
         ExampleAuditedEntity: ExampleEntityRepository(
            ExampleEntityTable,
            self._transaction_manager,
            self._readonly_transaction_manager,
         ),
      }

   def example_entity_repository(self) -> ExampleEntityRepository:
      return t.cast(
         ExampleEntityRepository,
         repositories.repo_for(ExampleAuditedEntity),
      )


@dataclass(frozen=True)
class ExampleParams:
   field: str


@dataclass
class ExampleContext(BaseContext[ExampleParams]):
   field: str


@dataclass(frozen=True)
class ExampleOutput:
   field: str


class ExampleConditionalUnit(ConditionalUnit[ExampleContext, ExampleOutput]):
   async def condition(
           self, context: ExampleContext, **kwargs: t.Any
   ) -> bool:
      return context.field == "context field value"


class ExampleRunUnit(RunUnit[ExampleContext, ExampleOutput]):
   def __init__(
           self, *, example_entity_repository: ExampleEntityRepository
   ) -> None:
      super().__init__()
      self._example_entity_repository = example_entity_repository

   async def run(self, context: ExampleContext, **kwargs: t.Any) -> None:
      entity = ExampleAuditedEntity(
         id=ExampleEntityId(str(uuid4())), field=context.params.field
      )
      await self._example_entity_repository.add(entity)


class SuccessUnit(FinalUnit[ExampleContext, ExampleOutput]):
   async def finish(
           self, context: ExampleContext, **kwargs: t.Any
   ) -> Result[ExampleOutput]:
      return Result.ok(ExampleOutput(field="success"))


engine = create_async_engine("postgresql://postgres:postgres@db:5432/postgres")

transaction_manager = SqlAlchemyTransactionManager(engine)
readonly_transaction_manager = SqlAlchemyReadOnlyTransactionManager(engine)

repositories = ExampleRepositoryFactory(
   transaction_manager=transaction_manager,
   readonly_transaction_manager=readonly_transaction_manager,
)

work = TransactionalWorkManager(transaction_manager=transaction_manager)

flow = (
        ExampleConditionalUnit(
           on_failure=ErrorUnit(exc=Exception("example error"))
        )
        >> ExampleRunUnit(
   example_entity_repository=repositories.example_entity_repository()
)
        >> SuccessUnit()
).build()


async def main() -> None:
   params = ExampleParams(field="params field value")
   context = ExampleContext(params=params, field="context field value")
   result = await work.by(flow).do_with(context)
   result.get()

Async compatibility

This package provides robust support for both asynchronous (async) and synchronous (sync) versions of code execution, catering to diverse development needs. The package follows the convention where each module with async code has an aio/ folder in the same directory, allowing you to easily import the async version.

For example:

# Async code imports
from pyuow.aio import (
   ConditionalUnit,
   RunUnit,
   FinalUnit,
   ErrorUnit,
)
from pyuow import (
   BaseContext,
   Result,
)
from pyuow.repository.aio.base import (
   BaseEntityRepository
)

# Sync code imports
from pyuow import (
   BaseContext,
   Result,
   ConditionalUnit,
   RunUnit,
   FinalUnit,
   ErrorUnit,
)
from pyuow.repository import (
   BaseEntityRepository
)

Same with contributing modules:

# Async code imports
from pyuow.contrib.sqlalchemy.aio.work.impl import (
   SqlAlchemyTransaction,
   SqlAlchemyReadOnlyTransactionManager,
)

# Sync code imports
from pyuow.contrib.sqlalchemy.work.impl import (
   SqlAlchemyTransaction,
   SqlAlchemyReadOnlyTransactionManager,
)

For PyUoW contributors:

For guidance on setting up a development environment and how to make a contribution to PyUow, please see CONTRIBUTING.md for more information.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pyuow-0.8.0.tar.gz (18.3 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

pyuow-0.8.0-py3-none-any.whl (41.8 kB view details)

Uploaded Python 3

File details

Details for the file pyuow-0.8.0.tar.gz.

File metadata

  • Download URL: pyuow-0.8.0.tar.gz
  • Upload date:
  • Size: 18.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.2.1 CPython/3.11.14 Linux/6.11.0-1018-azure

File hashes

Hashes for pyuow-0.8.0.tar.gz
Algorithm Hash digest
SHA256 eac1560c1bfcd6fc0e57172c740af7771abd8447f5e8a80c5b6b671f2bd99756
MD5 779bab8ecb03d6f576d558cf61b1cc4c
BLAKE2b-256 0215c7147f1ca18ddcc2b0c5261749efe11f8ed10e2f7da87dd48c12b5844ce5

See more details on using hashes here.

File details

Details for the file pyuow-0.8.0-py3-none-any.whl.

File metadata

  • Download URL: pyuow-0.8.0-py3-none-any.whl
  • Upload date:
  • Size: 41.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.2.1 CPython/3.11.14 Linux/6.11.0-1018-azure

File hashes

Hashes for pyuow-0.8.0-py3-none-any.whl
Algorithm Hash digest
SHA256 cc2cbde6a62f24366e69b06ff27ca62ffcf0e557bcbca611d6881be7074704f2
MD5 1b087ca356083a5e5d08336b1b0c95a1
BLAKE2b-256 3d859f14dcc4c05e1a8f7e5b45c6278bdf8c6ca176af66356449dce75b24f9c8

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page