
Project description

Cloud Eventful

Broker-agnostic library to associate JSON Schemas with message broker topics.

License: MIT Code style: black PyPI version Code Coverage

Install

Cloud Eventful is on PyPI and can be installed with:

poetry add cloudeventful

or

pip install cloudeventful

Usage

This library provides a CloudEventful class which can be used to generate CloudEvents and to associate Pydantic models, on a per-topic basis, as the CloudEvent data field.

Model Registration

The data_model decorator associates a model with a pattern describing the topics it may be published to.

import re

from cloudeventful import CloudEventful
from pydantic import BaseModel

ce = CloudEventful(api_version="1.0.0", default_source="my/event/server")


@ce.data_model(re.compile(r"/.*/coffee"))
class Coffee(BaseModel):
    flavor: str
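
The registered pattern is enforced at publish time (see Publish below): attempting to publish a model to a topic that does not match its pattern raises a ValueError.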

Cloud Event Generation

Once data models are registered, CloudEvent objects can be generated with an instance of a registered model as the CloudEvent data property.

>>> ce.event(Coffee(flavor="mocha"))
CloudEvent[ModelType](id='9b21a718-9dc1-4b56-a4ea-4e9911bc8ca6', source='my/event/server', specversion='1.0', type='Coffee', data=Coffee(flavor='mocha'), datacontenttype='application/json', dataschema='/Coffee', subject='Coffee', time=datetime.datetime(2022, 11, 19, 15, 33, 6, 39795))
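
The returned object is a CloudEvent model wrapping the data instance, so its fields can be read directly. A quick sketch based on the fields shown in the repr above:

>>> event = ce.event(Coffee(flavor="mocha"))
>>> event.type
'Coffee'
>>> event.data.flavor
'mocha'
>>> event.source
'my/event/server'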

Publish

A publish function can be registered with a CloudEventful instance to enforce topic integrity at run time. This is done by setting the instance's publish_function property.

A publish function must accept at least a topic arg as a str and a data arg as a registered data model.

Then, the CloudEventful publish function can be used to wrap data models in a CloudEvent and publish them as JSON strings. Keyword args will be passed to the registered publish function.
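
For illustration, a minimal publish function that only logs the outgoing message might look like the following sketch (the name log_publish and its body are illustrative, not part of the library):

import logging

from cloudeventful import CloudEventful

logger = logging.getLogger("events")


def log_publish(topic, data, **kwargs):
    # Any callable that accepts a topic and a data arg (plus optional kwargs) can be used.
    logger.info("publish to %s: %s", topic, data)


ce = CloudEventful(api_version="1.0.0", default_source="my/event/server")
ce.publish_function = log_publish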

Example using MQTT with Paho

import re

from cloudeventful import CloudEventful
import paho.mqtt.client as mqtt
from pydantic import BaseModel

server_id = "my/event/server"

client = mqtt.Client(server_id)
client.connect("127.0.0.1")

ce = CloudEventful(
    api_version="1.0.0",
    default_source=server_id,
    publish_function=client.publish,
    default_topic_factory=lambda m: f"/api/v1/{type(m).__name__.lower()}"
)


@ce.data_model(re.compile(r"/.*/coffee"))
class Coffee(BaseModel):
    flavor: str


@ce.data_model(re.compile(r"/.*/pen"))
class Pen(BaseModel):
    color: str


# Publish a data model wrapped in a cloud event.
ce.publish(Coffee(flavor="mocha"))
# Raise `ValueError` because topic does not match pattern of this model.
ce.publish(Pen(color="black"), topic="wrong-topic")
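
In this example the default_topic_factory produces the topic /api/v1/coffee for the Coffee instance, which matches its registered pattern, while the explicit topic "wrong-topic" does not match the Pen pattern, so the second call raises a ValueError.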

Support The Developer

Buy Me A Coffee

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

cloudeventful-3.0.0.tar.gz (7.7 kB)

Uploaded Source

Built Distribution

cloudeventful-3.0.0-py3-none-any.whl (7.8 kB)

Uploaded Python 3

File details

Details for the file cloudeventful-3.0.0.tar.gz.

File metadata

  • Download URL: cloudeventful-3.0.0.tar.gz
  • Upload date:
  • Size: 7.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.7.1 CPython/3.12.0 Linux/5.4.109+

File hashes

Hashes for cloudeventful-3.0.0.tar.gz
  • SHA256: a93fbeb4e2af469b1c651b7e34e5ce380ff807c325e73d39d1220ec85d25fbf1
  • MD5: 5e52e2774d04a554d681e2f4834b89a8
  • BLAKE2b-256: 2959fb843fcbae50dafe0cb1167575891944480ac455fe1844a045d1cb05cca0

See more details on using hashes here.

File details

Details for the file cloudeventful-3.0.0-py3-none-any.whl.

File metadata

  • Download URL: cloudeventful-3.0.0-py3-none-any.whl
  • Upload date:
  • Size: 7.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.7.1 CPython/3.12.0 Linux/5.4.109+

File hashes

Hashes for cloudeventful-3.0.0-py3-none-any.whl
  • SHA256: e5a97d7d44819e24de95027ec8534d7768a2a51644f67542acaacef7e43fd99b
  • MD5: d7f424d97083ec22065f4ddb22b61991
  • BLAKE2b-256: ece166671706cb912a1e9b08c1d6bf911617548587fae8dcdaa110708ac5ab4a

See more details on using hashes here.
