diff --git a/aieng-eval-agents/aieng/agent_evals/configs.py b/aieng-eval-agents/aieng/agent_evals/configs.py index e6cdc5c..564ec37 100644 --- a/aieng-eval-agents/aieng/agent_evals/configs.py +++ b/aieng-eval-agents/aieng/agent_evals/configs.py @@ -113,6 +113,18 @@ class Configs(BaseSettings): default="gemini-2.5-pro", description="Model name for LLM-as-judge evaluation tasks.", ) + default_temperature: float = Field( + default=1.0, + ge=0.0, + le=2.0, + description="Default temperature for LLM generation. Lower values (0.0-0.3) produce more consistent outputs.", + ) + default_evaluator_temperature: float = Field( + default=0.0, + ge=0.0, + le=2.0, + description="Temperature for LLM-as-judge evaluations. Default 0.0 for deterministic judging.", + ) # === Tracing (Langfuse) === langfuse_public_key: str | None = Field( diff --git a/aieng-eval-agents/aieng/agent_evals/knowledge_qa/deepsearchqa_grader.py b/aieng-eval-agents/aieng/agent_evals/knowledge_qa/deepsearchqa_grader.py new file mode 100644 index 0000000..3d15401 --- /dev/null +++ b/aieng-eval-agents/aieng/agent_evals/knowledge_qa/deepsearchqa_grader.py @@ -0,0 +1,365 @@ +"""DeepSearchQA grader for evaluating knowledge agent responses. + +This module provides the official DeepSearchQA evaluation methodology using +an LLM grader (autorater) to assess answer correctness with precision, recall, +and F1 metrics. + +References +---------- +- Paper: DeepSearchQA: Bridging the Comprehensiveness Gap for Deep Research Agents +- Dataset: https://huggingface.co/datasets/google/deepsearchqa +- Leaderboard: https://www.kaggle.com/benchmarks/google/dsqa +""" + +import logging +from enum import Enum +from typing import Any + +from aieng.agent_evals.async_client_manager import AsyncClientManager +from aieng.agent_evals.evaluation.graders._utils import run_structured_parse_call +from aieng.agent_evals.evaluation.graders.config import LLMRequestConfig +from aieng.agent_evals.evaluation.types import Evaluation +from pydantic import BaseModel, Field + + +logger = logging.getLogger(__name__) + + +class EvaluationOutcome(str, Enum): + """Possible outcomes for DeepSearchQA evaluation. + + The four disjoint categories represent the relationship between + the submitted answer set (S) and ground truth set (G). + """ + + FULLY_CORRECT = "fully_correct" + CORRECT_WITH_EXTRANEOUS = "correct_with_extraneous" + PARTIALLY_CORRECT = "partially_correct" + FULLY_INCORRECT = "fully_incorrect" + + +class DeepSearchQAResult(BaseModel): + """Result from DeepSearchQA evaluation with IR metrics. + + This follows the official DeepSearchQA evaluation methodology from: + https://www.kaggle.com/benchmarks/google/dsqa + """ + + precision: float = Field( + default=0.0, description="Fraction of predicted items that are correct (0-1). P = |S ∩ G| / |S|" + ) + recall: float = Field( + default=0.0, description="Fraction of ground truth items that were found (0-1). R = |S ∩ G| / |G|" + ) + f1_score: float = Field( + default=0.0, description="Harmonic mean of precision and recall (0-1). 
F1 = 2 * P * R / (P + R)" + ) + outcome: EvaluationOutcome = Field( + default=EvaluationOutcome.FULLY_INCORRECT, + description="Evaluation outcome indicating the relationship between submitted and ground truth answer sets", + ) + correctness_details: dict[str, bool] = Field( + default_factory=dict, description="For each ground truth item, whether it was found in the response" + ) + extraneous_items: list[str] = Field( + default_factory=list, description="Items in the response that are not in the ground truth" + ) + explanation: str = Field(default="", description="Explanation from the grader about the evaluation") + + def to_evaluations(self) -> list[Evaluation]: + """Convert this result to Langfuse Evaluation objects. + + Returns + ------- + list[Evaluation] + Four evaluations: Outcome (categorical), F1, Precision, Recall (numeric). + """ + comment_parts = [ + f"Outcome: {self.outcome}", + f"Precision: {self.precision:.2f}", + f"Recall: {self.recall:.2f}", + f"F1: {self.f1_score:.2f}", + ] + + if self.explanation: + comment_parts.append(f"\nExplanation: {self.explanation}") + + if self.correctness_details: + found = sum(1 for v in self.correctness_details.values() if v) + total = len(self.correctness_details) + comment_parts.append(f"\nCorrectness: {found}/{total} items found") + + if self.extraneous_items: + comment_parts.append(f"\nExtraneous: {len(self.extraneous_items)} items") + + comment = "\n".join(comment_parts) + + outcome_display = { + EvaluationOutcome.FULLY_CORRECT: "Fully Correct", + EvaluationOutcome.CORRECT_WITH_EXTRANEOUS: "Correct with Extraneous", + EvaluationOutcome.PARTIALLY_CORRECT: "Partially Correct", + EvaluationOutcome.FULLY_INCORRECT: "Fully Incorrect", + } + + return [ + Evaluation( + name="Outcome", + value=outcome_display.get(self.outcome, self.outcome.value), + comment=self.explanation, + ), + Evaluation(name="F1", value=self.f1_score, comment=comment), + Evaluation(name="Precision", value=self.precision, comment=comment), + Evaluation(name="Recall", value=self.recall, comment=comment), + ] + + @staticmethod + def error_evaluations(error_msg: str) -> list[Evaluation]: + """Create error evaluations when evaluation fails. + + Parameters + ---------- + error_msg : str + Description of the error that occurred. + + Returns + ------- + list[Evaluation] + Three evaluations (F1, Precision, Recall) all set to 0.0. + """ + comment = f"Evaluation error: {error_msg}" + return [ + Evaluation(name="F1", value=0.0, comment=comment), + Evaluation(name="Precision", value=0.0, comment=comment), + Evaluation(name="Recall", value=0.0, comment=comment), + ] + + +class DeepSearchQAGraderResponse(BaseModel): + """Structured response from the DeepSearchQA grader. + + This matches the official DeepSearchQA grader output format. + """ + + answer_correctness: dict[str, Any] = Field( + alias="Answer Correctness", + description=( + "Dictionary containing: " + "Explanation (str) - Explanation of the evaluation; " + "Correctness Details (dict[str, bool]) - Per-item correctness; " + "Excessive Answers (list[str]) - Extra items not in ground truth" + ), + ) + + +# Official DeepSearchQA grader prompt from Appendix A of the paper +DEEPSEARCHQA_GRADER_PROMPT = """\ +Your task is to evaluate whether a given "AI Response" for a specific "User Prompt" +arrived at the correct answer. + +**Answer Correctness Task** +* **Purpose:** Assess whether the AI response provides the correct answer(s) based on +the provided "Correct Answer" and "Prompt Type". 
+* **Process:** + * Identify the "Prompt Type": "{prompt_type}". + * Refer to the "Correct Answer": "{answer}". + * Based on the "Prompt Type", determine if the "AI Response" contains the expected +answer(s). + * **'Single Answer'**: Check if the response provides the answer that addresses +the user's question. It does not have to match the exact wording of the provided +answer. + * **'Set Answer'**: Check if the response includes *each* item from the provided +ground truth answers. The order might not matter unless specified otherwise. The +response might include more answers than the list. Determine the correctness * +only* based on the list first and then check if the response includes answers not +in the list. +* **Explanation:** Provide a brief explanation justifying your assessment of answer +correctness, referencing specific parts of the AI response and the correct answer. +* **Correctness Details:** Provide a dictionary, one key for each expected answer +part, and value is a boolean indicating whether each expected answer part was found. + * For 'Set Answer', this will be a list of attributes, one for each item/part in +the "Correct Answer". Each key will be a string indicating the expected answer +part, and the value will be a boolean indicating whether that part was found in +the response. +* **Excessive Answers:** Provide a list of strings, each indicating an excessive +answer part. If the response provides answers that are **not** in the "Correct Answer +" list, add these answers as excessive answers. Return an empty list when there's no +excessive answers in the response. + +**Output Format:** +Your evaluation *must* be structured as a nested JSON dictionary with the following top- +level keys: '"Answer Correctness"'. Please return NULL if any of "Prompt", "AI Response" +or "Correct Answer" is empty. +The value for '"Answer Correctness"' should be a dictionary containing '"Explanation"' ( +a string), '"Correctness Details"' (a dictionary where each key is the expected correct +answer, and the value is a boolean indicating whether the response contains the correct +answer), and '"Excessive Answers"' (a list of strings indicating the excessive answers). + +Make sure you return a valid JSON string. Pay special attention to quotes, commas and +special characters in the JSON string. Make sure to escape all special characters and +quotes in the JSON string. + +**Example (Partial):** +```json +{{ + "Answer Correctness": {{ + "Explanation": "The response correctly identified Belgium and France but also +includes an excessive answer, Italy.", + "Correctness Details": {{ + "Belgium": true, + "France": true + }}, + "Excessive Answers": [ "Italy" ] + }} +}} +``` + +**Now, proceed with the evaluation using the provided User Prompt, AI Response, and +Correct Answer.** + +User Prompt (Wrapped in and ): + +{prompt} + +-------------------- +** Correct Answer (Wrapped in and ): +Prompt Type: {prompt_type} + +{answer} + +-------------------- +AI assistant response (Wrapped in and ): + +{response} + +-------------------- +Rating: +""" + + +def _calculate_metrics_from_grader( + grader_result: dict[str, Any], +) -> DeepSearchQAResult: + """Calculate precision, recall, F1 from grader output. + + This follows the exact methodology from the paper: + - Precision = |S∩G| / |S| + - Recall = |S∩G| / |G| + - F1 = 2*P*R / (P+R) + + Parameters + ---------- + grader_result : dict + Output from the LLM grader with Correctness Details and Excessive Answers. 
+ + Returns + ------- + DeepSearchQAResult + Computed metrics and classifications. + """ + correctness_details = grader_result.get("Correctness Details", {}) + extraneous_items = grader_result.get("Excessive Answers", []) + explanation = grader_result.get("Explanation", "") + + # Count matched ground truth items + num_ground_truth = len(correctness_details) + num_matched = sum(1 for v in correctness_details.values() if v) + num_extraneous = len(extraneous_items) + + # Total predicted items = matched + extraneous + num_predicted = num_matched + num_extraneous + + # Calculate metrics + precision = num_matched / num_predicted if num_predicted > 0 else 0.0 + recall = num_matched / num_ground_truth if num_ground_truth > 0 else 1.0 + f1_score = 2 * precision * recall / (precision + recall) if precision + recall > 0 else 0.0 + + # Determine outcome based on set relationships + if num_matched == num_ground_truth and num_extraneous == 0: + outcome = EvaluationOutcome.FULLY_CORRECT + elif num_matched == num_ground_truth and num_extraneous > 0: + outcome = EvaluationOutcome.CORRECT_WITH_EXTRANEOUS + elif num_matched > 0: + outcome = EvaluationOutcome.PARTIALLY_CORRECT + else: + outcome = EvaluationOutcome.FULLY_INCORRECT + + return DeepSearchQAResult( + precision=precision, + recall=recall, + f1_score=f1_score, + outcome=outcome, + correctness_details=correctness_details, + extraneous_items=extraneous_items, + explanation=explanation, + ) + + +async def evaluate_deepsearchqa_async( + *, + question: str, + answer: str, + ground_truth: str, + answer_type: str = "Single Answer", + model_config: LLMRequestConfig | None = None, +) -> DeepSearchQAResult: + """Evaluate an answer using DeepSearchQA methodology. + + This async evaluator uses shared infrastructure for LLM evaluation. + + Parameters + ---------- + question : str + The original question. + answer : str + The agent's answer. + ground_truth : str + The expected ground truth answer. + answer_type : str + Type of answer: "Single Answer" or "Set Answer". + model_config : LLMRequestConfig | None, optional + Optional model configuration. If None, defaults are used. + + Returns + ------- + DeepSearchQAResult + The evaluation result with precision, recall, and F1 metrics. 
+ """ + config = model_config or LLMRequestConfig() + client_manager = AsyncClientManager.get_instance() + + # Build the grader prompt + user_prompt = DEEPSEARCHQA_GRADER_PROMPT.format( + prompt=question, + response=answer, + answer=ground_truth, + prompt_type=answer_type, + ) + + try: + completion = await run_structured_parse_call( + openai_client=client_manager.openai_client, + default_model=client_manager.configs.default_evaluator_model, + model_config=config, + system_prompt="", # DeepSearchQA uses all instructions in user prompt + user_prompt=user_prompt, + response_format=DeepSearchQAGraderResponse, + ) + + grader_response: DeepSearchQAGraderResponse | None = completion.choices[0].message.parsed + + if grader_response is None: + raise ValueError("Grader returned null response") + + return _calculate_metrics_from_grader(grader_response.answer_correctness) + + except Exception as e: + logger.warning(f"Failed to evaluate with DeepSearchQA grader: {e}") + return DeepSearchQAResult( + precision=0.0, + recall=0.0, + f1_score=0.0, + outcome=EvaluationOutcome.FULLY_INCORRECT, + correctness_details={}, + extraneous_items=[], + explanation=f"Grader error: {e}", + ) diff --git a/aieng-eval-agents/aieng/agent_evals/tools/web.py b/aieng-eval-agents/aieng/agent_evals/tools/web.py index 1c99d44..79188c3 100644 --- a/aieng-eval-agents/aieng/agent_evals/tools/web.py +++ b/aieng-eval-agents/aieng/agent_evals/tools/web.py @@ -1,7 +1,13 @@ """Web fetch tool for retrieving content from URLs. Provides the web_fetch tool which fetches content from any URL (HTML pages or PDFs) -and returns the content for the agent to analyze. Similar to Anthropic's web_fetch tool. +and uses an LLM to extract relevant information based on a query. This prevents +context overflow by returning only extracted information instead of full page content. + +Architecture: +1. Fetch URL and convert to markdown/text +2. Use LLM to extract information based on query +3. Return only extracted information (~200-1000 tokens instead of 10k-100k) """ import logging @@ -12,15 +18,23 @@ from urllib.parse import urljoin import httpx +from aieng.agent_evals.async_client_manager import AsyncClientManager from google.adk.tools.function_tool import FunctionTool from html_to_markdown import convert as html_to_markdown from pypdf import PdfReader -from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential +from tenacity import RetryError, retry, retry_if_exception_type, stop_after_attempt, wait_exponential logger = logging.getLogger(__name__) -MAX_CONTENT_CHARS = 100_000 +# Maximum chars to fetch before LLM processing (prevents fetching enormous files) +MAX_FETCH_CHARS = 200_000 + +# Maximum chars to send to extraction LLM (most models handle ~100k chars well) +MAX_EXTRACTION_INPUT_CHARS = 100_000 + +# Maximum tokens for LLM extraction output +MAX_EXTRACTION_OUTPUT_TOKENS = 2000 def _make_absolute_url(base_url: str) -> Callable[[re.Match[str]], str]: @@ -130,69 +144,258 @@ def _extract_pdf_text(content: bytes, max_pages: int = 10) -> tuple[str, int]: return "\n\n".join(text_parts), num_pages -def _truncate_content(text: str) -> tuple[str, bool]: - """Truncate content if it exceeds the maximum length.""" - truncated = len(text) > MAX_CONTENT_CHARS +def _truncate_content(text: str, max_chars: int) -> tuple[str, bool]: + """Truncate content if it exceeds the maximum length. + + Parameters + ---------- + text : str + The text to truncate. + max_chars : int + Maximum number of characters allowed. 
+ + Returns + ------- + tuple[str, bool] + The (potentially truncated) text and whether it was truncated. + """ + truncated = len(text) > max_chars if truncated: - text = text[:MAX_CONTENT_CHARS] + "\n\n[Content truncated due to length]" + text = text[:max_chars] + "\n\n[Content truncated due to length]" return text, truncated -def _make_error_response(error: str, url: str) -> dict[str, Any]: - """Create an error response dict.""" - return {"status": "error", "error": error, "url": url} +@retry( + stop=stop_after_attempt(3), + wait=wait_exponential(multiplier=1, min=1, max=10), + retry=retry_if_exception_type((Exception,)), # Retry on any exception from LLM API +) +async def _extract_information_with_llm_with_retry( + openai_client: Any, model: str, content: str, query: str, url: str, was_truncated: bool +) -> dict[str, Any]: + """Execute LLM extraction with retry logic. + Parameters + ---------- + openai_client : Any + The OpenAI client instance. + model : str + Model name to use. + content : str + The content to extract from. + query : str + What information to extract. + url : str + Source URL. + was_truncated : bool + Whether the source content was truncated. -def _make_success_response(url: str, content: str, content_type: str, truncated: bool, **extra: Any) -> dict[str, Any]: - """Create a success response dict.""" - result = { + Returns + ------- + dict[str, Any] + Success response with extracted information and metadata. + """ + # Build extraction prompt + system_prompt = """You are a precise information extraction assistant. Your task is to extract specific information from web pages based on user queries. + +Instructions: +- Extract ONLY the information requested in the query +- Be accurate and concise +- If the information is not found, say so clearly +- Include relevant quotes or data points when available +- Do not add information not present in the source +- Format your response clearly with markdown if appropriate""" + + user_prompt = f"""Extract the following information from this webpage: + +QUERY: {query} + +SOURCE URL: {url} + +WEBPAGE CONTENT: +{content} + +Please extract the requested information. Be concise but complete.""" + + # Call LLM to extract information + response = await openai_client.chat.completions.create( + model=model, + messages=[ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": user_prompt}, + ], + max_tokens=MAX_EXTRACTION_OUTPUT_TOKENS, + temperature=0.0, # Deterministic for extraction + ) + + extracted_info = response.choices[0].message.content or "" + + return { "status": "success", + "extracted_info": extracted_info, "url": url, - "content": content, - "content_type": content_type, - "content_length": len(content), - "truncated": truncated, + "query": query, + "source_was_truncated": was_truncated, + "extraction_model": model, + "input_tokens": response.usage.prompt_tokens if response.usage else 0, + "output_tokens": response.usage.completion_tokens if response.usage else 0, } - result.update(extra) - return result -async def web_fetch(url: str, max_pages: int = 10) -> dict[str, Any]: - """Fetch content from a URL (HTML page or PDF document). +async def _extract_information_with_llm(content: str, query: str, url: str) -> dict[str, Any]: + """Extract relevant information from content using an LLM. + + This is the key function that implements the WebFetch architecture. + Instead of returning full page content, it uses an LLM to extract only the + information requested in the query. 
+ + Parameters + ---------- + content : str + The full page content (markdown or text). + query : str + What information to extract from the content. + url : str + Source URL (for context and citation). + + Returns + ------- + dict[str, Any] + Response with 'status', 'extracted_info', 'url', and metadata. + """ + try: + # Truncate content for LLM processing if needed + content, was_truncated = _truncate_content(content, MAX_EXTRACTION_INPUT_CHARS) + + # Get OpenAI client (configured for Gemini endpoint) + client_manager = AsyncClientManager.get_instance() + openai_client = client_manager.openai_client + model = client_manager.configs.default_worker_model # gemini-2.5-flash + + # Call with retry logic + return await _extract_information_with_llm_with_retry( + openai_client=openai_client, + model=model, + content=content, + query=query, + url=url, + was_truncated=was_truncated, + ) + + except RetryError as e: + # Extract the underlying error from retry failure + original_error = e.last_attempt.exception() + logger.error(f"LLM extraction failed for {url} after 3 retries: {original_error}") + return { + "status": "error", + "error": f"Information extraction failed after 3 retries: {original_error!s}", + "url": url, + "query": query, + } + except Exception as e: + logger.error(f"LLM extraction failed for {url}: {e}") + return { + "status": "error", + "error": f"Information extraction failed: {e!s}", + "url": url, + "query": query, + } - This tool retrieves the full content from a URL for analysis. It handles - both HTML pages (converted to readable text) and PDF documents (text extracted). - For large data files (CSV, XLSX) that need searching, use fetch_file instead. +def _handle_fetch_error(e: Exception, url: str) -> dict[str, Any]: + """Handle exceptions from web_fetch and return appropriate error response. + + Parameters + ---------- + e : Exception + The exception that occurred during fetching. + url : str + The URL that was being fetched. + + Returns + ------- + dict + Error response with 'status', 'error', and 'url' fields. + """ + if isinstance(e, httpx.HTTPStatusError): + logger.warning(f"HTTP error fetching {url}: {e}") + error_msg = f"HTTP {e.response.status_code}: {e.response.reason_phrase}" + return {"status": "error", "error": error_msg, "url": url} + + if isinstance(e, httpx.RequestError): + logger.warning(f"Request error fetching {url}: {e}") + return {"status": "error", "error": f"Request failed: {e!s}", "url": url} + + if isinstance(e, RetryError): + # Extract the underlying error from retry failure + # without showing full stack trace + original_error = e.last_attempt.exception() + if isinstance(original_error, httpx.HTTPStatusError): + logger.error(f"HTTP error fetching {url} (after 3 retries): {original_error}") + error_msg = f"HTTP {original_error.response.status_code}: {original_error.response.reason_phrase} (failed after 3 retries)" + else: + logger.error(f"Failed to fetch {url} after 3 retries: {original_error}") + error_msg = f"Failed after 3 retries: {original_error!s}" + return {"status": "error", "error": error_msg, "url": url} + + logger.error(f"Unexpected error in web_fetch for {url}: {e}") + return {"status": "error", "error": f"Unexpected error: {e!s}", "url": url} + + +async def web_fetch(url: str, query: str, max_pages: int = 10) -> dict[str, Any]: + """Fetch and extract information from web pages and PDFs using LLM. + + This tool implements WebFetch: + 1. Fetches the URL (HTML or PDF) + 2. Converts to markdown/text + 3. 
Uses an LLM to extract only the information specified in query + 4. Returns extracted info (~200-1000 tokens) instead of full content + + For data files like CSV or XLSX that need line-by-line searching, + use fetch_file instead. Parameters ---------- url : str The URL to fetch. Must be a valid HTTP or HTTPS URL. + query : str + What information to extract from the page. Be specific about what + you need (e.g., "publication date and main findings", "contact email + and phone number", "list of product features"). max_pages : int, optional For PDFs, maximum number of pages to extract (default 10). Returns ------- dict - On success: 'status', 'url', 'content', 'content_type', - 'content_length', 'truncated'. For PDFs also includes: - 'num_pages', 'pages_extracted'. On error: 'status', 'error', 'url'. + On success: 'status', 'extracted_info', 'url', 'query', + 'source_was_truncated', 'extraction_model', 'input_tokens', + 'output_tokens'. On error: 'status', 'error', 'url', 'query'. Examples -------- - >>> # Fetch an HTML page - >>> result = await web_fetch("https://example.com/about") - >>> print(result["content"]) - - >>> # Fetch a PDF - >>> result = await web_fetch("https://arxiv.org/pdf/2301.00234.pdf") - >>> print(f"Pages: {result['num_pages']}") - >>> print(result["content"]) + >>> # Extract specific info from a webpage + >>> result = await web_fetch( + ... "https://example.com/about", + ... query="company founding date and CEO name", + ... ) + >>> print(result["extracted_info"]) + + >>> # Extract from a PDF + >>> result = await web_fetch( + ... "https://arxiv.org/pdf/2301.00234.pdf", + ... query="main research findings and datasets used", + ... ) + >>> print(result["extracted_info"]) """ # Validate URL if not url.startswith(("http://", "https://")): - return _make_error_response("Invalid URL. Must start with http:// or https://", url) + return { + "status": "error", + "error": "Invalid URL. 
Must start with http:// or https://", + "url": url, + "query": query, + } try: async with httpx.AsyncClient(timeout=60.0, follow_redirects=True) as client: @@ -202,44 +405,36 @@ async def web_fetch(url: str, max_pages: int = 10) -> dict[str, Any]: # Handle PDF documents if "application/pdf" in content_type or url.lower().endswith(".pdf"): - return _handle_pdf_response(response.content, max_pages, final_url, url) + pdf_text, num_pages = _extract_pdf_text(response.content, max_pages) + # Truncate if needed before LLM extraction + pdf_text, _ = _truncate_content(pdf_text, MAX_FETCH_CHARS) + + logger.info(f"Extracted {len(pdf_text)} chars from PDF ({num_pages} pages), extracting info...") + result = await _extract_information_with_llm(pdf_text, query, final_url) + # Add PDF metadata + if result["status"] == "success": + result["content_type"] = "application/pdf" + result["num_pages"] = num_pages + result["pages_extracted"] = min(num_pages, max_pages) + return result # Handle HTML and text content if "text/html" in content_type or content_type == "": text = _html_to_markdown(response.text, base_url=final_url) else: text = response.text - text, truncated = _truncate_content(text) - return _make_success_response(final_url, text, content_type or "text/html", truncated) - - except httpx.HTTPStatusError as e: - logger.warning(f"HTTP error fetching {url}: {e}") - return _make_error_response(f"HTTP {e.response.status_code}: {e.response.reason_phrase}", url) - except httpx.RequestError as e: - logger.warning(f"Request error fetching {url}: {e}") - return _make_error_response(f"Request failed: {e!s}", url) - except Exception as e: - logger.exception(f"Unexpected error in web_fetch for {url}") - return _make_error_response(f"Unexpected error: {e!s}", url) + # Truncate if needed before LLM extraction + text, _ = _truncate_content(text, MAX_FETCH_CHARS) + logger.info(f"Fetched {len(text)} chars from {final_url}, extracting info...") + result = await _extract_information_with_llm(text, query, final_url) + if result["status"] == "success": + result["content_type"] = content_type or "text/html" + return result -def _handle_pdf_response(content: bytes, max_pages: int, final_url: str, url: str) -> dict[str, Any]: - """Handle PDF content extraction and response creation.""" - try: - text, num_pages = _extract_pdf_text(content, max_pages) - text, truncated = _truncate_content(text) - - return _make_success_response( - final_url, - text, - "application/pdf", - truncated, - num_pages=num_pages, - pages_extracted=min(num_pages, max_pages), - ) except Exception as e: - return _make_error_response(f"Failed to extract PDF text: {e!s}", url) + return _handle_fetch_error(e, url) def create_web_fetch_tool() -> FunctionTool: diff --git a/aieng-eval-agents/tests/aieng/agent_evals/knowledge_qa/test_deepsearchqa_grader.py b/aieng-eval-agents/tests/aieng/agent_evals/knowledge_qa/test_deepsearchqa_grader.py new file mode 100644 index 0000000..c2371fe --- /dev/null +++ b/aieng-eval-agents/tests/aieng/agent_evals/knowledge_qa/test_deepsearchqa_grader.py @@ -0,0 +1,122 @@ +"""Tests for the DeepSearchQA grader.""" + +import pytest +from aieng.agent_evals.knowledge_qa.deepsearchqa_grader import ( + DeepSearchQAResult, + EvaluationOutcome, + _calculate_metrics_from_grader, +) + + +class TestDeepSearchQAResult: + """Tests for the DeepSearchQAResult model.""" + + def test_result_creation(self): + """Test creating a DeepSearchQA result.""" + result = DeepSearchQAResult( + precision=0.8, + recall=0.9, + f1_score=0.847, + 
outcome=EvaluationOutcome.CORRECT_WITH_EXTRANEOUS, + correctness_details={"item1": True, "item2": True, "item3": False}, + extraneous_items=["extra1"], + explanation="Found 2 out of 3 items with 1 extraneous", + ) + assert result.precision == 0.8 + assert result.recall == 0.9 + assert result.f1_score == 0.847 + assert result.outcome == EvaluationOutcome.CORRECT_WITH_EXTRANEOUS + assert result.correctness_details["item1"] is True + assert len(result.extraneous_items) == 1 + + def test_result_defaults(self): + """Test default values.""" + result = DeepSearchQAResult() + assert result.precision == 0.0 + assert result.recall == 0.0 + assert result.f1_score == 0.0 + assert result.outcome == EvaluationOutcome.FULLY_INCORRECT + assert result.correctness_details == {} + assert result.extraneous_items == [] + + +class TestCalculateMetrics: + """Tests for the _calculate_metrics_from_grader function.""" + + def test_calculate_metrics_perfect_match(self): + """Test metrics calculation with perfect match (fully_correct).""" + # Simulate grader output for perfect match + grader_result = { + "Explanation": "All items found correctly", + "Correctness Details": {"A": True, "B": True, "C": True}, + "Excessive Answers": [], + } + + result = _calculate_metrics_from_grader(grader_result) + + assert result.precision == 1.0 + assert result.recall == 1.0 + assert result.f1_score == 1.0 + assert result.outcome == EvaluationOutcome.FULLY_CORRECT + + def test_calculate_metrics_with_extraneous(self): + """Test metrics calculation with extraneous items (correct_with_extraneous).""" + # Simulate grader output: all ground truth found + extra item + grader_result = { + "Explanation": "All items found but includes extra", + "Correctness Details": {"A": True, "B": True, "C": True}, + "Excessive Answers": ["D"], + } + + result = _calculate_metrics_from_grader(grader_result) + + assert result.precision == 0.75 # 3/(3+1) + assert result.recall == 1.0 # 3/3 + assert result.outcome == EvaluationOutcome.CORRECT_WITH_EXTRANEOUS + assert "D" in result.extraneous_items + + def test_calculate_metrics_with_missed(self): + """Test metrics calculation with missed items (partially_correct).""" + # Simulate grader output: only 2 of 3 ground truth found + grader_result = { + "Explanation": "Found A and B but missed C", + "Correctness Details": {"A": True, "B": True, "C": False}, + "Excessive Answers": [], + } + + result = _calculate_metrics_from_grader(grader_result) + + assert result.precision == 1.0 # 2/2 (no extraneous) + assert result.recall == pytest.approx(2 / 3) # 2/3 + assert result.outcome == EvaluationOutcome.PARTIALLY_CORRECT + assert result.correctness_details["C"] is False + + def test_calculate_metrics_fully_incorrect(self): + """Test metrics calculation with no matches (fully_incorrect).""" + # Simulate grader output: no correct items + grader_result = { + "Explanation": "No correct items found", + "Correctness Details": {"A": False, "B": False}, + "Excessive Answers": ["X", "Y"], + } + + result = _calculate_metrics_from_grader(grader_result) + + assert result.precision == 0.0 + assert result.recall == 0.0 + assert result.f1_score == 0.0 + assert result.outcome == EvaluationOutcome.FULLY_INCORRECT + + def test_calculate_metrics_empty_ground_truth(self): + """Test metrics calculation with empty ground truth.""" + # Edge case: no ground truth items + grader_result = { + "Explanation": "No ground truth to check", + "Correctness Details": {}, + "Excessive Answers": [], + } + + result = 
_calculate_metrics_from_grader(grader_result) + + assert result.recall == 1.0 # Edge case handling + assert result.outcome == EvaluationOutcome.FULLY_CORRECT diff --git a/aieng-eval-agents/tests/aieng/agent_evals/tools/test_web.py b/aieng-eval-agents/tests/aieng/agent_evals/tools/test_web.py index d0e83f4..4747d10 100644 --- a/aieng-eval-agents/tests/aieng/agent_evals/tools/test_web.py +++ b/aieng-eval-agents/tests/aieng/agent_evals/tools/test_web.py @@ -76,9 +76,11 @@ class TestWebFetch: """Tests for the web_fetch function.""" @pytest.mark.asyncio + @patch("aieng.agent_evals.tools.web.AsyncClientManager") @patch("aieng.agent_evals.tools.web.httpx.AsyncClient") - async def test_fetch_html_success(self, mock_client_class): - """Test successful HTML fetch returns content.""" + async def test_fetch_html_success(self, mock_http_client_class, mock_client_manager_class): + """Test successful HTML fetch with LLM extraction.""" + # Mock HTTP response mock_response = MagicMock() mock_response.text = "

<html><body><h1>Hello World</h1></body></html>
" mock_response.headers = {"content-type": "text/html"} @@ -87,23 +89,46 @@ async def test_fetch_html_success(self, mock_client_class): async def mock_get(*_args, **_kwargs): return mock_response - mock_client = MagicMock() - mock_client.get = mock_get - mock_client.__aenter__ = AsyncMock(return_value=mock_client) - mock_client.__aexit__ = AsyncMock(return_value=None) - mock_client_class.return_value = mock_client + mock_http_client = MagicMock() + mock_http_client.get = mock_get + mock_http_client.__aenter__ = AsyncMock(return_value=mock_http_client) + mock_http_client.__aexit__ = AsyncMock(return_value=None) + mock_http_client_class.return_value = mock_http_client - result = await web_fetch("https://example.com") + # Mock LLM extraction + mock_llm_response = MagicMock() + mock_llm_response.choices = [MagicMock()] + mock_llm_response.choices[0].message.content = "Extracted: Hello World" + mock_llm_response.usage = MagicMock() + mock_llm_response.usage.prompt_tokens = 100 + mock_llm_response.usage.completion_tokens = 20 + + mock_openai_client = MagicMock() + mock_openai_client.chat.completions.create = AsyncMock(return_value=mock_llm_response) + + mock_configs = MagicMock() + mock_configs.default_worker_model = "gemini-2.5-flash" + + mock_client_manager = MagicMock() + mock_client_manager.openai_client = mock_openai_client + mock_client_manager.configs = mock_configs + mock_client_manager_class.get_instance.return_value = mock_client_manager + + result = await web_fetch("https://example.com", query="get the main message") assert result["status"] == "success" - assert "content" in result - assert "Hello World" in result["content"] + assert "extracted_info" in result + assert "Hello World" in result["extracted_info"] assert result["content_type"] == "text/html" + assert result["query"] == "get the main message" + assert result["input_tokens"] == 100 + assert result["output_tokens"] == 20 @pytest.mark.asyncio + @patch("aieng.agent_evals.tools.web.AsyncClientManager") @patch("aieng.agent_evals.tools.web.httpx.AsyncClient") - async def test_fetch_pdf_success(self, mock_client_class): - """Test that PDF content is extracted successfully.""" + async def test_fetch_pdf_success(self, mock_http_client_class, mock_client_manager_class): + """Test that PDF content is extracted and processed with LLM.""" # Create a PDF with text writer = PdfWriter() writer.add_blank_page(width=200, height=200) @@ -119,23 +144,45 @@ async def test_fetch_pdf_success(self, mock_client_class): async def mock_get(*_args, **_kwargs): return mock_response - mock_client = MagicMock() - mock_client.get = mock_get - mock_client.__aenter__ = AsyncMock(return_value=mock_client) - mock_client.__aexit__ = AsyncMock(return_value=None) - mock_client_class.return_value = mock_client + mock_http_client = MagicMock() + mock_http_client.get = mock_get + mock_http_client.__aenter__ = AsyncMock(return_value=mock_http_client) + mock_http_client.__aexit__ = AsyncMock(return_value=None) + mock_http_client_class.return_value = mock_http_client + + # Mock LLM extraction + mock_llm_response = MagicMock() + mock_llm_response.choices = [MagicMock()] + mock_llm_response.choices[0].message.content = "PDF summary information" + mock_llm_response.usage = MagicMock() + mock_llm_response.usage.prompt_tokens = 150 + mock_llm_response.usage.completion_tokens = 30 + + mock_openai_client = MagicMock() + mock_openai_client.chat.completions.create = AsyncMock(return_value=mock_llm_response) - result = await web_fetch("https://example.com/doc.pdf") + 
mock_configs = MagicMock() + mock_configs.default_worker_model = "gemini-2.5-flash" + + mock_client_manager = MagicMock() + mock_client_manager.openai_client = mock_openai_client + mock_client_manager.configs = mock_configs + mock_client_manager_class.get_instance.return_value = mock_client_manager + + result = await web_fetch("https://example.com/doc.pdf", query="summarize the document") assert result["status"] == "success" assert result["content_type"] == "application/pdf" assert "num_pages" in result assert result["num_pages"] >= 1 + assert "extracted_info" in result + assert "PDF summary" in result["extracted_info"] @pytest.mark.asyncio + @patch("aieng.agent_evals.tools.web.AsyncClientManager") @patch("aieng.agent_evals.tools.web.httpx.AsyncClient") - async def test_fetch_returns_content_length(self, mock_client_class): - """Test that fetch returns content length.""" + async def test_fetch_returns_token_counts(self, mock_http_client_class, mock_client_manager_class): + """Test that fetch returns token usage from LLM extraction.""" long_text = "A" * 10000 mock_response = MagicMock() mock_response.text = f"

<html><body><p>{long_text}</p></body></html>
" @@ -145,25 +192,45 @@ async def test_fetch_returns_content_length(self, mock_client_class): async def mock_get(*_args, **_kwargs): return mock_response - mock_client = MagicMock() - mock_client.get = mock_get - mock_client.__aenter__ = AsyncMock(return_value=mock_client) - mock_client.__aexit__ = AsyncMock(return_value=None) - mock_client_class.return_value = mock_client + mock_http_client = MagicMock() + mock_http_client.get = mock_get + mock_http_client.__aenter__ = AsyncMock(return_value=mock_http_client) + mock_http_client.__aexit__ = AsyncMock(return_value=None) + mock_http_client_class.return_value = mock_http_client + + # Mock LLM extraction with realistic token counts for large input + mock_llm_response = MagicMock() + mock_llm_response.choices = [MagicMock()] + mock_llm_response.choices[0].message.content = "This page contains repeated A characters" + mock_llm_response.usage = MagicMock() + mock_llm_response.usage.prompt_tokens = 2500 # Large input + mock_llm_response.usage.completion_tokens = 25 + + mock_openai_client = MagicMock() + mock_openai_client.chat.completions.create = AsyncMock(return_value=mock_llm_response) + + mock_configs = MagicMock() + mock_configs.default_worker_model = "gemini-2.5-flash" - result = await web_fetch("https://example.com") + mock_client_manager = MagicMock() + mock_client_manager.openai_client = mock_openai_client + mock_client_manager.configs = mock_configs + mock_client_manager_class.get_instance.return_value = mock_client_manager + + result = await web_fetch("https://example.com", query="what is on this page") assert result["status"] == "success" - # Content length should include the 10000 As (may have some markdown formatting) - assert result["content_length"] >= 10000 - assert not result["truncated"] + assert result["input_tokens"] == 2500 + assert result["output_tokens"] == 25 + assert result["source_was_truncated"] is False @pytest.mark.asyncio + @patch("aieng.agent_evals.tools.web.AsyncClientManager") @patch("aieng.agent_evals.tools.web.httpx.AsyncClient") - async def test_fetch_truncates_large_content(self, mock_client_class): - """Test that very large content is truncated.""" - # Create content larger than MAX_CONTENT_CHARS (100KB) - large_text = "A" * 150_000 + async def test_fetch_truncates_large_content(self, mock_http_client_class, mock_client_manager_class): + """Test that very large content is truncated before LLM extraction.""" + # Create content larger than MAX_FETCH_CHARS (200KB) + large_text = "A" * 250_000 mock_response = MagicMock() mock_response.text = f"{large_text}" mock_response.headers = {"content-type": "text/html"} @@ -172,24 +239,43 @@ async def test_fetch_truncates_large_content(self, mock_client_class): async def mock_get(*_args, **_kwargs): return mock_response - mock_client = MagicMock() - mock_client.get = mock_get - mock_client.__aenter__ = AsyncMock(return_value=mock_client) - mock_client.__aexit__ = AsyncMock(return_value=None) - mock_client_class.return_value = mock_client + mock_http_client = MagicMock() + mock_http_client.get = mock_get + mock_http_client.__aenter__ = AsyncMock(return_value=mock_http_client) + mock_http_client.__aexit__ = AsyncMock(return_value=None) + mock_http_client_class.return_value = mock_http_client + + # Mock LLM extraction + mock_llm_response = MagicMock() + mock_llm_response.choices = [MagicMock()] + mock_llm_response.choices[0].message.content = "Content was truncated but extracted" + mock_llm_response.usage = MagicMock() + mock_llm_response.usage.prompt_tokens = 25000 + 
mock_llm_response.usage.completion_tokens = 50 + + mock_openai_client = MagicMock() + mock_openai_client.chat.completions.create = AsyncMock(return_value=mock_llm_response) + + mock_configs = MagicMock() + mock_configs.default_worker_model = "gemini-2.5-flash" + + mock_client_manager = MagicMock() + mock_client_manager.openai_client = mock_openai_client + mock_client_manager.configs = mock_configs + mock_client_manager_class.get_instance.return_value = mock_client_manager - result = await web_fetch("https://example.com") + result = await web_fetch("https://example.com", query="extract info") assert result["status"] == "success" - assert result["truncated"] is True - assert "[Content truncated" in result["content"] + assert result["source_was_truncated"] is True @pytest.mark.asyncio async def test_fetch_invalid_url(self): """Test that invalid URLs return error.""" - result = await web_fetch("not-a-url") + result = await web_fetch("not-a-url", query="get info") assert result["status"] == "error" assert "Invalid URL" in result["error"] + assert result["query"] == "get info" class TestCreateWebFetchTool: @@ -349,53 +435,75 @@ async def __aexit__(self, *_args): @pytest.mark.integration_test class TestWebFetchIntegration: - """Integration tests for web_fetch (requires network). + """Integration tests for web_fetch (requires network and API keys). These tests verify that web_fetch works correctly for both HTML pages - and PDF documents, returning content suitable for the agent to analyze. + and PDF documents, using LLM extraction to return only relevant information + instead of full page content. """ @pytest.mark.asyncio - async def test_fetch_html_page_returns_readable_content(self): - """Test that HTML pages are converted to readable markdown.""" - result = await web_fetch("https://www.iana.org/help/example-domains") + async def test_fetch_html_page_with_extraction(self): + """Test that HTML pages are fetched and information is extracted via LLM.""" + result = await web_fetch( + "https://www.iana.org/help/example-domains", query="What is the purpose of example.com domain?" + ) assert result["status"] == "success" assert result["content_type"] == "text/html" or "html" in result["content_type"].lower() - # Verify content is markdown (no raw HTML tags) - content = result["content"] - assert "" not in content.lower() - assert "" not in content.lower() + # Verify extracted info is returned (not raw content) + assert "extracted_info" in result + extracted = result["extracted_info"] - # Verify content has meaningful text - assert len(content) > 100 - assert "example" in content.lower() + # Extracted info should be concise (not full page) + assert len(extracted) < 5000 # Should be much smaller than full page + assert len(extracted) > 50 # But should have meaningful content - # Verify links are preserved in markdown format (if any exist) - # The page should have links that are converted to [text](url) format - if "http" in content: - # Links should be in markdown format, not raw tags - assert " 0 + assert result["output_tokens"] > 0 + + # Verify query is echoed back + assert result["query"] == "What is the purpose of example.com domain?" 
@pytest.mark.asyncio - async def test_fetch_pdf_extracts_text(self): - """Test that PDF content is extracted as searchable text.""" - result = await web_fetch("https://arxiv.org/pdf/2301.00234.pdf", max_pages=2) + async def test_fetch_pdf_with_extraction(self): + """Test that PDF content is extracted and processed via LLM.""" + result = await web_fetch( + "https://arxiv.org/pdf/2301.00234.pdf", query="What is the main research contribution?", max_pages=2 + ) assert result["status"] == "success" assert result["content_type"] == "application/pdf" assert result["num_pages"] > 0 - # Verify extracted text is substantial - content = result["content"] - assert len(content) > 500 + # Verify extracted info instead of raw text + assert "extracted_info" in result + extracted = result["extracted_info"] + + # Extracted info should be concise + assert len(extracted) < 5000 + assert len(extracted) > 50 - # Verify page markers are present - assert "--- Page" in content + # Verify PDF metadata + assert result["pages_extracted"] <= 2 + + # Verify token usage + assert result["input_tokens"] > 0 + assert result["output_tokens"] > 0 @pytest.mark.asyncio async def test_fetch_pdf_pagination(self): - """Test that PDF max_pages parameter limits extraction.""" - result = await web_fetch("https://arxiv.org/pdf/2301.00234.pdf", max_pages=1) + """Test that PDF max_pages parameter limits extraction before LLM processing.""" + result = await web_fetch("https://arxiv.org/pdf/2301.00234.pdf", query="summarize the abstract", max_pages=1) assert result["status"] == "success" assert result["pages_extracted"] == 1 assert result["num_pages"] >= 1 + + # Even with 1 page, should get meaningful extraction + assert "extracted_info" in result + assert len(result["extracted_info"]) > 20 diff --git a/implementations/knowledge_qa/evaluate.py b/implementations/knowledge_qa/evaluate.py new file mode 100644 index 0000000..892fe40 --- /dev/null +++ b/implementations/knowledge_qa/evaluate.py @@ -0,0 +1,258 @@ +"""Evaluate the Knowledge Agent using Langfuse experiments. + +This script runs the Knowledge Agent against a Langfuse dataset and evaluates +results using the DeepSearchQA LLM-as-judge methodology. Results are automatically +logged to Langfuse for analysis and comparison. + +Optionally, trace-level groundedness evaluation can be enabled to check if agent +outputs are supported by tool observations. 
+ +Usage: + # Run a full evaluation + python evaluate.py + + # Run with custom dataset and experiment name + python evaluate.py --dataset-name "MyDataset" --experiment-name "v2-test" + + # Enable trace groundedness evaluation + ENABLE_TRACE_GROUNDEDNESS=true python evaluate.py +""" + +import asyncio +import logging +import os +from typing import Any + +import click +from aieng.agent_evals.async_client_manager import AsyncClientManager +from aieng.agent_evals.evaluation import run_experiment, run_experiment_with_trace_evals +from aieng.agent_evals.evaluation.graders import create_trace_groundedness_evaluator # type: ignore[attr-defined] +from aieng.agent_evals.evaluation.graders.config import LLMRequestConfig +from aieng.agent_evals.evaluation.types import EvaluationResult +from aieng.agent_evals.knowledge_qa.agent import KnowledgeGroundedAgent +from aieng.agent_evals.knowledge_qa.deepsearchqa_grader import DeepSearchQAResult, evaluate_deepsearchqa_async +from aieng.agent_evals.logging_config import setup_logging +from dotenv import load_dotenv +from langfuse.experiment import Evaluation, ExperimentResult + + +load_dotenv(verbose=True) +setup_logging(level=logging.INFO, show_time=True, show_path=False) +logger = logging.getLogger(__name__) + + +DEFAULT_DATASET_NAME = "DeepSearchQA-Subset" +DEFAULT_EXPERIMENT_NAME = "Knowledge Agent Evaluation" + +# Configuration for trace groundedness evaluation +ENABLE_TRACE_GROUNDEDNESS = os.getenv("ENABLE_TRACE_GROUNDEDNESS", "false").lower() in ("true", "1", "yes") + + +async def agent_task(*, item: Any, **kwargs: Any) -> dict[str, Any]: # noqa: ARG001 + """Run the Knowledge Agent on a dataset item. + + Parameters + ---------- + item : Any + The Langfuse experiment item containing the question. + **kwargs : Any + Additional arguments from the harness (unused). + + Returns + ------- + dict[str, Any] + Dictionary containing 'text' (response) and 'agent_response' + (full response object). + """ + question = item.input + logger.info(f"Running agent on: {question[:80]}...") + + try: + agent = KnowledgeGroundedAgent(enable_planning=True) # type: ignore[call-arg] + response = await agent.answer_async(question) + logger.info(f"Agent completed: {len(response.text)} chars, {len(response.tool_calls)} tool calls") + + return { + "text": response.text, + "agent_response": response, + } + except Exception as e: + logger.error(f"Agent failed: {e}") + return {"text": f"Error: {e}", "agent_response": None} + + +async def deepsearchqa_evaluator( + *, + input: str, # noqa: A002 + output: dict[str, Any], + expected_output: str, + metadata: dict[str, Any] | None = None, + **kwargs: Any, # noqa: ARG001 +) -> list[Evaluation]: + """Evaluate the agent's response using DeepSearchQA methodology. + + This evaluator uses the modern async infrastructure with shared client + management and retry logic. + + Parameters + ---------- + input : str + The original question. + output : dict[str, Any] + Dictionary containing 'text' and 'agent_response'. + expected_output : str + The ground truth answer. + metadata : dict[str, Any] | None, optional + Item metadata (contains answer_type). + **kwargs : Any + Additional arguments from the harness (unused). + + Returns + ------- + list[Evaluation] + List of Langfuse Evaluations with F1, precision, recall, and outcome scores. 
+ """ + output_text = output.get("text", "") if isinstance(output, dict) else str(output) + answer_type = metadata.get("answer_type", "Set Answer") if metadata else "Set Answer" + + logger.info(f"Evaluating response (answer_type: {answer_type})...") + + try: + # Use the modern async evaluator with default config + result = await evaluate_deepsearchqa_async( + question=input, + answer=output_text, + ground_truth=expected_output, + answer_type=answer_type, + model_config=LLMRequestConfig(temperature=0.0), + ) + + evaluations = result.to_evaluations() + logger.info(f"Evaluation complete: {result.outcome} (F1: {result.f1_score:.2f})") + return evaluations + + except Exception as e: + logger.error(f"Evaluation failed: {e}") + return DeepSearchQAResult.error_evaluations(str(e)) + + +async def run_evaluation( + dataset_name: str, + experiment_name: str, + max_concurrency: int = 1, + enable_trace_groundedness: bool = False, +) -> ExperimentResult | EvaluationResult: + """Run the full evaluation experiment. + + Parameters + ---------- + dataset_name : str + Name of the Langfuse dataset to evaluate against. + experiment_name : str + Name for this experiment run. + max_concurrency : int, optional + Maximum concurrent agent runs, by default 1. + enable_trace_groundedness : bool, optional + Whether to enable trace-level groundedness evaluation, by default False. + """ + client_manager = AsyncClientManager.get_instance() + + try: + logger.info(f"Starting experiment '{experiment_name}' on dataset '{dataset_name}'") + logger.info(f"Max concurrency: {max_concurrency}") + logger.info(f"Trace groundedness: {'enabled' if enable_trace_groundedness else 'disabled'}") + + result: ExperimentResult | EvaluationResult + if enable_trace_groundedness: + # Create trace groundedness evaluator + # Only consider web_fetch and google_search tools as evidence + groundedness_evaluator = create_trace_groundedness_evaluator( + name="trace_groundedness", + model_config=LLMRequestConfig(temperature=0.0), + ) + + # Run with trace evaluations + result = run_experiment_with_trace_evals( + dataset_name=dataset_name, + name=experiment_name, + description="Knowledge Agent evaluation with DeepSearchQA judge and trace groundedness", + task=agent_task, + evaluators=[deepsearchqa_evaluator], # Item-level evaluators + trace_evaluators=[groundedness_evaluator], # Trace-level evaluators + max_concurrency=max_concurrency, + ) + else: + # Run without trace evaluations + result = run_experiment( + dataset_name=dataset_name, + name=experiment_name, + description="Knowledge Agent evaluation with DeepSearchQA judge", + task=agent_task, + evaluators=[deepsearchqa_evaluator], + max_concurrency=max_concurrency, + ) + + logger.info("Experiment complete!") + # Handle both ExperimentResult and EvaluationResult + if isinstance(result, EvaluationResult): + # EvaluationResult from run_experiment_with_trace_evals + logger.info(f"Results: {result.experiment}") + if result.trace_evaluations: + trace_evals = result.trace_evaluations + logger.info( + f"Trace evaluations: {len(trace_evals.evaluations_by_trace_id)} traces, " + f"{len(trace_evals.skipped_trace_ids)} skipped, {len(trace_evals.failed_trace_ids)} failed" + ) + else: + # ExperimentResult from run_experiment + logger.info(f"Results: {result}") + + return result + + finally: + logger.info("Closing client manager and flushing data...") + try: + await client_manager.close() + await asyncio.sleep(0.1) + logger.info("Cleanup complete") + except Exception as e: + logger.warning(f"Cleanup warning: {e}") + + 
+@click.command() +@click.option( + "--dataset-name", + default=DEFAULT_DATASET_NAME, + help="Name of the Langfuse dataset to evaluate against.", +) +@click.option( + "--experiment-name", + default=DEFAULT_EXPERIMENT_NAME, + help="Name for this experiment run.", +) +@click.option( + "--max-concurrency", + default=1, + type=int, + help="Maximum concurrent agent runs (default: 1).", +) +@click.option( + "--enable-trace-groundedness", + is_flag=True, + default=ENABLE_TRACE_GROUNDEDNESS, + help="Enable trace-level groundedness evaluation.", +) +def cli(dataset_name: str, experiment_name: str, max_concurrency: int, enable_trace_groundedness: bool) -> None: + """Run Knowledge Agent evaluation using Langfuse experiments.""" + asyncio.run( + run_evaluation( + dataset_name, + experiment_name, + max_concurrency, + enable_trace_groundedness, + ) + ) + + +if __name__ == "__main__": + cli()
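
A minimal worked example (not part of the patch) of the precision/recall/F1 arithmetic that `_calculate_metrics_from_grader` applies to the grader output, using the "correct with extraneous" case exercised by the new unit tests; the dict keys mirror the grader output format and the variable names below are illustrative only:

    # Grader reports all three ground-truth items found, plus one extraneous answer.
    correctness_details = {"A": True, "B": True, "C": True}
    excessive_answers = ["D"]

    matched = sum(1 for v in correctness_details.values() if v)  # |S ∩ G| = 3
    predicted = matched + len(excessive_answers)                 # |S| = 3 + 1 = 4
    ground_truth = len(correctness_details)                      # |G| = 3

    precision = matched / predicted                      # 3/4 = 0.75
    recall = matched / ground_truth                      # 3/3 = 1.0
    f1 = 2 * precision * recall / (precision + recall)   # 1.5 / 1.75 ≈ 0.857

    # All ground-truth items matched and at least one extraneous item present,
    # so the outcome is EvaluationOutcome.CORRECT_WITH_EXTRANEOUS.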