Skip to main content

librdkafka-powered client for Kafka for python with (hopefully) more handful API

Project description

Wunderkafka

The power of librdkafka for humans pythons

Wunderkafka provides a handful of facades for C-powered consumer/producer. It's built on top of the confluent-kafka

For a quick view on what is going on, please check Quickstart and Documentation

Installation process described here

Features

#TypeSafe librdkafka config

Instead of passing just a dict to consumer/producer config, the pydantic-powered config is used. It is extracted directly from librdkafka's CONFIGURATION.md with some rough parsing.

Confluent & Cloudera Schema Registry support

Confluent is used as-is, but hortonworks/cloudera schema registry client and (de)serialization protocol are implemented as well (no "admin" methods support).

Building Kit

Wunderkafka allows you to relatively simply build your own transport for message (de)serialization and eliminate boilerplates for typical cases.

Pre-defined config models

import os
from functools import partial

from pydantic import field_validator, Field
from wunderkafka.time import now
from wunderkafka import SRConfig, ConsumerConfig, SecurityProtocol, AvroConsumer


# If you are a fan of 12 factors, you may want to config via env variables
class OverridenSRConfig(SRConfig):
    url: str = Field(alias='SCHEMA_REGISTRY_URL')

    @field_validator('sasl_username')
    @classmethod
    def from_env(cls, v) -> str:
        # And to use 'native' kerberos envs
        return '{0}@{1}'.format(os.environ.get('KRB5_USER'), os.environ.get('KRB5_REALM'))


# Or you want to override some defaults by default (pun intended)
class OverridenConfig(ConsumerConfig):
    # Consumer which do not commit messages automatically
    enable_auto_commit: bool = False
    # And knows nothing after restart due to new gid.
    group_id: str = 'wunderkafka-{0}'.format(now())
    # More 12 factors
    bootstrap_servers: str = Field(env='BOOTSTRAP_SERVER')
    security_protocol: SecurityProtocol = SecurityProtocol.sasl_ssl
    sasl_kerberos_kinit_cmd: str = ''
    sr: SRConfig = OverridenSRConfig()

    @field_validator('sasl_kerberos_kinit_cmd')
    @classmethod
    def format_keytab(cls, v) -> str:
        if not v:
            return 'kinit {0}@{1} -k -t {0}.keytab'.format(os.environ.get('KRB5_USER'), os.environ.get('KRB5_REALM'))
        # Still allowing to set it manually
        return str(v)


# After this, you can `partial` your own Producer/Consumer, something like...
MyConsumer = partial(AvroConsumer, config=OverridenConfig())
# OR
class MyConsumer(AvroConsumer):
    def __init__(self, config: ConsumerConfig = OverridenConfig()):
        super().__init__(config)

Building your own transport

from typing import Optional

from pydantic import Field

from wunderkafka.config.generated import enums
from wunderkafka.consumers.bytes import BytesConsumer
from wunderkafka.schema_registry import ClouderaSRClient
from wunderkafka.serdes.headers import ConfluentClouderaHeadersHandler
from wunderkafka.consumers.constructor import HighLevelDeserializingConsumer
from wunderkafka.schema_registry.cache import SimpleCache
from wunderkafka.schema_registry.transport import KerberizableHTTPClient
from wunderkafka.serdes.avro.deserializers import FastAvroDeserializer
from wunderkafka import SRConfig, ConsumerConfig, SecurityProtocol


class SRConfig(SRConfig):
    url: str = Field(alias="SCHEMA_REGISTRY_URL")
    security_protocol: SecurityProtocol = SecurityProtocol.sasl_ssl
    sasl_mechanism: str = "SCRAM-SHA-512"
    sasl_username: str = Field(alias="SASL_USERNAME")
    sasl_password: str = Field(alias="SASL_PASSWORD")


