Skip to main content

Integration package for Dishka DI and FastStream framework

Project description

FastStream integration for Dishka

Downloads Package version Supported Python versions License FastStream

Though it is not required, you can use dishka-faststream integration. It features:

  • automatic REQUEST scope management using middleware
  • passing StreamMessage and ContextRepo object as a context data to providers
  • automatic injection of dependencies into message handler.

You can use auto-injection for FastStream 0.5.0 and higher. For older version you need to specify @inject manually.

Note

If you are using FastAPI plugin of FastStream you need to use both dishka integrations, but you can share the same container.

  • Call dishka_faststream.setup_dishka on faststream broker or router.
  • Call dishka.integrations.fastapi.setup_dishka on fastapi app.

Installation

Install using pip

pip install dishka-faststream

Or with uv

uv add dishka-faststream

How to use

  1. Import
from dishka_faststream import (
    FromDishka,
    inject,
    setup_dishka,
    FastStreamProvider,
)
from dishka import make_async_container, Provider, provide, Scope
  1. Create provider. You can use faststream.types.StreamMessage and faststream.ContextRepo as a factory parameter to access on REQUEST-scope
class YourProvider(Provider):
    @provide(scope=Scope.REQUEST)
    def create_x(self, event: StreamMessage) -> X:
         ...
  1. Mark those of your handlers parameters which are to be injected with FromDishka[]
@broker.subscriber("test")
async def start(
    gateway: FromDishka[Gateway],
):
    ...

3a. (optional) decorate them using @inject if you are not using auto-injection

@broker.subscriber("test")
@inject
async def start(
    gateway: FromDishka[Gateway],
):
    ...
  1. (optional) Use FastStreamProvider() when creating container if you are going to use faststream.types.StreamMessage or faststream.ContextRepo in providers
container = make_async_container(YourProvider(), FastStreamProvider())
  1. Setup dishka integration. auto_inject=True is required unless you explicitly use @inject decorator
setup_dishka(container=container, app=app, auto_inject=True)

Or pass your own inject decorator

setup_dishka(container=container, broker=broker, auto_inject=my_inject)

FastStream - Litestar/FastAPI - dishka integration

  1. Running RabbitMQ
docker run -d --name rabbitmq \
  -p 5672:5672 -p 15672:15672 \
  -e RABBITMQ_DEFAULT_USER=guest \
  -e RABBITMQ_DEFAULT_PASS=guest \
  rabbitmq:management
  1. Example of usage FastStream + Litestar
import uvicorn
from dishka import Provider, Scope, provide
from dishka import make_async_container
import dishka_faststream as faststream_integration
from dishka.integrations import litestar as litestar_integration
from dishka.integrations.base import FromDishka
from dishka_faststream import inject as faststream_inject
from dishka.integrations.litestar import inject as litestar_inject
from faststream.rabbit import RabbitBroker, RabbitRouter
from litestar import Litestar, route, HttpMethod


class SomeDependency:
    async def do_something(self) -> int:
        print("Hello world")
        return 42


class SomeProvider(Provider):
    @provide(scope=Scope.REQUEST)
    def some_dependency(self) -> SomeDependency:
        return SomeDependency()


@route(http_method=HttpMethod.GET, path="/", status_code=200)
@litestar_inject
async def http_handler(some_dependency: FromDishka[SomeDependency]) -> None:
    await some_dependency.do_something()


amqp_router = RabbitRouter()


@amqp_router.subscriber("test-queue")
@faststream_inject
async def amqp_handler(some_dependency: FromDishka[SomeDependency]) -> None:
    await some_dependency.do_something()


def create_app() -> Litestar:
    container = make_async_container(SomeProvider())

    broker = RabbitBroker(url="amqp://guest:guest@localhost:5672/")
    broker.include_router(amqp_router)
    faststream_integration.setup_dishka(container, broker=broker)

    http = Litestar(
        route_handlers=[http_handler],
        on_startup=[broker.start],
        on_shutdown=[broker.stop],
    )
    litestar_integration.setup_dishka(container, http)
    return http


if __name__ == "__main__":
    uvicorn.run(create_app(), host="0.0.0.0", port=8000)

Example of usage FastStream + FastAPI

from collections.abc import AsyncIterator
from contextlib import asynccontextmanager

import uvicorn
from fastapi import APIRouter, FastAPI
from faststream.rabbit import RabbitBroker, RabbitRouter
from dishka import Provider, Scope, make_async_container, provide
from dishka.integrations import fastapi as fastapi_integration
import dishka_faststream as faststream_integration
from dishka.integrations.base import FromDishka
from dishka.integrations.fastapi import DishkaRoute
from dishka_faststream import inject as faststream_inject


