Publish/Subscribe asyncio framework for Python

Asyncio native pub/sub framework for Python

Installation

pip install eventiq

or

poetry add eventiq

Installing optional dependencies

pip install 'eventiq[broker]'

Available brokers (substitute one for broker in the command above)

  • nats
  • rabbitmq
  • kafka
  • redis

Features

  • Modern, asyncio-based Python 3.8+ syntax
  • Fully type annotated
  • Minimal external dependencies (anyio, pydantic, typer)
  • Automatic message parsing based on type annotations using pydantic
  • Code hot-reload
  • Highly scalable: each service can process hundreds of tasks concurrently, and messages are load balanced across all instances by default
  • Resilient: at-least-once delivery for all messages by default (except for Redis*)
  • Customizable and pluggable message encoder/decoder (JSON by default)
  • Multiple broker support
    • Memory (for testing)
    • Nats
    • Kafka
    • Rabbitmq
    • Redis
  • Result Backend implementation for Nats & Redis
  • Lifespan protocol support
  • Lightweight (and completely optional) dependency injection system based on type annotations
  • Easy and lightweight (~3k lines of code, including type definitions and broker implementations)
  • Cloud Events standard as the base message structure (no more Python-specific *args and **kwargs in messages)
  • AsyncAPI documentation generation from code
  • Twelve factor app approach - stdout logging, configuration through environment variables
  • Easily extensible via Middlewares
  • Multiple extensions and integrations including:
    • Prometheus - metrics exporter
    • OpenTelemetry - tracing and metrics
    • Message Pack - message pack encoder for messages
    • FastAPI - integrating eventiq Service with FastAPI applications (WIP)
    • Dataref - data reference resolver for messages (WIP)
    • Eventiq Workflows - orchestration engine built on top of eventiq (WIP)
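Two of the features above, the pluggable encoder/decoder and the Cloud Events message structure, can be illustrated with a small standard-library sketch. It does not use eventiq's own classes: `Envelope`, `Encoder`, `JsonEncoder`, and `roundtrip` are hypothetical names chosen for illustration, and only the attribute names (`id`, `source`, `type`, `specversion`, `data`) follow the CloudEvents specification.

```python
import json
import uuid
from dataclasses import asdict, dataclass, field
from typing import Any, Dict, Protocol


@dataclass
class Envelope:
    """Hypothetical CloudEvents-style message (not eventiq's CloudEvent class)."""

    type: str
    source: str
    data: Dict[str, Any]
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    specversion: str = "1.0"


class Encoder(Protocol):
    """Hypothetical encoder interface: any object with encode/decode fits."""

    def encode(self, message: Dict[str, Any]) -> bytes: ...

    def decode(self, payload: bytes) -> Dict[str, Any]: ...


class JsonEncoder:
    """JSON implementation of the hypothetical Encoder interface."""

    def encode(self, message: Dict[str, Any]) -> bytes:
        return json.dumps(message).encode("utf-8")

    def decode(self, payload: bytes) -> Dict[str, Any]:
        return json.loads(payload.decode("utf-8"))


def roundtrip(envelope: Envelope, encoder: Encoder) -> Envelope:
    """Serialize an envelope to bytes and rebuild it from those bytes."""
    wire = encoder.encode(asdict(envelope))
    return Envelope(**encoder.decode(wire))
```

Because the message is a plain, named structure rather than positional arguments, swapping `JsonEncoder` for another implementation (Message Pack, for instance) would not change the code that produces or consumes the message.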

Basic Usage

import asyncio
from eventiq import Service, Middleware, CloudEvent, GenericConsumer
from eventiq.backends.nats import JetStreamBroker

class SendMessageMiddleware(Middleware):
    # Runs once the broker connection is established.
    # `service` is the module-level object defined below; it is resolved at call time.
    async def after_broker_connect(self):
        print(f"After service start, running with {service.broker}")
        await asyncio.sleep(10)
        # Publish 100 test messages to "test.topic"
        for i in range(100):
            message = CloudEvent(topic="test.topic", data={"counter": i})
            await service.publish(message)
        print("Published message(s)")

broker = JetStreamBroker(url="nats://localhost:4222")

service = Service(
    name="example-service",
    broker=broker,
)
service.add_middleware(SendMessageMiddleware)

# Function-based consumer
@service.subscribe(topic="test.topic")
async def example_run(message: CloudEvent):
    print(f"Received Message {message.id} with data: {message.data}")

# Class-based consumer that publishes a response message
@service.subscribe(topic="test.topic2")
class MyConsumer(GenericConsumer[CloudEvent]):
    async def process(self, message: CloudEvent):
        print(f"Received Message {message.id} with data: {message.data}")
        await self.publish(CloudEvent(topic="test.topic", data={"response": "ok"}))

Run with

eventiq run app:service --log-level=info

Watching for changes

eventiq run app:service --log-level=info --reload=.

Testing

The StubBroker class is provided as an in-memory replacement for running unit tests:

import os


def get_broker(**kwargs):
    if os.getenv('ENV') == 'TEST':
        from eventiq.backends.stub import StubBroker
        return StubBroker()
    else:
        from eventiq.backends.rabbitmq import RabbitmqBroker
        return RabbitmqBroker(**kwargs)

broker = get_broker()

Furthermore, subscribers are just regular Python coroutines, so they can be tested simply by calling them:

# main.py
@service.subscribe(topic="test.topic")
async def my_subscriber(message: CloudEvent):
    return 42

# tests.py
from main import my_subscriber

async def test_my_subscriber():
    result = await my_subscriber(None)
    assert result == 42
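An `async def` test like the one above needs an async-aware test runner or plugin to be collected and awaited. Where none is configured, the same check can be written as a plain synchronous test that drives the coroutine with `asyncio.run`. The sketch below is self-contained, so `my_subscriber` here is an undecorated stand-in for the subscriber in main.py, and `test_my_subscriber_sync` is a hypothetical test name.

```python
import asyncio


async def my_subscriber(message):
    # Stand-in for the decorated subscriber defined in main.py
    return 42


def test_my_subscriber_sync():
    # asyncio.run creates an event loop, awaits the coroutine, and closes the loop
    result = asyncio.run(my_subscriber(None))
    assert result == 42
```

This keeps the test independent of any broker or running service, which is the point of testing subscribers by direct invocation.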

CLI

Getting help:

eventiq --help

Installing shell autocompletion:

eventiq --install-completion [bash|zsh|fish|powershell|pwsh]

Basic commands

  • run - run service
  • docs - generate AsyncAPI docs
  • send - send message to broker


