Skip to main content

Prefect tasks plus added functionality to enforce type checking and help in debugging helping to reduce negative engineering!

Project description

A prefect extension giving prefect tasks super powers

Motivation

Prefect tasks plus added functionality to enforce type checking and help in debugging helping to reduce negative engineering!

How to setup

Using pip:

pip install supreme_task

How to use

Runtime type checking

Get runtime type checking thanks to typeguard by importing the @task decorator from supreme_task instead of prefect.

See the example run.py file:

from supreme_task import task

@task
def add(x: int, y: int) -> int:
    return x + y

add.fn(x="1", y=2)

Running python run.py will raise the following exception:

Traceback (most recent call last):
  File "run.py", line 9, in <module>
    add.fn(x="1", y=2)
  File "run.py", line 5, in add
    def add(x: int, y: int) -> int:
  File "supreme-task-py38/lib/python3.8/site-packages/typeguard/_functions.py", line 135, in check_argument_types
    check_type_internal(value, annotation, memo)
  File "supreme-task-py38/lib/python3.8/site-packages/typeguard/_checkers.py", line 761, in check_type_internal
    raise TypeCheckError(f"is not an instance of {qualified_name(origin_type)}")
typeguard.TypeCheckError: argument "x" (str) is not an instance of int  

Persistence of flow run inputs

Get persistence of flow run inputs by importing the @task decorator from supreme_task instead of prefect to help with debugging.

i.e. given a file run.py:

from supreme_task import task
from prefect import flow
from prefect.filesystems import LocalFileSystem

@task
def faulty_add(x: int, y: int) -> int:
    if x == 1:
        raise ValueError("x is 1")
    return x + y

@flow(result_storage=LocalFileSystem(basepath="results/"))
def my_flow() -> None:
    faulty_add(x=1, y=2)

my_flow()

We update our prefect configuration to enable result persistence:

prefect config set PREFECT_RESULTS_PERSIST_BY_DEFAULT=true

We then run the flow by running python run.py

We now inspect the results directory:

$ tree results 
results
├── 514aaa4ae0134405a639cbd9a17365da
├── b662c63ff9854b0e9383d7f6cf0a5b76
└── inputs
    └── faulty_add
        └── 2023-06-10T10-47-04+0000

The inputs for failed task runs are saved under results/inputs/<task_name>/<start_run_time>.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

supreme_task-0.1.1.tar.gz (5.4 kB view hashes)

Uploaded Source

Built Distribution

supreme_task-0.1.1-py3-none-any.whl (6.8 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page