From 071af2414c31212ee50e96a5551417c168662571 Mon Sep 17 00:00:00 2001 From: Marcelo Lotif Date: Tue, 10 Feb 2026 16:59:04 -0500 Subject: [PATCH 1/9] Adding final response evaluation and some minor improvements --- .../{evaluation.py => evaluation/offline.py} | 2 +- .../report_generation/evaluation/online.py | 63 +++++++++++++++++++ implementations/report_generation/demo.py | 9 +++ implementations/report_generation/evaluate.py | 2 +- .../report_generation/gradio_utils.py | 12 +++- 5 files changed, 83 insertions(+), 5 deletions(-) rename aieng-eval-agents/aieng/agent_evals/report_generation/{evaluation.py => evaluation/offline.py} (99%) create mode 100644 aieng-eval-agents/aieng/agent_evals/report_generation/evaluation/online.py diff --git a/aieng-eval-agents/aieng/agent_evals/report_generation/evaluation.py b/aieng-eval-agents/aieng/agent_evals/report_generation/evaluation/offline.py similarity index 99% rename from aieng-eval-agents/aieng/agent_evals/report_generation/evaluation.py rename to aieng-eval-agents/aieng/agent_evals/report_generation/evaluation/offline.py index 9099170..8e179a0 100644 --- a/aieng-eval-agents/aieng/agent_evals/report_generation/evaluation.py +++ b/aieng-eval-agents/aieng/agent_evals/report_generation/evaluation/offline.py @@ -3,7 +3,7 @@ Example ------- ->>> from aieng.agent_evals.report_generation.evaluation import evaluate +>>> from aieng.agent_evals.report_generation.evaluation.offline import evaluate >>> evaluate( >>> dataset_name="OnlineRetailReportEval", >>> reports_output_path=Path("reports/"), diff --git a/aieng-eval-agents/aieng/agent_evals/report_generation/evaluation/online.py b/aieng-eval-agents/aieng/agent_evals/report_generation/evaluation/online.py new file mode 100644 index 0000000..d487592 --- /dev/null +++ b/aieng-eval-agents/aieng/agent_evals/report_generation/evaluation/online.py @@ -0,0 +1,63 @@ +"""Functions to report online evaluation of the report generation agent to Langfuse.""" + +from aieng.agent_evals.report_generation.agent import EventParser, EventType +from google.adk.events.event import Event +from langfuse import Langfuse + + +def report_if_final_response(event: Event, langfuse_client: Langfuse, string_match: str = "") -> None: + """Report a score to Langfuse if the event is a final response. + + The score will be reported as 1 if the final response is valid + and contains the string match. Otherwise, the score will be reported as 0. + + Parameters + ---------- + event : Event + The event to check. + langfuse_client : Langfuse + The Langfuse client to use. + string_match : str + The string to match in the final response. + Optional, default to empty string. 
+ """ + trace_id = langfuse_client.get_current_trace_id() + + if event.is_final_response(): + parsed_events = EventParser.parse(event) + for parsed_event in parsed_events: + if parsed_event.type == EventType.FINAL_RESPONSE: + if string_match in parsed_event.text: + langfuse_client.create_score( + name="Valid Final Response", + value=1, + trace_id=trace_id, + comment="Final response contains the string match.", + metadata={ + "final_response": parsed_event.text, + "string_match": string_match, + }, + ) + return + + langfuse_client.create_score( + name="Valid Final Response", + value=0, + trace_id=trace_id, + comment="Final response does not contains the string match.", + metadata={ + "final_response": parsed_event.text, + "string_match": string_match, + }, + ) + return + + langfuse_client.create_score( + name="Valid Final Response", + value=0, + trace_id=trace_id, + comment="Final response not found in the event", + metadata={ + "string_match": string_match, + }, + ) diff --git a/implementations/report_generation/demo.py b/implementations/report_generation/demo.py index 7771b9b..b6978cf 100644 --- a/implementations/report_generation/demo.py +++ b/implementations/report_generation/demo.py @@ -15,6 +15,7 @@ import gradio as gr from aieng.agent_evals.async_client_manager import AsyncClientManager from aieng.agent_evals.report_generation.agent import get_report_generation_agent +from aieng.agent_evals.report_generation.evaluation.online import report_if_final_response from aieng.agent_evals.report_generation.prompts import MAIN_AGENT_INSTRUCTIONS from dotenv import load_dotenv from google.adk.runners import Runner @@ -65,6 +66,9 @@ async def agent_session_handler( langfuse_project_name=get_langfuse_project_name() if enable_trace else None, ) + # Get the Langfuse client for online reporting + langfuse_client = AsyncClientManager.get_instance().langfuse_client + # Construct an in-memory session for the agent to maintain # conversation history across multiple turns of a chat # This makes it possible to ask follow-up questions that refer to @@ -86,12 +90,17 @@ async def agent_session_handler( session_id=current_session.id, new_message=content, ): + # Report the final response evaluation to Langfuse + report_if_final_response(event, langfuse_client, string_match="](gradio_api/file=") + # Parse the stream events, convert to Gradio chat messages and append to # the chat history turn_messages += agent_event_to_gradio_messages(event) if len(turn_messages) > 0: yield turn_messages + langfuse_client.flush() + @click.command() @click.option("--enable-trace", required=False, default=True, help="Whether to enable tracing with Langfuse.") diff --git a/implementations/report_generation/evaluate.py b/implementations/report_generation/evaluate.py index 4df0879..d46fda4 100644 --- a/implementations/report_generation/evaluate.py +++ b/implementations/report_generation/evaluate.py @@ -11,7 +11,7 @@ import asyncio import click -from aieng.agent_evals.report_generation.evaluation import evaluate +from aieng.agent_evals.report_generation.evaluation.offline import evaluate from dotenv import load_dotenv from implementations.report_generation.data.langfuse_upload import DEFAULT_EVALUATION_DATASET_NAME diff --git a/implementations/report_generation/gradio_utils.py b/implementations/report_generation/gradio_utils.py index 76338f8..9510a3c 100644 --- a/implementations/report_generation/gradio_utils.py +++ b/implementations/report_generation/gradio_utils.py @@ -1,5 +1,6 @@ """Utility functions for the report generation 
agent.""" +import json import logging from aieng.agent_evals.report_generation.agent import EventParser, EventType @@ -40,11 +41,15 @@ def agent_event_to_gradio_messages(event: Event) -> list[ChatMessage]: ) ) elif parsed_event.type == EventType.TOOL_CALL: + formatted_arguments = json.dumps(parsed_event.arguments, indent=2).replace("\\n", "\n") output.append( ChatMessage( role="assistant", - content=f"```\n{parsed_event.arguments}\n```", - metadata={"title": f"🛠️ Used tool `{parsed_event.text}`"}, + content=f"```\n{formatted_arguments}\n```", + metadata={ + "title": f"🛠️ Used tool `{parsed_event.text}`", + "status": "done", # This makes it collapsed by default + }, ) ) elif parsed_event.type == EventType.THOUGHT: @@ -56,10 +61,11 @@ def agent_event_to_gradio_messages(event: Event) -> list[ChatMessage]: ) ) elif parsed_event.type == EventType.TOOL_RESPONSE: + formatted_arguments = json.dumps(parsed_event.arguments, indent=2).replace("\\n", "\n") output.append( ChatMessage( role="assistant", - content=f"```\n{parsed_event.arguments}\n```", + content=f"```\n{formatted_arguments}\n```", metadata={ "title": f"📝 Tool call output: `{parsed_event.text}`", "status": "done", # This makes it collapsed by default From cad754af210871628c59b3dbf1e3f5bd036e620d Mon Sep 17 00:00:00 2001 From: Marcelo Lotif Date: Tue, 10 Feb 2026 18:17:28 -0500 Subject: [PATCH 2/9] Finished online scores, need to put it in a thread --- .../report_generation/evaluation/online.py | 207 +++++++++++++++--- implementations/report_generation/demo.py | 12 +- 2 files changed, 180 insertions(+), 39 deletions(-) diff --git a/aieng-eval-agents/aieng/agent_evals/report_generation/evaluation/online.py b/aieng-eval-agents/aieng/agent_evals/report_generation/evaluation/online.py index d487592..4b0dfe6 100644 --- a/aieng-eval-agents/aieng/agent_evals/report_generation/evaluation/online.py +++ b/aieng-eval-agents/aieng/agent_evals/report_generation/evaluation/online.py @@ -1,63 +1,200 @@ """Functions to report online evaluation of the report generation agent to Langfuse.""" +import logging +from typing import Any + +from aieng.agent_evals.async_client_manager import AsyncClientManager from aieng.agent_evals.report_generation.agent import EventParser, EventType from google.adk.events.event import Event from langfuse import Langfuse +from langfuse.api.resources.commons.types.observations_view import ObservationsView +from langfuse.api.resources.observations.types.observations_views import ObservationsViews +from tenacity import retry, stop_after_attempt, wait_exponential + + +logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s") +logger = logging.getLogger(__name__) -def report_if_final_response(event: Event, langfuse_client: Langfuse, string_match: str = "") -> None: +def report_final_response_score(event: Event, string_match: str = "") -> None: """Report a score to Langfuse if the event is a final response. The score will be reported as 1 if the final response is valid and contains the string match. Otherwise, the score will be reported as 0. + This has to be called within the context of a Langfuse trace. + Parameters ---------- event : Event The event to check. - langfuse_client : Langfuse - The Langfuse client to use. string_match : str The string to match in the final response. Optional, default to empty string. + + Raises + ------ + ValueError + If the event is not a final response. 
+ """ + if not event.is_final_response(): + raise ValueError("Event is not a final response") + + langfuse_client = AsyncClientManager.get_instance().langfuse_client + trace_id = langfuse_client.get_current_trace_id() + + if trace_id is None: + raise ValueError("Langfuse trace ID is None.") + + logger.info("Reporting score for valid final response") + + parsed_events = EventParser.parse(event) + for parsed_event in parsed_events: + if parsed_event.type == EventType.FINAL_RESPONSE: + if string_match in parsed_event.text: + score = 1 + comment = "Final response contains the string match." + else: + score = 0 + comment = "Final response does not contains the string match." + + logger.info("Reporting score for valid final response") + langfuse_client.create_score( + name="Valid Final Response", + value=score, + trace_id=trace_id, + comment=comment, + metadata={ + "final_response": parsed_event.text, + "string_match": string_match, + }, + ) + return + + langfuse_client.create_score( + name="Valid Final Response", + value=0, + trace_id=trace_id, + comment="Final response not found in the event", + metadata={ + "string_match": string_match, + }, + ) + + +def report_usage_scores( + token_threshold: int = 0, + latency_threshold: int = 0, + cost_threshold: float = 0, +) -> None: + """Report usage metrics to Langfuse. + + This function has to be called within the context of a Langfuse trace. + + Parameters + ---------- + token_threshold: int + The token threshold to report the metrics for. + if the token count is greater than the threshold, the score + will be reported as 0. + Optional, default to 0 (no reporting). + latency_threshold: int + The latency threshold in seconds to report the metrics for. + if the latency is greater than the threshold, the score + will be reported as 0. + Optional, default to 0 (no reporting). + cost_threshold: float + The cost threshold to report the metrics for. + if the cost is greater than the threshold, the score + will be reported as 0. + Optional, default to 0 (no reporting). """ + langfuse_client = AsyncClientManager.get_instance().langfuse_client trace_id = langfuse_client.get_current_trace_id() - if event.is_final_response(): - parsed_events = EventParser.parse(event) - for parsed_event in parsed_events: - if parsed_event.type == EventType.FINAL_RESPONSE: - if string_match in parsed_event.text: - langfuse_client.create_score( - name="Valid Final Response", - value=1, - trace_id=trace_id, - comment="Final response contains the string match.", - metadata={ - "final_response": parsed_event.text, - "string_match": string_match, - }, - ) - return - - langfuse_client.create_score( - name="Valid Final Response", - value=0, - trace_id=trace_id, - comment="Final response does not contains the string match.", - metadata={ - "final_response": parsed_event.text, - "string_match": string_match, - }, - ) - return + if trace_id is None: + raise ValueError("Langfuse trace ID is None.") + + observations = _get_observations_with_retry(trace_id, langfuse_client) + if token_threshold > 0: + total_tokens = sum(_obs_attr(observation, "totalTokens") for observation in observations.data) + if total_tokens <= token_threshold: + score = 1 + comment = "Token count is less than or equal to the threshold." + else: + score = 0 + comment = "Token count is greater than the threshold." 
+ + logger.info("Reporting score for token count") + langfuse_client.create_score( + name="Token Count", + value=score, + trace_id=trace_id, + comment=comment, + metadata={ + "total_tokens": total_tokens, + "token_threshold": token_threshold, + }, + ) + + if latency_threshold > 0: + total_latency = sum(_obs_attr(observation, "latency") for observation in observations.data) + if total_latency <= latency_threshold: + score = 1 + comment = "Latency is less than or equal to the threshold." + else: + score = 0 + comment = "Latency is greater than the threshold." + + logger.info("Reporting score for latency") + langfuse_client.create_score( + name="Latency", + value=score, + trace_id=trace_id, + comment=comment, + metadata={ + "total_latency": total_latency, + "latency_threshold": latency_threshold, + }, + ) + + if cost_threshold > 0: + total_cost = sum(_obs_attr(observation, "calculated_total_cost") for observation in observations.data) + if total_cost <= cost_threshold: + score = 1 + comment = "Cost is less than or equal to the threshold." + else: + score = 0 + comment = "Cost is greater than the threshold." + + logger.info("Reporting score for cost") langfuse_client.create_score( - name="Valid Final Response", - value=0, + name="Cost", + value=score, trace_id=trace_id, - comment="Final response not found in the event", + comment=comment, metadata={ - "string_match": string_match, + "total_cost": total_cost, + "cost_threshold": cost_threshold, }, ) + + langfuse_client.flush() + + +def _obs_attr(observation: ObservationsView, attribute: str) -> Any: + attribute_value = getattr(observation, attribute) + if attribute_value == 0: + logger.error(f"Observation attribute value for {attribute} is 0") + return 0 + if attribute_value is None: + logger.error(f"Observation attribute value for {attribute} is None") + return 0 + return attribute_value + + +@retry(stop=stop_after_attempt(10), wait=wait_exponential(multiplier=1, min=5, max=15)) +def _get_observations_with_retry(trace_id: str, langfuse_client: Langfuse) -> ObservationsViews: + logger.info(f"Getting observations for trace {trace_id}...") + return langfuse_client.api.observations.get_many(trace_id=trace_id, type="GENERATION") diff --git a/implementations/report_generation/demo.py b/implementations/report_generation/demo.py index b6978cf..383b1b9 100644 --- a/implementations/report_generation/demo.py +++ b/implementations/report_generation/demo.py @@ -15,7 +15,7 @@ import gradio as gr from aieng.agent_evals.async_client_manager import AsyncClientManager from aieng.agent_evals.report_generation.agent import get_report_generation_agent -from aieng.agent_evals.report_generation.evaluation.online import report_if_final_response +from aieng.agent_evals.report_generation.evaluation.online import report_final_response_score, report_usage_scores from aieng.agent_evals.report_generation.prompts import MAIN_AGENT_INSTRUCTIONS from dotenv import load_dotenv from google.adk.runners import Runner @@ -90,15 +90,19 @@ async def agent_session_handler( session_id=current_session.id, new_message=content, ): - # Report the final response evaluation to Langfuse - report_if_final_response(event, langfuse_client, string_match="](gradio_api/file=") - # Parse the stream events, convert to Gradio chat messages and append to # the chat history turn_messages += agent_event_to_gradio_messages(event) if len(turn_messages) > 0: yield turn_messages + if event.is_final_response(): + # Report the final response evaluation to Langfuse + report_final_response_score(event, 
string_match="](gradio_api/file=") + + # TODO: need to put this in a thread + report_usage_scores(token_threshold=10000, latency_threshold=60) + langfuse_client.flush() From 3aedc8eb6ea47e07250a26ea51dad5d2dddb743b Mon Sep 17 00:00:00 2001 From: Marcelo Lotif Date: Wed, 11 Feb 2026 12:06:45 -0500 Subject: [PATCH 3/9] Moving the score reporting to the lasngfuse module for better reusability --- .../aieng/agent_evals/langfuse.py | 123 +++++++++++++++++ .../report_generation/evaluation/online.py | 126 +----------------- implementations/report_generation/demo.py | 17 ++- 3 files changed, 139 insertions(+), 127 deletions(-) diff --git a/aieng-eval-agents/aieng/agent_evals/langfuse.py b/aieng-eval-agents/aieng/agent_evals/langfuse.py index eda2005..e5d5dad 100644 --- a/aieng-eval-agents/aieng/agent_evals/langfuse.py +++ b/aieng-eval-agents/aieng/agent_evals/langfuse.py @@ -11,11 +11,15 @@ from aieng.agent_evals.async_client_manager import AsyncClientManager from aieng.agent_evals.configs import Configs from aieng.agent_evals.progress import track_with_progress +from langfuse import Langfuse +from langfuse.api.resources.commons.types.observations_view import ObservationsView +from langfuse.api.resources.observations.types.observations_views import ObservationsViews from opentelemetry import trace from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor, SimpleSpanProcessor +from tenacity import retry, stop_after_attempt, wait_exponential logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s") @@ -370,3 +374,122 @@ def _normalize_dataset_record(item: Any, record_number: int) -> dict[str, Any]: "expected_output": item["expected_output"], "metadata": metadata, } + + +def report_usage_scores( + trace_id: str, + token_threshold: int = 0, + latency_threshold: int = 0, + cost_threshold: float = 0, +) -> None: + """Report usage scores to Langfuse for a given trace ID. + + WARNING: Due to the nature of the Langfuse API, this function may hang + while trying to fetch the observations. + + Parameters + ---------- + trace_id: str + The ID of the trace to report the usage scores for. + token_threshold: int + The token threshold to report the score for. + if the token count is greater than the threshold, the score + will be reported as 0. + Optional, default to 0 (no reporting). + latency_threshold: int + The latency threshold in seconds to report the score for. + if the latency is greater than the threshold, the score + will be reported as 0. + Optional, default to 0 (no reporting). + cost_threshold: float + The cost threshold to report the score for. + if the cost is greater than the threshold, the score + will be reported as 0. + Optional, default to 0 (no reporting). + """ + langfuse_client = AsyncClientManager.get_instance().langfuse_client + observations = _get_observations_with_retry(trace_id, langfuse_client) + + if token_threshold > 0: + total_tokens = sum(_obs_attr(observation, "totalTokens") for observation in observations.data) + if total_tokens <= token_threshold: + score = 1 + comment = "Token count is less than or equal to the threshold." + else: + score = 0 + comment = "Token count is greater than the threshold." 
+ + logger.info("Reporting score for token count") + langfuse_client.create_score( + name="Token Count", + value=score, + trace_id=trace_id, + comment=comment, + metadata={ + "total_tokens": total_tokens, + "token_threshold": token_threshold, + }, + ) + + if latency_threshold > 0: + total_latency = sum(_obs_attr(observation, "latency") for observation in observations.data) + if total_latency <= latency_threshold: + score = 1 + comment = "Latency is less than or equal to the threshold." + else: + score = 0 + comment = "Latency is greater than the threshold." + + logger.info("Reporting score for latency") + langfuse_client.create_score( + name="Latency", + value=score, + trace_id=trace_id, + comment=comment, + metadata={ + "total_latency": total_latency, + "latency_threshold": latency_threshold, + }, + ) + + if cost_threshold > 0: + total_cost = sum(_obs_attr(observation, "calculated_total_cost") for observation in observations.data) + if total_cost <= cost_threshold: + score = 1 + comment = "Cost is less than or equal to the threshold." + else: + score = 0 + comment = "Cost is greater than the threshold." + + logger.info("Reporting score for cost") + langfuse_client.create_score( + name="Cost", + value=score, + trace_id=trace_id, + comment=comment, + metadata={ + "total_cost": total_cost, + "cost_threshold": cost_threshold, + }, + ) + + langfuse_client.flush() + + +def _obs_attr(observation: ObservationsView, attribute: str) -> Any: + """Get the value of an attribute from an observation.""" + attribute_value = getattr(observation, attribute) + if attribute_value == 0: + logger.error(f"Observation attribute value for {attribute} is 0") + return 0 + if attribute_value is None: + logger.error(f"Observation attribute value for {attribute} is None") + return 0 + return attribute_value + + +@retry(stop=stop_after_attempt(10), wait=wait_exponential(multiplier=1, min=5, max=15)) +def _get_observations_with_retry(trace_id: str, langfuse_client: Langfuse) -> ObservationsViews: + """Get the observations for a given trace ID with retry/backoff.""" + logger.info(f"Getting observations for trace {trace_id}...") + return langfuse_client.api.observations.get_many(trace_id=trace_id, type="GENERATION") diff --git a/aieng-eval-agents/aieng/agent_evals/report_generation/evaluation/online.py b/aieng-eval-agents/aieng/agent_evals/report_generation/evaluation/online.py index 4b0dfe6..fdf26e4 100644 --- a/aieng-eval-agents/aieng/agent_evals/report_generation/evaluation/online.py +++ b/aieng-eval-agents/aieng/agent_evals/report_generation/evaluation/online.py @@ -1,15 +1,10 @@ """Functions to report online evaluation of the report generation agent to Langfuse.""" import logging -from typing import Any from aieng.agent_evals.async_client_manager import AsyncClientManager from aieng.agent_evals.report_generation.agent import EventParser, EventType from google.adk.events.event import Event -from langfuse import Langfuse -from langfuse.api.resources.commons.types.observations_view import ObservationsView -from langfuse.api.resources.observations.types.observations_views import ObservationsViews -from tenacity import retry, stop_after_attempt, wait_exponential logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s") @@ -46,8 +41,6 @@ def report_final_response_score(event: Event, string_match: str = "") -> None: if trace_id is None: raise ValueError("Langfuse trace ID is None.") - logger.info("Reporting score for valid final response") - parsed_events = EventParser.parse(event) 
for parsed_event in parsed_events: if parsed_event.type == EventType.FINAL_RESPONSE: @@ -69,8 +62,10 @@ def report_final_response_score(event: Event, string_match: str = "") -> None: "string_match": string_match, }, ) + langfuse_client.flush() return + logger.info("Reporting score for invalid final response") langfuse_client.create_score( name="Valid Final Response", value=0, @@ -80,121 +75,4 @@ def report_final_response_score(event: Event, string_match: str = "") -> None: "string_match": string_match, }, ) - - -def report_usage_scores( - token_threshold: int = 0, - latency_threshold: int = 0, - cost_threshold: float = 0, -) -> None: - """Report usage metrics to Langfuse. - - This function has to be called within the context of a Langfuse trace. - - Parameters - ---------- - token_threshold: int - The token threshold to report the metrics for. - if the token count is greater than the threshold, the score - will be reported as 0. - Optional, default to 0 (no reporting). - latency_threshold: int - The latency threshold in seconds to report the metrics for. - if the latency is greater than the threshold, the score - will be reported as 0. - Optional, default to 0 (no reporting). - cost_threshold: float - The cost threshold to report the metrics for. - if the cost is greater than the threshold, the score - will be reported as 0. - Optional, default to 0 (no reporting). - """ - langfuse_client = AsyncClientManager.get_instance().langfuse_client - trace_id = langfuse_client.get_current_trace_id() - - if trace_id is None: - raise ValueError("Langfuse trace ID is None.") - - observations = _get_observations_with_retry(trace_id, langfuse_client) - - if token_threshold > 0: - total_tokens = sum(_obs_attr(observation, "totalTokens") for observation in observations.data) - if total_tokens <= token_threshold: - score = 1 - comment = "Token count is less than or equal to the threshold." - else: - score = 0 - comment = "Token count is greater than the threshold." - - logger.info("Reporting score for token count") - langfuse_client.create_score( - name="Token Count", - value=score, - trace_id=trace_id, - comment=comment, - metadata={ - "total_tokens": total_tokens, - "token_threshold": token_threshold, - }, - ) - - if latency_threshold > 0: - total_latency = sum(_obs_attr(observation, "latency") for observation in observations.data) - if total_latency <= latency_threshold: - score = 1 - comment = "Latency is less than or equal to the threshold." - else: - score = 0 - comment = "Latency is greater than the threshold." - - logger.info("Reporting score for latency") - langfuse_client.create_score( - name="Latency", - value=score, - trace_id=trace_id, - comment=comment, - metadata={ - "total_latency": total_latency, - "latency_threshold": latency_threshold, - }, - ) - - if cost_threshold > 0: - total_cost = sum(_obs_attr(observation, "calculated_total_cost") for observation in observations.data) - if total_cost <= cost_threshold: - score = 1 - comment = "Cost is less than or equal to the threshold." - else: - score = 0 - comment = "Cost is greater than the threshold." 
- - logger.info("Reporting score for cost") - langfuse_client.create_score( - name="Cost", - value=score, - trace_id=trace_id, - comment=comment, - metadata={ - "total_cost": total_cost, - "cost_threshold": cost_threshold, - }, - ) - langfuse_client.flush() - - -def _obs_attr(observation: ObservationsView, attribute: str) -> Any: - attribute_value = getattr(observation, attribute) - if attribute_value == 0: - logger.error(f"Observation attribute value for {attribute} is 0") - return 0 - if attribute_value is None: - logger.error(f"Observation attribute value for {attribute} is None") - return 0 - return attribute_value - - -@retry(stop=stop_after_attempt(10), wait=wait_exponential(multiplier=1, min=5, max=15)) -def _get_observations_with_retry(trace_id: str, langfuse_client: Langfuse) -> ObservationsViews: - logger.info(f"Getting observations for trace {trace_id}...") - return langfuse_client.api.observations.get_many(trace_id=trace_id, type="GENERATION") diff --git a/implementations/report_generation/demo.py b/implementations/report_generation/demo.py index 383b1b9..e39f05c 100644 --- a/implementations/report_generation/demo.py +++ b/implementations/report_generation/demo.py @@ -8,14 +8,16 @@ import asyncio import logging +import threading from functools import partial from typing import Any, AsyncGenerator import click import gradio as gr from aieng.agent_evals.async_client_manager import AsyncClientManager +from aieng.agent_evals.langfuse import report_usage_scores from aieng.agent_evals.report_generation.agent import get_report_generation_agent -from aieng.agent_evals.report_generation.evaluation.online import report_final_response_score, report_usage_scores +from aieng.agent_evals.report_generation.evaluation.online import report_final_response_score from aieng.agent_evals.report_generation.prompts import MAIN_AGENT_INSTRUCTIONS from dotenv import load_dotenv from google.adk.runners import Runner @@ -100,8 +102,17 @@ async def agent_session_handler( # Report the final response evaluation to Langfuse report_final_response_score(event, string_match="](gradio_api/file=") - # TODO: need to put this in a thread - report_usage_scores(token_threshold=10000, latency_threshold=60) + # Run usage scoring in a thread so it doesn't block the UI + thread = threading.Thread( + target=report_usage_scores, + kwargs={ + "trace_id": langfuse_client.get_current_trace_id(), + "token_threshold": 10000, + "latency_threshold": 60, + }, + daemon=True, + ) + thread.start() langfuse_client.flush() From aed2cf285803aa37397e41161533b50bc87b302c Mon Sep 17 00:00:00 2001 From: Marcelo Lotif Date: Thu, 12 Feb 2026 11:10:34 -0500 Subject: [PATCH 4/9] Adding missing init files --- aieng-eval-agents/aieng/agent_evals/report_generation/__init__.py | 0 .../aieng/agent_evals/report_generation/evaluation/__init__.py | 0 2 files changed, 0 insertions(+), 0 deletions(-) create mode 100644 aieng-eval-agents/aieng/agent_evals/report_generation/__init__.py create mode 100644 aieng-eval-agents/aieng/agent_evals/report_generation/evaluation/__init__.py diff --git a/aieng-eval-agents/aieng/agent_evals/report_generation/__init__.py b/aieng-eval-agents/aieng/agent_evals/report_generation/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/aieng-eval-agents/aieng/agent_evals/report_generation/evaluation/__init__.py b/aieng-eval-agents/aieng/agent_evals/report_generation/evaluation/__init__.py new file mode 100644 index 0000000..e69de29 From efa320e0164dd5ab09e11db9836bc773787ee561 Mon Sep 17 00:00:00 2001 From: 
Marcelo Lotif Date: Thu, 12 Feb 2026 12:21:14 -0500 Subject: [PATCH 5/9] Using the trace fetch functions instead of making them myself --- .../aieng/agent_evals/evaluation/trace.py | 15 ++- .../aieng/agent_evals/langfuse.py | 125 ++++++------------ implementations/report_generation/demo.py | 2 +- 3 files changed, 56 insertions(+), 86 deletions(-) diff --git a/aieng-eval-agents/aieng/agent_evals/evaluation/trace.py b/aieng-eval-agents/aieng/agent_evals/evaluation/trace.py index 4fa543f..5a749f1 100644 --- a/aieng-eval-agents/aieng/agent_evals/evaluation/trace.py +++ b/aieng-eval-agents/aieng/agent_evals/evaluation/trace.py @@ -20,7 +20,6 @@ TraceObservationPredicate, TraceWaitConfig, ) -from aieng.agent_evals.langfuse import flush_traces from langfuse import Langfuse from langfuse.api import ObservationsView from langfuse.api.core import ApiError @@ -143,6 +142,16 @@ async def _evaluate_item( return result +def flush_traces() -> None: + """Flush any pending traces to Langfuse. + + Call this before your application exits to ensure all traces are sent. + """ + manager = AsyncClientManager.get_instance() + if manager._langfuse_client is not None: + manager._langfuse_client.flush() + + def extract_trace_metrics( trace: TraceWithFullDetails, *, @@ -226,7 +235,7 @@ async def _evaluate_trace( return [], TraceEvalStatus.SKIPPED, "Missing `trace_id` on experiment item result." try: - trace, ready = await _fetch_trace_with_wait(langfuse_client, trace_id, wait) + trace, ready = await fetch_trace_with_wait(langfuse_client, trace_id, wait) except Exception as exc: return [], TraceEvalStatus.FAILED, f"Trace fetch failed: {exc}" @@ -248,7 +257,7 @@ async def _evaluate_trace( return evaluations, TraceEvalStatus.OK, None -async def _fetch_trace_with_wait( +async def fetch_trace_with_wait( langfuse_client: Langfuse, trace_id: str, wait: TraceWaitConfig ) -> tuple[TraceWithFullDetails | None, bool]: """Fetch a trace with retry/backoff until it is ready or timeout expires.""" diff --git a/aieng-eval-agents/aieng/agent_evals/langfuse.py b/aieng-eval-agents/aieng/agent_evals/langfuse.py index e5d5dad..1b087f4 100644 --- a/aieng-eval-agents/aieng/agent_evals/langfuse.py +++ b/aieng-eval-agents/aieng/agent_evals/langfuse.py @@ -1,5 +1,6 @@ """Functions and objects pertaining to Langfuse.""" +import asyncio import base64 import hashlib import json @@ -10,16 +11,15 @@ from aieng.agent_evals.async_client_manager import AsyncClientManager from aieng.agent_evals.configs import Configs +from aieng.agent_evals.evaluation.trace import extract_trace_metrics, fetch_trace_with_wait +from aieng.agent_evals.evaluation.types import TraceWaitConfig from aieng.agent_evals.progress import track_with_progress from langfuse import Langfuse -from langfuse.api.resources.commons.types.observations_view import ObservationsView -from langfuse.api.resources.observations.types.observations_views import ObservationsViews from opentelemetry import trace from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor, SimpleSpanProcessor -from tenacity import retry, stop_after_attempt, wait_exponential logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s") @@ -164,16 +164,6 @@ def init_tracing(service_name: str = "aieng-eval-agents") -> bool: return False -def flush_traces() -> None: - """Flush any pending traces to 
Langfuse. - - Call this before your application exits to ensure all traces are sent. - """ - manager = AsyncClientManager.get_instance() - if manager._langfuse_client is not None: - manager._langfuse_client.flush() - - def is_tracing_enabled() -> bool: """Check if Langfuse tracing is currently enabled. @@ -392,7 +382,7 @@ def report_usage_scores( trace_id: str The ID of the trace to report the usage scores for. token_threshold: int - The token threshold to report the score for. + The total token (input + output) threshold to report the score for. if the token count is greater than the threshold, the score will be reported as 0. Optional, default to 0 (no reporting). @@ -408,88 +398,59 @@ def report_usage_scores( Optional, default to 0 (no reporting). """ langfuse_client = AsyncClientManager.get_instance().langfuse_client - observations = _get_observations_with_retry(trace_id, langfuse_client) - if token_threshold > 0: - total_tokens = sum(_obs_attr(observation, "totalTokens") for observation in observations.data) - if total_tokens <= token_threshold: - score = 1 - comment = "Token count is less than or equal to the threshold." - else: - score = 0 - comment = "Token count is greater than the threshold." + logger.info(f"Fetching trace {trace_id}...") + trace, ready = asyncio.run( + fetch_trace_with_wait(langfuse_client, trace_id, TraceWaitConfig()), + ) - logger.info("Reporting score for token count") - langfuse_client.create_score( - name="Token Count", - value=score, - trace_id=trace_id, - comment=comment, - metadata={ - "total_tokens": total_tokens, - "token_threshold": token_threshold, - }, - ) + if trace is None: + logger.error(f"Trace {trace_id} not found. Will not report usage scores.") + return - if latency_threshold > 0: - total_latency = sum(_obs_attr(observation, "latency") for observation in observations.data) - if total_latency <= latency_threshold: - score = 1 - comment = "Latency is less than or equal to the threshold." - else: - score = 0 - comment = "Latency is greater than the threshold." + if not ready: + logger.warning(f"Trace {trace_id} is not ready. Scores will be reported on partial traces.") - logger.info("Reporting score for latency") - langfuse_client.create_score( - name="Latency", - value=score, - trace_id=trace_id, - comment=comment, - metadata={ - "total_latency": total_latency, - "latency_threshold": latency_threshold, - }, - ) + trace_metrics = extract_trace_metrics(trace) + + if token_threshold > 0: + total_tokens = trace_metrics.total_input_tokens + trace_metrics.total_output_tokens + _report_score(langfuse_client, "Token Count", total_tokens, token_threshold, trace_id) + + if latency_threshold > 0: + _report_score(langfuse_client, "Latency", trace_metrics.latency_sec, latency_threshold, trace_id) if cost_threshold > 0: - total_cost = sum(_obs_attr(observation, "calculated_total_cost") for observation in observations.data) - if total_cost <= cost_threshold: + _report_score(langfuse_client, "Cost", trace_metrics.total_cost, cost_threshold, trace_id) + + langfuse_client.flush() + + +def _report_score( + langfuse_client: Langfuse, + name: str, + value: int | float | None, + threshold: int | float, + trace_id: str, +) -> None: + if value is None: + logger.error(f"Trace {trace_id} has no value for {name}. Will not report score for {name}.") + + else: + if value <= threshold: score = 1 - comment = "Cost is less than or equal to the threshold." + comment = f"{value} is less than or equal to the threshold." 
else: score = 0 - comment = "Cost is greater than the threshold." + comment = f"{value} is greater than the threshold." - logger.info("Reporting score for cost") langfuse_client.create_score( - name="Cost", + name=name, value=score, trace_id=trace_id, comment=comment, metadata={ - "total_cost": total_cost, - "cost_threshold": cost_threshold, + "value": value, + "threshold": threshold, }, ) - - langfuse_client.flush() - - -def _obs_attr(observation: ObservationsView, attribute: str) -> Any: - """Get the value of an attribute from an observation.""" - attribute_value = getattr(observation, attribute) - if attribute_value == 0: - logger.error(f"Observation attribute value for {attribute} is 0") - return 0 - if attribute_value is None: - logger.error(f"Observation attribute value for {attribute} is None") - return 0 - return attribute_value - - -@retry(stop=stop_after_attempt(10), wait=wait_exponential(multiplier=1, min=5, max=15)) -def _get_observations_with_retry(trace_id: str, langfuse_client: Langfuse) -> ObservationsViews: - """Get the observations for a given trace ID with retry/backoff.""" - logger.info(f"Getting observations for trace {trace_id}...") - return langfuse_client.api.observations.get_many(trace_id=trace_id, type="GENERATION") diff --git a/implementations/report_generation/demo.py b/implementations/report_generation/demo.py index e39f05c..a5eb267 100644 --- a/implementations/report_generation/demo.py +++ b/implementations/report_generation/demo.py @@ -107,7 +107,7 @@ async def agent_session_handler( target=report_usage_scores, kwargs={ "trace_id": langfuse_client.get_current_trace_id(), - "token_threshold": 10000, + "token_threshold": 20000, "latency_threshold": 60, }, daemon=True, From 1de6a3716dd2662d6839ace9cd2f856adfd9f3af Mon Sep 17 00:00:00 2001 From: Marcelo Lotif Date: Thu, 12 Feb 2026 12:22:50 -0500 Subject: [PATCH 6/9] Adjusting the token threshold to 15k --- implementations/report_generation/demo.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/implementations/report_generation/demo.py b/implementations/report_generation/demo.py index a5eb267..9a3dbfb 100644 --- a/implementations/report_generation/demo.py +++ b/implementations/report_generation/demo.py @@ -107,7 +107,7 @@ async def agent_session_handler( target=report_usage_scores, kwargs={ "trace_id": langfuse_client.get_current_trace_id(), - "token_threshold": 20000, + "token_threshold": 15000, "latency_threshold": 60, }, daemon=True, From ac7ba2c662c46c9b09135435926f91ee290f4c83 Mon Sep 17 00:00:00 2001 From: Marcelo Lotif Date: Thu, 12 Feb 2026 12:23:52 -0500 Subject: [PATCH 7/9] Adding log for every metric reported --- aieng-eval-agents/aieng/agent_evals/langfuse.py | 1 + 1 file changed, 1 insertion(+) diff --git a/aieng-eval-agents/aieng/agent_evals/langfuse.py b/aieng-eval-agents/aieng/agent_evals/langfuse.py index 1b087f4..8f5f59e 100644 --- a/aieng-eval-agents/aieng/agent_evals/langfuse.py +++ b/aieng-eval-agents/aieng/agent_evals/langfuse.py @@ -444,6 +444,7 @@ def _report_score( score = 0 comment = f"{value} is greater than the threshold." 
+ logger.info(f"Reporting score for {name}") langfuse_client.create_score( name=name, value=score, From 0bf23513977680e2fe4451d25f5d4c22cce29e98 Mon Sep 17 00:00:00 2001 From: Marcelo Lotif Date: Thu, 12 Feb 2026 17:25:25 -0500 Subject: [PATCH 8/9] Adding agent.py to the demo as well --- .../aieng/agent_evals/langfuse.py | 40 ++++++----- .../agent_evals/report_generation/agent.py | 3 + implementations/report_generation/agent.py | 67 +++++++++++++++++++ implementations/report_generation/demo.py | 24 +++++-- 4 files changed, 111 insertions(+), 23 deletions(-) create mode 100644 implementations/report_generation/agent.py diff --git a/aieng-eval-agents/aieng/agent_evals/langfuse.py b/aieng-eval-agents/aieng/agent_evals/langfuse.py index 8f5f59e..88667da 100644 --- a/aieng-eval-agents/aieng/agent_evals/langfuse.py +++ b/aieng-eval-agents/aieng/agent_evals/langfuse.py @@ -435,23 +435,27 @@ def _report_score( ) -> None: if value is None: logger.error(f"Trace {trace_id} has no value for {name}. Will not report score for {name}.") + return + + if value == 0: + logger.error(f"Trace {trace_id} has a value of 0 for {name}. Will not report score for {name}.") + return + if value <= threshold: + score = 1 + comment = f"{value} is less than or equal to the threshold." else: - if value <= threshold: - score = 1 - comment = f"{value} is less than or equal to the threshold." - else: - score = 0 - comment = f"{value} is greater than the threshold." - - logger.info(f"Reporting score for {name}") - langfuse_client.create_score( - name=name, - value=score, - trace_id=trace_id, - comment=comment, - metadata={ - "value": value, - "threshold": threshold, - }, - ) + score = 0 + comment = f"{value} is greater than the threshold." + + logger.info(f"Reporting score for {name}") + langfuse_client.create_score( + name=name, + value=score, + trace_id=trace_id, + comment=comment, + metadata={ + "value": value, + "threshold": threshold, + }, + ) diff --git a/aieng-eval-agents/aieng/agent_evals/report_generation/agent.py b/aieng-eval-agents/aieng/agent_evals/report_generation/agent.py index 4daf8a1..5874315 100644 --- a/aieng-eval-agents/aieng/agent_evals/report_generation/agent.py +++ b/aieng-eval-agents/aieng/agent_evals/report_generation/agent.py @@ -24,6 +24,7 @@ from aieng.agent_evals.langfuse import setup_langfuse_tracer from aieng.agent_evals.report_generation.file_writer import ReportFileWriter from google.adk.agents import Agent +from google.adk.agents.base_agent import AfterAgentCallback from google.adk.events.event import Event from pydantic import BaseModel @@ -36,6 +37,7 @@ def get_report_generation_agent( instructions: str, reports_output_path: Path, langfuse_project_name: str | None, + after_agent_callback: AfterAgentCallback | None = None, ) -> Agent: """ Define the report generation agent. @@ -72,6 +74,7 @@ def get_report_generation_agent( client_manager.report_generation_db().get_schema_info, report_file_writer.write_xlsx, ], + after_agent_callback=after_agent_callback, ) diff --git a/implementations/report_generation/agent.py b/implementations/report_generation/agent.py new file mode 100644 index 0000000..75fe298 --- /dev/null +++ b/implementations/report_generation/agent.py @@ -0,0 +1,67 @@ +"""Entry point for the Google ADK UI for the report generation agent. 
+ +Example +------- +$ adk web implementations/ +""" + +import logging +import threading + +from aieng.agent_evals.async_client_manager import AsyncClientManager +from aieng.agent_evals.langfuse import report_usage_scores +from aieng.agent_evals.report_generation.agent import get_report_generation_agent +from aieng.agent_evals.report_generation.evaluation.online import report_final_response_score +from aieng.agent_evals.report_generation.prompts import MAIN_AGENT_INSTRUCTIONS +from dotenv import load_dotenv +from google.adk.agents.callback_context import CallbackContext + +from .env_vars import get_langfuse_project_name, get_reports_output_path + + +load_dotenv(verbose=True) +logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s") +logger = logging.getLogger(__name__) + + +def calculate_and_send_scores(callback_context: CallbackContext) -> None: + """Calculate token usage and latency scores and submit them to Langfuse. + + This is a callback function to be called after the agent has run. + + Parameters + ---------- + callback_context : CallbackContext + The callback context at the end of the agent run. + """ + langfuse_client = AsyncClientManager.get_instance().langfuse_client + langfuse_client.flush() + + for event in callback_context.session.events: + if event.is_final_response() and event.content and event.content.role == "model": + # Report the final response evaluation to Langfuse + report_final_response_score(event, string_match="](gradio_api/file=") + + # Run usage scoring in a thread so it doesn't block the UI + thread = threading.Thread( + target=report_usage_scores, + kwargs={ + "trace_id": langfuse_client.get_current_trace_id(), + "token_threshold": 15000, + "latency_threshold": 60, + }, + daemon=True, + ) + thread.start() + + return + + logger.error("No final response found in the callback context. 
Will not report scores to Langfuse.") + + +root_agent = get_report_generation_agent( + instructions=MAIN_AGENT_INSTRUCTIONS, + reports_output_path=get_reports_output_path(), + langfuse_project_name=get_langfuse_project_name(), + after_agent_callback=calculate_and_send_scores, +) diff --git a/implementations/report_generation/demo.py b/implementations/report_generation/demo.py index 9a3dbfb..b2ca8b2 100644 --- a/implementations/report_generation/demo.py +++ b/implementations/report_generation/demo.py @@ -20,6 +20,7 @@ from aieng.agent_evals.report_generation.evaluation.online import report_final_response_score from aieng.agent_evals.report_generation.prompts import MAIN_AGENT_INSTRUCTIONS from dotenv import load_dotenv +from google.adk.agents.callback_context import CallbackContext from google.adk.runners import Runner from google.adk.sessions import InMemorySessionService from google.genai.types import Content, Part @@ -66,11 +67,9 @@ async def agent_session_handler( instructions=MAIN_AGENT_INSTRUCTIONS, reports_output_path=get_reports_output_path(), langfuse_project_name=get_langfuse_project_name() if enable_trace else None, + after_agent_callback=calculate_and_send_scores, ) - # Get the Langfuse client for online reporting - langfuse_client = AsyncClientManager.get_instance().langfuse_client - # Construct an in-memory session for the agent to maintain # conversation history across multiple turns of a chat # This makes it possible to ask follow-up questions that refer to @@ -98,11 +97,24 @@ async def agent_session_handler( if len(turn_messages) > 0: yield turn_messages - if event.is_final_response(): + +def calculate_and_send_scores(callback_context: CallbackContext) -> None: + """Calculate token usage and latency scores and submit them to Langfuse. + + This is a callback function to be called after the agent has run. + + Parameters + ---------- + callback_context : CallbackContext + The callback context at the end of the agent run. + """ + for event in callback_context.session.events: + if event.is_final_response() and event.content and event.content.role == "model": # Report the final response evaluation to Langfuse report_final_response_score(event, string_match="](gradio_api/file=") # Run usage scoring in a thread so it doesn't block the UI + langfuse_client = AsyncClientManager.get_instance().langfuse_client thread = threading.Thread( target=report_usage_scores, kwargs={ @@ -114,7 +126,9 @@ async def agent_session_handler( ) thread.start() - langfuse_client.flush() + return + + logger.error("No final response found in the callback context. 
Will not report scores to Langfuse.") @click.command() From 1f29c7df256cfb5b32ad2dbfbf0812db7c830f71 Mon Sep 17 00:00:00 2001 From: Marcelo Lotif Date: Fri, 13 Feb 2026 09:48:09 -0500 Subject: [PATCH 9/9] Using init_tracing instead --- .../aieng/agent_evals/report_generation/agent.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/aieng-eval-agents/aieng/agent_evals/report_generation/agent.py b/aieng-eval-agents/aieng/agent_evals/report_generation/agent.py index 5874315..9629fc4 100644 --- a/aieng-eval-agents/aieng/agent_evals/report_generation/agent.py +++ b/aieng-eval-agents/aieng/agent_evals/report_generation/agent.py @@ -21,7 +21,7 @@ from typing import Any from aieng.agent_evals.async_client_manager import AsyncClientManager -from aieng.agent_evals.langfuse import setup_langfuse_tracer +from aieng.agent_evals.langfuse import init_tracing from aieng.agent_evals.report_generation.file_writer import ReportFileWriter from google.adk.agents import Agent from google.adk.agents.base_agent import AfterAgentCallback @@ -58,7 +58,7 @@ def get_report_generation_agent( """ # Setup langfuse tracing if project name is provided if langfuse_project_name: - setup_langfuse_tracer(langfuse_project_name) + init_tracing(langfuse_project_name) # Get the client manager singleton instance client_manager = AsyncClientManager.get_instance()
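Usage sketch (not part of the patches above): the snippet below shows one way the online scoring helpers introduced in this series could be wired into an ADK entry point through the after_agent_callback hook added in PATCH 8/9. The module paths, function names, and signatures come from the patches themselves; the thresholds, report output path, and Langfuse project name are illustrative placeholders, not values the series prescribes.

from pathlib import Path
import threading

from aieng.agent_evals.async_client_manager import AsyncClientManager
from aieng.agent_evals.langfuse import report_usage_scores
from aieng.agent_evals.report_generation.agent import get_report_generation_agent
from aieng.agent_evals.report_generation.evaluation.online import report_final_response_score
from aieng.agent_evals.report_generation.prompts import MAIN_AGENT_INSTRUCTIONS
from google.adk.agents.callback_context import CallbackContext


def score_after_run(callback_context: CallbackContext) -> None:
    """After-agent callback: score the final response, then usage metrics off the main thread."""
    langfuse_client = AsyncClientManager.get_instance().langfuse_client
    langfuse_client.flush()  # export pending spans so the trace can be fetched for scoring

    for event in callback_context.session.events:
        if event.is_final_response() and event.content and event.content.role == "model":
            # Scores 1 if the final answer links a generated report file, 0 otherwise
            report_final_response_score(event, string_match="](gradio_api/file=")

            # Trace fetching can hang (see the WARNING on report_usage_scores),
            # so report usage scores from a daemon thread instead of blocking the caller
            threading.Thread(
                target=report_usage_scores,
                kwargs={
                    "trace_id": langfuse_client.get_current_trace_id(),
                    "token_threshold": 15000,  # illustrative token budget
                    "latency_threshold": 60,   # seconds, illustrative
                },
                daemon=True,
            ).start()
            return


root_agent = get_report_generation_agent(
    instructions=MAIN_AGENT_INSTRUCTIONS,
    reports_output_path=Path("reports/"),       # illustrative output directory
    langfuse_project_name="report-generation",  # illustrative project name
    after_agent_callback=score_after_run,
)

Running the usage scoring in a daemon thread mirrors the wiring in the demo and agent entry points from PATCH 8/9, keeping the potentially slow trace fetch out of the request path while the final-response score is reported synchronously.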