
Rammearkitektur AMQP library (aio_pika wrapper)

Rammearkitektur AMQP

Rammearkitektur AMQP (RAMQP) is an opinionated library for AMQP.

It is implemented as a thin wrapper around aio_pika, with a generic and a MO-specific AMQPSystem abstraction. The MO abstraction is itself implemented as a thin wrapper around the generic one.

Usage

Generic

Receiving:

import asyncio

from ramqp import AMQPSystem
from ramqp import Router
from ramqp.config import AMQPConnectionSettings
from ramqp.depends import RoutingKey

router = Router()


# Configure the callback function to receive messages for the two routing keys.
# If an exception is thrown from the function, the message is not acknowledged.
# Thus, it will be retried immediately.
@router.register("my.routing.key")
@router.register("my.other.routing.key")
async def callback_function(routing_key: RoutingKey) -> None:
    pass


async def main() -> None:
    settings = AMQPConnectionSettings(url=..., queue_prefix="my-program")
    async with AMQPSystem(settings=settings, router=router) as amqp_system:
        await amqp_system.run_forever()


asyncio.run(main())
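
The comment in the example above states that a message is not acknowledged, and is therefore retried, when the callback raises. The following is a stdlib-only simulation of that behaviour, not RAMQP's implementation: deliver and the in-memory asyncio.Queue are hypothetical stand-ins for the broker.

```python
import asyncio
from typing import Awaitable, Callable, List, Tuple

Handler = Callable[[str], Awaitable[None]]


async def deliver(queue: asyncio.Queue, handler: Handler) -> int:
    """Hypothetical broker stand-in: redeliver until the handler succeeds.

    Returns the number of delivery attempts made.
    """
    attempts = 0
    while not queue.empty():
        message = queue.get_nowait()
        attempts += 1
        try:
            await handler(message)
        except Exception:
            # Not acknowledged: requeue the message so it is retried.
            queue.put_nowait(message)
    return attempts


async def demo() -> Tuple[int, List[str]]:
    queue: asyncio.Queue = asyncio.Queue()
    queue.put_nowait("my.routing.key")
    handled: List[str] = []
    failures_left = 2

    async def flaky_handler(routing_key: str) -> None:
        # Fails twice, then succeeds on the third delivery.
        nonlocal failures_left
        if failures_left > 0:
            failures_left -= 1
            raise RuntimeError("temporary failure")
        handled.append(routing_key)

    attempts = await deliver(queue, flaky_handler)
    return attempts, handled
```

A handler that always raises would loop forever in this simulation, which is worth keeping in mind when deciding which exceptions a callback should let escape.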

Sending:

from ramqp import AMQPSystem

async with AMQPSystem(...) as amqp_system:
    await amqp_system.publish_message("my.routing.key", {"key": "value"})

Dependency Injection

The callback handlers support FastAPI dependency injection. This lets each handler request exactly the data it needs, in the same way as FastAPI dependencies or pytest fixtures. A callback may look like:

from ramqp.mo import MORoutingKey
from ramqp.mo import PayloadType

async def callback(mo_routing_key: MORoutingKey, payload: PayloadType):
    ...

Experienced FastAPI developers might wonder how this works without the Depends function. This less verbose pattern was introduced in FastAPI v0.95, and works by defining the dependency directly on the type using the Annotated mechanism from PEP 593. For example:

MORoutingKey = Annotated[MORoutingKey, Depends(get_routing_key)]
PayloadType = Annotated[PayloadType, Depends(get_payload_as_type(PayloadType))]

With these definitions, the previous example is equivalent to:

async def callback(
    mo_routing_key: MORoutingKey = Depends(get_routing_key),
    payload: PayloadType = Depends(get_payload_as_type(PayloadType))
):
    ...

Reference documentation should be made available for these types in the future, but for now they can be found mainly in ramqp/depends.py and ramqp/mo.py.
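
To illustrate how the Annotated mechanism carries a dependency on the type itself, here is a minimal stdlib-only sketch. The Depends class, get_routing_key and resolve_and_call below are hypothetical stand-ins written for this example; they are not the FastAPI or RAMQP implementations.

```python
from typing import Annotated, Any, Callable, get_type_hints


class Depends:
    """Hypothetical stand-in for fastapi.Depends: it only stores the callable."""

    def __init__(self, dependency: Callable[..., Any]) -> None:
        self.dependency = dependency


def get_routing_key() -> str:
    # Hypothetical provider; a real one would read the incoming message.
    return "my.routing.key"


# The dependency is attached to the type as Annotated metadata ...
RoutingKey = Annotated[str, Depends(get_routing_key)]


# ... so the handler signature needs no default values.
def callback(routing_key: RoutingKey) -> str:
    return routing_key


def resolve_and_call(func: Callable[..., Any]) -> Any:
    """Call func, filling each parameter from its Annotated Depends marker."""
    hints = get_type_hints(func, include_extras=True)
    hints.pop("return", None)
    kwargs = {}
    for name, hint in hints.items():
        for meta in getattr(hint, "__metadata__", ()):
            if isinstance(meta, Depends):
                kwargs[name] = meta.dependency()
    return func(**kwargs)
```

Calling resolve_and_call(callback) looks up the marker on the routing_key parameter, runs get_routing_key, and passes the result to the callback.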

Context

import asyncio
from typing import Annotated

import httpx
from fastapi import Depends

from ramqp import AMQPSystem
from ramqp import Router
from ramqp.depends import Context
from ramqp.depends import from_context

router = Router()

async def main() -> None:
    async with httpx.AsyncClient() as client:
        context = {
            "client": client,
        }
        async with AMQPSystem(..., context=context) as amqp_system:
            await amqp_system.run_forever()


HTTPXClient = Annotated[httpx.AsyncClient, Depends(from_context("client"))]

@router.register("my.routing.key")
async def callback_function(context: Context, client: HTTPXClient) -> None:
    pass

asyncio.run(main())
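
One way to picture from_context("client") is as a factory for a dependency function that looks up a key in the context dictionary. The sketch below is an assumption made for illustration; the actual implementation lives in ramqp/depends.py and may differ. Context is modelled here as a plain dict.

```python
from typing import Any, Callable, Dict

# Assumption: the context is a plain mapping from names to shared objects.
Context = Dict[str, Any]


def from_context(field: str) -> Callable[[Context], Any]:
    """Build a dependency function that returns context[field]."""

    def inner(context: Context) -> Any:
        return context[field]

    return inner
```

Under this reading, the HTTPXClient annotation in the example resolves to the same httpx.AsyncClient object that main() placed under the "client" key.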

Settings

In most cases, AMQPConnectionSettings is initialised by including it in the BaseSettings of the application that uses the library. The url parameter of the AMQPConnectionSettings object can be given as a single URL string or as individual structured fields. Consider the following:

from pydantic import BaseSettings

from ramqp.config import AMQPConnectionSettings

# BaseSettings makes the entire model initialisable using environment variables
class Settings(BaseSettings):
    amqp: AMQPConnectionSettings

    class Config:
        env_nested_delimiter = "__"  # allows setting e.g. AMQP__URL__HOST=foo

settings = Settings()

The above would work with either multiple structured environment variables:

AMQP__URL__SCHEME=amqp
AMQP__URL__USER=guest
AMQP__URL__PASSWORD=guest
AMQP__URL__HOST=msg_broker
AMQP__URL__PORT=5672
AMQP__URL__VHOST=os2mo

or a single URL definition:

AMQP__URL=amqp://guest:guest@msg_broker:5672/os2mo
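
The correspondence between the two forms can be checked with the standard library. This sketch only illustrates how the single URL maps onto the structured variables; split_amqp_url is a hypothetical helper, and it is not how pydantic or RAMQP parse the value.

```python
from typing import Dict
from urllib.parse import urlsplit

# The structured variables from the example above.
structured_env = {
    "AMQP__URL__SCHEME": "amqp",
    "AMQP__URL__USER": "guest",
    "AMQP__URL__PASSWORD": "guest",
    "AMQP__URL__HOST": "msg_broker",
    "AMQP__URL__PORT": "5672",
    "AMQP__URL__VHOST": "os2mo",
}


def split_amqp_url(url: str) -> Dict[str, str]:
    """Split a single AMQP URL into the structured AMQP__URL__* variables."""
    parts = urlsplit(url)
    return {
        "AMQP__URL__SCHEME": parts.scheme,
        "AMQP__URL__USER": parts.username or "",
        "AMQP__URL__PASSWORD": parts.password or "",
        "AMQP__URL__HOST": parts.hostname or "",
        "AMQP__URL__PORT": str(parts.port or ""),
        # The vhost is the URL path without its leading slash.
        "AMQP__URL__VHOST": parts.path.lstrip("/"),
    }
```

Splitting amqp://guest:guest@msg_broker:5672/os2mo with this helper yields exactly the six structured values listed above.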

MO AMQP

Receiving:

import asyncio

from ramqp.config import AMQPConnectionSettings
from ramqp.mo import MOAMQPSystem
from ramqp.mo import MORouter
from ramqp.mo import MORoutingKey
from ramqp.mo import PayloadType

router = MORouter()


# Configure the callback function to receive messages for the two routing keys.
# If an exception is thrown from the function, the message is not acknowledged.
# Thus, it will be retried immediately.
@router.register("employee.address.edit")
@router.register("employee.it.create")
async def callback_function(
    mo_routing_key: MORoutingKey, payload: PayloadType
) -> None:
    pass


async def main() -> None:
    settings = AMQPConnectionSettings(url=..., queue_prefix="my-program")
    async with MOAMQPSystem(settings=settings, router=router) as amqp_system:
        await amqp_system.run_forever()


asyncio.run(main())

Sending:

from datetime import datetime
from uuid import uuid4

from ramqp.mo import MOAMQPSystem
from ramqp.mo import PayloadType

payload = PayloadType(uuid=uuid4(), object_uuid=uuid4(), time=datetime.now())

async with MOAMQPSystem(...) as amqp_system:
    await amqp_system.publish_message("employee.address.edit", payload)

Metrics

RAMQP exports a number of Prometheus metrics via prometheus/client_python.

These can be exported using:

from prometheus_client import start_http_server

start_http_server(8000)

Or similar; see the Prometheus client library for details.

Development

Prerequisites

Getting Started

  1. Clone the repository:
git clone git@git.magenta.dk:rammearkitektur/ramqp.git
  2. Install all dependencies:
poetry install
  3. Set up pre-commit:
poetry run pre-commit install

Running the tests

You use poetry and pytest to run the tests:

poetry run pytest

You can also run specific files

poetry run pytest tests/<test_folder>/<test_file.py>

and even use filtering with -k

poetry run pytest -k "Manager"

You can also pass the flags -vx, where -v prints each test as it runs (verbose) and -x stops the run at the first failing test (exit first).

Running the integration tests

To run the integration tests, an AMQP instance must be available.

If an instance is already available, it can be used by setting the AMQP_URL environment variable. Alternatively, a RabbitMQ instance can be started in Docker using:

docker run -d -p 5672:5672 -p 15672:15672 rabbitmq:3-management
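
As a sketch of how a test helper might choose between the two options, the function below prefers AMQP_URL and otherwise falls back to the local container. integration_amqp_url and LOCAL_RABBITMQ_URL are hypothetical names; how the project's own test suite reads AMQP_URL is not shown here. The fallback assumes the RabbitMQ image defaults of guest/guest on port 5672.

```python
import os
from typing import Mapping

# Assumption: default credentials and port of a locally started RabbitMQ container.
LOCAL_RABBITMQ_URL = "amqp://guest:guest@localhost:5672/"


def integration_amqp_url(env: Mapping[str, str] = os.environ) -> str:
    """Return AMQP_URL if it is set, otherwise the local RabbitMQ URL."""
    return env.get("AMQP_URL", LOCAL_RABBITMQ_URL)
```

Passing the environment as a parameter keeps the helper easy to test without modifying os.environ.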

Versioning

This project uses Semantic Versioning with the following strategy:

  • MAJOR: Incompatible changes to existing data models
  • MINOR: Backwards compatible updates to existing data models OR new models added
  • PATCH: Backwards compatible bug fixes

Authors

Magenta ApS https://magenta.dk

License

This project uses: MPL-2.0

This project uses REUSE for licensing. All licenses can be found in the LICENSES folder of the project.
