Rammearkitektur AMQP library (aio_pika wrapper)
Rammearkitektur AMQP
Rammearkitektur AMQP (RAMQP) is an opinionated library for AMQP.
It is implemented as a thin wrapper around aio_pika, with a generic and a MO-specific AMQPSystem abstraction. The MO abstraction is itself implemented as a thin wrapper around the generic one.
Usage
Generic
Receiving:
import asyncio
from typing import Any

from ramqp import AMQPSystem
from ramqp import Router
from ramqp.config import ConnectionSettings

router = Router()


# Configure the callback function to receive messages for the two routing keys.
# If an exception is thrown from the function, the message is not acknowledged.
# Thus, it will be retried immediately.
@router.register("my.routing.key")
@router.register("my.other.routing.key")
async def callback_function(routing_key: str, **kwargs: Any) -> None:
    pass


async def main() -> None:
    settings = ConnectionSettings(queue_prefix="my-program")
    async with AMQPSystem(settings=settings, router=router) as amqp_system:
        await amqp_system.run_forever()


asyncio.run(main())
NOTE: **kwargs is required in all handlers for forward compatibility: the framework may add new keyword arguments in the future, and existing handlers should accept them without breaking, even if they do not use them. It can be named **_ to prevent "unused variable" warnings from linters.
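The forward-compatibility point in the note can be illustrated without an AMQP broker. The sketch below is plain Python, not RAMQP code: `dispatch` is a hypothetical stand-in for the framework, and `new_keyword` is an invented keyword representing something a future version might start passing.

```python
import asyncio
from typing import Any

received: list[str] = []


async def handler(routing_key: str, **_: Any) -> None:
    # Only `routing_key` is used; every other keyword is absorbed by `**_`.
    received.append(routing_key)


async def dispatch() -> None:
    # Hypothetical stand-in for the framework calling a handler.
    # `new_keyword` is an assumption: a keyword added in some later version.
    await handler(routing_key="my.routing.key", context={}, new_keyword=123)


asyncio.run(dispatch())
```

A handler declared without `**_` (or `**kwargs`) would raise a `TypeError` on the unexpected keyword in the same call.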
Sending:
import asyncio

from ramqp import AMQPSystem


async def main() -> None:
    async with AMQPSystem() as amqp_system:
        await amqp_system.publish_message("my.routing.key", {"key": "value"})


asyncio.run(main())
MO AMQP
Receiving:
import asyncio
from typing import Any

from ramqp.config import ConnectionSettings
from ramqp.mo import MOAMQPSystem
from ramqp.mo import MORouter
from ramqp.mo.models import MORoutingKey
from ramqp.mo.models import ObjectType
from ramqp.mo.models import PayloadType
from ramqp.mo.models import RequestType
from ramqp.mo.models import ServiceType

router = MORouter()


# Configure the callback function to receive messages for the two routing keys.
# If an exception is thrown from the function, the message is not acknowledged.
# Thus, it will be retried immediately.
@router.register(ServiceType.EMPLOYEE, ObjectType.ADDRESS, RequestType.EDIT)
@router.register("employee.it.create")
async def callback_function(
    mo_routing_key: MORoutingKey, payload: PayloadType, **kwargs: Any
) -> None:
    pass


async def main() -> None:
    settings = ConnectionSettings(queue_prefix="my-program")
    async with MOAMQPSystem(settings=settings, router=router) as amqp_system:
        await amqp_system.run_forever()


asyncio.run(main())
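The receiving example registers one handler with three enum members and another with a dotted string, which suggests the two forms are interchangeable. The sketch below shows how such a three-part key could map to and from a string. It is an illustration only, not RAMQP's implementation: the enum classes are local stand-ins that reuse the library's names, their string values are assumptions, and `to_routing_key` and `from_routing_key` are hypothetical helpers.

```python
from enum import Enum


# Local stand-ins for illustration; the string values are assumptions.
class ServiceType(str, Enum):
    EMPLOYEE = "employee"
    ORG_UNIT = "org_unit"


class ObjectType(str, Enum):
    ADDRESS = "address"
    IT = "it"


class RequestType(str, Enum):
    CREATE = "create"
    EDIT = "edit"


def to_routing_key(
    service: ServiceType, obj: ObjectType, request: RequestType
) -> str:
    # Join the three parts with dots: "<service>.<object>.<request>".
    return ".".join(part.value for part in (service, obj, request))


def from_routing_key(key: str) -> tuple[ServiceType, ObjectType, RequestType]:
    # Split on dots and validate each part against its enum.
    service, obj, request = key.split(".")
    return ServiceType(service), ObjectType(obj), RequestType(request)
```

Under these assumptions, the enum form used in the first `register` call corresponds to the string "employee.address.edit", and an unknown part such as "employee.foo.edit" fails with a `ValueError` instead of being silently accepted.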
Sending:
import asyncio
from datetime import datetime
from uuid import uuid4

from ramqp.mo import MOAMQPSystem
from ramqp.mo.models import ObjectType
from ramqp.mo.models import PayloadType
from ramqp.mo.models import RequestType
from ramqp.mo.models import ServiceType


async def main() -> None:
    payload = PayloadType(uuid=uuid4(), object_uuid=uuid4(), time=datetime.now())
    async with MOAMQPSystem() as amqp_system:
        await amqp_system.publish_message(
            ServiceType.EMPLOYEE, ObjectType.ADDRESS, RequestType.EDIT, payload
        )


asyncio.run(main())
FastAPI and Additional Context
The run_forever() function is not very useful if you would like to serve an API at the same time. To solve this, make the ASGI application the main entrypoint of the program and tie the AMQP system to its lifespan. The example below also shows how additional context can be injected into the AMQP system and subsequently retrieved in the handlers:
from contextlib import asynccontextmanager
from functools import partial
from typing import Any

from fastapi import APIRouter
from fastapi import FastAPI
from starlette.requests import Request

from ramqp.config import ConnectionSettings
from ramqp.mo import MOAMQPSystem
from ramqp.mo import MORouter

amqp_router = MORouter()
fastapi_router = APIRouter()


@amqp_router.register("employee.it.create")
async def callback_function(context: dict, **kwargs: Any) -> None:
    print(context["greeting"])  # Hello, world!


@fastapi_router.get("/hello")
async def hello(request: Request) -> str:
    # Passing the 'amqp_system' in the context would allow FastAPI to actually publish
    # this greeting to the whole world.
    return request.app.state.context["greeting"]  # Hello, world!


def create_app() -> FastAPI:
    context = {"greeting": "Hello, world!"}

    app = FastAPI()
    app.include_router(fastapi_router)
    app.state.context = context  # share context with FastAPI

    settings = ConnectionSettings(queue_prefix="my-program")
    amqp_system = MOAMQPSystem(
        settings=settings,
        router=amqp_router,
        context=context,  # share context with the AMQP system
    )

    app.router.lifespan_context = partial(lifespan, context, amqp_system)
    return app


@asynccontextmanager
async def lifespan(context: dict, amqp_system: MOAMQPSystem, app: FastAPI):
    # Add to the context as required
    async with amqp_system:
        # Yield to keep the AMQP system open until the ASGI application is closed.
        # Control will be returned to here when the ASGI application is shut down.
        # Consider using AsyncExitStack in case of multiple context managers.
        yield
Save the example as example.py and try it with:
uvicorn --factory example:create_app
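The lifespan function in the example mentions AsyncExitStack for the case of several context managers. A minimal standard-library sketch of that pattern follows. The `resource` helper and the names "amqp" and "database" are placeholders invented for illustration; in a real application the AMQP system and any other async context managers would be entered in their place.

```python
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager

events: list[str] = []


@asynccontextmanager
async def resource(name: str):
    # Placeholder for a real async context manager such as the AMQP system.
    events.append(f"open {name}")
    try:
        yield name
    finally:
        events.append(f"close {name}")


@asynccontextmanager
async def lifespan():
    async with AsyncExitStack() as stack:
        # Resources are entered in order and closed in reverse order on exit.
        await stack.enter_async_context(resource("amqp"))
        await stack.enter_async_context(resource("database"))
        yield


async def main() -> None:
    async with lifespan():
        events.append("serving")


asyncio.run(main())
```

The stack avoids nesting one `async with` per resource, and it still closes everything that was opened if a later resource fails to start.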
Metrics
RAMQP exports a myriad of Prometheus metrics via prometheus/client_python.
These can be exposed using:
from prometheus_client import start_http_server
start_http_server(8000)
Or similar; see the Prometheus client library documentation for details.
Development
Prerequisites
Getting Started
- Clone the repository:
git clone git@git.magenta.dk:rammearkitektur/ramqp.git
- Install all dependencies:
poetry install
- Set up pre-commit:
poetry run pre-commit install
Running the tests
Use poetry and pytest to run the tests:
poetry run pytest
You can also run specific files:
poetry run pytest tests/<test_folder>/<test_file.py>
and even filter with -k:
poetry run pytest -k "Manager"
You can use the flags -vx, where v makes the output verbose (each test is printed) and x stops the run at the first failing test.
Running the integration tests
To run the integration tests, an AMQP instance must be available.
If an instance is already available, it can be used by configuring the AMQP_URL
environment variable. Alternatively, a RabbitMQ instance can be started in Docker using:
docker run -d -p 5672:5672 -p 15672:15672 rabbitmq:3-management
Versioning
This project uses Semantic Versioning with the following strategy:
- MAJOR: Incompatible changes to existing data models
- MINOR: Backwards compatible updates to existing data models OR new models added
- PATCH: Backwards compatible bug fixes
Authors
Magenta ApS https://magenta.dk
License
This project uses: MPL-2.0
This project uses REUSE for licensing. All licenses can be found in the LICENSES folder of the project.