class OverridenConsumerConfig(ConsumerConfig):
    enable_auto_commit: bool = False
    auto_offset_reset: enums.AutoOffsetReset = enums.AutoOffsetReset.earliest
    bootstrap_servers: str = Field(env="BOOTSTRAP_SERVERS")
    security_protocol: SecurityProtocol = SecurityProtocol.sasl_ssl
    sasl_mechanism: str = "SCRAM-SHA-512"
    sasl_username: str = Field(alias="SASL_USERNAME")
    sasl_password: str = Field(alias="SASL_PASSWORD")
    sr: SRConfig = Field(default_factory=SRConfig)


# Pydantic/FastAPI style, but you can inherit from `HighLevelDeserializingConsumer` directly
def MyAvroConsumer(
    config: Optional[ConsumerConfig] = None,
) -> HighLevelDeserializingConsumer:
    config = config or OverridenConsumerConfig()
    return HighLevelDeserializingConsumer(
        consumer=BytesConsumer(config),
        schema_registry=ClouderaSRClient(KerberizableHTTPClient(config.sr), SimpleCache()),
        headers_handler=ConfluentClouderaHeadersHandler().parse,
        deserializer=FastAvroDeserializer(),
    )

Avro on-the-fly schema derivation

Supports dataclasses and pydantic.BaseModel for avro serialization powered by dataclasses-avroschema and some rough "metaprogramming":

# dataclass to AVRO schema example
from dataclasses import dataclass
from dataclasses_avroschema import AvroModel

@dataclass
class SomeData(AvroModel):
    field1: int
    field2: str

for a topic topic_name will become

{
      "type": "record",
      "name": "topic_name_value",
      "fields": [
          {
              "name": "field1",
              "type": "long"
          },
          {
              "name": "field2",
              "type": "string"
          }
      ]
  }

and

# pydantic.BaseModel to AVRO schema example

from typing import Optional
from pydantic import BaseModel

class Event(BaseModel):
  id: Optional[int]
  ts: Optional[int] = None

  class Meta:
      namespace = "any.data"

for a topic topic_name will become

{
      "type": "record",
      "name": "topic_name_value",
      "namespace": "any.data",
      "fields": [
          {
              "type": ["long", "null"],
              "name": "id"
          },
          {
              "type": ["null", "long"],
              "name": "ts",
              "default": null
          }
      ]
  }

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

wunderkafka-0.19.0rc5.tar.gz (57.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

wunderkafka-0.19.0rc5-py3-none-any.whl (122.1 kB view details)

Uploaded Python 3

File details

Details for the file wunderkafka-0.19.0rc5.tar.gz.

File metadata

  • Download URL: wunderkafka-0.19.0rc5.tar.gz
  • Upload date:
  • Size: 57.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.12.9

File hashes

Hashes for wunderkafka-0.19.0rc5.tar.gz
Algorithm Hash digest
SHA256 89f3b76c5184b80d29e50aea66d8e7223a2ad3a527a2138f0820cbbbb1b483cb
MD5 76e4572fe736c329eb8e7ad254c7e063
BLAKE2b-256 640a7b33a66f79aad9d504445fd20c5e2a945e13d0d65a4320661dfe067afdd5

See more details on using hashes here.

Provenance

The following attestation bundles were made for wunderkafka-0.19.0rc5.tar.gz:

Publisher: release.yml on wunderkafka/wunderkafka

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file wunderkafka-0.19.0rc5-py3-none-any.whl.

File metadata

  • Download URL: wunderkafka-0.19.0rc5-py3-none-any.whl
  • Upload date:
  • Size: 122.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.12.9

File hashes

Hashes for wunderkafka-0.19.0rc5-py3-none-any.whl
Algorithm Hash digest
SHA256 05f79fb942b8b47919ab17a1fb882f0386425ade17306df0d7ae30f61bf4336c
MD5 6d3f82e563a5010ad8e9fd9172db1217
BLAKE2b-256 823096ba30847208dbfaf724e36aad14075b02d5def054391f1f5a1971d63ac7

See more details on using hashes here.

Provenance

The following attestation bundles were made for wunderkafka-0.19.0rc5-py3-none-any.whl:

Publisher: release.yml on wunderkafka/wunderkafka

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page