class SomeDependency:
    async def do_something(self) -> int:
        print("Hello world")
        return 42


class SomeProvider(Provider):
    @provide(scope=Scope.REQUEST)
    def some_dependency(self) -> SomeDependency:
        return SomeDependency()


router = APIRouter(route_class=DishkaRoute)


@router.get("/")
async def http_handler(some_dependency: FromDishka[SomeDependency]) -> None:
    await some_dependency.do_something()


amqp_router = RabbitRouter()


@amqp_router.subscriber("test-queue")
@faststream_inject
async def amqp_handler(some_dependency: FromDishka[SomeDependency]) -> None:
    await some_dependency.do_something()


def create_app() -> FastAPI:
    container = make_async_container(SomeProvider())

    broker = RabbitBroker(url="amqp://guest:guest@localhost:5672/")
    broker.include_router(amqp_router)
    faststream_integration.setup_dishka(container, broker=broker)

    @asynccontextmanager
    async def lifespan(app: FastAPI) -> AsyncIterator[None]:
        async with broker:
            await broker.start()
            yield

    http = FastAPI(lifespan=lifespan)
    http.include_router(router)
    fastapi_integration.setup_dishka(container, http)
    return http


if __name__ == "__main__":
    uvicorn.run(create_app(), host="0.0.0.0", port=8000)

Testing FastStream with dishka

Simple example:

from collections.abc import AsyncIterator

import pytest
from dishka import AsyncContainer, make_async_container
from dishka import Provider, Scope, provide
import dishka_faststream as faststream_integration
from dishka.integrations.base import FromDishka as Depends
from faststream import FastStream, TestApp
from faststream.rabbit import RabbitBroker, TestRabbitBroker, RabbitRouter

router = RabbitRouter()


@router.subscriber("test-queue")
async def handler(msg: str, some_dependency: Depends[int]) -> int:
    print(f"{msg=}")
    return some_dependency


@pytest.fixture
async def broker() -> RabbitBroker:
    broker = RabbitBroker()
    broker.include_router(router)
    return broker


@pytest.fixture
def mock_provider() -> Provider:
    class MockProvider(Provider):
        @provide(scope=Scope.REQUEST)
        async def get_some_dependency(self) -> int:
            return 42

    return MockProvider()


@pytest.fixture
def container(mock_provider: Provider) -> AsyncContainer:
    return make_async_container(mock_provider)


@pytest.fixture
async def app(broker: RabbitBroker, container: AsyncContainer) -> FastStream:
    app = FastStream(broker)
    faststream_integration.setup_dishka(container, app, auto_inject=True)
    return FastStream(broker)


@pytest.fixture
async def client(app: FastStream) -> AsyncIterator[RabbitBroker]:
    async with TestRabbitBroker(app.broker) as br, TestApp(app):
        yield br


@pytest.mark.asyncio
async def test_handler(client: RabbitBroker) -> None:
    result = await client.request("hello", "test-queue")
    assert await result.decode() == 42

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

dishka_faststream-0.6.0.tar.gz (99.6 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

dishka_faststream-0.6.0-py3-none-any.whl (10.6 kB view details)

Uploaded Python 3

File details

Details for the file dishka_faststream-0.6.0.tar.gz.

File metadata

  • Download URL: dishka_faststream-0.6.0.tar.gz
  • Upload date:
  • Size: 99.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: uv/0.9.7

File hashes

Hashes for dishka_faststream-0.6.0.tar.gz
Algorithm Hash digest
SHA256 8f9c7592b9b2b6c2b140db0c14be892d63cb8a1fdd98f18ee6551db938b91953
MD5 be24ee31300234d02fde83e389e0c2cb
BLAKE2b-256 41787f8611d1175a456925a4fee1c88260c83cbce4fc0c18cb84e41eb99e0010

See more details on using hashes here.

File details

Details for the file dishka_faststream-0.6.0-py3-none-any.whl.

File metadata

File hashes

Hashes for dishka_faststream-0.6.0-py3-none-any.whl
Algorithm Hash digest
SHA256 0b8b95e31f2cfab2c592d0e964b9a664ddd37e1c39faff01ea3392a71d4bcb9e
MD5 7f37fc349c46eb5ad00e4f9724446f37
BLAKE2b-256 1a3ecc7754dab1c30d1c13654a3b7f2b9f705fd8a5371e6a62a8903a43247a29

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page