Skip to main content

Rammearkitektur AMQP library (aio_pika wrapper)

Project description

Rammearkitektur AMQP

Rammearkitektur AMQP (RAMQP) is an opinionated library for AMQP.

It is implemented as a thin wrapper around aio_pika, with a generic and a MO specific AMQPSystem abstract, the MO abstraction being implementing using a thin wrapper around the generic abstraction.

Usage

Generic

Receiving:

import asyncio
from typing import Any

from ramqp import AMQPSystem
from ramqp import Router
from ramqp.config import AMQPConnectionSettings

router = Router()

# Configure the callback function to receive messages for the two routing keys.
# If an exception is thrown from the function, the message is not acknowledged.
# Thus, it will be retried immediately.
@router.register("my.routing.key")
@router.register("my.other.routing.key")
async def callback_function(routing_key: str, **kwargs: Any) -> None:
    pass


async def main() -> None:
    settings = AMQPConnectionSettings(url=..., queue_prefix="my-program")
    async with AMQPSystem(settings=settings, router=router) as amqp_system:
        await amqp_system.run_forever()


asyncio.run(main())

NOTE: **kwargs is required in all handlers for forward compatibility: the framework can add new keywords in the future, and existing handlers should accept them without breaking, if they do not use them. It can be named **_ to prevent the "unused variable" warning by linters.

Sending:

from ramqp import AMQPSystem

with AMQPSystem(...) as amqp_system:
    await amqp_system.publish_message("my.routing.key", {"key": "value"})

Settings

In most cases, AMQPConnectionSettings is probably initialised by being included in the BaseSettings of the application using the library. The url parameter of the AMQPConnectionSettings object can be given as a single URL string or as individual structured fields. Consider the following:

from pydantic import BaseSettings

from ramqp.config import AMQPConnectionSettings

# BaseSettings makes the entire model initialisable using environment variables
class Settings(BaseSettings):
    amqp: AMQPConnectionSettings

    class Config:
        env_nested_delimiter = "__"  # allows setting e.g. AMQP__URL__HOST=foo

settings = Settings()

The above would work with either multiple structured environment variables

AMQP__URL__SCHEME=amqp
AMQP__URL__USER=guest
AMQP__URL__PASSWORD=guest
AMQP__URL__HOST=msg_broker
AMQP__URL__PORT=5672
AMQP__URL__VHOST=os2mo

or a single URL definition

AMQP__URL=amqp://guest:guest@msg_broker:5672/os2mo

MO AMQP

Receiving:

import asyncio
from typing import Any

from ramqp.config import AMQPConnectionSettings
from ramqp.mo import MOAMQPSystem
from ramqp.mo import MORouter
from ramqp.mo.models import MORoutingKey
from ramqp.mo.models import ObjectType
from ramqp.mo.models import PayloadType
from ramqp.mo.models import RequestType
from ramqp.mo.models import ServiceType

router = MORouter()


# Configure the callback function to receive messages for the two routing keys.
# If an exception is thrown from the function, the message is not acknowledged.
# Thus, it will be retried immediately.
@router.register(ServiceType.EMPLOYEE, ObjectType.ADDRESS, RequestType.EDIT)
@router.register("employee.it.create")
async def callback_function(
    mo_routing_key: MORoutingKey, payload: PayloadType, **kwargs: Any
) -> None:
    pass


async def main() -> None:
    settings = AMQPConnectionSettings(url=..., queue_prefix="my-program")
    async with MOAMQPSystem(settings=settings, router=router) as amqp_system:
        await amqp_system.run_forever()


asyncio.run(main())

Sending:

from datetime import datetime
from uuid import uuid4

from ramqp.mo import MOAMQPSystem
from ramqp.mo.models import ObjectType
from ramqp.mo.models import PayloadType
from ramqp.mo.models import RequestType
from ramqp.mo.models import ServiceType

payload = PayloadType(uuid=uuid4(), object_uuid=uuid4(), time=datetime.now())

async with MOAMQPSystem(...) as amqp_system:
    await amqp_system.publish_message(
        ServiceType.EMPLOYEE, ObjectType.ADDRESS, RequestType.EDIT, payload
    )

FastAPI and Additional Context

The run_forever() function is not very useful if you would like to serve an API at the same time. To solve this issue, the main entrypoint of the program is the ASGI application, with the AMQP system tied to its lifespan. The example below also shows how additional context can be injected into the AMQP system, and subsequently retrieved in the handlers:

from contextlib import asynccontextmanager
from functools import partial
from typing import Any

from fastapi import APIRouter
from fastapi import FastAPI
from starlette.requests import Request

