A simple task queue using Google Cloud Pub/Sub
psq is an example Python implementation of a simple distributed task queue using Google Cloud Pub/Sub.
psq requires minimal configuration and relies on Cloud Pub/Sub to provide scalable and reliable messaging.
Install via pip:
```
pip install psq
```
First, create a task:
```python
def adder(a, b):
    return a + b
```
Then, create a pubsub client and a queue:
```python
from google.cloud import pubsub

import psq

PROJECT_ID = 'your-project-id'

client = pubsub.Client(project=PROJECT_ID)
q = psq.Queue(client)
```
Now you can enqueue tasks:
```python
from tasks import adder

q.enqueue(adder, 5, 6)
```
In order to get task results, you have to configure storage:
```python
from google.cloud import datastore, pubsub

import psq

PROJECT_ID = 'your-project-id'

ps_client = pubsub.Client(project=PROJECT_ID)
ds_client = datastore.Client(project=PROJECT_ID)

q = psq.Queue(
    ps_client, storage=psq.DatastoreStorage(ds_client))
```
With storage configured, you can get the result of a task:
```python
r = q.enqueue(adder, 5, 6)
r.result()  # -> 11
```
You can also define multiple queues:
```python
fast = psq.Queue(client, 'fast')
slow = psq.Queue(client, 'slow')
```
Because psq is largely similar to rq, similar rules around tasks apply. You can put any Python function call on a queue, provided:

* The function is importable by the worker (notably, functions defined in `__main__` cannot be enqueued).
* The function's arguments and return value are picklable.
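The pickling rule can be checked directly with the standard library, independently of psq. A module-level function pickles by reference (the worker re-imports it by name), while a lambda or locally defined function does not:

```python
import pickle

def adder(a, b):
    return a + b

# A module-level function pickles by reference, so a worker
# process can re-import it by name and call it.
payload = pickle.dumps(adder)

# A lambda has no importable name, so pickling it fails --
# which is why it cannot be enqueued.
try:
    pickle.dumps(lambda a, b: a + b)
    lambda_picklable = True
except Exception:
    lambda_picklable = False
```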
Cloud Pub/Sub guarantees your tasks will be delivered to the workers, but psq doesn't presently guarantee that a task completes execution or exactly-once semantics, though it does allow you to provide your own mechanisms for this. This is similar to Celery's default configuration.
Task completion guarantees can be provided via late ack support. Late ack is possible with Cloud Pub/Sub, but it is currently not implemented in this library. See CONTRIBUTING.md.
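Because delivery is at least once, a task may be run twice. One mechanism you could provide yourself is an idempotency key per task: record completed keys in durable storage and skip re-delivered duplicates. A minimal sketch of the idea, where the in-memory `completed` set stands in for durable storage such as Datastore (the names here are illustrative, not part of the psq API):

```python
completed = set()  # stands in for durable storage keyed by task id

def run_once(task_id, func, *args):
    """Run func only if this task_id has not completed before."""
    if task_id in completed:
        return None  # duplicate delivery: skip
    result = func(*args)
    completed.add(task_id)  # record completion only after success
    return result

def adder(a, b):
    return a + b

first = run_once("task-1", adder, 5, 6)   # executes the task
second = run_once("task-1", adder, 5, 6)  # duplicate delivery, skipped
```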
Execute psqworker in the same directory where your tasks are defined.
psqworker only operates on one queue at a time. If you want a server to listen to multiple queues, use something like supervisord to run multiple psqworker processes.
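For example, a supervisord configuration along these lines could run one worker per queue. This assumes the `fast` and `slow` queue objects above are defined in a module named `main`, so each psqworker is pointed at `module.queue_object`:

```ini
[program:psqworker-fast]
command=psqworker main.fast

[program:psqworker-slow]
command=psqworker main.slow
```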
A normal queue will send a single task to a single worker, spreading your tasks over all workers listening to the same queue. There are also broadcast queues, which will deliver a copy of the task to every worker. This is useful in situations where you want every worker to execute the same task, such as installing or upgrading software on every server.
```python
from subprocess import call

broadcast_q = psq.BroadcastQueue(client)

def restart_apache_task():
    call(["apachectl", "restart"])

broadcast_q.enqueue(restart_apache_task)
```
Broadcast queues provide an implementation of the solution described in Reliable Task Scheduling on Google Compute Engine.
Note: broadcast queues do not currently support any form of storage and do not support return values.
Raising psq.Retry in your task will cause it to be retried.
```python
import logging

import requests
from psq import Retry

def retry_if_fail():
    try:
        r = requests.get('http://some.flaky.service.com')
    except Exception as e:
        logging.error(e)
        raise Retry()
```
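Conceptually, the worker catches this exception and runs the task again on a later delivery. A simplified stdlib model of that loop (the `Retry` class and `max_attempts` cap here are stand-ins for illustration, not psq's actual worker internals):

```python
class Retry(Exception):
    """Stand-in for psq.Retry: signals that the task should re-run."""

def run_with_retries(func, max_attempts=3):
    """Toy worker loop: re-run the task each time it raises Retry."""
    for attempt in range(1, max_attempts + 1):
        try:
            return func(attempt)
        except Retry:
            continue  # task asked to be retried
    raise RuntimeError("task did not succeed within max_attempts")

def flaky(attempt):
    if attempt < 3:
        raise Retry()  # fail the first two deliveries
    return "ok"

result = run_with_retries(flaky)  # succeeds on the third attempt
```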
You can bind an extra context manager to the queue.
```python
from flask import Flask

app = Flask(__name__)
q = psq.Queue(client, extra_context=app.app_context)
```
This will ensure that the context is available in your tasks, which is useful for things such as database connections, etc.:
```python
from flask import current_app

def flasky_task():
    backend = current_app.config['BACKEND']
```
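The mechanism behind `extra_context` is simply that the worker enters the context manager around each task invocation. A stdlib model of that behavior, with no Flask required (the `current` dict and function names are illustrative stand-ins, not psq internals):

```python
import contextlib

current = {}  # stands in for Flask's application context

@contextlib.contextmanager
def app_context():
    current["BACKEND"] = "postgres"  # made available to the task
    try:
        yield
    finally:
        current.clear()  # torn down after the task finishes

def execute_task(task, extra_context=None):
    """Run a task, wrapped in the queue's extra context if one is set."""
    with extra_context() if extra_context else contextlib.nullcontext():
        return task()

def flasky_task():
    return current["BACKEND"]

backend = execute_task(flasky_task, extra_context=app_context)
```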
| File | Version | File Type | Upload Date |
| --- | --- | --- | --- |
| psq-0.5.0-py2.py3-none-any.whl (29.4 kB) | py2.py3 | Wheel | Sep 20, 2016 |
| psq-0.5.0.tar.gz (15.4 kB) | – | Source | Sep 20, 2016 |