Skip to content

Commit

Permalink
MEDIA-2661: ICE reconnects during meeting and fails (#377)
Browse files Browse the repository at this point in the history
  • Loading branch information
RicardoMDomingues authored Jun 27, 2024
1 parent 3066c0a commit f58c712
Show file tree
Hide file tree
Showing 13 changed files with 1,085 additions and 288 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,7 @@ set(TEST_FILES
test/utils/StringTokenizerTest.cpp
test/utils/TrackerTest.cpp
test/utils/StdExtensionsTest.cpp
test/utils/SocketAddressTest.cpp
test/memory/ListTest.cpp
test/memory/ArrayTest.cpp
test/jobmanager/JobManagerTest.cpp
Expand Down
32 changes: 32 additions & 0 deletions test/include/mocks/IceSessionEventListenerMock.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#pragma once

#include "transport/ice/IceSession.h"
#include <gmock/gmock.h>
#include <memory>

namespace test
{

struct IceSessionEventListenerMock : public ::ice::IceSession::IEvents
{
MOCK_METHOD(void, onIceStateChanged, (::ice::IceSession * session, ::ice::IceSession::State state), (override));

MOCK_METHOD(void, onIceCompleted, (::ice::IceSession * session), (override));

MOCK_METHOD(void,
onIceCandidateChanged,
(::ice::IceSession * session, ::ice::IceEndpoint* localEndpoint, const ::transport::SocketAddress& remotePort),
(override));

MOCK_METHOD(void,
onIceCandidateAccepted,
(::ice::IceSession * session, ::ice::IceEndpoint* localEndpoint, const ::ice::IceCandidate& remoteCandidate),
(override));

MOCK_METHOD(void,
onIceDiscardCandidate,
(::ice::IceSession * session, ::ice::IceEndpoint* localEndpoint, const ::transport::SocketAddress& remotePort),
(override));
};

} // namespace test
5 changes: 4 additions & 1 deletion test/integration/ConfIntegrationTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -523,10 +523,13 @@ TEST_F(IntegrationTest, probing)
// non-zero RTT indicates that there was a successful candidates pair, hence probing was performed
ASSERT_GT(transport->getRtt(), 0);
transport->stop();
start = utils::Time::getAbsoluteTime();
while (transport->hasPendingJobs() && utils::Time::getAbsoluteTime() - start < timeout)
{
utils::Time::nanoSleep(utils::Time::ms * 100);
}

ASSERT_EQ(false, transport->hasPendingJobs());
finalizeSimulation();
});
}
Expand Down Expand Up @@ -1015,7 +1018,7 @@ TEST_F(IntegrationTest, endpointMessage)
++endpointMessageCount;
}
});
// Dom speaker + UserMap + endpont message
// Dom speaker + UserMap + endpoint message
EXPECT_CALL(listenerMock, onWebRtcDataString(_, _)).Times(AtLeast(3));
group.run(utils::Time::sec * 2);

Expand Down
189 changes: 180 additions & 9 deletions test/transport/IceTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,20 @@
#include "logger/Logger.h"
#include "memory/AudioPacketPoolAllocator.h"
#include "memory/Packet.h"
#include "mocks/IceSessionEventListenerMock.h"
#include "test/integration/emulator/TimeTurner.h"
#include "transport/RtcSocket.h"
#include "transport/RtcePoll.h"
#include "transport/ice/IceSerialize.h"
#include "transport/ice/IceSession.h"
#include "transport/ice/Stun.h"
#include "utils/ContainerAlgorithms.h"
#include <atomic>
#include <gmock/gmock.h>
#include <gtest/gtest.h>

using namespace ::testing;

namespace
{
void setRemoteCandidates(ice::IceSession& target, ice::IceSession& source)
Expand Down Expand Up @@ -1497,7 +1501,7 @@ TEST_F(IceRobustness, earlyProbes)
}
}

EXPECT_EQ(sessions[0]->getRemoteCandidates().size(), 2);
EXPECT_EQ(sessions[0]->getRemoteCandidates().size(), 3);
EXPECT_GE(sessions[0]->getState(), ice::IceSession::State::CONNECTING);
EXPECT_LE(sessions[0]->getState(), ice::IceSession::State::CONNECTED);

