
Asyncio Python Client for Google Cloud Pub/Sub


Installation

$ pip install --upgrade gcloud-aio-pubsub

Usage

This Pub/Sub implementation is based on google-cloud-pubsub >= 0.29.4

Currently we have only implemented an asyncio version of SubscriberClient, as the subscription pattern does not work with asyncio by default. The official Google publisher returns a future which is mostly usable as-is. We’ve not yet seen a need to build a non-asyncio threadsafe version of the library, since the upstream Google libraries have this well-handled.

Here’s the rough usage pattern for subscribing:

from gcloud.aio.pubsub import SubscriberClient
from google.cloud.pubsub_v1.subscriber.message import Message

client = SubscriberClient()
# create subscription if it doesn't already exist
client.create_subscription('subscription_name', 'topic_name')

async def message_callback(message: Message) -> None:
    try:
        # just an example: process the message however you need to here...
        result = handle(message)
        await upload_result(result)
    except Exception:
        message.nack()
    else:
        message.ack()

# subscribe to the subscription, receiving a Future that acts as a keepalive
keep_alive = client.subscribe('subscription_name', message_callback)

# have the client run forever, pulling messages from this subscription,
# passing them to the specified callback function, and wrapping it in an
# asyncio task.
client.run_forever(keep_alive)

Configuration

Our create_subscription method is a thin wrapper and thus supports all keyword configuration arguments from the official pubsub client, which you can find in the official Google documentation.

When subscribing to a subscription you can optionally pass in a FlowControl and/or Scheduler instance.

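# FlowControl is the configuration tuple from the upstream google-cloud-pubsub package
from google.cloud.pubsub_v1.types import FlowControl

example_flow_control = FlowControl(
    max_messages=1,
    resume_threshold=0.8,
    max_request_batch_size=1,
    max_request_batch_latency=0.1,
    max_lease_duration=10,
)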

keep_alive = client.subscribe(
    'subscription_name',
    message_callback,
    flow_control=example_flow_control
)

Understanding how modifying FlowControl affects how your pubsub runtime will operate can be confusing so here’s a handy dandy guide!

Welcome to @TheKevJames’s guide to configuring Google Pubsub Subscription policies! Settle in, grab a drink, and stay a while.

The Subscriber is controlled by a FlowControl configuration tuple, defined in the upstream google-cloud-pubsub library. That configuration object, f, gets used by the Subscriber in the following ways:

Max Concurrency

The subscriber is allowed to lease new tasks whenever its currently leased tasks x satisfy:

(
    (len(x) < f.resume_threshold * f.max_messages)
    and (sum(x.bytes) < f.resume_threshold * f.max_bytes)
)
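As a minimal sketch, the condition above can be written as a runnable predicate. The Flow tuple and the can_lease helper are hypothetical stand-ins for illustration only, not part of either library, and each leased task is represented just by its size in bytes.

```python
from typing import List, NamedTuple


class Flow(NamedTuple):
    # Hypothetical stand-in holding only the fields this check reads.
    max_messages: int
    max_bytes: int
    resume_threshold: float


def can_lease(leased_sizes: List[int], f: Flow) -> bool:
    """Return True when both the task count and byte total are under threshold."""
    under_count = len(leased_sizes) < f.resume_threshold * f.max_messages
    under_bytes = sum(leased_sizes) < f.resume_threshold * f.max_bytes
    return under_count and under_bytes


f = Flow(max_messages=100, max_bytes=1_000_000, resume_threshold=0.8)
print(can_lease([1538] * 79, f))  # 79 tasks: below both thresholds
print(can_lease([1538] * 80, f))  # 80 tasks: count threshold reached
```

Both halves of the check must pass, so whichever of the two limits is reached first pauses leasing.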

In practice, this means we should set these values with the following restrictions:

  • the maximum number of concurrently leased tasks at peak is: (f.max_messages * f.resume_threshold) + f.max_request_batch_size

  • the maximum memory usage of our leased tasks at peak is: (f.max_bytes * f.resume_threshold) + (f.max_request_batch_size * bytes_per_task)

  • these values constrain each other, ie. we are limited by whichever of the two is reached first, depending on how max_tasks * bytes_per_task compares to max_memory
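The restrictions above can be sketched as a small calculator. Everything here is illustrative: the Flow tuple is a hypothetical stand-in carrying only the relevant fields, and bytes_per_task is an assumed average message size that you would need to measure for your own workload.

```python
from typing import NamedTuple


class Flow(NamedTuple):
    # Hypothetical stand-in with only the fields used below.
    max_messages: int
    max_bytes: int
    resume_threshold: float
    max_request_batch_size: int


def peak_tasks(f: Flow) -> float:
    """Peak leased task count: the resume threshold plus one more full batch."""
    return f.max_messages * f.resume_threshold + f.max_request_batch_size


def peak_bytes(f: Flow, bytes_per_task: int) -> float:
    """Peak leased memory: the byte threshold plus one more full batch."""
    return (f.max_bytes * f.resume_threshold
            + f.max_request_batch_size * bytes_per_task)


def effective_peak_tasks(f: Flow, bytes_per_task: int) -> float:
    """The tighter of the count limit and the byte limit, expressed in tasks."""
    return min(peak_tasks(f), peak_bytes(f, bytes_per_task) / bytes_per_task)


f = Flow(max_messages=100, max_bytes=100_000,
         resume_threshold=0.5, max_request_batch_size=10)
print(peak_tasks(f), peak_bytes(f, 2000), effective_peak_tasks(f, 2000))
```

With these example numbers the byte limit is the binding one, which is the situation the last bullet describes.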

Aside: it seems like OCNs on Pubsub are ~1538 bytes each

Leasing Requests

When leasing new tasks, the Subscriber uses the following algorithm:

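import queue
import time

def lease_more_tasks(q, f):
    # q: the queue.Queue of incoming tasks; f: the FlowControl config
    start = time.monotonic()
    yield q.get(block=True)  # always yields at least one task

    for _ in range(f.max_request_batch_size - 1):
        # wait only for whatever remains of the batch latency budget
        remaining = f.max_request_batch_latency - (time.monotonic() - start)
        if remaining <= 0:
            break
        try:
            yield q.get(block=True, timeout=remaining)
        except queue.Empty:
            break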

In practice, this means we should set f.max_request_batch_size given the above concurrency concerns and set f.max_request_batch_latency given whatever latency ratio we are willing to accept.

The expected best-case time for Queue.get() off a full queue is no worse than 0.3ms. This Queue should be filling up as fast as grpc can make requests to Google Pubsub, which should be Fast Enough(tm) to keep it filled, given those requests are batched.

Therefore, we can expect:

  • avg_lease_latency: ~= f.max_request_batch_size * 0.0003

  • worst_case_latency: ~= f.max_request_batch_latency

Note that leasing occurs based on f.resume_threshold, so some of this latency is concurrent with task execution.
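To put rough numbers on these estimates, here is a minimal sketch. The estimate_lease_latency helper is hypothetical, the 0.0003 second default is simply the best-case Queue.get() figure quoted above, and capping the average at the worst case is an assumption based on the batch loop stopping once the latency budget is spent.

```python
from typing import Tuple


def estimate_lease_latency(
    max_request_batch_size: int,
    max_request_batch_latency: float,
    queue_get_seconds: float = 0.0003,  # assumed per-item Queue.get() cost
) -> Tuple[float, float]:
    """Return (average, worst-case) latency in seconds for leasing one batch."""
    worst = max_request_batch_latency
    # The batch loop gives up after the latency budget, so cap the average.
    avg = min(max_request_batch_size * queue_get_seconds, worst)
    return avg, worst


avg, worst = estimate_lease_latency(
    max_request_batch_size=100, max_request_batch_latency=0.1)
print(f"avg ~ {avg:.4f}s, worst ~ {worst:.4f}s")
```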

Task Expiry

Any task which has not been acked or nacked counts against the current leased task count. Our worker thread should ensure all tasks are acked or nacked, but the FlowControl config allows us to handle any other cases. Note that leasing works as follows:

  • When a subscriber leases a task, Google Pubsub will not re-lease that task until subscription.ack_deadline_seconds seconds have passed (10 by default, configurable per-subscription).

  • If a client calls ack() on a task, it is immediately removed from Google Pubsub.

  • If a client calls nack() on a task, it immediately allows Google Pubsub to re-lease that task to a new client. The client drops the task from its memory.

  • If f.max_lease_duration passes between a message being leased and acked, the client will send a nack (see above workflow). It will NOT drop the task from its memory, ie. the worker(task) process may still be run.

Notes:

  • all steps are best-effort, eg. read “a task will be deleted” as “a task will probably get deleted, if the distributed-system luck is with you”

  • in the above workflow “Google Pubsub” refers to the server-side system, ie. the one managed by Google where the tasks are actually stored.

In practice, we should thus set f.max_lease_duration to no lower than our 95th percentile task latency at high load. The lower this value is, the better our throughput will be in extreme cases.
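One way to derive that floor from measurements is sketched below. The lease_duration_floor helper and the sample data are hypothetical; the sketch assumes you have recorded per-task latencies in seconds under high load, and it needs at least two samples. It uses statistics.quantiles, which requires Python 3.8 or newer.

```python
import statistics
from typing import Sequence


def lease_duration_floor(latencies: Sequence[float]) -> float:
    """Return the 95th percentile of observed task latencies, in seconds."""
    # quantiles(n=100) returns 99 cut points; index 94 is the 95th percentile.
    return statistics.quantiles(latencies, n=100)[94]


# Hypothetical sample: 100 task latencies of 1s, 2s, ..., 100s.
samples = [float(s) for s in range(1, 101)]
floor = lease_duration_floor(samples)
print(f"set max_lease_duration to at least {floor:.2f}s")
```

Any value at or above this floor satisfies the guideline; staying close to it keeps stuck tasks from holding lease slots for long.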

Confusion

f.max_requests is defined, but seems to be unused.

Contributing

Please see our contributing guide.
