Integration package for Dishka DI and FastStream framework
Project description
FastStream integration for Dishka
Though it is not required, you can use dishka-faststream integration. It features:
- automatic REQUEST scope management using middleware
- passing `StreamMessage` and `ContextRepo` objects as context data to providers
- automatic injection of dependencies into message handlers.
You can use auto-injection for FastStream 0.5.0 and higher. For older versions you need to specify `@inject` manually.
Note
If you are using FastAPI plugin of FastStream you need to use both dishka integrations, but you can share the same container.
- Call `dishka_faststream.setup_dishka` on the FastStream broker or router.
- Call `dishka.integrations.fastapi.setup_dishka` on the FastAPI app.
Installation
Install using pip
pip install dishka-faststream
Or with uv
uv add dishka-faststream
How to use
- Import
from dishka_faststream import (
FromDishka,
inject,
setup_dishka,
FastStreamProvider,
)
from dishka import make_async_container, Provider, provide, Scope
- Create a provider. You can use `faststream.types.StreamMessage` and `faststream.ContextRepo` as factory parameters to access them in the REQUEST scope
class YourProvider(Provider):
@provide(scope=Scope.REQUEST)
def create_x(self, event: StreamMessage) -> X:
...
- Mark the handler parameters that are to be injected with `FromDishka[]`
@broker.subscriber("test")
async def start(
gateway: FromDishka[Gateway],
):
...
3a. (optional) decorate them using @inject if you are not using auto-injection
@broker.subscriber("test")
@inject
async def start(
gateway: FromDishka[Gateway],
):
...
- (optional) Use `FastStreamProvider()` when creating the container if you are going to use `faststream.types.StreamMessage` or `faststream.ContextRepo` in providers
container = make_async_container(YourProvider(), FastStreamProvider())
- Set up the `dishka` integration. `auto_inject=True` is required unless you explicitly use the `@inject` decorator
setup_dishka(container=container, app=app, auto_inject=True)
Or pass your own inject decorator
setup_dishka(container=container, broker=broker, auto_inject=my_inject)
FastStream - Litestar/FastAPI - dishka integration
- Running RabbitMQ
docker run -d --name rabbitmq \
-p 5672:5672 -p 15672:15672 \
-e RABBITMQ_DEFAULT_USER=guest \
-e RABBITMQ_DEFAULT_PASS=guest \
rabbitmq:management
- Example of usage FastStream + Litestar
import uvicorn
from dishka import Provider, Scope, provide
from dishka import make_async_container
import dishka_faststream as faststream_integration
from dishka.integrations import litestar as litestar_integration
from dishka.integrations.base import FromDishka
from dishka_faststream import inject as faststream_inject
from dishka.integrations.litestar import inject as litestar_inject
from faststream.rabbit import RabbitBroker, RabbitRouter
from litestar import Litestar, route, HttpMethod
class SomeDependency:
async def do_something(self) -> int:
print("Hello world")
return 42
class SomeProvider(Provider):
@provide(scope=Scope.REQUEST)
def some_dependency(self) -> SomeDependency:
return SomeDependency()
@route(http_method=HttpMethod.GET, path="/", status_code=200)
@litestar_inject
async def http_handler(some_dependency: FromDishka[SomeDependency]) -> None:
await some_dependency.do_something()
amqp_router = RabbitRouter()
@amqp_router.subscriber("test-queue")
@faststream_inject
async def amqp_handler(some_dependency: FromDishka[SomeDependency]) -> None:
await some_dependency.do_something()
def create_app() -> Litestar:
container = make_async_container(SomeProvider())
broker = RabbitBroker(url="amqp://guest:guest@localhost:5672/")
broker.include_router(amqp_router)
faststream_integration.setup_dishka(container, broker=broker)
http = Litestar(
route_handlers=[http_handler],
on_startup=[broker.start],
on_shutdown=[broker.stop],
)
litestar_integration.setup_dishka(container, http)
return http
if __name__ == "__main__":
uvicorn.run(create_app(), host="0.0.0.0", port=8000)
Example of usage FastStream + FastAPI
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
import uvicorn
from fastapi import APIRouter, FastAPI
from faststream.rabbit import RabbitBroker, RabbitRouter
from dishka import Provider, Scope, make_async_container, provide
from dishka.integrations import fastapi as fastapi_integration
import dishka_faststream as faststream_integration
from dishka.integrations.base import FromDishka
from dishka.integrations.fastapi import DishkaRoute
from dishka_faststream import inject as faststream_inject
class SomeDependency:
async def do_something(self) -> int:
print("Hello world")
return 42
class SomeProvider(Provider):
@provide(scope=Scope.REQUEST)
def some_dependency(self) -> SomeDependency:
return SomeDependency()
router = APIRouter(route_class=DishkaRoute)
@router.get("/")
async def http_handler(some_dependency: FromDishka[SomeDependency]) -> None:
await some_dependency.do_something()
amqp_router = RabbitRouter()
@amqp_router.subscriber("test-queue")
@faststream_inject
async def amqp_handler(some_dependency: FromDishka[SomeDependency]) -> None:
await some_dependency.do_something()
def create_app() -> FastAPI:
container = make_async_container(SomeProvider())
broker = RabbitBroker(url="amqp://guest:guest@localhost:5672/")
broker.include_router(amqp_router)
faststream_integration.setup_dishka(container, broker=broker)
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
async with broker:
await broker.start()
yield
http = FastAPI(lifespan=lifespan)
http.include_router(router)
fastapi_integration.setup_dishka(container, http)
return http
if __name__ == "__main__":
uvicorn.run(create_app(), host="0.0.0.0", port=8000)
Testing FastStream with dishka
Simple example:
from collections.abc import AsyncIterator
import pytest
from dishka import AsyncContainer, make_async_container
from dishka import Provider, Scope, provide
import dishka_faststream as faststream_integration
from dishka.integrations.base import FromDishka as Depends
from faststream import FastStream, TestApp
from faststream.rabbit import RabbitBroker, TestRabbitBroker, RabbitRouter
router = RabbitRouter()
@router.subscriber("test-queue")
async def handler(msg: str, some_dependency: Depends[int]) -> int:
print(f"{msg=}")
return some_dependency
@pytest.fixture
async def broker() -> RabbitBroker:
broker = RabbitBroker()
broker.include_router(router)
return broker
@pytest.fixture
def mock_provider() -> Provider:
class MockProvider(Provider):
@provide(scope=Scope.REQUEST)
async def get_some_dependency(self) -> int:
return 42
return MockProvider()
@pytest.fixture
def container(mock_provider: Provider) -> AsyncContainer:
return make_async_container(mock_provider)
@pytest.fixture
async def app(broker: RabbitBroker, container: AsyncContainer) -> FastStream:
app = FastStream(broker)
faststream_integration.setup_dishka(container, app, auto_inject=True)
return FastStream(broker)
@pytest.fixture
async def client(app: FastStream) -> AsyncIterator[RabbitBroker]:
async with TestRabbitBroker(app.broker) as br, TestApp(app):
yield br
@pytest.mark.asyncio
async def test_handler(client: RabbitBroker) -> None:
result = await client.request("hello", "test-queue")
assert await result.decode() == 42
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file dishka_faststream-0.6.0.tar.gz.
File metadata
- Download URL: dishka_faststream-0.6.0.tar.gz
- Upload date:
- Size: 99.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: uv/0.9.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
8f9c7592b9b2b6c2b140db0c14be892d63cb8a1fdd98f18ee6551db938b91953
|
|
| MD5 |
be24ee31300234d02fde83e389e0c2cb
|
|
| BLAKE2b-256 |
41787f8611d1175a456925a4fee1c88260c83cbce4fc0c18cb84e41eb99e0010
|
File details
Details for the file dishka_faststream-0.6.0-py3-none-any.whl.
File metadata
- Download URL: dishka_faststream-0.6.0-py3-none-any.whl
- Upload date:
- Size: 10.6 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: uv/0.9.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
0b8b95e31f2cfab2c592d0e964b9a664ddd37e1c39faff01ea3392a71d4bcb9e
|
|
| MD5 |
7f37fc349c46eb5ad00e4f9724446f37
|
|
| BLAKE2b-256 |
1a3ecc7754dab1c30d1c13654a3b7f2b9f705fd8a5371e6a62a8903a43247a29
|