Skip to main content

Toolkit for encapsulating Python-based computation into deployable and distributable tasks

Project description

werkit

version python versions license build code style

Toolkit for encapsulating Python-based computation into deployable and distributable tasks.

Provides code that helps package things up:

  • Serializing results
  • Handling and serializing errors
  • Deploying task workers using Redis, RQ and the Fargate CLI

They're particularly useful for providing repsonse consistency across different revisions of a service or different services.

Installation

pip install werkit

Usage

from werkit import Manager

def myfunc(param, verbose=False, handle_exceptions=True):
    with Manager(handle_exceptions=handle_exceptions, verbose=verbose) as manager:
        manager.result = do_some_computation()
    return manager.serialized_result

Parallel computation

Werkit supports parallel computation using Redis and RQ.

You must install the dependencies separately:

pip install redis rq

Requesting work

from mylib import myfunc
from werkit.parallel import invoke_for_each


items = {'a': ..., 'b': ...}
job_ids = invoke_for_each(myfunc, items, connection=Redis.from_url(...))

Performing work

pip install redis rq
rq worker --burst werkit-default --url rediss://...

Note: mylib.myfunc must be importable.

Using CloudManager

In place of the low-level API you can make your calls using CloudManager:

#!/usr/bin/env python


import click
from werkit.parallel import Config, CloudManager, invoke_for_each

manager = CloudManager(
    config=Config(
        local_repository="my-project",
        ecr_repository="123456789012.dkr.ecr.us-east-1.amazonaws.com/my-project",
        ecs_task_name="my-project",
        task_args=[
            "--cpu",
            "1024",
            "--memory",
            "2048",
            "--task-role",
            "arn:aws:iam::123456789012:role/...",
            "--security-group-id",
            "sg-...",
            "--subnet-id",
            "subnet-...",
        ],
        default_task_count=5,
    )
)


@click.group()
def cli():
    pass


@cli.command()
def login():
    manager.login()


@cli.command()
@click.argument("tag")
def build_and_push(tag):
    manager.build_and_push()


@cli.command()
def enqueue():
    from myproject import myfunc

    items = {"key1": "value1", "key2": "value2"}

    invoke_for_each(
        measure_body,
        items,
        clean=True,
        connection=manager.redis_connection,
    )


@cli.command()
@click.option(
    "--count",
    default=manager.config.default_task_count,
    type=int,
    help="Number of tasks to run",
)
@click.argument("tag")
def run(count, tag):
    manager.run(tag=tag, count=count)


@cli.command()
def dashboard():
    manager.dashboard()


@cli.command()
def ps():
    manager.ps()


@cli.command()
def get_results():
    print(manager.get_results())


@cli.command()
def clean():
    manager.clean()


if __name__ == "__main__":
    cli()

Getting results

from redis import Redis
from werkit.parallel import get_results


get_results(wait_until_done=True, connection=Redis.from_url(...))

Monitoring

You can monitor your queues using RQ Dashboard or one of the other methods outlined here.

Parallel computation on AWS lambda

Werkit also implements a parallel map on AWS lambda.

Werkit comes with a default lambda handler, that accepts an event of the form {"input":[a, b, ...],"extra_args":[c, d, ...]}. Werkit invokes a lambda function in parallel for every item in input, with an event of the form {"input": a, "extra_args":[c, d, ...]}.

The werkit default handler is configurable via the following environmnent variables:

  • LAMBDA_WORKER_FUNCTION_NAME: Name of the lambda worker function to invoke
  • LAMBDA_WORKER_TIMEOUT: How long to wait in seconds for the lambda worker function to return before returning a TimeoutError

Contribute

Pull requests welcome!

Support

If you are having issues, please let us know.

License

The project is licensed under the MIT License.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

werkit-0.9.1.tar.gz (14.7 kB view details)

Uploaded Source

Built Distribution

werkit-0.9.1-py3-none-any.whl (19.0 kB view details)

Uploaded Python 3

File details

Details for the file werkit-0.9.1.tar.gz.

File metadata

  • Download URL: werkit-0.9.1.tar.gz
  • Upload date:
  • Size: 14.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.1.1 pkginfo/1.5.0.1 requests/2.22.0 setuptools/47.1.1 requests-toolbelt/0.9.1 tqdm/4.35.0 CPython/3.7.5

File hashes

Hashes for werkit-0.9.1.tar.gz
Algorithm Hash digest
SHA256 a3dc7c0bd25df7badbf62ff4b406f3dbf2217d69534746279a09f1603f07773a
MD5 4653673441a2570a6676922749c37595
BLAKE2b-256 aa835f32832b54a73f1d771cb1ae97151534d808a1568e3c6b5e46db074f2ad5

See more details on using hashes here.

File details

Details for the file werkit-0.9.1-py3-none-any.whl.

File metadata

  • Download URL: werkit-0.9.1-py3-none-any.whl
  • Upload date:
  • Size: 19.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.1.1 pkginfo/1.5.0.1 requests/2.22.0 setuptools/47.1.1 requests-toolbelt/0.9.1 tqdm/4.35.0 CPython/3.7.5

File hashes

Hashes for werkit-0.9.1-py3-none-any.whl
Algorithm Hash digest
SHA256 9c8c1208dc099a9e0a16d3d62608e297aa3fda6a7e75db7debb3eed0004a2feb
MD5 8c0f6ae6612e0be0678d399bf16f6b0b
BLAKE2b-256 e0571136911afadd63185b3d484ead78be10773ffb939c54aa04dc01dc63aabf

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page