diff --git a/aieng-eval-agents/aieng/agent_evals/evaluation/trace.py b/aieng-eval-agents/aieng/agent_evals/evaluation/trace.py
index 4fa543f..5a749f1 100644
--- a/aieng-eval-agents/aieng/agent_evals/evaluation/trace.py
+++ b/aieng-eval-agents/aieng/agent_evals/evaluation/trace.py
@@ -20,7 +20,6 @@
     TraceObservationPredicate,
     TraceWaitConfig,
 )
-from aieng.agent_evals.langfuse import flush_traces
 from langfuse import Langfuse
 from langfuse.api import ObservationsView
 from langfuse.api.core import ApiError
@@ -143,6 +142,16 @@ async def _evaluate_item(
     return result
 
 
+def flush_traces() -> None:
+    """Flush any pending traces to Langfuse.
+
+    Call this before your application exits to ensure all traces are sent.
+    """
+    manager = AsyncClientManager.get_instance()
+    if manager._langfuse_client is not None:
+        manager._langfuse_client.flush()
+
+
 def extract_trace_metrics(
     trace: TraceWithFullDetails,
     *,
@@ -226,7 +235,7 @@ async def _evaluate_trace(
         return [], TraceEvalStatus.SKIPPED, "Missing `trace_id` on experiment item result."
 
     try:
-        trace, ready = await _fetch_trace_with_wait(langfuse_client, trace_id, wait)
+        trace, ready = await fetch_trace_with_wait(langfuse_client, trace_id, wait)
     except Exception as exc:
         return [], TraceEvalStatus.FAILED, f"Trace fetch failed: {exc}"
 
@@ -248,7 +257,7 @@ async def _evaluate_trace(
     return evaluations, TraceEvalStatus.OK, None
 
 
-async def _fetch_trace_with_wait(
+async def fetch_trace_with_wait(
     langfuse_client: Langfuse, trace_id: str, wait: TraceWaitConfig
 ) -> tuple[TraceWithFullDetails | None, bool]:
     """Fetch a trace with retry/backoff until it is ready or timeout expires."""
diff --git a/aieng-eval-agents/aieng/agent_evals/langfuse.py b/aieng-eval-agents/aieng/agent_evals/langfuse.py
index eda2005..88667da 100644
--- a/aieng-eval-agents/aieng/agent_evals/langfuse.py
+++ b/aieng-eval-agents/aieng/agent_evals/langfuse.py
@@ -1,5 +1,6 @@
 """Functions and objects pertaining to Langfuse."""
 
+import asyncio
 import base64
 import hashlib
 import json
@@ -10,7 +11,10 @@
 
 from aieng.agent_evals.async_client_manager import AsyncClientManager
 from aieng.agent_evals.configs import Configs
+from aieng.agent_evals.evaluation.trace import extract_trace_metrics, fetch_trace_with_wait
+from aieng.agent_evals.evaluation.types import TraceWaitConfig
 from aieng.agent_evals.progress import track_with_progress
+from langfuse import Langfuse
 from opentelemetry import trace
 from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
 from opentelemetry.sdk.resources import Resource
@@ -160,16 +164,6 @@ def init_tracing(service_name: str = "aieng-eval-agents") -> bool:
         return False
 
 
-def flush_traces() -> None:
-    """Flush any pending traces to Langfuse.
-
-    Call this before your application exits to ensure all traces are sent.
-    """
-    manager = AsyncClientManager.get_instance()
-    if manager._langfuse_client is not None:
-        manager._langfuse_client.flush()
-
-
 def is_tracing_enabled() -> bool:
     """Check if Langfuse tracing is currently enabled.
 
@@ -370,3 +364,98 @@ def _normalize_dataset_record(item: Any, record_number: int) -> dict[str, Any]:
         "expected_output": item["expected_output"],
         "metadata": metadata,
     }
+
+
+def report_usage_scores(
+    trace_id: str,
+    token_threshold: int = 0,
+    latency_threshold: int = 0,
+    cost_threshold: float = 0,
+) -> None:
+    """Report usage scores to Langfuse for a given trace ID.
+
+    WARNING: Due to the nature of the Langfuse API, this function may hang
+    while trying to fetch the observations.
+
+    Parameters
+    ----------
+    trace_id: str
+        The ID of the trace to report the usage scores for.
+    token_threshold: int
+        The total token (input + output) threshold to report the score for.
+        If the token count is greater than the threshold, the score
+        will be reported as 0.
+        Optional, defaults to 0 (no reporting).
+    latency_threshold: int
+        The latency threshold in seconds to report the score for.
+        If the latency is greater than the threshold, the score
+        will be reported as 0.
+        Optional, defaults to 0 (no reporting).
+    cost_threshold: float
+        The cost threshold to report the score for.
+        If the cost is greater than the threshold, the score
+        will be reported as 0.
+        Optional, defaults to 0 (no reporting).
+    """
+    langfuse_client = AsyncClientManager.get_instance().langfuse_client
+
+    logger.info(f"Fetching trace {trace_id}...")
+    trace, ready = asyncio.run(
+        fetch_trace_with_wait(langfuse_client, trace_id, TraceWaitConfig()),
+    )
+
+    if trace is None:
+        logger.error(f"Trace {trace_id} not found. Will not report usage scores.")
+        return
+
+    if not ready:
+        logger.warning(f"Trace {trace_id} is not ready. Scores will be reported on partial traces.")
+
+    trace_metrics = extract_trace_metrics(trace)
+
+    if token_threshold > 0:
+        total_tokens = trace_metrics.total_input_tokens + trace_metrics.total_output_tokens
+        _report_score(langfuse_client, "Token Count", total_tokens, token_threshold, trace_id)
+
+    if latency_threshold > 0:
+        _report_score(langfuse_client, "Latency", trace_metrics.latency_sec, latency_threshold, trace_id)
+
+    if cost_threshold > 0:
+        _report_score(langfuse_client, "Cost", trace_metrics.total_cost, cost_threshold, trace_id)
+
+    langfuse_client.flush()
+
+
+def _report_score(
+    langfuse_client: Langfuse,
+    name: str,
+    value: int | float | None,
+    threshold: int | float,
+    trace_id: str,
+) -> None:
+    if value is None:
+        logger.error(f"Trace {trace_id} has no value for {name}. Will not report score for {name}.")
+        return
+
+    if value == 0:
+        logger.error(f"Trace {trace_id} has a value of 0 for {name}. Will not report score for {name}.")
+        return
+
+    if value <= threshold:
+        score = 1
+        comment = f"{value} is less than or equal to the threshold."
+    else:
+        score = 0
+        comment = f"{value} is greater than the threshold."
+
+    logger.info(f"Reporting score for {name}")
+    langfuse_client.create_score(
+        name=name,
+        value=score,
+        trace_id=trace_id,
+        comment=comment,
+        metadata={
+            "value": value,
+            "threshold": threshold,
+        },
+    )
diff --git a/aieng-eval-agents/aieng/agent_evals/report_generation/__init__.py b/aieng-eval-agents/aieng/agent_evals/report_generation/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/aieng-eval-agents/aieng/agent_evals/report_generation/agent.py b/aieng-eval-agents/aieng/agent_evals/report_generation/agent.py
index 5ac660d..1374490 100644
--- a/aieng-eval-agents/aieng/agent_evals/report_generation/agent.py
+++ b/aieng-eval-agents/aieng/agent_evals/report_generation/agent.py
@@ -22,9 +22,10 @@
 
 from aieng.agent_evals.async_client_manager import AsyncClientManager
 from aieng.agent_evals.db_manager import DbManager
-from aieng.agent_evals.langfuse import setup_langfuse_tracer
+from aieng.agent_evals.langfuse import init_tracing
 from aieng.agent_evals.report_generation.file_writer import ReportFileWriter
 from google.adk.agents import Agent
+from google.adk.agents.base_agent import AfterAgentCallback
 from google.adk.events.event import Event
 from pydantic import BaseModel
 
@@ -37,6 +38,7 @@ def get_report_generation_agent(
     instructions: str,
     reports_output_path: Path,
     langfuse_project_name: str | None,
+    after_agent_callback: AfterAgentCallback | None = None,
 ) -> Agent:
     """
     Define the report generation agent.
@@ -49,6 +51,9 @@
         The path to the reports output directory.
     langfuse_project_name : str | None
        The name of the Langfuse project to use for tracing.
+    after_agent_callback : AfterAgentCallback | None
+        The callback function to be called after the agent has
+        finished executing.
 
     Returns
     -------
@@ -57,7 +62,7 @@
     """
     # Setup langfuse tracing if project name is provided
     if langfuse_project_name:
-        setup_langfuse_tracer(langfuse_project_name)
+        init_tracing(langfuse_project_name)
 
     # Get the client manager singleton instance
     client_manager = AsyncClientManager.get_instance()
@@ -74,6 +79,7 @@
             db_manager.report_generation_db().get_schema_info,
             report_file_writer.write_xlsx,
         ],
+        after_agent_callback=after_agent_callback,
     )
 
 
diff --git a/aieng-eval-agents/aieng/agent_evals/report_generation/evaluation/__init__.py b/aieng-eval-agents/aieng/agent_evals/report_generation/evaluation/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/aieng-eval-agents/aieng/agent_evals/report_generation/evaluation.py b/aieng-eval-agents/aieng/agent_evals/report_generation/evaluation/offline.py
similarity index 98%
rename from aieng-eval-agents/aieng/agent_evals/report_generation/evaluation.py
rename to aieng-eval-agents/aieng/agent_evals/report_generation/evaluation/offline.py
index 20fe60d..0b0a12d 100644
--- a/aieng-eval-agents/aieng/agent_evals/report_generation/evaluation.py
+++ b/aieng-eval-agents/aieng/agent_evals/report_generation/evaluation/offline.py
@@ -3,7 +3,7 @@
 
 Example
 -------
->>> from aieng.agent_evals.report_generation.evaluation import evaluate
+>>> from aieng.agent_evals.report_generation.evaluation.offline import evaluate
 >>> evaluate(
 >>>     dataset_name="OnlineRetailReportEval",
 >>>     reports_output_path=Path("reports/"),
@@ -62,6 +62,7 @@
 async def evaluate(
     dataset_name: str,
     reports_output_path: Path,
     langfuse_project_name: str,
+    max_concurrency: int = 5,
 ) -> None:
     """Evaluate the report generation agent against a Langfuse dataset.
@@ -76,6 +77,8 @@
         The path to the reports output directory.
     langfuse_project_name : str
         The name of the Langfuse project to use for tracing.
+    max_concurrency : int, optional
+        The maximum concurrency to use for the evaluation, by default 5.
     """
     # Get the client manager singleton instance and langfuse client
     client_manager = AsyncClientManager.get_instance()
@@ -99,7 +102,7 @@
         description="Evaluate the Report Generation Agent with data from Langfuse",
         task=report_generation_task.run,
         evaluators=[final_result_evaluator, trajectory_evaluator],
-        max_concurrency=1,
+        max_concurrency=max_concurrency,
     )
 
     # Log the evaluation result
diff --git a/aieng-eval-agents/aieng/agent_evals/report_generation/evaluation/online.py b/aieng-eval-agents/aieng/agent_evals/report_generation/evaluation/online.py
new file mode 100644
index 0000000..fdf26e4
--- /dev/null
+++ b/aieng-eval-agents/aieng/agent_evals/report_generation/evaluation/online.py
@@ -0,0 +1,78 @@
+"""Functions to report online evaluation of the report generation agent to Langfuse."""
+
+import logging
+
+from aieng.agent_evals.async_client_manager import AsyncClientManager
+from aieng.agent_evals.report_generation.agent import EventParser, EventType
+from google.adk.events.event import Event
+
+
+logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s")
+logger = logging.getLogger(__name__)
+
+
+def report_final_response_score(event: Event, string_match: str = "") -> None:
+    """Report a score to Langfuse if the event is a final response.
+
+    The score will be reported as 1 if the final response is valid
+    and contains the string match. Otherwise, the score will be reported as 0.
+
+    This must be called within the context of a Langfuse trace.
+
+    Parameters
+    ----------
+    event : Event
+        The event to check.
+    string_match : str
+        The string to match in the final response.
+        Optional, defaults to an empty string.
+
+    Raises
+    ------
+    ValueError
+        If the event is not a final response.
+    """
+    if not event.is_final_response():
+        raise ValueError("Event is not a final response")
+
+    langfuse_client = AsyncClientManager.get_instance().langfuse_client
+    trace_id = langfuse_client.get_current_trace_id()
+
+    if trace_id is None:
+        raise ValueError("Langfuse trace ID is None.")
+
+    parsed_events = EventParser.parse(event)
+    for parsed_event in parsed_events:
+        if parsed_event.type == EventType.FINAL_RESPONSE:
+            if string_match in parsed_event.text:
+                score = 1
+                comment = "Final response contains the string match."
+            else:
+                score = 0
+                comment = "Final response does not contain the string match."
+
+            logger.info("Reporting score for valid final response")
+            langfuse_client.create_score(
+                name="Valid Final Response",
+                value=score,
+                trace_id=trace_id,
+                comment=comment,
+                metadata={
+                    "final_response": parsed_event.text,
+                    "string_match": string_match,
+                },
+            )
+            langfuse_client.flush()
+            return
+
+    logger.info("Reporting score for invalid final response")
+    langfuse_client.create_score(
+        name="Valid Final Response",
+        value=0,
+        trace_id=trace_id,
+        comment="Final response not found in the event",
+        metadata={
+            "string_match": string_match,
+        },
+    )
+    langfuse_client.flush()
diff --git a/implementations/report_generation/README.md b/implementations/report_generation/README.md
index 967de0c..641527e 100644
--- a/implementations/report_generation/README.md
+++ b/implementations/report_generation/README.md
@@ -56,6 +56,21 @@
 features a text input so you can make your own report requests to it. The agent will
 automatically upload a trace to Langfuse that can be used to evaluate the run or
 debug any issues.
+
+### Running the Agent Using Google ADK
+
+If you wish to run the agent using the Google Agent Development Kit UI,
+please run the command below from the project's root folder:
+
+```bash
+adk web implementations/
+```
+
+Once the service is up, it will be available at `http://127.0.0.1:8000`. In the
+"Select an Agent" dropdown, select "report_generation". You can type a request to
+the agent in the text box on the right-hand panel.
+
+
 ## Running the Evaluations
 
 ### Uploading the Ground Truth Dataset to Langfuse
@@ -76,16 +91,16 @@ To upload custom data or use a different dataset name, please run:
 uv run --env-file .env python -m implementations.report_generation.data.langfuse_upload --dataset-path --dataset-name
 ```
 
-### Running the Evaluation Script
+### Running the Offline Evaluation Script
 
-Once the dataset has been uploaded to Langfuse, the evaluations can be run with
-the command below:
+Once the dataset has been uploaded to Langfuse, the offline evaluations
+against a pre-determined dataset can be run with the command below:
 
 ```bash
 uv run --env-file .env python -m implementations.report_generation.evaluate
 ```
 
-To run the evaluations against a custom dataset, please execute:
+To run the offline evaluations against a custom dataset, please execute:
 ```bash
 uv run --env-file .env python -m implementations.report_generation.evaluate --dataset-name
 ```
@@ -98,3 +113,13 @@ agent used against the ground truth and produce True/False scores along with a r
 
 At the end of the run, an evaluation report will be displayed along with a link
 to check details about the evaluation in Langfuse.
+
+### Online Evaluations
+
+The agent is also set up to collect online evaluation metrics in both the Gradio Demo UI
+and the Google ADK UI. The online evaluations check whether the token usage and execution
+time exceed thresholds defined in the code, and whether the final response is present
+and contains a link to the report.
+
+These evaluation results are sent to Langfuse as scores, where they can be analyzed
+both in aggregate and individually.
diff --git a/implementations/report_generation/agent.py b/implementations/report_generation/agent.py
new file mode 100644
index 0000000..75fe298
--- /dev/null
+++ b/implementations/report_generation/agent.py
@@ -0,0 +1,67 @@
+"""Entry point for the Google ADK UI for the report generation agent.
+
+Example
+-------
+$ adk web implementations/
+"""
+
+import logging
+import threading
+
+from aieng.agent_evals.async_client_manager import AsyncClientManager
+from aieng.agent_evals.langfuse import report_usage_scores
+from aieng.agent_evals.report_generation.agent import get_report_generation_agent
+from aieng.agent_evals.report_generation.evaluation.online import report_final_response_score
+from aieng.agent_evals.report_generation.prompts import MAIN_AGENT_INSTRUCTIONS
+from dotenv import load_dotenv
+from google.adk.agents.callback_context import CallbackContext
+
+from .env_vars import get_langfuse_project_name, get_reports_output_path
+
+
+load_dotenv(verbose=True)
+logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s")
+logger = logging.getLogger(__name__)
+
+
+def calculate_and_send_scores(callback_context: CallbackContext) -> None:
+    """Calculate token usage and latency scores and submit them to Langfuse.
+
+    This is a callback function to be called after the agent has run.
+
+    Parameters
+    ----------
+    callback_context : CallbackContext
+        The callback context at the end of the agent run.
+    """
+    langfuse_client = AsyncClientManager.get_instance().langfuse_client
+    langfuse_client.flush()
+
+    for event in callback_context.session.events:
+        if event.is_final_response() and event.content and event.content.role == "model":
+            # Report the final response evaluation to Langfuse
+            report_final_response_score(event, string_match="](gradio_api/file=")
+
+            # Run usage scoring in a thread so it doesn't block the UI
+            thread = threading.Thread(
+                target=report_usage_scores,
+                kwargs={
+                    "trace_id": langfuse_client.get_current_trace_id(),
+                    "token_threshold": 15000,
+                    "latency_threshold": 60,
+                },
+                daemon=True,
+            )
+            thread.start()
+
+            return
+
+    logger.error("No final response found in the callback context. Will not report scores to Langfuse.")
+
+
+root_agent = get_report_generation_agent(
+    instructions=MAIN_AGENT_INSTRUCTIONS,
+    reports_output_path=get_reports_output_path(),
+    langfuse_project_name=get_langfuse_project_name(),
+    after_agent_callback=calculate_and_send_scores,
+)
diff --git a/implementations/report_generation/demo.py b/implementations/report_generation/demo.py
index a2ef39a..4e389f6 100644
--- a/implementations/report_generation/demo.py
+++ b/implementations/report_generation/demo.py
@@ -8,6 +8,7 @@
 
 import asyncio
 import logging
+import threading
 from functools import partial
 from typing import Any, AsyncGenerator
 
@@ -15,9 +16,12 @@
 import gradio as gr
 from aieng.agent_evals.async_client_manager import AsyncClientManager
 from aieng.agent_evals.db_manager import DbManager
+from aieng.agent_evals.langfuse import report_usage_scores
 from aieng.agent_evals.report_generation.agent import get_report_generation_agent
+from aieng.agent_evals.report_generation.evaluation.online import report_final_response_score
 from aieng.agent_evals.report_generation.prompts import MAIN_AGENT_INSTRUCTIONS
 from dotenv import load_dotenv
+from google.adk.agents.callback_context import CallbackContext
 from google.adk.runners import Runner
 from google.adk.sessions import InMemorySessionService
 from google.genai.types import Content, Part
@@ -64,6 +68,7 @@ async def agent_session_handler(
         instructions=MAIN_AGENT_INSTRUCTIONS,
         reports_output_path=get_reports_output_path(),
         langfuse_project_name=get_langfuse_project_name() if enable_trace else None,
+        after_agent_callback=calculate_and_send_scores,
     )
 
     # Construct an in-memory session for the agent to maintain
@@ -94,6 +99,39 @@ async def agent_session_handler(
         yield turn_messages
 
 
+def calculate_and_send_scores(callback_context: CallbackContext) -> None:
+    """Calculate token usage and latency scores and submit them to Langfuse.
+
+    This is a callback function to be called after the agent has run.
+
+    Parameters
+    ----------
+    callback_context : CallbackContext
+        The callback context at the end of the agent run.
+    """
+    for event in callback_context.session.events:
+        if event.is_final_response() and event.content and event.content.role == "model":
+            # Report the final response evaluation to Langfuse
+            report_final_response_score(event, string_match="](gradio_api/file=")
+
+            # Run usage scoring in a thread so it doesn't block the UI
+            langfuse_client = AsyncClientManager.get_instance().langfuse_client
+            thread = threading.Thread(
+                target=report_usage_scores,
+                kwargs={
+                    "trace_id": langfuse_client.get_current_trace_id(),
+                    "token_threshold": 15000,
+                    "latency_threshold": 60,
+                },
+                daemon=True,
+            )
+            thread.start()
+
+            return
+
+    logger.error("No final response found in the callback context. Will not report scores to Langfuse.")
+
+
 @click.command()
 @click.option("--enable-trace", required=False, default=True, help="Whether to enable tracing with Langfuse.")
 @click.option(
diff --git a/implementations/report_generation/evaluate.py b/implementations/report_generation/evaluate.py
index 4df0879..d46fda4 100644
--- a/implementations/report_generation/evaluate.py
+++ b/implementations/report_generation/evaluate.py
@@ -11,7 +11,7 @@
 import asyncio
 
 import click
-from aieng.agent_evals.report_generation.evaluation import evaluate
+from aieng.agent_evals.report_generation.evaluation.offline import evaluate
 from dotenv import load_dotenv
 
 from implementations.report_generation.data.langfuse_upload import DEFAULT_EVALUATION_DATASET_NAME
diff --git a/implementations/report_generation/gradio_utils.py b/implementations/report_generation/gradio_utils.py
index 76338f8..9510a3c 100644
--- a/implementations/report_generation/gradio_utils.py
+++ b/implementations/report_generation/gradio_utils.py
@@ -1,5 +1,6 @@
 """Utility functions for the report generation agent."""
 
+import json
 import logging
 
 from aieng.agent_evals.report_generation.agent import EventParser, EventType
@@ -40,11 +41,15 @@ def agent_event_to_gradio_messages(event: Event) -> list[ChatMessage]:
             )
         )
     elif parsed_event.type == EventType.TOOL_CALL:
+        formatted_arguments = json.dumps(parsed_event.arguments, indent=2).replace("\\n", "\n")
         output.append(
             ChatMessage(
                 role="assistant",
-                content=f"```\n{parsed_event.arguments}\n```",
-                metadata={"title": f"🛠️ Used tool `{parsed_event.text}`"},
+                content=f"```\n{formatted_arguments}\n```",
+                metadata={
+                    "title": f"🛠️ Used tool `{parsed_event.text}`",
+                    "status": "done",  # This makes it collapsed by default
+                },
             )
         )
     elif parsed_event.type == EventType.THOUGHT:
@@ -56,10 +61,11 @@
             )
         )
     elif parsed_event.type == EventType.TOOL_RESPONSE:
+        formatted_arguments = json.dumps(parsed_event.arguments, indent=2).replace("\\n", "\n")
         output.append(
             ChatMessage(
                 role="assistant",
-                content=f"```\n{parsed_event.arguments}\n```",
+                content=f"```\n{formatted_arguments}\n```",
                 metadata={
                     "title": f"📝 Tool call output: `{parsed_event.text}`",
                     "status": "done",  # This makes it collapsed by default
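For reference, a minimal sketch (not part of the patch) of how the new `report_usage_scores` helper can be driven outside of the agent callbacks, for example to backfill scores for a trace id copied from the Langfuse UI. The trace id, the cost threshold, and the `.env`-based credential loading are illustrative assumptions; the token and latency thresholds mirror the values hard-coded in the ADK and Gradio callbacks above.

```python
"""Sketch: backfill usage scores for an existing Langfuse trace (assumes credentials in .env)."""

from aieng.agent_evals.langfuse import report_usage_scores
from dotenv import load_dotenv

load_dotenv(verbose=True)

# Hypothetical trace id, e.g. copied from the Langfuse UI.
TRACE_ID = "replace-with-a-real-trace-id"

# A score of 1 is reported when the metric is at or below its threshold, 0 otherwise;
# a threshold of 0 skips that particular check.
report_usage_scores(
    trace_id=TRACE_ID,
    token_threshold=15000,  # total input + output tokens, as in the UI callbacks
    latency_threshold=60,   # seconds, as in the UI callbacks
    cost_threshold=0.50,    # hypothetical cost ceiling
)
```

Because `report_usage_scores` drives `fetch_trace_with_wait` through `asyncio.run` and may block while Langfuse finishes ingesting the trace, the UI callbacks run it on a daemon thread; a standalone script like this can simply call it synchronously.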