Broker agnostic library to associate JSON Schemas to message broker topics.
Project description
Cloud Eventful
Broker agnostic library to associate JSON Schemas to message broker topics.
Install
Cloud Eventful is on PyPI and can be installed with:
poetry add cloudeventful
or
pip install cloudeventful
Usage
This library provides a CloudEventful
class which can be used to generate
CloudEvents and associate
Pydantic models as the cloud event data
field
on a per-topic basis.
Model Registration
A model is associated with a pattern describing the topics it may be published to using
the data_model
decorator.
import re
from cloudeventful import CloudEventful
from pydantic import BaseModel
ce = CloudEventful(api_version="1.0.0", default_source="my/event/server")
@ce.data_model(re.compile(r"/.*/coffee"))
class Coffee(BaseModel):
flavor: str
Cloud Event Generation
Once data models are registered, CloudEvent objects can be generated with an instance of
the generated model as the CloudEvent data
property.
>>> ce.event(Coffee(flavor="mocha"))
CloudEvent[ModelType](id='9b21a718-9dc1-4b56-a4ea-4e9911bc8ca6', source='my/event/server', specversion='1.0', type='Coffee', data=Coffee(flavor='mocha'), datacontenttype='application/json', dataschema='/Coffee', subject='Coffee', time=datetime.datetime(2022, 11, 19, 15, 33, 6, 39795))
Publish
A publish function can be registered with a CloudEventful
instance to enforce topic
integrity at run time. This is done by setting the publish_function
property on a
CloudEventful
instance.
A publish function must accept at least a topic arg as a str and a data arg as a registered data model.
Then, the CloudEventful
publish function can be used to wrap data models in a
CloudEvent and publish them as JSON strings. Keyword args will be passed to the
registered publish function.
Example using MQTT with Paho
import re
from cloudeventful import CloudEventful
import paho.mqtt.client as mqtt
from pydantic import BaseModel
server_id = "my/event/server"
client = mqtt.Client(server_id)
client.connect("127.0.0.1")
ce = CloudEventful(
api_version="1.0.0",
default_source=server_id,
publish_function=client.publish,
default_topic_factory=lambda m: f"/api/v1/{type(m).__name__.lower()}"
)
@ce.data_model(re.compile(r"/.*/coffee"))
class Coffee(BaseModel):
flavor: str
@ce.data_model(re.compile(r"/.*/pen"))
class Pen(BaseModel):
color: str
# Publish a data model wrapped in a cloud event.
ce.publish(Coffee(flavor="mocha"))
# Raise `ValueError` because topic does not match pattern of this model.
ce.publish(Pen(color="black"))
Support The Developer
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
File details
Details for the file cloudeventful-2.2.0.tar.gz
.
File metadata
- Download URL: cloudeventful-2.2.0.tar.gz
- Upload date:
- Size: 8.0 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.3.0 CPython/3.11.1 Linux/5.4.109+
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 1b12ba077ff33c087af318aebd266f9f26365895907b963ebf0b5d8abdef6fa5 |
|
MD5 | 875a078866ba1bdc0b9297c6d3b39a5d |
|
BLAKE2b-256 | be05077f9676cf1c323a1df7ae04db8d1dac68fa8e87b26ae199db433b2b4e56 |
File details
Details for the file cloudeventful-2.2.0-py3-none-any.whl
.
File metadata
- Download URL: cloudeventful-2.2.0-py3-none-any.whl
- Upload date:
- Size: 7.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.3.0 CPython/3.11.1 Linux/5.4.109+
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 2f98792b8e3556bc67ae26da4ac2fc7f7c6aecf0068f6f34b5449001aab65f8d |
|
MD5 | 945a7c710810146b60786b89d9dc11e9 |
|
BLAKE2b-256 | 7928dbca962083bdac1849e55338a13298598af2d9bb44880d1927257d125887 |