
A librdkafka-powered Kafka client for Python with a (hopefully) handier API

Project description

Wunderkafka

The power of librdkafka for humans (and pythons)

Wunderkafka provides a handful of facades for the C-powered consumer/producer. It is built on top of confluent-kafka.

For a quick view of what is going on, please check the Quickstart and the Documentation.

The installation process is described here.

Features

TypeSafe librdkafka config

Instead of passing a plain dict to the consumer/producer, a pydantic-powered config is used. It is extracted directly from librdkafka's CONFIGURATION.md with some rough parsing.
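
As a quick illustration, here is a minimal sketch (field names are taken from the examples below; the values and the topic-agnostic setup are placeholders) of how a misconfigured value fails fast at construction time instead of surfacing from librdkafka at runtime:

from pydantic import ValidationError

from wunderkafka import ConsumerConfig

# Field names and types are validated by pydantic when the config object is built.
config = ConsumerConfig(
    group_id='my-group',
    bootstrap_servers='localhost:9092',
    enable_auto_commit=False,
)

# A value that cannot be coerced to the declared type is rejected with a readable error.
try:
    ConsumerConfig(group_id='my-group', bootstrap_servers='localhost:9092', enable_auto_commit='maybe')
except ValidationError as exc:
    print(exc)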

Confluent & Cloudera Schema Registry support

The Confluent client is used as-is, but the Hortonworks/Cloudera schema registry client and (de)serialization protocol are implemented as well (without support for the "admin" methods).

Building Kit

Wunderkafka allows you to build your own transport for message (de)serialization relatively simply and to eliminate boilerplate for typical cases.

Pre-defined config models

import os
from functools import partial

from pydantic import field_validator, Field
from wunderkafka.time import now
from wunderkafka import SRConfig, ConsumerConfig, SecurityProtocol, AvroConsumer


# If you are a fan of the 12-factor app, you may want to configure via env variables
class OverridenSRConfig(SRConfig):
    url: str = Field(alias='SCHEMA_REGISTRY_URL')

    @field_validator('sasl_username')
    @classmethod
    def from_env(cls, v) -> str:
        # And to use 'native' kerberos envs
        return '{0}@{1}'.format(os.environ.get('KRB5_USER'), os.environ.get('KRB5_REALM'))


# Or maybe you want to override some defaults by default (pun intended)
class OverridenConfig(ConsumerConfig):
    # A consumer which does not commit messages automatically
    enable_auto_commit: bool = False
    # and which knows nothing after a restart due to a new group id.
    group_id: str = 'wunderkafka-{0}'.format(now())
    # More 12 factors
    bootstrap_servers: str = Field(alias='BOOTSTRAP_SERVER')
    security_protocol: SecurityProtocol = SecurityProtocol.sasl_ssl
    sasl_kerberos_kinit_cmd: str = ''
    sr: SRConfig = OverridenSRConfig()

    @field_validator('sasl_kerberos_kinit_cmd')
    @classmethod
    def format_keytab(cls, v) -> str:
        if not v:
            return 'kinit {0}@{1} -k -t {0}.keytab'.format(os.environ.get('KRB5_USER'), os.environ.get('KRB5_REALM'))
        # Still allowing to set it manually
        return str(v)


# After this, you can `partial` your own Producer/Consumer, something like...
MyConsumer = partial(AvroConsumer, config=OverridenConfig())
# OR
class MyConsumer(AvroConsumer):
    def __init__(self, config: ConsumerConfig = OverridenConfig()):
        super().__init__(config)
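
A rough usage sketch for such a consumer follows (hedged: the subscribe/consume calls below assume the confluent-kafka style API of the underlying consumer, and the topic name is a placeholder):

consumer = MyConsumer()
consumer.subscribe(['some_topic'])

while True:
    # consume() here follows confluent-kafka's Consumer signature (num_messages, timeout);
    # adjust to the facade's actual API if it differs.
    for message in consumer.consume(num_messages=100, timeout=1.0):
        print(message.value())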

Building your own transport

from typing import Optional

from pydantic import Field

from wunderkafka.config.generated import enums
from wunderkafka.consumers.bytes import BytesConsumer
from wunderkafka.schema_registry import ClouderaSRClient
from wunderkafka.hotfixes.watchdog import check_watchdog
from wunderkafka.serdes.headers import ConfluentClouderaHeadersHandler
from wunderkafka.consumers.constructor import HighLevelDeserializingConsumer
from wunderkafka.schema_registry.cache import SimpleCache
from wunderkafka.schema_registry.transport import KerberizableHTTPClient
from wunderkafka.serdes.avro.deserializers import FastAvroDeserializer
from wunderkafka import SRConfig, ConsumerConfig, SecurityProtocol


class OverridenSRConfig(SRConfig):
    url: str = Field(alias="SCHEMA_REGISTRY_URL")
    security_protocol: SecurityProtocol = SecurityProtocol.sasl_ssl
    sasl_mechanism: str = "SCRAM-SHA-512"
    sasl_username: str = Field(alias="SASL_USERNAME")
    sasl_password: str = Field(alias="SASL_PASSWORD")


class OverridenConsumerConfig(ConsumerConfig):
    enable_auto_commit: bool = False
    auto_offset_reset: enums.AutoOffsetReset = enums.AutoOffsetReset.earliest
    bootstrap_servers: str = Field(alias="BOOTSTRAP_SERVERS")
    security_protocol: SecurityProtocol = SecurityProtocol.sasl_ssl
    sasl_mechanism: str = "SCRAM-SHA-512"
    sasl_username: str = Field(alias="SASL_USERNAME")
    sasl_password: str = Field(alias="SASL_PASSWORD")
    sr: SRConfig = Field(default_factory=OverridenSRConfig)


# Pydantic/FastAPI style, but you can inherit from `HighLevelDeserializingConsumer` directly
def MyAvroConsumer(
    config: Optional[ConsumerConfig] = None,
) -> HighLevelDeserializingConsumer:
    config = config or OverridenConsumerConfig()
    config, watchdog = check_watchdog(config)
    return HighLevelDeserializingConsumer(
        consumer=BytesConsumer(config, watchdog),
        schema_registry=ClouderaSRClient(KerberizableHTTPClient(config.sr), SimpleCache()),
        headers_handler=ConfluentClouderaHeadersHandler().parse,
        deserializer=FastAvroDeserializer(),
    )

Avro on-the-fly schema derivation

Schema derivation from dataclasses and pydantic.BaseModel for Avro serialization is supported, powered by dataclasses-avroschema and some rough "metaprogramming":

# dataclass to AVRO schema example
from dataclasses import dataclass
from dataclasses_avroschema import AvroModel

@dataclass
class SomeData(AvroModel):
    field1: int
    field2: str

for a topic named topic_name will become

{
    "type": "record",
    "name": "topic_name_value",
    "fields": [
        {
            "name": "field1",
            "type": "long"
        },
        {
            "name": "field2",
            "type": "string"
        }
    ]
}

and

# pydantic.BaseModel to AVRO schema example

from typing import Optional
from pydantic import BaseModel


class Event(BaseModel):
    id: Optional[int]
    ts: Optional[int] = None

    class Meta:
        namespace = "any.data"

for a topic named topic_name will become

{
    "type": "record",
    "name": "topic_name_value",
    "namespace": "any.data",
    "fields": [
        {
            "type": ["long", "null"],
            "name": "id"
        },
        {
            "type": ["null", "long"],
            "name": "ts",
            "default": null
        }
    ]
}
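
The derivation itself leans on dataclasses-avroschema, so you can inspect the schema it produces for a model directly (a minimal sketch; the topic-based record naming shown above is wunderkafka's addition on top of this):

from dataclasses import dataclass

from dataclasses_avroschema import AvroModel


@dataclass
class SomeData(AvroModel):
    field1: int
    field2: str


# Prints the derived AVRO schema as a JSON string; note that Python's int maps to AVRO's "long".
print(SomeData.avro_schema())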



Download files

Download the file for your platform.

Source Distribution

wunderkafka-0.17.0.tar.gz (69.5 kB)


Built Distribution

wunderkafka-0.17.0-py3-none-any.whl (101.0 kB)


File details

Details for the file wunderkafka-0.17.0.tar.gz.

File metadata

  • Download URL: wunderkafka-0.17.0.tar.gz
  • Size: 69.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/5.1.1 CPython/3.12.7

File hashes

Hashes for wunderkafka-0.17.0.tar.gz
  • SHA256: 71e306069d5005a8bcbbcd1569dab82c6ea1c9f8fc290a35a17eff9a89314c89
  • MD5: b26ab5af2152483742ce36542a31a58a
  • BLAKE2b-256: fd86653d55d7e5dd05e1801e6ebb32543d723b3e785e19f22a6996fd4d1138d8


File details

Details for the file wunderkafka-0.17.0-py3-none-any.whl.

File metadata

  • Download URL: wunderkafka-0.17.0-py3-none-any.whl
  • Size: 101.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/5.1.1 CPython/3.12.7

File hashes

Hashes for wunderkafka-0.17.0-py3-none-any.whl
  • SHA256: 11a6c3e0ec072000d5b21d8dab8952d424b54709ba4becc931304d33ffea50e3
  • MD5: edd49c6c6f3f520b15f76f1146dc39a8
  • BLAKE2b-256: e2cf659f22db0a6223a3005bb362fa5de25612254a117bd15f0a7d8eaa875851

