Skip to main content

Publish/Subscribe asyncio framework for Python

Project description

Asyncio native pub/sub framework for Python

Tests Build License Mypy Ruff Pydantic v2 security: bandit Python Format PyPi

Installation

pip install eventiq

or

poetry add eventiq

Installing optional dependencies

pip install 'eventiq[broker]'

Available brokers

  • nats
  • rabbitmq
  • kafka
  • redis

Features

  • Modern, asyncio based python 3.8+ syntax
  • Fully type annotated
  • Minimal external dependencies (anyio, pydantic, typer)
  • Automatic message parsing based on type annotations using pydantic
  • Code hot-reload
  • Highly scalable: each service can process hundreds of tasks concurrently, all messages are load balanced between all instances by default
  • Resilient - at least once delivery for all messages by default (except for Redis*)
  • Customizable & pluggable message encoder/decoder (json as default)
  • Multiple broker support
    • Memory (for testing)
    • Nats
    • Kafka
    • Rabbitmq
    • Redis
  • Result Backend implementation for Nats & Redis
  • Lifespan protocol support
  • Lightweight (and completely optional) dependency injection system based on type annotations
  • Easy and lightweight (~3k lines of code including types definitions and brokers implementations)
  • Cloud Events standard as base message structure (no more python specific *args and **kwargs in messages)
  • AsyncAPI documentation generation from code
  • Twelve factor app approach - stdout logging, configuration through environment variables
  • Easily extensible via Middlewares
  • Multiple extensions and integrations including:
    • Prometheus - mertics exporter
    • OpenTelemetry - tracing and metrics
    • Message Pack - message pack encoder for messages
    • FastAPI - integrating eventiq Service with FastAPI applications (WIP)
    • Dataref - data reference resolver for messages (WIP)
    • Eventiq Workflows - orchestration engine built on top of eventiq (WIP)

Basic Usage

import asyncio
from eventiq import Service, Middleware, CloudEvent, GenericConsumer
from eventiq.backends.nats import JetStreamBroker

class SendMessageMiddleware(Middleware):
    async def after_broker_connect(self):
        print(f"After service start, running with {service.broker}")
        await asyncio.sleep(10)
        for i in range(100):
            message = CloudEvent(topic="test.topic", data={"counter": i})
            await service.publish(message)
        print("Published messages(s)")

broker = JetStreamBroker(url="nats://localhost:4222")

service = Service(
    name="example-service",
    broker=broker,
)
service.add_middleware(SendMessageMiddleware)

@service.subscribe(topic="test.topic")
async def example_run(message: CloudEvent):
    print(f"Received Message {message.id} with data: {message.data}")

@service.subscribe(topic="test.topic2")
class MyConsumer(GenericConsumer[CloudEvent]):
    async def process(self, message: CloudEvent):
        print(f"Received Message {message.id} with data: {message.data}")
        await self.publish(CloudEvent(topic="test.topic", data={"response": "ok"})

Run with

eventiq run app:service --log-level=info

Watching for changes

eventiq run app:service --log-level=info --reload=.

Testing

StubBroker class is provided as in memory replacement for running unit tests

import os


def get_broker(**kwargs):
    if os.getenv('ENV') == 'TEST':
        from eventiq.backends.stub import StubBroker
        return StubBroker()
    else:
        from eventiq.backends.rabbitmq import RabbitmqBroker
        return RabbitmqBroker(**kwargs)

broker = get_broker()

Furthermore, subscribers are just regular python coroutines, so it's possible to test them simply by invocation

# main.py
@service.subscribe(topic="test.topic")
async def my_subscriber(message: CloudEvent):
    return 42

# tests.py
from main import my_subscriber

async def test_my_subscriber():
    result = await my_subscriber(None)
    assert result == 42

CLI

Getting help:

eventiq --help

Installing shell autocompletion:

eventiq --install-completion [bash|zsh|fish|powershell|pwsh]

Basic commands

  • run - run service
  • docs - generate AsyncAPI docs
  • send - send message to broker

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

eventiq-1.1.12.tar.gz (38.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

eventiq-1.1.12-py3-none-any.whl (45.5 kB view details)

Uploaded Python 3

File details

Details for the file eventiq-1.1.12.tar.gz.

File metadata

  • Download URL: eventiq-1.1.12.tar.gz
  • Upload date:
  • Size: 38.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for eventiq-1.1.12.tar.gz
Algorithm Hash digest
SHA256 2311e915f191a8952ebc1c6d00bd98026629619af80d72bcf178afc76936088f
MD5 82f1b3de84a07ac498fe531f603c8ccd
BLAKE2b-256 d3f05ae19bfbef3b27cf77d1577a44dc08be8f44c90c5f34f00dabcabd6924ca

See more details on using hashes here.

File details

Details for the file eventiq-1.1.12-py3-none-any.whl.

File metadata

  • Download URL: eventiq-1.1.12-py3-none-any.whl
  • Upload date:
  • Size: 45.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for eventiq-1.1.12-py3-none-any.whl
Algorithm Hash digest
SHA256 d64db63ace44b6d92fccb077b7c1b6fbb957a9304177370bdf1b3b1d659a93f6
MD5 018b1dc5c4e93d31da5229cda81423e4
BLAKE2b-256 68126551a0e97b5d8e8dec7ad41d563351e1d0d2ad60c6092c4208bb92c7c10f

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page