Skip to main content

Google Pub/Sub as task manager, like Celery can do

Project description

Flask GCP Pub/Sub

Lite distributed task queue using Google Cloud Platform (GCP) Pub/Sub

PyPI version PyPI downloads GNU GPLv3

🤔 What does this package does?

As Celery, but in a lighter version, this package allows you to run operations asynchronously in your Flask project, but without the choice of the broker: it only uses GCP Pub/Sub.

Technically, this package can run without Flask, but, historically, it comes to have a quick-win for migrating to GCP Cloud Run using the Pub/Sub system, from an existing project using Flask + Celery.

This package aims to remove some painful tasks by:

  • Creating one dedicated topic for each function
  • Creating one dedicated reusable subscription for each function

We do not recommand this package for the following cases:

  • You need to reuse your development in a multi-cloud context
  • You have high volume of messages to process (not tested)

This package is given "as it", without garantees, under the GPLv3 License.

🚀 Getting started

Prerequisites

Installation

pip install flask-gcp-pubsub

Full example

demo.py

#!/usr/bin/env python
# coding: utf-8

from flask import Flask, make_response
from flask_gcp_pubsub import PubSub

app = Flask(__name__)
pubsub = PubSub(
    app,
    project_id='<project_id>',
    gcp_credentials_file='./creds.json'
)

@pubsub.task
def my_task(msg1, msg2):
    """Awesome delayed execution"""
    print('test', msg1, msg2)
    return 'ok'

@app.route('/test')
def route_test():
    """Launch delayed execution"""
    my_task.delay('test1', 'test2')
    return make_response('ok', 200)

WARNING: do not forget to replace <project_id> with you GCP project ID (not number) and to downloed the JSON-formatted key from GCP Console.

wsgi.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Imports
from demo import app

# Start
if __name__ == '__main__':
    app.run()

wsgi_delayed.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Imports
from demo import pubsub

# Start
if __name__ == '__main__':
    pubsub.run()

This command will launch the Flask server:

flask run --port 9090

This command will launch the asynchronous tasks manager:

python wsgi_delayed.py

You can now navigate to http://localhost:9090/test And if everything goes OK, you just have to check the content of the output in console, which should look something like that:

Start consumers
status=received message_id=6860318059876990 function=my_task
test test1 test2
status=processed message_id=6860318059876990 function=my_task result=ok execution_time=6.818771362304688e-05

Bucket notification

You can also create a task based on GCP Storage, by receiving a notification on any supported event from a bucket.

@pubsub.bucket('bucket-flask-gcp', events=['OBJECT_FINALIZE'])
def my_bucket_notifications_create(*args, **kwargs):
    print('FINALIZE', args, kwargs)


@pubsub.bucket('bucket-flask-gcp', events=['OBJECT_DELETE'])
def my_bucket_notifications_delete(*args, **kwargs):
    print('DELETE', args, kwargs)

For the specific Storage product, Google create a specific Service Account for specific actions, that you cannot choose. You can found it here.

You have to add the Pub/Sub Admin role for that particular Service Account in IAM.

The kwargs returns all attributes of the Pub/Sub notification.

If you change the function name, the auto-clean included at start-up cannot works. As you cannot excess 10 events per bucket, do not forget to clean previous subscription with commands:

gcloud storage buckets notifications list gs://<bucket_name>
gcloud storage buckets notifications delete gs://<bucket_name>

Configuration

Configuration can be done using keyword arguments in class instantiation and/or flask environment variable (set with config.update). If both method used for one configuration key, the class instanciation is primary.

Flask env variable Keyword argument Usage How-to get?
PUBSUB_PROJECT_ID project_id GCP project ID See console.cloud.google.com
PUBSUB_CREDENTIALS_JSON gcp_credentials_json Service account credentials, as JSON string format See IAM in console.cloud.google.com
PUBSUB_CREDENTIALS_FILE gcp_credentials_file Servicce account credentials, as JSON local file See IAM in console.cloud.google.com
PUBSUB_CONCURRENT_CONSUMERS concurrent_consumers Number of simultaneous consumer (default: 4)
PUBSUB_CONCURRENT_MESSAGES concurrent_messages Number of messages pull from topic per consumer per call (default: 2)
PUBSUB_TOPIC_PREFIX topic_prefix Prefix for all topic used in the instance, useful for feature branches using same project.
PUBSUB_AUTO_SETUP auto_setup Enable the auto-setup for Bucket notifications to Pub/Sub (default: false)

🔮 Roadmap

  • Priority in the treatment of messages per functions
  • Logging instead of print (+ option to format as JSON)
  • Contributing manual
  • Documentation about Flask configuration keys and their counterpart on PubSub direct call

TO BE CONFIRMED

  • Region selection (default: all regions) - can be edited in Storage Rules of Topic for the moment

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

flask-gcp-pubsub-0.2.4.tar.gz (21.8 kB view details)

Uploaded Source

Built Distribution

flask_gcp_pubsub-0.2.4-py3-none-any.whl (20.2 kB view details)

Uploaded Python 3

File details

Details for the file flask-gcp-pubsub-0.2.4.tar.gz.

File metadata

  • Download URL: flask-gcp-pubsub-0.2.4.tar.gz
  • Upload date:
  • Size: 21.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.1 CPython/3.11.2

File hashes

Hashes for flask-gcp-pubsub-0.2.4.tar.gz
Algorithm Hash digest
SHA256 88833aea1fd3bae0fe031f3c8c207dd4d8db7fae1f1623e03a83b144131801d9
MD5 94052217a9441dc3858cba83c5ae144f
BLAKE2b-256 8aee1de4cf5b18ca128a5a027648209bbeaa0a46d0fbf5f6652c507294a88dcd

See more details on using hashes here.

File details

Details for the file flask_gcp_pubsub-0.2.4-py3-none-any.whl.

File metadata

File hashes

Hashes for flask_gcp_pubsub-0.2.4-py3-none-any.whl
Algorithm Hash digest
SHA256 56b4cd367a7a99eb7537653a6e7841dba366fdfd7972c23966b58190f5594972
MD5 6000e3c3c329ec6c75ad862fef06a76f
BLAKE2b-256 daada5c983301d66c5e722196aa974e8c22ea6afad0d75277f3926e3878f15d9

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page