Skip to main content

Integration package for Dishka DI and FastStream framework

Project description

FastStream

Though it is not required, you can use dishka-faststream integration. It features:

  • automatic REQUEST scope management using middleware
  • passing StreamMessage and ContextRepo object as a context data to providers
  • automatic injection of dependencies into message handler.

You can use auto-injection for FastStream 0.5.0 and higher. For older version you need to specify @inject manually.

Note

If you are using FastAPI plugin of FastStream you need to use both dishka integrations, but you can share the same container.

  • Call dishka.integrations.faststream.setup_dishka on faststream broker or router.
  • Call dishka.integrations.fastapi.setup_dishka on fastapi app.

How to use

  1. Import
from dishka.integrations.faststream import (
    FromDishka,
    inject,
    setup_dishka,
    FastStreamProvider,
)
from dishka import make_async_container, Provider, provide, Scope
  1. Create provider. You can use faststream.types.StreamMessage and faststream.ContextRepo as a factory parameter to access on REQUEST-scope
class YourProvider(Provider):
    @provide(scope=Scope.REQUEST)
    def create_x(self, event: StreamMessage) -> X:
         ...
  1. Mark those of your handlers parameters which are to be injected with FromDishka[]
@broker.subscriber("test")
async def start(
    gateway: FromDishka[Gateway],
):
    ...

3a. (optional) decorate them using @inject if you are not using auto-injection

@broker.subscriber("test")
@inject
async def start(
    gateway: FromDishka[Gateway],
):
    ...
  1. (optional) Use FastStreamProvider() when creating container if you are going to use faststream.types.StreamMessage or faststream.ContextRepo in providers
container = make_async_container(YourProvider(), FastStreamProvider())
  1. Setup dishka integration. auto_inject=True is required unless you explicitly use @inject decorator
setup_dishka(container=container, app=app, auto_inject=True)

Or pass your own inject decorator

setup_dishka(container=container, broker=broker, auto_inject=my_inject)

FastStream - Litestar/FastAPI - dishka integration

  1. Running RabbitMQ
docker run -d --name rabbitmq \
  -p 5672:5672 -p 15672:15672 \
  -e RABBITMQ_DEFAULT_USER=guest \
  -e RABBITMQ_DEFAULT_PASS=guest \
  rabbitmq:management
  1. Example of usage FastStream + Litestar
import uvicorn
from dishka import Provider, Scope, provide
from dishka import make_async_container
from dishka.integrations import faststream as faststream_integration
from dishka.integrations import litestar as litestar_integration
from dishka.integrations.base import FromDishka
from dishka.integrations.faststream import inject as faststream_inject
from dishka.integrations.litestar import inject as litestar_inject
from faststream.rabbit import RabbitBroker, RabbitRouter
from litestar import Litestar, route, HttpMethod


class SomeDependency:
    async def do_something(self) -> int:
        print("Hello world")
        return 42


class SomeProvider(Provider):
    @provide(scope=Scope.REQUEST)
    def some_dependency(self) -> SomeDependency:
        return SomeDependency()


@route(http_method=HttpMethod.GET, path="/", status_code=200)
@litestar_inject
async def http_handler(some_dependency: FromDishka[SomeDependency]) -> None:
    await some_dependency.do_something()


amqp_router = RabbitRouter()


@amqp_router.subscriber("test-queue")
@faststream_inject
async def amqp_handler(some_dependency: FromDishka[SomeDependency]) -> None:
    await some_dependency.do_something()


def create_app() -> Litestar:
    container = make_async_container(SomeProvider())

    broker = RabbitBroker(url="amqp://guest:guest@localhost:5672/")
    broker.include_router(amqp_router)
    faststream_integration.setup_dishka(container, broker=broker)

    http = Litestar(
        route_handlers=[http_handler],
        on_startup=[broker.start],
        on_shutdown=[broker.stop],
    )
    litestar_integration.setup_dishka(container, http)
    return http


if __name__ == "__main__":
    uvicorn.run(create_app(), host="0.0.0.0", port=8000)

Example of usage FastStream + FastAPI

from collections.abc import AsyncIterator
from contextlib import asynccontextmanager

import uvicorn
from fastapi import APIRouter, FastAPI
from faststream.rabbit import RabbitBroker, RabbitRouter
from dishka import Provider, Scope, make_async_container, provide
from dishka.integrations import fastapi as fastapi_integration
from dishka.integrations import faststream as faststream_integration
from dishka.integrations.base import FromDishka
from dishka.integrations.fastapi import DishkaRoute
from dishka.integrations.faststream import inject as faststream_inject


