
Load all your confluent-kafka clients from a config file.

Reason this release was yanked: archived

Project description

Confluent Kafka Config

A thin wrapper around the confluent-kafka-python library. It allows dynamic instantiation of Consumer, Producer, and Admin clients based on a configuration file in YAML or JSON format.

Installation

To install this package, run:

pip install confluent_kafka_config

Structure of the config file

admin
   └── config
       └── bootstrap.servers:  < host:port >
schema_registry
   └── url:                    < http(s)://host:port >

consumers < list of dictionaries >
   ├── name:                   < an arbitrary consumer name >
   ├── topic 
   │   ├── name:               < topic name >
   │   ├── partitions:         < a list of partition numbers to consume from, or leave empty >
   │   └── schema_name:        < schema name to use with topic >
   └── config                  < confluent_kafka.Consumer conf >
       ├── bootstrap.servers:  < host:port >
       ├── group.id:           < group name >
       └── ...
       
producers < list of dictionaries >                 
   ├── name:                   < an arbitrary producer name >
   ├── topic 
   │   ├── name:               < topic name >
   │   ├── partitions:         < a list of partition numbers to produce to, or leave empty >
   │   └── schema_name:        < schema name to use with topic >
   └── config                  < confluent_kafka.Producer conf >
       ├── bootstrap.servers:  < host:port >
       ├── acks:               < 0, 1, etc. >
       └── ...
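Following the tree above, a minimal YAML config might look like this (the hosts, client names, and schema names below are placeholders, not values shipped with the package):

```yaml
admin:
  config:
    bootstrap.servers: "localhost:9092"

schema_registry:
  url: "http://localhost:8081"

consumers:
  - name: "example-consumer"
    topic:
      name: "example-topic"
      partitions: [0, 1]
      schema_name: "example-topic-value"
    config:
      bootstrap.servers: "localhost:9092"
      group.id: "example-group"

producers:
  - name: "example-producer"
    topic:
      name: "example-topic"
      partitions: []
      schema_name: "example-topic-value"
    config:
      bootstrap.servers: "localhost:9092"
      acks: 1
```

An equivalent JSON file works the same way.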

Docs:

Definitions:

  • Client: Either an instance of ProducerContext or ConsumerContext

⚠️ Warning: At present, each client expects a single topic with single schema. This will be resolved in the future: https://github.com/Aragonski97/confluent-kafka-config/issues/16

ClientPool

A wrapper class that contains all consumers and producers instantiated from the config file. Load a ClientPool by calling its factory class method:

from confluent_kafka_config.client_pool import ClientPool

pool = ClientPool.from_config(<path_to_your_config_file>)

# access consumers
# pool.consumers: dict[str, ConsumerContext]

# access producers
# pool.producers: dict[str, ProducerContext]

# get a specific consumer by name: pool.consumers[<consumer name>]
# (same for producers)
# an overridden __getitem__ will be implemented in the future:
# https://github.com/Aragonski97/confluent-kafka-config/issues/15

RegistryContext

A wrapper around confluent_kafka.SchemaRegistryClient that includes the schema intended for a client specified in the config file. Based on that schema, the function confluent_kafka_config.RegistryContext.create_registered_model creates a model from the schema defined in the registry. This model is used for serialization and deserialization.

TopicContext

A wrapper around the confluent_kafka.TopicPartition class that includes not only the topic name and partitions but also the registered schema specified in the config file.

ConsumerContext

A wrapper around the confluent_kafka.Consumer class that includes a given confluent_kafka_config.TopicContext. The function confluent_kafka_config.ConsumerContext.consume is an exposed version of confluent_kafka.Consumer.consume that handles some errors. This error handling will be covered more extensively in the future: https://github.com/Aragonski97/confluent-kafka-config/issues/17
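As a rough illustration of driving the exposed consume in a loop, here is a minimal sketch. It assumes consume() returns a batch of deserialized messages (None or an empty batch when nothing arrived); that signature is an assumption made for illustration, not the library's documented API:

```python
# Minimal polling-loop sketch around a ConsumerContext-style object.
# Assumption: consumer_ctx.consume() returns an iterable of deserialized
# messages, or None / an empty batch when nothing was received.

def drain(consumer_ctx, handler, max_batches=1):
    """Poll up to max_batches batches, pass each message to handler,
    and return the number of messages processed."""
    processed = 0
    for _ in range(max_batches):
        batch = consumer_ctx.consume()
        if not batch:
            continue
        for message in batch:
            handler(message)
            processed += 1
    return processed
```

In a real deployment this loop would run until shutdown, with offset commits governed by the consumer's config (e.g. enable.auto.commit).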

ProducerContext

Almost identical to ConsumerContext, but wrapping the confluent_kafka.Producer class instead.

KafkaConfig

A pydantic schema used to load and validate the config file.
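As a rough sketch of what this validation covers, the structural checks can be approximated with the standard library alone. The real class is a pydantic model; validate_config below and its checks are illustrative stand-ins, not the package's API:

```python
import json

# Illustrative stand-in for KafkaConfig's structural validation:
# every consumer/producer entry needs a name, a topic, and a client
# config that at least carries bootstrap.servers.

REQUIRED_CLIENT_KEYS = {"name", "topic", "config"}

def validate_config(raw: dict) -> dict:
    for section in ("consumers", "producers"):
        for client in raw.get(section) or []:
            missing = REQUIRED_CLIENT_KEYS - client.keys()
            if missing:
                raise ValueError(f"{section} entry missing keys: {sorted(missing)}")
            if "bootstrap.servers" not in client["config"]:
                raise ValueError(f"{section} entry needs config.bootstrap.servers")
    return raw

# Parse and validate a minimal JSON config (YAML works the same once parsed).
config = validate_config(json.loads("""
{
  "consumers": [
    {"name": "example-consumer",
     "topic": {"name": "example-topic", "schema_name": "example-topic-value"},
     "config": {"bootstrap.servers": "localhost:9092", "group.id": "example-group"}}
  ],
  "producers": []
}
"""))
```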
