Task decorator and execution features for wt ecosystem
Project description
wt-task
Task decorator and execution features for the wt ecosystem.
Overview
wt-task provides the @task decorator and task execution capabilities for workflow systems. It implements the TaskProtocol from wt-contracts, enabling type-safe task execution across the wt ecosystem.
Features
- Dual-purpose
taskdecorator: Works as both a decorator (@task) and wrapper function (task(func)) - Execution methods:
call,map,mapvaluesfor different execution patterns - Partial application: Bind arguments with
.partial()before execution - Validation: Pydantic-based validation with
.validate() - Error handling: Wrap errors with task instance context
- Conditional skipping: Skip execution based on conditions with
.skipif() - Custom executors: Switch execution backends with
.set_executor() - OpenTelemetry tracing: Optional tracing support (requires
wt-task[gcp])
Installation
# Basic installation
pip install wt-task
# With tracing support
pip install wt-task[gcp]
# Development installation
cd wt/wt-task
uv sync
Usage
Basic Task Definition
from wt_task import task
@task
def add(a: int, b: int) -> int:
return a + b
# Direct call
result = add(1, 2) # 3
# Explicit call
result = add.call(1, 2) # 3
Partial Application
@task
def multiply(a: int, b: int) -> int:
return a * b
# Bind one argument
multiply_by_2 = multiply.partial(b=2)
result = multiply_by_2.call(a=5) # 10
Mapping Over Values
@task
def square(x: int) -> int:
return x * x
# Map over a sequence
results = square.map("x", [1, 2, 3, 4]) # [1, 4, 9, 16]
Mapping Over Key-Value Pairs
@task
def process(data: str) -> int:
return len(data)
# Map over key-value pairs, preserving keys
results = process.mapvalues("data", [
("a", "hello"),
("b", "world"),
])
# [("a", 5), ("b", 5)]
Validation
@task
def add_numbers(a: int, b: int) -> int:
return a + b
# Parse string inputs to ints
result = add_numbers.validate().call("10", "20") # 30
Error Handling
@task
def divide(a: int, b: int) -> float:
return a / b
# Add task instance context to errors
task_with_id = divide.set_task_instance_id("task-1")
try:
task_with_id.handle_errors().call(10, 0)
except TaskInstanceError as e:
print(e) # Task instance 'task-1' raised ZeroDivisionError(...)
Method Chaining
All methods return a new task instance, enabling clean method chaining:
result = (
add
.partial(a=10)
.validate()
.set_task_instance_id("add-task")
.handle_errors()
.call(b="20")
) # 30
Architecture
wt-task is designed to work seamlessly with other wt packages:
- wt-contracts: Implements the
TaskProtocolinterface - wt-compiler: Generated DAG code uses
task()wrapper function - wt-registry: Task functions are registered separately with
@register
Development
# Create environment and install dependencies
cd wt/wt-task
uv sync
# Run tests
uv run pytest
# Type checking
uv run mypy src/
# Linting
uv run ruff check .
# Formatting
uv run ruff format .
Testing
The package includes comprehensive tests for all features:
# Run all tests
uv run pytest
# Run with coverage
uv run pytest --cov=wt_task --cov-report=html
# Run specific test file
uv run pytest tests/test_decorator.py
License
Apache-2.0
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file wt_task-0.1.0.tar.gz.
File metadata
- Download URL: wt_task-0.1.0.tar.gz
- Upload date:
- Size: 89.8 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: uv/0.10.8 {"installer":{"name":"uv","version":"0.10.8","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
93408d120fc19569d7e48f8556c26f0f0d80efc64a5b2a665ffb94776715178a
|
|
| MD5 |
31e1e4ac46e08a95c628ac15e06f7ea4
|
|
| BLAKE2b-256 |
293a0d7e2ae8edc2fb03c24998649ac8ccfe2e3510325bc5713250aa022d8896
|
File details
Details for the file wt_task-0.1.0-py3-none-any.whl.
File metadata
- Download URL: wt_task-0.1.0-py3-none-any.whl
- Upload date:
- Size: 25.5 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: uv/0.10.8 {"installer":{"name":"uv","version":"0.10.8","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
cfb3976749c98f4d42d7bd9020cd05b59a691b996d74c4c814c3ccf5bd476bf3
|
|
| MD5 |
cca6fa056b4f4e53e3791a65bbe45d6c
|
|
| BLAKE2b-256 |
5e83f1b94d5a17ade451804db8b9447df80798594b2c7d49569c329911504493
|