ScalePlan's CloudEvents implementation
Project description
ScalePlan CloudEvents for Python
Unofficial Python implementation for CloudEvents v1.0. Check out the CloudEvents spec.
Install
Requirements:
- Python 3.6 or above
Install using:
pip install spce
Usage:
Create a CloudEvent with required attributes:
from spce import CloudEvent
event = CloudEvent(
type="OximeterMeasured",
source="oximeter/123",
id="1000"
)
Create a CloudEvent with optional attributes:
event = CloudEvent(
type="OximeterMeasured",
source="oximeter/123",
id="1000",
subject="subject1",
dataschema="https://particlemetrics.com/schema",
time="2020-09-28T21:33:21Z",
data='{\"spo2\": 99})',
datacontenttype="application/json"
)
Required and optional attributes can be directly accessed:
assert event.type == "OximeterMeasured"
assert event.subject == "subject1"
Create a CloudEvent with extension attributes:
event = CloudEvent(
type="OximeterMeasured",
source="oximeter/123",
id="1000",
external1="foo/bar"
)
Extension attributes can be accessed using the attribute
method:
assert event.attribute("external1") == "foo/bar"
Encode an event in JSON:
from spce import Json
encoded_event = Json.encode(event)
Decode an event in JSOn:
from spce import Json
text = """
{
"type": "OximeterMeasured",
"source": "oximeter/123",
"id": "1000",
"specversion": "1.0",
"datacontenttype": "application/json",
"subject": "subject1",
"dataschema": "https://particlemetrics.com/schema",
"time": "2020-09-28T21:33:21Z",
"data": "{\"spo2\": 99})"
}
"""
decoded_event = Json.decode(text)
License
(c) 2020 Scale Plan Yazılım A.Ş.
Licensed under Apache 2.0. See the LICENSE.
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
spce-0.1.1.tar.gz
(3.3 kB
view hashes)
Built Distribution
spce-0.1.1-py3-none-any.whl
(8.6 kB
view hashes)