-
Notifications
You must be signed in to change notification settings - Fork 220
Add AdaptiveInterruptionDetector #980
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
28 commits
Select commit
Hold shift + click to select a range
9f67b4f
wip http + ws transport for barge in
lukasIO cf3d723
refactor
lukasIO 738d1a5
type errors resolved
lukasIO b3638e9
more wiring
lukasIO df7bb86
exports and overlap handling
lukasIO 1f715c9
thx claude
lukasIO 049ca17
more wip
lukasIO 0af5c0a
changeset
lukasIO 094b1a0
local testing
lukasIO c0c8ff7
Merge branch 'main' into lukas/barge-transport
lukasIO 732d7b4
smaller bugfixes
lukasIO d52d8af
more bug fixes and back pressure
lukasIO 4c4dbc8
better logging
lukasIO c27f8dc
refactor and update naming
lukasIO 3b5caae
Merge branch 'feat/barge-in' into lukas/barge-transport
Toubat aee3612
renaming and update transport tests
lukasIO 0009ac1
Merge branch 'lukas/barge-transport' of github.com:livekit/agents-js …
lukasIO 1f3c315
add missing features
lukasIO 1aac5f7
revert voice activity stuff
lukasIO 94cd4c4
reorganize
lukasIO 776cb30
revert test changes
lukasIO a580d7e
remove broken example
lukasIO dd0c98a
fix mutable transport options
lukasIO 00c16ff
async fixes
lukasIO 7dd5bbe
terminate ws on timeout
lukasIO 543742d
more fixes
lukasIO 9f61326
add license headers
lukasIO d32c202
emit events on detector
lukasIO File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
192 changes: 192 additions & 0 deletions
192
agents/src/inference/interruption/AdaptiveInterruptionDetector.ts
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,192 @@ | ||
| // SPDX-FileCopyrightText: 2026 LiveKit, Inc. | ||
| // | ||
| // SPDX-License-Identifier: Apache-2.0 | ||
| import type { TypedEventEmitter } from '@livekit/typed-emitter'; | ||
| import EventEmitter from 'events'; | ||
| import { log } from '../../log.js'; | ||
| import { InterruptionStreamBase } from './InterruptionStream.js'; | ||
| import { | ||
| DEFAULT_BASE_URL, | ||
| FRAMES_PER_SECOND, | ||
| SAMPLE_RATE, | ||
| interruptionOptionDefaults, | ||
| } from './defaults.js'; | ||
| import type { InterruptionDetectionError } from './errors.js'; | ||
| import type { InterruptionEvent, InterruptionOptions } from './types.js'; | ||
|
|
||
/**
 * Event map of the detector: each key is an event name, each value the listener signature.
 *
 * Kept as a `type` alias (not an `interface`) so it stays assignable to the index-signature
 * constraint used by the typed emitter.
 */
type InterruptionCallbacks = {
  userInterruptionDetected: (event: InterruptionEvent) => void;
  userNonInterruptionDetected: (event: InterruptionEvent) => void;
  overlapSpeechEnded: (event: InterruptionEvent) => void;
  error: (error: InterruptionDetectionError) => void;
};

/**
 * Options accepted by the {@link AdaptiveInterruptionDetector} constructor.
 *
 * Every field of `InterruptionOptions` is optional; `useProxy` is excluded because the
 * detector derives it from the resolved base URL.
 */
export type AdaptiveInterruptionDetectorOptions = Omit<Partial<InterruptionOptions>, 'useProxy'>;

/**
 * Holds the resolved interruption-detection configuration, creates and tracks
 * `InterruptionStreamBase` instances, and exposes the typed events listed in
 * `InterruptionCallbacks`.
 */
export class AdaptiveInterruptionDetector extends (EventEmitter as new () => TypedEventEmitter<InterruptionCallbacks>) {
  /** Fully resolved options; `threshold`, `minInterruptionDurationInS` and `minFrames` are mutated by {@link updateOptions}. */
  options: InterruptionOptions;
  private readonly _label: string;
  private logger = log();
  // Use Set instead of WeakSet to allow iteration for propagating option updates
  private streams: Set<InterruptionStreamBase> = new Set();

