
Add AdaptiveInterruptionDetector #980

Merged
lukasIO merged 28 commits into feat/barge-in from lukas/barge-transport
Jan 27, 2026

Conversation

@lukasIO
Contributor

@lukasIO lukasIO commented Jan 16, 2026

Summary by CodeRabbit

  • New Features

    • Adaptive interruption detection with configurable thresholds, timing parameters, and probability estimation.
    • HTTP and WebSocket transport options for real-time interruption inference.
    • Stream-based interruption detection and management APIs.
    • Telemetry attributes to track interruption events and metrics.
  • Chores

    • Added ofetch dependency.

✏️ Tip: You can customize this high-level summary in your review settings.

@changeset-bot

changeset-bot bot commented Jan 16, 2026

⚠️ No Changeset found

Latest commit: d32c202

Merging this PR will not cause a version bump for any packages. If these changes should not result in a new version, you're good to go. If these changes should result in a version bump, you need to add a changeset.

This PR includes no changesets

When changesets are added to this PR, you'll see the packages that this PR includes changesets for and the associated semver types


@coderabbitai

coderabbitai bot commented Jan 16, 2026

Important

Review skipped

Auto reviews are disabled on base/target branches other than the default branch.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

📝 Walkthrough

Walkthrough

This PR introduces an adaptive interruption detection system for the agents package, including a core detector class, stream-based architecture, HTTP and WebSocket transports for remote inference, caching utilities, type definitions, and telemetry integration. Also adds ofetch dependency and extends StreamChannel with custom error support.

Changes

Cohort / File(s) Summary
Configuration & Dependencies
agents/package.json, .changeset/config.json
Added ofetch ^1.5.1 dependency for HTTP requests; reformatted changeset config structure.
Module Exports
agents/src/index.ts
Re-exported interruption module exports at top level via export * from './inference/interruption/index.js'.
Core Detector
agents/src/inference/interruption/AdaptiveInterruptionDetector.ts
New class managing interruption detection streams with options validation, API credential handling, multi-stream coordination, and dynamic option propagation across active streams.
Stream Infrastructure
agents/src/inference/interruption/InterruptionStream.ts
Added InterruptionStreamSentinel factory and InterruptionStreamBase streaming pipeline orchestrating audio buffering, overlap detection, resampling, state management, and transport delegation.
Type Definitions
agents/src/inference/interruption/types.ts
Defined InterruptionEventType enum, InterruptionEvent/Options interfaces, ApiConnectOptions, and InterruptionSentinel control signals (AgentSpeechStarted/Ended, OverlapSpeechStarted/Ended, Flush).
Data Models & Errors
agents/src/inference/interruption/InterruptionCacheEntry.ts, agents/src/inference/interruption/errors.ts
Added InterruptionCacheEntry for caching inference results; InterruptionDetectionError class for recoverable/non-recoverable error handling with telemetry metadata.
Configuration & Defaults
agents/src/inference/interruption/defaults.ts
Exported timing/threshold constants, apiConnectDefaults, intervalForRetry exponential backoff, and interruptionOptionDefaults aggregating all runtime configuration with environment variable fallbacks.
Transport: HTTP
agents/src/inference/interruption/http_transport.ts
Implemented HTTP-based inference transport with predictHTTP for POST requests to /bargein endpoint, retry logic, PredictResponse mapping, and createHttpTransport pipeline for audio streaming with interruption event emission.
Transport: WebSocket
agents/src/inference/interruption/ws_transport.ts, agents/src/inference/interruption/ws_transport.test.ts
Implemented WebSocket transport with token-based auth, session management, bidirectional messaging, binary audio streaming with timestamps, reconnection support, and comprehensive stream/lifecycle tests.
Utilities
agents/src/inference/interruption/utils.ts, agents/src/inference/interruption/utils.test.ts
Added BoundedCache<K, V> FIFO eviction structure, estimateProbability sliding-window probability computation, slidingWindowMinMax aggregation, and test coverage.
Infrastructure Updates
agents/src/stream/stream_channel.ts
Parameterized StreamChannel with custom error type <T, E extends Error = Error> and added abort(error: E) method.
Telemetry
agents/src/telemetry/trace_types.ts
Added five new interruption-related attribute constants: ATTR_IS_INTERRUPTION, ATTR_INTERRUPTION_PROBABILITY, ATTR_INTERRUPTION_TOTAL_DURATION, ATTR_INTERRUPTION_PREDICTION_DURATION, ATTR_INTERRUPTION_DETECTION_DELAY.

Sequence Diagram

sequenceDiagram
    actor Agent as Agent/Client
    participant Detector as AdaptiveInterruptionDetector
    participant Stream as InterruptionStreamBase
    participant Transport as HTTP/WS Transport
    participant Service as Inference Service
    participant Telemetry as OpenTelemetry Span

    Agent->>Detector: createStream()
    Detector->>Stream: new InterruptionStreamBase()
    
    Agent->>Stream: pushFrame(AudioFrame)
    Stream->>Stream: Resample & buffer audio
    
    alt Overlap Speech Detected
        Stream->>Transport: send audio chunk + timestamp
        Transport->>Service: POST/WS send Int16Array
        Service->>Service: Run model inference
        Service->>Transport: return probabilities
        Transport->>Stream: emit InterruptionEvent
        Stream->>Telemetry: updateUserSpeakingSpan()
        Stream->>Agent: emit InterruptionEvent
    end
    
    Agent->>Stream: flush() / endInput()
    Stream->>Transport: close connection
    Transport->>Service: session.close()
    Service-->>Transport: ack
    Transport-->>Stream: done
    Stream-->>Agent: stream closed

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

Suggested reviewers

  • chenghao-mou
  • theomonnom

Poem

🐰 A Curious Interruption 🎤

With whiskers twitched and ears perked high,
I built a system sharp and spry,
To catch mid-hop when speech breaks through—
HTTP, WebSocket, streams brand new!
Cache and cache, the buffers sing,
As interruptions take their wing! ✨

🚥 Pre-merge checks | ✅ 1 | ❌ 2
❌ Failed checks (2 warnings)
  • Description check — ⚠️ Warning: The pull request lacks any description content; the author provided no explanation of changes, rationale, testing, or checklist items required by the template. Resolution: add a comprehensive description following the provided template, including a summary of changes, rationale for all additions, test results, and pre-review checklist completion.
  • Docstring coverage — ⚠️ Warning: Docstring coverage is 47.37%, below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (1 passed)
  • Title check — ✅ Passed: The title 'Add AdaptiveInterruptionDetector' directly matches the main change: introducing a new AdaptiveInterruptionDetector class that serves as the primary public API for the interruption detection feature, alongside supporting transport and utility modules.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


Comment @coderabbitai help to get the list of available commands and usage tips.

@lukasIO lukasIO marked this pull request as ready for review January 22, 2026 09:48
@coderabbitai

coderabbitai bot commented Jan 22, 2026

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 16

🤖 Fix all issues with AI agents
In `@agents/src/inference/interruption/AdaptiveInterruptionDetector.ts`:
- Line 1: Add the required SPDX file header comment to the top of
AdaptiveInterruptionDetector.ts to satisfy repo coding guidelines: insert the
standard SPDX-License-Identifier line and accompanying copyright/author header
used across the project (matching other TS files) as a block comment before any
imports (e.g., before the import of TypedEventEmitter) so the file now begins
with the repository's SPDX header followed by the existing import and
class/function definitions such as AdaptiveInterruptionDetector.
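Since the same SPDX fix recurs for nearly every new file below, and the path-based rule quoted under "Additional context used" gives the exact text, the required header is simply these two comment lines, placed before any imports:

```typescript
// SPDX-FileCopyrightText: 2025 LiveKit, Inc.
// SPDX-License-Identifier: Apache-2.0
```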

In `@agents/src/inference/interruption/defaults.ts`:
- Around line 1-2: The new file defaults.ts is missing the required SPDX header;
add the standard SPDX license identifier comment at the very top of the file
(above the imports) so the file begins with the repo's SPDX header line (e.g.,
the SPDX-License-Identifier comment used across the project) to satisfy
compliance; ensure the header precedes any code such as the imports of
ApiConnectOptions and InterruptionOptions.

In `@agents/src/inference/interruption/errors.ts`:
- Around line 1-3: Add the required SPDX license header to the top of the new
file agents/src/inference/interruption/errors.ts to comply with repo standards:
insert the SPDX short identifier line (e.g., "SPDX-License-Identifier:
<LICENSE>") and the repository's copyright/ownership line as used in other
source files, placing it before the existing file comment so the header is the
first lines in errors.ts.

In `@agents/src/inference/interruption/http_transport.ts`:
- Line 1: This new TypeScript file (http_transport.ts) is missing the required
SPDX file header; add the repo's standard SPDX header comment block (including
SPDX-License-Identifier and required copyright/owner line and year) at the very
top of the file above the existing import { ofetch } from 'ofetch'; so the file
starts with the header comment per coding guidelines.

In `@agents/src/inference/interruption/InterruptionCacheEntry.ts`:
- Line 1: Add the required SPDX header at the very top of the new file so it
complies with repository guidelines; open
agents/src/inference/interruption/InterruptionCacheEntry.ts and insert the
standard SPDX comment block as the first lines before any imports (so it appears
before the existing import { estimateProbability } from './utils.js'). Ensure
the header follows the repo's exact SPDX format used in other files.

In `@agents/src/inference/interruption/InterruptionStream.ts`:
- Around line 112-127: updateOptions currently mutates this.options but leaves
the transportOptions snapshot created in setupTransform unchanged, so HTTP
transport keeps using old threshold/minFrames; update updateOptions to also
update the mutable transport options used by the HTTP path (e.g., a new field
like this.transportOptions or the existing transportOptions object created in
setupTransform) so that changes to threshold, minInterruptionDurationInS and
derived minFrames are copied into that transport options object, and keep the
existing wsReconnect/useProxy reconnection logic for WebSocket flows; ensure the
same fix is applied to the other update code path noted around the second
occurrence (the block at the other mention of updateOptions).
- Line 1: Add the required SPDX file header comment to the top of the new
TypeScript file InterruptionStream.ts (above the existing import line), e.g., a
block or single-line comment containing the SPDX-License-Identifier and any
repository-required copyright/owner line; ensure the header is the first lines
of the file before "import { AudioFrame, AudioResampler } from
'@livekit/rtc-node';".
- Around line 327-330: flush() currently calls
this.inputStream.write(InterruptionStreamSentinel.flush()) without awaiting it,
so the sentinel can race with close(); change InterruptionStream.flush() to wait
for the write to complete before returning: inside flush() (after
ensureStreamsNotEnded()) await the write result—either await the promise
returned by this.inputStream.write if it returns one, or wrap the write in a
Promise that resolves on the write callback/'drain' event—so that the
InterruptionStreamSentinel.flush() is guaranteed to be delivered before close()
can run.
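The flush/close race described above can be avoided by awaiting the write promise before returning. A minimal sketch using WHATWG streams — `makeChannel` and `flushSentinel` are illustrative stand-ins for the real members, not the module's actual API:

```typescript
// Sketch: awaiting writer.write() guarantees the flush sentinel is
// delivered to the sink before close() can run.
type Sentinel = { type: 'flush' };
const flushSentinel: Sentinel = { type: 'flush' };

export function makeChannel() {
  const received: Sentinel[] = [];
  const inputStream = new WritableStream<Sentinel>({
    write(chunk) {
      received.push(chunk);
    },
  });
  const writer = inputStream.getWriter();
  return {
    received,
    async flush() {
      // Await the write so flush() only resolves once the sentinel landed.
      await writer.write(flushSentinel);
    },
    async close() {
      await writer.close();
    },
  };
}
```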

In `@agents/src/inference/interruption/types.ts`:
- Around line 1-2: This new file (types.ts) is missing the repository's required
SPDX headers; add the standard SPDX header block at the very top of the file
(above the import of Span) following the repo coding guidelines so the file
includes the copyright/identifier lines required by policy.

In `@agents/src/inference/interruption/utils.ts`:
- Line 1: This new module imports FRAME_DURATION_IN_S and
MIN_INTERRUPTION_DURATION_IN_S but is missing the required SPDX license header;
add the standard SPDX header comment as the first line(s) of the file (above the
import) so the file includes the repository's required license identifier before
any code or imports.

In `@agents/src/inference/interruption/ws_transport.test.ts`:
- Around line 1-3: The SPDX header in the new test file ws_transport.test.ts
currently shows 2024; update the header year to 2025 by replacing "2024 LiveKit,
Inc." with "2025 LiveKit, Inc." and ensure the SPDX-License-Identifier line
remains unchanged so the file uses the correct 2025 copyright header.

In `@agents/src/inference/interruption/ws_transport.ts`:
- Around line 1-10: Insert the required SPDX license header as a top-of-file
comment in ws_transport.ts (before the existing imports like Readable, Writable,
TransformStream, WebSocket) to comply with repo guidelines; ensure the header
matches the project's standard SPDX text (for example the exact
"SPDX-License-Identifier: <identifier>" line used across the repo) and place it
as the very first lines of the file without altering the existing imports or
logic.
- Around line 163-205: In processResponses, the inner fallback mistakenly
retries JSON.parse(line) (same as the first attempt) instead of attempting to
parse the entire buffer; update the catch block inside the lines loop to try
parsing buffer (or another intended fallback source) rather than re-parsing line
so that incomplete newline-delimited data can be recovered; specifically modify
processResponses' inner catch (the second JSON.parse currently using line) to
parse the complete buffer variable and call handleMessage(message) on success,
and keep the existing outer buffer-as-complete-message logic intact to avoid
double-processing.
- Around line 62-93: connectWebSocket currently leaves the WebSocket (ws) and
its streams open when the connection promise rejects on timeout or error,
causing resource leaks; update the connection Promise so that on the timeout
handler and the 'error' handler you explicitly close/terminate the ws (call
ws.close() or ws.terminate()) and remove listeners, and also abort/close the
streams returned by webSocketToStream (call readable.cancel() and
writable.abort() or equivalent) before rejecting; ensure the same cleanup runs
for both the timeout and error paths and that normal 'open' success still clears
the timeout and leaves streams intact.
- Around line 360-389: The transform handler on the TransformStream currently
only checks state.overlapSpeechStartedAt before sending audio, but that
timestamp is never cleared and can allow audio to stream after overlap speech
ends; update the transform(chunk, controller) logic (in ws_transport.ts) to
require the overlapSpeechStarted boolean as well (e.g., check
state.overlapSpeechStarted && state.overlapSpeechStartedAt) and bail out if
overlapSpeechStarted is false, before calling sendAudioData; ensure this gate is
applied for Int16Array chunks so sendAudioData is only invoked when both the
flag and the timestamp indicate overlap speech is active.

In `@agents/src/stream/stream_channel.ts`:
- Around line 23-25: The abort implementation doesn't set isClosed, so the
closed getter remains false after abort; update the abort method (the abort:
(error: E) => { ... } block that calls writer.abort(error)) to set isClosed =
true after successful writer.abort and mirror the error-handling pattern used by
close() (catch the TypeError-like double-abort case, set isClosed = true and
swallow that specific error, otherwise rethrow) so closed accurately reflects
the stream state.
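The suggested `abort` fix — marking the channel closed and mirroring `close()`'s double-abort handling — can be sketched like this (names are illustrative, not the real `StreamChannel` implementation):

```typescript
// Sketch: abort() sets isClosed on success, and swallows the
// TypeError-like error thrown by a second abort/close while still
// marking the channel closed; anything else is rethrown.
export function createChannel<T, E extends Error = Error>() {
  let isClosed = false;
  const stream = new WritableStream<T>();
  const writer = stream.getWriter();
  return {
    get closed() {
      return isClosed;
    },
    async abort(error: E) {
      try {
        await writer.abort(error);
        isClosed = true;
      } catch (e) {
        if (e instanceof TypeError) {
          isClosed = true; // double-abort: already closed, not an error
          return;
        }
        throw e;
      }
    },
  };
}
```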
🧹 Nitpick comments (1)
agents/src/inference/interruption/AdaptiveInterruptionDetector.ts (1)

14-21: Avoid exposing options that are ignored.

sampleRate, minFrames, and useProxy are derived internally, but AdaptiveInterruptionDetectorOptions currently allows callers to pass them with no effect. Consider narrowing the public options type to avoid no‑op inputs.

♻️ Suggested type narrowing
-export type AdaptiveInterruptionDetectorOptions = Partial<InterruptionOptions>;
+export type AdaptiveInterruptionDetectorOptions = Partial<
+  Omit<InterruptionOptions, 'minFrames' | 'sampleRate' | 'useProxy'>
+>;
📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between f3c7430 and a580d7e.

⛔ Files ignored due to path filters (1)
  • pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (16)
  • .changeset/config.json
  • agents/package.json
  • agents/src/index.ts
  • agents/src/inference/interruption/AdaptiveInterruptionDetector.ts
  • agents/src/inference/interruption/InterruptionCacheEntry.ts
  • agents/src/inference/interruption/InterruptionStream.ts
  • agents/src/inference/interruption/defaults.ts
  • agents/src/inference/interruption/errors.ts
  • agents/src/inference/interruption/http_transport.ts
  • agents/src/inference/interruption/types.ts
  • agents/src/inference/interruption/utils.test.ts
  • agents/src/inference/interruption/utils.ts
  • agents/src/inference/interruption/ws_transport.test.ts
  • agents/src/inference/interruption/ws_transport.ts
  • agents/src/stream/stream_channel.ts
  • agents/src/telemetry/trace_types.ts
🧰 Additional context used
📓 Path-based instructions (3)
**/*.{ts,tsx,js,jsx}

📄 CodeRabbit inference engine (.cursor/rules/agent-core.mdc)

Add SPDX-FileCopyrightText and SPDX-License-Identifier headers to all newly added files with '// SPDX-FileCopyrightText: 2025 LiveKit, Inc.' and '// SPDX-License-Identifier: Apache-2.0'

Files:

  • agents/src/inference/interruption/ws_transport.ts
  • agents/src/index.ts
  • agents/src/inference/interruption/InterruptionCacheEntry.ts
  • agents/src/inference/interruption/utils.test.ts
  • agents/src/telemetry/trace_types.ts
  • agents/src/inference/interruption/errors.ts
  • agents/src/inference/interruption/ws_transport.test.ts
  • agents/src/inference/interruption/types.ts
  • agents/src/stream/stream_channel.ts
  • agents/src/inference/interruption/defaults.ts
  • agents/src/inference/interruption/http_transport.ts
  • agents/src/inference/interruption/utils.ts
  • agents/src/inference/interruption/AdaptiveInterruptionDetector.ts
  • agents/src/inference/interruption/InterruptionStream.ts
**/*.{ts,tsx}?(test|example|spec)

📄 CodeRabbit inference engine (.cursor/rules/agent-core.mdc)

When testing inference LLM, always use full model names from agents/src/inference/models.ts (e.g., 'openai/gpt-4o-mini' instead of 'gpt-4o-mini')

Files:

  • agents/src/inference/interruption/ws_transport.ts
  • agents/src/index.ts
  • agents/src/inference/interruption/InterruptionCacheEntry.ts
  • agents/src/inference/interruption/utils.test.ts
  • agents/src/telemetry/trace_types.ts
  • agents/src/inference/interruption/errors.ts
  • agents/src/inference/interruption/ws_transport.test.ts
  • agents/src/inference/interruption/types.ts
  • agents/src/stream/stream_channel.ts
  • agents/src/inference/interruption/defaults.ts
  • agents/src/inference/interruption/http_transport.ts
  • agents/src/inference/interruption/utils.ts
  • agents/src/inference/interruption/AdaptiveInterruptionDetector.ts
  • agents/src/inference/interruption/InterruptionStream.ts
**/*.{ts,tsx}?(test|example)

📄 CodeRabbit inference engine (.cursor/rules/agent-core.mdc)

Initialize logger before using any LLM functionality with initializeLogger({ pretty: true }) from '@livekit/agents'

Files:

  • agents/src/inference/interruption/ws_transport.ts
  • agents/src/index.ts
  • agents/src/inference/interruption/InterruptionCacheEntry.ts
  • agents/src/inference/interruption/utils.test.ts
  • agents/src/telemetry/trace_types.ts
  • agents/src/inference/interruption/errors.ts
  • agents/src/inference/interruption/ws_transport.test.ts
  • agents/src/inference/interruption/types.ts
  • agents/src/stream/stream_channel.ts
  • agents/src/inference/interruption/defaults.ts
  • agents/src/inference/interruption/http_transport.ts
  • agents/src/inference/interruption/utils.ts
  • agents/src/inference/interruption/AdaptiveInterruptionDetector.ts
  • agents/src/inference/interruption/InterruptionStream.ts
🧠 Learnings (1)
📚 Learning: 2026-01-16T14:33:39.551Z
Learnt from: CR
Repo: livekit/agents-js PR: 0
File: .cursor/rules/agent-core.mdc:0-0
Timestamp: 2026-01-16T14:33:39.551Z
Learning: Applies to **/{examples,test}/**/*.test.ts : Include both basic streaming and tool calling tests to verify full LLM functionality

Applied to files:

  • agents/src/inference/interruption/utils.test.ts
  • agents/src/inference/interruption/ws_transport.test.ts
🧬 Code graph analysis (9)
agents/src/inference/interruption/ws_transport.ts (5)
agents/src/inference/interruption/utils.ts (1)
  • BoundedCache (7-106)
agents/src/inference/interruption/InterruptionCacheEntry.ts (1)
  • InterruptionCacheEntry (7-44)
agents/src/inference/utils.ts (1)
  • createAccessToken (10-19)
agents/src/inference/interruption/types.ts (1)
  • InterruptionEvent (14-25)
agents/src/inference/interruption/defaults.ts (1)
  • intervalForRetry (25-34)
agents/src/inference/interruption/InterruptionCacheEntry.ts (1)
agents/src/inference/interruption/utils.ts (1)
  • estimateProbability (112-122)
agents/src/inference/interruption/utils.test.ts (1)
agents/src/inference/interruption/utils.ts (1)
  • slidingWindowMinMax (124-137)
agents/src/inference/interruption/ws_transport.test.ts (1)
agents/src/inference/interruption/ws_transport.ts (1)
  • webSocketToStream (46-57)
agents/src/inference/interruption/types.ts (1)
agents/src/inference/interruption/InterruptionStream.ts (7)
  • ApiConnectOptions (31-31)
  • AgentSpeechStarted (30-30)
  • AgentSpeechEnded (29-29)
  • OverlapSpeechStarted (35-35)
  • OverlapSpeechEnded (34-34)
  • Flush (32-32)
  • InterruptionSentinel (33-33)
agents/src/inference/interruption/defaults.ts (1)
agents/src/inference/interruption/types.ts (2)
  • ApiConnectOptions (48-52)
  • InterruptionOptions (30-43)
agents/src/inference/interruption/utils.ts (1)
agents/src/inference/interruption/defaults.ts (2)
  • MIN_INTERRUPTION_DURATION_IN_S (4-4)
  • FRAME_DURATION_IN_S (12-12)
agents/src/inference/interruption/AdaptiveInterruptionDetector.ts (4)
agents/src/inference/interruption/types.ts (2)
  • InterruptionEvent (14-25)
  • InterruptionOptions (30-43)
agents/src/inference/interruption/errors.ts (1)
  • InterruptionDetectionError (4-22)
agents/src/inference/interruption/InterruptionStream.ts (1)
  • stream (303-305)
agents/src/inference/interruption/defaults.ts (4)
  • interruptionOptionDefaults (36-49)
  • DEFAULT_BASE_URL (13-13)
  • SAMPLE_RATE (10-10)
  • FRAMES_PER_SECOND (11-11)
agents/src/inference/interruption/InterruptionStream.ts (5)
agents/src/inference/interruption/types.ts (9)
  • AgentSpeechStarted (56-58)
  • AgentSpeechEnded (60-62)
  • OverlapSpeechStarted (64-68)
  • OverlapSpeechEnded (70-72)
  • Flush (74-76)
  • InterruptionSentinel (81-86)
  • InterruptionEvent (14-25)
  • InterruptionOptions (30-43)
  • ApiConnectOptions (48-52)
agents/src/inference/interruption/InterruptionCacheEntry.ts (1)
  • InterruptionCacheEntry (7-44)
agents/src/stream/stream_channel.ts (2)
  • StreamChannel (7-13)
  • createStreamChannel (15-44)
agents/src/inference/interruption/defaults.ts (2)
  • apiConnectDefaults (15-19)
  • FRAMES_PER_SECOND (11-11)
agents/src/inference/interruption/utils.ts (1)
  • BoundedCache (7-106)
🔇 Additional comments (37)
agents/src/stream/stream_channel.ts (1)

7-13: LGTM on the interface changes.

The generic error type E extends Error = Error provides good type safety while maintaining backward compatibility with the default. The abort method signature is appropriate for stream cancellation scenarios.

.changeset/config.json (1)

11-11: No action needed.

agents/src/index.ts (1)

39-39: LGTM for the public re-export.

agents/src/telemetry/trace_types.ts (1)

55-59: LGTM for new interruption attributes.

agents/package.json (1)

72-72: No action needed. The ofetch dependency (MIT license) is compatible with the project's Node targets (20, 24) and includes fallbacks for older Node versions. Dependency is ready for release.

agents/src/inference/interruption/utils.test.ts (4)

8-11: Short-array guard is covered well.


13-17: Window-size-1 behavior is validated clearly.


19-24: Sustained-probability window case is well tested.


26-30: Equal-length window scenario coverage looks good.

agents/src/inference/interruption/ws_transport.test.ts (6)

8-15: Server helper looks solid.


29-115: Readable-stream coverage is thorough.


117-173: Writable-stream cases are well covered.


176-207: Bidirectional echo test is a good integration check.


210-241: Abrupt close handling is validated well.



agents/src/inference/interruption/InterruptionCacheEntry.ts (1)

7-43: Cache entry shape and probability helper are cleanly modeled.

agents/src/inference/interruption/utils.ts (2)

7-106: BoundedCache API is straightforward and usable.
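For readers unfamiliar with the pattern, a FIFO-evicting bounded cache in the spirit of `BoundedCache<K, V>` can be sketched in a few lines — the real class's API surface may differ:

```typescript
// Sketch: a Map preserves insertion order, so the first key is always
// the oldest entry and can be evicted when the size cap is reached.
export class FifoCache<K, V> {
  private map = new Map<K, V>();
  constructor(private maxSize: number) {}

  set(key: K, value: V): void {
    if (!this.map.has(key) && this.map.size >= this.maxSize) {
      const oldest = this.map.keys().next().value as K;
      this.map.delete(oldest); // FIFO eviction
    }
    this.map.set(key, value);
  }

  get(key: K): V | undefined {
    return this.map.get(key);
  }

  get size(): number {
    return this.map.size;
  }
}
```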


112-137: Probability helpers are clear and match the intended semantics.
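One plausible reading of a sliding-window min/max aggregation — fire only when the probability stays high across a full window — can be sketched as below. This is an assumption about the semantics, not the actual `slidingWindowMinMax` implementation:

```typescript
// Sketch: take the minimum over each window of `windowSize` consecutive
// values, then return the maximum of those minima. A high result means
// the probability stayed high across some entire window (sustained
// overlap speech) rather than spiking once.
export function slidingMinMax(values: number[], windowSize: number): number {
  if (windowSize <= 0 || values.length < windowSize) return 0; // short-array guard
  let best = -Infinity;
  for (let start = 0; start + windowSize <= values.length; start++) {
    let windowMin = Infinity;
    for (let i = start; i < start + windowSize; i++) {
      windowMin = Math.min(windowMin, values[i]!);
    }
    best = Math.max(best, windowMin);
  }
  return best;
}
```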

agents/src/inference/interruption/ws_transport.ts (4)

46-57: WebSocket stream bridge is clean and idiomatic.


212-306: Message handling logic is coherent and well structured.
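The buffer-fallback parsing that the `processResponses` comment above asks for can be sketched as a standalone helper. Names here are illustrative, not the module's actual API:

```typescript
// Sketch: parse each complete newline-delimited JSON line; if a line
// fails, the newline split may have cut one message apart, so fall back
// to parsing everything from that line onward as a single message
// instead of re-parsing the same line.
export function parseNdjson(buffer: string): { messages: unknown[]; rest: string } {
  const messages: unknown[] = [];
  const lines = buffer.split('\n');
  const rest = lines.pop() ?? ''; // trailing partial line stays buffered
  for (let i = 0; i < lines.length; i++) {
    const line = lines[i]!;
    if (!line.trim()) continue;
    try {
      messages.push(JSON.parse(line));
    } catch {
      const remainder = [...lines.slice(i), rest].join('\n');
      try {
        messages.push(JSON.parse(remainder)); // fallback: whole remainder
        return { messages, rest: '' };
      } catch {
        return { messages, rest: remainder }; // still incomplete; keep buffering
      }
    }
  }
  return { messages, rest };
}
```

Parsing only the remainder (not the whole buffer) avoids double-processing lines that already parsed successfully.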


308-336: Audio framing and header construction look correct.


338-359: Close/reconnect sequencing is sensible.

agents/src/inference/interruption/defaults.ts (2)

25-34: Retry helper is clear and matches the stated intent.
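A capped exponential backoff in the spirit of `intervalForRetry` looks roughly like this — the base interval, growth factor, and cap below are assumptions, not the module's actual constants:

```typescript
// Sketch: interval grows geometrically with the attempt number and is
// clamped to a maximum so retries never back off indefinitely.
export function retryIntervalInS(
  attempt: number,
  baseInS = 0.1,
  factor = 2,
  capInS = 5,
): number {
  return Math.min(capInS, baseInS * factor ** attempt);
}
```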


36-49: Defaults block is well organized and easy to reason about.

agents/src/inference/interruption/http_transport.ts (3)

10-34: Clear request/response models.

The HTTP option and response types are easy to follow and make the transport contract explicit.


98-175: Transport pipeline looks solid.

Pass‑through of non‑audio events, cache updates, and interruption emission flow are coherent.


41-72: Verify timebase consistency for the /bargein endpoint's created_at response.

The code sends performance.now() as the created_at query parameter to the server (line 45), but the response's created_at value is used to populate the returned object (line 69). If the server generates its own timestamp instead of echoing the client's value, the duration calculations using that timestamp later (particularly in createHttpTransport at line 136: totalDurationInS: (performance.now() - createdAt) / 1000) will be incorrect.

Required action: Confirm whether the /bargein endpoint echoes the client's created_at parameter or generates its own server-side timestamp. If it does not echo, use the original client timestamp for all duration calculations as suggested in the review.
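One way to sidestep the timebase question entirely is to compute durations from the locally captured timestamp, regardless of what the server returns. A sketch with illustrative names:

```typescript
// Sketch: both timestamps come from the same monotonic clock
// (performance.now()), so their difference is a valid duration even if
// the server's echoed created_at uses a different timebase.
export function totalDurationInS(
  clientCreatedAtMs: number,
  nowMs: number = performance.now(),
): number {
  return (nowMs - clientCreatedAtMs) / 1000;
}
```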

agents/src/inference/interruption/AdaptiveInterruptionDetector.ts (4)

30-111: Initialization and validation look good.

Default merging, credential checks, and the max‑duration guard are well handled.


113-147: Accessors and error emission are clean.

Simple getters and emitError keep the public surface predictable.


148-165: Stream tracking is straightforward.

Creating and removing streams via the Set is clear and matches the intended lifecycle.


170-188: Option propagation is handled cleanly.

Updating local options then propagating to active streams is clear and consistent.
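The propagation pattern praised here — update local options, then fan out to active streams — can be sketched with hypothetical names (the real detector's option shape and stream API differ):

```typescript
// Sketch: the detector merges the patch into its own options, then
// forwards the same patch to every stream it is tracking, so streams
// created before the update still pick up the change.
interface Opts {
  threshold: number;
}

class FakeStream {
  opts: Opts = { threshold: 0.5 };
  updateOptions(patch: Partial<Opts>): void {
    this.opts = { ...this.opts, ...patch };
  }
}

export class Detector {
  private options: Opts = { threshold: 0.5 };
  private streams = new Set<FakeStream>();

  createStream(): FakeStream {
    const s = new FakeStream();
    s.updateOptions(this.options); // new streams start from current options
    this.streams.add(s);
    return s;
  }

  updateOptions(patch: Partial<Opts>): void {
    this.options = { ...this.options, ...patch };
    for (const s of this.streams) s.updateOptions(patch); // fan out
  }
}
```

Fanning out the patch (rather than a full snapshot) is the same shape the earlier `updateOptions` comment asks for when syncing transport options.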

agents/src/inference/interruption/InterruptionStream.ts (6)

27-61: Sentinel helpers are clear and compact.

The factory methods make sentinel construction explicit and easy to use.


63-72: Span attribute updates look consistent.

Attributes are set with clear trace keys and normalized values.


96-107: Constructor wiring is solid.

Defaults, API options, and stream setup are clearly initialized.


129-284: Pipeline composition is cohesive.

The two‑stage transform (audio → transport) reads cleanly and keeps transport concerns isolated.


303-325: Resampling and sentinel handling are clean.

The resampler path and sentinel short‑circuiting are handled thoughtfully.


346-387: Windowing + mix‑down logic looks solid.

The buffer shift and channel averaging are clear and safe.

✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.

Comment on lines +360 to +389
const transport = new TransformStream<Int16Array | InterruptionEvent, InterruptionEvent>(
{
start(controller) {
outputController = controller;
},

async transform(chunk, controller) {
// Pass through InterruptionEvents unchanged
if (!(chunk instanceof Int16Array)) {
controller.enqueue(chunk);
return;
}

const state = getState();
if (!state.overlapSpeechStartedAt) return;

try {
await sendAudioData(chunk);
} catch (err) {
logger.error({ err }, 'Failed to send audio data over WebSocket');
}
},

async flush() {
await close();
},
},
{ highWaterMark: 2 },
{ highWaterMark: 2 },
);

@coderabbitai coderabbitai bot Jan 22, 2026


⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

# First, let's examine the structure of the state and how these fields are managed
rg -A 5 -B 5 "overlapSpeechStarted" agents/src/inference/interruption/ws_transport.ts | head -100

Repository: livekit/agents-js

Length of output: 178


🏁 Script executed:

# Look for where state is initialized and modified
rg "overlapSpeechStartedAt\s*=" agents/src/inference/interruption/ -A 2 -B 2

Repository: livekit/agents-js

Length of output: 570


🏁 Script executed:

# Check for any reset or clear operations
rg "overlapSpeechStarted\s*=" agents/src/inference/interruption/ -A 2 -B 2

Repository: livekit/agents-js

Length of output: 3065


🏁 Script executed:

# Search for where overlapSpeechStartedAt is cleared or reset
rg "overlapSpeechStartedAt\s*=" agents/src/inference/interruption/ -A 1 -B 1

Repository: livekit/agents-js

Length of output: 372


🏁 Script executed:

# Look at the complete flow in InterruptionStream.ts where overlap speech started is handled
rg -B 10 -A 10 "overlap-speech-started" agents/src/inference/interruption/InterruptionStream.ts

Repository: livekit/agents-js

Length of output: 2390


🏁 Script executed:

# Check the state definition in ws_transport.ts to see what fields are tracked
rg -B 5 -A 15 "getState\|setState" agents/src/inference/interruption/ws_transport.ts | head -80

Repository: livekit/agents-js

Length of output: 43


🏁 Script executed:

# Search for overlap-speech-ended and overlap-speech-not-interrupted to see if they clear the timestamp
rg "overlap-speech-(ended|not-interrupted)" agents/src/inference/interruption/InterruptionStream.ts -A 5 -B 2



🏁 Script executed:

# Look at how state flows from InterruptionStream to ws_transport - search for state definition/export
rg "this\.overlapSpeechStartedAt" agents/src/inference/interruption/InterruptionStream.ts -B 3 -A 3



🏁 Script executed:

# Check ws_transport.ts for how getState is defined and what it returns
rg -B 20 "getState()" agents/src/inference/interruption/ws_transport.ts | head -60



🏁 Script executed:

# Get the full overlap-speech-ended handler to see what happens after the condition
rg -A 25 "overlap-speech-ended" agents/src/inference/interruption/InterruptionStream.ts



Add explicit gate on overlapSpeechStarted flag in addition to timestamp check.

The transform function only checks overlapSpeechStartedAt when deciding whether to send audio. Since this timestamp is never cleared after overlap speech ends (only set once on 'overlap-speech-started'), it persists while overlapSpeechStarted may be false. This allows audio to stream unintentionally.

Suggested fix
-        if (!state.overlapSpeechStartedAt) return;
+        if (!state.overlapSpeechStarted || !state.overlapSpeechStartedAt) return;
🤖 Prompt for AI Agents
In `@agents/src/inference/interruption/ws_transport.ts` around lines 360 - 389:
the transform handler on the TransformStream currently only checks
state.overlapSpeechStartedAt before sending audio, but that timestamp is never
cleared and can allow audio to stream after overlap speech ends; update the
transform(chunk, controller) logic (in ws_transport.ts) to require the
overlapSpeechStarted boolean as well (e.g., check state.overlapSpeechStarted &&
state.overlapSpeechStartedAt) and bail out if overlapSpeechStarted is false,
before calling sendAudioData; ensure this gate is applied for Int16Array chunks
so sendAudioData is only invoked when both the flag and the timestamp indicate
overlap speech is active.

Contributor Author


That's not applicable as audio is only written to the transport when overlapSpeechStarted is true.
The ws_transport's overlapSpeechStartedAt check is a secondary guard, not the primary control.
The flow is: InterruptionStream gates audio → sends to transport → transport has additional guard
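The two-layer gating described here can be sketched like this (names and shapes are illustrative, not the actual implementation):

```typescript
// Illustrative state shape shared between the stream and the transport.
interface GateState {
  overlapSpeechStarted: boolean;
  overlapSpeechStartedAt?: number;
}

// Primary gate (InterruptionStream): only forward audio while overlap
// speech is currently active.
function shouldForwardFromStream(state: GateState): boolean {
  return state.overlapSpeechStarted;
}

// Secondary guard (transport): drop anything arriving before overlap
// speech has ever been observed. It never needs to be the primary
// control, because upstream already stopped forwarding.
function shouldSendFromTransport(state: GateState): boolean {
  return state.overlapSpeechStartedAt !== undefined;
}
```

Since the transport only ever receives chunks that passed the primary gate, the timestamp check is defense in depth rather than the deciding condition.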


🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check how InterruptionStream sends audio to the transport and where it gates based on overlapSpeechStarted
rg -A 10 -B 5 "overlapSpeechStarted" agents/src/inference/interruption/InterruptionStream.ts | head -150



🏁 Script executed:

#!/bin/bash
# Look for where the transport is created and how data flows to it
rg -A 15 "createWsTransport|createHttpTransport" agents/src/inference/interruption/InterruptionStream.ts



🏁 Script executed:

#!/bin/bash
# Check for any pipeline or pipeThrough calls that show the data flow
rg -A 5 -B 5 "pipeThrough|pipeTo" agents/src/inference/interruption/InterruptionStream.ts



@lukasIO, you're absolutely right, and thank you for the clarification! I can see from the code that the audioTransformer in InterruptionStream.ts only enqueues audio slices when overlapSpeechStarted is true:

if (
  accumulatedSamples >= Math.floor(this.options.detectionIntervalInS * this.options.sampleRate) &&
  overlapSpeechStarted
) {
  const audioSlice = inferenceS16Data.slice(0, startIdx);
  controller.enqueue(audioSlice);
}

Since the pipeline flows audioTransformer -> transport, the transport only receives Int16Array chunks that have already passed the overlapSpeechStarted gate. The overlapSpeechStartedAt check in ws_transport is indeed a secondary guard. My original suggestion was based on an incomplete understanding of the pipeline architecture. I apologize for the noise!

@lukasIO lukasIO requested a review from toubatbrian January 22, 2026 10:23
@lukasIO lukasIO changed the title wip http + ws transport for barge in Add AdaptiveInterruptionDetector Jan 22, 2026
await writer.write(new TextEncoder().encode(sessionCreateMsg));

// Start reading responses
readerTask = processResponses(conn.readable);
Contributor


Should we wrap this with Task.from and control cancellation logic inside processResponses? Otherwise it could become a dangling promise.

Contributor Author

@lukasIO lukasIO Jan 27, 2026


There's already a test case that ensures closing the websocket also closes the streams. Do you think we need an additional abort signal beyond the ability to close the websocket?

Comment on lines +170 to +171
while (true) {
const { done, value } = await reader.read();
Contributor


Same as ^, we should use an AbortController explicitly and do await Promise.race([reader.read(), waitForAbort(signal)]); to avoid leaving a dangling await.
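The suggested race can be sketched as follows; waitForAbort is a hypothetical helper (not an existing utility in this repo), and the loop shape is illustrative rather than the PR's actual processResponses:

```typescript
// Hypothetical helper: a promise that rejects once the signal fires.
function waitForAbort(signal: AbortSignal): Promise<never> {
  return new Promise((_, reject) => {
    if (signal.aborted) return reject(new Error('aborted'));
    signal.addEventListener('abort', () => reject(new Error('aborted')), { once: true });
  });
}

// Illustrative read loop: each read() is raced against the abort signal,
// so an external abort unblocks the pending await instead of leaving it
// dangling until the stream happens to close.
async function readLoop(
  reader: ReadableStreamDefaultReader<Uint8Array>,
  signal: AbortSignal,
): Promise<number> {
  let chunks = 0;
  try {
    while (true) {
      const { done, value } = await Promise.race([reader.read(), waitForAbort(signal)]);
      if (done) break;
      if (value) chunks++;
    }
  } finally {
    reader.releaseLock();
  }
  return chunks;
}
```

Because Promise.race subscribes to both promises, a read() that loses the race does not surface as an unhandled rejection later.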

Comment on lines +183 to +187
const message: WsMessage = JSON.parse(line);
handleMessage(message);
} catch {
logger.warn({ line }, 'Failed to parse WebSocket message');
}
Contributor


Could be safer and easier to debug if we write a zod schema for WsMessage and do a safeParseAsync here.

Contributor Author


ideally this would be a protobuf as part of the agents protocol, we should keep this in mind for a v2 of the endpoint
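As a dependency-free stand-in for a zod safeParse, a hand-rolled guard over an assumed message shape ({ type, created_at?, probabilities? }, inferred from the snippets in this thread; the real WsMessage union may differ) could look like:

```typescript
// Hypothetical message shape inferred from the snippets in this PR.
interface WsMessage {
  type: string;
  created_at?: number;
  probabilities?: number[];
}

// Stand-in for a zod schema's safeParse: returns the parsed message on
// success and null on malformed input, so callers can log-and-skip
// instead of throwing mid-stream.
function parseWsMessage(line: string): WsMessage | null {
  let raw: unknown;
  try {
    raw = JSON.parse(line);
  } catch {
    return null;
  }
  if (typeof raw !== 'object' || raw === null) return null;
  const msg = raw as Record<string, unknown>;
  if (typeof msg.type !== 'string') return null;
  if (msg.created_at !== undefined && typeof msg.created_at !== 'number') return null;
  if (
    msg.probabilities !== undefined &&
    !(Array.isArray(msg.probabilities) && msg.probabilities.every((p) => typeof p === 'number'))
  ) {
    return null;
  }
  return msg as unknown as WsMessage;
}
```

A zod discriminated union would express the same checks declaratively and produce richer error messages for debugging.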

// Also try parsing buffer as complete message (for non-newline-delimited)
if (buffer.trim()) {
try {
const message: WsMessage = JSON.parse(buffer);
Contributor


^

Comment on lines +215 to +224
case MSG_INTERRUPTION_DETECTED: {
const createdAt = message.created_at ?? 0;
if (state.overlapSpeechStarted && state.overlapSpeechStartedAt !== undefined) {
const existing = state.cache.get(createdAt);
const entry = new InterruptionCacheEntry({
createdAt,
speechInput: existing?.speechInput,
totalDurationInS: (performance.now() - createdAt) / 1000,
probabilities: message.probabilities,
isInterruption: true,
Contributor

@toubatbrian toubatbrian Jan 26, 2026


Same ^, usually I found explicitly defining ClientMessage and ServerMessage zod schema, and manually parsed the raw string to JSON makes it easier to debug & iterate on Ws-based STT/TTS API. This way we don't need to do additional assertions on optional fields.

writer.releaseLock();
writer = null;
}
ws?.close(1000);
Contributor


What is 1000? Maybe add a comment? Also, just curious: does closing the ws automatically clean up the writable/readable streams attached to it?

Contributor Author


Answered this above: 1000 is the normal-closure status code for WebSockets. I'll add a comment.
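For context on the 1000: WebSocket close codes are standardized in RFC 6455 (section 7.4.1), where 1000 means normal closure. A tiny illustrative helper (closeCleanly and its parameter shape are made up for this sketch, not the PR's API):

```typescript
// WebSocket close codes are defined in RFC 6455, section 7.4.1.
const WS_NORMAL_CLOSURE = 1000; // clean, intentional shutdown

// Hypothetical wrapper: close the socket cleanly only if it is still
// connecting or open (readyState 0 = CONNECTING, 1 = OPEN).
function closeCleanly(ws: { readyState: number; close(code?: number): void }): void {
  if (ws.readyState <= 1) {
    ws.close(WS_NORMAL_CLOSURE);
  }
}
```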

Comment on lines +382 to +383
{ highWaterMark: 2 },
{ highWaterMark: 2 },
Contributor


Any reason we set the high water mark to 2?

I saw that in IdentityTransform we set it to max integer:

export class IdentityTransform<T> extends TransformStream<T, T> {
  constructor() {
    super(
      {
        transform: (chunk, controller) => controller.enqueue(chunk),
      },
      // By default the transform stream will buffer only one chunk at a time.
      // In order to follow the python agents channel.py, we set the capacity to be effectively infinite.
      { highWaterMark: Number.MAX_SAFE_INTEGER },
      { highWaterMark: Number.MAX_SAFE_INTEGER },
    );
  }
}

Contributor Author

@lukasIO lukasIO Jan 27, 2026


Setting it to 2 instead of unbounded ensures we apply backpressure and don't overload the barge-in server with endless parallel requests in case it doesn't keep up.
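The backpressure rationale can be seen in a toy example (illustrative, not the PR's actual stream wiring): with the default count-based queuing strategy, the writer's desiredSize starts at the highWaterMark and shrinks as chunks queue, so producers that await writer.ready naturally slow down.

```typescript
// Pass-through TransformStream with bounded queues on both sides,
// mirroring the { highWaterMark: 2 } choice discussed above.
const bounded = new TransformStream<Int16Array, Int16Array>(
  {
    transform: (chunk, controller) => controller.enqueue(chunk),
  },
  { highWaterMark: 2 }, // writable side: at most 2 queued chunks
  { highWaterMark: 2 }, // readable side: at most 2 queued chunks
);

const writer = bounded.writable.getWriter();
// Fresh writer, empty queue: desiredSize equals the highWaterMark.
console.log(writer.desiredSize); // 2
```

Once desiredSize reaches 0, awaiting writer.ready blocks further writes until the consumer catches up, which is what keeps a slow inference server from being flooded.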

@lukasIO
Copy link
Contributor Author

lukasIO commented Jan 27, 2026

I addressed the comments where applicable directly in #991 (commit); that was easier given that there's a bunch of refactoring in that PR again, sorry for the mess!

@lukasIO lukasIO merged commit 07c5d71 into feat/barge-in Jan 27, 2026
2 checks passed
@lukasIO lukasIO deleted the lukas/barge-transport branch January 27, 2026 10:01