Skip to main content

Broker agnostic library to associate JSON Schemas to message broker topics.

Project description

Cloud Eventful

Broker agnostic library to associate JSON Schemas to message broker topics.

License: MIT Code style: black PyPI version Code Coverage

Install

Cloud Eventful is on PyPI and can be installed with:

poetry add cloudeventful

or

pip install cloudeventful

Usage

This library provides a CloudEventful class which can be used to generate CloudEvents and associate Pydantic models as the cloud event data field on a per-topic basis.

Model Registration

A model is associated with a pattern describing the topics it may be published to using the data_model decorator.

import re

from cloudeventful import CloudEventful
from pydantic import BaseModel

ce = CloudEventful(api_version="1.0.0", default_source="my/event/server")


@ce.data_model(re.compile(r"/.*/coffee"))
class Coffee(BaseModel):
    flavor: str

Cloud Event Generation

Once data models are registered, CloudEvent objects can be generated with an instance of the generated model as the CloudEvent data property.

>>> ce.event(Coffee(flavor="mocha"))
CloudEvent[ModelType](id='9b21a718-9dc1-4b56-a4ea-4e9911bc8ca6', source='my/event/server', specversion='1.0', type='Coffee', data=Coffee(flavor='mocha'), datacontenttype='application/json', dataschema='/Coffee', subject='Coffee', time=datetime.datetime(2022, 11, 19, 15, 33, 6, 39795))

Publish

A publish function can be registered with a CloudEventful instance to enforce topic integrity at run time. This is done by setting the publish_function property on a CloudEventful instance.

A publish function must accept at least a topic arg as a str and a data arg as a registered data model.

Then, the CloudEventful publish function can be used to wrap data models in a CloudEvent and publish them as JSON strings. Keyword args will be passed to the registered publish function.

Example using MQTT with Paho

import re

from cloudeventful import CloudEventful
import paho.mqtt.client as mqtt
from pydantic import BaseModel

server_id = "my/event/server"

client = mqtt.Client(server_id)
client.connect("127.0.0.1")

ce = CloudEventful(
    api_version="1.0.0",
    default_source=server_id,
    publish_function=client.publish,
    default_topic_factory=lambda m: f"/api/v1/{type(m).__name__.lower()}"
)


@ce.data_model(re.compile(r"/.*/coffee"))
class Coffee(BaseModel):
    flavor: str


@ce.data_model(re.compile(r"/.*/pen"))
class Pen(BaseModel):
    color: str


# Publish a data model wrapped in a cloud event.
ce.publish(Coffee(flavor="mocha"))
# Raise `ValueError` because topic does not match pattern of this model.
ce.publish(Pen(color="black"))

Support The Developer

Buy Me A Coffee

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

cloudeventful-2.2.0.tar.gz (8.0 kB view details)

Uploaded Source

Built Distribution

cloudeventful-2.2.0-py3-none-any.whl (7.7 kB view details)

Uploaded Python 3

File details

Details for the file cloudeventful-2.2.0.tar.gz.

File metadata

  • Download URL: cloudeventful-2.2.0.tar.gz
  • Upload date:
  • Size: 8.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.3.0 CPython/3.11.1 Linux/5.4.109+

File hashes

Hashes for cloudeventful-2.2.0.tar.gz
Algorithm Hash digest
SHA256 1b12ba077ff33c087af318aebd266f9f26365895907b963ebf0b5d8abdef6fa5
MD5 875a078866ba1bdc0b9297c6d3b39a5d
BLAKE2b-256 be05077f9676cf1c323a1df7ae04db8d1dac68fa8e87b26ae199db433b2b4e56

See more details on using hashes here.

File details

Details for the file cloudeventful-2.2.0-py3-none-any.whl.

File metadata

  • Download URL: cloudeventful-2.2.0-py3-none-any.whl
  • Upload date:
  • Size: 7.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.3.0 CPython/3.11.1 Linux/5.4.109+

File hashes

Hashes for cloudeventful-2.2.0-py3-none-any.whl
Algorithm Hash digest
SHA256 2f98792b8e3556bc67ae26da4ac2fc7f7c6aecf0068f6f34b5449001aab65f8d
MD5 945a7c710810146b60786b89d9dc11e9
BLAKE2b-256 7928dbca962083bdac1849e55338a13298598af2d9bb44880d1927257d125887

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page