From 2b9e725a074357f7bb3c3b91ca126e4f0260f940 Mon Sep 17 00:00:00 2001 From: Amrit Krishnan Date: Tue, 10 Feb 2026 08:36:46 -0500 Subject: [PATCH 1/9] Add evaluate script --- .../aieng/agent_evals/knowledge_qa/judges.py | 762 ++++++++++++++++++ .../aieng/agent_evals/tools/web.py | 59 +- implementations/knowledge_qa/evaluate.py | 408 ++++++++++ 3 files changed, 1216 insertions(+), 13 deletions(-) create mode 100644 aieng-eval-agents/aieng/agent_evals/knowledge_qa/judges.py create mode 100644 implementations/knowledge_qa/evaluate.py diff --git a/aieng-eval-agents/aieng/agent_evals/knowledge_qa/judges.py b/aieng-eval-agents/aieng/agent_evals/knowledge_qa/judges.py new file mode 100644 index 0000000..654927d --- /dev/null +++ b/aieng-eval-agents/aieng/agent_evals/knowledge_qa/judges.py @@ -0,0 +1,762 @@ +"""LLM-as-judge evaluators for knowledge agent responses. + +This module provides comprehensive evaluation using LLM judges across +multiple dimensions: comprehensiveness, causal reasoning, exhaustiveness, +source quality, and plan quality. +""" + +import json +import logging +from typing import TYPE_CHECKING, Any + +from aieng.agent_evals.configs import Configs +from google import genai +from google.genai import types +from pydantic import BaseModel, Field + + +if TYPE_CHECKING: + from langfuse.experiment import Evaluation + + +logger = logging.getLogger(__name__) + + +class JudgeResult(BaseModel): + """Result from an LLM judge evaluation. + + Attributes + ---------- + dimension : str + The evaluation dimension (e.g., "comprehensiveness", "causal_chain"). + score : float + Score on a 1-5 scale. + explanation : str + Detailed explanation of the score. + evidence : list[str] + Specific examples supporting the score. + """ + + dimension: str + score: float # 1-5 scale + explanation: str = "" + evidence: list[str] = Field(default_factory=list) + + +def _parse_judge_response(response_text: str, dimension: str) -> JudgeResult: + """Parse a judge's response into a JudgeResult. + + Parameters + ---------- + response_text : str + Raw response from the judge LLM. + dimension : str + The dimension being evaluated. + + Returns + ------- + JudgeResult + Parsed result. + """ + try: + text = response_text.strip() + + # Handle markdown code blocks + if "```json" in text: + start = text.find("```json") + 7 + end = text.find("```", start) + text = text[start:end].strip() + elif "```" in text: + start = text.find("```") + 3 + end = text.find("```", start) + text = text[start:end].strip() + + data = json.loads(text) + + return JudgeResult( + dimension=dimension, + score=float(data.get("score", 3)), + explanation=data.get("explanation", ""), + evidence=data.get("evidence", []), + ) + + except (json.JSONDecodeError, KeyError, TypeError, ValueError) as e: + logger.warning(f"Failed to parse judge response for {dimension}: {e}") + return JudgeResult( + dimension=dimension, + score=3.0, # Default middle score + explanation=f"Parse error: {e}. Raw response: {response_text[:200]}", + evidence=[], + ) + + +class BaseJudge: + """Base class for LLM-as-judge evaluators. + + Parameters + ---------- + config : Configs, optional + Configuration settings. + model : str, optional + Model to use for judging. Defaults to planner model. 
+ """ + + dimension: str = "base" + system_prompt: str = "" + + def __init__( + self, + config: "Configs | None" = None, + model: str | None = None, + ) -> None: + """Initialize the judge.""" + # Load config from environment if not provided + if config is None: + config = Configs() # type: ignore[call-arg] + self._config = config + + if model is not None: + self._model = model + else: + self._model = config.default_evaluator_model + + self._client = genai.Client() + self._temperature = config.default_evaluator_temperature + + def _call_llm(self, prompt: str) -> str: + """Call the LLM with the given prompt.""" + response = self._client.models.generate_content( + model=self._model, + contents=types.Content( + role="user", + parts=[types.Part(text=prompt)], + ), + config=types.GenerateContentConfig( + system_instruction=self.system_prompt, + temperature=self._temperature, + ), + ) + return response.text or "" + + async def _call_llm_async(self, prompt: str) -> str: + """Call the LLM asynchronously.""" + response = await self._client.aio.models.generate_content( + model=self._model, + contents=types.Content( + role="user", + parts=[types.Part(text=prompt)], + ), + config=types.GenerateContentConfig( + system_instruction=self.system_prompt, + temperature=self._temperature, + ), + ) + return response.text or "" + + def close(self) -> None: + """Close the judge's client to clean up resources.""" + if hasattr(self, "_client") and self._client is not None: + self._client.close() + + +class DeepSearchQAResult(BaseModel): + """Result from DeepSearchQA evaluation with IR metrics. + + This follows the official DeepSearchQA evaluation methodology from: + https://www.kaggle.com/benchmarks/google/dsqa + + Attributes + ---------- + precision : float + Fraction of predicted items that are correct (0-1). + P = |S ∩ G| / |S| + recall : float + Fraction of ground truth items that were found (0-1). + R = |S ∩ G| / |G| + f1_score : float + Harmonic mean of precision and recall (0-1). + F1 = 2 * P * R / (P + R) + outcome : str + One of: "fully_correct", "correct_with_extraneous", + "partially_correct", "fully_incorrect". + correctness_details : dict[str, bool] + For each ground truth item, whether it was found in the response. + extraneous_items : list[str] + Items in the response that are not in the ground truth. + explanation : str + Explanation from the judge about the evaluation. + """ + + precision: float = 0.0 + recall: float = 0.0 + f1_score: float = 0.0 + outcome: str = "fully_incorrect" + correctness_details: dict[str, bool] = Field(default_factory=dict) + extraneous_items: list[str] = Field(default_factory=list) + explanation: str = "" + + def to_evaluations(self) -> list["Evaluation"]: + """Convert this result to Langfuse Evaluation objects. + + Returns + ------- + list[Evaluation] + Four evaluations: Outcome (categorical), F1, Precision, Recall (numeric). 
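+
+        Notes
+        -----
+        For example, a fully correct result yields an "Outcome" evaluation
+        with value "Fully Correct" plus "F1", "Precision", and "Recall"
+        evaluations of 1.0; the three numeric scores share a summary comment.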
+ """ + from langfuse.experiment import Evaluation # noqa: PLC0415 + + comment_parts = [ + f"Outcome: {self.outcome}", + f"Precision: {self.precision:.2f}", + f"Recall: {self.recall:.2f}", + f"F1: {self.f1_score:.2f}", + ] + + if self.explanation: + comment_parts.append(f"\nExplanation: {self.explanation}") + + if self.correctness_details: + found = sum(1 for v in self.correctness_details.values() if v) + total = len(self.correctness_details) + comment_parts.append(f"\nCorrectness: {found}/{total} items found") + + if self.extraneous_items: + comment_parts.append(f"\nExtraneous: {len(self.extraneous_items)} items") + + comment = "\n".join(comment_parts) + + outcome_display = { + "fully_correct": "Fully Correct", + "correct_with_extraneous": "Correct with Extraneous", + "partially_correct": "Partially Correct", + "fully_incorrect": "Fully Incorrect", + } + + return [ + Evaluation( + name="Outcome", + value=outcome_display.get(self.outcome, self.outcome), + comment=self.explanation, + ), + Evaluation(name="F1", value=self.f1_score, comment=comment), + Evaluation(name="Precision", value=self.precision, comment=comment), + Evaluation(name="Recall", value=self.recall, comment=comment), + ] + + @staticmethod + def error_evaluations(error_msg: str) -> list["Evaluation"]: + """Create error evaluations when evaluation fails. + + Parameters + ---------- + error_msg : str + Description of the error that occurred. + + Returns + ------- + list[Evaluation] + Three evaluations (F1, Precision, Recall) all set to 0.0. + """ + from langfuse.experiment import Evaluation # noqa: PLC0415 + + comment = f"Evaluation error: {error_msg}" + return [ + Evaluation(name="F1", value=0.0, comment=comment), + Evaluation(name="Precision", value=0.0, comment=comment), + Evaluation(name="Recall", value=0.0, comment=comment), + ] + + +# Official DeepSearchQA grader prompt from Appendix A of the paper +DEEPSEARCHQA_GRADER_PROMPT = """\ +Your task is to evaluate whether a given "AI Response" for a specific "User Prompt" +arrived at the correct answer. + +**Answer Correctness Task** +* **Purpose:** Assess whether the AI response provides the correct answer(s) based on +the provided "Correct Answer" and "Prompt Type". +* **Process:** + * Identify the "Prompt Type": "{prompt_type}". + * Refer to the "Correct Answer": "{answer}". + * Based on the "Prompt Type", determine if the "AI Response" contains the expected +answer(s). + * **'Single Answer'**: Check if the response provides the answer that addresses +the user's question. It does not have to match the exact wording of the provided +answer. + * **'Set Answer'**: Check if the response includes *each* item from the provided +ground truth answers. The order might not matter unless specified otherwise. The +response might include more answers than the list. Determine the correctness * +only* based on the list first and then check if the response includes answers not +in the list. +* **Explanation:** Provide a brief explanation justifying your assessment of answer +correctness, referencing specific parts of the AI response and the correct answer. +* **Correctness Details:** Provide a dictionary, one key for each expected answer +part, and value is a boolean indicating whether each expected answer part was found. + * For 'Set Answer', this will be a list of attributes, one for each item/part in +the "Correct Answer". Each key will be a string indicating the expected answer +part, and the value will be a boolean indicating whether that part was found in +the response. 
+* **Excessive Answers:** Provide a list of strings, each indicating an excessive +answer part. If the response provides answers that are **not** in the "Correct Answer +" list, add these answers as excessive answers. Return an empty list when there's no +excessive answers in the response. + +**Output Format:** +Your evaluation *must* be structured as a nested JSON dictionary with the following top- +level keys: '"Answer Correctness"'. Please return NULL if any of "Prompt", "AI Response" +or "Correct Answer" is empty. +The value for '"Answer Correctness"' should be a dictionary containing '"Explanation"' ( +a string), '"Correctness Details"' (a dictionary where each key is the expected correct +answer, and the value is a boolean indicating whether the response contains the correct +answer), and '"Excessive Answers"' (a list of strings indicating the excessive answers). + +Make sure you return a valid JSON string. Pay special attention to quotes, commas and +special characters in the JSON string. Make sure to escape all special characters and +quotes in the JSON string. + +**Example (Partial):** +```json +{{ + "Answer Correctness": {{ + "Explanation": "The response correctly identified Belgium and France but also +includes an excessive answer, Italy.", + "Correctness Details": {{ + "Belgium": true, + "France": true + }}, + "Excessive Answers": [ "Italy" ] + }} +}} +``` + +**Now, proceed with the evaluation using the provided User Prompt, AI Response, and +Correct Answer.** + +User Prompt (Wrapped in and ): + +{prompt} + +-------------------- +** Correct Answer (Wrapped in and ): +Prompt Type: {prompt_type} + +{answer} + +-------------------- +AI assistant response (Wrapped in and ): + +{response} + +-------------------- +Rating: +""" + + +class DeepSearchQAJudge(BaseJudge): + """Official DeepSearchQA evaluation using precision, recall, and F1. + + This judge implements the exact evaluation methodology from the DeepSearchQA + benchmark paper (Appendix A). The LLM autorater determines semantic equivalence + of answers, then precision/recall/F1 are calculated programmatically. + + The benchmark distinguishes between four disjoint categories: + 1. Fully Correct (S=G): All ground truth items present, no extraneous items + 2. Fully Incorrect (S∩G=∅): Zero correct items found + 3. Partially Correct: Some but not all ground truth items found + 4. Correct with Extraneous (G⊂S): All ground truth found but has extra items + + Metrics: + - Precision: P = |S∩G| / |S| (accuracy of submitted items) + - Recall: R = |S∩G| / |G| (exhaustiveness against ground truth) + - F1 Score: F1 = 2*P*R / (P+R) (primary ranking metric) + + References + ---------- + - Paper: DeepSearchQA: Bridging the Comprehensiveness Gap for Deep Research Agents + - Dataset: https://huggingface.co/datasets/google/deepsearchqa + - Leaderboard: https://www.kaggle.com/benchmarks/google/dsqa + """ + + dimension = "deepsearchqa" + system_prompt = "" # Not used - we use the full grader prompt directly + + def _call_grader( + self, + prompt: str, + response: str, + answer: str, + prompt_type: str, + ) -> dict[str, Any]: + """Call the LLM grader using the official DeepSearchQA prompt. + + Parameters + ---------- + prompt : str + The original question/prompt. + response : str + The AI response to evaluate. + answer : str + The ground truth answer. + prompt_type : str + "Single Answer" or "Set Answer". + + Returns + ------- + dict + Parsed grader response with Correctness Details and Excessive Answers. 
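+
+        Notes
+        -----
+        A typical parsed value, mirroring the example in the grader prompt
+        (illustrative only), looks like::
+
+            {
+                "Explanation": "Identified Belgium and France; Italy is extraneous.",
+                "Correctness Details": {"Belgium": True, "France": True},
+                "Excessive Answers": ["Italy"],
+            }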
+ """ + grader_prompt = DEEPSEARCHQA_GRADER_PROMPT.format( + prompt=prompt, + response=response, + answer=answer, + prompt_type=prompt_type, + ) + + try: + llm_response = self._client.models.generate_content( + model=self._model, + contents=types.Content( + role="user", + parts=[types.Part(text=grader_prompt)], + ), + config=types.GenerateContentConfig( + temperature=self._temperature, + ), + ) + response_text = (llm_response.text or "").strip() + + # Parse JSON from response + if "```json" in response_text: + start = response_text.find("```json") + 7 + end = response_text.find("```", start) + response_text = response_text[start:end].strip() + elif "```" in response_text: + start = response_text.find("```") + 3 + end = response_text.find("```", start) + response_text = response_text[start:end].strip() + + data = json.loads(response_text) + return data.get("Answer Correctness", {}) + + except Exception as e: + logger.warning(f"Failed to call grader: {e}") + return { + "Explanation": f"Grader error: {e}", + "Correctness Details": {}, + "Excessive Answers": [], + } + + def _calculate_metrics_from_grader( + self, + grader_result: dict[str, Any], + ) -> DeepSearchQAResult: + """Calculate precision, recall, F1 from grader output. + + This follows the exact methodology from the paper: + - Precision = |S∩G| / |S| + - Recall = |S∩G| / |G| + - F1 = 2*P*R / (P+R) + + Parameters + ---------- + grader_result : dict + Output from the LLM grader with Correctness Details and Excessive Answers. + + Returns + ------- + DeepSearchQAResult + Computed metrics and classifications. + """ + correctness_details = grader_result.get("Correctness Details", {}) + extraneous_items = grader_result.get("Excessive Answers", []) + explanation = grader_result.get("Explanation", "") + + # Count matched ground truth items + num_ground_truth = len(correctness_details) + num_matched = sum(1 for v in correctness_details.values() if v) + num_extraneous = len(extraneous_items) + + # Total predicted items = matched + extraneous + num_predicted = num_matched + num_extraneous + + # Calculate metrics + precision = num_matched / num_predicted if num_predicted > 0 else 0.0 + recall = num_matched / num_ground_truth if num_ground_truth > 0 else 1.0 + f1_score = 2 * precision * recall / (precision + recall) if precision + recall > 0 else 0.0 + + # Determine outcome based on set relationships + if num_matched == num_ground_truth and num_extraneous == 0: + outcome = "fully_correct" + elif num_matched == num_ground_truth and num_extraneous > 0: + outcome = "correct_with_extraneous" + elif num_matched > 0: + outcome = "partially_correct" + else: + outcome = "fully_incorrect" + + return DeepSearchQAResult( + precision=precision, + recall=recall, + f1_score=f1_score, + outcome=outcome, + correctness_details=correctness_details, + extraneous_items=extraneous_items, + explanation=explanation, + ) + + def evaluate( + self, + question: str, + answer: str, + ground_truth: str, + answer_type: str = "Single Answer", + ) -> JudgeResult: + """Evaluate an answer using DeepSearchQA methodology. + + Parameters + ---------- + question : str + The original question. + answer : str + The agent's answer. + ground_truth : str + The expected ground truth answer. + answer_type : str + Type of answer: "Single Answer" or "Set Answer". + + Returns + ------- + JudgeResult + The evaluation result with precision, recall, and F1 in evidence. 
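+
+        Notes
+        -----
+        Rough worked example: if the grader marks both ground-truth items
+        (say, Belgium and France) as found but flags one extraneous item
+        (Italy), then P = 2/3, R = 1.0, F1 = 0.8, the outcome is
+        "correct_with_extraneous", and the returned score is 1 + 4 * 0.8 = 4.2.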
+ """ + logger.info(f"Evaluating answer for: {question[:100]}...") + + # Call the grader + grader_result = self._call_grader( + prompt=question, + response=answer, + answer=ground_truth, + prompt_type=answer_type, + ) + + # Calculate metrics from grader output + result = self._calculate_metrics_from_grader(grader_result) + + # Convert F1 to 1-5 scale for consistency with other judges + score = 1 + (result.f1_score * 4) # Maps 0-1 to 1-5 + + return JudgeResult( + dimension=self.dimension, + score=score, + explanation=f"F1: {result.f1_score:.2f}, Outcome: {result.outcome}. {result.explanation}", + evidence=[ + f"Precision: {result.precision:.2f}", + f"Recall: {result.recall:.2f}", + f"F1 Score: {result.f1_score:.2f}", + f"Outcome: {result.outcome}", + f"Correctness: {result.correctness_details}", + f"Extraneous: {result.extraneous_items}", + ], + ) + + async def evaluate_async( + self, + question: str, + answer: str, + ground_truth: str, + answer_type: str = "Single Answer", + ) -> JudgeResult: + """Async version of evaluate.""" + logger.info(f"Evaluating answer (async) for: {question[:100]}...") + + # Build the grader prompt + grader_prompt = DEEPSEARCHQA_GRADER_PROMPT.format( + prompt=question, + response=answer, + answer=ground_truth, + prompt_type=answer_type, + ) + + try: + llm_response = await self._client.aio.models.generate_content( + model=self._model, + contents=types.Content( + role="user", + parts=[types.Part(text=grader_prompt)], + ), + config=types.GenerateContentConfig( + temperature=self._temperature, + ), + ) + response_text = (llm_response.text or "").strip() + + # Parse JSON + if "```json" in response_text: + start = response_text.find("```json") + 7 + end = response_text.find("```", start) + response_text = response_text[start:end].strip() + elif "```" in response_text: + start = response_text.find("```") + 3 + end = response_text.find("```", start) + response_text = response_text[start:end].strip() + + data = json.loads(response_text) + grader_result = data.get("Answer Correctness", {}) + + except Exception as e: + logger.warning(f"Failed to call grader async: {e}") + grader_result = { + "Explanation": f"Grader error: {e}", + "Correctness Details": {}, + "Excessive Answers": [], + } + + result = self._calculate_metrics_from_grader(grader_result) + score = 1 + (result.f1_score * 4) + + return JudgeResult( + dimension=self.dimension, + score=score, + explanation=f"F1: {result.f1_score:.2f}, Outcome: {result.outcome}. {result.explanation}", + evidence=[ + f"Precision: {result.precision:.2f}", + f"Recall: {result.recall:.2f}", + f"F1 Score: {result.f1_score:.2f}", + f"Outcome: {result.outcome}", + f"Correctness: {result.correctness_details}", + f"Extraneous: {result.extraneous_items}", + ], + ) + + def evaluate_with_details( + self, + question: str, + answer: str, + ground_truth: str, + answer_type: str = "Single Answer", + ) -> tuple[JudgeResult, DeepSearchQAResult]: + """Evaluate and return both JudgeResult and detailed DeepSearchQAResult. + + Parameters + ---------- + question : str + The original question. + answer : str + The agent's answer. + ground_truth : str + The expected ground truth answer. + answer_type : str + Type of answer. + + Returns + ------- + tuple[JudgeResult, DeepSearchQAResult] + Both the standard judge result and detailed metrics. 
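+
+        Examples
+        --------
+        A minimal usage sketch (inputs are illustrative and a model API key
+        must be configured in the environment)::
+
+            judge = DeepSearchQAJudge()
+            judge_result, details = judge.evaluate_with_details(
+                question="Which two countries should be named?",
+                answer="Belgium and France, and also Italy.",
+                ground_truth="Belgium; France",
+                answer_type="Set Answer",
+            )
+            print(details.outcome, f"{details.f1_score:.2f}")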
+ """ + grader_result = self._call_grader( + prompt=question, + response=answer, + answer=ground_truth, + prompt_type=answer_type, + ) + + result = self._calculate_metrics_from_grader(grader_result) + score = 1 + (result.f1_score * 4) + + judge_result = JudgeResult( + dimension=self.dimension, + score=score, + explanation=f"F1: {result.f1_score:.2f}, Outcome: {result.outcome}", + evidence=[ + f"Precision: {result.precision:.2f}", + f"Recall: {result.recall:.2f}", + f"F1 Score: {result.f1_score:.2f}", + f"Outcome: {result.outcome}", + ], + ) + + return judge_result, result + + async def evaluate_with_details_async( + self, + question: str, + answer: str, + ground_truth: str, + answer_type: str = "Single Answer", + ) -> tuple[JudgeResult, DeepSearchQAResult]: + """Async version of evaluate_with_details. + + Parameters + ---------- + question : str + The original question. + answer : str + The agent's answer. + ground_truth : str + The expected ground truth answer. + answer_type : str + Type of answer. + + Returns + ------- + tuple[JudgeResult, DeepSearchQAResult] + Both the standard judge result and detailed metrics. + """ + # Build the grader prompt + grader_prompt = DEEPSEARCHQA_GRADER_PROMPT.format( + prompt=question, + response=answer, + answer=ground_truth, + prompt_type=answer_type, + ) + + try: + llm_response = await self._client.aio.models.generate_content( + model=self._model, + contents=types.Content( + role="user", + parts=[types.Part(text=grader_prompt)], + ), + config=types.GenerateContentConfig( + temperature=self._temperature, + ), + ) + response_text = (llm_response.text or "").strip() + + # Parse JSON + if "```json" in response_text: + start = response_text.find("```json") + 7 + end = response_text.find("```", start) + response_text = response_text[start:end].strip() + elif "```" in response_text: + start = response_text.find("```") + 3 + end = response_text.find("```", start) + response_text = response_text[start:end].strip() + + data = json.loads(response_text) + grader_result = data.get("Answer Correctness", {}) + + except Exception as e: + logger.warning(f"Failed to call grader async: {e}") + grader_result = { + "Explanation": f"Grader error: {e}", + "Correctness Details": {}, + "Excessive Answers": [], + } + + result = self._calculate_metrics_from_grader(grader_result) + score = 1 + (result.f1_score * 4) + + judge_result = JudgeResult( + dimension=self.dimension, + score=score, + explanation=f"F1: {result.f1_score:.2f}, Outcome: {result.outcome}", + evidence=[ + f"Precision: {result.precision:.2f}", + f"Recall: {result.recall:.2f}", + f"F1 Score: {result.f1_score:.2f}", + f"Outcome: {result.outcome}", + ], + ) + + return judge_result, result diff --git a/aieng-eval-agents/aieng/agent_evals/tools/web.py b/aieng-eval-agents/aieng/agent_evals/tools/web.py index 1c99d44..d1eb99b 100644 --- a/aieng-eval-agents/aieng/agent_evals/tools/web.py +++ b/aieng-eval-agents/aieng/agent_evals/tools/web.py @@ -15,7 +15,7 @@ from google.adk.tools.function_tool import FunctionTool from html_to_markdown import convert as html_to_markdown from pypdf import PdfReader -from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential +from tenacity import RetryError, retry, retry_if_exception_type, stop_after_attempt, wait_exponential logger = logging.getLogger(__name__) @@ -157,13 +157,53 @@ def _make_success_response(url: str, content: str, content_type: str, truncated: return result +def _handle_fetch_error(e: Exception, url: str) -> dict[str, Any]: + """Handle 
exceptions from web_fetch and return appropriate error response. + + Parameters + ---------- + e : Exception + The exception that occurred during fetching. + url : str + The URL that was being fetched. + + Returns + ------- + dict + Error response with 'status', 'error', and 'url' fields. + """ + if isinstance(e, httpx.HTTPStatusError): + logger.warning(f"HTTP error fetching {url}: {e}") + return _make_error_response(f"HTTP {e.response.status_code}: {e.response.reason_phrase}", url) + + if isinstance(e, httpx.RequestError): + logger.warning(f"Request error fetching {url}: {e}") + return _make_error_response(f"Request failed: {e!s}", url) + + if isinstance(e, RetryError): + # Extract the underlying error from retry failure + # without showing full stack trace + original_error = e.last_attempt.exception() + if isinstance(original_error, httpx.HTTPStatusError): + logger.error(f"HTTP error fetching {url} (after 3 retries): {original_error}") + error_msg = f"HTTP {original_error.response.status_code}: {original_error.response.reason_phrase} (failed after 3 retries)" + else: + logger.error(f"Failed to fetch {url} after 3 retries: {original_error}") + error_msg = f"Failed after 3 retries: {original_error!s}" + return _make_error_response(error_msg, url) + + logger.error(f"Unexpected error in web_fetch for {url}: {e}") + return _make_error_response(f"Unexpected error: {e!s}", url) + + async def web_fetch(url: str, max_pages: int = 10) -> dict[str, Any]: - """Fetch content from a URL (HTML page or PDF document). + """Fetch HTML pages and PDFs. Use this for ANY PDF URL. - This tool retrieves the full content from a URL for analysis. It handles - both HTML pages (converted to readable text) and PDF documents (text extracted). + Retrieves complete content from URLs including HTML pages (converted to text) + and PDF documents (text extracted). Returns full content ready to analyze. - For large data files (CSV, XLSX) that need searching, use fetch_file instead. + For data files like CSV or XLSX that need line-by-line searching, + use fetch_file instead. Parameters ---------- @@ -213,15 +253,8 @@ async def web_fetch(url: str, max_pages: int = 10) -> dict[str, Any]: return _make_success_response(final_url, text, content_type or "text/html", truncated) - except httpx.HTTPStatusError as e: - logger.warning(f"HTTP error fetching {url}: {e}") - return _make_error_response(f"HTTP {e.response.status_code}: {e.response.reason_phrase}", url) - except httpx.RequestError as e: - logger.warning(f"Request error fetching {url}: {e}") - return _make_error_response(f"Request failed: {e!s}", url) except Exception as e: - logger.exception(f"Unexpected error in web_fetch for {url}") - return _make_error_response(f"Unexpected error: {e!s}", url) + return _handle_fetch_error(e, url) def _handle_pdf_response(content: bytes, max_pages: int, final_url: str, url: str) -> dict[str, Any]: diff --git a/implementations/knowledge_qa/evaluate.py b/implementations/knowledge_qa/evaluate.py new file mode 100644 index 0000000..8b94918 --- /dev/null +++ b/implementations/knowledge_qa/evaluate.py @@ -0,0 +1,408 @@ +"""Evaluate the Knowledge Agent using Langfuse experiments. + +This script runs the Knowledge Agent against a Langfuse dataset and evaluates +results using the DeepSearchQA LLM-as-judge methodology. Results are automatically +logged to Langfuse for analysis and comparison. 
+ +Usage: + # Run a full evaluation + python evaluate.py + + # Run with custom dataset and experiment name + python evaluate.py --dataset-name "MyDataset" --experiment-name "v2-test" + + # Resume an interrupted evaluation (skips already-evaluated items) + python evaluate.py --experiment-name "v2-test" --resume + +Resume Feature: + Use --resume to continue an interrupted evaluation. The script will: + 1. Check Langfuse for traces with the specified experiment name + 2. Identify items that already have evaluation scores + 3. Skip those items and only evaluate the remaining ones + + Important: Use the SAME --experiment-name as the previous run to ensure + proper resumption. +""" + +import asyncio +import logging +from typing import Any + +import click +from aieng.agent_evals.async_client_manager import AsyncClientManager +from aieng.agent_evals.evaluation import run_experiment +from aieng.agent_evals.knowledge_qa.agent import KnowledgeGroundedAgent +from aieng.agent_evals.knowledge_qa.judges import DeepSearchQAJudge, DeepSearchQAResult +from aieng.agent_evals.logging_config import setup_logging +from dotenv import load_dotenv +from langfuse._client.datasets import DatasetItemClient +from langfuse.experiment import Evaluation + + +load_dotenv(verbose=True) +setup_logging(level=logging.INFO, show_time=True, show_path=False) +logger = logging.getLogger(__name__) + + +DEFAULT_DATASET_NAME = "DeepSearchQA-Subset" +DEFAULT_EXPERIMENT_NAME = "Knowledge Agent Evaluation" + +# Module-level lazy judge instance +_judge: DeepSearchQAJudge | None = None + + +def _get_judge() -> DeepSearchQAJudge: + """Get or create the shared DeepSearchQA Judge instance.""" + global _judge # noqa: PLW0603 + if _judge is None: + _judge = DeepSearchQAJudge() + return _judge + + +def _close_judge() -> None: + """Close the shared judge instance to clean up resources.""" + global _judge # noqa: PLW0603 + if _judge is not None: + _judge.close() + _judge = None + + +async def agent_task(*, item: Any, **kwargs: Any) -> dict[str, Any]: # noqa: ARG001 + """Run the Knowledge Agent on a dataset item. + + This is the task function used by the evaluation harness. + + Parameters + ---------- + item : Any + The Langfuse experiment item containing the question. + **kwargs : Any + Additional arguments from the harness (unused). + + Returns + ------- + dict[str, Any] + Dictionary containing 'text' (response) and 'agent_response' + (full response object). + """ + question = item.input + logger.info(f"Running agent on: {question[:80]}...") + + try: + # Create a fresh agent for each task to avoid shared state issues + agent = KnowledgeGroundedAgent(enable_planning=True) + response = await agent.answer_async(question) + logger.info(f"Agent completed: {len(response.text)} chars, {len(response.tool_calls)} tool calls") + + return { + "text": response.text, + "agent_response": response, + } + except Exception as e: + logger.error(f"Agent failed: {e}") + return {"text": f"Error: {e}", "agent_response": None} + + +async def deepsearchqa_evaluator( + *, + input: str, # noqa: A002 + output: dict[str, Any], + expected_output: str, + metadata: dict[str, Any] | None = None, + **kwargs: Any, # noqa: ARG001 +) -> list[Evaluation]: + """Evaluate the agent's response using DeepSearchQA methodology. + + This is the evaluator function used by the evaluation harness. + + Parameters + ---------- + input : str + The original question. + output : dict[str, Any] + Dictionary containing 'text' and 'agent_response'. + expected_output : str + The ground truth answer. 
+ metadata : dict[str, Any] | None, optional + Item metadata (contains answer_type). + **kwargs : Any + Additional arguments from the harness (unused). + + Returns + ------- + list[Evaluation] + List of Langfuse Evaluations with F1, precision, recall, and outcome scores. + """ + output_text = output.get("text", "") if isinstance(output, dict) else str(output) + answer_type = metadata.get("answer_type", "Set Answer") if metadata else "Set Answer" + + logger.info(f"Evaluating response (answer_type: {answer_type})...") + + try: + judge = _get_judge() + _, result = await judge.evaluate_with_details_async( + question=input, + answer=output_text, + ground_truth=expected_output, + answer_type=answer_type, + ) + + evaluations = result.to_evaluations() + logger.info(f"Evaluation complete: {result.outcome} (F1: {result.f1_score:.2f})") + return evaluations + + except Exception as e: + logger.error(f"Evaluation failed: {e}") + return DeepSearchQAResult.error_evaluations(str(e)) + + +def get_completed_item_ids(langfuse: Any, run_name: str, dataset_id: str) -> set[str]: + """Get dataset item IDs that have already been evaluated in a run. + + Parameters + ---------- + langfuse : Langfuse + The Langfuse client. + run_name : str + Name of the experiment run. + dataset_id : str + ID of the dataset. + + Returns + ------- + set[str] + Set of completed dataset item IDs. + """ + try: + logger.info(f"Checking for existing evaluations in run '{run_name}'...") + completed_ids = set() + page = 1 + limit = 50 + + while True: + run_items_response = langfuse.api.dataset_run_items.list( + dataset_id=dataset_id, + run_name=run_name, + limit=limit, + page=page, + ) + + if not run_items_response.data: + break + + for run_item in run_items_response.data: + if run_item.trace_id and _has_evaluation_scores(langfuse, run_item.trace_id): + completed_ids.add(run_item.dataset_item_id) + logger.debug(f"Found completed evaluation for item {run_item.dataset_item_id}") + + if len(run_items_response.data) < limit: + break + + page += 1 + + logger.info(f"Found {len(completed_ids)} completed evaluations") + return completed_ids + + except Exception as e: + logger.warning(f"Failed to fetch existing evaluations: {e}") + logger.info("Will process all items") + return set() + + +def _has_evaluation_scores(langfuse: Any, trace_id: str) -> bool: + """Check if a trace has evaluation scores.""" + try: + trace = langfuse.api.trace.get(trace_id) + return hasattr(trace, "scores") and bool(trace.scores) + except Exception as e: + logger.debug(f"Could not fetch trace {trace_id}: {e}") + return False + + +async def _process_item_with_resume( + item: DatasetItemClient, + run_name: str, +) -> None: + """Process a single dataset item with Langfuse trace linking (for resume mode). + + Uses ``item.run()`` to create a proper dataset-run-item link so that + traces appear under the existing experiment run in the Langfuse UI. + + Parameters + ---------- + item : DatasetItemClient + The dataset item to process. + run_name : str + The experiment run name for trace linking. 
+ """ + try: + with item.run(run_name=run_name) as root_span: + logger.info(f"Processing item {item.id}: {item.input[:80]}...") + + agent_output = await agent_task(item=item) + + answer_type = item.metadata.get("answer_type", "Set Answer") if item.metadata else "Set Answer" + evaluations = await deepsearchqa_evaluator( + input=item.input, + output=agent_output, + expected_output=item.expected_output or "", + metadata={"answer_type": answer_type}, + ) + + for evaluation in evaluations: + root_span.score_trace( + name=evaluation.name, + value=evaluation.value, + comment=evaluation.comment, + ) + + logger.info(f"Item {item.id} complete with {len(evaluations)} evaluations") + + except Exception as e: + logger.error(f"Item {item.id} failed: {e}") + + +async def _process_items_for_resume( + items: list[DatasetItemClient], + run_name: str, + max_concurrency: int, +) -> None: + """Process multiple items concurrently for resume mode. + + Parameters + ---------- + items : list[DatasetItemClient] + Items to process. + run_name : str + The experiment run name. + max_concurrency : int + Maximum concurrent tasks. + """ + semaphore = asyncio.Semaphore(max_concurrency) + + async def process_with_limit(item: DatasetItemClient) -> None: + async with semaphore: + await _process_item_with_resume(item, run_name) + + tasks = [process_with_limit(item) for item in items] + await asyncio.gather(*tasks, return_exceptions=True) + + logger.info("Resume processing complete!") + + +async def run_evaluation( + dataset_name: str, + experiment_name: str, + max_concurrency: int = 1, + resume: bool = False, +) -> None: + """Run the full evaluation experiment. + + Parameters + ---------- + dataset_name : str + Name of the Langfuse dataset to evaluate against. + experiment_name : str + Name for this experiment run. + max_concurrency : int, optional + Maximum concurrent agent runs, by default 1. + resume : bool, optional + If True, skip items that have already been evaluated, by default False. + """ + client_manager = AsyncClientManager.get_instance() + langfuse = client_manager.langfuse_client + + try: + if resume: + # Resume: fetch dataset, filter completed items, run remaining + logger.info(f"Loading dataset '{dataset_name}' from Langfuse...") + try: + dataset = langfuse.get_dataset(dataset_name) + except Exception as e: + logger.error(f"Failed to load dataset: {e}") + logger.info("Run the dataset upload script first to create the dataset.") + return + + logger.info(f"Found dataset with {len(dataset.items)} items") + + completed_ids = get_completed_item_ids(langfuse, experiment_name, dataset.id) + items_to_process = [item for item in dataset.items if item.id not in completed_ids] + + if not items_to_process: + logger.info("All items already evaluated. 
Nothing to do.") + return + + logger.info(f"Resume mode: Processing {len(items_to_process)} remaining items") + logger.info(f"Starting experiment: {experiment_name}") + logger.info(f"Max concurrency: {max_concurrency}") + + await _process_items_for_resume( + items=items_to_process, + run_name=experiment_name, + max_concurrency=max_concurrency, + ) + else: + # Normal mode: use the evaluation harness (fetches dataset internally) + logger.info(f"Starting experiment: {experiment_name}") + logger.info(f"Max concurrency: {max_concurrency}") + + result = run_experiment( + dataset_name=dataset_name, + name=experiment_name, + description="Knowledge Agent evaluation with DeepSearchQA judge", + task=agent_task, + evaluators=[deepsearchqa_evaluator], + max_concurrency=max_concurrency, + ) + + logger.info("Experiment complete!") + logger.info(result.format().replace("\\n", "\n")) + + finally: + # Cleanup + logger.info("Closing client manager and flushing data...") + try: + _close_judge() + await client_manager.close() + # Give event loop time to process cleanup tasks + await asyncio.sleep(0.1) + logger.info("Cleanup complete") + except Exception as e: + logger.warning(f"Cleanup warning: {e}") + + +@click.command() +@click.option( + "--dataset-name", + default=DEFAULT_DATASET_NAME, + help="Name of the Langfuse dataset to evaluate against.", +) +@click.option( + "--experiment-name", + default=DEFAULT_EXPERIMENT_NAME, + help="Name for this experiment run.", +) +@click.option( + "--max-concurrency", + default=1, + type=int, + help="Maximum concurrent agent runs (default: 1).", +) +@click.option( + "--resume", + is_flag=True, + default=False, + help="Resume from previous run. Skips items that already have evaluation scores.", +) +def cli(dataset_name: str, experiment_name: str, max_concurrency: int, resume: bool) -> None: + """Run Knowledge Agent evaluation using Langfuse experiments. + + Use --resume to continue an interrupted evaluation run. Items that already + have evaluation scores will be skipped. Make sure to use the same + --experiment-name as the previous run. + """ + asyncio.run(run_evaluation(dataset_name, experiment_name, max_concurrency, resume)) + + +if __name__ == "__main__": + cli() From 1febe82ad8114ddbfb55ad35282f5a9b4e41f6db Mon Sep 17 00:00:00 2001 From: Amrit Krishnan Date: Tue, 10 Feb 2026 08:37:55 -0500 Subject: [PATCH 2/9] Add temperature settings to config --- aieng-eval-agents/aieng/agent_evals/configs.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/aieng-eval-agents/aieng/agent_evals/configs.py b/aieng-eval-agents/aieng/agent_evals/configs.py index 9255cef..162e33a 100644 --- a/aieng-eval-agents/aieng/agent_evals/configs.py +++ b/aieng-eval-agents/aieng/agent_evals/configs.py @@ -108,6 +108,18 @@ class Configs(BaseSettings): default="gemini-2.5-pro", description="Model name for LLM-as-judge evaluation tasks.", ) + default_temperature: float = Field( + default=1.0, + ge=0.0, + le=2.0, + description="Default temperature for LLM generation. Lower values (0.0-0.3) produce more consistent outputs.", + ) + default_evaluator_temperature: float = Field( + default=0.0, + ge=0.0, + le=2.0, + description="Temperature for LLM-as-judge evaluations. 
Default 0.0 for deterministic judging.", + ) # === Tracing (Langfuse) === langfuse_public_key: str | None = Field( From 2f07905fc9dadbd328d464e18d5bc9506eaef44e Mon Sep 17 00:00:00 2001 From: Amrit Krishnan Date: Tue, 10 Feb 2026 08:40:11 -0500 Subject: [PATCH 3/9] Add ignore --- implementations/knowledge_qa/evaluate.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/implementations/knowledge_qa/evaluate.py b/implementations/knowledge_qa/evaluate.py index 8b94918..05e9d6c 100644 --- a/implementations/knowledge_qa/evaluate.py +++ b/implementations/knowledge_qa/evaluate.py @@ -90,7 +90,7 @@ async def agent_task(*, item: Any, **kwargs: Any) -> dict[str, Any]: # noqa: AR try: # Create a fresh agent for each task to avoid shared state issues - agent = KnowledgeGroundedAgent(enable_planning=True) + agent = KnowledgeGroundedAgent(enable_planning=True) # type: ignore[call-arg] response = await agent.answer_async(question) logger.info(f"Agent completed: {len(response.text)} chars, {len(response.tool_calls)} tool calls") From 69478377a9192594797492116edf12ab7cec0719 Mon Sep 17 00:00:00 2001 From: Amrit Krishnan Date: Tue, 10 Feb 2026 09:07:40 -0500 Subject: [PATCH 4/9] Remove resume functionality --- implementations/knowledge_qa/evaluate.py | 233 ++--------------------- 1 file changed, 17 insertions(+), 216 deletions(-) diff --git a/implementations/knowledge_qa/evaluate.py b/implementations/knowledge_qa/evaluate.py index 05e9d6c..ff55b68 100644 --- a/implementations/knowledge_qa/evaluate.py +++ b/implementations/knowledge_qa/evaluate.py @@ -10,18 +10,6 @@ # Run with custom dataset and experiment name python evaluate.py --dataset-name "MyDataset" --experiment-name "v2-test" - - # Resume an interrupted evaluation (skips already-evaluated items) - python evaluate.py --experiment-name "v2-test" --resume - -Resume Feature: - Use --resume to continue an interrupted evaluation. The script will: - 1. Check Langfuse for traces with the specified experiment name - 2. Identify items that already have evaluation scores - 3. Skip those items and only evaluate the remaining ones - - Important: Use the SAME --experiment-name as the previous run to ensure - proper resumption. """ import asyncio @@ -35,7 +23,6 @@ from aieng.agent_evals.knowledge_qa.judges import DeepSearchQAJudge, DeepSearchQAResult from aieng.agent_evals.logging_config import setup_logging from dotenv import load_dotenv -from langfuse._client.datasets import DatasetItemClient from langfuse.experiment import Evaluation @@ -70,8 +57,6 @@ def _close_judge() -> None: async def agent_task(*, item: Any, **kwargs: Any) -> dict[str, Any]: # noqa: ARG001 """Run the Knowledge Agent on a dataset item. - This is the task function used by the evaluation harness. - Parameters ---------- item : Any @@ -89,7 +74,6 @@ async def agent_task(*, item: Any, **kwargs: Any) -> dict[str, Any]: # noqa: AR logger.info(f"Running agent on: {question[:80]}...") try: - # Create a fresh agent for each task to avoid shared state issues agent = KnowledgeGroundedAgent(enable_planning=True) # type: ignore[call-arg] response = await agent.answer_async(question) logger.info(f"Agent completed: {len(response.text)} chars, {len(response.tool_calls)} tool calls") @@ -113,8 +97,6 @@ async def deepsearchqa_evaluator( ) -> list[Evaluation]: """Evaluate the agent's response using DeepSearchQA methodology. - This is the evaluator function used by the evaluation harness. 
- Parameters ---------- input : str @@ -156,145 +138,10 @@ async def deepsearchqa_evaluator( return DeepSearchQAResult.error_evaluations(str(e)) -def get_completed_item_ids(langfuse: Any, run_name: str, dataset_id: str) -> set[str]: - """Get dataset item IDs that have already been evaluated in a run. - - Parameters - ---------- - langfuse : Langfuse - The Langfuse client. - run_name : str - Name of the experiment run. - dataset_id : str - ID of the dataset. - - Returns - ------- - set[str] - Set of completed dataset item IDs. - """ - try: - logger.info(f"Checking for existing evaluations in run '{run_name}'...") - completed_ids = set() - page = 1 - limit = 50 - - while True: - run_items_response = langfuse.api.dataset_run_items.list( - dataset_id=dataset_id, - run_name=run_name, - limit=limit, - page=page, - ) - - if not run_items_response.data: - break - - for run_item in run_items_response.data: - if run_item.trace_id and _has_evaluation_scores(langfuse, run_item.trace_id): - completed_ids.add(run_item.dataset_item_id) - logger.debug(f"Found completed evaluation for item {run_item.dataset_item_id}") - - if len(run_items_response.data) < limit: - break - - page += 1 - - logger.info(f"Found {len(completed_ids)} completed evaluations") - return completed_ids - - except Exception as e: - logger.warning(f"Failed to fetch existing evaluations: {e}") - logger.info("Will process all items") - return set() - - -def _has_evaluation_scores(langfuse: Any, trace_id: str) -> bool: - """Check if a trace has evaluation scores.""" - try: - trace = langfuse.api.trace.get(trace_id) - return hasattr(trace, "scores") and bool(trace.scores) - except Exception as e: - logger.debug(f"Could not fetch trace {trace_id}: {e}") - return False - - -async def _process_item_with_resume( - item: DatasetItemClient, - run_name: str, -) -> None: - """Process a single dataset item with Langfuse trace linking (for resume mode). - - Uses ``item.run()`` to create a proper dataset-run-item link so that - traces appear under the existing experiment run in the Langfuse UI. - - Parameters - ---------- - item : DatasetItemClient - The dataset item to process. - run_name : str - The experiment run name for trace linking. - """ - try: - with item.run(run_name=run_name) as root_span: - logger.info(f"Processing item {item.id}: {item.input[:80]}...") - - agent_output = await agent_task(item=item) - - answer_type = item.metadata.get("answer_type", "Set Answer") if item.metadata else "Set Answer" - evaluations = await deepsearchqa_evaluator( - input=item.input, - output=agent_output, - expected_output=item.expected_output or "", - metadata={"answer_type": answer_type}, - ) - - for evaluation in evaluations: - root_span.score_trace( - name=evaluation.name, - value=evaluation.value, - comment=evaluation.comment, - ) - - logger.info(f"Item {item.id} complete with {len(evaluations)} evaluations") - - except Exception as e: - logger.error(f"Item {item.id} failed: {e}") - - -async def _process_items_for_resume( - items: list[DatasetItemClient], - run_name: str, - max_concurrency: int, -) -> None: - """Process multiple items concurrently for resume mode. - - Parameters - ---------- - items : list[DatasetItemClient] - Items to process. - run_name : str - The experiment run name. - max_concurrency : int - Maximum concurrent tasks. 
- """ - semaphore = asyncio.Semaphore(max_concurrency) - - async def process_with_limit(item: DatasetItemClient) -> None: - async with semaphore: - await _process_item_with_resume(item, run_name) - - tasks = [process_with_limit(item) for item in items] - await asyncio.gather(*tasks, return_exceptions=True) - - logger.info("Resume processing complete!") - - async def run_evaluation( dataset_name: str, experiment_name: str, max_concurrency: int = 1, - resume: bool = False, ) -> None: """Run the full evaluation experiment. @@ -306,65 +153,30 @@ async def run_evaluation( Name for this experiment run. max_concurrency : int, optional Maximum concurrent agent runs, by default 1. - resume : bool, optional - If True, skip items that have already been evaluated, by default False. """ client_manager = AsyncClientManager.get_instance() - langfuse = client_manager.langfuse_client try: - if resume: - # Resume: fetch dataset, filter completed items, run remaining - logger.info(f"Loading dataset '{dataset_name}' from Langfuse...") - try: - dataset = langfuse.get_dataset(dataset_name) - except Exception as e: - logger.error(f"Failed to load dataset: {e}") - logger.info("Run the dataset upload script first to create the dataset.") - return - - logger.info(f"Found dataset with {len(dataset.items)} items") - - completed_ids = get_completed_item_ids(langfuse, experiment_name, dataset.id) - items_to_process = [item for item in dataset.items if item.id not in completed_ids] - - if not items_to_process: - logger.info("All items already evaluated. Nothing to do.") - return - - logger.info(f"Resume mode: Processing {len(items_to_process)} remaining items") - logger.info(f"Starting experiment: {experiment_name}") - logger.info(f"Max concurrency: {max_concurrency}") - - await _process_items_for_resume( - items=items_to_process, - run_name=experiment_name, - max_concurrency=max_concurrency, - ) - else: - # Normal mode: use the evaluation harness (fetches dataset internally) - logger.info(f"Starting experiment: {experiment_name}") - logger.info(f"Max concurrency: {max_concurrency}") - - result = run_experiment( - dataset_name=dataset_name, - name=experiment_name, - description="Knowledge Agent evaluation with DeepSearchQA judge", - task=agent_task, - evaluators=[deepsearchqa_evaluator], - max_concurrency=max_concurrency, - ) - - logger.info("Experiment complete!") - logger.info(result.format().replace("\\n", "\n")) + logger.info(f"Starting experiment '{experiment_name}' on dataset '{dataset_name}'") + logger.info(f"Max concurrency: {max_concurrency}") + + result = run_experiment( + dataset_name=dataset_name, + name=experiment_name, + description="Knowledge Agent evaluation with DeepSearchQA judge", + task=agent_task, + evaluators=[deepsearchqa_evaluator], + max_concurrency=max_concurrency, + ) + + logger.info("Experiment complete!") + logger.info(result.format().replace("\\n", "\n")) finally: - # Cleanup logger.info("Closing client manager and flushing data...") try: _close_judge() await client_manager.close() - # Give event loop time to process cleanup tasks await asyncio.sleep(0.1) logger.info("Cleanup complete") except Exception as e: @@ -388,20 +200,9 @@ async def run_evaluation( type=int, help="Maximum concurrent agent runs (default: 1).", ) -@click.option( - "--resume", - is_flag=True, - default=False, - help="Resume from previous run. 
Skips items that already have evaluation scores.", -) -def cli(dataset_name: str, experiment_name: str, max_concurrency: int, resume: bool) -> None: - """Run Knowledge Agent evaluation using Langfuse experiments. - - Use --resume to continue an interrupted evaluation run. Items that already - have evaluation scores will be skipped. Make sure to use the same - --experiment-name as the previous run. - """ - asyncio.run(run_evaluation(dataset_name, experiment_name, max_concurrency, resume)) +def cli(dataset_name: str, experiment_name: str, max_concurrency: int) -> None: + """Run Knowledge Agent evaluation using Langfuse experiments.""" + asyncio.run(run_evaluation(dataset_name, experiment_name, max_concurrency)) if __name__ == "__main__": From b346420b76b98ee6cdfb9bdbad57961e13dcab7e Mon Sep 17 00:00:00 2001 From: Amrit Krishnan Date: Wed, 11 Feb 2026 10:45:23 -0500 Subject: [PATCH 5/9] Refactor judges and evaluate script --- .../aieng/agent_evals/knowledge_qa/judges.py | 591 +++++++----------- .../aieng/agent_evals/tools/web.py | 283 +++++++-- .../tests/aieng/agent_evals/tools/test_web.py | 246 ++++++-- implementations/knowledge_qa/evaluate.py | 124 ++-- 4 files changed, 731 insertions(+), 513 deletions(-) diff --git a/aieng-eval-agents/aieng/agent_evals/knowledge_qa/judges.py b/aieng-eval-agents/aieng/agent_evals/knowledge_qa/judges.py index 654927d..b3763a7 100644 --- a/aieng-eval-agents/aieng/agent_evals/knowledge_qa/judges.py +++ b/aieng-eval-agents/aieng/agent_evals/knowledge_qa/judges.py @@ -1,22 +1,27 @@ """LLM-as-judge evaluators for knowledge agent responses. -This module provides comprehensive evaluation using LLM judges across -multiple dimensions: comprehensiveness, causal reasoning, exhaustiveness, -source quality, and plan quality. +This module provides comprehensive evaluation using LLM judges for the +DeepSearchQA benchmark. The implementation follows the official DeepSearchQA +evaluation methodology with precision, recall, and F1 metrics. + +The evaluator has been refactored to use shared evaluation infrastructure +while maintaining backward compatibility with the original API. """ -import json +import asyncio import logging from typing import TYPE_CHECKING, Any +from aieng.agent_evals.async_client_manager import AsyncClientManager from aieng.agent_evals.configs import Configs -from google import genai -from google.genai import types +from aieng.agent_evals.evaluation.graders._utils import run_structured_parse_call +from aieng.agent_evals.evaluation.graders.config import LLMRequestConfig +from aieng.agent_evals.evaluation.types import Evaluation from pydantic import BaseModel, Field if TYPE_CHECKING: - from langfuse.experiment import Evaluation + pass logger = logging.getLogger(__name__) @@ -43,122 +48,6 @@ class JudgeResult(BaseModel): evidence: list[str] = Field(default_factory=list) -def _parse_judge_response(response_text: str, dimension: str) -> JudgeResult: - """Parse a judge's response into a JudgeResult. - - Parameters - ---------- - response_text : str - Raw response from the judge LLM. - dimension : str - The dimension being evaluated. - - Returns - ------- - JudgeResult - Parsed result. 
- """ - try: - text = response_text.strip() - - # Handle markdown code blocks - if "```json" in text: - start = text.find("```json") + 7 - end = text.find("```", start) - text = text[start:end].strip() - elif "```" in text: - start = text.find("```") + 3 - end = text.find("```", start) - text = text[start:end].strip() - - data = json.loads(text) - - return JudgeResult( - dimension=dimension, - score=float(data.get("score", 3)), - explanation=data.get("explanation", ""), - evidence=data.get("evidence", []), - ) - - except (json.JSONDecodeError, KeyError, TypeError, ValueError) as e: - logger.warning(f"Failed to parse judge response for {dimension}: {e}") - return JudgeResult( - dimension=dimension, - score=3.0, # Default middle score - explanation=f"Parse error: {e}. Raw response: {response_text[:200]}", - evidence=[], - ) - - -class BaseJudge: - """Base class for LLM-as-judge evaluators. - - Parameters - ---------- - config : Configs, optional - Configuration settings. - model : str, optional - Model to use for judging. Defaults to planner model. - """ - - dimension: str = "base" - system_prompt: str = "" - - def __init__( - self, - config: "Configs | None" = None, - model: str | None = None, - ) -> None: - """Initialize the judge.""" - # Load config from environment if not provided - if config is None: - config = Configs() # type: ignore[call-arg] - self._config = config - - if model is not None: - self._model = model - else: - self._model = config.default_evaluator_model - - self._client = genai.Client() - self._temperature = config.default_evaluator_temperature - - def _call_llm(self, prompt: str) -> str: - """Call the LLM with the given prompt.""" - response = self._client.models.generate_content( - model=self._model, - contents=types.Content( - role="user", - parts=[types.Part(text=prompt)], - ), - config=types.GenerateContentConfig( - system_instruction=self.system_prompt, - temperature=self._temperature, - ), - ) - return response.text or "" - - async def _call_llm_async(self, prompt: str) -> str: - """Call the LLM asynchronously.""" - response = await self._client.aio.models.generate_content( - model=self._model, - contents=types.Content( - role="user", - parts=[types.Part(text=prompt)], - ), - config=types.GenerateContentConfig( - system_instruction=self.system_prompt, - temperature=self._temperature, - ), - ) - return response.text or "" - - def close(self) -> None: - """Close the judge's client to clean up resources.""" - if hasattr(self, "_client") and self._client is not None: - self._client.close() - - class DeepSearchQAResult(BaseModel): """Result from DeepSearchQA evaluation with IR metrics. @@ -195,7 +84,7 @@ class DeepSearchQAResult(BaseModel): extraneous_items: list[str] = Field(default_factory=list) explanation: str = "" - def to_evaluations(self) -> list["Evaluation"]: + def to_evaluations(self) -> list[Evaluation]: """Convert this result to Langfuse Evaluation objects. Returns @@ -203,8 +92,6 @@ def to_evaluations(self) -> list["Evaluation"]: list[Evaluation] Four evaluations: Outcome (categorical), F1, Precision, Recall (numeric). """ - from langfuse.experiment import Evaluation # noqa: PLC0415 - comment_parts = [ f"Outcome: {self.outcome}", f"Precision: {self.precision:.2f}", @@ -244,7 +131,7 @@ def to_evaluations(self) -> list["Evaluation"]: ] @staticmethod - def error_evaluations(error_msg: str) -> list["Evaluation"]: + def error_evaluations(error_msg: str) -> list[Evaluation]: """Create error evaluations when evaluation fails. 
Parameters @@ -257,8 +144,6 @@ def error_evaluations(error_msg: str) -> list["Evaluation"]: list[Evaluation] Three evaluations (F1, Precision, Recall) all set to 0.0. """ - from langfuse.experiment import Evaluation # noqa: PLC0415 - comment = f"Evaluation error: {error_msg}" return [ Evaluation(name="F1", value=0.0, comment=comment), @@ -267,6 +152,23 @@ def error_evaluations(error_msg: str) -> list["Evaluation"]: ] +class DeepSearchQAGraderResponse(BaseModel): + """Structured response from the DeepSearchQA grader. + + This matches the official DeepSearchQA grader output format. + + Attributes + ---------- + answer_correctness : dict[str, Any] + Dictionary containing: + - Explanation: str - Explanation of the evaluation + - Correctness Details: dict[str, bool] - Per-item correctness + - Excessive Answers: list[str] - Extra items not in ground truth + """ + + answer_correctness: dict[str, Any] = Field(alias="Answer Correctness") + + # Official DeepSearchQA grader prompt from Appendix A of the paper DEEPSEARCHQA_GRADER_PROMPT = """\ Your task is to evaluate whether a given "AI Response" for a specific "User Prompt" @@ -352,7 +254,136 @@ def error_evaluations(error_msg: str) -> list["Evaluation"]: """ -class DeepSearchQAJudge(BaseJudge): +def _calculate_metrics_from_grader( + grader_result: dict[str, Any], +) -> DeepSearchQAResult: + """Calculate precision, recall, F1 from grader output. + + This follows the exact methodology from the paper: + - Precision = |S∩G| / |S| + - Recall = |S∩G| / |G| + - F1 = 2*P*R / (P+R) + + Parameters + ---------- + grader_result : dict + Output from the LLM grader with Correctness Details and Excessive Answers. + + Returns + ------- + DeepSearchQAResult + Computed metrics and classifications. + """ + correctness_details = grader_result.get("Correctness Details", {}) + extraneous_items = grader_result.get("Excessive Answers", []) + explanation = grader_result.get("Explanation", "") + + # Count matched ground truth items + num_ground_truth = len(correctness_details) + num_matched = sum(1 for v in correctness_details.values() if v) + num_extraneous = len(extraneous_items) + + # Total predicted items = matched + extraneous + num_predicted = num_matched + num_extraneous + + # Calculate metrics + precision = num_matched / num_predicted if num_predicted > 0 else 0.0 + recall = num_matched / num_ground_truth if num_ground_truth > 0 else 1.0 + f1_score = 2 * precision * recall / (precision + recall) if precision + recall > 0 else 0.0 + + # Determine outcome based on set relationships + if num_matched == num_ground_truth and num_extraneous == 0: + outcome = "fully_correct" + elif num_matched == num_ground_truth and num_extraneous > 0: + outcome = "correct_with_extraneous" + elif num_matched > 0: + outcome = "partially_correct" + else: + outcome = "fully_incorrect" + + return DeepSearchQAResult( + precision=precision, + recall=recall, + f1_score=f1_score, + outcome=outcome, + correctness_details=correctness_details, + extraneous_items=extraneous_items, + explanation=explanation, + ) + + +async def evaluate_deepsearchqa_async( + *, + question: str, + answer: str, + ground_truth: str, + answer_type: str = "Single Answer", + model_config: LLMRequestConfig | None = None, +) -> DeepSearchQAResult: + """Evaluate an answer using DeepSearchQA methodology. + + This is the modern async evaluator that uses shared infrastructure. + + Parameters + ---------- + question : str + The original question. + answer : str + The agent's answer. 
+ ground_truth : str + The expected ground truth answer. + answer_type : str + Type of answer: "Single Answer" or "Set Answer". + model_config : LLMRequestConfig | None, optional + Optional model configuration. If None, defaults are used. + + Returns + ------- + DeepSearchQAResult + The evaluation result with precision, recall, and F1 metrics. + """ + config = model_config or LLMRequestConfig() + client_manager = AsyncClientManager.get_instance() + + # Build the grader prompt + user_prompt = DEEPSEARCHQA_GRADER_PROMPT.format( + prompt=question, + response=answer, + answer=ground_truth, + prompt_type=answer_type, + ) + + try: + completion = await run_structured_parse_call( + openai_client=client_manager.openai_client, + default_model=client_manager.configs.default_evaluator_model, + model_config=config, + system_prompt="", # DeepSearchQA uses all instructions in user prompt + user_prompt=user_prompt, + response_format=DeepSearchQAGraderResponse, + ) + + grader_response: DeepSearchQAGraderResponse | None = completion.choices[0].message.parsed + + if grader_response is None: + raise ValueError("Grader returned null response") + + return _calculate_metrics_from_grader(grader_response.answer_correctness) + + except Exception as e: + logger.warning(f"Failed to evaluate with DeepSearchQA grader: {e}") + return DeepSearchQAResult( + precision=0.0, + recall=0.0, + f1_score=0.0, + outcome="fully_incorrect", + correctness_details={}, + extraneous_items=[], + explanation=f"Grader error: {e}", + ) + + +class DeepSearchQAJudge: """Official DeepSearchQA evaluation using precision, recall, and F1. This judge implements the exact evaluation methodology from the DeepSearchQA @@ -370,6 +401,12 @@ class DeepSearchQAJudge(BaseJudge): - Recall: R = |S∩G| / |G| (exhaustiveness against ground truth) - F1 Score: F1 = 2*P*R / (P+R) (primary ranking metric) + Notes + ----- + This class provides backward compatibility with the original API. + Internally, it delegates to the modern async evaluator that uses + shared evaluation infrastructure. + References ---------- - Paper: DeepSearchQA: Bridging the Comprehensiveness Gap for Deep Research Agents @@ -378,130 +415,30 @@ class DeepSearchQAJudge(BaseJudge): """ dimension = "deepsearchqa" - system_prompt = "" # Not used - we use the full grader prompt directly - - def _call_grader( - self, - prompt: str, - response: str, - answer: str, - prompt_type: str, - ) -> dict[str, Any]: - """Call the LLM grader using the official DeepSearchQA prompt. - - Parameters - ---------- - prompt : str - The original question/prompt. - response : str - The AI response to evaluate. - answer : str - The ground truth answer. - prompt_type : str - "Single Answer" or "Set Answer". - - Returns - ------- - dict - Parsed grader response with Correctness Details and Excessive Answers. 
- """ - grader_prompt = DEEPSEARCHQA_GRADER_PROMPT.format( - prompt=prompt, - response=response, - answer=answer, - prompt_type=prompt_type, - ) - try: - llm_response = self._client.models.generate_content( - model=self._model, - contents=types.Content( - role="user", - parts=[types.Part(text=grader_prompt)], - ), - config=types.GenerateContentConfig( - temperature=self._temperature, - ), - ) - response_text = (llm_response.text or "").strip() - - # Parse JSON from response - if "```json" in response_text: - start = response_text.find("```json") + 7 - end = response_text.find("```", start) - response_text = response_text[start:end].strip() - elif "```" in response_text: - start = response_text.find("```") + 3 - end = response_text.find("```", start) - response_text = response_text[start:end].strip() - - data = json.loads(response_text) - return data.get("Answer Correctness", {}) - - except Exception as e: - logger.warning(f"Failed to call grader: {e}") - return { - "Explanation": f"Grader error: {e}", - "Correctness Details": {}, - "Excessive Answers": [], - } - - def _calculate_metrics_from_grader( + def __init__( self, - grader_result: dict[str, Any], - ) -> DeepSearchQAResult: - """Calculate precision, recall, F1 from grader output. - - This follows the exact methodology from the paper: - - Precision = |S∩G| / |S| - - Recall = |S∩G| / |G| - - F1 = 2*P*R / (P+R) + config: "Configs | None" = None, + model: str | None = None, + ) -> None: + """Initialize the judge. Parameters ---------- - grader_result : dict - Output from the LLM grader with Correctness Details and Excessive Answers. - - Returns - ------- - DeepSearchQAResult - Computed metrics and classifications. + config : Configs | None, optional + Configuration settings. If None, defaults from environment are used. + model : str | None, optional + Model to use for judging. If None, default evaluator model is used. 
""" - correctness_details = grader_result.get("Correctness Details", {}) - extraneous_items = grader_result.get("Excessive Answers", []) - explanation = grader_result.get("Explanation", "") - - # Count matched ground truth items - num_ground_truth = len(correctness_details) - num_matched = sum(1 for v in correctness_details.values() if v) - num_extraneous = len(extraneous_items) - - # Total predicted items = matched + extraneous - num_predicted = num_matched + num_extraneous - - # Calculate metrics - precision = num_matched / num_predicted if num_predicted > 0 else 0.0 - recall = num_matched / num_ground_truth if num_ground_truth > 0 else 1.0 - f1_score = 2 * precision * recall / (precision + recall) if precision + recall > 0 else 0.0 - - # Determine outcome based on set relationships - if num_matched == num_ground_truth and num_extraneous == 0: - outcome = "fully_correct" - elif num_matched == num_ground_truth and num_extraneous > 0: - outcome = "correct_with_extraneous" - elif num_matched > 0: - outcome = "partially_correct" - else: - outcome = "fully_incorrect" + # Store config for backward compatibility + if config is None: + config = Configs() # type: ignore[call-arg] + self._config = config - return DeepSearchQAResult( - precision=precision, - recall=recall, - f1_score=f1_score, - outcome=outcome, - correctness_details=correctness_details, - extraneous_items=extraneous_items, - explanation=explanation, + # Build model config + self._model_config = LLMRequestConfig( + model=model if model is not None else config.default_evaluator_model, + temperature=config.default_evaluator_temperature, ) def evaluate( @@ -513,6 +450,9 @@ def evaluate( ) -> JudgeResult: """Evaluate an answer using DeepSearchQA methodology. + This is a synchronous wrapper around the async evaluator for backward + compatibility. + Parameters ---------- question : str @@ -529,19 +469,25 @@ def evaluate( JudgeResult The evaluation result with precision, recall, and F1 in evidence. """ - logger.info(f"Evaluating answer for: {question[:100]}...") - - # Call the grader - grader_result = self._call_grader( - prompt=question, - response=answer, - answer=ground_truth, - prompt_type=answer_type, + # Run async evaluator in event loop + try: + asyncio.get_running_loop() + # If we're already in an async context, we can't use run_until_complete + raise RuntimeError("Cannot call synchronous evaluate from async context. 
Use evaluate_async instead.") + except RuntimeError: + # No running loop, safe to create one + pass + + result = asyncio.run( + evaluate_deepsearchqa_async( + question=question, + answer=answer, + ground_truth=ground_truth, + answer_type=answer_type, + model_config=self._model_config, + ) ) - # Calculate metrics from grader output - result = self._calculate_metrics_from_grader(grader_result) - # Convert F1 to 1-5 scale for consistency with other judges score = 1 + (result.f1_score * 4) # Maps 0-1 to 1-5 @@ -567,51 +513,14 @@ async def evaluate_async( answer_type: str = "Single Answer", ) -> JudgeResult: """Async version of evaluate.""" - logger.info(f"Evaluating answer (async) for: {question[:100]}...") - - # Build the grader prompt - grader_prompt = DEEPSEARCHQA_GRADER_PROMPT.format( - prompt=question, - response=answer, - answer=ground_truth, - prompt_type=answer_type, + result = await evaluate_deepsearchqa_async( + question=question, + answer=answer, + ground_truth=ground_truth, + answer_type=answer_type, + model_config=self._model_config, ) - try: - llm_response = await self._client.aio.models.generate_content( - model=self._model, - contents=types.Content( - role="user", - parts=[types.Part(text=grader_prompt)], - ), - config=types.GenerateContentConfig( - temperature=self._temperature, - ), - ) - response_text = (llm_response.text or "").strip() - - # Parse JSON - if "```json" in response_text: - start = response_text.find("```json") + 7 - end = response_text.find("```", start) - response_text = response_text[start:end].strip() - elif "```" in response_text: - start = response_text.find("```") + 3 - end = response_text.find("```", start) - response_text = response_text[start:end].strip() - - data = json.loads(response_text) - grader_result = data.get("Answer Correctness", {}) - - except Exception as e: - logger.warning(f"Failed to call grader async: {e}") - grader_result = { - "Explanation": f"Grader error: {e}", - "Correctness Details": {}, - "Excessive Answers": [], - } - - result = self._calculate_metrics_from_grader(grader_result) score = 1 + (result.f1_score * 4) return JudgeResult( @@ -653,14 +562,26 @@ def evaluate_with_details( tuple[JudgeResult, DeepSearchQAResult] Both the standard judge result and detailed metrics. """ - grader_result = self._call_grader( - prompt=question, - response=answer, - answer=ground_truth, - prompt_type=answer_type, + try: + asyncio.get_running_loop() + # If we're already in an async context, we can't use run_until_complete + raise RuntimeError( + "Cannot call synchronous evaluate_with_details from async context. Use evaluate_with_details_async instead." + ) + except RuntimeError: + # No running loop, safe to create one + pass + + result = asyncio.run( + evaluate_deepsearchqa_async( + question=question, + answer=answer, + ground_truth=ground_truth, + answer_type=answer_type, + model_config=self._model_config, + ) ) - result = self._calculate_metrics_from_grader(grader_result) score = 1 + (result.f1_score * 4) judge_result = JudgeResult( @@ -702,49 +623,14 @@ async def evaluate_with_details_async( tuple[JudgeResult, DeepSearchQAResult] Both the standard judge result and detailed metrics. 
""" - # Build the grader prompt - grader_prompt = DEEPSEARCHQA_GRADER_PROMPT.format( - prompt=question, - response=answer, - answer=ground_truth, - prompt_type=answer_type, + result = await evaluate_deepsearchqa_async( + question=question, + answer=answer, + ground_truth=ground_truth, + answer_type=answer_type, + model_config=self._model_config, ) - try: - llm_response = await self._client.aio.models.generate_content( - model=self._model, - contents=types.Content( - role="user", - parts=[types.Part(text=grader_prompt)], - ), - config=types.GenerateContentConfig( - temperature=self._temperature, - ), - ) - response_text = (llm_response.text or "").strip() - - # Parse JSON - if "```json" in response_text: - start = response_text.find("```json") + 7 - end = response_text.find("```", start) - response_text = response_text[start:end].strip() - elif "```" in response_text: - start = response_text.find("```") + 3 - end = response_text.find("```", start) - response_text = response_text[start:end].strip() - - data = json.loads(response_text) - grader_result = data.get("Answer Correctness", {}) - - except Exception as e: - logger.warning(f"Failed to call grader async: {e}") - grader_result = { - "Explanation": f"Grader error: {e}", - "Correctness Details": {}, - "Excessive Answers": [], - } - - result = self._calculate_metrics_from_grader(grader_result) score = 1 + (result.f1_score * 4) judge_result = JudgeResult( @@ -760,3 +646,12 @@ async def evaluate_with_details_async( ) return judge_result, result + + def close(self) -> None: + """Close the judge's client to clean up resources. + + Note: With the new shared client manager, cleanup is handled + centrally. This method is kept for backward compatibility. + """ + # No-op: AsyncClientManager handles cleanup + pass diff --git a/aieng-eval-agents/aieng/agent_evals/tools/web.py b/aieng-eval-agents/aieng/agent_evals/tools/web.py index d1eb99b..a8e3ea3 100644 --- a/aieng-eval-agents/aieng/agent_evals/tools/web.py +++ b/aieng-eval-agents/aieng/agent_evals/tools/web.py @@ -1,7 +1,13 @@ """Web fetch tool for retrieving content from URLs. Provides the web_fetch tool which fetches content from any URL (HTML pages or PDFs) -and returns the content for the agent to analyze. Similar to Anthropic's web_fetch tool. +and uses an LLM to extract relevant information based on a query. This prevents +context overflow by returning only extracted information instead of full page content. + +Architecture matches Claude Code's WebFetch tool: +1. Fetch URL and convert to markdown/text +2. Use LLM to extract information based on query +3. 
Return only extracted information (~200-1000 tokens instead of 10k-100k) """ import logging @@ -12,6 +18,7 @@ from urllib.parse import urljoin import httpx +from aieng.agent_evals.async_client_manager import AsyncClientManager from google.adk.tools.function_tool import FunctionTool from html_to_markdown import convert as html_to_markdown from pypdf import PdfReader @@ -20,7 +27,14 @@ logger = logging.getLogger(__name__) -MAX_CONTENT_CHARS = 100_000 +# Maximum chars to fetch before LLM processing (prevents fetching enormous files) +MAX_FETCH_CHARS = 200_000 + +# Maximum chars to send to extraction LLM (most models handle ~100k chars well) +MAX_EXTRACTION_INPUT_CHARS = 100_000 + +# Maximum tokens for LLM extraction output +MAX_EXTRACTION_OUTPUT_TOKENS = 2000 def _make_absolute_url(base_url: str) -> Callable[[re.Match[str]], str]: @@ -130,31 +144,162 @@ def _extract_pdf_text(content: bytes, max_pages: int = 10) -> tuple[str, int]: return "\n\n".join(text_parts), num_pages -def _truncate_content(text: str) -> tuple[str, bool]: - """Truncate content if it exceeds the maximum length.""" - truncated = len(text) > MAX_CONTENT_CHARS +def _truncate_content(text: str, max_chars: int) -> tuple[str, bool]: + """Truncate content if it exceeds the maximum length. + + Parameters + ---------- + text : str + The text to truncate. + max_chars : int + Maximum number of characters allowed. + + Returns + ------- + tuple[str, bool] + The (potentially truncated) text and whether it was truncated. + """ + truncated = len(text) > max_chars if truncated: - text = text[:MAX_CONTENT_CHARS] + "\n\n[Content truncated due to length]" + text = text[:max_chars] + "\n\n[Content truncated due to length]" return text, truncated -def _make_error_response(error: str, url: str) -> dict[str, Any]: - """Create an error response dict.""" - return {"status": "error", "error": error, "url": url} +@retry( + stop=stop_after_attempt(3), + wait=wait_exponential(multiplier=1, min=1, max=10), + retry=retry_if_exception_type((Exception,)), # Retry on any exception from LLM API +) +async def _extract_information_with_llm_with_retry( + openai_client: Any, model: str, content: str, query: str, url: str, was_truncated: bool +) -> dict[str, Any]: + """Execute LLM extraction with retry logic. + + Parameters + ---------- + openai_client : Any + The OpenAI client instance. + model : str + Model name to use. + content : str + The content to extract from. + query : str + What information to extract. + url : str + Source URL. + was_truncated : bool + Whether the source content was truncated. + + Returns + ------- + dict[str, Any] + Success response with extracted information and metadata. + """ + # Build extraction prompt + system_prompt = """You are a precise information extraction assistant. Your task is to extract specific information from web pages based on user queries. + +Instructions: +- Extract ONLY the information requested in the query +- Be accurate and concise +- If the information is not found, say so clearly +- Include relevant quotes or data points when available +- Do not add information not present in the source +- Format your response clearly with markdown if appropriate""" + + user_prompt = f"""Extract the following information from this webpage: + +QUERY: {query} + +SOURCE URL: {url} + +WEBPAGE CONTENT: +{content} +Please extract the requested information. 
Be concise but complete.""" -def _make_success_response(url: str, content: str, content_type: str, truncated: bool, **extra: Any) -> dict[str, Any]: - """Create a success response dict.""" - result = { + # Call LLM to extract information + response = await openai_client.chat.completions.create( + model=model, + messages=[ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": user_prompt}, + ], + max_tokens=MAX_EXTRACTION_OUTPUT_TOKENS, + temperature=0.0, # Deterministic for extraction + ) + + extracted_info = response.choices[0].message.content or "" + + return { "status": "success", + "extracted_info": extracted_info, "url": url, - "content": content, - "content_type": content_type, - "content_length": len(content), - "truncated": truncated, + "query": query, + "source_was_truncated": was_truncated, + "extraction_model": model, + "input_tokens": response.usage.prompt_tokens if response.usage else 0, + "output_tokens": response.usage.completion_tokens if response.usage else 0, } - result.update(extra) - return result + + +async def _extract_information_with_llm(content: str, query: str, url: str) -> dict[str, Any]: + """Extract relevant information from content using an LLM. + + This is the key function that implements Claude Code's WebFetch architecture. + Instead of returning full page content, it uses an LLM to extract only the + information requested in the query. + + Parameters + ---------- + content : str + The full page content (markdown or text). + query : str + What information to extract from the content. + url : str + Source URL (for context and citation). + + Returns + ------- + dict[str, Any] + Response with 'status', 'extracted_info', 'url', and metadata. + """ + try: + # Truncate content for LLM processing if needed + content, was_truncated = _truncate_content(content, MAX_EXTRACTION_INPUT_CHARS) + + # Get OpenAI client (configured for Gemini endpoint) + client_manager = AsyncClientManager.get_instance() + openai_client = client_manager.openai_client + model = client_manager.configs.default_worker_model # gemini-2.5-flash + + # Call with retry logic + return await _extract_information_with_llm_with_retry( + openai_client=openai_client, + model=model, + content=content, + query=query, + url=url, + was_truncated=was_truncated, + ) + + except RetryError as e: + # Extract the underlying error from retry failure + original_error = e.last_attempt.exception() + logger.error(f"LLM extraction failed for {url} after 3 retries: {original_error}") + return { + "status": "error", + "error": f"Information extraction failed after 3 retries: {original_error!s}", + "url": url, + "query": query, + } + except Exception as e: + logger.error(f"LLM extraction failed for {url}: {e}") + return { + "status": "error", + "error": f"Information extraction failed: {e!s}", + "url": url, + "query": query, + } def _handle_fetch_error(e: Exception, url: str) -> dict[str, Any]: @@ -174,11 +319,12 @@ def _handle_fetch_error(e: Exception, url: str) -> dict[str, Any]: """ if isinstance(e, httpx.HTTPStatusError): logger.warning(f"HTTP error fetching {url}: {e}") - return _make_error_response(f"HTTP {e.response.status_code}: {e.response.reason_phrase}", url) + error_msg = f"HTTP {e.response.status_code}: {e.response.reason_phrase}" + return {"status": "error", "error": error_msg, "url": url} if isinstance(e, httpx.RequestError): logger.warning(f"Request error fetching {url}: {e}") - return _make_error_response(f"Request failed: {e!s}", url) + return {"status": "error", "error": f"Request 
failed: {e!s}", "url": url} if isinstance(e, RetryError): # Extract the underlying error from retry failure @@ -190,17 +336,23 @@ def _handle_fetch_error(e: Exception, url: str) -> dict[str, Any]: else: logger.error(f"Failed to fetch {url} after 3 retries: {original_error}") error_msg = f"Failed after 3 retries: {original_error!s}" - return _make_error_response(error_msg, url) + return {"status": "error", "error": error_msg, "url": url} logger.error(f"Unexpected error in web_fetch for {url}: {e}") - return _make_error_response(f"Unexpected error: {e!s}", url) + return {"status": "error", "error": f"Unexpected error: {e!s}", "url": url} + +async def web_fetch(url: str, query: str, max_pages: int = 10) -> dict[str, Any]: + """Fetch and extract information from web pages and PDFs using LLM. -async def web_fetch(url: str, max_pages: int = 10) -> dict[str, Any]: - """Fetch HTML pages and PDFs. Use this for ANY PDF URL. + This tool implements Claude Code's WebFetch architecture: + 1. Fetches the URL (HTML or PDF) + 2. Converts to markdown/text + 3. Uses an LLM to extract only the information specified in query + 4. Returns extracted info (~200-1000 tokens) instead of full content - Retrieves complete content from URLs including HTML pages (converted to text) - and PDF documents (text extracted). Returns full content ready to analyze. + This prevents context overflow by avoiding accumulation of large web page + content in the agent's context window. For data files like CSV or XLSX that need line-by-line searching, use fetch_file instead. @@ -209,30 +361,44 @@ async def web_fetch(url: str, max_pages: int = 10) -> dict[str, Any]: ---------- url : str The URL to fetch. Must be a valid HTTP or HTTPS URL. + query : str + What information to extract from the page. Be specific about what + you need (e.g., "publication date and main findings", "contact email + and phone number", "list of product features"). max_pages : int, optional For PDFs, maximum number of pages to extract (default 10). Returns ------- dict - On success: 'status', 'url', 'content', 'content_type', - 'content_length', 'truncated'. For PDFs also includes: - 'num_pages', 'pages_extracted'. On error: 'status', 'error', 'url'. + On success: 'status', 'extracted_info', 'url', 'query', + 'source_was_truncated', 'extraction_model', 'input_tokens', + 'output_tokens'. On error: 'status', 'error', 'url', 'query'. Examples -------- - >>> # Fetch an HTML page - >>> result = await web_fetch("https://example.com/about") - >>> print(result["content"]) - - >>> # Fetch a PDF - >>> result = await web_fetch("https://arxiv.org/pdf/2301.00234.pdf") - >>> print(f"Pages: {result['num_pages']}") - >>> print(result["content"]) + >>> # Extract specific info from a webpage + >>> result = await web_fetch( + ... "https://example.com/about", + ... query="company founding date and CEO name", + ... ) + >>> print(result["extracted_info"]) + + >>> # Extract from a PDF + >>> result = await web_fetch( + ... "https://arxiv.org/pdf/2301.00234.pdf", + ... query="main research findings and datasets used", + ... ) + >>> print(result["extracted_info"]) """ # Validate URL if not url.startswith(("http://", "https://")): - return _make_error_response("Invalid URL. Must start with http:// or https://", url) + return { + "status": "error", + "error": "Invalid URL. 
Must start with http:// or https://", + "url": url, + "query": query, + } try: async with httpx.AsyncClient(timeout=60.0, follow_redirects=True) as client: @@ -242,37 +408,36 @@ async def web_fetch(url: str, max_pages: int = 10) -> dict[str, Any]: # Handle PDF documents if "application/pdf" in content_type or url.lower().endswith(".pdf"): - return _handle_pdf_response(response.content, max_pages, final_url, url) + pdf_text, num_pages = _extract_pdf_text(response.content, max_pages) + # Truncate if needed before LLM extraction + pdf_text, _ = _truncate_content(pdf_text, MAX_FETCH_CHARS) + + logger.info(f"Extracted {len(pdf_text)} chars from PDF ({num_pages} pages), extracting info...") + result = await _extract_information_with_llm(pdf_text, query, final_url) + # Add PDF metadata + if result["status"] == "success": + result["content_type"] = "application/pdf" + result["num_pages"] = num_pages + result["pages_extracted"] = min(num_pages, max_pages) + return result # Handle HTML and text content if "text/html" in content_type or content_type == "": text = _html_to_markdown(response.text, base_url=final_url) else: text = response.text - text, truncated = _truncate_content(text) - return _make_success_response(final_url, text, content_type or "text/html", truncated) - - except Exception as e: - return _handle_fetch_error(e, url) + # Truncate if needed before LLM extraction + text, _ = _truncate_content(text, MAX_FETCH_CHARS) + logger.info(f"Fetched {len(text)} chars from {final_url}, extracting info...") + result = await _extract_information_with_llm(text, query, final_url) + if result["status"] == "success": + result["content_type"] = content_type or "text/html" + return result -def _handle_pdf_response(content: bytes, max_pages: int, final_url: str, url: str) -> dict[str, Any]: - """Handle PDF content extraction and response creation.""" - try: - text, num_pages = _extract_pdf_text(content, max_pages) - text, truncated = _truncate_content(text) - - return _make_success_response( - final_url, - text, - "application/pdf", - truncated, - num_pages=num_pages, - pages_extracted=min(num_pages, max_pages), - ) except Exception as e: - return _make_error_response(f"Failed to extract PDF text: {e!s}", url) + return _handle_fetch_error(e, url) def create_web_fetch_tool() -> FunctionTool: diff --git a/aieng-eval-agents/tests/aieng/agent_evals/tools/test_web.py b/aieng-eval-agents/tests/aieng/agent_evals/tools/test_web.py index d0e83f4..4747d10 100644 --- a/aieng-eval-agents/tests/aieng/agent_evals/tools/test_web.py +++ b/aieng-eval-agents/tests/aieng/agent_evals/tools/test_web.py @@ -76,9 +76,11 @@ class TestWebFetch: """Tests for the web_fetch function.""" @pytest.mark.asyncio + @patch("aieng.agent_evals.tools.web.AsyncClientManager") @patch("aieng.agent_evals.tools.web.httpx.AsyncClient") - async def test_fetch_html_success(self, mock_client_class): - """Test successful HTML fetch returns content.""" + async def test_fetch_html_success(self, mock_http_client_class, mock_client_manager_class): + """Test successful HTML fetch with LLM extraction.""" + # Mock HTTP response mock_response = MagicMock() mock_response.text = "
<html><body><h1>Hello World</h1></body></html>
" mock_response.headers = {"content-type": "text/html"} @@ -87,23 +89,46 @@ async def test_fetch_html_success(self, mock_client_class): async def mock_get(*_args, **_kwargs): return mock_response - mock_client = MagicMock() - mock_client.get = mock_get - mock_client.__aenter__ = AsyncMock(return_value=mock_client) - mock_client.__aexit__ = AsyncMock(return_value=None) - mock_client_class.return_value = mock_client + mock_http_client = MagicMock() + mock_http_client.get = mock_get + mock_http_client.__aenter__ = AsyncMock(return_value=mock_http_client) + mock_http_client.__aexit__ = AsyncMock(return_value=None) + mock_http_client_class.return_value = mock_http_client - result = await web_fetch("https://example.com") + # Mock LLM extraction + mock_llm_response = MagicMock() + mock_llm_response.choices = [MagicMock()] + mock_llm_response.choices[0].message.content = "Extracted: Hello World" + mock_llm_response.usage = MagicMock() + mock_llm_response.usage.prompt_tokens = 100 + mock_llm_response.usage.completion_tokens = 20 + + mock_openai_client = MagicMock() + mock_openai_client.chat.completions.create = AsyncMock(return_value=mock_llm_response) + + mock_configs = MagicMock() + mock_configs.default_worker_model = "gemini-2.5-flash" + + mock_client_manager = MagicMock() + mock_client_manager.openai_client = mock_openai_client + mock_client_manager.configs = mock_configs + mock_client_manager_class.get_instance.return_value = mock_client_manager + + result = await web_fetch("https://example.com", query="get the main message") assert result["status"] == "success" - assert "content" in result - assert "Hello World" in result["content"] + assert "extracted_info" in result + assert "Hello World" in result["extracted_info"] assert result["content_type"] == "text/html" + assert result["query"] == "get the main message" + assert result["input_tokens"] == 100 + assert result["output_tokens"] == 20 @pytest.mark.asyncio + @patch("aieng.agent_evals.tools.web.AsyncClientManager") @patch("aieng.agent_evals.tools.web.httpx.AsyncClient") - async def test_fetch_pdf_success(self, mock_client_class): - """Test that PDF content is extracted successfully.""" + async def test_fetch_pdf_success(self, mock_http_client_class, mock_client_manager_class): + """Test that PDF content is extracted and processed with LLM.""" # Create a PDF with text writer = PdfWriter() writer.add_blank_page(width=200, height=200) @@ -119,23 +144,45 @@ async def test_fetch_pdf_success(self, mock_client_class): async def mock_get(*_args, **_kwargs): return mock_response - mock_client = MagicMock() - mock_client.get = mock_get - mock_client.__aenter__ = AsyncMock(return_value=mock_client) - mock_client.__aexit__ = AsyncMock(return_value=None) - mock_client_class.return_value = mock_client + mock_http_client = MagicMock() + mock_http_client.get = mock_get + mock_http_client.__aenter__ = AsyncMock(return_value=mock_http_client) + mock_http_client.__aexit__ = AsyncMock(return_value=None) + mock_http_client_class.return_value = mock_http_client + + # Mock LLM extraction + mock_llm_response = MagicMock() + mock_llm_response.choices = [MagicMock()] + mock_llm_response.choices[0].message.content = "PDF summary information" + mock_llm_response.usage = MagicMock() + mock_llm_response.usage.prompt_tokens = 150 + mock_llm_response.usage.completion_tokens = 30 + + mock_openai_client = MagicMock() + mock_openai_client.chat.completions.create = AsyncMock(return_value=mock_llm_response) - result = await web_fetch("https://example.com/doc.pdf") + 
mock_configs = MagicMock() + mock_configs.default_worker_model = "gemini-2.5-flash" + + mock_client_manager = MagicMock() + mock_client_manager.openai_client = mock_openai_client + mock_client_manager.configs = mock_configs + mock_client_manager_class.get_instance.return_value = mock_client_manager + + result = await web_fetch("https://example.com/doc.pdf", query="summarize the document") assert result["status"] == "success" assert result["content_type"] == "application/pdf" assert "num_pages" in result assert result["num_pages"] >= 1 + assert "extracted_info" in result + assert "PDF summary" in result["extracted_info"] @pytest.mark.asyncio + @patch("aieng.agent_evals.tools.web.AsyncClientManager") @patch("aieng.agent_evals.tools.web.httpx.AsyncClient") - async def test_fetch_returns_content_length(self, mock_client_class): - """Test that fetch returns content length.""" + async def test_fetch_returns_token_counts(self, mock_http_client_class, mock_client_manager_class): + """Test that fetch returns token usage from LLM extraction.""" long_text = "A" * 10000 mock_response = MagicMock() mock_response.text = f"
<html><body><p>{long_text}</p></body></html>
" @@ -145,25 +192,45 @@ async def test_fetch_returns_content_length(self, mock_client_class): async def mock_get(*_args, **_kwargs): return mock_response - mock_client = MagicMock() - mock_client.get = mock_get - mock_client.__aenter__ = AsyncMock(return_value=mock_client) - mock_client.__aexit__ = AsyncMock(return_value=None) - mock_client_class.return_value = mock_client + mock_http_client = MagicMock() + mock_http_client.get = mock_get + mock_http_client.__aenter__ = AsyncMock(return_value=mock_http_client) + mock_http_client.__aexit__ = AsyncMock(return_value=None) + mock_http_client_class.return_value = mock_http_client + + # Mock LLM extraction with realistic token counts for large input + mock_llm_response = MagicMock() + mock_llm_response.choices = [MagicMock()] + mock_llm_response.choices[0].message.content = "This page contains repeated A characters" + mock_llm_response.usage = MagicMock() + mock_llm_response.usage.prompt_tokens = 2500 # Large input + mock_llm_response.usage.completion_tokens = 25 + + mock_openai_client = MagicMock() + mock_openai_client.chat.completions.create = AsyncMock(return_value=mock_llm_response) + + mock_configs = MagicMock() + mock_configs.default_worker_model = "gemini-2.5-flash" - result = await web_fetch("https://example.com") + mock_client_manager = MagicMock() + mock_client_manager.openai_client = mock_openai_client + mock_client_manager.configs = mock_configs + mock_client_manager_class.get_instance.return_value = mock_client_manager + + result = await web_fetch("https://example.com", query="what is on this page") assert result["status"] == "success" - # Content length should include the 10000 As (may have some markdown formatting) - assert result["content_length"] >= 10000 - assert not result["truncated"] + assert result["input_tokens"] == 2500 + assert result["output_tokens"] == 25 + assert result["source_was_truncated"] is False @pytest.mark.asyncio + @patch("aieng.agent_evals.tools.web.AsyncClientManager") @patch("aieng.agent_evals.tools.web.httpx.AsyncClient") - async def test_fetch_truncates_large_content(self, mock_client_class): - """Test that very large content is truncated.""" - # Create content larger than MAX_CONTENT_CHARS (100KB) - large_text = "A" * 150_000 + async def test_fetch_truncates_large_content(self, mock_http_client_class, mock_client_manager_class): + """Test that very large content is truncated before LLM extraction.""" + # Create content larger than MAX_FETCH_CHARS (200KB) + large_text = "A" * 250_000 mock_response = MagicMock() mock_response.text = f"{large_text}" mock_response.headers = {"content-type": "text/html"} @@ -172,24 +239,43 @@ async def test_fetch_truncates_large_content(self, mock_client_class): async def mock_get(*_args, **_kwargs): return mock_response - mock_client = MagicMock() - mock_client.get = mock_get - mock_client.__aenter__ = AsyncMock(return_value=mock_client) - mock_client.__aexit__ = AsyncMock(return_value=None) - mock_client_class.return_value = mock_client + mock_http_client = MagicMock() + mock_http_client.get = mock_get + mock_http_client.__aenter__ = AsyncMock(return_value=mock_http_client) + mock_http_client.__aexit__ = AsyncMock(return_value=None) + mock_http_client_class.return_value = mock_http_client + + # Mock LLM extraction + mock_llm_response = MagicMock() + mock_llm_response.choices = [MagicMock()] + mock_llm_response.choices[0].message.content = "Content was truncated but extracted" + mock_llm_response.usage = MagicMock() + mock_llm_response.usage.prompt_tokens = 25000 + 
mock_llm_response.usage.completion_tokens = 50 + + mock_openai_client = MagicMock() + mock_openai_client.chat.completions.create = AsyncMock(return_value=mock_llm_response) + + mock_configs = MagicMock() + mock_configs.default_worker_model = "gemini-2.5-flash" + + mock_client_manager = MagicMock() + mock_client_manager.openai_client = mock_openai_client + mock_client_manager.configs = mock_configs + mock_client_manager_class.get_instance.return_value = mock_client_manager - result = await web_fetch("https://example.com") + result = await web_fetch("https://example.com", query="extract info") assert result["status"] == "success" - assert result["truncated"] is True - assert "[Content truncated" in result["content"] + assert result["source_was_truncated"] is True @pytest.mark.asyncio async def test_fetch_invalid_url(self): """Test that invalid URLs return error.""" - result = await web_fetch("not-a-url") + result = await web_fetch("not-a-url", query="get info") assert result["status"] == "error" assert "Invalid URL" in result["error"] + assert result["query"] == "get info" class TestCreateWebFetchTool: @@ -349,53 +435,75 @@ async def __aexit__(self, *_args): @pytest.mark.integration_test class TestWebFetchIntegration: - """Integration tests for web_fetch (requires network). + """Integration tests for web_fetch (requires network and API keys). These tests verify that web_fetch works correctly for both HTML pages - and PDF documents, returning content suitable for the agent to analyze. + and PDF documents, using LLM extraction to return only relevant information + instead of full page content. """ @pytest.mark.asyncio - async def test_fetch_html_page_returns_readable_content(self): - """Test that HTML pages are converted to readable markdown.""" - result = await web_fetch("https://www.iana.org/help/example-domains") + async def test_fetch_html_page_with_extraction(self): + """Test that HTML pages are fetched and information is extracted via LLM.""" + result = await web_fetch( + "https://www.iana.org/help/example-domains", query="What is the purpose of example.com domain?" + ) assert result["status"] == "success" assert result["content_type"] == "text/html" or "html" in result["content_type"].lower() - # Verify content is markdown (no raw HTML tags) - content = result["content"] - assert "" not in content.lower() - assert "" not in content.lower() + # Verify extracted info is returned (not raw content) + assert "extracted_info" in result + extracted = result["extracted_info"] - # Verify content has meaningful text - assert len(content) > 100 - assert "example" in content.lower() + # Extracted info should be concise (not full page) + assert len(extracted) < 5000 # Should be much smaller than full page + assert len(extracted) > 50 # But should have meaningful content - # Verify links are preserved in markdown format (if any exist) - # The page should have links that are converted to [text](url) format - if "http" in content: - # Links should be in markdown format, not raw tags - assert " 0 + assert result["output_tokens"] > 0 + + # Verify query is echoed back + assert result["query"] == "What is the purpose of example.com domain?" 
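+    # A minimal sketch of the documented error contract, assuming only the URL
+    # validation branch of web_fetch: non-http(s) schemes short-circuit before any
+    # network or LLM call, so the check below runs without external dependencies.
+    @pytest.mark.asyncio
+    async def test_fetch_rejects_non_http_scheme(self):
+        """Sketch: non-http(s) URLs return the error contract without connectivity.
+
+        The ftp:// URL is illustrative.
+        """
+        result = await web_fetch("ftp://example.com/report.pdf", query="any query")
+
+        assert result["status"] == "error"
+        assert "Invalid URL" in result["error"]
+        assert result["url"] == "ftp://example.com/report.pdf"
+        assert result["query"] == "any query"
+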
@pytest.mark.asyncio - async def test_fetch_pdf_extracts_text(self): - """Test that PDF content is extracted as searchable text.""" - result = await web_fetch("https://arxiv.org/pdf/2301.00234.pdf", max_pages=2) + async def test_fetch_pdf_with_extraction(self): + """Test that PDF content is extracted and processed via LLM.""" + result = await web_fetch( + "https://arxiv.org/pdf/2301.00234.pdf", query="What is the main research contribution?", max_pages=2 + ) assert result["status"] == "success" assert result["content_type"] == "application/pdf" assert result["num_pages"] > 0 - # Verify extracted text is substantial - content = result["content"] - assert len(content) > 500 + # Verify extracted info instead of raw text + assert "extracted_info" in result + extracted = result["extracted_info"] + + # Extracted info should be concise + assert len(extracted) < 5000 + assert len(extracted) > 50 - # Verify page markers are present - assert "--- Page" in content + # Verify PDF metadata + assert result["pages_extracted"] <= 2 + + # Verify token usage + assert result["input_tokens"] > 0 + assert result["output_tokens"] > 0 @pytest.mark.asyncio async def test_fetch_pdf_pagination(self): - """Test that PDF max_pages parameter limits extraction.""" - result = await web_fetch("https://arxiv.org/pdf/2301.00234.pdf", max_pages=1) + """Test that PDF max_pages parameter limits extraction before LLM processing.""" + result = await web_fetch("https://arxiv.org/pdf/2301.00234.pdf", query="summarize the abstract", max_pages=1) assert result["status"] == "success" assert result["pages_extracted"] == 1 assert result["num_pages"] >= 1 + + # Even with 1 page, should get meaningful extraction + assert "extracted_info" in result + assert len(result["extracted_info"]) > 20 diff --git a/implementations/knowledge_qa/evaluate.py b/implementations/knowledge_qa/evaluate.py index ff55b68..5234285 100644 --- a/implementations/knowledge_qa/evaluate.py +++ b/implementations/knowledge_qa/evaluate.py @@ -4,26 +4,36 @@ results using the DeepSearchQA LLM-as-judge methodology. Results are automatically logged to Langfuse for analysis and comparison. +Optionally, trace-level groundedness evaluation can be enabled to check if agent +outputs are supported by tool observations. 
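+The groundedness check can be switched on either with the ENABLE_TRACE_GROUNDEDNESS
+environment variable or the --enable-trace-groundedness CLI flag; when active, the
+evaluator considers up to 10 tool observations per trace (web_fetch and google_search
+results) as supporting evidence.
+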
+ Usage: # Run a full evaluation python evaluate.py # Run with custom dataset and experiment name python evaluate.py --dataset-name "MyDataset" --experiment-name "v2-test" + + # Enable trace groundedness evaluation + ENABLE_TRACE_GROUNDEDNESS=true python evaluate.py """ import asyncio import logging +import os from typing import Any import click from aieng.agent_evals.async_client_manager import AsyncClientManager -from aieng.agent_evals.evaluation import run_experiment +from aieng.agent_evals.evaluation import run_experiment, run_experiment_with_trace_evals +from aieng.agent_evals.evaluation.graders import create_trace_groundedness_evaluator +from aieng.agent_evals.evaluation.graders.config import LLMRequestConfig +from aieng.agent_evals.evaluation.types import EvaluationResult from aieng.agent_evals.knowledge_qa.agent import KnowledgeGroundedAgent -from aieng.agent_evals.knowledge_qa.judges import DeepSearchQAJudge, DeepSearchQAResult +from aieng.agent_evals.knowledge_qa.judges import DeepSearchQAResult, evaluate_deepsearchqa_async from aieng.agent_evals.logging_config import setup_logging from dotenv import load_dotenv -from langfuse.experiment import Evaluation +from langfuse.experiment import Evaluation, ExperimentResult load_dotenv(verbose=True) @@ -34,24 +44,8 @@ DEFAULT_DATASET_NAME = "DeepSearchQA-Subset" DEFAULT_EXPERIMENT_NAME = "Knowledge Agent Evaluation" -# Module-level lazy judge instance -_judge: DeepSearchQAJudge | None = None - - -def _get_judge() -> DeepSearchQAJudge: - """Get or create the shared DeepSearchQA Judge instance.""" - global _judge # noqa: PLW0603 - if _judge is None: - _judge = DeepSearchQAJudge() - return _judge - - -def _close_judge() -> None: - """Close the shared judge instance to clean up resources.""" - global _judge # noqa: PLW0603 - if _judge is not None: - _judge.close() - _judge = None +# Configuration for trace groundedness evaluation +ENABLE_TRACE_GROUNDEDNESS = os.getenv("ENABLE_TRACE_GROUNDEDNESS", "false").lower() in ("true", "1", "yes") async def agent_task(*, item: Any, **kwargs: Any) -> dict[str, Any]: # noqa: ARG001 @@ -97,6 +91,9 @@ async def deepsearchqa_evaluator( ) -> list[Evaluation]: """Evaluate the agent's response using DeepSearchQA methodology. + This evaluator uses the modern async infrastructure with shared client + management and retry logic. + Parameters ---------- input : str @@ -121,12 +118,13 @@ async def deepsearchqa_evaluator( logger.info(f"Evaluating response (answer_type: {answer_type})...") try: - judge = _get_judge() - _, result = await judge.evaluate_with_details_async( + # Use the modern async evaluator with default config + result = await evaluate_deepsearchqa_async( question=input, answer=output_text, ground_truth=expected_output, answer_type=answer_type, + model_config=LLMRequestConfig(temperature=0.0), ) evaluations = result.to_evaluations() @@ -142,7 +140,8 @@ async def run_evaluation( dataset_name: str, experiment_name: str, max_concurrency: int = 1, -) -> None: + enable_trace_groundedness: bool = False, +) -> ExperimentResult | EvaluationResult: """Run the full evaluation experiment. Parameters @@ -153,29 +152,67 @@ async def run_evaluation( Name for this experiment run. max_concurrency : int, optional Maximum concurrent agent runs, by default 1. + enable_trace_groundedness : bool, optional + Whether to enable trace-level groundedness evaluation, by default False. 
""" client_manager = AsyncClientManager.get_instance() try: logger.info(f"Starting experiment '{experiment_name}' on dataset '{dataset_name}'") logger.info(f"Max concurrency: {max_concurrency}") - - result = run_experiment( - dataset_name=dataset_name, - name=experiment_name, - description="Knowledge Agent evaluation with DeepSearchQA judge", - task=agent_task, - evaluators=[deepsearchqa_evaluator], - max_concurrency=max_concurrency, - ) + logger.info(f"Trace groundedness: {'enabled' if enable_trace_groundedness else 'disabled'}") + + result: ExperimentResult | EvaluationResult + if enable_trace_groundedness: + # Create trace groundedness evaluator + # Only consider web_fetch and google_search tools as evidence + groundedness_evaluator = create_trace_groundedness_evaluator( + name="trace_groundedness", + model_config=LLMRequestConfig(temperature=0.0), + max_tool_observations=10, # Limit context size + ) + + # Run with trace evaluations + result = run_experiment_with_trace_evals( + dataset_name=dataset_name, + name=experiment_name, + description="Knowledge Agent evaluation with DeepSearchQA judge and trace groundedness", + task=agent_task, + evaluators=[deepsearchqa_evaluator], # Item-level evaluators + trace_evaluators=[groundedness_evaluator], # Trace-level evaluators + max_concurrency=max_concurrency, + ) + else: + # Run without trace evaluations + result = run_experiment( + dataset_name=dataset_name, + name=experiment_name, + description="Knowledge Agent evaluation with DeepSearchQA judge", + task=agent_task, + evaluators=[deepsearchqa_evaluator], + max_concurrency=max_concurrency, + ) logger.info("Experiment complete!") - logger.info(result.format().replace("\\n", "\n")) + # Handle both ExperimentResult and EvaluationResult + if isinstance(result, EvaluationResult): + # EvaluationResult from run_experiment_with_trace_evals + logger.info(f"Results: {result.experiment}") + if result.trace_evaluations: + trace_evals = result.trace_evaluations + logger.info( + f"Trace evaluations: {len(trace_evals.evaluations_by_trace_id)} traces, " + f"{len(trace_evals.skipped_trace_ids)} skipped, {len(trace_evals.failed_trace_ids)} failed" + ) + else: + # ExperimentResult from run_experiment + logger.info(f"Results: {result}") + + return result finally: logger.info("Closing client manager and flushing data...") try: - _close_judge() await client_manager.close() await asyncio.sleep(0.1) logger.info("Cleanup complete") @@ -200,9 +237,22 @@ async def run_evaluation( type=int, help="Maximum concurrent agent runs (default: 1).", ) -def cli(dataset_name: str, experiment_name: str, max_concurrency: int) -> None: +@click.option( + "--enable-trace-groundedness", + is_flag=True, + default=ENABLE_TRACE_GROUNDEDNESS, + help="Enable trace-level groundedness evaluation.", +) +def cli(dataset_name: str, experiment_name: str, max_concurrency: int, enable_trace_groundedness: bool) -> None: """Run Knowledge Agent evaluation using Langfuse experiments.""" - asyncio.run(run_evaluation(dataset_name, experiment_name, max_concurrency)) + asyncio.run( + run_evaluation( + dataset_name, + experiment_name, + max_concurrency, + enable_trace_groundedness, + ) + ) if __name__ == "__main__": From 39e89c82870c8b37c33101a0a616de17e3cd5ef5 Mon Sep 17 00:00:00 2001 From: Amrit Krishnan Date: Wed, 11 Feb 2026 12:04:03 -0500 Subject: [PATCH 6/9] Fixes based on code review --- .../aieng/agent_evals/knowledge_qa/judges.py | 136 ++++--- .../agent_evals/knowledge_qa/test_judges.py | 335 ++++++++++++++++++ 2 files changed, 400 insertions(+), 71 
deletions(-) create mode 100644 aieng-eval-agents/tests/aieng/agent_evals/knowledge_qa/test_judges.py diff --git a/aieng-eval-agents/aieng/agent_evals/knowledge_qa/judges.py b/aieng-eval-agents/aieng/agent_evals/knowledge_qa/judges.py index b3763a7..ed29157 100644 --- a/aieng-eval-agents/aieng/agent_evals/knowledge_qa/judges.py +++ b/aieng-eval-agents/aieng/agent_evals/knowledge_qa/judges.py @@ -10,6 +10,7 @@ import asyncio import logging +from enum import Enum from typing import TYPE_CHECKING, Any from aieng.agent_evals.async_client_manager import AsyncClientManager @@ -27,25 +28,26 @@ logger = logging.getLogger(__name__) -class JudgeResult(BaseModel): - """Result from an LLM judge evaluation. +class EvaluationOutcome(str, Enum): + """Possible outcomes for DeepSearchQA evaluation. - Attributes - ---------- - dimension : str - The evaluation dimension (e.g., "comprehensiveness", "causal_chain"). - score : float - Score on a 1-5 scale. - explanation : str - Detailed explanation of the score. - evidence : list[str] - Specific examples supporting the score. + The four disjoint categories represent the relationship between + the submitted answer set (S) and ground truth set (G). """ - dimension: str - score: float # 1-5 scale - explanation: str = "" - evidence: list[str] = Field(default_factory=list) + FULLY_CORRECT = "fully_correct" + CORRECT_WITH_EXTRANEOUS = "correct_with_extraneous" + PARTIALLY_CORRECT = "partially_correct" + FULLY_INCORRECT = "fully_incorrect" + + +class JudgeResult(BaseModel): + """Result from an LLM judge evaluation.""" + + dimension: str = Field(description='The evaluation dimension (e.g., "comprehensiveness", "causal_chain")') + score: float = Field(description="Score on a 1-5 scale") + explanation: str = Field(default="", description="Detailed explanation of the score") + evidence: list[str] = Field(default_factory=list, description="Specific examples supporting the score") class DeepSearchQAResult(BaseModel): @@ -53,36 +55,28 @@ class DeepSearchQAResult(BaseModel): This follows the official DeepSearchQA evaluation methodology from: https://www.kaggle.com/benchmarks/google/dsqa - - Attributes - ---------- - precision : float - Fraction of predicted items that are correct (0-1). - P = |S ∩ G| / |S| - recall : float - Fraction of ground truth items that were found (0-1). - R = |S ∩ G| / |G| - f1_score : float - Harmonic mean of precision and recall (0-1). - F1 = 2 * P * R / (P + R) - outcome : str - One of: "fully_correct", "correct_with_extraneous", - "partially_correct", "fully_incorrect". - correctness_details : dict[str, bool] - For each ground truth item, whether it was found in the response. - extraneous_items : list[str] - Items in the response that are not in the ground truth. - explanation : str - Explanation from the judge about the evaluation. """ - precision: float = 0.0 - recall: float = 0.0 - f1_score: float = 0.0 - outcome: str = "fully_incorrect" - correctness_details: dict[str, bool] = Field(default_factory=dict) - extraneous_items: list[str] = Field(default_factory=list) - explanation: str = "" + precision: float = Field( + default=0.0, description="Fraction of predicted items that are correct (0-1). P = |S ∩ G| / |S|" + ) + recall: float = Field( + default=0.0, description="Fraction of ground truth items that were found (0-1). R = |S ∩ G| / |G|" + ) + f1_score: float = Field( + default=0.0, description="Harmonic mean of precision and recall (0-1). 
F1 = 2 * P * R / (P + R)" + ) + outcome: EvaluationOutcome = Field( + default=EvaluationOutcome.FULLY_INCORRECT, + description="Evaluation outcome indicating the relationship between submitted and ground truth answer sets", + ) + correctness_details: dict[str, bool] = Field( + default_factory=dict, description="For each ground truth item, whether it was found in the response" + ) + extraneous_items: list[str] = Field( + default_factory=list, description="Items in the response that are not in the ground truth" + ) + explanation: str = Field(default="", description="Explanation from the judge about the evaluation") def to_evaluations(self) -> list[Evaluation]: """Convert this result to Langfuse Evaluation objects. @@ -113,16 +107,16 @@ def to_evaluations(self) -> list[Evaluation]: comment = "\n".join(comment_parts) outcome_display = { - "fully_correct": "Fully Correct", - "correct_with_extraneous": "Correct with Extraneous", - "partially_correct": "Partially Correct", - "fully_incorrect": "Fully Incorrect", + EvaluationOutcome.FULLY_CORRECT: "Fully Correct", + EvaluationOutcome.CORRECT_WITH_EXTRANEOUS: "Correct with Extraneous", + EvaluationOutcome.PARTIALLY_CORRECT: "Partially Correct", + EvaluationOutcome.FULLY_INCORRECT: "Fully Incorrect", } return [ Evaluation( name="Outcome", - value=outcome_display.get(self.outcome, self.outcome), + value=outcome_display.get(self.outcome, self.outcome.value), comment=self.explanation, ), Evaluation(name="F1", value=self.f1_score, comment=comment), @@ -156,17 +150,17 @@ class DeepSearchQAGraderResponse(BaseModel): """Structured response from the DeepSearchQA grader. This matches the official DeepSearchQA grader output format. - - Attributes - ---------- - answer_correctness : dict[str, Any] - Dictionary containing: - - Explanation: str - Explanation of the evaluation - - Correctness Details: dict[str, bool] - Per-item correctness - - Excessive Answers: list[str] - Extra items not in ground truth """ - answer_correctness: dict[str, Any] = Field(alias="Answer Correctness") + answer_correctness: dict[str, Any] = Field( + alias="Answer Correctness", + description=( + "Dictionary containing: " + "Explanation (str) - Explanation of the evaluation; " + "Correctness Details (dict[str, bool]) - Per-item correctness; " + "Excessive Answers (list[str]) - Extra items not in ground truth" + ), + ) # Official DeepSearchQA grader prompt from Appendix A of the paper @@ -293,13 +287,13 @@ def _calculate_metrics_from_grader( # Determine outcome based on set relationships if num_matched == num_ground_truth and num_extraneous == 0: - outcome = "fully_correct" + outcome = EvaluationOutcome.FULLY_CORRECT elif num_matched == num_ground_truth and num_extraneous > 0: - outcome = "correct_with_extraneous" + outcome = EvaluationOutcome.CORRECT_WITH_EXTRANEOUS elif num_matched > 0: - outcome = "partially_correct" + outcome = EvaluationOutcome.PARTIALLY_CORRECT else: - outcome = "fully_incorrect" + outcome = EvaluationOutcome.FULLY_INCORRECT return DeepSearchQAResult( precision=precision, @@ -376,7 +370,7 @@ async def evaluate_deepsearchqa_async( precision=0.0, recall=0.0, f1_score=0.0, - outcome="fully_incorrect", + outcome=EvaluationOutcome.FULLY_INCORRECT, correctness_details={}, extraneous_items=[], explanation=f"Grader error: {e}", @@ -494,12 +488,12 @@ def evaluate( return JudgeResult( dimension=self.dimension, score=score, - explanation=f"F1: {result.f1_score:.2f}, Outcome: {result.outcome}. 
{result.explanation}", + explanation=f"F1: {result.f1_score:.2f}, Outcome: {result.outcome.value}. {result.explanation}", evidence=[ f"Precision: {result.precision:.2f}", f"Recall: {result.recall:.2f}", f"F1 Score: {result.f1_score:.2f}", - f"Outcome: {result.outcome}", + f"Outcome: {result.outcome.value}", f"Correctness: {result.correctness_details}", f"Extraneous: {result.extraneous_items}", ], @@ -526,12 +520,12 @@ async def evaluate_async( return JudgeResult( dimension=self.dimension, score=score, - explanation=f"F1: {result.f1_score:.2f}, Outcome: {result.outcome}. {result.explanation}", + explanation=f"F1: {result.f1_score:.2f}, Outcome: {result.outcome.value}. {result.explanation}", evidence=[ f"Precision: {result.precision:.2f}", f"Recall: {result.recall:.2f}", f"F1 Score: {result.f1_score:.2f}", - f"Outcome: {result.outcome}", + f"Outcome: {result.outcome.value}", f"Correctness: {result.correctness_details}", f"Extraneous: {result.extraneous_items}", ], @@ -587,12 +581,12 @@ def evaluate_with_details( judge_result = JudgeResult( dimension=self.dimension, score=score, - explanation=f"F1: {result.f1_score:.2f}, Outcome: {result.outcome}", + explanation=f"F1: {result.f1_score:.2f}, Outcome: {result.outcome.value}", evidence=[ f"Precision: {result.precision:.2f}", f"Recall: {result.recall:.2f}", f"F1 Score: {result.f1_score:.2f}", - f"Outcome: {result.outcome}", + f"Outcome: {result.outcome.value}", ], ) @@ -636,12 +630,12 @@ async def evaluate_with_details_async( judge_result = JudgeResult( dimension=self.dimension, score=score, - explanation=f"F1: {result.f1_score:.2f}, Outcome: {result.outcome}", + explanation=f"F1: {result.f1_score:.2f}, Outcome: {result.outcome.value}", evidence=[ f"Precision: {result.precision:.2f}", f"Recall: {result.recall:.2f}", f"F1 Score: {result.f1_score:.2f}", - f"Outcome: {result.outcome}", + f"Outcome: {result.outcome.value}", ], ) diff --git a/aieng-eval-agents/tests/aieng/agent_evals/knowledge_qa/test_judges.py b/aieng-eval-agents/tests/aieng/agent_evals/knowledge_qa/test_judges.py new file mode 100644 index 0000000..55539c0 --- /dev/null +++ b/aieng-eval-agents/tests/aieng/agent_evals/knowledge_qa/test_judges.py @@ -0,0 +1,335 @@ +"""Tests for the LLM-as-judge evaluators.""" + +from unittest.mock import MagicMock, patch + +import pytest +from aieng.agent_evals.knowledge_qa.judges import ( + DeepSearchQAJudge, + DeepSearchQAResult, + EvaluationOutcome, + JudgeResult, + _calculate_metrics_from_grader, +) +from pydantic import SecretStr + + +class TestJudgeResult: + """Tests for the JudgeResult model.""" + + def test_judge_result_creation(self): + """Test creating a judge result.""" + result = JudgeResult( + dimension="comprehensiveness", + score=4.5, + explanation="Good coverage of all aspects", + evidence=["Covered point A", "Covered point B"], + ) + assert result.dimension == "comprehensiveness" + assert result.score == 4.5 + assert result.explanation == "Good coverage of all aspects" + assert len(result.evidence) == 2 + + def test_judge_result_defaults(self): + """Test default values for judge result.""" + result = JudgeResult(dimension="test", score=3.0) + assert result.explanation == "" + assert result.evidence == [] + + +class TestDeepSearchQAResult: + """Tests for the DeepSearchQAResult model.""" + + def test_result_creation(self): + """Test creating a DeepSearchQA result.""" + result = DeepSearchQAResult( + precision=0.8, + recall=0.9, + f1_score=0.847, + outcome=EvaluationOutcome.CORRECT_WITH_EXTRANEOUS, + correctness_details={"item1": True, 
"item2": True, "item3": False}, + extraneous_items=["extra1"], + explanation="Found 2 out of 3 items with 1 extraneous", + ) + assert result.precision == 0.8 + assert result.recall == 0.9 + assert result.f1_score == 0.847 + assert result.outcome == EvaluationOutcome.CORRECT_WITH_EXTRANEOUS + assert result.correctness_details["item1"] is True + assert len(result.extraneous_items) == 1 + + def test_result_defaults(self): + """Test default values.""" + result = DeepSearchQAResult() + assert result.precision == 0.0 + assert result.recall == 0.0 + assert result.f1_score == 0.0 + assert result.outcome == EvaluationOutcome.FULLY_INCORRECT + assert result.correctness_details == {} + assert result.extraneous_items == [] + + +class TestCalculateMetrics: + """Tests for the _calculate_metrics_from_grader function.""" + + def test_calculate_metrics_perfect_match(self): + """Test metrics calculation with perfect match (fully_correct).""" + # Simulate grader output for perfect match + grader_result = { + "Explanation": "All items found correctly", + "Correctness Details": {"A": True, "B": True, "C": True}, + "Excessive Answers": [], + } + + result = _calculate_metrics_from_grader(grader_result) + + assert result.precision == 1.0 + assert result.recall == 1.0 + assert result.f1_score == 1.0 + assert result.outcome == EvaluationOutcome.FULLY_CORRECT + + def test_calculate_metrics_with_extraneous(self): + """Test metrics calculation with extraneous items (correct_with_extraneous).""" + # Simulate grader output: all ground truth found + extra item + grader_result = { + "Explanation": "All items found but includes extra", + "Correctness Details": {"A": True, "B": True, "C": True}, + "Excessive Answers": ["D"], + } + + result = _calculate_metrics_from_grader(grader_result) + + assert result.precision == 0.75 # 3/(3+1) + assert result.recall == 1.0 # 3/3 + assert result.outcome == EvaluationOutcome.CORRECT_WITH_EXTRANEOUS + assert "D" in result.extraneous_items + + def test_calculate_metrics_with_missed(self): + """Test metrics calculation with missed items (partially_correct).""" + # Simulate grader output: only 2 of 3 ground truth found + grader_result = { + "Explanation": "Found A and B but missed C", + "Correctness Details": {"A": True, "B": True, "C": False}, + "Excessive Answers": [], + } + + result = _calculate_metrics_from_grader(grader_result) + + assert result.precision == 1.0 # 2/2 (no extraneous) + assert result.recall == pytest.approx(2 / 3) # 2/3 + assert result.outcome == EvaluationOutcome.PARTIALLY_CORRECT + assert result.correctness_details["C"] is False + + def test_calculate_metrics_fully_incorrect(self): + """Test metrics calculation with no matches (fully_incorrect).""" + # Simulate grader output: no correct items + grader_result = { + "Explanation": "No correct items found", + "Correctness Details": {"A": False, "B": False}, + "Excessive Answers": ["X", "Y"], + } + + result = _calculate_metrics_from_grader(grader_result) + + assert result.precision == 0.0 + assert result.recall == 0.0 + assert result.f1_score == 0.0 + assert result.outcome == EvaluationOutcome.FULLY_INCORRECT + + def test_calculate_metrics_empty_ground_truth(self): + """Test metrics calculation with empty ground truth.""" + # Edge case: no ground truth items + grader_result = { + "Explanation": "No ground truth to check", + "Correctness Details": {}, + "Excessive Answers": [], + } + + result = _calculate_metrics_from_grader(grader_result) + + assert result.recall == 1.0 # Edge case handling + assert result.outcome == 
EvaluationOutcome.FULLY_CORRECT + + +@pytest.fixture +def mock_configs(): + """Fixture to mock the Configs class.""" + mock_config = MagicMock() + mock_config.openai_api_key = SecretStr("test-api-key") + mock_config.default_evaluator_model = "gemini-2.5-pro" + mock_config.default_evaluator_temperature = 0.0 + return mock_config + + +@patch("aieng.agent_evals.knowledge_qa.judges.Configs") +class TestDeepSearchQAJudge: + """Tests for the DeepSearchQAJudge.""" + + @patch("aieng.agent_evals.knowledge_qa.judges.evaluate_deepsearchqa_async") + def test_evaluate_full(self, mock_evaluate_async, mock_configs_cls, mock_configs): + """Test full evaluation flow.""" + # Configure the Configs mock to return our mock_configs + mock_configs_cls.return_value = mock_configs + + # Mock the async evaluator to return a result + mock_result = DeepSearchQAResult( + precision=1.0, + recall=1.0, + f1_score=1.0, + outcome=EvaluationOutcome.FULLY_CORRECT, + correctness_details={"USA": True, "UK": True}, + extraneous_items=[], + explanation="Both USA and UK found correctly", + ) + mock_evaluate_async.return_value = mock_result + + judge = DeepSearchQAJudge() + result = judge.evaluate( + question="Name two G7 countries", + answer="USA and UK", + ground_truth="USA, UK", + answer_type="Set Answer", + ) + + assert result.dimension == "deepsearchqa" + assert result.score == 5.0 # F1=1.0 -> score=5 + assert "Precision: 1.00" in result.evidence[0] + assert "Recall: 1.00" in result.evidence[1] + assert EvaluationOutcome.FULLY_CORRECT.value in result.evidence[3] + + @patch("aieng.agent_evals.knowledge_qa.judges.evaluate_deepsearchqa_async") + def test_evaluate_partial_match(self, mock_evaluate_async, mock_configs_cls, mock_configs): + """Test evaluation with partial match.""" + # Configure the Configs mock + mock_configs_cls.return_value = mock_configs + + # Mock partial match result + mock_result = DeepSearchQAResult( + precision=1.0, + recall=2 / 3, + f1_score=0.8, + outcome=EvaluationOutcome.PARTIALLY_CORRECT, + correctness_details={"George Washington": True, "John Adams": True, "Thomas Jefferson": False}, + extraneous_items=[], + explanation="Found Washington and Adams, missed Jefferson", + ) + mock_evaluate_async.return_value = mock_result + + judge = DeepSearchQAJudge() + result = judge.evaluate( + question="Name the first three US presidents", + answer="George Washington and John Adams", + ground_truth="George Washington, John Adams, Thomas Jefferson", + answer_type="Set Answer", + ) + + assert result.dimension == "deepsearchqa" + assert EvaluationOutcome.PARTIALLY_CORRECT.value in result.evidence[3] + assert result.score < 5.0 # Not perfect + + @patch("aieng.agent_evals.knowledge_qa.judges.evaluate_deepsearchqa_async") + def test_evaluate_with_details(self, mock_evaluate_async, mock_configs_cls, mock_configs): + """Test evaluation with detailed results.""" + # Configure the Configs mock + mock_configs_cls.return_value = mock_configs + + mock_result = DeepSearchQAResult( + precision=1.0, + recall=1.0, + f1_score=1.0, + outcome=EvaluationOutcome.FULLY_CORRECT, + correctness_details={"Paris": True}, + extraneous_items=[], + explanation="Correct answer found", + ) + mock_evaluate_async.return_value = mock_result + + judge = DeepSearchQAJudge() + judge_result, detailed_result = judge.evaluate_with_details( + question="What is the capital of France?", + answer="Paris", + ground_truth="Paris", + answer_type="Single Answer", + ) + + assert isinstance(judge_result, JudgeResult) + assert isinstance(detailed_result, 
DeepSearchQAResult) + assert detailed_result.f1_score == 1.0 + assert detailed_result.outcome == EvaluationOutcome.FULLY_CORRECT + + @patch("aieng.agent_evals.knowledge_qa.judges.evaluate_deepsearchqa_async") + def test_evaluate_single_answer_type(self, mock_evaluate_async, mock_configs_cls, mock_configs): + """Test evaluation with Single Answer type.""" + # Configure the Configs mock + mock_configs_cls.return_value = mock_configs + + mock_result = DeepSearchQAResult( + precision=1.0, + recall=1.0, + f1_score=1.0, + outcome=EvaluationOutcome.FULLY_CORRECT, + correctness_details={"42": True}, + extraneous_items=[], + explanation="Answer is semantically equivalent", + ) + mock_evaluate_async.return_value = mock_result + + judge = DeepSearchQAJudge() + result = judge.evaluate( + question="What is the answer to life, the universe, and everything?", + answer="The answer is 42.", + ground_truth="42", + answer_type="Single Answer", + ) + + assert result.dimension == "deepsearchqa" + assert result.score == 5.0 # Perfect match + + @pytest.mark.asyncio + async def test_evaluate_async(self, mock_configs_cls, mock_configs): + """Test async evaluation.""" + # Configure the Configs mock + mock_configs_cls.return_value = mock_configs + + with patch("aieng.agent_evals.knowledge_qa.judges.evaluate_deepsearchqa_async") as mock_evaluate: + mock_result = DeepSearchQAResult( + precision=1.0, + recall=1.0, + f1_score=1.0, + outcome=EvaluationOutcome.FULLY_CORRECT, + correctness_details={"test": True}, + extraneous_items=[], + explanation="Test passed", + ) + mock_evaluate.return_value = mock_result + + judge = DeepSearchQAJudge() + result = await judge.evaluate_async( + question="Test question?", + answer="Test answer", + ground_truth="Test answer", + answer_type="Single Answer", + ) + + assert result.dimension == "deepsearchqa" + assert result.score == 5.0 + + +@pytest.mark.integration_test +class TestJudgesIntegration: + """Integration tests for judges. + + These tests require valid API keys (OPENAI_API_KEY or GOOGLE_API_KEY). + """ + + def test_deepsearchqa_judge_real(self): + """Test DeepSearchQA judge with real LLM.""" + judge = DeepSearchQAJudge() + result = judge.evaluate( + question="Name the first three US presidents", + answer="George Washington, John Adams, Thomas Jefferson", + ground_truth="George Washington, John Adams, Thomas Jefferson", + answer_type="Set Answer", + ) + + assert result.dimension == "deepsearchqa" + assert result.score >= 4.0 # Should be high for correct answer From 45ddeb14c4a261070aabf54db52b9384c803f713 Mon Sep 17 00:00:00 2001 From: Amrit Krishnan Date: Wed, 11 Feb 2026 15:48:26 -0500 Subject: [PATCH 7/9] Small fixes to docstring --- aieng-eval-agents/aieng/agent_evals/tools/web.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/aieng-eval-agents/aieng/agent_evals/tools/web.py b/aieng-eval-agents/aieng/agent_evals/tools/web.py index a8e3ea3..79188c3 100644 --- a/aieng-eval-agents/aieng/agent_evals/tools/web.py +++ b/aieng-eval-agents/aieng/agent_evals/tools/web.py @@ -4,7 +4,7 @@ and uses an LLM to extract relevant information based on a query. This prevents context overflow by returning only extracted information instead of full page content. -Architecture matches Claude Code's WebFetch tool: +Architecture: 1. Fetch URL and convert to markdown/text 2. Use LLM to extract information based on query 3. 
Return only extracted information (~200-1000 tokens instead of 10k-100k) @@ -245,7 +245,7 @@ async def _extract_information_with_llm_with_retry( async def _extract_information_with_llm(content: str, query: str, url: str) -> dict[str, Any]: """Extract relevant information from content using an LLM. - This is the key function that implements Claude Code's WebFetch architecture. + This is the key function that implements the WebFetch architecture. Instead of returning full page content, it uses an LLM to extract only the information requested in the query. @@ -345,15 +345,12 @@ def _handle_fetch_error(e: Exception, url: str) -> dict[str, Any]: async def web_fetch(url: str, query: str, max_pages: int = 10) -> dict[str, Any]: """Fetch and extract information from web pages and PDFs using LLM. - This tool implements Claude Code's WebFetch architecture: + This tool implements WebFetch: 1. Fetches the URL (HTML or PDF) 2. Converts to markdown/text 3. Uses an LLM to extract only the information specified in query 4. Returns extracted info (~200-1000 tokens) instead of full content - This prevents context overflow by avoiding accumulation of large web page - content in the agent's context window. - For data files like CSV or XLSX that need line-by-line searching, use fetch_file instead. From 6dfc0696b4b1c91eb89ca5b87dc2993fb8b6362d Mon Sep 17 00:00:00 2001 From: Amrit Krishnan Date: Wed, 11 Feb 2026 15:57:38 -0500 Subject: [PATCH 8/9] Small fix --- .../aieng/agent_evals/knowledge_qa/judges.py | 21 ++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/aieng-eval-agents/aieng/agent_evals/knowledge_qa/judges.py b/aieng-eval-agents/aieng/agent_evals/knowledge_qa/judges.py index ed29157..cb2d4e0 100644 --- a/aieng-eval-agents/aieng/agent_evals/knowledge_qa/judges.py +++ b/aieng-eval-agents/aieng/agent_evals/knowledge_qa/judges.py @@ -506,7 +506,26 @@ async def evaluate_async( ground_truth: str, answer_type: str = "Single Answer", ) -> JudgeResult: - """Async version of evaluate.""" + """Evaluate an answer using DeepSearchQA methodology (async). + + This is the async version of evaluate() for use in async contexts. + + Parameters + ---------- + question : str + The original question. + answer : str + The agent's answer. + ground_truth : str + The expected ground truth answer. + answer_type : str, optional + Type of answer: "Single Answer" or "Set Answer", by default "Single Answer". + + Returns + ------- + JudgeResult + The evaluation result with precision, recall, and F1 in evidence. 
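+
+        Examples
+        --------
+        A minimal usage sketch from an async context; the question and answer
+        values are illustrative and a configured evaluator model is assumed::
+
+            judge = DeepSearchQAJudge()
+            result = await judge.evaluate_async(
+                question="Name two G7 countries",
+                answer="USA and UK",
+                ground_truth="USA, UK",
+                answer_type="Set Answer",
+            )
+            # result.score maps the F1 score (0-1) onto the judge's 1-5 scale.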
+ """ result = await evaluate_deepsearchqa_async( question=question, answer=answer, From c9127818b7d779ab7b3d6102ad4140c1d5399cb6 Mon Sep 17 00:00:00 2001 From: Amrit Krishnan Date: Fri, 13 Feb 2026 16:28:18 -0500 Subject: [PATCH 9/9] Fix and simplify grader --- .../{judges.py => deepsearchqa_grader.py} | 329 +---------------- .../knowledge_qa/test_deepsearchqa_grader.py | 122 +++++++ .../agent_evals/knowledge_qa/test_judges.py | 335 ------------------ implementations/knowledge_qa/evaluate.py | 5 +- 4 files changed, 136 insertions(+), 655 deletions(-) rename aieng-eval-agents/aieng/agent_evals/knowledge_qa/{judges.py => deepsearchqa_grader.py} (53%) create mode 100644 aieng-eval-agents/tests/aieng/agent_evals/knowledge_qa/test_deepsearchqa_grader.py delete mode 100644 aieng-eval-agents/tests/aieng/agent_evals/knowledge_qa/test_judges.py diff --git a/aieng-eval-agents/aieng/agent_evals/knowledge_qa/judges.py b/aieng-eval-agents/aieng/agent_evals/knowledge_qa/deepsearchqa_grader.py similarity index 53% rename from aieng-eval-agents/aieng/agent_evals/knowledge_qa/judges.py rename to aieng-eval-agents/aieng/agent_evals/knowledge_qa/deepsearchqa_grader.py index cb2d4e0..3d15401 100644 --- a/aieng-eval-agents/aieng/agent_evals/knowledge_qa/judges.py +++ b/aieng-eval-agents/aieng/agent_evals/knowledge_qa/deepsearchqa_grader.py @@ -1,30 +1,27 @@ -"""LLM-as-judge evaluators for knowledge agent responses. +"""DeepSearchQA grader for evaluating knowledge agent responses. -This module provides comprehensive evaluation using LLM judges for the -DeepSearchQA benchmark. The implementation follows the official DeepSearchQA -evaluation methodology with precision, recall, and F1 metrics. +This module provides the official DeepSearchQA evaluation methodology using +an LLM grader (autorater) to assess answer correctness with precision, recall, +and F1 metrics. -The evaluator has been refactored to use shared evaluation infrastructure -while maintaining backward compatibility with the original API. +References +---------- +- Paper: DeepSearchQA: Bridging the Comprehensiveness Gap for Deep Research Agents +- Dataset: https://huggingface.co/datasets/google/deepsearchqa +- Leaderboard: https://www.kaggle.com/benchmarks/google/dsqa """ -import asyncio import logging from enum import Enum -from typing import TYPE_CHECKING, Any +from typing import Any from aieng.agent_evals.async_client_manager import AsyncClientManager -from aieng.agent_evals.configs import Configs from aieng.agent_evals.evaluation.graders._utils import run_structured_parse_call from aieng.agent_evals.evaluation.graders.config import LLMRequestConfig from aieng.agent_evals.evaluation.types import Evaluation from pydantic import BaseModel, Field -if TYPE_CHECKING: - pass - - logger = logging.getLogger(__name__) @@ -41,15 +38,6 @@ class EvaluationOutcome(str, Enum): FULLY_INCORRECT = "fully_incorrect" -class JudgeResult(BaseModel): - """Result from an LLM judge evaluation.""" - - dimension: str = Field(description='The evaluation dimension (e.g., "comprehensiveness", "causal_chain")') - score: float = Field(description="Score on a 1-5 scale") - explanation: str = Field(default="", description="Detailed explanation of the score") - evidence: list[str] = Field(default_factory=list, description="Specific examples supporting the score") - - class DeepSearchQAResult(BaseModel): """Result from DeepSearchQA evaluation with IR metrics. 
@@ -76,7 +64,7 @@ class DeepSearchQAResult(BaseModel): extraneous_items: list[str] = Field( default_factory=list, description="Items in the response that are not in the ground truth" ) - explanation: str = Field(default="", description="Explanation from the judge about the evaluation") + explanation: str = Field(default="", description="Explanation from the grader about the evaluation") def to_evaluations(self) -> list[Evaluation]: """Convert this result to Langfuse Evaluation objects. @@ -316,7 +304,7 @@ async def evaluate_deepsearchqa_async( ) -> DeepSearchQAResult: """Evaluate an answer using DeepSearchQA methodology. - This is the modern async evaluator that uses shared infrastructure. + This async evaluator uses shared infrastructure for LLM evaluation. Parameters ---------- @@ -375,296 +363,3 @@ async def evaluate_deepsearchqa_async( extraneous_items=[], explanation=f"Grader error: {e}", ) - - -class DeepSearchQAJudge: - """Official DeepSearchQA evaluation using precision, recall, and F1. - - This judge implements the exact evaluation methodology from the DeepSearchQA - benchmark paper (Appendix A). The LLM autorater determines semantic equivalence - of answers, then precision/recall/F1 are calculated programmatically. - - The benchmark distinguishes between four disjoint categories: - 1. Fully Correct (S=G): All ground truth items present, no extraneous items - 2. Fully Incorrect (S∩G=∅): Zero correct items found - 3. Partially Correct: Some but not all ground truth items found - 4. Correct with Extraneous (G⊂S): All ground truth found but has extra items - - Metrics: - - Precision: P = |S∩G| / |S| (accuracy of submitted items) - - Recall: R = |S∩G| / |G| (exhaustiveness against ground truth) - - F1 Score: F1 = 2*P*R / (P+R) (primary ranking metric) - - Notes - ----- - This class provides backward compatibility with the original API. - Internally, it delegates to the modern async evaluator that uses - shared evaluation infrastructure. - - References - ---------- - - Paper: DeepSearchQA: Bridging the Comprehensiveness Gap for Deep Research Agents - - Dataset: https://huggingface.co/datasets/google/deepsearchqa - - Leaderboard: https://www.kaggle.com/benchmarks/google/dsqa - """ - - dimension = "deepsearchqa" - - def __init__( - self, - config: "Configs | None" = None, - model: str | None = None, - ) -> None: - """Initialize the judge. - - Parameters - ---------- - config : Configs | None, optional - Configuration settings. If None, defaults from environment are used. - model : str | None, optional - Model to use for judging. If None, default evaluator model is used. - """ - # Store config for backward compatibility - if config is None: - config = Configs() # type: ignore[call-arg] - self._config = config - - # Build model config - self._model_config = LLMRequestConfig( - model=model if model is not None else config.default_evaluator_model, - temperature=config.default_evaluator_temperature, - ) - - def evaluate( - self, - question: str, - answer: str, - ground_truth: str, - answer_type: str = "Single Answer", - ) -> JudgeResult: - """Evaluate an answer using DeepSearchQA methodology. - - This is a synchronous wrapper around the async evaluator for backward - compatibility. - - Parameters - ---------- - question : str - The original question. - answer : str - The agent's answer. - ground_truth : str - The expected ground truth answer. - answer_type : str - Type of answer: "Single Answer" or "Set Answer". 
- - Returns - ------- - JudgeResult - The evaluation result with precision, recall, and F1 in evidence. - """ - # Run async evaluator in event loop - try: - asyncio.get_running_loop() - # If we're already in an async context, we can't use run_until_complete - raise RuntimeError("Cannot call synchronous evaluate from async context. Use evaluate_async instead.") - except RuntimeError: - # No running loop, safe to create one - pass - - result = asyncio.run( - evaluate_deepsearchqa_async( - question=question, - answer=answer, - ground_truth=ground_truth, - answer_type=answer_type, - model_config=self._model_config, - ) - ) - - # Convert F1 to 1-5 scale for consistency with other judges - score = 1 + (result.f1_score * 4) # Maps 0-1 to 1-5 - - return JudgeResult( - dimension=self.dimension, - score=score, - explanation=f"F1: {result.f1_score:.2f}, Outcome: {result.outcome.value}. {result.explanation}", - evidence=[ - f"Precision: {result.precision:.2f}", - f"Recall: {result.recall:.2f}", - f"F1 Score: {result.f1_score:.2f}", - f"Outcome: {result.outcome.value}", - f"Correctness: {result.correctness_details}", - f"Extraneous: {result.extraneous_items}", - ], - ) - - async def evaluate_async( - self, - question: str, - answer: str, - ground_truth: str, - answer_type: str = "Single Answer", - ) -> JudgeResult: - """Evaluate an answer using DeepSearchQA methodology (async). - - This is the async version of evaluate() for use in async contexts. - - Parameters - ---------- - question : str - The original question. - answer : str - The agent's answer. - ground_truth : str - The expected ground truth answer. - answer_type : str, optional - Type of answer: "Single Answer" or "Set Answer", by default "Single Answer". - - Returns - ------- - JudgeResult - The evaluation result with precision, recall, and F1 in evidence. - """ - result = await evaluate_deepsearchqa_async( - question=question, - answer=answer, - ground_truth=ground_truth, - answer_type=answer_type, - model_config=self._model_config, - ) - - score = 1 + (result.f1_score * 4) - - return JudgeResult( - dimension=self.dimension, - score=score, - explanation=f"F1: {result.f1_score:.2f}, Outcome: {result.outcome.value}. {result.explanation}", - evidence=[ - f"Precision: {result.precision:.2f}", - f"Recall: {result.recall:.2f}", - f"F1 Score: {result.f1_score:.2f}", - f"Outcome: {result.outcome.value}", - f"Correctness: {result.correctness_details}", - f"Extraneous: {result.extraneous_items}", - ], - ) - - def evaluate_with_details( - self, - question: str, - answer: str, - ground_truth: str, - answer_type: str = "Single Answer", - ) -> tuple[JudgeResult, DeepSearchQAResult]: - """Evaluate and return both JudgeResult and detailed DeepSearchQAResult. - - Parameters - ---------- - question : str - The original question. - answer : str - The agent's answer. - ground_truth : str - The expected ground truth answer. - answer_type : str - Type of answer. - - Returns - ------- - tuple[JudgeResult, DeepSearchQAResult] - Both the standard judge result and detailed metrics. - """ - try: - asyncio.get_running_loop() - # If we're already in an async context, we can't use run_until_complete - raise RuntimeError( - "Cannot call synchronous evaluate_with_details from async context. Use evaluate_with_details_async instead." 
- ) - except RuntimeError: - # No running loop, safe to create one - pass - - result = asyncio.run( - evaluate_deepsearchqa_async( - question=question, - answer=answer, - ground_truth=ground_truth, - answer_type=answer_type, - model_config=self._model_config, - ) - ) - - score = 1 + (result.f1_score * 4) - - judge_result = JudgeResult( - dimension=self.dimension, - score=score, - explanation=f"F1: {result.f1_score:.2f}, Outcome: {result.outcome.value}", - evidence=[ - f"Precision: {result.precision:.2f}", - f"Recall: {result.recall:.2f}", - f"F1 Score: {result.f1_score:.2f}", - f"Outcome: {result.outcome.value}", - ], - ) - - return judge_result, result - - async def evaluate_with_details_async( - self, - question: str, - answer: str, - ground_truth: str, - answer_type: str = "Single Answer", - ) -> tuple[JudgeResult, DeepSearchQAResult]: - """Async version of evaluate_with_details. - - Parameters - ---------- - question : str - The original question. - answer : str - The agent's answer. - ground_truth : str - The expected ground truth answer. - answer_type : str - Type of answer. - - Returns - ------- - tuple[JudgeResult, DeepSearchQAResult] - Both the standard judge result and detailed metrics. - """ - result = await evaluate_deepsearchqa_async( - question=question, - answer=answer, - ground_truth=ground_truth, - answer_type=answer_type, - model_config=self._model_config, - ) - - score = 1 + (result.f1_score * 4) - - judge_result = JudgeResult( - dimension=self.dimension, - score=score, - explanation=f"F1: {result.f1_score:.2f}, Outcome: {result.outcome.value}", - evidence=[ - f"Precision: {result.precision:.2f}", - f"Recall: {result.recall:.2f}", - f"F1 Score: {result.f1_score:.2f}", - f"Outcome: {result.outcome.value}", - ], - ) - - return judge_result, result - - def close(self) -> None: - """Close the judge's client to clean up resources. - - Note: With the new shared client manager, cleanup is handled - centrally. This method is kept for backward compatibility. 
- """ - # No-op: AsyncClientManager handles cleanup - pass diff --git a/aieng-eval-agents/tests/aieng/agent_evals/knowledge_qa/test_deepsearchqa_grader.py b/aieng-eval-agents/tests/aieng/agent_evals/knowledge_qa/test_deepsearchqa_grader.py new file mode 100644 index 0000000..c2371fe --- /dev/null +++ b/aieng-eval-agents/tests/aieng/agent_evals/knowledge_qa/test_deepsearchqa_grader.py @@ -0,0 +1,122 @@ +"""Tests for the DeepSearchQA grader.""" + +import pytest +from aieng.agent_evals.knowledge_qa.deepsearchqa_grader import ( + DeepSearchQAResult, + EvaluationOutcome, + _calculate_metrics_from_grader, +) + + +class TestDeepSearchQAResult: + """Tests for the DeepSearchQAResult model.""" + + def test_result_creation(self): + """Test creating a DeepSearchQA result.""" + result = DeepSearchQAResult( + precision=0.8, + recall=0.9, + f1_score=0.847, + outcome=EvaluationOutcome.CORRECT_WITH_EXTRANEOUS, + correctness_details={"item1": True, "item2": True, "item3": False}, + extraneous_items=["extra1"], + explanation="Found 2 out of 3 items with 1 extraneous", + ) + assert result.precision == 0.8 + assert result.recall == 0.9 + assert result.f1_score == 0.847 + assert result.outcome == EvaluationOutcome.CORRECT_WITH_EXTRANEOUS + assert result.correctness_details["item1"] is True + assert len(result.extraneous_items) == 1 + + def test_result_defaults(self): + """Test default values.""" + result = DeepSearchQAResult() + assert result.precision == 0.0 + assert result.recall == 0.0 + assert result.f1_score == 0.0 + assert result.outcome == EvaluationOutcome.FULLY_INCORRECT + assert result.correctness_details == {} + assert result.extraneous_items == [] + + +class TestCalculateMetrics: + """Tests for the _calculate_metrics_from_grader function.""" + + def test_calculate_metrics_perfect_match(self): + """Test metrics calculation with perfect match (fully_correct).""" + # Simulate grader output for perfect match + grader_result = { + "Explanation": "All items found correctly", + "Correctness Details": {"A": True, "B": True, "C": True}, + "Excessive Answers": [], + } + + result = _calculate_metrics_from_grader(grader_result) + + assert result.precision == 1.0 + assert result.recall == 1.0 + assert result.f1_score == 1.0 + assert result.outcome == EvaluationOutcome.FULLY_CORRECT + + def test_calculate_metrics_with_extraneous(self): + """Test metrics calculation with extraneous items (correct_with_extraneous).""" + # Simulate grader output: all ground truth found + extra item + grader_result = { + "Explanation": "All items found but includes extra", + "Correctness Details": {"A": True, "B": True, "C": True}, + "Excessive Answers": ["D"], + } + + result = _calculate_metrics_from_grader(grader_result) + + assert result.precision == 0.75 # 3/(3+1) + assert result.recall == 1.0 # 3/3 + assert result.outcome == EvaluationOutcome.CORRECT_WITH_EXTRANEOUS + assert "D" in result.extraneous_items + + def test_calculate_metrics_with_missed(self): + """Test metrics calculation with missed items (partially_correct).""" + # Simulate grader output: only 2 of 3 ground truth found + grader_result = { + "Explanation": "Found A and B but missed C", + "Correctness Details": {"A": True, "B": True, "C": False}, + "Excessive Answers": [], + } + + result = _calculate_metrics_from_grader(grader_result) + + assert result.precision == 1.0 # 2/2 (no extraneous) + assert result.recall == pytest.approx(2 / 3) # 2/3 + assert result.outcome == EvaluationOutcome.PARTIALLY_CORRECT + assert result.correctness_details["C"] is False + + def 
test_calculate_metrics_fully_incorrect(self): + """Test metrics calculation with no matches (fully_incorrect).""" + # Simulate grader output: no correct items + grader_result = { + "Explanation": "No correct items found", + "Correctness Details": {"A": False, "B": False}, + "Excessive Answers": ["X", "Y"], + } + + result = _calculate_metrics_from_grader(grader_result) + + assert result.precision == 0.0 + assert result.recall == 0.0 + assert result.f1_score == 0.0 + assert result.outcome == EvaluationOutcome.FULLY_INCORRECT + + def test_calculate_metrics_empty_ground_truth(self): + """Test metrics calculation with empty ground truth.""" + # Edge case: no ground truth items + grader_result = { + "Explanation": "No ground truth to check", + "Correctness Details": {}, + "Excessive Answers": [], + } + + result = _calculate_metrics_from_grader(grader_result) + + assert result.recall == 1.0 # Edge case handling + assert result.outcome == EvaluationOutcome.FULLY_CORRECT diff --git a/aieng-eval-agents/tests/aieng/agent_evals/knowledge_qa/test_judges.py b/aieng-eval-agents/tests/aieng/agent_evals/knowledge_qa/test_judges.py deleted file mode 100644 index 55539c0..0000000 --- a/aieng-eval-agents/tests/aieng/agent_evals/knowledge_qa/test_judges.py +++ /dev/null @@ -1,335 +0,0 @@ -"""Tests for the LLM-as-judge evaluators.""" - -from unittest.mock import MagicMock, patch - -import pytest -from aieng.agent_evals.knowledge_qa.judges import ( - DeepSearchQAJudge, - DeepSearchQAResult, - EvaluationOutcome, - JudgeResult, - _calculate_metrics_from_grader, -) -from pydantic import SecretStr - - -class TestJudgeResult: - """Tests for the JudgeResult model.""" - - def test_judge_result_creation(self): - """Test creating a judge result.""" - result = JudgeResult( - dimension="comprehensiveness", - score=4.5, - explanation="Good coverage of all aspects", - evidence=["Covered point A", "Covered point B"], - ) - assert result.dimension == "comprehensiveness" - assert result.score == 4.5 - assert result.explanation == "Good coverage of all aspects" - assert len(result.evidence) == 2 - - def test_judge_result_defaults(self): - """Test default values for judge result.""" - result = JudgeResult(dimension="test", score=3.0) - assert result.explanation == "" - assert result.evidence == [] - - -class TestDeepSearchQAResult: - """Tests for the DeepSearchQAResult model.""" - - def test_result_creation(self): - """Test creating a DeepSearchQA result.""" - result = DeepSearchQAResult( - precision=0.8, - recall=0.9, - f1_score=0.847, - outcome=EvaluationOutcome.CORRECT_WITH_EXTRANEOUS, - correctness_details={"item1": True, "item2": True, "item3": False}, - extraneous_items=["extra1"], - explanation="Found 2 out of 3 items with 1 extraneous", - ) - assert result.precision == 0.8 - assert result.recall == 0.9 - assert result.f1_score == 0.847 - assert result.outcome == EvaluationOutcome.CORRECT_WITH_EXTRANEOUS - assert result.correctness_details["item1"] is True - assert len(result.extraneous_items) == 1 - - def test_result_defaults(self): - """Test default values.""" - result = DeepSearchQAResult() - assert result.precision == 0.0 - assert result.recall == 0.0 - assert result.f1_score == 0.0 - assert result.outcome == EvaluationOutcome.FULLY_INCORRECT - assert result.correctness_details == {} - assert result.extraneous_items == [] - - -class TestCalculateMetrics: - """Tests for the _calculate_metrics_from_grader function.""" - - def test_calculate_metrics_perfect_match(self): - """Test metrics calculation with perfect match 
(fully_correct).""" - # Simulate grader output for perfect match - grader_result = { - "Explanation": "All items found correctly", - "Correctness Details": {"A": True, "B": True, "C": True}, - "Excessive Answers": [], - } - - result = _calculate_metrics_from_grader(grader_result) - - assert result.precision == 1.0 - assert result.recall == 1.0 - assert result.f1_score == 1.0 - assert result.outcome == EvaluationOutcome.FULLY_CORRECT - - def test_calculate_metrics_with_extraneous(self): - """Test metrics calculation with extraneous items (correct_with_extraneous).""" - # Simulate grader output: all ground truth found + extra item - grader_result = { - "Explanation": "All items found but includes extra", - "Correctness Details": {"A": True, "B": True, "C": True}, - "Excessive Answers": ["D"], - } - - result = _calculate_metrics_from_grader(grader_result) - - assert result.precision == 0.75 # 3/(3+1) - assert result.recall == 1.0 # 3/3 - assert result.outcome == EvaluationOutcome.CORRECT_WITH_EXTRANEOUS - assert "D" in result.extraneous_items - - def test_calculate_metrics_with_missed(self): - """Test metrics calculation with missed items (partially_correct).""" - # Simulate grader output: only 2 of 3 ground truth found - grader_result = { - "Explanation": "Found A and B but missed C", - "Correctness Details": {"A": True, "B": True, "C": False}, - "Excessive Answers": [], - } - - result = _calculate_metrics_from_grader(grader_result) - - assert result.precision == 1.0 # 2/2 (no extraneous) - assert result.recall == pytest.approx(2 / 3) # 2/3 - assert result.outcome == EvaluationOutcome.PARTIALLY_CORRECT - assert result.correctness_details["C"] is False - - def test_calculate_metrics_fully_incorrect(self): - """Test metrics calculation with no matches (fully_incorrect).""" - # Simulate grader output: no correct items - grader_result = { - "Explanation": "No correct items found", - "Correctness Details": {"A": False, "B": False}, - "Excessive Answers": ["X", "Y"], - } - - result = _calculate_metrics_from_grader(grader_result) - - assert result.precision == 0.0 - assert result.recall == 0.0 - assert result.f1_score == 0.0 - assert result.outcome == EvaluationOutcome.FULLY_INCORRECT - - def test_calculate_metrics_empty_ground_truth(self): - """Test metrics calculation with empty ground truth.""" - # Edge case: no ground truth items - grader_result = { - "Explanation": "No ground truth to check", - "Correctness Details": {}, - "Excessive Answers": [], - } - - result = _calculate_metrics_from_grader(grader_result) - - assert result.recall == 1.0 # Edge case handling - assert result.outcome == EvaluationOutcome.FULLY_CORRECT - - -@pytest.fixture -def mock_configs(): - """Fixture to mock the Configs class.""" - mock_config = MagicMock() - mock_config.openai_api_key = SecretStr("test-api-key") - mock_config.default_evaluator_model = "gemini-2.5-pro" - mock_config.default_evaluator_temperature = 0.0 - return mock_config - - -@patch("aieng.agent_evals.knowledge_qa.judges.Configs") -class TestDeepSearchQAJudge: - """Tests for the DeepSearchQAJudge.""" - - @patch("aieng.agent_evals.knowledge_qa.judges.evaluate_deepsearchqa_async") - def test_evaluate_full(self, mock_evaluate_async, mock_configs_cls, mock_configs): - """Test full evaluation flow.""" - # Configure the Configs mock to return our mock_configs - mock_configs_cls.return_value = mock_configs - - # Mock the async evaluator to return a result - mock_result = DeepSearchQAResult( - precision=1.0, - recall=1.0, - f1_score=1.0, - 
outcome=EvaluationOutcome.FULLY_CORRECT, - correctness_details={"USA": True, "UK": True}, - extraneous_items=[], - explanation="Both USA and UK found correctly", - ) - mock_evaluate_async.return_value = mock_result - - judge = DeepSearchQAJudge() - result = judge.evaluate( - question="Name two G7 countries", - answer="USA and UK", - ground_truth="USA, UK", - answer_type="Set Answer", - ) - - assert result.dimension == "deepsearchqa" - assert result.score == 5.0 # F1=1.0 -> score=5 - assert "Precision: 1.00" in result.evidence[0] - assert "Recall: 1.00" in result.evidence[1] - assert EvaluationOutcome.FULLY_CORRECT.value in result.evidence[3] - - @patch("aieng.agent_evals.knowledge_qa.judges.evaluate_deepsearchqa_async") - def test_evaluate_partial_match(self, mock_evaluate_async, mock_configs_cls, mock_configs): - """Test evaluation with partial match.""" - # Configure the Configs mock - mock_configs_cls.return_value = mock_configs - - # Mock partial match result - mock_result = DeepSearchQAResult( - precision=1.0, - recall=2 / 3, - f1_score=0.8, - outcome=EvaluationOutcome.PARTIALLY_CORRECT, - correctness_details={"George Washington": True, "John Adams": True, "Thomas Jefferson": False}, - extraneous_items=[], - explanation="Found Washington and Adams, missed Jefferson", - ) - mock_evaluate_async.return_value = mock_result - - judge = DeepSearchQAJudge() - result = judge.evaluate( - question="Name the first three US presidents", - answer="George Washington and John Adams", - ground_truth="George Washington, John Adams, Thomas Jefferson", - answer_type="Set Answer", - ) - - assert result.dimension == "deepsearchqa" - assert EvaluationOutcome.PARTIALLY_CORRECT.value in result.evidence[3] - assert result.score < 5.0 # Not perfect - - @patch("aieng.agent_evals.knowledge_qa.judges.evaluate_deepsearchqa_async") - def test_evaluate_with_details(self, mock_evaluate_async, mock_configs_cls, mock_configs): - """Test evaluation with detailed results.""" - # Configure the Configs mock - mock_configs_cls.return_value = mock_configs - - mock_result = DeepSearchQAResult( - precision=1.0, - recall=1.0, - f1_score=1.0, - outcome=EvaluationOutcome.FULLY_CORRECT, - correctness_details={"Paris": True}, - extraneous_items=[], - explanation="Correct answer found", - ) - mock_evaluate_async.return_value = mock_result - - judge = DeepSearchQAJudge() - judge_result, detailed_result = judge.evaluate_with_details( - question="What is the capital of France?", - answer="Paris", - ground_truth="Paris", - answer_type="Single Answer", - ) - - assert isinstance(judge_result, JudgeResult) - assert isinstance(detailed_result, DeepSearchQAResult) - assert detailed_result.f1_score == 1.0 - assert detailed_result.outcome == EvaluationOutcome.FULLY_CORRECT - - @patch("aieng.agent_evals.knowledge_qa.judges.evaluate_deepsearchqa_async") - def test_evaluate_single_answer_type(self, mock_evaluate_async, mock_configs_cls, mock_configs): - """Test evaluation with Single Answer type.""" - # Configure the Configs mock - mock_configs_cls.return_value = mock_configs - - mock_result = DeepSearchQAResult( - precision=1.0, - recall=1.0, - f1_score=1.0, - outcome=EvaluationOutcome.FULLY_CORRECT, - correctness_details={"42": True}, - extraneous_items=[], - explanation="Answer is semantically equivalent", - ) - mock_evaluate_async.return_value = mock_result - - judge = DeepSearchQAJudge() - result = judge.evaluate( - question="What is the answer to life, the universe, and everything?", - answer="The answer is 42.", - ground_truth="42", - 
answer_type="Single Answer", - ) - - assert result.dimension == "deepsearchqa" - assert result.score == 5.0 # Perfect match - - @pytest.mark.asyncio - async def test_evaluate_async(self, mock_configs_cls, mock_configs): - """Test async evaluation.""" - # Configure the Configs mock - mock_configs_cls.return_value = mock_configs - - with patch("aieng.agent_evals.knowledge_qa.judges.evaluate_deepsearchqa_async") as mock_evaluate: - mock_result = DeepSearchQAResult( - precision=1.0, - recall=1.0, - f1_score=1.0, - outcome=EvaluationOutcome.FULLY_CORRECT, - correctness_details={"test": True}, - extraneous_items=[], - explanation="Test passed", - ) - mock_evaluate.return_value = mock_result - - judge = DeepSearchQAJudge() - result = await judge.evaluate_async( - question="Test question?", - answer="Test answer", - ground_truth="Test answer", - answer_type="Single Answer", - ) - - assert result.dimension == "deepsearchqa" - assert result.score == 5.0 - - -@pytest.mark.integration_test -class TestJudgesIntegration: - """Integration tests for judges. - - These tests require valid API keys (OPENAI_API_KEY or GOOGLE_API_KEY). - """ - - def test_deepsearchqa_judge_real(self): - """Test DeepSearchQA judge with real LLM.""" - judge = DeepSearchQAJudge() - result = judge.evaluate( - question="Name the first three US presidents", - answer="George Washington, John Adams, Thomas Jefferson", - ground_truth="George Washington, John Adams, Thomas Jefferson", - answer_type="Set Answer", - ) - - assert result.dimension == "deepsearchqa" - assert result.score >= 4.0 # Should be high for correct answer diff --git a/implementations/knowledge_qa/evaluate.py b/implementations/knowledge_qa/evaluate.py index 5234285..892fe40 100644 --- a/implementations/knowledge_qa/evaluate.py +++ b/implementations/knowledge_qa/evaluate.py @@ -26,11 +26,11 @@ import click from aieng.agent_evals.async_client_manager import AsyncClientManager from aieng.agent_evals.evaluation import run_experiment, run_experiment_with_trace_evals -from aieng.agent_evals.evaluation.graders import create_trace_groundedness_evaluator +from aieng.agent_evals.evaluation.graders import create_trace_groundedness_evaluator # type: ignore[attr-defined] from aieng.agent_evals.evaluation.graders.config import LLMRequestConfig from aieng.agent_evals.evaluation.types import EvaluationResult from aieng.agent_evals.knowledge_qa.agent import KnowledgeGroundedAgent -from aieng.agent_evals.knowledge_qa.judges import DeepSearchQAResult, evaluate_deepsearchqa_async +from aieng.agent_evals.knowledge_qa.deepsearchqa_grader import DeepSearchQAResult, evaluate_deepsearchqa_async from aieng.agent_evals.logging_config import setup_logging from dotenv import load_dotenv from langfuse.experiment import Evaluation, ExperimentResult @@ -169,7 +169,6 @@ async def run_evaluation( groundedness_evaluator = create_trace_groundedness_evaluator( name="trace_groundedness", model_config=LLMRequestConfig(temperature=0.0), - max_tool_observations=10, # Limit context size ) # Run with trace evaluations