Skip to main content

Набор компонентов реализующих DQL для систем на базе explicit

Project description

Набор компонентов реализующих очередь необработанных сообщений (Dead Letter Queue).

Пример подключения

testapp/dlq/apps.py:

from django.apps import AppConfig as AppConfigBase


class AppConfig(AppConfigBase):
    name = __package__

    def ready(self):
        self._configure_dlq()

    def _configure_dlq(self):
        from testapp.core import bus
        from explicit.dlq.config import DLQConfig, configure_dlq
        # реализация репозитория специфичная для используемого слоя хранения данных
        from testapp.dlq.adapters.db import Repository
        
        # реализация обработчика сообщений, специфичного для приложения
        from testapp.dlq.services.dispatch import dispatch_message

        config = DLQConfig(
            bus=bus,
            repository=Repository(),
            message_dispatcher=dispatch_message
        )
        # регистрация репозитория и регистрация обработчиков команд
        configure_dlq(config)

testapp/dlq/services/dispatch.py:

import json
from explicit.contrib.messagebus.event_registry import Message

from testapp.core import bus
from testapp.core import event_registry

def dispatch_message(raw_message_value: bytes, topic: str):
    """Преобразует сообщение в событие и отправляет его в шину."""
    message = json.loads(raw_message_value)

    if event := event_registry.resolve(
        Message(
            topic=topic,
            type=message.get('type'),
            body=message
        )
    ):
        bus.handle(event)

testapp/entrypoints/kafka.py:

def bootstrap() -> None:
    from explicit.dlq.infrastructure.handlers import RegisterInDLQOnFailure

    from testapp.core import bus

    from testapp.dlq.services.dispatch import dispatch_message
    topics = ('test.foo.topic',)

    for raw_message in adapter.subscribe(*topics):
        with RegisterInDLQOnFailure(
            bus=bus,
            topic=raw_message.topic(),
            raw_message_value=raw_message.value(),
            raw_message_key=raw_message.key
        ):
            dispatch_message(raw_message.value(), raw_message.topic())

Готовые компоненты (contrib)

В пакете реализованы готовые к использованию компоненты:

  • Реализация абстрактного хранилища сообщений на базе Django ORM
  • REST API для работы с хранилищем сообщений. Предназначен для совместной работы с реализацией хранилища сообщений django ORM.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distributions

No source distribution files available for this release.See tutorial on generating distribution archives.

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

explicit_python_dlq-1.1.0-py3-none-any.whl (18.8 kB view details)

Uploaded Python 3

File details

Details for the file explicit_python_dlq-1.1.0-py3-none-any.whl.

File metadata

File hashes

Hashes for explicit_python_dlq-1.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 0817eb04d98756e10dc2875ae2d706f6a9b368aec45ff90b18848ee230500631
MD5 3baba6dbe529f46b0753b77773ffcd33
BLAKE2b-256 56e256723222f0f91034c9e8f81aca5fd1b3a66a58a7a5dd2e11cf5172b62986

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page