-
Notifications
You must be signed in to change notification settings - Fork 148
Signal logger #558
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Signal logger #558
Changes from all commits
51e579d
4583f8f
6af8ce9
2875d70
b7ae9c8
0d7080a
a862f1e
5c0bcdd
e030c5f
d1618ce
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,3 @@ | ||
| package sip | ||
|
|
||
| const signalLoggingFeatureFlag = "sip.signal_logging" |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -864,6 +864,9 @@ func (c *inboundCall) handleInvite(ctx context.Context, tid traceid.ID, req *sip | |
| disp.RingingTimeout = defaultRingingTimeout | ||
| } | ||
| disp.Room.JitterBuf = c.jitterBuf | ||
| if disp.FeatureFlags != nil { | ||
| disp.Room.LogSignalChanges = disp.FeatureFlags[signalLoggingFeatureFlag] == "true" | ||
| } | ||
| ctx, cancel := context.WithTimeout(ctx, disp.MaxCallDuration) | ||
| defer cancel() | ||
| status := CallRinging | ||
|
|
@@ -950,12 +953,17 @@ func (c *inboundCall) runMediaConn(tid traceid.ID, offerData []byte, enc livekit | |
| return nil, err | ||
| } | ||
|
|
||
| logSignalChanges := false | ||
| if featureFlags != nil { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same here, no need for a check |
||
| logSignalChanges = featureFlags[signalLoggingFeatureFlag] == "true" | ||
| } | ||
| mp, err := NewMediaPort(tid, c.log(), c.mon, &MediaOptions{ | ||
| IP: c.s.sconf.MediaIP, | ||
| Ports: conf.RTPPort, | ||
| MediaTimeoutInitial: c.s.conf.MediaTimeoutInitial, | ||
| MediaTimeout: c.s.conf.MediaTimeout, | ||
| EnableJitterBuffer: c.jitterBuf, | ||
| LogSignalChanges: logSignalChanges, | ||
| Stats: &c.stats.Port, | ||
| NoInputResample: !RoomResample, | ||
| }, RoomSampleRate) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -240,6 +240,7 @@ type MediaOptions struct { | |
| Stats *PortStats | ||
| EnableJitterBuffer bool | ||
| NoInputResample bool | ||
| LogSignalChanges bool | ||
| } | ||
|
|
||
| func NewMediaPort(tid traceid.ID, log logger.Logger, mon *stats.CallMonitor, opts *MediaOptions, sampleRate int) (*MediaPort, error) { | ||
|
|
@@ -280,6 +281,7 @@ func NewMediaPortWith(tid traceid.ID, log logger.Logger, mon *stats.CallMonitor, | |
| mediaTimeout: mediaTimeout, | ||
| timeoutResetTick: make(chan time.Duration, 1), | ||
| jitterEnabled: opts.EnableJitterBuffer, | ||
| logSignalChanges: opts.LogSignalChanges, | ||
| port: newUDPConn(log, conn), | ||
| audioOut: msdk.NewSwitchWriter(sampleRate), | ||
| audioIn: msdk.NewSwitchWriter(inSampleRate), | ||
|
|
@@ -313,6 +315,7 @@ type MediaPort struct { | |
| stats *PortStats | ||
| dtmfAudioEnabled bool | ||
| jitterEnabled bool | ||
| logSignalChanges bool | ||
|
|
||
| mu sync.Mutex | ||
| conf *MediaConf | ||
|
|
@@ -722,6 +725,12 @@ func (p *MediaPort) setupOutput(tid traceid.ID) error { | |
| if p.stats != nil { | ||
| audioOut = newMediaWriterCount(audioOut, &p.stats.AudioOutFrames, &p.stats.AudioOutSamples) | ||
| } | ||
| if p.logSignalChanges { | ||
| audioOut, err = NewSignalLogger(p.log, "mixed", audioOut) | ||
| if err != nil { | ||
| return err | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We need to close original |
||
| } | ||
| } | ||
|
|
||
| if p.conf.Audio.DTMFType != 0 { | ||
| p.dtmfOutRTP = s.NewStream(p.conf.Audio.DTMFType, dtmf.SampleRate) | ||
|
|
@@ -755,6 +764,14 @@ func (p *MediaPort) setupInput() { | |
| if p.stats != nil { | ||
| audioWriter = newMediaWriterCount(audioWriter, &p.stats.AudioInFrames, &p.stats.AudioInSamples) | ||
| } | ||
| if p.logSignalChanges { | ||
| signalLogger, err := NewSignalLogger(p.log, "input", audioWriter) | ||
| if err != nil { | ||
| p.log.Errorw("failed to create signal logger", err) | ||
| } else { | ||
| audioWriter = signalLogger | ||
| } | ||
| } | ||
| audioHandler := p.conf.Audio.Codec.DecodeRTP(audioWriter, p.conf.Audio.Type) | ||
| // Wrap the decoder with silence suppression handler to fill gaps during silence suppression | ||
| audioHandler = newSilenceFiller(audioHandler, audioWriter, codecInfo.RTPClockRate, codecInfo.SampleRate, p.log) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -179,13 +179,14 @@ type ParticipantConfig struct { | |
| } | ||
|
|
||
| type RoomConfig struct { | ||
| WsUrl string | ||
| Token string | ||
| RoomName string | ||
| Participant ParticipantConfig | ||
| RoomPreset string | ||
| RoomConfig *livekit.RoomConfiguration | ||
| JitterBuf bool | ||
| WsUrl string | ||
| Token string | ||
| RoomName string | ||
| Participant ParticipantConfig | ||
| RoomPreset string | ||
| RoomConfig *livekit.RoomConfiguration | ||
| JitterBuf bool | ||
| LogSignalChanges bool | ||
| } | ||
|
|
||
| func NewRoom(log logger.Logger, st *RoomStats) *Room { | ||
|
|
@@ -321,7 +322,12 @@ func (r *Room) Connect(conf *config.Config, rconf RoomConfig) error { | |
| defer log.Infow("track closed") | ||
| defer mTrack.Close() | ||
|
|
||
| codec, err := opus.Decode(mTrack, channels, log) | ||
| var out msdk.PCM16Writer = mTrack | ||
| if rconf.LogSignalChanges { | ||
| out, _ = NewSignalLogger(log, track.ID(), out) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not even logging an error? This could silenty write nil to |
||
| } | ||
|
|
||
| codec, err := opus.Decode(out, channels, log) | ||
| if err != nil { | ||
| log.Errorw("cannot create opus decoder", err) | ||
| return | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,161 @@ | ||
| // Copyright 2024 LiveKit, Inc. | ||
| // | ||
| // Licensed under the Apache License, Version 2.0 (the "License"); | ||
| // you may not use this file except in compliance with the License. | ||
| // You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, software | ||
| // distributed under the License is distributed on an "AS IS" BASIS, | ||
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| // See the License for the specific language governing permissions and | ||
| // limitations under the License. | ||
|
|
||
| package sip | ||
|
|
||
| import ( | ||
| "fmt" | ||
| "time" | ||
|
|
||
| msdk "github.com/livekit/media-sdk" | ||
| "github.com/livekit/protocol/logger" | ||
| ) | ||
|
|
||
| const ( | ||
| DefaultSignalMultiplier = float64(2) // Singnal needs to be at least 2 times the noise floor to be detected. | ||
| DefaultNoiseFloor = float64(25) // Use a low noise floor; The purpose is to detect any kind of signal, not just voice. | ||
| DefaultHangoverDuration = 1 * time.Second // Duration of silence that needs to elapse before we no longer consider it a signal. | ||
| ) | ||
|
|
||
| // Keeps an internal state of whether we're currently transmitting signal (voice or noise), or silence. | ||
| // This implements msdk.PCM16Writer to inspect decoded packet content. | ||
| // Used to log changes betweem those states. | ||
| type SignalLogger struct { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should consider moving it to |
||
| // Configuration | ||
| log logger.Logger | ||
| next msdk.PCM16Writer | ||
| name string | ||
| hangoverDuration time.Duration // Time to stay in "signal" state after last signal to avoid flip-flopping. | ||
| signalMultiplier float64 // Threshold multiplier for signal to be detected. | ||
| noiseFloor float64 // Moveing average of noise floor. | ||
|
|
||
| // State | ||
| lastSignalTime time.Time | ||
| lastIsSignal bool | ||
|
|
||
| // Stats | ||
| framesProcessed uint64 | ||
| stateChanges uint64 | ||
| } | ||
|
|
||
| type SignalLoggerOption func(*SignalLogger) error | ||
|
|
||
| func WithSignalMultiplier(signalMultiplier float64) SignalLoggerOption { | ||
| return func(s *SignalLogger) error { | ||
| if signalMultiplier <= 1 { | ||
| return fmt.Errorf("signal multiplier must be greater than 1") | ||
| } | ||
| s.signalMultiplier = signalMultiplier | ||
| return nil | ||
| } | ||
| } | ||
|
|
||
| func WithNoiseFloor(noiseFloor float64) SignalLoggerOption { | ||
| return func(s *SignalLogger) error { | ||
| if noiseFloor <= 0 { | ||
| return fmt.Errorf("noise floor min must be positive") | ||
| } | ||
| s.noiseFloor = noiseFloor | ||
| return nil | ||
| } | ||
| } | ||
|
|
||
| func WithHangoverDuration(hangoverDuration time.Duration) SignalLoggerOption { | ||
| return func(s *SignalLogger) error { | ||
| if hangoverDuration <= 0 { | ||
| return fmt.Errorf("hangover duration must be positive") | ||
| } | ||
| s.hangoverDuration = hangoverDuration | ||
| return nil | ||
| } | ||
| } | ||
| func NewSignalLogger(log logger.Logger, name string, next msdk.PCM16Writer, options ...SignalLoggerOption) (msdk.PCM16Writer, error) { | ||
| s := &SignalLogger{ | ||
| log: log, | ||
| next: next, | ||
| name: name, | ||
| hangoverDuration: DefaultHangoverDuration, | ||
| signalMultiplier: DefaultSignalMultiplier, | ||
| noiseFloor: DefaultNoiseFloor, | ||
| lastSignalTime: time.Time{}, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No need to initialize explicitly. |
||
| lastIsSignal: false, | ||
| framesProcessed: 0, | ||
| stateChanges: 0, | ||
| } | ||
| for _, option := range options { | ||
| if err := option(s); err != nil { | ||
| return next, err | ||
| } | ||
| } | ||
| return s, nil | ||
| } | ||
|
|
||
| func (s *SignalLogger) String() string { | ||
| return fmt.Sprintf("SignalLogger(%s) -> %s", s.name, s.next.String()) | ||
| } | ||
|
|
||
| func (s *SignalLogger) SampleRate() int { | ||
| return s.next.SampleRate() | ||
| } | ||
|
|
||
| func (s *SignalLogger) Close() error { | ||
| if s.stateChanges > 0 { | ||
| s.log.Infow("signal logger closing", "name", s.name, "stateChanges", s.stateChanges) | ||
| } | ||
| return s.next.Close() | ||
| } | ||
|
|
||
| // Calculates the mean absolute deviation of the frame. | ||
| func (s *SignalLogger) MeanAbsoluteDeviation(sample msdk.PCM16Sample) float64 { | ||
| if len(sample) == 0 { | ||
| return 0 | ||
| } | ||
| var totalAbs int64 | ||
| for _, v := range sample { | ||
| if v < 0 { | ||
| totalAbs += int64(-v) | ||
| } else { | ||
| totalAbs += int64(v) | ||
| } | ||
| } | ||
| return float64(totalAbs) / float64(len(sample)) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We could also consider calculating a median, it will be less affected by sudden spikes. |
||
| } | ||
|
|
||
| func (s *SignalLogger) WriteSample(sample msdk.PCM16Sample) error { | ||
| currentEnergy := s.MeanAbsoluteDeviation(sample) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Probably doesn't matter here, but energy is usually computed as a square of amplitude. Maybe rename to |
||
|
|
||
| now := time.Now() | ||
| isSignal := currentEnergy > (s.noiseFloor * s.signalMultiplier) | ||
| if isSignal { | ||
| s.lastSignalTime = now | ||
| } | ||
|
|
||
| s.framesProcessed++ | ||
| if s.framesProcessed > 10 { // Don't log any changes at first | ||
| if isSignal && isSignal != s.lastIsSignal { // silence -> signal: Immediate transition | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This could potentially hit each frame, wouldn't it be too spammy? |
||
| s.lastIsSignal = true | ||
| s.stateChanges++ | ||
| s.log.Infow("signal changed", "name", s.name, "signal", isSignal, "stateChanges", s.stateChanges) | ||
| } else if !isSignal && isSignal != s.lastIsSignal { // signal -> silence: Only after hangover | ||
| if now.Sub(s.lastSignalTime) >= s.hangoverDuration { | ||
| s.lastIsSignal = false | ||
| s.stateChanges++ | ||
| s.log.Infow("signal changed", "name", s.name, "signal", isSignal, "stateChanges", s.stateChanges) | ||
| } | ||
| } | ||
| } else { | ||
| s.lastIsSignal = isSignal | ||
| } | ||
| return s.next.WriteSample(sample) | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is okay to omit this change, map lookup will still work on nil value.