Skip to main content

librdkafka-powered client for Kafka for python with (hopefully) more handful API

Project description

Wunderkafka

The power of librdkafka for humans pythons

Wunderkafka provides a handful of facades for C-powered consumer/producer. It's built on top of the confluent-kafka

For a quick view on what is going on, please check Quickstart and Documentation

Installation process described here

Features

#TypeSafe librdkafka config

Instead of passing just a dict to consumer/producer config, the pydantic-powered config is used. It is extracted directly from librdkafka's CONFIGURATION.md with some rough parsing.

Confluent & Cloudera Schema Registry support

Confluent is used as-is, but hortonworks/cloudera schema registry client and (de)serialization protocol are implemented as well (no "admin" methods support).

Building Kit

Wunderkafka allows you to relatively simply build your own transport for message (de)serialization and eliminate boilerplates for typical cases.

Pre-defined config models

import os
from functools import partial

from pydantic import field_validator, Field
from wunderkafka.time import now
from wunderkafka import SRConfig, ConsumerConfig, SecurityProtocol, AvroConsumer


# If you are a fan of 12 factors, you may want to config via env variables
class OverridenSRConfig(SRConfig):
    url: str = Field(alias='SCHEMA_REGISTRY_URL')

    @field_validator('sasl_username')
    @classmethod
    def from_env(cls, v) -> str:
        # And to use 'native' kerberos envs
        return '{0}@{1}'.format(os.environ.get('KRB5_USER'), os.environ.get('KRB5_REALM'))


# Or you want to override some defaults by default (pun intended)
class OverridenConfig(ConsumerConfig):
    # Consumer which do not commit messages automatically
    enable_auto_commit: bool = False
    # And knows nothing after restart due to new gid.
    group_id: str = 'wunderkafka-{0}'.format(now())
    # More 12 factors
    bootstrap_servers: str = Field(env='BOOTSTRAP_SERVER')
    security_protocol: SecurityProtocol = SecurityProtocol.sasl_ssl
    sasl_kerberos_kinit_cmd: str = ''
    sr: SRConfig = OverridenSRConfig()

    @field_validator('sasl_kerberos_kinit_cmd')
    @classmethod
    def format_keytab(cls, v) -> str:
        if not v:
            return 'kinit {0}@{1} -k -t {0}.keytab'.format(os.environ.get('KRB5_USER'), os.environ.get('KRB5_REALM'))
        # Still allowing to set it manually
        return str(v)


# After this, you can `partial` your own Producer/Consumer, something like...
MyConsumer = partial(AvroConsumer, config=OverridenConfig())
# OR
class MyConsumer(AvroConsumer):
    def __init__(self, config: ConsumerConfig = OverridenConfig()):
        super().__init__(config)

Building your own transport

from typing import Optional

from pydantic import Field

from wunderkafka.config.generated import enums
from wunderkafka.consumers.bytes import BytesConsumer
from wunderkafka.schema_registry import ClouderaSRClient
from wunderkafka.serdes.headers import ConfluentClouderaHeadersHandler
from wunderkafka.consumers.constructor import HighLevelDeserializingConsumer
from wunderkafka.schema_registry.cache import SimpleCache
from wunderkafka.schema_registry.transport import KerberizableHTTPClient
from wunderkafka.serdes.avro.deserializers import FastAvroDeserializer
from wunderkafka import SRConfig, ConsumerConfig, SecurityProtocol


class SRConfig(SRConfig):
    url: str = Field(alias="SCHEMA_REGISTRY_URL")
    security_protocol: SecurityProtocol = SecurityProtocol.sasl_ssl
    sasl_mechanism: str = "SCRAM-SHA-512"
    sasl_username: str = Field(alias="SASL_USERNAME")
    sasl_password: str = Field(alias="SASL_PASSWORD")


