28 commits
4e31b27
Add turn config interfaces and defaults (#975)
lukasIO Jan 16, 2026
f3c7430
Merge branch 'main' into feat/barge-in
Toubat Jan 21, 2026
07c5d71
Add AdaptiveInterruptionDetector (#980)
lukasIO Jan 27, 2026
f1a2114
Merge branch 'main' into feat/barge-in
lukasIO Jan 27, 2026
c861f50
Add agent activity interruption detector integration (#991)
lukasIO Jan 29, 2026
1862dc3
remove aic
lukasIO Jan 29, 2026
705ed33
reuse
lukasIO Jan 29, 2026
8f53889
remove tests for legacy stream approach
lukasIO Jan 29, 2026
7d24bf0
fix util migration tests
lukasIO Jan 29, 2026
c78cf58
comment out example tests
lukasIO Jan 29, 2026
d5b271c
Rename files to underscore cases (#1007)
toubatbrian Jan 30, 2026
b020180
update date
lukasIO Jan 30, 2026
dbad1e4
update date
lukasIO Jan 30, 2026
d882012
update defaults
lukasIO Jan 30, 2026
67e8f6c
deprecate legacy options and update tests
lukasIO Jan 30, 2026
96d6b57
fix internal types
lukasIO Jan 30, 2026
2ee2748
rabbit comments
lukasIO Jan 30, 2026
62cd448
remove unused stuff
lukasIO Jan 30, 2026
ec6d9bd
more rabbit fixes
lukasIO Jan 30, 2026
016e3a4
better cleanup
lukasIO Jan 30, 2026
9a4939c
ensure inputStartedAt is set
lukasIO Jan 30, 2026
e28b1b1
Fix Inference URL parity (#1011)
toubatbrian Feb 2, 2026
4310baa
Preserve turnDetection after cloning
toubatbrian Feb 2, 2026
175e57b
respect LIVEKIT_REMOTE_EOT_URL environment variable
toubatbrian Feb 2, 2026
0682f25
refine timeout computation
toubatbrian Feb 3, 2026
0d2efc6
save temp
toubatbrian Feb 3, 2026
d83d7b6
fix comments
toubatbrian Feb 3, 2026
3ce96e1
Update realtime_api.ts
toubatbrian Feb 3, 2026
8 changes: 1 addition & 7 deletions .changeset/config.json
@@ -8,13 +8,7 @@
],
"commit": false,
"ignore": ["livekit-agents-examples"],
"fixed": [
[
"@livekit/agents",
"@livekit/agents-plugin-*",
"@livekit/agents-plugins-test"
]
],
"fixed": [["@livekit/agents", "@livekit/agents-plugin-*", "@livekit/agents-plugins-test"]],
"access": "public",
"baseBranch": "main",
"updateInternalDependencies": "patch",
10 changes: 5 additions & 5 deletions .github/workflows/test.yml
@@ -46,11 +46,11 @@ jobs:
- name: Test agents
if: steps.filter.outputs.agents-or-tests == 'true' || github.event_name == 'push'
run: pnpm test agents
- name: Test examples
if: (steps.filter.outputs.examples == 'true' || github.event_name == 'push') && secrets.OPENAI_API_KEY != ''
env:
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
run: pnpm test:examples
# - name: Test examples
# if: (steps.filter.outputs.examples == 'true' || github.event_name == 'push')
# env:
# OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
# run: pnpm test:examples
# TODO (AJS-83) Re-enable once plugins are refactored with abort controllers
# - name: Test all plugins
# if: steps.filter.outputs.agents-or-tests == 'true' || github.event_name != 'pull_request'
1 change: 1 addition & 0 deletions agents/package.json
@@ -69,6 +69,7 @@
"heap-js": "^2.6.0",
"json-schema": "^0.4.0",
"livekit-server-sdk": "^2.14.1",
"ofetch": "^1.5.1",
"openai": "^6.8.1",
"pidusage": "^4.0.1",
"pino": "^8.19.0",
82 changes: 82 additions & 0 deletions agents/src/inference/interruption/defaults.ts
@@ -0,0 +1,82 @@
// SPDX-FileCopyrightText: 2026 LiveKit, Inc.
//
// SPDX-License-Identifier: Apache-2.0
import type { ApiConnectOptions } from './interruption_stream.js';
import type { InterruptionOptions } from './types.js';

export const MIN_INTERRUPTION_DURATION_IN_S = 0.025 * 2; // 25ms per frame, 2 consecutive frames
export const THRESHOLD = 0.65;
export const MAX_AUDIO_DURATION_IN_S = 3.0;
export const AUDIO_PREFIX_DURATION_IN_S = 0.5;
export const DETECTION_INTERVAL_IN_S = 0.1;
export const REMOTE_INFERENCE_TIMEOUT_IN_S = 1.0;
export const SAMPLE_RATE = 16000;
export const FRAMES_PER_SECOND = 40;
export const FRAME_DURATION_IN_S = 0.025; // 25ms per frame

/** Default production inference URL */
export const DEFAULT_BASE_URL = 'https://agent-gateway.livekit.cloud/v1';

/** Staging inference URL */
export const STAGING_BASE_URL = 'https://agent-gateway-staging.livekit.cloud/v1';

/**
* Get the default inference URL based on the environment.
*
* Priority:
* 1. LIVEKIT_INFERENCE_URL if set
* 2. If LIVEKIT_URL contains '.staging.livekit.cloud', use staging gateway
* 3. Otherwise, use production gateway
*/
export function getDefaultInferenceUrl(): string {
// Priority 1: LIVEKIT_INFERENCE_URL
const inferenceUrl = process.env.LIVEKIT_INFERENCE_URL;
if (inferenceUrl) {
return inferenceUrl;
}

// Priority 2: Check LIVEKIT_URL for staging (exact match to Python)
const livekitUrl = process.env.LIVEKIT_URL || '';
if (livekitUrl.includes('.staging.livekit.cloud')) {
return STAGING_BASE_URL;
}

// Priority 3: Default to production
return DEFAULT_BASE_URL;
}

export const apiConnectDefaults: ApiConnectOptions = {
maxRetries: 3,
retryInterval: 2_000,
timeout: 10_000,
} as const;

/**
* Calculate the retry interval using exponential backoff with jitter.
* Matches the Python implementation's _interval_for_retry behavior.
*/
export function intervalForRetry(
attempt: number,
baseInterval: number = apiConnectDefaults.retryInterval,
): number {
// Exponential backoff: baseInterval * 2^attempt with some jitter
const exponentialDelay = baseInterval * Math.pow(2, attempt);
// Add jitter (0-25% of the delay)
const jitter = exponentialDelay * Math.random() * 0.25;
return exponentialDelay + jitter;
}

// baseUrl and useProxy are resolved dynamically in the constructor
// to respect LIVEKIT_REMOTE_EOT_URL environment variable
export const interruptionOptionDefaults: Omit<InterruptionOptions, 'baseUrl' | 'useProxy'> = {
sampleRate: SAMPLE_RATE,
threshold: THRESHOLD,
minFrames: Math.ceil(MIN_INTERRUPTION_DURATION_IN_S * FRAMES_PER_SECOND),
maxAudioDurationInS: MAX_AUDIO_DURATION_IN_S,
audioPrefixDurationInS: AUDIO_PREFIX_DURATION_IN_S,
detectionIntervalInS: DETECTION_INTERVAL_IN_S,
inferenceTimeout: REMOTE_INFERENCE_TIMEOUT_IN_S * 1_000,
apiKey: process.env.LIVEKIT_API_KEY || '',
apiSecret: process.env.LIVEKIT_API_SECRET || '',
minInterruptionDurationInS: MIN_INTERRUPTION_DURATION_IN_S,
} as const;
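
A rough worked example of the backoff above (illustrative only; the import path assumes in-package usage): with the default retryInterval of 2000 ms, attempt 0 waits 2000–2500 ms, attempt 1 waits 4000–5000 ms, and attempt 2 waits 8000–10000 ms, since jitter adds at most 25% on top of baseInterval * 2^attempt.

import { apiConnectDefaults, intervalForRetry } from './defaults.js';

// attempt 0 -> 2000 ms + up to 500 ms jitter
// attempt 1 -> 4000 ms + up to 1000 ms jitter
// attempt 2 -> 8000 ms + up to 2000 ms jitter
for (let attempt = 0; attempt < apiConnectDefaults.maxRetries; attempt++) {
  console.log(`attempt ${attempt}: retry in ~${Math.round(intervalForRetry(attempt))} ms`);
}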
25 changes: 25 additions & 0 deletions agents/src/inference/interruption/errors.ts
@@ -0,0 +1,25 @@
// SPDX-FileCopyrightText: 2026 LiveKit, Inc.
//
// SPDX-License-Identifier: Apache-2.0
/**
* Error thrown during interruption detection.
*/
export class InterruptionDetectionError extends Error {
readonly type = 'InterruptionDetectionError';

readonly timestamp: number;
readonly label: string;
readonly recoverable: boolean;

constructor(message: string, timestamp: number, label: string, recoverable: boolean) {
super(message);
this.name = 'InterruptionDetectionError';
this.timestamp = timestamp;
this.label = label;
this.recoverable = recoverable;
}

toString(): string {
return `${this.name}: ${this.message} (label=${this.label}, timestamp=${this.timestamp}, recoverable=${this.recoverable})`;
}
}
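
A brief usage sketch for the error above; runDetection is a hypothetical caller-side function used only for illustration:

import { InterruptionDetectionError } from './errors.js';

async function detectWithRecovery(runDetection: () => Promise<void>): Promise<void> {
  try {
    await runDetection(); // hypothetical detection routine
  } catch (err) {
    if (err instanceof InterruptionDetectionError && err.recoverable) {
      // transient failure (e.g. a timed-out inference request): log and continue
      console.warn(err.toString());
      return;
    }
    throw err; // non-recoverable errors propagate to the caller
  }
}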
183 changes: 183 additions & 0 deletions agents/src/inference/interruption/http_transport.ts
@@ -0,0 +1,183 @@
// SPDX-FileCopyrightText: 2026 LiveKit, Inc.
//
// SPDX-License-Identifier: Apache-2.0
import { ofetch } from 'ofetch';
import { TransformStream } from 'stream/web';
import { z } from 'zod';
import { log } from '../../log.js';
import { createAccessToken } from '../utils.js';
import { intervalForRetry } from './defaults.js';
import { InterruptionCacheEntry } from './interruption_cache_entry.js';
import { type InterruptionEvent, InterruptionEventType } from './types.js';
import type { BoundedCache } from './utils.js';

export interface PostOptions {
baseUrl: string;
token: string;
signal?: AbortSignal;
timeout?: number;
maxRetries?: number;
}

export interface PredictOptions {
threshold: number;
minFrames: number;
}

export const predictEndpointResponseSchema = z.object({
created_at: z.number(),
is_bargein: z.boolean(),
probabilities: z.array(z.number()),
});

export type PredictEndpointResponse = z.infer<typeof predictEndpointResponseSchema>;

export interface PredictResponse {
createdAt: number;
isBargein: boolean;
probabilities: number[];
predictionDurationInS: number;
}

export async function predictHTTP(
data: Int16Array,
predictOptions: PredictOptions,
options: PostOptions,
): Promise<PredictResponse> {
const createdAt = performance.now();
const url = new URL(`/bargein`, options.baseUrl);
url.searchParams.append('threshold', predictOptions.threshold.toString());
url.searchParams.append('min_frames', predictOptions.minFrames.toFixed());
url.searchParams.append('created_at', createdAt.toFixed());

let retryCount = 0;
const response = await ofetch(url.toString(), {
retry: options.maxRetries ?? 3,
retryDelay: () => {
const delay = intervalForRetry(retryCount);
retryCount++;
return delay;
},
headers: {
'Content-Type': 'application/octet-stream',
Authorization: `Bearer ${options.token}`,
},
signal: options.signal,
timeout: options.timeout,
method: 'POST',
body: data,
});
const { created_at, is_bargein, probabilities } = predictEndpointResponseSchema.parse(response);

return {
createdAt: created_at,
isBargein: is_bargein,
probabilities,
predictionDurationInS: (performance.now() - createdAt) / 1000,
};
}

export interface HttpTransportOptions {
baseUrl: string;
apiKey: string;
apiSecret: string;
threshold: number;
minFrames: number;
timeout: number;
maxRetries?: number;
}

export interface HttpTransportState {
overlapSpeechStarted: boolean;
overlapSpeechStartedAt: number | undefined;
cache: BoundedCache<number, InterruptionCacheEntry>;
}

/**
* Creates an HTTP transport TransformStream for interruption detection.
*
* This transport receives Int16Array audio slices and outputs InterruptionEvents.
* Each audio slice triggers an HTTP POST request.
*
* @param options - Transport options object. This is read on each request, so mutations
* to threshold/minFrames will be picked up dynamically.
*/
export function createHttpTransport(
options: HttpTransportOptions,
getState: () => HttpTransportState,
setState: (partial: Partial<HttpTransportState>) => void,
updateUserSpeakingSpan?: (entry: InterruptionCacheEntry) => void,
): TransformStream<Int16Array | InterruptionEvent, InterruptionEvent> {
const logger = log();

return new TransformStream<Int16Array | InterruptionEvent, InterruptionEvent>(
{
async transform(chunk, controller) {
// Pass through InterruptionEvents unchanged
if (!(chunk instanceof Int16Array)) {
controller.enqueue(chunk);
return;
}

const state = getState();
if (!state.overlapSpeechStartedAt) return;

try {
const resp = await predictHTTP(
chunk,
{ threshold: options.threshold, minFrames: options.minFrames },
{
baseUrl: options.baseUrl,
timeout: options.timeout,
maxRetries: options.maxRetries,
token: await createAccessToken(options.apiKey, options.apiSecret),
},
);

const { createdAt, isBargein, probabilities, predictionDurationInS } = resp;
const entry = new InterruptionCacheEntry({
createdAt,
probabilities,
isInterruption: isBargein,
speechInput: chunk,
totalDurationInS: (performance.now() - createdAt) / 1000,
detectionDelayInS: (Date.now() - state.overlapSpeechStartedAt) / 1000,
predictionDurationInS,
});
state.cache.set(createdAt, entry);

if (state.overlapSpeechStarted && entry.isInterruption) {
if (updateUserSpeakingSpan) {
updateUserSpeakingSpan(entry);
Comment on lines +147 to +151

P2: Re-check overlap state after HTTP await

Because state is captured before the await predictHTTP(...), an overlap that ends while the request is in flight will still have state.overlapSpeechStarted === true here, which can emit an interruption event after overlap speech has already ended. This shows up when overlap ends quickly or the HTTP call is slow, producing false-positive interruptions. Consider re-reading getState() after the await (or checking a monotonic overlap token) before emitting.
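
One way to apply that suggestion, sketched as a hypothetical fragment (not part of this diff) that would slot in right after the await inside transform(), reusing the getState and state already in scope:

const current = getState();
const overlapStillActive =
  current.overlapSpeechStarted &&
  current.overlapSpeechStartedAt === state.overlapSpeechStartedAt;
if (!overlapStillActive) {
  return; // stale prediction: the overlap ended (or changed) while the request was in flight
}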


}
const event: InterruptionEvent = {
type: InterruptionEventType.INTERRUPTION,
timestamp: Date.now(),
overlapSpeechStartedAt: state.overlapSpeechStartedAt,
isInterruption: entry.isInterruption,
speechInput: entry.speechInput,
probabilities: entry.probabilities,
totalDurationInS: entry.totalDurationInS,
predictionDurationInS: entry.predictionDurationInS,
detectionDelayInS: entry.detectionDelayInS,
probability: entry.probability,
};
logger.debug(
{
detectionDelayInS: entry.detectionDelayInS,
totalDurationInS: entry.totalDurationInS,
},
'interruption detected',
);
setState({ overlapSpeechStarted: false });
controller.enqueue(event);
}
} catch (err) {
logger.error({ err }, 'Failed to send audio data over HTTP');
}
},
},
{ highWaterMark: 2 },
{ highWaterMark: 2 },
);
}
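
A minimal wiring sketch for the transport above, under two stated assumptions: in-package import paths, and a plain Map standing in for BoundedCache (whose constructor is not shown in this diff; only .set() is exercised by the transform):

import { getDefaultInferenceUrl, interruptionOptionDefaults } from './defaults.js';
import { createHttpTransport, type HttpTransportState } from './http_transport.js';
import type { InterruptionCacheEntry } from './interruption_cache_entry.js';
import type { BoundedCache } from './utils.js';

let state: HttpTransportState = {
  overlapSpeechStarted: true,
  overlapSpeechStartedAt: Date.now(),
  // stand-in cache for illustration only
  cache: new Map<number, InterruptionCacheEntry>() as unknown as BoundedCache<
    number,
    InterruptionCacheEntry
  >,
};

const transport = createHttpTransport(
  {
    baseUrl: getDefaultInferenceUrl(),
    apiKey: interruptionOptionDefaults.apiKey,
    apiSecret: interruptionOptionDefaults.apiSecret,
    threshold: interruptionOptionDefaults.threshold,
    minFrames: interruptionOptionDefaults.minFrames,
    timeout: interruptionOptionDefaults.inferenceTimeout,
  },
  () => state,
  (partial) => {
    state = { ...state, ...partial };
  },
);

// 16 kHz Int16Array slices are written to transport.writable; InterruptionEvents
// (plus any pass-through events) are read from transport.readable.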
47 changes: 47 additions & 0 deletions agents/src/inference/interruption/interruption_cache_entry.ts
@@ -0,0 +1,47 @@
// SPDX-FileCopyrightText: 2026 LiveKit, Inc.
//
// SPDX-License-Identifier: Apache-2.0
import { estimateProbability } from './utils.js';

/**
* Typed cache entry for interruption inference results.
* Mutable to support setOrUpdate pattern from Python's _BoundedCache.
*/
export class InterruptionCacheEntry {
createdAt: number;
totalDurationInS: number;
predictionDurationInS: number;
detectionDelayInS: number;
speechInput?: Int16Array;
probabilities?: number[];
isInterruption?: boolean;

constructor(params: {
createdAt: number;
speechInput?: Int16Array;
totalDurationInS?: number;
predictionDurationInS?: number;
detectionDelayInS?: number;
probabilities?: number[];
isInterruption?: boolean;
}) {
this.createdAt = params.createdAt;
this.totalDurationInS = params.totalDurationInS ?? 0;
this.predictionDurationInS = params.predictionDurationInS ?? 0;
this.detectionDelayInS = params.detectionDelayInS ?? 0;
this.speechInput = params.speechInput;
this.probabilities = params.probabilities;
this.isInterruption = params.isInterruption;
}

/**
* The conservative estimated probability of the interruption event.
*/
get probability(): number {
return this.probabilities ? estimateProbability(this.probabilities) : 0;
}

static default(): InterruptionCacheEntry {
return new InterruptionCacheEntry({ createdAt: 0 });
}
}
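
A small illustrative check of the probability getter; the exact aggregation done by estimateProbability lives in ./utils.js and is not shown in this diff:

import { InterruptionCacheEntry } from './interruption_cache_entry.js';

const entry = new InterruptionCacheEntry({
  createdAt: performance.now(),
  probabilities: [0.2, 0.7, 0.9],
  isInterruption: true,
});

console.log(entry.probability); // conservative estimate derived from the frame probabilities
console.log(InterruptionCacheEntry.default().probability); // 0 — no probabilities recorded yet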