class SomeDependency:
    async def do_something(self) -> int:
        print("Hello world")
        return 42


class SomeProvider(Provider):
    @provide(scope=Scope.REQUEST)
    def some_dependency(self) -> SomeDependency:
        return SomeDependency()


router = APIRouter(route_class=DishkaRoute)


@router.get("/")
async def http_handler(some_dependency: FromDishka[SomeDependency]) -> None:
    await some_dependency.do_something()


amqp_router = RabbitRouter()


@amqp_router.subscriber("test-queue")
@faststream_inject
async def amqp_handler(some_dependency: FromDishka[SomeDependency]) -> None:
    await some_dependency.do_something()


def create_app() -> FastAPI:
    container = make_async_container(SomeProvider())

    broker = RabbitBroker(url="amqp://guest:guest@localhost:5672/")
    broker.include_router(amqp_router)
    faststream_integration.setup_dishka(container, broker=broker)

    @asynccontextmanager
    async def lifespan(app: FastAPI) -> AsyncIterator[None]:
        async with broker:
            await broker.start()
            yield

    http = FastAPI(lifespan=lifespan)
    http.include_router(router)
    fastapi_integration.setup_dishka(container, http)
    return http


if __name__ == "__main__":
    uvicorn.run(create_app(), host="0.0.0.0", port=8000)

Testing FastStream with dishka

Simple example:

from collections.abc import AsyncIterator

import pytest
from dishka import AsyncContainer, make_async_container
from dishka import Provider, Scope, provide
from dishka.integrations import faststream as faststream_integration
from dishka.integrations.base import FromDishka as Depends
from faststream import FastStream, TestApp
from faststream.rabbit import RabbitBroker, TestRabbitBroker, RabbitRouter

router = RabbitRouter()


@router.subscriber("test-queue")
async def handler(msg: str, some_dependency: Depends[int]) -> int:
    print(f"{msg=}")
    return some_dependency


@pytest.fixture
async def broker() -> RabbitBroker:
    broker = RabbitBroker()
    broker.include_router(router)
    return broker


@pytest.fixture
def mock_provider() -> Provider:
    class MockProvider(Provider):
        @provide(scope=Scope.REQUEST)
        async def get_some_dependency(self) -> int:
            return 42

    return MockProvider()


@pytest.fixture
def container(mock_provider: Provider) -> AsyncContainer:
    return make_async_container(mock_provider)


@pytest.fixture
async def app(broker: RabbitBroker, container: AsyncContainer) -> FastStream:
    app = FastStream(broker)
    faststream_integration.setup_dishka(container, app, auto_inject=True)
    return FastStream(broker)


@pytest.fixture
async def client(app: FastStream) -> AsyncIterator[RabbitBroker]:
    async with TestRabbitBroker(app.broker) as br, TestApp(app):
        yield br


@pytest.mark.asyncio
async def test_handler(client: RabbitBroker) -> None:
    result = await client.request("hello", "test-queue")
    assert await result.decode() == 42

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

dishka_faststream-0.0.2.tar.gz (3.3 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

dishka_faststream-0.0.2-py3-none-any.whl (3.4 kB view details)

Uploaded Python 3

File details

Details for the file dishka_faststream-0.0.2.tar.gz.

File metadata

  • Download URL: dishka_faststream-0.0.2.tar.gz
  • Upload date:
  • Size: 3.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for dishka_faststream-0.0.2.tar.gz
Algorithm Hash digest
SHA256 bed0b8366b8c7bdf341410f01d6ec24449dce837ebe18b5b35c166fb7cc8d27a
MD5 08fe73593ba393506a784fce29959874
BLAKE2b-256 e2a54b7a78f7a623c2bc03c264cb39afdd947bc2071e56d1db439df1c29c1315

See more details on using hashes here.

Provenance

The following attestation bundles were made for dishka_faststream-0.0.2.tar.gz:

Publisher: release_pypi.yaml on faststream-community/dishka-faststream

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file dishka_faststream-0.0.2-py3-none-any.whl.

File metadata

File hashes

Hashes for dishka_faststream-0.0.2-py3-none-any.whl
Algorithm Hash digest
SHA256 ac90a69b6837b0b83ca4f38dcf8749c0a33ed6f2aac8703ea2edaa0529b17729
MD5 1502866837ee36ad4f7a6c6ad423942e
BLAKE2b-256 55690294e4e93c931b86de9115245c8482561faee2fee383418841b073b959a2

See more details on using hashes here.

Provenance

The following attestation bundles were made for dishka_faststream-0.0.2-py3-none-any.whl:

Publisher: release_pypi.yaml on faststream-community/dishka-faststream

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page