
Google Pub/Sub as a task manager, similar to Celery

Project description

Flask GCP Pub/Sub

Lite distributed task queue using Google Cloud Platform (GCP) Pub/Sub


🤔 What does this package do?

Like Celery, but lighter, this package lets you run operations asynchronously in your Flask project. Unlike Celery, it offers no choice of broker: it only uses GCP Pub/Sub.

Technically, this package can run without Flask. Historically, however, it was created as a quick win for migrating an existing Flask + Celery project to GCP Cloud Run using Pub/Sub.

This package aims to remove some painful tasks by:

  • Creating one dedicated topic for each function
  • Creating one dedicated reusable subscription for each function
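
To illustrate the idea of one topic and one subscription per function, here is a minimal sketch of how resource names could be derived from a task function. The helper `resource_names` and the naming scheme (function name, optional prefix, `-sub` suffix) are assumptions made for illustration, not the package's actual convention. Only the `projects/<id>/topics/<name>` and `projects/<id>/subscriptions/<name>` path formats come from Pub/Sub itself.

```python
def resource_names(project_id, func, prefix=""):
    """Return (topic_path, subscription_path) for a task function.

    Hypothetical naming scheme: the topic is named after the function,
    and the subscription reuses that name with a "-sub" suffix.
    """
    name = f"{prefix}{func.__name__}"
    topic = f"projects/{project_id}/topics/{name}"
    subscription = f"projects/{project_id}/subscriptions/{name}-sub"
    return topic, subscription


def my_task(msg1, msg2):
    return "ok"


# "my-project" and "feat-x-" are placeholder values.
topic, subscription = resource_names("my-project", my_task, prefix="feat-x-")
print(topic)
print(subscription)
```

Because the names are derived deterministically, restarting the worker would find the same subscription again, which is one way a subscription can be "reusable".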

We do not recommend this package for the following cases:

  • You need to reuse your development in a multi-cloud context
  • You have a high volume of messages to process (not tested)

This package is provided "as is", without guarantees, under the GPLv3 License.

🚀 Getting started

Prerequisites

Installation

pip install flask-gcp-pubsub

Full example

demo.py

#!/usr/bin/env python
# coding: utf-8

from flask import Flask, make_response
from flask_gcp_pubsub import PubSub

app = Flask(__name__)
pubsub = PubSub(
    app,
    project_id='<project_id>',
    gcp_credentials_file='./creds.json'
)

@pubsub.task
def my_task(msg1, msg2):
    """Awesome delayed execution"""
    print('test', msg1, msg2)
    return 'ok'

@app.route('/test')
def route_test():
    """Launch delayed execution"""
    my_task.delay('test1', 'test2')
    return make_response('ok', 200)

WARNING: do not forget to replace <project_id> with your GCP project ID (not the project number) and to download the JSON-formatted key from the GCP Console.

wsgi.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Imports
from demo import app

# Start
if __name__ == '__main__':
    app.run()

wsgi_delayed.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Imports
from demo import pubsub

# Start
if __name__ == '__main__':
    pubsub.run()

This command will launch the Flask server:

flask run --port 9090

This command will launch the asynchronous tasks manager:

python wsgi_delayed.py

You can now navigate to http://localhost:9090/test. If everything goes well, the console output of the task manager should look something like this:

Start consumers
status=received message_id=6860318059876990 function=my_task
test test1 test2
status=processed message_id=6860318059876990 function=my_task result=ok execution_time=6.818771362304688e-05
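
The flow above (a call to `.delay()` in the web process, execution later in the task manager process) can be pictured with a small in-memory sketch. This is not the package's implementation: the `task` decorator, the `queue` and `registry` objects, and the JSON payload layout are all hypothetical, and a `deque` stands in for the Pub/Sub topic.

```python
import json
from collections import deque

queue = deque()   # in-memory stand-in for a Pub/Sub topic (assumption)
registry = {}     # maps function names to callables


def task(func):
    """Register func and attach a .delay() that enqueues a call."""
    registry[func.__name__] = func

    def delay(*args, **kwargs):
        # Serialize the call so it could travel as a message payload.
        payload = json.dumps(
            {"function": func.__name__, "args": args, "kwargs": kwargs}
        )
        queue.append(payload)

    func.delay = delay
    return func


def run_once():
    """Consume one message and execute the matching function."""
    message = json.loads(queue.popleft())
    func = registry[message["function"]]
    return func(*message["args"], **message["kwargs"])


@task
def my_task(msg1, msg2):
    return f"{msg1} {msg2}"


my_task.delay("test1", "test2")   # returns immediately, nothing runs yet
result = run_once()               # the worker side executes the call
print(result)
```

One consequence of this design is that arguments must survive serialization, so in a sketch like this one only JSON-compatible values can be passed to `.delay()`.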

Bucket notification

You can also create a task based on GCP Storage, by receiving a notification on any supported event from a bucket.

@pubsub.bucket('bucket-flask-gcp', events=['OBJECT_FINALIZE'])
def my_bucket_notifications_create(*args, **kwargs):
    print('FINALIZE', args, kwargs)


@pubsub.bucket('bucket-flask-gcp', events=['OBJECT_DELETE'])
def my_bucket_notifications_delete(*args, **kwargs):
    print('DELETE', args, kwargs)

For Cloud Storage, Google creates a dedicated Service Account for specific actions, which you cannot choose. You can find it here.

You have to add the Pub/Sub Admin role for that particular Service Account in IAM.

The kwargs contain all attributes of the Pub/Sub notification.
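
As an illustration of working with these attributes, the sketch below builds a one-line summary from `eventType`, `bucketId` and `objectId`, which are attribute names documented for Cloud Storage Pub/Sub notifications. Whether the package passes them under exactly these keyword names is an assumption based on the sentence above. The function is left undecorated and fed simulated data so that it runs without GCP access.

```python
def describe_notification(*args, **kwargs):
    """Build a short summary from notification attributes."""
    event = kwargs.get("eventType", "UNKNOWN")
    bucket = kwargs.get("bucketId", "?")
    obj = kwargs.get("objectId", "?")
    return f"{event} gs://{bucket}/{obj}"


# Simulated attributes for a newly created object (made-up object name).
sample = {
    "eventType": "OBJECT_FINALIZE",
    "bucketId": "bucket-flask-gcp",
    "objectId": "reports/2024.csv",
}
summary = describe_notification(**sample)
print(summary)
```

Reading attributes with `.get()` and a fallback keeps the handler from failing if an attribute is absent.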

If you change the function name, the auto-clean performed at start-up cannot work. Since you cannot exceed 10 events per bucket, remember to clean up the previous notifications with these commands:

gcloud storage buckets notifications list gs://<bucket_name>
gcloud storage buckets notifications delete gs://<bucket_name>

Configuration

Configuration can be set through keyword arguments at class instantiation and/or through Flask configuration variables (set with config.update). If a key is set both ways, the keyword argument takes precedence.

Flask env variable | Keyword argument | Usage | How-to get?
PUBSUB_PROJECT_ID | project_id | GCP project ID | See console.cloud.google.com
PUBSUB_CREDENTIALS_JSON | gcp_credentials_json | Service account credentials, as a JSON string | See IAM in console.cloud.google.com
PUBSUB_CREDENTIALS_FILE | gcp_credentials_file | Service account credentials, as a local JSON file | See IAM in console.cloud.google.com
PUBSUB_CONCURRENT_CONSUMERS | concurrent_consumers | Number of simultaneous consumers (default: 4) |
PUBSUB_CONCURRENT_MESSAGES | concurrent_messages | Number of messages pulled from the topic per consumer per call (default: 2) |
PUBSUB_TOPIC_PREFIX | topic_prefix | Prefix for all topics used in the instance; useful for feature branches sharing the same project |
PUBSUB_AUTO_SETUP | auto_setup | Enable the auto-setup of bucket notifications to Pub/Sub (default: false) |
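
The precedence rule described above can be sketched as a small lookup: keyword argument first, then the Flask configuration key, then a default. The function `resolve_setting` and the `KEYS` mapping are hypothetical helpers written for this example, and a plain dict stands in for `app.config`. The key names and the default of 4 are taken from the table.

```python
# Keyword argument -> Flask configuration key (subset of the table above).
KEYS = {
    "project_id": "PUBSUB_PROJECT_ID",
    "gcp_credentials_file": "PUBSUB_CREDENTIALS_FILE",
    "concurrent_consumers": "PUBSUB_CONCURRENT_CONSUMERS",
    "topic_prefix": "PUBSUB_TOPIC_PREFIX",
}


def resolve_setting(name, kwargs, flask_config, default=None):
    """Keyword argument first, then the Flask config key, then the default."""
    if kwargs.get(name) is not None:
        return kwargs[name]
    return flask_config.get(KEYS[name], default)


# Placeholder values; a dict stands in for app.config.
flask_config = {"PUBSUB_PROJECT_ID": "from-config", "PUBSUB_TOPIC_PREFIX": "dev-"}
kwargs = {"project_id": "from-kwargs"}

project_id = resolve_setting("project_id", kwargs, flask_config)
topic_prefix = resolve_setting("topic_prefix", kwargs, flask_config)
consumers = resolve_setting("concurrent_consumers", kwargs, flask_config, default=4)
print(project_id, topic_prefix, consumers)
```

Here `project_id` is set both ways and the keyword argument wins, `topic_prefix` comes from the configuration, and `concurrent_consumers` falls back to its default.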

🔮 Roadmap

  • Per-function priority for message processing
  • Logging instead of print (+ option to format as JSON)
  • Contributing manual
  • [*] Documentation about Flask configuration keys and their counterpart on PubSub direct call

TO BE CONFIRMED

  • Region selection (default: all regions) - can be edited in Storage Rules of Topic for the moment

