wt-task

Task decorator and execution features for the wt ecosystem.

Overview

wt-task provides the @task decorator and task execution capabilities for workflow systems. It implements the TaskProtocol from wt-contracts, enabling type-safe task execution across the wt ecosystem.

Features

  • Dual-purpose task decorator: Works as both a decorator (@task) and a wrapper function (task(func))
  • Execution methods: call, map, mapvalues for different execution patterns
  • Partial application: Bind arguments with .partial() before execution
  • Validation: Pydantic-based validation with .validate()
  • Error handling: Wrap errors with task instance context
  • Conditional skipping: Skip execution based on conditions with .skipif()
  • Custom executors: Switch execution backends with .set_executor()
  • OpenTelemetry tracing: Optional tracing support (requires wt-task[gcp])
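The dual-purpose behaviour in the first bullet can be pictured with a minimal sketch. This is not wt-task's implementation: `SketchTask` and `sketch_task` are hypothetical names, and the real task object carries many more methods.

```python
from typing import Any, Callable


class SketchTask:
    """Hypothetical stand-in for a task object; not the wt-task class."""

    def __init__(self, func: Callable[..., Any]) -> None:
        self.func = func

    def call(self, *args: Any, **kwargs: Any) -> Any:
        # Explicit execution entry point.
        return self.func(*args, **kwargs)

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        # Calling the task directly delegates to .call().
        return self.call(*args, **kwargs)


def sketch_task(func: Callable[..., Any]) -> SketchTask:
    # A one-argument callable works both as @sketch_task and as sketch_task(func).
    return SketchTask(func)


@sketch_task
def add(a: int, b: int) -> int:
    return a + b


def subtract(a: int, b: int) -> int:
    return a - b


# Wrapper-function form, as generated code might use it.
wrapped_subtract = sketch_task(subtract)
```

Both forms produce the same kind of object, which is why generated code can wrap an existing function without decorating it at definition time.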

Installation

# Basic installation
pip install wt-task

# With tracing support
pip install "wt-task[gcp]"  # quotes stop shells such as zsh from expanding the brackets

# Development installation
cd wt/wt-task
uv sync

Usage

Basic Task Definition

from wt_task import task

@task
def add(a: int, b: int) -> int:
    return a + b

# Direct call
result = add(1, 2)  # 3

# Explicit call
result = add.call(1, 2)  # 3

Partial Application

@task
def multiply(a: int, b: int) -> int:
    return a * b

# Bind one argument
multiply_by_2 = multiply.partial(b=2)
result = multiply_by_2.call(a=5)  # 10
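As a point of comparison, keyword binding with the standard library's functools.partial gives the same result for a plain function. This is only an analogy, on the assumption that .partial() binds keyword arguments in a similar way; it does not show how wt-task implements it.

```python
from functools import partial


def multiply(a: int, b: int) -> int:
    return a * b


# Bind b now, supply a later.
multiply_by_2 = partial(multiply, b=2)
result = multiply_by_2(a=5)
```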

Mapping Over Values

@task
def square(x: int) -> int:
    return x * x

# Map over a sequence
results = square.map("x", [1, 2, 3, 4])  # [1, 4, 9, 16]

Mapping Over Key-Value Pairs

@task
def process(data: str) -> int:
    return len(data)

# Map over key-value pairs, preserving keys
results = process.mapvalues("data", [
    ("a", "hello"),
    ("b", "world"),
])
# [("a", 5), ("b", 5)]
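Judging from the two examples above, map and mapvalues behave roughly like the plain-Python helpers below. The helper names `map_arg` and `mapvalues_arg` are hypothetical, and the sketch assumes sequential execution with results returned as lists.

```python
from typing import Any, Callable, Hashable, Iterable


def map_arg(func: Callable[..., Any], argname: str, values: Iterable[Any]) -> list:
    # Call func once per value, passing the value as the named argument.
    return [func(**{argname: value}) for value in values]


def mapvalues_arg(
    func: Callable[..., Any],
    argname: str,
    pairs: Iterable[tuple[Hashable, Any]],
) -> list:
    # Same idea, but keep each key next to its result.
    return [(key, func(**{argname: value})) for key, value in pairs]


def square(x: int) -> int:
    return x * x


def process(data: str) -> int:
    return len(data)


squares = map_arg(square, "x", [1, 2, 3, 4])
lengths = mapvalues_arg(process, "data", [("a", "hello"), ("b", "world")])
```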

Validation

@task
def add_numbers(a: int, b: int) -> int:
    return a + b

# Parse string inputs to ints
result = add_numbers.validate().call("10", "20")  # 30
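The idea behind the example above is that arguments are converted to their annotated types before the function body runs. The standard-library sketch below illustrates only that idea: wt-task uses Pydantic, which handles far more types and reports errors differently, and `coerce_to_annotations` is a hypothetical helper.

```python
import inspect
from functools import wraps


def coerce_to_annotations(func):
    """Hypothetical helper: coerce each argument to its annotated type."""
    signature = inspect.signature(func)

    @wraps(func)
    def wrapper(*args, **kwargs):
        bound = signature.bind(*args, **kwargs)
        for name, value in list(bound.arguments.items()):
            annotation = signature.parameters[name].annotation
            # Skip unannotated parameters and non-class annotations.
            if annotation is inspect.Parameter.empty:
                continue
            if isinstance(annotation, type) and not isinstance(value, annotation):
                bound.arguments[name] = annotation(value)
        return func(*bound.args, **bound.kwargs)

    return wrapper


@coerce_to_annotations
def add_numbers(a: int, b: int) -> int:
    return a + b


total = add_numbers("10", "20")
```

Without the coercion step, the same call would concatenate the two strings instead of adding numbers.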

Error Handling

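# Assumption: TaskInstanceError is exported at the package top level; adjust the import if it lives elsewhere
from wt_task import TaskInstanceError

@task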
def divide(a: int, b: int) -> float:
    return a / b

# Add task instance context to errors
task_with_id = divide.set_task_instance_id("task-1")
try:
    task_with_id.handle_errors().call(10, 0)
except TaskInstanceError as e:
    print(e)  # Task instance 'task-1' raised ZeroDivisionError(...)
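The pattern in the example above, catching an exception and re-raising it with the task instance ID attached, can be sketched in plain Python. `SketchTaskInstanceError` and `with_error_context` are hypothetical names, and the message format is modelled on the comment in the example rather than taken from wt-task's source.

```python
class SketchTaskInstanceError(Exception):
    """Hypothetical error type that records which task instance failed."""

    def __init__(self, task_instance_id: str, original: BaseException) -> None:
        self.task_instance_id = task_instance_id
        self.original = original
        super().__init__(f"Task instance {task_instance_id!r} raised {original!r}")


def with_error_context(func, task_instance_id: str):
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as exc:
            # Chain the original exception so the full traceback is preserved.
            raise SketchTaskInstanceError(task_instance_id, exc) from exc

    return wrapper


def divide(a: int, b: int) -> float:
    return a / b


safe_divide = with_error_context(divide, "task-1")

caught = None
try:
    safe_divide(10, 0)
except SketchTaskInstanceError as err:
    caught = err
```

Keeping the original exception on the wrapper lets callers branch on the underlying error type while still knowing which task instance failed.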

Method Chaining

Each configuration method (.partial(), .validate(), .set_task_instance_id(), .handle_errors()) returns a new task instance, so calls can be chained:

result = (
    add
    .partial(a=10)
    .validate()
    .set_task_instance_id("add-task")
    .handle_errors()
    .call(b="20")
)  # 30
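One common way to get "every method returns a new instance" is a frozen dataclass combined with dataclasses.replace. The sketch below assumes that design purely for illustration; `ChainableTask` is a hypothetical class, not the wt-task one.

```python
from dataclasses import dataclass, field, replace
from typing import Any, Callable, Mapping, Optional


@dataclass(frozen=True)
class ChainableTask:
    """Hypothetical immutable task: each method returns a modified copy."""

    func: Callable[..., Any]
    bound: Mapping[str, Any] = field(default_factory=dict)
    task_instance_id: Optional[str] = None

    def partial(self, **kwargs: Any) -> "ChainableTask":
        # Merge new bindings into a copy; the original is left unchanged.
        return replace(self, bound={**self.bound, **kwargs})

    def set_task_instance_id(self, task_instance_id: str) -> "ChainableTask":
        return replace(self, task_instance_id=task_instance_id)

    def call(self, *args: Any, **kwargs: Any) -> Any:
        return self.func(*args, **{**self.bound, **kwargs})


def add(a: int, b: int) -> int:
    return a + b


base = ChainableTask(add)
derived = base.partial(a=10).set_task_instance_id("add-task")
result = derived.call(b=20)
```

Because `base` is never mutated, the same task can be reused with different bindings or IDs without one chain affecting another.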

Architecture

wt-task integrates with the other wt packages:

  • wt-contracts: wt-task implements its TaskProtocol interface
  • wt-compiler: Generated DAG code uses the task() wrapper function
  • wt-registry: Task functions are registered separately with @register
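To show what implementing a protocol from a separate contracts package means in practice, here is a sketch using typing.Protocol. The actual members of TaskProtocol are not listed here, so `SketchTaskProtocol` and its single `call` method are assumptions made for illustration.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class SketchTaskProtocol(Protocol):
    """Hypothetical structural interface; the real TaskProtocol may differ."""

    def call(self, *args: Any, **kwargs: Any) -> Any: ...


class EchoTask:
    # No inheritance needed: having a matching call() method is enough.
    def call(self, *args: Any, **kwargs: Any) -> Any:
        return args, kwargs


def run(task: SketchTaskProtocol, *args: Any, **kwargs: Any) -> Any:
    # Code that depends only on the protocol can accept any conforming task.
    return task.call(*args, **kwargs)


conforms = isinstance(EchoTask(), SketchTaskProtocol)
output = run(EchoTask(), 1, b=2)
```

Structural typing of this kind lets packages depend on the contract alone, without importing the concrete task class.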

Development

# Create environment and install dependencies
cd wt/wt-task
uv sync

# Run tests
uv run pytest

# Type checking
uv run mypy src/

# Linting
uv run ruff check .

# Formatting
uv run ruff format .

Testing

The package includes comprehensive tests for all features:

# Run all tests
uv run pytest

# Run with coverage
uv run pytest --cov=wt_task --cov-report=html

# Run specific test file
uv run pytest tests/test_decorator.py

License

BSD-3-Clause
