From 75be97eadfe9cd961db4ffc6997bb1a928a70be0 Mon Sep 17 00:00:00 2001 From: David Elner Date: Mon, 23 Mar 2026 01:44:43 +0000 Subject: [PATCH] Refactored: Extract retry logic into its own internal component --- lib/braintrust/api/internal/btql.rb | 36 +---- lib/braintrust/eval/runner.rb | 21 ++- lib/braintrust/internal/retry.rb | 41 +++++ .../api/internal/btql_integration_test.rb | 26 ++-- test/braintrust/api/internal/btql_test.rb | 147 ++++++------------ test/braintrust/eval/runner_test.rb | 101 ++++++++++++ test/braintrust/internal/retry_test.rb | 119 ++++++++++++++ 7 files changed, 337 insertions(+), 154 deletions(-) create mode 100644 lib/braintrust/internal/retry.rb create mode 100644 test/braintrust/internal/retry_test.rb diff --git a/lib/braintrust/api/internal/btql.rb b/lib/braintrust/api/internal/btql.rb index 949bced0..1df598b9 100644 --- a/lib/braintrust/api/internal/btql.rb +++ b/lib/braintrust/api/internal/btql.rb @@ -11,19 +11,6 @@ module Internal # Internal BTQL client for querying spans. # Not part of the public API — instantiated directly where needed. class BTQL - # Maximum number of retries before returning partial results. - # Covers both freshness lag (partially indexed) and ingestion lag - # (spans not yet visible to BTQL after OTel flush). - MAX_FRESHNESS_RETRIES = 7 - - # Base delay (seconds) between retries (doubles each attempt, capped). - FRESHNESS_BASE_DELAY = 1.0 - - # Maximum delay (seconds) between retries. Caps exponential growth - # so we keep polling at a reasonable rate in the later window. - # Schedule: 1, 2, 4, 8, 8, 8, 8 = ~39s total worst-case. - MAX_FRESHNESS_DELAY = 8.0 - def initialize(state) @state = state end @@ -31,36 +18,19 @@ def initialize(state) # Query spans belonging to a specific trace within an object. # # Builds a BTQL SQL query that matches the root_span_id and excludes scorer spans. - # Retries with exponential backoff if the response indicates data is not yet fresh. + # Returns a single-shot result; callers are responsible for retry and error handling. # # @param object_type [String] e.g. "experiment" # @param object_id [String] Object UUID # @param root_span_id [String] Hex trace ID of the root span - # @return [Array] Parsed span data + # @return [Array(Array, String)] [rows, freshness] def trace_spans(object_type:, object_id:, root_span_id:) query = build_trace_query( object_type: object_type, object_id: object_id, root_span_id: root_span_id ) - payload = {query: query, fmt: "jsonl"} - - retries = 0 - loop do - rows, freshness = execute_query(payload) - # Return when data is fresh AND non-empty, or we've exhausted retries. - # We retry on empty even when "complete" because there is ingestion lag - # between OTel flush and BTQL indexing — the server may report "complete" - # before it knows about newly-flushed spans. - return rows if (freshness == "complete" && !rows.empty?) || retries >= MAX_FRESHNESS_RETRIES - - retries += 1 - delay = [FRESHNESS_BASE_DELAY * (2**(retries - 1)), MAX_FRESHNESS_DELAY].min - sleep(delay) - end - rescue => e - Braintrust::Log.warn("[BTQL] Query failed: #{e.message}") - [] + execute_query(query: query, fmt: "jsonl") end private diff --git a/lib/braintrust/eval/runner.rb b/lib/braintrust/eval/runner.rb index b4920436..4becf438 100644 --- a/lib/braintrust/eval/runner.rb +++ b/lib/braintrust/eval/runner.rb @@ -6,6 +6,7 @@ require_relative "trace" require_relative "../internal/thread_pool" require_relative "../api/internal/btql" +require_relative "../internal/retry" require "opentelemetry/sdk" require "json" @@ -223,9 +224,23 @@ def build_trace(eval_span) object_id = eval_context.experiment_id btql = API::Internal::BTQL.new(eval_context.state) - Eval::Trace.new( - spans: -> { btql.trace_spans(object_type: object_type, object_id: object_id, root_span_id: root_span_id) } - ) + Eval::Trace.new(spans: -> { fetch_trace_spans(btql, object_type, object_id, root_span_id) }) + end + + # Fetch trace spans with retry to handle freshness and ingestion lag. + # @return [Array] Parsed span data + def fetch_trace_spans(btql, object_type, object_id, root_span_id) + rows, _freshness = Internal::Retry.with_backoff( + max_retries: 7, base_delay: 1.0, max_delay: 8.0, + until: ->(result) { + r, f = result + f == "complete" && !r.empty? + } + ) { btql.trace_spans(object_type: object_type, object_id: object_id, root_span_id: root_span_id) } + rows || [] + rescue => e + Braintrust::Log.warn("[BTQL] Query failed: #{e.message}") + [] end # Build a CaseContext from a Case struct diff --git a/lib/braintrust/internal/retry.rb b/lib/braintrust/internal/retry.rb new file mode 100644 index 00000000..cdc89b43 --- /dev/null +++ b/lib/braintrust/internal/retry.rb @@ -0,0 +1,41 @@ +# frozen_string_literal: true + +module Braintrust + module Internal + module Retry + MAX_RETRIES = 7 + BASE_DELAY = 1.0 + MAX_DELAY = 8.0 + + # Retry a block with exponential backoff. + # + # The block is the task to attempt. Its return value is captured each attempt. + # + # @param max_retries [Integer] Maximum number of retries after the first attempt + # @param base_delay [Float] Initial delay in seconds (doubles each retry) + # @param max_delay [Float] Cap on delay between retries + # @param until [Proc, nil] Optional condition — receives block result, truthy stops retrying. + # When omitted, the block result's own truthiness decides. + # @return The last block result (whether retries were exhausted or condition was met) + # + # @example Simple: retry until truthy + # conn = Retry.with_backoff(max_retries: 5) { try_connect } + # + # @example With condition: retry until non-empty + # data = Retry.with_backoff(until: ->(r) { r.any? }) { api.fetch } + # + def self.with_backoff(max_retries: MAX_RETRIES, base_delay: BASE_DELAY, max_delay: MAX_DELAY, until: nil, &task) + check = binding.local_variable_get(:until) + result = task.call + retries = 0 + while retries < max_retries && !(check ? check.call(result) : result) + retries += 1 + delay = [base_delay * (2**(retries - 1)), max_delay].min + sleep(delay) + result = task.call + end + result + end + end + end +end diff --git a/test/braintrust/api/internal/btql_integration_test.rb b/test/braintrust/api/internal/btql_integration_test.rb index e1b4a098..7f9bdce6 100644 --- a/test/braintrust/api/internal/btql_integration_test.rb +++ b/test/braintrust/api/internal/btql_integration_test.rb @@ -40,22 +40,23 @@ def test_trace_spans_queries_experiment # Query back via BTQL btql = Braintrust::API::Internal::BTQL.new(state) - result = btql.trace_spans( + rows, freshness = btql.trace_spans( object_type: "experiment", object_id: experiment["id"], root_span_id: root_span_id ) - refute_empty result, "BTQL should return spans for the trace" + refute_empty rows, "BTQL should return spans for the trace" + assert_equal "complete", freshness - result.each do |span| + rows.each do |span| assert span.key?("span_id"), "span should have span_id" assert span.key?("root_span_id"), "span should have root_span_id" assert span.key?("span_attributes"), "span should have span_attributes" end # Verify score spans are excluded by the query filter - types = result.map { |s| s.dig("span_attributes", "type") } + types = rows.map { |s| s.dig("span_attributes", "type") } refute_includes types, "score" ensure cleanup_experiment(experiments, experiment) @@ -76,18 +77,13 @@ def test_trace_spans_returns_empty_for_nonexistent_trace ) btql = Braintrust::API::Internal::BTQL.new(state) + rows, _freshness = btql.trace_spans( + object_type: "experiment", + object_id: experiment["id"], + root_span_id: "0000000000000000ffffffffffffffff" + ) - Braintrust::API::Internal::BTQL.stub_const(:FRESHNESS_BASE_DELAY, 0.001) do - Braintrust::API::Internal::BTQL.stub_const(:MAX_FRESHNESS_DELAY, 0.001) do - result = btql.trace_spans( - object_type: "experiment", - object_id: experiment["id"], - root_span_id: "0000000000000000ffffffffffffffff" - ) - - assert_equal [], result - end - end + assert_equal [], rows ensure cleanup_experiment(experiments, experiment) end diff --git a/test/braintrust/api/internal/btql_test.rb b/test/braintrust/api/internal/btql_test.rb index 23ae7a4b..342ba925 100644 --- a/test/braintrust/api/internal/btql_test.rb +++ b/test/braintrust/api/internal/btql_test.rb @@ -59,11 +59,11 @@ def test_trace_spans_escapes_single_quotes end # ============================================ - # JSONL response parsing + # Return shape — [rows, freshness] # ============================================ - def test_trace_spans_parses_jsonl_response - rows = [ + def test_trace_spans_returns_rows_and_freshness + rows_data = [ {"span_id" => "s1", "name" => "chat"}, {"span_id" => "s2", "name" => "completion"} ] @@ -71,144 +71,85 @@ def test_trace_spans_parses_jsonl_response stub_request(:post, "#{@state.api_url}/btql") .to_return( status: 200, - body: jsonl_body(rows), + body: jsonl_body(rows_data), headers: fresh_headers ) - result = @btql.trace_spans(object_type: "experiment", object_id: "exp-1", root_span_id: "trace-1") + rows, freshness = @btql.trace_spans(object_type: "experiment", object_id: "exp-1", root_span_id: "trace-1") - assert_equal 2, result.length - assert_equal "s1", result[0]["span_id"] - assert_equal "s2", result[1]["span_id"] + assert_equal 2, rows.length + assert_equal "s1", rows[0]["span_id"] + assert_equal "s2", rows[1]["span_id"] + assert_equal "complete", freshness end - def test_trace_spans_handles_empty_lines_in_jsonl - body = "{\"span_id\":\"s1\"}\n\n{\"span_id\":\"s2\"}\n\n" - + def test_trace_spans_returns_stale_freshness stub_request(:post, "#{@state.api_url}/btql") .to_return( status: 200, - body: body, - headers: fresh_headers + body: jsonl_body([{"span_id" => "s1"}]), + headers: stale_headers ) - result = @btql.trace_spans(object_type: "experiment", object_id: "exp-1", root_span_id: "trace-1") - - assert_equal 2, result.length - end - - # ============================================ - # Freshness retry - # ============================================ - - def test_trace_spans_retries_when_freshness_incomplete - call_count = 0 - - stub_request(:post, "#{@state.api_url}/btql") - .to_return do |_request| - call_count += 1 - if call_count < 3 - {status: 200, body: "", headers: stale_headers} - else - {status: 200, body: jsonl_body([{"span_id" => "s1"}]), headers: fresh_headers} - end - end - - Braintrust::API::Internal::BTQL.stub_const(:FRESHNESS_BASE_DELAY, 0.001) do - result = @btql.trace_spans(object_type: "experiment", object_id: "exp-1", root_span_id: "trace-1") - - assert_equal 1, result.length - assert_equal "s1", result[0]["span_id"] - assert_equal 3, call_count - end - end - - def test_trace_spans_retries_when_fresh_but_empty - call_count = 0 - - stub_request(:post, "#{@state.api_url}/btql") - .to_return do |_request| - call_count += 1 - {status: 200, body: "", headers: fresh_headers} - end - - Braintrust::API::Internal::BTQL.stub_const(:FRESHNESS_BASE_DELAY, 0.001) do - result = @btql.trace_spans(object_type: "experiment", object_id: "exp-1", root_span_id: "trace-1") - - assert_equal [], result - # 1 initial + MAX_FRESHNESS_RETRIES retries = 8 total calls - assert_equal 8, call_count, "should retry when fresh but empty (ingestion lag)" - end - end - - def test_trace_spans_returns_immediately_when_fresh_with_data - call_count = 0 - - stub_request(:post, "#{@state.api_url}/btql") - .to_return do |_request| - call_count += 1 - {status: 200, body: jsonl_body([{"span_id" => "s1"}]), headers: fresh_headers} - end - - result = @btql.trace_spans(object_type: "experiment", object_id: "exp-1", root_span_id: "trace-1") + rows, freshness = @btql.trace_spans(object_type: "experiment", object_id: "exp-1", root_span_id: "trace-1") - assert_equal 1, result.length - assert_equal 1, call_count, "should not retry when fresh and has data" + assert_equal 1, rows.length + assert_equal "incomplete", freshness end - def test_trace_spans_returns_partial_after_max_retries + def test_trace_spans_defaults_freshness_to_complete_when_header_missing stub_request(:post, "#{@state.api_url}/btql") .to_return( status: 200, body: jsonl_body([{"span_id" => "s1"}]), - headers: stale_headers + headers: {"Content-Type" => "application/x-jsonlines"} ) - Braintrust::API::Internal::BTQL.stub_const(:FRESHNESS_BASE_DELAY, 0.001) do - result = @btql.trace_spans(object_type: "experiment", object_id: "exp-1", root_span_id: "trace-1") + rows, freshness = @btql.trace_spans(object_type: "experiment", object_id: "exp-1", root_span_id: "trace-1") - # Returns whatever we have after exhausting retries, even if stale - assert_equal 1, result.length - assert_equal "s1", result[0]["span_id"] - end + assert_equal 1, rows.length + assert_equal "complete", freshness end - def test_trace_spans_defaults_to_complete_when_header_missing - call_count = 0 + # ============================================ + # JSONL response parsing + # ============================================ + + def test_trace_spans_handles_empty_lines_in_jsonl + body = "{\"span_id\":\"s1\"}\n\n{\"span_id\":\"s2\"}\n\n" stub_request(:post, "#{@state.api_url}/btql") - .to_return do |_request| - call_count += 1 - {status: 200, body: jsonl_body([{"span_id" => "s1"}]), - headers: {"Content-Type" => "application/x-jsonlines"}} - end + .to_return( + status: 200, + body: body, + headers: fresh_headers + ) - result = @btql.trace_spans(object_type: "experiment", object_id: "exp-1", root_span_id: "trace-1") + rows, _freshness = @btql.trace_spans(object_type: "experiment", object_id: "exp-1", root_span_id: "trace-1") - assert_equal 1, result.length - assert_equal 1, call_count, "missing header should default to complete (no retry)" + assert_equal 2, rows.length end # ============================================ - # Error handling + # Errors propagate to caller # ============================================ - def test_trace_spans_returns_empty_on_http_error + def test_trace_spans_raises_on_http_error stub_request(:post, "#{@state.api_url}/btql") .to_return(status: 500, body: "Internal Server Error") - result = suppress_logs { @btql.trace_spans(object_type: "experiment", object_id: "exp-1", root_span_id: "trace-1") } - - assert_equal [], result + assert_raises(Braintrust::Error) do + @btql.trace_spans(object_type: "experiment", object_id: "exp-1", root_span_id: "trace-1") + end end - def test_trace_spans_returns_empty_on_network_error + def test_trace_spans_raises_on_network_error stub_request(:post, "#{@state.api_url}/btql") .to_raise(Errno::ECONNREFUSED) - result = suppress_logs { @btql.trace_spans(object_type: "experiment", object_id: "exp-1", root_span_id: "trace-1") } - - assert_equal [], result + assert_raises(Errno::ECONNREFUSED) do + @btql.trace_spans(object_type: "experiment", object_id: "exp-1", root_span_id: "trace-1") + end end # ============================================ @@ -224,9 +165,9 @@ def test_trace_spans_sends_authorization_header headers: fresh_headers ) - result = @btql.trace_spans(object_type: "experiment", object_id: "exp-1", root_span_id: "trace-1") + rows, _freshness = @btql.trace_spans(object_type: "experiment", object_id: "exp-1", root_span_id: "trace-1") - assert_equal 1, result.length + assert_equal 1, rows.length end private diff --git a/test/braintrust/eval/runner_test.rb b/test/braintrust/eval/runner_test.rb index 01246d8f..becef6a8 100644 --- a/test/braintrust/eval/runner_test.rb +++ b/test/braintrust/eval/runner_test.rb @@ -1792,6 +1792,84 @@ def test_scorer_array_return_single_hash_unchanged assert_equal({"quality" => {"reason" => "good"}}, metadata) end + # ============================================ + # fetch_trace_spans — retry and error handling + # ============================================ + + def test_fetch_trace_spans_retries_until_fresh_and_non_empty + call_count = 0 + btql = mock_btql do + call_count += 1 + if call_count < 3 + [[], "incomplete"] + else + [[{"span_id" => "s1"}], "complete"] + end + end + + runner = build_trace_runner + without_retry_sleep do + spans = runner.send(:fetch_trace_spans, btql, "experiment", "exp-1", "trace-1") + + assert_equal 1, spans.length + assert_equal "s1", spans[0]["span_id"] + assert_equal 3, call_count + end + end + + def test_fetch_trace_spans_retries_when_fresh_but_empty + call_count = 0 + btql = mock_btql do + call_count += 1 + [[], "complete"] + end + + runner = build_trace_runner + without_retry_sleep do + spans = runner.send(:fetch_trace_spans, btql, "experiment", "exp-1", "trace-1") + + assert_equal [], spans + assert_equal 8, call_count # 1 initial + 7 retries + end + end + + def test_fetch_trace_spans_returns_immediately_when_fresh_with_data + call_count = 0 + btql = mock_btql do + call_count += 1 + [[{"span_id" => "s1"}], "complete"] + end + + runner = build_trace_runner + spans = runner.send(:fetch_trace_spans, btql, "experiment", "exp-1", "trace-1") + + assert_equal 1, spans.length + assert_equal 1, call_count + end + + def test_fetch_trace_spans_returns_empty_on_error + btql = mock_btql { raise Errno::ECONNREFUSED } + + runner = build_trace_runner + spans = suppress_logs { + runner.send(:fetch_trace_spans, btql, "experiment", "exp-1", "trace-1") + } + + assert_equal [], spans + end + + def test_fetch_trace_spans_returns_partial_after_max_retries + btql = mock_btql { [[{"span_id" => "s1"}], "incomplete"] } + + runner = build_trace_runner + without_retry_sleep do + spans = runner.send(:fetch_trace_spans, btql, "experiment", "exp-1", "trace-1") + + assert_equal 1, spans.length + assert_equal "s1", spans[0]["span_id"] + end + end + private def build_simple_runner(task:, cases:, scorers: [], on_progress: nil) @@ -1805,4 +1883,27 @@ def build_simple_runner(task:, cases:, scorers: [], on_progress: nil) ) Braintrust::Eval::Runner.new(context) end + + def build_trace_runner + rig = setup_otel_test_rig + context = Braintrust::Eval::Context.build( + task: ->(input:) { input }, + scorers: [], + cases: [{input: "a"}], + experiment_id: "exp-1", + state: rig.state, + tracer_provider: rig.tracer_provider + ) + Braintrust::Eval::Runner.new(context) + end + + def mock_btql(&response) + btql = Object.new + btql.define_singleton_method(:trace_spans) { |**_| response.call } + btql + end + + def without_retry_sleep(&block) + Braintrust::Internal::Retry.stub(:sleep, ->(_) {}, &block) + end end diff --git a/test/braintrust/internal/retry_test.rb b/test/braintrust/internal/retry_test.rb new file mode 100644 index 00000000..bfb0c546 --- /dev/null +++ b/test/braintrust/internal/retry_test.rb @@ -0,0 +1,119 @@ +# frozen_string_literal: true + +require "test_helper" +require "braintrust/internal/retry" + +class Braintrust::Internal::RetryTest < Minitest::Test + # ============================================ + # Without until: (truthiness mode) + # ============================================ + + def test_returns_immediately_on_truthy_result + call_count = 0 + result = Braintrust::Internal::Retry.with_backoff(max_retries: 3) { + call_count += 1 + "done" + } + assert_equal "done", result + assert_equal 1, call_count + end + + def test_retries_on_falsy_then_returns_truthy + call_count = 0 + result = Braintrust::Internal::Retry.with_backoff(max_retries: 5, base_delay: 0.001) { + call_count += 1 + (call_count >= 3) ? "got it" : nil + } + assert_equal "got it", result + assert_equal 3, call_count + end + + def test_returns_last_falsy_result_when_retries_exhausted + call_count = 0 + result = Braintrust::Internal::Retry.with_backoff(max_retries: 2, base_delay: 0.001) { + call_count += 1 + nil + } + assert_nil result + assert_equal 3, call_count # 1 initial + 2 retries + end + + # ============================================ + # With until: (condition mode) + # ============================================ + + def test_until_stops_on_satisfied_condition + call_count = 0 + result = Braintrust::Internal::Retry.with_backoff( + max_retries: 5, + base_delay: 0.001, + until: ->(r) { r[:ready] } + ) { + call_count += 1 + {ready: call_count >= 2, value: call_count} + } + assert_equal({ready: true, value: 2}, result) + assert_equal 2, call_count + end + + def test_until_returns_last_result_when_retries_exhausted + call_count = 0 + result = Braintrust::Internal::Retry.with_backoff( + max_retries: 2, + base_delay: 0.001, + until: ->(r) { r[:ready] } + ) { + call_count += 1 + {ready: false, value: call_count} + } + assert_equal({ready: false, value: 3}, result) + assert_equal 3, call_count + end + + def test_until_returns_immediately_when_first_attempt_satisfies + call_count = 0 + result = Braintrust::Internal::Retry.with_backoff( + max_retries: 5, + until: ->(r) { r == "ok" } + ) { + call_count += 1 + "ok" + } + assert_equal "ok", result + assert_equal 1, call_count + end + + def test_until_allows_falsy_block_result_to_be_returned + # until: condition is separate from truthiness, so a falsy result can be "done" + result = Braintrust::Internal::Retry.with_backoff( + max_retries: 3, + until: ->(_r) { true } + ) { nil } + assert_nil result + end + + # ============================================ + # Backoff timing + # ============================================ + + def test_exponential_backoff_with_cap + delays = [] + Braintrust::Internal::Retry.stub(:sleep, ->(d) { delays << d }) do + Braintrust::Internal::Retry.with_backoff( + max_retries: 5, + base_delay: 1.0, + max_delay: 4.0 + ) { nil } + end + # Schedule: 1, 2, 4, 4, 4 + assert_equal [1.0, 2.0, 4.0, 4.0, 4.0], delays + end + + def test_no_sleep_when_first_attempt_succeeds + delays = [] + Braintrust::Internal::Retry.stub(:sleep, ->(d) { delays << d }) do + Braintrust::Internal::Retry.with_backoff(max_retries: 5) { "done" } + end + assert_empty delays + end +end