Async Python DDD utilities library.

aioddd is an async Python DDD utilities library.

Installation

Use the package manager pip to install aioddd.

pip install aioddd

Usage

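from asyncio import get_event_loop
# dataclasses is in the standard library from Python 3.7;
# on Python 3.6 install the dataclasses backport from PyPI.
from dataclasses import dataclass
from typing import Type

from aioddd import (
    Command,
    CommandHandler,
    Event,
    NotFoundError,
    OptionalResponse,
    Query,
    QueryHandler,
    SimpleCommandBus,
    SimpleQueryBus,
)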

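# In-memory store that the command handler appends to.
_products = []

# Example domain event; defined for illustration and not published below.
class ProductStored(Event):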
    @dataclass
    class Attributes:
        ref: str

    attributes: Attributes

class StoreProductCommand(Command):
    def __init__(self, ref: str):
        self.ref = ref

class StoreProductCommandHandler(CommandHandler):
    def subscribed_to(self) -> Type[Command]:
        return StoreProductCommand

    async def handle(self, command: StoreProductCommand) -> None:
        _products.append(command.ref)

class ProductNotFoundError(NotFoundError):
    _code = 'product_not_found'
    _title = 'Product not found'

class FindProductQuery(Query):
    def __init__(self, ref: str):
        self.ref = ref

class FindProductQueryHandler(QueryHandler):
    def subscribed_to(self) -> Type[Query]:
        return FindProductQuery

    async def handle(self, query: FindProductQuery) -> OptionalResponse:
        # Demo lookup: only the hard-coded ref '123' is treated as existing.
        if query.ref != '123':
            raise ProductNotFoundError.create(detail={'ref': query.ref})
        return {'ref': query.ref}

async def main() -> None:
    commands_bus = SimpleCommandBus([StoreProductCommandHandler()])
    await commands_bus.dispatch(StoreProductCommand('123'))
    query_bus = SimpleQueryBus([FindProductQueryHandler()])
    response = await query_bus.ask(FindProductQuery('123'))
    print(response)


if __name__ == '__main__':
    get_event_loop().run_until_complete(main())
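
The example above relies on each handler declaring, through subscribed_to(), which command type it handles, so that the bus can route a dispatched command to it. The following is a minimal standard-library sketch of that routing idea only. It is not aioddd's implementation: Command, CommandHandler and TinyCommandBus here are hypothetical stand-ins written for illustration, and the real classes may behave differently (for example in how missing handlers are reported).

```python
import asyncio
from typing import Dict, List, Type


class Command:
    """Marker base class for commands (stand-in, not aioddd's Command)."""


class CommandHandler:
    """Stand-in handler interface mirroring the two methods used above."""

    def subscribed_to(self) -> Type[Command]:
        raise NotImplementedError

    async def handle(self, command: Command) -> None:
        raise NotImplementedError


class TinyCommandBus:
    """Hypothetical bus: maps each command type to its subscribed handler."""

    def __init__(self, handlers: List[CommandHandler]) -> None:
        self._handlers: Dict[Type[Command], CommandHandler] = {
            handler.subscribed_to(): handler for handler in handlers
        }

    async def dispatch(self, command: Command) -> None:
        # Look the handler up by the concrete type of the command.
        handler = self._handlers.get(type(command))
        if handler is None:
            raise LookupError(f'no handler for {type(command).__name__}')
        await handler.handle(command)


class StoreProductCommand(Command):
    def __init__(self, ref: str) -> None:
        self.ref = ref


class StoreProductCommandHandler(CommandHandler):
    def __init__(self, products: List[str]) -> None:
        # The store is injected instead of being a module-level list.
        self._products = products

    def subscribed_to(self) -> Type[Command]:
        return StoreProductCommand

    async def handle(self, command: StoreProductCommand) -> None:
        self._products.append(command.ref)


async def demo() -> List[str]:
    products: List[str] = []
    bus = TinyCommandBus([StoreProductCommandHandler(products)])
    await bus.dispatch(StoreProductCommand('123'))
    return products


if __name__ == '__main__':
    # asyncio.run requires Python 3.7 or later.
    print(asyncio.run(demo()))
```

Passing the product list into the handler, rather than sharing a module-level variable, keeps the sketch easy to test in isolation; the same choice is available when writing handlers for the real buses.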

Requirements

  • Python >= 3.6
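
The usage example starts the event loop with get_event_loop().run_until_complete(), which works on Python 3.6. From Python 3.7 onward asyncio.run() is the usual entry point, and recent Python releases deprecate relying on get_event_loop() to create a loop implicitly. A minimal sketch of an entry point that covers both cases follows; run_main is a hypothetical helper name and is not part of aioddd.

```python
import asyncio
import sys
from typing import Any, Callable, Coroutine


def run_main(main: Callable[[], Coroutine[Any, Any, Any]]) -> Any:
    """Run an async entry point on Python 3.6 and later (hypothetical helper)."""
    if sys.version_info >= (3, 7):
        # asyncio.run creates a fresh loop, runs the coroutine, then closes the loop.
        return asyncio.run(main())
    # Python 3.6 fallback: create and close the loop explicitly.
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(main())
    finally:
        loop.close()


async def main() -> str:
    # Placeholder coroutine standing in for the main() of the usage example.
    await asyncio.sleep(0)
    return 'done'
```

With this helper, the last line of the usage example could be written as run_main(main).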

Contributing

Pull requests are welcome. For major changes, please open an issue first to discuss what you would like to change.

Please make sure to update tests as appropriate.

License

MIT

Download files

Source Distribution

aioddd-1.2.1.tar.gz (11.7 kB)

Uploaded Source

Built Distribution

aioddd-1.2.1-py3-none-any.whl (16.4 kB)

Uploaded Python 3

File details

Details for the file aioddd-1.2.1.tar.gz.

File metadata

  • Download URL: aioddd-1.2.1.tar.gz
  • Size: 11.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.2.0 pkginfo/1.6.1 requests/2.25.0 setuptools/50.3.2 requests-toolbelt/0.9.1 tqdm/4.51.0 CPython/3.6.12

File hashes

Hashes for aioddd-1.2.1.tar.gz
Algorithm Hash digest
SHA256 e22b853efe77b849d49c32242105cbd0c41ec1eb10d107010106b02a9afe4eb3
MD5 914a9377c458768f95a645b0d6035fe5
BLAKE2b-256 123f0714b940e885afd92296d60c9ded7ac4c0472fc9d6195ad695f631371be2

File details

Details for the file aioddd-1.2.1-py3-none-any.whl.

File metadata

  • Download URL: aioddd-1.2.1-py3-none-any.whl
  • Size: 16.4 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.2.0 pkginfo/1.6.1 requests/2.25.0 setuptools/50.3.2 requests-toolbelt/0.9.1 tqdm/4.51.0 CPython/3.6.12

File hashes

Hashes for aioddd-1.2.1-py3-none-any.whl
Algorithm Hash digest
SHA256 5582f5d4c0cd2a33ad6bb82483fcacec77924d850db4a7a2200149a8d453d554
MD5 6887d9274013fbeb9c193a265120f2ed
BLAKE2b-256 d58555f115f28df997268a2390be1e4d125899356f60b9bcc22492b289b8ead2
