# aw - Agentic Workflows for Data Preparation

An AI agent package for data preparation and transformation, with a focus on modular, extensible workflows that follow best practices from modern agentic architectures.
## Features
- AgenticStep Protocol: Build modular agents that follow a consistent interface
- ReAct Pattern: Reason-Act-Observe loops with automatic retry logic
- Three Validation Flavors: Schema-based, info-dict based, and functional validation
- Code Execution: Safe defaults with extensibility for robust backends (LangChain, DSPy)
- Loading Agent: Automatically loads data from various sources into pandas DataFrames
- Preparation Agent: Transforms data to meet target requirements
- Cosmograph Support: Specialized validators for cosmograph visualization
- Orchestration: Chain multiple steps together with context management
- Human-in-Loop: Optional human approval at any step
## Installation

```bash
pip install -e .
```
### Optional Dependencies

```bash
# For LangChain code execution
pip install langchain langchain-experimental

# For cosmograph validation
pip install cosmograph

# For OpenAI LLM
pip install openai

# For the oa package (preferred LLM interface)
pip install oa
```
## Quick Start

### Simple Example: Load and Prepare for Cosmograph

```python
from aw import load_for_cosmo
from cosmograph import cosmo

# Load and prepare data in one call
df, metadata = load_for_cosmo('data.csv')

# Use with cosmograph
params = metadata['preparing']['metadata']['validation_result']['params']
cosmo(df, **params)
```
### Step-by-Step Example

```python
from aw import LoadingAgent, PreparationAgent, Context

from aw.cosmo import basic_cosmo_validator

# Create a shared context for the steps
context = Context()

# Step 1: Load data
loading_agent = LoadingAgent()
df, loading_meta = loading_agent.execute('data.csv', context)
print(f"Loaded {df.shape[0]} rows, {df.shape[1]} columns")

# Step 2: Prepare data
preparing_agent = PreparationAgent(
    target='cosmo-ready',
    target_validator=basic_cosmo_validator()
)
prepared_df, prep_meta = preparing_agent.execute(df, context)
print(f"Prepared data: {prep_meta['validation_result']}")
```
### Workflow Example

```python
from aw import create_cosmo_prep_workflow

# Create a complete workflow
workflow = create_cosmo_prep_workflow(max_retries=5)

# Run it
result_df, metadata = workflow.run('data.csv')

# Check results
if metadata['success']:
    print("Success!")
    print(f"Steps executed: {[s['name'] for s in metadata['steps']]}")
else:
    print(f"Failed at: {metadata.get('failed_at')}")
```
## Architecture

### Core Concepts

#### AgenticStep Protocol

All agents implement the `AgenticStep` protocol:

```python
def execute(
    self,
    input_data: Any,
    context: MutableMapping[str, Any]
) -> tuple[ArtifactType, dict[str, Any]]:
    """Execute the agentic step following the ReAct pattern."""
    ...
```
#### ReAct Pattern

Each agent follows the Reason-Act-Observe loop (a minimal sketch follows the list):

1. Reason (Thought): Analyze input and context to decide on an action
2. Act (Action): Generate and execute code or use tools
3. Observe: Capture results and validate
4. Repeat or Finish: Based on validation, retry or proceed
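Here is a minimal, self-contained sketch of that loop; the helper names (`reason`, `act`, `validate`) are illustrative stand-ins, not part of the `aw` API:

```python
def react_loop(task, reason, act, validate, max_retries=3):
    """Sketch of a Reason-Act-Observe loop with retry."""
    observations = []
    for attempt in range(max_retries):
        thought = reason(task, observations)  # Reason: decide what to do next
        result = act(thought)                 # Act: run code or use a tool
        is_valid, info = validate(result)     # Observe: check the outcome
        observations.append(info)
        if is_valid:                          # Finish on success
            return result, observations
    return None, observations                 # Give up after max_retries
```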
#### Three Validation Flavors

1. Schema-based: Use Pydantic models or JSON schemas
2. Info-dict based: Compute info → check info → pass/fail
3. Functional: Try to use the artifact for its purpose
```python
from aw.validation import (
    schema_validator,
    info_dict_validator,
    functional_validator
)

# Schema validation
from pydantic import BaseModel

class DataSchema(BaseModel):
    x: float
    y: float

validate = schema_validator(DataSchema)

# Info-dict validation
def compute_info(df):
    return {'null_count': df.isnull().sum().sum()}

def check_info(info):
    return info['null_count'] == 0, "No nulls allowed"

validate = info_dict_validator(compute_info, check_info)

# Functional validation (try the artifact for its purpose)
import cosmograph

def try_visualize(df):
    return cosmograph.cosmo(df, points_x_by='x', points_y_by='y')

validate = functional_validator(try_visualize)
```
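The schema example above uses a Pydantic model. For the JSON-schema route, a validator with the same `(is_valid, info)` contract can be hand-rolled with the `jsonschema` library; this is a sketch, not an `aw` API:

```python
from jsonschema import validate as js_validate, ValidationError

# Hypothetical row-level JSON schema mirroring DataSchema above
ROW_SCHEMA = {
    "type": "object",
    "properties": {"x": {"type": "number"}, "y": {"type": "number"}},
    "required": ["x", "y"],
}

def json_schema_validate(df):
    # Check each DataFrame row against the schema, returning the
    # same (is_valid, info) shape the other validators use
    for i, row in enumerate(df.to_dict(orient="records")):
        try:
            js_validate(row, ROW_SCHEMA)
        except ValidationError as e:
            return False, {"row": i, "error": e.message}
    return True, {"rows_checked": len(df)}
```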
### Configuration System

Configuration values can be simple objects or callables, with cascading defaults:

```python
from aw import GlobalConfig, StepConfig

# Global defaults
global_config = GlobalConfig(
    llm="gpt-4",
    max_retries=3
)

# Step-specific override
step_config = global_config.override(
    llm="gpt-3.5-turbo",
    max_retries=5
)

# Or pass callables
def my_llm(prompt):
    return "Response to: " + prompt

step_config = StepConfig(
    llm=my_llm,  # a function instead of a model name
    validator=lambda x: (True, {}),
    max_retries=3
)
```
## Tools

### CodeInterpreterTool

Safe code execution by default, with an extensible executor backend:

```python
from aw import CodeInterpreterTool

# Default (safe exec)
tool = CodeInterpreterTool()
result = tool("x = 5; y = x * 2; print(y)")
print(result.output)  # "10"

# With a LangChain backend
from aw import create_langchain_executor

tool = CodeInterpreterTool(executor=create_langchain_executor())
```
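Because the executor is injected, you can also plug in your own callable. The sketch below assumes the executor takes a code string and returns its captured output; that contract is an assumption, so check the `CodeInterpreterTool` docstring before relying on it:

```python
import subprocess
import sys

def subprocess_executor(code: str) -> str:
    # Run the code in a fresh interpreter and capture stdout
    # (assumed str-in / str-out contract; verify against CodeInterpreterTool)
    proc = subprocess.run(
        [sys.executable, "-c", code],
        capture_output=True, text=True, timeout=30,
    )
    return proc.stdout if proc.returncode == 0 else proc.stderr

tool = CodeInterpreterTool(executor=subprocess_executor)
```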
### FileSamplerTool

Sample and analyze files before loading:

```python
from aw import FileSamplerTool

sampler = FileSamplerTool()
info = sampler('data.csv')
print(info['extension'])    # '.csv'
print(info['sample_text'])  # First 1024 bytes
```
## Custom Agents

Build your own agents following the pattern:

```python
from aw.base import AgenticStep, StepConfig
from collections.abc import MutableMapping
from typing import Any

class MyCustomAgent:
    """Custom agent following the AgenticStep protocol."""

    def __init__(self, config: StepConfig = None):
        self.config = config or StepConfig()
        self.llm = self.config.resolve_llm()
        self.validator = self.config.resolve_validator()

    def execute(
        self,
        input_data: Any,
        context: MutableMapping[str, Any]
    ) -> tuple[Any, dict]:
        attempts = []
        for attempt in range(self.config.max_retries):
            # 1. Reason: analyze the situation
            thought = self._analyze(input_data, attempts)
            # 2. Act: perform the action
            result = self._act(thought)
            # 3. Observe: validate
            is_valid, info = self.validator(result)
            attempts.append({'attempt': attempt, 'info': info})
            if is_valid:
                context['my_agent'] = {'result': result, 'info': info}
                return result, {'success': True, 'attempts': attempts}
        return None, {'success': False, 'attempts': attempts}
```
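A quick usage sketch for the agent above. `_analyze` and `_act` are left abstract in the pattern, so this subclass fills them with trivial stand-ins; the `StepConfig` arguments mirror the configuration example earlier:

```python
class EchoAgent(MyCustomAgent):
    # Trivial stand-ins for the abstract reason/act steps
    def _analyze(self, input_data, attempts):
        return f"process {input_data!r} (failed attempts so far: {len(attempts)})"

    def _act(self, thought):
        return self.llm(thought)

config = StepConfig(
    llm=lambda prompt: f"echo: {prompt}",     # stub LLM
    validator=lambda x: (x is not None, {}),  # stand-in validator
    max_retries=2,
)
agent = EchoAgent(config)
result, meta = agent.execute("some input", context={})
print(meta['success'], len(meta['attempts']))
```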
## Advanced Usage

### Interactive Workflows with Human-in-Loop

```python
from aw import InteractiveWorkflow, LoadingAgent, PreparationAgent

workflow = InteractiveWorkflow()
workflow.add_step('loading', LoadingAgent(), require_approval=True)
workflow.add_step('preparing', PreparationAgent(), require_approval=False)

# Set a custom approval callback
def approve(step_name, artifact, metadata):
    print(f"\nStep '{step_name}' completed")
    print(f"Artifact shape: {artifact.shape}")
    return input("Continue? (y/n): ").lower() == 'y'

workflow.set_approval_callback(approve)

# Run interactively
result, metadata = workflow.run_interactive('data.csv')
```
### Custom Validators

Create domain-specific validators:

```python
from aw.validation import info_dict_validator

def compute_ml_info(df):
    return {
        'num_features': len(df.columns) - 1,
        'num_samples': len(df),
        'class_balance': df['target'].value_counts().to_dict()
    }

def check_ml_requirements(info):
    if info['num_features'] < 5:
        return False, "Need at least 5 features"
    if info['num_samples'] < 100:
        return False, "Need at least 100 samples"
    return True, "ML ready"

ml_validator = info_dict_validator(compute_ml_info, check_ml_requirements)
```
### Partial Workflow Execution

```python
from aw import create_data_prep_workflow

workflow = create_data_prep_workflow()

# Run only the loading step
df, metadata = workflow.run_partial('data.csv', stop_after='loading')

# Inspect the intermediate result
print(f"Loaded {df.shape}")

# Continue with the preparing step
final_df, final_meta = workflow.run_partial(df, stop_after='preparing')
```
## Design Principles

The package follows these principles (a small sketch of the mapping and generator principles appears after the list):

- Functional over OOP: Favor functions and generators
- Modularity: Small, focused functions with clear purposes
- Open-Closed: Easy to extend without modifying core code
- Dependency Injection: Abstract interfaces with injectable implementations
- Mapping Interfaces: Use `Mapping`/`MutableMapping` from `collections.abc`
- Generators over Lists: Use lazy evaluation when possible
- Minimal Doctests: Simple examples in docstrings
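For instance, the mapping-interface and generator principles together suggest step pipelines like the following sketch (illustrative only, not an `aw` API):

```python
from collections.abc import Iterable, MutableMapping

def run_steps(steps: Iterable, data, context: MutableMapping):
    # Lazily yield each step's artifact and metadata instead of
    # collecting them in a list; any MutableMapping works as context
    for step in steps:
        data, meta = step.execute(data, context)
        yield data, meta
```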
## Project Structure

```
aw/
├── __init__.py       # Main exports
├── base.py           # Core protocols and configs
├── validation.py     # Three validation flavors
├── tools.py          # Code execution and tools
├── utils.py          # Helpers and facades
├── loading.py        # LoadingAgent
├── preparing.py      # PreparationAgent
├── cosmo.py          # Cosmograph validators
└── orchestration.py  # Workflow management
```
## Contributing
Contributions welcome! Key areas:
- Additional validators for other visualization libraries
- More robust LLM reasoning in code generation
- Integration with more backends (DSPy, CrewAI)
- State management with lakeFS/DVC
- Additional agent types (sourcing, finding, etc.)
## License

See the LICENSE file.
## Credits

Built following best practices from:

- The ReAct pattern (Reasoning and Acting)
- LangChain's agent architectures
- CrewAI's multi-agent systems
- Data Version Control (DVC) principles