Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pkg/sip/features.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package sip

const signalLoggingFeatureFlag = "sip.signal_logging"
8 changes: 8 additions & 0 deletions pkg/sip/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -864,6 +864,9 @@ func (c *inboundCall) handleInvite(ctx context.Context, tid traceid.ID, req *sip
disp.RingingTimeout = defaultRingingTimeout
}
disp.Room.JitterBuf = c.jitterBuf
if disp.FeatureFlags != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is okay to omit this change, map lookup will still work on nil value.

disp.Room.LogSignalChanges = disp.FeatureFlags[signalLoggingFeatureFlag] == "true"
}
ctx, cancel := context.WithTimeout(ctx, disp.MaxCallDuration)
defer cancel()
status := CallRinging
Expand Down Expand Up @@ -950,12 +953,17 @@ func (c *inboundCall) runMediaConn(tid traceid.ID, offerData []byte, enc livekit
return nil, err
}

logSignalChanges := false
if featureFlags != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here, no need for a check

logSignalChanges = featureFlags[signalLoggingFeatureFlag] == "true"
}
mp, err := NewMediaPort(tid, c.log(), c.mon, &MediaOptions{
IP: c.s.sconf.MediaIP,
Ports: conf.RTPPort,
MediaTimeoutInitial: c.s.conf.MediaTimeoutInitial,
MediaTimeout: c.s.conf.MediaTimeout,
EnableJitterBuffer: c.jitterBuf,
LogSignalChanges: logSignalChanges,
Stats: &c.stats.Port,
NoInputResample: !RoomResample,
}, RoomSampleRate)
Expand Down
17 changes: 17 additions & 0 deletions pkg/sip/media_port.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ type MediaOptions struct {
Stats *PortStats
EnableJitterBuffer bool
NoInputResample bool
LogSignalChanges bool
}

