diff --git a/CMakeLists.txt b/CMakeLists.txt index 3067f73a5..71b7c830d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -241,6 +241,12 @@ set(FILES codec/OpusEncoder.cpp codec/OpusEncoder.h codec/Vp8Header.h + codec/AudioTools.h + codec/AudioTools.cpp + codec/NoiseFloor.cpp + codec/AudioReceivePipeline.h + codec/AudioReceivePipeline.cpp + codec/SpscAudioBuffer.h concurrency/EventSemaphore.cpp concurrency/EventSemaphore.h concurrency/LockFreeList.cpp @@ -338,6 +344,12 @@ set(FILES rtp/SendTimeDial.h rtp/JitterTracker.h rtp/JitterTracker.cpp + rtp/JitterEstimator.h + rtp/JitterEstimator.cpp + rtp/RtpDelayTracker.h + rtp/RtpDelayTracker.cpp + rtp/JitterBufferList.cpp + rtp/JitterBufferList.h test/macros.h transport/UdpEndpoint.h transport/BaseUdpEndpoint.cpp @@ -528,6 +540,8 @@ set(TEST_LIB_FILES test/integration/emulator/FakeTcpServerEndpoint.h test/integration/emulator/FakeTcpServerEndpoint.cpp test/integration/emulator/Httpd.cpp + test/integration/emulator/JitterPacketSource.h + test/integration/emulator/JitterPacketSource.cpp test/CsvWriter.h test/CsvWriter.cpp test/ResourceLoader.cpp @@ -579,6 +593,7 @@ set(TEST_FILES test/transport/SrtpTest.cpp test/transport/Ipv6Test.cpp test/transport/JitterTest.cpp + test/transport/AdaptiveJitterTest.cpp test/integration/TimeTurnerTest.cpp test/config/ConfigTest.cpp test/codec/AudioProcessingTest.cpp @@ -625,7 +640,8 @@ set(TEST_FILES test/bridge/VideoNackReceiveJobTest.cpp test/utils/LogSpamTest.cpp test/utils/FunctionTest.cpp - test/bridge/RtpAudioRewriterTest.cpp) + test/bridge/RtpAudioRewriterTest.cpp + test/transport/JitterTest.cpp) set(TEST_FILES2 diff --git a/codec/AudioFader.h b/codec/AudioFader.h new file mode 100644 index 000000000..087d0452c --- /dev/null +++ b/codec/AudioFader.h @@ -0,0 +1,61 @@ +#pragma once +#include +#include + +namespace codec +{ + +// fades in +class AudioLinearFade +{ +public: + AudioLinearFade(uint32_t rampDurationInSamples) : _stepFactor(1.0 / rampDurationInSamples), _volume(0) {} + + void fadeInStereo(const int16_t* srcData, int16_t* outData, size_t sampleCount) + { + + for (size_t i = 0; i < sampleCount; ++i) + { + _volume = std::min(1.0, _volume + _stepFactor); + outData[i * 2] = srcData[i * 2] * _volume; + outData[i * 2 + 1] = srcData[i * 2 + 1] * _volume; + } + } + + void fadeInStereo(int16_t* inOutData, size_t sampleCount) + { + return fadeInStereo(inOutData, inOutData, sampleCount); + } + + void fadeOutStereo(const int16_t* srcData, int16_t* outData, size_t sampleCount) + { + for (size_t i = 0; i < sampleCount; ++i) + { + _volume = std::min(1.0, _volume + _stepFactor); + const auto amp = 1.0 - _volume; + outData[i * 2] = srcData[i * 2] * amp; + outData[i * 2 + 1] = srcData[i * 2 + 1] * amp; + } + } + + void fadeOutStereo(int16_t* data, size_t sampleCount) { fadeOutStereo(data, data, sampleCount); } + + double next() + { + if (_volume >= 1.0) + { + return 1.0; + } + _volume += _stepFactor; + return _volume; + } + + void reset() { _volume = 0; } + double get() const { return _volume; } + +private: + const double _stepFactor; + double _volume; +}; + +} // namespace codec diff --git a/codec/AudioReceivePipeline.cpp b/codec/AudioReceivePipeline.cpp new file mode 100644 index 000000000..26013b59d --- /dev/null +++ b/codec/AudioReceivePipeline.cpp @@ -0,0 +1,567 @@ +#include "codec/AudioReceivePipeline.h" +#include "codec/AudioFader.h" +#include "codec/AudioLevel.h" +#include "codec/AudioTools.h" +#include "math/helpers.h" +#include "memory/Allocator.h" +#include "rtp/RtpHeader.h" + +#define DEBUG_JB 0 + +#if 
DEBUG_JB +#define JBLOG(fmt, ...) logger::debug(fmt, ##__VA_ARGS__) +#else +#define JBLOG(fmt, ...) +#endif +namespace codec +{ + +AudioReceivePipeline::ReceiveBox::ReceiveBox(size_t bufferSize) + : underrunCount(0), + audioBufferSize(bufferSize), + audio(nullptr), + audioSampleCount(0) +{ + audio = reinterpret_cast(memory::page::allocate(audioBufferSize)); +} + +AudioReceivePipeline::ReceiveBox::~ReceiveBox() +{ + memory::page::free(audio, audioBufferSize); +} + +AudioReceivePipeline::AudioReceivePipeline(uint32_t rtpFrequency, + uint32_t ptime, + uint32_t maxPackets, + int audioLevelExtensionId) + : _ssrc(0), + _rtpFrequency(rtpFrequency), + _samplesPerPacket(ptime * rtpFrequency / 1000), + _estimator(rtpFrequency), + _audioLevelExtensionId(audioLevelExtensionId), + _targetDelay(0), + _bufferAtTwoFrames(0), + _pcmData(7 * _config.channels * _samplesPerPacket, _config.channels * _samplesPerPacket), + _receiveBox(memory::page::alignedSpace(sizeof(int16_t) * _config.channels * _samplesPerPacket)) +{ +} + +bool AudioReceivePipeline::updateTargetDelay(double delay) +{ + const auto decodeTime = 3; + const auto prevDelay = _targetDelay; + + _targetDelay = (_estimator.getJitterMaxStable() + 1 + decodeTime) * _rtpFrequency / 1000; + + if (_targetDelay > prevDelay + _rtpFrequency / 100) + { + logger::info("%u jitter increase to %0.3fms", + "AudioReceivePipeline", + _ssrc, + _targetDelay * 1000.0 / _rtpFrequency); + } + + return true; +} + +// produces up to 4 packets of stereo pcm data +size_t AudioReceivePipeline::decodePacket(uint32_t extendedSequenceNumber, + const uint64_t timestamp, + const memory::Packet& packet, + int16_t* audioData) +{ + const auto header = rtp::RtpHeader::fromPacket(packet); + const int16_t* originalAudioStart = audioData; + + if (_decoder.hasDecoded() && extendedSequenceNumber != _decoder.getExpectedSequenceNumber()) + { + const int32_t lossCount = static_cast(extendedSequenceNumber - _decoder.getExpectedSequenceNumber()); + if (lossCount <= 0) + { + logger::debug("%u Old opus packet sequence %u expected %u, discarding", + "AudioReceivePipeline", + _ssrc, + extendedSequenceNumber, + _decoder.getExpectedSequenceNumber()); + return 0; + } + + logger::debug("%u Lost opus packet sequence %u expected %u, fec", + "AudioReceivePipeline", + _ssrc, + extendedSequenceNumber, + _decoder.getExpectedSequenceNumber()); + + const auto concealCount = std::min(2u, extendedSequenceNumber - _decoder.getExpectedSequenceNumber() - 1); + for (uint32_t i = 0; concealCount > 1 && i < concealCount - 1; ++i) + { + const auto decodedFrames = _decoder.conceal(reinterpret_cast(audioData)); + if (decodedFrames > 0) + { + audioData += _config.channels * decodedFrames; + } + } + + const auto opusPayloadLength = packet.getLength() - header->headerLength(); + const auto decodedFrames = + _decoder.conceal(header->getPayload(), opusPayloadLength, reinterpret_cast(audioData)); + if (decodedFrames > 0) + { + audioData += _config.channels * decodedFrames; + } + } + + const auto decodedFrames = _decoder.decode(extendedSequenceNumber, + header->getPayload(), + packet.getLength() - header->headerLength(), + reinterpret_cast(audioData), + _samplesPerPacket); + + if (decodedFrames > 0) + { + audioData += _config.channels * decodedFrames; + } + + const size_t samplesProduced = (audioData - originalAudioStart) / _config.channels; + if (samplesProduced < _samplesPerPacket / 2) + { + logger::warn("%u failed to decode opus %zu", "AudioReceivePipeline", _ssrc, samplesProduced); + return samplesProduced; + } + + return 
samplesProduced; +} + +// after a number of packets that could not be compressed using current threshold +// increase the threshold so more samples may be eligible for compression. +void AudioReceivePipeline::adjustReductionPower(uint32_t recentReduction) +{ + if (recentReduction > _config.reduction.expectedCompression) + { + _elimination.incompressableCount = 0; + // keep threshold + return; + } + else + { + ++_elimination.incompressableCount; + if (_elimination.incompressableCount > _config.reduction.failedCompressLimit) + { + _elimination.deltaThreshold *= _config.reduction.incrementFactor; + } + } +} + +size_t AudioReceivePipeline::reduce(const memory::Packet& packet, + int16_t* audioData, + const uint32_t samples, + const uint32_t totalJitterSize) +{ + int audioLevel = 0; + if (!rtp::getAudioLevel(packet, _audioLevelExtensionId, audioLevel)) + { + audioLevel = codec::computeAudioLevel(audioData, samples * _config.channels); + } + _noiseFloor.update(audioLevel); + + auto allowedReduction = + static_cast(totalJitterSize - math::roundUpMultiple(_targetDelay, _samplesPerPacket)); + if (allowedReduction == 0 && _bufferAtTwoFrames > _config.safeZoneCountBeforeReducingJB && + _targetDelay <= _samplesPerPacket * 3 / 4) + { + // target delay < 1 frame and buffer has been at two frames for a while + // start reduction to 1 frame + allowedReduction = _samplesPerPacket / 4; + } + + if (allowedReduction <= 0) + { + if (_metrics.eliminatedSamples > 0) + { + logger::debug("%u shrunk %u packets, eliminated samples %u, eliminated %u packets, pcm %zu, JB %u, TD %u, " + "slowJitter %.2f", + "AudioReceivePipeline", + _ssrc, + _metrics.shrunkPackets, + _metrics.eliminatedSamples, + _metrics.eliminatedPackets, + _pcmData.size() / _config.channels, + jitterBufferSize(_head.nextRtpTimestamp), + _targetDelay, + _estimator.getJitterMaxStable()); + _metrics.shrunkPackets = 0; + _metrics.eliminatedSamples = 0; + _metrics.eliminatedPackets = 0; + _elimination.deltaThreshold = _config.reduction.sampleThreshold; + } + return samples; + } + else + { + JBLOG("%u bufsize %u, red %u", "AudioReceivePipeline", _ssrc, totalJitterSize, allowedReduction); + if (audioLevel > _noiseFloor.getLevel() - _config.reduction.silenceMargin && + allowedReduction >= static_cast(_samplesPerPacket)) + { + ++_metrics.eliminatedPackets; + _metrics.eliminatedSamples += samples; + return 0; + } + else + { + const auto newSampleCount = codec::compactStereoTroughs(audioData, + samples, + allowedReduction, + _config.reduction.silenceZone, + _elimination.deltaThreshold); + adjustReductionPower(samples - newSampleCount); + + if (newSampleCount < samples) + { + ++_metrics.shrunkPackets; + } + + _metrics.eliminatedSamples += samples - newSampleCount; + return newSampleCount; + } + } +} + +void AudioReceivePipeline::init(uint32_t extendedSequenceNumber, const rtp::RtpHeader& header, uint64_t receiveTime) +{ + _estimator.update(receiveTime, header.timestamp); + _ssrc = header.ssrc.get(); + _targetDelay = 25 * _rtpFrequency / 1000; + _head.nextRtpTimestamp = header.timestamp; + _head.extendedSequenceNumber = header.sequenceNumber - 1; + _pcmData.appendSilence(_samplesPerPacket); +} + +double AudioReceivePipeline::analysePacketJitter(uint32_t extendedSequenceNumber, + const rtp::RtpHeader& header, + uint64_t receiveTime) +{ + const double delay = _estimator.update(receiveTime, header.timestamp); + const auto delayInRtpCycles = delay * _rtpFrequency / 1000; + + if (_targetDelay < delayInRtpCycles && delayInRtpCycles > _metrics.receivedRtpCyclesPerPacket 
&& _pcmData.empty()) + { + if (!_jitterEmergency.counter) + { + _jitterEmergency.sequenceStart = extendedSequenceNumber; + } + ++_jitterEmergency.counter; + logger::debug("%u received late packet %u with delay of %0.2fms and pcm buffer is empty. " + "JB %u Replay paused to avoid stuttering", + "AudioReceivePipeline", + _ssrc, + extendedSequenceNumber, + delay, + _jitterBuffer.count()); + } + else if (_jitterEmergency.counter > 0 && + delayInRtpCycles < _jitterBuffer.getRtpDelay() + _metrics.receivedRtpCyclesPerPacket) + { + logger::debug("%u continue after emergency pause %u packets. Recent packet delay %.02fms, JB lag %ums, JB %u", + "AudioReceivePipeline", + _ssrc, + extendedSequenceNumber - _jitterEmergency.sequenceStart, + delay, + _targetDelay * 1000 / _rtpFrequency, + _jitterBuffer.count()); + _jitterEmergency.counter = 0; + } + + return delay; +} + +bool AudioReceivePipeline::onRtpPacket(uint32_t extendedSequenceNumber, + memory::UniquePacket packet, + uint64_t receiveTime) +{ + const auto header = rtp::RtpHeader::fromPacket(*packet); + if (!header) + { + assert(false); + return false; // corrupt + } + + if (_targetDelay == 0) + { + init(extendedSequenceNumber, *header, receiveTime); + _jitterBuffer.add(std::move(packet)); + return true; + } + + const auto delay = analysePacketJitter(extendedSequenceNumber, *header, receiveTime); + const auto posted = _jitterBuffer.add(std::move(packet)); + if (posted) + { + updateTargetDelay(delay); + if (_targetDelay < _samplesPerPacket && _pcmData.size() >= _samplesPerPacket) + { + ++_bufferAtTwoFrames; + } + else + { + _bufferAtTwoFrames = 0; + } + } + else if (_jitterEmergency.counter > 0) + { + flush(); // reset and start over + logger::warn("%u RTP delay unrecoverable. Jitter buffer is full. Resetting...", "AudioReceivePipeline", _ssrc); + return false; + } + + process(receiveTime); + + return posted; +} + +// RTP was received but discarded in transport +// We must track extended sequence number +bool AudioReceivePipeline::onSilencedRtpPacket(uint32_t extendedSequenceNumber, + memory::UniquePacket packet, + uint64_t receiveTime) +{ + const auto header = rtp::RtpHeader::fromPacket(*packet); + if (!header) + { + return false; // corrupt + } + + if (_targetDelay == 0) + { + init(extendedSequenceNumber, *header, receiveTime); + _jitterBuffer.add(std::move(packet)); + return true; + } + + const auto delay = analysePacketJitter(extendedSequenceNumber, *header, receiveTime); + const auto posted = _jitterBuffer.add(std::move(packet)); + if (posted) + { + updateTargetDelay(delay); + } + else if (_jitterEmergency.counter > 0) + { + flush(); // reset and start over + logger::warn("%u RTP delay unrecoverable. Jitter buffer is full. 
Resetting...", "AudioReceivePipeline", _ssrc); + return true; + } + + process(receiveTime); + return posted; +} + +// Fetch audio and suppress pops after underruns as well as resume +size_t AudioReceivePipeline::fetchStereo(size_t sampleCount) +{ + codec::clearStereo(_receiveBox.audio, _samplesPerPacket); + _receiveBox.audioSampleCount = 0; + const uint32_t bufferLevel = _pcmData.size() / _config.channels; + if (bufferLevel < sampleCount) + { + ++_receiveBox.underrunCount; + if (_receiveBox.underrunCount % 100 == 1) + { + logger::info("%u underrun %u, samples %u", + "AudioReceivePipeline", + _ssrc, + _receiveBox.underrunCount, + bufferLevel); + } + + if (bufferLevel > 0) + { + _pcmData.fetch(_receiveBox.audio, sampleCount * _config.channels); + codec::AudioLinearFade fader(bufferLevel); + fader.fadeOutStereo(_receiveBox.audio, bufferLevel); + logger::debug("%u fade out %u, requested %zu, pcm %zu", + "AudioReceivePipeline", + _ssrc, + bufferLevel, + sampleCount, + _pcmData.size() / _config.channels); + _receiveBox.audioSampleCount = sampleCount; + return sampleCount; + } + else if (_receiveBox.underrunCount == 1) + { + _pcmData.replay(_receiveBox.audio, sampleCount * _config.channels); + codec::swingTail(_receiveBox.audio, 48000, sampleCount); + logger::debug("%u appended tail", "AudioReceivePipeline", _ssrc); + _receiveBox.audioSampleCount = sampleCount; + return sampleCount; + } + return 0; + } + + // JBLOG("fetched %zu", "AudioReceivePipeline", sampleCount); + _pcmData.fetch(_receiveBox.audio, sampleCount * _config.channels); + _receiveBox.audioSampleCount = sampleCount; + + if (_receiveBox.underrunCount > 0) + { + logger::debug("%u fade in after %u underruns", "AudioReceivePipeline", _ssrc, _receiveBox.underrunCount); + codec::AudioLinearFade fader(sampleCount); + fader.fadeInStereo(_receiveBox.audio, sampleCount); + _receiveBox.underrunCount = 0; + _receiveBox.audioSampleCount = sampleCount; + } + return sampleCount; +} + +uint32_t AudioReceivePipeline::jitterBufferSize(uint32_t rtpTimestamp) const +{ + return (_jitterBuffer.empty() ? 0 : _jitterBuffer.getRtpDelay(rtpTimestamp) + _metrics.receivedRtpCyclesPerPacket); +} + +// If it is less than 10ms since data was still not fetched from pcm buffer +// we can wait another cycle before assuming packet is lost rather than re-ordered. +bool AudioReceivePipeline::shouldWaitForMissingPacket(uint64_t timestamp) const +{ + const int32_t halfPtime = _samplesPerPacket * utils::Time::sec / (2 * _rtpFrequency); + return static_cast(timestamp - _head.readyPcmTimestamp) <= halfPtime; +} + +// insert DTX silence if the gap is reasonably small to keep speak pace. 
+// If we are lagging behind we use it to catch up instead +bool AudioReceivePipeline::dtxHandler(const int16_t sequenceAdvance, + const int64_t timestampAdvance, + const uint32_t totalJitterBufferSize) +{ + const bool isDTX = + sequenceAdvance == 1 && timestampAdvance > static_cast(_metrics.receivedRtpCyclesPerPacket); + + if (isDTX && timestampAdvance <= static_cast(3 * _samplesPerPacket)) + { + const auto allowedReduction = + static_cast(totalJitterBufferSize - math::roundUpMultiple(_targetDelay, _samplesPerPacket)); + + if (allowedReduction < static_cast(_metrics.receivedRtpCyclesPerPacket)) + { + JBLOG("%u DTX silence", "AudioReceivePipeline", _ssrc); + _pcmData.appendSilence(_metrics.receivedRtpCyclesPerPacket - std::max(allowedReduction, 0)); + _head.nextRtpTimestamp += _metrics.receivedRtpCyclesPerPacket; + } + return true; + } + return false; +} + +void AudioReceivePipeline::process(const uint64_t timestamp) +{ + size_t bufferLevel = _pcmData.size() / _config.channels; + + for (; _jitterEmergency.counter == 0 && !_jitterBuffer.empty() && bufferLevel < _samplesPerPacket; + bufferLevel = _pcmData.size() / _config.channels) + { + const auto header = _jitterBuffer.getFrontRtp(); + const int16_t sequenceAdvance = + header->sequenceNumber.get() - static_cast(_head.extendedSequenceNumber & 0xFFFFu); + + // Re-ordering. + // previous packet already played + if (sequenceAdvance < 0) + { + _jitterBuffer.pop(); + logger::info("%u drop late packet seq %u, tstmp %u", + "AudioReceivePipeline", + _ssrc, + _head.extendedSequenceNumber + sequenceAdvance, + header->timestamp.get()); + continue; + } + + // Re-ordering, or loss + if (sequenceAdvance > 1 && shouldWaitForMissingPacket(timestamp)) + { + return; + } + + const int32_t timestampAdvance = header->timestamp - _head.nextRtpTimestamp; + const uint32_t totalJitterBufferSize = jitterBufferSize(_head.nextRtpTimestamp) + bufferLevel; + + if (dtxHandler(sequenceAdvance, timestampAdvance, totalJitterBufferSize)) + { + continue; + } + + // Decode, reduce and append the packet + const uint32_t extendedSequenceNumber = _head.extendedSequenceNumber + sequenceAdvance; + auto packet = _jitterBuffer.pop(); + + int16_t audioData[_samplesPerPacket * 4 * _config.channels]; + size_t decodedSamples = 0; + const size_t payloadLength = packet->getLength() - header->headerLength(); + if (payloadLength == 0) + { + decodedSamples = std::max(480u, _metrics.receivedRtpCyclesPerPacket); + _head.nextRtpTimestamp += decodedSamples; + _decoder.onUnusedPacketReceived(extendedSequenceNumber); + } + else + { + decodedSamples = decodePacket(extendedSequenceNumber, timestamp, *packet, audioData); + if (decodedSamples > 0 && sequenceAdvance == 1) + { + _metrics.receivedRtpCyclesPerPacket = decodedSamples; + } + } + + _head.extendedSequenceNumber = extendedSequenceNumber; + _head.nextRtpTimestamp = header->timestamp + _metrics.receivedRtpCyclesPerPacket; + const auto remainingSamples = reduce(*packet, audioData, decodedSamples, totalJitterBufferSize); + if (_pcmData.append(audioData, remainingSamples * _config.channels)) + { + JBLOG("ssrc %u added pcm %zu, JB %u (%u), TD %u, eliminated %zu, tstmp %u", + "AudioReceivePipeline", + _ssrc, + _pcmData.size() / _config.channels, + jitterBufferSize(_head.nextRtpTimestamp), + _jitterBuffer.count(), + _targetDelay, + decodedSamples - remainingSamples, + _head.nextRtpTimestamp); + } + else + { + const auto header = rtp::RtpHeader::fromPacket(*packet); + logger::warn("%u failed to append seq %u ts %u", + "AudioReceivePipeline", + _ssrc, + 
header->sequenceNumber.get(), + header->timestamp.get()); + } + + if ((header->sequenceNumber.get() % 100) == 0 && !_jitterBuffer.empty()) + { + JBLOG("%u pcm %zu JB %u", + "AudioReceivePipeline", + _ssrc, + _pcmData.size() / _config.channels, + jitterBufferSize(_head.nextRtpTimestamp)); + } + } + + if (bufferLevel >= _samplesPerPacket) + { + // track when we last had enough data in buffer so we know how long we may wait for more + _head.readyPcmTimestamp = timestamp; + } +} + +// flushes buffers. +// useful if last mixed participant leaves and audio pipelines are not used atm +void AudioReceivePipeline::flush() +{ + _pcmData.clear(); + while (_jitterBuffer.pop()) {} + _targetDelay = 0; // will cause start over on seqno, rtp timestamp and jitter assessment + _jitterEmergency.counter = 0; + _bufferAtTwoFrames = 0; + _elimination = SampleElimination(); +} + +} // namespace codec diff --git a/codec/AudioReceivePipeline.h b/codec/AudioReceivePipeline.h new file mode 100644 index 000000000..f054b0cf9 --- /dev/null +++ b/codec/AudioReceivePipeline.h @@ -0,0 +1,144 @@ +#pragma once +#include "codec/NoiseFloor.h" +#include "codec/OpusDecoder.h" +#include "codec/SpscAudioBuffer.h" +#include "rtp/JitterBufferList.h" +#include "rtp/JitterEstimator.h" +#include "utils/Time.h" +#include + +namespace codec +{ + +/** + * Audio receive pipe line that performs adaptive jitter buffering and cut off concealment. + * PCM data buffer is single produce single consumer thread safe. The rest shall run on a single thread context. + * + * Mechanisms: + * - If buffers run empty, the tail of the audio is faded out to avoid pops and clicks. + * - When sound continues after an underrun, the audio is faded in to avoid pops. + * - Audio elements with significant sound is not dropped. Silence is removed and audio is + * time compressed with some distortion where it is heard the least. This avoids CPU intensive + * resampling and filtering. + * - Packet loss is concealed by Opus decoder. + * - If jitter is low, the pipe line will operate in the time window between packet arrival and audio fetch. + * Which can be less than one audio frame. + * + * Packet arrival may be offset from packet pull pace. If jitter is less than the time left to pull, + * the packets will arrive before pull and can be put into pcm buffer immediately. Otherwise, the pipe line will + * have one packet in pcm and 1 or 0 packets in JB. 
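+ *
+ * One possible integration (a sketch, not mandated by this class): the receive/transport thread
+ * feeds every packet through onRtpPacket() or onSilencedRtpPacket(), while the mix consumer calls
+ * fetchStereo() once per ptime and reads the result via getAudio()/getAudioSampleCount().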
+ * + * | | | | + * v v v v + * | | | | + * v v v v + */ +class AudioReceivePipeline +{ + struct Config + { + uint32_t channels = 2; + uint32_t safeZoneCountBeforeReducingJB = 150; + + struct + { + uint32_t sampleThreshold = 10; + uint32_t expectedCompression = 30; + uint32_t failedCompressLimit = 15; + double incrementFactor = 1.25; + double silenceMargin = 6.0; // dB + double silenceZone = 10.0; + } reduction; + }; + const Config _config; + +public: + AudioReceivePipeline(uint32_t rtpFrequency, uint32_t ptime, uint32_t maxPackets, int audioLevelExtensionId = 255); + + // called from same thread context + bool onRtpPacket(uint32_t extendedSequenceNumber, memory::UniquePacket packet, uint64_t receiveTime); + bool onSilencedRtpPacket(uint32_t extendedSequenceNumber, memory::UniquePacket packet, uint64_t receiveTime); + + void process(uint64_t timestamp); + void flush(); + + // called from mix consumer + bool needProcess() const { return _pcmData.size() < _samplesPerPacket * _config.channels; } + size_t fetchStereo(size_t sampleCount); + + const int16_t* getAudio() const { return _receiveBox.audio; } + uint32_t getAudioSampleCount() const { return _receiveBox.audioSampleCount; } + +private: + void init(uint32_t extendedSequenceNumber, const rtp::RtpHeader& header, uint64_t receiveTime); + double analysePacketJitter(uint32_t extendedSequenceNumber, const rtp::RtpHeader& header, uint64_t receiveTime); + bool updateTargetDelay(double delayMs); + size_t decodePacket(uint32_t extendedSequenceNumber, + uint64_t timestamp, + const memory::Packet& packet, + int16_t* audioData); + size_t reduce(const memory::Packet& packet, int16_t* audioData, uint32_t samples, uint32_t totalJitterSize); + uint32_t jitterBufferSize(uint32_t rtpTimestamp) const; + void adjustReductionPower(uint32_t recentReduction); + + bool shouldWaitForMissingPacket(uint64_t timestamp) const; + bool dtxHandler(int16_t seqAdvance, int64_t timestampAdvance,uint32_t totalJitterBufferSize); + + uint32_t _ssrc; + const uint32_t _rtpFrequency; + const uint32_t _samplesPerPacket; + + rtp::JitterBufferList _jitterBuffer; + rtp::JitterEstimator _estimator; + + const int _audioLevelExtensionId; + codec::OpusDecoder _decoder; + codec::NoiseFloor _noiseFloor; + + uint32_t _targetDelay; + struct SampleElimination + { + uint32_t incompressableCount = 0; + int16_t deltaThreshold = 10; + } _elimination; + + struct HeadInfo + { + uint64_t readyPcmTimestamp = 0; // last time we saw pcm data pending in + uint32_t nextRtpTimestamp = 0; + uint32_t extendedSequenceNumber = 0; + } _head; + + struct JitterEmergency + { + uint32_t counter = 0; // late packet arrives and buffer is empty + uint32_t sequenceStart = 0; + } _jitterEmergency; + + // Count how many times buffer has been at 2 frames. 
If target delay is low we can reduce to one frame + uint32_t _bufferAtTwoFrames; + + struct Metrics + { + uint32_t shrunkPackets = 0; + uint32_t eliminatedPackets = 0; + uint32_t eliminatedSamples = 0; + uint32_t receivedRtpCyclesPerPacket = 960; // 480, 960, 1440 + } _metrics; + + SpscAudioBuffer _pcmData; + + // for receive thread + struct ReceiveBox + { + ReceiveBox(size_t bufferSize); + ~ReceiveBox(); + + uint32_t underrunCount; + const size_t audioBufferSize; + int16_t* audio; + uint32_t audioSampleCount; + } _receiveBox; +}; + +} // namespace codec diff --git a/codec/AudioTools.cpp b/codec/AudioTools.cpp new file mode 100644 index 000000000..2429f6ea1 --- /dev/null +++ b/codec/AudioTools.cpp @@ -0,0 +1,123 @@ +#include "codec/AudioTools.h" +#include +#include +#include + +namespace codec +{ +void makeStereo(int16_t* data, size_t count) +{ + for (int i = count - 1; i >= 0; i--) + { + data[i * 2] = data[i]; + data[i * 2 + 1] = data[i]; + } +} + +void swingTailMono(int16_t* data, const uint32_t sampleRate, const size_t count, const int step) +{ + const double tailFrequency = 250; + const double m = 2.0 * M_PI * 2550.0 / sampleRate; + const double alpha = m / (1.0 + m); + double y[2] = {0}; + y[0] = data[(count - 51) * step]; + + for (size_t i = count - 50; i < count; ++i) + { + y[1] = y[0]; + y[0] += alpha * (data[i * step] - y[0]); + } + + double v = y[0] - y[1]; + const double beta = tailFrequency / (M_PI * sampleRate); + double yn = data[(count - 1) * step]; + double amplification = 0.9; + for (size_t i = 0; i < count; ++i) + { + yn = yn + v; + v = (v - beta * yn); + data[i * step] = yn * amplification; + amplification *= 0.99; + } +} + +void swingTail(int16_t* data, const uint32_t sampleRate, const size_t count) +{ + swingTailMono(data, sampleRate, count, 2); + swingTailMono(data + 1, sampleRate, count, 2); +} + +void addToMix(const int16_t* srcAudio, int16_t* mixAudio, size_t count, double amplification) +{ + if (amplification == 1.0) + { + for (size_t i = 0; i < count; ++i) + { + mixAudio[i] += srcAudio[i]; + } + } + + for (size_t i = 0; i < count; ++i) + { + mixAudio[i] += amplification * srcAudio[i]; + } +} + +void subtractFromMix(const int16_t* srcAudio, int16_t* mixAudio, size_t count, double amplification) +{ + for (size_t i = 0; i < count; ++i) + { + mixAudio[i] -= amplification * srcAudio[i]; + } +} + +/** + * Eliminate samples at times where energy is low which makes it less audible. 
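+ * Drops at most maxReduction samples. A sample is only dropped when the neighbouring left-channel
+ * samples differ by less than deltaThreshold, and after each removal a few samples (2 below the
+ * silence threshold, otherwise 8) are kept untouched so the distortion is spread out.
+ * Returns the number of stereo samples remaining in pcmData.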
+ */ +size_t compactStereoTroughs(int16_t* pcmData, + size_t samples, + size_t maxReduction, + const int16_t silenceThreshold, + const int16_t deltaThreshold) +{ + pcmData[0] = pcmData[0]; + pcmData[1] = pcmData[1]; + size_t produced = 1; + + size_t removedSamples = 0; + int keepSamples = 0; + for (size_t i = 1; i < samples - 1; ++i) + { + const int16_t a = pcmData[i * 2 - 2]; + const int16_t c = pcmData[i * 2 + 2]; + + if (removedSamples < maxReduction && !keepSamples && std::abs(a - c) < deltaThreshold) + { + if (std::abs(a) < silenceThreshold) + { + // low volume allow more sample removal + keepSamples = 2; + } + else + { + keepSamples = 8; + } + ++removedSamples; + continue; + } + keepSamples = std::max(0, keepSamples - 1); + + pcmData[produced * 2] = pcmData[i * 2]; + pcmData[produced * 2 + 1] = pcmData[i * 2 + 1]; + + ++produced; + } + + const size_t i = samples - 1; + pcmData[produced * 2] = pcmData[i * 2]; + pcmData[produced * 2 + 1] = pcmData[i * 2 + 1]; + ++produced; + + return produced; +} +} // namespace codec diff --git a/codec/AudioTools.h b/codec/AudioTools.h new file mode 100644 index 000000000..678e8b02e --- /dev/null +++ b/codec/AudioTools.h @@ -0,0 +1,52 @@ +#pragma once +#include +#include +#include + +namespace memory +{ + +class AudioPacket; +class Packet; + +} // namespace memory + +namespace codec +{ +class AudioFilter; + +template +void makeStereo(T* data, size_t count) +{ + for (int i = count - 1; i >= 0; i--) + { + data[i * 2] = data[i]; + data[i * 2 + 1] = data[i]; + } +} + +size_t compactStereo(int16_t* pcmData, size_t size); +size_t compactStereoTroughs(int16_t* pcmData, + size_t samples, + size_t maxReduction = 1000, + int16_t silenceThreshold = 10, + int16_t deltaThreshold = 10); + +template +void clearStereo(T* data, size_t count) +{ + std::memset(data, 0, count * 2 * sizeof(T)); +} + +template +void copyStereo(const T* srcData, T* data, size_t count) +{ + std::memcpy(data, srcData, count * 2 * sizeof(T)); +} + +void swingTail(int16_t* data, uint32_t sampleRate, size_t count); + +void addToMix(const int16_t* srcAudio, int16_t* mixAudio, size_t count, double amplification); +void subtractFromMix(const int16_t* srcAudio, int16_t* mixAudio, size_t count, double amplification); + +} // namespace codec diff --git a/codec/NoiseFloor.cpp b/codec/NoiseFloor.cpp new file mode 100644 index 000000000..1b3627d1e --- /dev/null +++ b/codec/NoiseFloor.cpp @@ -0,0 +1,33 @@ +#include "codec/NoiseFloor.h" +#include "codec/AudioLevel.h" +#include + +namespace codec +{ +void NoiseFloor::onPcm(int16_t* pcmData, size_t sampleCount) +{ + update(computeAudioLevel(pcmData, sampleCount)); +} + +void NoiseFloor::update(double audioLevel) +{ + if (audioLevel >= 127) + { + return; + } + + if (audioLevel > _noiseLevel) + { + _noiseLevel = audioLevel; + } + else + { + _noiseLevel *= 0.999; + } +} + +double NoiseFloor::getLevel() const +{ + return _noiseLevel; +} +} // namespace codec diff --git a/codec/NoiseFloor.h b/codec/NoiseFloor.h new file mode 100644 index 000000000..40469fe18 --- /dev/null +++ b/codec/NoiseFloor.h @@ -0,0 +1,20 @@ +#pragma once +#include +#include + +namespace codec +{ +class NoiseFloor +{ +public: + NoiseFloor() : _noiseLevel(0) {} + + void onPcm(int16_t* pcmData, size_t sampleCount); + void update(double audioLevel); + double getLevel() const; + +private: + double _noiseLevel; +}; + +} // namespace codec diff --git a/codec/OpusDecoder.cpp b/codec/OpusDecoder.cpp index 1f60fa1ce..c6e75259b 100644 --- a/codec/OpusDecoder.cpp +++ b/codec/OpusDecoder.cpp @@ -108,4 
+108,13 @@ int32_t OpusDecoder::getLastPacketDuration() return lastPacketDuration; } +void OpusDecoder::onUnusedPacketReceived(uint32_t extendedSequenceNumber) +{ + const auto advance = static_cast(extendedSequenceNumber - _sequenceNumber); + if (advance > 0) + { + _sequenceNumber = extendedSequenceNumber; + } +} + } // namespace codec diff --git a/codec/OpusDecoder.h b/codec/OpusDecoder.h index 838731169..1b368a8d6 100644 --- a/codec/OpusDecoder.h +++ b/codec/OpusDecoder.h @@ -28,6 +28,8 @@ class OpusDecoder int32_t getLastPacketDuration(); + void onUnusedPacketReceived(uint32_t extendedSequenceNumber); + private: struct OpaqueDecoderState; diff --git a/codec/SpscAudioBuffer.h b/codec/SpscAudioBuffer.h new file mode 100644 index 000000000..271c1b854 --- /dev/null +++ b/codec/SpscAudioBuffer.h @@ -0,0 +1,209 @@ +#pragma once +#include "memory/Allocator.h" +#include "utils/ScopedReentrancyBlocker.h" +#include +#include +#include + +namespace codec +{ + +/** + * Stream buffer that facilitates mixing. + * You can add and subtract the front of the buffer to a mixed output. + * You can then pop the front of the buffer. + * Single producer single consumer thread safe. + */ +template +class SpscAudioBuffer +{ +public: + SpscAudioBuffer(size_t count, uint32_t replayMemory) + : _size(memory::page::alignedSpace((count + replayMemory) * sizeof(T))), + _readHead(0), + _writeHead(0), + _length(0), + _recentReadSize(0), + _replayMemorySize(replayMemory) +#ifdef DEBUG + , + _reentranceRead(0), + _reentranceWrite(0) +#endif + { + _data = reinterpret_cast(memory::page::allocate(_size)); + std::memset(_data, 0, _size); + } + + ~SpscAudioBuffer() { memory::page::free(_data, _size); } + + /** + * copies data from replay backlog memory. Useful for concealment attempts + * You must make sure there is backlog data or you will get whatever garbage is in the buffer + */ + void replay(T* mixedData, const uint32_t count) const + { + assert(mixedData); + assert(count <= _replayMemorySize); + REENTRANCE_CHECK(_reentranceRead); + + if (count > _replayMemorySize) + { + return; + } + + const auto maxItems = _size / sizeof(T); + const auto readStart = (_readHead + maxItems - count) % maxItems; + if (readStart + count > maxItems) + { + const auto remaining = maxItems - readStart; + for (size_t i = 0; i < remaining; ++i) + { + mixedData[i] = _data[readStart + i]; + } + for (size_t i = 0; i < count - remaining; ++i) + { + mixedData[remaining + i] = _data[i]; + } + } + else + { + for (size_t i = 0; i < count; ++i) + { + mixedData[i] = _data[i + readStart]; + } + } + } + + /** + * Writes count elements to the buffer from data, and moves the write head. + */ + bool append(const T* data, const uint32_t count) + { + assert(data); + REENTRANCE_CHECK(_reentranceWrite); + + const auto maxItems = _size / sizeof(T); + + const auto currentLength = _length.load(std::memory_order_consume); + if (currentLength + count + _replayMemorySize > maxItems) + { + return false; + } + + if (_writeHead + count > maxItems) + { + const auto remaining = maxItems - _writeHead; + std::memcpy(&_data[_writeHead], data, remaining * sizeof(T)); + std::memcpy(&_data[0], &data[remaining], (count - remaining) * sizeof(T)); + _writeHead = count - remaining; + } + else + { + std::memcpy(&_data[_writeHead], data, count * sizeof(T)); + _writeHead += count; + } + + _length.fetch_add(count); + return true; + } + + /** + * Add count silence elements to the buffer. The write head is moved. 
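+     * If there is less free space than count (the replay backlog is kept intact), only as many
+     * silence elements as fit are appended.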
+ */ + void appendSilence(const uint32_t count) + { + REENTRANCE_CHECK(_reentranceWrite); + const uint32_t maxItems = _size / sizeof(T); + + const auto currentLength = _length.load(std::memory_order_consume); + const uint32_t spaceLeft = maxItems - _replayMemorySize - currentLength; + const auto toWrite = std::min(spaceLeft, count); + + if (_writeHead + toWrite > maxItems) + { + const auto remaining = maxItems - _writeHead; + std::memset(&_data[_writeHead], 0, remaining * sizeof(T)); + std::memset(&_data[0], 0, (toWrite - remaining) * sizeof(T)); + _writeHead = count - remaining; + } + else + { + std::memset(&_data[_writeHead], 0, toWrite * sizeof(T)); + _writeHead += toWrite; + } + + _length.fetch_add(toWrite); + } + + /** + * Reads count elements from the buffer and adds each element of the read data to mixedData, scaled with + * scaleFactor. + */ + size_t fetch(T* mixedData, const uint32_t count) + { + assert(mixedData); + REENTRANCE_CHECK(_reentranceRead); + + const auto currentLength = _length.load(std::memory_order_consume); + if (currentLength == 0) + { + return 0; + } + + const uint32_t toRead = std::min(currentLength, count); + + const auto maxItems = _size / sizeof(T); + if (_readHead + toRead > maxItems) + { + const auto remaining = maxItems - _readHead; + for (size_t i = 0; i < remaining; ++i) + { + mixedData[i] = _data[i + _readHead]; + } + for (size_t i = 0; i < toRead - remaining; ++i) + { + mixedData[remaining + i] = _data[i]; + } + _readHead = toRead - remaining; + } + else + { + for (size_t i = 0; i < toRead; ++i) + { + mixedData[i] = _data[i + _readHead]; + } + _readHead += toRead; + } + + _recentReadSize = toRead; + _length.fetch_sub(toRead); + return toRead; + } + + size_t size() const { return _length.load(std::memory_order_consume); } + bool empty() const { return size() == 0; } + void clear() + { + _length = 0; + _readHead = 0; + _writeHead = 0; + _recentReadSize = 0; + } + +private: + const uint32_t _size; + T* _data; + uint32_t _readHead; + uint32_t _writeHead; + std::atomic_uint32_t _length; + mutable uint32_t _recentReadSize; + const uint32_t _replayMemorySize; + +#ifdef DEBUG + mutable std::atomic_uint32_t _reentranceRead; + mutable std::atomic_uint32_t _reentranceWrite; +#endif +}; + +} // namespace codec diff --git a/math/WelfordVariance.h b/math/WelfordVariance.h new file mode 100644 index 000000000..af9db017e --- /dev/null +++ b/math/WelfordVariance.h @@ -0,0 +1,113 @@ +#pragma once +#include +#include + +namespace math +{ + +template +class WelfordVariance +{ +public: + WelfordVariance() : _mean(0), _s(0), _count(0) {} + + void add(T value) + { + ++_count; + const T prevMean = _mean; + _mean += (value - _mean) / _count; + _s += (value - _mean) * (value - prevMean); + } + + T getMean() const { return _mean; } + T getVariance() const + { + if (_count < 2) + { + return 0; + } + return _s / (_count - 1); + } + +private: + T _mean; + T _s; + uint32_t _count; +}; + +template +class RollingWelfordVariance +{ +public: + RollingWelfordVariance(uint32_t windowSize) + : _windowSize(windowSize), + _mean(0), + _count(0), + _cursor(0), + _varianceAcc(0), + _values(new T[windowSize]), + _varValues(new T[windowSize]) + { + } + + ~RollingWelfordVariance() + { + delete[] _values; + delete[] _varValues; + } + + void add(T value) + { + _cursor = (_cursor + 1) % _windowSize; + const auto oldMean = getMean(); + if (_count >= _windowSize) + { + _mean += value - _values[_cursor]; + const auto newMean = getMean(); + const auto newVarValue = (value - oldMean) * (value - newMean); + 
_varianceAcc += newVarValue - _varValues[_cursor]; + _values[_cursor] = value; + _varValues[_cursor] = newVarValue; + } + else + { + ++_count; + _mean += value; + auto newMean = getMean(); + const auto newVarValue = (value - oldMean) * (value - newMean); + _varianceAcc += newVarValue; + _values[_cursor] = value; + _varValues[_cursor] = newVarValue; + } + } + + T getMean() const + { + if (_count == 0) + { + return 0; + } + return _mean / _count; + } + + T getVariance() const + { + if (_count < 2) + { + return 0; + } + return _varianceAcc / _count; + } + +private: + const uint32_t _windowSize; + T _mean; + uint32_t _count; + uint32_t _cursor; + T _varianceAcc; + + T* _values; + T* _varValues; +}; + +} // namespace math diff --git a/math/helpers.h b/math/helpers.h index c106bcd2d..fc32abd29 100644 --- a/math/helpers.h +++ b/math/helpers.h @@ -30,4 +30,16 @@ bool extractMantissaExponent(uint64_t value, uint32_t& mantissa, uint32_t& expon mantissa = static_cast(value); return true; } + +template +T roundUpMultiple(T value, T N) +{ + const auto r = value % N; + if (r > 0) + { + return value - r + N; + } + + return value; +} } // namespace math diff --git a/rtp/JitterBufferList.cpp b/rtp/JitterBufferList.cpp new file mode 100644 index 000000000..67f97dadf --- /dev/null +++ b/rtp/JitterBufferList.cpp @@ -0,0 +1,145 @@ +#include "rtp/JitterBufferList.h" +#include "rtp/RtpHeader.h" +#include + +namespace rtp +{ + +JitterBufferList::JitterBufferList() : _freeItems(nullptr), _head(nullptr), _tail(nullptr), _count(0) +{ + for (size_t i = 0; i < SIZE; ++i) + { + _itemStore[i].next = _freeItems; + _freeItems = &_itemStore[i]; + } +} + +JitterBufferList::ListItem* JitterBufferList::allocItem() +{ + if (!_freeItems) + { + return nullptr; + } + + auto item = _freeItems; + _freeItems = item->next; + item->next = nullptr; + return item; +} + +bool JitterBufferList::add(memory::UniquePacket packet) +{ + const auto newHeader = RtpHeader::fromPacket(*packet); + if (!newHeader) + { + return false; // corrupt + } + + auto newItem = allocItem(); + if (!newItem) + { + return false; // full + } + + newItem->packet = std::move(packet); + ++_count; + + if (_tail) + { + const auto tailHeader = RtpHeader::fromPacket(*_tail->packet); + if (static_cast(newHeader->sequenceNumber.get() - tailHeader->sequenceNumber.get()) >= 0) + { + _tail->next = newItem; + _tail = newItem; + return true; + } + } + else + { + _head = newItem; + _tail = newItem; + return true; + } + + for (ListItem* item = _head; item; item = item->next) + { + const auto itemHeader = RtpHeader::fromPacket(*item->next->packet); + if (static_cast(itemHeader->sequenceNumber.get() - newHeader->sequenceNumber.get()) > 0) + { + newItem->next = item->next; + item->next = newItem; + if (item == _head) + { + _head = newItem; + } + return true; + } + } + + return false; +} + +memory::UniquePacket JitterBufferList::pop() +{ + if (!_head) + { + return nullptr; + } + + auto item = _head; + _head = item->next; + if (_tail == item) + { + _tail = nullptr; + } + + item->next = _freeItems; + _freeItems = item; + --_count; + return std::move(item->packet); +} + +uint32_t JitterBufferList::getRtpDelay() const +{ + if (!_tail) + { + return 0; + } + + const auto headHeader = rtp::RtpHeader::fromPacket(*_head->packet); + const auto tailHeader = rtp::RtpHeader::fromPacket(*_tail->packet); + return tailHeader->timestamp.get() - headHeader->timestamp.get(); +} + +int32_t JitterBufferList::getRtpDelay(uint32_t rtpTimestamp) const +{ + if (!_tail) + { + return 0; + } + + const auto 
tailHeader = rtp::RtpHeader::fromPacket(*_tail->packet); + return static_cast(tailHeader->timestamp.get() - rtpTimestamp); +} + +const rtp::RtpHeader* JitterBufferList::getFrontRtp() const +{ + if (!_head) + { + return nullptr; + } + + return rtp::RtpHeader::fromPacket(*_head->packet); +} + +const rtp::RtpHeader* JitterBufferList::getTailRtp() const +{ + if (!_tail) + { + return nullptr; + } + + return rtp::RtpHeader::fromPacket(*_tail->packet); +} + +} // namespace rtp diff --git a/rtp/JitterBufferList.h b/rtp/JitterBufferList.h new file mode 100644 index 000000000..952931593 --- /dev/null +++ b/rtp/JitterBufferList.h @@ -0,0 +1,47 @@ +#pragma once +#include "memory/PacketPoolAllocator.h" + +namespace rtp +{ +struct RtpHeader; +/** + * Effective jitter buffer. Packets are stored in linked list. Ordered packets are quickly added to the end of the list. + * Rarely occurring out of order packets have to be inserted in the list after a quick scan. + */ + +class JitterBufferList +{ +public: + enum + { + SIZE = 300 + }; + + JitterBufferList(); + + bool add(memory::UniquePacket packet); + memory::UniquePacket pop(); + uint32_t getRtpDelay() const; + int32_t getRtpDelay(uint32_t rtpTimestamp) const; + const rtp::RtpHeader* getFrontRtp() const; + const rtp::RtpHeader* getTailRtp() const; + + bool empty() const { return !_head; } + uint32_t count() const { return _count; } + +private: + struct ListItem + { + memory::UniquePacket packet; + ListItem* next = nullptr; + }; + + ListItem* allocItem(); + + ListItem* _freeItems; + ListItem _itemStore[SIZE]; + ListItem* _head; + ListItem* _tail; + uint32_t _count; +}; +} // namespace rtp diff --git a/rtp/JitterEstimator.cpp b/rtp/JitterEstimator.cpp new file mode 100644 index 000000000..f02a28d0d --- /dev/null +++ b/rtp/JitterEstimator.cpp @@ -0,0 +1,74 @@ +#include "rtp/JitterEstimator.h" +#include "logger/Logger.h" +#include "utils/Time.h" +#include +#include + +#define JITTER_DEBUG 0 + +#if JITTER_DEBUG +#define LOG(fmt, ...) logger::debug(fmt, ##__VA_ARGS__) +#else +#define LOG(fmt, ...) 
+#endif + +namespace +{ + +} // namespace + +namespace rtp +{ +JitterEstimator::JitterEstimator(uint32_t sampleFrequency) + : _var(150), + _maxJitter(0.03), + _maxJitterStable(0.003), + _delayTracker(sampleFrequency) +{ +} + +/** + * returns delay of received packet in ms + */ +double JitterEstimator::update(uint64_t receiveTime, uint32_t rtpTimestamp) +{ + auto rawDelay = _delayTracker.update(receiveTime, rtpTimestamp); + + const double measurement = static_cast(rawDelay / utils::Time::us) / 1000.0; + + _var.add(measurement); + _maxJitter.update(measurement); + _maxJitterStable.update(measurement); + if (_maxJitter.get() * 2 < _maxJitterStable.get() && _maxJitter.get() > 30.0) + { + logger::info("Jitter estimate reset %.2f -> %.2f", "JitterEstimator", _maxJitterStable.get(), _maxJitter.get()); + _maxJitterStable.reset(_maxJitter.get()); + } + + LOG("%.2fs jitter %.4f, d %.2fms, std %.4f, 95p %.4f, maxJ %.4f, Jslow %.3f ", + "JitterEstimator", + rtpTimestamp / 48000.0, + _var.getMean(), + measurement, + sqrt(_var.getVariance()), + get95Percentile(), + _maxJitter.get(), + _maxJitterStable.get()); + + return measurement; +} + +/** + * avg jitter in ms + */ +double JitterEstimator::getJitter() const +{ + return _var.getMean(); +} + +double JitterEstimator::get95Percentile() const +{ + return _var.getMean() + 2 * sqrt(_var.getVariance()); +} + +} // namespace rtp diff --git a/rtp/JitterEstimator.h b/rtp/JitterEstimator.h new file mode 100644 index 000000000..31b106a9d --- /dev/null +++ b/rtp/JitterEstimator.h @@ -0,0 +1,43 @@ +#pragma once +#include "math/Matrix.h" +#include "math/WelfordVariance.h" +#include "rtp/RtpDelayTracker.h" +#include "utils/Trackers.h" + +namespace rtp +{ + +/** + * Estimates the latency from jitter. That is the jitter buffer level you will need to be able to replay + * media without having gaps due to jitter. The 95percentile level will calculate level needed to replay >95% + * of the packets without gap. + * This is far from the jitter tracker in RFC3550 that average over delta in RTP timestamp vs delta in receive time. 
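+ *
+ * get95Percentile() approximates the 95th percentile as mean + 2 * standard deviation of a
+ * rolling Welford variance over the measured per-packet delays (in milliseconds).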
+ */ +class JitterEstimator +{ +public: + JitterEstimator(uint32_t sampleFrequency); + + double update(uint64_t receiveTime, uint32_t rtpTimestamp); + + double getJitter() const; + double get95Percentile() const; + double getMaxJitter() const { return _maxJitter.get(); } + + // in ms + double getJitterMaxStable() const { return _maxJitterStable.get(); } + uint32_t toRtpTimestamp(uint64_t timestamp) const { return _delayTracker.toRtpTimestamp(timestamp); } + uint32_t getRtpFrequency() const { return _delayTracker.getFrequency(); } + +private: + void updateVarianceAccumulator(double value); + + math::RollingWelfordVariance _var; + + utils::MaxTrackerPlain _maxJitter; + utils::MaxTrackerPlain _maxJitterStable; + + RtpDelayTracker _delayTracker; +}; + +} // namespace rtp diff --git a/rtp/RtpDelayTracker.cpp b/rtp/RtpDelayTracker.cpp new file mode 100644 index 000000000..5c94e4f72 --- /dev/null +++ b/rtp/RtpDelayTracker.cpp @@ -0,0 +1,64 @@ +#include "rtp/RtpDelayTracker.h" +#include "utils/Time.h" +#include + +namespace rtp +{ + +RtpDelayTracker::RtpDelayTracker(uint32_t rtpFrequency, uint32_t clockSkewRtpTicks) + : _frequency(rtpFrequency), + _clockSkewCompensation(clockSkewRtpTicks), + _renderTime(0), + _rtpTimestamp(0) +{ +} + +uint64_t RtpDelayTracker::update(uint64_t receiveTime, uint32_t rtpTimestamp) +{ + if (_renderTime == 0 && _rtpTimestamp == 0) + { + _renderTime = receiveTime; + _rtpTimestamp = rtpTimestamp; + _delay = 0; + return 0; + } + if (static_cast(_rtpTimestamp - rtpTimestamp) > static_cast(_frequency * 2)) + { + // very long reverse, indicating a reordered packet arriving later than 2s, or an rtp timestamp reset. + _renderTime = receiveTime; + _rtpTimestamp = rtpTimestamp; + return utils::Time::ms * 10; + } + + _renderTime += static_cast(rtpTimestamp - _rtpTimestamp) * static_cast(utils::Time::sec) / + (_frequency - _clockSkewCompensation); + + if (static_cast(_renderTime - receiveTime) > 0) + { + // cannot receive frame before being created. adjust + _renderTime = receiveTime; + } + + _rtpTimestamp = rtpTimestamp; + + _delay = receiveTime - _renderTime; + if (_delay > utils::Time::ms * 10) + { + _renderTime += _delay * 0.01; + } + return _delay; +} + +void RtpDelayTracker::reset() +{ + _renderTime = 0; + _rtpTimestamp = 0; +} + +uint32_t RtpDelayTracker::toRtpTimestamp(uint64_t timestamp) const +{ + return static_cast(static_cast((timestamp - _renderTime) * _frequency / utils::Time::sec)) + + _rtpTimestamp; +} + +} // namespace rtp diff --git a/rtp/RtpDelayTracker.h b/rtp/RtpDelayTracker.h new file mode 100644 index 000000000..2bab625bb --- /dev/null +++ b/rtp/RtpDelayTracker.h @@ -0,0 +1,37 @@ +#pragma once +#include + +namespace rtp +{ +/** + * Tracks arrival delay based on RTP timestamp. This measurement can be used in various filters, max trackers, avg + * windows to decide jitter buffer levels. 
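+ * The returned delay is expressed in the same clock units as receiveTime. When the delay stays
+ * above 10ms, the expected render time is slewed towards the receive time by 1% of the delay per
+ * packet, slowly adapting the baseline.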
+ * Measures delay of packets up to 2s for re-ordered packet + * + * Clock skew compensation default is for 48kHz rtp frequency and gives 0.5ms / s + */ +class RtpDelayTracker +{ +public: + RtpDelayTracker(uint32_t rtpFrequency, uint32_t clockSkewRtpTicks = 24); + + uint64_t update(uint64_t receiveTime, uint32_t rtpTimestamp); + + uint64_t getDelay() const { return _delay; } + + void reset(); + + uint32_t toRtpTimestamp(uint64_t timestamp) const; + uint32_t getFrequency() const { return _frequency; } + +private: + const uint32_t _frequency; + const uint32_t _clockSkewCompensation; + + uint64_t _renderTime; + uint32_t _rtpTimestamp; + + uint64_t _delay; +}; + +} // namespace rtp diff --git a/rtp/RtpHeader.cpp b/rtp/RtpHeader.cpp index 931bba037..e357a2a7e 100644 --- a/rtp/RtpHeader.cpp +++ b/rtp/RtpHeader.cpp @@ -316,4 +316,30 @@ bool getTransmissionTimestamp(const memory::Packet& packet, uint8_t extensionId, return false; } +bool getAudioLevel(const memory::Packet& packet, uint8_t extensionId, int& level) +{ + assert(rtp::isRtpPacket(packet)); + + auto* rtpHeader = RtpHeader::fromPacket(packet); + if (!rtpHeader) + { + return false; + } + + auto* extensionHeader = rtpHeader->getExtensionHeader(); + if (extensionHeader) + { + for (auto& extension : extensionHeader->extensions()) + { + if (extension.getId() == extensionId) + { + level = extension.data[0] & 0x7F; + return true; + } + } + } + + return false; +} + } // namespace rtp diff --git a/rtp/RtpHeader.h b/rtp/RtpHeader.h index b25120bbf..858caacb2 100644 --- a/rtp/RtpHeader.h +++ b/rtp/RtpHeader.h @@ -164,4 +164,6 @@ void addAudioLevel(PacketT& packet, uint8_t extensionId, uint8_t level) rtpHeader->setExtensions(extensionHeader, payloadLength); packet.setLength(rtpHeader->headerLength() + payloadLength); } + +bool getAudioLevel(const memory::Packet& packet, uint8_t extensionId, int& level); } // namespace rtp diff --git a/test/integration/BarbellTest.cpp b/test/integration/BarbellTest.cpp index b53377a5c..3e3b9f971 100644 --- a/test/integration/BarbellTest.cpp +++ b/test/integration/BarbellTest.cpp @@ -490,10 +490,10 @@ TEST_F(BarbellTest, barbellAfterClients) { const auto data = analyzeRecording>(group.clients[id].get(), 5, true); EXPECT_EQ(data.dominantFrequencies.size(), 1); - EXPECT_EQ(data.amplitudeProfile.size(), 2); - if (data.amplitudeProfile.size() > 1) + EXPECT_EQ(data.amplitudeProfile.size(), 4); + if (data.amplitudeProfile.size() > 3) { - EXPECT_NEAR(data.amplitudeProfile[1].second, 5725, 100); + EXPECT_NEAR(data.amplitudeProfile[3].second, 5625, 150); } ASSERT_GE(data.dominantFrequencies.size(), 1); EXPECT_NEAR(data.dominantFrequencies[0], expectedFrequencies[freqId++], 25.0); diff --git a/test/integration/ConfIntegrationTest.cpp b/test/integration/ConfIntegrationTest.cpp index c91fab145..be7802f61 100644 --- a/test/integration/ConfIntegrationTest.cpp +++ b/test/integration/ConfIntegrationTest.cpp @@ -138,7 +138,7 @@ TEST_F(IntegrationTest, plain) EXPECT_EQ(data.amplitudeProfile[0].second, 0); EXPECT_NEAR(data.amplitudeProfile.back().second, 1826, 250); - EXPECT_NEAR(data.amplitudeProfile.back().first, 48000 * 0.79, 48000 * 0.2); + EXPECT_NEAR(data.amplitudeProfile.back().first, 48000 * 1.79, 48000 * 0.2); } EXPECT_EQ(data.audioSsrcCount, 1); @@ -236,7 +236,7 @@ TEST_F(IntegrationTest, ptime10) EXPECT_EQ(data.amplitudeProfile[0].second, 0); EXPECT_NEAR(data.amplitudeProfile.back().second, 1826, 250); - EXPECT_NEAR(data.amplitudeProfile.back().first, 48000 * 0.79, 48000 * 0.2); + 
EXPECT_NEAR(data.amplitudeProfile.back().first, 48000 * 1.79, 48000 * 0.2); } EXPECT_EQ(data.audioSsrcCount, 1); @@ -612,7 +612,7 @@ TEST_F(IntegrationTest, conferencePort) EXPECT_EQ(data.amplitudeProfile[0].second, 0); EXPECT_NEAR(data.amplitudeProfile.back().second, 1826, 250); - EXPECT_NEAR(data.amplitudeProfile.back().first, 48000 * 0.79, 48000 * 0.2); + EXPECT_NEAR(data.amplitudeProfile.back().first, 48000 * 1.79, 48000 * 0.2); } EXPECT_EQ(data.audioSsrcCount, 1); @@ -1005,7 +1005,7 @@ TEST_F(IntegrationTest, confList) EXPECT_EQ(data.amplitudeProfile[0].second, 0); EXPECT_NEAR(data.amplitudeProfile.back().second, 1826, 250); - EXPECT_NEAR(data.amplitudeProfile.back().first, 48000 * 0.79, 48000 * 0.2); + EXPECT_NEAR(data.amplitudeProfile.back().first, 48000 * 1.79, 48000 * 0.2); } EXPECT_EQ(data.audioSsrcCount, 1); @@ -1171,10 +1171,10 @@ TEST_F(IntegrationTest, twoClientsAudioOnly) { const auto data = analyzeRecording>(group.clients[id].get(), 5, true); EXPECT_EQ(data.dominantFrequencies.size(), 1); - EXPECT_EQ(data.amplitudeProfile.size(), 2); + EXPECT_EQ(data.amplitudeProfile.size(), 4); if (data.amplitudeProfile.size() > 1) { - EXPECT_NEAR(data.amplitudeProfile[1].second, 5725, 100); + EXPECT_NEAR(data.amplitudeProfile.back().second, 5725, 100); } if (data.dominantFrequencies.size() >= 1) { diff --git a/test/integration/FFTanalysis.cpp b/test/integration/FFTanalysis.cpp index 09fb7c4f9..7e9ac8ab7 100644 --- a/test/integration/FFTanalysis.cpp +++ b/test/integration/FFTanalysis.cpp @@ -137,6 +137,9 @@ CmplxArray createAudioSpectrum(const std::vector& recording, uint32_t s { const size_t fftWindowSize = 2048; size_t numThreads = std::max(std::thread::hardware_concurrency(), 2U); +#ifdef LCHECK_BUILD + numThreads = 2; +#endif std::vector workers; std::vector spectrums; diff --git a/test/integration/IntegrationCallTypes.cpp b/test/integration/IntegrationCallTypes.cpp index a30b6104c..4a3ffa325 100644 --- a/test/integration/IntegrationCallTypes.cpp +++ b/test/integration/IntegrationCallTypes.cpp @@ -47,6 +47,21 @@ enum class EncryptionMode class IntegrationCallTypesTest : public IntegrationTest, public ::testing::WithParamInterface> { + void SetUp() override + { +#ifdef NOPERF_TEST + GTEST_SKIP(); +#endif + IntegrationTest::SetUp(); + } + + void TearDown() override + { +#ifdef NOPERF_TEST + GTEST_SKIP(); +#endif + IntegrationTest::TearDown(); + } }; TEST_P(IntegrationCallTypesTest, party3AllModes) @@ -147,8 +162,9 @@ TEST_P(IntegrationCallTypesTest, party3AllModes) if (data.dominantFrequencies.size() >= 2) { EXPECT_NEAR(data.dominantFrequencies[0], expectedFrequencies[freqId][0], 25.0); - EXPECT_NEAR(data.dominantFrequencies[1], expectedFrequencies[freqId++][1], 25.0); + EXPECT_NEAR(data.dominantFrequencies[1], expectedFrequencies[freqId][1], 25.0); } + ++freqId; if (2 == id) { EXPECT_GE(data.amplitudeProfile.size(), 2); @@ -164,7 +180,7 @@ TEST_P(IntegrationCallTypesTest, party3AllModes) EXPECT_EQ(data.amplitudeProfile[0].second, 0); EXPECT_NEAR(data.amplitudeProfile.back().second, 1826, 250); - EXPECT_NEAR(data.amplitudeProfile.back().first, 48000 * 1.60, 48000); + EXPECT_NEAR(data.amplitudeProfile.back().first, 48000 * 1.80, 48000); } EXPECT_EQ(data.audioSsrcCount, 1); diff --git a/test/integration/IntegrationLegApiTest.cpp b/test/integration/IntegrationLegApiTest.cpp index b46b963b2..d8c77ad80 100644 --- a/test/integration/IntegrationLegApiTest.cpp +++ b/test/integration/IntegrationLegApiTest.cpp @@ -133,7 +133,7 @@ TEST_F(IntegrationLegApi, plain) 
EXPECT_EQ(data.amplitudeProfile[0].second, 0); EXPECT_NEAR(data.amplitudeProfile.back().second, 1826, 250); - EXPECT_NEAR(data.amplitudeProfile.back().first, 48000 * 0.79, 48000 * 0.2); + EXPECT_NEAR(data.amplitudeProfile.back().first, 48000 * 1.79, 48000 * 0.2); } EXPECT_EQ(data.audioSsrcCount, 1); @@ -209,10 +209,10 @@ TEST_F(IntegrationLegApi, twoClientsAudioOnly) { const auto data = analyzeRecording>(group.clients[id].get(), 5, true); EXPECT_EQ(data.dominantFrequencies.size(), 1); - EXPECT_EQ(data.amplitudeProfile.size(), 2); - if (data.amplitudeProfile.size() > 1) + EXPECT_EQ(data.amplitudeProfile.size(), 4); + if (data.amplitudeProfile.size() > 2) { - EXPECT_NEAR(data.amplitudeProfile[1].second, 5725, 100); + EXPECT_NEAR(data.amplitudeProfile.back().second, 5725, 100); } if (data.dominantFrequencies.size() >= 1) { diff --git a/test/integration/IntegrationTest.h b/test/integration/IntegrationTest.h index 5c92b44f4..29b31c2a0 100644 --- a/test/integration/IntegrationTest.h +++ b/test/integration/IntegrationTest.h @@ -5,6 +5,7 @@ #include "config/Config.h" #include "emulator/TimeTurner.h" #include "test/integration/FFTanalysis.h" +#include "test/integration/SampleDataUtils.h" #include "test/integration/emulator/Httpd.h" #include "test/integration/emulator/SfuClient.h" #include "test/integration/emulator/SfuGroupCall.h" @@ -212,10 +213,11 @@ struct IntegrationTest : public ::testing::Test if (checkAmplitudeProfile) { - EXPECT_EQ(amplitudeProfile.size(), 2); - if (amplitudeProfile.size() > 1) + // audio will start with a short noise floor to stabilize before going full amplitude. + EXPECT_EQ(amplitudeProfile.size(), 4); + if (amplitudeProfile.size() > 3) { - EXPECT_NEAR(amplitudeProfile[1].second, 5725, 125); + EXPECT_NEAR(amplitudeProfile[3].second, 5625, 175); } } } @@ -243,6 +245,54 @@ struct IntegrationTest : public ::testing::Test return result; } + template + static IntegrationTest::AudioAnalysisData analyzeSpectrum(TClient* client, + double expectedDurationSeconds, + double inclusionThreshold, + size_t mixedAudioSources = 0, + bool dumpPcmData = false) + { + auto audioCounters = client->getAudioReceiveCounters(utils::Time::getAbsoluteTime()); + EXPECT_EQ(audioCounters.lostPackets, 0); + + const auto& data = client->getAudioReceiveStats(); + IntegrationTest::AudioAnalysisData result; + + for (const auto& item : data) + { + if (client->isRemoteVideoSsrc(item.first)) + { + continue; + } + + result.audioSsrcCount++; + + std::vector freqVector; + std::vector> amplitudeProfile; + auto rec = item.second->getRecording(); + auto spectrum = createAudioSpectrum(rec, 48000); + auto powerSpectrum = SampleDataUtils::powerSpectrumDB(spectrum); + auto pwrFreq = SampleDataUtils::toPowerVector(powerSpectrum, 48000); + std::sort(pwrFreq.begin(), + pwrFreq.end(), + [](const std::pair& f1, const std::pair& f2) { + return f2.second < f1.second; + }); + + auto peaks = SampleDataUtils::isolatePeaks(pwrFreq, inclusionThreshold, 48000); + for (auto& p : peaks) + { + result.dominantFrequencies.push_back(p.first); + } + if (dumpPcmData) + { + item.second->dumpPcmData(); + } + } + + return result; + } + protected: void runTestInThread(const size_t expectedNumThreads, std::function test, uint32_t maxTestDurationSec = 80); void startSimulation(); diff --git a/test/integration/SampleDataUtils.cpp b/test/integration/SampleDataUtils.cpp index 9d219157a..6cd776bad 100644 --- a/test/integration/SampleDataUtils.cpp +++ b/test/integration/SampleDataUtils.cpp @@ -649,3 +649,15 @@ bool 
SampleDataUtils::verifyAudioLevel(const std::vector& packet } return true; } + +bool SampleDataUtils::dumpPayload(FILE* h, const memory::Packet& packet) +{ + const auto header = rtp::RtpHeader::fromPacket(packet); + const auto toWrite = packet.getLength() - header->headerLength(); + return toWrite == ::fwrite(header->getPayload(), 1, toWrite, h); +} + +bool SampleDataUtils::dumpPayload(FILE* h, const int16_t* audio, size_t samples) +{ + return samples == ::fwrite(audio, 2, samples, h); +} diff --git a/test/integration/SampleDataUtils.h b/test/integration/SampleDataUtils.h index f39cb4901..138eb81ad 100644 --- a/test/integration/SampleDataUtils.h +++ b/test/integration/SampleDataUtils.h @@ -74,6 +74,10 @@ class SampleDataUtils uint32_t topN, std::vector& frequencies); + static bool dumpPayload(FILE* h, const memory::Packet& packet); + + static bool dumpPayload(FILE* h, const int16_t* audio, size_t samples); + private: static const std::vector _opusRtpSamplePackets; }; diff --git a/test/integration/SrtpIntegrationTest.cpp b/test/integration/SrtpIntegrationTest.cpp index 2c74f88cb..1f699ae2c 100644 --- a/test/integration/SrtpIntegrationTest.cpp +++ b/test/integration/SrtpIntegrationTest.cpp @@ -46,7 +46,7 @@ TEST_P(IntegrationSrtpTest, oneOnOneSDES) ASSERT_TRUE(group.connectAll(utils::Time::sec * _clientsConnectionTimeout)); - makeShortCallWithDefaultAudioProfile(group, utils::Time::sec * 2); + makeShortCallWithDefaultAudioProfile(group, utils::Time::sec * 4); nlohmann::json responseBody; auto statsSuccess = emulator::awaitResponse(_httpd, @@ -64,13 +64,13 @@ TEST_P(IntegrationSrtpTest, oneOnOneSDES) group.clients[0]->stopTransports(); group.clients[1]->stopTransports(); - group.awaitPendingJobs(utils::Time::sec * 4); + group.awaitPendingJobs(utils::Time::sec * 6); finalizeSimulation(); const double expectedFrequencies[2][1] = {{1300.0}, {600.0}}; for (auto id : {0, 1}) { - const auto data = analyzeRecording>(group.clients[id].get(), 2, true, 0); + const auto data = analyzeSpectrum>(group.clients[id].get(), 6, -75, 0, true); EXPECT_EQ(data.dominantFrequencies.size(), 1); if (data.dominantFrequencies.size() >= 1) { diff --git a/test/integration/emulator/AudioSource.cpp b/test/integration/emulator/AudioSource.cpp index 676aea6c4..98c213917 100644 --- a/test/integration/emulator/AudioSource.cpp +++ b/test/integration/emulator/AudioSource.cpp @@ -1,5 +1,6 @@ #include "test/integration/emulator/AudioSource.h" #include "codec/AudioLevel.h" +#include "codec/AudioTools.h" #include "codec/Opus.h" #include "memory/PacketPoolAllocator.h" #include "rtp/RtpHeader.h" @@ -21,12 +22,23 @@ AudioSource::AudioSource(memory::PacketPoolAllocator& allocator, uint32_t ssrc, _ptime(ptime), _isPtt(PttState::NotSpecified), _useAudioLevel(true), - _emulatedAudioType(fakeAudio) + _emulatedAudioType(fakeAudio), + _pcm16File(nullptr), + _packetCount(0) { } +AudioSource::~AudioSource() +{ + if (_pcm16File) + { + ::fclose(_pcm16File); + } +} + memory::UniquePacket AudioSource::getPacket(uint64_t timestamp) { + const auto samplesPerPacket = codec::Opus::sampleRate * _ptime / 1000; if (timeToRelease(timestamp) > 0) { return nullptr; @@ -51,16 +63,52 @@ memory::UniquePacket AudioSource::getPacket(uint64_t timestamp) rtpHeader->ssrc = _ssrc; rtpHeader->timestamp = _rtpTimestamp; - const auto samplesPerPacket = codec::Opus::sampleRate * _ptime / 1000; int16_t audio[codec::Opus::channelsPerFrame * samplesPerPacket]; _rtpTimestamp += samplesPerPacket; - for (uint64_t x = 0; _emulatedAudioType == Audio::Opus && x < 
samplesPerPacket; ++x) + if (_emulatedAudioType == Audio::Opus && !_pcm16File) + { + for (uint64_t x = 0; x < samplesPerPacket; ++x) + { + audio[x * 2] = _amplitude * sin(_phase + x * 2 * M_PI * _frequency / codec::Opus::sampleRate); + audio[x * 2 + 1] = 0; + } + _phase += samplesPerPacket * 2 * M_PI * _frequency / codec::Opus::sampleRate; + + if (_packetCount++ < 50) + { + // create noise floor + for (uint64_t x = 0; x < samplesPerPacket; ++x) + { + audio[x * 2] *= 0.01; + } + } + if (_tonePattern.onRatio < 1.0) + { + if (_tonePattern.silenceCountDown <= -20) + { + const auto p = 2.0 * _tonePattern.onRatio * (rand() % 1000) * 0.001; + _tonePattern.silenceCountDown = 20.0 * (1.0 / (p + 0.0001) - 1.0); + } + if (_tonePattern.silenceCountDown-- >= 0) + { + for (uint64_t x = 0; x < samplesPerPacket; ++x) + { + audio[x * 2] *= 0.001; + } + } + } + } + else if (_emulatedAudioType == Audio::Opus && _pcm16File) { - audio[x * 2] = _amplitude * sin(_phase + x * 2 * M_PI * _frequency / codec::Opus::sampleRate); - audio[x * 2 + 1] = 0; + auto readSamples = ::fread(audio, sizeof(int16_t), samplesPerPacket, _pcm16File); + if (readSamples < samplesPerPacket) + { + ::rewind(_pcm16File); + readSamples = ::fread(audio, sizeof(int16_t), samplesPerPacket, _pcm16File); + } + codec::makeStereo(audio, samplesPerPacket); } - _phase += samplesPerPacket * 2 * M_PI * _frequency / codec::Opus::sampleRate; rtp::RtpHeaderExtension extensionHead; auto cursor = extensionHead.extensions().begin(); @@ -150,4 +198,20 @@ void AudioSource::setUseAudioLevel(const bool useAudioLevel) _useAudioLevel = useAudioLevel; } +bool AudioSource::openPcm16File(const char* filename) +{ + if (_pcm16File) + { + ::fclose(_pcm16File); + _pcm16File = nullptr; + } + + _pcm16File = ::fopen(filename, "r"); + return _pcm16File != nullptr; +} + +void AudioSource::enableIntermittentTone(double onRatio) +{ + _tonePattern.onRatio = onRatio; +} } // namespace emulator diff --git a/test/integration/emulator/AudioSource.h b/test/integration/emulator/AudioSource.h index 4c849f318..c0345e1d3 100644 --- a/test/integration/emulator/AudioSource.h +++ b/test/integration/emulator/AudioSource.h @@ -23,7 +23,7 @@ class AudioSource }; AudioSource(memory::PacketPoolAllocator& allocator, uint32_t ssrc, Audio fakeAudio, uint32_t ptime = 20); - ~AudioSource(){}; + ~AudioSource(); memory::UniquePacket getPacket(uint64_t timestamp); int64_t timeToRelease(uint64_t timestamp) const; @@ -35,6 +35,11 @@ class AudioSource void setFrequency(double frequency) { _frequency = frequency; } void setPtt(const PttState isPtt); void setUseAudioLevel(const bool useAudioLevel); + void enableIntermittentTone(double onRatio); + + bool openPcm16File(const char* filename); + + uint16_t getSequenceCounter() const { return _sequenceCounter; } private: static const uint32_t maxSentBufferSize = 12 * 1024; @@ -51,6 +56,14 @@ class AudioSource PttState _isPtt; bool _useAudioLevel; Audio _emulatedAudioType; + FILE* _pcm16File; + uint32_t _packetCount; + + struct TonePattern + { + double onRatio = 1.0; + int32_t silenceCountDown = 0; + } _tonePattern; }; } // namespace emulator diff --git a/test/integration/emulator/JitterPacketSource.cpp b/test/integration/emulator/JitterPacketSource.cpp new file mode 100644 index 000000000..edda667c3 --- /dev/null +++ b/test/integration/emulator/JitterPacketSource.cpp @@ -0,0 +1,266 @@ +#include "JitterPacketSource.h" +#include "logger/PacketLogger.h" +#include "rtp/RtpHeader.h" +#include "utils/Time.h" +#include + +namespace emulator +{ + +struct SsrcTrack +{ + 
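    // per-SSRC tracking state for identifyAudioSsrc() below: the audio stream is taken to be the
    // one whose packets stay under 300 bytes and accumulate enough receive gaps larger than 15 ms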
uint64_t prevReceiveTime; + double avgReceiveTime; + uint32_t count; +}; + +uint32_t identifyAudioSsrc(logger::PacketLogReader& reader) +{ + logger::PacketLogItem item; + std::map ssrcs; + for (int i = 0; reader.getNext(item); ++i) + { + if (item.size >= 300) + { + if (ssrcs.end() != ssrcs.find(item.ssrc)) + { + ssrcs.erase(item.ssrc); + } + continue; + } + + auto it = ssrcs.find(item.ssrc); + if (ssrcs.end() == it) + { + ssrcs[item.ssrc] = SsrcTrack{item.receiveTimestamp, 0.02, 1}; + continue; + } + + if (item.receiveTimestamp - it->second.prevReceiveTime > utils::Time::ms * 15) + { + it->second.prevReceiveTime = item.receiveTimestamp; + ++it->second.count; + + if (it->second.count > 300) + { + return item.ssrc; + } + } + } + + if (ssrcs.size() > 0) + { + return ssrcs.begin()->first; + } + return 0; +} + +JitterPacketSource::JitterPacketSource(memory::PacketPoolAllocator& allocator, + uint32_t ptime, + uint32_t jitterMs, + uint32_t maxJitterMs) + : _ptime(ptime), + _audioSource(allocator, 50, emulator::Audio::Opus, ptime), + _releaseTime(0), + _audioTimeline(0), + _packetLossRatio(0), + _jitterMs(jitterMs), + _maxJitterMs(maxJitterMs), + _poissonDistribution(jitterMs), + _randTwister(std::random_device{}()) +{ +} + +void JitterPacketSource::openWithTone(double frequency, double onRatio) +{ + _audioSource.setFrequency(frequency); + _audioSource.setVolume(0.2); + _audioSource.enableIntermittentTone(onRatio); +} + +memory::UniquePacket JitterPacketSource::getNext(uint64_t timestamp) +{ + if (!_nextPacket) + { + _nextPacket = _audioSource.getPacket(timestamp); + if (!_nextPacket) + { + return nullptr; + } + + _releaseTime = timestamp; + _audioTimeline = timestamp; + } + + if (utils::Time::diffGE(_releaseTime, timestamp, 0)) + { + auto p = std::move(_nextPacket); + _audioTimeline += _ptime * utils::Time::ms; + _nextPacket = _audioSource.getPacket(_audioTimeline); + assert(_nextPacket); + + const auto delayms = _poissonDistribution(_randTwister); + + _releaseTime = _audioTimeline + (_jitterMs > 0 ? 
std::min(delayms, _maxJitterMs) * utils::Time::ms : 0); + + return p; + } + + return nullptr; +} + +JitterTracePacketSource::JitterTracePacketSource(memory::PacketPoolAllocator& allocator) + : _audioSource(allocator, 50, emulator::Audio::Opus, 20), + _releaseTime(0), + _sequenceOffset(0), + _traceOffset(0), + _audioTimeline(0), + _eof(false), + _packetLossRatio(0) +{ +} + +void JitterTracePacketSource::open(const char* audioPcm16File, const char* networkTrace) +{ + _traceReader = std::make_unique(::fopen(networkTrace, "r")); + _audioSsrc = identifyAudioSsrc(*_traceReader); + _traceReader->rewind(); + + _audioSource.openPcm16File(audioPcm16File); +} + +void JitterTracePacketSource::openWithTone(double frequency, const char* networkTrace) +{ + _traceReader = std::make_unique(::fopen(networkTrace, "r")); + _audioSsrc = identifyAudioSsrc(*_traceReader); + _traceReader->rewind(); + + _audioSource.setFrequency(frequency); + _audioSource.setVolume(0.6); +} + +bool JitterTracePacketSource::getNextAudioTraceItem(logger::PacketLogItem& item) +{ + for (; _traceReader->getNext(item);) + { + if (item.ssrc == _audioSsrc && rand() % 1000 > _packetLossRatio * 1000.0) + { + return true; + } + } + + return false; +} + +memory::UniquePacket JitterTracePacketSource::getNext(uint64_t timestamp) +{ + if (!_nextPacket) + { + logger::PacketLogItem item; + if (!getNextAudioTraceItem(item)) + { + _eof = true; + return nullptr; + } + + uint16_t audioSeqNo = _audioSource.getSequenceCounter(); + if (_traceOffset != 0) + { + int16_t diff = item.sequenceNumber + _sequenceOffset - audioSeqNo; + if (diff != 0) + { + logger::debug("ssrc %u, seq %u, audio seq %u", + "", + item.ssrc, + item.sequenceNumber + _sequenceOffset, + audioSeqNo); + } + if (diff < 0) + { + return nullptr; // reorder + } + } + + auto packet = _audioSource.getPacket(_audioTimeline); + auto header = rtp::RtpHeader::fromPacket(*packet); + if (_traceOffset == 0) + { + _sequenceOffset = header->sequenceNumber - item.sequenceNumber; + _traceOffset = timestamp - item.receiveTimestamp; + } + + _audioTimeline += 20 * utils::Time::ms; + + while (header->sequenceNumber != item.sequenceNumber + _sequenceOffset) + { + logger::debug("loss in audio source seq %u, trace %u", + "JitterTracePacketSource", + header->sequenceNumber.get(), + item.sequenceNumber + _sequenceOffset); + packet = _audioSource.getPacket(_audioTimeline); + header = rtp::RtpHeader::fromPacket(*packet); + _audioTimeline += 20 * utils::Time::ms; + } + + _nextPacket = std::move(packet); + _releaseTime = item.receiveTimestamp + _traceOffset; + } + + if (utils::Time::diffGE(_releaseTime, timestamp, 0)) + { + return std::move(_nextPacket); + } + return nullptr; +} + +JitterPatternSource::JitterPatternSource(memory::PacketPoolAllocator& allocator, + uint32_t ptime, + const std::vector& jitterPattern) + : _ptime(ptime), + _audioSource(allocator, 50, emulator::Audio::Opus, ptime), + _releaseTime(0), + _audioTimeline(0), + _index(0), + _jitterPattern(jitterPattern) +{ +} + +void JitterPatternSource::openWithTone(double frequency, double onRatio) +{ + _audioSource.setFrequency(frequency); + _audioSource.setVolume(0.2); + _audioSource.enableIntermittentTone(onRatio); +} + +memory::UniquePacket JitterPatternSource::getNext(uint64_t timestamp) +{ + if (!_nextPacket) + { + _nextPacket = _audioSource.getPacket(timestamp); + if (!_nextPacket) + { + return nullptr; + } + + _releaseTime = timestamp; + _audioTimeline = timestamp; + } + + if (utils::Time::diffGE(_releaseTime, timestamp, 0)) + { + auto p = 
std::move(_nextPacket); + _audioTimeline += _ptime * utils::Time::ms; + _nextPacket = _audioSource.getPacket(_audioTimeline); + assert(_nextPacket); + + const auto delayms = _jitterPattern[_index++ % _jitterPattern.size()]; + + _releaseTime = _audioTimeline + delayms * utils::Time::ms; + + return p; + } + + return nullptr; +} + +} // namespace emulator diff --git a/test/integration/emulator/JitterPacketSource.h b/test/integration/emulator/JitterPacketSource.h new file mode 100644 index 000000000..c29cafa68 --- /dev/null +++ b/test/integration/emulator/JitterPacketSource.h @@ -0,0 +1,92 @@ +#pragma once +#include "test/integration/emulator/AudioSource.h" +#include + +namespace logger +{ +struct PacketLogItem; +class PacketLogReader; +} // namespace logger + +namespace emulator +{ +uint32_t identifyAudioSsrc(logger::PacketLogReader& reader); + +class JitterPacketSource +{ +public: + JitterPacketSource(memory::PacketPoolAllocator& allocator, + uint32_t ptime, + uint32_t jitterMs = 45, + uint32_t maxJitterMs = 80); + + void openWithTone(double frequency, double onRatio = 1.0); + memory::UniquePacket getNext(uint64_t timestamp); + + void setRandomPacketLoss(double ratio) { _packetLossRatio = ratio; } + +private: + uint32_t _ptime; + emulator::AudioSource _audioSource; + memory::UniquePacket _nextPacket; + uint64_t _releaseTime; + uint64_t _audioTimeline; + double _packetLossRatio; + uint32_t _jitterMs; + const uint32_t _maxJitterMs; + std::poisson_distribution _poissonDistribution; + std::mt19937 _randTwister; +}; + +class JitterTracePacketSource +{ +public: + JitterTracePacketSource(memory::PacketPoolAllocator& allocator); + + void open(const char* audioPcm16File, const char* networkTrace); + + void openWithTone(double frequency, const char* networkTrace); + + bool getNextAudioTraceItem(logger::PacketLogItem& item); + + memory::UniquePacket getNext(uint64_t timestamp); + + bool isEof() const { return _eof; } + + void setRandomPacketLoss(double ratio) { _packetLossRatio = ratio; } + +private: + emulator::AudioSource _audioSource; + memory::UniquePacket _nextPacket; + uint64_t _releaseTime; + std::unique_ptr _traceReader; + int16_t _sequenceOffset; + + int64_t _traceOffset; + uint32_t _audioSsrc; + uint64_t _audioTimeline; + bool _eof; + double _packetLossRatio; +}; + +class JitterPatternSource +{ +public: + JitterPatternSource(memory::PacketPoolAllocator& allocator, + uint32_t ptime, + const std::vector& jitterPattern); + + void openWithTone(double frequency, double onRatio = 1.0); + memory::UniquePacket getNext(uint64_t timestamp); + +private: + uint32_t _ptime; + emulator::AudioSource _audioSource; + memory::UniquePacket _nextPacket; + uint64_t _releaseTime; + uint64_t _audioTimeline; + uint32_t _index; + std::vector _jitterPattern; +}; + +} // namespace emulator diff --git a/test/transport/AdaptiveJitterTest.cpp b/test/transport/AdaptiveJitterTest.cpp new file mode 100644 index 000000000..69160c6df --- /dev/null +++ b/test/transport/AdaptiveJitterTest.cpp @@ -0,0 +1,518 @@ + +#include "codec/AudioReceivePipeline.h" +#include "codec/AudioTools.h" +#include "logger/Logger.h" +#include "logger/PacketLogger.h" +#include "memory/PacketPoolAllocator.h" +#include "rtp/JitterEstimator.h" +#include "rtp/JitterTracker.h" +#include "rtp/RtpHeader.h" +#include "test/integration/SampleDataUtils.h" +#include "test/integration/emulator/AudioSource.h" +#include "test/integration/emulator/JitterPacketSource.h" +#include "test/integration/emulator/TimeTurner.h" +#include "utils/Pacer.h" +#include 
"utils/ScopedFileHandle.h" +#include +#include + +using namespace math; + +namespace +{ + +} // namespace + +class TimeTicker : public utils::TimeSource +{ +public: + TimeTicker() : _startTime(std::chrono::system_clock::now()), _time(utils::Time::getAbsoluteTime()) {} + + uint64_t getAbsoluteTime() override { return _time; }; + + void nanoSleep(uint64_t nanoSeconds) override { _time += nanoSeconds; }; + + std::chrono::system_clock::time_point wallClock() const override + { + return _startTime + std::chrono::duration_cast(std::chrono::nanoseconds(_time)); + } + + void advance(uint64_t nanoSeconds) override { _time += nanoSeconds; } + +private: + const std::chrono::system_clock::time_point _startTime; + uint64_t _time; +}; + +class AudioPipelineTest : public testing::TestWithParam +{ +public: + void SetUp() override { utils::Time::initialize(_timeTurner); } + void TearDown() override { utils::Time::initialize(); } + TimeTicker _timeTurner; +}; + +TEST_P(AudioPipelineTest, DISABLED_fileReRun) +{ + const uint32_t rtpFrequency = 48000; + std::string trace = GetParam(); + memory::PacketPoolAllocator allocator(4096 * 4, "JitterTest"); + uint16_t* tmpZeroes = new uint16_t[960 * 50]; + std::fill(tmpZeroes, tmpZeroes + 960 * 10, 0); + + logger::info("scanning file %s", "", trace.c_str()); + + emulator::JitterTracePacketSource psource(allocator); + // psource.setRandomPacketLoss(0.03); + psource.open("./_bwelogs/2minrecording.raw", ("./_bwelogs/" + trace).c_str()); + + utils::ScopedFileHandle audioPlayback(::fopen(("/mnt/c/dev/rtc/" + trace + "out.raw").c_str(), "w+")); + + auto pipeline = std::make_unique(48000, 20, 100, 1); + + const auto samplesPerPacket = rtpFrequency / 50; + uint32_t extendedSequenceNumber = 0; + + utils::Pacer playbackPacer(utils::Time::ms * 20); + playbackPacer.reset(100); + + for (uint64_t timeSteps = 0; timeSteps < 90000 && !psource.isEof(); ++timeSteps) + { + _timeTurner.advance(utils::Time::ms); + const auto timestamp = utils::Time::getAbsoluteTime(); + for (auto packet = psource.getNext(timestamp); packet; packet = psource.getNext(timestamp)) + { + auto header = rtp::RtpHeader::fromPacket(*packet); + uint16_t curExtSeq = extendedSequenceNumber & 0xFFFFu; + int16_t adv = header->sequenceNumber.get() - curExtSeq; + + const auto acceptedPacket = + pipeline->onRtpPacket(extendedSequenceNumber + adv, std::move(packet), timestamp); + if (adv > 0) + { + extendedSequenceNumber += adv; + } + if (!acceptedPacket) + { + logger::warn("JB full packet dropped %u", "", extendedSequenceNumber); + } + } + + if (pipeline->needProcess()) + { + pipeline->process(timestamp); + } + + if (playbackPacer.timeToNextTick(timestamp) <= 0) + { + pipeline->fetchStereo(samplesPerPacket); + + ::fwrite(pipeline->getAudio(), samplesPerPacket * 2, sizeof(int16_t), audioPlayback.get()); + + playbackPacer.tick(timestamp); + } + } +} + +INSTANTIATE_TEST_SUITE_P(AudioPipelineRerun, + AudioPipelineTest, + ::testing::Values("Transport-6-4G-1-5Mbps", + "Transport-22-4G-2.3Mbps", + "Transport-30-3G-1Mbps", + "Transport-32_Idre", + "Transport-44-clkdrift", + "Transport-48_80_3G", + "Transport-3887-wifi", + "Transport-105_tcp_1ploss", + "Transport-1094-4G", + "Transport-14-wifi", + "Transport-48_50_3G", + "Transport-86_tcp_1ploss")); + +TEST_F(AudioPipelineTest, DTX) +{ + const uint32_t rtpFrequency = 48000; + memory::PacketPoolAllocator allocator(4096 * 4, "JitterTest"); + + auto pipeline = std::make_unique(48000, 20, 100, 1); + + const auto samplesPerPacket = rtpFrequency / 50; + uint32_t extendedSequenceNumber = 
0; + uint32_t timestampCounter = 4000; + + utils::Pacer playbackPacer(utils::Time::ms * 20); + playbackPacer.reset(100); + + emulator::JitterPacketSource audioSource(allocator, 20); + audioSource.openWithTone(210, 0.34); + uint32_t underruns = 0; + + for (uint64_t timeSteps = 0; timeSteps < 9000; ++timeSteps) + { + _timeTurner.advance(utils::Time::ms * 2); + const auto timestamp = utils::Time::getAbsoluteTime(); + if (pipeline->needProcess()) + { + pipeline->process(timestamp); + } + + if (playbackPacer.timeToNextTick(timestamp) <= 0) + { + if (pipeline->needProcess()) + { + ++underruns; + } + auto fetched = pipeline->fetchStereo(samplesPerPacket); + playbackPacer.tick(timestamp); + if (fetched == 0) + { + logger::info("no audio", "test"); + } + } + + auto packet = audioSource.getNext(timestamp); + if (!packet) + { + continue; + } + if (timestampCounter == 4000 + 960 * 20) + { + timestampCounter += 960; + logger::info("dtx at %u", "test", timestampCounter); + continue; + } + auto header = rtp::RtpHeader::fromPacket(*packet); + header->sequenceNumber = extendedSequenceNumber++ & 0xFFFFu; + header->timestamp = timestampCounter; + timestampCounter += samplesPerPacket; + + pipeline->onRtpPacket(extendedSequenceNumber, std::move(packet), timestamp); + } + + EXPECT_LT(underruns, 50); +} + +TEST_F(AudioPipelineTest, ptime10) +{ + const uint32_t rtpFrequency = 48000; + memory::PacketPoolAllocator allocator(4096 * 4, "JitterTest"); + + auto pipeline = std::make_unique(48000, 20, 100, 1); + + uint32_t extendedSequenceNumber = 0; + uint32_t timestampCounter = 4000; + + utils::Pacer playbackPacer(utils::Time::ms * 20); + playbackPacer.reset(100); + + emulator::JitterPacketSource audioSource(allocator, 10); + audioSource.openWithTone(210, 0.34); + + uint32_t underruns = 0; + + const uint32_t samplesPerPacketSent = 10 * rtpFrequency / 1000; + for (uint64_t timeSteps = 0; timeSteps < 9000; ++timeSteps) + { + _timeTurner.advance(utils::Time::ms * 2); + const auto timestamp = utils::Time::getAbsoluteTime(); + if (pipeline->needProcess()) + { + pipeline->process(timestamp); + } + + const auto samplesPerPacketFetch = 20 * rtpFrequency / 1000; + if (playbackPacer.timeToNextTick(timestamp) <= 0) + { + if (pipeline->needProcess()) + { + ++underruns; + } + pipeline->fetchStereo(samplesPerPacketFetch); + playbackPacer.tick(timestamp); + } + + auto packet = audioSource.getNext(timestamp); + if (!packet) + { + continue; + } + if (timestampCounter == 4000 + 960 * 20) + { + timestampCounter += samplesPerPacketSent; + logger::info("dtx at %u", "test", timestampCounter); + continue; + } + auto header = rtp::RtpHeader::fromPacket(*packet); + header->sequenceNumber = extendedSequenceNumber++ & 0xFFFFu; + header->timestamp = timestampCounter; + timestampCounter += samplesPerPacketSent; + ; + + pipeline->onRtpPacket(extendedSequenceNumber, std::move(packet), timestamp); + } + + EXPECT_LT(underruns, 70); +} + +TEST_F(AudioPipelineTest, ContinuousTone) +{ + const uint32_t rtpFrequency = 48000; + memory::PacketPoolAllocator allocator(4096 * 4, "JitterTest"); + + auto pipeline = std::make_unique(48000, 20, 100, 1); + + const auto samplesPerPacket = rtpFrequency / 50; + uint32_t extendedSequenceNumber = 0; + + utils::Pacer playbackPacer(utils::Time::ms * 20); + playbackPacer.reset(100); + + emulator::JitterPacketSource audioSource(allocator, 20, 43); + audioSource.openWithTone(800, 1.0); + uint32_t underruns = 0; + + utils::ScopedFileHandle dumpFile(nullptr); //(::fopen("/mnt/c/dev/rtc/ContinuousTone.raw", "wr")); + + for 
(uint64_t timeSteps = 0; timeSteps < 9000; ++timeSteps) + { + _timeTurner.advance(utils::Time::ms * 2); + const auto timestamp = utils::Time::getAbsoluteTime(); + if (pipeline->needProcess()) + { + pipeline->process(timestamp); + } + + if (playbackPacer.timeToNextTick(timestamp) <= 0) + { + if (pipeline->needProcess()) + { + ++underruns; + } + auto fetched = pipeline->fetchStereo(samplesPerPacket); + playbackPacer.tick(timestamp); + if (fetched == 0) + { + logger::info("no audio", "test"); + int16_t silence[samplesPerPacket * 2]; + std::memset(silence, 0, samplesPerPacket * 2 * sizeof(int16_t)); + if (dumpFile) + { + SampleDataUtils::dumpPayload(dumpFile.get(), silence, samplesPerPacket * 2); + } + } + else + { + if (dumpFile) + { + SampleDataUtils::dumpPayload(dumpFile.get(), pipeline->getAudio(), fetched * 2); + } + } + } + + auto packet = audioSource.getNext(timestamp); + if (!packet) + { + continue; + } + auto header = rtp::RtpHeader::fromPacket(*packet); + + int16_t advance = static_cast(header->timestamp.get() - (extendedSequenceNumber & 0xFFFFu)); + extendedSequenceNumber += advance; + + pipeline->onRtpPacket(extendedSequenceNumber, std::move(packet), timestamp); + } + + EXPECT_LE(underruns, 30); +} + +TEST_F(AudioPipelineTest, lowJitter) +{ + const uint32_t rtpFrequency = 48000; + memory::PacketPoolAllocator allocator(4096 * 4, "JitterTest"); + + auto pipeline = std::make_unique(48000, 20, 100, 1); + + uint32_t extendedSequenceNumber = 0; + + utils::Pacer playbackPacer(utils::Time::ms * 20); + playbackPacer.reset(_timeTurner.getAbsoluteTime() + utils::Time::ms * 4); // consume 10ms after capture + + emulator::JitterPacketSource audioSource(allocator, 20, 14, 41); + audioSource.openWithTone(210, 0.34); + { + auto firstPacket = audioSource.getNext(_timeTurner.getAbsoluteTime()); // reset audio time line + auto header = rtp::RtpHeader::fromPacket(*firstPacket); + extendedSequenceNumber = header->sequenceNumber; + } + uint32_t underruns = 0; + + for (uint64_t timeSteps = 0; timeSteps < 9000; ++timeSteps) + { + _timeTurner.advance(utils::Time::ms * 1); + const auto timestamp = utils::Time::getAbsoluteTime(); + if (pipeline->needProcess()) + { + pipeline->process(timestamp); + } + + const auto samplesPerPacketFetch = 20 * rtpFrequency / 1000; + if (playbackPacer.timeToNextTick(timestamp) <= 0) + { + if (pipeline->needProcess()) + { + ++underruns; + } + pipeline->fetchStereo(samplesPerPacketFetch); + playbackPacer.tick(timestamp); + } + + while (auto packet = audioSource.getNext(timestamp)) + { + auto header = rtp::RtpHeader::fromPacket(*packet); + auto adv = static_cast( + header->sequenceNumber.get() - static_cast(extendedSequenceNumber & 0xFFFFu)); + extendedSequenceNumber += adv; + // logger::debug("pkt produced %zu", "", timestamp / utils::Time::ms); + pipeline->onRtpPacket(extendedSequenceNumber, std::move(packet), timestamp); + } + } + + EXPECT_LT(underruns, 7); +} + +TEST_F(AudioPipelineTest, nearEmpty) +{ + const uint32_t rtpFrequency = 48000; + memory::PacketPoolAllocator allocator(4096 * 4, "JitterTest"); + + auto pipeline = std::make_unique(48000, 20, 100, 1); + + uint32_t extendedSequenceNumber = 0; + + utils::Pacer playbackPacer(utils::Time::ms * 20); + playbackPacer.reset(_timeTurner.getAbsoluteTime() + utils::Time::ms * 3); // consume 10ms after capture + + // clang-format off + std::vector jitter = + {0, 0, 0, 0, 0, 1, 0, 0, 0, 19, 1, 0, 1, 2, 0, 0, 1, 1, 0, 1, 0, 1, 1, 2, 0, 0, 1, 1, 0, 1, 0, 1, 1,0, 1, 0, 1, 1, 2, 0, 0, 1, 1, 0, 1, 0, 1, 1,0, 1, 0, 1, 1, 2, 0, 0, 1, 1, 
0, 1, 0, 1, 1, + 0, 1, 0, 1, 1, 2, 0, 0, 1, 1, 0, 1, 0, 1, 1,0, 1, 0, 1, 1, 2, 0, 0, 1, 1, 0, 1, 0, 1, 1,0, 1, 0, 1, 1, 2, 0, 0, 1, 1, 0, 1, 0, 1, 1,0, 1, 0, 1, 1, 2, 0, 0, 1, 1, 0, 1, 0, 1, 1, + 0, 1, 0, 1, 1, 2, 0, 0, 1, 1, 0, 1, 0, 1, 1,0, 1, 0, 1, 1, 2, 0, 0, 1, 1, 0, 1, 0, 1, 1,0, 1, 0, 1, 1, 2, 0, 0, 1, 1, 0, 1, 0, 1, 1,0, 1, 0, 1, 1, 2, 0, 0, 1, 1, 0, 1, 0, 1, 1, + 0, 1, 0, 1, 1, 2, 0, 0, 1, 1, 0, 1, 0, 1, 1,0, 1, 0, 1, 1, 2, 0, 0, 1, 1, 0, 1, 0, 1, 1,0, 1, 0, 1, 1, 2, 0, 0, 1, 1, 0, 1, 0, 1, 1,0, 1, 0, 1, 1, 2, 0, 0, 1, 1, 0, 1, 0, 1, 1, + 0, 1, 0, 1, 1, 2, 0, 0, 1, 1, 0, 1, 0, 1, 1,0, 1, 0, 1, 1, 2, 0, 0, 1, 1, 0, 1, 0, 1, 1,0, 1, 0, 1, 1, 2, 0, 0, 1, 1, 0, 1, 0, 1, 1,0, 1, 0, 1, 1, 2, 0, 0, 1, 1, 0, 1, 0, 1, 1}; + // clang-format on + + emulator::JitterPatternSource audioSource(allocator, 20, jitter); + audioSource.openWithTone(210, 0.34); + { + auto firstPacket = audioSource.getNext(_timeTurner.getAbsoluteTime()); // reset audio time line + auto header = rtp::RtpHeader::fromPacket(*firstPacket); + extendedSequenceNumber = header->sequenceNumber; + } + uint32_t underruns = 0; + const auto samplesPerPacketFetch = 20 * rtpFrequency / 1000; + + for (uint64_t timeSteps = 0; timeSteps < jitter.size() * 20; ++timeSteps) + { + _timeTurner.advance(utils::Time::ms); + const auto timestamp = utils::Time::getAbsoluteTime(); + if (pipeline->needProcess()) + { + pipeline->process(timestamp); + } + + if (playbackPacer.timeToNextTick(timestamp) <= 0) + { + if (pipeline->needProcess()) + { + ++underruns; + } + pipeline->fetchStereo(samplesPerPacketFetch); + playbackPacer.tick(timestamp); + } + + while (auto packet = audioSource.getNext(timestamp)) + { + auto header = rtp::RtpHeader::fromPacket(*packet); + auto adv = static_cast( + header->sequenceNumber.get() - static_cast(extendedSequenceNumber & 0xFFFFu)); + extendedSequenceNumber += adv; + pipeline->onRtpPacket(extendedSequenceNumber, std::move(packet), timestamp); + } + } + + EXPECT_EQ(underruns, 1); + _timeTurner.advance(40 * utils::Time::ms); + pipeline->process(utils::Time::getAbsoluteTime()); + EXPECT_EQ(pipeline->fetchStereo(samplesPerPacketFetch), 960); + pipeline->process(utils::Time::getAbsoluteTime()); + EXPECT_EQ(pipeline->fetchStereo(samplesPerPacketFetch), 0); +} + +TEST_F(AudioPipelineTest, everIncreasing) +{ + const uint32_t rtpFrequency = 48000; + memory::PacketPoolAllocator allocator(4096 * 4, "JitterTest"); + + auto pipeline = std::make_unique(48000, 20, 100, 1); + + uint32_t extendedSequenceNumber = 0; + + utils::Pacer playbackPacer(utils::Time::ms * 20); + playbackPacer.reset(_timeTurner.getAbsoluteTime() + utils::Time::ms * 3); // consume 10ms after capture + + std::vector jitter = {0, 0, 0, 10, 20, 60, 90, 120}; + for (int i = 0; i < 310; ++i) + { + jitter.push_back(120 + i * 90); + } + + emulator::JitterPatternSource audioSource(allocator, 20, jitter); + audioSource.openWithTone(210, 0.34); + { + auto firstPacket = audioSource.getNext(_timeTurner.getAbsoluteTime()); // reset audio time line + auto header = rtp::RtpHeader::fromPacket(*firstPacket); + extendedSequenceNumber = header->sequenceNumber; + } + uint32_t underruns = 0; + const auto samplesPerPacketFetch = 20 * rtpFrequency / 1000; + + for (uint64_t timeSteps = 0; timeSteps < 33000; ++timeSteps) + { + _timeTurner.advance(utils::Time::ms); + const auto timestamp = utils::Time::getAbsoluteTime(); + if (pipeline->needProcess()) + { + pipeline->process(timestamp); + } + + if (playbackPacer.timeToNextTick(timestamp) <= 0) + { + if (pipeline->needProcess()) + { + 
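                // the pipeline still reports needProcess() right before the fetch,
                // which this test counts as an underrun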
++underruns; + } + pipeline->fetchStereo(samplesPerPacketFetch); + playbackPacer.tick(timestamp); + } + + while (auto packet = audioSource.getNext(timestamp)) + { + auto header = rtp::RtpHeader::fromPacket(*packet); + auto adv = static_cast( + header->sequenceNumber.get() - static_cast(extendedSequenceNumber & 0xFFFFu)); + extendedSequenceNumber += adv; + bool posted = pipeline->onRtpPacket(extendedSequenceNumber, std::move(packet), timestamp); + if (timeSteps == 32969) + { + ASSERT_FALSE(posted); + } + else + { + ASSERT_TRUE(posted); + } + } + } + + EXPECT_LT(underruns, 1650); + EXPECT_GE(underruns, 1600); +} diff --git a/test/transport/JitterTest.cpp b/test/transport/JitterTest.cpp index 50b81d3a4..98d9be4c9 100644 --- a/test/transport/JitterTest.cpp +++ b/test/transport/JitterTest.cpp @@ -1,10 +1,17 @@ #include "bwe/BandwidthEstimator.h" +#include "codec/AudioFader.h" #include "concurrency/MpmcQueue.h" #include "logger/Logger.h" +#include "logger/PacketLogger.h" #include "math/Matrix.h" +#include "math/WelfordVariance.h" #include "memory/PacketPoolAllocator.h" +#include "rtp/JitterBufferList.h" +#include "rtp/JitterEstimator.h" #include "rtp/JitterTracker.h" #include "rtp/RtpHeader.h" +#include "rtp/SendTimeDial.h" +#include "test/CsvWriter.h" #include "test/bwe/FakeAudioSource.h" #include "test/bwe/FakeCall.h" #include "test/bwe/FakeCrossTraffic.h" @@ -14,6 +21,735 @@ using namespace math; +#include "utils/ScopedFileHandle.h" +#include +#include + +using namespace math; + +namespace rtp +{ +template +class Backlog +{ +public: + T add(T value) + { + _index = (_index + 1) % S; + auto prev = _values[_index]; + _values[_index] = value; + return prev; + } + + T front() { return _values[_index]; } + T back() { return _values[(_index + 1) % S]; } + + T getMean() const + { + T acc = 0; + for (int i = 0; i < S; ++i) + { + acc += _values[i]; + } + + return acc / S; + } + + T getVariance(T hypotheticalMean) const + { + T acc = 0; + for (size_t i = 0; i < S; ++i) + { + auto d = _values[i] - hypotheticalMean; + acc += d * d; + } + + return acc / S; + } + +private: + T _values[S]; + uint32_t _index; +}; +} // namespace rtp +namespace +{ +class PacketJitterEmulator +{ +public: + PacketJitterEmulator(memory::PacketPoolAllocator& allocator, uint32_t ssrc) + : _source(allocator, 80, ssrc), + _jitter(10), + _link("Jitter", 2500, 15000, 1500) + { + } + + void setJitter(float ms) { _jitter = ms; } + + memory::UniquePacket get(uint64_t timestamp) + { + auto packet = _source.getPacket(timestamp); + if (packet) + { + _link.push(std::move(packet), timestamp, false); + if (_link.count() == 1) + { + uint32_t latency = (rand() % static_cast(_jitter * 101)) / 100; + _link.injectDelaySpike(latency); + } + } + + return _link.pop(timestamp); + } + + uint64_t nextEmitTime(uint64_t timestamp) const { return _link.timeToRelease(timestamp); } + +private: + fakenet::FakeAudioSource _source; + double _jitter; + + memory::UniquePacket _packet; + fakenet::NetworkLink _link; +}; +} // namespace + +TEST(Welford, uniform) +{ + math::WelfordVariance w1; + math::RollingWelfordVariance w2(250); + uint32_t range = 1000; + for (int i = 0; i < 2500; ++i) + { + double val = rand() % (range + 1); + w1.add(val); + w2.add(val); + + if (i % 100 == 0) + { + logger::debug("w1 %f, %f, w2 %f, %f", + "", + w1.getMean(), + sqrt(w1.getVariance()), + w2.getMean(), + sqrt(w2.getVariance())); + } + } + + EXPECT_NEAR(w2.getMean(), 500.0, 20.0); + EXPECT_NEAR(w2.getVariance(), (range * range) / 12, 1500); +} + +TEST(Welford, normal) +{ + 
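    // verify the rolling-window Welford estimator converges to the mean (500) and
    // variance (100 * 100) of a normally distributed input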
math::WelfordVariance w1; + math::RollingWelfordVariance w2(250); + + std::random_device rd{}; + std::mt19937 gen{rd()}; + + // values near the mean are the most likely + // standard deviation affects the dispersion of generated values from the mean + std::normal_distribution<> normalDistribution{500, 100}; + + for (int i = 0; i < 2500; ++i) + { + double val = normalDistribution(gen); + w1.add(val); + w2.add(val); + + if (i % 100 == 0) + { + logger::debug("w1 %f, %f, w2 %f, %f", + "", + w1.getMean(), + sqrt(w1.getVariance()), + w2.getMean(), + sqrt(w2.getVariance())); + } + } + EXPECT_NEAR(w2.getMean(), 500.0, 20.0); + EXPECT_NEAR(w2.getVariance(), 100.0 * 100, 2500.0); +} + +class JitterBufferTest : public ::testing::Test +{ +public: + JitterBufferTest() : _allocator(400, "test") {} + + memory::PacketPoolAllocator _allocator; + rtp::JitterBufferList _buffer; +}; + +// jitter below 10ms will never affect subsequent packet. It is just a delay inflicted on each packet unrelated to +// previous events +TEST(JitterTest, a10ms) +{ + memory::PacketPoolAllocator allocator(3000, "TestAllocator"); + PacketJitterEmulator emulator(allocator, 5670); + rtp::JitterEstimator jitt(48000); + emulator.setJitter(10); + + uint64_t timestamp = utils::Time::getAbsoluteTime(); + + int countBelow = 0; + int countAbove = 0; + int below95 = 0; + for (int i = 0; i < 15000; ++i) + { + auto p = emulator.get(timestamp); + if (p) + { + auto header = rtp::RtpHeader::fromPacket(*p); + auto delay = jitt.update(timestamp, header->timestamp.get()); + + if (delay > jitt.getJitter()) + { + ++countAbove; + } + else + { + ++countBelow; + } + + if (delay < jitt.get95Percentile()) + { + ++below95; + } + } + timestamp += utils::Time::ms * 2; + } + + logger::info("jitter %f, 95p %f, maxJ %f", + "JitterEstimator", + jitt.getJitter(), + jitt.get95Percentile(), + jitt.getMaxJitter()); + + logger::info("above %d, below %d, 95p %d", "", countAbove, countBelow, below95); + EXPECT_GE(countAbove, 600); + EXPECT_GE(countBelow, 600); + EXPECT_GE(below95, 1440); + EXPECT_NEAR(jitt.getJitter(), 5.5, 1.5); +} + +// delay above 20ms will affect subsequent packet as previous packet has not been delivered yet. 
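// A minimal sketch of the timing argument, assuming packets leave the source every 20 ms and the
// emulator injects a single delay spike of d ms (the helper below is illustrative only, not used
// by the tests):
//
//   d = 10 ms : packet N arrives at send(N) + 10; packet N+1 enters the link 20 ms after N and
//               finds it idle again, so only one delay sample is inflated.
//   d = 30 ms : packet N arrives at send(N) + 30; packet N+1, queued at send(N) + 20, drains
//               right behind it at ~send(N) + 30, so two consecutive samples are inflated and
//               the estimator sees a correlated burst rather than an isolated outlier.
//
//   uint64_t arrival(uint64_t sendTimeNs, uint64_t spikeNs, uint64_t& linkBusyUntilNs)
//   {
//       const auto t = std::max(sendTimeNs + spikeNs, linkBusyUntilNs); // wait behind earlier packets
//       linkBusyUntilNs = t;
//       return t;
//   }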
+TEST(JitterTest, a30ms) +{ + memory::PacketPoolAllocator allocator(3000, "TestAllocator"); + PacketJitterEmulator emulator(allocator, 5670); + emulator.setJitter(30); + rtp::JitterEstimator jitt(48000); + rtp::Backlog blog; + math::RollingWelfordVariance var2(250); + uint64_t timestamp = utils::Time::getAbsoluteTime(); + + int countBelow = 0; + int countAbove = 0; + int below95 = 0; + for (int i = 0; i < 15000; ++i) + { + auto p = emulator.get(timestamp); + if (p) + { + auto header = rtp::RtpHeader::fromPacket(*p); + auto delay = jitt.update(timestamp, header->timestamp.get()); + blog.add(delay); + var2.add(delay); + + if (delay > jitt.getJitter()) + { + ++countAbove; + } + else + { + ++countBelow; + } + + if (delay < jitt.get95Percentile()) + { + ++below95; + } + } + timestamp += utils::Time::ms * 2; + } + + auto m = jitt.getJitter(); + for (int i = 0; i < 10; ++i) + { + auto a = m - 2.0 + i * 0.4; + logger::info("m %.4f std %.4f", "", a, blog.getVariance(a)); + } + logger::info("roll wf m %.4f std %.4f", "", var2.getMean(), blog.getVariance(var2.getMean())); + + logger::info("jitter %f, 95p %f, maxJ %f", + "JitterEstimator", + jitt.getJitter(), + jitt.get95Percentile(), + jitt.getMaxJitter()); + + logger::info("above %d, below %d, 95p %d", "", countAbove, countBelow, below95); + EXPECT_GE(countAbove, 600); + EXPECT_GE(countBelow, 600); + EXPECT_GE(below95, 1440); + EXPECT_NEAR(jitt.getJitter(), 12.0, 1.5); +} + +TEST(JitterTest, clockSkewRxFaster) +{ + memory::PacketPoolAllocator allocator(3000, "TestAllocator"); + PacketJitterEmulator emulator(allocator, 5670); + rtp::JitterEstimator jitt(48000); + + uint64_t timestamp = utils::Time::getAbsoluteTime(); + uint64_t remoteTimestamp = timestamp; + + int countBelow = 0; + int countAbove = 0; + int below95 = 0; + for (int i = 0; i < 15000; ++i) + { + auto p = emulator.get(timestamp); + if (p) + { + auto header = rtp::RtpHeader::fromPacket(*p); + auto delay = jitt.update(remoteTimestamp, header->timestamp.get()); + + if (delay > jitt.getJitter()) + { + ++countAbove; + } + else + { + ++countBelow; + } + + if (delay < jitt.get95Percentile()) + { + ++below95; + } + } + timestamp += utils::Time::ms * 2; + remoteTimestamp += utils::Time::ms * 2; + // 500 iterations per s + if (i % 5 == 0) // 0.01s + { + remoteTimestamp += utils::Time::us * 5; // 0.5ms / s + } + } + + logger::info("jitter %f, 95p %f, maxJ %f", + "JitterEstimator", + jitt.getJitter(), + jitt.get95Percentile(), + jitt.getMaxJitter()); + + logger::info("above %d, below %d, 95p %d", "", countAbove, countBelow, below95); + EXPECT_GE(countAbove, 650); + EXPECT_GE(countBelow, 650); + EXPECT_GE(below95, 1440); + EXPECT_NEAR(jitt.getJitter(), 5.5, 1.5); +} + +TEST(JitterTest, clockSkewRxSlower) +{ + memory::PacketPoolAllocator allocator(3000, "TestAllocator"); + PacketJitterEmulator emulator(allocator, 5670); + rtp::JitterEstimator jitt(48000); + + uint64_t timestamp = utils::Time::getAbsoluteTime(); + uint64_t remoteTimestamp = timestamp; + + int countBelow = 0; + int countAbove = 0; + int below95 = 0; + for (int i = 0; i < 15000; ++i) + { + auto p = emulator.get(timestamp); + if (p) + { + auto header = rtp::RtpHeader::fromPacket(*p); + auto delay = jitt.update(remoteTimestamp, header->timestamp.get()); + + if (delay > jitt.getJitter()) + { + ++countAbove; + } + else + { + ++countBelow; + } + + if (delay < jitt.get95Percentile()) + { + ++below95; + } + } + timestamp += utils::Time::ms * 2; + remoteTimestamp += utils::Time::ms * 2; + if (i % 5 == 0) // 0.01s + { + timestamp += 
utils::Time::us * 5; + } + } + + logger::info("jitter %f, 95p %f, maxJ %f", + "JitterEstimator", + jitt.getJitter(), + jitt.get95Percentile(), + jitt.getMaxJitter()); + + logger::info("above %d, below %d, 95p %d", "", countAbove, countBelow, below95); + EXPECT_GE(countAbove, 700); + EXPECT_GE(countBelow, 700); + EXPECT_GE(below95, 1440); + EXPECT_NEAR(jitt.getJitter(), 5.5, 1.5); +} + +TEST(JitterTest, a230) +{ + memory::PacketPoolAllocator allocator(3000, "TestAllocator"); + PacketJitterEmulator emulator(allocator, 5670); + emulator.setJitter(230); + rtp::JitterEstimator jitt(48000); + + uint64_t timestamp = utils::Time::getAbsoluteTime(); + + int countBelow = 0; + int countAbove = 0; + int below95 = 0; + for (int i = 0; i < 15000; ++i) + { + auto p = emulator.get(timestamp); + if (p) + { + auto header = rtp::RtpHeader::fromPacket(*p); + auto delay = jitt.update(timestamp, header->timestamp.get()); + + if (delay > jitt.getJitter()) + { + ++countAbove; + } + else + { + ++countBelow; + } + + if (delay < jitt.get95Percentile()) + { + ++below95; + } + } + timestamp += utils::Time::ms * 2; + } + + logger::info("jitter %f, 95p %f, maxJ %f", + "JitterEstimator", + jitt.getJitter(), + jitt.get95Percentile(), + jitt.getMaxJitter()); + + logger::info("above %d, below %d, 95p %d", "", countAbove, countBelow, below95); + EXPECT_GE(countAbove, 600); + EXPECT_GE(countBelow, 600); + EXPECT_GE(below95, 1400); + EXPECT_NEAR(jitt.getJitter(), 75, 15.5); +} + +TEST(JitterTest, gap) +{ + memory::PacketPoolAllocator allocator(3000, "TestAllocator"); + PacketJitterEmulator emulator(allocator, 5670); + rtp::JitterEstimator jitt(48000); + + uint64_t timestamp = utils::Time::getAbsoluteTime(); + + int countBelow = 0; + int countAbove = 0; + int below95 = 0; + for (int i = 0; i < 15000; ++i) + { + auto p = emulator.get(timestamp); + if (p && (i < 300 || i > 450)) + { + auto header = rtp::RtpHeader::fromPacket(*p); + auto delay = jitt.update(timestamp, header->timestamp.get()); + + if (delay > jitt.getJitter()) + { + ++countAbove; + } + else + { + ++countBelow; + } + + if (delay < jitt.get95Percentile()) + { + ++below95; + } + } + timestamp += utils::Time::ms * 2; + } + + logger::info("jitter %f, 95p %f, maxJ %f", + "JitterEstimator", + jitt.getJitter(), + jitt.get95Percentile(), + jitt.getMaxJitter()); + + logger::info("above %d, below %d, 95p %d", "", countAbove, countBelow, below95); + EXPECT_GE(countAbove, 600); + EXPECT_GE(countBelow, 600); + EXPECT_GE(below95, 1440); + EXPECT_NEAR(jitt.getJitter(), 5.5, 1.5); +} + +TEST(JitterTest, dtx) +{ + memory::PacketPoolAllocator allocator(3000, "TestAllocator"); + rtp::RtpDelayTracker delayTracker(48000); + + uint64_t receiveTime = 5000; + uint32_t rtpTimestamp = 7800; + for (int i = 0; i < 300; ++i) + { + delayTracker.update(receiveTime + (rand() % 40) * utils::Time::ms / 10, rtpTimestamp); + receiveTime += utils::Time::ms * 20; + rtpTimestamp += 960; + } + + receiveTime += utils::Time::minute * 3; + rtpTimestamp += 24 * 60 * 48000; // 24 min increase in rtptimestamp + auto delay = delayTracker.update(receiveTime, rtpTimestamp); + EXPECT_EQ(delay, 0); + + rtpTimestamp += 960; + receiveTime += utils::Time::ms * 20; + delay = delayTracker.update(receiveTime + 2 * utils::Time::ms, rtpTimestamp); + EXPECT_NEAR(delay, 2.0 * utils::Time::ms, 11 * utils::Time::us); + + rtpTimestamp -= 3 * 48000; // rollback rtp by 3s, would look like huge 3s jitter + receiveTime += utils::Time::ms * 20; + delay = delayTracker.update(receiveTime, rtpTimestamp); + EXPECT_LE(delay, 
utils::Time::ms * 10); + + rtpTimestamp += 960; + receiveTime += utils::Time::ms * 200; + delay = delayTracker.update(receiveTime, rtpTimestamp); + EXPECT_GE(delay, utils::Time::ms * 179); + + // run for 10s + for (int i = 0; i < 9 * 50; ++i) + { + rtpTimestamp += 960; + receiveTime += utils::Time::ms * 20; + delay = delayTracker.update(receiveTime + (rand() % 4) * utils::Time::ms, rtpTimestamp); + } + EXPECT_LT(delay, utils::Time::ms * 9); + + receiveTime += utils::Time::sec * 10; + for (int i = 0; i < 10 * 50; ++i) + { + rtpTimestamp += 960; + receiveTime += utils::Time::ms * 20; + delay = delayTracker.update(receiveTime + (rand() % 4) * utils::Time::ms, rtpTimestamp); + } + EXPECT_LT(delay, utils::Time::ms * 100); +} + +TEST(JitterTest, adaptDown23) +{ + memory::PacketPoolAllocator allocator(3000, "TestAllocator"); + PacketJitterEmulator emulator(allocator, 5670); + emulator.setJitter(200); + rtp::JitterEstimator jitt(48000); + + uint64_t timestamp = utils::Time::getAbsoluteTime(); + + int countBelow = 0; + int countAbove = 0; + int below95 = 0; + for (int i = 0; i < 15000; ++i) + { + if (i == 9980) + { + EXPECT_NEAR(jitt.getJitter(), 72, 14.0); + emulator.setJitter(23); + logger::info("decrease jitter to 23ms", ""); + } + + auto p = emulator.get(timestamp); + if (p) + { + auto header = rtp::RtpHeader::fromPacket(*p); + auto delay = jitt.update(timestamp, header->timestamp.get()); + + if (delay > jitt.getJitter()) + { + ++countAbove; + } + else + { + ++countBelow; + } + + if (delay < jitt.get95Percentile()) + { + ++below95; + } + } + timestamp += utils::Time::ms * 2; + } + + logger::info("jitter %f, 95p %f, maxJ %f", + "JitterEstimator", + jitt.getJitter(), + jitt.get95Percentile(), + jitt.getMaxJitter()); + + logger::info("above %d, below %d, 95p %d", "", countAbove, countBelow, below95); + EXPECT_GE(countAbove, 500); + EXPECT_GE(countBelow, 500); + EXPECT_GE(below95, 1440); + EXPECT_NEAR(jitt.getJitter(), 10, 1.5); +} + +TEST(JitterTest, adaptUp200) +{ + memory::PacketPoolAllocator allocator(3000, "TestAllocator"); + PacketJitterEmulator emulator(allocator, 5670); + + rtp::JitterEstimator jitt(48000); + + uint64_t timestamp = utils::Time::getAbsoluteTime(); + rtp::JitterTracker tracker(48000); + + int countBelow = 0; + int countAbove = 0; + int below95 = 0; + for (int i = 0; i < 55000; ++i) + { + if (i == 9980) + { + EXPECT_NEAR(jitt.getJitter(), 4.5, 1.5); + emulator.setJitter(200); + logger::info("incr jitter to 30ms", ""); + } + + auto p = emulator.get(timestamp); + if (p) + { + auto header = rtp::RtpHeader::fromPacket(*p); + auto delay = jitt.update(timestamp, header->timestamp.get()); + tracker.update(timestamp, header->timestamp.get()); + + if (delay > jitt.getJitter()) + { + ++countAbove; + } + else + { + ++countBelow; + } + + if (delay < jitt.get95Percentile()) + { + ++below95; + } + } + timestamp += utils::Time::ms * 2; + if (i % 500 == 0) + { + logger::info("jitter %f, 95p %f, maxJ %f, ietf jitter %fms", + "JitterEstimator", + jitt.getJitter(), + jitt.get95Percentile(), + jitt.getMaxJitter(), + tracker.get() / 48.0); + } + } + + logger::info("jitter %f, 95p %f, maxJ %f", + "JitterEstimator", + jitt.getJitter(), + jitt.get95Percentile(), + jitt.getMaxJitter()); + + logger::info("above %d, below %d, 95p %d", "", countAbove, countBelow, below95); + EXPECT_GE(countAbove, 600); + EXPECT_GE(countBelow, 600); + EXPECT_GE(below95, 1430); + EXPECT_NEAR(jitt.getJitter(), 75, 13.5); +} + +TEST(JitterTest, adaptUp30) +{ + memory::PacketPoolAllocator allocator(3000, "TestAllocator"); + 
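    // emulated jitter starts at ~10 ms and is raised to 30 ms late in the run (i == 9980);
    // the estimate is expected to settle near 12 ms afterwards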
PacketJitterEmulator emulator(allocator, 5670); + emulator.setJitter(10); + rtp::JitterEstimator jitt(48000); + + uint64_t timestamp = utils::Time::getAbsoluteTime(); + rtp::JitterTracker tracker(48000); + + int countBelow = 0; + int countAbove = 0; + int below95 = 0; + for (int i = 0; i < 12500; ++i) + { + if (i == 9980) + { + EXPECT_NEAR(jitt.getJitter(), 4.5, 1.5); + emulator.setJitter(30); + logger::info("incr jitter to 30ms", ""); + } + + auto p = emulator.get(timestamp); + if (p) + { + auto header = rtp::RtpHeader::fromPacket(*p); + auto delay = jitt.update(timestamp, header->timestamp.get()); + tracker.update(timestamp, header->timestamp.get()); + + if (delay > jitt.getJitter()) + { + ++countAbove; + } + else + { + ++countBelow; + } + + if (delay < jitt.get95Percentile()) + { + ++below95; + } + } + timestamp += utils::Time::ms * 2; + if (i % 500 == 0) + { + logger::info("jitter %f, 95p %f, maxJ %f, ietf jitter %fms", + "JitterEstimator", + jitt.getJitter(), + jitt.get95Percentile(), + jitt.getMaxJitter(), + tracker.get() / 48.0); + } + } + + logger::info("jitter %f, 95p %f, maxJ %f", + "JitterEstimator", + jitt.getJitter(), + jitt.get95Percentile(), + jitt.getMaxJitter()); + + logger::info("above %d, below %d, 95p %d", "", countAbove, countBelow, below95); + EXPECT_GE(countAbove, 600); + EXPECT_GE(countBelow, 600); + EXPECT_GE(below95, 1150); + EXPECT_NEAR(jitt.getJitter(), 12, 2.5); +} + TEST(JitterTest, jitterTracker) { rtp::JitterTracker tracker(48000); @@ -27,8 +763,9 @@ TEST(JitterTest, jitterTracker) rtpTimestamp += 960; // 20ms receiveTime += (15 + rand() % 11) * utils::Time::ms; } + logger::debug("jit %u", "", tracker.get()); - EXPECT_NEAR(tracker.get(), 25 * 48 / 10, 10); + EXPECT_NEAR(tracker.get(), 25 * 48 / 10, 5); // receive with 2ms difference in interval compared to send interval for (int i = 0; i < 8000; ++i) @@ -40,3 +777,196 @@ TEST(JitterTest, jitterTracker) logger::debug("jit %u", "", tracker.get()); EXPECT_NEAR(tracker.get(), 2 * 48, 15); } + +TEST_F(JitterBufferTest, bufferPlain) +{ + memory::Packet stageArea; + { + auto header = rtp::RtpHeader::create(stageArea); + header->ssrc = 4000; + stageArea.setLength(250); + } + const auto oneFromFull = _buffer.SIZE - 2; + + for (int i = 0; i < oneFromFull; ++i) + { + auto p = memory::makeUniquePacket(_allocator, stageArea); + auto header = rtp::RtpHeader::create(*p); + header->sequenceNumber = i + 100; + header->timestamp = 56000 + i * 960; + + EXPECT_TRUE(_buffer.add(std::move(p))); + } + + EXPECT_EQ(_buffer.count(), oneFromFull); + EXPECT_EQ(_buffer.getRtpDelay(), 960 * (oneFromFull - 1)); + EXPECT_FALSE(_buffer.empty()); + EXPECT_EQ(_buffer.getFrontRtp()->timestamp.get(), 56000); + + // make a gap of 3 pkts + auto p = memory::makeUniquePacket(_allocator, stageArea); + { + auto header = rtp::RtpHeader::create(*p); + header->sequenceNumber = 100 + oneFromFull; + header->timestamp = 56000 + oneFromFull * 960; + } + EXPECT_TRUE(_buffer.add(std::move(p))); + EXPECT_EQ(_buffer.getRtpDelay(), 960 * oneFromFull); + + uint16_t prevSeq = 99; + uint32_t count = 0; + EXPECT_EQ(_buffer.count(), oneFromFull + 1); + for (auto packet = _buffer.pop(); packet; packet = _buffer.pop()) + { + auto header = rtp::RtpHeader::fromPacket(*packet); + EXPECT_GT(header->sequenceNumber.get(), prevSeq); + prevSeq = header->sequenceNumber.get(); + ++count; + } + EXPECT_EQ(count, oneFromFull + 1); +} + +TEST_F(JitterBufferTest, bufferReorder) +{ + memory::Packet stageArea; + { + auto header = rtp::RtpHeader::create(stageArea); + header->ssrc = 
4000; + stageArea.setLength(250); + } + + for (int i = 0; i < 5; ++i) + { + auto p = memory::makeUniquePacket(_allocator, stageArea); + auto header = rtp::RtpHeader::create(*p); + header->sequenceNumber = i + 100; + header->timestamp = 56000 + i * 960; + + EXPECT_TRUE(_buffer.add(std::move(p))); + } + + EXPECT_EQ(_buffer.count(), 5); + EXPECT_EQ(_buffer.getRtpDelay(), 960 * 4); + EXPECT_FALSE(_buffer.empty()); + EXPECT_EQ(_buffer.getFrontRtp()->timestamp.get(), 56000); + + // make a gap + auto p = memory::makeUniquePacket(_allocator, stageArea); + { + auto header = rtp::RtpHeader::create(*p); + header->sequenceNumber = 110; + header->timestamp = 56000 + 10 * 960; + } + EXPECT_TRUE(_buffer.add(std::move(p))); + EXPECT_EQ(_buffer.getRtpDelay(), 960 * 10); + + // reorder + p = memory::makeUniquePacket(_allocator, stageArea); + { + auto header = rtp::RtpHeader::create(*p); + header->sequenceNumber = 105; + header->timestamp = 56000 + 5 * 960; + } + EXPECT_TRUE(_buffer.add(std::move(p))); + EXPECT_EQ(_buffer.getRtpDelay(), 960 * 10); + EXPECT_EQ(_buffer.count(), 7); + + p = memory::makeUniquePacket(_allocator, stageArea); + { + auto header = rtp::RtpHeader::create(*p); + header->sequenceNumber = 108; + header->timestamp = 56000 + 28 * 960; + } + EXPECT_TRUE(_buffer.add(std::move(p))); + + uint16_t prevSeq = 99; + uint32_t count = 0; + for (auto packet = _buffer.pop(); packet; packet = _buffer.pop()) + { + auto header = rtp::RtpHeader::fromPacket(*packet); + EXPECT_GT(header->sequenceNumber.get(), prevSeq); + prevSeq = header->sequenceNumber.get(); + ++count; + } + EXPECT_EQ(count, 8); +} + +TEST_F(JitterBufferTest, bufferEmptyFull) +{ + memory::Packet stageArea; + { + auto header = rtp::RtpHeader::create(stageArea); + header->ssrc = 4000; + stageArea.setLength(250); + } + + EXPECT_EQ(_buffer.pop(), nullptr); + + for (int i = 0; i < _buffer.SIZE; ++i) + { + auto p = memory::makeUniquePacket(_allocator, stageArea); + auto header = rtp::RtpHeader::create(*p); + header->sequenceNumber = i + 100; + header->timestamp = 56000 + i * 960; + + EXPECT_TRUE(_buffer.add(std::move(p))); + } + + EXPECT_EQ(_buffer.count(), _buffer.SIZE); + EXPECT_EQ(_buffer.getRtpDelay(), 960 * (_buffer.SIZE - 1)); + EXPECT_FALSE(_buffer.empty()); + EXPECT_EQ(_buffer.getFrontRtp()->timestamp.get(), 56000); + + auto p = memory::makeUniquePacket(_allocator, stageArea); + { + auto header = rtp::RtpHeader::create(*p); + header->sequenceNumber = 100 + _buffer.count(); + header->timestamp = 56000 + 10 * 960; + } + EXPECT_FALSE(_buffer.add(std::move(p))); + EXPECT_EQ(_buffer.getRtpDelay(), 960 * (_buffer.SIZE - 1)); + + uint16_t prevSeq = 99; + uint32_t count = 0; + EXPECT_EQ(_buffer.count(), _buffer.SIZE); + for (auto packet = _buffer.pop(); packet; packet = _buffer.pop()) + { + auto header = rtp::RtpHeader::fromPacket(*packet); + EXPECT_GT(header->sequenceNumber.get(), prevSeq); + prevSeq = header->sequenceNumber.get(); + ++count; + } + EXPECT_EQ(count, _buffer.SIZE); + EXPECT_TRUE(_buffer.empty()); + EXPECT_EQ(_buffer.pop(), nullptr); +} + +TEST_F(JitterBufferTest, reorderedFull) +{ + memory::Packet stageArea; + { + auto header = rtp::RtpHeader::create(stageArea); + header->ssrc = 4000; + stageArea.setLength(250); + } + + EXPECT_EQ(_buffer.pop(), nullptr); + + for (int i = 0; i < _buffer.SIZE - 50; ++i) + { + auto p = memory::makeUniquePacket(_allocator, stageArea); + auto header = rtp::RtpHeader::create(*p); + header->sequenceNumber = i + 100; + header->timestamp = 56000 + i * 960; + + EXPECT_TRUE(_buffer.add(std::move(p))); + } 
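    // a packet far older than the buffered range (seq 49 vs seq 100 and up) must still be
    // accepted while the buffer has room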
+ + auto p = memory::makeUniquePacket(_allocator, stageArea); + auto header = rtp::RtpHeader::create(*p); + header->sequenceNumber = 49; + header->timestamp = 56100; + + EXPECT_EQ(_buffer.count(), _buffer.SIZE - 50); + EXPECT_TRUE(_buffer.add(std::move(p))); +} diff --git a/utils/ScopedFileHandle.h b/utils/ScopedFileHandle.h index d29e11f87..6da2590ad 100644 --- a/utils/ScopedFileHandle.h +++ b/utils/ScopedFileHandle.h @@ -20,8 +20,10 @@ class ScopedFileHandle FILE* get() { return _file; } + operator bool() const { return _file != nullptr; } + private: FILE* _file; }; -} +} // namespace utils diff --git a/utils/Trackers.cpp b/utils/Trackers.cpp index cf921ac00..ae659eaa3 100644 --- a/utils/Trackers.cpp +++ b/utils/Trackers.cpp @@ -72,6 +72,18 @@ void MaxTracker::update(double value) } } +void MaxTrackerPlain::update(double value) +{ + if (value > _value) + { + _value = value; + } + else + { + _value = _value - _decay * (_value - value); + } +} + void TimeTracker::stop() { _stopTime = Time::getApproximateTime(); diff --git a/utils/Trackers.h b/utils/Trackers.h index a9167b525..0ba10d8b6 100644 --- a/utils/Trackers.h +++ b/utils/Trackers.h @@ -36,6 +36,21 @@ class MaxTracker const double _decay; }; +// Single threaded max tracker +class MaxTrackerPlain +{ +public: + explicit MaxTrackerPlain(double decay, double startValue = 0) : _value(startValue), _decay(decay) {} + + void update(double value); + void reset(double value) { _value = value; } + double get() const { return _value; } + +private: + double _value; + const double _decay; +}; + class AvgRateTracker { public: