Skip to main content

Framework for building FastAPI-style RabbitMQ apps

Project description

mqkit

Introduction

mqkit is a Python framework for creating apps that integrate with message brokers like RabbitMQ. It provides a FastAPI-style interface to accelerate the development of queue-based services.

Documentation

Complete documentation will be coming soon.

Usage

Multi-thread, single process example

from typing import Dict

from mqkit import App, Attributes, create_engine

app: App = App()


@app.on_start
def on_start() -> None:
    print("App is starting")


@app.queue("my_queue", forward_to="other_queue")
def my_queue_handler(message: Dict, attributes: Attributes) -> Dict:
    print(f"Received {message} with attributes {attributes}")
    return {"hello": "other queue!"}


@app.queue("other_queue")
def other_queue_handler(message: Dict, attributes: Attributes) -> None:
    print(f"Other queue received {message}")


app.run(create_engine("amqp://user:password@your-server:5672/"))

Single-thread, single process example

The blocking single-threaded @consume decorator is intended for situations where orchestration is handled by an external provider. (Think Kubernetes or Docker)

from typing import Dict

from mqkit import Attributes, consume


@consume("my_queue")
def handler(message: Dict, attributes: Attributes) -> None:
    print(f"Got message {message}")


# NOTE: The engine URL can be inferred here based on the MQKIT_ENGINE_URL
# environment variable. If you don't want to use the environment variable,
# pass an Engine instance as the `engine` parameter to @consume

Parameter models

mqkit also supports automatic serialization and validation of queue messages using Pydantic BaseModel classes. The appropriate models to use are inferred based on the annotations of the handler method:

from datetime import datetime

from mqkit import Attributes, consume
from pydantic import BaseModel


class ChatMessage(BaseModel):
    id: int
    user: str
    content: str
    sent_time: datetime


@consume("chat_messages")
def handler(message: ChatMessage, attributes: Attributes) -> None:
    print(f"Got chat message {message!r} with attributes {attributes}")


"""
Invalid messages raise exceptions and don't call the handler:

'invalid message' -> DecodeError
{}                -> ValidationError
{"id": 123}       -> ValidationError

Valid messages will result in the handler being called:
{
    "id": 123,
    "user": "will",
    "content": "hello!",
    "sent_time": "2026-03-09T11:51:09"
} -> Outputs the message with print()
"""

Note that a BaseModel annotation may also be added to the return parameter which will enforce that return values are of that type.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

mqkit-0.1.1.tar.gz (43.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

mqkit-0.1.1-py3-none-any.whl (73.5 kB view details)

Uploaded Python 3

File details

Details for the file mqkit-0.1.1.tar.gz.

File metadata

  • Download URL: mqkit-0.1.1.tar.gz
  • Upload date:
  • Size: 43.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.3.2 CPython/3.14.2 Linux/6.18.9-arch1-2

File hashes

Hashes for mqkit-0.1.1.tar.gz
Algorithm Hash digest
SHA256 282046545c0e7692e2c0d0d7d3132a0fbea29d0b4d65d0089ecb7093c6a1553d
MD5 4ee9fbc5b2596e21a2aac8be7dcd6ef5
BLAKE2b-256 40c6e2ade0f208108dd010097b4ab23a39d3635708e1b1c0d77359bc5e10c2ca

See more details on using hashes here.

File details

Details for the file mqkit-0.1.1-py3-none-any.whl.

File metadata

  • Download URL: mqkit-0.1.1-py3-none-any.whl
  • Upload date:
  • Size: 73.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.3.2 CPython/3.14.2 Linux/6.18.9-arch1-2

File hashes

Hashes for mqkit-0.1.1-py3-none-any.whl
Algorithm Hash digest
SHA256 5aaf43d1e1b389b17316a5703fd4c2caf511fd46fdbea5f59f97293db4ceb38a
MD5 4a610d1eae25f00ae399136c0850528b
BLAKE2b-256 2cce2f4093d75ee61de955d9e7bdbf5f3b3cb068af887b42d8a509502359c892

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page