Expand Down Expand Up @@ -1527,11 +1531,178 @@ TEST_F(IceRobustness, earlyProbes)
EXPECT_TRUE(selectedPair0.second.address.equalsIp(firewall2.getPublicIp()));

auto remoteCandidates1 = sessions[0]->getRemoteCandidates();
EXPECT_EQ(remoteCandidates1.size(), 2);
EXPECT_EQ(remoteCandidates1.size(), 3);
auto selectedPair1 = sessions[1]->getSelectedPair();
EXPECT_TRUE(selectedPair1.second.address.equalsIp(firewall1.getPublicIp()));
}

TEST_F(IceRobustness, removeUnviableCandidates)
{
fakenet::Internet internet;

FakeStunServer stunServer(transport::SocketAddress::parse("64.233.165.127", 19302), internet);
fakenet::Firewall firewall1(transport::SocketAddress::parse("216.93.246.10", 0), internet);
fakenet::Firewall firewall2(transport::SocketAddress::parse("216.93.24.11", 0), internet);

FakeEndpoint endpoint1(transport::SocketAddress::parse("172.16.0.10", 2000), firewall1);
FakeEndpoint endpoint2(transport::SocketAddress::parse("172.16.0.20", 3000), firewall2);

FakeEndpoint otherNonAccessibleEndpoint(transport::SocketAddress::parse("192.168.1.10", 25000));

NiceMock<test::IceSessionEventListenerMock> session0EventListenerMock;

EXPECT_CALL(session0EventListenerMock, onIceCandidateAccepted(_, _, Truly([](const auto& candidate) {
return transport::SocketAddress::parse("216.93.24.11", 0).equalsIp(candidate.address);
}))).Times(1);

EXPECT_CALL(session0EventListenerMock, onIceCandidateAccepted(_, _, Truly([](const auto& candidate) {
return transport::SocketAddress::parse("64.233.165.127", 0).equalsIp(candidate.address);
}))).Times(1);

// Unviable must be discarded
EXPECT_CALL(session0EventListenerMock, onIceDiscardCandidate(_, _, Truly([](const auto& remotePort) {
return transport::SocketAddress::parse("192.168.1.10", 0).equalsIp(remotePort);
}))).Times(1);

EXPECT_CALL(session0EventListenerMock, onIceDiscardCandidate(_, _, Truly([](const auto& remotePort) {
return transport::SocketAddress::parse("172.16.0.20", 0).equalsIp(remotePort);
}))).Times(1);

ice::IceConfig config;
IceSessions sessions;
sessions.emplace_back(std::make_unique<ice::IceSession>(1,
config,
ice::IceComponent::RTP,
ice::IceRole::CONTROLLING,
&session0EventListenerMock));

sessions.emplace_back(
std::make_unique<ice::IceSession>(2, config, ice::IceComponent::RTP, ice::IceRole::CONTROLLED, nullptr));

endpoint1.attach(sessions[0]);
endpoint2.attach(sessions[1]);
otherNonAccessibleEndpoint.attach(sessions[1]);

std::vector<transport::SocketAddress> stunServers;
stunServers.push_back(stunServer.getIp());

gatherCandidates(internet, stunServers, sessions, timeSource);
// provide one side with candidates and credentials
logger::info("GATHER PHASE COMPLETE", "");
setRemoteCandidates(*sessions[1], *sessions[0]);
sessions[1]->setRemoteCredentials(sessions[0]->getLocalCredentials());

int si = 0;
for (auto& session : sessions)
{
logger::info("session %d local candidates", "", si);
log(session->getLocalCandidates());
logger::info("remote candidates", "");
log(session->getRemoteCandidates());
}

EXPECT_EQ(sessions[0]->getRemoteCandidates().size(), 0);

logger::info("probing from session %u", "", 0);
sessions[1]->probeRemoteCandidates(ice::IceRole::CONTROLLING, timeSource.getAbsoluteTime());

const auto startTimeNoCredentials = timeSource.getAbsoluteTime();
bool running = true;
while (running && timeSource.getAbsoluteTime() - startTimeNoCredentials < utils::Time::sec * 5)
{
internet.process(timeSource.getAbsoluteTime());
int64_t timeout = std::numeric_limits<int64_t>::max();
running = false;
for (auto& session : sessions)
{
auto sessionTimeout = session->processTimeout(timeSource.getAbsoluteTime());
internet.process(timeSource.getAbsoluteTime());
if (session->getState() == ice::IceSession::State::CONNECTING)
{
running = true;
}
if (sessionTimeout >= 0)
{
timeout = std::min(timeout, sessionTimeout);
}
}
if (running && timeout > 0)
{
timeSource.advance(timeout + 2);
}
}

ASSERT_EQ(sessions[0]->getRemoteCandidates().size(), 1);
EXPECT_EQ(sessions[0]->getState(), ice::IceSession::State::READY);

const auto session0Remotes = sessions[0]->getRemoteCandidates();
EXPECT_EQ(session0Remotes[0].type, ice::IceCandidate::Type::PRFLX);
EXPECT_TRUE(session0Remotes[0].address.equalsIp(firewall2.getPublicIp()));
// session 1 has a candidate, but session 0 cannot send request back on the probe

sessions[0]->setRemoteCredentials(sessions[1]->getLocalCredentials());
setRemoteCandidates(*sessions[0], *sessions[1]);
sessions[0]->probeRemoteCandidates(sessions[1]->getRole(), timeSource.getAbsoluteTime());
for (bool running = true; running;)
{
internet.process(timeSource.getAbsoluteTime());
int64_t timeout = std::numeric_limits<int64_t>::max();
running = false;
for (auto& session : sessions)
{
auto sessionTimeout = session->processTimeout(timeSource.getAbsoluteTime());
internet.process(timeSource.getAbsoluteTime());
if (session->getState() == ice::IceSession::State::CONNECTING)
{
running = true;
}
if (sessionTimeout >= 0)
{
timeout = std::min(timeout, sessionTimeout);
}
}
if (running && timeout > 0)
{
timeSource.advance(timeout + 2);
}
}

EXPECT_EQ(sessions[0]->getRemoteCandidates().size(), 4);
EXPECT_EQ(sessions[0]->getState(), ice::IceSession::State::CONNECTED);

// Continue until univable are discarded
const auto connectedTime = timeSource.getAbsoluteTime();
running = true;
while (running && timeSource.getAbsoluteTime() - connectedTime < utils::Time::sec * 10)
{
internet.process(timeSource.getAbsoluteTime());
int64_t timeout = std::numeric_limits<int64_t>::max();
running = false;
for (auto& session : sessions)
{
auto sessionTimeout = session->processTimeout(timeSource.getAbsoluteTime());
internet.process(timeSource.getAbsoluteTime());

if (sessionTimeout >= 0)
{
timeout = std::min(timeout, sessionTimeout);
}
}

if (sessions[0]->getRemoteCandidates().size() > 2)
{
running = true;
}

if (running && timeout > 0)
{
timeSource.advance(timeout + 2);
}
}

EXPECT_EQ(sessions[0]->getRemoteCandidates().size(), 2);
}