func NewMediaPort(tid traceid.ID, log logger.Logger, mon *stats.CallMonitor, opts *MediaOptions, sampleRate int) (*MediaPort, error) {
Expand Down Expand Up @@ -280,6 +281,7 @@ func NewMediaPortWith(tid traceid.ID, log logger.Logger, mon *stats.CallMonitor,
mediaTimeout: mediaTimeout,
timeoutResetTick: make(chan time.Duration, 1),
jitterEnabled: opts.EnableJitterBuffer,
logSignalChanges: opts.LogSignalChanges,
port: newUDPConn(log, conn),
audioOut: msdk.NewSwitchWriter(sampleRate),
audioIn: msdk.NewSwitchWriter(inSampleRate),
Expand Down Expand Up @@ -313,6 +315,7 @@ type MediaPort struct {
stats *PortStats
dtmfAudioEnabled bool
jitterEnabled bool
logSignalChanges bool

mu sync.Mutex
conf *MediaConf
Expand Down Expand Up @@ -722,6 +725,12 @@ func (p *MediaPort) setupOutput(tid traceid.ID) error {
if p.stats != nil {
audioOut = newMediaWriterCount(audioOut, &p.stats.AudioOutFrames, &p.stats.AudioOutSamples)
}
if p.logSignalChanges {
audioOut, err = NewSignalLogger(p.log, "mixed", audioOut)
if err != nil {
return err
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to close original audioOut here.

}
}

if p.conf.Audio.DTMFType != 0 {
p.dtmfOutRTP = s.NewStream(p.conf.Audio.DTMFType, dtmf.SampleRate)
Expand Down Expand Up @@ -755,6 +764,14 @@ func (p *MediaPort) setupInput() {
if p.stats != nil {
audioWriter = newMediaWriterCount(audioWriter, &p.stats.AudioInFrames, &p.stats.AudioInSamples)
}
if p.logSignalChanges {
signalLogger, err := NewSignalLogger(p.log, "input", audioWriter)
if err != nil {
p.log.Errorw("failed to create signal logger", err)
} else {
audioWriter = signalLogger
}
}
audioHandler := p.conf.Audio.Codec.DecodeRTP(audioWriter, p.conf.Audio.Type)
// Wrap the decoder with silence suppression handler to fill gaps during silence suppression
audioHandler = newSilenceFiller(audioHandler, audioWriter, codecInfo.RTPClockRate, codecInfo.SampleRate, p.log)
Expand Down
3 changes: 3 additions & 0 deletions pkg/sip/outbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ type outboundCall struct {
}

func (c *Client) newCall(ctx context.Context, tid traceid.ID, conf *config.Config, log logger.Logger, id LocalTag, room RoomConfig, sipConf sipOutboundConfig, state *CallState, projectID string) (*outboundCall, error) {
signalLoggingEnabled := sipConf.featureFlags[signalLoggingFeatureFlag] == "true"
if sipConf.maxCallDuration <= 0 || sipConf.maxCallDuration > maxCallDuration {
sipConf.maxCallDuration = maxCallDuration
}
Expand All @@ -98,6 +99,7 @@ func (c *Client) newCall(ctx context.Context, tid traceid.ID, conf *config.Confi
}
jitterBuf := SelectValueBool(conf.EnableJitterBuffer, conf.EnableJitterBufferProb)
room.JitterBuf = jitterBuf
room.LogSignalChanges = signalLoggingEnabled

tr := TransportFrom(sipConf.transport)
contact := c.ContactURI(tr)
Expand Down Expand Up @@ -142,6 +144,7 @@ func (c *Client) newCall(ctx context.Context, tid traceid.ID, conf *config.Confi
MediaTimeoutInitial: c.conf.MediaTimeoutInitial,
MediaTimeout: c.conf.MediaTimeout,
EnableJitterBuffer: call.jitterBuf,
LogSignalChanges: signalLoggingEnabled,
Stats: &call.stats.Port,
NoInputResample: !RoomResample,
}, RoomSampleRate)
Expand Down
22 changes: 14 additions & 8 deletions pkg/sip/room.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,13 +179,14 @@ type ParticipantConfig struct {
}

type RoomConfig struct {
WsUrl string
Token string
RoomName string
Participant ParticipantConfig
RoomPreset string
RoomConfig *livekit.RoomConfiguration
JitterBuf bool
WsUrl string
Token string
RoomName string
Participant ParticipantConfig
RoomPreset string
RoomConfig *livekit.RoomConfiguration
JitterBuf bool
LogSignalChanges bool
}

func NewRoom(log logger.Logger, st *RoomStats) *Room {
Expand Down Expand Up @@ -321,7 +322,12 @@ func (r *Room) Connect(conf *config.Config, rconf RoomConfig) error {
defer log.Infow("track closed")
defer mTrack.Close()

codec, err := opus.Decode(mTrack, channels, log)
var out msdk.PCM16Writer = mTrack
if rconf.LogSignalChanges {
out, _ = NewSignalLogger(log, track.ID(), out)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not even logging an error? This could silenty write nil to out.

}

codec, err := opus.Decode(out, channels, log)
if err != nil {
log.Errorw("cannot create opus decoder", err)
return
Expand Down
161 changes: 161 additions & 0 deletions pkg/sip/signal_logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
// Copyright 2024 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package sip

import (
"fmt"
"time"

msdk "github.com/livekit/media-sdk"
"github.com/livekit/protocol/logger"
)

const (
DefaultSignalMultiplier = float64(2) // Singnal needs to be at least 2 times the noise floor to be detected.
DefaultNoiseFloor = float64(25) // Use a low noise floor; The purpose is to detect any kind of signal, not just voice.
DefaultHangoverDuration = 1 * time.Second // Duration of silence that needs to elapse before we no longer consider it a signal.
)

// Keeps an internal state of whether we're currently transmitting signal (voice or noise), or silence.
// This implements msdk.PCM16Writer to inspect decoded packet content.
// Used to log changes betweem those states.
type SignalLogger struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should consider moving it to media-sdk, since it's not SIP-specific.

// Configuration
log logger.Logger
next msdk.PCM16Writer
name string
hangoverDuration time.Duration // Time to stay in "signal" state after last signal to avoid flip-flopping.
signalMultiplier float64 // Threshold multiplier for signal to be detected.
noiseFloor float64 // Moveing average of noise floor.

// State
lastSignalTime time.Time
lastIsSignal bool

// Stats
framesProcessed uint64
stateChanges uint64
}

type SignalLoggerOption func(*SignalLogger) error

func WithSignalMultiplier(signalMultiplier float64) SignalLoggerOption {
return func(s *SignalLogger) error {
if signalMultiplier <= 1 {
return fmt.Errorf("signal multiplier must be greater than 1")
}
s.signalMultiplier = signalMultiplier
return nil
}
}

func WithNoiseFloor(noiseFloor float64) SignalLoggerOption {
return func(s *SignalLogger) error {
if noiseFloor <= 0 {
return fmt.Errorf("noise floor min must be positive")
}
s.noiseFloor = noiseFloor
return nil
}
}

func WithHangoverDuration(hangoverDuration time.Duration) SignalLoggerOption {
return func(s *SignalLogger) error {
if hangoverDuration <= 0 {
return fmt.Errorf("hangover duration must be positive")
}
s.hangoverDuration = hangoverDuration
return nil
}
}
func NewSignalLogger(log logger.Logger, name string, next msdk.PCM16Writer, options ...SignalLoggerOption) (msdk.PCM16Writer, error) {
s := &SignalLogger{
log: log,
next: next,
name: name,
hangoverDuration: DefaultHangoverDuration,
signalMultiplier: DefaultSignalMultiplier,
noiseFloor: DefaultNoiseFloor,
lastSignalTime: time.Time{},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to initialize explicitly.

lastIsSignal: false,
framesProcessed: 0,
stateChanges: 0,
}
for _, option := range options {
if err := option(s); err != nil {
return next, err
}
}
return s, nil
}

func (s *SignalLogger) String() string {
return fmt.Sprintf("SignalLogger(%s) -> %s", s.name, s.next.String())
}

func (s *SignalLogger) SampleRate() int {
return s.next.SampleRate()
}

func (s *SignalLogger) Close() error {
if s.stateChanges > 0 {
s.log.Infow("signal logger closing", "name", s.name, "stateChanges", s.stateChanges)
}
return s.next.Close()
}

// Calculates the mean absolute deviation of the frame.
func (s *SignalLogger) MeanAbsoluteDeviation(sample msdk.PCM16Sample) float64 {
if len(sample) == 0 {
return 0
}
var totalAbs int64
for _, v := range sample {
if v < 0 {
totalAbs += int64(-v)
} else {
totalAbs += int64(v)
}
}
return float64(totalAbs) / float64(len(sample))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could also consider calculating a median, it will be less affected by sudden spikes.

}

func (s *SignalLogger) WriteSample(sample msdk.PCM16Sample) error {
currentEnergy := s.MeanAbsoluteDeviation(sample)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably doesn't matter here, but energy is usually computed as a square of amplitude. Maybe rename to ampMean?


now := time.Now()
isSignal := currentEnergy > (s.noiseFloor * s.signalMultiplier)
if isSignal {
s.lastSignalTime = now
}

s.framesProcessed++
if s.framesProcessed > 10 { // Don't log any changes at first
if isSignal && isSignal != s.lastIsSignal { // silence -> signal: Immediate transition
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could potentially hit each frame, wouldn't it be too spammy?

s.lastIsSignal = true
s.stateChanges++
s.log.Infow("signal changed", "name", s.name, "signal", isSignal, "stateChanges", s.stateChanges)
} else if !isSignal && isSignal != s.lastIsSignal { // signal -> silence: Only after hangover
if now.Sub(s.lastSignalTime) >= s.hangoverDuration {
s.lastIsSignal = false
s.stateChanges++
s.log.Infow("signal changed", "name", s.name, "signal", isSignal, "stateChanges", s.stateChanges)
}
}
} else {
s.lastIsSignal = isSignal
}
return s.next.WriteSample(sample)
}
Loading