
genai-processors-pydantic

A Pydantic validator processor for Google's genai-processors framework.

Note: This is an independent contrib processor that extends the genai-processors ecosystem.

⚠️ Important: Current Limitations & Roadmap

This processor was developed based on feedback from the genai-processors maintainers. While functional and tested, it has known limitations in certain scenarios. See MAINTAINER_FEEDBACK.md for detailed analysis and our roadmap to address these challenges:

  • Streaming: Currently works best with complete JSON in single Parts
  • Tool Integration: Planned support for genai_types.ToolResponse Parts
  • Multi-Model Validation: Single-model design; multi-model support planned
  • MIME Type Independence: ✅ Already handles unmarked JSON Parts

We're committed to addressing these limitations while maintaining a stable API.

PydanticValidator

The PydanticValidator is a PartProcessor that validates the JSON content of a ProcessorPart against a specified Pydantic model. It provides a simple, declarative way to enforce data schemas and improve the robustness of your AI pipelines.

Motivation

In many AI applications, processors ingest data from external sources like user inputs or API calls. This data can be unpredictable or malformed. The PydanticValidator solves this by:

  • Preventing Errors: It catches invalid data early, before it can cause errors in downstream processors like a GenaiModel or a database writer.
  • Ensuring Structure: It guarantees that any data moving forward in the pipeline conforms to a reliable, expected structure.
  • Simplifying Logic: It allows other processors to focus on their core tasks without being cluttered with boilerplate data validation code.

Installation

Install the package from PyPI:

pip install genai-processors-pydantic

Or with uv:

uv add genai-processors-pydantic

This will automatically install the required dependencies:

  • genai-processors>=1.0.4
  • pydantic>=2.0

Configuration

You can customize the validator's behavior by passing a ValidationConfig object during initialization.

from genai_processors_pydantic import PydanticValidator, ValidationConfig

config = ValidationConfig(fail_on_error=True, strict_mode=True)
validator = PydanticValidator(MyModel, config=config)

ValidationConfig Parameters

  • fail_on_error (bool, default: False):
    • If False, the processor will catch ValidationErrors, add error details to the part's metadata, and allow the stream to continue.
    • If True, the processor will re-raise the ValidationError, stopping the stream immediately. This is useful for "fail-fast" scenarios.
  • strict_mode (bool, default: False):
    • If False, Pydantic will attempt to coerce types where possible (e.g., converting the string "123" to the integer 123).
    • If True, Pydantic will enforce strict type matching and will not perform type coercion.
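To make the strict_mode distinction concrete, here is how plain Pydantic itself behaves in lax versus strict validation (a minimal sketch using Pydantic directly; the Item model is purely illustrative):

```python
from pydantic import BaseModel, ValidationError


class Item(BaseModel):
    count: int


# Lax mode (the strict_mode=False analogue): numeric strings are coerced.
item = Item.model_validate({"count": "123"})
print(item.count)  # 123, as an int

# Strict mode (the strict_mode=True analogue): no coercion, same input rejected.
try:
    Item.model_validate({"count": "123"}, strict=True)
except ValidationError as exc:
    print("strict mode rejected the string:", exc.errors()[0]["type"])
```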

Behavior and Metadata

The PydanticValidator processes parts whose text field contains valid JSON. For each part it processes, it yields two parts:

  1. The Result Part: The original part, now with added metadata.
  2. A Status Part: A message sent to the STATUS_STREAM indicating the outcome.

On Successful Validation

  • The yielded part's metadata['validation_status'] is set to 'success'.
  • The metadata['validated_data'] contains the serialized dictionary representation of the validated data (ensuring ProcessorParts remain serializable).
  • The part's text is updated to be the formatted JSON representation of the validated data.
  • A processor.status() message like ✅ Successfully validated... is yielded.

On Failed Validation

  • The yielded part's metadata['validation_status'] is set to 'failure'.
  • metadata['validation_errors'] contains a structured list of validation errors.
  • metadata['original_data'] contains the raw data that failed validation.
  • A processor.status() message like ❌ Validation failed... is yielded.
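The entries stored in metadata['validation_errors'] follow the shape of Pydantic's own ValidationError.errors() output. A minimal sketch of that structure using plain Pydantic (this assumes the validator stores errors in the same shape; the model here mirrors the example further below):

```python
import json

from pydantic import BaseModel, Field, ValidationError


class UserEvent(BaseModel):
    user_id: int
    event_name: str = Field(min_length=3)


raw = '{"user_id": 103, "event_name": "up"}'
try:
    UserEvent.model_validate(json.loads(raw))
except ValidationError as exc:
    for err in exc.errors():
        # Each entry carries the field location, an error code, and a message.
        print(err["loc"], err["type"], err["msg"])
```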

Practical Demonstration: Building a Robust Pipeline

A common use case is to validate a stream of user data and route valid and invalid items to different downstream processors. This example shows how to create a filter to separate the stream after validation.

Example

import asyncio
import json

from genai_processors import streams, processor
from genai_processors_pydantic import PydanticValidator, ValidationConfig
from pydantic import BaseModel, Field


# 1. Define the data schema.
class UserEvent(BaseModel):
    user_id: int
    event_name: str = Field(min_length=3)


# 2. Create the validator. Strict mode is enabled so that the string
#    user_id "102" below is rejected rather than coerced to an int.
validator = PydanticValidator(model=UserEvent, config=ValidationConfig(strict_mode=True))

# 3. Define downstream processors for success and failure cases.
class DatabaseWriter(processor.PartProcessor):
    async def call(self, part: processor.ProcessorPart):
        validated_data = part.metadata['validated_data']
        print(
            f"DATABASE: Writing event '{validated_data['event_name']}' "
            f"for user {validated_data['user_id']}"
        )
        yield part


class ErrorLogger(processor.PartProcessor):
    async def call(self, part: processor.ProcessorPart):
        errors = part.metadata['validation_errors']
        print(f"ERROR_LOG: Found {len(errors)} validation errors.")
        yield part


# 4. Create a stream with mixed-quality data.
input_stream = streams.stream_content([
    # Valid example
    processor.ProcessorPart(json.dumps({"user_id": 101, "event_name": "login"})),
    # Invalid user_id
    processor.ProcessorPart(json.dumps({"user_id": "102", "event_name": "logout"})),
    # Invalid event_name
    processor.ProcessorPart(json.dumps({"user_id": 103, "event_name": "up"})),
    # Ignore this part
    processor.ProcessorPart("This is not a JSON part and will be ignored."),
])


# 5. Build and run the pipeline.
async def main():
    print("--- Running Validation Pipeline ---")

    # Process each input part through the validator as it arrives,
    # routing results by validation status. (For simplicity, this example
    # buffers the routed parts in lists before re-streaming them.)
    valid_parts = []
    invalid_parts = []

    async for input_part in input_stream:
        async for validated_part in validator(input_part):
            # Filter based on validation status (skip status messages)
            status = validated_part.metadata.get("validation_status")
            if status == "success":
                valid_parts.append(validated_part)
            elif status == "failure":
                invalid_parts.append(validated_part)

    # Create streams from the filtered parts
    valid_stream = streams.stream_content(valid_parts)
    invalid_stream = streams.stream_content(invalid_parts)

    # Create processor instances
    db_writer = DatabaseWriter()
    error_logger = ErrorLogger()

    # Process both streams concurrently
    async def process_valid():
        async for part in valid_stream:
            async for result in db_writer(part):
                pass  # Results are printed in the processor

    async def process_invalid():
        async for part in invalid_stream:
            async for result in error_logger(part):
                pass  # Results are printed in the processor

    # Run both processing pipelines concurrently
    await asyncio.gather(process_valid(), process_invalid())
    print("--- Pipeline Finished ---")


if __name__ == "__main__":
    asyncio.run(main())


# Expected Output:
# --- Running Validation Pipeline ---
# DATABASE: Writing event 'login' for user 101
# ERROR_LOG: Found 1 validation errors.
# ERROR_LOG: Found 1 validation errors.
# --- Pipeline Finished ---
