Skip to main content

An evaluation framework library for Python that provides a flexible way to run parallel evaluations and optionally integrate with the Orq AI platform.

Project description

evaluatorq-py

An evaluation framework library for Python that provides a flexible way to run parallel evaluations and optionally integrate with the Orq AI platform.

🎯 Features

  • Parallel Execution: Run multiple evaluation jobs concurrently with progress tracking
  • Flexible Data Sources: Support for inline data, async iterables, and Orq platform datasets
  • Type-safe: Fully typed with Python type hints and Pydantic models with runtime validation
  • Rich Terminal UI: Beautiful progress indicators and result tables powered by Rich
  • Orq Platform Integration: Seamlessly fetch and evaluate datasets from Orq AI (optional)
  • OpenTelemetry Tracing: Built-in observability with automatic span creation for jobs and evaluators
  • Pass/Fail Tracking: Evaluators can return pass/fail status for CI/CD integration
  • Built-in Evaluators: Common evaluators like string_contains_evaluator included
  • Integrations: LangChain, LangGraph, OpenAI Agents SDK, and custom callable support
  • Red Teaming: Adaptive OWASP-mapped adversarial security testing for AI agents

📖 Table of Contents

📥 Installation

pip install evaluatorq
# or
uv add evaluatorq
# or
poetry add evaluatorq

Optional Dependencies

If you want to use the Orq platform integration:

pip install orq-ai-sdk
# or
pip install evaluatorq[orq]

For OpenTelemetry tracing (optional):

pip install opentelemetry-api opentelemetry-sdk opentelemetry-exporter-otlp-proto-http opentelemetry-semantic-conventions
# or
pip install evaluatorq[otel]

For LangChain/LangGraph integration:

pip install langchain
# or
pip install evaluatorq[langchain]

🏁 Getting Started

New to evaluatorq? Follow this path to get up and running:

Step What you'll learn Example
1. Basic eval Run your first evaluation with inline data pass_fail_simple.py
2. Multiple jobs Run multiple jobs in parallel on each data point example_runners.py
3. Reusable patterns Create reusable jobs and evaluators eval_reuse.py
4. Datasets Load data from the Orq platform dataset_example.py
5. Structured scores Return multi-dimensional metrics structured_rubric_eval.py
6. LangChain agent Evaluate a LangChain/LangGraph agent langchain_integration_example.py

Tip: Start with step 1 and work your way up. Each example builds on concepts from the previous one.

🚀 Quick Start

Basic Usage

import asyncio
from evaluatorq import evaluatorq, job, DataPoint, EvaluationResult

@job("text-analyzer")
async def text_analyzer(data: DataPoint, row: int):
    """Analyze text data and return analysis results."""
    text = data.inputs["text"]
    analysis = {
        "length": len(text),
        "word_count": len(text.split()),
        "uppercase": text.upper(),
    }

    return analysis

async def length_check_scorer(params):
    """Evaluate if output length is sufficient."""
    output = params["output"]
    passes_check = output["length"] > 10

    return EvaluationResult(
        value=1 if passes_check else 0,
        explanation=(
            "Output length is sufficient"
            if passes_check
            else f"Output too short ({output['length']} chars, need >10)"
        )
    )

async def main():
    await evaluatorq(
        "text-analysis",
        data=[
            DataPoint(inputs={"text": "Hello world"}),
            DataPoint(inputs={"text": "Testing evaluation"}),
        ],
        jobs=[text_analyzer],
        evaluators=[
            {
                "name": "length-check",
                "scorer": length_check_scorer,
            }
        ],
    )

if __name__ == "__main__":
    asyncio.run(main())

Tip: The @job() decorator preserves the job name in error messages. Always prefer @job("name") over raw functions for better debugging.

Using Orq Platform Datasets

import asyncio
from evaluatorq import evaluatorq, job, DataPoint, DatasetIdInput

@job("processor")
async def processor(data: DataPoint, row: int):
    """Process each data point from the dataset."""
    result = await process_data(data)
    return result

