Skip to main content

Google Pub/Sub as task manager, like Celery can do

Project description

Flask GCP Pub/Sub

Lite distributed task queue using Google Cloud Platform (GCP) Pub/Sub

PyPI version PyPI downloads GNU GPLv3

🤔 What does this package does?

As Celery, but in a lighter version, this package allows you to run operations asynchronously in your Flask project, but without the choice of the broker: it only uses GCP Pub/Sub.

Technically, this package can run without Flask, but, historically, it comes to have a quick-win for migrating to GCP Cloud Run using the Pub/Sub system, from an existing project using Flask + Celery.

This package aims to remove some painful tasks by:

  • Creating one dedicated topic for each function
  • Creating one dedicated reusable subscription for each function

We do not recommand this package for the following cases:

  • You need to reuse your development in a multi-cloud context
  • You have high volume of messages to process (not tested)

This package is given "as it", without garantees, under the GPLv3 License.

🚀 Getting started

Prerequisites

Installation

pip install flask-gcp-pubsub

Full example

demo.py

#!/usr/bin/env python
# coding: utf-8

from flask import Flask, make_response
from flask_gcp_pubsub import PubSub

app = Flask(__name__)
pubsub = PubSub(
    app,
    project_id='<project_id>',
    gcp_credentials_file='./creds.json'
)

@pubsub.task
def my_task(msg1, msg2):
    """Awesome delayed execution"""
    print('test', msg1, msg2)
    return 'ok'

@app.route('/test')
def route_test():
    """Launch delayed execution"""
    my_task.delay('test1', 'test2')
    return make_response('ok', 200)

WARNING: do not forget to replace <project_id> with you GCP project ID (not number) and to downloed the JSON-formatted key from GCP Console.

wsgi.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Imports
from demo import app

# Start
if __name__ == '__main__':
    app.run()

wsgi_delayed.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Imports
from demo import pubsub

# Start
if __name__ == '__main__':
    pubsub.run()

This command will launch the Flask server:

flask run --port 9090

This command will launch the asynchronous tasks manager:

python wsgi_delayed.py

You can now navigate to http://localhost:9090/test And if everything goes OK, you just have to check the content of the output in console, which should look something like that:

Start consumers
status=received message_id=6860318059876990 function=my_task
test test1 test2
status=processed message_id=6860318059876990 function=my_task result=ok execution_time=6.818771362304688e-05

Bucket notification

You can also create a task based on GCP Storage, by receiving a notification on any supported event from a bucket.

@pubsub.bucket('bucket-flask-gcp', events=['OBJECT_FINALIZE'])
def my_bucket_notifications_create(*args, **kwargs):
    print('FINALIZE', args, kwargs)


@pubsub.bucket('bucket-flask-gcp', events=['OBJECT_DELETE'])
def my_bucket_notifications_delete(*args, **kwargs):
    print('DELETE', args, kwargs)

For the specific Storage product, Google create a specific Service Account for specific actions, that you cannot choose. You can found it here.

You have to add the Pub/Sub Admin role for that particular Service Account in IAM.

The kwargs returns all attributes of the Pub/Sub notification.

If you change the function name, the auto-clean included at start-up cannot works. As you cannot excess 10 events per bucket, do not forget to clean previous subscription with commands:

gcloud storage buckets notifications list gs://<bucket_name>
gcloud storage buckets notifications delete gs://<bucket_name>

Configuration

Configuration can be done using keyword arguments in class instantiation and/or flask environment variable (set with config.update). If both method used for one configuration key, the class instanciation is primary.

Flask env variable Keyword argument Usage How-to get?
PUBSUB_PROJECT_ID project_id GCP project ID See console.cloud.google.com
PUBSUB_CREDENTIALS_JSON gcp_credentials_json Service account credentials, as JSON string format See IAM in console.cloud.google.com
PUBSUB_CREDENTIALS_FILE gcp_credentials_file Servicce account credentials, as JSON local file See IAM in console.cloud.google.com
PUBSUB_CONCURRENT_CONSUMERS concurrent_consumers Number of simultaneous consumer (default: 4)
PUBSUB_CONCURRENT_MESSAGES concurrent_messages Number of messages pull from topic per consumer per call (default: 2)
PUBSUB_TOPIC_PREFIX topic_prefix Prefix for all topic used in the instance, useful for feature branches using same project.

🔮 Roadmap

  • Priority in the treatment of messages per functions
  • Logging instead of print (+ option to format as JSON)
  • Contributing manual
  • Documentation about Flask configuration keys and their counterpart on PubSub direct call

TO BE CONFIRMED

  • Region selection (default: all regions) - can be edited in Storage Rules of Topic for the moment

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

flask-gcp-pubsub-0.2.1.tar.gz (21.5 kB view details)

Uploaded Source

Built Distribution

flask_gcp_pubsub-0.2.1-py3-none-any.whl (20.1 kB view details)

Uploaded Python 3

File details

Details for the file flask-gcp-pubsub-0.2.1.tar.gz.

File metadata

  • Download URL: flask-gcp-pubsub-0.2.1.tar.gz
  • Upload date:
  • Size: 21.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.1 CPython/3.11.2

File hashes

Hashes for flask-gcp-pubsub-0.2.1.tar.gz
Algorithm Hash digest
SHA256 d04d8a024d5acb1673d219d2d6dfadc70c2002e338c944b5b8c99dafebb17903
MD5 2d90de57a24b266c5663411bb1c73394
BLAKE2b-256 4250b0f9451c844df9633bce784ae62e3ed22321311e6947976b0444d5d48264

See more details on using hashes here.

File details

Details for the file flask_gcp_pubsub-0.2.1-py3-none-any.whl.

File metadata

File hashes

Hashes for flask_gcp_pubsub-0.2.1-py3-none-any.whl
Algorithm Hash digest
SHA256 f4de01b1ca39856feac131734656dc0a4d267c3ada9bcc933ddf8b305c2b6884
MD5 259b56bf6f90c6cc805048e846408a01
BLAKE2b-256 8f43d6a1e9e73f70458587f4d20142413c8a26062f34752ea0312bd5b9689d35

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page