  /**
   * @param options - Overrides for the default interruption options. Tuning values that are
   *   omitted or explicitly `undefined` fall back to `interruptionOptionDefaults`.
   * @throws Error if `maxAudioDurationInS` exceeds 3.0 seconds.
   * @throws Error if the default base URL is used and no API key / API secret can be resolved
   *   from the arguments or the environment.
   */
  constructor(options: AdaptiveInterruptionDetectorOptions = {}) {
    super();

    // Connection settings keep the plain merge: a missing/undefined value is handled by the
    // `??` fallback chains further down.
    const { baseUrl, apiKey, apiSecret } = { ...interruptionOptionDefaults, ...options };

    // Tuning values are resolved per key with `??`. A plain object spread would let an
    // explicitly passed `undefined` overwrite the default, which skips the validation below
    // and turns `minFrames` into NaN.
    const maxAudioDurationInS =
      options.maxAudioDurationInS ?? interruptionOptionDefaults.maxAudioDurationInS;
    const audioPrefixDurationInS =
      options.audioPrefixDurationInS ?? interruptionOptionDefaults.audioPrefixDurationInS;
    const threshold = options.threshold ?? interruptionOptionDefaults.threshold;
    const detectionIntervalInS =
      options.detectionIntervalInS ?? interruptionOptionDefaults.detectionIntervalInS;
    const inferenceTimeout = options.inferenceTimeout ?? interruptionOptionDefaults.inferenceTimeout;
    const minInterruptionDurationInS =
      options.minInterruptionDurationInS ?? interruptionOptionDefaults.minInterruptionDurationInS;

    if (maxAudioDurationInS > 3.0) {
      throw new Error('maxAudioDurationInS must be less than or equal to 3.0 seconds');
    }

    const lkBaseUrl = baseUrl ?? process.env.LIVEKIT_REMOTE_EOT_URL ?? DEFAULT_BASE_URL;
    let lkApiKey = apiKey ?? '';
    let lkApiSecret = apiSecret ?? '';
    let useProxy: boolean;

    // use LiveKit credentials if using the default base URL (inference)
    if (lkBaseUrl === DEFAULT_BASE_URL) {
      lkApiKey =
        apiKey ?? process.env.LIVEKIT_INFERENCE_API_KEY ?? process.env.LIVEKIT_API_KEY ?? '';
      if (!lkApiKey) {
        throw new Error(
          'apiKey is required, either as argument or set LIVEKIT_API_KEY environmental variable',
        );
      }

      lkApiSecret =
        apiSecret ??
        process.env.LIVEKIT_INFERENCE_API_SECRET ??
        process.env.LIVEKIT_API_SECRET ??
        '';
      if (!lkApiSecret) {
        throw new Error(
          'apiSecret is required, either as argument or set LIVEKIT_API_SECRET environmental variable',
        );
      }

      useProxy = true;
    } else {
      // Force useProxy to false for custom URLs (matching Python behavior)
      useProxy = false;
    }

    this.options = {
      sampleRate: SAMPLE_RATE,
      threshold,
      // Minimum duration expressed as a whole number of frames, rounded up.
      minFrames: Math.ceil(minInterruptionDurationInS * FRAMES_PER_SECOND),
      maxAudioDurationInS,
      audioPrefixDurationInS,
      detectionIntervalInS,
      inferenceTimeout,
      baseUrl: lkBaseUrl,
      apiKey: lkApiKey,
      apiSecret: lkApiSecret,
      useProxy,
      minInterruptionDurationInS,
    };

    this._label = `${this.constructor.name}`;

    // Credentials are intentionally left out of the log payload.
    this.logger.debug(
      {
        baseUrl: this.options.baseUrl,
        detectionIntervalInS: this.options.detectionIntervalInS,
        audioPrefixDurationInS: this.options.audioPrefixDurationInS,
        maxAudioDurationInS: this.options.maxAudioDurationInS,
        minFrames: this.options.minFrames,
        threshold: this.options.threshold,
        inferenceTimeout: this.options.inferenceTimeout,
        useProxy: this.options.useProxy,
      },
      'adaptive interruption detector initialized',
    );
  }

  /**
   * The model identifier for this detector.
   */
  get model(): string {
    return 'adaptive interruption';
  }

  /**
   * The provider identifier for this detector.
   */
  get provider(): string {
    return 'livekit';
  }

  /**
   * The label for this detector instance (the runtime constructor name).
   */
  get label(): string {
    return this._label;
  }

  /**
   * The sample rate used for audio processing.
   */
  get sampleRate(): number {
    return this.options.sampleRate;
  }

  /**
   * Emit an `error` event from the detector.
   *
   * NOTE: with a Node.js `EventEmitter`, emitting `error` while no `error` listener is
   * registered throws the error instead.
   *
   * @param error - The detection error to forward to listeners.
   */
  emitError(error: InterruptionDetectionError): void {
    this.emit('error', error);
  }

