
Diaspora Event SDK: Resilience-enabling services for science from HPC to edge


Installation

Recommended Installation with Kafka Client Library

If you plan to use the KafkaProducer and KafkaConsumer classes in the SDK, which extend the corresponding classes from the kafka-python library, we recommend installing the SDK with kafka-python support. This is especially convenient for the tutorial and for integrating Kafka functionality into your projects with out-of-the-box configurations.

To install Diaspora Event SDK with kafka-python, run:

pip install "diaspora-event-sdk[kafka-python]"

Installation Without Kafka Client Library

For scenarios where kafka-python is not required or if you are using other client libraries to communicate with Kafka, you can install the SDK without this dependency.

To install the SDK without Kafka support, simply run:

pip install diaspora-event-sdk

Note that this option does not install the dependencies required for the KafkaProducer and KafkaConsumer classes described below.

Use Diaspora Event SDK

Use the SDK to communicate with Kafka (kafka-python Required)

Register Topic (create topic ACLs)

Before you can create, describe, or delete a topic, the appropriate ACLs must be set in ZooKeeper. Here we use the Client class to register ACLs for the desired topic name.

from diaspora_event_sdk import Client as GlobusClient
c = GlobusClient()
topic = "topic-" + c.subject_openid[-12:]
print(c.register_topic(topic))
print(c.list_topics())

Registering a topic also creates it if it does not already exist.

Start Producer

Once the topic is created, we can publish to it. The SDK's KafkaProducer wraps the kafka-python KafkaProducer. Event publication can be either synchronous or asynchronous; the example below demonstrates the synchronous approach.

from diaspora_event_sdk import KafkaProducer
producer = KafkaProducer()
future = producer.send(
    topic, {'message': 'Synchronous message from Diaspora SDK'})
print(future.get(timeout=10))
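For asynchronous publication, kafka-python futures accept success and error callbacks instead of blocking on get. A minimal sketch, assuming the same producer and topic as above (the callback names are illustrative, not part of the SDK):

```python
# Sketch of asynchronous publication using kafka-python-style callbacks.
# Assumes `producer` and `topic` are defined as in the synchronous example.
def on_send_success(record_metadata):
    # Called with the record's metadata once the broker acknowledges it.
    print(f"delivered to {record_metadata.topic}"
          f"[{record_metadata.partition}] @ offset {record_metadata.offset}")

def on_send_error(exc):
    # Called if the send ultimately fails (e.g. after retries are exhausted).
    print(f"send failed: {exc}")

# future = producer.send(topic, {'message': 'Asynchronous message from Diaspora SDK'})
# future.add_callback(on_send_success).add_errback(on_send_error)
# producer.flush()  # block until all buffered messages are actually sent
```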

Start Consumer

A consumer can be configured to monitor the topic and act on events as they are published. The SDK's KafkaConsumer wraps the kafka-python KafkaConsumer. Here we set auto_offset_reset='earliest' to consume from the first event published to the topic; removing this field makes the consumer act only on new events.

from diaspora_event_sdk import KafkaConsumer
consumer = KafkaConsumer(topic, auto_offset_reset='earliest')
for msg in consumer:
    print(msg)
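If events were published as dicts, as in the producer example above, each consumed value typically arrives as JSON-encoded bytes. A minimal sketch of decoding and acting on one (the handle function is a hypothetical helper, not part of the SDK):

```python
import json

def handle(raw_value: bytes) -> dict:
    # Decode a JSON-encoded message value, assuming it was published
    # as a dict like in the producer example above.
    event = json.loads(raw_value.decode("utf-8"))
    print("received:", event)
    return event

# for msg in consumer:
#     handle(msg.value)
```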

Unregister Topic (remove topic ACLs)

from diaspora_event_sdk import Client as GlobusClient
c = GlobusClient()
topic = "topic-" + c.subject_openid[-12:]
print(c.unregister_topic(topic))
print(c.list_topics())

Use Your Preferred Kafka Client Library

Register and Unregister Topic

The steps are the same as above: use the register_topic, unregister_topic, and list_topics methods of the Client class.

Cluster Connection Details

Configuration       Value
-----------------   --------------------------
Bootstrap Servers   MSK_SCRAM_ENDPOINT
Security Protocol   SASL_SSL
SASL Mechanism      SCRAM-SHA-512
API Version         3.5.1
Username            (see instructions below)
Password            (see instructions below)

Execute the code snippet below to obtain your unique username and password for the Kafka cluster:

from diaspora_event_sdk import Client as GlobusClient
c = GlobusClient()
print(c.retrieve_key())
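As one hedged sketch of how these details map onto kafka-python's constructor arguments, assuming you use kafka-python as your preferred client (the endpoint and credential strings are placeholders to be filled in from the table and from retrieve_key()):

```python
# Placeholder connection settings for a plain kafka-python client.
# Substitute the real bootstrap servers and the username/password
# returned by Client.retrieve_key().
conn = {
    "bootstrap_servers": "MSK_SCRAM_ENDPOINT",   # from the table above
    "security_protocol": "SASL_SSL",
    "sasl_mechanism": "SCRAM-SHA-512",
    "api_version": (3, 5, 1),
    "sasl_plain_username": "<username from retrieve_key()>",
    "sasl_plain_password": "<password from retrieve_key()>",
}

# from kafka import KafkaProducer       # requires kafka-python
# producer = KafkaProducer(**conn)      # and network access to the cluster
```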

Advanced Usage

Password Refresh

If you need to invalidate all previously issued passwords and generate a new one, call the create_key method of the Client class:

from diaspora_event_sdk import Client as GlobusClient
c = GlobusClient()
print(c.create_key())

Subsequent calls to retrieve_key will return the new password from the cache. This cache is reset with a logout or a new create_key call.

Common Issues

ImportError: cannot import name 'KafkaProducer' from 'diaspora_event_sdk'

It seems that you ran pip install diaspora-event-sdk to install the Diaspora Event SDK without kafka-python. Run pip install kafka-python to install the necessary dependency for our KafkaProducer and KafkaConsumer classes.

kafka.errors.NoBrokersAvailable and kafka.errors.NodeNotReadyError

These messages might appear if create_key is called shortly before instantiating a Kafka client, because there is a delay before AWS Secrets Manager associates the newly generated credential with MSK. Note that create_key is called internally by kafka_client.py the first time you create one of these clients. Wait a short while (around one minute) and retry.
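One way to absorb this propagation delay is a small retry wrapper around client construction. A sketch under the assumption that a broad exception check suffices; in practice you would likely catch kafka.errors.NoBrokersAvailable and kafka.errors.NodeNotReadyError specifically:

```python
import time

def with_retry(make_client, attempts=6, delay=10.0):
    # Retry client construction while the new credential propagates to MSK.
    for i in range(attempts):
        try:
            return make_client()
        except Exception as exc:  # e.g. NoBrokersAvailable, NodeNotReadyError
            if i == attempts - 1:
                raise
            print(f"broker not ready ({exc}); retrying in {delay}s")
            time.sleep(delay)

# from diaspora_event_sdk import KafkaProducer
# producer = with_retry(lambda: KafkaProducer())
```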
