An Adobe IMS authorization code flow used with the OAuth Kafka client for Python (kafka-python-ng).
Project description
pipelinepy
This is an OAuth Python client callback for the kafka-python-ng client implementation. It is designed to provide a simple and straightforward interface for authenticating Adobe IMS (Identity Management System) users into Kafka.
Installation
Install pipelinepy using pip:
pip install pipelinepy
Usage
Pipelinepy is designed to be used with the kafka-python-ng client implementation.
To use it, you need to create an instance of the ImsTokenProvider
class and pass it to
the sasl_oauth_token_provider
parameter of the Kafka producer and consumer.
pip install kafka-python-ng pipelinepy
#!/usr/bin/env python
import threading, time
from kafka import KafkaConsumer, KafkaProducer
from pipelinepy import ImsTokenProvider
class Producer(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.stop_event = threading.Event()
def stop(self):
self.stop_event.set()
def run(self):
producer = KafkaProducer(bootstrap_servers=['broker1:9092','broker2:9092','broker3:9092'],
sasl_mechanism='OAUTHBEARER',
security_protocol='SASL_SSL',
sasl_oauth_token_provider=ImsTokenProvider())
while not self.stop_event.is_set():
producer.send('some_existing_topic', b"Hello, world!")
producer.send('some_existing_topic', b"Salutare, lume!")
producer.send('some_existing_topic', b"\xc2\xa1Hola, mundo!")
time.sleep(1)
producer.close()
class Consumer(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.stop_event = threading.Event()
def stop(self):
self.stop_event.set()
def run(self):
consumer = KafkaConsumer(bootstrap_servers=['broker1:9092','broker2:9092','broker3:9092'],
auto_offset_reset='earliest',
group_id='test-cg',
consumer_timeout_ms=1000,
sasl_mechanism='OAUTHBEARER',
security_protocol='SASL_SSL',
sasl_oauth_token_provider=ImsTokenProvider())
consumer.subscribe(['some_existing_topic'])
while not self.stop_event.is_set():
for message in consumer:
print(message)
if self.stop_event.is_set():
break
consumer.close()
def main():
tasks = [
Producer(),
Consumer()
]
# Start threads of a publisher/producer and a subscriber/consumer to 'my-topic' Kafka topic
for t in tasks:
t.start()
time.sleep(30)
# Stop threads
for task in tasks:
task.stop()
for task in tasks:
task.join()
if __name__ == "__main__":
main()
Environment Variables
The following environment variables need to be set:
IMS_URL
: The base URL of the IMS service (does not include the '/ims/token/v1' part).IMS_CLIENT_ID
: The client ID for the IMS service.IMS_CLIENT_SECRET
: The client secret for the IMS service.IMS_CLIENT_CODE
: The client code for the IMS service.
Contributing
Contributions are welcome. Please submit a pull request or create an issue to discuss the changes you want to make.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Hashes for pipelinepy-1.0.5-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | cbd4797cdc5138e29b7104bcf8394075902e5f1ac3a59125707e6d314823c21e |
|
MD5 | 2acf78537f976ebd67369a20d897014d |
|
BLAKE2b-256 | 7df131c9a7e03e073fdad50faedf9b724db587e1884812530b92377b252a2ae4 |