Add AdaptiveInterruptionDetector #980
Conversation
Important: Review skipped. Auto reviews are disabled on base/target branches other than the default branch. Please check the settings in the CodeRabbit UI. You can disable this status message via the review settings.

📝 Walkthrough

This PR introduces an adaptive interruption detection system for the agents package, including a core detector class, a stream-based architecture, HTTP and WebSocket transports for remote inference, caching utilities, type definitions, and telemetry integration. It also adds the ofetch dependency and extends StreamChannel with custom error support.
Sequence Diagram

```mermaid
sequenceDiagram
    actor Agent as Agent/Client
    participant Detector as AdaptiveInterruptionDetector
    participant Stream as InterruptionStreamBase
    participant Transport as HTTP/WS Transport
    participant Service as Inference Service
    participant Telemetry as OpenTelemetry Span
    Agent->>Detector: createStream()
    Detector->>Stream: new InterruptionStreamBase()
    Agent->>Stream: pushFrame(AudioFrame)
    Stream->>Stream: Resample & buffer audio
    alt Overlap Speech Detected
        Stream->>Transport: send audio chunk + timestamp
        Transport->>Service: POST/WS send Int16Array
        Service->>Service: Run model inference
        Service->>Transport: return probabilities
        Transport->>Stream: emit InterruptionEvent
        Stream->>Telemetry: updateUserSpeakingSpan()
        Stream->>Agent: emit InterruptionEvent
    end
    Agent->>Stream: flush() / endInput()
    Stream->>Transport: close connection
    Transport->>Service: session.close()
    Service-->>Transport: ack
    Transport-->>Stream: done
    Stream-->>Agent: stream closed
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~50 minutes
🚥 Pre-merge checks: ✅ 1 | ❌ 2
❌ Failed checks (2 warnings)
✅ Passed checks (1 passed)
…into lukas/barge-transport
✅ Actions performed: Review triggered.
Actionable comments posted: 16
🤖 Fix all issues with AI agents
In `@agents/src/inference/interruption/AdaptiveInterruptionDetector.ts`:
- Line 1: Add the required SPDX file header comment to the top of
AdaptiveInterruptionDetector.ts to satisfy repo coding guidelines: insert the
standard SPDX-License-Identifier line and accompanying copyright/author header
used across the project (matching other TS files) as a block comment before any
imports (e.g., before the import of TypedEventEmitter) so the file now begins
with the repository's SPDX header followed by the existing import and
class/function definitions such as AdaptiveInterruptionDetector.
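For reference, the header these comments ask for (per the repo coding guideline quoted under the path-based instructions later in this review) is a two-line comment placed before any imports:

```typescript
// SPDX-FileCopyrightText: 2025 LiveKit, Inc.
// SPDX-License-Identifier: Apache-2.0
```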
In `@agents/src/inference/interruption/defaults.ts`:
- Around line 1-2: The new file defaults.ts is missing the required SPDX header;
add the standard SPDX license identifier comment at the very top of the file
(above the imports) so the file begins with the repo's SPDX header line (e.g.,
the SPDX-License-Identifier comment used across the project) to satisfy
compliance; ensure the header precedes any code such as the imports of
ApiConnectOptions and InterruptionOptions.
In `@agents/src/inference/interruption/errors.ts`:
- Around line 1-3: Add the required SPDX license header to the top of the new
file agents/src/inference/interruption/errors.ts to comply with repo standards:
insert the SPDX short identifier line (e.g., "SPDX-License-Identifier:
<LICENSE>") and the repository's copyright/ownership line as used in other
source files, placing it before the existing file comment so the header is the
first lines in errors.ts.
In `@agents/src/inference/interruption/http_transport.ts`:
- Line 1: This new TypeScript file (http_transport.ts) is missing the required
SPDX file header; add the repo's standard SPDX header comment block (including
SPDX-License-Identifier and required copyright/owner line and year) at the very
top of the file above the existing import { ofetch } from 'ofetch'; so the file
starts with the header comment per coding guidelines.
In `@agents/src/inference/interruption/InterruptionCacheEntry.ts`:
- Line 1: Add the required SPDX header at the very top of the new file so it
complies with repository guidelines; open
agents/src/inference/interruption/InterruptionCacheEntry.ts and insert the
standard SPDX comment block as the first lines before any imports (so it appears
before the existing import { estimateProbability } from './utils.js'). Ensure
the header follows the repo's exact SPDX format used in other files.
In `@agents/src/inference/interruption/InterruptionStream.ts`:
- Around line 112-127: updateOptions currently mutates this.options but leaves
the transportOptions snapshot created in setupTransform unchanged, so HTTP
transport keeps using old threshold/minFrames; update updateOptions to also
update the mutable transport options used by the HTTP path (e.g., a new field
like this.transportOptions or the existing transportOptions object created in
setupTransform) so that changes to threshold, minInterruptionDurationInS and
derived minFrames are copied into that transport options object, and keep the
existing wsReconnect/useProxy reconnection logic for WebSocket flows; ensure the
same fix is applied to the other update code path noted around the second
occurrence (the block at the other mention of updateOptions).
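A minimal sketch of that fix, using hypothetical names (`transportOptions`, a frames-per-second constant of 50) rather than the actual source:

```typescript
// Sketch only: a single mutable options object is shared with the HTTP
// transport at setup time, and updateOptions copies new/derived values into
// it in place. Field names and the 50 fps constant are assumptions.
const FRAMES_PER_SECOND = 50;

interface TransportOptions {
  threshold: number;
  minFrames: number;
}

class InterruptionStreamSketch {
  private options = { threshold: 0.5, minInterruptionDurationInS: 0.2 };

  // Passed by reference to the transport in setupTransform, then mutated here.
  readonly transportOptions: TransportOptions = { threshold: 0.5, minFrames: 10 };

  updateOptions(opts: Partial<{ threshold: number; minInterruptionDurationInS: number }>): void {
    Object.assign(this.options, opts);
    // Copy values into the shared object instead of replacing it, so the
    // transport's reference observes the change.
    this.transportOptions.threshold = this.options.threshold;
    this.transportOptions.minFrames = Math.round(
      this.options.minInterruptionDurationInS * FRAMES_PER_SECOND,
    );
  }
}
```

Because the transport holds a reference to `transportOptions` rather than a snapshot, threshold and `minFrames` changes take effect on the next request without reconnecting.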
- Line 1: Add the required SPDX file header comment to the top of the new
TypeScript file InterruptionStream.ts (above the existing import line), e.g., a
block or single-line comment containing the SPDX-License-Identifier and any
repository-required copyright/owner line; ensure the header is the first lines
of the file before "import { AudioFrame, AudioResampler } from
'@livekit/rtc-node';".
- Around line 327-330: flush() currently calls
this.inputStream.write(InterruptionStreamSentinel.flush()) without awaiting it,
so the sentinel can race with close(); change InterruptionStream.flush() to wait
for the write to complete before returning: inside flush() (after
ensureStreamsNotEnded()) await the write result—either await the promise
returned by this.inputStream.write if it returns one, or wrap the write in a
Promise that resolves on the write callback/'drain' event—so that the
InterruptionStreamSentinel.flush() is guaranteed to be delivered before close()
can run.
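The awaited write can be sketched with a plain web `WritableStream` and a placeholder sentinel value standing in for the real internals:

```typescript
// Sketch: flush() awaits the sentinel write, so the sentinel is guaranteed to
// be accepted by the sink before close() can run. 'FLUSH_SENTINEL' and the
// stream wiring are stand-ins for InterruptionStream's actual internals.
const received: string[] = [];
const sink = new WritableStream<string>({
  write(chunk) {
    received.push(chunk);
  },
});
const writer = sink.getWriter();

async function flush(): Promise<void> {
  // writer.write() returns a promise that resolves once the sink has
  // processed the chunk; awaiting it removes the race with close().
  await writer.write('FLUSH_SENTINEL');
}

await flush();
```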
In `@agents/src/inference/interruption/types.ts`:
- Around line 1-2: This new file (types.ts) is missing the repository's required
SPDX headers; add the standard SPDX header block at the very top of the file
(above the import of Span) following the repo coding guidelines so the file
includes the copyright/identifier lines required by policy.
In `@agents/src/inference/interruption/utils.ts`:
- Line 1: This new module imports FRAME_DURATION_IN_S and
MIN_INTERRUPTION_DURATION_IN_S but is missing the required SPDX license header;
add the standard SPDX header comment as the first line(s) of the file (above the
import) so the file includes the repository's required license identifier before
any code or imports.
In `@agents/src/inference/interruption/ws_transport.test.ts`:
- Around line 1-3: The SPDX header in the new test file ws_transport.test.ts
currently shows 2024; update the header year to 2025 by replacing "2024 LiveKit,
Inc." with "2025 LiveKit, Inc." and ensure the SPDX-License-Identifier line
remains unchanged so the file uses the correct 2025 copyright header.
In `@agents/src/inference/interruption/ws_transport.ts`:
- Around line 1-10: Insert the required SPDX license header as a top-of-file
comment in ws_transport.ts (before the existing imports like Readable, Writable,
TransformStream, WebSocket) to comply with repo guidelines; ensure the header
matches the project's standard SPDX text (for example the exact
"SPDX-License-Identifier: <identifier>" line used across the repo) and place it
as the very first lines of the file without altering the existing imports or
logic.
- Around line 163-205: In processResponses, the inner fallback mistakenly
retries JSON.parse(line) (same as the first attempt) instead of attempting to
parse the entire buffer; update the catch block inside the lines loop to try
parsing buffer (or another intended fallback source) rather than re-parsing line
so that incomplete newline-delimited data can be recovered; specifically modify
processResponses' inner catch (the second JSON.parse currently using line) to
parse the complete buffer variable and call handleMessage(message) on success,
and keep the existing outer buffer-as-complete-message logic intact to avoid
double-processing.
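The corrected fallback order can be sketched as a small helper (the function shape and names here are illustrative, not the actual `processResponses` code):

```typescript
// Sketch of the fixed parse order: try the single line first, then fall back
// to the complete accumulated buffer for non-newline-delimited payloads,
// instead of re-parsing the same line twice.
function handleLine(
  line: string,
  buffer: string,
  handleMessage: (msg: unknown) => void,
): boolean {
  try {
    handleMessage(JSON.parse(line));
    return true;
  } catch {
    try {
      handleMessage(JSON.parse(buffer)); // fallback parses the buffer, not `line`
      return true;
    } catch {
      return false; // keep buffering until more data arrives
    }
  }
}
```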
- Around line 62-93: connectWebSocket currently leaves the WebSocket (ws) and
its streams open when the connection promise rejects on timeout or error,
causing resource leaks; update the connection Promise so that on the timeout
handler and the 'error' handler you explicitly close/terminate the ws (call
ws.close() or ws.terminate()) and remove listeners, and also abort/close the
streams returned by webSocketToStream (call readable.cancel() and
writable.abort() or equivalent) before rejecting; ensure the same cleanup runs
for both the timeout and error paths and that normal 'open' success still clears
the timeout and leaves streams intact.
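The cleanup pattern can be sketched independently of the real transport; `SocketLike` is a stand-in for the `ws` WebSocket, and `terminate()` here represents closing the socket plus cancelling its streams:

```typescript
// Sketch: both the timeout and error paths run the same cleanup (terminate
// the socket, then reject), while the open path clears the timer and leaves
// the connection intact. Names are illustrative, not the actual source.
interface SocketLike {
  once(event: 'open' | 'error', cb: (err?: Error) => void): void;
  terminate(): void;
}

function connectWithTimeout(ws: SocketLike, timeoutMs: number): Promise<void> {
  return new Promise((resolve, reject) => {
    const fail = (err: Error) => {
      clearTimeout(timer);
      ws.terminate(); // release the socket (and, in the real code, cancel/abort its streams)
      reject(err);
    };
    const timer = setTimeout(() => fail(new Error('connection timeout')), timeoutMs);
    ws.once('open', () => {
      clearTimeout(timer); // success path: keep the socket and streams intact
      resolve();
    });
    ws.once('error', (err) => fail(err ?? new Error('socket error')));
  });
}
```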
- Around line 360-389: The transform handler on the TransformStream currently
only checks state.overlapSpeechStartedAt before sending audio, but that
timestamp is never cleared and can allow audio to stream after overlap speech
ends; update the transform(chunk, controller) logic (in ws_transport.ts) to
require the overlapSpeechStarted boolean as well (e.g., check
state.overlapSpeechStarted && state.overlapSpeechStartedAt) and bail out if
overlapSpeechStarted is false, before calling sendAudioData; ensure this gate is
applied for Int16Array chunks so sendAudioData is only invoked when both the
flag and the timestamp indicate overlap speech is active.
In `@agents/src/stream/stream_channel.ts`:
- Around line 23-25: The abort implementation doesn't set isClosed, so the
closed getter remains false after abort; update the abort method (the abort:
(error: E) => { ... } block that calls writer.abort(error)) to set isClosed =
true after successful writer.abort and mirror the error-handling pattern used by
close() (catch the TypeError-like double-abort case, set isClosed = true and
swallow that specific error, otherwise rethrow) so closed accurately reflects
the stream state.
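A sketch of the suggested fix, modeled on the review's description rather than the actual `stream_channel.ts` source:

```typescript
// Sketch: abort() mirrors close()'s error handling so the `closed` getter
// reflects aborted streams too. Shapes are assumptions based on the review.
function createStreamChannelSketch<T, E extends Error = Error>() {
  let isClosed = false;
  const stream = new TransformStream<T, T>();
  const writer = stream.writable.getWriter();
  return {
    readable: stream.readable,
    write: (chunk: T) => writer.write(chunk),
    abort: async (error: E) => {
      try {
        await writer.abort(error);
        isClosed = true;
      } catch (err) {
        if (err instanceof TypeError) {
          // Double-abort / already-released writer: still mark as closed.
          isClosed = true;
        } else {
          throw err;
        }
      }
    },
    get closed() {
      return isClosed;
    },
  };
}
```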
🧹 Nitpick comments (1)
agents/src/inference/interruption/AdaptiveInterruptionDetector.ts (1)
14-21: Avoid exposing options that are ignored.
`sampleRate`, `minFrames`, and `useProxy` are derived internally, but `AdaptiveInterruptionDetectorOptions` currently allows callers to pass them with no effect. Consider narrowing the public options type to avoid no-op inputs.

♻️ Suggested type narrowing

```diff
-export type AdaptiveInterruptionDetectorOptions = Partial<InterruptionOptions>;
+export type AdaptiveInterruptionDetectorOptions = Partial<
+  Omit<InterruptionOptions, 'minFrames' | 'sampleRate' | 'useProxy'>
+>;
```
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
`pnpm-lock.yaml` is excluded by `!**/pnpm-lock.yaml`
📒 Files selected for processing (16)
- .changeset/config.json
- agents/package.json
- agents/src/index.ts
- agents/src/inference/interruption/AdaptiveInterruptionDetector.ts
- agents/src/inference/interruption/InterruptionCacheEntry.ts
- agents/src/inference/interruption/InterruptionStream.ts
- agents/src/inference/interruption/defaults.ts
- agents/src/inference/interruption/errors.ts
- agents/src/inference/interruption/http_transport.ts
- agents/src/inference/interruption/types.ts
- agents/src/inference/interruption/utils.test.ts
- agents/src/inference/interruption/utils.ts
- agents/src/inference/interruption/ws_transport.test.ts
- agents/src/inference/interruption/ws_transport.ts
- agents/src/stream/stream_channel.ts
- agents/src/telemetry/trace_types.ts
🧰 Additional context used
📓 Path-based instructions (3)
**/*.{ts,tsx,js,jsx}
📄 CodeRabbit inference engine (.cursor/rules/agent-core.mdc)
Add SPDX-FileCopyrightText and SPDX-License-Identifier headers to all newly added files with '// SPDX-FileCopyrightText: 2025 LiveKit, Inc.' and '// SPDX-License-Identifier: Apache-2.0'
Files:
agents/src/inference/interruption/ws_transport.ts, agents/src/index.ts, agents/src/inference/interruption/InterruptionCacheEntry.ts, agents/src/inference/interruption/utils.test.ts, agents/src/telemetry/trace_types.ts, agents/src/inference/interruption/errors.ts, agents/src/inference/interruption/ws_transport.test.ts, agents/src/inference/interruption/types.ts, agents/src/stream/stream_channel.ts, agents/src/inference/interruption/defaults.ts, agents/src/inference/interruption/http_transport.ts, agents/src/inference/interruption/utils.ts, agents/src/inference/interruption/AdaptiveInterruptionDetector.ts, agents/src/inference/interruption/InterruptionStream.ts
**/*.{ts,tsx}?(test|example|spec)
📄 CodeRabbit inference engine (.cursor/rules/agent-core.mdc)
When testing inference LLMs, always use full model names from `agents/src/inference/models.ts` (e.g., 'openai/gpt-4o-mini' instead of 'gpt-4o-mini')
Files:
agents/src/inference/interruption/ws_transport.ts, agents/src/index.ts, agents/src/inference/interruption/InterruptionCacheEntry.ts, agents/src/inference/interruption/utils.test.ts, agents/src/telemetry/trace_types.ts, agents/src/inference/interruption/errors.ts, agents/src/inference/interruption/ws_transport.test.ts, agents/src/inference/interruption/types.ts, agents/src/stream/stream_channel.ts, agents/src/inference/interruption/defaults.ts, agents/src/inference/interruption/http_transport.ts, agents/src/inference/interruption/utils.ts, agents/src/inference/interruption/AdaptiveInterruptionDetector.ts, agents/src/inference/interruption/InterruptionStream.ts
**/*.{ts,tsx}?(test|example)
📄 CodeRabbit inference engine (.cursor/rules/agent-core.mdc)
Initialize logger before using any LLM functionality with `initializeLogger({ pretty: true })` from '@livekit/agents'
Files:
agents/src/inference/interruption/ws_transport.ts, agents/src/index.ts, agents/src/inference/interruption/InterruptionCacheEntry.ts, agents/src/inference/interruption/utils.test.ts, agents/src/telemetry/trace_types.ts, agents/src/inference/interruption/errors.ts, agents/src/inference/interruption/ws_transport.test.ts, agents/src/inference/interruption/types.ts, agents/src/stream/stream_channel.ts, agents/src/inference/interruption/defaults.ts, agents/src/inference/interruption/http_transport.ts, agents/src/inference/interruption/utils.ts, agents/src/inference/interruption/AdaptiveInterruptionDetector.ts, agents/src/inference/interruption/InterruptionStream.ts
🧠 Learnings (1)
📚 Learning: 2026-01-16T14:33:39.551Z
Learnt from: CR
Repo: livekit/agents-js PR: 0
File: .cursor/rules/agent-core.mdc:0-0
Timestamp: 2026-01-16T14:33:39.551Z
Learning: Applies to **/{examples,test}/**/*.test.ts : Include both basic streaming and tool calling tests to verify full LLM functionality
Applied to files:
agents/src/inference/interruption/utils.test.ts, agents/src/inference/interruption/ws_transport.test.ts
🧬 Code graph analysis (9)
agents/src/inference/interruption/ws_transport.ts (5)
- agents/src/inference/interruption/utils.ts (1): `BoundedCache` (7-106)
- agents/src/inference/interruption/InterruptionCacheEntry.ts (1): `InterruptionCacheEntry` (7-44)
- agents/src/inference/utils.ts (1): `createAccessToken` (10-19)
- agents/src/inference/interruption/types.ts (1): `InterruptionEvent` (14-25)
- agents/src/inference/interruption/defaults.ts (1): `intervalForRetry` (25-34)

agents/src/inference/interruption/InterruptionCacheEntry.ts (1)
- agents/src/inference/interruption/utils.ts (1): `estimateProbability` (112-122)

agents/src/inference/interruption/utils.test.ts (1)
- agents/src/inference/interruption/utils.ts (1): `slidingWindowMinMax` (124-137)

agents/src/inference/interruption/ws_transport.test.ts (1)
- agents/src/inference/interruption/ws_transport.ts (1): `webSocketToStream` (46-57)

agents/src/inference/interruption/types.ts (1)
- agents/src/inference/interruption/InterruptionStream.ts (7): `ApiConnectOptions` (31-31), `AgentSpeechStarted` (30-30), `AgentSpeechEnded` (29-29), `OverlapSpeechStarted` (35-35), `OverlapSpeechEnded` (34-34), `Flush` (32-32), `InterruptionSentinel` (33-33)

agents/src/inference/interruption/defaults.ts (1)
- agents/src/inference/interruption/types.ts (2): `ApiConnectOptions` (48-52), `InterruptionOptions` (30-43)

agents/src/inference/interruption/utils.ts (1)
- agents/src/inference/interruption/defaults.ts (2): `MIN_INTERRUPTION_DURATION_IN_S` (4-4), `FRAME_DURATION_IN_S` (12-12)

agents/src/inference/interruption/AdaptiveInterruptionDetector.ts (4)
- agents/src/inference/interruption/types.ts (2): `InterruptionEvent` (14-25), `InterruptionOptions` (30-43)
- agents/src/inference/interruption/errors.ts (1): `InterruptionDetectionError` (4-22)
- agents/src/inference/interruption/InterruptionStream.ts (1): `stream` (303-305)
- agents/src/inference/interruption/defaults.ts (4): `interruptionOptionDefaults` (36-49), `DEFAULT_BASE_URL` (13-13), `SAMPLE_RATE` (10-10), `FRAMES_PER_SECOND` (11-11)

agents/src/inference/interruption/InterruptionStream.ts (5)
- agents/src/inference/interruption/types.ts (9): `AgentSpeechStarted` (56-58), `AgentSpeechEnded` (60-62), `OverlapSpeechStarted` (64-68), `OverlapSpeechEnded` (70-72), `Flush` (74-76), `InterruptionSentinel` (81-86), `InterruptionEvent` (14-25), `InterruptionOptions` (30-43), `ApiConnectOptions` (48-52)
- agents/src/inference/interruption/InterruptionCacheEntry.ts (1): `InterruptionCacheEntry` (7-44)
- agents/src/stream/stream_channel.ts (2): `StreamChannel` (7-13), `createStreamChannel` (15-44)
- agents/src/inference/interruption/defaults.ts (2): `apiConnectDefaults` (15-19), `FRAMES_PER_SECOND` (11-11)
- agents/src/inference/interruption/utils.ts (1): `BoundedCache` (7-106)
🔇 Additional comments (37)
agents/src/stream/stream_channel.ts (1)
7-13: LGTM on the interface changes. The generic error type `E extends Error = Error` provides good type safety while maintaining backward compatibility with the default. The `abort` method signature is appropriate for stream cancellation scenarios.

.changeset/config.json (1)
11-11: No action needed.

agents/src/index.ts (1)
39-39: LGTM for the public re-export.

agents/src/telemetry/trace_types.ts (1)
55-59: LGTM for new interruption attributes.

agents/package.json (1)
72-72: No action needed. The `ofetch` dependency (MIT license) is compatible with the project's Node targets (20, 24) and includes fallbacks for older Node versions. The dependency is ready for release.

agents/src/inference/interruption/utils.test.ts (4)
8-11: Short-array guard is covered well.
13-17: Window-size-1 behavior is validated clearly.
19-24: Sustained-probability window case is well tested.
26-30: Equal-length window scenario coverage looks good.

agents/src/inference/interruption/ws_transport.test.ts (6)
8-15: Server helper looks solid.
29-115: Readable-stream coverage is thorough.
117-173: Writable-stream cases are well covered.
176-207: Bidirectional echo test is a good integration check.
210-241: Abrupt close handling is validated well.
agents/src/inference/interruption/InterruptionCacheEntry.ts (1)
7-43: Cache entry shape and probability helper are cleanly modeled.

agents/src/inference/interruption/utils.ts (2)
7-106: BoundedCache API is straightforward and usable.
112-137: Probability helpers are clear and match the intended semantics.

agents/src/inference/interruption/ws_transport.ts (4)
46-57: WebSocket stream bridge is clean and idiomatic.
212-306: Message handling logic is coherent and well structured.
308-336: Audio framing and header construction look correct.
338-359: Close/reconnect sequencing is sensible.

agents/src/inference/interruption/defaults.ts (2)
25-34: Retry helper is clear and matches the stated intent.
36-49: Defaults block is well organized and easy to reason about.

agents/src/inference/interruption/http_transport.ts (3)
10-34: Clear request/response models. The HTTP option and response types are easy to follow and make the transport contract explicit.

98-175: Transport pipeline looks solid. Pass-through of non-audio events, cache updates, and interruption emission flow are coherent.

41-72: Verify timebase consistency for the `/bargein` endpoint's `created_at` response. The code sends `performance.now()` as the `created_at` query parameter to the server (line 45), but the response's `created_at` value is used to populate the returned object (line 69). If the server generates its own timestamp instead of echoing the client's value, the duration calculations using that timestamp later (particularly in `createHttpTransport` at line 136: `totalDurationInS: (performance.now() - createdAt) / 1000`) will be incorrect.

Required action: Confirm whether the `/bargein` endpoint echoes the client's `created_at` parameter or generates its own server-side timestamp. If it does not echo, use the original client timestamp for all duration calculations as suggested in the review.

agents/src/inference/interruption/AdaptiveInterruptionDetector.ts (4)
30-111: Initialization and validation look good. Default merging, credential checks, and the max-duration guard are well handled.

113-147: Accessors and error emission are clean. Simple getters and `emitError` keep the public surface predictable.

148-165: Stream tracking is straightforward. Creating and removing streams via the Set is clear and matches the intended lifecycle.

170-188: Option propagation is handled cleanly. Updating local options then propagating to active streams is clear and consistent.
agents/src/inference/interruption/InterruptionStream.ts (6)
27-61: Sentinel helpers are clear and compact. The factory methods make sentinel construction explicit and easy to use.

63-72: Span attribute updates look consistent. Attributes are set with clear trace keys and normalized values.

96-107: Constructor wiring is solid. Defaults, API options, and stream setup are clearly initialized.

129-284: Pipeline composition is cohesive. The two-stage transform (audio → transport) reads cleanly and keeps transport concerns isolated.

303-325: Resampling and sentinel handling are clean. The resampler path and sentinel short-circuiting are handled thoughtfully.

346-387: Windowing + mix-down logic looks solid. The buffer shift and channel averaging are clear and safe.
```typescript
const transport = new TransformStream<Int16Array | InterruptionEvent, InterruptionEvent>(
  {
    start(controller) {
      outputController = controller;
    },

    async transform(chunk, controller) {
      // Pass through InterruptionEvents unchanged
      if (!(chunk instanceof Int16Array)) {
        controller.enqueue(chunk);
        return;
      }

      const state = getState();
      if (!state.overlapSpeechStartedAt) return;

      try {
        await sendAudioData(chunk);
      } catch (err) {
        logger.error({ err }, 'Failed to send audio data over WebSocket');
      }
    },

    async flush() {
      await close();
    },
  },
  { highWaterMark: 2 },
  { highWaterMark: 2 },
);
```
🧩 Analysis chain
🏁 Script executed:

```shell
# First, let's examine the structure of the state and how these fields are managed
rg -A 5 -B 5 "overlapSpeechStarted" agents/src/inference/interruption/ws_transport.ts | head -100
```

Repository: livekit/agents-js
Length of output: 178

🏁 Script executed:

```shell
# Look for where state is initialized and modified
rg "overlapSpeechStartedAt\s*=" agents/src/inference/interruption/ -A 2 -B 2
```

Repository: livekit/agents-js
Length of output: 570

🏁 Script executed:

```shell
# Check for any reset or clear operations
rg "overlapSpeechStarted\s*=" agents/src/inference/interruption/ -A 2 -B 2
```

Repository: livekit/agents-js
Length of output: 3065

🏁 Script executed:

```shell
# Search for where overlapSpeechStartedAt is cleared or reset
rg "overlapSpeechStartedAt\s*=" agents/src/inference/interruption/ -A 1 -B 1
```

Repository: livekit/agents-js
Length of output: 372

🏁 Script executed:

```shell
# Look at the complete flow in InterruptionStream.ts where overlap speech started is handled
rg -B 10 -A 10 "overlap-speech-started" agents/src/inference/interruption/InterruptionStream.ts
```

Repository: livekit/agents-js
Length of output: 2390

🏁 Script executed:

```shell
# Check the state definition in ws_transport.ts to see what fields are tracked
rg -B 5 -A 15 "getState\|setState" agents/src/inference/interruption/ws_transport.ts | head -80
```

Repository: livekit/agents-js
Length of output: 43

🏁 Script executed:

```shell
# Search for overlap-speech-ended and overlap-speech-not-interrupted to see if they clear the timestamp
rg "overlap-speech-(ended|not-interrupted)" agents/src/inference/interruption/InterruptionStream.ts -A 5 -B 2
```

Repository: livekit/agents-js
Length of output: 689

🏁 Script executed:

```shell
# Look at how state flows from InterruptionStream to ws_transport - search for state definition/export
rg "this\.overlapSpeechStartedAt" agents/src/inference/interruption/InterruptionStream.ts -B 3 -A 3
```

Repository: livekit/agents-js
Length of output: 1029

🏁 Script executed:

```shell
# Check ws_transport.ts for how getState is defined and what it returns
rg -B 20 "getState()" agents/src/inference/interruption/ws_transport.ts | head -60
```

Repository: livekit/agents-js
Length of output: 1653

🏁 Script executed:

```shell
# Get the full overlap-speech-ended handler to see what happens after the condition
rg -A 25 "overlap-speech-ended" agents/src/inference/interruption/InterruptionStream.ts
```

Repository: livekit/agents-js
Length of output: 2417
Add explicit gate on overlapSpeechStarted flag in addition to timestamp check.
The transform function only checks overlapSpeechStartedAt when deciding whether to send audio. Since this timestamp is never cleared after overlap speech ends (only set once on 'overlap-speech-started'), it persists while overlapSpeechStarted may be false. This allows audio to stream unintentionally.
Suggested fix

```diff
- if (!state.overlapSpeechStartedAt) return;
+ if (!state.overlapSpeechStarted || !state.overlapSpeechStartedAt) return;
```

🤖 Prompt for AI Agents
+ if (!state.overlapSpeechStarted || !state.overlapSpeechStartedAt) return;🤖 Prompt for AI Agents
In `@agents/src/inference/interruption/ws_transport.ts` around lines 360 - 389,
The transform handler on the TransformStream currently only checks
state.overlapSpeechStartedAt before sending audio, but that timestamp is never
cleared and can allow audio to stream after overlap speech ends; update the
transform(chunk, controller) logic (in ws_transport.ts) to require the
overlapSpeechStarted boolean as well (e.g., check state.overlapSpeechStarted &&
state.overlapSpeechStartedAt) and bail out if overlapSpeechStarted is false,
before calling sendAudioData; ensure this gate is applied for Int16Array chunks
so sendAudioData is only invoked when both the flag and the timestamp indicate
overlap speech is active.
That's not applicable as audio is only written to the transport when overlapSpeechStarted is true.
The ws_transport's overlapSpeechStartedAt check is a secondary guard, not the primary control.
The flow is: InterruptionStream gates audio → sends to transport → transport has additional guard
🧩 Analysis chain
🏁 Script executed:

```shell
#!/bin/bash
# Check how InterruptionStream sends audio to the transport and where it gates based on overlapSpeechStarted
rg -A 10 -B 5 "overlapSpeechStarted" agents/src/inference/interruption/InterruptionStream.ts | head -150
```

Length of output: 5830

🏁 Script executed:

```shell
#!/bin/bash
# Look for where the transport is created and how data flows to it
rg -A 15 "createWsTransport|createHttpTransport" agents/src/inference/interruption/InterruptionStream.ts
```

Length of output: 1460

🏁 Script executed:

```shell
#!/bin/bash
# Check for any pipeline or pipeThrough calls that show the data flow
rg -A 5 -B 5 "pipeThrough|pipeTo" agents/src/inference/interruption/InterruptionStream.ts
```

Length of output: 399
@lukasIO, you're absolutely right, and thank you for the clarification! I can see from the code that the audioTransformer in InterruptionStream.ts only enqueues audio slices when overlapSpeechStarted is true:
```typescript
if (
  accumulatedSamples >= Math.floor(this.options.detectionIntervalInS * this.options.sampleRate) &&
  overlapSpeechStarted
) {
  const audioSlice = inferenceS16Data.slice(0, startIdx);
  controller.enqueue(audioSlice);
}
```

Since the pipeline flows audioTransformer → transport, the transport only receives Int16Array chunks that have already passed the overlapSpeechStarted gate. The overlapSpeechStartedAt check in ws_transport is indeed a secondary guard. My original suggestion was based on an incomplete understanding of the pipeline architecture. I apologize for the noise!
```typescript
await writer.write(new TextEncoder().encode(sessionCreateMsg));

// Start reading responses
readerTask = processResponses(conn.readable);
```
Should we wrap this with Task.from and control cancellation logic inside processResponse? Otherwise it could become a dangling promise.
there's a test case already that ensures closing the websocket closes the streams also. Do you think we need an additional abort signal beyond the ability to close the websocket?
```typescript
while (true) {
  const { done, value } = await reader.read();
```
Same as ^, we should use an AbortController explicitly and do `await Promise.race([reader.read(), waitForAbort(signal)]);` to unblock the dangling await.
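A sketch of the suggested pattern (`waitForAbort` is hypothetical; it simply converts an `AbortSignal` into a rejecting promise to race against the blocking read):

```typescript
// Sketch: turn an AbortSignal into a promise that rejects on abort, so a
// blocking read can be raced against it and unblocked externally.
function waitForAbort(signal: AbortSignal): Promise<never> {
  return new Promise((_, reject) => {
    const onAbort = () => reject(signal.reason ?? new Error('aborted'));
    if (signal.aborted) return onAbort();
    signal.addEventListener('abort', onAbort, { once: true });
  });
}

// Usage inside the read loop (reader is a ReadableStreamDefaultReader):
// const { done, value } = await Promise.race([reader.read(), waitForAbort(signal)]);
```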
```typescript
  const message: WsMessage = JSON.parse(line);
  handleMessage(message);
} catch {
  logger.warn({ line }, 'Failed to parse WebSocket message');
}
```
Could be safer and easier to debug if we write a zod schema for WsMessage and do a safeParseAsync here.
ideally this would be a protobuf as part of the agents protocol, we should keep this in mind for a v2 of the endpoint
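Pending a protobuf-based v2, a boundary validator along the lines of the zod suggestion could look like this dependency-free sketch (the `created_at`/`probabilities` field names come from the hunks quoted in this review; the message shapes are otherwise assumptions):

```typescript
// Dependency-free stand-in for the suggested zod safeParse: validate raw
// WebSocket text into a typed ServerMessage once, at the boundary, so the
// dispatch code never re-asserts optional fields.
type ServerMessage =
  | { type: 'interruption_detected'; created_at: number; probabilities: number[] }
  | { type: 'session_closed' };

type ParseResult =
  | { success: true; data: ServerMessage }
  | { success: false; error: string };

function safeParseServerMessage(raw: string): ParseResult {
  let obj: unknown;
  try {
    obj = JSON.parse(raw);
  } catch {
    return { success: false, error: 'invalid JSON' };
  }
  if (typeof obj !== 'object' || obj === null) {
    return { success: false, error: 'not an object' };
  }
  const m = obj as Record<string, unknown>;
  if (m.type === 'interruption_detected') {
    if (typeof m.created_at !== 'number' || !Array.isArray(m.probabilities)) {
      return { success: false, error: 'malformed interruption_detected' };
    }
    return {
      success: true,
      data: {
        type: 'interruption_detected',
        created_at: m.created_at,
        probabilities: m.probabilities as number[],
      },
    };
  }
  if (m.type === 'session_closed') {
    return { success: true, data: { type: 'session_closed' } };
  }
  return { success: false, error: `unknown type: ${String(m.type)}` };
}
```

With an actual zod schema, this whole function collapses to a `safeParse` call on the parsed JSON, which was the original suggestion.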
```typescript
// Also try parsing buffer as complete message (for non-newline-delimited)
if (buffer.trim()) {
  try {
    const message: WsMessage = JSON.parse(buffer);
```
```typescript
case MSG_INTERRUPTION_DETECTED: {
  const createdAt = message.created_at ?? 0;
  if (state.overlapSpeechStarted && state.overlapSpeechStartedAt !== undefined) {
    const existing = state.cache.get(createdAt);
    const entry = new InterruptionCacheEntry({
      createdAt,
      speechInput: existing?.speechInput,
      totalDurationInS: (performance.now() - createdAt) / 1000,
      probabilities: message.probabilities,
      isInterruption: true,
```
Same ^. Usually I've found that explicitly defining ClientMessage and ServerMessage zod schemas, and manually parsing the raw string to JSON, makes it easier to debug & iterate on WS-based STT/TTS APIs. This way we don't need to do additional assertions on optional fields.
```typescript
  writer.releaseLock();
  writer = null;
}
ws?.close(1000);
```
What is 1000? Maybe add a comment? Also just curious: does closing the ws automatically clean up the resources of the writable/readable streams attached to it?
answered this above, 1000 is the clean close status for websockets. I'll add a comment
```typescript
{ highWaterMark: 2 },
{ highWaterMark: 2 },
```
Any reason we set high water mark to be 2?
I saw in IdentityTransform we make it to be max integer:
```typescript
export class IdentityTransform<T> extends TransformStream<T, T> {
  constructor() {
    super(
      {
        transform: (chunk, controller) => controller.enqueue(chunk),
      },
      // By default the transform stream will buffer only one chunk at a time.
      // In order to follow the python agents channel.py, we set the capacity to be effectively infinite.
      { highWaterMark: Number.MAX_SAFE_INTEGER },
      { highWaterMark: Number.MAX_SAFE_INTEGER },
    );
  }
}
```
setting it to 2 instead of unbounded ensures we apply backpressure and don't try to overload the barge in server with endless parallel requests in case it doesn't keep up
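The trade-off is easy to see with plain web streams: a bounded `highWaterMark` makes the writer's `desiredSize` shrink when the consumer lags, which is the backpressure signal, whereas `Number.MAX_SAFE_INTEGER` effectively never signals it.

```typescript
// Demo of the behavior described above: with highWaterMark 2 on both sides,
// the writer's desiredSize drops once unconsumed chunks queue up, instead of
// the stream buffering without bound.
const bounded = new TransformStream<number, number>(
  { transform: (chunk, controller) => controller.enqueue(chunk) },
  { highWaterMark: 2 },
  { highWaterMark: 2 },
);
const boundedWriter = bounded.writable.getWriter();
const initialCapacity = boundedWriter.desiredSize; // 2: both queues are empty

// Queue four chunks without reading anything from the readable side.
void boundedWriter.write(1);
void boundedWriter.write(2);
void boundedWriter.write(3);
void boundedWriter.write(4);
await new Promise((resolve) => setTimeout(resolve, 0));
// With the readable queue full (2 chunks) and more writes pending, the
// writer's remaining capacity has shrunk — that is the backpressure signal.
const backpressured = boundedWriter.desiredSize;
```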
Summary by CodeRabbit
New Features
Chores