Publish/Subscribe asyncio framework for Python

Asyncio native pub/sub framework for Python

Installation

pip install eventiq

or

poetry add eventiq

Installing optional dependencies

pip install 'eventiq[broker]'

Available brokers (replace broker in the command above with one of these)

  • nats
  • rabbitmq
  • kafka
  • redis

Features

  • Modern, asyncio-based Python 3.8+ syntax
  • Fully type annotated
  • Minimal external dependencies (anyio, pydantic, typer)
  • Automatic message parsing based on type annotations using pydantic
  • Code hot-reload
  • Highly scalable: each service can process hundreds of tasks concurrently, and messages are load-balanced across all instances by default
  • Resilient: at-least-once delivery for all messages by default (except for Redis*)
  • Customizable & pluggable message encoder/decoder (json as default)
  • Multiple broker support
    • Memory (for testing)
    • NATS
    • Kafka
    • RabbitMQ
    • Redis
  • Result Backend implementation for NATS & Redis
  • Lifespan protocol support
  • Lightweight (and completely optional) dependency injection system based on type annotations
  • Easy and lightweight (~3k lines of code, including type definitions and broker implementations)
  • Cloud Events standard as the base message structure (no more Python-specific *args and **kwargs in messages)
  • AsyncAPI documentation generation from code
  • Twelve-factor app approach: stdout logging, configuration through environment variables
  • Easily extensible via Middlewares
  • Multiple extensions and integrations including:
    • Prometheus - metrics exporter
    • OpenTelemetry - tracing and metrics
    • Message Pack - message pack encoder for messages
    • FastAPI - integrating eventiq Service with FastAPI applications (WIP)
    • Dataref - data reference resolver for messages (WIP)
    • Eventiq Workflows - orchestration engine built on top of eventiq (WIP)
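
At-least-once delivery means a message can reach a consumer more than once, for example when it is redelivered after a missed acknowledgement. Handlers therefore benefit from being idempotent. The following is a minimal, standard-library-only sketch of one way to do that, by remembering the ids of messages already processed. FakeEvent and IdempotentHandler are hypothetical names used for illustration; they are not part of eventiq, and a real service would likely keep the seen ids in a shared store rather than in memory.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Dict, Set


@dataclass
class FakeEvent:
    """Hypothetical stand-in for a message; NOT eventiq's CloudEvent."""
    id: str
    data: Dict[str, Any] = field(default_factory=dict)


class IdempotentHandler:
    """Processes each message id at most once, ignoring redeliveries."""

    def __init__(self) -> None:
        self.seen: Set[str] = set()  # ids of messages already processed
        self.total = 0               # example side effect: a running sum

    async def handle(self, event: FakeEvent) -> bool:
        if event.id in self.seen:
            return False  # duplicate delivery: skip the side effect
        self.total += event.data.get("amount", 0)
        self.seen.add(event.id)
        return True


async def demo() -> int:
    handler = IdempotentHandler()
    deliveries = [
        FakeEvent("a", {"amount": 10}),
        FakeEvent("b", {"amount": 5}),
        FakeEvent("a", {"amount": 10}),  # same id delivered a second time
    ]
    for event in deliveries:
        await handler.handle(event)
    return handler.total


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In this sketch the second delivery of message "a" is ignored, so the amount is only counted once.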

Basic Usage

import asyncio
from eventiq import Service, Middleware, CloudEvent, GenericConsumer
from eventiq.backends.nats import JetStreamBroker

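class SendMessageMiddleware(Middleware):
    # Hook called once the broker connection is established.
    # `service` is the module-level Service instance defined below.
    async def after_broker_connect(self):
        print(f"After service start, running with {service.broker}")
        await asyncio.sleep(10)
        # Publish 100 example messages to "test.topic"
        for i in range(100):
            message = CloudEvent(topic="test.topic", data={"counter": i})
            await service.publish(message)
        print("Published messages")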

broker = JetStreamBroker(url="nats://localhost:4222")

service = Service(
    name="example-service",
    broker=broker,
)
service.add_middleware(SendMessageMiddleware)

@service.subscribe(topic="test.topic")
async def example_run(message: CloudEvent):
    print(f"Received Message {message.id} with data: {message.data}")

@service.subscribe(topic="test.topic2")
class MyConsumer(GenericConsumer[CloudEvent]):
    async def process(self, message: CloudEvent):
        print(f"Received Message {message.id} with data: {message.data}")
        await self.publish(CloudEvent(topic="test.topic", data={"response": "ok"}))

Run with

eventiq run app:service --log-level=info

Watching for changes

eventiq run app:service --log-level=info --reload=.

Testing

The StubBroker class is provided as an in-memory replacement for running unit tests.

import os


def get_broker(**kwargs):
    if os.getenv('ENV') == 'TEST':
        from eventiq.backends.stub import StubBroker
        return StubBroker()
    else:
        from eventiq.backends.rabbitmq import RabbitmqBroker
        return RabbitmqBroker(**kwargs)

broker = get_broker()

Furthermore, subscribers are just regular Python coroutines, so they can be tested by calling them directly.

# main.py
@service.subscribe(topic="test.topic")
async def my_subscriber(message: CloudEvent):
    return 42

# tests.py
from main import my_subscriber

async def test_my_subscriber():
    result = await my_subscriber(None)
    assert result == 42
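
The same idea can be run without any async test plugin by driving the coroutine with asyncio.run. The sketch below is self-contained, so it replaces service.subscribe with a hypothetical stand-in decorator that returns the function unchanged; that behaviour is an assumption made for illustration, based on the statement above that subscribers remain regular coroutines.

```python
import asyncio
from typing import Any, Awaitable, Callable

Handler = Callable[[Any], Awaitable[Any]]


def subscribe(topic: str) -> Callable[[Handler], Handler]:
    """Hypothetical stand-in for service.subscribe (not eventiq's code).

    It returns the decorated coroutine function unchanged.
    """
    def decorator(func: Handler) -> Handler:
        return func
    return decorator


@subscribe(topic="test.topic")
async def my_subscriber(message: Any) -> int:
    return 42


def test_my_subscriber() -> None:
    # asyncio.run creates an event loop, runs the coroutine, and closes the loop
    result = asyncio.run(my_subscriber(None))
    assert result == 42


if __name__ == "__main__":
    test_my_subscriber()
```

With an async-aware test runner, the test function could instead be declared with async def and await the subscriber directly, as in the example above.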

CLI

Getting help:

eventiq --help

Installing shell autocompletion:

eventiq --install-completion [bash|zsh|fish|powershell|pwsh]

Basic commands

  • run - run service
  • docs - generate AsyncAPI docs
  • send - send message to broker