from ramqp.config import AMQPConnectionSettings
from ramqp.mo import MOAMQPSystem
from ramqp.mo import MORouter

amqp_router = MORouter()
fastapi_router = APIRouter()


@amqp_router.register("employee.it.create")
async def callback_function(context: dict, **kwargs: Any) -> None:
    print(context["greeting"])  # Hello, world!


@fastapi_router.get("/hello")
async def hello(request: Request) -> None:
    # Passing the 'amqp_system' in the context would allow FastAPI to actually publish
    # this greeting to the whole world.
    return request.app.state.context["greeting"]  # Hello, world!


def create_app() -> FastAPI:
    context = {"greeting": "Hello, world!"}

    app = FastAPI()
    app.include_router(fastapi_router)
    app.state.context = context  # share context with FastAPI

    settings = AMQPConnectionSettings(url=..., queue_prefix="my-program")
    amqp_system = MOAMQPSystem(
        settings=settings,
        router=amqp_router,
        context=context,  # share context with the AMQP system
    )

    app.router.lifespan_context = partial(lifespan, context, amqp_system)
    return app


@asynccontextmanager
async def lifespan(context: dict, amqp_system: MOAMQPSystem, app: FastAPI):
    # Add to the context as required

    async with amqp_system:
        # Yield to keep the AMQP system open until the ASGI application is closed.
        # Control will be returned to here when the ASGI application is shut down.
        # Consider using AsyncExitStack in case of multiple context managers.
        yield

Save the example as example.py and try it with uvicorn --factory example:create_app.

Metrics

RAMQP exports a myraid of prometheus metrics via prometheus/client_python.

These can be exported using:

from prometheus_client import start_http_server

start_http_server(8000)

Or similar, see the promethues client library for details.

Development

Prerequisites

Getting Started

  1. Clone the repository:
git clone git@git.magenta.dk:rammearkitektur/ramqp.git
  1. Install all dependencies:
poetry install
  1. Set up pre-commit:
poetry run pre-commit install

Running the tests

You use poetry and pytest to run the tests:

poetry run pytest

You can also run specific files

poetry run pytest tests/<test_folder>/<test_file.py>

and even use filtering with -k

poetry run pytest -k "Manager"

You can use the flags -vx where v prints the test & x makes the test stop if any tests fails (Verbose, X-fail)

Running the integration tests

To run the integration tests, an AMQP instance must be available.

If an instance is already available, it can be used by configuring the AMQP_URL environmental variable. Alternatively a RabbitMQ can be started in docker, using:

docker run -d -p 5672:5672 -p 15672:15672 rabbitmq:3-management

Versioning

This project uses Semantic Versioning with the following strategy:

  • MAJOR: Incompatible changes to existing data models
  • MINOR: Backwards compatible updates to existing data models OR new models added
  • PATCH: Backwards compatible bug fixes

Authors

Magenta ApS https://magenta.dk

License

This project uses: MPL-2.0

This project uses REUSE for licensing. All licenses can be found in the LICENSES folder of the project.

Project details


Release history Release notifications | RSS feed

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

ramqp-7.0.2.tar.gz (23.0 kB view details)

Uploaded Source

Built Distribution

ramqp-7.0.2-py3-none-any.whl (29.5 kB view details)

Uploaded Python 3

File details

Details for the file ramqp-7.0.2.tar.gz.

File metadata

  • Download URL: ramqp-7.0.2.tar.gz
  • Upload date:
  • Size: 23.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.4.1 CPython/3.11.2 Linux/5.15.65+

File hashes

Hashes for ramqp-7.0.2.tar.gz
Algorithm Hash digest
SHA256 a1ff8a2c4b948143f6d577d7068d7ebed347b227ee05e2a33c40ce7180b74433
MD5 02c2387e75028b81fe14a52a1a8e6580
BLAKE2b-256 b3c75eae72c5c436a1d2acc6f1a92ee7e3cb2280078e8dda9dd4004b16261457

See more details on using hashes here.

File details

Details for the file ramqp-7.0.2-py3-none-any.whl.

File metadata

  • Download URL: ramqp-7.0.2-py3-none-any.whl
  • Upload date:
  • Size: 29.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.4.1 CPython/3.11.2 Linux/5.15.65+

File hashes

Hashes for ramqp-7.0.2-py3-none-any.whl
Algorithm Hash digest
SHA256 daa7461b3461b5b19d454fd4786702e67965a78d3a8fcbc619358dfd8f3651be
MD5 9e6d708442f83b4c8233fee5db774968
BLAKE2b-256 f5efc2954db2f9be3603d0e68d1e30a5809a7a9042905faa3215eaa86c9495d0

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page