Framework for building FastAPI-style RabbitMQ apps
Project description
mqkit
Introduction
mqkit is a Python framework for creating apps that integrate with message brokers
like RabbitMQ. It provides a FastAPI-style interface to accelerate the development
of queue-based services.
Documentation
Complete documentation will be coming soon.
Usage
Multi-thread, single process example
from typing import Dict
from mqkit import App, Attributes, create_engine
app: App = App()
@app.on_start
def on_start() -> None:
print("App is starting")
@app.queue("my_queue", forward_to="other_queue")
def my_queue_handler(message: Dict, attributes: Attributes) -> Dict:
print(f"Received {message} with attributes {attributes}")
return {"hello": "other queue!"}
@app.queue("other_queue")
def other_queue_handler(message: Dict, attributes: Attributes) -> None:
print(f"Other queue received {message}")
app.run(create_engine("amqp://user:password@your-server:5672/"))
Single-thread, single process example
The blocking single-threaded @consume decorator is intended for situations where
orchestration is handled by an external provider. (Think Kubernetes or Docker)
from typing import Dict
from mqkit import Attributes, consume
@consume("my_queue")
def handler(message: Dict, attributes: Attributes) -> None:
print(f"Got message {message}")
# NOTE: The engine URL can be inferred here based on the MQKIT_ENGINE_URL
# environment variable. If you don't want to use the environment variable,
# pass an Engine instance as the `engine` parameter to @consume
Parameter models
mqkit also supports automatic serialization and validation of queue messages using
Pydantic BaseModel classes. The appropriate models to use are inferred based on
the annotations of the handler method:
from datetime import datetime
from mqkit import Attributes, consume
from pydantic import BaseModel
class ChatMessage(BaseModel):
id: int
user: str
content: str
sent_time: datetime
@consume("chat_messages")
def handler(message: ChatMessage, attributes: Attributes) -> None:
print(f"Got chat message {message!r} with attributes {attributes}")
"""
Invalid messages raise exceptions and don't call the handler:
'invalid message' -> DecodeError
{} -> ValidationError
{"id": 123} -> ValidationError
Valid messages will result in the handler being called:
{
"id": 123,
"user": "will",
"content": "hello!",
"sent_time": "2026-03-09T11:51:09"
} -> Outputs the message with print()
"""
Note that a BaseModel annotation may also be added to the return
parameter which will enforce that return values are of that type.
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file mqkit-0.1.6.tar.gz.
File metadata
- Download URL: mqkit-0.1.6.tar.gz
- Upload date:
- Size: 44.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/2.3.2 CPython/3.14.2 Linux/6.18.9-arch1-2
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
e0728aceefbe4821f829d4ad902a7ab7f981e39fe065bf0d8c397a04fdc8ee45
|
|
| MD5 |
52f9c62ab30cc656b49670bf1a8d37eb
|
|
| BLAKE2b-256 |
0b22a9ef7f5843ee039d5fa5898ea1d92d9963c85c0325880cd94d4e8acdd390
|
File details
Details for the file mqkit-0.1.6-py3-none-any.whl.
File metadata
- Download URL: mqkit-0.1.6-py3-none-any.whl
- Upload date:
- Size: 74.1 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/2.3.2 CPython/3.14.2 Linux/6.18.9-arch1-2
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
90480119499f80ba8e75dc577c56580fd2285438d5cf47dd9c44c7f948321543
|
|
| MD5 |
9cea4ce93ef8085fe85acda7601cd4c9
|
|
| BLAKE2b-256 |
6a74722eb09847990677c3adb5fd9c3988907332fd41cbc53d1771df60a06fc7
|