EVKafka framework. Handle Kafka events easily.

Project description

evkafka

EVKafka is a small framework for building event-driven microservices with Apache Kafka and Python. It is based on aiokafka, an asynchronous Kafka client library.

Features

  • Easy to start and use
  • Sync/async handlers are supported
  • Extensible through consumer middleware
  • Lifespan
  • At-Least-Once/At-Most-Once delivery
  • Automatic API documentation
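
The two delivery modes listed above differ in when the consumer commits its offset relative to running the handler. The following is a minimal, library-free sketch of that idea; the `consume` function and its `commit_first` flag are hypothetical names used for illustration and are not part of EVKafka's API.

```python
from typing import Callable, List


def consume(messages: List[str], handle: Callable[[str], None], commit_first: bool) -> int:
    """Process messages until the handler fails; return the committed offset.

    commit_first=True  -> at-most-once: commit the offset before handling.
    commit_first=False -> at-least-once: commit the offset after handling.
    """
    committed = 0  # offset of the next message to read after a restart
    for offset, message in enumerate(messages):
        if commit_first:
            committed = offset + 1
        try:
            handle(message)
        except Exception:
            # Simulated crash: stop consuming and keep whatever was committed.
            return committed
        if not commit_first:
            committed = offset + 1
    return committed


def failing_handler(message: str) -> None:
    # Hypothetical handler that crashes on the second message.
    if message == "b":
        raise RuntimeError("handler crashed")


messages = ["a", "b", "c"]
# At-most-once: "b" was already committed, so a restart resumes at "c" and "b" is lost.
at_most_once_offset = consume(messages, failing_handler, commit_first=True)
# At-least-once: "b" was not committed, so a restart resumes at "b" and it is retried.
at_least_once_offset = consume(messages, failing_handler, commit_first=False)
print(at_most_once_offset, at_least_once_offset)  # 2 1
```

With at-least-once delivery a handler may see the same event twice after a failure, so handlers generally need to be idempotent.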

Installation

 $ pip install evkafka

Example

Create a consumer

from pydantic import BaseModel

from evkafka import EVKafkaApp
from evkafka.config import ConsumerConfig


class FooEventPayload(BaseModel):
    user_name: str


config: ConsumerConfig = {
    "bootstrap_servers": "kafka:9092",
    "group_id": "test",
    "topics": ["topic"],
}

app = EVKafkaApp(
    config=config,
    expose_asyncapi=True,
)


@app.event("FooEvent")
async def foo_handler(event: FooEventPayload) -> None:
    print(event)


if __name__ == "__main__":
    app.run()
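
To illustrate what registering a handler by event name involves, and how both sync and async handlers can be supported, here is a rough standard-library sketch. `EventRouter` and its `dispatch` method are hypothetical stand-ins and do not reflect how EVKafka is implemented internally; payloads are plain dicts here rather than pydantic models.

```python
import asyncio
import inspect
from typing import Any, Callable, Dict


class EventRouter:
    """Hypothetical event-name router; not EVKafka's implementation."""

    def __init__(self) -> None:
        self._handlers: Dict[str, Callable[[Any], Any]] = {}

    def event(self, name: str) -> Callable:
        # Decorator that stores the handler under the given event name.
        def register(func: Callable) -> Callable:
            self._handlers[name] = func
            return func
        return register

    async def dispatch(self, name: str, payload: Any) -> Any:
        handler = self._handlers[name]
        if inspect.iscoroutinefunction(handler):
            return await handler(payload)
        # Run sync handlers in a worker thread so they do not block the event loop.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, handler, payload)


router = EventRouter()


@router.event("FooEvent")
async def foo_handler(event: dict) -> str:
    return f"async got {event['user_name']}"


@router.event("BarEvent")
def bar_handler(event: dict) -> str:
    return f"sync got {event['user_name']}"


async def main() -> list:
    return [
        await router.dispatch("FooEvent", {"user_name": "alice"}),
        await router.dispatch("BarEvent", {"user_name": "bob"}),
    ]


if __name__ == "__main__":
    print(asyncio.run(main()))  # ['async got alice', 'sync got bob']
```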

Explore API documentation

Automatic documentation (based on AsyncAPI) is built and served at http://localhost:8080.

Add a producer

from contextlib import asynccontextmanager

from pydantic import BaseModel

from evkafka import EVKafkaApp, EVKafkaProducer, Handler, Request
from evkafka.config import ConsumerConfig, BrokerConfig, ProducerConfig


class FooEventPayload(BaseModel):
    user_name: str


class BarEventPayload(BaseModel):
    user_name: str
    message: str


handler = Handler()


@handler.event("FooEvent")
async def foo_handler(event: FooEventPayload, request: Request) -> None:
    print('Received FooEvent', event)
    new_event = BarEventPayload(user_name=event.user_name, message='hello')
    await request.state.producer.send_event(new_event, 'BarEvent')


@handler.event("BarEvent")
async def bar_handler(event: BarEventPayload) -> None:
    print('Received BarEvent', event)


@asynccontextmanager
async def lifespan():
    async with EVKafkaProducer(producer_config) as producer:
        yield {'producer': producer}


if __name__ == "__main__":
    broker_config: BrokerConfig = {
        "bootstrap_servers": "kafka:9092"
    }

    consumer_config: ConsumerConfig = {
        "group_id": "test",
        "topics": ["topic"],
        **broker_config
    }

    producer_config: ProducerConfig = {
        "topic": "topic",
        **broker_config
    }

    app = EVKafkaApp(
        expose_asyncapi=True,
        lifespan=lifespan
    )
    app.add_consumer(consumer_config, handler)
    app.run()
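
The lifespan pattern in the example above can be pictured as follows: the context manager opens a resource once, yields a state dict, and handlers reach that state through `request.state` until shutdown. This is only a sketch under assumptions; `FakeProducer` and `run_app` are hypothetical helpers written for this illustration, and the way the real framework builds `Request` objects may differ.

```python
import asyncio
from contextlib import asynccontextmanager
from types import SimpleNamespace
from typing import AsyncIterator, Dict, List


class FakeProducer:
    """Hypothetical in-memory producer used only for this illustration."""

    def __init__(self) -> None:
        self.sent: List[tuple] = []
        self.started = False

    async def __aenter__(self) -> "FakeProducer":
        self.started = True
        return self

    async def __aexit__(self, *exc_info) -> None:
        self.started = False

    async def send_event(self, event: dict, event_type: str) -> None:
        self.sent.append((event_type, event))


@asynccontextmanager
async def lifespan() -> AsyncIterator[Dict[str, FakeProducer]]:
    # The producer is opened once at startup and closed once at shutdown.
    async with FakeProducer() as producer:
        yield {"producer": producer}


async def foo_handler(event: dict, request: SimpleNamespace) -> None:
    # Handlers read shared resources from request.state.
    await request.state.producer.send_event({**event, "message": "hello"}, "BarEvent")


async def run_app(handler) -> FakeProducer:
    async with lifespan() as state:
        # Expose the yielded dict as attributes, mimicking request.state.
        request = SimpleNamespace(state=SimpleNamespace(**state))
        await handler({"user_name": "alice"}, request)
        producer = state["producer"]
        assert producer.started  # the resource is still open while handlers run
    return producer


if __name__ == "__main__":
    producer = asyncio.run(run_app(foo_handler))
    print(producer.sent, producer.started)
    # [('BarEvent', {'user_name': 'alice', 'message': 'hello'})] False
```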

More details can be found in the documentation.

Status

The framework is in alpha.

License

This project is licensed under the terms of the MIT license.

