Skip to main content

Integration package for Dishka DI and FastStream framework

Project description

FastStream integration for Dishka

Downloads Package version Supported Python versions License FastStream

Though it is not required, you can use dishka-faststream integration. It features:

  • automatic REQUEST scope management using middleware
  • passing StreamMessage and ContextRepo object as a context data to providers
  • automatic injection of dependencies into message handler.

You can use auto-injection for FastStream 0.5.0 and higher. For older version you need to specify @inject manually.

Note

If you are using FastAPI plugin of FastStream you need to use both dishka integrations, but you can share the same container.

  • Call dishka_faststream.setup_dishka on faststream broker or router.
  • Call dishka.integrations.fastapi.setup_dishka on fastapi app.

Installation

Install using pip

pip install dishka-faststream

Or with uv

uv add dishka-faststream

How to use

  1. Import
from dishka_faststream import (
    FromDishka,
    inject,
    setup_dishka,
    FastStreamProvider,
)
from dishka import make_async_container, Provider, provide, Scope
  1. Create provider. You can use faststream.types.StreamMessage and faststream.ContextRepo as a factory parameter to access on REQUEST-scope
class YourProvider(Provider):
    @provide(scope=Scope.REQUEST)
    def create_x(self, event: StreamMessage) -> X:
         ...
  1. Mark those of your handlers parameters which are to be injected with FromDishka[]
@broker.subscriber("test")
async def start(
    gateway: FromDishka[Gateway],
):
    ...

3a. (optional) decorate them using @inject if you are not using auto-injection

@broker.subscriber("test")
@inject
async def start(
    gateway: FromDishka[Gateway],
):
    ...
  1. (optional) Use FastStreamProvider() when creating container if you are going to use faststream.types.StreamMessage or faststream.ContextRepo in providers
container = make_async_container(YourProvider(), FastStreamProvider())
  1. Setup dishka integration. auto_inject=True is required unless you explicitly use @inject decorator
setup_dishka(container=container, app=app, auto_inject=True)

Or pass your own inject decorator

setup_dishka(container=container, broker=broker, auto_inject=my_inject)

FastStream - Litestar/FastAPI - dishka integration

  1. Running RabbitMQ
docker run -d --name rabbitmq \
  -p 5672:5672 -p 15672:15672 \
  -e RABBITMQ_DEFAULT_USER=guest \
  -e RABBITMQ_DEFAULT_PASS=guest \
  rabbitmq:management
  1. Example of usage FastStream + Litestar
import uvicorn
from dishka import Provider, Scope, provide
from dishka import make_async_container
import dishka_faststream as faststream_integration
from dishka.integrations import litestar as litestar_integration
from dishka.integrations.base import FromDishka
from dishka_faststream import inject as faststream_inject
from dishka.integrations.litestar import inject as litestar_inject
from faststream.rabbit import RabbitBroker, RabbitRouter
from litestar import Litestar, route, HttpMethod


class SomeDependency:
    async def do_something(self) -> int:
        print("Hello world")
        return 42


class SomeProvider(Provider):
    @provide(scope=Scope.REQUEST)
    def some_dependency(self) -> SomeDependency:
        return SomeDependency()


@route(http_method=HttpMethod.GET, path="/", status_code=200)
@litestar_inject
async def http_handler(some_dependency: FromDishka[SomeDependency]) -> None:
    await some_dependency.do_something()


amqp_router = RabbitRouter()


@amqp_router.subscriber("test-queue")
@faststream_inject
async def amqp_handler(some_dependency: FromDishka[SomeDependency]) -> None:
    await some_dependency.do_something()


def create_app() -> Litestar:
    container = make_async_container(SomeProvider())

    broker = RabbitBroker(url="amqp://guest:guest@localhost:5672/")
    broker.include_router(amqp_router)
    faststream_integration.setup_dishka(container, broker=broker)

    http = Litestar(
        route_handlers=[http_handler],
        on_startup=[broker.start],
        on_shutdown=[broker.stop],
    )
    litestar_integration.setup_dishka(container, http)
    return http


if __name__ == "__main__":
    uvicorn.run(create_app(), host="0.0.0.0", port=8000)

Example of usage FastStream + FastAPI

from collections.abc import AsyncIterator
from contextlib import asynccontextmanager

import uvicorn
from fastapi import APIRouter, FastAPI
from faststream.rabbit import RabbitBroker, RabbitRouter
from dishka import Provider, Scope, make_async_container, provide
from dishka.integrations import fastapi as fastapi_integration
import dishka_faststream as faststream_integration
from dishka.integrations.base import FromDishka
from dishka.integrations.fastapi import DishkaRoute
from dishka_faststream import inject as faststream_inject


