Skip to main content

Async Python DDD utilities library.

Project description

Async Python DDD utilities library

PyPI version PyPIDownloads CI

aioddd is an async Python DDD utilities library.

Installation

Use the package manager pip to install aioddd.

pip install aioddd

Documentation

Usage

from asyncio import get_event_loop
from dataclasses import dataclass
from typing import Type
from aioddd import NotFoundError, \
    Command, CommandHandler, SimpleCommandBus, \
    Query, QueryHandler, OptionalResponse, SimpleQueryBus, Event

_products = []

class ProductStored(Event):
    @dataclass
    class Attributes:
        ref: str

    attributes: Attributes

class StoreProductCommand(Command):
    def __init__(self, ref: str):
        self.ref = ref

class StoreProductCommandHandler(CommandHandler):
    def subscribed_to(self) -> Type[Command]:
        return StoreProductCommand

    async def handle(self, command: StoreProductCommand) -> None:
        _products.append(command.ref)

class ProductNotFoundError(NotFoundError):
    _code = 'product_not_found'
    _title = 'Product not found'

class FindProductQuery(Query):
    def __init__(self, ref: str):
        self.ref = ref

class FindProductQueryHandler(QueryHandler):
    def subscribed_to(self) -> Type[Query]:
        return FindProductQuery

    async def handle(self, query: FindProductQuery) -> OptionalResponse:
        if query.ref != '123':
            raise ProductNotFoundError.create(detail={'ref': query.ref})
        return {'ref': query.ref}

async def main() -> None:
    commands_bus = SimpleCommandBus([StoreProductCommandHandler()])
    await commands_bus.dispatch(StoreProductCommand('123'))
    query_bus = SimpleQueryBus([FindProductQueryHandler()])
    response = await query_bus.ask(FindProductQuery('123'))
    print(response)


if __name__ == '__main__':
    get_event_loop().run_until_complete(main())

Requirements

  • Python >= 3.7

Contributing

Pull requests are welcome. For major changes, please open an issue first to discuss what you would like to change.

Please make sure to update tests as appropriate.

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

aioddd-1.3.7.tar.gz (10.9 kB view details)

Uploaded Source

Built Distribution

aioddd-1.3.7-py3-none-any.whl (10.9 kB view details)

Uploaded Python 3

File details

Details for the file aioddd-1.3.7.tar.gz.

File metadata

  • Download URL: aioddd-1.3.7.tar.gz
  • Upload date:
  • Size: 10.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.1 CPython/3.10.6

File hashes

Hashes for aioddd-1.3.7.tar.gz
Algorithm Hash digest
SHA256 451ef829be083edffb127dcef3bac10839f71bd7a594d764d9eb1b6abb7d4eca
MD5 748b827beafea2a05a55122cb1fd5152
BLAKE2b-256 879ed270ae9e06b0228c1cc0334f21e6f879102a2edefa1fc335686d81a6de57

See more details on using hashes here.

Provenance

File details

Details for the file aioddd-1.3.7-py3-none-any.whl.

File metadata

  • Download URL: aioddd-1.3.7-py3-none-any.whl
  • Upload date:
  • Size: 10.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.1 CPython/3.10.6

File hashes

Hashes for aioddd-1.3.7-py3-none-any.whl
Algorithm Hash digest
SHA256 600025175a693d4a5a6ac9d6c1c32488dd496cf8c1ec64217b481e2a84819fd1
MD5 dcc72d4644942a239fa342672087931c
BLAKE2b-256 b2de0b1b0f8b38da95a7791e765a344f87a22453a667c46dc0afe4ab63e0f920

See more details on using hashes here.

Provenance

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page