Publish/Subscribe asyncio framework for Python
Project description
Asyncio native pub/sub framework for Python
Installation
pip install eventiq
or
poetry add eventiq
Installing optional dependencies
pip install 'eventiq[broker]'
Available brokers
natsrabbitmqkafkaredis
Features
- Modern,
asynciobased python 3.8+ syntax - Fully type annotated
- Minimal external dependencies (
anyio,pydantic,typer) - Automatic message parsing based on type annotations using
pydantic - Code hot-reload
- Highly scalable: each service can process hundreds of tasks concurrently, all messages are load balanced between all instances by default
- Resilient - at least once delivery for all messages by default (except for Redis*)
- Customizable & pluggable message encoder/decoder (
jsonas default) - Multiple broker support
- Memory (for testing)
- Nats
- Kafka
- Rabbitmq
- Redis
- Result Backend implementation for Nats & Redis
- Lifespan protocol support
- Lightweight (and completely optional) dependency injection system based on type annotations
- Easy and lightweight (~3k lines of code including types definitions and brokers implementations)
- Cloud Events standard as base message structure (no more python specific
*argsand**kwargsin messages) - AsyncAPI documentation generation from code
- Twelve factor app approach - stdout logging, configuration through environment variables
- Easily extensible via Middlewares
- Multiple extensions and integrations including:
- Prometheus - mertics exporter
- OpenTelemetry - tracing and metrics
- Message Pack - message pack encoder for messages
- FastAPI - integrating eventiq Service with FastAPI applications (WIP)
- Dataref - data reference resolver for messages (WIP)
- Eventiq Workflows - orchestration engine built on top of eventiq (WIP)
Basic Usage
import asyncio
from eventiq import Service, Middleware, CloudEvent, GenericConsumer
from eventiq.backends.nats import JetStreamBroker
class SendMessageMiddleware(Middleware):
async def after_broker_connect(self):
print(f"After service start, running with {service.broker}")
await asyncio.sleep(10)
for i in range(100):
message = CloudEvent(topic="test.topic", data={"counter": i})
await service.publish(message)
print("Published messages(s)")
broker = JetStreamBroker(url="nats://localhost:4222")
service = Service(
name="example-service",
broker=broker,
)
service.add_middleware(SendMessageMiddleware)
@service.subscribe(topic="test.topic")
async def example_run(message: CloudEvent):
print(f"Received Message {message.id} with data: {message.data}")
@service.subscribe(topic="test.topic2")
class MyConsumer(GenericConsumer[CloudEvent]):
async def process(self, message: CloudEvent):
print(f"Received Message {message.id} with data: {message.data}")
await self.publish(CloudEvent(topic="test.topic", data={"response": "ok"})
Run with
eventiq run app:service --log-level=info
Watching for changes
eventiq run app:service --log-level=info --reload=.
Testing
StubBroker class is provided as in memory replacement for running unit tests
import os
def get_broker(**kwargs):
if os.getenv('ENV') == 'TEST':
from eventiq.backends.stub import StubBroker
return StubBroker()
else:
from eventiq.backends.rabbitmq import RabbitmqBroker
return RabbitmqBroker(**kwargs)
broker = get_broker()
Furthermore, subscribers are just regular python coroutines, so it's possible to test them simply by invocation
# main.py
@service.subscribe(topic="test.topic")
async def my_subscriber(message: CloudEvent):
return 42
# tests.py
from main import my_subscriber
async def test_my_subscriber():
result = await my_subscriber(None)
assert result == 42
CLI
Getting help:
eventiq --help
Installing shell autocompletion:
eventiq --install-completion [bash|zsh|fish|powershell|pwsh]
Basic commands
run- run servicedocs- generate AsyncAPI docssend- send message to broker
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file eventiq-1.1.11.tar.gz.
File metadata
- Download URL: eventiq-1.1.11.tar.gz
- Upload date:
- Size: 37.9 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
0b70434be7d61d51f705640a4c62c55d7e0281d1db95cbed387ee3a414d55bef
|
|
| MD5 |
9dfb979ce3d3deca96ee5eb9020e44b7
|
|
| BLAKE2b-256 |
36ba094dcfaf38aeee03c5e6702c8817121a7e95ff9c1d00ea5e02b0b5a571b8
|
File details
Details for the file eventiq-1.1.11-py3-none-any.whl.
File metadata
- Download URL: eventiq-1.1.11-py3-none-any.whl
- Upload date:
- Size: 45.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
855b3ceb3340fcc344280da82bddf319961b33f929d1f22e6cb406829b392cbc
|
|
| MD5 |
3e7d5bd6fbf174927e5e9a73877d3239
|
|
| BLAKE2b-256 |
c568a31b262c11cc10b6c693d65e9049b307bfe8ae65e07162c99ec525bb2412
|