Skip to main content

Toolkit for encapsulating Python-based computation into deployable and distributable tasks

Project description

werkit

version python versions license build code style

Toolkit for encapsulating Python-based computation into deployable and distributable tasks.

Provides code that helps package things up:

  • Serializing results
  • Handling and serializing errors
  • Deploying task workers using Redis, RQ and the Fargate CLI

They're particularly useful for providing repsonse consistency across different revisions of a service or different services.

Installation

pip install werkit

Usage

from werkit import Manager

def myfunc(param, verbose=False, handle_exceptions=True):
    with Manager(handle_exceptions=handle_exceptions, verbose=verbose) as manager:
        manager.result = do_some_computation()
    return manager.serialized_result

Parallel computation

Werkit supports parallel computation using Redis and RQ.

You must install the dependencies separately:

pip install redis rq

Requesting work

from mylib import myfunc
from werkit.parallel import invoke_for_each


items = {'a': ..., 'b': ...}
job_ids = invoke_for_each(myfunc, items, connection=Redis.from_url(...))

Performing work

pip install redis rq
rq worker --burst werkit-default --url rediss://...

Note: mylib.myfunc must be importable.

Using CloudManager

In place of the low-level API you can make your calls using CloudManager:

#!/usr/bin/env python


import click
from werkit.parallel import Config, CloudManager, invoke_for_each

manager = CloudManager(
    config=Config(
        local_repository="my-project",
        ecr_repository="123456789012.dkr.ecr.us-east-1.amazonaws.com/my-project",
        ecs_task_name="my-project",
        task_args=[
            "--cpu",
            "1024",
            "--memory",
            "2048",
            "--task-role",
            "arn:aws:iam::123456789012:role/...",
            "--security-group-id",
            "sg-...",
            "--subnet-id",
            "subnet-...",
        ],
        default_task_count=5,
    )
)


@click.group()
def cli():
    pass


@cli.command()
def login():
    manager.login()


@cli.command()
@click.argument("tag")
def build_and_push(tag):
    manager.build_and_push()


@cli.command()
def enqueue():
    from myproject import myfunc

    items = {"key1": "value1", "key2": "value2"}

    invoke_for_each(
        measure_body,
        items,
        clean=True,
        connection=manager.redis_connection,
    )


@cli.command()
@click.option(
    "--count",
    default=manager.config.default_task_count,
    type=int,
    help="Number of tasks to run",
)
@click.argument("tag")
def run(count, tag):
    manager.run(tag=tag, count=count)


@cli.command()
def dashboard():
    manager.dashboard()


@cli.command()
def ps():
    manager.ps()


@cli.command()
def get_results():
    print(manager.get_results())


@cli.command()
def clean():
    manager.clean()


if __name__ == "__main__":
    cli()

Getting results

from redis import Redis
from werkit.parallel import get_results


get_results(wait_until_done=True, connection=Redis.from_url(...))

Monitoring

You can monitor your queues using RQ Dashboard or one of the other methods outlined here.

Parallel computation on AWS lambda

Werkit also implements a parallel map on AWS lambda.

Werkit comes with a default lambda handler, that accepts an event of the form {"input":[a, b, ...],"extra_args":[c, d, ...]}. Werkit invokes a lambda function in parallel for every item in input, with an event of the form {"input": a, "extra_args":[c, d, ...]}.

The werkit default handler is configurable via the following environmnent variables:

  • LAMBDA_WORKER_FUNCTION_NAME: Name of the lambda worker function to invoke
  • LAMBDA_WORKER_TIMEOUT: How long to wait in seconds for the lambda worker function to return before returning a TimeoutError

Building and deploying functions to AWS Lambda

Werkit provides tools for programmatically building and deploying functions to AWS Lambda. There are two distinct steps: build and deploy.

The build process can run natively using a virtualenv, or in Docker. When either of the following cases apply, you can use the native virtualenv method:

  • The function's dependencies are pure Python (no compiled extensions).
  • You are building the function in Linux.

When building a function using compiled dependencies in OS X, the virtualenv method will try to pack up the OS X dependencies which of course won't work on Lambda. In that case you must use the Docker method.

Building a function natively using a virtualenv

def build_natively(build_dir="build", target_dir="build"):
    import os
    import shutil
    from werkit.aws_lambda.build import (
        collect_zipfile_contents,
        create_venv_with_dependencies,
        create_zipfile_from_dir,
    )

    shutil.rmtree(build_dir, ignore_errors=True)

    venv_dir = os.path.join(build_dir, "venv")
    create_venv_with_dependencies(
        venv_dir=venv_dir,
        # These are the defaults, which you can override if necessary.
        upgrade_pip=True,
        install_wheel=True,
        install_werkit=False,
        install_requirements_from=["requirements.txt"],
        # You can pass credentials to `pip install`.
        environment={"DEPLOY_TOKEN": DEPLOY_TOKEN},
    )

    contents_dir = os.path.join(build_dir, "contents")
    collect_zipfile_contents(
        target_dir=contents_dir,
        venv_dir=venv_dir,
        src_dirs=["mypackage", "assets"],
        # Specify additional system files to copy to `lib/` inside the zipfile.
        lib_files=[...],
    )

    os.makedirs(target_dir, exist_ok=True)
    temp_path_to_zipfile = os.path.join(target_dir, "function.zip")
    create_zipfile_from_dir(
        dir_path=contents_dir,
        path_to_zipfile=temp_path_to_zipfile
    )

Building a function using Docker

Create a collect_and_zip.py script which will run in Docker, with /target mounted to a folder on the host system.

from werkit.aws_lambda.build import create_zipfile_from_dir

VERBOSE = False

