Google Pub/Sub as a task manager, like Celery
Flask GCP Pub/Sub
Lightweight distributed task queue using Google Cloud Platform (GCP) Pub/Sub
🤔 What does this package do?
Like Celery, but lighter: this package lets you run operations asynchronously in your Flask project, without any choice of broker: it only uses GCP Pub/Sub.
Technically, this package can run without Flask, but historically it was created as a quick win for migrating an existing Flask + Celery project to GCP Cloud Run using the Pub/Sub system.
This package aims to remove some painful setup steps by:
- Creating one dedicated topic for each function
- Creating one dedicated reusable subscription for each function
We do not recommend this package in the following cases:
- You need to reuse your development in a multi-cloud context
- You have a high volume of messages to process (not tested)
This package is provided "as is", without guarantees, under the GPLv3 License.
🚀 Getting started
Prerequisites
- A Google Cloud account
- A GCP project (here to create a new one), with the Pub/Sub API enabled (make sure to select the correct project)
- A Service Account with the Pub/Sub Admin role, and its credentials JSON file (`creds.json` in the example below)
- A local environment with Python >= 3.9
Installation
```
pip install flask-gcp-pubsub
```
Full example
demo.py

```python
#!/usr/bin/env python
# coding: utf-8
from flask import Flask, make_response
from flask_gcp_pubsub import PubSub

app = Flask(__name__)
pubsub = PubSub(
    app,
    project_id='<project_id>',
    gcp_credentials_file='./creds.json'
)


@pubsub.task
def my_task(msg1, msg2):
    """Awesome delayed execution"""
    print('test', msg1, msg2)
    return 'ok'


@app.route('/test')
def route_test():
    """Launch delayed execution"""
    my_task.delay('test1', 'test2')
    return make_response('ok', 200)
```
WARNING: do not forget to replace `<project_id>` with your GCP project ID (not the project number) and to download the JSON-formatted key from the GCP Console.
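Before running the example, it can help to sanity-check the downloaded key file. Below is a minimal sketch using only the standard library; the helper name and the required-field list are illustrative (the field names follow the usual service-account key format):

```python
import json

# Fields normally present in a GCP service-account key file (illustrative subset)
REQUIRED_FIELDS = {"type", "project_id", "private_key", "client_email"}


def check_service_account_key(path):
    """Return the project_id if the file looks like a service-account key."""
    with open(path, encoding="utf-8") as fh:
        data = json.load(fh)
    missing = REQUIRED_FIELDS - data.keys()
    if missing or data.get("type") != "service_account":
        raise ValueError(f"not a service-account key; missing fields: {sorted(missing)}")
    return data["project_id"]
```

This only checks the file's shape, not whether the key is valid or has the right IAM roles.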
wsgi.py

```python
#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Imports
from demo import app

# Start
if __name__ == '__main__':
    app.run()
```
wsgi_delayed.py

```python
#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Imports
from demo import pubsub

# Start
if __name__ == '__main__':
    pubsub.run()
```
This command launches the Flask server:

```
flask run --port 9090
```

This command launches the asynchronous task manager:

```
python wsgi_delayed.py
```
You can now navigate to http://localhost:9090/test. If everything went OK, the console output should look something like this:
```
Start consumers
status=received message_id=6860318059876990 function=my_task
test test1 test2
status=processed message_id=6860318059876990 function=my_task result=ok execution_time=6.818771362304688e-05
```
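To exercise a route like /test in unit tests without a real GCP project, the @pubsub.task / .delay pattern can be mimicked with a small local stub. This is an illustrative sketch, not the package's actual implementation (the LocalPubSubStub class below is hypothetical):

```python
from collections import deque


class LocalPubSubStub:
    """Stand-in for the PubSub object in tests: .delay() enqueues the
    call locally instead of publishing it to a Pub/Sub topic."""

    def __init__(self):
        self.queue = deque()

    def task(self, func):
        stub = self

        def delay(*args, **kwargs):
            # Record the call instead of publishing a message
            stub.queue.append((func, args, kwargs))

        func.delay = delay
        return func

    def drain(self):
        """Run every queued task synchronously and return their results."""
        results = []
        while self.queue:
            func, args, kwargs = self.queue.popleft()
            results.append(func(*args, **kwargs))
        return results


# usage: same decorator shape as in demo.py
pubsub = LocalPubSubStub()


@pubsub.task
def my_task(msg1, msg2):
    return f"{msg1}-{msg2}"


my_task.delay("test1", "test2")  # queued, not executed yet
```

Calling `pubsub.drain()` then runs the queued task synchronously, which makes route handlers easy to assert on in tests.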
Bucket notification
You can also trigger a task from GCP Storage, by receiving a notification for any supported event on a bucket.
```python
@pubsub.bucket('bucket-flask-gcp', events=['OBJECT_FINALIZE'])
def my_bucket_notifications_create(*args, **kwargs):
    print('FINALIZE', args, kwargs)


@pubsub.bucket('bucket-flask-gcp', events=['OBJECT_DELETE'])
def my_bucket_notifications_delete(*args, **kwargs):
    print('DELETE', args, kwargs)
```
For the Storage product specifically, Google creates a dedicated Service Account for these actions, which you cannot choose. You can find it here.
You have to grant the Pub/Sub Admin role to that particular Service Account in IAM.
The kwargs contain all attributes of the Pub/Sub notification.
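For reference, these attributes generally follow the Cloud Storage notification format (eventType, bucketId, objectId, eventTime, ...). A small hypothetical helper to pull out the commonly used ones from a handler's kwargs:

```python
def summarize_event(**kwargs):
    """Condense a Cloud Storage notification's attributes into a short dict.
    Attribute names follow the standard GCS notification format."""
    return {
        "event": kwargs.get("eventType"),    # e.g. OBJECT_FINALIZE
        "bucket": kwargs.get("bucketId"),
        "object": kwargs.get("objectId"),
        "time": kwargs.get("eventTime"),
    }
```

Such a helper can be called from inside a @pubsub.bucket handler to log only the relevant fields.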
If you change the function name, the auto-clean included at start-up cannot work. As you cannot exceed 10 notification configurations per bucket, do not forget to clean up previous subscriptions with these commands:
```
gcloud storage buckets notifications list gs://<bucket_name>
gcloud storage buckets notifications delete gs://<bucket_name>
```
Configuration
Configuration can be done using keyword arguments at class instantiation and/or Flask configuration variables (set with `config.update`).
If both methods are used for the same configuration key, the class instantiation takes precedence.
| Flask env variable | Keyword argument | Usage | How to get it? |
|---|---|---|---|
| `PUBSUB_PROJECT_ID` | `project_id` | GCP project ID | See console.cloud.google.com |
| `PUBSUB_CREDENTIALS_JSON` | `gcp_credentials_json` | Service account credentials, as a JSON string | See IAM in console.cloud.google.com |
| `PUBSUB_CREDENTIALS_FILE` | `gcp_credentials_file` | Service account credentials, as a local JSON file | See IAM in console.cloud.google.com |
| `PUBSUB_CONCURRENT_CONSUMERS` | `concurrent_consumers` | Number of simultaneous consumers (default: `4`) | |
| `PUBSUB_CONCURRENT_MESSAGES` | `concurrent_messages` | Number of messages pulled from the topic per consumer per call (default: `2`) | |
| `PUBSUB_TOPIC_PREFIX` | `topic_prefix` | Prefix for all topics used by the instance; useful for feature branches sharing the same project | |
| `PUBSUB_AUTO_SETUP` | `auto_setup` | Enable auto-setup of bucket notifications to Pub/Sub (default: `false`) | |
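The precedence rule above (keyword argument first, then Flask config, then a built-in default) can be sketched with a tiny resolver; `resolve_setting` is a hypothetical helper, not part of the package:

```python
def resolve_setting(kwarg_value, flask_config, config_key, default=None):
    """Keyword argument first, then Flask config, then default."""
    if kwarg_value is not None:
        return kwarg_value
    return flask_config.get(config_key, default)


# usage: a keyword argument wins over the Flask config value
config = {"PUBSUB_PROJECT_ID": "from-flask-config",
          "PUBSUB_CONCURRENT_CONSUMERS": 8}
print(resolve_setting("from-kwarg", config, "PUBSUB_PROJECT_ID"))        # from-kwarg
print(resolve_setting(None, config, "PUBSUB_CONCURRENT_CONSUMERS", 4))   # 8
```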
🔮 Roadmap
- Priority in the treatment of messages per function
- Logging instead of print (+ option to format as JSON)
- Contributing manual
- Documentation about Flask configuration keys and their counterpart on PubSub direct call
TO BE CONFIRMED
- Region selection (default: all regions) - can be edited in Storage Rules of Topic for the moment