
[![Build Status](https://travis-ci.org/seung-lab/python-task-queue.svg?branch=master)](https://travis-ci.org/seung-lab/python-task-queue) [![PyPI version](https://badge.fury.io/py/task-queue.svg)](https://badge.fury.io/py/task-queue)

# python-task-queue
Python TaskQueue object that can rapidly populate and download from cloud queues. Supports local multi-process execution as well.

# Installation

```bash
pip install numpy # make sure you do this first on a separate line
pip install task-queue
```

The task queue uses your CloudVolume secrets located in `$HOME/.cloudvolume/secrets/`. When using AWS SQS as your queue backend, you must provide `$HOME/.cloudvolume/secrets/aws-secret.json`. See the [CloudVolume](https://github.com/seung-lab/cloud-volume) repo for additional instructions.

The separate `pip install numpy` line makes CloudVolume easier to install; this library uses CloudVolume's facilities for accessing secrets.
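If you haven't set up CloudVolume secrets before, `aws-secret.json` is a small JSON file holding your SQS credentials. The keys below follow CloudVolume's documented format, but check the CloudVolume repo in case it has changed:

```json
{
  "AWS_ACCESS_KEY_ID": "YOUR_ACCESS_KEY",
  "AWS_SECRET_ACCESS_KEY": "YOUR_SECRET_KEY"
}
```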

# Usage

Define a class that inherits from `taskqueue.RegisteredTask` and implements the `execute` method.

Tasks can be loaded into queues locally or as base64-encoded data in the cloud and executed later.
Here's an example implementation of a `PrintTask`. Generally, you should specify a very lightweight
task container and let the actual execution download and manipulate data.

```python
from taskqueue import RegisteredTask

class PrintTask(RegisteredTask):
    def __init__(self, txt=''):
        super(PrintTask, self).__init__(txt)
        self.txt = txt

    def execute(self):
        if self.txt:
            print(str(self) + ": " + str(self.txt))
        else:
            print(self)
```
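Internally, taskqueue reduces each task to a small description (class name plus constructor arguments) so it can travel through a queue and be re-instantiated by a worker. The actual wire format is internal to the library; this standalone sketch, using plain `json` + `base64` and a hypothetical `ToyTask`, only illustrates the round-trip idea:

```python
import base64
import json

# Hypothetical stand-in class; taskqueue's real format is internal to the library.
class ToyTask:
    def __init__(self, txt=''):
        self.txt = txt

    def execute(self):
        return 'ToyTask: ' + str(self.txt)

REGISTRY = {'ToyTask': ToyTask}

def serialize(task):
    # Reduce the task to its class name plus constructor arguments,
    # then base64-encode so it can be stored as queue message text.
    payload = json.dumps({'class': type(task).__name__, 'args': {'txt': task.txt}})
    return base64.b64encode(payload.encode('utf-8'))

def deserialize(blob):
    # Decode and re-instantiate the task from the registry by class name.
    payload = json.loads(base64.b64decode(blob).decode('utf-8'))
    return REGISTRY[payload['class']](**payload['args'])

blob = serialize(ToyTask('hello'))   # text-safe; could sit in a cloud queue
print(deserialize(blob).execute())   # prints "ToyTask: hello"
```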

## Local Usage

For small jobs, you might want to use one or more processes to execute the tasks:
```python
from taskqueue import LocalTaskQueue

with LocalTaskQueue(parallel=5) as tq: # use 5 processes
    for i in range(1000):
        tq.insert(PrintTask(i))
```
This will load the queue with 1000 print tasks then execute them across five processes.

## Cloud Usage

Set up an SQS queue and acquire an aws-secret.json that is compatible with CloudVolume.

```python
from taskqueue import TaskQueue

qurl = 'https://sqs.us-east-1.amazonaws.com/$DIGITS/$QUEUE_NAME'
with TaskQueue(queue_server='sqs', qurl=qurl) as tq:
    for i in range(1000):
        tq.insert(PrintTask(i))
```

This inserts 1000 PrintTask descriptions into your SQS queue.

Somewhere else, you'll do the following (probably across multiple workers):

```python
from taskqueue import TaskQueue

qurl = 'https://sqs.us-east-1.amazonaws.com/$DIGITS/$QUEUE_NAME'
with TaskQueue(queue_server='sqs', qurl=qurl) as tq:
    task = tq.lease(seconds=int($LEASE_SECONDS))
    task.execute()
    tq.delete(task)
```
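A long-running worker repeats this lease/execute/delete cycle until the queue is drained. The exact empty-queue behavior depends on your taskqueue version, so the loop below is a standalone sketch that simulates the cycle against a plain in-memory queue rather than calling the real SQS-backed API:

```python
from collections import deque

# In-memory stand-in for a remote queue; a real worker would call
# tq.lease / tq.delete against SQS instead.
queue = deque('task-%d' % i for i in range(3))
results = []

def lease(q):
    return q[0] if q else None   # peek without removing, like an SQS lease

def delete(q, task):
    q.remove(task)               # acknowledge only after successful execution

while True:
    task = lease(queue)
    if task is None:
        break                    # queue drained; a real worker might sleep and retry
    results.append(task.upper()) # stand-in for task.execute()
    delete(queue, task)

print(results)  # ['TASK-0', 'TASK-1', 'TASK-2']
```

Deleting only after `execute` succeeds is what makes the lease pattern robust: if a worker dies mid-task, the lease expires and another worker picks the task up.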
