
Cloud native framework for building event driven applications in Python


Note: This package is under active development and is not recommended for production use.


Version: 0.1.1

Documentation: https://performancemedia.github.io/eventiq/

Repository: https://github.com/performancemedia/eventiq


About

The package relies on pydantic, async_timeout and python-json-logger as its only required dependencies. Messages use the CloudEvents format. Services can run as standalone processes or be embedded into Starlette (e.g. FastAPI) applications.
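
Because validation is delegated to pydantic, the data payload of an incoming event can be checked against a plain pydantic model before it is processed. The sketch below is illustrative only: CounterPayload and parse_counter are hypothetical names, and nothing here depends on eventiq internals.

from typing import Optional

from pydantic import BaseModel, ValidationError


class CounterPayload(BaseModel):
    counter: int


def parse_counter(raw: dict) -> Optional[CounterPayload]:
    # Validate the raw `data` dict of an event against the expected schema.
    try:
        return CounterPayload(**raw)
    except ValidationError:
        return None


print(parse_counter({"counter": 3}))    # counter=3
print(parse_counter({"counter": "x"}))  # None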

Installation

pip install eventiq

Multiple broker support (in progress)

  • Stub (in-memory, built on asyncio.Queue; for PoC, local development and testing)
  • NATS (with JetStream)
  • Redis Pub/Sub
  • Kafka
  • RabbitMQ
  • Google Cloud PubSub
  • And more coming

Optional Dependencies

  • cli - click and aiorun
  • broker of choice: nats, kafka, rabbitmq, redis, pubsub
  • custom message serializers: msgpack, orjson
  • prometheus - metrics exposure via PrometheusMiddleware
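
Extras can be installed with pip's bracket syntax; the extra names below are assumed to mirror the dependency names listed above and may differ from the actual package extras:

pip install "eventiq[cli,nats]"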

Motivation

Python has many "worker-queue" libraries and frameworks, such as:

  • Celery
  • Dramatiq
  • Huey
  • arq

However, those libraries do not provide a pub/sub pattern, which is useful for building event-driven and loosely coupled systems. Furthermore, most of them do not support asyncio. That is why this project was born.

Basic usage

import asyncio

from eventiq import Service, CloudEvent, Middleware
from eventiq.backends.nats.broker import JetStreamBroker


class SendMessageMiddleware(Middleware):
    # Hook called once the broker connection has been established;
    # "Broker" is a forward reference to eventiq's broker base class.
    async def after_broker_connect(self, broker: "Broker") -> None:
        print(f"After broker connect, running with {broker}")
        await asyncio.sleep(10)
        # Publish a batch of test events to the topic consumed below.
        for i in range(100):
            await broker.publish("test.topic", data={"counter": i})
        print("Published event(s)")


broker = JetStreamBroker(url="nats://localhost:4222")
broker.add_middleware(SendMessageMiddleware())

service = Service(name="example-service", broker=broker)


@service.subscribe("test.topic")
async def example_run(message: CloudEvent):
    # Incoming messages are parsed into CloudEvent models.
    print(f"Received message {message.id} with data: {message.data}")


if __name__ == "__main__":
    service.run()

Scaling

Each message is load-balanced (depending on the broker) across all service instances that share the same name. To scale the number of processes, you can use containers (Docker/Kubernetes), supervisor, or a web server such as gunicorn when the service is embedded in an ASGI application.
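
The pub/sub side of this can be sketched with the same API as in Basic usage (the service and topic names below are hypothetical): services with different names each receive every event published to a topic, while multiple processes of one named service share that service's events.

# Illustrative sketch only: the service and topic names are made up,
# and only the API already shown in Basic usage is assumed.
from eventiq import Service, CloudEvent
from eventiq.backends.nats.broker import JetStreamBroker

orders = Service(
    name="orders-service",
    broker=JetStreamBroker(url="nats://localhost:4222"),
)
audit = Service(
    name="audit-service",
    broker=JetStreamBroker(url="nats://localhost:4222"),
)


@orders.subscribe("shop.order.created")
async def handle_order(message: CloudEvent):
    print(f"[orders] processing {message.id}")


@audit.subscribe("shop.order.created")
async def record_order(message: CloudEvent):
    print(f"[audit] recording {message.id}")


# Each service runs as its own process (e.g. orders.run() in one container,
# audit.run() in another). Both named services receive every event on the
# topic, while starting a second "orders-service" process would split that
# service's events between the two copies.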

TODOs:

  • More tests
    • Integration tests with docker-compose and all backends
  • Docs + tutorials
  • OpenTelemetry Middleware (?)

