
Easy publish and subscribe to events with Python and Kafka.


kafkaesk


About The Project

This project is meant to help facilitate effortless publishing and subscribing to events with Python and Kafka.

Guiding principles

  • HTTP
  • Language agnostic
  • Contracts built on top of Kafka

Alternatives

  • aiokafka: can be complex to scale correctly
  • guillotina_kafka: complex, tied to Guillotina
  • faust: requires additional data layers, not language agnostic
  • confluent kafka + avro: close, but ends up resembling gRPC, requiring schema compilation for each language and lacking asyncio support

Consider this Python project as syntactic sugar around these ideas.

Publish

These examples use pydantic, but you can also publish plain JSON.

import kafkaesk
from pydantic import BaseModel

app = kafkaesk.Application()

@app.schema("Content", version=1, retention=24 * 60 * 60)
class ContentMessage(BaseModel):
    foo: str


async def foobar():
    # ...
    # doing something in an async func
    await app.publish("content.edited.Resource", data=ContentMessage(foo="bar"))

A convenience publish method is available on the subscriber dependency instance; it propagates the headers from the consumed message.

import kafkaesk
from pydantic import BaseModel

app = kafkaesk.Application()

@app.schema("Content", version=1, retention=24 * 60 * 60)
class ContentMessage(BaseModel):
    foo: str


@app.subscribe("content.*", "group_id")
async def get_messages(data: ContentMessage, subscriber):
    print(f"{data.foo}")
    # This will propagate `data` record headers
    await subscriber.publish("content.edited.Resource", data=ContentMessage(foo="bar"))

Subscribe

import kafkaesk
from pydantic import BaseModel

app = kafkaesk.Application()

@app.schema("Content", version=1, retention=24 * 60 * 60)
class ContentMessage(BaseModel):
    foo: str


@app.subscribe("content.*", "group_id")
async def get_messages(data: ContentMessage):
    print(f"{data.foo}")

Avoiding a global object

If you do not want to have global application configuration, you can lazily configure the application and register schemas/subscribers separately.

import kafkaesk
from pydantic import BaseModel

router = kafkaesk.Router()

@router.schema("Content", version=1, retention=24 * 60 * 60)
class ContentMessage(BaseModel):
    foo: str


@router.subscribe("content.*", "group_id")
async def get_messages(data: ContentMessage):
    print(f"{data.foo}")


if __name__ == "__main__":
    app = kafkaesk.Application()
    app.mount(router)
    kafkaesk.run(app)

Optional parameters that can be injected into the consumer function:

  • schema: str
  • record: aiokafka.structs.ConsumerRecord
  • app: kafkaesk.app.Application
  • subscriber: kafkaesk.app.BatchConsumer

Depending on the type annotation for the first parameter, you will get different data injected:

  • async def get_messages(data: ContentMessage): parses the message using the pydantic schema
  • async def get_messages(data: bytes): gives the raw byte data
  • async def get_messages(record: aiokafka.structs.ConsumerRecord): gives the Kafka record object
  • async def get_messages(data): gives the raw JSON data from the message
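
For example, a single consumer can combine the parsed message with the other injected parameters. This is a sketch, assuming the parameters are injected by name as listed above:

import aiokafka.structs
import kafkaesk
from pydantic import BaseModel

app = kafkaesk.Application()

@app.schema("Content", version=1)
class ContentMessage(BaseModel):
    foo: str


@app.subscribe("content.*", "group_id")
async def get_messages(
    data: ContentMessage,
    schema: str,
    record: aiokafka.structs.ConsumerRecord,
    subscriber,
):
    # `data` is parsed with the pydantic schema; the remaining
    # parameters are injected by name
    print(schema, record.topic, record.offset, data.foo)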

Manual commit

To accomplish a manual commit strategy yourself:

import kafkaesk

# ContentMessage is the pydantic model registered via @app.schema in the examples above
app = kafkaesk.Application(auto_commit=False)

@app.subscribe("content.*", "group_id")
async def get_messages(data: ContentMessage, subscriber):
    print(f"{data.foo}")
    await subscriber.consumer.commit()

SSL

Add these values to your kafka_settings:

  • ssl_context - this should be a placeholder as the SSL Context is generally created within the application
  • security_protocol - one of SSL or PLAINTEXT
  • sasl_mechanism - one of PLAIN, GSSAPI, SCRAM-SHA-256, SCRAM-SHA-512, OAUTHBEARER
  • sasl_plain_username
  • sasl_plain_password
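
For example, a sketch that builds the SSL context inside the application and passes these values through Application.configure (the broker address and credentials are placeholders):

import ssl

import kafkaesk

app = kafkaesk.Application()
app.configure(
    kafka_servers=["broker.example.com:9093"],
    kafka_settings={
        # The SSL context is created here, within the application
        "ssl_context": ssl.create_default_context(),
        "security_protocol": "SSL",
        # For SASL authentication, also set:
        # "sasl_mechanism": "SCRAM-SHA-256",
        # "sasl_plain_username": "user",
        # "sasl_plain_password": "secret",
    },
)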

kafkaesk contract

This is a library built on top of Kafka; Kafka itself does not enforce these concepts.

  • Every message must provide a json schema
  • Messages produced will be validated against json schema
  • Each topic will have only one schema
  • A single schema can be used for multiple topics
  • Consumed message schema validation is up to the consumer
  • Messages will be consumed at least once. Considering this, your handling should be idempotent

Message format

{
    "schema": "schema_name:1",
    "data": { ... }
}
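
For the ContentMessage example above, a published message would look roughly like this (a sketch of the envelope, not an exact wire dump):

{
    "schema": "Content:1",
    "data": {"foo": "bar"}
}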

Worker

kafkaesk mymodule:app --kafka-servers=localhost:9092

Options:

Application.publish

  • stream_id: str: name of stream to send data to
  • data: class that inherits from pydantic.BaseModel
  • key: Optional[bytes]: key for message if it needs one
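
For example, keying messages by resource id (a sketch; the stream, model, and helper name are from or follow the earlier examples and are otherwise placeholders):

async def notify_edited(resource_id: str) -> None:
    # The key is optional; here it partitions messages per resource
    await app.publish(
        "content.edited.Resource",
        data=ContentMessage(foo="bar"),
        key=resource_id.encode(),
    )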

Application.subscribe

  • stream_id: str: fnmatch pattern of streams to subscribe to
  • group: Optional[str]: consumer group id to use. Will use name of function if not provided

Application.schema

  • id: str: id of the schema to store
  • version: Optional[int]: version of schema to store
  • streams: Optional[List[str]]: if streams are known ahead of time, you can pre-create them before you push data
  • retention: Optional[int]: retention policy in seconds
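
For example, pre-creating known streams with a one-day retention (a sketch; the stream names are placeholders):

@app.schema(
    "Content",
    version=1,
    streams=["content.created", "content.edited"],
    retention=24 * 60 * 60,
)
class ContentMessage(BaseModel):
    foo: str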

Application.configure

  • kafka_servers: Optional[List[str]]: kafka servers to connect to
  • topic_prefix: Optional[str]: topic name prefix to subscribe to
  • kafka_settings: Optional[Dict[str, Any]]: additional aiokafka settings to pass in
  • replication_factor: Optional[int]: what replication factor topics should be created with. Defaults to min(number of servers, 3).
  • kafka_api_version: str: default auto
  • auto_commit: bool: default True
  • auto_commit_interval_ms: int: default 5000
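
For example (a sketch; all values are placeholders):

app = kafkaesk.Application()
app.configure(
    kafka_servers=["localhost:9092"],
    topic_prefix="myservice",
    replication_factor=1,
    auto_commit_interval_ms=1000,
)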

Development

Requirements

poetry install

Run tests:

docker-compose up
KAFKA=localhost:9092 poetry run pytest tests

Extensions

Logging

This extension includes classes that extend Python's logging framework to publish structured log messages to a Kafka topic. It is made up of an extended logging.LogRecord and a set of custom logging.Handlers.

See logger.py in examples directory.

Log Record

kafkaesk.ext.logging.record.factory is a function that returns kafkaesk.ext.logging.record.PydanticLogRecord objects. The factory() function scans through any args passed to a logger and checks each item to determine whether it is an instance of a pydantic.BaseModel subclass.

If it is, and model._is_log_model evaluates to True, the model is removed from args and added to record._pydantic_data. After that, factory() uses logging's existing logic to finish creating the log record.

Handler

This extension ships with two handlers capable of handling kafkaesk.ext.logging.handler.PydanticLogModel classes: kafkaesk.ext.logging.handler.PydanticStreamHandler and kafkaesk.ext.logging.handler.PydanticKafkaeskHandler.

The stream handler is a very small wrapper around logging.StreamHandler; the signature is the same, and the only difference is that the handler will attempt to convert any pydantic models it receives into a human-readable log message.

The kafkaesk handler has a few more bits going on in the background.

The handler has two required inputs, a kafkaesk.app.Application instance and a stream name.

Once initialized, any logs emitted through the handler are saved into an internal queue. A worker task pulls logs from the queue and writes them to the specified topic.
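
A minimal usage sketch (the logger name, stream name, and LoginEvent model are placeholders; see logger.py in the examples directory for a complete, working setup):

import logging

import kafkaesk
from kafkaesk.ext.logging.handler import (
    PydanticKafkaeskHandler,
    PydanticLogModel,
    PydanticStreamHandler,
)

app = kafkaesk.Application()


class LoginEvent(PydanticLogModel):
    user_id: str


logger = logging.getLogger("my-service")
# Stream handler: same signature as logging.StreamHandler
logger.addHandler(PydanticStreamHandler())
# Kafkaesk handler: takes an Application instance and the stream to publish to
logger.addHandler(PydanticKafkaeskHandler(app, "logging.my_service"))

# The record factory strips pydantic log models from the args and attaches
# them to the record as structured data
logger.info("user logged in", LoginEvent(user_id="123"))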

Naming

It's hard and "kafka" is already a fun name. Hopefully this library isn't literally "kafkaesque" for you.
