diff --git a/aieng-eval-agents/aieng/agent_evals/configs.py b/aieng-eval-agents/aieng/agent_evals/configs.py
index e6cdc5c..564ec37 100644
--- a/aieng-eval-agents/aieng/agent_evals/configs.py
+++ b/aieng-eval-agents/aieng/agent_evals/configs.py
@@ -113,6 +113,18 @@ class Configs(BaseSettings):
default="gemini-2.5-pro",
description="Model name for LLM-as-judge evaluation tasks.",
)
+ default_temperature: float = Field(
+ default=1.0,
+ ge=0.0,
+ le=2.0,
+ description="Default temperature for LLM generation. Lower values (0.0-0.3) produce more consistent outputs.",
+ )
+ default_evaluator_temperature: float = Field(
+ default=0.0,
+ ge=0.0,
+ le=2.0,
+ description="Temperature for LLM-as-judge evaluations. Default 0.0 for deterministic judging.",
+ )
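+    # Note: because Configs extends BaseSettings, these two defaults can presumably
+    # be overridden through the environment / .env file like the other fields on
+    # this class; the exact variable names depend on the settings configuration,
+    # which is not shown in this hunk.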
# === Tracing (Langfuse) ===
langfuse_public_key: str | None = Field(
diff --git a/aieng-eval-agents/aieng/agent_evals/knowledge_qa/deepsearchqa_grader.py b/aieng-eval-agents/aieng/agent_evals/knowledge_qa/deepsearchqa_grader.py
new file mode 100644
index 0000000..3d15401
--- /dev/null
+++ b/aieng-eval-agents/aieng/agent_evals/knowledge_qa/deepsearchqa_grader.py
@@ -0,0 +1,365 @@
+"""DeepSearchQA grader for evaluating knowledge agent responses.
+
+This module provides the official DeepSearchQA evaluation methodology using
+an LLM grader (autorater) to assess answer correctness with precision, recall,
+and F1 metrics.
+
+References
+----------
+- Paper: DeepSearchQA: Bridging the Comprehensiveness Gap for Deep Research Agents
+- Dataset: https://huggingface.co/datasets/google/deepsearchqa
+- Leaderboard: https://www.kaggle.com/benchmarks/google/dsqa
+"""
+
+import logging
+from enum import Enum
+from typing import Any
+
+from aieng.agent_evals.async_client_manager import AsyncClientManager
+from aieng.agent_evals.evaluation.graders._utils import run_structured_parse_call
+from aieng.agent_evals.evaluation.graders.config import LLMRequestConfig
+from aieng.agent_evals.evaluation.types import Evaluation
+from pydantic import BaseModel, Field
+
+
+logger = logging.getLogger(__name__)
+
+
+class EvaluationOutcome(str, Enum):
+ """Possible outcomes for DeepSearchQA evaluation.
+
+ The four disjoint categories represent the relationship between
+ the submitted answer set (S) and ground truth set (G).
+ """
+
+ FULLY_CORRECT = "fully_correct"
+ CORRECT_WITH_EXTRANEOUS = "correct_with_extraneous"
+ PARTIALLY_CORRECT = "partially_correct"
+ FULLY_INCORRECT = "fully_incorrect"
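+    # A reading of the four category names in terms of the submitted set S and the
+    # ground truth set G (the grader's structured output is what actually assigns
+    # the label):
+    #   FULLY_CORRECT           - every item of G found, nothing extraneous
+    #   CORRECT_WITH_EXTRANEOUS - every item of G found, plus extra items
+    #   PARTIALLY_CORRECT       - some but not all items of G found
+    #   FULLY_INCORRECT         - no items of G found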
+
+
+class DeepSearchQAResult(BaseModel):
+ """Result from DeepSearchQA evaluation with IR metrics.
+
+ This follows the official DeepSearchQA evaluation methodology from:
+ https://www.kaggle.com/benchmarks/google/dsqa
+ """
+
+ precision: float = Field(
+ default=0.0, description="Fraction of predicted items that are correct (0-1). P = |S ∩ G| / |S|"
+ )
+ recall: float = Field(
+ default=0.0, description="Fraction of ground truth items that were found (0-1). R = |S ∩ G| / |G|"
+ )
+ f1_score: float = Field(
+ default=0.0, description="Harmonic mean of precision and recall (0-1). F1 = 2 * P * R / (P + R)"
+ )
+ outcome: EvaluationOutcome = Field(
+ default=EvaluationOutcome.FULLY_INCORRECT,
+ description="Evaluation outcome indicating the relationship between submitted and ground truth answer sets",
+ )
+ correctness_details: dict[str, bool] = Field(
+ default_factory=dict, description="For each ground truth item, whether it was found in the response"
+ )
+ extraneous_items: list[str] = Field(
+ default_factory=list, description="Items in the response that are not in the ground truth"
+ )
+ explanation: str = Field(default="", description="Explanation from the grader about the evaluation")
+
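+    # Worked example (reusing the Belgium/France/Italy illustration from the grader
+    # prompt below): with ground truth G = {Belgium, France} and a response that
+    # names Belgium, France and Italy, |S ∩ G| = 2, so precision = 2/3,
+    # recall = 2/2 = 1.0, and F1 = 2 * (2/3) * 1.0 / ((2/3) + 1.0) = 0.8; "Italy"
+    # would be listed in extraneous_items and the outcome would be
+    # CORRECT_WITH_EXTRANEOUS.
+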
+ def to_evaluations(self) -> list[Evaluation]:
+ """Convert this result to Langfuse Evaluation objects.
+
+ Returns
+ -------
+ list[Evaluation]
+ Four evaluations: Outcome (categorical), F1, Precision, Recall (numeric).
+ """
+ comment_parts = [
+ f"Outcome: {self.outcome}",
+ f"Precision: {self.precision:.2f}",
+ f"Recall: {self.recall:.2f}",
+ f"F1: {self.f1_score:.2f}",
+ ]
+
+ if self.explanation:
+ comment_parts.append(f"\nExplanation: {self.explanation}")
+
+ if self.correctness_details:
+ found = sum(1 for v in self.correctness_details.values() if v)
+ total = len(self.correctness_details)
+ comment_parts.append(f"\nCorrectness: {found}/{total} items found")
+
+ if self.extraneous_items:
+ comment_parts.append(f"\nExtraneous: {len(self.extraneous_items)} items")
+
+ comment = "\n".join(comment_parts)
+
+ outcome_display = {
+ EvaluationOutcome.FULLY_CORRECT: "Fully Correct",
+ EvaluationOutcome.CORRECT_WITH_EXTRANEOUS: "Correct with Extraneous",
+ EvaluationOutcome.PARTIALLY_CORRECT: "Partially Correct",
+ EvaluationOutcome.FULLY_INCORRECT: "Fully Incorrect",
+ }
+
+ return [
+ Evaluation(
+ name="Outcome",
+ value=outcome_display.get(self.outcome, self.outcome.value),
+ comment=self.explanation,
+ ),
+ Evaluation(name="F1", value=self.f1_score, comment=comment),
+ Evaluation(name="Precision", value=self.precision, comment=comment),
+ Evaluation(name="Recall", value=self.recall, comment=comment),
+ ]
+
+ @staticmethod
+ def error_evaluations(error_msg: str) -> list[Evaluation]:
+ """Create error evaluations when evaluation fails.
+
+ Parameters
+ ----------
+ error_msg : str
+ Description of the error that occurred.
+
+ Returns
+ -------
+ list[Evaluation]
+ Three evaluations (F1, Precision, Recall) all set to 0.0.
+ """
+ comment = f"Evaluation error: {error_msg}"
+ return [
+ Evaluation(name="F1", value=0.0, comment=comment),
+ Evaluation(name="Precision", value=0.0, comment=comment),
+ Evaluation(name="Recall", value=0.0, comment=comment),
+ ]
+
+
+class DeepSearchQAGraderResponse(BaseModel):
+ """Structured response from the DeepSearchQA grader.
+
+ This matches the official DeepSearchQA grader output format.
+ """
+
+ answer_correctness: dict[str, Any] = Field(
+ alias="Answer Correctness",
+ description=(
+ "Dictionary containing: "
+ "Explanation (str) - Explanation of the evaluation; "
+ "Correctness Details (dict[str, bool]) - Per-item correctness; "
+ "Excessive Answers (list[str]) - Extra items not in ground truth"
+ ),
+ )
+
+
+# Official DeepSearchQA grader prompt from Appendix A of the paper
+DEEPSEARCHQA_GRADER_PROMPT = """\
+Your task is to evaluate whether a given "AI Response" for a specific "User Prompt"
+arrived at the correct answer.
+
+**Answer Correctness Task**
+* **Purpose:** Assess whether the AI response provides the correct answer(s) based on
+the provided "Correct Answer" and "Prompt Type".
+* **Process:**
+ * Identify the "Prompt Type": "{prompt_type}".
+ * Refer to the "Correct Answer": "{answer}".
+ * Based on the "Prompt Type", determine if the "AI Response" contains the expected
+answer(s).
+ * **'Single Answer'**: Check if the response provides the answer that addresses
+the user's question. It does not have to match the exact wording of the provided
+answer.
+ * **'Set Answer'**: Check if the response includes *each* item from the provided
+ground truth answers. The order might not matter unless specified otherwise. The
+response might include more answers than the list. Determine the correctness *
+only* based on the list first and then check if the response includes answers not
+in the list.
+* **Explanation:** Provide a brief explanation justifying your assessment of answer
+correctness, referencing specific parts of the AI response and the correct answer.
+* **Correctness Details:** Provide a dictionary, one key for each expected answer
+part, and value is a boolean indicating whether each expected answer part was found.
+ * For 'Set Answer', this will be a list of attributes, one for each item/part in
+the "Correct Answer". Each key will be a string indicating the expected answer
+part, and the value will be a boolean indicating whether that part was found in
+the response.
+* **Excessive Answers:** Provide a list of strings, each indicating an excessive
+answer part. If the response provides answers that are **not** in the "Correct Answer
+" list, add these answers as excessive answers. Return an empty list when there's no
+excessive answers in the response.
+
+**Output Format:**
+Your evaluation *must* be structured as a nested JSON dictionary with the following top-
+level keys: '"Answer Correctness"'. Please return NULL if any of "Prompt", "AI Response"
+or "Correct Answer" is empty.
+The value for '"Answer Correctness"' should be a dictionary containing '"Explanation"' (
+a string), '"Correctness Details"' (a dictionary where each key is the expected correct
+answer, and the value is a boolean indicating whether the response contains the correct
+answer), and '"Excessive Answers"' (a list of strings indicating the excessive answers).
+
+Make sure you return a valid JSON string. Pay special attention to quotes, commas and
+special characters in the JSON string. Make sure to escape all special characters and
+quotes in the JSON string.
+
+**Example (Partial):**
+```json
+{{
+ "Answer Correctness": {{
+ "Explanation": "The response correctly identified Belgium and France but also
+includes an excessive answer, Italy.",
+ "Correctness Details": {{
+ "Belgium": true,
+ "France": true
+ }},
+ "Excessive Answers": [ "Italy" ]
+ }}
+}}
+```
+
+**Now, proceed with the evaluation using the provided User Prompt, AI Response, and
+Correct Answer.**
+
+User Prompt (Wrapped in
         mock_response.text = "<html><body>Hello World</body></html>"
         mock_response.headers = {"content-type": "text/html"}
@@ -87,23 +89,46 @@ async def test_fetch_html_success(self, mock_client_class):
         async def mock_get(*_args, **_kwargs):
             return mock_response

-        mock_client = MagicMock()
-        mock_client.get = mock_get
-        mock_client.__aenter__ = AsyncMock(return_value=mock_client)
-        mock_client.__aexit__ = AsyncMock(return_value=None)
-        mock_client_class.return_value = mock_client
+        mock_http_client = MagicMock()
+        mock_http_client.get = mock_get
+        mock_http_client.__aenter__ = AsyncMock(return_value=mock_http_client)
+        mock_http_client.__aexit__ = AsyncMock(return_value=None)
+        mock_http_client_class.return_value = mock_http_client
-        result = await web_fetch("https://example.com")
+        # Mock LLM extraction
+        mock_llm_response = MagicMock()
+        mock_llm_response.choices = [MagicMock()]
+        mock_llm_response.choices[0].message.content = "Extracted: Hello World"
+        mock_llm_response.usage = MagicMock()
+        mock_llm_response.usage.prompt_tokens = 100
+        mock_llm_response.usage.completion_tokens = 20
+
+        mock_openai_client = MagicMock()
+        mock_openai_client.chat.completions.create = AsyncMock(return_value=mock_llm_response)
+
+        mock_configs = MagicMock()
+        mock_configs.default_worker_model = "gemini-2.5-flash"
+
+        mock_client_manager = MagicMock()
+        mock_client_manager.openai_client = mock_openai_client
+        mock_client_manager.configs = mock_configs
+        mock_client_manager_class.get_instance.return_value = mock_client_manager
+
+        result = await web_fetch("https://example.com", query="get the main message")

         assert result["status"] == "success"
-        assert "content" in result
-        assert "Hello World" in result["content"]
+        assert "extracted_info" in result
+        assert "Hello World" in result["extracted_info"]
         assert result["content_type"] == "text/html"
+        assert result["query"] == "get the main message"
+        assert result["input_tokens"] == 100
+        assert result["output_tokens"] == 20

     @pytest.mark.asyncio
+    @patch("aieng.agent_evals.tools.web.AsyncClientManager")
     @patch("aieng.agent_evals.tools.web.httpx.AsyncClient")
-    async def test_fetch_pdf_success(self, mock_client_class):
-        """Test that PDF content is extracted successfully."""
+    async def test_fetch_pdf_success(self, mock_http_client_class, mock_client_manager_class):
+        """Test that PDF content is extracted and processed with LLM."""
         # Create a PDF with text
         writer = PdfWriter()
         writer.add_blank_page(width=200, height=200)
@@ -119,23 +144,45 @@ async def test_fetch_pdf_success(self, mock_client_class):
         async def mock_get(*_args, **_kwargs):
             return mock_response

-        mock_client = MagicMock()
-        mock_client.get = mock_get
-        mock_client.__aenter__ = AsyncMock(return_value=mock_client)
-        mock_client.__aexit__ = AsyncMock(return_value=None)
-        mock_client_class.return_value = mock_client
+        mock_http_client = MagicMock()
+        mock_http_client.get = mock_get
+        mock_http_client.__aenter__ = AsyncMock(return_value=mock_http_client)
+        mock_http_client.__aexit__ = AsyncMock(return_value=None)
+        mock_http_client_class.return_value = mock_http_client
+
+        # Mock LLM extraction
+        mock_llm_response = MagicMock()
+        mock_llm_response.choices = [MagicMock()]
+        mock_llm_response.choices[0].message.content = "PDF summary information"
+        mock_llm_response.usage = MagicMock()
+        mock_llm_response.usage.prompt_tokens = 150
+        mock_llm_response.usage.completion_tokens = 30
+
+        mock_openai_client = MagicMock()
+        mock_openai_client.chat.completions.create = AsyncMock(return_value=mock_llm_response)
-        result = await web_fetch("https://example.com/doc.pdf")
+        mock_configs = MagicMock()
+        mock_configs.default_worker_model = "gemini-2.5-flash"
+
+        mock_client_manager = MagicMock()
+        mock_client_manager.openai_client = mock_openai_client
+        mock_client_manager.configs = mock_configs
+        mock_client_manager_class.get_instance.return_value = mock_client_manager
+
+        result = await web_fetch("https://example.com/doc.pdf", query="summarize the document")

         assert result["status"] == "success"
         assert result["content_type"] == "application/pdf"
         assert "num_pages" in result
         assert result["num_pages"] >= 1
+        assert "extracted_info" in result
+        assert "PDF summary" in result["extracted_info"]

     @pytest.mark.asyncio
+    @patch("aieng.agent_evals.tools.web.AsyncClientManager")
     @patch("aieng.agent_evals.tools.web.httpx.AsyncClient")
-    async def test_fetch_returns_content_length(self, mock_client_class):
-        """Test that fetch returns content length."""
+    async def test_fetch_returns_token_counts(self, mock_http_client_class, mock_client_manager_class):
+        """Test that fetch returns token usage from LLM extraction."""
         long_text = "A" * 10000
         mock_response = MagicMock()
         mock_response.text = f"<html><body>{long_text}</body></html>"
@@ -145,25 +192,45 @@ async def test_fetch_returns_content_length(self, mock_client_class):
         async def mock_get(*_args, **_kwargs):
             return mock_response

-        mock_client = MagicMock()
-        mock_client.get = mock_get
-        mock_client.__aenter__ = AsyncMock(return_value=mock_client)
-        mock_client.__aexit__ = AsyncMock(return_value=None)
-        mock_client_class.return_value = mock_client
+        mock_http_client = MagicMock()
+        mock_http_client.get = mock_get
+        mock_http_client.__aenter__ = AsyncMock(return_value=mock_http_client)
+        mock_http_client.__aexit__ = AsyncMock(return_value=None)
+        mock_http_client_class.return_value = mock_http_client
+
+        # Mock LLM extraction with realistic token counts for large input
+        mock_llm_response = MagicMock()
+        mock_llm_response.choices = [MagicMock()]
+        mock_llm_response.choices[0].message.content = "This page contains repeated A characters"
+        mock_llm_response.usage = MagicMock()
+        mock_llm_response.usage.prompt_tokens = 2500  # Large input
+        mock_llm_response.usage.completion_tokens = 25
+
+        mock_openai_client = MagicMock()
+        mock_openai_client.chat.completions.create = AsyncMock(return_value=mock_llm_response)
+
+        mock_configs = MagicMock()
+        mock_configs.default_worker_model = "gemini-2.5-flash"
-        result = await web_fetch("https://example.com")
+        mock_client_manager = MagicMock()
+        mock_client_manager.openai_client = mock_openai_client
+        mock_client_manager.configs = mock_configs
+        mock_client_manager_class.get_instance.return_value = mock_client_manager
+
+        result = await web_fetch("https://example.com", query="what is on this page")

         assert result["status"] == "success"
-        # Content length should include the 10000 As (may have some markdown formatting)
-        assert result["content_length"] >= 10000
-        assert not result["truncated"]
+        assert result["input_tokens"] == 2500
+        assert result["output_tokens"] == 25
+        assert result["source_was_truncated"] is False

     @pytest.mark.asyncio
+    @patch("aieng.agent_evals.tools.web.AsyncClientManager")
     @patch("aieng.agent_evals.tools.web.httpx.AsyncClient")
-    async def test_fetch_truncates_large_content(self, mock_client_class):
-        """Test that very large content is truncated."""
-        # Create content larger than MAX_CONTENT_CHARS (100KB)
-        large_text = "A" * 150_000
+    async def test_fetch_truncates_large_content(self, mock_http_client_class, mock_client_manager_class):
+        """Test that very large content is truncated before LLM extraction."""
+        # Create content larger than MAX_FETCH_CHARS (200KB)
+        large_text = "A" * 250_000
         mock_response = MagicMock()
         mock_response.text = f"<html><body>{large_text}</body></html>"
         mock_response.headers = {"content-type": "text/html"}
@@ -172,24 +239,43 @@ async def test_fetch_truncates_large_content(self, mock_client_class):
         async def mock_get(*_args, **_kwargs):
             return mock_response

-        mock_client = MagicMock()
-        mock_client.get = mock_get
-        mock_client.__aenter__ = AsyncMock(return_value=mock_client)
-        mock_client.__aexit__ = AsyncMock(return_value=None)
-        mock_client_class.return_value = mock_client
+        mock_http_client = MagicMock()
+        mock_http_client.get = mock_get
+        mock_http_client.__aenter__ = AsyncMock(return_value=mock_http_client)
+        mock_http_client.__aexit__ = AsyncMock(return_value=None)
+        mock_http_client_class.return_value = mock_http_client
+
+        # Mock LLM extraction
+        mock_llm_response = MagicMock()
+        mock_llm_response.choices = [MagicMock()]
+        mock_llm_response.choices[0].message.content = "Content was truncated but extracted"
+        mock_llm_response.usage = MagicMock()
+        mock_llm_response.usage.prompt_tokens = 25000
+        mock_llm_response.usage.completion_tokens = 50
+
+        mock_openai_client = MagicMock()
+        mock_openai_client.chat.completions.create = AsyncMock(return_value=mock_llm_response)
+
+        mock_configs = MagicMock()
+        mock_configs.default_worker_model = "gemini-2.5-flash"
+
+        mock_client_manager = MagicMock()
+        mock_client_manager.openai_client = mock_openai_client
+        mock_client_manager.configs = mock_configs
+        mock_client_manager_class.get_instance.return_value = mock_client_manager
-        result = await web_fetch("https://example.com")
+        result = await web_fetch("https://example.com", query="extract info")

         assert result["status"] == "success"
-        assert result["truncated"] is True
-        assert "[Content truncated" in result["content"]
+        assert result["source_was_truncated"] is True

     @pytest.mark.asyncio
     async def test_fetch_invalid_url(self):
         """Test that invalid URLs return error."""
-        result = await web_fetch("not-a-url")
+        result = await web_fetch("not-a-url", query="get info")

         assert result["status"] == "error"
         assert "Invalid URL" in result["error"]
+        assert result["query"] == "get info"


 class TestCreateWebFetchTool:
@@ -349,53 +435,75 @@ async def __aexit__(self, *_args):

 @pytest.mark.integration_test
 class TestWebFetchIntegration:
-    """Integration tests for web_fetch (requires network).
+    """Integration tests for web_fetch (requires network and API keys).

     These tests verify that web_fetch works correctly for both HTML pages
-    and PDF documents, returning content suitable for the agent to analyze.
+    and PDF documents, using LLM extraction to return only relevant information
+    instead of full page content.
     """

     @pytest.mark.asyncio
-    async def test_fetch_html_page_returns_readable_content(self):
-        """Test that HTML pages are converted to readable markdown."""
-        result = await web_fetch("https://www.iana.org/help/example-domains")
+    async def test_fetch_html_page_with_extraction(self):
+        """Test that HTML pages are fetched and information is extracted via LLM."""
+        result = await web_fetch(
+            "https://www.iana.org/help/example-domains", query="What is the purpose of example.com domain?"
+        )

         assert result["status"] == "success"
         assert result["content_type"] == "text/html" or "html" in result["content_type"].lower()

-        # Verify content is markdown (no raw HTML tags)
-        content = result["content"]
-        assert "<html>" not in content.lower()
-        assert "<body>" not in content.lower()
+        # Verify extracted info is returned (not raw content)
+        assert "extracted_info" in result
+        extracted = result["extracted_info"]

-        # Verify content has meaningful text
-        assert len(content) > 100
-        assert "example" in content.lower()
+        # Extracted info should be concise (not full page)
+        assert len(extracted) < 5000  # Should be much smaller than full page
+        assert len(extracted) > 50  # But should have meaningful content

-        # Verify links are preserved in markdown format (if any exist)
-        # The page should have links that are converted to [text](url) format
-        if "http" in content:
-            # Links should be in markdown format, not raw <a> tags
-            assert "<a href" not in content
+        # Verify token usage
+        assert result["input_tokens"] > 0
+        assert result["output_tokens"] > 0
+
+        # Verify query is echoed back
+        assert result["query"] == "What is the purpose of example.com domain?"

     @pytest.mark.asyncio
-    async def test_fetch_pdf_extracts_text(self):
-        """Test that PDF content is extracted as searchable text."""
-        result = await web_fetch("https://arxiv.org/pdf/2301.00234.pdf", max_pages=2)
+    async def test_fetch_pdf_with_extraction(self):
+        """Test that PDF content is extracted and processed via LLM."""
+        result = await web_fetch(
+            "https://arxiv.org/pdf/2301.00234.pdf", query="What is the main research contribution?", max_pages=2
+        )

         assert result["status"] == "success"
         assert result["content_type"] == "application/pdf"
         assert result["num_pages"] > 0

-        # Verify extracted text is substantial
-        content = result["content"]
-        assert len(content) > 500
+        # Verify extracted info instead of raw text
+        assert "extracted_info" in result
+        extracted = result["extracted_info"]
+
+        # Extracted info should be concise
+        assert len(extracted) < 5000
+        assert len(extracted) > 50

-        # Verify page markers are present
-        assert "--- Page" in content
+        # Verify PDF metadata
+        assert result["pages_extracted"] <= 2
+
+        # Verify token usage
+        assert result["input_tokens"] > 0
+        assert result["output_tokens"] > 0

     @pytest.mark.asyncio
     async def test_fetch_pdf_pagination(self):
-        """Test that PDF max_pages parameter limits extraction."""
-        result = await web_fetch("https://arxiv.org/pdf/2301.00234.pdf", max_pages=1)
+        """Test that PDF max_pages parameter limits extraction before LLM processing."""
+        result = await web_fetch("https://arxiv.org/pdf/2301.00234.pdf", query="summarize the abstract", max_pages=1)

         assert result["status"] == "success"
         assert result["pages_extracted"] == 1
         assert result["num_pages"] >= 1
+
+        # Even with 1 page, should get meaningful extraction
+        assert "extracted_info" in result
+        assert len(result["extracted_info"]) > 20
diff --git a/implementations/knowledge_qa/evaluate.py b/implementations/knowledge_qa/evaluate.py
new file mode 100644
index 0000000..892fe40
--- /dev/null
+++ b/implementations/knowledge_qa/evaluate.py
@@ -0,0 +1,258 @@
+"""Evaluate the Knowledge Agent using Langfuse experiments.
+
+This script runs the Knowledge Agent against a Langfuse dataset and evaluates
+results using the DeepSearchQA LLM-as-judge methodology. Results are automatically
+logged to Langfuse for analysis and comparison.
+
+Optionally, trace-level groundedness evaluation can be enabled to check if agent
+outputs are supported by tool observations.
+
+Usage:
+    # Run a full evaluation
+    python evaluate.py
+
+    # Run with custom dataset and experiment name
+    python evaluate.py --dataset-name "MyDataset" --experiment-name "v2-test"
+
+    # Enable trace groundedness evaluation
+    ENABLE_TRACE_GROUNDEDNESS=true python evaluate.py
+"""
+
+import asyncio
+import logging
+import os
+from typing import Any
+
+import click
+from aieng.agent_evals.async_client_manager import AsyncClientManager
+from aieng.agent_evals.evaluation import run_experiment, run_experiment_with_trace_evals
+from aieng.agent_evals.evaluation.graders import create_trace_groundedness_evaluator  # type: ignore[attr-defined]
+from aieng.agent_evals.evaluation.graders.config import LLMRequestConfig
+from aieng.agent_evals.evaluation.types import EvaluationResult
+from aieng.agent_evals.knowledge_qa.agent import KnowledgeGroundedAgent
+from aieng.agent_evals.knowledge_qa.deepsearchqa_grader import DeepSearchQAResult, evaluate_deepsearchqa_async
+from aieng.agent_evals.logging_config import setup_logging
+from dotenv import load_dotenv
+from langfuse.experiment import Evaluation, ExperimentResult
+
+
+load_dotenv(verbose=True)
+setup_logging(level=logging.INFO, show_time=True, show_path=False)
+logger = logging.getLogger(__name__)
+
+
+DEFAULT_DATASET_NAME = "DeepSearchQA-Subset"
+DEFAULT_EXPERIMENT_NAME = "Knowledge Agent Evaluation"
+
+# Configuration for trace groundedness evaluation
+ENABLE_TRACE_GROUNDEDNESS = os.getenv("ENABLE_TRACE_GROUNDEDNESS", "false").lower() in ("true", "1", "yes")
+
+
+async def agent_task(*, item: Any, **kwargs: Any) -> dict[str, Any]:  # noqa: ARG001
+    """Run the Knowledge Agent on a dataset item.
+
+    Parameters
+    ----------
+    item : Any
+        The Langfuse experiment item containing the question.
+    **kwargs : Any
+        Additional arguments from the harness (unused).
+
+    Returns
+    -------
+    dict[str, Any]
+        Dictionary containing 'text' (response) and 'agent_response'
+        (full response object).
+    """
+    question = item.input
+    logger.info(f"Running agent on: {question[:80]}...")
+
+    try:
+        agent = KnowledgeGroundedAgent(enable_planning=True)  # type: ignore[call-arg]
+        response = await agent.answer_async(question)
+        logger.info(f"Agent completed: {len(response.text)} chars, {len(response.tool_calls)} tool calls")
+
+        return {
+            "text": response.text,
+            "agent_response": response,
+        }
+    except Exception as e:
+        logger.error(f"Agent failed: {e}")
+        return {"text": f"Error: {e}", "agent_response": None}
+
+
+async def deepsearchqa_evaluator(
+    *,
+    input: str,  # noqa: A002
+    output: dict[str, Any],
+    expected_output: str,
+    metadata: dict[str, Any] | None = None,
+    **kwargs: Any,  # noqa: ARG001
+) -> list[Evaluation]:
+    """Evaluate the agent's response using DeepSearchQA methodology.
+
+    This evaluator uses the modern async infrastructure with shared client
+    management and retry logic.
+
+    Parameters
+    ----------
+    input : str
+        The original question.
+    output : dict[str, Any]
+        Dictionary containing 'text' and 'agent_response'.
+    expected_output : str
+        The ground truth answer.
+    metadata : dict[str, Any] | None, optional
+        Item metadata (contains answer_type).
+    **kwargs : Any
+        Additional arguments from the harness (unused).
+
+    Returns
+    -------
+    list[Evaluation]
+        List of Langfuse Evaluations with F1, precision, recall, and outcome scores.
+    """
+    output_text = output.get("text", "") if isinstance(output, dict) else str(output)
+    answer_type = metadata.get("answer_type", "Set Answer") if metadata else "Set Answer"
+
+    logger.info(f"Evaluating response (answer_type: {answer_type})...")
+
+    try:
+        # Use the modern async evaluator with default config
+        result = await evaluate_deepsearchqa_async(
+            question=input,
+            answer=output_text,
+            ground_truth=expected_output,
+            answer_type=answer_type,
+            model_config=LLMRequestConfig(temperature=0.0),
+        )
+
+        evaluations = result.to_evaluations()
+        logger.info(f"Evaluation complete: {result.outcome} (F1: {result.f1_score:.2f})")
+        return evaluations
+
+    except Exception as e:
+        logger.error(f"Evaluation failed: {e}")
+        return DeepSearchQAResult.error_evaluations(str(e))
+
+
+async def run_evaluation(
+    dataset_name: str,
+    experiment_name: str,
+    max_concurrency: int = 1,
+    enable_trace_groundedness: bool = False,
+) -> ExperimentResult | EvaluationResult:
+    """Run the full evaluation experiment.
+
+    Parameters
+    ----------
+    dataset_name : str
+        Name of the Langfuse dataset to evaluate against.
+    experiment_name : str
+        Name for this experiment run.
+    max_concurrency : int, optional
+        Maximum concurrent agent runs, by default 1.
+    enable_trace_groundedness : bool, optional
+        Whether to enable trace-level groundedness evaluation, by default False.
+    """
+    client_manager = AsyncClientManager.get_instance()
+
+    try:
+        logger.info(f"Starting experiment '{experiment_name}' on dataset '{dataset_name}'")
+        logger.info(f"Max concurrency: {max_concurrency}")
+        logger.info(f"Trace groundedness: {'enabled' if enable_trace_groundedness else 'disabled'}")
+
+        result: ExperimentResult | EvaluationResult
+        if enable_trace_groundedness:
+            # Create trace groundedness evaluator
+            # Only consider web_fetch and google_search tools as evidence
+            groundedness_evaluator = create_trace_groundedness_evaluator(
+                name="trace_groundedness",
+                model_config=LLMRequestConfig(temperature=0.0),
+            )
+
+            # Run with trace evaluations
+            result = run_experiment_with_trace_evals(
+                dataset_name=dataset_name,
+                name=experiment_name,
+                description="Knowledge Agent evaluation with DeepSearchQA judge and trace groundedness",
+                task=agent_task,
+                evaluators=[deepsearchqa_evaluator],  # Item-level evaluators
+                trace_evaluators=[groundedness_evaluator],  # Trace-level evaluators
+                max_concurrency=max_concurrency,
+            )
+        else:
+            # Run without trace evaluations
+            result = run_experiment(
+                dataset_name=dataset_name,
+                name=experiment_name,
+                description="Knowledge Agent evaluation with DeepSearchQA judge",
+                task=agent_task,
+                evaluators=[deepsearchqa_evaluator],
+                max_concurrency=max_concurrency,
+            )
+
+        logger.info("Experiment complete!")
+        # Handle both ExperimentResult and EvaluationResult
+        if isinstance(result, EvaluationResult):
+            # EvaluationResult from run_experiment_with_trace_evals
+            logger.info(f"Results: {result.experiment}")
+            if result.trace_evaluations:
+                trace_evals = result.trace_evaluations
+                logger.info(
+                    f"Trace evaluations: {len(trace_evals.evaluations_by_trace_id)} traces, "
+                    f"{len(trace_evals.skipped_trace_ids)} skipped, {len(trace_evals.failed_trace_ids)} failed"
+                )
+        else:
+            # ExperimentResult from run_experiment
+            logger.info(f"Results: {result}")
+
+        return result
+
+    finally:
+        logger.info("Closing client manager and flushing data...")
+        try:
+            await client_manager.close()
+            await asyncio.sleep(0.1)
+            logger.info("Cleanup complete")
+        except Exception as e:
+            logger.warning(f"Cleanup warning: {e}")
+
+
+@click.command()
+@click.option(
+    "--dataset-name",
+    default=DEFAULT_DATASET_NAME,
+    help="Name of the Langfuse dataset to evaluate against.",
+)
+@click.option(
+    "--experiment-name",
+    default=DEFAULT_EXPERIMENT_NAME,
+    help="Name for this experiment run.",
+)
+@click.option(
+    "--max-concurrency",
+    default=1,
+    type=int,
+    help="Maximum concurrent agent runs (default: 1).",
+)
+@click.option(
+    "--enable-trace-groundedness",
+    is_flag=True,
+    default=ENABLE_TRACE_GROUNDEDNESS,
+    help="Enable trace-level groundedness evaluation.",
+)
+def cli(dataset_name: str, experiment_name: str, max_concurrency: int, enable_trace_groundedness: bool) -> None:
+    """Run Knowledge Agent evaluation using Langfuse experiments."""
+    asyncio.run(
+        run_evaluation(
+            dataset_name,
+            experiment_name,
+            max_concurrency,
+            enable_trace_groundedness,
+        )
+    )
+
+
+if __name__ == "__main__":
+    cli()