Axual client for Python
Axual Python Client
Python client library that enables interaction with the Axual platform.
Prerequisites
Python 3 is required; Python 3.6 or greater is recommended.
Installation
pip install axual-client-python
Testing
Tests are located in the tests/ directory.
To run all of them:
python -m unittest discover tests
Usage
Producer
import json

from axualclient.producer import Producer

conf = {
    # Axual configuration
    'application_id': application_id,
    'endpoint': endpoint,
    'tenant': tenant,
    'environment': environment,
    # SSL configuration
    'ssl.certificate.location': producer_app_cert,
    'ssl.key.location': producer_app_key,
    'ssl.ca.location': root_cert,
}
producer = Producer(conf)
# Produce to a topic as it appears on Self Service
producer.produce(topic='your-topic', value=json.dumps(dict(a=1, b='banana')))
producer.flush()  # Flushes producer before ending (triggering callbacks for delivery reports)
See SSL Configuration for more info on the certificates required.
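The flush() call in the producer example triggers the delivery report callbacks. A minimal sketch of such a callback follows. It assumes the Axual Producer forwards an on_delivery argument to the underlying confluent-kafka producer the way confluent_kafka.Producer.produce does; the name delivery_report and the boolean return value are illustrative only.

```python
import logging

logger = logging.getLogger(__name__)


def delivery_report(err, msg):
    """Log the outcome of one produce() call.

    Follows the confluent-kafka callback convention: err is None on
    success, otherwise it describes the failure; msg is the message.
    The return value is ignored by the client and only eases testing.
    """
    if err is not None:
        logger.error('Delivery failed: %s', err)
        return False
    logger.info(
        'Delivered to %s [%s] at offset %s',
        msg.topic(), msg.partition(), msg.offset(),
    )
    return True


# Assumed usage (not confirmed by this README):
# producer.produce(topic='your-topic', value=payload, on_delivery=delivery_report)
# producer.flush()
```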
Consumer
from confluent_kafka import KafkaException

from axualclient.consumer import Consumer

conf = {
    # Axual configuration
    'application_id': application_id,
    'endpoint': endpoint,
    'tenant': tenant,
    'environment': environment,
    # SSL configuration
    'ssl.certificate.location': producer_app_cert,
    'ssl.key.location': producer_app_key,
    'ssl.ca.location': root_cert,
    # Consumer configuration
    'auto.offset.reset': 'earliest',
    'on_commit': on_commit_callback,
}
consumer = Consumer(conf)
try:
    # Subscribe to topic(s) as they appear on Self Service
    consumer.subscribe(['your-topic'])
    while True:
        msg = consumer.poll(1.0)
        # msg is None if no new message
        if msg is None:
            continue
        if msg.error():
            # Error handling
            raise KafkaException(msg.error())
        else:
            # Do some processing on the message
            print(
                f'Received message on topic {msg.topic()} partition {msg.partition()} '
                f'offset {msg.offset()} key {msg.key()} value {msg.value()}'
            )
            # Commit message offset to the topic
            consumer.commit()
finally:
    if consumer is not None:
        # Cleanly unregister from cluster by closing consumer
        consumer.commit()
        consumer.close()
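The consumer configuration refers to an on_commit_callback that is not defined in the example. A possible sketch is shown here, assuming the callback follows the confluent-kafka convention on_commit(err, partitions), where each partition object exposes topic, partition, offset and error attributes. The returned list is only there to make the function easy to test.

```python
def on_commit_callback(err, partitions):
    """Report the result of an offset commit.

    err is None when the commit request succeeded; partitions is the
    list of topic partitions that were part of the commit.
    """
    if err is not None:
        print(f'Commit failed: {err}')
        return []
    committed = []
    for tp in partitions:
        if tp.error is not None:
            # A single partition can fail even if the request succeeded
            print(f'Commit failed for {tp.topic} [{tp.partition}]: {tp.error}')
        else:
            print(f'Committed {tp.topic} [{tp.partition}] at offset {tp.offset}')
            committed.append((tp.topic, tp.partition, tp.offset))
    return committed
```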
Producer (AVRO)
To produce AVRO messages, you need the AVRO schema that was uploaded to self-service.
Pass each schema to the serializer through the schema_str parameter as an Avro Schema Declaration.
from axualclient.avro import AvroSerializer
from axualclient.serializing_producer import SerializingProducer

key_serializer = AvroSerializer(schema_str=Application.SCHEMA, to_dict=application_to_dict)
value_serializer = AvroSerializer(schema_str=ApplicationLogEvent.SCHEMA, to_dict=application_log_event_to_dict)
conf = {
    # Axual configuration
    'application_id': application_id,
    'endpoint': endpoint,
    'tenant': tenant,
    'environment': environment,
    # SSL configuration
    'ssl.certificate.location': producer_app_cert,
    'ssl.key.location': producer_app_key,
    'ssl.ca.location': root_cert,
    # Producer configuration
    'key.serializer': key_serializer,
    'value.serializer': value_serializer,
    'acks': 'all',
}
producer = SerializingProducer(conf)
producer.produce(topic=topic, value=value, key=key)
producer.flush()  # Flushes producer before ending (triggering callbacks for delivery reports)
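The example relies on an Application class with a SCHEMA attribute and an application_to_dict function that are not shown. A rough sketch of what they could look like follows. The record fields and namespace are invented for illustration, and the two-argument to_dict signature (object, context) is assumed to mirror the confluent-kafka AvroSerializer convention.

```python
import json


class Application:
    """Hypothetical key type; the fields are placeholders."""

    # Avro Schema Declaration as a JSON string, as expected by schema_str
    SCHEMA = json.dumps({
        'type': 'record',
        'name': 'Application',
        'namespace': 'com.example',
        'fields': [
            {'name': 'name', 'type': 'string'},
            {'name': 'version', 'type': ['null', 'string'], 'default': None},
        ],
    })

    def __init__(self, name, version=None):
        self.name = name
        self.version = version


def application_to_dict(application, ctx):
    """Convert an Application into a dict matching Application.SCHEMA.

    ctx is the serialization context passed by the serializer; it is
    not needed for this simple mapping.
    """
    return {'name': application.name, 'version': application.version}
```

The schema string must match the schema uploaded to self-service, so a real project would load that schema rather than declare its own.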
Consumer (AVRO)
AvroDeserializers need to be instantiated and passed as configuration to the Axual DeserializingConsumer, like so:
from axualclient.avro import AvroDeserializer
from axualclient.deserializing_consumer import DeserializingConsumer

# Initialize Deserializers:
key_deserializer = AvroDeserializer(
    # Optional parameters
    schema_str=Application.SCHEMA,
    from_dict=dict_to_application,
)
value_deserializer = AvroDeserializer(
    # Optional parameters
    schema_str=ApplicationLogEvent.SCHEMA,
    from_dict=dict_to_application_log_event,
)
configuration = {
    # Axual configuration
    'application_id': application_id,
    'endpoint': endpoint,
    'tenant': tenant,
    'environment': environment,
    # SSL configuration
    'ssl.certificate.location': producer_app_cert,
    'ssl.key.location': producer_app_key,
    'ssl.ca.location': root_cert,
    # Consumer configuration
    'key.deserializer': key_deserializer,
    'value.deserializer': value_deserializer,
    'auto.offset.reset': 'earliest',
    'on_commit': on_commit_callback,
    'error_cb': on_error_callback,
    'logger': logger,
}
consumer = DeserializingConsumer(configuration)
try:
    consumer.subscribe([topic])
    while True:
        msg = consumer.poll()
        if msg is None:
            continue
        if msg.error():
            print(f'Error returned by poll: {msg.error()}')
        else:
            print(
                f'Received message on topic {msg.topic()} partition {msg.partition()} '
                f'offset {msg.offset()} key: {str(msg.key())} value: {str(msg.value())}'
            )
            consumer.commit()
except KeyboardInterrupt:
    print('Caught KeyboardInterrupt, stopping.')
finally:
    if consumer is not None:
        print('Committing final offsets and leaving group.')
        consumer.commit()
        consumer.close()
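The from_dict functions passed to the deserializers are not shown in the example. One way dict_to_application might be written is sketched here, assuming the (dict, context) signature used by the confluent-kafka AvroDeserializer; the Application fields are placeholders, not the real schema.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Application:
    """Hypothetical key type; the fields are placeholders."""
    name: str
    version: Optional[str] = None


def dict_to_application(data, ctx):
    """Build an Application from the dict produced by Avro decoding.

    Returns None for a missing key so that messages without a key do
    not raise. ctx is the serialization context and is unused here.
    """
    if data is None:
        return None
    return Application(name=data['name'], version=data.get('version'))
```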
SSL Configuration
The client configuration requires a correct SSL configuration in order to communicate securely with brokers, the discovery API, and the schema registry.
Each application (as defined in self-service) requires an application certificate (ssl.certificate.location) and corresponding private key (ssl.key.location).
The application certificate must match the one uploaded in self-service for that application.
The file with root certificates needs to be created properly: the brokers might be using a root certificate authority different from the authority that signed the certificates for the discovery API and schema registry.
The base64-encoded unencrypted versions of these certificates can be pasted into one file (ssl.ca.location).
This file would then look like the following example:
-----BEGIN CERTIFICATE-----
MIIQdaGDAksKadksSDKNsdka5sjy8elAMsm3d .....
... more base64-encoded content here ...
..... LKlmsf02mz2EWYnds=
-----END CERTIFICATE-----
-----BEGIN CERTIFICATE-----
MIIPmsd92nNWlasHWdwMOe92nwoa2QNinnNaZ .....
... more base64-encoded content here ...
..... ldFAf02SArubBW7wVFW2i1=
-----END CERTIFICATE-----
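Pasting the certificates together by hand is error prone, so the concatenation can also be scripted. The following is a small sketch using only the standard library; the function name build_ca_bundle and the file names in the usage comment are hypothetical.

```python
from pathlib import Path

BEGIN = '-----BEGIN CERTIFICATE-----'
END = '-----END CERTIFICATE-----'


def build_ca_bundle(cert_paths, bundle_path):
    """Concatenate PEM certificate files into one bundle file.

    Each input file must contain unencrypted PEM certificate text.
    Returns the number of input files written to the bundle.
    """
    blocks = []
    for path in cert_paths:
        text = Path(path).read_text(encoding='ascii').strip()
        if not (text.startswith(BEGIN) and text.endswith(END)):
            raise ValueError(f'{path} does not look like a PEM certificate')
        blocks.append(text)
    # One certificate block after another, separated by newlines
    Path(bundle_path).write_text('\n'.join(blocks) + '\n', encoding='ascii')
    return len(blocks)


# Example with hypothetical file names:
# build_ca_bundle(['broker-root.pem', 'discovery-root.pem'], 'root_cert.pem')
```

The resulting file path is what ssl.ca.location would point to.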
Examples
Simple use cases using the client code can be found in the Axual Python Client Examples.
Known Limitations
- We have encountered issues when using root_cas consisting of more than one intermediate certificate. The issue originates with the underlying SSL C library implementation and results in the following exception when authenticating:
[SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: path length constraint exceeded (_ssl.c:1123)
- Transactions are not supported yet. An issue has been created, and support will be added in a future release.
- Supplying PEM certificates as strings is not supported. An issue has been created, and support will be added in a future release.
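Since the configuration only accepts file locations, one possible workaround for PEM content held in memory (for example, read from a secret store) is to write it to a temporary file and pass that path. This is a sketch of that idea, not a feature of the client; the helper name pem_string_to_file is hypothetical.

```python
import os
import tempfile


def pem_string_to_file(pem_text, suffix='.pem'):
    """Write PEM text to a new temporary file and return its path.

    mkstemp creates the file readable and writable only by the current
    user, which matters for private keys. The caller is responsible
    for deleting the file once the client has been closed.
    """
    fd, path = tempfile.mkstemp(suffix=suffix)
    with os.fdopen(fd, 'w', encoding='ascii') as handle:
        handle.write(pem_text)
    return path


# Assumed usage with a PEM string named cert_pem:
# conf['ssl.certificate.location'] = pem_string_to_file(cert_pem)
```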
Contributing
Axual is interested in building the community; we would welcome any thoughts or patches. You can reach us here.
See contributing.
Building locally
This project uses poetry for dependency management and building. Install the tool as per the instructions on the linked page, and build the library using:
poetry build
Testing locally
Unit tests are located in the tests/unit directory. To run all of them:
python -m unittest discover -s tests/unit -v
Integration tests require standalone to be running. Running standalone locally requires docker compose. Once it is installed, start standalone and then run the integration tests:
docker compose up
python -m unittest discover -s tests/integration -v
For maintainers: building a release
The version of the library being built will be the version specified in pyproject.toml whenever you push to a branch.
When tagging and building a release, however, please be aware that the CI pipeline will ignore the version in pyproject.toml and build a release based on what is specified in the tag; for example, tagging 1.0.0-alpha4 will produce axual_client_python-1.0.0a4-py3-none-any.whl.
This has two consequences:
- You have to follow the normal semver rules when choosing a tag
- After releasing, it falls on the developer to manually update the version in pyproject.toml in preparation for the next version.
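The tag-to-filename mapping in the example above (1.0.0-alpha4 becoming 1.0.0a4) matches PEP 440 version normalization. The sketch below reproduces that mapping for simple release and pre-release tags so a tag can be checked before pushing. It is an approximation written for illustration and is not the CI pipeline's actual logic; the function names are hypothetical.

```python
import re

# PEP 440 spellings of pre-release labels and their normalized form
_PRE_LABELS = {
    'alpha': 'a', 'a': 'a',
    'beta': 'b', 'b': 'b',
    'rc': 'rc', 'c': 'rc', 'pre': 'rc', 'preview': 'rc',
}
_TAG_RE = re.compile(
    r'(\d+(?:\.\d+)*)'                                      # release, e.g. 1.0.0
    r'(?:[-_.]?(alpha|beta|preview|pre|rc|a|b|c)[-_.]?(\d*))?'  # optional pre-release
)


def normalize_tag(tag):
    """Return the normalized version for a simple release tag."""
    match = _TAG_RE.fullmatch(tag.strip().lower())
    if match is None:
        raise ValueError(f'unsupported tag format: {tag!r}')
    release, label, number = match.groups()
    if label is None:
        return release
    # A missing pre-release number is treated as 0
    return f'{release}{_PRE_LABELS[label]}{number or "0"}'


def wheel_name(tag):
    """Return the wheel file name expected for a tag."""
    return f'axual_client_python-{normalize_tag(tag)}-py3-none-any.whl'
```

Tags with other PEP 440 features (post or dev releases, epochs, local versions) are rejected by this sketch rather than guessed at.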
License
Axual Python Client is licensed under the Apache License, Version 2.0.