
Rammearkitektur AMQP library (aio_pika wrapper)

Rammearkitektur AMQP

Rammearkitektur AMQP (RAMQP) is an opinionated library for AMQP.

It is implemented as a thin wrapper around aio_pika, with a generic and a MO-specific AMQPSystem abstraction. The MO abstraction is itself implemented as a thin wrapper around the generic one.

Usage

Generic

Receiving:

import asyncio

from ramqp import AMQPSystem
from ramqp import Router
from ramqp.config import AMQPConnectionSettings
from ramqp.depends import RoutingKey

router = Router()


# Configure the callback function to receive messages for the two routing keys.
# If an exception is thrown from the function, the message is not acknowledged.
# Thus, it will be retried immediately.
@router.register("my.routing.key")
@router.register("my.other.routing.key")
async def callback_function(routing_key: RoutingKey) -> None:
    pass


async def main() -> None:
    settings = AMQPConnectionSettings(url=..., queue_prefix="my-program")
    async with AMQPSystem(settings=settings, router=router) as amqp_system:
        await amqp_system.run_forever()


asyncio.run(main())
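
The retry behaviour described in the comments above can be pictured with a toy, standard-library-only simulation. This is a sketch of the idea, not RAMQP's or aio_pika's implementation: the `deliver` function and the in-memory queue are hypothetical stand-ins for the broker, and a real broker redelivers unacknowledged messages by its own rules.

```python
import asyncio
from collections.abc import Awaitable, Callable

Handler = Callable[[str], Awaitable[None]]


async def deliver(queue: asyncio.Queue, handler: Handler) -> int:
    """Toy consumer: drain the queue, requeueing messages whose handler raises."""
    deliveries = 0
    while not queue.empty():
        message = queue.get_nowait()
        deliveries += 1
        try:
            await handler(message)
        except Exception:
            # The handler failed, so the message counts as unacknowledged:
            # put it back so that it is delivered again.
            queue.put_nowait(message)
    return deliveries


async def demo() -> tuple[int, list[str]]:
    handled: list[str] = []
    attempts = 0

    async def flaky_handler(message: str) -> None:
        # Fails on the first delivery only, then succeeds.
        nonlocal attempts
        attempts += 1
        if attempts == 1:
            raise RuntimeError("transient failure")
        handled.append(message)

    queue: asyncio.Queue = asyncio.Queue()
    queue.put_nowait("my.routing.key")
    deliveries = await deliver(queue, flaky_handler)
    return deliveries, handled


deliveries, handled = asyncio.run(demo())
print(deliveries, handled)
```

Because a failing handler is retried straight away, a handler that always raises would loop forever in this sketch, which is a reason to keep handlers idempotent and to let only transient errors propagate.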

Sending:

from ramqp import AMQPSystem

# Must be run inside a coroutine, e.g. the main() function shown above.
async with AMQPSystem(...) as amqp_system:
    await amqp_system.publish_message("my.routing.key", {"key": "value"})

Dependency Injection

The callback handlers support FastAPI dependency injection. This allows handlers to request exactly the data that they need, much like FastAPI dependencies or pytest fixtures. A callback may look like:

from ramqp.mo import MORoutingKey
from ramqp.mo import PayloadType

async def callback(mo_routing_key: MORoutingKey, payload: PayloadType):
    ...

Experienced FastAPI developers might wonder how this works without the Depends function. This less verbose pattern was introduced in FastAPI v0.95, and works by defining the dependency directly on the type using the Annotated mechanism from PEP 593. For example:

MORoutingKey = Annotated[MORoutingKey, Depends(get_routing_key)]
PayloadType = Annotated[PayloadType, Depends(get_payload_as_type(PayloadType))]

which makes the previous example equivalent to:

async def callback(
    mo_routing_key: MORoutingKey = Depends(get_routing_key),
    payload: PayloadType = Depends(get_payload_as_type(PayloadType))
):
    ...


Reference documentation should be made available for these types in the future, but for now they can be found mainly in ramqp/depends.py and ramqp/mo.py.
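
To make the Annotated mechanism more concrete, here is a minimal standard-library sketch of how a framework could read dependency markers out of type hints and call a handler with the resolved values. It is not how FastAPI or RAMQP implement it: the `Depends` class below is a hypothetical stand-in for `fastapi.Depends`, and `call_with_dependencies`, `get_payload` and the dictionary-shaped message are assumptions made for illustration.

```python
import asyncio
from dataclasses import dataclass
from typing import Annotated, Any, Callable, get_args, get_origin, get_type_hints


@dataclass(frozen=True)
class Depends:
    """Hypothetical stand-in for fastapi.Depends: wraps a provider callable."""

    provider: Callable[[dict], Any]


def get_routing_key(message: dict) -> str:
    return message["routing_key"]


def get_payload(message: dict) -> dict:
    return message["payload"]


# The dependency is attached to the type itself via Annotated metadata.
RoutingKey = Annotated[str, Depends(get_routing_key)]
Payload = Annotated[dict, Depends(get_payload)]


async def call_with_dependencies(handler: Callable[..., Any], message: dict) -> Any:
    """Resolve every Annotated[..., Depends(...)] parameter, then call the handler."""
    hints = get_type_hints(handler, include_extras=True)
    hints.pop("return", None)
    kwargs = {}
    for name, hint in hints.items():
        if get_origin(hint) is Annotated:
            # get_args returns (underlying_type, *metadata).
            for meta in get_args(hint)[1:]:
                if isinstance(meta, Depends):
                    kwargs[name] = meta.provider(message)
    return await handler(**kwargs)


async def callback(routing_key: RoutingKey, payload: Payload) -> str:
    return f"{routing_key}: {payload['uuid']}"


message = {"routing_key": "employee.address.edit", "payload": {"uuid": "1234"}}
result = asyncio.run(call_with_dependencies(callback, message))
print(result)
```

The handler signature stays free of default values, which is what allows the callbacks in this document to declare only the parameters they need.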

Context

import asyncio
from typing import Annotated

import httpx
from fastapi import Depends

from ramqp import AMQPSystem
from ramqp import Router
from ramqp.depends import Context
from ramqp.depends import from_context

router = Router()

async def main() -> None:
    async with httpx.AsyncClient() as client:
        context = {
            "client": client,
        }
        async with AMQPSystem(..., context=context) as amqp_system:
            await amqp_system.run_forever()


HTTPXClient = Annotated[httpx.AsyncClient, Depends(from_context("client"))]

@router.register("my.routing.key")
async def callback_function(context: Context, client: HTTPXClient) -> None:
    pass

asyncio.run(main())
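
The example above shares a single HTTP client between handlers through the context. Conceptually, a helper such as from_context can be thought of as a factory that returns a lookup function for one key. The sketch below illustrates that idea with the standard library only. `lookup_from_context` is a hypothetical name, and the real implementation in ramqp/depends.py may differ.

```python
from typing import Any, Callable

Context = dict[str, Any]


def lookup_from_context(field: str) -> Callable[[Context], Any]:
    """Hypothetical stand-in: build a dependency that reads one key from the context."""

    def inner(context: Context) -> Any:
        return context[field]

    return inner


# A plain string stands in for the shared httpx.AsyncClient instance.
context: Context = {"client": "shared-http-client", "retries": 3}

get_client = lookup_from_context("client")
client = get_client(context)
print(client)
```

Handlers that only need one object can then depend on that object directly instead of receiving the whole context.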

Settings

In most cases, AMQPConnectionSettings is initialised by including it in the BaseSettings of the application that uses the library. The url parameter of the AMQPConnectionSettings object can be given as a single URL string or as individual structured fields. Consider the following:

from pydantic import BaseSettings

from ramqp.config import AMQPConnectionSettings

# BaseSettings makes the entire model initialisable using environment variables
class Settings(BaseSettings):
    amqp: AMQPConnectionSettings

    class Config:
        env_nested_delimiter = "__"  # allows setting e.g. AMQP__URL__HOST=foo

settings = Settings()

The above would work with either multiple structured environment variables

AMQP__URL__SCHEME=amqp
AMQP__URL__USER=guest
AMQP__URL__PASSWORD=guest
AMQP__URL__HOST=msg_broker
AMQP__URL__PORT=5672
AMQP__URL__VHOST=os2mo

or a single URL definition

AMQP__URL=amqp://guest:guest@msg_broker:5672/os2mo
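
The two forms carry the same information. The following standard-library sketch shows the correspondence by assembling the structured variables into a URL and by splitting the single URL back into its parts. It does not reproduce how pydantic or RAMQP parse the settings, and `url_from_fields` is a hypothetical helper introduced for illustration.

```python
from urllib.parse import urlsplit


def url_from_fields(env: dict[str, str], prefix: str = "AMQP__URL__") -> str:
    """Assemble a URL from structured variables such as AMQP__URL__HOST."""
    fields = {
        key[len(prefix):].lower(): value
        for key, value in env.items()
        if key.startswith(prefix)
    }
    return (
        f"{fields['scheme']}://{fields['user']}:{fields['password']}"
        f"@{fields['host']}:{fields['port']}/{fields['vhost']}"
    )


structured = {
    "AMQP__URL__SCHEME": "amqp",
    "AMQP__URL__USER": "guest",
    "AMQP__URL__PASSWORD": "guest",
    "AMQP__URL__HOST": "msg_broker",
    "AMQP__URL__PORT": "5672",
    "AMQP__URL__VHOST": "os2mo",
}
single = "amqp://guest:guest@msg_broker:5672/os2mo"

# Going the other way: split the single URL into its components.
parts = urlsplit(single)
print(url_from_fields(structured))
print(parts.scheme, parts.username, parts.hostname, parts.port, parts.path)
```

The structured form can be convenient when the password comes from a separate secret, while the single URL is shorter for local development.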

MO AMQP

Receiving:

import asyncio

from ramqp.config import AMQPConnectionSettings
from ramqp.mo import MOAMQPSystem
from ramqp.mo import MORouter
from ramqp.mo import MORoutingKey
from ramqp.mo import PayloadType

router = MORouter()


# Configure the callback function to receive messages for the two routing keys.
# If an exception is thrown from the function, the message is not acknowledged.
# Thus, it will be retried immediately.
@router.register("employee.address.edit")
@router.register("employee.it.create")
async def callback_function(
    mo_routing_key: MORoutingKey, payload: PayloadType
) -> None:
    pass


async def main() -> None:
    settings = AMQPConnectionSettings(url=..., queue_prefix="my-program")
    async with MOAMQPSystem(settings=settings, router=router) as amqp_system:
        await amqp_system.run_forever()


asyncio.run(main())

Sending:

from datetime import datetime
from uuid import uuid4

from ramqp.mo import MOAMQPSystem
from ramqp.mo import PayloadType

payload = PayloadType(uuid=uuid4(), object_uuid=uuid4(), time=datetime.now())

async with MOAMQPSystem(...) as amqp_system:
    await amqp_system.publish_message("employee.address.edit", payload)

Metrics

RAMQP exports a number of Prometheus metrics via the prometheus/client_python library.

These can be exported using:

from prometheus_client import start_http_server

start_http_server(8000)

Or similar; see the Prometheus client library documentation for details.

Development

Prerequisites

Getting Started

  1. Clone the repository:
git clone git@git.magenta.dk:rammearkitektur/ramqp.git
  2. Install all dependencies:
poetry install
  3. Set up pre-commit:
poetry run pre-commit install

Running the tests

Use Poetry and pytest to run the tests:

poetry run pytest

You can also run specific files

poetry run pytest tests/<test_folder>/<test_file.py>

and even use filtering with -k

poetry run pytest -k "Manager"

You can also pass the flags -vx, where -v prints each test as it runs (verbose) and -x stops the run at the first failing test (exit first).

Running the integration tests

To run the integration tests, an AMQP instance must be available.

If an instance is already available, it can be used by setting the AMQP_URL environment variable. Alternatively, a RabbitMQ instance can be started in Docker using:

docker run -d -p 5672:5672 -p 15672:15672 rabbitmq:3-management

Versioning

This project uses Semantic Versioning with the following strategy:

  • MAJOR: Incompatible changes to existing data models
  • MINOR: Backwards compatible updates to existing data models OR new models added
  • PATCH: Backwards compatible bug fixes

Authors

Magenta ApS https://magenta.dk

License

This project uses: MPL-2.0

This project uses REUSE for licensing. All licenses can be found in the LICENSES folder of the project.


