use minimal delay when possible
olofkallander committed Oct 20, 2023
1 parent f020b52 commit 97607ad
Showing 6 changed files with 586 additions and 287 deletions.
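In short, this commit moves the hard-coded CHANNELS and REDUCTION_THRESHOLD constants into a per-pipeline Config struct and lets the jitter buffer drain toward minimal delay: a new _jitterNearEmptyCount is incremented while the target delay stays below one packet and PCM data keeps flowing, and once it exceeds config.safeZoneCountBeforeReducingJB the reduce() step may shave up to a quarter packet even when the buffer already matches the target delay. Below is a minimal stand-alone sketch of that decision; the free function, the rounding helper and the parameter names are illustrative only, not the project's API.

// Sketch of the new minimal-delay reduction decision (simplified; not the actual class code).
#include <cstdint>

struct ReductionConfig
{
    uint32_t safeZoneCountBeforeReducingJB = 150; // polls with the jitter buffer near one packet
};

// Returns how many samples reduce() may remove from the current frame.
uint32_t maxReductionSamples(uint32_t totalJitterSize,   // samples buffered in PCM + jitter buffer
                             uint32_t targetDelay,       // delay the estimator currently wants
                             uint32_t samplesPerPacket,  // e.g. 960 at 48 kHz / 20 ms
                             uint32_t jitterNearEmptyCount,
                             const ReductionConfig& cfg)
{
    // Round the target delay up to whole packets; anything above that is removable.
    const uint32_t roundedTarget =
        ((targetDelay + samplesPerPacket - 1) / samplesPerPacket) * samplesPerPacket;
    uint32_t maxReduction = totalJitterSize > roundedTarget ? totalJitterSize - roundedTarget : 0;

    // New in this commit: after a long stretch with the buffer hovering at one packet and a small
    // target delay, grind down the last buffered packet to reach minimal delay.
    if (maxReduction == 0 && jitterNearEmptyCount > cfg.safeZoneCountBeforeReducingJB &&
        targetDelay <= samplesPerPacket * 3 / 4)
    {
        maxReduction = samplesPerPacket / 4;
    }
    return maxReduction;
}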
2 changes: 2 additions & 0 deletions CMakeLists.txt
@@ -539,6 +539,8 @@ set(TEST_LIB_FILES
test/integration/emulator/FakeTcpServerEndpoint.h
test/integration/emulator/FakeTcpServerEndpoint.cpp
test/integration/emulator/Httpd.cpp
test/integration/emulator/JitterPacketSource.h
test/integration/emulator/JitterPacketSource.cpp
test/CsvWriter.h
test/CsvWriter.cpp
test/ResourceLoader.cpp
94 changes: 56 additions & 38 deletions codec/AudioReceivePipeline.cpp
@@ -14,8 +14,6 @@
#endif
namespace codec
{
const size_t CHANNELS = 2;
const uint32_t REDUCTION_THRESHOLD = 10;

AudioReceivePipeline::ReceiveBox::ReceiveBox(size_t bufferSize)
: underrunCount(0),
@@ -41,8 +39,8 @@ AudioReceivePipeline::AudioReceivePipeline(uint32_t rtpFrequency,
_estimator(rtpFrequency),
_audioLevelExtensionId(audioLevelExtensionId),
_targetDelay(0),
_pcmData(7 * CHANNELS * _samplesPerPacket, CHANNELS * _samplesPerPacket),
_receiveBox(memory::page::alignedSpace(sizeof(int16_t) * CHANNELS * _samplesPerPacket))
_pcmData(7 * _config.channels * _samplesPerPacket, _config.channels * _samplesPerPacket),
_receiveBox(memory::page::alignedSpace(sizeof(int16_t) * _config.channels * _samplesPerPacket))
{
}

@@ -98,7 +96,7 @@ size_t AudioReceivePipeline::decodePacket(uint32_t extendedSequenceNumber,
const auto decodedFrames = _decoder.conceal(reinterpret_cast<uint8_t*>(audioData));
if (decodedFrames > 0)
{
audioData += CHANNELS * decodedFrames;
audioData += _config.channels * decodedFrames;
}
}

@@ -107,7 +105,7 @@ size_t AudioReceivePipeline::decodePacket(uint32_t extendedSequenceNumber,
_decoder.conceal(header->getPayload(), opusPayloadLength, reinterpret_cast<uint8_t*>(audioData));
if (decodedFrames > 0)
{
audioData += CHANNELS * decodedFrames;
audioData += _config.channels * decodedFrames;
}
}

@@ -119,10 +117,10 @@ size_t AudioReceivePipeline::decodePacket(uint32_t extendedSequenceNumber,

if (decodedFrames > 0)
{
audioData += CHANNELS * decodedFrames;
audioData += _config.channels * decodedFrames;
}

const size_t samplesProduced = (audioData - originalAudioStart) / CHANNELS;
const size_t samplesProduced = (audioData - originalAudioStart) / _config.channels;
if (samplesProduced < _samplesPerPacket / 2)
{
logger::warn("%u failed to decode opus %zu", "AudioReceivePipeline", _ssrc, samplesProduced);
@@ -136,18 +134,18 @@ size_t AudioReceivePipeline::decodePacket(uint32_t extendedSequenceNumber,
// increase the threshold so more samples may be eligible for compression.
void AudioReceivePipeline::adjustReductionPower(uint32_t recentReduction)
{
if (recentReduction > 30)
if (recentReduction > _config.reduction.expectedCompression)
{
_elimination.uncompressableCount = 0;
_elimination.incompressableCount = 0;
// keep threshold
return;
}
else
{
++_elimination.uncompressableCount;
if (_elimination.uncompressableCount > 15)
++_elimination.incompressableCount;
if (_elimination.incompressableCount > _config.reduction.failedCompressLimit)
{
_elimination.deltaThreshold *= 1.25;
_elimination.deltaThreshold *= _config.reduction.incrementFactor;
}
}
}
@@ -160,26 +158,35 @@ size_t AudioReceivePipeline::reduce(const memory::Packet& packet,
int audioLevel = 0;
if (!rtp::getAudioLevel(packet, _audioLevelExtensionId, audioLevel))
{
audioLevel = codec::computeAudioLevel(audioData, samples * CHANNELS);
audioLevel = codec::computeAudioLevel(audioData, samples * _config.channels);
}
_noiseFloor.update(audioLevel);

const auto maxReduction =
static_cast<int32_t>(totalJitterSize - math::roundUpMultiple(_targetDelay, _samplesPerPacket));
auto maxReduction = static_cast<int32_t>(totalJitterSize - math::roundUpMultiple(_targetDelay, _samplesPerPacket));
if (maxReduction == 0 && _jitterNearEmptyCount > _config.safeZoneCountBeforeReducingJB &&
_targetDelay <= _samplesPerPacket * 3 / 4)
{
// start grinding down the last buffered packet and go for minimal delay
maxReduction = _samplesPerPacket / 4;
}

if (maxReduction > static_cast<int32_t>(_rtpFrequency / 200))
if (maxReduction > 0)
{
logger::debug("%u bufsize %u, red %u", "AudioReceivePipeline", _ssrc, totalJitterSize, maxReduction);
if (audioLevel > _noiseFloor.getLevel() - 6 && maxReduction >= static_cast<int32_t>(_samplesPerPacket))
JBLOG("%u bufsize %u, red %u", "AudioReceivePipeline", _ssrc, totalJitterSize, maxReduction);
if (audioLevel > _noiseFloor.getLevel() - _config.reduction.silenceMargin &&
maxReduction >= static_cast<int32_t>(_samplesPerPacket))
{
++_metrics.eliminatedPackets;
_metrics.eliminatedSamples += samples;
return 0;
}
else
{
const auto newSampleCount =
codec::compactStereoTroughs(audioData, samples, maxReduction, 10, _elimination.deltaThreshold);
const auto newSampleCount = codec::compactStereoTroughs(audioData,
samples,
maxReduction,
_config.reduction.silenceZone,
_elimination.deltaThreshold);
adjustReductionPower(samples - newSampleCount);

if (newSampleCount < samples)
@@ -202,14 +209,14 @@ size_t AudioReceivePipeline::reduce(const memory::Packet& packet,
_metrics.shrunkPackets,
_metrics.eliminatedSamples,
_metrics.eliminatedPackets,
_pcmData.size() / CHANNELS,
_pcmData.size() / _config.channels,
jitterBufferSize(_head.nextRtpTimestamp),
_targetDelay,
_estimator.getJitterMaxStable());
_metrics.shrunkPackets = 0;
_metrics.eliminatedSamples = 0;
_metrics.eliminatedPackets = 0;
_elimination.deltaThreshold = REDUCTION_THRESHOLD;
_elimination.deltaThreshold = _config.reduction.sampleThreshold;
}
return samples;
}
@@ -270,6 +277,7 @@ bool AudioReceivePipeline::onRtpPacket(uint32_t extendedSequenceNumber,
const auto header = rtp::RtpHeader::fromPacket(*packet);
if (!header)
{
assert(false);
return false; // corrupt
}

@@ -285,13 +293,23 @@ bool AudioReceivePipeline::onRtpPacket(uint32_t extendedSequenceNumber,
if (posted)
{
updateTargetDelay(delay);
if (_targetDelay < _samplesPerPacket && !_pcmData.empty())
{
++_jitterNearEmptyCount;
}
else
{
_jitterNearEmptyCount = 0;
}
}

process(receiveTime);

return posted;
}

// The RTP packet was received but discarded in transport.
// We must still track the extended sequence number.
bool AudioReceivePipeline::onSilencedRtpPacket(uint32_t extendedSequenceNumber,
memory::UniquePacket packet,
uint64_t receiveTime)
@@ -325,11 +343,11 @@ size_t AudioReceivePipeline::fetchStereo(size_t sampleCount)
{
codec::clearStereo(_receiveBox.audio, _samplesPerPacket);
_receiveBox.audioSampleCount = 0;
const uint32_t bufferLevel = _pcmData.size() / CHANNELS;
const uint32_t bufferLevel = _pcmData.size() / _config.channels;
if (bufferLevel < sampleCount)
{
++_receiveBox.underrunCount;
if (_receiveBox.underrunCount % 100 == 0)
if (_receiveBox.underrunCount % 100 == 1)
{
logger::info("%u underrun %u, samples %u",
"AudioReceivePipeline",
@@ -340,21 +358,21 @@ size_t AudioReceivePipeline::fetchStereo(size_t sampleCount)

if (bufferLevel > 0)
{
_pcmData.fetch(_receiveBox.audio, sampleCount * CHANNELS);
_pcmData.fetch(_receiveBox.audio, sampleCount * _config.channels);
codec::AudioLinearFade fader(bufferLevel);
fader.fadeOutStereo(_receiveBox.audio, bufferLevel);
logger::debug("%u fade out %u, requested %zu, pcm %zu",
"AudioReceivePipeline",
_ssrc,
bufferLevel,
sampleCount,
_pcmData.size() / CHANNELS);
_pcmData.size() / _config.channels);
_receiveBox.audioSampleCount = sampleCount;
return sampleCount;
}
else if (_receiveBox.underrunCount == 1)
{
_pcmData.replay(_receiveBox.audio, sampleCount * CHANNELS);
_pcmData.replay(_receiveBox.audio, sampleCount * _config.channels);
codec::swingTail(_receiveBox.audio, 48000, sampleCount);
logger::debug("%u appended tail", "AudioReceivePipeline", _ssrc);
_receiveBox.audioSampleCount = sampleCount;
@@ -364,7 +382,7 @@ size_t AudioReceivePipeline::fetchStereo(size_t sampleCount)
}

// JBLOG("fetched %zu", "AudioReceivePipeline", sampleCount);
_pcmData.fetch(_receiveBox.audio, sampleCount * CHANNELS);
_pcmData.fetch(_receiveBox.audio, sampleCount * _config.channels);
_receiveBox.audioSampleCount = sampleCount;

if (_receiveBox.underrunCount > 0)
@@ -383,12 +401,12 @@ uint32_t AudioReceivePipeline::jitterBufferSize(uint32_t rtpTimestamp) const
return (_jitterBuffer.empty() ? 0 : _jitterBuffer.getRtpDelay(rtpTimestamp) + _metrics.receivedRtpCyclesPerPacket);
}

void AudioReceivePipeline::process(uint64_t timestamp)
void AudioReceivePipeline::process(const uint64_t timestamp)
{
size_t bufferLevel = _pcmData.size() / CHANNELS;
size_t bufferLevel = _pcmData.size() / _config.channels;

for (; _jitterEmergency.counter == 0 && !_jitterBuffer.empty() && bufferLevel < _samplesPerPacket;
bufferLevel = _pcmData.size() / CHANNELS)
bufferLevel = _pcmData.size() / _config.channels)
{
const auto header = _jitterBuffer.getFrontRtp();
const int16_t sequenceAdvance =
@@ -435,7 +453,7 @@ void AudioReceivePipeline::process(uint64_t timestamp)
const uint32_t extendedSequenceNumber = _head.extendedSequenceNumber + sequenceAdvance;
auto packet = _jitterBuffer.pop();

int16_t audioData[_samplesPerPacket * 4 * CHANNELS];
int16_t audioData[_samplesPerPacket * 4 * _config.channels];
size_t decodedSamples = 0;
const size_t payloadLength = packet->getLength() - header->headerLength();
if (payloadLength == 0)
@@ -456,7 +474,7 @@ void AudioReceivePipeline::process(uint64_t timestamp)
_head.extendedSequenceNumber = extendedSequenceNumber;
_head.nextRtpTimestamp = header->timestamp + _metrics.receivedRtpCyclesPerPacket;
const auto remainingSamples = reduce(*packet, audioData, decodedSamples, totalJitterBufferSize);
if (!_pcmData.append(audioData, remainingSamples * CHANNELS))
if (!_pcmData.append(audioData, remainingSamples * _config.channels))
{
const auto header = rtp::RtpHeader::fromPacket(*packet);
logger::warn("%u failed to append seq %u ts %u",
Expand All @@ -466,10 +484,10 @@ void AudioReceivePipeline::process(uint64_t timestamp)
header->timestamp.get());
}

JBLOG("ssrc %u added pcm %u, JB %u (%u), TD %u, eliminated %zu, tstmp %u",
JBLOG("ssrc %u added pcm %zu, JB %u (%u), TD %u, eliminated %zu, tstmp %u",
"AudioReceivePipeline",
_ssrc,
_pcmData.size() / CHANNELS,
_pcmData.size() / _config.channels,
jitterBufferSize(_head.nextRtpTimestamp),
_jitterBuffer.count(),
_targetDelay,
@@ -478,10 +496,10 @@ void AudioReceivePipeline::process(uint64_t timestamp)

if ((header->sequenceNumber.get() % 100) == 0 && !_jitterBuffer.empty())
{
JBLOG("%u pcm %u JB %u",
JBLOG("%u pcm %zu JB %u",
"AudioReceivePipeline",
_ssrc,
_pcmData.size() / CHANNELS,
_pcmData.size() / _config.channels,
jitterBufferSize(_head.nextRtpTimestamp));
}
}
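The reduce() path above keys its aggressiveness off the renamed incompressableCount and the new reduction config block: whenever compactStereoTroughs removes fewer than expectedCompression samples, a miss is counted, and after failedCompressLimit consecutive misses the delta threshold is multiplied by incrementFactor so more of the waveform qualifies as removable troughs; the threshold snaps back to sampleThreshold when the periodic metrics are logged. A hedged stand-alone restatement of adjustReductionPower, with simplified types and outside the real class:

// Illustrative restatement of adjustReductionPower(); not the project's actual code.
#include <cstdint>

struct ReductionTuning
{
    uint32_t expectedCompression = 30; // samples we expect each compression pass to remove
    uint32_t failedCompressLimit = 15; // consecutive misses tolerated before relaxing the threshold
    double incrementFactor = 1.25;     // growth applied to the delta threshold
};

struct EliminationState
{
    uint32_t incompressableCount = 0;
    double deltaThreshold = 10.0; // sampleThreshold default from the new Config
};

void adjustReductionPower(uint32_t recentReduction, const ReductionTuning& cfg, EliminationState& state)
{
    if (recentReduction > cfg.expectedCompression)
    {
        state.incompressableCount = 0; // compression is working; keep the current threshold
        return;
    }

    // Too little was removed. After enough consecutive misses, raise the threshold so that
    // more adjacent-sample deltas count as "troughs" and become eligible for removal.
    if (++state.incompressableCount > cfg.failedCompressLimit)
    {
        state.deltaThreshold *= cfg.incrementFactor;
    }
}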
45 changes: 42 additions & 3 deletions codec/AudioReceivePipeline.h
@@ -13,9 +13,45 @@ namespace codec
/**
* Audio receive pipeline that performs adaptive jitter buffering and cut-off concealment.
* The PCM data buffer is single-producer single-consumer thread safe. The rest shall run on a single thread context.
*
* Mechanisms:
* - If buffers run empty, the tail of the audio is faded out to avoid pops and clicks.
* - When sound continues after an underrun, the audio is faded in to avoid pops.
* - Audio elements with significant sound are not dropped. Silence is removed and the audio is
*   time compressed, with some distortion where it is heard the least. This avoids CPU-intensive
*   resampling and filtering.
* - Packet loss is concealed by the Opus decoder.
* - If jitter is low, the pipeline will operate in the time window between packet arrival and audio fetch,
*   which can be less than one audio frame.
*
* Packet arrival may be offset from the packet pull pace. If jitter is less than the time left until the pull,
* the packets will arrive before the pull and can be put into the PCM buffer immediately. Otherwise, the pipeline
* will have one packet in PCM and 1 or 0 packets in the JB.
*
* | | | |
* v v v v
* | | | |
* v v v v
*/
class AudioReceivePipeline
{
struct Config
{
uint32_t channels = 2;
uint32_t safeZoneCountBeforeReducingJB = 150;

struct
{
uint32_t sampleThreshold = 10;
uint32_t expectedCompression = 30;
uint32_t failedCompressLimit = 15;
double incrementFactor = 1.25;
double silenceMargin = 6.0; // dB
double silenceZone = 10.0;
} reduction;
};
const Config _config;

public:
AudioReceivePipeline(uint32_t rtpFrequency, uint32_t ptime, uint32_t maxPackets, int audioLevelExtensionId = 255);

@@ -59,7 +95,7 @@ class AudioReceivePipeline
uint32_t _targetDelay;
struct SampleElimination
{
uint32_t uncompressableCount = 0;
uint32_t incompressableCount = 0;
int16_t deltaThreshold = 10;
} _elimination;

@@ -72,16 +108,19 @@

struct JitterEmergency
{
int counter = 0; // late packet arrives and buffer is empty
uint32_t counter = 0; // late packet arrives and buffer is empty
uint32_t sequenceStart = 0;
} _jitterEmergency;

// Jitter buffer is close to 1 packet and jitter is less than 20ms
uint32_t _jitterNearEmptyCount = 0;

struct Metrics
{
uint32_t shrunkPackets = 0;
uint32_t eliminatedPackets = 0;
uint32_t eliminatedSamples = 0;
uint32_t receivedRtpCyclesPerPacket = 960;
uint32_t receivedRtpCyclesPerPacket = 960; // 480, 960, 1440
} _metrics;

SpscAudioBuffer<int16_t> _pcmData;
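For orientation, here is a hedged sketch of how a consumer might drive the interface visible in this diff (the constructor plus onRtpPacket and fetchStereo). The header path, the packet source, the 48 kHz / 20 ms / 100-packet parameters and the extension id are assumptions, not part of the commit; the split between a receive thread and a mixer thread follows the single-producer single-consumer note in the class comment.

// Hypothetical caller, based only on signatures visible in this diff.
#include <cstdint>
#include <utility>

#include "codec/AudioReceivePipeline.h"

namespace
{
// 48 kHz RTP clock, 20 ms ptime, room for 100 buffered packets, audio level extension id 1 (assumed values)
codec::AudioReceivePipeline g_pipeline(48000, 20, 100, 1);
} // namespace

// Receive thread: feed every incoming RTP packet; the pipeline reorders, decodes and
// time-compresses toward the target delay.
void onIncomingAudio(uint32_t extendedSequenceNumber, memory::UniquePacket packet, uint64_t receiveTime)
{
    g_pipeline.onRtpPacket(extendedSequenceNumber, std::move(packet), receiveTime);
}

// Mixer thread: pull one frame per 20 ms tick (960 stereo samples at 48 kHz, assumed frame size).
void onMixerTick()
{
    g_pipeline.fetchStereo(960);
}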
