diff --git a/pkg/sip/features.go b/pkg/sip/features.go new file mode 100644 index 00000000..7b3bf4d3 --- /dev/null +++ b/pkg/sip/features.go @@ -0,0 +1,3 @@ +package sip + +const signalLoggingFeatureFlag = "sip.signal_logging" diff --git a/pkg/sip/inbound.go b/pkg/sip/inbound.go index 8bccb80f..ca51fb9b 100644 --- a/pkg/sip/inbound.go +++ b/pkg/sip/inbound.go @@ -23,6 +23,7 @@ import ( "math" "net/netip" "slices" + "strconv" "strings" "sync" "sync/atomic" @@ -864,6 +865,7 @@ func (c *inboundCall) handleInvite(ctx context.Context, tid traceid.ID, req *sip disp.RingingTimeout = defaultRingingTimeout } disp.Room.JitterBuf = c.jitterBuf + disp.Room.LogSignalChanges, _ = strconv.ParseBool(disp.FeatureFlags[signalLoggingFeatureFlag]) ctx, cancel := context.WithTimeout(ctx, disp.MaxCallDuration) defer cancel() status := CallRinging @@ -956,12 +958,15 @@ func (c *inboundCall) runMediaConn(tid traceid.ID, offerData []byte, enc livekit return nil, err } + logSignalChanges := false + logSignalChanges, _ = strconv.ParseBool(featureFlags[signalLoggingFeatureFlag]) mp, err := NewMediaPort(tid, c.log(), c.mon, &MediaOptions{ IP: c.s.sconf.MediaIP, Ports: conf.RTPPort, MediaTimeoutInitial: c.s.conf.MediaTimeoutInitial, MediaTimeout: c.s.conf.MediaTimeout, EnableJitterBuffer: c.jitterBuf, + LogSignalChanges: logSignalChanges, Stats: &c.stats.Port, NoInputResample: !RoomResample, }, RoomSampleRate) diff --git a/pkg/sip/media_port.go b/pkg/sip/media_port.go index 4be408b6..0908a8f2 100644 --- a/pkg/sip/media_port.go +++ b/pkg/sip/media_port.go @@ -281,6 +281,7 @@ type MediaOptions struct { EnableJitterBuffer bool NoInputResample bool IgnorePreanswerData bool + LogSignalChanges bool } func NewMediaPort(tid traceid.ID, log logger.Logger, mon *stats.CallMonitor, opts *MediaOptions, sampleRate int) (*MediaPort, error) { @@ -321,6 +322,7 @@ func NewMediaPortWith(tid traceid.ID, log logger.Logger, mon *stats.CallMonitor, mediaTimeout: mediaTimeout, timeoutResetTick: make(chan time.Duration, 1), jitterEnabled: opts.EnableJitterBuffer, + logSignalChanges: opts.LogSignalChanges, port: newUDPConn(log, conn), audioOut: msdk.NewSwitchWriter(sampleRate), audioIn: msdk.NewSwitchWriter(inSampleRate), @@ -357,6 +359,7 @@ type MediaPort struct { stats *PortStats dtmfAudioEnabled bool jitterEnabled bool + logSignalChanges bool mu sync.Mutex conf *MediaConf @@ -769,6 +772,13 @@ func (p *MediaPort) setupOutput(tid traceid.ID) error { if p.stats != nil { audioOut = newMediaWriterCount(audioOut, &p.stats.AudioOutFrames, &p.stats.AudioOutSamples) } + if p.logSignalChanges { + audioOut, err = NewSignalLogger(p.log, "mixed", audioOut) + if err != nil { + audioOut.Close() // need to close since it's not linked to the port yet + return err + } + } if p.conf.Audio.DTMFType != 0 { p.dtmfOutRTP = s.NewStream(p.conf.Audio.DTMFType, dtmf.SampleRate) @@ -802,6 +812,14 @@ func (p *MediaPort) setupInput() { if p.stats != nil { audioWriter = newMediaWriterCount(audioWriter, &p.stats.AudioInFrames, &p.stats.AudioInSamples) } + if p.logSignalChanges { + signalLogger, err := NewSignalLogger(p.log, "input", audioWriter) + if err != nil { + p.log.Errorw("failed to create signal logger", err) + } else { + audioWriter = signalLogger + } + } audioHandler := p.conf.Audio.Codec.DecodeRTP(audioWriter, p.conf.Audio.Type) // Wrap the decoder with silence suppression handler to fill gaps during silence suppression audioHandler = newSilenceFiller(audioHandler, audioWriter, codecInfo.RTPClockRate, codecInfo.SampleRate, p.log) diff --git a/pkg/sip/outbound.go b/pkg/sip/outbound.go index cd7bf46e..b13e1856 100644 --- a/pkg/sip/outbound.go +++ b/pkg/sip/outbound.go @@ -20,6 +20,7 @@ import ( "math" "net" "sort" + "strconv" "sync" "time" @@ -90,6 +91,7 @@ type outboundCall struct { } func (c *Client) newCall(ctx context.Context, tid traceid.ID, conf *config.Config, log logger.Logger, id LocalTag, room RoomConfig, sipConf sipOutboundConfig, state *CallState, projectID string) (*outboundCall, error) { + signalLoggingEnabled, _ := strconv.ParseBool(sipConf.featureFlags[signalLoggingFeatureFlag]) if sipConf.maxCallDuration <= 0 || sipConf.maxCallDuration > maxCallDuration { sipConf.maxCallDuration = maxCallDuration } @@ -98,6 +100,7 @@ func (c *Client) newCall(ctx context.Context, tid traceid.ID, conf *config.Confi } jitterBuf := SelectValueBool(conf.EnableJitterBuffer, conf.EnableJitterBufferProb) room.JitterBuf = jitterBuf + room.LogSignalChanges = signalLoggingEnabled tr := TransportFrom(sipConf.transport) contact := c.ContactURI(tr) @@ -142,6 +145,7 @@ func (c *Client) newCall(ctx context.Context, tid traceid.ID, conf *config.Confi MediaTimeoutInitial: c.conf.MediaTimeoutInitial, MediaTimeout: c.conf.MediaTimeout, EnableJitterBuffer: call.jitterBuf, + LogSignalChanges: signalLoggingEnabled, Stats: &call.stats.Port, NoInputResample: !RoomResample, IgnorePreanswerData: true, diff --git a/pkg/sip/room.go b/pkg/sip/room.go index 43a9d859..ef584f85 100644 --- a/pkg/sip/room.go +++ b/pkg/sip/room.go @@ -179,13 +179,14 @@ type ParticipantConfig struct { } type RoomConfig struct { - WsUrl string - Token string - RoomName string - Participant ParticipantConfig - RoomPreset string - RoomConfig *livekit.RoomConfiguration - JitterBuf bool + WsUrl string + Token string + RoomName string + Participant ParticipantConfig + RoomPreset string + RoomConfig *livekit.RoomConfiguration + JitterBuf bool + LogSignalChanges bool } func NewRoom(log logger.Logger, st *RoomStats) *Room { @@ -321,7 +322,17 @@ func (r *Room) Connect(conf *config.Config, rconf RoomConfig) error { defer log.Infow("track closed") defer mTrack.Close() - codec, err := opus.Decode(mTrack, channels, log) + var out msdk.PCM16Writer = mTrack + if rconf.LogSignalChanges { + var err error + out, err = NewSignalLogger(log, track.ID(), out) + if err != nil { + log.Errorw("cannot create signal logger", err) + return + } + } + + codec, err := opus.Decode(out, channels, log) if err != nil { log.Errorw("cannot create opus decoder", err) return diff --git a/pkg/sip/signal_logger.go b/pkg/sip/signal_logger.go new file mode 100644 index 00000000..6c5bbc4c --- /dev/null +++ b/pkg/sip/signal_logger.go @@ -0,0 +1,195 @@ +// Copyright 2024 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sip + +import ( + "fmt" + "math" + "time" + + msdk "github.com/livekit/media-sdk" + "github.com/livekit/protocol/logger" +) + +const ( + // DefaultInitialNoiseFloorDB is the default noise floor in dBFS. + DefaultInitialNoiseFloorDB = -50 + // DefaultHangoverDuration is how long we stay in "signal" after level drops below exit threshold. + DefaultHangoverDuration = 1 * time.Second + // DefaultEnterVoiceOffsetDB is the default offset above noise floor to enter voice (hysteresis high). + DefaultEnterVoiceOffsetDB = 10 + // DefaultExitVoiceOffsetDB is the default offset above noise floor to exit voice (hysteresis low). + DefaultExitVoiceOffsetDB = 5 + + // minDBFS clamps very quiet frames to avoid -inf in dBFS. + minDBFS = -100 +) + +// SignalLogger keeps internal state of whether we're in voice or silence, using RMS → dBFS +// and a fixed noise floor with hysteresis. It implements msdk.PCM16Writer and logs state changes. +type SignalLogger struct { + // Configuration + log logger.Logger + next msdk.PCM16Writer + name string + hangoverDuration time.Duration + noiseFloor float64 // Noise floor in dBFS (fixed, not adaptive). + enterVoiceOffsetDB float64 // Offset above noise floor to enter voice (hysteresis high). + exitVoiceOffsetDB float64 // Offset above noise floor to exit voice (hysteresis low). + + // State + lastSignalTime time.Time + lastIsSignal bool + + // Stats + framesProcessed uint64 + stateChanges uint64 +} + +type SignalLoggerOption func(*SignalLogger) error + +// WithNoiseFloor sets the noise floor in dBFS (e.g. -40). Must be >= minDBFS. +func WithNoiseFloor(noiseFloorDB float64) SignalLoggerOption { + return func(s *SignalLogger) error { + if noiseFloorDB < minDBFS { + return fmt.Errorf("noise floor must be >= %g dBFS, got %g", float64(minDBFS), noiseFloorDB) + } + s.noiseFloor = noiseFloorDB + return nil + } +} + +func WithHangoverDuration(hangoverDuration time.Duration) SignalLoggerOption { + return func(s *SignalLogger) error { + if hangoverDuration <= 0 { + return fmt.Errorf("hangover duration must be positive, got %s", hangoverDuration) + } + s.hangoverDuration = hangoverDuration + return nil + } +} + +// WithEnterVoiceOffsetDB sets the offset (dB) above noise floor to enter voice. Default is DefaultEnterVoiceOffsetDB. +func WithEnterVoiceOffsetDB(db float64) SignalLoggerOption { + return func(s *SignalLogger) error { + if db <= 0 { + return fmt.Errorf("enterVoiceOffsetDB must be positive, got %g", db) + } + s.enterVoiceOffsetDB = db + return nil + } +} + +// WithExitVoiceOffsetDB sets the offset (dB) above noise floor to exit voice. Default is DefaultExitVoiceOffsetDB. +func WithExitVoiceOffsetDB(db float64) SignalLoggerOption { + return func(s *SignalLogger) error { + if db <= 0 { + return fmt.Errorf("exitVoiceOffsetDB must be positive, got %g", db) + } + s.exitVoiceOffsetDB = db + return nil + } +} + +func NewSignalLogger(log logger.Logger, name string, next msdk.PCM16Writer, options ...SignalLoggerOption) (msdk.PCM16Writer, error) { + s := &SignalLogger{ + log: log, + next: next, + name: name, + hangoverDuration: DefaultHangoverDuration, + noiseFloor: DefaultInitialNoiseFloorDB, + enterVoiceOffsetDB: DefaultEnterVoiceOffsetDB, + exitVoiceOffsetDB: DefaultExitVoiceOffsetDB, + } + for _, option := range options { + if err := option(s); err != nil { + return next, err + } + } + return s, nil +} + +func (s *SignalLogger) String() string { + return fmt.Sprintf("SignalLogger(%s) -> %s", s.name, s.next.String()) +} + +func (s *SignalLogger) SampleRate() int { + return s.next.SampleRate() +} + +func (s *SignalLogger) Close() error { + if s.stateChanges > 0 { + s.log.Infow("signal logger closing", "name", s.name, "stateChanges", s.stateChanges) + } + return s.next.Close() +} + +// rmsToDBFS computes RMS of the frame then converts to dBFS: 10*log10(mean(square)/MAX^2). +// Uses math.MaxInt16 (32767) as reference. Returns a value <= 0; silence approaches -inf, so we clamp to minDBFS. +func (s *SignalLogger) rmsToDBFS(sample msdk.PCM16Sample, minDBFS float64) float64 { + if len(sample) == 0 { + return minDBFS + } + var sumSq int64 + for _, v := range sample { + x := int64(v) + sumSq += x * x + } + meanSq := float64(sumSq) / float64(len(sample)) + refSq := float64(math.MaxInt16) * float64(math.MaxInt16) + if meanSq <= 0 { + return minDBFS + } + db := 10 * math.Log10(meanSq/refSq) + if db < minDBFS { + return minDBFS + } + return db +} + +func (s *SignalLogger) WriteSample(sample msdk.PCM16Sample) error { + currentDB := s.rmsToDBFS(sample, minDBFS) + + enterThreshold := s.noiseFloor + s.enterVoiceOffsetDB + exitThreshold := s.noiseFloor + s.exitVoiceOffsetDB + + now := time.Now() + aboveEnter := currentDB > enterThreshold + belowExit := currentDB < exitThreshold + + if aboveEnter { + s.lastSignalTime = now + } + + s.framesProcessed++ + if s.framesProcessed <= 10 { + s.lastIsSignal = aboveEnter + return s.next.WriteSample(sample) + } + + if aboveEnter && !s.lastIsSignal { + s.lastIsSignal = true + s.stateChanges++ + s.log.Infow("signal changed", "name", s.name, "signal", true, "stateChanges", s.stateChanges, "dBFS", currentDB, "noiseFloor", s.noiseFloor) + } else if belowExit && s.lastIsSignal { + if now.Sub(s.lastSignalTime) >= s.hangoverDuration { + s.lastIsSignal = false + s.stateChanges++ + s.log.Infow("signal changed", "name", s.name, "signal", false, "stateChanges", s.stateChanges, "dBFS", currentDB, "noiseFloor", s.noiseFloor) + } + } + + return s.next.WriteSample(sample) +} diff --git a/pkg/sip/signal_logger_test.go b/pkg/sip/signal_logger_test.go new file mode 100644 index 00000000..84f6d027 --- /dev/null +++ b/pkg/sip/signal_logger_test.go @@ -0,0 +1,240 @@ +// Copyright 2024 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sip + +import ( + "fmt" + "math/rand/v2" + "sync/atomic" + "testing" + "testing/synctest" + "time" + + msdk "github.com/livekit/media-sdk" + "github.com/livekit/media-sdk/rtp" + "github.com/livekit/protocol/logger" + "github.com/stretchr/testify/require" +) + +// mockPCM16Writer is a simple mock implementation of PCM16Writer for testing +type mockPCM16Writer struct { + sampleRate int + samples []msdk.PCM16Sample + closed atomic.Bool +} + +func newMockPCM16Writer(sampleRate int) *mockPCM16Writer { + return &mockPCM16Writer{ + sampleRate: sampleRate, + samples: make([]msdk.PCM16Sample, 0), + } +} + +func (m *mockPCM16Writer) String() string { + return "mockPCM16Writer" +} + +func (m *mockPCM16Writer) SampleRate() int { + return m.sampleRate +} + +func (m *mockPCM16Writer) Close() error { + m.closed.Store(true) + return nil +} + +func (m *mockPCM16Writer) WriteSample(sample msdk.PCM16Sample) error { + m.samples = append(m.samples, sample) + return nil +} + +func TestSignalLogger_initialization(t *testing.T) { + log := logger.GetLogger() + next := newMockPCM16Writer(48000) + + t.Run("default initialization", func(t *testing.T) { + out, err := NewSignalLogger(log, "incoming", next) + sl, ok := out.(*SignalLogger) + require.True(t, ok) + require.NoError(t, err) + require.NotNil(t, sl) + require.InDelta(t, float64(DefaultInitialNoiseFloorDB), sl.noiseFloor, 0.01) + require.Equal(t, DefaultHangoverDuration, sl.hangoverDuration) + require.InDelta(t, float64(DefaultEnterVoiceOffsetDB), sl.enterVoiceOffsetDB, 0.01) + require.InDelta(t, float64(DefaultExitVoiceOffsetDB), sl.exitVoiceOffsetDB, 0.01) + }) + + t.Run("with valid options", func(t *testing.T) { + out, err := NewSignalLogger(log, "incoming", next, WithNoiseFloor(-60), WithHangoverDuration(2*time.Second), WithEnterVoiceOffsetDB(9), WithExitVoiceOffsetDB(4)) + sl, ok := out.(*SignalLogger) + require.True(t, ok) + require.NoError(t, err) + require.NotNil(t, sl) + require.Equal(t, -60.0, sl.noiseFloor) + require.Equal(t, 2*time.Second, sl.hangoverDuration) + require.Equal(t, 9.0, sl.enterVoiceOffsetDB) + require.Equal(t, 4.0, sl.exitVoiceOffsetDB) + }) + + t.Run("with invalid options", func(t *testing.T) { + _, err := NewSignalLogger(log, "incoming", next, WithHangoverDuration(-time.Second)) + require.Error(t, err) + require.Contains(t, err.Error(), "hangover duration must be positive, got -1s") + _, err = NewSignalLogger(log, "incoming", next, WithEnterVoiceOffsetDB(-1)) + require.Error(t, err) + require.Contains(t, err.Error(), "enterVoiceOffsetDB must be positive, got -1") + _, err = NewSignalLogger(log, "incoming", next, WithExitVoiceOffsetDB(-1)) + require.Error(t, err) + require.Contains(t, err.Error(), "exitVoiceOffsetDB must be positive, got -1") + _, err = NewSignalLogger(log, "incoming", next, WithNoiseFloor(-101)) + require.Error(t, err) + require.Contains(t, err.Error(), "noise floor must be >= -100 dBFS, got -101") + }) +} + +func newTestLogger(t *testing.T, opts ...SignalLoggerOption) (*SignalLogger, *mockPCM16Writer) { + next := newMockPCM16Writer(48000) + out, err := NewSignalLogger(logger.GetLogger(), "incoming", next, opts...) + sl, ok := out.(*SignalLogger) + require.True(t, ok) + require.NoError(t, err) + return sl, next +} + +func writer(t *testing.T, sl *SignalLogger, array []msdk.PCM16Sample, count int, wait bool) error { + for i := 0; i < count; i++ { + randIndex := rand.Uint() % uint(len(array)) + if err := sl.WriteSample(array[randIndex]); err != nil { + return err + } + since := time.Since(sl.lastSignalTime).Milliseconds() + t.Logf("%d written sample %d/%d, noise floor %.1f dBFS, state changes %d, last signal %t (%dms ago)\n", sl.framesProcessed, randIndex, len(array), sl.noiseFloor, sl.stateChanges, sl.lastIsSignal, since) + if wait { + time.Sleep(rtp.DefFrameDur) + } + } + return nil +} + +func testTransition(t *testing.T, first, second []msdk.PCM16Sample, firstCount, secondCount int, wait bool, opts ...SignalLoggerOption) *SignalLogger { + sl, _ := newTestLogger(t, opts...) + require.NoError(t, writer(t, sl, first, firstCount, wait)) + require.Equal(t, uint64(firstCount), sl.framesProcessed) + require.NoError(t, writer(t, sl, second, secondCount, wait)) + require.Equal(t, uint64(firstCount+secondCount), sl.framesProcessed) + return sl +} + +func createFrame(size int, amplitude int16) msdk.PCM16Sample { + frame := make(msdk.PCM16Sample, size) + for i := range frame { + frame[i] = amplitude + } + return frame +} + +// silenceAmplitude: frames with small amplitude yield low dBFS (e.g. ~-56 dBFS for 100), below exit threshold. +const silenceAmplitude = 50 + +// signalAmplitude: frames with larger amplitude yield high dBFS (e.g. ~-16 dBFS for 5000), above enter threshold (noiseFloor+10). +const signalAmplitude = 5000 + +func TestSignalLogger_WriteSample(t *testing.T) { + silenceFrames := make([]msdk.PCM16Sample, 100) + for i := range silenceFrames { + amplitude := int16(rand.Uint32() % uint32(silenceAmplitude)) + if rand.Uint()%2 == 0 { + amplitude = -amplitude + } + silenceFrames[i] = createFrame(480, amplitude) + } + + signalFrames := make([]msdk.PCM16Sample, 100) + for i := range signalFrames { + // Random amplitude around signalAmplitude (signalAmplitude/2 to signalAmplitude*3/2) so frames stay above enter threshold. + amplitude := int16(rand.Uint32()%uint32(signalAmplitude) + signalAmplitude/2) + if rand.Uint()%2 == 0 { + amplitude = -amplitude + } + signalFrames[i] = createFrame(480, amplitude) + } + + t.Run("not_printing_on_first_10_frames", func(t *testing.T) { + sl, _ := newTestLogger(t) + + require.NoError(t, writer(t, sl, silenceFrames, 5, false)) + require.NoError(t, writer(t, sl, signalFrames, 3, false)) + require.NoError(t, writer(t, sl, silenceFrames, 2, false)) + require.Equal(t, uint64(10), sl.framesProcessed) + require.Equal(t, uint64(0), sl.stateChanges) + }) + + t.Run("printing_on_11th_frame_transition", func(t *testing.T) { + t.Run("silence_to_signal", func(t *testing.T) { + sl := testTransition(t, silenceFrames, signalFrames, 10, 1, false) + require.Equal(t, uint64(1), sl.stateChanges) + }) + t.Run("signal_to_silence", func(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + // Use fixed low-amplitude silence so we stay below exit threshold for hangover + lowSilence := make([]msdk.PCM16Sample, 60) + for i := range lowSilence { + lowSilence[i] = createFrame(480, 20) + } + sl := testTransition(t, signalFrames, lowSilence, 10, 60, true) + require.GreaterOrEqual(t, sl.stateChanges, uint64(1), "expected at least one transition to silence") + }) + }) + }) + + t.Run("silence_to_silence_transitions", func(t *testing.T) { + sl := testTransition(t, silenceFrames, silenceFrames, 10, 0, false) + require.Equal(t, uint64(0), sl.stateChanges) + require.Equal(t, false, sl.lastIsSignal) + }) + + t.Run("signal_to_signal_transitions", func(t *testing.T) { + sl := testTransition(t, signalFrames, signalFrames, 10, 0, false) + require.Equal(t, uint64(0), sl.stateChanges) + require.Equal(t, true, sl.lastIsSignal) + }) + + t.Run("silence_to_signal_transitions", func(t *testing.T) { + for i := 0; i < 100; i++ { + t.Run(fmt.Sprintf("silence_to_signal_transition_%d", i), func(t *testing.T) { + sl := testTransition(t, silenceFrames, signalFrames, 10, 1, false) + require.Equal(t, uint64(1), sl.stateChanges) + require.Equal(t, true, sl.lastIsSignal) + }) + } + }) + + t.Run("signal_to_silence_transitions", func(t *testing.T) { + // Fixed low-amplitude silence and 60 frames (60*DefFrameDur > 1s hangover) for reliable transition. + lowSilence := make([]msdk.PCM16Sample, 60) + for i := range lowSilence { + lowSilence[i] = createFrame(480, 20) + } + for i := 0; i < 100; i++ { + t.Run(fmt.Sprintf("signal_to_silence_transition_%d", i), func(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + sl := testTransition(t, signalFrames, lowSilence, 10, 60, true) + require.GreaterOrEqual(t, sl.stateChanges, uint64(1), "expected at least one transition to silence") + require.Equal(t, false, sl.lastIsSignal) + }) + }) + } + }) +}