AI Graph framework for building processing pipelines
AI-Graph
A powerful and flexible AI Graph framework for building processing pipelines using the Chain of Responsibility pattern.
🚀 Features
- 🔗 Pipeline Architecture: Build complex processing pipelines using chained steps
- 🔄 ForEach Processing: Iterate over collections or run fixed iterations with sub-pipelines
- 🏗️ Modular Design: Easily extensible with custom pipeline steps
- 📊 Progress Tracking: Built-in progress bars with tqdm integration
- 🧪 100% Test Coverage: Comprehensive test suite with pytest
- 🎯 Type Safe: Fully type-hinted and checked with mypy
- 📦 Modern Python: Built with modern Python packaging standards
💾 Installation
Using pip
pip install ai-graph
Using uv (recommended)
uv add ai-graph
Development Installation
git clone https://github.com/ai-graph/ai-graph.git
cd ai-graph
uv sync --group dev
⚡ Quick Start
Here's a simple example to get you started:
from ai_graph.pipeline.base import Pipeline
from ai_graph.step.base import AddKeyStep
# Create a simple pipeline
pipeline = Pipeline("DataProcessor")
# Add processing steps
pipeline.add_step(AddKeyStep("status", "processing"))
pipeline.add_step(AddKeyStep("timestamp", "2024-01-01"))
# Process data
data = {"input": "some data"}
result = pipeline.process(data)
print(result)
# Output: {'input': 'some data', 'status': 'processing', 'timestamp': '2024-01-01'}
🧩 Core Concepts
Pipeline Steps
Pipeline steps are the building blocks of your processing pipeline. Each step implements the BasePipelineStep interface:
from ai_graph.pipeline.base import Pipeline
from ai_graph.step.base import BasePipelineStep
class CustomStep(BasePipelineStep):
    def _process_step(self, data):
        # Your processing logic here
        data["custom_field"] = "processed"
        return data
# Use in pipeline
pipeline = Pipeline("CustomPipeline")
pipeline.add_step(CustomStep("MyCustomStep"))
Built-in Steps
- AddKeyStep: Adds a key-value pair to the data
- DelKeyStep: Removes a key from the data
- ForEachStep: Processes collections or runs iterations
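To make the two key-manipulation steps concrete, here is a minimal standalone sketch. It does not import ai_graph: `add_key` and `del_key` are hypothetical plain functions that mimic the descriptions above, and the choice to ignore a missing key on deletion is an assumption, not confirmed library behavior.

```python
from typing import Any, Dict


def add_key(data: Dict[str, Any], key: str, value: Any) -> Dict[str, Any]:
    """Hypothetical stand-in for AddKeyStep: set data[key] = value."""
    data[key] = value
    return data


def del_key(data: Dict[str, Any], key: str) -> Dict[str, Any]:
    """Hypothetical stand-in for DelKeyStep: drop the key if present (assumed)."""
    data.pop(key, None)
    return data


record = {"input": "some data"}
record = add_key(record, "status", "processing")
record = del_key(record, "input")
print(record)  # {'status': 'processing'}
```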
Pipelines
Pipelines manage the execution flow of your processing steps:
from ai_graph.pipeline.base import Pipeline
# Create pipeline
pipeline = Pipeline("MyPipeline")
# Add steps (method chaining supported)
pipeline.add_step(step1).add_step(step2).add_step(step3)
# Process data
result = pipeline.process(input_data)
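The snippet above leaves `step1`, `step2`, `step3`, and `input_data` undefined. As a rough illustration of why chaining works, the following standalone sketch uses a hypothetical `SketchPipeline` class and a hypothetical `tag` helper (neither is part of ai_graph). The only idea it relies on is that `add_step` returns the pipeline itself.

```python
from typing import Any, Callable, Dict, List

Step = Callable[[Dict[str, Any]], Dict[str, Any]]


class SketchPipeline:
    """Hypothetical stand-in, not the ai_graph Pipeline class."""

    def __init__(self, name: str) -> None:
        self.name = name
        self._steps: List[Step] = []

    def add_step(self, step: Step) -> "SketchPipeline":
        self._steps.append(step)
        return self  # returning self is what enables chaining

    def process(self, data: Dict[str, Any]) -> Dict[str, Any]:
        # Run the steps in the order they were added.
        for step in self._steps:
            data = step(data)
        return data


def tag(key: str, value: Any) -> Step:
    """Build a step that sets one key on the data dictionary."""
    def step(data: Dict[str, Any]) -> Dict[str, Any]:
        data[key] = value
        return data
    return step


sketch = SketchPipeline("MyPipeline")
sketch.add_step(tag("a", 1)).add_step(tag("b", 2)).add_step(tag("a", 3))
chained_result = sketch.process({})
print(chained_result)  # {'a': 3, 'b': 2}
```

Steps run in insertion order, so the last write to `"a"` wins.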
ForEach Processing
Process collections or run fixed iterations with sub-pipelines:
from ai_graph.pipeline.base import Pipeline
from ai_graph.step.base import AddKeyStep
from ai_graph.step.foreach import ForEachStep
# Process a list of items
foreach_step = ForEachStep(
    items_key="items",
    results_key="processed_items",
)
# Add sub-processing steps
foreach_step.add_sub_step(AddKeyStep("processed", True))
foreach_step.add_sub_step(AddKeyStep("batch_id", "batch_001"))
# Use in pipeline
pipeline = Pipeline("BatchProcessor")
pipeline.add_step(foreach_step)
data = {"items": [{"id": 1}, {"id": 2}, {"id": 3}]}
result = pipeline.process(data)
📚 Examples
Example 1: Data Validation Pipeline
from ai_graph.pipeline.base import Pipeline
from ai_graph.step.base import AddKeyStep, BasePipelineStep
class ValidateDataStep(BasePipelineStep):
    def _process_step(self, data):
        # Record the outcome in the data instead of raising
        if "required_field" not in data:
            data["validation_error"] = "Missing required field"
        else:
            data["validation_status"] = "valid"
        return data
# Create validation pipeline
pipeline = Pipeline("DataValidator")
pipeline.add_step(ValidateDataStep("Validator"))
pipeline.add_step(AddKeyStep("validated_at", "2024-01-01"))
# Process data
data = {"required_field": "value"}
result = pipeline.process(data)
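The example only exercises the valid branch. The standalone sketch below repeats the same check as a plain function (the name `validate` is hypothetical and ai_graph is not imported) so that both outcomes are visible.

```python
from typing import Any, Dict


def validate(data: Dict[str, Any]) -> Dict[str, Any]:
    """Same check as ValidateDataStep, written as a plain function."""
    if "required_field" not in data:
        data["validation_error"] = "Missing required field"
    else:
        data["validation_status"] = "valid"
    return data


valid_result = validate({"required_field": "value"})
invalid_result = validate({"other": 1})
print(valid_result)    # {'required_field': 'value', 'validation_status': 'valid'}
print(invalid_result)  # {'other': 1, 'validation_error': 'Missing required field'}
```

Because the step records the error rather than raising, later steps in the pipeline presumably still run. A caller that needs to stop on invalid input would have to check for `validation_error` itself.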
Example 2: Batch Processing with ForEach
from ai_graph.pipeline.base import Pipeline
from ai_graph.step.base import BasePipelineStep
from ai_graph.step.foreach import ForEachStep
class ProcessItemStep(BasePipelineStep):
    def _process_step(self, data):
        # Access current item and iteration index
        current_item = data["_current_item"]
        iteration_index = data["_iteration_index"]
        # Process the item
        data["processed_value"] = current_item * 2
        data["position"] = iteration_index
        return data
# Create batch processing pipeline
batch_processor = ForEachStep(
    items_key="numbers",
    results_key="processed_numbers",
)
batch_processor.add_sub_step(ProcessItemStep("ItemProcessor"))
pipeline = Pipeline("BatchProcessor")
pipeline.add_step(batch_processor)
# Process batch
data = {"numbers": [1, 2, 3, 4, 5]}
result = pipeline.process(data)
# result["processed_numbers"] will contain processed items
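The README does not spell out how the per-item data is built or how results are collected, so the following is only a sketch under stated assumptions. The hypothetical `for_each` function (not part of ai_graph) gives each item its own dictionary holding `_current_item` and `_iteration_index`, runs one sub-step on it, and stores the list of returned dictionaries under `results_key`.

```python
from typing import Any, Callable, Dict, List

Step = Callable[[Dict[str, Any]], Dict[str, Any]]


def for_each(
    data: Dict[str, Any],
    items_key: str,
    sub_step: Step,
    results_key: str = "foreach_results",
) -> Dict[str, Any]:
    """Hypothetical illustration of item-wise processing (assumed semantics)."""
    results: List[Dict[str, Any]] = []
    for index, item in enumerate(data[items_key]):
        # Assumption: each item is processed in a fresh dictionary.
        scratch = {"_current_item": item, "_iteration_index": index}
        results.append(sub_step(scratch))
    out = dict(data)
    out[results_key] = results
    return out


def double(item_data: Dict[str, Any]) -> Dict[str, Any]:
    item_data["processed_value"] = item_data["_current_item"] * 2
    item_data["position"] = item_data["_iteration_index"]
    return item_data


batch_output = for_each({"numbers": [1, 2, 3]}, "numbers", double, "processed_numbers")
values = [r["processed_value"] for r in batch_output["processed_numbers"]]
print(values)  # [2, 4, 6]
```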
Example 3: Fixed Iterations
from ai_graph.pipeline.base import Pipeline
from ai_graph.step.base import BasePipelineStep
from ai_graph.step.foreach import ForEachStep
# Run a fixed number of iterations
iteration_step = ForEachStep(
    iterations=5,
    results_key="iteration_results",
)
class IterationStep(BasePipelineStep):
    def _process_step(self, data):
        # _iteration_index is zero-based, so add 1 for a human-readable label
        iteration = data["_iteration_index"]
        data["step"] = f"Step {iteration + 1}"
        return data
iteration_step.add_sub_step(IterationStep("Iterator"))
pipeline = Pipeline("IterationPipeline")
pipeline.add_step(iteration_step)
result = pipeline.process({"start": True})
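What ends up under `iteration_results` is not shown in the README. As a hedged illustration, the hypothetical `run_iterations` function below (not part of ai_graph) assumes each iteration works on a copy of the input data plus a zero-based `_iteration_index`, and that the returned dictionaries are collected in order.

```python
from typing import Any, Callable, Dict, List

Step = Callable[[Dict[str, Any]], Dict[str, Any]]


def run_iterations(
    data: Dict[str, Any],
    sub_step: Step,
    iterations: int,
    results_key: str = "foreach_results",
) -> Dict[str, Any]:
    """Hypothetical illustration of fixed-count iteration (assumed semantics)."""
    results: List[Dict[str, Any]] = []
    for index in range(iterations):
        scratch = dict(data)  # assumption: a per-iteration copy of the input
        scratch["_iteration_index"] = index
        results.append(sub_step(scratch))
    out = dict(data)
    out[results_key] = results
    return out


def label(iter_data: Dict[str, Any]) -> Dict[str, Any]:
    iter_data["step"] = f"Step {iter_data['_iteration_index'] + 1}"
    return iter_data


iteration_output = run_iterations(
    {"start": True}, label, iterations=3, results_key="iteration_results"
)
labels = [r["step"] for r in iteration_output["iteration_results"]]
print(labels)  # ['Step 1', 'Step 2', 'Step 3']
```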
📖 API Reference
BasePipelineStep
class BasePipelineStep:
    def __init__(self, name: str = None)
    def set_next(self, step: 'BasePipelineStep') -> None
    def process(self, data: Dict[str, Any]) -> Dict[str, Any]
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]  # Override this
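The method names above map naturally onto the Chain of Responsibility pattern. The sketch below shows one way such a chain could work. `SketchStep`, `Upper`, and `Exclaim` are hypothetical classes written for illustration, and the real `BasePipelineStep` may differ (for example in logging or progress reporting).

```python
from typing import Any, Dict, Optional


class SketchStep:
    """Hypothetical stand-in mirroring the method names listed above."""

    def __init__(self, name: Optional[str] = None) -> None:
        self.name = name or type(self).__name__
        self._next: Optional["SketchStep"] = None

    def set_next(self, step: "SketchStep") -> None:
        self._next = step

    def process(self, data: Dict[str, Any]) -> Dict[str, Any]:
        data = self._process_step(data)
        # Hand the result to the next link in the chain, if there is one.
        return self._next.process(data) if self._next else data

    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        return data  # subclasses override this


class Upper(SketchStep):
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        data["text"] = data["text"].upper()
        return data


class Exclaim(SketchStep):
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        data["text"] += "!"
        return data


first = Upper()
first.set_next(Exclaim())
chain_result = first.process({"text": "hello"})
print(chain_result)  # {'text': 'HELLO!'}
```

Subclasses override only `_process_step`; the public `process` method handles forwarding, which keeps each step unaware of what follows it.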
Pipeline
class Pipeline:
    def __init__(self, name: str, first_step: Optional[PipelineStep] = None)
    def add_step(self, step: PipelineStep) -> "Pipeline"
    def process(self, data: Dict[str, Any]) -> Dict[str, Any]
ForEachStep
class ForEachStep(BasePipelineStep):
    def __init__(
        self,
        items_key: Optional[str] = None,
        iterations: Optional[int] = None,
        results_key: str = "foreach_results",
        name: str = None
    )
    def add_sub_step(self, step: BasePipelineStep) -> "ForEachStep"
🛠️ Development
Prerequisites
- Python 3.8+
- uv (recommended) or pip
Setup Development Environment
# Clone repository
git clone https://github.com/ai-graph/ai-graph.git
cd ai-graph
# Install dependencies
uv sync --group dev
# Install pre-commit hooks (optional)
uv run pre-commit install
Running Tests
# Run all tests
uv run pytest
# Run with coverage
uv run pytest --cov
# Run specific test file
uv run pytest tests/step/test_base.py
Code Quality
# Format code
uv run black .
uv run isort .
# Lint code
uv run flake8 .
# Type checking
uv run mypy .
Making Releases
This project uses conventional commits and automated versioning:
# Make changes and commit using conventional commits
uv run cz commit
# Bump version automatically
uv run cz bump
# Push changes and tags
git push origin main --tags
🤝 Contributing
We love contributions! Please see our Contributing Guidelines for details.
Quick Contribution Guide
- Fork the repository
- Create a feature branch: git checkout -b feature/amazing-feature
- Make your changes and add tests
- Ensure tests pass: uv run pytest
- Format code: uv run black . && uv run isort .
- Commit using conventional commits: uv run cz commit
- Push and create a Pull Request
📄 License
This project is licensed under the GNU General Public License v3.0 - see the LICENSE file for details.
🙋‍♂️ Support
- 📧 Email: msinamsina@gmail.com
- 🐛 Issues: GitHub Issues
- 💬 Discussions: GitHub Discussions
🎉 Acknowledgments
- Built with modern Python packaging standards
- Inspired by the Chain of Responsibility design pattern
- Uses tqdm for progress bars
- Tested with pytest
⭐ Star this repository if you find it helpful!
Made with ❤️ by Mohammad Sina Allahkaram
File details
Details for the file ai_graph-0.0.5.tar.gz.
File metadata
- Download URL: ai_graph-0.0.5.tar.gz
- Upload date:
- Size: 215.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.7.21
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | cbcdef363250436f492eaa9cdac9382f480a6d21d161563fa2b43e5cd4938fce |
| MD5 | 8c502cd7879281fa96e0223802d919aa |
| BLAKE2b-256 | 274a87a921515909f635b35d3cd70bb0b34d3926031684c856f243f591930cfc |
File details
Details for the file ai_graph-0.0.5-py3-none-any.whl.
File metadata
- Download URL: ai_graph-0.0.5-py3-none-any.whl
- Upload date:
- Size: 36.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.7.21
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 8aec2a82649e4c533b0c5422888273941c5b185f0c3b607e1835e8842cb7ffa7 |
| MD5 | dd0b4bc25162823f2b553db2438a42c7 |
| BLAKE2b-256 | deba8f7e49c94697b7a8045466046000da61e35924945260dd65bc7307d038f2 |