Skip to main content

Multithreaded Google Task Queue client.

Project description

[![Build Status](https://travis-ci.org/seung-lab/python-task-queue.svg?branch=master)](https://travis-ci.org/seung-lab/python-task-queue) [![PyPI version](https://badge.fury.io/py/task-queue.svg)](https://badge.fury.io/py/task-queue)

# python-task-queue
Python TaskQueue object that can rapidly populate and download from cloud queues. Supports local multi-process execution as well.

# Installation

```bash
pip install numpy # make sure you do this first on a seperate line
pip install task-queue
```

The task queue uses your CloudVolume secrets located in `$HOME/.cloudvolume/secrets/`. When using AWS SQS as your queue backend, you must provide `$HOME/.cloudvolume/secrets/aws-secret.json`. See the [CloudVolume](https://github.com/seung-lab/cloud-volume) repo for additional instructions.

The additional pip install line is to make it easier for CloudVolume to install as this library uses its facilities for accessing secrets.

# Usage

Define a class that inherits from taskqueue.RegisteredTask and implments the `execute` method.

Tasks can be loaded into queues locally or as based64 encoded data in the cloud and executed later.
Here's an example implementation of a `PrintTask`. Generally, you should specify a very lightweight
container and let the actual execution download and manipulate data.

```python
from taskqueue import RegisteredTask

class PrintTask(RegisteredTask):
def __init__(self, txt=''):
super(PrintTask, self).__init__(txt)
self.txt = txt

def execute(self):
if self.txt:
print(str(self) + ": " + str(self.txt))
else:
print(self)
```

## Local Usage

For small jobs, you might want to use one or more processes to execute the tasks:
```python
from taskqueue import LocalTaskQueue

with LocalTaskQueue(parallel=5) as tq: # use 5 processes
for _ in range(1000):
tq.insert(
PrintTask(i)
)
```
This will load the queue with 1000 print tasks then execute them across five processes.

## Cloud Usage

Set up an SQS queue and acquire an aws-secret.json that is compatible with CloudVolume.

```python
from taskqueue import TaskQueue

qurl = 'https://sqs.us-east-1.amazonaws.com/$DIGITS/$QUEUE_NAME'
with TaskQueue(queue_server='sqs', qurl=qurl) as tq:
for _ in range(1000):
tq.insert(PrintTask(i))
```

This inserts 1000 PrintTask descriptions into your SQS queue.

Somewhere else, you'll do the following (probably across multiple workers):

```python
from taskqueue import TaskQueue

qurl = 'https://sqs.us-east-1.amazonaws.com/$DIGITS/$QUEUE_NAME'
with TaskQueue(queue_server='sqs', qurl=qurl) as tq:
task = tq.lease(seconds=int($LEASE_SECONDS))
task.execute()
tq.delete(task)
```

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

task-queue-0.9.0.tar.gz (10.9 kB view details)

Uploaded Source

File details

Details for the file task-queue-0.9.0.tar.gz.

File metadata

  • Download URL: task-queue-0.9.0.tar.gz
  • Upload date:
  • Size: 10.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/1.12.1 pkginfo/1.5.0.1 requests/2.21.0 setuptools/40.6.3 requests-toolbelt/0.8.0 tqdm/4.23.0 CPython/3.6.6

File hashes

Hashes for task-queue-0.9.0.tar.gz
Algorithm Hash digest
SHA256 d982c793dc277ab1d2fd8331720cfe6fc919146b7ce10e7d8cca534598e142d0
MD5 ed3f3d05486a46782ce1e57c07d788f6
BLAKE2b-256 3801f571a58b649ca665ade4398c2da7f5ffa12868e26de6e64078c1a7785055

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page