
Google Pub/Sub as a task manager, similar to what Celery offers


Flask GCP Pub/Sub

Lightweight distributed task queue using Google Cloud Platform (GCP) Pub/Sub


🤔 What does this package do?

Like Celery, but lighter, this package lets you run operations asynchronously in your Flask project. There is no choice of broker: it only uses GCP Pub/Sub.

Technically, this package can run without Flask, but it was originally built as a quick win for migrating an existing Flask + Celery project to GCP Cloud Run using Pub/Sub.

This package aims to remove some painful tasks by:

  • Creating one dedicated topic for each function
  • Creating one dedicated reusable subscription for each function
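The one-topic-per-function idea can be pictured with a small in-memory model. This is only a sketch: `MiniPubSub`, its `topics` dictionary and `run_once` are hypothetical names, and a `queue.Queue` stands in for a real Pub/Sub topic and its subscription. It does not reflect how the package is actually implemented.

```python
import json
import queue


class MiniPubSub:
    """Toy stand-in for a task manager: one in-memory 'topic' per function."""

    def __init__(self, topic_prefix=""):
        self.topic_prefix = topic_prefix
        self.topics = {}      # topic name -> queue of serialized messages
        self.functions = {}   # topic name -> function to execute

    def task(self, func):
        """Register func under its own dedicated topic and attach .delay()."""
        topic = f"{self.topic_prefix}{func.__name__}"
        self.topics[topic] = queue.Queue()
        self.functions[topic] = func

        def delay(*args, **kwargs):
            # Publishing only serializes the call; nothing runs yet.
            payload = json.dumps({"args": args, "kwargs": kwargs})
            self.topics[topic].put(payload)

        func.delay = delay
        return func

    def run_once(self):
        """Drain every topic and execute the pending calls (the consumer side)."""
        results = []
        for topic, pending in self.topics.items():
            while not pending.empty():
                message = json.loads(pending.get())
                result = self.functions[topic](*message["args"], **message["kwargs"])
                results.append((topic, result))
        return results


pubsub = MiniPubSub(topic_prefix="dev-")


@pubsub.task
def my_task(msg1, msg2):
    return f"{msg1}-{msg2}"


my_task.delay("test1", "test2")   # queued, not executed
print(pubsub.run_once())          # executed by the consumer side
```

Because each function owns its topic, renaming a function in this model also changes the topic it publishes to, which is worth keeping in mind when reading the rest of this page.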

We do not recommend this package for the following cases:

  • You need to reuse your development in a multi-cloud context
  • You have a high volume of messages to process (not tested)

This package is provided "as is", without guarantees, under the GPLv3 License.

🚀 Getting started

Prerequisites

Installation

pip install flask-gcp-pubsub

Full example

demo.py

#!/usr/bin/env python
# coding: utf-8

from flask import Flask, make_response
from flask_gcp_pubsub import PubSub

app = Flask(__name__)
pubsub = PubSub(
    app,
    project_id='<project_id>',
    gcp_credentials_file='./creds.json'
)

@pubsub.task
def my_task(msg1, msg2):
    """Awesome delayed execution"""
    print('test', msg1, msg2)
    return 'ok'

@app.route('/test')
def route_test():
    """Launch delayed execution"""
    my_task.delay('test1', 'test2')
    return make_response('ok', 200)

WARNING: do not forget to replace <project_id> with your GCP project ID (not the project number) and to download the JSON-formatted key from the GCP Console.

wsgi.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Imports
from demo import app

# Start
if __name__ == '__main__':
    app.run()

wsgi_delayed.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Imports
from demo import pubsub

# Start
if __name__ == '__main__':
    pubsub.run()

This command will launch the Flask server:

flask run --port 9090

This command will launch the asynchronous tasks manager:

python wsgi_delayed.py

You can now navigate to http://localhost:9090/test. If everything goes well, check the console output, which should look something like this:

Start consumers
status=received message_id=6860318059876990 function=my_task
test test1 test2
status=processed message_id=6860318059876990 function=my_task result=ok execution_time=6.818771362304688e-05
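The status lines above follow a simple `key=value` layout, so they can be turned into dictionaries for quick checks or metrics. The helper below is a sketch: `parse_log_line` is a hypothetical name, it assumes values never contain spaces (a task returning a string with spaces would break it), and it assumes `execution_time` is expressed in seconds.

```python
def parse_log_line(line):
    """Split a 'key=value key=value' log line into a dictionary."""
    fields = {}
    for token in line.split():
        key, sep, value = token.partition("=")
        if sep:  # ignore tokens that carry no '=' (e.g. "Start consumers")
            fields[key] = value
    return fields


line = (
    "status=processed message_id=6860318059876990 "
    "function=my_task result=ok execution_time=6.818771362304688e-05"
)
record = parse_log_line(line)

# Assumption: execution_time is in seconds; convert to milliseconds.
elapsed_ms = float(record["execution_time"]) * 1000
print(record["function"], record["status"], f"{elapsed_ms:.3f} ms")
```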

Bucket notification

You can also create a task triggered by Cloud Storage: the function receives a notification for any supported event on a bucket.

@pubsub.bucket('bucket-flask-gcp', events=['OBJECT_FINALIZE'])
def my_bucket_notifications_create(*args, **kwargs):
    print('FINALIZE', args, kwargs)


@pubsub.bucket('bucket-flask-gcp', events=['OBJECT_DELETE'])
def my_bucket_notifications_delete(*args, **kwargs):
    print('DELETE', args, kwargs)

For Cloud Storage specifically, Google creates a dedicated Service Account for certain actions, and you cannot choose it. You can find it here.

You have to add the Pub/Sub Admin role for that particular Service Account in IAM.

The kwargs contain all attributes of the Pub/Sub notification.
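Cloud Storage attaches attributes such as `eventType`, `bucketId` and `objectId` to each Pub/Sub notification. Assuming the package passes them through unchanged as keyword arguments (an assumption worth checking by printing `kwargs` first), a handler body could summarize an event as sketched below. `describe_notification` and the sample values are illustrative only.

```python
def describe_notification(*args, **kwargs):
    """Build a one-line summary from Cloud Storage notification attributes."""
    # .get() with a fallback keeps the handler safe if an attribute is absent.
    event = kwargs.get("eventType", "UNKNOWN")
    bucket = kwargs.get("bucketId", "?")
    obj = kwargs.get("objectId", "?")
    return f"{event}: gs://{bucket}/{obj}"


# Hand-written sample attributes, for illustration only.
sample = {
    "eventType": "OBJECT_FINALIZE",
    "bucketId": "bucket-flask-gcp",
    "objectId": "reports/example.csv",
    "payloadFormat": "JSON_API_V1",
}
print(describe_notification(**sample))
```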

If you change the function name, the auto-clean that runs at start-up cannot work. Because the number of notifications on a bucket is limited (10 per event), do not forget to clean up previous notifications with these commands:

gcloud storage buckets notifications list gs://<bucket_name>
gcloud storage buckets notifications delete gs://<bucket_name>

Configuration

Configuration can be done using keyword arguments at class instantiation and/or Flask configuration keys (set with config.update). If both methods are used for the same configuration key, the class instantiation takes precedence.

| Flask env variable | Keyword argument | Usage | How-to get? |
| PUBSUB_PROJECT_ID | project_id | GCP project ID | See console.cloud.google.com |
| PUBSUB_CREDENTIALS_JSON | gcp_credentials_json | Service account credentials, as a JSON string | See IAM in console.cloud.google.com |
| PUBSUB_CREDENTIALS_FILE | gcp_credentials_file | Service account credentials, as a local JSON file | See IAM in console.cloud.google.com |
| PUBSUB_CONCURRENT_CONSUMERS | concurrent_consumers | Number of simultaneous consumers (default: 4) | |
| PUBSUB_CONCURRENT_MESSAGES | concurrent_messages | Number of messages pulled from the topic per consumer per call (default: 2) | |
| PUBSUB_TOPIC_PREFIX | topic_prefix | Prefix for all topics used in the instance, useful for feature branches sharing the same project | |
| PUBSUB_AUTO_SETUP | auto_setup | Enable auto-setup for topic creation and bucket notifications to Pub/Sub (default: false) | |
| PUBSUB_DEADLINE | deadline | Retry deadline for all Pub/Sub operations (default: 300) | |
| PUBSUB_PULL_RETURN_IMMEDIATELY | return_immediately | Enable the return-immediately flag for pulling (default: false) | |
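The precedence rule (keyword argument first, then Flask configuration key, then default) can be sketched in a few lines. This is not the package's code: `resolve_setting`, `FLASK_KEYS` and `DEFAULTS` are hypothetical names, only three keys from the table are covered, and a plain dictionary stands in for `app.config`.

```python
# Keyword argument -> Flask configuration key, copied from the table above.
FLASK_KEYS = {
    "project_id": "PUBSUB_PROJECT_ID",
    "concurrent_consumers": "PUBSUB_CONCURRENT_CONSUMERS",
    "deadline": "PUBSUB_DEADLINE",
}

# Defaults listed in the table; project_id has none.
DEFAULTS = {"concurrent_consumers": 4, "deadline": 300}


def resolve_setting(name, flask_config, **kwargs):
    """Return the keyword argument if given, else the Flask key, else the default."""
    if name in kwargs:
        return kwargs[name]
    flask_key = FLASK_KEYS[name]
    if flask_key in flask_config:
        return flask_config[flask_key]
    return DEFAULTS.get(name)


# A plain dict standing in for app.config after config.update(...).
flask_config = {"PUBSUB_PROJECT_ID": "my-project", "PUBSUB_DEADLINE": 120}

print(resolve_setting("deadline", flask_config, deadline=60))   # keyword argument wins
print(resolve_setting("deadline", flask_config))                # Flask key is used
print(resolve_setting("concurrent_consumers", flask_config))    # falls back to the default
```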

🔮 Roadmap

  • Per-function priority in message processing
  • Logging instead of print (+ option to format as JSON)
  • Contributing manual
  • Documentation about Flask configuration keys and their counterpart on PubSub direct call

TO BE CONFIRMED

  • Region selection (default: all regions) - can be edited in Storage Rules of Topic for the moment
