Rammearkitektur integrations framework
Project description
FastRAMQPI
FastRAMQPI is an opinionated library for OS2mo integrations.
Usage
from typing import Any
from fastapi import APIRouter
from fastapi import FastAPI
from pydantic import BaseSettings
from pydantic import Field
from fastramqpi import depends
from fastramqpi.config import Settings as FastRAMQPISettings
from fastramqpi.main import FastRAMQPI
from fastramqpi.ramqp.depends import Context
from fastramqpi.ramqp.depends import rate_limit
from fastramqpi.ramqp.mo import MORouter
from fastramqpi.ramqp.mo import PayloadUUID
class Settings(BaseSettings):
class Config:
frozen = True
env_nested_delimiter = "__"
fastramqpi: FastRAMQPISettings = Field(
default_factory=FastRAMQPISettings, description="FastRAMQPI settings"
)
fastapi_router = APIRouter()
amqp_router = MORouter()
@amqp_router.register("engagement", dependencies=[Depends(rate_limit(10))])
async def listen_to_engagements(
context: Context,
graphql_session: depends.LegacyGraphQLSession,
uuid: PayloadUUID,
) -> None:
print(uuid)
def create_fastramqpi(**kwargs: Any) -> FastRAMQPI:
settings = Settings(**kwargs)
fastramqpi = FastRAMQPI(
application_name="os2mo-example-integration",
settings=settings.fastramqpi,
graphql_version=20,
)
fastramqpi.add_context(settings=settings)
# Add our AMQP router(s)
amqpsystem = fastramqpi.get_amqpsystem()
amqpsystem.router.registry.update(amqp_router.registry)
# Add our FastAPI router(s)
app = fastramqpi.get_app()
app.include_router(fastapi_router)
return fastramqpi
def create_app(**kwargs: Any) -> FastAPI:
fastramqpi = create_fastramqpi(**kwargs)
return fastramqpi.get_app()
Metrics
FastRAMQPI Metrics are exported via prometheus/client_python
on the FastAPI's /metrics
.
Debugging
FastRAMQPI ships with support for debugging via DAP.
To enable it set the DAP
environmental variable to true, and expose the debugging port (5678).
For instance in a docker-compose.yaml
file, by merging in:
version: "3"
services:
mo_ldap_import_export:
environment:
DAP: "true"
ports:
- "127.0.0.0:5678:5678"
For ease of use, this should be the default for projects using FastRAMQPI.
Autogenerated GraphQL Client
FastRAMQPI exposes an authenticated httpx client through the dependency injection system. While it is possible to call the OS2mo API directly through it, the recommended approach is to define a properly-typed GraphQL client in the integration and configure it to make calls through the authenticated client. Instead of manually implementing such client, we strongly recommend to use the Ariadne Code Generator, which generates an integration-specific client based on the general OS2mo GraphQL schema and the exact queries and mutations the integration requires.
To integrate such client, first add and configure the codegen:
# pyproject.toml
[tool.poetry.group.dev.dependencies]
ariadne-codegen = "0.13.0"
[tool.ariadne-codegen]
# Ideally, the GraphQL client is generated as part of the build process and
# never committed to git. Unfortunately, most of our tools and CI analyses the
# project directly as it is in Git. In the future - when the CI templates
# operate on the built container image - only the definition of the schema and
# queries should be checked in.
#
# The default package name is `graphql_client`. Make it more obvious that the
# files are not to be modified manually.
target_package_name = "autogenerated_graphql_client"
target_package_path = "my_integration/"
client_name = "GraphQLClient"
schema_path = "schema.graphql" # curl -O http://localhost:5000/graphql/v8/schema.graphql
queries_path = "queries.graphql"
include_all_inputs = false
include_all_enums = false
plugins = [
# Return values directly when only a single top field is requested
"ariadne_codegen.contrib.shorter_results.ShorterResultsPlugin",
]
[tool.ariadne-codegen.scalars.DateTime]
type = "datetime.datetime"
parse = "fastramqpi.ariadne.parse_graphql_datetime"
[tool.ariadne-codegen.scalars.UUID]
type = "uuid.UUID"
Where you replace "my_integration/"
with the path to your integration.
Grab OS2mo's GraphQL schema:
curl -O http://localhost:5000/graphql/v20/schema.graphql
Define your queries:
# queries.graphql
# SPDX-FileCopyrightText: Magenta ApS <https://magenta.dk>
# SPDX-License-Identifier: MPL-2.0
query Version {
version {
mo_version
mo_hash
}
}
Generate the client - you may have to activate some virtual environment:
ariadne-codegen
The client class is passed to FastRAMQPI on startup as shown below. This will
ensure it is automatically opened and closed and configured with
authentication. NOTE: remember to update the graphql_version
when you
generate against a new schema version.
# app.py
from autogenerated_graphql_client import GraphQLClient
def create_app(**kwargs: Any) -> FastAPI:
fastramqpi = FastRAMQPI(
...,
graphql_version=20,
graphql_client_cls=GraphQLClient,
)
The FastRAMQPI framework cannot define the annotated type for the GraphQL client since its methods depend on the specific queries required by the integration. Therefore, each implementing integration needs to define their own:
# depends.py
from typing import Annotated
from fastapi import Depends
from fastramqpi.ramqp.depends import from_context
from my_integration.autogenerated_graphql_client import GraphQLClient as _GraphQLClient
GraphQLClient = Annotated[_GraphQLClient, Depends(from_context("graphql_client"))]
Finally, we can define our AMQP handler to use the GraphQL client:
# events.py
from . import depends
@router.register("*")
async def handler(mo: depends.GraphQLClient) -> None:
version = await mo.version()
print(version)
To get REUSE working, you might consider adding the following to .reuse/dep5
:
Files: my_integration/autogenerated_graphql_client/*
Copyright: Magenta ApS <https://magenta.dk>
License: MPL-2.0
Database
If your integration requires access to a database, set it up as so:
compose.yaml
:
services:
my_integration:
environment:
FASTRAMQPI__DATABASE__USER: "fastramqpi"
FASTRAMQPI__DATABASE__PASSWORD: "fastramqpi"
FASTRAMQPI__DATABASE__HOST: "db"
FASTRAMQPI__DATABASE__NAME: "fastramqpi"
db:
image: postgres:16
environment:
POSTGRES_USER: "fastramqpi"
POSTGRES_PASSWORD: "fastramqpi"
POSTGRES_DB: "fastramqpi"
my_integration/database.py
:
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import mapped_column
class Base(DeclarativeBase):
pass
class User(Base):
__tablename__ = "user"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str]
Due to budgetary constraints FastRAMQPI does not yet facilitate database
migrations through alembic. Instead, all defined tables are created on startup.
For technical reasons, this requires us to pass the DeclarativeBase
from
above to the FastRAMQPI constructor in app.py
as so:
from my_integration.database import Base
fastramqpi = FastRAMQPI(
application_name="os2mo-example-integration",
settings=set``tings.fastramqpi,
...
database_metadata=Base.metadata,
)
The database can be used in a handler as follows:
from fastramqpi import depends
from my_integration.database import User
@amqp_router.register("person")
async def add(session: depends.Session) -> None:
session.add(User(name="Alice"))
Multilayer exchanges
FastRAMQPI supports a multi-layer exchange setup, where all integration queues are bound to an intermediate exchange which itself is bound to an upstream exchange for instance OS2mo, the below Graph shows this setup with two integrations that each has their own intermediate exchange.
┌─────┐
┌──┤OS2mo├──┐
R1+R2+R3│ └─────┘ │R4+R5
│ │
┌──▼──┐ ┌──▼──┐
┌──┤LDAP ├──┐ │Omada├──┐
│ └──┬──┘ │ └──┬──┘ │
R1│ R2│ R3│ R4│ R5│
│ │ │ │ │
┌──▼──┬──▼──┬──▼──┐ ┌▼─────▼┐
│ Q1 │ Q2 │ Q3 │ │ Q4 │
└─────┴─────┴─────┘ └───────┘
Here R1,...R5
are routing keys, and Q1,...Q4
are queues. Notice that it is
still possible for a single queue to subscribe to multiple routing keys, and to
have multiple queues subscribed to a single intermediate exchange.
Compared the above model using intermediate exchanges with the model below, that does not utilize intermediate exchanges.
┌─────┐
┌─────┬──┤OS2mo├──┬─────┐
│ │ └──┬──┘ │ │
R1│ R2│ R3│ R4│ R5│
│ │ │ │ │
┌──▼──┬──▼──┬──▼──┐ ┌▼─────▼┐
│ Q1 │ Q2 │ Q3 │ │ Q4 │
└─────┴─────┴─────┘ └───────┘
Here it is no longer obvious which queues and routing keys belong to which
integration, and as such it is no longer possible to use OS2mo's refresh_*
mutators to request a specific integration to run its event handlers.
Requesting a specific integration to run its event handlers, using multi-layer
exchanges with intermediate exchanges can be done using the exchange
parameter on the refresh_*
mutators, by passing the name of the intermediate
exchange.
To start using intermediate exchanges in your FastRAMQPI integration, simply
set the upstream_exchange
value on the AMQPConnectionSettings
to the
upstream exchange you want to bind your intermediate exchange to, and set the
exchange
value to the name you want to allocate to your intermediate exchange.
It is generally recommended to use the upstream_exchange
name as a prefix for
the intermediate exchange set in the exchange
value, such as:
from fastramqpi.config import Settings as _FastRAMQPISettings
from fastramqpi.ramqp.config import AMQPConnectionSettings as _AMQPConnectionSettings
class AMQPConnectionSettings(_AMQPConnectionSettings):
upstream_exchange = "os2mo"
exchange = "os2mo_ldap"
queue_prefix = "os2mo_ldap"
class FastRAMQPISettings(_FastRAMQPISettings):
amqp: AMQPConnectionSettings
class Settings(BaseSettings):
fastramqpi: FastRAMQPISettings
Integration Testing
The goal of integration testing in our context is to minimise the amount of
mocking and patching by testing the integration's behavior against a running
OS2mo instance in a way that resembles a production environment as much as
possible. To this end, the integration is configured for testing through its
public interface -- environment variables -- in .gitlab-ci.yml
wherever
possible. The flow of each test follows the well-known "Arrange, Act and
Assert" pattern, but with a sleight modification to avoid hooking into either
OS2mo or the integration itself, and endure the nature of eventual consistency
that is unavoidable with an event-based system.
To get started, add OS2mo's GitLab CI template
to .gitlab-ci.yml
:
include:
- project: rammearkitektur/os2mo
file:
- gitlab-ci-templates/integration-test-meta.v1.yml
...
Test:
variables:
PYTEST_ADDOPTS: "-m 'not integration_test'"
Integration-test:
extends:
- .integration-test:mo
variables:
MY_INTEGRATION__FOO: "true"
The Integration-test
CI job starts a full OS2mo stack, including all
necessary services, and then runs all tests marked with integration_test
.
Auto-used fixtures automatically ensure test
isolation in OS2mo's database, increase the AMQP messaging interval, and
configure respx
appropriately. By default, the Test
job runs all tests in
the project, including both unit- and integration-tests. We overwrite the
Test
CI job to exclude integration tests -- the Integration-test
job
already only runs integration tests by default.
Moving on, some boilerplate needs to be copied since a few types cannot be known a priori:
# tests/integration/conftest.py
from collections.abc import AsyncIterator
import pytest
from asgi_lifespan import LifespanManager
from asgi_lifespan._types import ASGIApp
from fastapi import FastAPI
from gql.client import AsyncClientSession
from httpx import AsyncClient
from httpx import ASGITransport
from pytest import MonkeyPatch
from my_integration.app import create_app
@pytest.fixture
async def _app(monkeypatch: MonkeyPatch) -> FastAPI:
monkeypatch.setenv("SOME_CONFIG", "http://example.org")
app = create_app()
return app
@pytest.fixture
async def asgiapp(_app: FastAPI) -> AsyncIterator[ASGIApp]:
"""ASGI app with lifespan run."""
async with LifespanManager(_app) as manager:
yield manager.app
@pytest.fixture
async def app(_app: FastAPI, asgiapp: ASGIApp) -> FastAPI:
"""FastAPI app with lifespan run."""
return _app
@pytest.fixture
async def test_client(asgiapp: ASGIApp) -> AsyncIterator[AsyncClient]:
"""Create test client with associated lifecycles."""
transport = ASGITransport(app=asgiapp, client=("1.2.3.4", 123)) # type: ignore
async with AsyncClient(
transport=transport, base_url="http://example.com"
) as client:
yield client
@pytest.fixture
async def graphql_client(app: FastAPI) -> AsyncClientSession:
"""Authenticated GraphQL codegen client for OS2mo."""
return app.state.context["graphql_client"]
The test_client
fixture refers to a running instance of the integration, and
graphql_client
to the integration's pre-authenticated instance of its
autogenerated GraphQL client.
To make async tests and fixtures work, and avoid having to
@pytest.mark.asyncio
each test individually, the following must be added to
pyproject.toml
:
[tool.pytest.ini_options]
asyncio_mode="auto"
A sample integration test could be as follows:
# tests/integration/test_my_integration.py
import pytest
from fastapi.testclient import TestClient
from fastramqpi.pytest_util import retry
from more_itertools import one
from my_integration.autogenerated_graphql_client import GraphQLClient
@pytest.mark.integration_test
async def test_create_person(
test_client: TestClient,
graphql_client: GraphQLClient,
) -> None:
# Precondition: The person does not already exist.
# The auto-use fixtures should automatically ensure test isolation, but
# sometimes, especially during local development, we might be a little too
# fast on the ^C^C^C^C^C so pytest doesn't get a chance to clean up.
cpr_number = "1234567890"
employee = await graphql_client._testing__get_employee(cpr_number)
assert employee.objects == []
# The integration needs to be triggered to create the employee. How this is
# done depends on the integration. We assume a /trigger/ endpoint here:
test_client.post("/trigger")
@retry()
async def verify() -> None:
employees = await graphql_client._testing__get_employee(cpr_number)
employee_states = one(employees.objects)
employee = one(employee_states.objects)
assert employee.cpr_number == cpr_number
assert employee.given_name == "Alice"
await verify()
Through the use of the test_client
fixture, the test begins by starting the
integration, including initialising any associated lifecyles such as AMQP
connections. We sanity-check, to ensure the test isn't trivially passing, and
trigger the integration. Due to the nature of AMQP, we don't know how long the
integration will need to reconcile the state of OS2mo or an external system.
Therefore, all asserts which depend on eventual consistent state, are wrapped
in a function with @retry
from FastRAMQPI's pytest_util
. This allows the
assertions to fail and be retried up to 30 seconds, before the test fails.
To keep tests clear and concise, all GraphQL queries, which are required to
arrange and assert in the test, are added to the autogenerated GraphQL client,
using a _testing__
prefix by convention:
# queries.graphql
query _Testing_GetEmployee($cpr_number: CPR!) {
employees(filter: {cpr_numbers: [$cpr_number]}) {
objects {
objects {
cpr_number
given_name
}
}
}
}
Overriding OS2mo-init
It is possible to specific a custom OS2mo-init
configuration by setting the OS2MO_INIT_CONFIG
variable at the top of the
project's .gitlab-ci.yml
and declaring a init.config.yml
:
# .gitlab-ci.yml
variables:
OS2MO_INIT_CONFIG: "/builds/$CI_PROJECT_PATH/init.config.yml"
# init.config.yml
facets:
org_unit_address_type: {}
manager_address_type: {}
address_property: {}
engagement_job_function: {}
org_unit_type: {}
engagement_type:
qa_engineer:
title: "Software Integration Tester"
scope: "TEXT"
association_type: {}
role_type: {}
leave_type: {}
manager_type: {}
responsibility: {}
manager_level: {}
visibility: {}
time_planning: {}
org_unit_level: {}
primary_type: {}
org_unit_hierarchy: {}
kle_number: {}
kle_aspect: {}
it_systems:
FOOBAR: "The Foo Bar"
Debugging
By default, only logs generated by the application are captured and output in
the GitLab interface. Service logs (OS2mo, OS2mo-init, Keycloak, etc.) can be
captured for debugging by adding the following to the project's
.gitlab-ci.yml
file:
variables:
CI_DEBUG_SERVICES: "true"
This is especially useful if a service fails to start (Waiting for keycloak realm builder...
). See the GitLab
documentation
for more information.
Development
Prerequisites
Getting Started
- Clone the repository:
git clone git@git.magenta.dk:rammearkitektur/FastRAMQPI.git
- Install all dependencies:
poetry install
- Set up pre-commit:
poetry run pre-commit install
Running the tests
You use poetry
and pytest
to run the tests:
poetry run pytest
You can also run specific files
poetry run pytest tests/<test_folder>/<test_file.py>
and even use filtering with -k
poetry run pytest -k "Manager"
You can use the flags -vx
where v
prints the test & x
makes the test stop if any tests fails (Verbose, X-fail)
Authors
Magenta ApS https://magenta.dk
License
This project uses: MPL-2.0
This project uses REUSE for licensing. All licenses can be found in the LICENSES folder of the project.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Hashes for fastramqpi-10.0.2-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | c2b1125900612d3f6c3671c516dc54841fc40e41425f2991e4065c8935d4f448 |
|
MD5 | 3717b5486d3e8f0025c74dbf36e4408e |
|
BLAKE2b-256 | 851c14dd4d3003e218a631753dafc598559791d0ce54091a3c2b8d66de92c63d |