Publish/Subscribe asyncio framework for Python
Project description
Asyncio native pub/sub framework for Python
Installation
pip install eventiq
or
poetry add eventiq
Installing optional dependencies
pip install 'eventiq[broker]'
Available brokers
nats
rabbitmq
kafka
redis
Features
- Modern,
asyncio
based python 3.8+ syntax - Fully type annotated
- Minimal external dependencies (
anyio
,pydantic
,typer
) - Automatic message parsing based on type annotations using
pydantic
- Code hot-reload
- Highly scalable: each service can process hundreds of tasks concurrently, all messages are load balanced between all instances by default
- Resilient - at least once delivery for all messages by default (except for Redis*)
- Customizable & pluggable message encoder/decoder (
json
as default) - Multiple broker support
- Memory (for testing)
- Nats
- Kafka
- Rabbitmq
- Redis
- Result Backend implementation for Nats & Redis
- Lifespan protocol support
- Lightweight (and completely optional) dependency injection system based on type annotations
- Easy and lightweight (~3k lines of code including types definitions and brokers implementations)
- Cloud Events standard as base message structure (no more python specific
*args
and**kwargs
in messages) - AsyncAPI documentation generation from code
- Twelve factor app approach - stdout logging, configuration through environment variables
- Easily extensible via Middlewares
- Multiple extensions and integrations including:
- Prometheus - mertics exporter
- OpenTelemetry - tracing and metrics
- Message Pack - message pack encoder for messages
- FastAPI - integrating eventiq Service with FastAPI applications (WIP)
- Dataref - data reference resolver for messages (WIP)
- Eventiq Workflows - orchestration engine built on top of eventiq (WIP)
Basic Usage
import asyncio
from eventiq import Service, Middleware, CloudEvent, GenericConsumer
from eventiq.backends.nats import JetStreamBroker
class SendMessageMiddleware(Middleware):
async def after_broker_connect(self):
print(f"After service start, running with {service.broker}")
await asyncio.sleep(10)
for i in range(100):
message = CloudEvent(topic="test.topic", data={"counter": i})
await service.publish(message)
print("Published messages(s)")
broker = JetStreamBroker(url="nats://localhost:4222")
service = Service(
name="example-service",
broker=broker,
)
service.add_middleware(SendMessageMiddleware)
@service.subscribe(topic="test.topic")
async def example_run(message: CloudEvent):
print(f"Received Message {message.id} with data: {message.data}")
@service.subscribe(topic="test.topic2")
class MyConsumer(GenericConsumer[CloudEvent]):
async def process(self, message: CloudEvent):
print(f"Received Message {message.id} with data: {message.data}")
await self.publish(CloudEvent(topic="test.topic", data={"response": "ok"})
Run with
eventiq run app:service --log-level=info
Watching for changes
eventiq run app:service --log-level=info --reload=.
Testing
StubBroker
class is provided as in memory replacement for running unit tests
import os
def get_broker(**kwargs):
if os.getenv('ENV') == 'TEST':
from eventiq.backends.stub import StubBroker
return StubBroker()
else:
from eventiq.backends.rabbitmq import RabbitmqBroker
return RabbitmqBroker(**kwargs)
broker = get_broker()
Furthermore, subscribers are just regular python coroutines, so it's possible to test them simply by invocation
# main.py
@service.subscribe(topic="test.topic")
async def my_subscriber(message: CloudEvent):
return 42
# tests.py
from main import my_subscriber
async def test_my_subscriber():
result = await my_subscriber(None)
assert result == 42
CLI
Getting help:
eventiq --help
Installing shell autocompletion:
eventiq --install-completion [bash|zsh|fish|powershell|pwsh]
Basic commands
run
- run servicedocs
- generate AsyncAPI docssend
- send message to broker
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
eventiq-1.1.7.tar.gz
(37.8 kB
view details)
Built Distribution
eventiq-1.1.7-py3-none-any.whl
(45.1 kB
view details)
File details
Details for the file eventiq-1.1.7.tar.gz
.
File metadata
- Download URL: eventiq-1.1.7.tar.gz
- Upload date:
- Size: 37.8 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/5.1.1 CPython/3.12.7
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | b50778cd92d6ee7eaa016f0b4b5b3c7ba864781c81ba59e4055e9e7212ef8827 |
|
MD5 | 51e21d45ee3283d3085493c1b763b08c |
|
BLAKE2b-256 | cecb136811bf4ced129fc811479f55cabc7ecfdd73317e29f1bb07bb301c48a6 |
File details
Details for the file eventiq-1.1.7-py3-none-any.whl
.
File metadata
- Download URL: eventiq-1.1.7-py3-none-any.whl
- Upload date:
- Size: 45.1 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/5.1.1 CPython/3.12.7
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 38fd376451c8b9c5778bdb422b9189bb58eb0c51d0010eb0714d6e460c24f59e |
|
MD5 | 8efd98815ded7bd4a80c45844d1c7da4 |
|
BLAKE2b-256 | 357d9f132559677bb44aaf59f599a38739b1c9241c191944a2d3c565f659a59a |