collect_zipfile_contents(
    target_dir="/build/contents",
    venv_dir="/build/venv",
    src_dirs=["mypackage", "assets"],
    # Specify additional system files to copy to `lib/` inside the zipfile.
    lib_files=[...],
)

# To improve performance, zip into the container and copy to the host
# afterward.
create_zipfile_from_dir(
    dir_path="/build/contents",
    path_to_zipfile="/build/function.zip"
)
shutil.copyfile("/build/function.zip", "/target/function.zip")

Create a Dockerfile:

FROM python:3.7

WORKDIR /src

# Install any system dependencies you want to include in the zipfile.
RUN apt-get install -y --no-install-recommends ...
RUN rm -rf /var/lib/apt/lists/*

# Install werkit, along with any other dependencies needed for
# `collect_and_zip.py`.
RUN python3 -m pip install "werkit==0.10.0"

# Optionally receive credentials from the environment.
ARG DEPLOY_TOKEN

# Create the venv. As is necessary for some Docker base images, upgrade pip and
# install wheel.
RUN python3 -m venv /build/venv
RUN /build/venv/bin/pip install --upgrade pip wheel

# Install Python dependencies.
COPY requirements.txt /src/
RUN DEPLOY_TOKEN=${DEPLOY_TOKEN} /build/venv/bin/pip install -r requirements.txt

COPY mypackage/ /src/mypackage/
COPY assets/ /src/assets/
COPY collect_and_zip.py /src/

# Optionally set PYTHONPATH if `collect_and_zip.py` imports any internal source
# code.
ENV PYTHONPATH /src

CMD python3 collect_and_zip.py

Invoke the build:

DEPLOY_TOKEN = "..."

def build_in_docker(build_dir="build", target_dir="build"):
    from executor import execute

    docker_tag = "mypackage-lambda-builder"

    execute(
        "docker",
        "build",
        "-t",
        docker_tag,
        "-f",
        "Dockerfile",
        # Optionally pass credentials for the Docker build process.
        "--build-arg",
        f"DEPLOY_TOKEN={DEPLOY_TOKEN}",
        ".",
    )
    print("Build image created")

    os.makedirs("build/", exist_ok=True)
    execute(
        "docker",
        "run",
        "--volume",
        f"{os.path.abspath("build/")}:/target:Z",
        "-t",
        docker_tag,
    )
    print("Build finished")

Deploying the function

# A Lambda role is always required.
LAMBDA_ROLE = ...
# When the zipfile is larger than 50 MB, a temporary bucket is required.
S3_CODE_BUCKET = ...

def create():
    from werkit.aws_lambda.deploy import perform_create

    perform_create(
        # Region is required.
        aws_region="us-east-1",
        function_name="myfunction",
        local_path_to_zipfile="build/function.zip",
        # The importable name of your handler function, which should have the
        # signature `def handler(event, context):`.
        handler="mypackage.worker.handler",
        role=LAMBDA_ROLE,
        # Required when the zipfile is larger than 50 MB.
        s3_code_bucket=S3_CODE_BUCKET,
        # Optionally override.
        timeout=TIMEOUT,
        memory_size=WORKER_MEMORY_SIZE,
        runtime="python3.8",
        env_vars={},
    )

Updating the code for an existing function

LAMBDA_ROLE = ...
S3_CODE_BUCKET = ...

def update_code():
    from werkit.aws_lambda.deploy import perform_update_code

    perform_update_code(
        aws_region="us-east-1",
        function_name="myfunction",
        local_path_to_zipfile="build/function.zip",
        # Required when the zipfile is larger than 50 MB.
        s3_code_bucket=S3_CODE_BUCKET,
    )

Type definitions

TypeScript types are available for the Werkit job message format.

version

Contribute

Pull requests welcome!

Support

If you are having issues, please let us know.

License

The project is licensed under the MIT License.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

werkit-0.15.0.tar.gz (20.8 kB view details)

Uploaded Source

Built Distribution

werkit-0.15.0-py3-none-any.whl (23.4 kB view details)

Uploaded Python 3

File details

Details for the file werkit-0.15.0.tar.gz.

File metadata

  • Download URL: werkit-0.15.0.tar.gz
  • Upload date:
  • Size: 20.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.2.0 pkginfo/1.6.1 requests/2.24.0 setuptools/50.3.2 requests-toolbelt/0.9.1 tqdm/4.51.0 CPython/3.8.6

File hashes

Hashes for werkit-0.15.0.tar.gz
Algorithm Hash digest
SHA256 cf0db3e7160134a9a7213d51fd5587d2e2cc7bf5804765a10af0508b8212cccb
MD5 9d9a9c6fec8ec1cfc9110e16174094fa
BLAKE2b-256 4691ccd4923918978f1a3e0e6bd657f1fd12fda9b6014addaa57e0c17de58fda

See more details on using hashes here.

File details

Details for the file werkit-0.15.0-py3-none-any.whl.

File metadata

  • Download URL: werkit-0.15.0-py3-none-any.whl
  • Upload date:
  • Size: 23.4 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.2.0 pkginfo/1.6.1 requests/2.24.0 setuptools/50.3.2 requests-toolbelt/0.9.1 tqdm/4.51.0 CPython/3.8.6

File hashes

Hashes for werkit-0.15.0-py3-none-any.whl
Algorithm Hash digest
SHA256 b83f6e5b8122bea2622638c7247885720cf300d132b550f9156fccd35a45a3f0
MD5 e8e4fbca61f7bbe1c05cb78a84464d0c
BLAKE2b-256 5477e16e1e0579080c6ba66f70e5d2a98dca81bb14fee5a5d37f56250449752a

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page