Topic-Based Subscription context for python aiohttp web applications
Project description
fhir-tbs-py
Topic-based subscription extension for python aiohttp web applications.
Features:
- Automatically created webhook aiohttp handlers based on definitions
- Unified R4B/R5 API for automatic registration (managed subscriptions)
- Optional managed subscriptions
- Optional authentication using
X-Api-Key id-only/full-resourcesupport
Install
Install fhir-tbs[r4b] or fhir-tbs[r5] using poetry/pipenv/pip.
Usage
- Instantiate R4BTBS/R5TBS class with optionally passing predefined subscriptions using
subscriptionsarg andsubscription_defaultswith default subscription parameters (e.g.payload_content,timeoutorheartbeat_period):tbs = R4BTBS(subscription_defaults={"payload_content": "full-resource"})
- Define subscriptions using decorator
tbs.define:@tbs.define( topic="https://example.com/SubscriptionTopic/new-appointment-event", filter_by=[ { "resource_type": "Appointment", "filter_parameter": "status", "value": "booked" } ], webhook_id="new-appointment" ) async def new_appointment_handler( app: web.Application, reference: str, _included_resources: list[r4b.AnyResource], _timestamp: str | None, ) -> None: logging.info("New appointment %s", reference)
- Invoke
setup_tbson app initialization passing needed parameters (see specification below):setup_tbs(app, tbs, webhook_path_prefix="webhook")
Specification
fhir_tbs.r4b.R4BTBS/fhir_tbs.r5.R5TBS
- subscriptions (list[fhir_tbs.SubscriptionDefinitionWithHandler], optional) - predefined list of subscriptions.
- subscription_defaults (optional) - default parameters for all subscription definitions.
- payload_content (str, optional):
id-only/full-resource(default isid-only) - timeout (int, optional): default is
60 - heartbeat_period (int, optional): default is
20
- payload_content (str, optional):
tbs_instance.define
- topic (str): URL of SubscriptionTopic to subscribe.
- webhook_id (optional): Optional webhook id that will be part of webhook URL.
- filter_by (list[FilterBy], optional): Optional list of filters applied to topic.
- payload_content (str, optional):
id-only/full-resource(default isid-only) - timeout (int, optional): default is
60 - heartbeat_period (int, optional): default is
20
setup_tbs
- app (web.Application): aiohttp application.
- tbs (R4BTBS/R5TBS): TBS class instance.
- webhook_path_prefix (str): Prefix for the generated aiohttp routes.
- webhook_token (str, optional): The authorization token that is checked in X-Api-Token header.
- manage_subscriptions (bool, optional): The flag that indicates whether subscription registration/population should be enabled.
- handle_delivery_errors (bool, optional): WIP The flag that indicated whether subscription delivery errors (e.g. broken connection or missing events) should be handled.
- app_url (str, optional): Application url that is used when
manage_subscriptions/handle_delivery_errorsare set. - get_fhir_client (Callable[[web.Application], AsyncFHIRClient], optional): Getter for web.Application that returns AsyncFHIRClient that further used when
manage_subscriptions/handle_delivery_errorsare set.
Examples
General example
Create subscriptions.py with the following content:
import logging
from fhirpy import AsyncFHIRClient
import fhirpy_types_r4b as r4b
from fhir_tbs import SubscriptionDefinitionWithHandler
from fhir_tbs.r4b import R4BTBS
from aiohttp import web
# Make sure that app has fhir_client_key
fhir_client_key = web.AppKey("fhir_client_key", AsyncFHIRClient)
async def new_appointment_sub(
app: web.Application,
appointment_ref: str,
included_resources: list[r4b.AnyResource],
_timestamp: str
) -> None:
fhir_client = app[fhir_client_key]
# For id-only use fhir_client to fetch the resource
appointment = r4b.Appointment(**(await fhir_client.get(appointment_ref)))
# For full-resource find in in included resources by reference (straightforward example)
appointment = [
resource for resource in included_resources
if appointment_ref == f"{resource.resourceType}/{resource.id}"
][0]
logging.info("New appointment %s", appointment.model_dump())
tbs = R4BTBS()
@tbs.define(
topic="https://example.com/SubscriptionTopic/new-appointment-event",
filter_by=[
{
"resource_type": "Appointment",
"filter_parameter": "status",
"value": "booked"
}
],
webhook_id="new-appointment"
)
async def new_appointment_handler(
app: web.Application,
reference: str,
_included_resources: list[r4b.AnyResource],
_timestamp: str | None,
) -> None:
logging.info("New appointment %s", reference)
def create_app() -> web.Application:
app = web.Application()
app[fhir_client_key] = AsyncFHIRClient(...)
setup_tbs(
app,
tbs,
webhook_path_prefix="webhook",
app_url="http://app:8080",
get_fhir_client=lambda app: app[fhir_client_key],
manage_subscriptions=True,
handle_delivery_errors=True
)
Using in aidbox-python-sdk for external subscriptions
external_webhook_path_prefix_parts = ["external-webhook"]
external_tbs = R4BTBS(subscriptions=subscriptions)
def setup_external_tbs(app: web.Application) -> None:
setup_tbs(
app,
external_tbs,
webhook_prefix_path="/".join(external_webhook_path_prefix_parts),
app_url="http://aidbox.example.com",
get_fhir_client=lambda app: app[fhir_client_key],
manage_subscriptions=True,
handle_delivery_errors=True
)
@sdk.operation(
methods=["POST"],
path=[*external_webhook_path_prefix_parts, {"name": "webhook-name"}],
public=True,
)
async def external_webhook_proxy_op(
_operation: SDKOperation, request: SDKOperationRequest
) -> web.Response:
session = request["app"][ak.session]
app_url = str(request["app"][ak.settings].APP_URL).rstrip("/")
webhook_name = request["route-params"]["webhook-name"]
path = "/".join([*external_webhook_path_prefix_parts, webhook_name])
token = request["headers"].get("x-api-key")
async with session.post(
f"{app_url}/{path}",
headers={"X-Api-Key": token} if token else {},
json=request["resource"],
) as response:
return web.Response(
body=await response.text(),
status=response.status,
content_type=response.content_type,
)
def create_app() -> web.Application:
app = web.Application()
app[fhir_client_key] = AsyncFHIRClient(...)
setup_external_tbs(app)
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file fhir_tbs-1.0.0a1.tar.gz.
File metadata
- Download URL: fhir_tbs-1.0.0a1.tar.gz
- Upload date:
- Size: 11.3 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/5.1.1 CPython/3.12.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
894ae6e64cc46ca96532132ed048cd0c5e1a406dfb937092752448629dad935f
|
|
| MD5 |
2424de4c01b92b84a3740804b55ab637
|
|
| BLAKE2b-256 |
8aaea6b0819ead7fbb9613cb2f615e71efef492bc63f6c6ad088266499e9d493
|
Provenance
The following attestation bundles were made for fhir_tbs-1.0.0a1.tar.gz:
Publisher:
publish.yaml on beda-software/fhir-tbs-py
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
fhir_tbs-1.0.0a1.tar.gz -
Subject digest:
894ae6e64cc46ca96532132ed048cd0c5e1a406dfb937092752448629dad935f - Sigstore transparency entry: 152324552
- Sigstore integration time:
-
Permalink:
beda-software/fhir-tbs-py@3ff835f7699fff1567cba2aaf233aefd7e26806a -
Branch / Tag:
refs/tags/v1.0.0a1 - Owner: https://github.com/beda-software
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish.yaml@3ff835f7699fff1567cba2aaf233aefd7e26806a -
Trigger Event:
release
-
Statement type:
File details
Details for the file fhir_tbs-1.0.0a1-py3-none-any.whl.
File metadata
- Download URL: fhir_tbs-1.0.0a1-py3-none-any.whl
- Upload date:
- Size: 12.1 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/5.1.1 CPython/3.12.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
fb5c7f2e43d8626063151be3a26e963a17443ec6c617490a376cdfa81fdd4d17
|
|
| MD5 |
322f169c4de0d511109c4db17f7abaf4
|
|
| BLAKE2b-256 |
8ff2f6e6b63baf7877413037e90b6522e26cd40fd32125481bf5252cf78e2fae
|
Provenance
The following attestation bundles were made for fhir_tbs-1.0.0a1-py3-none-any.whl:
Publisher:
publish.yaml on beda-software/fhir-tbs-py
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
fhir_tbs-1.0.0a1-py3-none-any.whl -
Subject digest:
fb5c7f2e43d8626063151be3a26e963a17443ec6c617490a376cdfa81fdd4d17 - Sigstore transparency entry: 152324553
- Sigstore integration time:
-
Permalink:
beda-software/fhir-tbs-py@3ff835f7699fff1567cba2aaf233aefd7e26806a -
Branch / Tag:
refs/tags/v1.0.0a1 - Owner: https://github.com/beda-software
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
publish.yaml@3ff835f7699fff1567cba2aaf233aefd7e26806a -
Trigger Event:
release
-
Statement type: