Axual Python Client

Python client library that enables interaction with the Axual platform.

Python Client Overview

Prerequisites

Python 3 is required; Python 3.6 or greater is recommended.

Installation

pip install axual-client-python

Testing

Tests are located in the tests/ directory. To run all of them:

python -m unittest discover tests

Usage

Producer

import json
from axualclient.producer import Producer

conf = {
    # Axual configuration
    'application_id': application_id,
    'endpoint': endpoint,
    'tenant': tenant,
    'environment': environment,
    # SSL configuration
    'ssl.certificate.location': producer_app_cert,
    'ssl.key.location': producer_app_key,
    'ssl.ca.location': root_cert,
}
producer = Producer(conf)
# Produce to a topic as it appears on Self Service
producer.produce(topic='your-topic', value=json.dumps(dict(a=1, b="banana")))
producer.flush()  # Flushes producer before ending (triggering callbacks for delivery reports)

See SSL Configuration for more info on the certificates required.
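The comment on flush() mentions delivery-report callbacks without showing one. Below is a minimal sketch of such a callback. It assumes the Axual producer passes an on_delivery argument through to the underlying confluent-kafka producer, which this README does not confirm. The names format_delivery_report and delivery_report are hypothetical.

```python
def format_delivery_report(err, msg):
    """Build a one-line summary of a produce() outcome.

    Follows the confluent-kafka callback convention: `err` is None on
    success, otherwise an error object; `msg` is the produced message.
    """
    if err is not None:
        return f'Delivery failed: {err}'
    return (
        f'Delivered to {msg.topic()} '
        f'partition {msg.partition()} offset {msg.offset()}'
    )


def delivery_report(err, msg):
    """Callback that prints the summary; invoked from poll() or flush()."""
    print(format_delivery_report(err, msg))


# Assumed usage (needs a configured producer, so it is left commented out):
# producer.produce(topic='your-topic', value='...', on_delivery=delivery_report)
# producer.flush()
```

Keeping the formatting separate from the printing makes the callback easy to check without a running cluster.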

Consumer

from confluent_kafka import KafkaException  # raised below when a message carries an error
from axualclient.consumer import Consumer
from axualclient.deserializing_consumer import DeserializingConsumer

conf = {
    # Axual configuration
    'application_id': application_id,
    'endpoint': endpoint,
    'tenant': tenant,
    'environment': environment,
    # SSL configuration
    'ssl.certificate.location': consumer_app_cert,  # certificate of the consuming application
    'ssl.key.location': consumer_app_key,
    'ssl.ca.location': root_cert,
    # Consumer configuration
    'auto.offset.reset': 'earliest',
    'on_commit': on_commit_callback,
}
consumer = Consumer(conf)
# For consuming from an AVRO topic, use:
# consumer = DeserializingConsumer(
#     conf,
#     key_schema_str=Application.SCHEMA,
#     value_schema_str=ApplicationLogEvent.SCHEMA,
#     from_key_dict=dict_to_application,
#     from_value_dict=dict_to_application_log_event
# )

try:
    # Subscribe to topic(s) as they appear on Self Service
    consumer.subscribe(['your-topic'])
    while True:
        msg = consumer.poll(1.0)

        # msg is None if no new message
        if msg is None:
            continue

        if msg.error():
            # Error handling
            raise KafkaException(msg.error())
        else:
            # Do some processing on the message
            print(
                f'Received message on topic {msg.topic()} partition {msg.partition()} '
                f'offset {msg.offset()} key {msg.key()} value {msg.value()}'
            )
            # Commit message offset to the topic
            consumer.commit()
finally:
    if consumer:
        # Cleanly unregister from cluster by closing consumer
        consumer.commit()
        consumer.close()
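The consumer configuration above passes on_commit_callback without defining it. Below is a minimal sketch. It assumes the callback is invoked with the confluent-kafka convention on_commit(err, partitions), where each partition object exposes topic, partition and offset attributes. The logger name and the boolean return value are illustrative only; the client is expected to ignore the return value.

```python
import logging

logger = logging.getLogger('consumer')


def on_commit_callback(err, partitions):
    """Log the result of an offset commit.

    `err` is None on success; `partitions` is a list of objects with
    topic, partition and offset attributes (assumed convention).
    Returns True on success and False on failure, for easy checking.
    """
    if err is not None:
        logger.error('Offset commit failed: %s', err)
        return False
    for tp in partitions:
        logger.info(
            'Committed %s [%d] at offset %d', tp.topic, tp.partition, tp.offset
        )
    return True
```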

Producer (AVRO)

Producing AVRO messages requires the AVRO schema that was uploaded to self-service. schema_value and schema_key can each be either an AVRO schema (as a string) or the path to a file containing the schema. schema_key is optional; provide it only if the key has to be AVRO-serialized as well.

conf = {
    # Axual configuration
    'application_id': application_id,
    'endpoint': endpoint,
    'tenant': tenant,
    'environment': environment,
    # SSL configuration
    'ssl.certificate.location': producer_app_cert,
    'ssl.key.location': producer_app_key,
    'ssl.ca.location': root_cert,
}
producer = SerializingProducer(
    conf,
    key_to_dict=application_to_dict,              # method reference that converts a key object to dictionary (see docs)
    value_to_dict=application_log_event_to_dict,  # method reference that converts a value object to dictionary (see docs)
    schema_key=Application.SCHEMA,
    schema_value=ApplicationLogEvent.SCHEMA
)
producer.produce(topic=topic, value=value, key=key)
producer.flush()  # Flushes producer before ending (triggering callbacks for delivery reports)
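The example above references Application.SCHEMA and application_to_dict without showing them. Below is a sketch of what such a key class and converter could look like. The field names and the schema are hypothetical, not the real Axual schema. The optional ctx parameter assumes the converter may be called with a serialization context, as confluent-kafka serializers do.

```python
import json


class Application:
    """Hypothetical key object; field names are illustrative only."""

    # AVRO schema as a JSON string (assumed shape, not the real schema)
    SCHEMA = json.dumps({
        'type': 'record',
        'name': 'Application',
        'namespace': 'example',
        'fields': [
            {'name': 'name', 'type': 'string'},
            {'name': 'version', 'type': ['null', 'string'], 'default': None},
        ],
    })

    def __init__(self, name, version=None):
        self.name = name
        self.version = version


def application_to_dict(application, ctx=None):
    """Convert an Application into a dict whose keys match SCHEMA.

    `ctx` is unused; it is optional so the function works whether or
    not the caller supplies a serialization context.
    """
    return {'name': application.name, 'version': application.version}
```

The value side (ApplicationLogEvent and application_log_event_to_dict) would follow the same pattern with its own schema.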

SSL Configuration

The client needs a correct SSL configuration to communicate securely with the brokers, the discovery API, and the schema registry.
Each application (as defined in self-service) requires an application certificate (certificate_location in SslConfig) and the corresponding private key (private_key_location). The application certificate must match the one uploaded in self-service for that application.
The file with root certificates must contain every root certificate the client needs, because the brokers might use a root certificate authority different from the one that signed the certificates for the discovery API and schema registry. Paste the base64-encoded, unencrypted versions of these certificates into a single file and point root_ca_location in SslConfig at it. The file then looks like the following example:

-----BEGIN CERTIFICATE-----
MIIQdaGDAksKadksSDKNsdka5sjy8elAMsm3d .....
 ...  more base64-encoded content here ...
..... LKlmsf02mz2EWYnds=
-----END CERTIFICATE-----

-----BEGIN CERTIFICATE-----
MIIPmsd92nNWlasHWdwMOe92nwoa2QNinnNaZ .....
 ...  more base64-encoded content here ...
..... ldFAf02SArubBW7wVFW2i1=
-----END CERTIFICATE-----
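Instead of pasting by hand, the combined file can be assembled with a few lines of Python. This is a minimal sketch using only the standard library. The function name build_ca_bundle and the file names in the usage comment are hypothetical. It only checks that each input contains a PEM certificate header; it does not validate the certificates themselves.

```python
from pathlib import Path

PEM_HEADER = '-----BEGIN CERTIFICATE-----'


def build_ca_bundle(cert_paths, bundle_path):
    """Concatenate PEM certificate files into one bundle file.

    Returns the number of input files written to the bundle.
    """
    blocks = []
    for path in cert_paths:
        text = Path(path).read_text(encoding='ascii').strip()
        if PEM_HEADER not in text:
            raise ValueError(f'{path} does not look like a PEM certificate')
        blocks.append(text)
    # Separate certificates with a blank line, as in the example above
    Path(bundle_path).write_text('\n\n'.join(blocks) + '\n', encoding='ascii')
    return len(blocks)


# Example usage (hypothetical file names):
# build_ca_bundle(['broker_root.pem', 'discovery_root.pem'], 'root_cas.pem')
```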

Examples

Simple use cases using the client code can be found in the Axual Python Client Examples.

Known Limitations

  • We have encountered issues when using root_cas containing more than one intermediate certificate. The issue originates in the underlying SSL C library implementation and results in the following exception when authenticating:
[SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: path length constraint exceeded (_ssl.c:1123)
  • Transactions are not yet supported. An issue has been created, and transaction support is planned for a future release.

Contributing

Axual is interested in building the community; we would welcome any thoughts or patches. You can reach us here.

See contributing.

Building locally

This project uses poetry for dependency management and building. Install the tool as per the instructions on the linked page, and build the library using:

poetry build

For maintainers: building a release

Whenever you push to a branch, the library is built with the version specified in pyproject.toml.
When tagging and building a release, however, be aware that the CI pipeline ignores the version in pyproject.toml and builds the release from the version specified in the tag; for example, tagging 1.0.0-alpha4 produces axual_client_python-1.0.0a4-py3-none-any.whl.

This has two consequences:

  • You have to follow the normal semver rules when choosing a tag.
  • After releasing, the developer has to update the version in pyproject.toml manually in preparation for the next version.
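To preview which version a tag will turn into, the mapping from the example above (1.0.0-alpha4 becoming 1.0.0a4) can be approximated as follows. This is a simplified sketch of Python's pre-release version normalization, not the CI pipeline's actual logic. It only handles tags of the form MAJOR.MINOR.PATCH with an optional alpha, beta, or rc suffix. The function name tag_to_wheel_version is hypothetical.

```python
import re

# Pre-release labels and their normalized short forms
_PRE_RELEASE = {'alpha': 'a', 'a': 'a', 'beta': 'b', 'b': 'b', 'rc': 'rc', 'c': 'rc'}
_TAG = re.compile(r'^(\d+\.\d+\.\d+)(?:[-._]?(alpha|beta|rc|a|b|c)[-._]?(\d+))?$')


def tag_to_wheel_version(tag):
    """Return the normalized version string for a release tag."""
    match = _TAG.match(tag)
    if match is None:
        raise ValueError(f'Unsupported tag format: {tag!r}')
    release, label, number = match.groups()
    if label is None:
        return release
    return f'{release}{_PRE_RELEASE[label]}{int(number)}'


print(tag_to_wheel_version('1.0.0-alpha4'))  # prints 1.0.0a4
```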

License

Axual Python Client is licensed under the Apache License, Version 2.0.
