Framework for building FastAPI-style RabbitMQ apps

mqkit

Introduction

mqkit is a Python framework for creating apps that integrate with message brokers like RabbitMQ. It provides a FastAPI-style interface to accelerate the development of queue-based services.

Documentation

Complete documentation is coming soon.

Usage

Multi-threaded, single-process example

from typing import Dict

from mqkit import App, Attributes, create_engine

app: App = App()


@app.on_start
def on_start() -> None:
    print("App is starting")


@app.queue("my_queue", forward_to="other_queue")
def my_queue_handler(message: Dict, attributes: Attributes) -> Dict:
    print(f"Received {message} with attributes {attributes}")
    return {"hello": "other queue!"}


@app.queue("other_queue")
def other_queue_handler(message: Dict, attributes: Attributes) -> None:
    print(f"Other queue received {message}")


app.run(create_engine("amqp://user:password@your-server:5672/"))
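The routing in the example above can be pictured with a small in-memory model. This is only a sketch of the idea that a handler's return value is published to the queue named by forward_to; it is not mqkit's implementation. ToyRouter, publish, and drain are hypothetical names, and skipping the forward when a handler returns None is an assumption.

```python
from collections import deque
from typing import Any, Callable, Deque, Dict, List, Optional, Tuple

# Toy stand-in for the routing described above; NOT mqkit's implementation.
Handler = Callable[[Dict[str, Any]], Optional[Dict[str, Any]]]


class ToyRouter:
    """In-memory model of queue handlers with optional forwarding."""

    def __init__(self) -> None:
        self._handlers: Dict[str, Tuple[Handler, Optional[str]]] = {}
        self._pending: Deque[Tuple[str, Dict[str, Any]]] = deque()

    def queue(
        self, name: str, forward_to: Optional[str] = None
    ) -> Callable[[Handler], Handler]:
        def register(func: Handler) -> Handler:
            self._handlers[name] = (func, forward_to)
            return func

        return register

    def publish(self, name: str, message: Dict[str, Any]) -> None:
        self._pending.append((name, message))

    def drain(self) -> List[str]:
        """Deliver pending messages until none remain; return the queues visited."""
        visited: List[str] = []
        while self._pending:
            name, message = self._pending.popleft()
            handler, forward_to = self._handlers[name]
            visited.append(name)
            result = handler(message)
            # Assumption: a None result means there is nothing to forward.
            if forward_to is not None and result is not None:
                self.publish(forward_to, result)
        return visited


router = ToyRouter()
received: List[Dict[str, Any]] = []


@router.queue("my_queue", forward_to="other_queue")
def my_queue_handler(message: Dict[str, Any]) -> Dict[str, Any]:
    return {"hello": "other queue!"}


@router.queue("other_queue")
def other_queue_handler(message: Dict[str, Any]) -> None:
    received.append(message)


router.publish("my_queue", {"ping": 1})
order = router.drain()
print(order)     # ['my_queue', 'other_queue']
print(received)  # [{'hello': 'other queue!'}]
```

With a real broker the two handlers would run on separate consumer threads; the toy model only shows the order in which messages flow.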

Single-threaded, single-process example

The blocking, single-threaded @consume decorator is intended for deployments where orchestration is handled by an external provider such as Kubernetes or Docker.

from typing import Dict

from mqkit import Attributes, consume


@consume("my_queue")
def handler(message: Dict, attributes: Attributes) -> None:
    print(f"Got message {message}")


# NOTE: The engine URL can be inferred here based on the MQKIT_ENGINE_URL
# environment variable. If you don't want to use the environment variable,
# pass an Engine instance as the `engine` parameter to @consume
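When relying on the environment variable, it can help to check it before the consumer starts, so that a missing or malformed value fails fast. The helper below is a sketch using only the standard library. resolve_engine_url is a hypothetical name, not part of mqkit, and the accepted schemes are an assumption.

```python
import os
from typing import Mapping, Optional
from urllib.parse import urlsplit


def resolve_engine_url(
    explicit: Optional[str] = None, env: Mapping[str, str] = os.environ
) -> str:
    """Hypothetical helper: prefer an explicit URL, else use MQKIT_ENGINE_URL."""
    url = explicit or env.get("MQKIT_ENGINE_URL")
    if not url:
        raise RuntimeError("Set MQKIT_ENGINE_URL or pass an engine explicitly")
    parts = urlsplit(url)
    # Assumption: only amqp:// and amqps:// URLs are meaningful here.
    if parts.scheme not in ("amqp", "amqps") or not parts.hostname:
        # The URL itself is left out of the message because it may hold a password.
        raise ValueError("expected an amqp:// or amqps:// URL with a host")
    return url


url = resolve_engine_url(
    env={"MQKIT_ENGINE_URL": "amqp://user:password@your-server:5672/"}
)
parts = urlsplit(url)
print(parts.hostname, parts.port)  # your-server 5672
```

In a container, the variable would typically be set in the Kubernetes manifest or passed with Docker's -e flag rather than hard-coded.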

Parameter models

mqkit also supports automatic serialization and validation of queue messages using Pydantic BaseModel classes. The model to use is inferred from the annotations of the handler function:

from datetime import datetime

from mqkit import Attributes, consume
from pydantic import BaseModel


class ChatMessage(BaseModel):
    id: int
    user: str
    content: str
    sent_time: datetime


@consume("chat_messages")
def handler(message: ChatMessage, attributes: Attributes) -> None:
    print(f"Got chat message {message!r} with attributes {attributes}")


"""
Invalid messages raise exceptions and don't call the handler:

'invalid message' -> DecodeError
{}                -> ValidationError
{"id": 123}       -> ValidationError

Valid messages will result in the handler being called:
{
    "id": 123,
    "user": "will",
    "content": "hello!",
    "sent_time": "2026-03-09T11:51:09"
} -> Outputs the message with print()
"""
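The two failure modes listed above correspond to two stages: decoding the raw body, then validating the decoded data against the model. The sketch below reproduces those stages with the standard library only. A dataclass stands in for the Pydantic model, and the DecodeError and ValidationError classes here are local stand-ins, not mqkit's or Pydantic's own exceptions.

```python
import json
from dataclasses import dataclass, fields
from typing import List


class DecodeError(Exception):
    """Stand-in: the body is not valid JSON."""


class ValidationError(Exception):
    """Stand-in: the decoded JSON does not match the model."""


@dataclass
class ChatMessage:
    id: int
    user: str
    content: str
    sent_time: str  # kept as text here; Pydantic would parse it into a datetime


def decode_and_validate(body: str) -> ChatMessage:
    # Stage 1: decoding. A body that is not JSON never reaches validation.
    try:
        data = json.loads(body)
    except json.JSONDecodeError as exc:
        raise DecodeError(str(exc)) from exc

    # Stage 2: validation. Every field must be present with the expected type.
    if not isinstance(data, dict):
        raise ValidationError("expected a JSON object")
    errors: List[str] = []
    for field in fields(ChatMessage):
        if field.name not in data:
            errors.append(f"{field.name}: field required")
        elif not isinstance(data[field.name], field.type):
            errors.append(f"{field.name}: expected {field.type.__name__}")
    if errors:
        raise ValidationError("; ".join(errors))
    return ChatMessage(**{f.name: data[f.name] for f in fields(ChatMessage)})


def classify(body: str) -> str:
    """Return the name of the error raised for a body, or 'ok'."""
    try:
        decode_and_validate(body)
    except (DecodeError, ValidationError) as exc:
        return type(exc).__name__
    return "ok"


valid = (
    '{"id": 123, "user": "will", "content": "hello!", '
    '"sent_time": "2026-03-09T11:51:09"}'
)
print(classify("invalid message"))  # DecodeError
print(classify("{}"))               # ValidationError
print(classify('{"id": 123}'))      # ValidationError
print(classify(valid))              # ok
```

Only in the last case would a handler be called.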

A BaseModel annotation may also be used as the handler's return annotation, in which case return values are required to be of that type.
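One way a framework can enforce a return annotation is to read it with typing.get_type_hints and check the handler's result. The decorator below is a hypothetical sketch of that idea, not mqkit's code. It handles plain classes only, uses a dataclass in place of a Pydantic model, and raising TypeError is an assumption about how a mismatch might be reported.

```python
import functools
from dataclasses import dataclass
from typing import Any, Callable, get_type_hints


@dataclass
class Reply:
    ok: bool


def enforce_return(func: Callable[..., Any]) -> Callable[..., Any]:
    """Hypothetical decorator: check the result against the return annotation."""
    # Read the annotation once, at decoration time. Plain classes only;
    # subscripted generics such as Dict[str, int] are out of scope here.
    expected = get_type_hints(func).get("return")

    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        result = func(*args, **kwargs)
        if expected is not None and not isinstance(result, expected):
            raise TypeError(
                f"{func.__name__} returned {type(result).__name__}, "
                f"expected {expected.__name__}"
            )
        return result

    return wrapper


@enforce_return
def good_handler(message: dict) -> Reply:
    return Reply(ok=True)


@enforce_return
def bad_handler(message: dict) -> Reply:
    return {"ok": True}  # deliberately the wrong type


print(good_handler({}))  # Reply(ok=True)
try:
    bad_handler({})
except TypeError as exc:
    print(exc)  # bad_handler returned dict, expected Reply
```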

Download files

Source Distribution

mqkit-0.1.2.tar.gz (43.9 kB)

Uploaded Source

Built Distribution

mqkit-0.1.2-py3-none-any.whl (73.7 kB)

Uploaded Python 3

File details

Details for the file mqkit-0.1.2.tar.gz.

File metadata

  • Download URL: mqkit-0.1.2.tar.gz
  • Upload date:
  • Size: 43.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.3.2 CPython/3.14.2 Linux/6.18.9-arch1-2

File hashes

Hashes for mqkit-0.1.2.tar.gz
Algorithm    Hash digest
SHA256       f2a404f4ecf5acee3c08e44b656b46fce3384bf68ba920e1b4b6f499edcb9f25
MD5          77c4ddbc3cf0a7b6f96b0b5fd483cc0f
BLAKE2b-256  534c67424ff3925079db6b4d2322fc08843a71482532f8e61e8ead2e09506127

File details

Details for the file mqkit-0.1.2-py3-none-any.whl.

File metadata

  • Download URL: mqkit-0.1.2-py3-none-any.whl
  • Upload date:
  • Size: 73.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.3.2 CPython/3.14.2 Linux/6.18.9-arch1-2

File hashes

Hashes for mqkit-0.1.2-py3-none-any.whl
Algorithm    Hash digest
SHA256       6e2f9adb741d15088320c932c4a19184a1fb17283804182a85511478aaa9d011
MD5          a6de13a326b91c71ade9cf8fe854208b
BLAKE2b-256  f48c1d5f5fecd37aa7030d7a6252cfcbc302e62b064830bec91b889138fe3813
