SDK of Diaspora Event Fabric: Resilience-enabling services for science from HPC to edge
- Installation
- Use Diaspora Event SDK
- Common Issues
Installation
Recommended Installation with Kafka Client Library
If you plan to use the `KafkaProducer` and `KafkaConsumer` classes in the SDK, which extend the corresponding classes from the `kafka-python` library, we recommend installing the SDK with `kafka-python` support. This is especially convenient for tutorials and for integrating Kafka functionality into your projects with out-of-the-box configurations.
To install the Diaspora Event SDK with `kafka-python`, run:
pip install "diaspora-event-sdk[kafka-python]"
Installation Without Kafka Client Library
For scenarios where `kafka-python` is not required, or if you are using another client library to communicate with Kafka, you can install the SDK without this dependency.
To install the SDK without Kafka support, simply run:
pip install diaspora-event-sdk
Note that this option does not install the dependencies required for the `KafkaProducer` and `KafkaConsumer` classes below to work.
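If you are unsure which variant you installed, one way to check is to probe for the `kafka` module (the import name of `kafka-python`) from the standard library; this is a minimal sketch, not part of the SDK:

```python
import importlib.util

# kafka-python is importable as "kafka"; if the module spec is missing,
# the optional dependency was not installed alongside the SDK.
has_kafka = importlib.util.find_spec("kafka") is not None
print("kafka-python installed:", has_kafka)
```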
Use Diaspora Event SDK
Use the SDK to communicate with Kafka (kafka-python Required)
Register Topic (create topic ACLs)
Before you can create, describe, or delete topics, the appropriate ACLs must be set in ZooKeeper. Here we use the `Client` class to register ACLs for the desired topic name.
from diaspora_event_sdk import Client as GlobusClient
c = GlobusClient()
topic = "topic-" + c.subject_openid[-12:]
print(c.register_topic(topic))
print(c.list_topics())
Registering a topic also creates it if the topic does not already exist.
Block Until Ready
`KafkaProducer` and `KafkaConsumer` internally call `create_key` if the connection credential is not found locally (e.g., when you first authenticate with Globus). Behind the scenes, the middleware service contacts AWS to start the asynchronous process of creating and associating the secret. The method below blocks until the credential is ready for use by producers and consumers. When the method finishes, it returns True, and the producer and consumer code below should work without further waiting. By default, the method retries in a loop for five minutes before giving up and returning False. Use the `max_minutes` parameter to change the maximum waiting time.
from diaspora_event_sdk import block_until_ready
assert block_until_ready()
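The behavior of `block_until_ready` can be pictured as a poll-until-deadline loop. The sketch below is our own illustration of that pattern, not the SDK's implementation; the `check` argument is a hypothetical stand-in for the SDK's internal credential probe (the real `block_until_ready` takes no such argument):

```python
import time

def block_until_ready_sketch(check, max_minutes=5, poll_seconds=5):
    """Retry `check` until it returns True or the deadline passes.

    Illustrative only: `check` stands in for the SDK's internal
    credential-readiness probe.
    """
    deadline = time.monotonic() + max_minutes * 60
    while time.monotonic() < deadline:
        if check():
            return True
        time.sleep(poll_seconds)
    return False

# Demo with a check that succeeds on the third attempt.
attempts = {"n": 0}
def fake_check():
    attempts["n"] += 1
    return attempts["n"] >= 3

print(block_until_ready_sketch(fake_check, max_minutes=1, poll_seconds=0))
# prints True after three calls to fake_check
```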
Start Producer
Once the topic is created, we can publish to it. The SDK's `KafkaProducer` wraps the `kafka-python` `KafkaProducer`. Event publication can be either synchronous or asynchronous; the example below demonstrates the synchronous approach.
from diaspora_event_sdk import KafkaProducer
producer = KafkaProducer()
future = producer.send(
    topic, {'message': 'Synchronous message from Diaspora SDK'})
print(future.get(timeout=10))
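For the asynchronous approach, instead of blocking on `future.get()`, you can attach callbacks to the future returned by `send()`; `kafka-python` futures expose `add_callback` and `add_errback` for this. The callback names below are our own, and the producer portion is shown as a comment because it requires a live broker and valid credentials:

```python
# Hedged sketch of asynchronous publication: define delivery callbacks
# and attach them to the future returned by send().
def on_success(record_metadata):
    # Called when the broker acknowledges the record.
    print("delivered to", record_metadata.topic,
          "partition", record_metadata.partition,
          "offset", record_metadata.offset)

def on_error(exc):
    # Called if delivery fails.
    print("delivery failed:", exc)

# With a live broker (requires the SDK and a ready credential):
#
#   from diaspora_event_sdk import KafkaProducer
#   producer = KafkaProducer()
#   future = producer.send(topic, {'message': 'Asynchronous message'})
#   future.add_callback(on_success)
#   future.add_errback(on_error)
#   producer.flush()  # wait for in-flight messages before exiting
```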
Start Consumer
A consumer can be configured to monitor the topic and act on events as they are published. The SDK's `KafkaConsumer` wraps the `kafka-python` `KafkaConsumer`. Here we set `auto_offset_reset='earliest'` to consume from the first event published to the topic; removing this argument makes the consumer act only on new events.
from diaspora_event_sdk import KafkaConsumer
consumer = KafkaConsumer(topic, auto_offset_reset='earliest')
for msg in consumer:
print(msg)
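Each message yielded by the consumer is a `ConsumerRecord` with fields such as `topic`, `partition`, `offset`, and `value`. Whether `value` arrives as raw bytes or an already-decoded object depends on the configured deserializer; the helper below is our own sketch, assuming producers publish JSON payloads:

```python
import json

def decode_value(raw):
    """Decode a record value: pass decoded objects through, JSON-decode bytes.

    Illustrative helper; assumes producers publish JSON payloads.
    """
    if isinstance(raw, (bytes, bytearray)):
        return json.loads(raw.decode("utf-8"))
    return raw

print(decode_value(b'{"message": "hello"}'))  # {'message': 'hello'}
print(decode_value({"message": "hello"}))     # already decoded, unchanged
```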
Unregister Topic (remove topic ACLs)
from diaspora_event_sdk import Client as GlobusClient
c = GlobusClient()
topic = "topic-" + c.subject_openid[-12:]
print(c.unregister_topic(topic))
print(c.list_topics())
Use Your Preferred Kafka Client Library
Register and Unregister Topic
The steps are the same as above, using the `register_topic`, `unregister_topic`, and `list_topics` methods of the `Client` class.
Cluster Connection Details
Configuration | Value
---|---
Bootstrap Servers | MSK_SCRAM_ENDPOINT
Security Protocol | SASL_SSL
SASL Mechanism | SCRAM-SHA-512
API Version | 3.5.1
Username | (see instructions below)
Password | (see instructions below)
Execute the code snippet below to obtain your unique username and password for the Kafka cluster:
from diaspora_event_sdk import Client as GlobusClient
c = GlobusClient()
print(c.retrieve_key())
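As one concrete mapping, the settings in the table above correspond to `kafka-python` constructor arguments roughly as sketched below. The endpoint and credentials are placeholders; substitute the real values, including the username and password returned by `retrieve_key()`:

```python
# Hedged sketch: the cluster connection settings expressed as
# kafka-python constructor arguments. All values here are placeholders.
conf = {
    "bootstrap_servers": "MSK_SCRAM_ENDPOINT",   # placeholder endpoint
    "security_protocol": "SASL_SSL",
    "sasl_mechanism": "SCRAM-SHA-512",
    "sasl_plain_username": "YOUR_USERNAME",      # from retrieve_key()
    "sasl_plain_password": "YOUR_PASSWORD",      # from retrieve_key()
    "api_version": (3, 5, 1),
}

# With kafka-python installed and real values filled in:
#   from kafka import KafkaProducer
#   producer = KafkaProducer(**conf)
print(sorted(conf))
```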
Advanced Usage
Password Refresh
If you need to invalidate all previously issued passwords and generate a new one, call the `create_key` method of the `Client` class:
from diaspora_event_sdk import Client as GlobusClient
c = GlobusClient()
print(c.create_key())
Subsequent calls to `retrieve_key` will return the new password from the cache. This cache is reset by a logout or another `create_key` call.
Common Issues
ImportError: cannot import name 'KafkaProducer' from 'diaspora_event_sdk'
It seems that you ran `pip install diaspora-event-sdk` to install the Diaspora Event SDK without `kafka-python`. Run `pip install kafka-python` to install the dependency required by our `KafkaProducer` and `KafkaConsumer` classes.
kafka.errors.NoBrokersAvailable and kafka.errors.NodeNotReadyError
These errors can occur if `create_key` is called shortly before instantiating a Kafka client, because there is a delay before AWS Secrets Manager associates the newly generated credential with MSK. Note that `create_key` is called internally by `kafka_client.py` the first time you create one of these clients. Please wait a while (around one minute) and retry.
kafka.errors.KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs.
Step 1: Verify Topic Creation and Access: Before interacting with the producer/consumer, ensure that the topic has been successfully created and that you have been granted access. Execute the following command:
from diaspora_event_sdk import Client as GlobusClient
c = GlobusClient()
# topic = <the topic you want to use>
print(c.register_topic(topic))
This should return a `status: no-op` message, indicating that the topic is already registered and accessible.
Step 2: Wait for Automatic Key Creation in KafkaProducer and KafkaConsumer
`KafkaProducer` and `KafkaConsumer` internally call `create_key` if the keys are not found locally (e.g., when you first authenticate with Globus). Behind the scenes, the middleware service contacts AWS to start the asynchronous process of creating and associating the secret. Please wait a while (around one minute) and retry.
ssl.SSLCertVerificationError
This is common on macOS systems; see this StackOverflow answer.