Cloud Eventful

Broker agnostic library to associate JSON Schemas to message broker topics.

License: MIT. Code style: black.

Install

Cloud Eventful is on PyPI and can be installed with:

poetry add cloudeventful

or

pip install cloudeventful

Usage

This library provides a CloudEventful class that generates CloudEvents and, on a per-topic basis, associates Pydantic models with the CloudEvent data field.

Model Registration

The data_model decorator associates a model with a regular-expression pattern describing the topics it may be published to.

import re

from cloudeventful import CloudEventful
from pydantic import BaseModel

ce = CloudEventful(api_version="1.0.0", default_source="my/event/server")


@ce.data_model(re.compile(r"/.*/coffee"))
class Coffee(BaseModel):
    flavor: str
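
As a rough illustration of what such a pattern accepts, the sketch below checks a few topic strings against the same regular expression using only the standard library. Whether Cloud Eventful applies match, fullmatch, or search internally is not stated here, so the use of fullmatch is an assumption, and topic_allowed is a hypothetical helper rather than part of the library.

```python
import re

# Same pattern passed to ce.data_model for the Coffee model above.
COFFEE_TOPICS = re.compile(r"/.*/coffee")


def topic_allowed(pattern: re.Pattern, topic: str) -> bool:
    """Hypothetical helper: True if the whole topic matches the pattern.

    Assumption: the full topic string must match (re.fullmatch).
    """
    return pattern.fullmatch(topic) is not None


print(topic_allowed(COFFEE_TOPICS, "/api/v1/coffee"))  # True
print(topic_allowed(COFFEE_TOPICS, "/shop/coffee"))    # True
print(topic_allowed(COFFEE_TOPICS, "wrong-topic"))     # False
print(topic_allowed(COFFEE_TOPICS, "/api/v1/pen"))     # False
```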

Cloud Event Generation

Once data models are registered, CloudEvent objects can be generated with an instance of a registered model as the CloudEvent data property.

>>> ce.event(Coffee(flavor="mocha"))
CloudEvent[ModelType](id='9b21a718-9dc1-4b56-a4ea-4e9911bc8ca6', source='my/event/server', specversion='1.0', type='Coffee', data=Coffee(flavor='mocha'), datacontenttype='application/json', dataschema='/Coffee', subject='Coffee', time=datetime.datetime(2022, 11, 19, 15, 33, 6, 39795))
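
For readers unfamiliar with the CloudEvents envelope, the following standard-library sketch builds a dictionary with the same attribute names that appear in the repr above and serializes it to JSON. It does not use Cloud Eventful: make_envelope is a hypothetical helper, and the exact JSON the library emits may differ.

```python
import json
import uuid
from datetime import datetime, timezone


def make_envelope(type_name: str, data: dict, source: str) -> dict:
    """Hypothetical helper mirroring the attributes shown in the repr above."""
    return {
        "id": str(uuid.uuid4()),          # unique per event
        "source": source,
        "specversion": "1.0",
        "type": type_name,
        "data": data,
        "datacontenttype": "application/json",
        "dataschema": f"/{type_name}",
        "subject": type_name,
        # Assumption: a UTC timestamp in ISO 8601 form.
        "time": datetime.now(timezone.utc).isoformat(),
    }


envelope = make_envelope("Coffee", {"flavor": "mocha"}, "my/event/server")
print(json.dumps(envelope, indent=2))
```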

Publish

A publish function can be registered with a CloudEventful instance to enforce topic integrity at run time. This is done by setting the publish_function property on a CloudEventful instance, or by passing publish_function to the constructor as in the example below.

A publish function must accept at least a topic argument as a str and a data argument as a registered data model.

The CloudEventful publish method then wraps a data model in a CloudEvent and publishes it as a JSON string. Any keyword arguments are passed through to the registered publish function.
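
A minimal sketch of a function with that shape follows. It is an in-memory stand-in for a broker client, which can be handy in tests. The names record_publish and sent are hypothetical, and the assumption that the library calls the function as publish_function(topic, data, **kwargs) is inferred from the description above rather than confirmed.

```python
from typing import Any, Dict, List, Tuple

# Messages captured by the stand-in publish function.
sent: List[Tuple[str, Any, Dict[str, Any]]] = []


def record_publish(topic: str, data: Any, **kwargs: Any) -> None:
    """Hypothetical publish function: store the message instead of sending it."""
    if not isinstance(topic, str):
        raise TypeError("topic must be a str")
    sent.append((topic, data, kwargs))


# Direct call, standing in for what a CloudEventful instance would do.
record_publish("/api/v1/coffee", '{"flavor": "mocha"}', qos=1)
print(sent[0])
```

A function like this could be supplied as publish_function in place of a real client's publish method.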

Example using MQTT with Paho

import re

from cloudeventful import CloudEventful
import paho.mqtt.client as mqtt
from pydantic import BaseModel

server_id = "my/event/server"

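# paho-mqtt 1.x signature; paho-mqtt 2.x also requires a callback API
# version as the first argument.
client = mqtt.Client(server_id)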
client.connect("127.0.0.1")

ce = CloudEventful(
    api_version="1.0.0",
    default_source=server_id,
    publish_function=client.publish,
    default_topic_factory=lambda m: f"/api/v1/{type(m).__name__.lower()}"
)


@ce.data_model(re.compile(r"/.*/coffee"))
class Coffee(BaseModel):
    flavor: str


@ce.data_model(re.compile(r"/.*/pen"))
class Pen(BaseModel):
    color: str


# Publish a data model wrapped in a cloud event.
ce.publish(Coffee(flavor="mocha"))
# Raises `ValueError` because the topic does not match this model's pattern.
ce.publish(Pen(color="black"), topic="wrong-topic")
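
To make the default topic concrete, the sketch below applies the same logic as the default_topic_factory lambda to plain stand-in classes (dataclasses rather than Pydantic models, so it runs without extra dependencies). It only shows which topic string the factory yields. How Cloud Eventful validates that topic internally is not shown.

```python
from dataclasses import dataclass


@dataclass
class Coffee:  # stand-in for the Pydantic model above
    flavor: str


@dataclass
class Pen:  # stand-in for the Pydantic model above
    color: str


def default_topic_factory(m: object) -> str:
    """Equivalent to the lambda passed to the CloudEventful constructor above."""
    return f"/api/v1/{type(m).__name__.lower()}"


print(default_topic_factory(Coffee(flavor="mocha")))  # /api/v1/coffee
print(default_topic_factory(Pen(color="black")))      # /api/v1/pen
```

Both default topics fit the patterns registered for their models, whereas the explicit topic "wrong-topic" fits neither, which is consistent with the ValueError noted in the example.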

