This package is a wrapper for REST API requests to a Kafka REST Proxy.
Kafka REST API
Installation
If you are installing from a remote repository pre-configured in pip, run: `pip install kafka-rest-api`.
If you are installing from a wheel file in the local directory, run: `pip install (unknown).whl`, replacing `(unknown)` with the name of the .whl file.
Getting Started
Interactions with a Kafka cluster follow a Producer/Consumer paradigm. As such, there are two classes that can be imported and used to publish and subscribe to topics in Kafka: Producer and Consumer.
Configuration
When using this package to access the Merck API Gateway in the Dev environment, make sure to define the following environment variables:
- KAFKA_REST_API_URL: Target Kafka REST API URL.
- X_API_KEY: The authorization token to validate API requests to API Gateway.
These variables can also be passed to the Producer and Consumer classes as a dictionary, for example:
auth_headers = {"x-api-key": "<YOUR_X_API_KEY>"}
If KAFKA_REST_API_URL is not defined as an environment variable, you can provide the API URL directly to the Producer or Consumer classes, like:
producer = Producer(kafka_rest_api_url="<YOUR_REST_API_URL>")
consumer = Consumer(kafka_rest_api_url="<YOUR_REST_API_URL>")
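For instance, configuration can be assembled from the environment with the standard library; a minimal sketch (the placeholder defaults are illustrative, not real values):

```python
import os

# Fall back to illustrative placeholders when the variables are unset.
kafka_rest_api_url = os.environ.get("KAFKA_REST_API_URL", "<YOUR_REST_API_URL>")
auth_headers = {"x-api-key": os.environ.get("X_API_KEY", "<YOUR_X_API_KEY>")}
```

This keeps secrets out of the source code while still allowing an explicit override in development.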
Producer
In the snippet below we use the topic pke as an example. The pattern for the producer is the following:
from kafka_rest import Producer
producer = Producer(auth_headers=auth_headers)
keys = producer.produce(producer_messages, "pke")
The Producer.produce method automatically generates a unique key (UUID) for each message.
You can optionally provide your own unique keys as well. For more information, please consult the method's docstring.
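As a sketch of what automatic key generation looks like conceptually (the wrapper's internals may differ; the payloads here are hypothetical), each message is paired with a `uuid4` key:

```python
import uuid

# Hypothetical payloads; the real message format depends on your topic's schema.
producer_messages = [{"text": "first message"}, {"text": "second message"}]

# One unique key per message, mirroring what Producer.produce does when no keys are given.
keys = [str(uuid.uuid4()) for _ in producer_messages]
```

These are the keys later passed to the consumer to match responses back to requests.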
Consumer
To import the consumer:
from kafka_rest import Consumer
You can choose one of the following patterns to define the Consumer:
- Pattern 1 - Instantiation via context manager:
with Consumer(topics=["pke-response"], auth_headers=auth_headers) as consumer:
    new_data = consumer.consume_all(keys)
- Pattern 2 - Step-by-step instantiation:
consumer = Consumer(auth_headers=auth_headers)
consumer.create()
consumer.subscribe(topics=["pke-response"])
consumed_data = consumer.consume_all(keys) # or consumer.create().subscribe().consume_all(keys)
consumer.delete()
- Pattern 3 - Consumer with iterator:
with Consumer(topics=["pke-response"], auth_headers=auth_headers) as consumer:
    for iter_data in consumer.consume(keys):
        print("Iterator Data:", iter_data)
You can also produce files as inputs and consume the corresponding outputs:
messages = []
for name in file_names:
    with open(f"snippet/{name}", "rb") as f:
        messages.append({"name": name, "bytes": f.read(), "type": "application/pdf"})

producer = Producer(auth_headers=auth_headers)
new_keys = producer.produce_files(messages, topic_for_files)

with Consumer(topics=[f"{topic_for_files}-response"], auth_headers=auth_headers) as consumer:
    for i, iter_data in enumerate(consumer.consume(new_keys, interval_sec=1)):
        if iter_data:
            print(f"Output: {iter_data}")
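Since REST payloads are JSON, raw file bytes typically need an encoding such as base64 before transport. A minimal sketch of that step (the wrapper may handle this differently internally; the file record here is hypothetical):

```python
import base64
import json

# Hypothetical file record; real inputs come from reading files as in the snippet above.
message = {"name": "report.pdf", "bytes": b"%PDF-1.4 example bytes", "type": "application/pdf"}

# Encode the raw bytes so the whole record can be serialized as JSON.
wire_message = {**message, "bytes": base64.b64encode(message["bytes"]).decode("ascii")}
payload = json.dumps(wire_message)
```

The consumer side would apply the inverse transformation (`base64.b64decode`) to recover the original bytes.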
For a complete snippet, please check the example in the file kafka_rest/kafka_rest.py in this repo.
Hashes for nlp_kafka_rest_api-0.7.0-py3-none-any.whl

Algorithm | Hash digest
---|---
SHA256 | fa43928b3e0ec928b286679135c0c5fc7f87976956d73bff6fb15f6192c90373
MD5 | 8b0fe23a7e5cf16d719ee1057393f3ac
BLAKE2b-256 | 63ce0f36b493d57f8a81b91c18dbcfcc2f7e55d0b64a2f14acc42d7945f95915