Rammearkitektur AMQP library (aio_pika wrapper)
Rammearkitektur AMQP
Rammearkitektur AMQP (RAMQP) is an opinionated library for AMQP.
It is implemented as a thin wrapper around aio_pika, with a generic and a MO-specific AMQPSystem abstraction, the MO abstraction being implemented as a thin wrapper around the generic abstraction.
Usage
Generic
Receiving:
import asyncio

from ramqp import AMQPSystem
from ramqp import Router
from ramqp.config import AMQPConnectionSettings
from ramqp.depends import RoutingKey

router = Router()

# Configure the callback function to receive messages for the two routing keys.
# If an exception is thrown from the function, the message is not acknowledged.
# Thus, it will be retried immediately.
@router.register("my.routing.key")
@router.register("my.other.routing.key")
async def callback_function(routing_key: RoutingKey) -> None:
    pass

async def main() -> None:
    settings = AMQPConnectionSettings(url=..., queue_prefix="my-program")
    async with AMQPSystem(settings=settings, router=router) as amqp_system:
        await amqp_system.run_forever()

asyncio.run(main())
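To make the registration pattern above concrete, here is a minimal, standard-library-only sketch of how stacked decorators can map several routing keys to one callback, and how a raised exception can be turned into a "do not acknowledge" decision. `SketchRouter`, `dispatch` and `handler` are hypothetical names used for illustration; this is not RAMQP's implementation, and real acknowledgement and redelivery are handled by the broker through aio_pika.

```python
import asyncio
from collections import defaultdict
from typing import Awaitable, Callable

Callback = Callable[[str], Awaitable[None]]


class SketchRouter:
    """Hypothetical stand-in for a router; not part of RAMQP."""

    def __init__(self) -> None:
        self.registry: dict[str, list[Callback]] = defaultdict(list)

    def register(self, routing_key: str) -> Callable[[Callback], Callback]:
        def decorator(function: Callback) -> Callback:
            self.registry[routing_key].append(function)
            # The function is returned unchanged, so decorators can be stacked.
            return function

        return decorator

    async def dispatch(self, routing_key: str) -> bool:
        """Return True if all callbacks succeeded (the message would be acked)."""
        try:
            for function in self.registry[routing_key]:
                await function(routing_key)
        except Exception:
            # A failing callback means the message would not be acknowledged.
            return False
        return True


sketch_router = SketchRouter()
seen: list[str] = []


@sketch_router.register("my.routing.key")
@sketch_router.register("my.other.routing.key")
async def handler(routing_key: str) -> None:
    if routing_key == "my.other.routing.key":
        raise ValueError("simulated failure")
    seen.append(routing_key)


acked = asyncio.run(sketch_router.dispatch("my.routing.key"))
not_acked = asyncio.run(sketch_router.dispatch("my.other.routing.key"))
```

Because `register` returns the original function, the same callback ends up in the registry under both routing keys.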
Sending:
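from ramqp import AMQPSystem

# AMQPSystem is an asynchronous context manager, so it must be entered with
# "async with" from within a coroutine.
async with AMQPSystem(...) as amqp_system:
    await amqp_system.publish_message("my.routing.key", {"key": "value"})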
Dependency Injection
The callback handlers support FastAPI dependency injection. This allows handlers to request exactly the data that they need, as seen with FastAPI dependencies or pytest fixtures. A callback may look like:
from ramqp.mo import MORoutingKey
from ramqp.mo import PayloadType

async def callback(mo_routing_key: MORoutingKey, payload: PayloadType):
    ...
Experienced FastAPI developers might wonder how this works without the Depends function. Indeed, this less verbose pattern was introduced in FastAPI v0.95, and works by defining the dependency directly on the type using the Annotated mechanism from PEP 593. For example:
MORoutingKey = Annotated[MORoutingKey, Depends(get_routing_key)]
PayloadType = Annotated[PayloadType, Depends(get_payload_as_type(PayloadType))]
whereby the previous example is equivalent to:
async def callback(
    mo_routing_key: MORoutingKey = Depends(get_routing_key),
    payload: PayloadType = Depends(get_payload_as_type(PayloadType)),
):
    ...
Reference documentation should be made available for these types in the future, but for now they can be found mainly in ramqp/depends.py and ramqp/mo.py.
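For readers unfamiliar with how metadata on an Annotated type can drive injection, the following standard-library sketch shows the general idea. `Needs`, `resolve_and_call`, `RoutingKeySketch` and `BodySketch` are hypothetical names invented for this illustration; FastAPI's real resolver is considerably more involved, and this is not how RAMQP or FastAPI is implemented.

```python
import inspect
from typing import Annotated, Any, Callable, get_args, get_origin, get_type_hints


class Needs:
    """Hypothetical marker playing the role of FastAPI's Depends."""

    def __init__(self, provider: Callable[[dict], Any]) -> None:
        self.provider = provider


def resolve_and_call(function: Callable[..., Any], message: dict) -> Any:
    """Call `function`, filling each Annotated parameter from its provider."""
    # include_extras=True keeps the Annotated metadata instead of stripping it.
    hints = get_type_hints(function, include_extras=True)
    kwargs = {}
    for name in inspect.signature(function).parameters:
        hint = hints.get(name)
        if get_origin(hint) is Annotated:
            # get_args returns (underlying_type, *metadata).
            for metadata in get_args(hint)[1:]:
                if isinstance(metadata, Needs):
                    kwargs[name] = metadata.provider(message)
    return function(**kwargs)


# The dependency is declared once, on the type alias itself.
RoutingKeySketch = Annotated[str, Needs(lambda message: message["routing_key"])]
BodySketch = Annotated[dict, Needs(lambda message: message["body"])]


def callback(routing_key: RoutingKeySketch, body: BodySketch) -> str:
    return f"{routing_key}: {body['key']}"


result = resolve_and_call(
    callback,
    {"routing_key": "my.routing.key", "body": {"key": "value"}},
)
```

The callback's signature stays free of default values, which is the point of the less verbose pattern: the alias carries the dependency, and the handler only names the types it wants.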
Context
import asyncio
from typing import Annotated

import httpx
from fastapi import Depends

from ramqp import AMQPSystem
from ramqp import Router
from ramqp.depends import Context
from ramqp.depends import from_context

router = Router()

async def main() -> None:
    async with httpx.AsyncClient() as client:
        context = {
            "client": client,
        }
        async with AMQPSystem(..., context=context) as amqp_system:
            await amqp_system.run_forever()

HTTPXClient = Annotated[httpx.AsyncClient, Depends(from_context("client"))]

@router.register("my.routing.key")
async def callback_function(context: Context, client: HTTPXClient) -> None:
    pass

asyncio.run(main())
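Judging from the example above, the context is a plain dictionary of shared resources, and `from_context("client")` produces a dependency that looks up one key in it. The sketch below illustrates that factory pattern with the standard library only. `from_context_sketch` and `FakeClient` are hypothetical names, and the inner function's signature is an assumption; the real `from_context` in ramqp/depends.py may differ.

```python
from typing import Any, Callable


def from_context_sketch(key: str) -> Callable[[dict[str, Any]], Any]:
    """Hypothetical analogue of from_context: build a lookup for one key."""

    def inner(context: dict[str, Any]) -> Any:
        # Return the shared object stored under `key`, without copying it.
        return context[key]

    return inner


class FakeClient:
    """Stand-in for a shared resource such as httpx.AsyncClient."""


client = FakeClient()
context = {"client": client}

get_client = from_context_sketch("client")
resolved = get_client(context)
```

The lookup returns the very same object that was placed in the context, so every handler shares one client rather than creating its own.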
Settings
In most cases, AMQPConnectionSettings is initialised by being included in the BaseSettings of the application using the library. The url parameter of the AMQPConnectionSettings object can be given as a single URL string or as individual structured fields. Consider the following:
from pydantic import BaseSettings
from ramqp.config import AMQPConnectionSettings

# BaseSettings makes the entire model initialisable using environment variables
class Settings(BaseSettings):
    amqp: AMQPConnectionSettings

    class Config:
        env_nested_delimiter = "__"  # allows setting e.g. AMQP__URL__HOST=foo

settings = Settings()
The above would work with either multiple structured environment variables
AMQP__URL__SCHEME=amqp
AMQP__URL__USER=guest
AMQP__URL__PASSWORD=guest
AMQP__URL__HOST=msg_broker
AMQP__URL__PORT=5672
AMQP__URL__VHOST=os2mo
or a single URL definition
AMQP__URL=amqp://guest:guest@msg_broker:5672/os2mo
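The two forms describe the same connection. As a rough illustration, the standard-library sketch below splits the single URL into the structured fields listed above, and shows how a double-underscore delimiter can be read as nesting. The `nest` helper is hypothetical and only mimics the idea; pydantic performs its own parsing and validation.

```python
from urllib.parse import urlsplit

single_url = "amqp://guest:guest@msg_broker:5672/os2mo"
parts = urlsplit(single_url)

# The same information as the AMQP__URL__* variables above.
structured = {
    "scheme": parts.scheme,
    "user": parts.username,
    "password": parts.password,
    "host": parts.hostname,
    "port": parts.port,
    "vhost": parts.path.lstrip("/"),
}


def nest(environ: dict[str, str], delimiter: str = "__") -> dict:
    """Hypothetical helper: turn A__B__C=value into {"a": {"b": {"c": value}}}."""
    nested: dict = {}
    for name, value in environ.items():
        *parents, leaf = name.lower().split(delimiter)
        node = nested
        for parent in parents:
            node = node.setdefault(parent, {})
        node[leaf] = value
    return nested


nested = nest({"AMQP__URL__HOST": "msg_broker", "AMQP__URL__PORT": "5672"})
```

Note that environment variables are always strings, so a value such as the port is only converted to an integer by the settings model's validation.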
MO AMQP
Receiving:
import asyncio

from ramqp.config import AMQPConnectionSettings
from ramqp.mo import MOAMQPSystem
from ramqp.mo import MORouter
from ramqp.mo import MORoutingKey
from ramqp.mo import PayloadType

router = MORouter()

# Configure the callback function to receive messages for the two routing keys.
# If an exception is thrown from the function, the message is not acknowledged.
# Thus, it will be retried immediately.
@router.register("employee.address.edit")
@router.register("employee.it.create")
async def callback_function(
    mo_routing_key: MORoutingKey, payload: PayloadType
) -> None:
    pass

async def main() -> None:
    settings = AMQPConnectionSettings(url=..., queue_prefix="my-program")
    async with MOAMQPSystem(settings=settings, router=router) as amqp_system:
        await amqp_system.run_forever()

asyncio.run(main())
Sending:
from datetime import datetime
from uuid import uuid4

from ramqp.mo import MOAMQPSystem
from ramqp.mo import PayloadType

payload = PayloadType(uuid=uuid4(), object_uuid=uuid4(), time=datetime.now())

async with MOAMQPSystem(...) as amqp_system:
    await amqp_system.publish_message("employee.address.edit", payload)
Metrics
RAMQP exports a number of Prometheus metrics via prometheus/client_python.
These can be exposed over HTTP using:
from prometheus_client import start_http_server
start_http_server(8000)
Or similar; see the Prometheus client library for details.
Development
Prerequisites
Getting Started
- Clone the repository:
git clone git@git.magenta.dk:rammearkitektur/ramqp.git
- Install all dependencies:
poetry install
- Set up pre-commit:
poetry run pre-commit install
Running the tests
Use poetry and pytest to run the tests:
poetry run pytest
You can also run specific files
poetry run pytest tests/<test_folder>/<test_file.py>
and even use filtering with -k
poetry run pytest -k "Manager"
You can also use the flags -vx, where -v prints each test as it runs (verbose) and -x stops the run at the first failing test (exit first).
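As an illustration of the -k filter, consider the small test module sketched below. The function and test names are hypothetical and do not come from the RAMQP test suite. Running pytest with -k "Manager" against it would select only the first test, since -k matches on substrings of test names (case-insensitively in recent pytest releases).

```python
def make_manager_name(prefix: str) -> str:
    """Hypothetical function under test."""
    return f"{prefix}-manager"


def test_manager_name_has_prefix() -> None:
    # Selected by -k "Manager" because the test name contains "manager".
    assert make_manager_name("my-program") == "my-program-manager"


def test_unrelated_behaviour() -> None:
    # Deselected by -k "Manager"; the name does not contain the keyword.
    assert "my.routing.key".split(".") == ["my", "routing", "key"]
```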
Running the integration tests
To run the integration tests, an AMQP instance must be available.
If an instance is already available, it can be used by configuring the AMQP_URL
environment variable. Alternatively, a RabbitMQ instance can be started in Docker using:
docker run -d -p 5672:5672 -p 15672:15672 rabbitmq:3-management
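A test helper that honours AMQP_URL could look roughly like the sketch below. `get_amqp_url` and `DEFAULT_AMQP_URL` are hypothetical names, and the fallback is an assumption made for this example: it points at the container started by the command above, using the RabbitMQ image's default guest/guest credentials. The project's own test configuration may resolve the URL differently.

```python
import os
from typing import Mapping

# Assumed fallback: RabbitMQ published on localhost:5672 by the Docker
# command above, with the image's default guest/guest credentials.
DEFAULT_AMQP_URL = "amqp://guest:guest@localhost:5672/"


def get_amqp_url(environ: Mapping[str, str] = os.environ) -> str:
    """Hypothetical helper: prefer AMQP_URL, else fall back to the local broker."""
    return environ.get("AMQP_URL", DEFAULT_AMQP_URL)


local_url = get_amqp_url({})
custom_url = get_amqp_url({"AMQP_URL": "amqp://guest:guest@msg_broker:5672/os2mo"})
```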
Versioning
This project uses Semantic Versioning with the following strategy:
- MAJOR: Incompatible changes to existing data models
- MINOR: Backwards compatible updates to existing data models OR new models added
- PATCH: Backwards compatible bug fixes
Authors
Magenta ApS https://magenta.dk
License
This project uses: MPL-2.0
This project uses REUSE for licensing. All licenses can be found in the LICENSES folder of the project.