
Python local-first no-bloat task orchestration framework


woflo


Overview

woflo is a Python local-first no-bloat extensible task orchestration framework.

Okay, that's a lot of buzz. So what is actually the point?

The main goal is to abstract away much of the functionality related to task orchestration and execution while keeping the API clear and dead simple.

Currently this includes:

  • retries
  • retry timeout
  • parallelism
  • logging

Installation

To download from PyPI use:

pip install woflo

To install from source:

git clone https://github.com/petereon/woflo.git
cd woflo
poetry build
cd dist
pip install ./woflo-<version>-py3-none-any.whl

Examples

The intended usage is through the @task decorator. Consider a very simple example that runs 10 sleepy workers in parallel without blocking the main thread:

import time
from woflo import task


@task
def sleepy_worker():
    time.sleep(5)
    print('I am done')


for _ in range(10):
    sleepy_task_run = sleepy_worker()

You can also include retries for tasks that might fail at times. The following would attempt to run the decorated function 3 times in total (the initial attempt plus 2 retries), with a 5-second delay between attempts.

from woflo import task


@task(retries=2, retry_sleep_time=5)
def fetch_data_from_unstable_api():
    ...
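To make the attempt count concrete, here is a minimal standard-library sketch of the retry behavior described above. It is not woflo's implementation: run_with_retries is a hypothetical helper, and only the parameter names retries and retry_sleep_time are borrowed from the decorator.

```python
import time


def run_with_retries(fn, retries=2, retry_sleep_time=5.0):
    """Call fn up to retries + 1 times, sleeping between failed attempts.

    Hypothetical helper for illustration only; not part of woflo.
    """
    total_attempts = retries + 1  # the initial attempt plus the retries
    for attempt in range(1, total_attempts + 1):
        try:
            return fn()
        except Exception:
            if attempt == total_attempts:
                raise  # out of attempts, surface the last error
            time.sleep(retry_sleep_time)
```

With retries=2, a function that fails twice and then succeeds is called three times in total, and the caller only sees the successful result.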

Furthermore, you can provide a runner within the @task decorator, such as SequentialTaskRun if you prefer your tasks to run sequentially and like to wait around a computer a lot. For example:

import time
from woflo import task
from woflo.runners import SequentialTaskRun


@task(runner=SequentialTaskRun)
def sequential_sleepy_worker():
    time.sleep(5)
    print('I am done')


for _ in range(10):
    sleepy_task_run = sequential_sleepy_worker()

Each TaskRun should also expose a few methods that enable you to handle it:

  • .get_result() to fetch the return value of the finished task
  • .wait() to block main thread till the task finishes (irrelevant for SequentialTaskRun which will block until it finishes anyway)
  • .stop() to stop the task while it's running (irrelevant for SequentialTaskRun which will block until it finishes anyway)
  • .is_running() to check if the task is still running (irrelevant for SequentialTaskRun which will block until it finishes anyway)

Let us define an example task:

import time
from woflo import task

@task
def quick_nap(duration):
    time.sleep(duration)
    if duration < 10:
        raise Exception("Ouch oof")
    else:
        return 'Well rested'

After you run it,

napping = quick_nap(10)

you can check on it to monitor its state and receive results,

assert napping.is_running()

napping.wait()
assert napping.get_result() == "Well rested"

Task Runners

woflo is designed to be easily extended by developing custom task runners. The library itself currently exposes two such runners, MultiprocessTaskRun and SequentialTaskRun.

Additionally woflo makes available a BaseTaskRun, an interface against which custom runners can be developed.
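As a rough idea of what a custom runner might look like, here is a standard-library sketch of a thread-backed run handle that exposes the same four methods listed earlier. It is an assumption-heavy illustration: ThreadTaskRun is a hypothetical name, it deliberately does not subclass BaseTaskRun (whose required methods are not shown here), and it may not match how woflo expects runners to be constructed.

```python
import threading


class ThreadTaskRun:
    """Hypothetical thread-backed run handle; not part of woflo."""

    def __init__(self, fn, *args, **kwargs):
        self._result = None
        self._error = None
        self._thread = threading.Thread(
            target=self._run, args=(fn, args, kwargs), daemon=True
        )
        self._thread.start()  # start immediately, without blocking the caller

    def _run(self, fn, args, kwargs):
        # Capture either the return value or the exception for later retrieval.
        try:
            self._result = fn(*args, **kwargs)
        except Exception as exc:
            self._error = exc

    def is_running(self):
        return self._thread.is_alive()

    def wait(self):
        self._thread.join()

    def get_result(self):
        self.wait()
        if self._error is not None:
            raise self._error
        return self._result

    def stop(self):
        # Python threads cannot be forcibly terminated from outside.
        raise NotImplementedError("threads cannot be stopped forcibly")
```

A real runner would need to follow whatever BaseTaskRun actually requires, so treat this only as a picture of the handle's shape.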

The default task runner is MultiprocessTaskRun, which can run multiple tasks, or even multiple instances of the same task, in parallel in separate Python processes.

MultiprocessTaskRun

This is the default task runner. It can run multiple tasks, or even multiple instances of the same task, in parallel in separate Python processes.

It offers two modes of operation:

  • ForkProcess, which forks the main process and inherits all of its state. Forking is the default on Darwin and Linux (it is not available on Windows)
  • SpawnProcess, which spawns a new process with the same global state

This behavior can be configured by setting the process_type:

import time
from woflo import task
from woflo.runners.multiprocess import SpawnProcess, MultiprocessTaskRun

MultiprocessTaskRun.process_type = SpawnProcess

@task(runner=MultiprocessTaskRun)
def sleepy_worker():
    time.sleep(5)
    print('I am done')
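For readers unfamiliar with the fork/spawn distinction, the sketch below shows the analogous choice in Python's standard multiprocessing module. It does not use woflo at all; make_context, child and run_child are hypothetical names, and falling back to "spawn" is just one reasonable policy since that start method exists on every platform.

```python
import multiprocessing as mp


def make_context(preferred="fork"):
    """Return a multiprocessing context, falling back to 'spawn'.

    'fork' is unavailable on Windows, while 'spawn' works everywhere.
    """
    methods = mp.get_all_start_methods()
    method = preferred if preferred in methods else "spawn"
    return mp.get_context(method)


def child(queue):
    # Runs in the child process and reports back through the queue.
    queue.put("hello from child")


def run_child(ctx):
    """Start one child process with the given context and return its message."""
    queue = ctx.Queue()
    proc = ctx.Process(target=child, args=(queue,))
    proc.start()
    message = queue.get()
    proc.join()
    return message
```

When using a spawn context, call run_child from under an if __name__ == "__main__": guard, because the child re-imports the main module.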

Roadmap

  • Setup GitHub Actions, SonarCloud monitoring and Codecov
  • Make a PyPI Package
  • Decide on final API and create a version 1.x.x
  • Implement a Dask runner
  • Implement a Thread runner
  • Implement an Async runner

Known issues

  • Processes potentially inherit a large in-memory state in MultiprocessTaskRun
  • SpawnProcess running into OSError: [Errno 9] Bad file descriptor on macOS 12.6 when using multiprocess.sharedctypes.Value as reflected in this issue
  • Imports need some refactoring

