AI Graph framework for building processing pipelines

AI-Graph

A powerful and flexible AI Graph framework for building processing pipelines using the Chain of Responsibility pattern.

🚀 Features

  • 🔗 Pipeline Architecture: Build complex processing pipelines using chained steps
  • 🔄 ForEach Processing: Iterate over collections or run fixed iterations with sub-pipelines
  • 🏗️ Modular Design: Easily extensible with custom pipeline steps
  • 📊 Progress Tracking: Built-in progress bars with tqdm integration
  • 🧪 100% Test Coverage: Comprehensive test suite with pytest
  • 🎯 Type Safe: Full type hints support with mypy
  • 📦 Modern Python: Built with modern Python packaging standards

💾 Installation

Using pip

pip install ai-graph

Using uv (recommended)

uv add ai-graph

Development Installation

git clone https://github.com/ai-graph/ai-graph.git
cd ai-graph
uv sync --group dev

⚡ Quick Start

Here's a simple example to get you started:

from ai_graph.pipeline.base import Pipeline
from ai_graph.step.base import AddKeyStep

# Create a simple pipeline
pipeline = Pipeline("DataProcessor")

# Add processing steps
pipeline.add_step(AddKeyStep("status", "processing"))
pipeline.add_step(AddKeyStep("timestamp", "2024-01-01"))

# Process data
data = {"input": "some data"}
result = pipeline.process(data)

print(result)
# Output: {'input': 'some data', 'status': 'processing', 'timestamp': '2024-01-01'}

🧩 Core Concepts

Pipeline Steps

Pipeline steps are the building blocks of your processing pipeline. To write your own, subclass BasePipelineStep and override _process_step:

from ai_graph.pipeline.base import Pipeline
from ai_graph.step.base import BasePipelineStep

class CustomStep(BasePipelineStep):
    def _process_step(self, data):
        # Your processing logic here
        data["custom_field"] = "processed"
        return data

# Use in pipeline
pipeline = Pipeline("CustomPipeline")
pipeline.add_step(CustomStep("MyCustomStep"))

Built-in Steps

  • AddKeyStep: Adds a key-value pair to the data
  • DelKeyStep: Removes a key from the data
  • ForEachStep: Processes collections or runs iterations
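
As a rough mental model of the first two built-in steps, the sketch below mimics their effect on a plain dictionary. The functions add_key and del_key are hypothetical stand-ins written for this illustration, not part of ai-graph. In particular, silently ignoring a missing key on deletion is an assumption, not documented behavior of DelKeyStep.

```python
from typing import Any, Dict


def add_key(data: Dict[str, Any], key: str, value: Any) -> Dict[str, Any]:
    """Hypothetical stand-in for AddKeyStep: set data[key] to value."""
    data[key] = value
    return data


def del_key(data: Dict[str, Any], key: str) -> Dict[str, Any]:
    """Hypothetical stand-in for DelKeyStep: drop key if present (assumed)."""
    data.pop(key, None)
    return data


record = {"input": "some data", "tmp": 1}
record = add_key(record, "status", "processing")
record = del_key(record, "tmp")
print(record)
```

Both helpers return the dictionary they were given, which is the same contract the custom step above follows when it returns data.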

Pipelines

Pipelines manage the execution flow of your processing steps:

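from ai_graph.pipeline.base import Pipeline
from ai_graph.step.base import AddKeyStep

# Define some steps to chain
step1 = AddKeyStep("stage", "loaded")
step2 = AddKeyStep("status", "processing")
step3 = AddKeyStep("timestamp", "2024-01-01")

# Create pipeline
pipeline = Pipeline("MyPipeline")

# Add steps (method chaining supported)
pipeline.add_step(step1).add_step(step2).add_step(step3)

# Process data
input_data = {"input": "some data"}
result = pipeline.process(input_data)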
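
Method chaining works when add_step returns the pipeline itself, as the return type in the API Reference suggests. The sketch below shows that idea in isolation. MiniPipeline, tag_status, and tag_source are hypothetical names invented for this illustration, and storing steps in a list is an assumption that may not match how ai-graph links steps internally.

```python
from typing import Any, Callable, Dict, List

Step = Callable[[Dict[str, Any]], Dict[str, Any]]


class MiniPipeline:
    """Hypothetical stand-in, not the ai-graph Pipeline class."""

    def __init__(self, name: str) -> None:
        self.name = name
        self._steps: List[Step] = []

    def add_step(self, step: Step) -> "MiniPipeline":
        self._steps.append(step)
        return self  # returning self is what makes chaining possible

    def process(self, data: Dict[str, Any]) -> Dict[str, Any]:
        for step in self._steps:  # run steps in insertion order
            data = step(data)
        return data


def tag_status(data: Dict[str, Any]) -> Dict[str, Any]:
    data["status"] = "processing"
    return data


def tag_source(data: Dict[str, Any]) -> Dict[str, Any]:
    data["source"] = "sketch"
    return data


mini = MiniPipeline("Sketch").add_step(tag_status).add_step(tag_source)
chained_result = mini.process({"input": "some data"})
print(chained_result)
```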

ForEach Processing

Process collections or run fixed iterations with sub-pipelines:

from ai_graph.pipeline.base import Pipeline
from ai_graph.step.base import AddKeyStep
from ai_graph.step.foreach import ForEachStep

# Process a list of items
foreach_step = ForEachStep(
    items_key="items",
    results_key="processed_items"
)

# Add sub-processing steps
foreach_step.add_sub_step(AddKeyStep("processed", True))
foreach_step.add_sub_step(AddKeyStep("batch_id", "batch_001"))

# Use in pipeline
pipeline = Pipeline("BatchProcessor")
pipeline.add_step(foreach_step)

data = {"items": [{"id": 1}, {"id": 2}, {"id": 3}]}
result = pipeline.process(data)
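
To make the iteration semantics more concrete, here is a minimal sketch of how a ForEach-style step could behave. The function run_foreach is hypothetical and is not ai-graph code. It borrows the parameter names from ForEachStep and the _current_item and _iteration_index keys from the examples in this README. Three things are assumptions: each iteration works on its own copy of the data, the collected results are a list of dictionaries, and fixed iterations use None as the current item.

```python
import copy
from typing import Any, Callable, Dict, List, Optional

SubStep = Callable[[Dict[str, Any]], Dict[str, Any]]


def run_foreach(
    data: Dict[str, Any],
    sub_steps: List[SubStep],
    items_key: Optional[str] = None,
    iterations: Optional[int] = None,
    results_key: str = "foreach_results",
) -> Dict[str, Any]:
    """Hypothetical sketch of ForEach-style iteration (not ai-graph code)."""
    if (items_key is None) == (iterations is None):
        raise ValueError("Provide exactly one of items_key or iterations")
    # Iterate over the collection, or over placeholders for fixed iterations.
    items = data[items_key] if items_key is not None else [None] * iterations
    results = []
    for index, item in enumerate(items):
        # Assumption: each iteration gets its own copy of the outer data.
        scratch = copy.deepcopy(data)
        scratch["_current_item"] = item
        scratch["_iteration_index"] = index
        for step in sub_steps:  # run the sub-pipeline on this iteration
            scratch = step(scratch)
        results.append(scratch)
    data[results_key] = results
    return data


def double(scratch: Dict[str, Any]) -> Dict[str, Any]:
    scratch["processed_value"] = scratch["_current_item"] * 2
    return scratch


out = run_foreach(
    {"numbers": [1, 2, 3]},
    [double],
    items_key="numbers",
    results_key="processed_numbers",
)
doubled = [r["processed_value"] for r in out["processed_numbers"]]
print(doubled)  # [2, 4, 6]
```

Passing iterations=5 instead of items_key runs the same sub-steps five times, with only the index changing between runs.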

📚 Examples

Example 1: Data Validation Pipeline

from ai_graph.pipeline.base import Pipeline
from ai_graph.step.base import AddKeyStep, BasePipelineStep

class ValidateDataStep(BasePipelineStep):
    def _process_step(self, data):
        if "required_field" not in data:
            data["validation_error"] = "Missing required field"
        else:
            data["validation_status"] = "valid"
        return data

# Create validation pipeline
pipeline = Pipeline("DataValidator")
pipeline.add_step(ValidateDataStep("Validator"))
pipeline.add_step(AddKeyStep("validated_at", "2024-01-01"))

# Process data
data = {"required_field": "value"}
result = pipeline.process(data)

Example 2: Batch Processing with ForEach

from ai_graph.pipeline.base import Pipeline
from ai_graph.step.base import BasePipelineStep
from ai_graph.step.foreach import ForEachStep

class ProcessItemStep(BasePipelineStep):
    def _process_step(self, data):
        # Access current item and iteration index
        current_item = data["_current_item"]
        iteration_index = data["_iteration_index"]
        
        # Process the item
        data["processed_value"] = current_item * 2
        data["position"] = iteration_index
        
        return data

# Create batch processing pipeline
batch_processor = ForEachStep(
    items_key="numbers",
    results_key="processed_numbers"
)
batch_processor.add_sub_step(ProcessItemStep("ItemProcessor"))

pipeline = Pipeline("BatchProcessor")
pipeline.add_step(batch_processor)

# Process batch
data = {"numbers": [1, 2, 3, 4, 5]}
result = pipeline.process(data)
# result["processed_numbers"] will contain processed items

Example 3: Fixed Iterations

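from ai_graph.pipeline.base import Pipeline
from ai_graph.step.base import BasePipelineStep
from ai_graph.step.foreach import ForEachStep

# Run a fixed number of iterations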
iteration_step = ForEachStep(
    iterations=5,
    results_key="iteration_results"
)

class IterationStep(BasePipelineStep):
    def _process_step(self, data):
        iteration = data["_iteration_index"]
        data["step"] = f"Step {iteration + 1}"
        return data

iteration_step.add_sub_step(IterationStep("Iterator"))

pipeline = Pipeline("IterationPipeline")
pipeline.add_step(iteration_step)

result = pipeline.process({"start": True})

📖 API Reference

BasePipelineStep

class BasePipelineStep:
    def __init__(self, name: str = None)
    def set_next(self, step: 'BasePipelineStep') -> None
    def process(self, data: Dict[str, Any]) -> Dict[str, Any]
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]  # Override this
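
The method names above map naturally onto the Chain of Responsibility pattern: process does a step's own work through _process_step, then forwards the data to whatever set_next registered. The sketch below illustrates that reading. SketchStep, Upper, and Exclaim are hypothetical classes written for this example, and the forwarding logic is an assumption about how such a base class could work, not a copy of the ai-graph source.

```python
from typing import Any, Dict, Optional


class SketchStep:
    """Hypothetical stand-in mirroring the method names listed above."""

    def __init__(self, name: Optional[str] = None) -> None:
        self.name = name or type(self).__name__
        self._next: Optional["SketchStep"] = None

    def set_next(self, step: "SketchStep") -> None:
        self._next = step

    def process(self, data: Dict[str, Any]) -> Dict[str, Any]:
        data = self._process_step(data)  # do this step's own work
        if self._next is not None:  # then hand off down the chain
            return self._next.process(data)
        return data

    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        return data  # subclasses override this


class Upper(SketchStep):
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        data["text"] = data["text"].upper()
        return data


class Exclaim(SketchStep):
    def _process_step(self, data: Dict[str, Any]) -> Dict[str, Any]:
        data["text"] += "!"
        return data


first = Upper()
first.set_next(Exclaim())
chain_result = first.process({"text": "hello"})
print(chain_result)  # {'text': 'HELLO!'}
```

Keeping the forwarding in process and the work in _process_step means a subclass only has to override one method and never has to deal with the chain itself.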

Pipeline

class Pipeline:
    def __init__(self, name: str, first_step: Optional[PipelineStep] = None)
    def add_step(self, step: PipelineStep) -> "Pipeline"
    def process(self, data: Dict[str, Any]) -> Dict[str, Any]

ForEachStep

class ForEachStep(BasePipelineStep):
    def __init__(
        self,
        items_key: Optional[str] = None,
        iterations: Optional[int] = None,
        results_key: str = "foreach_results",
        name: str = None
    )
    def add_sub_step(self, step: BasePipelineStep) -> "ForEachStep"

🛠️ Development

Prerequisites

  • Python 3.8+
  • uv (recommended) or pip

Setup Development Environment

# Clone repository
git clone https://github.com/ai-graph/ai-graph.git
cd ai-graph

# Install dependencies
uv sync --group dev

# Install pre-commit hooks (optional)
uv run pre-commit install

Running Tests

# Run all tests
uv run pytest

# Run with coverage
uv run pytest --cov

# Run specific test file
uv run pytest tests/step/test_base.py

Code Quality

# Format code
uv run black .
uv run isort .

# Lint code
uv run flake8 .

# Type checking
uv run mypy .

Making Releases

This project uses conventional commits and automated versioning:

# Make changes and commit using conventional commits
uv run cz commit

# Bump version automatically
uv run cz bump

# Push changes and tags
git push origin main --tags

🤝 Contributing

We love contributions! Please see our Contributing Guidelines for details.

Quick Contribution Guide

  1. Fork the repository
  2. Create a feature branch: git checkout -b feature/amazing-feature
  3. Make your changes and add tests
  4. Ensure tests pass: uv run pytest
  5. Format code: uv run black . && uv run isort .
  6. Commit using conventional commits: uv run cz commit
  7. Push and create a Pull Request

📄 License

This project is licensed under the GNU General Public License v3.0 - see the LICENSE file for details.

🎉 Acknowledgments

  • Built with modern Python packaging standards
  • Inspired by the Chain of Responsibility design pattern
  • Uses tqdm for progress bars
  • Tested with pytest

⭐ Star this repository if you find it helpful!

Made with ❤️ by Mohammad Sina Allahkaram
