Skip to main content

AI Agentic Workflows for data preparation and transformation

Project description

aw - Agentic Workflows for Data Preparation

An AI agent package for data preparation with a focus on modular, extensible workflows that follow best practices from modern agentic architectures.

Features

  • AgenticStep Protocol: Build modular agents that follow a consistent interface
  • ReAct Pattern: Reason-Act-Observe loops with automatic retry logic
  • Three Validation Flavors: Schema-based, info-dict based, and functional validation
  • Code Execution: Safe defaults with extensibility for robust backends (LangChain, DSPy)
  • Loading Agent: Automatically loads data from various sources into pandas DataFrames
  • Preparation Agent: Transforms data to meet target requirements
  • Cosmograph Support: Specialized validators for cosmograph visualization
  • Orchestration: Chain multiple steps together with context management
  • Human-in-Loop: Optional human approval at any step

Installation

pip install -e .

Optional Dependencies

# For LangChain code execution
pip install langchain langchain-experimental

# For cosmograph validation
pip install cosmograph

# For OpenAI LLM
pip install openai

# For oa package (preferred LLM interface)
pip install oa

Quick Start

Simple Example: Load and Prepare for Cosmograph

from aw import load_for_cosmo

# Load and prepare data in one call
df, metadata = load_for_cosmo('data.csv')

# Use with cosmograph
from cosmograph import cosmo
params = metadata['preparing']['metadata']['validation_result']['params']
cosmo(df, **params)

Step-by-Step Example

from aw import LoadingAgent, PreparationAgent, Context
from aw.cosmo import basic_cosmo_validator

# Create context
context = Context()

# Step 1: Load data
loading_agent = LoadingAgent()
df, loading_meta = loading_agent.execute('data.csv', context)

print(f"Loaded {df.shape[0]} rows, {df.shape[1]} columns")

# Step 2: Prepare data
preparing_agent = PreparationAgent(
    target='cosmo-ready',
    target_validator=basic_cosmo_validator()
)
prepared_df, prep_meta = preparing_agent.execute(df, context)

print(f"Prepared data: {prep_meta['validation_result']}")

Workflow Example

from aw import create_cosmo_prep_workflow

# Create a complete workflow
workflow = create_cosmo_prep_workflow(max_retries=5)

# Run it
result_df, metadata = workflow.run('data.csv')

# Check results
if metadata['success']:
    print("Success!")
    print(f"Steps executed: {[s['name'] for s in metadata['steps']]}")
else:
    print(f"Failed at: {metadata.get('failed_at')}")

Architecture

Core Concepts

AgenticStep Protocol

All agents implement the AgenticStep protocol:

def execute(
    self, 
    input_data: Any, 
    context: MutableMapping[str, Any]
) -> tuple[ArtifactType, dict[str, Any]]:
    """Execute the agentic step following ReAct pattern"""
    ...

ReAct Pattern

Each agent follows the Reason-Act-Observe loop:

  1. Reason (Thought): Analyze input and context to decide action
  2. Act (Action): Generate and execute code or use tools
  3. Observe: Capture results and validate
  4. Repeat or Finish: Based on validation, retry or proceed

Three Validation Flavors

  1. Schema-based: Use Pydantic models or JSON schemas
  2. Info-dict based: Compute info → check info → pass/fail
  3. Functional: Try to use the artifact for its purpose
from aw.validation import (
    schema_validator,
    info_dict_validator, 
    functional_validator
)

# Schema validation
from pydantic import BaseModel
class DataSchema(BaseModel):
    x: float
    y: float
validate = schema_validator(DataSchema)

# Info-dict validation
def compute_info(df):
    return {'null_count': df.isnull().sum().sum()}

def check_info(info):
    return info['null_count'] == 0, "No nulls allowed"

validate = info_dict_validator(compute_info, check_info)

# Functional validation (try the purpose)
def try_visualize(df):
    return cosmograph.cosmo(df, points_x_by='x', points_y_by='y')

validate = functional_validator(try_visualize)

Configuration System

Supports both simple objects and callables with cascading defaults:

from aw import GlobalConfig, StepConfig

# Global defaults
global_config = GlobalConfig(
    llm="gpt-4",
    max_retries=3
)

# Step-specific override
step_config = global_config.override(
    llm="gpt-3.5-turbo",
    max_retries=5
)

# Or pass callables
def my_llm(prompt):
    return "Response to: " + prompt

step_config = StepConfig(
    llm=my_llm,  # Function instead of string
    validator=lambda x: (True, {}),
    max_retries=3
)

Tools

CodeInterpreterTool

Safe code execution with extensibility:

from aw import CodeInterpreterTool

# Default (safe exec)
tool = CodeInterpreterTool()
result = tool("x = 5; y = x * 2; print(y)")
print(result.output)  # "10"

# With LangChain backend
from aw import create_langchain_executor
tool = CodeInterpreterTool(executor=create_langchain_executor())

FileSamplerTool

Sample and analyze files before loading:

from aw import FileSamplerTool

sampler = FileSamplerTool()
info = sampler('data.csv')
print(info['extension'])  # '.csv'
print(info['sample_text'])  # First 1024 bytes

Custom Agents

Build your own agents following the pattern:

from aw.base import AgenticStep, StepConfig
from collections.abc import MutableMapping
from typing import Any

