[![Build Status](https://travis-ci.org/seung-lab/python-task-queue.svg?branch=master)](https://travis-ci.org/seung-lab/python-task-queue) [![PyPI version](https://badge.fury.io/py/task-queue.svg)](https://badge.fury.io/py/task-queue)

# python-task-queue
Python TaskQueue object that can rapidly populate and download from cloud queues. Supports local multi-process execution as well.

# Installation

```bash
pip install numpy # make sure you do this first, on a separate line
pip install task-queue
```

The task queue uses your CloudVolume secrets located in `$HOME/.cloudvolume/secrets/`. When using AWS SQS as your queue backend, you must provide `$HOME/.cloudvolume/secrets/aws-secret.json`. See the [CloudVolume](https://github.com/seung-lab/cloud-volume) repo for additional instructions.

The separate `pip install numpy` line makes CloudVolume easier to install, as this library uses its facilities for accessing secrets.
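For reference, an `aws-secret.json` generally holds your AWS credentials in the shape below (a sketch; consult the CloudVolume documentation for the authoritative format):

```json
{
  "AWS_ACCESS_KEY_ID": "...",
  "AWS_SECRET_ACCESS_KEY": "..."
}
```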

# Usage

Define a class that inherits from `taskqueue.RegisteredTask` and implements the `execute` method.

Tasks can be loaded into queues locally or as base64-encoded data in the cloud and executed later.
Here's an example implementation of a `PrintTask`. Generally, you should specify a very lightweight
container and let the actual execution download and manipulate data.

```python
from taskqueue import RegisteredTask

class PrintTask(RegisteredTask):
  def __init__(self, txt=''):
    super(PrintTask, self).__init__(txt)
    self.txt = txt

  def execute(self):
    if self.txt:
      print(str(self) + ": " + str(self.txt))
    else:
      print(self)
```
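To illustrate the base64 idea, a task description can round-trip through a JSON-plus-base64 encoding. This is a simplified sketch of the concept, not the library's actual wire format; `encode_task` and `decode_task` are hypothetical helpers:

```python
import base64
import json

def encode_task(task_class_name, kwargs):
  """Serialize a task description to a base64 string (illustrative only)."""
  payload = json.dumps({"class": task_class_name, "kwargs": kwargs})
  return base64.b64encode(payload.encode("utf-8")).decode("ascii")

def decode_task(blob):
  """Recover the class name and arguments from the base64 form."""
  payload = json.loads(base64.b64decode(blob).decode("utf-8"))
  return payload["class"], payload["kwargs"]

blob = encode_task("PrintTask", {"txt": "hello"})
name, kwargs = decode_task(blob)
# name == "PrintTask", kwargs == {"txt": "hello"}
```

Because the description is just a small encoded payload, queue entries stay lightweight regardless of how much data the task touches when it executes.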

## Local Usage

For small jobs, you might want to use one or more processes to execute the tasks:
```python
from taskqueue import LocalTaskQueue

with LocalTaskQueue(parallel=5) as tq: # use 5 processes
  for i in range(1000):
    tq.insert(
      PrintTask(i)
    )
```
This will load the queue with 1000 print tasks then execute them across five processes.

## Cloud Usage

Set up an SQS queue and acquire an aws-secret.json that is compatible with CloudVolume.

```python
from taskqueue import TaskQueue

qurl = 'https://sqs.us-east-1.amazonaws.com/$DIGITS/$QUEUE_NAME'
with TaskQueue(queue_server='sqs', qurl=qurl) as tq:
  for i in range(1000):
    tq.insert(PrintTask(i))
```

This inserts 1000 PrintTask descriptions into your SQS queue.

Somewhere else, you'll do the following (probably across multiple workers):

```python
from taskqueue import TaskQueue

qurl = 'https://sqs.us-east-1.amazonaws.com/$DIGITS/$QUEUE_NAME'
with TaskQueue(queue_server='sqs', qurl=qurl) as tq:
  task = tq.lease(seconds=int($LEASE_SECONDS))
  task.execute()
  tq.delete(task)
```
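A long-running worker typically repeats that lease/execute/delete cycle until the queue is drained. The sketch below shows the pattern against a minimal in-memory stand-in for the queue; `InMemoryQueue`, `QueueEmpty`, and `EchoTask` are hypothetical, and the real TaskQueue API may signal an empty queue differently:

```python
class QueueEmpty(Exception):
  """Raised by the stub when no tasks remain (hypothetical)."""

class InMemoryQueue:
  """Minimal stand-in for TaskQueue, for illustration only."""
  def __init__(self, tasks):
    self.tasks = list(tasks)
  def lease(self, seconds=300):
    if not self.tasks:
      raise QueueEmpty
    return self.tasks[0]  # the task stays in the queue until deleted
  def delete(self, task):
    self.tasks.remove(task)

class EchoTask:
  def __init__(self, txt):
    self.txt = txt
    self.executed = False
  def execute(self):
    self.executed = True

tq = InMemoryQueue([EchoTask("a"), EchoTask("b")])
done = 0
while True:
  try:
    task = tq.lease(seconds=600)
  except QueueEmpty:
    break
  task.execute()   # do the work
  tq.delete(task)  # delete only after successful execution
  done += 1
```

Deleting a task only after `execute` succeeds means a crashed worker's task reappears when its lease expires, which is the usual at-least-once delivery pattern for SQS-style queues.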
