Publish/Subscribe asyncio framework for Python
Asyncio native pub/sub framework for Python
Installation
pip install eventiq
or
poetry add eventiq
Installing optional dependencies
pip install 'eventiq[broker]'
Available brokers
nats
rabbitmq
kafka
redis
Features
- Modern, asyncio based Python 3.8+ syntax
- Fully type annotated
- Minimal external dependencies (anyio, pydantic, typer)
- Automatic message parsing based on type annotations using pydantic
- Code hot-reload
- Highly scalable: each service can process hundreds of tasks concurrently, all messages are load balanced between all instances by default
- Resilient - at least once delivery for all messages by default (except for Redis*)
- Customizable & pluggable message encoder/decoder (json as default)
- Multiple broker support
  - Memory (for testing)
  - Nats
  - Kafka
  - Rabbitmq
  - Redis
- Result Backend implementation for Nats & Redis
- Lifespan protocol support
- Lightweight (and completely optional) dependency injection system based on type annotations
- Easy and lightweight (~3k lines of code including type definitions and broker implementations)
- Cloud Events standard as base message structure (no more Python specific *args and **kwargs in messages)
- AsyncAPI documentation generation from code
- Twelve factor app approach - stdout logging, configuration through environment variables
- Easily extensible via Middlewares
- Multiple extensions and integrations including:
  - Prometheus - metrics exporter
  - OpenTelemetry - tracing and metrics
  - Message Pack - message pack encoder for messages
  - FastAPI - integrating eventiq Service with FastAPI applications (WIP)
  - Dataref - data reference resolver for messages (WIP)
  - Eventiq Workflows - orchestration engine built on top of eventiq (WIP)
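The Cloud Events and pluggable encoder/decoder items in the list above can be pictured with a small standard-library sketch. The `Event` dataclass, the `Encoder` protocol and `JsonEncoder` are hypothetical names invented for illustration; they are not eventiq's actual classes, and the field set is only loosely modeled on a CloudEvents-style envelope. The point is that a message is plain structured data, so any encoder that can turn it into bytes and back could be swapped in.

```python
import json
import uuid
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from typing import Any, Protocol


@dataclass
class Event:
    """Hypothetical CloudEvents-style envelope (NOT eventiq's CloudEvent)."""

    topic: str
    data: Any
    # Illustrative metadata fields, filled in automatically when omitted.
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    time: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
    specversion: str = "1.0"


class Encoder(Protocol):
    """Hypothetical encoder interface: bytes out, Event back in."""

    def encode(self, event: Event) -> bytes: ...

    def decode(self, raw: bytes) -> Event: ...


class JsonEncoder:
    """JSON implementation of the hypothetical Encoder interface."""

    def encode(self, event: Event) -> bytes:
        return json.dumps(asdict(event)).encode("utf-8")

    def decode(self, raw: bytes) -> Event:
        return Event(**json.loads(raw))


def roundtrip(encoder: Encoder, event: Event) -> Event:
    # Encode to the wire format and decode again.
    return encoder.decode(encoder.encode(event))
```

In this sketch, changing the wire format would only mean passing a different object with the same two methods, for example one backed by Message Pack.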
Basic Usage
# app.py
import asyncio

from eventiq import Service, Middleware, CloudEvent, GenericConsumer
from eventiq.backends.nats import JetStreamBroker


class SendMessageMiddleware(Middleware):
    async def after_broker_connect(self):
        # `service` is the module-level Service defined below; the name is
        # resolved when this hook runs, after the module has been imported.
        print(f"After service start, running with {service.broker}")
        await asyncio.sleep(10)
        for i in range(100):
            message = CloudEvent(topic="test.topic", data={"counter": i})
            await service.publish(message)
        print("Published message(s)")


broker = JetStreamBroker(url="nats://localhost:4222")

service = Service(
    name="example-service",
    broker=broker,
    middlewares=[SendMessageMiddleware()]
)


# Function-based consumer
@service.subscribe(topic="test.topic")
async def example_run(message: CloudEvent):
    print(f"Received Message {message.id} with data: {message.data}")


# Class-based consumer
@service.subscribe(topic="test.topic2")
class MyConsumer(GenericConsumer[CloudEvent]):
    async def process(self, message: CloudEvent):
        print(f"Received Message {message.id} with data: {message.data}")
        await self.publish(CloudEvent(topic="test.topic", data={"response": "ok"}))
Run with
eventiq run app:service --log-level=info
Watching for changes
eventiq run app:service --log-level=info --reload=.
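As a mental model for the subscribe/publish flow in the basic usage example, the following is a minimal in-memory sketch using only asyncio. `ToyService` is a hypothetical class written for this illustration; it does not reflect how eventiq's `Service` is implemented, and it skips brokers, encoding and delivery guarantees entirely. It only shows the decorator registering a coroutine for a topic and `publish` awaiting the registered handlers.

```python
import asyncio
from collections import defaultdict
from typing import Any, Awaitable, Callable, DefaultDict, Dict, List

Handler = Callable[[Dict[str, Any]], Awaitable[None]]


class ToyService:
    """Hypothetical in-memory model of subscribe/publish (NOT eventiq's Service)."""

    def __init__(self) -> None:
        self._handlers: DefaultDict[str, List[Handler]] = defaultdict(list)

    def subscribe(self, topic: str) -> Callable[[Handler], Handler]:
        def decorator(func: Handler) -> Handler:
            self._handlers[topic].append(func)
            return func  # the coroutine function is returned unchanged

        return decorator

    async def publish(self, topic: str, message: Dict[str, Any]) -> None:
        # Await every handler registered for the topic, concurrently.
        await asyncio.gather(*(h(message) for h in self._handlers[topic]))


toy = ToyService()
received: List[int] = []


@toy.subscribe(topic="test.topic")
async def collect(message: Dict[str, Any]) -> None:
    received.append(message["counter"])


async def main() -> List[int]:
    received.clear()
    for i in range(3):
        await toy.publish("test.topic", {"counter": i})
    return received
```

Running `asyncio.run(main())` publishes three messages and returns the counters that the handler collected.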
Testing
The StubBroker class is provided as an in-memory replacement for running unit tests:
import os


def get_broker(**kwargs):
    if os.getenv('ENV') == 'TEST':
        from eventiq.backends.stub import StubBroker
        return StubBroker()
    else:
        from eventiq.backends.rabbitmq import RabbitmqBroker
        return RabbitmqBroker(**kwargs)


broker = get_broker()
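Because `get_broker()` is called at import time in the snippet above, `ENV` has to be set before that module is imported. The sketch below shows one way to exercise the same branching in a test with `unittest.mock.patch.dict`. `FakeStubBroker`, `FakeRealBroker`, `broker_type_for` and the `url` keyword are hypothetical placeholders so the example runs without eventiq or a broker installed; they are assumptions, not part of the library.

```python
import os
from unittest import mock


class FakeStubBroker:
    """Hypothetical placeholder standing in for an in-memory test broker."""


class FakeRealBroker:
    """Hypothetical placeholder standing in for a network-backed broker."""

    def __init__(self, **kwargs):
        self.options = kwargs


def get_broker(**kwargs):
    # Same branching as the snippet above, with placeholders instead of imports.
    if os.getenv("ENV") == "TEST":
        return FakeStubBroker()
    return FakeRealBroker(**kwargs)


def broker_type_for(env_value: str) -> str:
    # patch.dict restores os.environ to its previous state on exit.
    with mock.patch.dict(os.environ, {"ENV": env_value}):
        return type(get_broker(url="amqp://localhost")).__name__
```

Calling the factory inside the patched block, rather than at module import, keeps the choice of broker under the test's control.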
Furthermore, subscribers are just regular Python coroutines, so they can be tested simply by invoking them:
# main.py
@service.subscribe(topic="test.topic")
async def my_subscriber(message: CloudEvent):
    return 42

# tests.py
from main import my_subscriber

async def test_my_subscriber():
    result = await my_subscriber(None)
    assert result == 42
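An `async def` test like the one above needs something to drive the event loop. With pytest this usually means an async plugin such as pytest-asyncio or anyio's pytest plugin. As a dependency-free alternative, the standard library's `unittest.IsolatedAsyncioTestCase` can await the subscriber directly. In this sketch `my_subscriber` is a local stand-in for the decorated function in main.py, and `run_tests` is a hypothetical helper added only to make the example self-contained.

```python
import unittest


async def my_subscriber(message):
    # Stand-in for the decorated subscriber from main.py (assumption: the
    # decorated function can still be awaited like a plain coroutine).
    return 42


class MySubscriberTest(unittest.IsolatedAsyncioTestCase):
    async def test_returns_42(self):
        result = await my_subscriber(None)
        self.assertEqual(result, 42)


def run_tests() -> bool:
    # Hypothetical helper: load and run the test case programmatically.
    suite = unittest.defaultTestLoader.loadTestsFromTestCase(MySubscriberTest)
    result = unittest.TextTestRunner(verbosity=0).run(suite)
    return result.wasSuccessful()
```

In a real project the test case would import `my_subscriber` from main.py and be discovered by `python -m unittest`.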
CLI
Getting help:
eventiq --help
Installing shell autocompletion:
eventiq --install-completion [bash|zsh|fish|powershell|pwsh]
Basic commands
- run - run service
- docs - generate AsyncAPI docs
- send - send message to broker