wt-task

Task decorator and execution features for the wt ecosystem.

Overview

wt-task provides the @task decorator and task execution capabilities for workflow systems. It implements the TaskProtocol from wt-contracts, enabling type-safe task execution across the wt ecosystem.

Features

  • Dual-purpose task decorator: Works both as a decorator (@task) and as a wrapper function (task(func))
  • Execution methods: call, map, mapvalues for different execution patterns
  • Partial application: Bind arguments with .partial() before execution
  • Validation: Pydantic-based validation with .validate()
  • Error handling: Wrap errors with task instance context using .handle_errors()
  • Conditional skipping: Skip execution based on conditions with .skipif()
  • Custom executors: Switch execution backends with .set_executor()
  • OpenTelemetry tracing: Optional tracing support (requires wt-task[gcp])

Installation

# Basic installation
pip install wt-task

# With tracing support
pip install "wt-task[gcp]"  # quotes keep shells such as zsh from expanding the brackets

# Development installation
cd wt/wt-task
uv sync

Usage

Basic Task Definition

from wt_task import task

@task
def add(a: int, b: int) -> int:
    return a + b

# Direct call
result = add(1, 2)  # 3

# Explicit call
result = add.call(1, 2)  # 3

Partial Application

@task
def multiply(a: int, b: int) -> int:
    return a * b

# Bind one argument
multiply_by_2 = multiply.partial(b=2)
result = multiply_by_2.call(a=5)  # 10
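
For readers who know the standard library, binding an argument ahead of the call resembles functools.partial. The sketch below uses plain Python only and does not import wt-task; it illustrates the idea rather than the package's implementation.

```python
from functools import partial


def multiply(a: int, b: int) -> int:
    return a * b


# Bind b up front and supply a later; the original function is left unchanged.
multiply_by_2 = partial(multiply, b=2)
result = multiply_by_2(a=5)
print(result)  # 10
```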

Mapping Over Values

@task
def square(x: int) -> int:
    return x * x

# Map over a sequence
results = square.map("x", [1, 2, 3, 4])  # [1, 4, 9, 16]
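
Judging by the result shown above, .map("x", values) calls the function once per element and passes each element as the named argument. A plain-Python sketch of that behavior follows; map_over is a hypothetical helper written for illustration, not part of wt-task.

```python
def square(x: int) -> int:
    return x * x


def map_over(func, argname, values):
    """Hypothetical helper: call func once per value, passing it as `argname`."""
    return [func(**{argname: value}) for value in values]


results = map_over(square, "x", [1, 2, 3, 4])
print(results)  # [1, 4, 9, 16]
```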

Mapping Over Key-Value Pairs

@task
def process(data: str) -> int:
    return len(data)

# Map over key-value pairs, preserving keys
results = process.mapvalues("data", [
    ("a", "hello"),
    ("b", "world"),
])
# [("a", 5), ("b", 5)]
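
The key-preserving variant can be pictured the same way: only the value of each pair is passed to the function, and the key is carried through to the output. In this plain-Python sketch, map_values_over is a hypothetical helper and does not use wt-task.

```python
def process(data: str) -> int:
    return len(data)


def map_values_over(func, argname, pairs):
    """Hypothetical helper: apply func to each value and keep its key."""
    return [(key, func(**{argname: value})) for key, value in pairs]


results = map_values_over(process, "data", [("a", "hello"), ("b", "world")])
print(results)  # [('a', 5), ('b', 5)]
```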

Validation

@task
def add_numbers(a: int, b: int) -> int:
    return a + b

# Parse string inputs to ints
result = add_numbers.validate().call("10", "20")  # 30

Error Handling

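# Assumption: TaskInstanceError is importable from wt_task; adjust if it lives in a submodule.
from wt_task import TaskInstanceError, task

@task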
def divide(a: int, b: int) -> float:
    return a / b

# Add task instance context to errors
task_with_id = divide.set_task_instance_id("task-1")
try:
    task_with_id.handle_errors().call(10, 0)
except TaskInstanceError as e:
    print(e)  # Task instance 'task-1' raised ZeroDivisionError(...)
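
To show what "adding task instance context" can mean in practice, here is a self-contained sketch that catches an exception and re-raises it with an identifier attached. SketchTaskInstanceError and call_with_context are hypothetical names used for illustration; the real TaskInstanceError may carry different fields and a different message.

```python
class SketchTaskInstanceError(Exception):
    """Hypothetical stand-in for an error that carries a task instance id."""

    def __init__(self, task_instance_id: str, original: BaseException):
        self.task_instance_id = task_instance_id
        self.original = original
        super().__init__(
            f"Task instance {task_instance_id!r} raised {original!r}"
        )


def call_with_context(func, task_instance_id, *args, **kwargs):
    """Call func and re-raise any failure with the task instance id attached."""
    try:
        return func(*args, **kwargs)
    except Exception as exc:
        # `from exc` keeps the original exception reachable as __cause__.
        raise SketchTaskInstanceError(task_instance_id, exc) from exc


def divide(a: int, b: int) -> float:
    return a / b


try:
    call_with_context(divide, "task-1", 10, 0)
except SketchTaskInstanceError as err:
    message = str(err)
    cause = err.__cause__
    print(message)
```

Chaining with `from exc` means the original ZeroDivisionError and its traceback remain available to whoever handles the wrapped error.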

Method Chaining

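Configuration methods such as .partial(), .validate(), .set_task_instance_id(), and .handle_errors() each return a new task instance, so they can be chained before the final .call():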

result = (
    add
    .partial(a=10)
    .validate()
    .set_task_instance_id("add-task")
    .handle_errors()
    .call(b="20")
)  # 30
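
One way to get this chaining behavior is to keep the task object immutable and have each method return a modified copy. The sketch below shows that pattern with a frozen dataclass. SketchTask is a hypothetical stand-in covering only two methods; it is not the wt-task class and makes no claim about how the package is implemented.

```python
from dataclasses import dataclass, field, replace
from typing import Any, Callable, Optional


@dataclass(frozen=True)
class SketchTask:
    """Hypothetical stand-in, not the wt-task class."""

    func: Callable[..., Any]
    bound: dict = field(default_factory=dict)
    task_instance_id: Optional[str] = None

    def partial(self, **kwargs: Any) -> "SketchTask":
        # Return a copy with the extra bound arguments; self is not modified.
        return replace(self, bound={**self.bound, **kwargs})

    def set_task_instance_id(self, task_instance_id: str) -> "SketchTask":
        return replace(self, task_instance_id=task_instance_id)

    def call(self, *args: Any, **kwargs: Any) -> Any:
        # Arguments given at call time are merged over the bound ones.
        return self.func(*args, **{**self.bound, **kwargs})


def add(a: int, b: int) -> int:
    return a + b


base = SketchTask(add)
configured = base.partial(a=10).set_task_instance_id("add-task")
result = configured.call(b=20)
print(result)      # 30
print(base.bound)  # {}
```

Because every step produces a new object, a partially configured task can be reused as a starting point for several variants without them affecting one another.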

Architecture

wt-task integrates with other wt packages:

  • wt-contracts: Implements the TaskProtocol interface
  • wt-compiler: Generated DAG code uses the task() wrapper function
  • wt-registry: Task functions are registered separately with @register
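
The wrapper form mentioned in the list above works because, in Python, placing @deco over a function is shorthand for func = deco(func). Generated code can therefore wrap a plain function at the point of use instead of requiring it to be decorated at definition time. The sketch below shows both forms with sketch_task, a hypothetical stand-in decorator that only tags and forwards calls; it is not the wt-task decorator.

```python
import functools


def sketch_task(func):
    """Hypothetical stand-in decorator that only tags and forwards calls."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)

    wrapper.is_task = True
    return wrapper


# Decorator form: applied where the function is defined.
@sketch_task
def add(a: int, b: int) -> int:
    return a + b


def plain_add(a: int, b: int) -> int:
    return a + b


# Wrapper form: wrap an existing, undecorated function at the point of use.
wrapped_add = sketch_task(plain_add)

print(add(1, 2))          # 3
print(wrapped_add(1, 2))  # 3
```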

Development

# Create environment and install dependencies
cd wt/wt-task
uv sync

# Run tests
uv run pytest

# Type checking
uv run mypy src/

# Linting
uv run ruff check .

# Formatting
uv run ruff format .

Testing

The test suite runs with pytest:

# Run all tests
uv run pytest

# Run with coverage
uv run pytest --cov=wt_task --cov-report=html

# Run specific test file
uv run pytest tests/test_decorator.py

License

Apache-2.0