async def accuracy_scorer(params):
    """Calculate accuracy by comparing output with expected results."""
    data = params["data"]
    output = params["output"]

    score = calculate_score(output, data.expected_output)

    if score > 0.8:
        explanation = "High accuracy match"
    elif score > 0.5:
        explanation = "Partial match"
    else:
        explanation = "Low accuracy match"

    return {"value": score, "explanation": explanation}

async def main():
    # Requires ORQ_API_KEY environment variable
    await evaluatorq(
        "dataset-evaluation",
        data=DatasetIdInput(dataset_id="your-dataset-id"),  # From Orq platform
        jobs=[processor],
        evaluators=[
            {
                "name": "accuracy",
                "scorer": accuracy_scorer,
            }
        ],
    )

if __name__ == "__main__":
    asyncio.run(main())

Tip: Use parallelism to control how many data points are processed concurrently. Start with a low value (3-5) when calling external APIs to avoid rate limits.

Advanced Features

Multiple Jobs

Run multiple jobs in parallel for each data point:

from evaluatorq import job

@job("preprocessor")
async def preprocessor(data: DataPoint, row: int):
    result = await preprocess(data)
    return result

@job("analyzer")
async def analyzer(data: DataPoint, row: int):
    result = await analyze(data)
    return result

@job("transformer")
async def transformer(data: DataPoint, row: int):
    result = await transform(data)
    return result

await evaluatorq(
    "multi-job-eval",
    data=[...],
    jobs=[preprocessor, analyzer, transformer],
    evaluators=[...],
)

The @job() Decorator

The @job() decorator provides two key benefits:

  1. Eliminates boilerplate - No need to manually wrap returns with {"name": ..., "output": ...}
  2. Preserves job names in errors - When a job fails, the error will include the job name for better debugging

Decorator pattern (recommended):

from evaluatorq import job

@job("text-processor")
async def process_text(data: DataPoint, row: int):
    # Clean return - just the data!
    return {"result": data.inputs["text"].upper()}

Functional pattern (for lambdas):

from evaluatorq import job

# Simple transformations with lambda
uppercase_job = job("uppercase", lambda data, row: data.inputs["text"].upper())
word_count_job = job("word-count", lambda data, row: len(data.inputs["text"].split()))

Deployment Helper

Easily invoke Orq deployments within your evaluation jobs:

from evaluatorq import evaluatorq, job, invoke, deployment, DatasetIdInput

# Simple one-liner with invoke()
@job("summarizer")
async def summarize_job(data, row):
    text = data.inputs["text"]
    return await invoke("my-deployment", inputs={"text": text})

# Full response with deployment()
@job("analyzer")
async def analyze_job(data, row):
    response = await deployment(
        "my-deployment",
        inputs={"text": data.inputs["text"]},
        metadata={"source": "evaluatorq"},
    )
    print("Raw:", response.raw)
    return response.content

# Chat-style with messages
@job("chatbot")
async def chat_job(data, row):
    return await invoke(
        "chatbot",
        messages=[{"role": "user", "content": data.inputs["question"]}],
    )

# Thread tracking for conversations
@job("assistant")
async def conversation_job(data, row):
    return await invoke(
        "assistant",
        inputs={"query": data.inputs["query"]},
        thread={"id": "conversation-123"},
    )

The invoke() function returns the text content directly, while deployment() returns an object with both content and raw response for more control.

Built-in Evaluators

Use the included evaluators for common use cases:

from evaluatorq import evaluatorq, job, string_contains_evaluator, DatasetIdInput

@job("country-lookup")
async def country_lookup_job(data, row):
    country = data.inputs["country"]
    return await invoke("country-capitals", inputs={"country": country})

await evaluatorq(
    "country-unit-test",
    data=DatasetIdInput(dataset_id="your-dataset-id"),
    jobs=[country_lookup_job],
    evaluators=[string_contains_evaluator()],  # Checks if output contains expected_output
    parallelism=6,
)

Available built-in evaluators:

  • string_contains_evaluator() - Checks if output contains expected_output (case-insensitive by default)
  • exact_match_evaluator() - Checks if output exactly matches expected_output
# Case-sensitive matching
strict_evaluator = string_contains_evaluator(case_insensitive=False)

# Custom name
my_evaluator = string_contains_evaluator(name="my-contains-check")

