Kafka client for NASA's General Coordinates Network (GCN)

GCN Kafka Client for Python

This is the official Python client for the General Coordinates Network (GCN). It is a very lightweight wrapper around confluent-kafka-python.

To Install

Run this command to install with pip:

pip install gcn-kafka

or this command to install with conda:

conda install -c conda-forge gcn-kafka

To Use

Create a consumer.

from gcn_kafka import Consumer
consumer = Consumer(client_id='fill me in',
                    client_secret='fill me in')

List all topics:

print(consumer.list_topics().topics)
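
The topic listing can be long, so it may help to narrow it down by name. The following is a minimal sketch, assuming that the object returned in .topics can be iterated to yield topic names as strings. The helper name matching_topics is hypothetical and is not part of gcn-kafka.

```python
from fnmatch import fnmatchcase


def matching_topics(topics, pattern):
    """Return the topic names that match a shell-style pattern, sorted."""
    # fnmatchcase compares case-sensitively on every platform.
    return sorted(name for name in topics if fnmatchcase(name, pattern))
```

For example, matching_topics(consumer.list_topics().topics, 'gcn.classic.text.*') would keep only the classic text topics.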

Subscribe to topics and receive alerts:

consumer.subscribe(['gcn.classic.text.FERMI_GBM_FIN_POS',
                    'gcn.classic.text.LVC_INITIAL'])
while True:
    for message in consumer.consume(timeout=1):
        if message.error():
            print(message.error())
            continue
        print(message.value())

The timeout argument to consume(), given in seconds, makes each call return once that interval has elapsed even if no message has arrived. This allows the program to exit quickly once it has reached the end of the existing message buffer, which is useful for users who just want to recover an older message from the stream. The timeout also makes the while True infinite loop interruptible via the standard Ctrl-C key sequence, which a blocking consume() call ignores.
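
For a program that should stop on its own once the buffer is exhausted, one possible approach is to count consecutive empty results from consume(). This is a sketch rather than part of gcn-kafka: the helper name drain and the max_idle_polls parameter are hypothetical, and it assumes only that consume() returns an empty list when the timeout expires.

```python
def drain(consumer, timeout=1, max_idle_polls=3):
    """Collect message payloads until the consumer stays idle.

    Stops after `max_idle_polls` consecutive calls to consume()
    that return no messages.
    """
    payloads = []
    idle = 0
    while idle < max_idle_polls:
        batch = consumer.consume(timeout=timeout)
        if not batch:
            # Nothing arrived within the timeout; count it and retry.
            idle += 1
            continue
        idle = 0
        for message in batch:
            if message.error():
                print(message.error())
                continue
            payloads.append(message.value())
    return payloads
```

Calling drain(consumer) after consumer.subscribe(...) would return the payloads that were read before the stream went quiet.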

Testing and Development Kafka Clusters

GCN has three Kafka clusters: production, testing, and an internal development deployment. Use the optional domain keyword argument to select which broker to connect to.

# Production (default)
consumer = Consumer(client_id='fill me in',
                    client_secret='fill me in',
                    domain='gcn.nasa.gov')

# Testing
consumer = Consumer(client_id='fill me in',
                    client_secret='fill me in',
                    domain='test.gcn.nasa.gov')

# Development (internal)
consumer = Consumer(client_id='fill me in',
                    client_secret='fill me in',
                    domain='dev.gcn.nasa.gov')
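
If the same script needs to run against different clusters, the domain can be looked up from a short name instead of being hard-coded. This is a minimal sketch: the DOMAINS mapping and the domain_for helper are hypothetical conveniences, and only the three domain strings come from the examples above.

```python
# Short names mapped to the broker domains listed above.
DOMAINS = {
    'production': 'gcn.nasa.gov',
    'testing': 'test.gcn.nasa.gov',
    'development': 'dev.gcn.nasa.gov',
}


def domain_for(cluster='production'):
    """Return the domain string for a cluster name."""
    try:
        return DOMAINS[cluster]
    except KeyError:
        known = ', '.join(sorted(DOMAINS))
        raise ValueError(
            f'unknown cluster {cluster!r}; expected one of: {known}'
        ) from None
```

The result can then be passed as the domain keyword argument, for example domain=domain_for('testing').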

FAQ

How can I keep track of the last read message when restarting a client?

A key feature of Kafka consumer clients is the ability to persistently track which messages have been read. This allows a client to recover missed messages after a restart by beginning at the earliest unread message rather than the next available message from the stream. To enable this feature, set a client Group ID using the configuration dictionary argument for the Consumer class, and change the auto offset reset option to the ‘earliest’ setting. Once this is done, every new client with the given Group ID will begin reading the specified topic at the earliest unread message. When doing this, it is recommended to turn off the auto commit feature, because it can lose track of the last read message if the client crashes before the next auto commit takes place (the auto commit interval is 5 seconds by default). Manually committing messages (i.e. storing the state of the last read message) once they are read is the most robust method for tracking the last read message.

Example code:

from gcn_kafka import Consumer

config = {'group.id': 'my group name',
          'auto.offset.reset': 'earliest',
          'enable.auto.commit': False}

consumer = Consumer(config=config,
                    client_id='fill me in',
                    client_secret='fill me in',
                    domain='gcn.nasa.gov')

topics = ['gcn.classic.voevent.FERMI_GBM_SUBTHRESH']
consumer.subscribe(topics)

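while True:
    for message in consumer.consume(timeout=1):
        if message.error():
            print(message.error())
            continue
        print(message.value())
        # Store the position only after the message has been handled
        consumer.commit(message)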
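
When handling a message involves more than printing it, the commit can be deferred until that handling has succeeded, so that a failure leaves the message uncommitted and it is read again after a restart. The sketch below illustrates this under stated assumptions: handle_then_commit and handler are hypothetical names, and the commit uses the message and asynchronous keyword arguments of the underlying confluent-kafka-python Consumer.commit() to wait for the commit to finish.

```python
def handle_then_commit(consumer, handler, timeout=1):
    """Process one batch of messages, committing each after it is handled.

    Returns the number of messages handled. If `handler` raises, the
    exception propagates and the failing message is not committed.
    """
    handled = 0
    for message in consumer.consume(timeout=timeout):
        if message.error():
            print(message.error())
            continue
        handler(message.value())
        # Synchronous commit: returns once the offset has been stored.
        consumer.commit(message=message, asynchronous=False)
        handled += 1
    return handled
```

This would typically be called inside the while True loop in place of the inner for loop.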

How can I read messages beginning at the earliest available messages for a given stream?

You can begin reading a given topic stream from the earliest message that is present in the stream buffer by setting the Group ID to an empty string and applying the ‘earliest’ setting for the auto offset reset option in the configuration dictionary argument for the Consumer class. This feature allows the user to scan for older messages for testing purposes or to recover messages that may have been missed due to a crash or network outage. Just keep in mind that the stream buffers are finite in size. They currently hold messages from the past few days.

Example code:

from gcn_kafka import Consumer

config = {'auto.offset.reset': 'earliest'}

consumer = Consumer(config=config,
                    client_id='fill me in',
                    client_secret='fill me in',
                    domain='gcn.nasa.gov')

topics = ['gcn.classic.voevent.INTEGRAL_SPIACS']
consumer.subscribe(topics)

while True:
    for message in consumer.consume(timeout=1):
        if message.error():
            print(message.error())
            continue
        print(message.value())

How can I search for messages occurring within a given date range?

To search for messages in a given date range, you can use the offsets_for_times() method of the Consumer class to get the message offsets that correspond to the start and end of the range. You can then assign the starting offset to the Consumer and read the desired number of messages. When doing so, keep in mind that the stream buffers are finite in size, so it is not possible to recover messages from before the start of the stream buffer. The GCN stream buffers are currently set to hold messages from the past few days.

Example code:

import datetime
from gcn_kafka import Consumer
from confluent_kafka import TopicPartition

consumer = Consumer(client_id='fill me in',
                    client_secret='fill me in',
                    domain='gcn.nasa.gov')

# get messages occurring 3 days ago
timestamp1 = int((datetime.datetime.now() - datetime.timedelta(days=3)).timestamp() * 1000)
timestamp2 = timestamp1 + 86400000 # +1 day

topic = 'gcn.classic.voevent.INTEGRAL_SPIACS'
start = consumer.offsets_for_times(
    [TopicPartition(topic, 0, timestamp1)])
end = consumer.offsets_for_times(
    [TopicPartition(topic, 0, timestamp2)])

consumer.assign(start)
for message in consumer.consume(end[0].offset - start[0].offset, timeout=1):
    print(message.value())
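
The timestamp and offset arithmetic in this example can be factored into small helpers, which also makes the edge cases explicit. This is a sketch under stated assumptions: the function names are hypothetical, timestamps are milliseconds since the Unix epoch as in the example above, and a negative offset is taken to mean that offsets_for_times() found no message at or after the requested time (check the confluent-kafka-python documentation for the exact behaviour). The high_watermark value would have to come from the consumer, for instance via get_watermark_offsets().

```python
import datetime


def to_millis(moment):
    """Convert a timezone-aware datetime to milliseconds since the epoch."""
    if moment.tzinfo is None:
        raise ValueError('expected a timezone-aware datetime')
    return int(moment.timestamp() * 1000)


def window_millis(start, duration):
    """Return (begin, end) millisecond timestamps for a time window."""
    begin = to_millis(start)
    return begin, begin + int(duration.total_seconds() * 1000)


def messages_in_range(start_offset, end_offset, high_watermark):
    """Number of messages to read between two looked-up offsets."""
    if start_offset < 0:
        # No message at or after the start time.
        return 0
    if end_offset < 0:
        # The window extends past the newest message.
        end_offset = high_watermark
    return max(end_offset - start_offset, 0)
```

Using timezone-aware datetimes avoids any ambiguity about whether the requested range is in local time or UTC.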
