Google Pub/Sub as a task manager, similar to what Celery can do
Flask GCP Pub/Sub
Lite distributed task queue using Google Cloud Platform (GCP) Pub/Sub
🤔 What does this package do?
Like Celery, but in a lighter version, this package lets you run operations asynchronously in your Flask project, but without a choice of broker: it only uses GCP Pub/Sub.
Technically, this package can run without Flask, but it was originally created as a quick win for migrating an existing Flask + Celery project to GCP Cloud Run using the Pub/Sub system.
This package aims to remove some painful tasks by:
- Creating one dedicated topic for each function
- Creating one dedicated reusable subscription for each function
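To check what was created for your functions, the standard gcloud commands list the resulting resources; the exact topic and subscription names depend on your function names and on the optional topic prefix described in the Configuration section below:
gcloud pubsub topics list
gcloud pubsub subscriptions list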
We do not recommend this package for the following cases:
- You need to reuse your development in a multi-cloud context
- You have a high volume of messages to process (not tested)
This package is provided "as is", without guarantees, under the GPLv3 License.
🚀 Getting started
Prerequisites
- A Google Cloud account
- A GCP project (create a new one if needed), with the Pub/Sub API enabled (take care to select the correct project)
- A Service Account with the Pub/Sub Admin role, for which you need a credentials JSON file (creds.json in the example below)
- A local environment with Python >= 3.9
Installation
pip install flask-gcp-pubsub
Full example
demo.py
#!/usr/bin/env python
# coding: utf-8
from flask import Flask, make_response
from flask_gcp_pubsub import PubSub
app = Flask(__name__)
pubsub = PubSub(
    app,
    project_id='<project_id>',
    gcp_credentials_file='./creds.json'
)

@pubsub.task
def my_task(msg1, msg2):
    """Awesome delayed execution"""
    print('test', msg1, msg2)
    return 'ok'

@app.route('/test')
def route_test():
    """Launch delayed execution"""
    my_task.delay('test1', 'test2')
    return make_response('ok', 200)
WARNING: do not forget to replace <project_id> with your GCP project ID (not the project number) and to download the JSON-formatted key from the GCP Console.
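If you are not sure which value is the ID, the following command lists your projects with both a PROJECT_ID and a PROJECT_NUMBER column; use the PROJECT_ID value here:
gcloud projects list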
wsgi.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Imports
from demo import app
# Start
if __name__ == '__main__':
    app.run()
wsgi_delayed.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Imports
from demo import pubsub
# Start
if __name__ == '__main__':
    pubsub.run()
This command will launch the Flask server:
flask run --port 9090
This command will launch the asynchronous tasks manager:
python wsgi_delayed.py
You can now navigate to http://localhost:9090/test. If everything goes well, check the console output of the task manager, which should look something like this:
Start consumers
status=received message_id=6860318059876990 function=my_task
test test1 test2
status=processed message_id=6860318059876990 function=my_task result=ok execution_time=6.818771362304688e-05
Bucket notification
You can also create a task based on GCP Storage, by receiving a notification for any supported event on a bucket.
@pubsub.bucket('bucket-flask-gcp', events=['OBJECT_FINALIZE'])
def my_bucket_notifications_create(*args, **kwargs):
    print('FINALIZE', args, kwargs)

@pubsub.bucket('bucket-flask-gcp', events=['OBJECT_DELETE'])
def my_bucket_notifications_delete(*args, **kwargs):
    print('DELETE', args, kwargs)
For the Storage product, Google creates a dedicated Service Account for these actions, which you cannot choose. You can find it in the GCP Console.
You have to add the Pub/Sub Admin role to that particular Service Account in IAM.
The kwargs contain all the attributes of the Pub/Sub notification.
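For example, a hypothetical handler could read the usual Cloud Storage notification attributes (bucketId, objectId, eventType, ...) from kwargs; this is only a sketch, assuming the package passes the notification attributes through unchanged:
@pubsub.bucket('bucket-flask-gcp', events=['OBJECT_FINALIZE'])
def my_bucket_log_upload(*args, **kwargs):
    """Log every new object (attribute names follow the Cloud Storage notification format)"""
    bucket_name = kwargs.get('bucketId')
    object_name = kwargs.get('objectId')
    event_type = kwargs.get('eventType')
    print(f'{event_type}: gs://{bucket_name}/{object_name}')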
If you change the function name, the auto-clean performed at start-up cannot work. As you cannot exceed 10 notification configurations per bucket, do not forget to clean up previous notifications with these commands:
gcloud storage buckets notifications list gs://<bucket_name>
gcloud storage buckets notifications delete gs://<bucket_name>
Configuration
Configuration can be done using keyword arguments at class instantiation and/or Flask configuration variables (set with config.update).
If both methods are used for the same configuration key, the class instantiation takes precedence.
| Flask config variable | Keyword argument | Usage | How to get it? |
|---|---|---|---|
| PUBSUB_PROJECT_ID | project_id | GCP project ID | See console.cloud.google.com |
| PUBSUB_CREDENTIALS_JSON | gcp_credentials_json | Service account credentials, as a JSON string | See IAM in console.cloud.google.com |
| PUBSUB_CREDENTIALS_FILE | gcp_credentials_file | Service account credentials, as a local JSON file | See IAM in console.cloud.google.com |
| PUBSUB_CONCURRENT_CONSUMERS | concurrent_consumers | Number of simultaneous consumers (default: 4) | |
| PUBSUB_CONCURRENT_MESSAGES | concurrent_messages | Number of messages pulled from the topic per consumer per call (default: 2) | |
| PUBSUB_TOPIC_PREFIX | topic_prefix | Prefix for all topics used by the instance, useful for feature branches sharing the same project | |
🔮 Roadmap
- Priority handling of messages per function
- Logging instead of print (+ option to format as JSON)
- Contributing manual
- Documentation about Flask configuration keys and their counterparts when instantiating PubSub directly
TO BE CONFIRMED
- Region selection (default: all regions) - can be edited in the topic's message storage policy for the moment