Automatic Error Handling

The @job() decorator automatically preserves job names even when errors occur:

from evaluatorq import job

@job("risky-job")
async def risky_operation(data: DataPoint, row: int):
    # If this raises an error, the job name "risky-job" will be preserved
    result = await potentially_failing_operation(data)
    return result

await evaluatorq(
    "error-handling",
    data=[...],
    jobs=[risky_operation],
    evaluators=[...],
)

# Error output will show: "Job 'risky-job' failed: <error details>"
# Without @job decorator, you'd only see: "<error details>"

Async Data Sources

import asyncio

# Create an array of coroutines for async data
async def get_data_point(i: int) -> DataPoint:
    await asyncio.sleep(0.01)  # Simulate async data fetching
    return DataPoint(inputs={"value": i})

data_promises = [get_data_point(i) for i in range(1000)]

await evaluatorq(
    "async-eval",
    data=data_promises,
    jobs=[...],
    evaluators=[...],
)

Structured Evaluation Results

Evaluators can return structured, multi-dimensional metrics using EvaluationResultCell. This is useful for metrics like BERT scores, ROUGE-N scores, or any evaluation that produces multiple sub-scores.

Multi-criteria Rubric

Return multiple quality sub-scores in a single evaluator:

from evaluatorq import evaluatorq, job, DataPoint, EvaluationResult, EvaluationResultCell

@job("echo")
async def echo_job(data: DataPoint, row: int):
    return data.inputs["text"]

async def rubric_scorer(params):
    text = str(params["output"])
    return EvaluationResult(
        value=EvaluationResultCell(
            type="rubric",
            value={
                "relevance": min(len(text) / 100, 1),
                "coherence": 0.9 if "." in text else 0.4,
                "fluency": 0.85 if len(text.split()) > 5 else 0.5,
            },
        ),
        explanation="Multi-criteria quality rubric",
    )

await evaluatorq(
    "structured-rubric",
    data=[
        DataPoint(inputs={"text": "The quick brown fox jumps over the lazy dog."}),
        DataPoint(inputs={"text": "Hi"}),
    ],
    jobs=[echo_job],
    evaluators=[{"name": "rubric", "scorer": rubric_scorer}],
)
Sentiment Distribution

Break down sentiment across categories:

async def sentiment_scorer(params):
    text = str(params["output"]).lower()
    positive_words = ["good", "great", "excellent", "happy", "love"]
    negative_words = ["bad", "terrible", "awful", "sad", "hate"]
    pos_count = sum(1 for w in positive_words if w in text)
    neg_count = sum(1 for w in negative_words if w in text)
    total = max(pos_count + neg_count, 1)

    return EvaluationResult(
        value=EvaluationResultCell(
            type="sentiment",
            value={
                "positive": pos_count / total,
                "negative": neg_count / total,
                "neutral": 1 - (pos_count + neg_count) / total,
            },
        ),
        explanation="Sentiment distribution across categories",
    )
Safety Scores with Pass/Fail

Combine structured scores with pass/fail tracking for CI/CD:

async def safety_scorer(params):
    text = str(params["output"]).lower()
    categories = {
        "hate_speech": 0.8 if "hate" in text else 0.1,
        "violence": 0.7 if ("kill" in text or "fight" in text) else 0.05,
        "profanity": 0.5 if "damn" in text else 0.02,
    }

    return EvaluationResult(
        value=EvaluationResultCell(
            type="safety",
            value=categories,
        ),
        pass_=all(score < 0.5 for score in categories.values()),
        explanation="Content safety severity scores per category",
    )

See the runnable Python examples in the examples/ directory:

Note: Structured results display as [structured] in the terminal summary table but are preserved in full when sent to the Orq platform and OpenTelemetry spans.

Controlling Parallelism

await evaluatorq(
    "parallel-eval",
    data=[...],
    jobs=[...],
    evaluators=[...],
    parallelism=10,  # Run up to 10 jobs concurrently
)

Dashboard Organization with path

Use the path parameter to organize evaluation results into folders on the Orq dashboard:

await evaluatorq(
    "my-evaluation",
    path="MyProject/Evaluations/Unit Tests",
    data=[...],
    jobs=[...],
    evaluators=[...],
)

