Pythonic event-processing library based on decorators
Process Events In Style
event-processor is a library that simplifies a common pattern: filtering events, dispatching them to the right processor, pre-processing them, and injecting dependencies into event processors.
The only requirement is that your events are regular Python dictionaries. Python 3.7+ is supported.
Here is a basic example of event filtering and dispatching. It simply calls the right processor depending on the event:
from typing import Dict

from event_processor import EventProcessor

event_processor = EventProcessor()


# Called for events whose nested value at "service.type" is "service_a".
@event_processor.processor({"service.type": "service_a"})
def process_service_a(event: Dict):
    return event["service"]["status"] == "up"


# Called for events whose nested value at "service.type" is "service_b".
@event_processor.processor({"service.type": "service_b"})
def process_service_b(event: Dict):
    # "authorized" is nested under "service" in the event below.
    return event["service"]["authorized"]


service_a_event = {
    "service": {
        "type": "service_a",
        "status": "down"
    }
}
service_b_event = {
    "service": {
        "type": "service_b",
        "authorized": False
    }
}

event_processor.invoke(service_a_event)  # False
event_processor.invoke(service_b_event)  # False
Documentation
Find the full documentation on Read the Docs.
Use-Cases
This library is generic and applies to problems in several different domains. The use-cases below may give you an idea of how it applies to your own.
FaaS (AWS, GCP, Azure)
This library is very useful in cloud computing environments where functions as a service are used, such as AWS Lambda, Google Cloud Functions or Azure Functions. These platforms are frequently used to manage the cloud account, for example by running functions when a resource is launched or on a schedule. They're also used for monitoring. In most cases, a function takes an event as input and should take different actions based on the contents of that event.
event-processor helps with the architecture of such functions because it makes it easy to forward events to the right function for processing.
AWS - CloudWatch Alarm -> SNS -> Lambda
import json

from event_processor import EventProcessor

event_processor = EventProcessor()

# Truncated for readability
cloudwatch_event = {
    "id": "c4c1c1c9-6542-e61b-6ef0-8c4d36933a92",
    "detail-type": "CloudWatch Alarm State Change",
    "detail": {
        "alarmName": "ServerCpuTooHigh",
        "previousState": {
            "value": "OK"
        },
        "state": {
            "value": "ALARM"
        }
    }
}

lambda_event = {
    "Records": [
        {
            "Sns": {
                "Subject": "TestInvoke",
                "Message": json.dumps(cloudwatch_event)
            }
        }
    ]
}


class DummySlackClient:
    def send_message(self, message: str):
        print(f"Send in slack: {message}")


@event_processor.dependency_factory
def messaging_service(service: str):
    if service == "slack":
        return DummySlackClient()
    else:
        raise NotImplementedError()


@event_processor.processor(
    {"detail.previousState.value": "OK", "detail.state.value": "ALARM"},
    messaging_service=("slack",)
)
def process_started_alarming(event, slack_client: DummySlackClient):
    slack_client.send_message(f"Alarm {event['detail']['alarmName']} went from OK to ALARM")


@event_processor.processor({}, messaging_service=("slack",))  # Default processor
def default_processor(event, slack_client: DummySlackClient):
    slack_client.send_message(f"Unexpected event: {event}")


def demo_lambda_main(event, _context):
    # You can pre-process events as much as you like before calling invoke.
    # You could also just process the whole raw event, but then filters would
    # be less useful since they couldn't filter against the cloudwatch event.
    # It would also be possible to just update the raw event by parsing the
    # json contained within, so it would be possible to filter on anything.
    cloudwatch_event = json.loads(event["Records"][0]["Sns"]["Message"])
    event_processor.invoke(cloudwatch_event)


demo_lambda_main(lambda_event, {})
lambda_event["Records"][0]["Sns"]["Message"] = json.dumps({"Unexpected": "Oops!"})
demo_lambda_main(lambda_event, {})
Send in slack: Alarm ServerCpuTooHigh went from OK to ALARM
Send in slack: Unexpected event: {'Unexpected': 'Oops!'}
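The example above pre-processes only the first entry in `Records`. If you would rather not rely on there being exactly one record, a pre-processing step along the following lines could unpack all of them. This is a sketch under assumptions: `extract_sns_messages` is a hypothetical helper that is not part of event-processor, and it assumes every SNS message body is valid JSON (`json.loads` raises `json.JSONDecodeError` otherwise).

```python
import json
from typing import Any, Dict, List


def extract_sns_messages(lambda_event: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Parse the JSON body of every SNS record in a Lambda event."""
    messages = []
    for record in lambda_event.get("Records", []):
        messages.append(json.loads(record["Sns"]["Message"]))
    return messages


sample_event = {
    "Records": [
        {"Sns": {"Subject": "TestInvoke", "Message": json.dumps({"id": 1})}},
        {"Sns": {"Subject": "TestInvoke", "Message": json.dumps({"id": 2})}},
    ]
}

for message in extract_sns_messages(sample_event):
    print(message)
```

Each parsed message could then be passed to `event_processor.invoke` in the same way `demo_lambda_main` does for the single record.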
Built Distribution
Hashes for event_processor-1.0.0-py3-none-any.whl
Algorithm | Hash digest |
---|---|
SHA256 | 9c07c6a2c566372f54c2146143162538795b0f155507c7306b822c6ddb031430 |
MD5 | 758b6d351a23e44b9cd4a5a56b5b235d |
BLAKE2b-256 | ea1b18e16b6486111e582869a6ebb446353bf20a2f364c9e985eeea634591a98 |