Amazon MSK Library in Python for SASL/OAUTHBEARER Auth

This is an Amazon MSK library for Python. It provides a function that generates a base64-encoded signed URL to enable authentication and authorization with an MSK cluster. The signed URL is generated using your IAM credentials.
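
To make the token format more concrete, the sketch below decodes a token back into its signed URL and reads the query parameters. It assumes the token uses the URL-safe base64 alphabet with the trailing padding stripped, which is an assumption about the encoding and not something stated here. The helper name decode_auth_token and the sample URL are hypothetical, and the sample token is built locally, not produced by the library.

```python
import base64
from urllib.parse import urlsplit, parse_qs


def decode_auth_token(token: str) -> str:
    """Return the signed URL carried inside a base64-encoded token."""
    # Restore any '=' padding that was stripped (assumption: URL-safe alphabet).
    padded = token + "=" * (-len(token) % 4)
    return base64.urlsafe_b64decode(padded).decode("utf-8")


# Hypothetical stand-in for a real token; it is NOT produced by the library.
sample_url = (
    "https://kafka.us-east-1.amazonaws.com/"
    "?Action=kafka-cluster%3AConnect&X-Amz-Expires=900"
)
sample_token = (
    base64.urlsafe_b64encode(sample_url.encode("utf-8")).decode("utf-8").rstrip("=")
)

decoded = decode_auth_token(sample_token)
# parse_qs percent-decodes values and returns a list per parameter name.
params = parse_qs(urlsplit(decoded).query)
print(decoded)
print(params)
```

Decoding a real token this way can help confirm which region and expiry it was signed for, but treat the decoded URL as a secret: it carries a valid signature.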

  • Free software: Apache Software License 2.0

Features

  • Provides a function to generate auth token using IAM credentials from the AWS default credentials chain.

  • Provides a function to generate auth token using IAM credentials from the AWS named profile.

  • Provides a function to generate auth token using assumed IAM role’s credentials.

  • Provides a function to generate auth token using a CredentialProvider. The CredentialProvider should be inherited from botocore.credentials.CredentialProvider class.

Get Started

  • For installation, refer to installation guide

  • To use the signer library with a Kafka client library that supports the SASL/OAUTHBEARER mechanism, add a token callback function to your code.

  • For example, here is the sample code to use with dpkp/kafka-python library:

from kafka import KafkaProducer
from kafka.errors import KafkaError
import socket
import time
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider

class MSKTokenProvider():
    def token(self):
        token, _ = MSKAuthTokenProvider.generate_auth_token('<my aws region>')
        return token

tp = MSKTokenProvider()

producer = KafkaProducer(
    bootstrap_servers='<my bootstrap string>',
    security_protocol='SASL_SSL',
    sasl_mechanism='OAUTHBEARER',
    sasl_oauth_token_provider=tp,
    client_id=socket.gethostname(),
)

topic = "<my-topic>"
while True:
    try:
        inp=input(">")
        producer.send(topic, inp.encode())
        producer.flush()
        print("Produced!")
    except Exception as e:
        print("Failed to send message:", e)

producer.close()

  • Here is sample code to use with the confluent-kafka-python library:

from confluent_kafka import Consumer
import socket
import time
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider

def oauth_cb(oauth_config):
    auth_token, expiry_ms = MSKAuthTokenProvider.generate_auth_token("<my aws region>")
    # Note that this library expects oauth_cb to return expiry time in seconds since epoch, while the token generator returns expiry in ms
    return auth_token, expiry_ms/1000

c = Consumer({
    "debug": "all",
    'bootstrap.servers': "<my bootstrap string>",
    'client.id': socket.gethostname(),
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'OAUTHBEARER',
    'oauth_cb': oauth_cb,
    'group.id': 'mygroup',
    'auto.offset.reset': 'earliest'
})

c.subscribe(['<my-topic>'])

print("Starting consumer!")

while True:
    msg = c.poll(5)

    if msg is None:
        continue
    if msg.error():
        print("Consumer error: {}".format(msg.error()))
        continue
    print('Received message: {}'.format(msg.value().decode('utf-8')))

c.close()

  • To generate the token from a named profile, replace the token() function with the code below:

class MSKTokenProvider():
    def token(self):
        oauth2_token, _ = MSKAuthTokenProvider.generate_auth_token_from_profile('<your aws region>', '<named_profile>')
        return oauth2_token

  • To generate the token from a role ARN, replace the token() function with the code below:

class MSKTokenProvider():
    def token(self):
        oauth2_token, _ = MSKAuthTokenProvider.generate_auth_token_from_role_arn('<your aws region>', '<role_arn>')
        return oauth2_token

  • To generate the token from a custom credentials provider, replace the token() function with the code below:

class MSKTokenProvider():
    def token(self):
        oauth2_token, _ = MSKAuthTokenProvider.generate_auth_token_from_credentials_provider('<your aws region>', '<your_credentials_provider>')
        return oauth2_token
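
Each token() variant above generates a new token on every call. Where that is undesirable, one option is to reuse a token until shortly before it expires. The sketch below is not part of this library: CachingTokenProvider, its generate and clock parameters, and the 60-second refresh margin are all hypothetical. It relies only on the generator returning a (token, expiry in milliseconds) pair, as in the confluent-kafka example, and assumes that expiry is a timestamp since the epoch.

```python
import time
from typing import Callable, List, Optional, Tuple


class CachingTokenProvider:
    """Reuse a token until it is close to expiry (hypothetical helper)."""

    def __init__(
        self,
        generate: Callable[[], Tuple[str, float]],
        refresh_margin_s: float = 60.0,
        clock: Callable[[], float] = time.time,
    ) -> None:
        self._generate = generate
        self._margin = refresh_margin_s
        self._clock = clock
        self._token: Optional[str] = None
        self._expiry_s = 0.0

    def token(self) -> str:
        # Regenerate when there is no token yet or the margin has been reached.
        if self._token is None or self._clock() >= self._expiry_s - self._margin:
            token, expiry_ms = self._generate()
            self._token = token
            self._expiry_s = expiry_ms / 1000  # milliseconds -> seconds
        return self._token


# Demo with a fake generator and a fake clock; no AWS calls are made.
now: List[float] = [1_000.0]
calls: List[int] = []


def fake_generate() -> Tuple[str, float]:
    calls.append(1)
    # Pretend each token is valid for 900 seconds from "now".
    return f"token-{len(calls)}", (now[0] + 900) * 1000


provider = CachingTokenProvider(fake_generate, clock=lambda: now[0])
first = provider.token()
second = provider.token()  # served from the cache
now[0] += 850              # now inside the 60-second refresh margin
third = provider.token()   # regenerated
```

With the real library, generate could be a small function that calls MSKAuthTokenProvider.generate_auth_token('<my aws region>'). The sketch is not thread-safe; a lock around token() would be needed if the client calls it from several threads.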

Running Tests

You can run tests in all supported Python versions using pytest. By default, it will run all of the unit tests.

$ pytest

You can also run tests with setup.py:

$ python setup.py test

To fix lint issues, run the pre-commit command:

$ pre-commit run --all-files

To run tests with coverage information, run:

$ coverage run --source=aws_msk_iam_sasl_signer.MSKAuthTokenProvider -m pytest tests/test_auth_token_provider.py
$ coverage report -m

Troubleshooting

Finding out which identity is being used

You may receive an Access denied error and be unsure which credential is actually being used. The credential may be sourced from a role ARN, an EC2 instance profile, a credential profile, and so on. When calling generate_auth_token(), set the aws_debug_creds argument to True and set client-side logging to DEBUG:

MSKAuthTokenProvider.generate_auth_token('<my aws region>', aws_debug_creds = True)

The signer library will then print a debug log of the form:

Credentials Identity: {UserId: ABCD:test124, Account: 1234567890, Arn: arn:aws:sts::1234567890:assumed-role/abc/test124}

The log line provides the IAM Account, IAM user id and the ARN of the IAM Principal corresponding to the credential being used.
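
As a minimal sketch of setting client-side logging to DEBUG, the snippet below raises the root logger to DEBUG with the standard logging module before a token is generated. Configuring the root logger is an assumption made for simplicity, since the name of the logger the library writes to is not given here. The function names are hypothetical, and the library import sits inside the function so the logging setup can be tried on its own.

```python
import logging


def enable_debug_logging() -> None:
    """Raise the root logger to DEBUG and make sure it has a handler."""
    # basicConfig only installs a handler if the root logger has none yet.
    logging.basicConfig(format="%(asctime)s %(name)s %(levelname)s %(message)s")
    # Set the level explicitly so it applies even if a handler already existed.
    logging.getLogger().setLevel(logging.DEBUG)


def log_credentials_identity(region: str) -> None:
    """Generate a token with aws_debug_creds=True so the identity is logged."""
    # Imported here so the logging setup above works without the library installed.
    from aws_msk_iam_sasl_signer import MSKAuthTokenProvider

    MSKAuthTokenProvider.generate_auth_token(region, aws_debug_creds=True)


enable_debug_logging()
# Uncomment to try it against real credentials:
# log_credentials_identity("<my aws region>")
```

DEBUG on the root logger also turns on verbose output from other libraries, so it is best kept to troubleshooting sessions.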

Getting Help

Please use these community resources for getting help. We use the GitHub issues for tracking bugs and feature requests.

This repository provides a library that can be plugged into any Python Kafka client that supports the SASL/OAUTHBEARER mechanism. For more information about the SASL/OAUTHBEARER mechanism, see KIP 255.

Opening Issues

If you encounter a bug with the AWS MSK IAM SASL Signer for Python, we would like to hear about it. Before opening a new issue, search the existing Issues to see whether others are experiencing the same problem. Please include the versions of AWS MSK IAM SASL Signer for Python, Python, and the OS you’re using, and a reproduction case when appropriate.

The GitHub issues are intended for bug reports and feature requests. For help and questions with using AWS MSK IAM SASL Signer for Python, please make use of the resources listed in the Getting Help section. Keeping the list of open issues lean will help us respond in a timely manner.

Contributing

We value feedback and contributions from our community. Whether it’s a bug report, new feature, correction, or additional documentation, we welcome your issues and pull requests. Please read through this CONTRIBUTING document before submitting any issues or pull requests to ensure we have all the necessary information to effectively respond to your contribution.

More Resources

Credits

This package was created with Cookiecutter and the audreyr/cookiecutter-pypackage project template.

# Changelog

## 1.0.2 (2025-03-04)

  • Add support for more Python versions

## 1.0.1 (2024-01-17)

  • Expanding version dependency constraints

## 1.0.0 (2023-11-09)

  • First release of AWS MSK IAM SASL Signer Python library.