Tip: Use paths like "Team/Sprint-42/Feature-X" to keep experiments organized across teams and sprints.

See path_organization.py for a complete example.

Evaluation Description

Add a description to document the purpose of each evaluation run:

await evaluatorq(
    "model-comparison",
    description="Compare GPT-4o vs Claude on customer support responses",
    data=[...],
    jobs=[...],
    evaluators=[...],
)

Disable Progress Display

# Get raw results without terminal output
results = await evaluatorq(
    "silent-eval",
    data=[...],
    jobs=[...],
    evaluators=[...],
    print_results=False,  # Disable progress and table display
)

# Process results programmatically
for result in results:
    print(result.data_point.inputs)
    for job_result in result.job_results:
        print(f"{job_result.job_name}: {job_result.output}")

🔧 Configuration

Environment Variables

  • ORQ_API_KEY: API key for Orq platform integration (required for dataset access and sending results). Also enables automatic OTEL tracing to Orq.
  • ORQ_BASE_URL: Base URL for Orq platform (default: https://my.orq.ai)
  • OTEL_EXPORTER_OTLP_ENDPOINT: Custom OpenTelemetry collector endpoint (overrides default Orq endpoint)
  • OTEL_EXPORTER_OTLP_HEADERS: Headers for OTEL exporter (format: key1=value1,key2=value2)
  • ORQ_DISABLE_TRACING: Set to 1 or true to disable automatic tracing
  • ORQ_DEBUG: Enable debug logging for tracing setup

Evaluation Parameters

Parameters are validated at runtime using Pydantic. The evaluatorq function supports three calling styles:

from evaluatorq import evaluatorq, EvaluatorParams

# 1. Keyword arguments (recommended)
await evaluatorq(
    "my-eval",
    data=[...],
    jobs=[...],
    parallelism=5,
)

# 2. Dict style
await evaluatorq("my-eval", {
    "data": [...],
    "jobs": [...],
    "parallelism": 5,
})

# 3. EvaluatorParams instance
await evaluatorq("my-eval", EvaluatorParams(
    data=[...],
    jobs=[...],
    parallelism=5,
))

Parameter Reference

Parameter Type Default Description
data list[DataPoint] | list[Awaitable[DataPoint]] | DatasetIdInput required Data to evaluate
jobs list[Job] required Jobs to run on each data point
evaluators list[Evaluator] | None None Evaluators to score job outputs
parallelism int (≥1) 1 Number of concurrent jobs
print_results bool True Display progress and results table
description str | None None Optional evaluation description
path str | None None Path for organizing results on the Orq dashboard (e.g., "Project/Category")

📊 Orq Platform Integration

Automatic Result Sending

When the ORQ_API_KEY environment variable is set, evaluatorq automatically sends evaluation results to the Orq platform for visualization and analysis.

# Results are automatically sent when ORQ_API_KEY is set
await evaluatorq(
    "my-evaluation",
    data=[...],
    jobs=[...],
    evaluators=[...],
)

What Gets Sent

When the ORQ_API_KEY is set, the following information is sent to Orq:

  • Evaluation name
  • Dataset ID (when using Orq datasets)
  • Job results with outputs and errors
  • Evaluator scores with values and explanations
  • Execution timing information

Note: Evaluator explanations are included in the data sent to Orq but are not displayed in the terminal output to keep the console clean.

Result Visualization

After successful submission, you'll see a console message with a link to view your results:

📊 View your evaluation results at: <url to the evaluation>

The Orq platform provides:

  • Interactive result tables
  • Score statistics
  • Performance metrics
  • Historical comparisons

🔍 OpenTelemetry Tracing

Evaluatorq automatically creates OpenTelemetry spans for observability into your evaluation runs.

Span Hierarchy

orq.job (independent root per job execution)
└── orq.evaluation (child span per evaluator)

Auto-Enable with Orq

When ORQ_API_KEY is set, traces are automatically sent to the Orq platform:

ORQ_API_KEY=your-api-key python my_eval.py

Custom OTEL Endpoint

Send traces to any OpenTelemetry-compatible backend:

OTEL_EXPORTER_OTLP_ENDPOINT=https://your-collector:4318 \
OTEL_EXPORTER_OTLP_HEADERS="Authorization=Bearer token" \
python my_eval.py

Disable Tracing

If you want to disable tracing even when ORQ_API_KEY is set:

ORQ_DISABLE_TRACING=1 python my_eval.py

✅ Pass/Fail Tracking

Evaluators can return a pass_ field to indicate pass/fail status:

async def quality_scorer(params):
    """Quality check evaluator with pass/fail."""
    output = params["output"]
    score = calculate_quality(output)

    return {
        "value": score,
        "pass_": score >= 0.8,  # Pass if meets threshold
        "explanation": f"Quality score: {score}",
    }

CI/CD Integration: When any evaluator returns pass_: False, the process exits with code 1. This enables fail-fast behavior in CI/CD pipelines.

Pass Rate Display: The summary table shows pass rate when evaluators use the pass_ field:

┌──────────────────────┬─────────────────┐
│ Pass Rate            │ 75% (3/4)       │
└──────────────────────┴─────────────────┘

🔗 LangChain Integration

Evaluatorq provides integration with LangChain and LangGraph agents, converting their outputs to the OpenResponses format for standardized evaluation.

Overview

The LangChain integration allows you to:

  • Wrap LangChain agents created with create_agent() for use in evaluatorq jobs
  • Wrap LangGraph compiled graphs for stateful agent evaluation
  • Automatically convert agent outputs to OpenResponses format
  • Evaluate agent behavior using standard evaluatorq evaluators

System Instructions

Use the instructions parameter to inject a system prompt into the agent. It can be a static string or a callable that builds instructions dynamically from the dataset row:

from evaluatorq.integrations.langchain_integration import wrap_langchain_agent

# Static instructions
agent_job = wrap_langchain_agent(
    agent,
    name="my-agent",
    instructions="You are a helpful weather assistant.",
)

# Dynamic instructions from dataset inputs
agent_job = wrap_langchain_agent(
    agent,
    name="research-agent",
    instructions=lambda data: (
        f"Research the topic: {data.inputs['topic']}. "
        f"Focus on {data.inputs['focus']}."
    ),
)

Input Modes

The wrapper reads the user input from data.inputs in three ways:

  • prompt (default): data.inputs["prompt"] — a single string, sent as one user message.
  • messages: data.inputs["messages"] — a list of {"role": ..., "content": ...} dicts, sent as-is.
  • Both: when both are present, messages are sent first, followed by prompt as a final user message.

Change the prompt key with the prompt_key parameter (e.g., prompt_key="question").

Examples

Complete examples are available in the examples folder:

Tip: Pass the instructions parameter to wrap_langchain_agent for dynamic system prompts — no need to write a custom job function.

🔴 Red Teaming External Agent Frameworks

Evaluatorq supports red teaming agents built with external frameworks. Each integration wraps your agent into a target that the red teaming pipeline can attack.

Installation

# LangGraph
pip install evaluatorq[langgraph]

# OpenAI Agents SDK
pip install evaluatorq[openai-agents]

# All extras
pip install evaluatorq[all]

LangGraph

Wrap any compiled LangGraph state graph as a red teaming target. The graph must use MessagesState (or a state with a messages key).

from langgraph.prebuilt import create_react_agent
from evaluatorq.integrations.langgraph_integration import LangGraphTarget
from evaluatorq.redteam import red_team

# Create your LangGraph agent
graph = create_react_agent(model, tools=[...])

# Wrap it as a red teaming target
target = LangGraphTarget(graph)

# Run red teaming
report = await red_team(target=target)

Conversation state is managed via LangGraph thread IDs — each attack gets a fresh thread, and clone() creates independent copies for parallel attacks.

Pass extra LangGraph config (e.g., recursion limits) via the config parameter:

target = LangGraphTarget(graph, config={"recursion_limit": 50})

LangChain Agents

LangChain agents are covered by the integrations above — no separate target is needed:

  • Agents built with create_react_agent or StateGraph (the recommended approach) run on LangGraph under the hood → use LangGraphTarget directly.
  • Custom chains or legacy AgentExecutor → wrap with CallableTarget:
from evaluatorq.integrations.callable_integration import CallableTarget

# Any LangChain chain or AgentExecutor
async def run_chain(prompt: str) -> str:
    result = await chain.ainvoke({"input": prompt})
    return result["output"]

target = CallableTarget(run_chain)

OpenAI Agents SDK

Wrap an OpenAI Agents SDK Agent as a red teaming target.

from agents import Agent
from evaluatorq.integrations.openai_agents_integration import OpenAIAgentTarget
from evaluatorq.redteam import red_team

# Create your agent
agent = Agent(name="my-agent", instructions="You are a helpful assistant.")

# Wrap it as a red teaming target
target = OpenAIAgentTarget(agent)

# Run red teaming
report = await red_team(target=target)

Conversation history is managed automatically — each attack starts with a clean history, and clone() creates copies with empty state.

Pass extra Runner.run() kwargs via run_kwargs:

target = OpenAIAgentTarget(agent, run_kwargs={"max_turns": 10})

Custom Callable (Escape Hatch)

For frameworks without a dedicated integration, wrap any function that takes a prompt and returns a response:

from evaluatorq.integrations.callable_integration import CallableTarget
from evaluatorq.redteam import red_team

# Async function
async def my_agent(prompt: str) -> str:
    result = await some_framework.run(prompt)
    return result.text

target = CallableTarget(my_agent)

# With state management
history = []

async def stateful_agent(prompt: str) -> str:
    history.append({"role": "user", "content": prompt})
    response = await my_llm.chat(history)
    history.append({"role": "assistant", "content": response})
    return response

target = CallableTarget(stateful_agent, reset_fn=lambda: history.clear())

report = await red_team(target=target)

Sync functions are also supported — they are automatically run in a thread to avoid blocking the event loop.

📚 API Reference

evaluatorq(name, params?, *, data?, jobs?, evaluators?, parallelism?, print_results?, description?) -> EvaluatorqResult

Main async function to run evaluations.

Signature:

async def evaluatorq(
    name: str,
    params: EvaluatorParams | dict[str, Any] | None = None,
    *,
    data: DatasetIdInput | Sequence[Awaitable[DataPoint] | DataPoint] | None = None,
    jobs: list[Job] | None = None,
    evaluators: list[Evaluator] | None = None,
    parallelism: int = 1,
    print_results: bool = True,
    description: str | None = None,
) -> EvaluatorqResult

Parameters:

  • name: String identifier for the evaluation run
  • params: (Optional) EvaluatorParams instance or dict with evaluation parameters
  • data: List of DataPoint objects, awaitables, or DatasetIdInput
  • jobs: List of job functions to run on each data point
  • evaluators: Optional list of evaluator configurations
  • parallelism: Number of concurrent jobs (default: 1, must be ≥1)
  • print_results: Whether to display progress and results (default: True)
  • description: Optional description for the evaluation run

Note: Parameters can be passed either via the params argument (as dict or EvaluatorParams) or as keyword arguments. Keyword arguments take precedence over params values.

Returns:

EvaluatorqResult - List of DataPointResult objects containing job outputs and evaluator scores.

Types

from typing import Any, Callable, Awaitable
from pydantic import BaseModel, Field
from typing_extensions import TypedDict

# Output type alias
Output = str | int | float | bool | dict[str, Any] | None

class DataPoint(BaseModel):
    """A data point for evaluation."""
    inputs: dict[str, Any]
    expected_output: Output | None = None

EvaluationResultCellValue = str | float | dict[str, "str | float | dict[str, str | float]"]

class EvaluationResultCell(BaseModel):
    """Structured evaluation result with multi-dimensional metrics."""
    type: str
    value: dict[str, EvaluationResultCellValue]

class EvaluationResult(BaseModel):
    """Result from an evaluator."""
    value: str | float | bool | EvaluationResultCell
    explanation: str | None = None
    pass_: bool | None = None  # Optional pass/fail indicator for CI/CD integration

class EvaluatorScore(BaseModel):
    """Score from an evaluator for a job output."""
    evaluator_name: str
    score: EvaluationResult
    error: str | None = None

class JobResult(BaseModel):
    """Result from a job execution."""
    job_name: str
    output: Output
    error: str | None = None
    evaluator_scores: list[EvaluatorScore] | None = None

class DataPointResult(BaseModel):
    """Result for a single data point."""
    data_point: DataPoint
    error: str | None = None
    job_results: list[JobResult] | None = None

# Type aliases
EvaluatorqResult = list[DataPointResult]

class DatasetIdInput(BaseModel):
    """Input for fetching a dataset from Orq platform."""
    dataset_id: str

class EvaluatorParams(BaseModel):
    """Parameters for running an evaluation (validated at runtime)."""
    data: DatasetIdInput | Sequence[Awaitable[DataPoint] | DataPoint]
    jobs: list[Job]
    evaluators: list[Evaluator] | None = None
    parallelism: int = Field(default=1, ge=1)
    print_results: bool = True
    description: str | None = None

class JobReturn(TypedDict):
    """Job return structure."""
    name: str
    output: Output

Job = Callable[[DataPoint, int], Awaitable[JobReturn]]

class ScorerParameter(TypedDict):
    """Parameters passed to scorer functions."""
    data: DataPoint
    output: Output

Scorer = Callable[[ScorerParameter], Awaitable[EvaluationResult | dict[str, Any]]]

class Evaluator(TypedDict):
    """Evaluator configuration."""
    name: str
    scorer: Scorer

# Deployment helper types
@dataclass
class DeploymentResponse:
    """Response from a deployment invocation."""
    content: str  # Text content of the response
    raw: Any      # Raw API response

# Invoke deployment and get text content
async def invoke(
    key: str,
    inputs: dict[str, Any] | None = None,
    context: dict[str, Any] | None = None,
    metadata: dict[str, Any] | None = None,
    thread: dict[str, Any] | None = None,  # Must include 'id' key
    messages: list[dict[str, str]] | None = None,
) -> str: ...

# Invoke deployment and get full response
async def deployment(
    key: str,
    inputs: dict[str, Any] | None = None,
    context: dict[str, Any] | None = None,
    metadata: dict[str, Any] | None = None,
    thread: dict[str, Any] | None = None,  # Must include 'id' key
    messages: list[dict[str, str]] | None = None,
) -> DeploymentResponse: ...

# Built-in evaluators
def string_contains_evaluator(
    case_insensitive: bool = True,
    name: str = "string-contains",
) -> Evaluator: ...

def exact_match_evaluator(
    case_insensitive: bool = False,
    name: str = "exact-match",
) -> Evaluator: ...

🔴 Red Teaming

Evaluatorq includes a red teaming module for automated security testing of LLMs and AI agents against OWASP vulnerability categories (LLM Top 10 and Agentic Security Initiative).

Note: The built-in frameworks (OWASP LLM Top 10, OWASP ASI) and their vulnerabilities, evaluators, and attack strategies are not runtime-extendable yet. Adding custom vulnerabilities currently requires modifying the package source. A runtime registration API is planned for a future release.

Quick Start

pip install evaluatorq[redteam]

# Enable shell completion
evaluatorq --install-completion
# or
eq --install-completion

Test an LLM (OpenAI)

import asyncio
from evaluatorq.redteam import TargetConfig, red_team

report = asyncio.run(red_team(
    "llm:gpt-5-mini",
    categories=["LLM01", "LLM07"],
    max_dynamic_datapoints=5,
    max_turns=2,
    target_config=TargetConfig(
        system_prompt="You are a helpful customer support assistant."
    ),
))
print(f"Resistance rate: {report.summary.resistance_rate:.0%}")

Test an ORQ agent

# agent: targets auto-select the orq backend
report = asyncio.run(red_team(
    "agent:my-agent-key",
    categories=["LLM01", "ASI01", "ASI02"],
    max_dynamic_datapoints=5,
    max_turns=3,
))

Modes

Mode Description
dynamic Generates adversarial attacks using LLM-based strategy planning and multi-turn orchestration
static Runs a pre-built OWASP dataset for reproducible regression testing
hybrid Combines dynamic generation with a static dataset in a single run

Target Types

  • llm:<model> — Test an LLM directly via OpenAI API. Provide a system prompt via TargetConfig.
  • agent:<key> — Test an ORQ platform agent. Auto-discovers tools, memory, and system prompt.
  • deployment:<key> — Test an ORQ deployment.

agent: and deployment: targets automatically use the ORQ backend.

LLM Client Configuration

Red teaming needs an OpenAI-compatible LLM for attack generation and evaluation. OPENAI_* variables take priority over ORQ_*:

Priority Variables Description
1st OPENAI_API_KEY + OPENAI_BASE_URL (optional) Direct OpenAI or any compatible endpoint
2nd ORQ_API_KEY + ORQ_BASE_URL (optional) ORQ router

Or pass a custom client: red_team(..., llm_client=AsyncOpenAI(api_key="sk-...")).

Parameters

Parameter Type Default Description
target str | list[str] required Target identifier(s)
mode str "dynamic" "dynamic", "static", or "hybrid"
categories list[str] | None all OWASP categories (e.g. ["ASI01", "LLM07"])
max_turns int 5 Max conversation turns per attack
max_dynamic_datapoints int | None None Cap generated attack datapoints
attack_model str "gpt-5-mini" Model for adversarial prompt generation
evaluator_model str "gpt-5-mini" Model for evaluation scoring
parallelism int 5 Max concurrent jobs
name str | None "red-team" Experiment name
backend str "openai" "openai" or "orq" (auto-detected for agent targets)
llm_client AsyncOpenAI | None None Custom LLM client
dataset_path str | None None Path to local static dataset

CLI

# Show all options
eq redteam run --help

# Test an LLM with a system prompt
eq redteam run -t "llm:gpt-5-mini" \
  --system-prompt "You are a helpful assistant." \
  -c LLM01 -c LLM07 --max-turns 2 --max-dynamic-datapoints 5 -y

# Test an ORQ agent
eq redteam run -t "agent:my-agent-key" \
  -c ASI01 -c LLM07 --max-turns 3 -y

# Multi-target comparison
eq redteam run -t "llm:gpt-5-mini" -t "llm:gpt-4o" \
  -c LLM07 --max-turns 2 --max-dynamic-datapoints 3 -y

# Export reports
eq redteam run -t "llm:gpt-5-mini" \
  --save-report report.json --export-md ./reports --export-html ./reports -y

# List previous runs
eq redteam runs

See examples/redteam/ for complete Python examples covering both backends.

🛠️ Development

# Install dependencies
uv sync

# Run type checking
uv run basedpyright

# Format code
uv run ruff format

# Lint code
uv run ruff check

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

evaluatorq-1.3.0.tar.gz (541.0 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

evaluatorq-1.3.0-py3-none-any.whl (353.7 kB view details)

Uploaded Python 3

File details

Details for the file evaluatorq-1.3.0.tar.gz.

File metadata

  • Download URL: evaluatorq-1.3.0.tar.gz
  • Upload date:
  • Size: 541.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: uv/0.11.8 {"installer":{"name":"uv","version":"0.11.8","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for evaluatorq-1.3.0.tar.gz
Algorithm Hash digest
SHA256 ecee0bfdea99775f9e8624396ec875f2a6f4b121b83f60db9fdae3484f00e7b6
MD5 35db588878dfcf066625cf5c641750ad
BLAKE2b-256 6fb614df548cc40f10591b6db5d4349a68407d2d076343602cac5303e2764e46

See more details on using hashes here.

File details

Details for the file evaluatorq-1.3.0-py3-none-any.whl.

File metadata

  • Download URL: evaluatorq-1.3.0-py3-none-any.whl
  • Upload date:
  • Size: 353.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: uv/0.11.8 {"installer":{"name":"uv","version":"0.11.8","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for evaluatorq-1.3.0-py3-none-any.whl
Algorithm Hash digest
SHA256 35f20ac36269018acf4894b107cb626b052cf54fb8eaef3b55804ceb4cc0bf81
MD5 dbb966af9452d418931904db1e43e8d2
BLAKE2b-256 c1c785dd141721ee7c6785afc79f7d794378348f9698efd5c0da0e491c63015b

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page