wt-task

Task decorator and execution features for the wt ecosystem.

Overview

wt-task provides the @task decorator and task execution capabilities for workflow systems. It implements the TaskProtocol from wt-contracts, enabling type-safe task execution across the wt ecosystem.
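As a rough illustration of what implementing a protocol means for type safety, the sketch below uses `typing.Protocol` from the standard library. The `CallableTask` protocol and its single `call` method are assumptions made for this example; the actual members of `TaskProtocol` in wt-contracts are not described in this README.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class CallableTask(Protocol):
    """Hypothetical stand-in for a task protocol; not the real TaskProtocol."""

    def call(self, *args: Any, **kwargs: Any) -> Any: ...


class AddTask:
    """Satisfies CallableTask structurally, without inheriting from it."""

    def call(self, a: int, b: int) -> int:
        return a + b


def run(t: CallableTask, *args: Any) -> Any:
    # A type checker accepts any object with a compatible call() method here.
    return t.call(*args)


total = run(AddTask(), 1, 2)
```

Because the check is structural, any package can provide a conforming task class without importing a shared base class.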

Features

  • Dual-purpose task decorator: Works as both a decorator (@task) and a wrapper function (task(func))
  • Execution methods: call, map, mapvalues for different execution patterns
  • Partial application: Bind arguments with .partial() before execution
  • Validation: Pydantic-based validation with .validate()
  • Error handling: Wrap errors with task instance context
  • Conditional skipping: Skip execution based on conditions with .skipif()
  • Custom executors: Switch execution backends with .set_executor()
  • OpenTelemetry tracing: Optional tracing support (requires wt-task[gcp])
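The ideas behind conditional skipping and swappable execution backends can be sketched with the standard library alone. Everything here (`SKIPPED`, `run_unless`) is hypothetical and exists only for illustration; the signatures of `.skipif()` and `.set_executor()` are not documented in this README, so this is not their API.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable

# Hypothetical sentinel marking a skipped execution.
SKIPPED = object()


def run_unless(condition: Callable[..., bool], func: Callable[..., Any], *args: Any) -> Any:
    """Return SKIPPED when condition(*args) is true, otherwise call func."""
    if condition(*args):
        return SKIPPED
    return func(*args)


def square(x: int) -> int:
    return x * x


def is_negative(x: int) -> bool:
    return x < 0


# Conditional skipping: negative inputs are never passed to square().
skipped = run_unless(is_negative, square, -3)
computed = run_unless(is_negative, square, 3)

# Swapping the backend: the same function run serially and on a thread pool.
serial = list(map(square, [1, 2, 3]))
with ThreadPoolExecutor(max_workers=2) as pool:
    threaded = list(pool.map(square, [1, 2, 3]))
```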

Installation

# Basic installation
pip install wt-task

# With tracing support
# (quoted so that shells such as zsh do not expand the brackets)
pip install "wt-task[gcp]"

# Development installation
cd wt/wt-task
uv sync

Usage

Basic Task Definition

from wt_task import task

@task
def add(a: int, b: int) -> int:
    return a + b

# Direct call
result = add(1, 2)  # 3

# Explicit call
result = add.call(1, 2)  # 3
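To show why one callable can serve as both a decorator and a wrapper function, here is a minimal self-contained sketch. `MiniTask` and `mini_task` are stand-ins invented for this example and say nothing about how wt-task is implemented internally.

```python
from typing import Any, Callable


class MiniTask:
    """Illustrative stand-in, not wt-task's implementation."""

    def __init__(self, func: Callable[..., Any]) -> None:
        self.func = func

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        # A direct call delegates to the explicit form.
        return self.call(*args, **kwargs)

    def call(self, *args: Any, **kwargs: Any) -> Any:
        return self.func(*args, **kwargs)


def mini_task(func: Callable[..., Any]) -> MiniTask:
    # Taking a function and returning the wrapper is all that is needed to
    # work both as a decorator and as a plain wrapper function.
    return MiniTask(func)


@mini_task
def add(a: int, b: int) -> int:
    return a + b


def sub(a: int, b: int) -> int:
    return a - b


wrapped_sub = mini_task(sub)  # wrapper form; sub itself stays undecorated

decorated_result = add(1, 2)
wrapped_result = wrapped_sub.call(5, 3)
```

The wrapper form is handy when the function is defined elsewhere and should remain usable as a plain function.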

Partial Application

@task
def multiply(a: int, b: int) -> int:
    return a * b

# Bind one argument
multiply_by_2 = multiply.partial(b=2)
result = multiply_by_2.call(a=5)  # 10
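For comparison, the standard library offers the same idea through `functools.partial`. This is only an analogue: whether `.partial()` in wt-task behaves identically in every respect is an assumption.

```python
from functools import partial


def multiply(a: int, b: int) -> int:
    return a * b


# Bind b now, supply a later.
multiply_by_2 = partial(multiply, b=2)
result = multiply_by_2(a=5)

# The original function is unaffected by the binding.
unbound_result = multiply(3, 4)
```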

Mapping Over Values

@task
def square(x: int) -> int:
    return x * x

# Map over a sequence
results = square.map("x", [1, 2, 3, 4])  # [1, 4, 9, 16]
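Judging from the example above, the first argument to `.map()` names the parameter that receives each value. Under that assumption, a plain-Python equivalent might look like the sketch below; `map_over` is a hypothetical helper, not part of wt-task.

```python
from typing import Any, Callable, Iterable, List


def map_over(func: Callable[..., Any], argname: str, values: Iterable[Any]) -> List[Any]:
    """Call func once per value, passing the value as the named keyword argument."""
    return [func(**{argname: value}) for value in values]


def square(x: int) -> int:
    return x * x


results = map_over(square, "x", [1, 2, 3, 4])
```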

Mapping Over Key-Value Pairs

@task
def process(data: str) -> int:
    return len(data)

# Map over key-value pairs, preserving keys
results = process.mapvalues("data", [
    ("a", "hello"),
    ("b", "world"),
])
# [("a", 5), ("b", 5)]
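The key-preserving behavior can be written out in plain Python as follows. `map_values` is a hypothetical helper used only to illustrate the pattern, assuming each key is passed through untouched and only the value is handed to the function.

```python
from typing import Any, Callable, Iterable, List, Tuple


def map_values(
    func: Callable[..., Any],
    argname: str,
    pairs: Iterable[Tuple[Any, Any]],
) -> List[Tuple[Any, Any]]:
    """Apply func to the value of each (key, value) pair, keeping the key."""
    return [(key, func(**{argname: value})) for key, value in pairs]


def process(data: str) -> int:
    return len(data)


results = map_values(process, "data", [("a", "hello"), ("b", "world")])
```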

Validation

@task
def add_numbers(a: int, b: int) -> int:
    return a + b

# Parse string inputs to ints
result = add_numbers.validate().call("10", "20")  # 30
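To give a feel for what parsing inputs against type annotations involves, the sketch below coerces arguments using only the standard library. It is a deliberately simplified stand-in for Pydantic, not the mechanism wt-task uses: `coerce_args` is hypothetical and only handles annotations that are plain callables such as `int` or `str`.

```python
import inspect
from functools import wraps
from typing import Any, Callable


def coerce_args(func: Callable[..., Any]) -> Callable[..., Any]:
    """Convert each argument to its annotated type before calling func.

    Simplified stand-in for Pydantic validation: supports only plain
    callable annotations such as int, float, or str.
    """
    sig = inspect.signature(func)

    @wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        bound = sig.bind(*args, **kwargs)
        for name, value in bound.arguments.items():
            annotation = sig.parameters[name].annotation
            if annotation is not inspect.Parameter.empty:
                # Raises ValueError/TypeError if the value cannot be converted.
                bound.arguments[name] = annotation(value)
        return func(*bound.args, **bound.kwargs)

    return wrapper


@coerce_args
def add_numbers(a: int, b: int) -> int:
    return a + b


result = add_numbers("10", "20")
```

Without the coercion step, the same call would concatenate the two strings instead of adding numbers.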

Error Handling

@task
def divide(a: int, b: int) -> float:
    return a / b

# Add task instance context to errors.
# Note: TaskInstanceError must be imported first; its module path is not shown in this README.
task_with_id = divide.set_task_instance_id("task-1")
try:
    task_with_id.handle_errors().call(10, 0)
except TaskInstanceError as e:
    print(e)  # Task instance 'task-1' raised ZeroDivisionError(...)
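The underlying pattern, catching an exception and re-raising it with an identifier attached, can be sketched with exception chaining. `InstanceError` and `call_with_context` are hypothetical names for this example and do not reflect how `TaskInstanceError` is actually defined.

```python
from typing import Any, Callable


class InstanceError(Exception):
    """Hypothetical stand-in for an error that carries a task instance id."""

    def __init__(self, instance_id: str, original: BaseException) -> None:
        super().__init__(f"Task instance {instance_id!r} raised {original!r}")
        self.instance_id = instance_id
        self.original = original


def call_with_context(instance_id: str, func: Callable[..., Any], *args: Any) -> Any:
    try:
        return func(*args)
    except Exception as exc:
        # "from exc" keeps the original traceback reachable via __cause__.
        raise InstanceError(instance_id, exc) from exc


def divide(a: int, b: int) -> float:
    return a / b


message = ""
cause = None
try:
    call_with_context("task-1", divide, 10, 0)
except InstanceError as err:
    message = str(err)
    cause = err.__cause__
```

Keeping the original exception as the cause means the context is added without hiding where the failure happened.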

Method Chaining

Each method returns a new task instance rather than modifying the original, so calls can be chained. This example reuses the add task from Basic Task Definition:

result = (
    add
    .partial(a=10)
    .validate()
    .set_task_instance_id("add-task")
    .handle_errors()
    .call(b="20")
)  # 30
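One common way to get this "every method returns a new instance" behavior is a frozen dataclass combined with `dataclasses.replace`. The `ChainTask` class below is a hypothetical sketch of that design, not wt-task's actual implementation.

```python
from dataclasses import dataclass, field, replace
from typing import Any, Callable, Dict, Optional


@dataclass(frozen=True)
class ChainTask:
    """Illustrative immutable task; each method returns a modified copy."""

    func: Callable[..., Any]
    bound: Dict[str, Any] = field(default_factory=dict)
    instance_id: Optional[str] = None

    def partial(self, **kwargs: Any) -> "ChainTask":
        # Merge new bindings into a copy; self is left untouched.
        return replace(self, bound={**self.bound, **kwargs})

    def set_id(self, instance_id: str) -> "ChainTask":
        return replace(self, instance_id=instance_id)

    def call(self, **kwargs: Any) -> Any:
        return self.func(**self.bound, **kwargs)


def add(a: int, b: int) -> int:
    return a + b


base = ChainTask(add)
configured = base.partial(a=10).set_id("add-task")
result = configured.call(b=20)
```

Since `base` is never mutated, it can be safely reused to derive differently configured tasks.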

Architecture

wt-task integrates with the other wt packages as follows:

  • wt-contracts: Implements the TaskProtocol interface
  • wt-compiler: Generated DAG code uses the task() wrapper function
  • wt-registry: Task functions are registered separately with @register

Development

# Create environment and install dependencies
cd wt/wt-task
uv sync

# Run tests
uv run pytest

# Type checking
uv run mypy src/

# Linting
uv run ruff check .

# Formatting
uv run ruff format .

Testing

The package ships with tests covering each feature:

# Run all tests
uv run pytest

# Run with coverage
uv run pytest --cov=wt_task --cov-report=html

# Run specific test file
uv run pytest tests/test_decorator.py

License

BSD-3-Clause
