Cloud-native framework for building event-driven applications in Python.
Note: This package is under active development and is not recommended for production use.
Version: 0.1.2
Documentation: https://performancemedia.github.io/eventiq/
Repository: https://github.com/performancemedia/eventiq
About
The package uses pydantic, async_timeout, and python-json-logger as its only required dependencies. Messages follow the CloudEvents format.
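A CloudEvent is essentially a small, structured envelope around the payload. As a rough, library-independent sketch (a plain dict built with the stdlib; eventiq's actual CloudEvent is a pydantic model, and the helper below is hypothetical):

```python
import uuid
from datetime import datetime, timezone


def make_cloud_event(source: str, type_: str, data: dict) -> dict:
    """Build a minimal CloudEvents 1.0-style envelope as a plain dict."""
    return {
        "specversion": "1.0",
        "id": str(uuid.uuid4()),  # unique per event
        "source": source,         # identifies the producer
        "type": type_,            # e.g. the topic/event name
        "time": datetime.now(timezone.utc).isoformat(),
        "datacontenttype": "application/json",
        "data": data,             # the actual payload
    }


event = make_cloud_event("example-service", "test.topic", {"counter": 1})
```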
Services can run as standalone processes or be embedded into Starlette-based (e.g. FastAPI) applications.
Installation
pip install eventiq
Multiple broker support (in progress)
- Stub (in-memory, using asyncio.Queue; for PoC, local development, and testing)
- NATS (with JetStream)
- Redis Pub/Sub
- Kafka
- RabbitMQ
- Google Cloud Pub/Sub
- And more coming
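The in-memory stub idea can be sketched with nothing but asyncio.Queue, one queue per subscriber (a simplified illustration of the concept, not eventiq's actual implementation):

```python
import asyncio
from collections import defaultdict


class InMemoryBroker:
    """Minimal in-memory pub/sub: fan out each message to every topic subscriber."""

    def __init__(self) -> None:
        self._topics: dict[str, list[asyncio.Queue]] = defaultdict(list)

    def subscribe(self, topic: str) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue()
        self._topics[topic].append(queue)
        return queue

    async def publish(self, topic: str, data: dict) -> None:
        # Deliver a copy of the message to each subscriber's queue.
        for queue in self._topics[topic]:
            await queue.put(data)


async def main() -> list[dict]:
    broker = InMemoryBroker()
    inbox = broker.subscribe("test.topic")
    await broker.publish("test.topic", {"counter": 1})
    await broker.publish("test.topic", {"counter": 2})
    return [inbox.get_nowait() for _ in range(inbox.qsize())]


received = asyncio.run(main())
```

This is why the stub backend is handy for tests: no external process, and delivery order is deterministic.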
Optional Dependencies
- cli: typer and aiorun
- broker of choice: nats, kafka, rabbitmq, redis, pubsub
- custom message serializers: msgpack, orjson
- prometheus: metric exposure via PrometheusMiddleware
Motivation
Most established Python task- and message-processing libraries don't provide a pub/sub pattern, which is useful for creating event-driven, loosely coupled systems. Furthermore, the majority of them do not support asyncio. This is why this project was born.
Basic usage
```python
import asyncio

from eventiq import Service, CloudEvent, Middleware
from eventiq.backends.nats.broker import JetStreamBroker


class SendMessageMiddleware(Middleware):
    async def after_broker_connect(self, broker: "Broker") -> None:
        print(f"After service start, running with {broker}")
        await asyncio.sleep(10)
        for i in range(100):
            await broker.publish("test.topic", data={"counter": i})
        print("Published event(s)")


broker = JetStreamBroker(url="nats://localhost:4222")
broker.add_middleware(SendMessageMiddleware())

service = Service(name="example-service", broker=broker)


@service.subscribe("test.topic")
async def example_run(message: CloudEvent):
    print(f"Received Message {message.id} with data: {message.data}")


if __name__ == "__main__":
    service.run()
```
Scaling
Each message is load-balanced (depending on the broker) between all service instances sharing the same name. To scale the number of processes, you can use containers (Docker/Kubernetes), supervisor, or a web server like gunicorn.
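The "same service name" behaviour is the competing-consumers pattern: instances share one queue and each message goes to exactly one of them. A stdlib sketch of the idea (real brokers implement this as consumer or queue groups; the names below are illustrative):

```python
import asyncio


async def worker(name: str, queue: asyncio.Queue, received: dict[str, list[int]]) -> None:
    # Instances with the same service name compete for messages on one shared queue.
    while True:
        item = await queue.get()
        if item is None:  # sentinel: shut down
            queue.task_done()
            return
        received[name].append(item)
        queue.task_done()


async def main() -> dict[str, list[int]]:
    queue: asyncio.Queue = asyncio.Queue()
    received: dict[str, list[int]] = {"instance-1": [], "instance-2": []}
    workers = [asyncio.create_task(worker(n, queue, received)) for n in received]
    for i in range(10):   # publish 10 messages
        await queue.put(i)
    await queue.join()    # wait until every message has been consumed
    for _ in workers:     # stop both instances
        await queue.put(None)
    await asyncio.gather(*workers)
    return received


received = asyncio.run(main())
```

Every message is delivered to exactly one instance, so the union of what the two workers saw is the full published set.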
TODOs:
- More tests
- Integration tests with docker-compose and all backends
- Docs + tutorials
- OpenTelemetry Middleware (?)