An Adobe IMS authorization code flow used with the OAuth Kafka client for Python (kafka-python-ng).

Project description

pipelinepy

pipelinepy is an OAuth token provider callback for the kafka-python-ng client. It provides a simple interface for authenticating Adobe IMS (Identity Management System) users to Kafka.

Installation

Install pipelinepy using pip:

pip install pipelinepy

Usage

pipelinepy is designed for use with the kafka-python-ng client. To use it, create an instance of the ImsTokenProvider class and pass it as the sasl_oauth_token_provider parameter of the Kafka producer and consumer.

pip install kafka-python-ng pipelinepy

#!/usr/bin/env python
import threading, time

from kafka import KafkaConsumer, KafkaProducer
from pipelinepy import ImsTokenProvider


class Producer(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.stop_event = threading.Event()

    def stop(self):
        self.stop_event.set()

    def run(self):
        producer = KafkaProducer(bootstrap_servers=['broker1:9092','broker2:9092','broker3:9092'],
                                 sasl_mechanism='OAUTHBEARER',
                                 security_protocol='SASL_SSL',
                                 sasl_oauth_token_provider=ImsTokenProvider())

        while not self.stop_event.is_set():
            producer.send('some_existing_topic', b"Hello, world!")
            producer.send('some_existing_topic', b"Salutare, lume!")
            producer.send('some_existing_topic', b"\xc2\xa1Hola, mundo!")
            time.sleep(1)

        producer.close()


class Consumer(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.stop_event = threading.Event()

    def stop(self):
        self.stop_event.set()

    def run(self):
        consumer = KafkaConsumer(bootstrap_servers=['broker1:9092','broker2:9092','broker3:9092'],
                                 auto_offset_reset='earliest',
                                 group_id='test-cg',
                                 consumer_timeout_ms=1000,
                                 sasl_mechanism='OAUTHBEARER',
                                 security_protocol='SASL_SSL',
                                 sasl_oauth_token_provider=ImsTokenProvider())
        consumer.subscribe(['some_existing_topic'])

        while not self.stop_event.is_set():
            for message in consumer:
                print(message)
                if self.stop_event.is_set():
                    break

        consumer.close()


def main():

    tasks = [
        Producer(),
        Consumer()
    ]

    # Start a producer thread and a consumer thread on the 'some_existing_topic' Kafka topic
    for t in tasks:
        t.start()

    time.sleep(30)

    # Stop threads
    for task in tasks:
        task.stop()

    for task in tasks:
        task.join()


if __name__ == "__main__":
    main()
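
The example above hands an ImsTokenProvider to kafka-python-ng. As far as the upstream client code shows, the client only needs that object to expose a callable token() method that returns the bearer token string. The sketch below illustrates that contract with a small caching provider. It is not pipelinepy's implementation: CachingTokenProvider, fetch_token, refresh_margin_s, and clock are hypothetical names chosen for illustration, and fetch_token stands in for whatever call actually obtains a token and its lifetime.

```python
import threading
import time
from typing import Callable, Optional, Tuple


class CachingTokenProvider:
    """Hypothetical provider that reuses a token until shortly before it expires."""

    def __init__(self,
                 fetch_token: Callable[[], Tuple[str, float]],
                 refresh_margin_s: float = 60.0,
                 clock: Callable[[], float] = time.monotonic):
        # Assumption: fetch_token returns (token, lifetime_in_seconds).
        self._fetch_token = fetch_token
        self._refresh_margin_s = refresh_margin_s
        self._clock = clock
        self._lock = threading.Lock()
        self._token: Optional[str] = None
        self._expires_at = 0.0

    def token(self) -> str:
        # The Kafka client calls this whenever it needs a bearer token.
        with self._lock:
            now = self._clock()
            if self._token is None or now >= self._expires_at - self._refresh_margin_s:
                # Cache is empty or about to expire: fetch a fresh token.
                self._token, lifetime_s = self._fetch_token()
                self._expires_at = now + lifetime_s
            return self._token
```

An instance of such a class could be passed as sasl_oauth_token_provider in the same position as ImsTokenProvider() in the example above. Caching avoids a round trip to the identity service on every new broker connection.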

Environment Variables

The following environment variables need to be set:

  • IMS_URL: The base URL of the IMS service (does not include the '/ims/token/v1' part).
  • IMS_CLIENT_ID: The client ID for the IMS service.
  • IMS_CLIENT_SECRET: The client secret for the IMS service.
  • IMS_CLIENT_CODE: The client's authorization code for the IMS service.
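
Because the provider takes its configuration from the environment, it can help to check the variables before creating any Kafka client. The following is a minimal sketch of such a check. The helper names missing_ims_variables and ims_token_url are hypothetical and not part of pipelinepy, and joining IMS_URL with '/ims/token/v1' is an assumption based on the IMS_URL description above.

```python
import os
from typing import List, Mapping

# Variable names taken from the list above.
REQUIRED_IMS_VARIABLES = (
    "IMS_URL",
    "IMS_CLIENT_ID",
    "IMS_CLIENT_SECRET",
    "IMS_CLIENT_CODE",
)


def missing_ims_variables(env: Mapping[str, str] = os.environ) -> List[str]:
    """Return the required variables that are unset or empty (hypothetical helper)."""
    return [name for name in REQUIRED_IMS_VARIABLES if not env.get(name)]


def ims_token_url(env: Mapping[str, str] = os.environ) -> str:
    """Join the base URL with the token path (assumed layout, see lead-in)."""
    return env["IMS_URL"].rstrip("/") + "/ims/token/v1"
```

Calling missing_ims_variables() at startup and aborting when the returned list is non-empty gives a clearer error than an authentication failure surfacing later inside the Kafka client.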

Contributing

Contributions are welcome. Please submit a pull request or create an issue to discuss the changes you want to make.

Download files

Source Distribution

pipelinepy-1.0.5.tar.gz (2.6 kB)

Uploaded Source

Built Distribution

pipelinepy-1.0.5-py3-none-any.whl (3.2 kB)

Uploaded Python 3

File details

Details for the file pipelinepy-1.0.5.tar.gz.

File metadata

  • Download URL: pipelinepy-1.0.5.tar.gz
  • Upload date:
  • Size: 2.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.8.2 CPython/3.11.9 Darwin/23.4.0

File hashes

Hashes for pipelinepy-1.0.5.tar.gz

Algorithm    Hash digest
SHA256       2e8393c2892fb964b2242e6da42a4fb617baea8562bbaf63b7372accbc7e613f
MD5          c525b256681f9f351d741c3bd0af3c6a
BLAKE2b-256  e68269849a40fe25f4262faac31c7598ff084c0f3126ea363885204e169863af

File details

Details for the file pipelinepy-1.0.5-py3-none-any.whl.

File metadata

  • Download URL: pipelinepy-1.0.5-py3-none-any.whl
  • Upload date:
  • Size: 3.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.8.2 CPython/3.11.9 Darwin/23.4.0

File hashes

Hashes for pipelinepy-1.0.5-py3-none-any.whl

Algorithm    Hash digest
SHA256       cbd4797cdc5138e29b7104bcf8394075902e5f1ac3a59125707e6d314823c21e
MD5          2acf78537f976ebd67369a20d897014d
BLAKE2b-256  7df131c9a7e03e073fdad50faedf9b724db587e1884812530b92377b252a2ae4