class OverridenConsumerConfig(ConsumerConfig):
    enable_auto_commit: bool = False
    auto_offset_reset: enums.AutoOffsetReset = enums.AutoOffsetReset.earliest
    bootstrap_servers: str = Field(env="BOOTSTRAP_SERVERS")
    security_protocol: SecurityProtocol = SecurityProtocol.sasl_ssl
    sasl_mechanism: str = "SCRAM-SHA-512"
    sasl_username: str = Field(alias="SASL_USERNAME")
    sasl_password: str = Field(alias="SASL_PASSWORD")
    sr: SRConfig = Field(default_factory=SRConfig)


# Pydantic/FastAPI style, but you can inherit from `HighLevelDeserializingConsumer` directly
def MyAvroConsumer(
    config: Optional[ConsumerConfig] = None,
) -> HighLevelDeserializingConsumer:
    config = config or OverridenConsumerConfig()
    return HighLevelDeserializingConsumer(
        consumer=BytesConsumer(config),
        schema_registry=ClouderaSRClient(KerberizableHTTPClient(config.sr), SimpleCache()),
        headers_handler=ConfluentClouderaHeadersHandler().parse,
        deserializer=FastAvroDeserializer(),
    )

Avro on-the-fly schema derivation

Supports dataclasses and pydantic.BaseModel for avro serialization powered by dataclasses-avroschema and some rough "metaprogramming":

# dataclass to AVRO schema example
from dataclasses import dataclass
from dataclasses_avroschema import AvroModel

@dataclass
class SomeData(AvroModel):
    field1: int
    field2: str

for a topic topic_name will become

{
      "type": "record",
      "name": "topic_name_value",
      "fields": [
          {
              "name": "field1",
              "type": "long"
          },
          {
              "name": "field2",
              "type": "string"
          }
      ]
  }

and

# pydantic.BaseModel to AVRO schema example

from typing import Optional
from pydantic import BaseModel

class Event(BaseModel):
  id: Optional[int]
  ts: Optional[int] = None

  class Meta:
      namespace = "any.data"

for a topic topic_name will become

{
      "type": "record",
      "name": "topic_name_value",
      "namespace": "any.data",
      "fields": [
          {
              "type": ["long", "null"],
              "name": "id"
          },
          {
              "type": ["null", "long"],
              "name": "ts",
              "default": null
          }
      ]
  }

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

wunderkafka-0.19.0rc3.tar.gz (57.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

wunderkafka-0.19.0rc3-py3-none-any.whl (121.9 kB view details)

Uploaded Python 3

File details

Details for the file wunderkafka-0.19.0rc3.tar.gz.

File metadata

  • Download URL: wunderkafka-0.19.0rc3.tar.gz
  • Upload date:
  • Size: 57.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.12.9

File hashes

Hashes for wunderkafka-0.19.0rc3.tar.gz
Algorithm Hash digest
SHA256 67c842dec20a28fcf19ec193a1a04dd6c016fe8f9f8f095652cb3a2efb070dc2
MD5 cd7445402a064fd877fb41b2c1842216
BLAKE2b-256 bc193f5e1f35a8ca164701efef3475b988faf57d70b10e93b2ab342725f72e3a

See more details on using hashes here.

Provenance

The following attestation bundles were made for wunderkafka-0.19.0rc3.tar.gz:

Publisher: release.yml on wunderkafka/wunderkafka

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file wunderkafka-0.19.0rc3-py3-none-any.whl.

File metadata

File hashes

Hashes for wunderkafka-0.19.0rc3-py3-none-any.whl
Algorithm Hash digest
SHA256 38082557c8b08fee1221d5d33ba2350b682afcc31a903fa65cf676be7578b745
MD5 09bc9e6b7c25cd7d357723cc01ebcc87
BLAKE2b-256 7205fb14ce9f1c164ade605d8306b2c818c545ccf17f3aee82d18b409fcdf747

See more details on using hashes here.

Provenance

The following attestation bundles were made for wunderkafka-0.19.0rc3-py3-none-any.whl:

Publisher: release.yml on wunderkafka/wunderkafka

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page