TEST_F(IceRobustness, roleConflict)
{
fakenet::Internet internet;
Expand Down Expand Up @@ -1606,13 +1777,13 @@ TEST_F(IceTest, udpTcpTimeout)
endpoint2b.attach(sessions[1]);

exchangeInfo(*sessions[0], *sessions[1]);
sessions[0]->addRemoteCandidate(ice::IceCandidate("werwe",
ice::IceComponent::RTP,
ice::TransportType::TCP,
12312,
endpoint2b._address,
ice::IceCandidate::Type::HOST,
ice::TcpType::PASSIVE),
sessions[0]->addRemoteTcpPassiveCandidate(ice::IceCandidate("werwe",
ice::IceComponent::RTP,
ice::TransportType::TCP,
12312,
endpoint2b._address,
ice::IceCandidate::Type::HOST,
ice::TcpType::PASSIVE),
&endpoint1b);
sessions[1]->addLocalTcpCandidate(ice::IceCandidate::Type::HOST,
0,
Expand Down
86 changes: 86 additions & 0 deletions test/utils/SocketAddressTest.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
#include "utils/SocketAddress.h"
#include <gtest/gtest.h>
#include <string>

using namespace transport;

namespace
{
struct LogLevelSetAndRestore
{
LogLevelSetAndRestore(logger::Level level) : _levelToRestore(logger::_logLevel) { logger::_logLevel = level; }
~LogLevelSetAndRestore() { logger::_logLevel = _levelToRestore; }

private:
logger::Level _levelToRestore;
};

} // namespace

TEST(SocketAddressTest, toFixedStringIpv4)
{
const auto socketAddress = SocketAddress::parse("127.0.0.1", 1096);
auto fixedStringSocket = socketAddress.toFixedString();

ASSERT_STREQ("127.0.0.1:1096", fixedStringSocket.c_str());
ASSERT_EQ(std::strlen("127.0.0.1:1096"), fixedStringSocket.size());
}

TEST(SocketAddressTest, toFixedStringIpv6)
{
const auto socketAddress0 = SocketAddress::parse("2001:0000:130F:0000:0000:09C0:876A:130B", 65093);
auto fixedStringSocket0 = socketAddress0.toFixedString();
ASSERT_STREQ("2001:0:130f::9c0:876a:130b:65093", fixedStringSocket0.c_str());
ASSERT_EQ(std::strlen("2001:0:130f::9c0:876a:130b:65093"), fixedStringSocket0.size());

const auto socketAddress1 = SocketAddress::parse("0:0:0:0:0:0:0:1", 9);
auto fixedStringSocket1 = socketAddress1.toFixedString();
ASSERT_STREQ("::1:9", fixedStringSocket1.c_str());
ASSERT_EQ(std::strlen("::1:9"), fixedStringSocket1.size());
}

TEST(SocketAddressTest, maybeMaskShouldNotMaskWhenLogLevelIsDebug)
{
LogLevelSetAndRestore l(logger::Level::DBG);

const auto ipv6Address = SocketAddress::parse("2001:0:130f::9c0:876a:130b", 1000);
const auto ipv4Address = SocketAddress::parse("64.63.123.98", 2665);

auto maybeMaskedIpv6 = maybeMasked(ipv6Address);
auto maybeMaskedIpv4 = maybeMasked(ipv4Address);

ASSERT_STREQ("2001:0:130f::9c0:876a:130b:1000", maybeMaskedIpv6.c_str());
ASSERT_EQ(std::strlen("2001:0:130f::9c0:876a:130b:1000"), maybeMaskedIpv6.size());

ASSERT_STREQ("64.63.123.98:2665", maybeMaskedIpv4.c_str());
ASSERT_EQ(std::strlen("64.63.123.98:2665"), maybeMaskedIpv4.size());
}

TEST(SocketAddressTest, maybeMaskShouldMaskWhenLogLevelIsNotDebug)
{
std::array<logger::Level, 3> notDebugLevels = {
logger::Level::ERROR,
logger::Level::INFO,
logger::Level::WARN,
};

const auto ipv6Address = SocketAddress::parse("2001:0:130f::9c0:876a:130b", 1000);
const auto ipv4Address = SocketAddress::parse("64.63.123.98", 2665);

for (const auto level : notDebugLevels)
{
LogLevelSetAndRestore l(level);

auto maybeMaskedIpv6 = maybeMasked(ipv6Address);
auto maybeMaskedIpv4 = maybeMasked(ipv4Address);

const char* a = maybeMaskedIpv6.c_str();
(void)a;

ASSERT_STREQ("ipv6-6134717025100730244:1000", maybeMaskedIpv6.c_str());
ASSERT_EQ(std::strlen("ipv6-6134717025100730244:1000"), maybeMaskedIpv6.size());

ASSERT_STREQ("ipv4-14244227746646367573:2665", maybeMaskedIpv4.c_str());
ASSERT_EQ(std::strlen("ipv4-14244227746646367573:2665"), maybeMaskedIpv4.size());
}
}
Loading

0 comments on commit f58c712

Please sign in to comment.