Cloud Eventful

Broker-agnostic library for associating JSON Schemas with message broker topics.

License: MIT

Install

Cloud Eventful is on PyPI and can be installed with:

poetry add cloudeventful

or

pip install cloudeventful

Usage

This library provides a CloudEventful class that generates CloudEvents and associates Pydantic models with topics, so that each model serves as the CloudEvent data field for the topics it is registered to.

Model Registration

The data_model decorator associates a model with a pattern describing the topics it may be published to.

import re

from cloudeventful import CloudEventful
from pydantic import BaseModel

ce = CloudEventful(api_version="1.0.0", default_source="my/event/server")


@ce.data_model(re.compile(r"/.*/coffee"))
class Coffee(BaseModel):
    flavor: str
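
To get a feel for which topics a pattern like the one above admits, the regular expression can be tried on its own with the standard library. This is only a sketch: it assumes topics are tested with `Pattern.match` (anchored at the start of the string), which this README does not confirm, and the sample topics are made up for illustration.

```python
import re

# Same pattern as registered for Coffee above.
coffee_topics = re.compile(r"/.*/coffee")

# Hypothetical topics, chosen only to illustrate the pattern.
samples = ["/api/v1/coffee", "/shop/coffee", "/api/v1/pen", "coffee"]

# Assumption: matching is anchored at the start of the topic (Pattern.match).
matches = {topic: bool(coffee_topics.match(topic)) for topic in samples}

for topic, ok in matches.items():
    print(f"{topic!r}: {'match' if ok else 'no match'}")
```

Under that assumption a topic such as `/api/v1/coffee/extra` would also be accepted, because `match` does not require the pattern to consume the whole string. Ending the pattern with `$` makes it strict.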

Cloud Event Generation

Once data models are registered, CloudEvent objects can be generated with an instance of a registered model as the CloudEvent data property.

>>> ce.event(Coffee(flavor="mocha"))
CloudEvent[ModelType](id='9b21a718-9dc1-4b56-a4ea-4e9911bc8ca6', source='my/event/server', specversion='1.0', type='Coffee', data=Coffee(flavor='mocha'), datacontenttype='application/json', dataschema='/Coffee', subject='Coffee', time=datetime.datetime(2022, 11, 19, 15, 33, 6, 39795))
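
The fields in the output above form an envelope around the model. As a rough illustration of that shape, and of how such an envelope could end up as a JSON string, here is a standard-library sketch. `EnvelopeSketch` is a hypothetical stand-in covering only some of the fields. It is not the library's `CloudEvent` class, and the serialization shown is an assumption rather than what the library does.

```python
import json
import uuid
from dataclasses import asdict, dataclass, field
from datetime import datetime


@dataclass
class EnvelopeSketch:
    """Hypothetical stand-in for the CloudEvent shown above; not the library's class."""

    source: str
    type: str
    data: dict
    specversion: str = "1.0"
    datacontenttype: str = "application/json"
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    time: datetime = field(default_factory=datetime.now)


event = EnvelopeSketch(source="my/event/server", type="Coffee", data={"flavor": "mocha"})

# datetime is not JSON serializable by default, so fall back to ISO 8601 text.
payload = json.dumps(asdict(event), default=lambda value: value.isoformat())
print(payload)
```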

Publish

A publish function can be registered with a CloudEventful instance to enforce topic integrity at run time. This is done by setting the publish_function property on a CloudEventful instance or, as in the example below, by passing publish_function to the constructor.

A publish function must accept at least a topic arg as a str and a data arg as a registered data model.

The publish method of the CloudEventful instance can then be used to wrap data models in a CloudEvent and publish them as JSON strings. Keyword args are passed through to the registered publish function.
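
As a minimal sketch of what such a publish function might look like, the hypothetical `record_publish` below accepts a topic, a payload, and arbitrary keyword args, and stores them in a list instead of sending them to a broker. The function name, the `sent` list, and the `qos` keyword are assumptions for illustration. The payload is treated as an opaque value, since the exact type the library hands over is not shown here.

```python
from typing import Any

# Messages are collected here instead of being sent to a broker.
sent: list[tuple[str, Any, dict[str, Any]]] = []


def record_publish(topic: str, data: Any, **kwargs: Any) -> None:
    """Hypothetical publish function: a topic, a payload, and optional keyword args."""
    sent.append((topic, data, kwargs))


# Per the text above, it would be attached to an existing instance like this:
# ce.publish_function = record_publish

# Called directly here only to show the shape of the arguments.
record_publish("/api/v1/coffee", '{"flavor": "mocha"}', qos=1)
print(sent)
```

A stand-in like this can be handy in tests, where the published topics and payloads need to be inspected without a running broker.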

Example using MQTT with Paho

import re

from cloudeventful import CloudEventful
import paho.mqtt.client as mqtt
from pydantic import BaseModel

server_id = "my/event/server"

# paho-mqtt 1.x call style; paho-mqtt 2.x expects a callback API version as the first argument.
client = mqtt.Client(server_id)
client.connect("127.0.0.1")

ce = CloudEventful(
    api_version="1.0.0",
    default_source=server_id,
    publish_function=client.publish,
    default_topic_factory=lambda m: f"/api/v1/{type(m).__name__.lower()}"
)


@ce.data_model(re.compile(r"/.*/coffee"))
class Coffee(BaseModel):
    flavor: str


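# Registered for a pattern that the default topic "/api/v1/pen" does not match.
@ce.data_model(re.compile(r"/.*/pencil"))
class Pen(BaseModel):
    color: str


# Publish a data model wrapped in a cloud event on its default topic, "/api/v1/coffee".
ce.publish(Coffee(flavor="mocha"))
# Raises `ValueError` because the default topic does not match the pattern registered for this model.
ce.publish(Pen(color="black"))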
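
The topics used by the two publish calls come from default_topic_factory. To make those topics concrete, this sketch applies the same expression to plain stand-in classes, with no Pydantic and no broker involved. The stand-in classes and the `topic_for` name are assumptions for illustration.

```python
class Coffee:
    """Plain stand-in for the Pydantic model; only the class name matters here."""


class Pen:
    """Plain stand-in for the Pydantic model; only the class name matters here."""


def topic_for(m: object) -> str:
    # Same expression as the default_topic_factory lambda above.
    return f"/api/v1/{type(m).__name__.lower()}"


coffee_topic = topic_for(Coffee())
pen_topic = topic_for(Pen())
print(coffee_topic)  # /api/v1/coffee
print(pen_topic)  # /api/v1/pen
```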
