Axual Python Client

Python client library that enables interaction with the Axual platform.

Python Client Overview

Prerequisites

Python 3 is required; Python 3.6 or later is recommended.

Installation

pip install axual-client-python

Testing

Tests are located in the tests/ directory. To run all of them:

python -m unittest discover tests

Usage

Producer

import json
from axualclient.producer import Producer

conf = {
    # Axual configuration
    'application_id': application_id,
    'endpoint': endpoint,
    'tenant': tenant,
    'environment': environment,
    # SSL configuration
    'ssl.certificate.location': producer_app_cert,
    'ssl.key.location': producer_app_key,
    'ssl.ca.location': root_cert,
}
producer = Producer(conf)
# Produce to a topic as it appears on Self Service
producer.produce('your-topic', value=json.dumps(dict(a=1, b='banana')))
producer.flush()  # Flushes producer before ending (triggering callbacks for delivery reports)

See SSL Configuration for more info on the certificates required.
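
The flush() call above triggers the delivery-report callbacks. A minimal sketch of such a callback follows. It assumes that produce() accepts an on_delivery argument and calls it with (err, msg), as the underlying confluent-kafka producer does. The names delivery_report and delivered are illustrative and not part of the client.

```python
import logging

logger = logging.getLogger("axual.example")

# Collected (topic, partition, offset) tuples; illustrative only.
delivered = []


def delivery_report(err, msg):
    """Delivery callback using the assumed (err, msg) signature.

    err is None on success; msg exposes topic(), partition() and offset().
    """
    if err is not None:
        logger.error("Delivery failed: %s", err)
        return
    delivered.append((msg.topic(), msg.partition(), msg.offset()))
    logger.info("Delivered to %s [%d] at offset %d",
                msg.topic(), msg.partition(), msg.offset())


# Assumed usage (requires a configured producer):
# producer.produce('your-topic', value=payload, on_delivery=delivery_report)
# producer.flush()
```

Keeping the callback free of client imports makes it easy to unit test with a stand-in message object.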

Consumer

from confluent_kafka import KafkaException

from axualclient.consumer import Consumer

conf = {
    # Axual configuration
    'application_id': application_id,
    'endpoint': endpoint,
    'tenant': tenant,
    'environment': environment,
    # SSL configuration
    'ssl.certificate.location': consumer_app_cert,
    'ssl.key.location': consumer_app_key,
    'ssl.ca.location': root_cert,
    # Consumer configuration
    'auto.offset.reset': 'earliest',
    'on_commit': on_commit_callback,
}
consumer = Consumer(conf)

try:
    # Subscribe to topic(s) as they appear on Self Service
    consumer.subscribe(['your-topic'])
    while True:
        msg = consumer.poll(1.0)

        # msg is None if no new message
        if msg is None:
            continue

        if msg.error():
            # Error handling
            raise KafkaException(msg.error())
        else:
            # Do some processing on the message
            print(
                f'Received message on topic {msg.topic()} partition {msg.partition()} '
                f'offset {msg.offset()} key {msg.key()} value {msg.value()}'
            )
            # Commit message offset to the topic
            consumer.commit()
finally:
    if consumer is not None:
        # Cleanly unregister from cluster by closing consumer
        consumer.commit()
        consumer.close()
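
The consumer configuration above refers to on_commit_callback without defining it. One possible definition is sketched below, assuming the confluent-kafka on_commit(err, partitions) signature, where each entry in partitions has topic, partition and offset attributes. The boolean return value is not used by the client; it is only there to make the function easy to test.

```python
import logging

logger = logging.getLogger("axual.example")


def on_commit_callback(err, partitions):
    """Log the outcome of an offset commit.

    Assumed signature: err is None on success, and partitions is a list
    of objects with topic, partition and offset attributes.
    """
    if err is not None:
        logger.error("Offset commit failed: %s", err)
        return False
    for tp in partitions:
        logger.info("Committed %s [%d] at offset %d",
                    tp.topic, tp.partition, tp.offset)
    return True
```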

Producer (AVRO)

Producing AVRO messages requires the AVRO schema that was uploaded to Self Service. Pass each schema to its serializer via the schema_str parameter, as an Avro schema declaration.

from axualclient.avro import AvroSerializer
from axualclient.serializing_producer import SerializingProducer

key_serializer = AvroSerializer(schema_str=Application.SCHEMA, to_dict=application_to_dict)
value_serializer = AvroSerializer(schema_str=ApplicationLogEvent.SCHEMA, to_dict=application_log_event_to_dict)

conf = {
    # Axual configuration
    'application_id': application_id,
    'endpoint': endpoint,
    'tenant': tenant,
    'environment': environment,
    # SSL configuration
    'ssl.certificate.location': producer_app_cert,
    'ssl.key.location': producer_app_key,
    'ssl.ca.location': root_cert,
    # Producer configuration
    'key.serializer': key_serializer,
    'value.serializer': value_serializer,
    'acks': 'all',
}
producer = SerializingProducer(conf)

producer.produce(topic=topic, value=value, key=key)
producer.flush()  # Flushes producer before ending (triggering callbacks for delivery reports)
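
The snippet above uses Application.SCHEMA and application_to_dict without showing them. A minimal sketch of what they could look like follows. The field names and the namespace are invented for illustration; the real schema is the one uploaded to Self Service. The (object, ctx) signature of the to_dict callable is assumed from the confluent-kafka Avro serializer.

```python
import json
from dataclasses import dataclass, asdict


@dataclass
class Application:
    """Hypothetical key type; the fields are illustrative only."""
    name: str
    version: str

    # Avro schema declaration as a JSON string (assumed layout).
    # Without a type annotation this is a class attribute, not a field.
    SCHEMA = json.dumps({
        "type": "record",
        "name": "Application",
        "namespace": "com.example",
        "fields": [
            {"name": "name", "type": "string"},
            {"name": "version", "type": "string"},
        ],
    })


def application_to_dict(application, ctx=None):
    """Convert an Application into a dict whose keys match SCHEMA.

    ctx is the serialization context passed by the serializer; unused here.
    """
    return asdict(application)
```

For example, application_to_dict(Application("billing", "1.0")) yields a dict with the keys name and version, which is the shape the serializer needs to encode the record.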

Consumer (AVRO)

To consume AVRO messages, instantiate an AvroDeserializer for the key and for the value, and pass both in the configuration of the Axual DeserializingConsumer:

from axualclient.avro import AvroDeserializer
from axualclient.deserializing_consumer import DeserializingConsumer

# Initialize Deserializers:
key_deserializer = AvroDeserializer(
    # Optional parameters
    schema_str=Application.SCHEMA,
    from_dict=dict_to_application
)
value_deserializer = AvroDeserializer(
    # Optional parameters
    schema_str=ApplicationLogEvent.SCHEMA,
    from_dict=dict_to_application_log_event,
)

configuration = {
    # Axual configuration
    'application_id': application_id,
    'endpoint': endpoint,
    'tenant': tenant,
    'environment': environment,
    # SSL configuration
    'ssl.certificate.location': consumer_app_cert,
    'ssl.key.location': consumer_app_key,
    'ssl.ca.location': root_cert,
    # Consumer configuration
    'key.deserializer': key_deserializer,
    'value.deserializer': value_deserializer,
    'auto.offset.reset': 'earliest',
    'on_commit': on_commit_callback,
    'error_cb': on_error_callback,
    'logger': logger
}

consumer = DeserializingConsumer(configuration)

try:
    consumer.subscribe([topic])
    while True:
        msg = consumer.poll()

        if msg is None:
            continue

        if msg.error():
            print(f'Error returned by poll: {msg.error()}')
        else:
            print(
                f'Received message on topic {msg.topic()} partition {msg.partition()} '
                f'offset {msg.offset()} key: {str(msg.key())} value: {str(msg.value())}'
            )
            consumer.commit()
except KeyboardInterrupt:
    print('Caught KeyboardInterrupt, stopping.')
finally:
    if consumer is not None:
        print('Committing final offsets and leaving group.')
        consumer.commit()
        consumer.close()
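
The configuration above also passes error_cb and logger entries that are not defined in the snippet. A possible setup is sketched below, assuming the confluent-kafka convention that error_cb is called with a single error argument and that logger is a standard logging.Logger. The errors_seen list is illustrative only.

```python
import logging

# Logger handed to the client through the 'logger' configuration entry.
logger = logging.getLogger("axual.consumer")
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())

# Errors reported so far; illustrative only.
errors_seen = []


def on_error_callback(err):
    """Record and log a client-level error (assumed error_cb(err) signature)."""
    errors_seen.append(str(err))
    logger.error("Kafka client error: %s", err)
```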

SSL Configuration

The client configuration requires a correct SSL configuration in order to communicate securely with brokers, the discovery API, and the schema registry.
Each application (as defined in self-service) requires an application certificate (ssl.certificate.location) and corresponding private key (ssl.key.location). The application certificate must match the one uploaded in self-service for that application.
The root certificate file (ssl.ca.location) must contain every root certificate the client needs: the brokers might use a root certificate authority different from the one that signed the certificates for the discovery API and schema registry. Paste the base64-encoded, unencrypted versions of these certificates into a single file, which then looks like the following example:

-----BEGIN CERTIFICATE-----
MIIQdaGDAksKadksSDKNsdka5sjy8elAMsm3d .....
 ...  more base64-encoded content here ...
..... LKlmsf02mz2EWYnds=
-----END CERTIFICATE-----

-----BEGIN CERTIFICATE-----
MIIPmsd92nNWlasHWdwMOe92nwoa2QNinnNaZ .....
 ...  more base64-encoded content here ...
..... ldFAf02SArubBW7wVFW2i1=
-----END CERTIFICATE-----
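
Pasting the certificates into one file can also be scripted. The sketch below concatenates several PEM files into a single bundle. The helper name build_ca_bundle and the file names in the usage comment are hypothetical. The function only checks that the BEGIN/END markers are balanced and does not verify the certificates themselves.

```python
from pathlib import Path

BEGIN = "-----BEGIN CERTIFICATE-----"
END = "-----END CERTIFICATE-----"


def build_ca_bundle(cert_paths, bundle_path):
    """Concatenate PEM certificate files into a single bundle file.

    Returns the number of certificates written. Raises ValueError when a
    file does not contain balanced BEGIN/END CERTIFICATE markers.
    """
    blocks = []
    total = 0
    for path in cert_paths:
        text = Path(path).read_text(encoding="ascii").strip()
        count = text.count(BEGIN)
        if count == 0 or count != text.count(END):
            raise ValueError(f"{path} does not look like a PEM certificate file")
        blocks.append(text)
        total += count
    # Separate certificates with a blank line, as in the example above.
    Path(bundle_path).write_text("\n\n".join(blocks) + "\n", encoding="ascii")
    return total


# Hypothetical usage:
# build_ca_bundle(["broker-root.pem", "discovery-root.pem"], "root-cas.pem")
```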

Examples

Simple use cases using the client code can be found in the Axual Python Client Examples.

Known Limitations

  • We have encountered issues when using root_cas consisting of more than one intermediate certificate. The issue originates in the underlying SSL C library implementation and results in the following exception when authenticating:
[SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: path length constraint exceeded (_ssl.c:1123)
  • Transactions are not supported yet. An issue has been created, and transactions will be supported in a future release.
  • Passing PEM content as a string is not supported; certificates and keys must be provided as file paths. An issue has been created, and support will be added in a future release.
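
Until PEM strings are supported, one possible workaround is to write the PEM content to a temporary file and pass its path in the configuration. The sketch below is only an illustration; pem_string_to_file and the APP_KEY_PEM environment variable are hypothetical names.

```python
import os
import tempfile


def pem_string_to_file(pem_text, suffix=".pem"):
    """Write a PEM string to a temporary file and return the file path.

    The caller is responsible for deleting the file when it is no longer
    needed.
    """
    fd, path = tempfile.mkstemp(suffix=suffix)
    with os.fdopen(fd, "w", encoding="ascii") as handle:
        handle.write(pem_text)
    return path


# Hypothetical usage:
# conf['ssl.key.location'] = pem_string_to_file(os.environ['APP_KEY_PEM'])
```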

Contributing

Axual is interested in building the community; we would welcome any thoughts or patches. You can reach us here.

See contributing.

Building locally

This project uses poetry for dependency management and building. Install the tool as per the instructions on the linked page, and build the library using:

poetry build

Testing locally

Unit tests are located in the tests/unit directory. To run all of them:

python -m unittest discover -s tests/unit -v

Integration tests require standalone to be running. To run standalone locally you need Docker Compose. Once it is installed, start standalone and then run the integration tests:

docker compose up
python -m unittest discover -s tests/integration -v

For maintainers: building a release

The version of the library being built will be the version specified in pyproject.toml whenever you push to a branch.

When tagging and building a release, however, be aware that the CI pipeline ignores the version in pyproject.toml and builds the release from the version specified in the tag; for example, tagging 1.0.0-alpha4 will produce axual_client_python-1.0.0a4-py3-none-any.whl.

This has two consequences:

  • You have to follow the normal semver rules when choosing a tag
  • After releasing, it falls on the developer to manually update the version in pyproject.toml in preparation for the next version.
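
The mapping from the tag 1.0.0-alpha4 to the version 1.0.0a4 matches the PEP 440 normalization of pre-release labels. The sketch below illustrates that normalization for simple MAJOR.MINOR.PATCH tags so a tag can be checked before pushing. It is not the CI pipeline's code, and tag_to_pep440 is a hypothetical helper name.

```python
import re

# Pre-release spellings and their PEP 440 normalized forms.
_PRE_NAMES = {
    "alpha": "a", "a": "a",
    "beta": "b", "b": "b",
    "rc": "rc", "c": "rc", "pre": "rc", "preview": "rc",
}
_TAG = re.compile(
    r"^(\d+\.\d+\.\d+)"
    r"(?:[-_.]?(alpha|beta|preview|pre|rc|a|b|c)[-_.]?(\d+)?)?$"
)


def tag_to_pep440(tag):
    """Return the PEP 440 normalized version for a release tag."""
    match = _TAG.match(tag.lower())
    if match is None:
        raise ValueError(f"unsupported tag format: {tag!r}")
    release, name, number = match.groups()
    if name is None:
        return release
    # PEP 440 treats a missing pre-release number as 0.
    return f"{release}{_PRE_NAMES[name]}{number or 0}"
```

With this sketch, tag_to_pep440("1.0.0-alpha4") returns "1.0.0a4", the version that appears in the wheel file name quoted above.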

License

Axual Python Client is licensed under the Apache License, Version 2.0.