class SomeDependency:
    async def do_something(self) -> int:
        print("Hello world")
        return 42


class SomeProvider(Provider):
    @provide(scope=Scope.REQUEST)
    def some_dependency(self) -> SomeDependency:
        return SomeDependency()


router = APIRouter(route_class=DishkaRoute)


@router.get("/")
async def http_handler(some_dependency: FromDishka[SomeDependency]) -> None:
    await some_dependency.do_something()


amqp_router = RabbitRouter()


@amqp_router.subscriber("test-queue")
@faststream_inject
async def amqp_handler(some_dependency: FromDishka[SomeDependency]) -> None:
    await some_dependency.do_something()


def create_app() -> FastAPI:
    container = make_async_container(SomeProvider())

    broker = RabbitBroker(url="amqp://guest:guest@localhost:5672/")
    broker.include_router(amqp_router)
    faststream_integration.setup_dishka(container, broker=broker)

    @asynccontextmanager
    async def lifespan(app: FastAPI) -> AsyncIterator[None]:
        async with broker:
            await broker.start()
            yield

    http = FastAPI(lifespan=lifespan)
    http.include_router(router)
    fastapi_integration.setup_dishka(container, http)
    return http


if __name__ == "__main__":
    uvicorn.run(create_app(), host="0.0.0.0", port=8000)

Testing FastStream with dishka

Simple example:

from collections.abc import AsyncIterator

import pytest
from dishka import AsyncContainer, make_async_container
from dishka import Provider, Scope, provide
import dishka_faststream as faststream_integration
from dishka.integrations.base import FromDishka as Depends
from faststream import FastStream, TestApp
from faststream.rabbit import RabbitBroker, TestRabbitBroker, RabbitRouter

router = RabbitRouter()


@router.subscriber("test-queue")
async def handler(msg: str, some_dependency: Depends[int]) -> int:
    print(f"{msg=}")
    return some_dependency


@pytest.fixture
async def broker() -> RabbitBroker:
    broker = RabbitBroker()
    broker.include_router(router)
    return broker


@pytest.fixture
def mock_provider() -> Provider:
    class MockProvider(Provider):
        @provide(scope=Scope.REQUEST)
        async def get_some_dependency(self) -> int:
            return 42

    return MockProvider()


@pytest.fixture
def container(mock_provider: Provider) -> AsyncContainer:
    return make_async_container(mock_provider)


@pytest.fixture
async def app(broker: RabbitBroker, container: AsyncContainer) -> FastStream:
    app = FastStream(broker)
    faststream_integration.setup_dishka(container, app, auto_inject=True)
    return FastStream(broker)


@pytest.fixture
async def client(app: FastStream) -> AsyncIterator[RabbitBroker]:
    async with TestRabbitBroker(app.broker) as br, TestApp(app):
        yield br


@pytest.mark.asyncio
async def test_handler(client: RabbitBroker) -> None:
    result = await client.request("hello", "test-queue")
    assert await result.decode() == 42

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

dishka_faststream-0.7.0.tar.gz (118.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

dishka_faststream-0.7.0-py3-none-any.whl (10.6 kB view details)

Uploaded Python 3

File details

Details for the file dishka_faststream-0.7.0.tar.gz.

File metadata

  • Download URL: dishka_faststream-0.7.0.tar.gz
  • Upload date:
  • Size: 118.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: uv/0.11.19 {"installer":{"name":"uv","version":"0.11.19","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for dishka_faststream-0.7.0.tar.gz
Algorithm Hash digest
SHA256 b8c6e95bf30f32fc67b3289697cee51c3047778afe195aa9a1062d21e64bfb7b
MD5 15d92e323a520c6250d34e1ccc05af6f
BLAKE2b-256 1c32c8df3791072a5ea11abaa8e142536fe8cd7745f3fe25e4be4fdf3ac112f5

See more details on using hashes here.

File details

Details for the file dishka_faststream-0.7.0-py3-none-any.whl.

File metadata

  • Download URL: dishka_faststream-0.7.0-py3-none-any.whl
  • Upload date:
  • Size: 10.6 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: uv/0.11.19 {"installer":{"name":"uv","version":"0.11.19","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for dishka_faststream-0.7.0-py3-none-any.whl
Algorithm Hash digest
SHA256 6f68ce1520b9f15478c8e8407e0d36cdcf1261d237a81e0090fb34ab4775e67e
MD5 eb44e600a1c2e07eb130b4f45827df83
BLAKE2b-256 0e9fb1918e3ee6814f1b94489ff367b9178e2730a3d23370c24f2f2bce5886e5

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page