Skip to main content

UNKNOWN

Project description

This is a client for CrowdProcess’s REST API.

It works in python 2.7 and 3.4+.

Installing

pip install crowdprocess

or

easy_install crowdprocess

Usage example

>>> from crowdprocess import CrowdProcess
>>> crp = CrowdProcess('username', 'password')

>>> x2 = crp.job('function Run (d) { return d*2; }')
>>> results = x2(range(5)).results
>>> list(results)
[0, 2, 4, 6, 8, 10] # comes in a random order

More detailed use

Importing and instanciating

>>> from crowdprocess import CrowdProcess
>>> crp = CrowdProcess('username@email.com', 'password')

To get those credentials you must register with CrowdProcess.

You can also instanciate it with a token instead of a username and password:

>>> crp = CrowdProcess(token='3c46d593-5435-47c5-92aa-1613ade978c2')

Jobs

Creating a job

With the CrowdProcess class instanciated above,

>>> program='function Run (d) { return d }'
>>> job = crp.job(program)
>>> job.id
'3c46d593-5435-47c5-92aa-1613ade978c2'

Invoking crp.job with the program parameter automatically creates a job in CrowdProcess and returns an instanciated Job.

After you get a job.id, you can use it to get a Job again, without creating it:

>>> job = crp.job(id='3c46d593-5435-47c5-92aa-1613ade978c2')

Listing jobs

>>> crp.list_jobs()
[{u'status': u'active', u'failed': 0, u'bid': 1, u'created': u'2014-05-14T10:07:52.747503Z', u'modified': u'2014-05-14T10:07:53.716147Z', u'browserHours': 137, u'finished': 1000, u'lastResult': u'2014-05-14T10:07:59.06Z', u'total': 1000, u'id': u'3c46d593-5435-47c5-92aa-1613ade978c2'}]

Prettier:

>>> jobs = crp.list_jobs()
>>> print(json.dumps(jobs, sort_keys=True, indent=2))
[
  {
    "bid": 1,
    "browserHours": 137,
    "created": "2014-05-14T10:07:52.747503Z",
    "failed": 0,
    "finished": 1000,
    "id": "3c46d593-5435-47c5-92aa-1613ade978c2",
    "lastResult": "2014-05-14T10:07:59.06Z",
    "modified": "2014-05-14T10:07:53.716147Z",
    "status": "active",
    "total": 1000
  }
]

Deleting a job

>>> job = crp.job(id='3c46d593-5435-47c5-92aa-1613ade978c2')
>>> job.delete()

Deleting all jobs

>>> crp.delete_jobs()

Tasks and Results

After creating a job, you’re all set to send it tasks and get back results.

tasks can be any iterable object, results will be a generator:

>>> job = crp.job('function Run (d) { return Math.pow(d, 2); }')
>>> tasks = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> results = job(tasks).results
>>> list(results)
[49, 64, 16, 25, 9, 36, 4, 81, 0, 1]

which would be the same as,

>>> job = crp.job('function Run (d) { return Math.pow(d, 2); }')
>>> list(job(range(10)).results)
[49, 64, 16, 25, 9, 36, 4, 81, 0, 1]

which would also be the same as,

>>> job = crp.job('function Run (d) { return Math.pow(d, 2); }')
>>> def tasks():
...     for i in range(10):
...             yield i
...
>>> list(job(tasks).results)
[25, 64, 49, 16, 36, 9, 0, 81, 1, 4]

Notice that the results never come in order.

Pro tip: you can use the results of one job as tasks of another job

>>> multiply = crp.job('function Run (d) { return d*2 }')
>>> divide = crp.job('function Run (d) { return d/2 }')
>>> numbers = range(10)
>>> multiplied = multiply(numbers).results
>>> divided = divide(multiplied).results
>>> list(divided)
[7, 2, 6, 1, 5, 9, 8, 4, 3, 0]

Don’t forget about the errors

Sometimes your tasks will throw uncaught exceptions that you should know about, and you can get them the same way you get results:

>>> job = crp.job('function Run (d) { if (d === 4) { throw new Error("oh no, "+d) } return d; }')
>>> tasks = range(10)
>>> errors = job(tasks).errors
>>> for error in errors:
...     print error

Tasks and Results, lower level

Submitting tasks

Once again, tasks may be any iterable:

>>> multiply = crp.job('function Run (d) { return d*2 }')
>>> multiply.submit_tasks(range(10))

Getting results

>>> results = multiply.get_results()
>>> list(results)
[18, 8, 10, 4, 6, 16, 14, 0, 2, 12]

This delivers all the job’s computed results at the moment, but you should in fact get every result as soon as it’s computed, in a stream:

Streaming results

You can also iterate through every result as soon as it comes in:

>>> expected_results = 10
>>> results = multiply.get_results_stream()
>>> for result in results:
...     print(result)
...     expected_results -= 1
...     if expected_results == 0:
...         break

The stream does not know if or when a result might be computed and delivered, so you must count how many results you still expect to break the loop.

To use this properly you should start listening for streaming results before sending tasks, probably a separate thread:

>>> import threading
>>> job = crp.job("function Run(d) { return d; }")
>>> def get_results():
...     expected_results = 10
...     for result in job.get_results_stream():
...             print(result)
...             expected_results -= 1
...             if expected_results == 0:
...                 break
...
>>> t = threading.Thread(target=get_results)
>>> t.start()
>>> job.submit_tasks(range(10))
>>> 7
9
6
2
3
8
1
4
0
5
>>> t.join()

Sometimes your tasks will have uncaught exceptions and those will cause a result to not be delivered, so you must account for those as well to decrease your expected_results counter.

Errors and streaming errors

Sometimes your tasks throw uncaught exceptions, and you should get them:

>>> program = """
... function Run (d) {
...     if (d === 4) {
...             throw new Error("oops, it's "+d);
...     } else {
...             return d;
...     }
... }
... """
>>> job = crp.job(program)
>>> job.submit_tasks(range(10))
>>> list(job.get_results())
[1, 6, 9, 8, 5, 7, 2, 3, 0] # oh no, 4 is missing...
>>> list(job.get_errors())
[{u'message': u"oops, it's 4", u'type': u'program', u'name': u'Error', u'stack': u'Run@blob:9a4029f7-fff7-4da8-b552-92507e341749:5\n[2]</</self.onmessage@blob:9a4029f7-fff7-4da8-b552-92507e341749:9\n'}]
>>> print(json.dumps(list(job.get_errors()), sort_keys=True, indent=2)) # prettier
[
  {
    "message": "oops, it's 4",
    "name": "Error",
    "stack": "Run@blob:9a4029f7-fff7-4da8-b552-92507e341749:5\n[2]</</self.onmessage@blob:9a4029f7-fff7-4da8-b552-92507e341749:9\n",
    "type": "program"
  }
]

The same way you get streaming results, you can (and should) get streaming errors:

>>> errors = multiply.get_errors_stream()
>>> for error in errors:
...     print(error)

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

CrowdProcess-0.2.0.tar.gz (5.1 kB view details)

Uploaded Source

File details

Details for the file CrowdProcess-0.2.0.tar.gz.

File metadata

  • Download URL: CrowdProcess-0.2.0.tar.gz
  • Upload date:
  • Size: 5.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No

File hashes

Hashes for CrowdProcess-0.2.0.tar.gz
Algorithm Hash digest
SHA256 2f987e29e494220ac3dd6157832e52c8506292892c8a3a7e1633fb79da368af4
MD5 57217a9b247bc2fff8c12714002d4a01
BLAKE2b-256 82130db24e85df4b28aca26e624612cd2886516b86a554c54105285f870c2869

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page