EVKafka framework. Handle Kafka events easily.

evkafka

EVKafka is a lightweight framework for building event-driven microservices with Apache Kafka and Python. It is based on the asynchronous Kafka client library aiokafka.

Features

  • Easy to start and use
  • Sync/async handlers are supported
  • Extensible through consumer middleware
  • Lifespan
  • At-Least-Once/At-Most-Once event delivery guarantees
  • Automatic API documentation generation
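
The difference between the two delivery guarantees listed above comes down to when the consumer commits its offset relative to running the handler. The following is a minimal, Kafka-free sketch of that idea, not EVKafka's implementation: `consume`, `make_flaky_handler`, and the `commit_first` flag are hypothetical names introduced only for illustration, and a raised `RuntimeError` stands in for a process crash followed by a restart from the last committed offset.

```python
from typing import Callable, List, Tuple


def consume(messages: List[str], handler: Callable[[str], None], commit_first: bool) -> None:
    """Feed messages to handler, resuming from the committed offset after a failure.

    commit_first=True  -> at-most-once: the offset is committed before handling.
    commit_first=False -> at-least-once: the offset is committed after handling.
    """
    committed = 0  # offset of the next message to read after a (simulated) restart
    while committed < len(messages):
        offset = committed
        try:
            if commit_first:
                committed = offset + 1
                handler(messages[offset])
            else:
                handler(messages[offset])
                committed = offset + 1
        except RuntimeError:
            # Simulated crash: restart the loop from whatever was committed.
            continue


def make_flaky_handler(fail_once_on: str) -> Tuple[Callable[[str], None], List[str], List[str]]:
    """Build a handler that fails the first time it sees fail_once_on."""
    seen: List[str] = []  # every delivery attempt
    done: List[str] = []  # deliveries that completed
    already_failed = set()

    def handler(msg: str) -> None:
        seen.append(msg)
        if msg == fail_once_on and msg not in already_failed:
            already_failed.add(msg)
            raise RuntimeError("simulated crash")
        done.append(msg)

    return handler, seen, done
```

With at-least-once, the failed message is delivered a second time, so handlers should tolerate duplicates. With at-most-once, the failed message is never retried and is lost.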

Installation

 $ pip install evkafka

Example

Create a consumer app

from evkafka import EVKafkaApp, Handler
from evkafka.config import ConsumerConfig, BrokerConfig
from pydantic import BaseModel


handler = Handler()


class FooEventPayload(BaseModel):
    user_name: str


@handler.event("FooEvent")
async def foo_handler(event: FooEventPayload) -> None:
    print('Received FooEvent', event)


if __name__ == "__main__":
    broker_config: BrokerConfig = {
        "bootstrap_servers": "kafka:9092"
    }

    consumer_config: ConsumerConfig = {
        "group_id": "test",
        "topics": ["src-topic"],
        **broker_config
    }

    app = EVKafkaApp(expose_asyncapi=True)
    app.add_consumer(consumer_config, handler)
    app.run()
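
To make the `@handler.event("FooEvent")` pattern above more concrete, here is a small standalone sketch of a decorator-based registry that routes events by name and accepts both sync and async handlers. It is an illustration under stated assumptions, not EVKafka's actual `Handler`: `MiniHandler`, its `dispatch` method, and the choice to run sync handlers in a thread pool are all hypothetical.

```python
import asyncio
import inspect
from typing import Any, Callable, Dict


class MiniHandler:
    """Hypothetical event registry; not EVKafka's Handler class."""

    def __init__(self) -> None:
        self._routes: Dict[str, Callable[[dict], Any]] = {}

    def event(self, name: str) -> Callable:
        """Register the decorated function as the handler for event `name`."""
        def decorator(func: Callable[[dict], Any]) -> Callable[[dict], Any]:
            self._routes[name] = func
            return func
        return decorator

    async def dispatch(self, name: str, payload: dict) -> Any:
        """Call the handler registered for `name`, if any."""
        func = self._routes.get(name)
        if func is None:
            return None  # unknown events are ignored in this sketch
        if inspect.iscoroutinefunction(func):
            return await func(payload)
        # Run sync handlers in the default thread pool so they do not block the loop.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, func, payload)


mini = MiniHandler()
received = []


@mini.event("FooEvent")
async def on_foo(payload: dict) -> str:
    received.append(("FooEvent", payload["user_name"]))
    return "async"


@mini.event("BarEvent")
def on_bar(payload: dict) -> str:
    received.append(("BarEvent", payload["user_name"]))
    return "sync"
```

Calling `asyncio.run(mini.dispatch("FooEvent", {"user_name": "ann"}))` invokes the async handler, and the same call with `"BarEvent"` invokes the sync one.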

Explore API docs

Automatic documentation (based on AsyncAPI) is built and served at http://localhost:8080.

Add a producer to the app

from evkafka import EVKafkaApp, Handler, Sender
from evkafka.config import ConsumerConfig, BrokerConfig, ProducerConfig
from pydantic import BaseModel


sender = Sender()
handler = Handler()


class FooEventPayload(BaseModel):
    user_name: str


class BarEventPayload(BaseModel):
    user_name: str
    message: str


@sender.event('BarEvent')
async def send_bar(event: BarEventPayload) -> None:
    # The body is left empty in this example; awaiting send_bar() sends the event.
    pass


@handler.event("FooEvent")
async def foo_handler(event: FooEventPayload) -> None:
    print('Received FooEvent', event)
    new_event = BarEventPayload(user_name=event.user_name, message='hello')
    await send_bar(new_event)


if __name__ == "__main__":
    broker_config: BrokerConfig = {
        "bootstrap_servers": "kafka:9092"
    }

    consumer_config: ConsumerConfig = {
        "group_id": "test",
        "topics": ["src-topic"],
        **broker_config
    }

    producer_config: ProducerConfig = {
        "topic": "dest-topic",
        **broker_config
    }

    app = EVKafkaApp(expose_asyncapi=True)
    app.add_consumer(consumer_config, handler)
    app.add_producer(producer_config, sender)
    app.run()
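
In the example above, `send_bar` has an empty body and is still used to emit `BarEvent`. One way to picture a decorator like that, without Kafka, is the sketch below. It is not EVKafka's `Sender`: `MiniSender`, its `outbox` list, the JSON encoding, and the use of a dataclass in place of a pydantic model are all assumptions made to keep the example self-contained.

```python
import asyncio
import json
from dataclasses import asdict, dataclass
from functools import wraps
from typing import Callable, List, Tuple


class MiniSender:
    """Hypothetical in-memory sender; not EVKafka's Sender class."""

    def __init__(self) -> None:
        # Collected (event name, encoded payload) pairs instead of real Kafka writes.
        self.outbox: List[Tuple[str, bytes]] = []

    def event(self, name: str) -> Callable:
        def decorator(func: Callable) -> Callable:
            @wraps(func)
            async def wrapper(payload) -> None:
                # The decorated body is never called in this sketch; the wrapper
                # encodes the payload and records it under the event name.
                body = json.dumps(asdict(payload)).encode("utf-8")
                self.outbox.append((name, body))
            return wrapper
        return decorator


@dataclass
class BarEventPayload:
    user_name: str
    message: str


mini_sender = MiniSender()


@mini_sender.event("BarEvent")
async def send_bar(event: BarEventPayload) -> None:
    pass
```

Awaiting `send_bar(BarEventPayload(user_name="ann", message="hello"))` appends one `("BarEvent", ...)` entry to `mini_sender.outbox`, which is also a convenient way to assert on outgoing events in unit tests.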

Check the updated API docs

The documentation now includes both consumed and produced events.

More details can be found in the documentation.

License

This project is licensed under the terms of the MIT license.
