An Adobe IMS authorization code flow used with the OAuth Kafka client for Python (kafka-python-ng).
Project description
pipelinepy
This is an OAuth Python client callback for the kafka-python-ng client implementation. It is designed to provide a simple and straightforward interface for authenticating Adobe IMS (Identity Management System) users into Kafka.
Installation
Install pipelinepy using pip:
pip install pipelinepy
Usage
Pipelinepy is designed to be used with the kafka-python-ng client implementation. To use it, create an instance of the ImsTokenProvider class and pass it to the sasl_oauth_token_provider parameter of the Kafka producer and consumer.
pip install kafka-python-ng pipelinepy
#!/usr/bin/env python
import threading
import time

from kafka import KafkaConsumer, KafkaProducer
from pipelinepy import ImsTokenProvider


class Producer(threading.Thread):
    def __init__(self):
        super().__init__()
        self.stop_event = threading.Event()

    def stop(self):
        self.stop_event.set()

    def run(self):
        producer = KafkaProducer(bootstrap_servers=['broker1:9092', 'broker2:9092', 'broker3:9092'],
                                 sasl_mechanism='OAUTHBEARER',
                                 security_protocol='SASL_SSL',
                                 sasl_oauth_token_provider=ImsTokenProvider())

        # Publish three messages per second until stop() is called
        while not self.stop_event.is_set():
            producer.send('some_existing_topic', b"Hello, world!")
            producer.send('some_existing_topic', b"Salutare, lume!")
            producer.send('some_existing_topic', b"\xc2\xa1Hola, mundo!")
            time.sleep(1)

        producer.close()


class Consumer(threading.Thread):
    def __init__(self):
        super().__init__()
        self.stop_event = threading.Event()

    def stop(self):
        self.stop_event.set()

    def run(self):
        consumer = KafkaConsumer(bootstrap_servers=['broker1:9092', 'broker2:9092', 'broker3:9092'],
                                 auto_offset_reset='earliest',
                                 group_id='test-cg',
                                 consumer_timeout_ms=1000,
                                 sasl_mechanism='OAUTHBEARER',
                                 security_protocol='SASL_SSL',
                                 sasl_oauth_token_provider=ImsTokenProvider())
        consumer.subscribe(['some_existing_topic'])

        while not self.stop_event.is_set():
            # Iteration ends after consumer_timeout_ms without messages,
            # so the stop flag is re-checked even when the topic is idle
            for message in consumer:
                print(message)
                if self.stop_event.is_set():
                    break

        consumer.close()


def main():
    tasks = [
        Producer(),
        Consumer()
    ]

    # Start a producer thread and a consumer thread on the 'some_existing_topic' Kafka topic
    for t in tasks:
        t.start()

    time.sleep(30)

    # Signal both threads to stop, then wait for them to finish
    for task in tasks:
        task.stop()

    for task in tasks:
        task.join()


if __name__ == "__main__":
    main()
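For context on what the sasl_oauth_token_provider parameter expects: as far as we know, kafka-python-ng only requires an object with a callable token() method that returns the bearer token as a string. The sketch below is not pipelinepy's implementation; CachingTokenProvider, fetch_token, clock and refresh_margin_s are hypothetical names used to illustrate one way such a provider might cache a token and refresh it shortly before it expires.

```python
import threading
import time


class CachingTokenProvider:
    """Hypothetical token provider; NOT the pipelinepy implementation."""

    def __init__(self, fetch_token, clock=time.monotonic, refresh_margin_s=60.0):
        # fetch_token() is assumed to return (access_token, lifetime_in_seconds)
        self._fetch_token = fetch_token
        self._clock = clock
        self._refresh_margin_s = refresh_margin_s
        self._lock = threading.Lock()
        self._token = None
        self._expires_at = 0.0

    def token(self):
        # Method the Kafka client is assumed to call when it needs a bearer token
        with self._lock:
            now = self._clock()
            if self._token is None or now >= self._expires_at - self._refresh_margin_s:
                self._token, lifetime_s = self._fetch_token()
                self._expires_at = now + lifetime_s
            return self._token
```

An instance of such a class could be passed wherever the example above passes ImsTokenProvider(); injecting the clock keeps the expiry logic testable without waiting for real time to pass.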
Environment Variables
The following environment variables need to be set:
- IMS_URL: The base URL of the IMS service (does not include the '/ims/token/v1' part).
- IMS_CLIENT_ID: The client ID for the IMS service.
- IMS_CLIENT_SECRET: The client secret for the IMS service.
- IMS_CLIENT_CODE: The client code for the IMS service.
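To catch a missing variable before the Kafka client starts, a small pre-flight check can help. The sketch below is an illustration, not part of pipelinepy: build_token_request is a hypothetical helper, and the form field names are assumed from the standard OAuth 2.0 authorization_code grant rather than taken from the library. It reports every missing variable at once and shows how the '/ims/token/v1' path relates to IMS_URL.

```python
import os
from urllib.parse import urlencode

REQUIRED_VARS = ("IMS_URL", "IMS_CLIENT_ID", "IMS_CLIENT_SECRET", "IMS_CLIENT_CODE")


def build_token_request(environ=os.environ):
    """Return (url, form_body) for a token request; hypothetical helper."""
    missing = [name for name in REQUIRED_VARS if not environ.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))

    # IMS_URL holds only the base URL, so the token path is appended here
    url = environ["IMS_URL"].rstrip("/") + "/ims/token/v1"

    # Assumed field names for an OAuth 2.0 authorization_code grant
    body = urlencode({
        "grant_type": "authorization_code",
        "client_id": environ["IMS_CLIENT_ID"],
        "client_secret": environ["IMS_CLIENT_SECRET"],
        "code": environ["IMS_CLIENT_CODE"],
    })
    return url, body
```

Calling build_token_request() with no arguments reads from the process environment; passing a plain dict, as a test might, avoids touching real credentials.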
Contributing
Contributions are welcome. Please submit a pull request or create an issue to discuss the changes you want to make.
File details
Details for the file pipelinepy-1.0.5.tar.gz.
File metadata
- Download URL: pipelinepy-1.0.5.tar.gz
- Upload date:
- Size: 2.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.8.2 CPython/3.11.9 Darwin/23.4.0
File hashes
Algorithm | Hash digest
---|---
SHA256 | 2e8393c2892fb964b2242e6da42a4fb617baea8562bbaf63b7372accbc7e613f
MD5 | c525b256681f9f351d741c3bd0af3c6a
BLAKE2b-256 | e68269849a40fe25f4262faac31c7598ff084c0f3126ea363885204e169863af
File details
Details for the file pipelinepy-1.0.5-py3-none-any.whl.
File metadata
- Download URL: pipelinepy-1.0.5-py3-none-any.whl
- Upload date:
- Size: 3.2 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.8.2 CPython/3.11.9 Darwin/23.4.0
File hashes
Algorithm | Hash digest
---|---
SHA256 | cbd4797cdc5138e29b7104bcf8394075902e5f1ac3a59125707e6d314823c21e
MD5 | 2acf78537f976ebd67369a20d897014d
BLAKE2b-256 | 7df131c9a7e03e073fdad50faedf9b724db587e1884812530b92377b252a2ae4