  /**
   * Creates a new InterruptionStreamBase bound to this detector and starts tracking it so
   * that later option updates reach it.
   *
   * @returns The newly created stream.
   */
  createStream(): InterruptionStreamBase {
    const streamBase = new InterruptionStreamBase(this, {});
    this.streams.add(streamBase);
    return streamBase;
  }

  /**
   * Remove a stream from tracking (called when stream is closed).
   *
   * @param stream - The stream to stop tracking; unknown streams are ignored.
   */
  removeStream(stream: InterruptionStreamBase): void {
    this.streams.delete(stream);
  }

  /**
   * Update options for the detector and propagate to all active streams.
   * For WebSocket streams, this triggers a reconnection with new settings.
   *
   * @param options - New `threshold` and/or `minInterruptionDurationInS`; fields left
   *   `undefined` are not changed on the detector.
   * @returns A promise that settles once every tracked stream has processed the update; it
   *   rejects if any stream's update rejects.
   */
  async updateOptions(options: {
    threshold?: number;
    minInterruptionDurationInS?: number;
  }): Promise<void> {
    if (options.threshold !== undefined) {
      this.options.threshold = options.threshold;
    }
    if (options.minInterruptionDurationInS !== undefined) {
      this.options.minInterruptionDurationInS = options.minInterruptionDurationInS;
      // Keep the derived frame count in sync with the new duration.
      this.options.minFrames = Math.ceil(options.minInterruptionDurationInS * FRAMES_PER_SECOND);
    }

    // Propagate option updates to all active streams (matching Python behavior)
    const updatePromises: Promise<void>[] = [];
    for (const stream of this.streams) {
      updatePromises.push(stream.updateOptions(options));
    }
    await Promise.all(updatePromises);
  }
}
47 changes: 47 additions & 0 deletions
47
agents/src/inference/interruption/InterruptionCacheEntry.ts
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,47 @@ | ||
| // SPDX-FileCopyrightText: 2024 LiveKit, Inc. | ||
| // | ||
| // SPDX-License-Identifier: Apache-2.0 | ||
| import { estimateProbability } from './utils.js'; | ||
lukasIO marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| /** | ||
| * Typed cache entry for interruption inference results. | ||
| * Mutable to support setOrUpdate pattern from Python's _BoundedCache. | ||
| */ | ||
/**
 * Cache record describing the outcome of one interruption inference.
 * Fields are intentionally writable so an existing entry can be updated in place
 * (setOrUpdate pattern from Python's _BoundedCache).
 */
export class InterruptionCacheEntry {
  createdAt: number;
  totalDurationInS: number;
  predictionDurationInS: number;
  detectionDelayInS: number;
  speechInput?: Int16Array;
  probabilities?: number[];
  isInterruption?: boolean;

  /**
   * @param params - Initial field values. Only `createdAt` is required; the three duration
   *   fields default to 0 and the remaining fields stay `undefined` when omitted.
   */
  constructor(params: {
    createdAt: number;
    speechInput?: Int16Array;
    totalDurationInS?: number;
    predictionDurationInS?: number;
    detectionDelayInS?: number;
    probabilities?: number[];
    isInterruption?: boolean;
  }) {
    const {
      createdAt: created,
      totalDurationInS: total,
      predictionDurationInS: prediction,
      detectionDelayInS: delay,
      speechInput: speech,
      probabilities: probs,
      isInterruption: interrupted,
    } = params;

    this.createdAt = created;
    // Missing durations are normalised to zero.
    this.totalDurationInS = total ?? 0;
    this.predictionDurationInS = prediction ?? 0;
    this.detectionDelayInS = delay ?? 0;
    this.speechInput = speech;
    this.probabilities = probs;
    this.isInterruption = interrupted;
  }

  /**
   * Conservative probability estimate for the interruption event, computed by
   * `estimateProbability` over the stored probabilities; 0 when none are stored.
   */
  get probability(): number {
    const samples = this.probabilities;
    if (!samples) {
      return 0;
    }
    return estimateProbability(samples);
  }

  /**
   * Builds a placeholder entry with `createdAt` set to 0 and every other field at its default.
   */
  static default(): InterruptionCacheEntry {
    const placeholder = new InterruptionCacheEntry({ createdAt: 0 });
    return placeholder;
  }
}
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.