Publish/Subscribe asyncio framework for Python

Asyncio native pub/sub framework for Python

Installation

pip install eventiq

or

poetry add eventiq

Installing optional dependencies

pip install 'eventiq[broker]'

Available brokers

  • nats
  • rabbitmq
  • kafka
  • redis

Features

  • Modern, asyncio-based Python 3.8+ syntax
  • Fully type annotated
  • Minimal external dependencies (anyio, pydantic, typer)
  • Automatic message parsing based on type annotations using pydantic
  • Code hot-reload
  • Highly scalable: each service can process hundreds of tasks concurrently, and messages are load balanced across all instances by default
  • Resilient: at-least-once delivery for all messages by default (except for Redis*)
  • Customizable & pluggable message encoder/decoder (json as default)
  • Multiple broker support
    • Memory (for testing)
    • Nats
    • Kafka
    • Rabbitmq
    • Redis
  • Result Backend implementation for Nats & Redis
  • Lifespan protocol support
  • Lightweight (and completely optional) dependency injection system based on type annotations
  • Easy and lightweight (~3k lines of code, including type definitions and broker implementations)
  • Cloud Events standard as the base message structure (no more Python-specific *args and **kwargs in messages)
  • AsyncAPI documentation generation from code
  • Twelve factor app approach - stdout logging, configuration through environment variables
  • Easily extensible via Middlewares
  • Multiple extensions and integrations including:
    • Prometheus - metrics exporter
    • OpenTelemetry - tracing and metrics
    • Message Pack - message pack encoder for messages
    • FastAPI - integrating eventiq Service with FastAPI applications (WIP)
    • Dataref - data reference resolver for messages (WIP)
    • Eventiq Workflows - orchestration engine built on top of eventiq (WIP)
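
To illustrate the Cloud Events and pluggable encoder bullets above, here is a minimal standard-library sketch of a self-describing message envelope with a JSON encoder and decoder. SketchEvent, encode and decode are hypothetical names introduced for illustration; they are not eventiq's CloudEvent class or encoder API. The field set follows the required attributes of the CloudEvents 1.0 specification (id, source, specversion, type) plus a data payload.

```python
import json
import uuid
from dataclasses import asdict, dataclass, field
from typing import Any, Dict


@dataclass
class SketchEvent:
    """Hypothetical stand-in for a CloudEvents-style envelope (not eventiq's CloudEvent)."""

    type: str
    source: str
    data: Dict[str, Any]
    # Each event gets a unique id unless one is supplied explicitly.
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    specversion: str = "1.0"


def encode(event: SketchEvent) -> bytes:
    # Serialize the whole envelope as JSON, so the payload carries named
    # fields instead of positional *args / **kwargs.
    return json.dumps(asdict(event)).encode("utf-8")


def decode(raw: bytes) -> SketchEvent:
    # Rebuild the envelope from the JSON document.
    return SketchEvent(**json.loads(raw))


event = SketchEvent(type="test.topic", source="example-service", data={"counter": 1})
restored = decode(encode(event))
```

Because the envelope is plain JSON with named attributes, a consumer written in any language can read it, which is the point of avoiding Python-specific call signatures in messages.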

Basic Usage

import asyncio
from eventiq import Service, Middleware, CloudEvent, GenericConsumer
from eventiq.backends.nats import JetStreamBroker

class SendMessageMiddleware(Middleware):
    async def after_broker_connect(self):
        print(f"After service start, running with {service.broker}")
        await asyncio.sleep(10)
        for i in range(100):
            message = CloudEvent(topic="test.topic", data={"counter": i})
            await service.publish(message)
        print("Published message(s)")

broker = JetStreamBroker(url="nats://localhost:4222")

service = Service(
    name="example-service",
    broker=broker,
)
service.add_middleware(SendMessageMiddleware)

@service.subscribe(topic="test.topic")
async def example_run(message: CloudEvent):
    print(f"Received Message {message.id} with data: {message.data}")

@service.subscribe(topic="test.topic2")
class MyConsumer(GenericConsumer[CloudEvent]):
    async def process(self, message: CloudEvent):
        print(f"Received Message {message.id} with data: {message.data}")
        await self.publish(CloudEvent(topic="test.topic", data={"response": "ok"}))

Run with

eventiq run app:service --log-level=info

Watching for changes

eventiq run app:service --log-level=info --reload=.

Testing

The StubBroker class is provided as an in-memory replacement for a real broker when running unit tests:

import os


def get_broker(**kwargs):
    if os.getenv('ENV') == 'TEST':
        from eventiq.backends.stub import StubBroker
        return StubBroker()
    else:
        from eventiq.backends.rabbitmq import RabbitmqBroker
        return RabbitmqBroker(**kwargs)

broker = get_broker()
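
One way to check that the environment switch picks the expected broker is to patch os.environ in a test. The sketch below assumes nothing about eventiq itself: FakeStubBroker and FakeRabbitmqBroker are hypothetical stand-ins for the real broker classes, and the url keyword is only an illustrative argument.

```python
import os
from unittest import mock


class FakeStubBroker:
    """Hypothetical stand-in for eventiq.backends.stub.StubBroker."""


class FakeRabbitmqBroker:
    """Hypothetical stand-in for eventiq.backends.rabbitmq.RabbitmqBroker."""

    def __init__(self, **kwargs):
        self.kwargs = kwargs


def get_broker(**kwargs):
    # Same branching as the factory above, with the stand-in classes swapped in.
    if os.getenv("ENV") == "TEST":
        return FakeStubBroker()
    return FakeRabbitmqBroker(**kwargs)


# patch.dict restores os.environ to its previous state when the block exits.
with mock.patch.dict(os.environ, {"ENV": "TEST"}):
    test_broker = get_broker()

with mock.patch.dict(os.environ, {"ENV": "PROD"}):
    prod_broker = get_broker(url="amqp://localhost")
```

Reading the variable inside the function, rather than at import time, is what makes the choice testable this way.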

Furthermore, subscribers are regular Python coroutines, so they can be tested by calling them directly:

# main.py
@service.subscribe(topic="test.topic")
async def my_subscriber(message: CloudEvent):
    return 42

# tests.py
from main import my_subscriber

async def test_my_subscriber():
    result = await my_subscriber(None)
    assert result == 42
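
Plain pytest does not run async def test functions without an async plugin (for example pytest-asyncio or the pytest plugin shipped with anyio). As a plugin-free alternative, the coroutine can be driven with asyncio.run from a synchronous test. In this sketch my_subscriber is redefined locally without the @service.subscribe decorator so the example is self-contained; it assumes, as the text above states, that the decorated function stays directly callable.

```python
import asyncio


async def my_subscriber(message):
    # Stand-in for the subscriber defined in main.py (decorator omitted here).
    return 42


def test_my_subscriber():
    # asyncio.run creates a fresh event loop, runs the coroutine to
    # completion, and returns its result.
    result = asyncio.run(my_subscriber(None))
    assert result == 42


test_my_subscriber()
```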

CLI

Getting help:

eventiq --help

Installing shell autocompletion:

eventiq --install-completion [bash|zsh|fish|powershell|pwsh]

Basic commands

  • run - run service
  • docs - generate AsyncAPI docs
  • send - send message to broker
