Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
4e31b27
Add turn config interfaces and defaults (#975)
lukasIO Jan 16, 2026
f3c7430
Merge branch 'main' into feat/barge-in
Toubat Jan 21, 2026
07c5d71
Add AdaptiveInterruptionDetector (#980)
lukasIO Jan 27, 2026
f1a2114
Merge branch 'main' into feat/barge-in
lukasIO Jan 27, 2026
c861f50
Add agent activity interruption detector integration (#991)
lukasIO Jan 29, 2026
1862dc3
remove aic
lukasIO Jan 29, 2026
705ed33
reuse
lukasIO Jan 29, 2026
8f53889
remove tests for legacy stream approach
lukasIO Jan 29, 2026
7d24bf0
fix util migration tests
lukasIO Jan 29, 2026
c78cf58
comment out example tests
lukasIO Jan 29, 2026
d5b271c
Rename files to underscore cases (#1007)
toubatbrian Jan 30, 2026
b020180
update date
lukasIO Jan 30, 2026
dbad1e4
update date
lukasIO Jan 30, 2026
d882012
update defaults
lukasIO Jan 30, 2026
67e8f6c
deprecate legacy options and update tests
lukasIO Jan 30, 2026
96d6b57
fix internal types
lukasIO Jan 30, 2026
2ee2748
rabbit comments
lukasIO Jan 30, 2026
62cd448
remove unused stuff
lukasIO Jan 30, 2026
ec6d9bd
more rabbit fixes
lukasIO Jan 30, 2026
016e3a4
better cleanup
lukasIO Jan 30, 2026
9a4939c
ensure inputStartedAt is set
lukasIO Jan 30, 2026
e28b1b1
Fix Inference URL parity (#1011)
toubatbrian Feb 2, 2026
4310baa
Preserve turnDetection after cloning
toubatbrian Feb 2, 2026
175e57b
respect LIVEKIT_REMOTE_EOT_URL environment variable
toubatbrian Feb 2, 2026
0682f25
refine timeout computation
toubatbrian Feb 3, 2026
a820521
Merge branch 'main' into feat/barge-in
lukasIO Feb 4, 2026
e175e7d
migrate turnhandling options on agent level
lukasIO Feb 4, 2026
9cb0a29
Create silly-donkeys-shop.md
lukasIO Feb 4, 2026
5f088b9
add explicit logging for sample rate error
lukasIO Feb 4, 2026
6fbc417
conditionally create interruption stream channel only when detection …
lukasIO Feb 4, 2026
9f2932d
propagate updated turn detection options
lukasIO Feb 4, 2026
b4a82ad
fix comment
lukasIO Feb 4, 2026
f2ac83a
Merge branch 'feat/barge-in' of github.com:livekit/agents-js into fea…
lukasIO Feb 4, 2026
ec26bb1
fix tests
lukasIO Feb 4, 2026
b5c541f
migrate allowInterruptions
lukasIO Feb 4, 2026
76bd4e8
fix inputStartedAt assignment
lukasIO Feb 4, 2026
245bc66
Session Usage Collection (#1014)
toubatbrian Feb 5, 2026
ea27278
resolve comments
toubatbrian Feb 5, 2026
63eccca
Merge branch 'main' into feat/barge-in
toubatbrian Feb 5, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 1 addition & 7 deletions .changeset/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,7 @@
],
"commit": false,
"ignore": ["livekit-agents-examples"],
"fixed": [
[
"@livekit/agents",
"@livekit/agents-plugin-*",
"@livekit/agents-plugins-test"
]
],
"fixed": [["@livekit/agents", "@livekit/agents-plugin-*", "@livekit/agents-plugins-test"]],
"access": "public",
"baseBranch": "main",
"updateInternalDependencies": "patch",
Expand Down
10 changes: 10 additions & 0 deletions .changeset/lucky-grapes-care.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
---
"@livekit/agents": patch
"@livekit/agents-plugin-cartesia": patch
"@livekit/agents-plugin-deepgram": patch
"@livekit/agents-plugin-google": patch
"@livekit/agents-plugin-openai": patch
"livekit-agents-examples": patch
---

Add granular session models usage stats
5 changes: 5 additions & 0 deletions .changeset/silly-donkeys-shop.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@livekit/agents": minor
---

Refactor turn handling options and add barge-in model support
10 changes: 5 additions & 5 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,11 @@ jobs:
- name: Test agents
if: steps.filter.outputs.agents-or-tests == 'true' || github.event_name == 'push'
run: pnpm test agents
- name: Test examples
if: (steps.filter.outputs.examples == 'true' || github.event_name == 'push') && secrets.OPENAI_API_KEY != ''
env:
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
run: pnpm test:examples
# - name: Test examples
# if: (steps.filter.outputs.examples == 'true' || github.event_name == 'push')
# env:
# OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
# run: pnpm test:examples
# TODO (AJS-83) Re-enable once plugins are refactored with abort controllers
# - name: Test all plugins
# if: steps.filter.outputs.agents-or-tests == 'true' || github.event_name != 'pull_request'
Expand Down
1 change: 1 addition & 0 deletions agents/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
"heap-js": "^2.6.0",
"json-schema": "^0.4.0",
"livekit-server-sdk": "^2.14.1",
"ofetch": "^1.5.1",
"openai": "^6.8.1",
"pidusage": "^4.0.1",
"pino": "^8.19.0",
Expand Down
51 changes: 51 additions & 0 deletions agents/src/inference/interruption/defaults.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// SPDX-FileCopyrightText: 2026 LiveKit, Inc.
//
// SPDX-License-Identifier: Apache-2.0
import type { ApiConnectOptions } from './interruption_stream.js';
import type { InterruptionOptions } from './types.js';

// Audio framing used by the interruption detector.
export const FRAME_DURATION_IN_S = 0.025; // 25ms per frame
export const FRAMES_PER_SECOND = 40;
export const SAMPLE_RATE = 16000;

// An interruption must span 2 consecutive frames.
export const MIN_INTERRUPTION_DURATION_IN_S = FRAME_DURATION_IN_S * 2;

// Detection tuning.
export const THRESHOLD = 0.65;
export const DETECTION_INTERVAL_IN_S = 0.1;

// Audio window sizes, in seconds.
export const MAX_AUDIO_DURATION_IN_S = 3.0;
export const AUDIO_PREFIX_DURATION_IN_S = 0.5;

// Time budget for one remote inference call, in seconds.
export const REMOTE_INFERENCE_TIMEOUT_IN_S = 1.0;

/**
 * Fallback connection settings for the interruption API.
 *
 * `retryInterval` is the default base interval used by `intervalForRetry`.
 * NOTE(review): `retryInterval` and `timeout` are presumably milliseconds — confirm
 * against `ApiConnectOptions`.
 */
export const apiConnectDefaults: ApiConnectOptions = {
  maxRetries: 3,
  retryInterval: 2000,
  timeout: 10000,
} as const;

/**
 * Compute how long to wait before a retry, using exponential backoff plus random jitter.
 *
 * The result is `baseInterval * 2^attempt`, increased by a random amount in the range
 * of 0% (inclusive) to 25% (exclusive) of that value. The original note on this function
 * states that it mirrors `_interval_for_retry` in the Python implementation.
 *
 * @param attempt - Exponent applied to the backoff; `0` yields roughly `baseInterval`.
 * @param baseInterval - Interval that is doubled on each attempt. Defaults to
 *   `apiConnectDefaults.retryInterval`.
 * @returns The delay, in the same unit as `baseInterval`.
 */
export function intervalForRetry(
  attempt: number,
  baseInterval: number = apiConnectDefaults.retryInterval,
): number {
  const backoff = baseInterval * 2 ** attempt;
  // A single random draw decides how much of the 25% jitter budget is added.
  return backoff + backoff * Math.random() * 0.25;
}

// Default interruption-detection options.
//
// `baseUrl` and `useProxy` are intentionally left out of this object: they are resolved
// dynamically in the constructor so that the LIVEKIT_REMOTE_EOT_URL environment
// variable is respected.
//
// The API credentials are read from the environment when this object literal is
// evaluated, and fall back to an empty string when the variable is not set.
export const interruptionOptionDefaults: Omit<InterruptionOptions, 'baseUrl' | 'useProxy'> = {
  sampleRate: SAMPLE_RATE,
  threshold: THRESHOLD,
  // Minimum interruption duration expressed as a whole number of frames (rounded up).
  minFrames: Math.ceil(MIN_INTERRUPTION_DURATION_IN_S * FRAMES_PER_SECOND),
  maxAudioDurationInS: MAX_AUDIO_DURATION_IN_S,
  audioPrefixDurationInS: AUDIO_PREFIX_DURATION_IN_S,
  detectionIntervalInS: DETECTION_INTERVAL_IN_S,
  // The constant is expressed in seconds; it is scaled by 1000 here.
  inferenceTimeout: REMOTE_INFERENCE_TIMEOUT_IN_S * 1_000,
  apiKey: process.env.LIVEKIT_API_KEY ?? '',
  apiSecret: process.env.LIVEKIT_API_SECRET ?? '',
  minInterruptionDurationInS: MIN_INTERRUPTION_DURATION_IN_S,
} as const;
25 changes: 25 additions & 0 deletions agents/src/inference/interruption/errors.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// SPDX-FileCopyrightText: 2026 LiveKit, Inc.
//
// SPDX-License-Identifier: Apache-2.0
/**
 * Error raised while running interruption detection.
 *
 * Besides the usual message, each instance carries the `timestamp` and `label` it was
 * created with, and a `recoverable` flag supplied by the thrower.
 */
export class InterruptionDetectionError extends Error {
  /** Constant tag identifying this error class. */
  readonly type = 'InterruptionDetectionError';

  readonly timestamp: number;
  readonly label: string;
  readonly recoverable: boolean;

  constructor(message: string, timestamp: number, label: string, recoverable: boolean) {
    super(message);
    this.name = 'InterruptionDetectionError';
    this.timestamp = timestamp;
    this.label = label;
    this.recoverable = recoverable;
  }

  /**
   * Renders the error as `<name>: <message> (label=…, timestamp=…, recoverable=…)`.
   */
  toString(): string {
    const details = [
      `label=${this.label}`,
      `timestamp=${this.timestamp}`,
      `recoverable=${this.recoverable}`,
    ].join(', ');
    return `${this.name}: ${this.message} (${details})`;
  }
}
183 changes: 183 additions & 0 deletions agents/src/inference/interruption/http_transport.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
// SPDX-FileCopyrightText: 2026 LiveKit, Inc.
//
// SPDX-License-Identifier: Apache-2.0
import { ofetch } from 'ofetch';
import { TransformStream } from 'stream/web';
import { z } from 'zod';
import { log } from '../../log.js';
import { createAccessToken } from '../utils.js';
import { intervalForRetry } from './defaults.js';
import { InterruptionCacheEntry } from './interruption_cache_entry.js';
import { type InterruptionEvent, InterruptionEventType } from './types.js';
import type { BoundedCache } from './utils.js';

/**
 * Connection-level options for a single `predictHTTP` request.
 */
export interface PostOptions {
/** Base URL that the `/bargein` path is resolved against. */
baseUrl: string;
/** Token sent in the `Authorization: Bearer …` request header. */
token: string;
/** Forwarded unchanged as the `signal` option of the underlying `ofetch` call. */
signal?: AbortSignal;
/** Forwarded unchanged as the `timeout` option of the underlying `ofetch` call. */
timeout?: number;
/** Forwarded as the `retry` option of `ofetch`; `predictHTTP` uses 3 when omitted. */
maxRetries?: number;
}

/**
 * Model parameters that `predictHTTP` sends as query-string values.
 */
export interface PredictOptions {
/** Sent as the `threshold` query parameter. */
threshold: number;
/** Sent as the `min_frames` query parameter, formatted with `toFixed()` (no decimals). */
minFrames: number;
}

/**
 * Schema describing the body returned by the `/bargein` endpoint.
 * `predictHTTP` passes every response through `.parse()` on this schema before
 * mapping the snake_case fields to the camelCase `PredictResponse`.
 */
export const predictEndpointResponseSchema = z.object({
created_at: z.number(),
is_bargein: z.boolean(),
probabilities: z.array(z.number()),
});

/** Static type inferred from `predictEndpointResponseSchema`. */
export type PredictEndpointResponse = z.infer<typeof predictEndpointResponseSchema>;

/**
 * Result of `predictHTTP`: the endpoint's response in camelCase, plus local timing.
 */
export interface PredictResponse {
/** The `created_at` value from the endpoint response. */
createdAt: number;
/** The `is_bargein` value from the endpoint response. */
isBargein: boolean;
/** The `probabilities` array from the endpoint response. */
probabilities: number[];
/** Time spent inside `predictHTTP`, measured with `performance.now()` and divided by 1000. */
predictionDurationInS: number;
}

/**
 * POSTs one audio slice to the `/bargein` endpoint and returns the validated prediction.
 *
 * The request body is the raw `Int16Array`, sent as `application/octet-stream`.
 * `threshold`, `min_frames` and `created_at` travel as query parameters, where
 * `created_at` is the `performance.now()` reading taken on entry (formatted without
 * decimals). Retries are delegated to `ofetch`, with delays taken from `intervalForRetry`.
 *
 * @param data - Audio samples to classify.
 * @param predictOptions - Values for the `threshold` and `min_frames` query parameters.
 * @param options - Base URL, bearer token, and optional signal/timeout/retry settings.
 * @returns The parsed response plus the time spent in this call, in seconds.
 * @throws Whatever `ofetch` or the response schema's `parse` throws.
 */
export async function predictHTTP(
  data: Int16Array,
  predictOptions: PredictOptions,
  options: PostOptions,
): Promise<PredictResponse> {
  const startedAt = performance.now();

  const endpoint = new URL(`/bargein`, options.baseUrl);
  const queryParams: Array<[string, string]> = [
    ['threshold', predictOptions.threshold.toString()],
    ['min_frames', predictOptions.minFrames.toFixed()],
    ['created_at', startedAt.toFixed()],
  ];
  for (const [key, value] of queryParams) {
    endpoint.searchParams.append(key, value);
  }

  // Each invocation of the delay callback advances the backoff by one step.
  let retriesSoFar = 0;
  const nextRetryDelay = (): number => {
    const wait = intervalForRetry(retriesSoFar);
    retriesSoFar += 1;
    return wait;
  };

  const payload = await ofetch(endpoint.toString(), {
    method: 'POST',
    body: data,
    headers: {
      'Content-Type': 'application/octet-stream',
      Authorization: `Bearer ${options.token}`,
    },
    signal: options.signal,
    timeout: options.timeout,
    retry: options.maxRetries ?? 3,
    retryDelay: nextRetryDelay,
  });

  const parsed = predictEndpointResponseSchema.parse(payload);
  return {
    createdAt: parsed.created_at,
    isBargein: parsed.is_bargein,
    probabilities: parsed.probabilities,
    predictionDurationInS: (performance.now() - startedAt) / 1000,
  };
}

/**
 * Configuration consumed by `createHttpTransport`.
 * The transport reads these fields each time it handles an audio slice.
 */
export interface HttpTransportOptions {
/** Base URL forwarded to `predictHTTP`. */
baseUrl: string;
/** Passed, together with `apiSecret`, to `createAccessToken` for every request. */
apiKey: string;
/** Passed, together with `apiKey`, to `createAccessToken` for every request. */
apiSecret: string;
/** Forwarded to `predictHTTP` as the prediction threshold. */
threshold: number;
/** Forwarded to `predictHTTP` as the minimum frame count. */
minFrames: number;
/** Forwarded to `predictHTTP` as the request timeout. */
timeout: number;
/** Forwarded to `predictHTTP` as the retry limit. */
maxRetries?: number;
}

/**
 * Shared state that `createHttpTransport` reads through `getState` and updates
 * through `setState`.
 */
export interface HttpTransportState {
/** An interruption event is only emitted while this is true; the transport sets it to false after emitting. */
overlapSpeechStarted: boolean;
/**
 * Audio slices are dropped while this is unset. It is subtracted from `Date.now()`
 * to compute the detection delay, so it is presumably an epoch timestamp in
 * milliseconds — confirm against the code that sets it.
 */
overlapSpeechStartedAt: number | undefined;
/** Receives one entry per successful prediction, keyed by the response's `createdAt`. */
cache: BoundedCache<number, InterruptionCacheEntry>;
}

/**
 * Builds the HTTP-backed TransformStream used for interruption detection.
 *
 * Input chunks are either raw `Int16Array` audio slices or already-formed
 * `InterruptionEvent`s. Events are forwarded untouched. Each audio slice results in one
 * HTTP POST (via `predictHTTP`), provided `overlapSpeechStartedAt` is set in the current
 * state; otherwise the slice is dropped. Every successful prediction is stored in the
 * state's cache. An `InterruptionEvent` is emitted only when the prediction is positive
 * and `overlapSpeechStarted` is true in the state captured before the request;
 * emitting sets `overlapSpeechStarted` to false through `setState`.
 *
 * Failures while requesting or processing a prediction are logged and swallowed, so
 * the stream keeps running.
 *
 * @param options - Transport options object. This is read on each request, so mutations
 *   to threshold/minFrames will be picked up dynamically.
 * @param getState - Returns the current transport state; called once per audio slice.
 * @param setState - Applies a partial state update.
 * @param updateUserSpeakingSpan - Optional hook called with the cache entry just before
 *   an interruption event is emitted.
 * @returns A TransformStream whose readable and writable sides both use a high-water
 *   mark of 2.
 */
export function createHttpTransport(
  options: HttpTransportOptions,
  getState: () => HttpTransportState,
  setState: (partial: Partial<HttpTransportState>) => void,
  updateUserSpeakingSpan?: (entry: InterruptionCacheEntry) => void,
): TransformStream<Int16Array | InterruptionEvent, InterruptionEvent> {
  const logger = log();

  // Shapes the downstream event from a positive prediction.
  const toInterruptionEvent = (
    entry: InterruptionCacheEntry,
    overlapSpeechStartedAt: number,
  ): InterruptionEvent => ({
    type: InterruptionEventType.INTERRUPTION,
    timestamp: Date.now(),
    overlapSpeechStartedAt,
    isInterruption: entry.isInterruption,
    speechInput: entry.speechInput,
    probabilities: entry.probabilities,
    totalDurationInS: entry.totalDurationInS,
    predictionDurationInS: entry.predictionDurationInS,
    detectionDelayInS: entry.detectionDelayInS,
    probability: entry.probability,
  });

  return new TransformStream<Int16Array | InterruptionEvent, InterruptionEvent>(
    {
      async transform(chunk, controller) {
        // Anything that is not raw audio is an event: hand it on unchanged.
        if (!(chunk instanceof Int16Array)) {
          controller.enqueue(chunk);
          return;
        }

        // No overlap start recorded: drop the slice.
        const state = getState();
        if (!state.overlapSpeechStartedAt) return;

        try {
          // Read the request options before awaiting the token.
          const predictOptions = { threshold: options.threshold, minFrames: options.minFrames };
          const { baseUrl, timeout, maxRetries } = options;
          const token = await createAccessToken(options.apiKey, options.apiSecret);

          const prediction = await predictHTTP(chunk, predictOptions, {
            baseUrl,
            timeout,
            maxRetries,
            token,
          });

          const entry = new InterruptionCacheEntry({
            createdAt: prediction.createdAt,
            probabilities: prediction.probabilities,
            isInterruption: prediction.isBargein,
            speechInput: chunk,
            // Measured against the `createdAt` value returned by the endpoint.
            totalDurationInS: (performance.now() - prediction.createdAt) / 1000,
            detectionDelayInS: (Date.now() - state.overlapSpeechStartedAt) / 1000,
            predictionDurationInS: prediction.predictionDurationInS,
          });
          state.cache.set(prediction.createdAt, entry);

          // Only a positive prediction during active overlap produces an event.
          if (!state.overlapSpeechStarted || !entry.isInterruption) return;

          updateUserSpeakingSpan?.(entry);
          const event = toInterruptionEvent(entry, state.overlapSpeechStartedAt);
          logger.debug(
            {
              detectionDelayInS: entry.detectionDelayInS,
              totalDurationInS: entry.totalDurationInS,
            },
            'interruption detected',
          );
          setState({ overlapSpeechStarted: false });
          controller.enqueue(event);
        } catch (err) {
          // Log and continue with the next chunk.
          logger.error({ err }, 'Failed to send audio data over HTTP');
        }
      },
    },
    { highWaterMark: 2 },
    { highWaterMark: 2 },
  );
}
47 changes: 47 additions & 0 deletions agents/src/inference/interruption/interruption_cache_entry.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// SPDX-FileCopyrightText: 2026 LiveKit, Inc.
//
// SPDX-License-Identifier: Apache-2.0
import { estimateProbability } from './utils.js';

/**
 * One cached interruption-inference result.
 *
 * Fields are deliberately mutable so an entry can be updated in place, supporting the
 * setOrUpdate pattern of Python's _BoundedCache.
 */
export class InterruptionCacheEntry {
  createdAt: number;
  totalDurationInS: number;
  predictionDurationInS: number;
  detectionDelayInS: number;
  speechInput?: Int16Array;
  probabilities?: number[];
  isInterruption?: boolean;

  /**
   * @param params - Initial field values. Only `createdAt` is required; the three
   *   duration fields default to 0 and the remaining fields stay undefined when omitted.
   */
  constructor(params: {
    createdAt: number;
    speechInput?: Int16Array;
    totalDurationInS?: number;
    predictionDurationInS?: number;
    detectionDelayInS?: number;
    probabilities?: number[];
    isInterruption?: boolean;
  }) {
    const {
      createdAt,
      totalDurationInS,
      predictionDurationInS,
      detectionDelayInS,
      speechInput,
      probabilities,
      isInterruption,
    } = params;

    this.createdAt = createdAt;
    this.totalDurationInS = totalDurationInS ?? 0;
    this.predictionDurationInS = predictionDurationInS ?? 0;
    this.detectionDelayInS = detectionDelayInS ?? 0;
    this.speechInput = speechInput;
    this.probabilities = probabilities;
    this.isInterruption = isInterruption;
  }

  /**
   * Conservative probability estimate for the interruption event, computed by
   * `estimateProbability`. Returns 0 when no probabilities are stored.
   */
  get probability(): number {
    if (!this.probabilities) return 0;
    return estimateProbability(this.probabilities);
  }

  /** Creates an entry with `createdAt` 0 and every other field at its default. */
  static default(): InterruptionCacheEntry {
    return new InterruptionCacheEntry({ createdAt: 0 });
  }
}
Loading