class MyCustomAgent:
    """Custom agent following AgenticStep protocol"""
    
    def __init__(self, config: StepConfig = None):
        self.config = config or StepConfig()
        self.llm = self.config.resolve_llm()
        self.validator = self.config.resolve_validator()
    
    def execute(
        self,
        input_data: Any,
        context: MutableMapping[str, Any]
    ) -> tuple[Any, dict]:
        attempts = []
        
        for attempt in range(self.config.max_retries):
            # 1. Reason: Analyze situation
            thought = self._analyze(input_data, attempts)
            
            # 2. Act: Perform action
            result = self._act(thought)
            
            # 3. Observe: Validate
            is_valid, info = self.validator(result)
            attempts.append({'attempt': attempt, 'info': info})
            
            if is_valid:
                context['my_agent'] = {'result': result, 'info': info}
                return result, {'success': True, 'attempts': attempts}
        
        return None, {'success': False, 'attempts': attempts}

Advanced Usage

Interactive Workflows with Human-in-Loop

from aw import InteractiveWorkflow, LoadingAgent, PreparationAgent

workflow = InteractiveWorkflow()
workflow.add_step('loading', LoadingAgent(), require_approval=True)
workflow.add_step('preparing', PreparationAgent(), require_approval=False)

# Set custom approval callback
def approve(step_name, artifact, metadata):
    print(f"\nStep '{step_name}' completed")
    print(f"Artifact shape: {artifact.shape}")
    return input("Continue? (y/n): ").lower() == 'y'

workflow.set_approval_callback(approve)

# Run interactively
result, metadata = workflow.run_interactive('data.csv')

Custom Validators

Create domain-specific validators:

from aw.validation import info_dict_validator

def compute_ml_info(df):
    return {
        'num_features': len(df.columns) - 1,
        'num_samples': len(df),
        'class_balance': df['target'].value_counts().to_dict()
    }

def check_ml_requirements(info):
    if info['num_features'] < 5:
        return False, "Need at least 5 features"
    if info['num_samples'] < 100:
        return False, "Need at least 100 samples"
    return True, "ML ready"

ml_validator = info_dict_validator(compute_ml_info, check_ml_requirements)

Partial Workflow Execution

from aw import create_data_prep_workflow

workflow = create_data_prep_workflow()

# Run only the loading step
df, metadata = workflow.run_partial('data.csv', stop_after='loading')

# Inspect intermediate result
print(f"Loaded {df.shape}")

# Continue with preparing
final_df, final_meta = workflow.run_partial(df, stop_after='preparing')

Design Principles

The package follows these principles:

  1. Functional over OOP: Favor functions and generators
  2. Modularity: Small, focused functions with clear purposes
  3. Open-Closed: Easy to extend without modifying core
  4. Dependency Injection: Abstract interfaces with injectable implementations
  5. Mapping Interfaces: Use Mapping/MutableMapping from collections.abc
  6. Generators over Lists: Use lazy evaluation when possible
  7. Minimal Doctests: Simple examples in docstrings

Project Structure

aw/
├── __init__.py         # Main exports
├── base.py            # Core protocols and configs
├── validation.py      # Three validation flavors
├── tools.py           # Code execution and tools
├── utils.py           # Helpers and facades
├── loading.py         # LoadingAgent
├── preparing.py       # PreparationAgent
├── cosmo.py          # Cosmograph validators
└── orchestration.py   # Workflow management

Contributing

Contributions welcome! Key areas:

  • Additional validators for other visualization libraries
  • More robust LLM reasoning in code generation
  • Integration with more backends (DSPy, CrewAI)
  • State management with lakeFS/DVC
  • Additional agent types (sourcing, finding, etc.)

License

See LICENSE file.

Credits

Built following best practices from:

  • ReAct pattern (Reasoning and Acting)
  • LangChain's agent architectures
  • CrewAI's multi-agent systems
  • Data Version Control (DVC) principles

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

aw-0.1.2.tar.gz (35.5 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

aw-0.1.2-py3-none-any.whl (39.0 kB view details)

Uploaded Python 3

File details

Details for the file aw-0.1.2.tar.gz.

File metadata

  • Download URL: aw-0.1.2.tar.gz
  • Upload date:
  • Size: 35.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.12

File hashes

Hashes for aw-0.1.2.tar.gz
Algorithm Hash digest
SHA256 318b1578886b99bcd50611af03f40dc07f836b7153be3ced6c69ef7878a646c5
MD5 0ecd488065be9d0b45d1efee62636a37
BLAKE2b-256 eaf7f93281dda5f52d80f5fd6e606452271ec4d7f625022ace9cb029b646db85

See more details on using hashes here.

File details

Details for the file aw-0.1.2-py3-none-any.whl.

File metadata

  • Download URL: aw-0.1.2-py3-none-any.whl
  • Upload date:
  • Size: 39.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.12

File hashes

Hashes for aw-0.1.2-py3-none-any.whl
Algorithm Hash digest
SHA256 babfc5f4d36e2d44fed1560163752d5e55168995ce2ed5a4a27f8fc38110c05b
MD5 7f0fc694558db4f57fb25bd2300e5509
BLAKE2b-256 d8595088ede55affd7713c8847529f2c3762a5f07a3675b2821f9ef912effdd8

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page