wt-task

Task decorator and execution features for the wt ecosystem.

Overview

wt-task provides the @task decorator and task execution capabilities for workflow systems. It implements the TaskProtocol from wt-contracts, enabling type-safe task execution across the wt ecosystem.

Features

  • Dual-purpose task decorator: Works as both a decorator (@task) and a wrapper function (task(func))
  • Execution methods: call, map, mapvalues for different execution patterns
  • Partial application: Bind arguments with .partial() before execution
  • Validation: Pydantic-based validation with .validate()
  • Error handling: Wrap errors with task instance context
  • Conditional skipping: Skip execution based on conditions with .skipif()
  • Custom executors: Switch execution backends with .set_executor()
  • OpenTelemetry tracing: Optional tracing support (requires wt-task[gcp])
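
The dual-purpose behavior in the first bullet follows from Python's decorator syntax: placing @task above a function is equivalent to writing func = task(func). The sketch below illustrates that equivalence with a hypothetical stand-in decorator. It is not wt-task's implementation, and the stand-in only adds a .call alias.

```python
import functools


def task(func):
    """Hypothetical stand-in for wt_task.task, for illustration only."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)

    # Expose an explicit-call alias, mirroring the add.call(...) style.
    wrapper.call = wrapper
    return wrapper


@task
def add(a: int, b: int) -> int:
    return a + b


def subtract(a: int, b: int) -> int:
    return a - b


# Wrapper-function form: same effect as decorating subtract with @task.
subtract_task = task(subtract)

decorated_result = add(1, 2)
wrapped_result = subtract_task.call(5, 3)
print(decorated_result, wrapped_result)
```

The wrapper-function form is useful when the function is defined elsewhere, or when code is generated and cannot easily carry decorator syntax.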

Installation

# Basic installation
pip install wt-task

# With tracing support (quotes stop shells such as zsh from expanding the brackets)
pip install "wt-task[gcp]"

# Development installation
cd wt/wt-task
uv sync

Usage

Basic Task Definition

from wt_task import task

@task
def add(a: int, b: int) -> int:
    return a + b

# Direct call
result = add(1, 2)  # 3

# Explicit call
result = add.call(1, 2)  # 3

Partial Application

@task
def multiply(a: int, b: int) -> int:
    return a * b

# Bind one argument
multiply_by_2 = multiply.partial(b=2)
result = multiply_by_2.call(a=5)  # 10

Mapping Over Values

@task
def square(x: int) -> int:
    return x * x

# Map over a sequence
results = square.map("x", [1, 2, 3, 4])  # [1, 4, 9, 16]
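
Judging from the example above, the first argument to .map() names the parameter that receives each element. Under that assumption, the call behaves roughly like the list comprehension below. The helper map_over is hypothetical and exists only to show the semantics in plain Python.

```python
def square(x: int) -> int:
    return x * x


def map_over(func, argname, values):
    """Call func once per value, passing each value as keyword argument argname."""
    return [func(**{argname: value}) for value in values]


results = map_over(square, "x", [1, 2, 3, 4])
print(results)  # [1, 4, 9, 16]
```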

Mapping Over Key-Value Pairs

@task
def process(data: str) -> int:
    return len(data)

# Map over key-value pairs, preserving keys
results = process.mapvalues("data", [
    ("a", "hello"),
    ("b", "world"),
])
# [("a", 5), ("b", 5)]
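
In plain Python, the key-preserving behavior shown above amounts to transforming only the second element of each pair. The sketch below assumes that reading. The helper map_values is hypothetical and is not part of wt-task.

```python
def process(data: str) -> int:
    return len(data)


def map_values(func, argname, pairs):
    """Apply func to the value of each (key, value) pair and keep the key."""
    return [(key, func(**{argname: value})) for key, value in pairs]


results = map_values(process, "data", [("a", "hello"), ("b", "world")])
print(results)  # [('a', 5), ('b', 5)]
```

Keeping the key next to each result makes it possible to trace every output back to the input that produced it.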

Validation

@task
def add_numbers(a: int, b: int) -> int:
    return a + b

# Parse string inputs to ints
result = add_numbers.validate().call("10", "20")  # 30
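
To make "parse string inputs to ints" concrete, here is a simplified standard-library stand-in that coerces arguments to their annotated types before calling the function. It is only a sketch of the idea: wt-task delegates to Pydantic, which handles far more cases. The helper coerce_call is hypothetical.

```python
import inspect
from typing import get_type_hints


def coerce_call(func, *args, **kwargs):
    """Coerce each argument to its annotated type, then call func.

    Simplified: only handles annotations that are plain classes (int, float, str).
    """
    hints = get_type_hints(func)
    bound = inspect.signature(func).bind(*args, **kwargs)
    for name, value in bound.arguments.items():
        target = hints.get(name)
        if isinstance(target, type) and not isinstance(value, target):
            # Raises ValueError or TypeError if the value cannot be converted.
            bound.arguments[name] = target(value)
    return func(*bound.args, **bound.kwargs)


def add_numbers(a: int, b: int) -> int:
    return a + b


result = coerce_call(add_numbers, "10", "20")
print(result)  # 30
```

Without the coercion step, add_numbers("10", "20") would concatenate the strings and return "1020", which is why validation matters when inputs arrive as text.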

Error Handling

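# Assumption: TaskInstanceError is exported by wt_task; adjust the import if it lives elsewhere
from wt_task import TaskInstanceError

@task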
def divide(a: int, b: int) -> float:
    return a / b

# Add task instance context to errors
task_with_id = divide.set_task_instance_id("task-1")
try:
    task_with_id.handle_errors().call(10, 0)
except TaskInstanceError as e:
    print(e)  # Task instance 'task-1' raised ZeroDivisionError(...)
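
The wrapping pattern can be sketched in plain Python with exception chaining. WrappedTaskError and call_with_context are hypothetical names chosen for this illustration. They are not wt-task's classes, and the real TaskInstanceError may carry different attributes.

```python
class WrappedTaskError(Exception):
    """Hypothetical stand-in for an error that records a task instance id."""

    def __init__(self, task_instance_id: str, original: BaseException):
        self.task_instance_id = task_instance_id
        self.original = original
        super().__init__(f"Task instance {task_instance_id!r} raised {original!r}")


def call_with_context(task_instance_id, func, *args, **kwargs):
    """Call func and re-raise any failure with the task instance id attached."""
    try:
        return func(*args, **kwargs)
    except Exception as exc:
        # "from exc" preserves the original traceback as __cause__.
        raise WrappedTaskError(task_instance_id, exc) from exc


def divide(a: int, b: int) -> float:
    return a / b


try:
    call_with_context("task-1", divide, 10, 0)
except WrappedTaskError as err:
    message = str(err)
    cause = err.__cause__
    print(message)
```

Chaining with "from" keeps the original exception available for debugging while the outer error identifies which task instance failed.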

Method Chaining

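Configuration methods such as .partial(), .validate(), .set_task_instance_id(), and .handle_errors() each return a new task instance, so they can be chained before the final .call():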

result = (
    add
    .partial(a=10)
    .validate()
    .set_task_instance_id("add-task")
    .handle_errors()
    .call(b="20")
)  # 30
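
One common way to get this "return a new instance" behavior is a frozen dataclass whose methods build modified copies. The sketch below shows that pattern. SketchTask is a hypothetical class and makes no claim about how wt-task is structured internally.

```python
from dataclasses import dataclass, field, replace
from typing import Any, Callable, Optional


@dataclass(frozen=True)
class SketchTask:
    """Hypothetical immutable task wrapper; each method returns a modified copy."""

    func: Callable[..., Any]
    bound: dict = field(default_factory=dict)
    instance_id: Optional[str] = None

    def partial(self, **kwargs):
        # Merge new bindings into a fresh dict so the original stays untouched.
        return replace(self, bound={**self.bound, **kwargs})

    def set_task_instance_id(self, instance_id: str):
        return replace(self, instance_id=instance_id)

    def call(self, *args, **kwargs):
        return self.func(*args, **{**self.bound, **kwargs})


def add(a: int, b: int) -> int:
    return a + b


base = SketchTask(add)
configured = base.partial(a=10).set_task_instance_id("add-task")
result = configured.call(b=20)
print(result)  # 30
```

Because base is never mutated, the same task can be configured in several different ways without one configuration leaking into another.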

Architecture

wt-task is designed to work with the other wt packages:

  • wt-contracts: Implements the TaskProtocol interface
  • wt-compiler: Generated DAG code uses the task() wrapper function
  • wt-registry: Task functions are registered separately with @register
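
The idea of implementing a protocol defined in a separate contracts package can be illustrated with typing.Protocol, which checks structure rather than inheritance. The members of the real TaskProtocol are not shown in this document, so SketchTaskProtocol below is a hypothetical one-method interface used purely for illustration.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class SketchTaskProtocol(Protocol):
    """Hypothetical interface; the real TaskProtocol may define other members."""

    def call(self, *args: Any, **kwargs: Any) -> Any: ...


class EchoTask:
    """Satisfies the protocol structurally, without inheriting from it."""

    def call(self, *args: Any, **kwargs: Any) -> Any:
        return args, kwargs


def run(task: SketchTaskProtocol, *args: Any, **kwargs: Any) -> Any:
    # Code that depends only on the protocol never needs to import EchoTask.
    return task.call(*args, **kwargs)


conforms = isinstance(EchoTask(), SketchTaskProtocol)
output = run(EchoTask(), 1, b=2)
print(conforms, output)
```

Depending on the interface rather than a concrete class lets packages in an ecosystem exchange tasks without importing each other's implementations.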

Development

# Create environment and install dependencies
cd wt/wt-task
uv sync

# Run tests
uv run pytest

# Type checking
uv run mypy src/

# Linting
uv run ruff check .

# Formatting
uv run ruff format .

Testing

The package ships a pytest suite covering the features described above:

# Run all tests
uv run pytest

# Run with coverage
uv run pytest --cov=wt_task --cov-report=html

# Run specific test file
uv run pytest tests/test_decorator.py

License

BSD-3-Clause


