EVKafka framework. Handle Kafka events easily.
evkafka
EVKafka is a lightweight framework for building event-driven microservices with Apache Kafka and Python. It is based on the asynchronous Kafka client library aiokafka.
Features
- Easy to start and use
- Sync/async handlers are supported
- Extensible through consumer middleware
- Lifespan
- At-Least-Once/At-Most-Once event delivery guarantees
- Automatic API documentation generation
Installation
$ pip install evkafka
Example
Create a consumer app
from evkafka import EVKafkaApp, Handler
from evkafka.config import ConsumerConfig, BrokerConfig
from pydantic import BaseModel

handler = Handler()


class FooEventPayload(BaseModel):
    user_name: str


@handler.event("FooEvent")
async def foo_handler(event: FooEventPayload) -> None:
    print('Received FooEvent', event)


if __name__ == "__main__":
    broker_config: BrokerConfig = {
        "bootstrap_servers": "kafka:9092"
    }

    consumer_config: ConsumerConfig = {
        "group_id": "test",
        "topics": ["src-topic"],
        **broker_config
    }

    app = EVKafkaApp(expose_asyncapi=True)
    app.add_consumer(consumer_config, handler)
    app.run()
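To make the decorator pattern above more concrete, here is a minimal sketch of how a registry could route events by name to sync or async handlers. It is not EVKafka's implementation: `SketchHandler`, its `dispatch` method, and the `BazEvent` name are hypothetical, and a stdlib dataclass stands in for the pydantic model.

```python
import asyncio
import inspect
import json
from dataclasses import dataclass
from typing import Any, Callable, Dict, List


class SketchHandler:
    """Hypothetical registry mapping event names to handler functions."""

    def __init__(self) -> None:
        self._handlers: Dict[str, Callable[[Any], Any]] = {}

    def event(self, name: str) -> Callable:
        """Register the decorated function under the given event name."""
        def decorator(func: Callable) -> Callable:
            self._handlers[name] = func
            return func
        return decorator

    async def dispatch(self, name: str, raw: bytes) -> None:
        """Decode a JSON payload and call the matching handler, if any."""
        func = self._handlers.get(name)
        if func is None:
            return  # unknown events are skipped in this sketch
        # Build the payload from the type annotation of the first parameter.
        param = next(iter(inspect.signature(func).parameters.values()))
        payload = param.annotation(**json.loads(raw))
        result = func(payload)
        if inspect.isawaitable(result):
            await result  # async handlers return a coroutine to await


handler = SketchHandler()
received: List[str] = []


@dataclass
class FooEventPayload:
    user_name: str


@handler.event("FooEvent")
async def foo_handler(event: FooEventPayload) -> None:
    received.append(f"async:{event.user_name}")


@handler.event("BazEvent")  # hypothetical event with a sync handler
def baz_handler(event: FooEventPayload) -> None:
    received.append(f"sync:{event.user_name}")


async def main() -> None:
    await handler.dispatch("FooEvent", b'{"user_name": "alice"}')
    await handler.dispatch("BazEvent", b'{"user_name": "bob"}')
    await handler.dispatch("UnknownEvent", b"{}")


asyncio.run(main())
print(received)
```

Checking the handler's return value with `inspect.isawaitable` lets one dispatch path serve both sync and async functions.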
Explore API docs
Automatic documentation (based on AsyncAPI) is built and served at http://localhost:8080.
Add a producer to the app
from evkafka import EVKafkaApp, Handler, Sender
from evkafka.config import ConsumerConfig, BrokerConfig, ProducerConfig
from pydantic import BaseModel

sender = Sender()
handler = Handler()


class FooEventPayload(BaseModel):
    user_name: str


class BarEventPayload(BaseModel):
    user_name: str
    message: str


@sender.event('BarEvent')
async def send_bar(event: BarEventPayload) -> None:
    pass


@handler.event("FooEvent")
async def foo_handler(event: FooEventPayload) -> None:
    print('Received FooEvent', event)
    new_event = BarEventPayload(user_name=event.user_name, message='hello')
    await send_bar(new_event)


if __name__ == "__main__":
    broker_config: BrokerConfig = {
        "bootstrap_servers": "kafka:9092"
    }

    consumer_config: ConsumerConfig = {
        "group_id": "test",
        "topics": ["src-topic"],
        **broker_config
    }

    producer_config: ProducerConfig = {
        "topic": "dest-topic",
        **broker_config
    }

    app = EVKafkaApp(expose_asyncapi=True)
    app.add_consumer(consumer_config, handler)
    app.add_producer(producer_config, sender)
    app.run()
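The `send_bar` stub above has an empty body because the decorator supplies the sending behaviour. One way such a decorator could work is sketched below, under the assumption that it swaps the stub for a function that serializes the event. This is not EVKafka's implementation: `SketchSender` and its `outbox` list are hypothetical, an in-memory list stands in for the Kafka producer, and a stdlib dataclass stands in for the pydantic model.

```python
import asyncio
import json
from dataclasses import asdict, dataclass
from functools import wraps
from typing import Any, Callable, List, Tuple


class SketchSender:
    """Hypothetical sender that records events instead of producing to Kafka."""

    def __init__(self) -> None:
        self.outbox: List[Tuple[str, bytes]] = []

    def event(self, name: str) -> Callable:
        """Replace the decorated stub with a function that sends the event."""
        def decorator(func: Callable) -> Callable:
            @wraps(func)
            async def send(event: Any) -> None:
                # In this sketch the stub body is never called; the stub only
                # supplies the function name and the payload type annotation.
                encoded = json.dumps(asdict(event)).encode()
                self.outbox.append((name, encoded))
            return send
        return decorator


sender = SketchSender()


@dataclass
class BarEventPayload:
    user_name: str
    message: str


@sender.event("BarEvent")
async def send_bar(event: BarEventPayload) -> None:
    pass


asyncio.run(send_bar(BarEventPayload(user_name="alice", message="hello")))
print(sender.outbox)
```

Keeping the stub as a typed function gives callers an ordinary awaitable with a checked signature, and gives documentation tooling a payload type to inspect.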
Check API docs update
Documentation includes both consumed and produced events.
More details can be found in the documentation.
License
This project is licensed under the terms of the MIT license.