-
Notifications
You must be signed in to change notification settings - Fork 221
Enable Gemini Realtime Model to Produce Error Log #1016
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
Closed
Changes from all commits
Commits
Show all changes
28 commits
Select commit
Hold shift + click to select a range
4e31b27
Add turn config interfaces and defaults (#975)
lukasIO f3c7430
Merge branch 'main' into feat/barge-in
Toubat 07c5d71
Add AdaptiveInterruptionDetector (#980)
lukasIO f1a2114
Merge branch 'main' into feat/barge-in
lukasIO c861f50
Add agent activity interruption detector integration (#991)
lukasIO 1862dc3
remove aic
lukasIO 705ed33
reuse
lukasIO 8f53889
remove tests for legacy stream approach
lukasIO 7d24bf0
fix util migration tests
lukasIO c78cf58
comment out example tests
lukasIO d5b271c
Rename files to underscore cases (#1007)
toubatbrian b020180
update date
lukasIO dbad1e4
update date
lukasIO d882012
update defaults
lukasIO 67e8f6c
deprecate legacy options and update tests
lukasIO 96d6b57
fix internal types
lukasIO 2ee2748
rabbit comments
lukasIO 62cd448
remove unused stuff
lukasIO ec6d9bd
more rabbit fixes
lukasIO 016e3a4
better cleanup
lukasIO 9a4939c
ensure inputStartedAt is set
lukasIO e28b1b1
Fix Inference URL parity (#1011)
toubatbrian 4310baa
Preserve turnDetection after cloning
toubatbrian 175e57b
respect LIVEKIT_REMOTE_EOT_URL environment variable
toubatbrian 0682f25
refine timeout computation
toubatbrian 0d2efc6
save temp
toubatbrian d83d7b6
fix comments
toubatbrian 3ce96e1
Update realtime_api.ts
toubatbrian File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,82 @@ | ||
| // SPDX-FileCopyrightText: 2026 LiveKit, Inc. | ||
| // | ||
| // SPDX-License-Identifier: Apache-2.0 | ||
| import type { ApiConnectOptions } from './interruption_stream.js'; | ||
| import type { InterruptionOptions } from './types.js'; | ||
|
|
||
| export const MIN_INTERRUPTION_DURATION_IN_S = 0.025 * 2; // 25ms per frame, 2 consecutive frames | ||
| export const THRESHOLD = 0.65; | ||
| export const MAX_AUDIO_DURATION_IN_S = 3.0; | ||
| export const AUDIO_PREFIX_DURATION_IN_S = 0.5; | ||
| export const DETECTION_INTERVAL_IN_S = 0.1; | ||
| export const REMOTE_INFERENCE_TIMEOUT_IN_S = 1.0; | ||
| export const SAMPLE_RATE = 16000; | ||
| export const FRAMES_PER_SECOND = 40; | ||
| export const FRAME_DURATION_IN_S = 0.025; // 25ms per frame | ||
|
|
||
| /** Default production inference URL */ | ||
| export const DEFAULT_BASE_URL = 'https://agent-gateway.livekit.cloud/v1'; | ||
|
|
||
| /** Staging inference URL */ | ||
| export const STAGING_BASE_URL = 'https://agent-gateway-staging.livekit.cloud/v1'; | ||
|
|
||
| /** | ||
| * Get the default inference URL based on the environment. | ||
| * | ||
| * Priority: | ||
| * 1. LIVEKIT_INFERENCE_URL if set | ||
| * 2. If LIVEKIT_URL contains '.staging.livekit.cloud', use staging gateway | ||
| * 3. Otherwise, use production gateway | ||
| */ | ||
| export function getDefaultInferenceUrl(): string { | ||
| // Priority 1: LIVEKIT_INFERENCE_URL | ||
| const inferenceUrl = process.env.LIVEKIT_INFERENCE_URL; | ||
| if (inferenceUrl) { | ||
| return inferenceUrl; | ||
| } | ||
|
|
||
| // Priority 2: Check LIVEKIT_URL for staging (exact match to Python) | ||
| const livekitUrl = process.env.LIVEKIT_URL || ''; | ||
| if (livekitUrl.includes('.staging.livekit.cloud')) { | ||
| return STAGING_BASE_URL; | ||
| } | ||
|
|
||
| // Priority 3: Default to production | ||
| return DEFAULT_BASE_URL; | ||
| } | ||
|
|
||
| export const apiConnectDefaults: ApiConnectOptions = { | ||
| maxRetries: 3, | ||
| retryInterval: 2_000, | ||
| timeout: 10_000, | ||
| } as const; | ||
|
|
||
| /** | ||
| * Calculate the retry interval using exponential backoff with jitter. | ||
| * Matches the Python implementation's _interval_for_retry behavior. | ||
| */ | ||
| export function intervalForRetry( | ||
| attempt: number, | ||
| baseInterval: number = apiConnectDefaults.retryInterval, | ||
| ): number { | ||
| // Exponential backoff: baseInterval * 2^attempt with some jitter | ||
| const exponentialDelay = baseInterval * Math.pow(2, attempt); | ||
| // Add jitter (0-25% of the delay) | ||
| const jitter = exponentialDelay * Math.random() * 0.25; | ||
| return exponentialDelay + jitter; | ||
| } | ||
|
|
||
| // baseUrl and useProxy are resolved dynamically in the constructor | ||
| // to respect LIVEKIT_REMOTE_EOT_URL environment variable | ||
| export const interruptionOptionDefaults: Omit<InterruptionOptions, 'baseUrl' | 'useProxy'> = { | ||
| sampleRate: SAMPLE_RATE, | ||
| threshold: THRESHOLD, | ||
| minFrames: Math.ceil(MIN_INTERRUPTION_DURATION_IN_S * FRAMES_PER_SECOND), | ||
| maxAudioDurationInS: MAX_AUDIO_DURATION_IN_S, | ||
| audioPrefixDurationInS: AUDIO_PREFIX_DURATION_IN_S, | ||
| detectionIntervalInS: DETECTION_INTERVAL_IN_S, | ||
| inferenceTimeout: REMOTE_INFERENCE_TIMEOUT_IN_S * 1_000, | ||
| apiKey: process.env.LIVEKIT_API_KEY || '', | ||
| apiSecret: process.env.LIVEKIT_API_SECRET || '', | ||
| minInterruptionDurationInS: MIN_INTERRUPTION_DURATION_IN_S, | ||
| } as const; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,25 @@ | ||
| // SPDX-FileCopyrightText: 2026 LiveKit, Inc. | ||
| // | ||
| // SPDX-License-Identifier: Apache-2.0 | ||
| /** | ||
| * Error thrown during interruption detection. | ||
| */ | ||
| export class InterruptionDetectionError extends Error { | ||
| readonly type = 'InterruptionDetectionError'; | ||
|
|
||
| readonly timestamp: number; | ||
| readonly label: string; | ||
| readonly recoverable: boolean; | ||
|
|
||
| constructor(message: string, timestamp: number, label: string, recoverable: boolean) { | ||
| super(message); | ||
| this.name = 'InterruptionDetectionError'; | ||
| this.timestamp = timestamp; | ||
| this.label = label; | ||
| this.recoverable = recoverable; | ||
| } | ||
|
|
||
| toString(): string { | ||
| return `${this.name}: ${this.message} (label=${this.label}, timestamp=${this.timestamp}, recoverable=${this.recoverable})`; | ||
| } | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,183 @@ | ||
| // SPDX-FileCopyrightText: 2026 LiveKit, Inc. | ||
| // | ||
| // SPDX-License-Identifier: Apache-2.0 | ||
| import { ofetch } from 'ofetch'; | ||
| import { TransformStream } from 'stream/web'; | ||
| import { z } from 'zod'; | ||
| import { log } from '../../log.js'; | ||
| import { createAccessToken } from '../utils.js'; | ||
| import { intervalForRetry } from './defaults.js'; | ||
| import { InterruptionCacheEntry } from './interruption_cache_entry.js'; | ||
| import { type InterruptionEvent, InterruptionEventType } from './types.js'; | ||
| import type { BoundedCache } from './utils.js'; | ||
|
|
||
| export interface PostOptions { | ||
| baseUrl: string; | ||
| token: string; | ||
| signal?: AbortSignal; | ||
| timeout?: number; | ||
| maxRetries?: number; | ||
| } | ||
|
|
||
| export interface PredictOptions { | ||
| threshold: number; | ||
| minFrames: number; | ||
| } | ||
|
|
||
| export const predictEndpointResponseSchema = z.object({ | ||
| created_at: z.number(), | ||
| is_bargein: z.boolean(), | ||
| probabilities: z.array(z.number()), | ||
| }); | ||
|
|
||
| export type PredictEndpointResponse = z.infer<typeof predictEndpointResponseSchema>; | ||
|
|
||
| export interface PredictResponse { | ||
| createdAt: number; | ||
| isBargein: boolean; | ||
| probabilities: number[]; | ||
| predictionDurationInS: number; | ||
| } | ||
|
|
||
| export async function predictHTTP( | ||
| data: Int16Array, | ||
| predictOptions: PredictOptions, | ||
| options: PostOptions, | ||
| ): Promise<PredictResponse> { | ||
| const createdAt = performance.now(); | ||
| const url = new URL(`/bargein`, options.baseUrl); | ||
| url.searchParams.append('threshold', predictOptions.threshold.toString()); | ||
| url.searchParams.append('min_frames', predictOptions.minFrames.toFixed()); | ||
| url.searchParams.append('created_at', createdAt.toFixed()); | ||
|
|
||
| let retryCount = 0; | ||
| const response = await ofetch(url.toString(), { | ||
| retry: options.maxRetries ?? 3, | ||
| retryDelay: () => { | ||
| const delay = intervalForRetry(retryCount); | ||
| retryCount++; | ||
| return delay; | ||
| }, | ||
| headers: { | ||
| 'Content-Type': 'application/octet-stream', | ||
| Authorization: `Bearer ${options.token}`, | ||
| }, | ||
| signal: options.signal, | ||
| timeout: options.timeout, | ||
| method: 'POST', | ||
| body: data, | ||
| }); | ||
| const { created_at, is_bargein, probabilities } = predictEndpointResponseSchema.parse(response); | ||
|
|
||
| return { | ||
| createdAt: created_at, | ||
| isBargein: is_bargein, | ||
| probabilities, | ||
| predictionDurationInS: (performance.now() - createdAt) / 1000, | ||
| }; | ||
| } | ||
|
|
||
| export interface HttpTransportOptions { | ||
| baseUrl: string; | ||
| apiKey: string; | ||
| apiSecret: string; | ||
| threshold: number; | ||
| minFrames: number; | ||
| timeout: number; | ||
| maxRetries?: number; | ||
| } | ||
|
|
||
| export interface HttpTransportState { | ||
| overlapSpeechStarted: boolean; | ||
| overlapSpeechStartedAt: number | undefined; | ||
| cache: BoundedCache<number, InterruptionCacheEntry>; | ||
| } | ||
|
|
||
| /** | ||
| * Creates an HTTP transport TransformStream for interruption detection. | ||
| * | ||
| * This transport receives Int16Array audio slices and outputs InterruptionEvents. | ||
| * Each audio slice triggers an HTTP POST request. | ||
| * | ||
| * @param options - Transport options object. This is read on each request, so mutations | ||
| * to threshold/minFrames will be picked up dynamically. | ||
| */ | ||
| export function createHttpTransport( | ||
| options: HttpTransportOptions, | ||
| getState: () => HttpTransportState, | ||
| setState: (partial: Partial<HttpTransportState>) => void, | ||
| updateUserSpeakingSpan?: (entry: InterruptionCacheEntry) => void, | ||
| ): TransformStream<Int16Array | InterruptionEvent, InterruptionEvent> { | ||
| const logger = log(); | ||
|
|
||
| return new TransformStream<Int16Array | InterruptionEvent, InterruptionEvent>( | ||
| { | ||
| async transform(chunk, controller) { | ||
| // Pass through InterruptionEvents unchanged | ||
| if (!(chunk instanceof Int16Array)) { | ||
| controller.enqueue(chunk); | ||
| return; | ||
| } | ||
|
|
||
| const state = getState(); | ||
| if (!state.overlapSpeechStartedAt) return; | ||
|
|
||
| try { | ||
| const resp = await predictHTTP( | ||
| chunk, | ||
| { threshold: options.threshold, minFrames: options.minFrames }, | ||
| { | ||
| baseUrl: options.baseUrl, | ||
| timeout: options.timeout, | ||
| maxRetries: options.maxRetries, | ||
| token: await createAccessToken(options.apiKey, options.apiSecret), | ||
| }, | ||
| ); | ||
|
|
||
| const { createdAt, isBargein, probabilities, predictionDurationInS } = resp; | ||
| const entry = new InterruptionCacheEntry({ | ||
| createdAt, | ||
| probabilities, | ||
| isInterruption: isBargein, | ||
| speechInput: chunk, | ||
| totalDurationInS: (performance.now() - createdAt) / 1000, | ||
| detectionDelayInS: (Date.now() - state.overlapSpeechStartedAt) / 1000, | ||
| predictionDurationInS, | ||
| }); | ||
| state.cache.set(createdAt, entry); | ||
|
|
||
| if (state.overlapSpeechStarted && entry.isInterruption) { | ||
| if (updateUserSpeakingSpan) { | ||
| updateUserSpeakingSpan(entry); | ||
| } | ||
| const event: InterruptionEvent = { | ||
| type: InterruptionEventType.INTERRUPTION, | ||
| timestamp: Date.now(), | ||
| overlapSpeechStartedAt: state.overlapSpeechStartedAt, | ||
| isInterruption: entry.isInterruption, | ||
| speechInput: entry.speechInput, | ||
| probabilities: entry.probabilities, | ||
| totalDurationInS: entry.totalDurationInS, | ||
| predictionDurationInS: entry.predictionDurationInS, | ||
| detectionDelayInS: entry.detectionDelayInS, | ||
| probability: entry.probability, | ||
| }; | ||
| logger.debug( | ||
| { | ||
| detectionDelayInS: entry.detectionDelayInS, | ||
| totalDurationInS: entry.totalDurationInS, | ||
| }, | ||
| 'interruption detected', | ||
| ); | ||
| setState({ overlapSpeechStarted: false }); | ||
| controller.enqueue(event); | ||
| } | ||
| } catch (err) { | ||
| logger.error({ err }, 'Failed to send audio data over HTTP'); | ||
| } | ||
| }, | ||
| }, | ||
| { highWaterMark: 2 }, | ||
| { highWaterMark: 2 }, | ||
| ); | ||
| } | ||
47 changes: 47 additions & 0 deletions
47
agents/src/inference/interruption/interruption_cache_entry.ts
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,47 @@ | ||
| // SPDX-FileCopyrightText: 2026 LiveKit, Inc. | ||
| // | ||
| // SPDX-License-Identifier: Apache-2.0 | ||
| import { estimateProbability } from './utils.js'; | ||
|
|
||
| /** | ||
| * Typed cache entry for interruption inference results. | ||
| * Mutable to support setOrUpdate pattern from Python's _BoundedCache. | ||
| */ | ||
| export class InterruptionCacheEntry { | ||
| createdAt: number; | ||
| totalDurationInS: number; | ||
| predictionDurationInS: number; | ||
| detectionDelayInS: number; | ||
| speechInput?: Int16Array; | ||
| probabilities?: number[]; | ||
| isInterruption?: boolean; | ||
|
|
||
| constructor(params: { | ||
| createdAt: number; | ||
| speechInput?: Int16Array; | ||
| totalDurationInS?: number; | ||
| predictionDurationInS?: number; | ||
| detectionDelayInS?: number; | ||
| probabilities?: number[]; | ||
| isInterruption?: boolean; | ||
| }) { | ||
| this.createdAt = params.createdAt; | ||
| this.totalDurationInS = params.totalDurationInS ?? 0; | ||
| this.predictionDurationInS = params.predictionDurationInS ?? 0; | ||
| this.detectionDelayInS = params.detectionDelayInS ?? 0; | ||
| this.speechInput = params.speechInput; | ||
| this.probabilities = params.probabilities; | ||
| this.isInterruption = params.isInterruption; | ||
| } | ||
|
|
||
| /** | ||
| * The conservative estimated probability of the interruption event. | ||
| */ | ||
| get probability(): number { | ||
| return this.probabilities ? estimateProbability(this.probabilities) : 0; | ||
| } | ||
|
|
||
| static default(): InterruptionCacheEntry { | ||
| return new InterruptionCacheEntry({ createdAt: 0 }); | ||
| } | ||
| } |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because
stateis captured before theawait predictHTTP(...), an overlap that ends while the request is in flight will still havestate.overlapSpeechStarted === truehere, which can emit an interruption event after overlap speech has already ended. This shows up when overlap ends quickly or the HTTP call is slow, producing false-positive interruptions. Consider re-readinggetState()after the await (or checking a monotonic overlap token) before emitting.Useful? React with 👍 / 👎.