EVKafka framework. Handle Kafka events easily.
Project description
evkafka
EVKafka is a small framework for building event-driven microservices with Apache Kafka and Python. It is built on aiokafka, an asynchronous Kafka client library.
Features
- Easy to start and use
- Sync/async handlers are supported
- Extensible through consumer middleware
- Lifespan
- At-Least-Once/At-Most-Once delivery
- Automatic API documentation
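The two delivery modes in the list differ only in when the consumer offset is committed relative to running the handler. The sketch below simulates that difference with an in-memory log and no Kafka at all. `consume`, `FlakyHandler`, `run_with_restart`, and the `mode` strings are hypothetical names chosen for illustration; they are not part of EVKafka's API.

```python
from typing import Callable, List


def consume(
    log: List[str],
    handler: Callable[[str], None],
    mode: str,
    committed: int = 0,
) -> int:
    """Process messages from `committed` onward and return the new committed offset.

    Stops at the first handler failure, the way a crashed consumer would.
    """
    offset = committed
    while offset < len(log):
        message = log[offset]
        if mode == "at-most-once":
            # Commit first: a crash inside the handler loses this message.
            committed = offset + 1
            try:
                handler(message)
            except Exception:
                return committed
        elif mode == "at-least-once":
            # Process first: a crash before the commit causes redelivery.
            try:
                handler(message)
            except Exception:
                return committed
            committed = offset + 1
        else:
            raise ValueError(f"unknown mode: {mode}")
        offset += 1
    return committed


class FlakyHandler:
    """Fails the first time it sees `fail_on`, then succeeds."""

    def __init__(self, fail_on: str) -> None:
        self.fail_on = fail_on
        self.failed = False
        self.seen: List[str] = []

    def __call__(self, message: str) -> None:
        if message == self.fail_on and not self.failed:
            self.failed = True
            raise RuntimeError("simulated crash")
        self.seen.append(message)


def run_with_restart(mode: str) -> List[str]:
    """Consume, crash once on "b", then restart from the committed offset."""
    log = ["a", "b", "c"]
    handler = FlakyHandler(fail_on="b")
    committed = consume(log, handler, mode)
    consume(log, handler, mode, committed)
    return handler.seen
```

In this simulation the at-most-once run never processes "b" after the crash, while the at-least-once run sees it again on restart. Real handlers under at-least-once delivery should therefore tolerate being called more than once for the same message.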
Installation
$ pip install evkafka
Example
Create a consumer
from pydantic import BaseModel

from evkafka import EVKafkaApp
from evkafka.config import ConsumerConfig


class FooEventPayload(BaseModel):
    user_name: str


config: ConsumerConfig = {
    "bootstrap_servers": "kafka:9092",
    "group_id": "test",
    "topics": ["topic"],
}

app = EVKafkaApp(
    config=config,
    expose_asyncapi=True,
)


@app.event("FooEvent")
async def foo_handler(event: FooEventPayload) -> None:
    print(event)


if __name__ == "__main__":
    app.run()
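The `@app.event("FooEvent")` decorator above ties a handler to an event name. How EVKafka implements this internally is not described here, so the following is only a minimal standard-library sketch of the general pattern, assuming the event type arrives alongside a JSON payload. `MiniRouter`, `dispatch`, and the use of a dataclass in place of a pydantic model are illustrative assumptions.

```python
import asyncio
import inspect
import json
from dataclasses import dataclass
from typing import Any, Callable, Dict, get_type_hints


@dataclass
class FooEventPayload:
    # Dataclass stand-in for the pydantic model used in the example above.
    user_name: str


class MiniRouter:
    """Hypothetical registry that maps event names to handlers."""

    def __init__(self) -> None:
        self._handlers: Dict[str, Callable[..., Any]] = {}

    def event(self, name: str) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
        def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
            self._handlers[name] = func
            return func

        return decorator

    async def dispatch(self, event_type: str, raw: bytes) -> Any:
        handler = self._handlers[event_type]
        # Build the payload object from the handler's `event` annotation.
        payload_cls = get_type_hints(handler)["event"]
        payload = payload_cls(**json.loads(raw))
        result = handler(payload)
        # Accept both sync and async handlers.
        if inspect.isawaitable(result):
            result = await result
        return result


router = MiniRouter()


@router.event("FooEvent")
async def foo_handler(event: FooEventPayload) -> str:
    return f"hello {event.user_name}"


@router.event("SyncFooEvent")
def sync_foo_handler(event: FooEventPayload) -> str:
    return f"sync hello {event.user_name}"


if __name__ == "__main__":
    print(asyncio.run(router.dispatch("FooEvent", b'{"user_name": "ann"}')))
```

The registry is keyed by event name, so several event types can share one topic and still reach different handlers.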
Explore API documentation
Automatic documentation (based on AsyncAPI) is built and served at http://localhost:8080.
Add a producer
from contextlib import asynccontextmanager

from pydantic import BaseModel

from evkafka import EVKafkaApp, EVKafkaProducer, Handler, Request
from evkafka.config import ConsumerConfig, BrokerConfig, ProducerConfig


class FooEventPayload(BaseModel):
    user_name: str


class BarEventPayload(BaseModel):
    user_name: str
    message: str


handler = Handler()


@handler.event("FooEvent")
async def foo_handler(event: FooEventPayload, request: Request) -> None:
    print('Received FooEvent', event)

    new_event = BarEventPayload(user_name=event.user_name, message='hello')
    await request.state.producer.send_event(new_event, 'BarEvent')


@handler.event("BarEvent")
async def bar_handler(event: BarEventPayload) -> None:
    print('Received BarEvent', event)


@asynccontextmanager
async def lifespan():
    async with EVKafkaProducer(producer_config) as producer:
        yield {'producer': producer}


if __name__ == "__main__":
    broker_config: BrokerConfig = {
        "bootstrap_servers": "kafka:9092"
    }

    consumer_config: ConsumerConfig = {
        "group_id": "test",
        "topics": ["topic"],
        **broker_config
    }

    producer_config: ProducerConfig = {
        "topic": "topic",
        **broker_config
    }

    app = EVKafkaApp(
        expose_asyncapi=True,
        lifespan=lifespan
    )
    app.add_consumer(consumer_config, handler)
    app.run()
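The `lifespan` context manager above yields a dictionary that the handler later reads through `request.state.producer`. The sketch below mimics that flow using only asyncio, so it runs without a broker. `FakeProducer`, `run_app`, and the use of `SimpleNamespace` for the state object are hypothetical stand-ins; how EVKafka actually builds `request.state` is an assumption here.

```python
import asyncio
from contextlib import asynccontextmanager
from types import SimpleNamespace
from typing import AsyncIterator, Dict, List, Tuple


class FakeProducer:
    """Stand-in for a real producer; records events instead of sending them."""

    def __init__(self) -> None:
        self.started = False
        self.sent: List[Tuple[str, dict]] = []

    async def __aenter__(self) -> "FakeProducer":
        self.started = True
        return self

    async def __aexit__(self, *exc_info: object) -> None:
        self.started = False

    async def send_event(self, event: dict, event_type: str) -> None:
        if not self.started:
            raise RuntimeError("producer is not running")
        self.sent.append((event_type, event))


producer = FakeProducer()


@asynccontextmanager
async def lifespan() -> AsyncIterator[Dict[str, FakeProducer]]:
    # Resources are opened before any event is handled and closed afterwards.
    async with producer as started_producer:
        yield {"producer": started_producer}


async def handler(event: dict, state: SimpleNamespace) -> None:
    new_event = {"user_name": event["user_name"], "message": "hello"}
    await state.producer.send_event(new_event, "BarEvent")


async def run_app(events: List[dict]) -> None:
    # Assumed flow: enter lifespan, expose the yielded dict as attributes, run handlers.
    async with lifespan() as resources:
        state = SimpleNamespace(**resources)
        for event in events:
            await handler(event, state)


if __name__ == "__main__":
    asyncio.run(run_app([{"user_name": "ann"}]))
    print(producer.sent)
```

Keeping the producer inside the lifespan means it is started once, shared by every handler call, and shut down when the application exits.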
More details can be found in the documentation.
Status
The framework is in alpha.
License
This project is licensed under the terms of the MIT license.