Python Client for Google Cloud Pub/Sub

This is a shared codebase for gcloud-aio-pubsub and gcloud-rest-pubsub

Installation

$ pip install --upgrade gcloud-{aio,rest}-pubsub

Usage

Subscriber

gcloud-{aio,rest}-pubsub provides SubscriberClient as an interface to call pubsub’s HTTP API:

from typing import List

from gcloud.aio.pubsub import SubscriberClient
from gcloud.aio.pubsub import SubscriberMessage

client = SubscriberClient()
# create subscription
await client.create_subscription(
    'projects/<project_name>/subscriptions/<subscription_name>',
    'projects/<project_name>/topics/<topic_name>')

# pull messages
messages: List[SubscriberMessage] = await client.pull(
    'projects/<project_name>/subscriptions/<subscription_name>',
    max_messages=10)
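
The subscription and topic arguments above are fully qualified resource names. A minimal sketch of building them from their parts follows; subscription_path and topic_path here are hypothetical plain functions written for illustration, not calls into the library:

```python
def topic_path(project: str, topic: str) -> str:
    """Build a fully qualified topic name."""
    return f'projects/{project}/topics/{topic}'


def subscription_path(project: str, subscription: str) -> str:
    """Build a fully qualified subscription name."""
    return f'projects/{project}/subscriptions/{subscription}'


# Hypothetical project and resource names, for illustration only.
print(subscription_path('my-gcp-project', 'my-subscription'))
print(topic_path('my-gcp-project', 'my-topic-name'))
```

The resulting strings can be passed wherever the examples use 'projects/<project_name>/...' placeholders.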

There’s also a gcloud.aio.pubsub.subscribe helper function you can use to set up a pubsub processing pipeline. It is built with asyncio and is thus only available in the gcloud-aio-pubsub package. The usage is fairly simple:

from gcloud.aio.pubsub import SubscriberClient
from gcloud.aio.pubsub import subscribe

subscriber_client = SubscriberClient()

async def handler(message):
    return

await subscribe(
    'projects/<my_project>/subscriptions/<my_subscription>',
    handler,
    subscriber_client,
    num_producers=1,
    max_messages_per_producer=100,
    ack_window=0.3,
    num_tasks_per_consumer=1,
    enable_nack=True,
    nack_window=0.3,
)

While the defaults are somewhat sensible, it is highly recommended to performance test your application and tweak the function parameters to your specific needs. Here are a few hints:

handler:

an async function that will be called for each message. It should accept an instance of SubscriberMessage as its only argument and return None if the message should be acked. An exception raised within the handler will result in the message being left to expire, and thus it will be redelivered according to your subscription’s ack deadline.

num_producers:

number of workers that will be making pull requests to pubsub. Please note that a worker will only fetch a new batch once the handler has been called for each message from the previous batch. This means that running only a single worker will most likely make your application IO bound. If you notice this being an issue, don’t hesitate to bump this parameter.

max_messages_per_producer:

number of pubsub messages a worker will try to fetch in a single batch. This value is passed to the pull endpoint as the maxMessages parameter. A rule of thumb here is that the faster your handler is, the bigger this value should be.

ack_window:

ack requests are handled separately and are done in batches. This parameter specifies how often, in seconds, ack requests will be made. Setting it to 0.0 will effectively disable batching.

num_tasks_per_consumer:

how many handler calls a worker can make until it blocks to wait for them to return. If you process messages independently from each other, you should be good with the default value of 1. If you do something fancy (e.g. aggregate messages before processing them), you’ll want a higher value here. You can think of num_producers * num_tasks_per_consumer as an upper limit of how many messages can possibly be within your application state at any given moment.

enable_nack:

if enabled, messages for which the handler raised an exception will be explicitly nacked using the modifyAckDeadline endpoint so they can be retried immediately.

nack_window:

same as ack_window, but for nack requests.
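
To make the handler contract concrete, here is a minimal sketch using only the standard library. FakeMessage is a hypothetical stand-in for SubscriberMessage (the data and ack_id fields are assumptions), and dispatch is a simplified model of the ack/nack decision with enable_nack=True, not the library's actual implementation:

```python
import asyncio
import json
from dataclasses import dataclass
from typing import List, Tuple


@dataclass
class FakeMessage:
    """Hypothetical stand-in for SubscriberMessage (only the fields used here)."""
    ack_id: str
    data: bytes


async def handler(message: FakeMessage) -> None:
    # Returning None signals success; json.loads raises on a malformed payload.
    payload = json.loads(message.data)
    if 'order_id' not in payload:
        raise ValueError('missing order_id')


async def dispatch(messages: List[FakeMessage]) -> Tuple[List[str], List[str]]:
    """Simplified model of the ack/nack decision; not the library's code."""
    acks: List[str] = []
    nacks: List[str] = []
    for message in messages:
        try:
            await handler(message)
        except Exception:
            # With enable_nack=True a failed message is nacked for quick retry.
            nacks.append(message.ack_id)
        else:
            acks.append(message.ack_id)
    return acks, nacks


batch = [
    FakeMessage('a1', b'{"order_id": 7}'),
    FakeMessage('a2', b'not json'),
    FakeMessage('a3', b'{}'),
]
acks, nacks = asyncio.run(dispatch(batch))
print(acks, nacks)
```

Only the first message is acked here; the other two raise inside the handler and end up in the nack list.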

Prometheus Metrics

If you like pull-based metrics systems such as Prometheus, you will be pleased to know that the subscriber records Prometheus metrics named gcloud_aio_pubsub_<metric>. Recording them has no effect if you don’t use Prometheus to scrape app metrics. The available metrics are:

  • subscriber_batch_size - [histogram] how many messages were pulled from the subscription in a single batch

  • subscriber_consume (labels: outcome = {'succeeded', 'cancelled', 'failed', 'failfast'}) - [counter] a consume operation has completed with a given outcome

  • subscriber_consume_latency_seconds (labels: phase = {'receive', 'queueing', 'runtime'}) - [histogram] how many seconds it took to receive a message, how long the message waited for processing, or how long the callback took to complete

  • subscriber_batch_status (labels: component = {'acker', 'nacker'}, outcome = {'succeeded', 'failed'}) - [counter] a batch has succeeded or failed to be acked or nacked

  • subscriber_messages_processed (labels: component = {'acker', 'nacker'}) - [counter] the number of messages that were processed, either by being acked or nacked

  • subscriber_messages_received - [counter] the number of messages pulled from pubsub

Metrics Agent (Deprecated)

subscribe also has an optional metrics_client argument, which will be removed in a future release. You can provide any metrics agent that implements the same interface as MetricsAgent (Datadog client will do ;) ) and get the following metrics:

  • pubsub.producer.batch - [histogram] actual size of a batch retrieved from pubsub.

  • pubsub.consumer.failfast - [increment] a message was dropped due to its lease being expired.

  • pubsub.consumer.latency.receive - [histogram] how many seconds it took for a message to reach handler after it was published.

  • pubsub.consumer.succeeded - [increment] handler call was successful.

  • pubsub.consumer.failed - [increment] handler call raised an exception.

  • pubsub.consumer.latency.runtime - [histogram] handler execution time in seconds.

  • pubsub.acker.batch.failed - [increment] ack request failed.

  • pubsub.acker.batch - [histogram] actual number of messages that were acked in a single request.

Publisher

The PublisherClient is a dead-simple alternative to the official Google Cloud Pub/Sub publisher client. The main design goal was to eliminate all the additional gRPC overhead implemented by the upstream client.

If migrating between this library and the official one, the main difference is this: the gcloud-{aio,rest}-pubsub publisher’s .publish() method immediately publishes the messages you’ve provided, rather than maintaining our own publishing queue, implementing batching and flow control, etc. If you’re looking for a full-featured publishing library with all the bells and whistles built in, you may be interested in the upstream provider. If you’re looking to manage your own batching / timeouts / retry / threads / etc, this library should be a bit easier to work with.

Sample usage:

import aiohttp

from gcloud.aio.pubsub import PubsubMessage
from gcloud.aio.pubsub import PublisherClient

async with aiohttp.ClientSession() as session:
    client = PublisherClient(session=session)

    topic = client.topic_path('my-gcp-project', 'my-topic-name')

    messages = [
        PubsubMessage(b'payload', attribute='value'),
        PubsubMessage(b'other payload', other_attribute='whatever',
                      more_attributes='something else'),
    ]
    response = await client.publish(topic, messages)
    # response == {'messageIds': ['1', '2']}
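
Since .publish() sends whatever it is given immediately, any batching is up to the caller. A minimal sketch of splitting a large list into publish-sized chunks follows; chunked and MAX_BATCH_SIZE are hypothetical names, and the cap of 1000 messages per request is an assumption you should check against the current Pub/Sub quotas:

```python
from typing import Iterator, List, Sequence, TypeVar

T = TypeVar('T')

# Assumed cap on messages per publish request; verify against Pub/Sub quotas.
MAX_BATCH_SIZE = 1000


def chunked(items: Sequence[T], size: int = MAX_BATCH_SIZE) -> Iterator[List[T]]:
    """Yield consecutive slices of at most `size` items."""
    if size < 1:
        raise ValueError('size must be positive')
    for start in range(0, len(items), size):
        yield list(items[start:start + size])


payloads = [f'event-{i}'.encode() for i in range(2500)]
batches = list(chunked(payloads))
print([len(batch) for batch in batches])  # [1000, 1000, 500]
```

Each chunk could then be wrapped in PubsubMessage objects and passed to one client.publish(topic, ...) call.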

Emulators

For testing purposes, you may want to use gcloud-aio-pubsub along with a local Pub/Sub emulator. Setting the $PUBSUB_EMULATOR_HOST environment variable to the local address of your emulator should be enough to do the trick.

For example, using the official Google Pubsub emulator:

gcloud beta emulators pubsub start --host-port=0.0.0.0:8681
export PUBSUB_EMULATOR_HOST='0.0.0.0:8681'

Any gcloud-aio-pubsub requests made with that environment variable set will query the emulator instead of the official Pub/Sub APIs.

For easier ergonomics, you may be interested in messagebird/gcloud-pubsub-emulator.
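
In a test suite it can be handy to set the variable from Python rather than from the shell. The sketch below assumes the emulator listens on localhost:8681 (EMULATOR_ADDRESS is a hypothetical name). Exactly when the library reads the variable is not stated here, so the safe assumption is to set it before importing or constructing any client:

```python
import os

# Hypothetical local address; match whatever --host-port the emulator uses.
EMULATOR_ADDRESS = 'localhost:8681'

# setdefault keeps any value that was already exported in the shell.
os.environ.setdefault('PUBSUB_EMULATOR_HOST', EMULATOR_ADDRESS)

print(os.environ['PUBSUB_EMULATOR_HOST'])
```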

Customization

This library mostly tries to stay agnostic of potential use-cases; as such, we do not implement any sort of retrying or other policies under the assumption that we wouldn’t get things right for every user’s situation.

Instead, we recommend configuring your own policies on an as-needed basis. The backoff library can make this quite straightforward! For example, you may find it useful to configure something like:

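from typing import Any

import aiohttp
import backoff
from gcloud.aio.pubsub import SubscriberClient


class SubscriberClientWithBackoff(SubscriberClient):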
    @backoff.on_exception(backoff.expo, aiohttp.ClientResponseError,
                          max_tries=5, jitter=backoff.full_jitter)
    async def pull(self, *args: Any, **kwargs: Any):
        return await super().pull(*args, **kwargs)
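
If you would rather not add a dependency, the same idea can be hand-rolled. The following is a minimal standard-library sketch of exponential backoff with full jitter, not a replacement for the backoff library; retry_with_backoff and flaky_pull are hypothetical names, with flaky_pull standing in for a pull call that fails transiently:

```python
import asyncio
import random
from typing import Awaitable, Callable, List, Tuple, Type, TypeVar

T = TypeVar('T')


async def retry_with_backoff(
    func: Callable[[], Awaitable[T]],
    exceptions: Tuple[Type[Exception], ...],
    max_tries: int = 5,
    base_delay: float = 1.0,
) -> T:
    """Retry `func` with exponential backoff and full jitter."""
    for attempt in range(max_tries):
        try:
            return await func()
        except exceptions:
            if attempt == max_tries - 1:
                raise  # out of attempts, surface the last error
            # Full jitter: sleep a random time in [0, base_delay * 2**attempt].
            await asyncio.sleep(random.uniform(0, base_delay * 2 ** attempt))
    raise RuntimeError('max_tries must be at least 1')


attempts: List[int] = []


async def flaky_pull() -> str:
    """Hypothetical stand-in for a pull that fails twice, then succeeds."""
    attempts.append(1)
    if len(attempts) < 3:
        raise ConnectionError('transient failure')
    return 'ok'


result = asyncio.run(
    retry_with_backoff(flaky_pull, (ConnectionError,), base_delay=0.01))
print(result, len(attempts))
```

The small base_delay only keeps the demonstration fast; a real policy would likely start at a second or so and cap the total wait.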

Contributing

Please see our contributing guide.

Download files

Source Distribution

gcloud_aio_pubsub-5.1.1.tar.gz (16.9 kB view details)

Uploaded Source

Built Distribution

gcloud_aio_pubsub-5.1.1-py3-none-any.whl (16.0 kB view details)

Uploaded Python 3

File details

Details for the file gcloud_aio_pubsub-5.1.1.tar.gz.

File metadata

  • Download URL: gcloud_aio_pubsub-5.1.1.tar.gz
  • Upload date:
  • Size: 16.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.2.2 CPython/3.9.13 Linux/5.13.0-1023-aws

File hashes

Hashes for gcloud_aio_pubsub-5.1.1.tar.gz
Algorithm Hash digest
SHA256 767dedf6b5d1f35361b7eac7afd6170be423c0b681003f8a1e3cf3dac061ed3e
MD5 9b5990a88022c119e295f9d32ae6db73
BLAKE2b-256 7292199c9e5ed1f6e8f868b1f472bfe3b9c173286a2edf0e1c623fa50b1d6770

File details

Details for the file gcloud_aio_pubsub-5.1.1-py3-none-any.whl.

File metadata

  • Download URL: gcloud_aio_pubsub-5.1.1-py3-none-any.whl
  • Upload date:
  • Size: 16.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.2.2 CPython/3.9.13 Linux/5.13.0-1023-aws

File hashes

Hashes for gcloud_aio_pubsub-5.1.1-py3-none-any.whl
Algorithm Hash digest
SHA256 a1cb2e39dea18905153cfddd9a85f60547b689bb81a82154a4ad051a479470d5
MD5 fa560e74fb89be20d100cf8eb04441b2
BLAKE2b-256 38b229b40cbd0a088ba906625fd02cb31798dc18a4f6124f3023fe8784defae7
