Async Python DDD utilities library.

aioddd is an async Python library of Domain-Driven Design (DDD) utilities, such as the command and query buses shown under Usage.

Installation

Use the package manager pip to install aioddd.

pip install aioddd

Documentation

Usage

import asyncio
from dataclasses import dataclass
from typing import List, Type

from aioddd import (
    Command,
    CommandHandler,
    Event,
    NotFoundError,
    OptionalResponse,
    Query,
    QueryHandler,
    SimpleCommandBus,
    SimpleQueryBus,
)

# In-memory stand-in for a real product repository.
_products: List[str] = []

# Domain event definition; declared for illustration, not published in this example.
class ProductStored(Event):
    @dataclass
    class Attributes:
        ref: str

    attributes: Attributes

# Command: a request to change state (store a product reference).
class StoreProductCommand(Command):
    def __init__(self, ref: str):
        self.ref = ref

class StoreProductCommandHandler(CommandHandler):
    def subscribed_to(self) -> Type[Command]:
        # Tells the bus which command type this handler accepts.
        return StoreProductCommand

    async def handle(self, command: StoreProductCommand) -> None:
        _products.append(command.ref)

class ProductNotFoundError(NotFoundError):
    _code = 'product_not_found'
    _title = 'Product not found'

# Query: a read-only request that returns a response.
class FindProductQuery(Query):
    def __init__(self, ref: str):
        self.ref = ref

class FindProductQueryHandler(QueryHandler):
    def subscribed_to(self) -> Type[Query]:
        return FindProductQuery

    async def handle(self, query: FindProductQuery) -> OptionalResponse:
        # Only the hard-coded reference '123' is treated as existing.
        if query.ref != '123':
            raise ProductNotFoundError.create(detail={'ref': query.ref})
        return {'ref': query.ref}

async def main() -> None:
    commands_bus = SimpleCommandBus([StoreProductCommandHandler()])
    await commands_bus.dispatch(StoreProductCommand('123'))
    query_bus = SimpleQueryBus([FindProductQueryHandler()])
    response = await query_bus.ask(FindProductQuery('123'))
    print(response)


if __name__ == '__main__':
    # asyncio.run() creates and closes the event loop; available since Python 3.7.
    asyncio.run(main())
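
The example above relies on each handler naming, through subscribed_to(), the message type it accepts. As a rough illustration of that routing idea only, here is a minimal sketch that does not import aioddd. Every name in it (TinyCommandBus, StoreProduct, StoreProductHandler, demo, and the stand-in Command and CommandHandler base classes) is hypothetical, and it makes no claim about how aioddd implements SimpleCommandBus internally.

```python
import asyncio
from typing import Dict, List, Type


class Command:
    """Hypothetical stand-in for a command base class (not aioddd's)."""


class CommandHandler:
    """Hypothetical stand-in: a handler declares the command type it accepts."""

    def subscribed_to(self) -> Type[Command]:
        raise NotImplementedError

    async def handle(self, command: Command) -> None:
        raise NotImplementedError


class TinyCommandBus:
    """Illustrative bus: routes a command to the handler registered for its type."""

    def __init__(self, handlers: List[CommandHandler]) -> None:
        # Build a lookup table keyed by the command class each handler subscribes to.
        self._handlers: Dict[Type[Command], CommandHandler] = {
            handler.subscribed_to(): handler for handler in handlers
        }

    async def dispatch(self, command: Command) -> None:
        handler = self._handlers.get(type(command))
        if handler is None:
            raise LookupError(f"no handler registered for {type(command).__name__}")
        await handler.handle(command)


class StoreProduct(Command):
    def __init__(self, ref: str) -> None:
        self.ref = ref


class StoreProductHandler(CommandHandler):
    def __init__(self, store: List[str]) -> None:
        # The list is injected so the handler has no module-level state.
        self._store = store

    def subscribed_to(self) -> Type[Command]:
        return StoreProduct

    async def handle(self, command: StoreProduct) -> None:
        self._store.append(command.ref)


async def demo() -> List[str]:
    store: List[str] = []
    bus = TinyCommandBus([StoreProductHandler(store)])
    await bus.dispatch(StoreProduct("123"))
    return store


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Keying the lookup on the exact command class keeps dispatch a single dictionary access; a command with no registered handler fails loudly with LookupError instead of being dropped silently.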

Requirements

  • Python >= 3.7

Contributing

Pull requests are welcome. For major changes, please open an issue first to discuss what you would like to change.

Please make sure to update tests as appropriate.

License

MIT

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

aioddd-1.3.8.tar.gz (12.3 kB)

Uploaded Source

Built Distribution

aioddd-1.3.8-py3-none-any.whl (12.4 kB)

Uploaded Python 3

File details

Details for the file aioddd-1.3.8.tar.gz.

File metadata

  • Download URL: aioddd-1.3.8.tar.gz
  • Upload date:
  • Size: 12.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.1 CPython/3.10.7

File hashes

Hashes for aioddd-1.3.8.tar.gz
Algorithm    Hash digest
SHA256       3556c694868b22d4db5ea36eb7c07e6b44a1ed5b2e6b87fd6fdb265c75f20a8e
MD5          2a52f499c6841658ae342cad7c92a4a5
BLAKE2b-256  fc8eeee68ad9f8d0e53eaab70b7efc207fb8370fa42b34916f242732438b1ec3
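
A downloaded file can be checked against the SHA256 digest listed for it. The following is a minimal sketch using only the standard library's hashlib; the helper names sha256_of and matches are hypothetical, and the path passed to them is assumed to be wherever the file was saved locally.

```python
import hashlib
from pathlib import Path


def sha256_of(path: Path, chunk_size: int = 65536) -> str:
    """Return the hex SHA256 digest of a file, reading it in chunks."""
    digest = hashlib.sha256()
    with path.open("rb") as fh:
        # iter() with a sentinel stops once read() returns an empty bytes object.
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def matches(path: Path, expected_hex: str) -> bool:
    """Compare a file's digest with a published hex digest, ignoring case and spaces."""
    return sha256_of(path) == expected_hex.strip().lower()
```

For example, matches(Path("aioddd-1.3.8.tar.gz"), "3556c694868b22d4db5ea36eb7c07e6b44a1ed5b2e6b87fd6fdb265c75f20a8e") should return True for an intact copy of the source distribution saved in the current directory.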

File details

Details for the file aioddd-1.3.8-py3-none-any.whl.

File metadata

  • Download URL: aioddd-1.3.8-py3-none-any.whl
  • Upload date:
  • Size: 12.4 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.1 CPython/3.10.7

File hashes

Hashes for aioddd-1.3.8-py3-none-any.whl
Algorithm    Hash digest
SHA256       ecd0219f71976c7c715713a0019cfe4185b3785a222acb464b997cd7c8dfd7c8
MD5          431ad70d341ab65b9a7332e2679cf19b
BLAKE2b-256  c230853ebbe70949e6a53e2c6d6214771918b855653177f3890a9f38f041b13d
