15 changes: 12 additions & 3 deletions aieng-eval-agents/aieng/agent_evals/evaluation/trace.py
@@ -20,7 +20,6 @@
TraceObservationPredicate,
TraceWaitConfig,
)
from aieng.agent_evals.langfuse import flush_traces
from langfuse import Langfuse
from langfuse.api import ObservationsView
from langfuse.api.core import ApiError
@@ -143,6 +142,16 @@ async def _evaluate_item(
return result


def flush_traces() -> None:
"""Flush any pending traces to Langfuse.

Call this before your application exits to ensure all traces are sent.
"""
manager = AsyncClientManager.get_instance()
if manager._langfuse_client is not None:
manager._langfuse_client.flush()


def extract_trace_metrics(
trace: TraceWithFullDetails,
*,
@@ -226,7 +235,7 @@ async def _evaluate_trace(
return [], TraceEvalStatus.SKIPPED, "Missing `trace_id` on experiment item result."

try:
trace, ready = await _fetch_trace_with_wait(langfuse_client, trace_id, wait)
trace, ready = await fetch_trace_with_wait(langfuse_client, trace_id, wait)
except Exception as exc:
return [], TraceEvalStatus.FAILED, f"Trace fetch failed: {exc}"

@@ -248,7 +257,7 @@ async def _evaluate_trace(
return evaluations, TraceEvalStatus.OK, None


async def _fetch_trace_with_wait(
async def fetch_trace_with_wait(
langfuse_client: Langfuse, trace_id: str, wait: TraceWaitConfig
) -> tuple[TraceWithFullDetails | None, bool]:
"""Fetch a trace with retry/backoff until it is ready or timeout expires."""
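
For reference, a minimal usage sketch of the relocated helpers, assuming the AsyncClientManager singleton is already configured; the trace ID is a placeholder:

import asyncio

from aieng.agent_evals.async_client_manager import AsyncClientManager
from aieng.agent_evals.evaluation.trace import fetch_trace_with_wait, flush_traces
from aieng.agent_evals.evaluation.types import TraceWaitConfig


async def inspect_trace(trace_id: str) -> None:
    # Fetch the trace, retrying with backoff until it is ready or the wait times out.
    langfuse_client = AsyncClientManager.get_instance().langfuse_client
    trace, ready = await fetch_trace_with_wait(langfuse_client, trace_id, TraceWaitConfig())
    if trace is None:
        print(f"Trace {trace_id} not found")
    else:
        print(f"Fetched trace {trace_id} (ready={ready})")


asyncio.run(inspect_trace("your-trace-id"))  # placeholder trace ID
flush_traces()  # send any buffered spans before the process exits
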
109 changes: 99 additions & 10 deletions aieng-eval-agents/aieng/agent_evals/langfuse.py
@@ -1,5 +1,6 @@
"""Functions and objects pertaining to Langfuse."""

import asyncio
import base64
import hashlib
import json
@@ -10,7 +11,10 @@

from aieng.agent_evals.async_client_manager import AsyncClientManager
from aieng.agent_evals.configs import Configs
from aieng.agent_evals.evaluation.trace import extract_trace_metrics, fetch_trace_with_wait
from aieng.agent_evals.evaluation.types import TraceWaitConfig
from aieng.agent_evals.progress import track_with_progress
from langfuse import Langfuse
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
@@ -160,16 +164,6 @@ def init_tracing(service_name: str = "aieng-eval-agents") -> bool:
return False


def flush_traces() -> None:
"""Flush any pending traces to Langfuse.

Call this before your application exits to ensure all traces are sent.
"""
manager = AsyncClientManager.get_instance()
if manager._langfuse_client is not None:
manager._langfuse_client.flush()


def is_tracing_enabled() -> bool:
"""Check if Langfuse tracing is currently enabled.

@@ -370,3 +364,98 @@ def _normalize_dataset_record(item: Any, record_number: int) -> dict[str, Any]:
"expected_output": item["expected_output"],
"metadata": metadata,
}


def report_usage_scores(
trace_id: str,
token_threshold: int = 0,
latency_threshold: int = 0,
cost_threshold: float = 0,
) -> None:
"""Report usage scores to Langfuse for a given trace ID.

WARNING: This function may block while it waits for the trace and its
observations to become available from the Langfuse API.

Parameters
----------
trace_id : str
The ID of the trace to report the usage scores for.
token_threshold : int
The total token (input + output) threshold to score against.
If the token count exceeds the threshold, the score is
reported as 0. Optional, defaults to 0 (no reporting).
latency_threshold : int
The latency threshold in seconds to score against.
If the latency exceeds the threshold, the score is
reported as 0. Optional, defaults to 0 (no reporting).
cost_threshold : float
The cost threshold to score against.
If the cost exceeds the threshold, the score is
reported as 0. Optional, defaults to 0 (no reporting).
"""
langfuse_client = AsyncClientManager.get_instance().langfuse_client

logger.info(f"Fetching trace {trace_id}...")
trace, ready = asyncio.run(
fetch_trace_with_wait(langfuse_client, trace_id, TraceWaitConfig()),
)

if trace is None:
logger.error(f"Trace {trace_id} not found. Will not report usage scores.")
return

if not ready:
logger.warning(f"Trace {trace_id} is not ready. Scores will be reported on partial traces.")

trace_metrics = extract_trace_metrics(trace)

if token_threshold > 0:
total_tokens = trace_metrics.total_input_tokens + trace_metrics.total_output_tokens
_report_score(langfuse_client, "Token Count", total_tokens, token_threshold, trace_id)

if latency_threshold > 0:
_report_score(langfuse_client, "Latency", trace_metrics.latency_sec, latency_threshold, trace_id)

if cost_threshold > 0:
_report_score(langfuse_client, "Cost", trace_metrics.total_cost, cost_threshold, trace_id)

langfuse_client.flush()


def _report_score(
langfuse_client: Langfuse,
name: str,
value: int | float | None,
threshold: int | float,
trace_id: str,
) -> None:
if value is None:
logger.error(f"Trace {trace_id} has no value for {name}. Will not report score for {name}.")
return

if value == 0:
logger.error(f"Trace {trace_id} has a value of 0 for {name}. Will not report score for {name}.")
return

if value <= threshold:
score = 1
comment = f"{value} is less than or equal to the threshold."
else:
score = 0
comment = f"{value} is greater than the threshold."

logger.info(f"Reporting score for {name}")
langfuse_client.create_score(
name=name,
value=score,
trace_id=trace_id,
comment=comment,
metadata={
"value": value,
"threshold": threshold,
},
)
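
For reference, a minimal sketch of calling report_usage_scores directly; the token and latency thresholds mirror the values used in implementations/report_generation/agent.py further down, while the trace ID and cost threshold are placeholders:

from aieng.agent_evals.langfuse import report_usage_scores

# Score a completed trace against usage budgets; a threshold of 0 disables that score.
report_usage_scores(
    trace_id="your-trace-id",  # placeholder trace ID
    token_threshold=15000,     # total input + output tokens, as in the agent callback below
    latency_threshold=60,      # seconds
    cost_threshold=0.50,       # illustrative value; leave at 0 to skip cost scoring
)
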
@@ -21,9 +21,10 @@
from typing import Any

from aieng.agent_evals.async_client_manager import AsyncClientManager
from aieng.agent_evals.langfuse import setup_langfuse_tracer
from aieng.agent_evals.langfuse import init_tracing
from aieng.agent_evals.report_generation.file_writer import ReportFileWriter
from google.adk.agents import Agent
from google.adk.agents.base_agent import AfterAgentCallback
from google.adk.events.event import Event
from pydantic import BaseModel

@@ -36,6 +37,7 @@ def get_report_generation_agent(
instructions: str,
reports_output_path: Path,
langfuse_project_name: str | None,
after_agent_callback: AfterAgentCallback | None = None,
) -> Agent:
"""
Define the report generation agent.
@@ -56,7 +58,7 @@
"""
# Setup langfuse tracing if project name is provided
if langfuse_project_name:
setup_langfuse_tracer(langfuse_project_name)
init_tracing(langfuse_project_name)

# Get the client manager singleton instance
client_manager = AsyncClientManager.get_instance()
@@ -72,6 +74,7 @@
client_manager.report_generation_db().get_schema_info,
report_file_writer.write_xlsx,
],
after_agent_callback=after_agent_callback,
)


@@ -3,7 +3,7 @@

Example
-------
>>> from aieng.agent_evals.report_generation.evaluation import evaluate
>>> from aieng.agent_evals.report_generation.evaluation.offline import evaluate
>>> evaluate(
>>> dataset_name="OnlineRetailReportEval",
>>> reports_output_path=Path("reports/"),
@@ -0,0 +1,78 @@
"""Functions to report online evaluation of the report generation agent to Langfuse."""

import logging

from aieng.agent_evals.async_client_manager import AsyncClientManager
from aieng.agent_evals.report_generation.agent import EventParser, EventType
from google.adk.events.event import Event


logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s")
logger = logging.getLogger(__name__)


def report_final_response_score(event: Event, string_match: str = "") -> None:
"""Report a score to Langfuse if the event is a final response.

The score will be reported as 1 if the final response is valid
and contains the string match. Otherwise, the score will be reported as 0.

This must be called within an active Langfuse trace context.

Parameters
----------
event : Event
The event to check.
string_match : str
The string to match in the final response.
Optional, defaults to an empty string.

Raises
------
ValueError
If the event is not a final response.
"""
if not event.is_final_response():
raise ValueError("Event is not a final response")

langfuse_client = AsyncClientManager.get_instance().langfuse_client
trace_id = langfuse_client.get_current_trace_id()

if trace_id is None:
raise ValueError("Langfuse trace ID is None.")

parsed_events = EventParser.parse(event)
for parsed_event in parsed_events:
if parsed_event.type == EventType.FINAL_RESPONSE:
if string_match in parsed_event.text:
score = 1
comment = "Final response contains the string match."
else:
score = 0
comment = "Final response does not contains the string match."

logger.info("Reporting score for valid final response")
langfuse_client.create_score(
name="Valid Final Response",
value=score,
trace_id=trace_id,
comment=comment,
metadata={
"final_response": parsed_event.text,
"string_match": string_match,
},
)
langfuse_client.flush()
return

logger.info("Reporting score for invalid final response")
langfuse_client.create_score(
name="Valid Final Response",
value=0,
trace_id=trace_id,
comment="Final response not found in the event",
metadata={
"string_match": string_match,
},
)
langfuse_client.flush()
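
For reference, a minimal sketch of invoking the scorer inside an active Langfuse trace; the events iterable is a placeholder for the ADK session events, and the string match mirrors the callback in implementations/report_generation/agent.py below:

from aieng.agent_evals.report_generation.evaluation.online import report_final_response_score

# Assumes an active Langfuse trace; `events` stands in for the ADK session's event list.
for event in events:
    if event.is_final_response() and event.content and event.content.role == "model":
        report_final_response_score(event, string_match="](gradio_api/file=")
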
67 changes: 67 additions & 0 deletions implementations/report_generation/agent.py
@@ -0,0 +1,67 @@
"""Entry point for the Google ADK UI for the report generation agent.

Example
-------
$ adk web implementations/
"""

import logging
import threading

from aieng.agent_evals.async_client_manager import AsyncClientManager
from aieng.agent_evals.langfuse import report_usage_scores
from aieng.agent_evals.report_generation.agent import get_report_generation_agent
from aieng.agent_evals.report_generation.evaluation.online import report_final_response_score
from aieng.agent_evals.report_generation.prompts import MAIN_AGENT_INSTRUCTIONS
from dotenv import load_dotenv
from google.adk.agents.callback_context import CallbackContext

from .env_vars import get_langfuse_project_name, get_reports_output_path


load_dotenv(verbose=True)
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s")
logger = logging.getLogger(__name__)


def calculate_and_send_scores(callback_context: CallbackContext) -> None:
"""Calculate token usage and latency scores and submit them to Langfuse.

This is a callback function to be called after the agent has run.

Parameters
----------
callback_context : CallbackContext
The callback context at the end of the agent run.
"""
langfuse_client = AsyncClientManager.get_instance().langfuse_client
langfuse_client.flush()

for event in callback_context.session.events:
if event.is_final_response() and event.content and event.content.role == "model":
# Report the final response evaluation to Langfuse
report_final_response_score(event, string_match="](gradio_api/file=")

# Run usage scoring in a thread so it doesn't block the UI
thread = threading.Thread(
target=report_usage_scores,
kwargs={
"trace_id": langfuse_client.get_current_trace_id(),
"token_threshold": 15000,
"latency_threshold": 60,
},
daemon=True,
)
thread.start()

return

logger.error("No final response found in the callback context. Will not report scores to Langfuse.")


root_agent = get_report_generation_agent(
instructions=MAIN_AGENT_INSTRUCTIONS,
reports_output_path=get_reports_output_path(),
langfuse_project_name=get_langfuse_project_name(),
after_agent_callback=calculate_and_send_scores,
)