From 51e579dc98acdb4913774ee462ab0c05db7fdb64 Mon Sep 17 00:00:00 2001 From: Alex Date: Tue, 6 Jan 2026 21:43:38 -0800 Subject: [PATCH 01/10] initial implementation --- pkg/sip/signal_logger.go | 130 ++++++++++++++++++++++++++++++++++ pkg/sip/signal_logger_test.go | 0 2 files changed, 130 insertions(+) create mode 100644 pkg/sip/signal_logger.go create mode 100644 pkg/sip/signal_logger_test.go diff --git a/pkg/sip/signal_logger.go b/pkg/sip/signal_logger.go new file mode 100644 index 00000000..cccaf8e5 --- /dev/null +++ b/pkg/sip/signal_logger.go @@ -0,0 +1,130 @@ +package sip + +import ( + "fmt" + "sync/atomic" + + msdk "github.com/livekit/media-sdk" + "github.com/livekit/protocol/logger" +) + +const ( + DefaultSignalMultiplier = float64(2) // Singnal needs to be at least 2 times the noise floor to be detected. + DefaultDecayAlpha = float64(0.95) // 5% of new silence is added to the noise floor. + DefaultAttackAlpha = float64(0.999) // 0.1% of new signal is added to the noise floor. +) + +// Keeps an internal state of whether we're currently transmitting signal (voice or noise), or silence. +// This implements msdk.PCM16Writer to inspect decoded packet content. +// Used to log changes betweem those states. +type SignalLogger struct { + log logger.Logger + next msdk.PCM16Writer + direction string + isLastSignal int32 + decayAlpha float64 // Weight of previous noise floor when updating silence noise floor. + attackAlpha float64 // Weight of previous noise floor when updating signal noise floor. + signalMultiplier float64 // Threshold multiplier for signal to be detected. + noiseFloor float64 // Moveing average of noise floor. 
+ framesProcessed uint64 +} + +type NewSignalLoggerOption func(*SignalLogger) + +func WithSignalMultiplier(signalMultiplier float64) NewSignalLoggerOption { + return func(s *SignalLogger) { + if signalMultiplier < 0 { + signalMultiplier = -signalMultiplier + } + if signalMultiplier > 1 { + s.signalMultiplier = signalMultiplier + } + } +} + +func WithDecayAlpha(alpha float64) NewSignalLoggerOption { + return func(s *SignalLogger) { + if alpha < 0 { + alpha = -alpha + } + if alpha > 1 { + alpha = 1 + } + s.decayAlpha = alpha + } +} + +func WithAttackAlpha(alpha float64) NewSignalLoggerOption { + return func(s *SignalLogger) { + if alpha < 0 { + alpha = -alpha + } + if alpha > 1 { + alpha = 1 + } + s.attackAlpha = alpha + } +} + +func NewSignalLogger(log logger.Logger, direction string, alpha float64, next msdk.PCM16Writer) *SignalLogger { + s := &SignalLogger{ + log: log, + direction: direction, + next: next, + isLastSignal: 0, + framesProcessed: 0, + signalMultiplier: DefaultSignalMultiplier, + decayAlpha: DefaultDecayAlpha, + attackAlpha: DefaultAttackAlpha, + noiseFloor: 0.0, + } + return s +} + +func (s *SignalLogger) String() string { + return fmt.Sprintf("SignalLogger(%s) -> %s", s.direction, s.next.String()) +} + +func (s *SignalLogger) SampleRate() int { + return s.next.SampleRate() +} + +// Calculates the mean absolute deviation of the frame. +func (s *SignalLogger) MeanAbsoluteDeviation(sample msdk.PCM16Sample) float64 { + if len(sample) == 0 { + return 0 + } + var totalAbs int64 + for _, v := range sample { + if v < 0 { + totalAbs += int64(-v) + } else { + totalAbs += int64(v) + } + } + return float64(totalAbs) / float64(len(sample)) +} + +// Updates the noise floor using moving average. 
+func (s *SignalLogger) UpdateNoiseFloor(currentEnergy float64, isSignal int32) { + alpha := s.attackAlpha + if isSignal == 0 { + alpha = s.decayAlpha + } + s.noiseFloor = (alpha * s.noiseFloor) + ((1 - alpha) * currentEnergy) +} + +func (s *SignalLogger) WriteSample(sample msdk.PCM16Sample) error { + currentEnergy := s.MeanAbsoluteDeviation(sample) + isSignal := int32(0) + if currentEnergy > (s.noiseFloor * s.signalMultiplier) { + isSignal = 1 + } + s.UpdateNoiseFloor(currentEnergy, isSignal) + atomic.AddUint64(&s.framesProcessed, 1) + lastSignal := atomic.SwapInt32(&s.isLastSignal, isSignal) + if lastSignal != isSignal && atomic.LoadUint64(&s.framesProcessed) > 10 { + s.log.Infow("signal changed", "direction", s.direction, "signal", isSignal) + } + return s.next.WriteSample(sample) +} diff --git a/pkg/sip/signal_logger_test.go b/pkg/sip/signal_logger_test.go new file mode 100644 index 00000000..e69de29b From 4583f8fac2f4060e8e17250ec96e045db6d11d57 Mon Sep 17 00:00:00 2001 From: Alex Date: Wed, 7 Jan 2026 00:14:00 -0800 Subject: [PATCH 02/10] Tests are almost robust. Noise floor is still very adaptive and can fail 1/20~ times --- pkg/sip/signal_logger.go | 128 +++++++++++----- pkg/sip/signal_logger_test.go | 268 ++++++++++++++++++++++++++++++++++ 2 files changed, 360 insertions(+), 36 deletions(-) diff --git a/pkg/sip/signal_logger.go b/pkg/sip/signal_logger.go index cccaf8e5..75ba0317 100644 --- a/pkg/sip/signal_logger.go +++ b/pkg/sip/signal_logger.go @@ -1,3 +1,17 @@ +// Copyright 2024 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + package sip import ( @@ -9,9 +23,13 @@ import ( ) const ( - DefaultSignalMultiplier = float64(2) // Singnal needs to be at least 2 times the noise floor to be detected. - DefaultDecayAlpha = float64(0.95) // 5% of new silence is added to the noise floor. - DefaultAttackAlpha = float64(0.999) // 0.1% of new signal is added to the noise floor. + DefaultSignalMultiplier = float64(2) // Singnal needs to be at least 2 times the noise floor to be detected. + DefaultDecayAlpha = float64(0.95) // 5% of new silence is added to the noise floor. + DefaultAttackAlpha = float64(0.999) // 0.1% of new signal is added to the noise floor. + DefaultNoiseFloorMin = float64(20) // Minimum noise floor. Useful when mic changes to avoid false positives. + DefaultNoiseFloorMax = float64(300) // Maximum noise floor. Would always detect signal in very noisy environments, but that's okay. + AlphaMin = float64(0.1) // Minimum alpha. + AlphaMax = float64(0.99999) // Maximum alpha. ) // Keeps an internal state of whether we're currently transmitting signal (voice or noise), or silence. @@ -26,59 +44,85 @@ type SignalLogger struct { attackAlpha float64 // Weight of previous noise floor when updating signal noise floor. signalMultiplier float64 // Threshold multiplier for signal to be detected. noiseFloor float64 // Moveing average of noise floor. + noiseFloorMin float64 // Minimum noise floor. + noiseFloorMax float64 // Maximum noise floor. 
framesProcessed uint64 + stateChanges uint64 } -type NewSignalLoggerOption func(*SignalLogger) +type SignalLoggerOption func(*SignalLogger) error -func WithSignalMultiplier(signalMultiplier float64) NewSignalLoggerOption { - return func(s *SignalLogger) { - if signalMultiplier < 0 { - signalMultiplier = -signalMultiplier - } - if signalMultiplier > 1 { - s.signalMultiplier = signalMultiplier +func WithSignalMultiplier(signalMultiplier float64) SignalLoggerOption { + return func(s *SignalLogger) error { + if signalMultiplier <= 1 { + return fmt.Errorf("signal multiplier must be greater than 1") } + s.signalMultiplier = signalMultiplier + return nil } } -func WithDecayAlpha(alpha float64) NewSignalLoggerOption { - return func(s *SignalLogger) { - if alpha < 0 { - alpha = -alpha - } - if alpha > 1 { - alpha = 1 +func WithDecayAlpha(alpha float64) SignalLoggerOption { + return func(s *SignalLogger) error { + if alpha < AlphaMin || alpha > AlphaMax { + return fmt.Errorf("decay alpha must be between %f and %f", AlphaMin, AlphaMax) } s.decayAlpha = alpha + return nil } } -func WithAttackAlpha(alpha float64) NewSignalLoggerOption { - return func(s *SignalLogger) { - if alpha < 0 { - alpha = -alpha - } - if alpha > 1 { - alpha = 1 +func WithAttackAlpha(alpha float64) SignalLoggerOption { + return func(s *SignalLogger) error { + if alpha < AlphaMin || alpha > AlphaMax { + return fmt.Errorf("attack alpha must be between %f and %f", AlphaMin, AlphaMax) } s.attackAlpha = alpha + return nil + } +} + +func WithNoiseFloorMax(noiseFloorMax float64) SignalLoggerOption { + return func(s *SignalLogger) error { + if noiseFloorMax <= 0 { + return fmt.Errorf("noise floor max must be greater than 0") + } + s.noiseFloorMax = noiseFloorMax + return nil } } -func NewSignalLogger(log logger.Logger, direction string, alpha float64, next msdk.PCM16Writer) *SignalLogger { +func WithNoiseFloorMin(noiseFloorMin float64) SignalLoggerOption { + return func(s *SignalLogger) error { + if noiseFloorMin 
< 0 { + return fmt.Errorf("noise floor min must be non-negative") + } + s.noiseFloorMin = noiseFloorMin + return nil + } +} +func NewSignalLogger(log logger.Logger, direction string, next msdk.PCM16Writer, options ...SignalLoggerOption) (*SignalLogger, error) { s := &SignalLogger{ log: log, direction: direction, next: next, isLastSignal: 0, framesProcessed: 0, + stateChanges: 0, signalMultiplier: DefaultSignalMultiplier, decayAlpha: DefaultDecayAlpha, attackAlpha: DefaultAttackAlpha, + noiseFloorMax: DefaultNoiseFloorMax, + noiseFloorMin: DefaultNoiseFloorMin, noiseFloor: 0.0, } - return s + for _, option := range options { + if err := option(s); err != nil { + return nil, err + } + } + s.noiseFloor = (s.noiseFloorMax + s.noiseFloorMin) / 2 + return s, nil } func (s *SignalLogger) String() string { @@ -106,25 +150,37 @@ func (s *SignalLogger) MeanAbsoluteDeviation(sample msdk.PCM16Sample) float64 { } // Updates the noise floor using moving average. -func (s *SignalLogger) UpdateNoiseFloor(currentEnergy float64, isSignal int32) { - alpha := s.attackAlpha - if isSignal == 0 { - alpha = s.decayAlpha +func (s *SignalLogger) updateNoiseFloor(currentEnergy float64, isSignal int32) { + if s.noiseFloor == 0 { + s.noiseFloor = currentEnergy + } else { + alpha := s.decayAlpha + if isSignal != 0 { + alpha = s.attackAlpha + } + s.noiseFloor = (alpha * s.noiseFloor) + ((1 - alpha) * currentEnergy) } - s.noiseFloor = (alpha * s.noiseFloor) + ((1 - alpha) * currentEnergy) + s.noiseFloor = min(s.noiseFloor, s.noiseFloorMax) + s.noiseFloor = max(s.noiseFloor, s.noiseFloorMin) } func (s *SignalLogger) WriteSample(sample msdk.PCM16Sample) error { currentEnergy := s.MeanAbsoluteDeviation(sample) + lastSignal := atomic.LoadInt32(&s.isLastSignal) + signalMultiplier := s.signalMultiplier + if lastSignal == 1 { + signalMultiplier *= 0.9 // Reduce signal multiplier when last signal was signal, to avoid flip-flopping. 
+ } isSignal := int32(0) - if currentEnergy > (s.noiseFloor * s.signalMultiplier) { + if currentEnergy > (s.noiseFloor * signalMultiplier) { isSignal = 1 } - s.UpdateNoiseFloor(currentEnergy, isSignal) + s.updateNoiseFloor(currentEnergy, isSignal) atomic.AddUint64(&s.framesProcessed, 1) - lastSignal := atomic.SwapInt32(&s.isLastSignal, isSignal) + lastSignal = atomic.SwapInt32(&s.isLastSignal, isSignal) if lastSignal != isSignal && atomic.LoadUint64(&s.framesProcessed) > 10 { - s.log.Infow("signal changed", "direction", s.direction, "signal", isSignal) + stateChanges := atomic.AddUint64(&s.stateChanges, 1) + s.log.Infow("signal changed", "direction", s.direction, "signal", isSignal, "stateChanges", stateChanges) } return s.next.WriteSample(sample) } diff --git a/pkg/sip/signal_logger_test.go b/pkg/sip/signal_logger_test.go index e69de29b..472ff591 100644 --- a/pkg/sip/signal_logger_test.go +++ b/pkg/sip/signal_logger_test.go @@ -0,0 +1,268 @@ +// Copyright 2024 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package sip + +import ( + "fmt" + "math/rand/v2" + "sync/atomic" + "testing" + + msdk "github.com/livekit/media-sdk" + "github.com/livekit/protocol/logger" + "github.com/stretchr/testify/require" +) + +// mockPCM16Writer is a simple mock implementation of PCM16Writer for testing +type mockPCM16Writer struct { + sampleRate int + samples []msdk.PCM16Sample + closed atomic.Bool +} + +func newMockPCM16Writer(sampleRate int) *mockPCM16Writer { + return &mockPCM16Writer{ + sampleRate: sampleRate, + samples: make([]msdk.PCM16Sample, 0), + } +} + +func (m *mockPCM16Writer) String() string { + return "mockPCM16Writer" +} + +func (m *mockPCM16Writer) SampleRate() int { + return m.sampleRate +} + +func (m *mockPCM16Writer) Close() error { + m.closed.Store(true) + return nil +} + +func (m *mockPCM16Writer) WriteSample(sample msdk.PCM16Sample) error { + m.samples = append(m.samples, sample) + return nil +} + +func TestSignalLogger_initialization(t *testing.T) { + log := logger.GetLogger() + next := newMockPCM16Writer(48000) + + t.Run("default initialization", func(t *testing.T) { + sl, err := NewSignalLogger(log, "incoming", next) + require.NoError(t, err) + require.NotNil(t, sl) + require.Equal(t, DefaultSignalMultiplier, sl.signalMultiplier) + require.Equal(t, DefaultDecayAlpha, sl.decayAlpha) + require.Equal(t, DefaultAttackAlpha, sl.attackAlpha) + require.Equal(t, DefaultNoiseFloorMax, sl.noiseFloorMax) + require.Equal(t, DefaultNoiseFloorMin, sl.noiseFloorMin) + }) + + t.Run("with valid options", func(t *testing.T) { + sl, err := NewSignalLogger(log, "incoming", next, WithSignalMultiplier(3.0), WithDecayAlpha(0.9), WithAttackAlpha(0.99), WithNoiseFloorMax(1000), WithNoiseFloorMin(100)) + require.NoError(t, err) + require.NotNil(t, sl) + require.Equal(t, 3.0, sl.signalMultiplier) + require.Equal(t, 0.9, sl.decayAlpha) + require.Equal(t, 0.99, sl.attackAlpha) + require.Equal(t, 1000.0, sl.noiseFloorMax) + require.Equal(t, 100.0, sl.noiseFloorMin) + }) + + t.Run("with 
invalid options", func(t *testing.T) { + _, err := NewSignalLogger(log, "incoming", next, WithSignalMultiplier(0.5)) + require.Error(t, err) + require.Contains(t, err.Error(), "signal multiplier must be greater than 1") + _, err = NewSignalLogger(log, "incoming", next, WithDecayAlpha(0.05)) + require.Error(t, err) + require.Contains(t, err.Error(), "decay alpha must be between") + _, err = NewSignalLogger(log, "incoming", next, WithAttackAlpha(1.0)) + require.Error(t, err) + require.Contains(t, err.Error(), "attack alpha must be between") + _, err = NewSignalLogger(log, "incoming", next, WithNoiseFloorMax(0)) + require.Error(t, err) + require.Contains(t, err.Error(), "noise floor max must be greater than 0") + _, err = NewSignalLogger(log, "incoming", next, WithNoiseFloorMin(-1)) + require.Error(t, err) + require.Contains(t, err.Error(), "noise floor min must be non-negative") + }) +} + +func TestSignalLogger_MeanAbsoluteDeviation(t *testing.T) { + log := logger.GetLogger() + next := newMockPCM16Writer(48000) + sl, err := NewSignalLogger(log, "incoming", next) + require.NoError(t, err) + + tests := []struct { + name string + sample msdk.PCM16Sample + expected float64 + }{ + { + name: "empty sample", + sample: msdk.PCM16Sample{}, + expected: 0.0, + }, + { + name: "all zeros", + sample: msdk.PCM16Sample{0, 0, 0, 0}, + expected: 0.0, + }, + { + name: "all positive", + sample: msdk.PCM16Sample{100, 200, 300, 400}, + expected: 250.0, + }, + { + name: "all negative", + sample: msdk.PCM16Sample{-100, -200, -300, -400}, + expected: 250.0, + }, + { + name: "mixed positive and negative", + sample: msdk.PCM16Sample{-100, 200, -300, 400}, + expected: 250.0, + }, + { + name: "single value", + sample: msdk.PCM16Sample{1000}, + expected: 1000.0, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := sl.MeanAbsoluteDeviation(tt.sample) + require.InDelta(t, tt.expected, result, 0.01) + }) + } +} + +func TestSignalLogger_WriteSample(t *testing.T) { 
+ log := logger.GetLogger() + + createFrame := func(size int, amplitude int16) msdk.PCM16Sample { + frame := make(msdk.PCM16Sample, size) + for i := range frame { + frame[i] = amplitude + } + return frame + } + + silenceFrames := make([]msdk.PCM16Sample, 100) + for i := range silenceFrames { + amplitude := int16(rand.Uint32()) % 50 + silenceFrames[i] = createFrame(480, amplitude) + } + + signalFrames := make([]msdk.PCM16Sample, 100) + for i := range signalFrames { + amplitude := (int16(rand.Uint32()) % 1000) + if amplitude < 0 { + amplitude -= int16(DefaultNoiseFloorMax) + } else { + amplitude += int16(DefaultNoiseFloorMax) + } + signalFrames[i] = createFrame(480, amplitude) + } + + writer := func(t *testing.T, sl *SignalLogger, array []msdk.PCM16Sample, count int) error { + for i := 0; i < count; i++ { + randIndex := rand.Uint() % uint(len(array)) + if err := sl.WriteSample(array[randIndex]); err != nil { + return err + } + t.Logf("%d written sample %d/%d: amplitude %d, noise floor %f, state changes %d, last signal %d\n", sl.framesProcessed, randIndex, len(array), array[randIndex][0], sl.noiseFloor, sl.stateChanges, sl.isLastSignal) + } + return nil + } + + newTestLogger := func(t *testing.T, opts ...SignalLoggerOption) (*SignalLogger, *mockPCM16Writer) { + next := newMockPCM16Writer(48000) + sl, err := NewSignalLogger(log, "incoming", next, opts...) + require.NoError(t, err) + return sl, next + } + + testTransition := func(t *testing.T, first, second []msdk.PCM16Sample, firstCount, secondCount int, opts ...SignalLoggerOption) *SignalLogger { + sl, _ := newTestLogger(t, opts...) 
+ require.NoError(t, writer(t, sl, first, firstCount)) + require.Equal(t, uint64(firstCount), sl.framesProcessed) + require.NoError(t, writer(t, sl, second, secondCount)) + require.Equal(t, uint64(firstCount+secondCount), sl.framesProcessed) + return sl + } + + t.Run("not_printing_on_first_10_frames", func(t *testing.T) { + sl, _ := newTestLogger(t) + sl.noiseFloor = 40 + + require.NoError(t, writer(t, sl, silenceFrames, 5)) + require.Equal(t, int32(0), sl.isLastSignal) + require.NoError(t, writer(t, sl, signalFrames, 3)) + require.Equal(t, int32(1), sl.isLastSignal) + require.NoError(t, writer(t, sl, silenceFrames, 2)) + require.Equal(t, int32(0), sl.isLastSignal) + require.Equal(t, uint64(10), sl.framesProcessed) + require.Equal(t, uint64(0), sl.stateChanges) + }) + + t.Run("printing_on_11th_frame_transition", func(t *testing.T) { + t.Run("silence_to_signal", func(t *testing.T) { + sl := testTransition(t, silenceFrames, signalFrames, 10, 1) + require.Equal(t, uint64(1), sl.stateChanges) + }) + t.Run("signal_to_silence", func(t *testing.T) { + sl := testTransition(t, signalFrames, silenceFrames, 10, 1) + require.Equal(t, uint64(1), sl.stateChanges) + }) + }) + + t.Run("silence_to_silence_transitions", func(t *testing.T) { + sl := testTransition(t, silenceFrames, silenceFrames, 10, 0) // Not too long, it will eventually bring noise floor low enough + require.Equal(t, uint64(0), sl.stateChanges) + require.Equal(t, int32(0), sl.isLastSignal) + }) + + t.Run("signal_to_signal_transitions", func(t *testing.T) { + sl := testTransition(t, signalFrames, signalFrames, 10, 0, WithNoiseFloorMax(200)) + require.Equal(t, uint64(0), sl.stateChanges) + require.Equal(t, int32(1), sl.isLastSignal) + }) + + t.Run("silence_to_signal_transitions", func(t *testing.T) { + for i := 0; i < 100; i++ { + t.Run(fmt.Sprintf("silence_to_signal_transition_%d", i), func(t *testing.T) { + sl := testTransition(t, silenceFrames, signalFrames, 10, 1) + require.Equal(t, uint64(1), sl.stateChanges) + 
require.Equal(t, int32(1), sl.isLastSignal) + }) + } + }) + + t.Run("signal_to_silence_transitions", func(t *testing.T) { + for i := 0; i < 100; i++ { + t.Run(fmt.Sprintf("signal_to_silence_transition_%d", i), func(t *testing.T) { + sl := testTransition(t, signalFrames, silenceFrames, 10, 1) + require.Equal(t, uint64(1), sl.stateChanges) + require.Equal(t, int32(0), sl.isLastSignal) + }) + } + }) + +} From 6af8ce9187744e2c9175587da5c3203e92bf22f9 Mon Sep 17 00:00:00 2001 From: Alex Date: Wed, 7 Jan 2026 00:17:12 -0800 Subject: [PATCH 03/10] self review --- pkg/sip/signal_logger.go | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/pkg/sip/signal_logger.go b/pkg/sip/signal_logger.go index 75ba0317..865e92eb 100644 --- a/pkg/sip/signal_logger.go +++ b/pkg/sip/signal_logger.go @@ -26,7 +26,7 @@ const ( DefaultSignalMultiplier = float64(2) // Singnal needs to be at least 2 times the noise floor to be detected. DefaultDecayAlpha = float64(0.95) // 5% of new silence is added to the noise floor. DefaultAttackAlpha = float64(0.999) // 0.1% of new signal is added to the noise floor. - DefaultNoiseFloorMin = float64(20) // Minimum noise floor. Useful when mic changes to avoid false positives. + DefaultNoiseFloorMin = float64(30) // Minimum noise floor. Useful when mic changes to avoid false positives. DefaultNoiseFloorMax = float64(300) // Maximum noise floor. Would always detect signal in very noisy environments, but that's okay. AlphaMin = float64(0.1) // Minimum alpha. AlphaMax = float64(0.99999) // Maximum alpha. @@ -151,15 +151,11 @@ func (s *SignalLogger) MeanAbsoluteDeviation(sample msdk.PCM16Sample) float64 { // Updates the noise floor using moving average. 
func (s *SignalLogger) updateNoiseFloor(currentEnergy float64, isSignal int32) { - if s.noiseFloor == 0 { - s.noiseFloor = currentEnergy - } else { - alpha := s.decayAlpha - if isSignal != 0 { - alpha = s.attackAlpha - } - s.noiseFloor = (alpha * s.noiseFloor) + ((1 - alpha) * currentEnergy) + alpha := s.decayAlpha + if isSignal != 0 { + alpha = s.attackAlpha } + s.noiseFloor = (alpha * s.noiseFloor) + ((1 - alpha) * currentEnergy) s.noiseFloor = min(s.noiseFloor, s.noiseFloorMax) s.noiseFloor = max(s.noiseFloor, s.noiseFloorMin) } From 2875d70212c2c42208b4e851833c0adcc9dc9c54 Mon Sep 17 00:00:00 2001 From: Alex Date: Wed, 7 Jan 2026 00:25:23 -0800 Subject: [PATCH 04/10] plumbing part1 --- pkg/sip/inbound.go | 1 + pkg/sip/media_port.go | 17 +++++++++++++++++ pkg/sip/outbound.go | 1 + pkg/sip/signal_logger.go | 4 ++++ 4 files changed, 23 insertions(+) diff --git a/pkg/sip/inbound.go b/pkg/sip/inbound.go index 41fe9e5a..d6cd0b4d 100644 --- a/pkg/sip/inbound.go +++ b/pkg/sip/inbound.go @@ -956,6 +956,7 @@ func (c *inboundCall) runMediaConn(tid traceid.ID, offerData []byte, enc livekit MediaTimeoutInitial: c.s.conf.MediaTimeoutInitial, MediaTimeout: c.s.conf.MediaTimeout, EnableJitterBuffer: c.jitterBuf, + LogSignalChanges: true, // TODO: Replace with per-project setting Stats: &c.stats.Port, NoInputResample: !RoomResample, }, RoomSampleRate) diff --git a/pkg/sip/media_port.go b/pkg/sip/media_port.go index 808caca6..ca411115 100644 --- a/pkg/sip/media_port.go +++ b/pkg/sip/media_port.go @@ -240,6 +240,7 @@ type MediaOptions struct { Stats *PortStats EnableJitterBuffer bool NoInputResample bool + LogSignalChanges bool } func NewMediaPort(tid traceid.ID, log logger.Logger, mon *stats.CallMonitor, opts *MediaOptions, sampleRate int) (*MediaPort, error) { @@ -280,6 +281,7 @@ func NewMediaPortWith(tid traceid.ID, log logger.Logger, mon *stats.CallMonitor, mediaTimeout: mediaTimeout, timeoutResetTick: make(chan time.Duration, 1), jitterEnabled: opts.EnableJitterBuffer, + 
logSignalChanges: opts.LogSignalChanges, port: newUDPConn(log, conn), audioOut: msdk.NewSwitchWriter(sampleRate), audioIn: msdk.NewSwitchWriter(inSampleRate), @@ -313,6 +315,7 @@ type MediaPort struct { stats *PortStats dtmfAudioEnabled bool jitterEnabled bool + logSignalChanges bool mu sync.Mutex conf *MediaConf @@ -722,6 +725,12 @@ func (p *MediaPort) setupOutput(tid traceid.ID) error { if p.stats != nil { audioOut = newMediaWriterCount(audioOut, &p.stats.AudioOutFrames, &p.stats.AudioOutSamples) } + if p.logSignalChanges { + audioOut, err = NewSignalLogger(p.log, "output", audioOut) + if err != nil { + return err + } + } if p.conf.Audio.DTMFType != 0 { p.dtmfOutRTP = s.NewStream(p.conf.Audio.DTMFType, dtmf.SampleRate) @@ -755,6 +764,14 @@ func (p *MediaPort) setupInput() { if p.stats != nil { audioWriter = newMediaWriterCount(audioWriter, &p.stats.AudioInFrames, &p.stats.AudioInSamples) } + if p.logSignalChanges { + signalLogger, err := NewSignalLogger(p.log, "input", audioWriter) + if err != nil { + p.log.Errorw("failed to create signal logger", err) + } else { + audioWriter = signalLogger + } + } audioHandler := p.conf.Audio.Codec.DecodeRTP(audioWriter, p.conf.Audio.Type) // Wrap the decoder with silence suppression handler to fill gaps during silence suppression audioHandler = newSilenceFiller(audioHandler, audioWriter, codecInfo.RTPClockRate, codecInfo.SampleRate, p.log) diff --git a/pkg/sip/outbound.go b/pkg/sip/outbound.go index 40d6dc32..1ca81369 100644 --- a/pkg/sip/outbound.go +++ b/pkg/sip/outbound.go @@ -142,6 +142,7 @@ func (c *Client) newCall(ctx context.Context, tid traceid.ID, conf *config.Confi MediaTimeoutInitial: c.conf.MediaTimeoutInitial, MediaTimeout: c.conf.MediaTimeout, EnableJitterBuffer: call.jitterBuf, + LogSignalChanges: true, // TODO: Replace with per-project setting Stats: &call.stats.Port, NoInputResample: !RoomResample, }, RoomSampleRate) diff --git a/pkg/sip/signal_logger.go b/pkg/sip/signal_logger.go index 865e92eb..2c8d0977 
100644 --- a/pkg/sip/signal_logger.go +++ b/pkg/sip/signal_logger.go @@ -133,6 +133,10 @@ func (s *SignalLogger) SampleRate() int { return s.next.SampleRate() } +func (s *SignalLogger) Close() error { + return s.next.Close() +} + // Calculates the mean absolute deviation of the frame. func (s *SignalLogger) MeanAbsoluteDeviation(sample msdk.PCM16Sample) float64 { if len(sample) == 0 { From b7ae9c8dfb14477e9674198fba668bf6adc4a23e Mon Sep 17 00:00:00 2001 From: Alex Date: Wed, 7 Jan 2026 01:09:40 -0800 Subject: [PATCH 05/10] Adding closing print --- pkg/sip/signal_logger.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/sip/signal_logger.go b/pkg/sip/signal_logger.go index 2c8d0977..cb507a57 100644 --- a/pkg/sip/signal_logger.go +++ b/pkg/sip/signal_logger.go @@ -134,6 +134,10 @@ func (s *SignalLogger) SampleRate() int { } func (s *SignalLogger) Close() error { + stateChanges := atomic.AddUint64(&s.stateChanges, 1) + if stateChanges > 0 { + s.log.Infow("signal logger closing", "direction", s.direction, "stateChanges", stateChanges) + } return s.next.Close() } From 0d7080a293d00b6238029ecbc92066dea5fc21df Mon Sep 17 00:00:00 2001 From: Alex Date: Wed, 7 Jan 2026 11:00:19 -0800 Subject: [PATCH 06/10] Robust tests, adjusted max noise floor --- pkg/sip/signal_logger.go | 2 +- pkg/sip/signal_logger_test.go | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/pkg/sip/signal_logger.go b/pkg/sip/signal_logger.go index cb507a57..fcb0dd80 100644 --- a/pkg/sip/signal_logger.go +++ b/pkg/sip/signal_logger.go @@ -27,7 +27,7 @@ const ( DefaultDecayAlpha = float64(0.95) // 5% of new silence is added to the noise floor. DefaultAttackAlpha = float64(0.999) // 0.1% of new signal is added to the noise floor. DefaultNoiseFloorMin = float64(30) // Minimum noise floor. Useful when mic changes to avoid false positives. - DefaultNoiseFloorMax = float64(300) // Maximum noise floor. Would always detect signal in very noisy environments, but that's okay. 
+ DefaultNoiseFloorMax = float64(200) // Maximum noise floor. If an environment is noisy, we still consider that as signal. AlphaMin = float64(0.1) // Minimum alpha. AlphaMax = float64(0.99999) // Maximum alpha. ) diff --git a/pkg/sip/signal_logger_test.go b/pkg/sip/signal_logger_test.go index 472ff591..e82c71f7 100644 --- a/pkg/sip/signal_logger_test.go +++ b/pkg/sip/signal_logger_test.go @@ -173,10 +173,11 @@ func TestSignalLogger_WriteSample(t *testing.T) { signalFrames := make([]msdk.PCM16Sample, 100) for i := range signalFrames { amplitude := (int16(rand.Uint32()) % 1000) + // Adding (DefaultNoiseFloorMax * DefaultSignalMultiplier) prevents signal from being detected as silence when loud for too long. if amplitude < 0 { - amplitude -= int16(DefaultNoiseFloorMax) + amplitude -= int16(DefaultNoiseFloorMax * DefaultSignalMultiplier) } else { - amplitude += int16(DefaultNoiseFloorMax) + amplitude += int16(DefaultNoiseFloorMax * DefaultSignalMultiplier) } signalFrames[i] = createFrame(480, amplitude) } From a862f1e3ca70f058ea388446d62c46e4a67b5641 Mon Sep 17 00:00:00 2001 From: Alex Date: Thu, 8 Jan 2026 21:44:39 -0800 Subject: [PATCH 07/10] change to name --- pkg/sip/signal_logger.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/sip/signal_logger.go b/pkg/sip/signal_logger.go index fcb0dd80..64867541 100644 --- a/pkg/sip/signal_logger.go +++ b/pkg/sip/signal_logger.go @@ -38,7 +38,7 @@ const ( type SignalLogger struct { log logger.Logger next msdk.PCM16Writer - direction string + name string isLastSignal int32 decayAlpha float64 // Weight of previous noise floor when updating silence noise floor. attackAlpha float64 // Weight of previous noise floor when updating signal noise floor. 
@@ -101,10 +101,10 @@ func WithNoiseFloorMin(noiseFloorMin float64) SignalLoggerOption { return nil } } -func NewSignalLogger(log logger.Logger, direction string, next msdk.PCM16Writer, options ...SignalLoggerOption) (*SignalLogger, error) { +func NewSignalLogger(log logger.Logger, name string, next msdk.PCM16Writer, options ...SignalLoggerOption) (*SignalLogger, error) { s := &SignalLogger{ log: log, - direction: direction, + name: name, next: next, isLastSignal: 0, framesProcessed: 0, @@ -126,7 +126,7 @@ func NewSignalLogger(log logger.Logger, direction string, next msdk.PCM16Writer, } func (s *SignalLogger) String() string { - return fmt.Sprintf("SignalLogger(%s) -> %s", s.direction, s.next.String()) + return fmt.Sprintf("SignalLogger(%s) -> %s", s.name, s.next.String()) } func (s *SignalLogger) SampleRate() int { @@ -136,7 +136,7 @@ func (s *SignalLogger) SampleRate() int { func (s *SignalLogger) Close() error { stateChanges := atomic.AddUint64(&s.stateChanges, 1) if stateChanges > 0 { - s.log.Infow("signal logger closing", "direction", s.direction, "stateChanges", stateChanges) + s.log.Infow("signal logger closing", "name", s.name, "stateChanges", stateChanges) } return s.next.Close() } From 5c0bcdd7a1bed982a981ed90fbdf0513ad579aa3 Mon Sep 17 00:00:00 2001 From: Alex Date: Thu, 8 Jan 2026 22:19:07 -0800 Subject: [PATCH 08/10] Removing advanced features. These are in signal-logger-adaptive. 
We don't need this at this time --- pkg/sip/signal_logger.go | 135 +++++++++++++------------------ pkg/sip/signal_logger_test.go | 145 +++++++++++++++++----------------- 2 files changed, 124 insertions(+), 156 deletions(-) diff --git a/pkg/sip/signal_logger.go b/pkg/sip/signal_logger.go index 64867541..0ad5732b 100644 --- a/pkg/sip/signal_logger.go +++ b/pkg/sip/signal_logger.go @@ -16,38 +16,37 @@ package sip import ( "fmt" - "sync/atomic" + "time" msdk "github.com/livekit/media-sdk" "github.com/livekit/protocol/logger" ) const ( - DefaultSignalMultiplier = float64(2) // Singnal needs to be at least 2 times the noise floor to be detected. - DefaultDecayAlpha = float64(0.95) // 5% of new silence is added to the noise floor. - DefaultAttackAlpha = float64(0.999) // 0.1% of new signal is added to the noise floor. - DefaultNoiseFloorMin = float64(30) // Minimum noise floor. Useful when mic changes to avoid false positives. - DefaultNoiseFloorMax = float64(200) // Maximum noise floor. If an environment is noisy, we still consider that as signal. - AlphaMin = float64(0.1) // Minimum alpha. - AlphaMax = float64(0.99999) // Maximum alpha. + DefaultSignalMultiplier = float64(2) // Singnal needs to be at least 2 times the noise floor to be detected. + DefaultNoiseFloor = float64(25) // Use a low noise floor; The purpose is to detect any kind of signal, not just voice. + DefaultHangoverDuration = 1 * time.Second // Duration of silence that needs to elapse before we no longer consider it a signal. ) // Keeps an internal state of whether we're currently transmitting signal (voice or noise), or silence. // This implements msdk.PCM16Writer to inspect decoded packet content. // Used to log changes betweem those states. type SignalLogger struct { + // Configuration log logger.Logger next msdk.PCM16Writer name string - isLastSignal int32 - decayAlpha float64 // Weight of previous noise floor when updating silence noise floor. 
- attackAlpha float64 // Weight of previous noise floor when updating signal noise floor. - signalMultiplier float64 // Threshold multiplier for signal to be detected. - noiseFloor float64 // Moveing average of noise floor. - noiseFloorMin float64 // Minimum noise floor. - noiseFloorMax float64 // Maximum noise floor. - framesProcessed uint64 - stateChanges uint64 + hangoverDuration time.Duration // Time to stay in "signal" state after last signal to avoid flip-flopping. + signalMultiplier float64 // Threshold multiplier for signal to be detected. + noiseFloor float64 // Moveing average of noise floor. + + // State + lastSignalTime time.Time + lastIsSignal bool + + // Stats + framesProcessed uint64 + stateChanges uint64 } type SignalLoggerOption func(*SignalLogger) error @@ -62,66 +61,43 @@ func WithSignalMultiplier(signalMultiplier float64) SignalLoggerOption { } } -func WithDecayAlpha(alpha float64) SignalLoggerOption { +func WithNoiseFloor(noiseFloor float64) SignalLoggerOption { return func(s *SignalLogger) error { - if alpha < AlphaMin || alpha > AlphaMax { - return fmt.Errorf("decay alpha must be between %f and %f", AlphaMin, AlphaMax) + if noiseFloor <= 0 { + return fmt.Errorf("noise floor min must be positive") } - s.decayAlpha = alpha + s.noiseFloor = noiseFloor return nil } } -func WithAttackAlpha(alpha float64) SignalLoggerOption { +func WithHangoverDuration(hangoverDuration time.Duration) SignalLoggerOption { return func(s *SignalLogger) error { - if alpha < AlphaMin || alpha > AlphaMax { - return fmt.Errorf("attack alpha must be between %f and %f", AlphaMin, AlphaMax) + if hangoverDuration <= 0 { + return fmt.Errorf("hangover duration must be positive") } - s.attackAlpha = alpha - return nil - } -} - -func WithNoiseFloorMax(noiseFloorMax float64) SignalLoggerOption { - return func(s *SignalLogger) error { - if noiseFloorMax <= 0 { - return fmt.Errorf("noise floor max must be greater than 0") - } - s.noiseFloorMax = noiseFloorMax - return nil - } -} - 
-func WithNoiseFloorMin(noiseFloorMin float64) SignalLoggerOption { - return func(s *SignalLogger) error { - if noiseFloorMin < 0 { - return fmt.Errorf("noise floor min must be non-negative") - } - s.noiseFloorMin = noiseFloorMin + s.hangoverDuration = hangoverDuration return nil } } func NewSignalLogger(log logger.Logger, name string, next msdk.PCM16Writer, options ...SignalLoggerOption) (*SignalLogger, error) { s := &SignalLogger{ log: log, - name: name, next: next, - isLastSignal: 0, + name: name, + hangoverDuration: DefaultHangoverDuration, + signalMultiplier: DefaultSignalMultiplier, + noiseFloor: DefaultNoiseFloor, + lastSignalTime: time.Time{}, + lastIsSignal: false, framesProcessed: 0, stateChanges: 0, - signalMultiplier: DefaultSignalMultiplier, - decayAlpha: DefaultDecayAlpha, - attackAlpha: DefaultAttackAlpha, - noiseFloorMax: DefaultNoiseFloorMax, - noiseFloorMin: DefaultNoiseFloorMin, - noiseFloor: 0.0, } for _, option := range options { if err := option(s); err != nil { return nil, err } } - s.noiseFloor = (s.noiseFloorMax + s.noiseFloorMin) / 2 return s, nil } @@ -134,9 +110,8 @@ func (s *SignalLogger) SampleRate() int { } func (s *SignalLogger) Close() error { - stateChanges := atomic.AddUint64(&s.stateChanges, 1) - if stateChanges > 0 { - s.log.Infow("signal logger closing", "name", s.name, "stateChanges", stateChanges) + if s.stateChanges > 0 { + s.log.Infow("signal logger closing", "name", s.name, "stateChanges", s.stateChanges) } return s.next.Close() } @@ -157,34 +132,30 @@ func (s *SignalLogger) MeanAbsoluteDeviation(sample msdk.PCM16Sample) float64 { return float64(totalAbs) / float64(len(sample)) } -// Updates the noise floor using moving average. 
-func (s *SignalLogger) updateNoiseFloor(currentEnergy float64, isSignal int32) { - alpha := s.decayAlpha - if isSignal != 0 { - alpha = s.attackAlpha - } - s.noiseFloor = (alpha * s.noiseFloor) + ((1 - alpha) * currentEnergy) - s.noiseFloor = min(s.noiseFloor, s.noiseFloorMax) - s.noiseFloor = max(s.noiseFloor, s.noiseFloorMin) -} - func (s *SignalLogger) WriteSample(sample msdk.PCM16Sample) error { currentEnergy := s.MeanAbsoluteDeviation(sample) - lastSignal := atomic.LoadInt32(&s.isLastSignal) - signalMultiplier := s.signalMultiplier - if lastSignal == 1 { - signalMultiplier *= 0.9 // Reduce signal multiplier when last signal was signal, to avoid flip-flopping. - } - isSignal := int32(0) - if currentEnergy > (s.noiseFloor * signalMultiplier) { - isSignal = 1 + + now := time.Now() + isSignal := currentEnergy > (s.noiseFloor * s.signalMultiplier) + if isSignal { + s.lastSignalTime = now } - s.updateNoiseFloor(currentEnergy, isSignal) - atomic.AddUint64(&s.framesProcessed, 1) - lastSignal = atomic.SwapInt32(&s.isLastSignal, isSignal) - if lastSignal != isSignal && atomic.LoadUint64(&s.framesProcessed) > 10 { - stateChanges := atomic.AddUint64(&s.stateChanges, 1) - s.log.Infow("signal changed", "direction", s.direction, "signal", isSignal, "stateChanges", stateChanges) + + s.framesProcessed++ + if s.framesProcessed > 10 { // Don't log any changes at first + if isSignal && isSignal != s.lastIsSignal { // silence -> signal: Immediate transition + s.lastIsSignal = true + s.stateChanges++ + s.log.Infow("signal changed", "name", s.name, "signal", isSignal, "stateChanges", s.stateChanges) + } else if !isSignal && isSignal != s.lastIsSignal { // signal -> silence: Only after hangover + if now.Sub(s.lastSignalTime) >= s.hangoverDuration { + s.lastIsSignal = false + s.stateChanges++ + s.log.Infow("signal changed", "name", s.name, "signal", isSignal, "stateChanges", s.stateChanges) + } + } + } else { + s.lastIsSignal = isSignal } return s.next.WriteSample(sample) } diff 
--git a/pkg/sip/signal_logger_test.go b/pkg/sip/signal_logger_test.go index e82c71f7..7d80942c 100644 --- a/pkg/sip/signal_logger_test.go +++ b/pkg/sip/signal_logger_test.go @@ -19,8 +19,11 @@ import ( "math/rand/v2" "sync/atomic" "testing" + "testing/synctest" + "time" msdk "github.com/livekit/media-sdk" + "github.com/livekit/media-sdk/rtp" "github.com/livekit/protocol/logger" "github.com/stretchr/testify/require" ) @@ -66,39 +69,29 @@ func TestSignalLogger_initialization(t *testing.T) { require.NoError(t, err) require.NotNil(t, sl) require.Equal(t, DefaultSignalMultiplier, sl.signalMultiplier) - require.Equal(t, DefaultDecayAlpha, sl.decayAlpha) - require.Equal(t, DefaultAttackAlpha, sl.attackAlpha) - require.Equal(t, DefaultNoiseFloorMax, sl.noiseFloorMax) - require.Equal(t, DefaultNoiseFloorMin, sl.noiseFloorMin) + require.Equal(t, DefaultNoiseFloor, sl.noiseFloor) + require.Equal(t, DefaultHangoverDuration, sl.hangoverDuration) }) t.Run("with valid options", func(t *testing.T) { - sl, err := NewSignalLogger(log, "incoming", next, WithSignalMultiplier(3.0), WithDecayAlpha(0.9), WithAttackAlpha(0.99), WithNoiseFloorMax(1000), WithNoiseFloorMin(100)) + sl, err := NewSignalLogger(log, "incoming", next, WithSignalMultiplier(3.0), WithNoiseFloor(100), WithHangoverDuration(time.Second)) require.NoError(t, err) require.NotNil(t, sl) require.Equal(t, 3.0, sl.signalMultiplier) - require.Equal(t, 0.9, sl.decayAlpha) - require.Equal(t, 0.99, sl.attackAlpha) - require.Equal(t, 1000.0, sl.noiseFloorMax) - require.Equal(t, 100.0, sl.noiseFloorMin) + require.Equal(t, 100.0, sl.noiseFloor) + require.Equal(t, time.Second, sl.hangoverDuration) }) t.Run("with invalid options", func(t *testing.T) { _, err := NewSignalLogger(log, "incoming", next, WithSignalMultiplier(0.5)) require.Error(t, err) require.Contains(t, err.Error(), "signal multiplier must be greater than 1") - _, err = NewSignalLogger(log, "incoming", next, WithDecayAlpha(0.05)) + _, err = NewSignalLogger(log, 
"incoming", next, WithNoiseFloor(0)) require.Error(t, err) - require.Contains(t, err.Error(), "decay alpha must be between") - _, err = NewSignalLogger(log, "incoming", next, WithAttackAlpha(1.0)) + require.Contains(t, err.Error(), "noise floor min must be positive") + _, err = NewSignalLogger(log, "incoming", next, WithHangoverDuration(-time.Second)) require.Error(t, err) - require.Contains(t, err.Error(), "attack alpha must be between") - _, err = NewSignalLogger(log, "incoming", next, WithNoiseFloorMax(0)) - require.Error(t, err) - require.Contains(t, err.Error(), "noise floor max must be greater than 0") - _, err = NewSignalLogger(log, "incoming", next, WithNoiseFloorMin(-1)) - require.Error(t, err) - require.Contains(t, err.Error(), "noise floor min must be non-negative") + require.Contains(t, err.Error(), "hangover duration must be positive") }) } @@ -153,20 +146,50 @@ func TestSignalLogger_MeanAbsoluteDeviation(t *testing.T) { } } -func TestSignalLogger_WriteSample(t *testing.T) { - log := logger.GetLogger() +func newTestLogger(t *testing.T, opts ...SignalLoggerOption) (*SignalLogger, *mockPCM16Writer) { + next := newMockPCM16Writer(48000) + sl, err := NewSignalLogger(logger.GetLogger(), "incoming", next, opts...) 
+ require.NoError(t, err) + return sl, next +} - createFrame := func(size int, amplitude int16) msdk.PCM16Sample { - frame := make(msdk.PCM16Sample, size) - for i := range frame { - frame[i] = amplitude +func writer(t *testing.T, sl *SignalLogger, array []msdk.PCM16Sample, count int, wait bool) error { + for i := 0; i < count; i++ { + randIndex := rand.Uint() % uint(len(array)) + if err := sl.WriteSample(array[randIndex]); err != nil { + return err + } + since := time.Since(sl.lastSignalTime).Milliseconds() + t.Logf("%d written sample %d/%d: amplitude %d, noise floor %f, state changes %d, last signal %t (%dms ago)\n", sl.framesProcessed, randIndex, len(array), array[randIndex][0], sl.noiseFloor, sl.stateChanges, sl.lastIsSignal, since) + if wait { + time.Sleep(rtp.DefFrameDur) } - return frame } + return nil +} + +func testTransition(t *testing.T, first, second []msdk.PCM16Sample, firstCount, secondCount int, wait bool, opts ...SignalLoggerOption) *SignalLogger { + sl, _ := newTestLogger(t, opts...) + require.NoError(t, writer(t, sl, first, firstCount, wait)) + require.Equal(t, uint64(firstCount), sl.framesProcessed) + require.NoError(t, writer(t, sl, second, secondCount, wait)) + require.Equal(t, uint64(firstCount+secondCount), sl.framesProcessed) + return sl +} + +func createFrame(size int, amplitude int16) msdk.PCM16Sample { + frame := make(msdk.PCM16Sample, size) + for i := range frame { + frame[i] = amplitude + } + return frame +} + +func TestSignalLogger_WriteSample(t *testing.T) { silenceFrames := make([]msdk.PCM16Sample, 100) for i := range silenceFrames { - amplitude := int16(rand.Uint32()) % 50 + amplitude := int16(rand.Uint32()) % int16(DefaultNoiseFloor) silenceFrames[i] = createFrame(480, amplitude) } @@ -175,83 +198,55 @@ func TestSignalLogger_WriteSample(t *testing.T) { amplitude := (int16(rand.Uint32()) % 1000) // Adding (DefaultNoiseFloorMax * DefaultSignalMultiplier) prevents signal from being detected as silence when loud for too long. 
if amplitude < 0 { - amplitude -= int16(DefaultNoiseFloorMax * DefaultSignalMultiplier) + amplitude -= int16(DefaultNoiseFloor * DefaultSignalMultiplier) } else { - amplitude += int16(DefaultNoiseFloorMax * DefaultSignalMultiplier) + amplitude += int16(DefaultNoiseFloor * DefaultSignalMultiplier) } signalFrames[i] = createFrame(480, amplitude) } - writer := func(t *testing.T, sl *SignalLogger, array []msdk.PCM16Sample, count int) error { - for i := 0; i < count; i++ { - randIndex := rand.Uint() % uint(len(array)) - if err := sl.WriteSample(array[randIndex]); err != nil { - return err - } - t.Logf("%d written sample %d/%d: amplitude %d, noise floor %f, state changes %d, last signal %d\n", sl.framesProcessed, randIndex, len(array), array[randIndex][0], sl.noiseFloor, sl.stateChanges, sl.isLastSignal) - } - return nil - } - - newTestLogger := func(t *testing.T, opts ...SignalLoggerOption) (*SignalLogger, *mockPCM16Writer) { - next := newMockPCM16Writer(48000) - sl, err := NewSignalLogger(log, "incoming", next, opts...) - require.NoError(t, err) - return sl, next - } - - testTransition := func(t *testing.T, first, second []msdk.PCM16Sample, firstCount, secondCount int, opts ...SignalLoggerOption) *SignalLogger { - sl, _ := newTestLogger(t, opts...) 
- require.NoError(t, writer(t, sl, first, firstCount)) - require.Equal(t, uint64(firstCount), sl.framesProcessed) - require.NoError(t, writer(t, sl, second, secondCount)) - require.Equal(t, uint64(firstCount+secondCount), sl.framesProcessed) - return sl - } - t.Run("not_printing_on_first_10_frames", func(t *testing.T) { sl, _ := newTestLogger(t) sl.noiseFloor = 40 - require.NoError(t, writer(t, sl, silenceFrames, 5)) - require.Equal(t, int32(0), sl.isLastSignal) - require.NoError(t, writer(t, sl, signalFrames, 3)) - require.Equal(t, int32(1), sl.isLastSignal) - require.NoError(t, writer(t, sl, silenceFrames, 2)) - require.Equal(t, int32(0), sl.isLastSignal) + require.NoError(t, writer(t, sl, silenceFrames, 5, false)) + require.NoError(t, writer(t, sl, signalFrames, 3, false)) + require.NoError(t, writer(t, sl, silenceFrames, 2, false)) require.Equal(t, uint64(10), sl.framesProcessed) require.Equal(t, uint64(0), sl.stateChanges) }) t.Run("printing_on_11th_frame_transition", func(t *testing.T) { t.Run("silence_to_signal", func(t *testing.T) { - sl := testTransition(t, silenceFrames, signalFrames, 10, 1) + sl := testTransition(t, silenceFrames, signalFrames, 10, 1, false) require.Equal(t, uint64(1), sl.stateChanges) }) t.Run("signal_to_silence", func(t *testing.T) { - sl := testTransition(t, signalFrames, silenceFrames, 10, 1) - require.Equal(t, uint64(1), sl.stateChanges) + synctest.Test(t, func(t *testing.T) { + sl := testTransition(t, signalFrames, silenceFrames, 10, 50, true) + require.Equal(t, uint64(1), sl.stateChanges) + }) }) }) t.Run("silence_to_silence_transitions", func(t *testing.T) { - sl := testTransition(t, silenceFrames, silenceFrames, 10, 0) // Not too long, it will eventually bring noise floor low enough + sl := testTransition(t, silenceFrames, silenceFrames, 10, 0, false) // Not too long, it will eventually bring noise floor low enough require.Equal(t, uint64(0), sl.stateChanges) - require.Equal(t, int32(0), sl.isLastSignal) + require.Equal(t, 
false, sl.lastIsSignal) }) t.Run("signal_to_signal_transitions", func(t *testing.T) { - sl := testTransition(t, signalFrames, signalFrames, 10, 0, WithNoiseFloorMax(200)) + sl := testTransition(t, signalFrames, signalFrames, 10, 0, false) require.Equal(t, uint64(0), sl.stateChanges) - require.Equal(t, int32(1), sl.isLastSignal) + require.Equal(t, true, sl.lastIsSignal) }) t.Run("silence_to_signal_transitions", func(t *testing.T) { for i := 0; i < 100; i++ { t.Run(fmt.Sprintf("silence_to_signal_transition_%d", i), func(t *testing.T) { - sl := testTransition(t, silenceFrames, signalFrames, 10, 1) + sl := testTransition(t, silenceFrames, signalFrames, 10, 1, false) require.Equal(t, uint64(1), sl.stateChanges) - require.Equal(t, int32(1), sl.isLastSignal) + require.Equal(t, true, sl.lastIsSignal) }) } }) @@ -259,9 +254,11 @@ func TestSignalLogger_WriteSample(t *testing.T) { t.Run("signal_to_silence_transitions", func(t *testing.T) { for i := 0; i < 100; i++ { t.Run(fmt.Sprintf("signal_to_silence_transition_%d", i), func(t *testing.T) { - sl := testTransition(t, signalFrames, silenceFrames, 10, 1) - require.Equal(t, uint64(1), sl.stateChanges) - require.Equal(t, int32(0), sl.isLastSignal) + synctest.Test(t, func(t *testing.T) { + sl := testTransition(t, signalFrames, silenceFrames, 10, 50, true) + require.Equal(t, uint64(1), sl.stateChanges) + require.Equal(t, false, sl.lastIsSignal) + }) }) } }) From e030c5f4f37ae268d4a87647b3a7bb6d296042ee Mon Sep 17 00:00:00 2001 From: Alex Date: Thu, 8 Jan 2026 23:23:42 -0800 Subject: [PATCH 09/10] Plumbing room part of signal logger. 
Now all that's left is to grab the config from some place --- pkg/sip/inbound.go | 1 + pkg/sip/media_port.go | 2 +- pkg/sip/outbound.go | 1 + pkg/sip/room.go | 22 ++++++++++++++-------- pkg/sip/signal_logger.go | 4 ++-- pkg/sip/signal_logger_test.go | 16 ++++++++++++---- 6 files changed, 31 insertions(+), 15 deletions(-) diff --git a/pkg/sip/inbound.go b/pkg/sip/inbound.go index d6cd0b4d..36ae1934 100644 --- a/pkg/sip/inbound.go +++ b/pkg/sip/inbound.go @@ -864,6 +864,7 @@ func (c *inboundCall) handleInvite(ctx context.Context, tid traceid.ID, req *sip disp.RingingTimeout = defaultRingingTimeout } disp.Room.JitterBuf = c.jitterBuf + disp.Room.SignalLogger = true // TODO: Populate ctx, cancel := context.WithTimeout(ctx, disp.MaxCallDuration) defer cancel() status := CallRinging diff --git a/pkg/sip/media_port.go b/pkg/sip/media_port.go index ca411115..b29eba82 100644 --- a/pkg/sip/media_port.go +++ b/pkg/sip/media_port.go @@ -726,7 +726,7 @@ func (p *MediaPort) setupOutput(tid traceid.ID) error { audioOut = newMediaWriterCount(audioOut, &p.stats.AudioOutFrames, &p.stats.AudioOutSamples) } if p.logSignalChanges { - audioOut, err = NewSignalLogger(p.log, "output", audioOut) + audioOut, err = NewSignalLogger(p.log, "mixed", audioOut) if err != nil { return err } diff --git a/pkg/sip/outbound.go b/pkg/sip/outbound.go index 1ca81369..3b7d6f41 100644 --- a/pkg/sip/outbound.go +++ b/pkg/sip/outbound.go @@ -98,6 +98,7 @@ func (c *Client) newCall(ctx context.Context, tid traceid.ID, conf *config.Confi } jitterBuf := SelectValueBool(conf.EnableJitterBuffer, conf.EnableJitterBufferProb) room.JitterBuf = jitterBuf + room.SignalLogger = true // TODO: Populate tr := TransportFrom(sipConf.transport) contact := c.ContactURI(tr) diff --git a/pkg/sip/room.go b/pkg/sip/room.go index 43a9d859..d162a189 100644 --- a/pkg/sip/room.go +++ b/pkg/sip/room.go @@ -179,13 +179,14 @@ type ParticipantConfig struct { } type RoomConfig struct { - WsUrl string - Token string - RoomName string - 
Participant ParticipantConfig - RoomPreset string - RoomConfig *livekit.RoomConfiguration - JitterBuf bool + WsUrl string + Token string + RoomName string + Participant ParticipantConfig + RoomPreset string + RoomConfig *livekit.RoomConfiguration + JitterBuf bool + SignalLogger bool } func NewRoom(log logger.Logger, st *RoomStats) *Room { @@ -321,7 +322,12 @@ func (r *Room) Connect(conf *config.Config, rconf RoomConfig) error { defer log.Infow("track closed") defer mTrack.Close() - codec, err := opus.Decode(mTrack, channels, log) + var out msdk.PCM16Writer = mTrack + if rconf.SignalLogger { + out, _ = NewSignalLogger(log, track.ID(), out) + } + + codec, err := opus.Decode(out, channels, log) if err != nil { log.Errorw("cannot create opus decoder", err) return diff --git a/pkg/sip/signal_logger.go b/pkg/sip/signal_logger.go index 0ad5732b..1ef5928f 100644 --- a/pkg/sip/signal_logger.go +++ b/pkg/sip/signal_logger.go @@ -80,7 +80,7 @@ func WithHangoverDuration(hangoverDuration time.Duration) SignalLoggerOption { return nil } } -func NewSignalLogger(log logger.Logger, name string, next msdk.PCM16Writer, options ...SignalLoggerOption) (*SignalLogger, error) { +func NewSignalLogger(log logger.Logger, name string, next msdk.PCM16Writer, options ...SignalLoggerOption) (msdk.PCM16Writer, error) { s := &SignalLogger{ log: log, next: next, @@ -95,7 +95,7 @@ func NewSignalLogger(log logger.Logger, name string, next msdk.PCM16Writer, opti } for _, option := range options { if err := option(s); err != nil { - return nil, err + return next, err } } return s, nil diff --git a/pkg/sip/signal_logger_test.go b/pkg/sip/signal_logger_test.go index 7d80942c..e9ffa27a 100644 --- a/pkg/sip/signal_logger_test.go +++ b/pkg/sip/signal_logger_test.go @@ -65,7 +65,9 @@ func TestSignalLogger_initialization(t *testing.T) { next := newMockPCM16Writer(48000) t.Run("default initialization", func(t *testing.T) { - sl, err := NewSignalLogger(log, "incoming", next) + out, err := NewSignalLogger(log, 
"incoming", next) + sl, ok := out.(*SignalLogger) + require.True(t, ok) require.NoError(t, err) require.NotNil(t, sl) require.Equal(t, DefaultSignalMultiplier, sl.signalMultiplier) @@ -74,7 +76,9 @@ func TestSignalLogger_initialization(t *testing.T) { }) t.Run("with valid options", func(t *testing.T) { - sl, err := NewSignalLogger(log, "incoming", next, WithSignalMultiplier(3.0), WithNoiseFloor(100), WithHangoverDuration(time.Second)) + out, err := NewSignalLogger(log, "incoming", next, WithSignalMultiplier(3.0), WithNoiseFloor(100), WithHangoverDuration(time.Second)) + sl, ok := out.(*SignalLogger) + require.True(t, ok) require.NoError(t, err) require.NotNil(t, sl) require.Equal(t, 3.0, sl.signalMultiplier) @@ -98,7 +102,9 @@ func TestSignalLogger_initialization(t *testing.T) { func TestSignalLogger_MeanAbsoluteDeviation(t *testing.T) { log := logger.GetLogger() next := newMockPCM16Writer(48000) - sl, err := NewSignalLogger(log, "incoming", next) + out, err := NewSignalLogger(log, "incoming", next) + sl, ok := out.(*SignalLogger) + require.True(t, ok) require.NoError(t, err) tests := []struct { @@ -148,7 +154,9 @@ func TestSignalLogger_MeanAbsoluteDeviation(t *testing.T) { func newTestLogger(t *testing.T, opts ...SignalLoggerOption) (*SignalLogger, *mockPCM16Writer) { next := newMockPCM16Writer(48000) - sl, err := NewSignalLogger(logger.GetLogger(), "incoming", next, opts...) + out, err := NewSignalLogger(logger.GetLogger(), "incoming", next, opts...) + sl, ok := out.(*SignalLogger) + require.True(t, ok) require.NoError(t, err) return sl, next } From d1618ce3440bee70cb2ac8ffb665e30bd6899975 Mon Sep 17 00:00:00 2001 From: Alex Date: Mon, 26 Jan 2026 11:39:46 -0800 Subject: [PATCH 10/10] done? 
--- pkg/sip/features.go | 3 +++ pkg/sip/inbound.go | 10 ++++++++-- pkg/sip/outbound.go | 5 +++-- pkg/sip/room.go | 18 +++++++++--------- 4 files changed, 23 insertions(+), 13 deletions(-) create mode 100644 pkg/sip/features.go diff --git a/pkg/sip/features.go b/pkg/sip/features.go new file mode 100644 index 00000000..7b3bf4d3 --- /dev/null +++ b/pkg/sip/features.go @@ -0,0 +1,3 @@ +package sip + +const signalLoggingFeatureFlag = "sip.signal_logging" diff --git a/pkg/sip/inbound.go b/pkg/sip/inbound.go index 36ae1934..373cd5f8 100644 --- a/pkg/sip/inbound.go +++ b/pkg/sip/inbound.go @@ -864,7 +864,9 @@ func (c *inboundCall) handleInvite(ctx context.Context, tid traceid.ID, req *sip disp.RingingTimeout = defaultRingingTimeout } disp.Room.JitterBuf = c.jitterBuf - disp.Room.SignalLogger = true // TODO: Populate + if disp.FeatureFlags != nil { + disp.Room.LogSignalChanges = disp.FeatureFlags[signalLoggingFeatureFlag] == "true" + } ctx, cancel := context.WithTimeout(ctx, disp.MaxCallDuration) defer cancel() status := CallRinging @@ -951,13 +953,17 @@ func (c *inboundCall) runMediaConn(tid traceid.ID, offerData []byte, enc livekit return nil, err } + logSignalChanges := false + if featureFlags != nil { + logSignalChanges = featureFlags[signalLoggingFeatureFlag] == "true" + } mp, err := NewMediaPort(tid, c.log(), c.mon, &MediaOptions{ IP: c.s.sconf.MediaIP, Ports: conf.RTPPort, MediaTimeoutInitial: c.s.conf.MediaTimeoutInitial, MediaTimeout: c.s.conf.MediaTimeout, EnableJitterBuffer: c.jitterBuf, - LogSignalChanges: true, // TODO: Replace with per-project setting + LogSignalChanges: logSignalChanges, Stats: &c.stats.Port, NoInputResample: !RoomResample, }, RoomSampleRate) diff --git a/pkg/sip/outbound.go b/pkg/sip/outbound.go index 3b7d6f41..fceca84b 100644 --- a/pkg/sip/outbound.go +++ b/pkg/sip/outbound.go @@ -90,6 +90,7 @@ type outboundCall struct { } func (c *Client) newCall(ctx context.Context, tid traceid.ID, conf *config.Config, log logger.Logger, id LocalTag, room 
RoomConfig, sipConf sipOutboundConfig, state *CallState, projectID string) (*outboundCall, error) { + signalLoggingEnabled := sipConf.featureFlags[signalLoggingFeatureFlag] == "true" if sipConf.maxCallDuration <= 0 || sipConf.maxCallDuration > maxCallDuration { sipConf.maxCallDuration = maxCallDuration } @@ -98,7 +99,7 @@ func (c *Client) newCall(ctx context.Context, tid traceid.ID, conf *config.Confi } jitterBuf := SelectValueBool(conf.EnableJitterBuffer, conf.EnableJitterBufferProb) room.JitterBuf = jitterBuf - room.SignalLogger = true // TODO: Populate + room.LogSignalChanges = signalLoggingEnabled tr := TransportFrom(sipConf.transport) contact := c.ContactURI(tr) @@ -143,7 +144,7 @@ func (c *Client) newCall(ctx context.Context, tid traceid.ID, conf *config.Confi MediaTimeoutInitial: c.conf.MediaTimeoutInitial, MediaTimeout: c.conf.MediaTimeout, EnableJitterBuffer: call.jitterBuf, - LogSignalChanges: true, // TODO: Replace with per-project setting + LogSignalChanges: signalLoggingEnabled, Stats: &call.stats.Port, NoInputResample: !RoomResample, }, RoomSampleRate) diff --git a/pkg/sip/room.go b/pkg/sip/room.go index d162a189..8b9d13c3 100644 --- a/pkg/sip/room.go +++ b/pkg/sip/room.go @@ -179,14 +179,14 @@ type ParticipantConfig struct { } type RoomConfig struct { - WsUrl string - Token string - RoomName string - Participant ParticipantConfig - RoomPreset string - RoomConfig *livekit.RoomConfiguration - JitterBuf bool - SignalLogger bool + WsUrl string + Token string + RoomName string + Participant ParticipantConfig + RoomPreset string + RoomConfig *livekit.RoomConfiguration + JitterBuf bool + LogSignalChanges bool } func NewRoom(log logger.Logger, st *RoomStats) *Room { @@ -323,7 +323,7 @@ func (r *Room) Connect(conf *config.Config, rconf RoomConfig) error { defer mTrack.Close() var out msdk.PCM16Writer = mTrack - if rconf.SignalLogger { + if rconf.LogSignalChanges { out, _ = NewSignalLogger(log, track.ID(), out) }