From 7082cbb79fe0bacad7ae86cb97335fda1ba75ece Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carlos=20Ferreira=20Gonz=C3=A1lez?= Date: Thu, 28 Nov 2024 09:32:24 +0100 Subject: [PATCH] Fix TCP discovery server locators translation (#5410) --------- Signed-off-by: cferreiragonz (cherry picked from commit d71913b7a077b65cdc1744cce2babbbf57abb328) # Conflicts: # tools/fds/server.cpp --- src/cpp/rtps/security/SecurityManager.cpp | 3 - .../rtps/transport/TCPTransportInterface.cpp | 78 ++++++++++++++++++- .../rtps/transport/TCPTransportInterface.h | 12 ++- .../rtps/transport/tcp/RTCPMessageManager.cpp | 2 +- test/unittest/transport/TCPv4Tests.cpp | 10 ++- tools/fds/server.cpp | 4 + 6 files changed, 99 insertions(+), 10 deletions(-) diff --git a/src/cpp/rtps/security/SecurityManager.cpp b/src/cpp/rtps/security/SecurityManager.cpp index fb63fd498f3..73de7106165 100644 --- a/src/cpp/rtps/security/SecurityManager.cpp +++ b/src/cpp/rtps/security/SecurityManager.cpp @@ -1162,7 +1162,6 @@ bool SecurityManager::create_participant_stateless_message_reader() ratt.endpoint.multicastLocatorList = pattr.builtin.metatrafficMulticastLocatorList; } ratt.endpoint.unicastLocatorList = pattr.builtin.metatrafficUnicastLocatorList; - ratt.endpoint.remoteLocatorList = pattr.builtin.initialPeersList; ratt.endpoint.external_unicast_locators = pattr.builtin.metatraffic_external_unicast_locators; ratt.endpoint.ignore_non_matching_locators = pattr.ignore_non_matching_locators; ratt.matched_writers_allocation = pattr.allocation.participants; @@ -1263,7 +1262,6 @@ bool SecurityManager::create_participant_volatile_message_secure_writer() watt.endpoint.unicastLocatorList = pattr.builtin.metatrafficUnicastLocatorList; watt.endpoint.external_unicast_locators = pattr.builtin.metatraffic_external_unicast_locators; watt.endpoint.ignore_non_matching_locators = pattr.ignore_non_matching_locators; - watt.endpoint.remoteLocatorList = pattr.builtin.initialPeersList; watt.endpoint.security_attributes().is_submessage_protected = true; watt.endpoint.security_attributes().plugin_endpoint_attributes = PLUGIN_ENDPOINT_SECURITY_ATTRIBUTES_FLAG_IS_SUBMESSAGE_ENCRYPTED; @@ -1316,7 +1314,6 @@ bool SecurityManager::create_participant_volatile_message_secure_reader() ratt.endpoint.unicastLocatorList = pattr.builtin.metatrafficUnicastLocatorList; ratt.endpoint.external_unicast_locators = pattr.builtin.metatraffic_external_unicast_locators; ratt.endpoint.ignore_non_matching_locators = pattr.ignore_non_matching_locators; - ratt.endpoint.remoteLocatorList = pattr.builtin.initialPeersList; ratt.endpoint.security_attributes().is_submessage_protected = true; ratt.endpoint.security_attributes().plugin_endpoint_attributes = PLUGIN_ENDPOINT_SECURITY_ATTRIBUTES_FLAG_IS_SUBMESSAGE_ENCRYPTED; diff --git a/src/cpp/rtps/transport/TCPTransportInterface.cpp b/src/cpp/rtps/transport/TCPTransportInterface.cpp index 9e65740f2ab..eef49d71c61 100644 --- a/src/cpp/rtps/transport/TCPTransportInterface.cpp +++ b/src/cpp/rtps/transport/TCPTransportInterface.cpp @@ -203,6 +203,7 @@ void TCPTransportInterface::clean() { std::vector> channels; + std::vector delete_channels; { std::unique_lock scopedLock(sockets_map_mutex_); @@ -212,10 +213,22 @@ void TCPTransportInterface::clean() for (auto& channel : channel_resources_) { - channels.push_back(channel.second); + if (std::find(channels.begin(), channels.end(), channel.second) == channels.end()) + { + channels.push_back(channel.second); + } + else + { + delete_channels.push_back(channel.first); + } } } + for (auto& delete_channel : delete_channels) + { + channel_resources_.erase(delete_channel); + } + for (auto& channel : channels) { if (channel->connection_established()) @@ -291,7 +304,7 @@ Locator TCPTransportInterface::local_endpoint_to_locator( return locator; } -void TCPTransportInterface::bind_socket( +ResponseCode TCPTransportInterface::bind_socket( std::shared_ptr& channel) { std::unique_lock scopedLock(sockets_map_mutex_); @@ -300,7 +313,29 @@ void TCPTransportInterface::bind_socket( auto it_remove = std::find(unbound_channel_resources_.begin(), unbound_channel_resources_.end(), channel); assert(it_remove != unbound_channel_resources_.end()); unbound_channel_resources_.erase(it_remove); - channel_resources_[channel->locator()] = channel; + + ResponseCode ret = RETCODE_OK; + const auto insert_ret = channel_resources_.insert( + decltype(channel_resources_)::value_type{channel->locator(), channel}); + if (false == insert_ret.second) + { + // There is an existing channel that can be used. Force the Client to close unnecessary socket + ret = RETCODE_SERVER_ERROR; + } + + std::vector local_interfaces; + // Check if the locator is from an owned interface to link all local interfaces to the channel + is_own_interface(channel->locator(), local_interfaces); + if (!local_interfaces.empty()) + { + Locator local_locator(channel->locator()); + for (auto& interface_it : local_interfaces) + { + IPLocator::setIPv4(local_locator, interface_it.locator); + channel_resources_.insert(decltype(channel_resources_)::value_type{local_locator, channel}); + } + } + return ret; } bool TCPTransportInterface::check_crc( @@ -931,7 +966,7 @@ bool TCPTransportInterface::CreateInitialConnect( std::lock_guard socketsLock(sockets_map_mutex_); // We try to find a SenderResource that has this locator. - // Note: This is done in this level because if we do in NetworkFactory level, we have to mantain what transport + // Note: This is done in this level because if we do it at NetworkFactory level, we have to mantain what transport // already reuses a SenderResource. for (auto& sender_resource : send_resource_list) { @@ -995,6 +1030,19 @@ bool TCPTransportInterface::CreateInitialConnect( send_resource_list.emplace_back( static_cast(new TCPSenderResource(*this, physical_locator))); + std::vector local_interfaces; + // Check if the locator is from an owned interface to link all local interfaces to the channel + is_own_interface(physical_locator, local_interfaces); + if (!local_interfaces.empty()) + { + Locator local_locator(physical_locator); + for (auto& interface_it : local_interfaces) + { + IPLocator::setIPv4(local_locator, interface_it.locator); + channel_resources_[local_locator] = channel; + } + } + return true; } @@ -2086,6 +2134,28 @@ void TCPTransportInterface::send_channel_pending_logical_ports( } } +void TCPTransportInterface::is_own_interface( + const Locator& locator, + std::vector& locNames) const +{ + std::vector local_interfaces; + get_ips(local_interfaces, true, false); + for (const auto& interface_it : local_interfaces) + { + if (IPLocator::compareAddress(locator, interface_it.locator) && is_interface_allowed(interface_it.name)) + { + locNames = local_interfaces; + // Remove interface of original locator from the list + locNames.erase(std::remove_if(locNames.begin(), locNames.end(), + [&interface_it](const fastrtps::rtps::IPFinder::info_IP& locInterface) + { + return locInterface.locator == interface_it.locator; + }), locNames.end()); + break; + } + } +} + } // namespace rtps } // namespace fastrtps } // namespace eprosima diff --git a/src/cpp/rtps/transport/TCPTransportInterface.h b/src/cpp/rtps/transport/TCPTransportInterface.h index 7914bc0d192..9cd42583c38 100644 --- a/src/cpp/rtps/transport/TCPTransportInterface.h +++ b/src/cpp/rtps/transport/TCPTransportInterface.h @@ -240,7 +240,7 @@ class TCPTransportInterface : public TransportInterface virtual ~TCPTransportInterface(); //! Stores the binding between the given locator and the given TCP socket. Server side. - void bind_socket( + ResponseCode bind_socket( std::shared_ptr&); //! Removes the listening socket for the specified port. @@ -525,6 +525,16 @@ class TCPTransportInterface : public TransportInterface */ void send_channel_pending_logical_ports( std::shared_ptr& channel); + + /** + * Method to check if a locator contains an interface that belongs to the same host. + * If it occurs, @c locNames will be updated with the list of interfaces of the host. + * @param [in] locator Locator to check. + * @param [in,out] locNames Vector of interfaces to be updated. + */ + void is_own_interface( + const Locator& locator, + std::vector& locNames) const; }; } // namespace rtps diff --git a/src/cpp/rtps/transport/tcp/RTCPMessageManager.cpp b/src/cpp/rtps/transport/tcp/RTCPMessageManager.cpp index 2bf2e828a73..0554918356f 100644 --- a/src/cpp/rtps/transport/tcp/RTCPMessageManager.cpp +++ b/src/cpp/rtps/transport/tcp/RTCPMessageManager.cpp @@ -467,7 +467,7 @@ ResponseCode RTCPMessageManager::processBindConnectionRequest( if (RETCODE_OK == code) { - mTransport->bind_socket(channel); + code = mTransport->bind_socket(channel); } sendData(channel, BIND_CONNECTION_RESPONSE, transaction_id, &payload, code); diff --git a/test/unittest/transport/TCPv4Tests.cpp b/test/unittest/transport/TCPv4Tests.cpp index 16301e2b371..0a80da235dc 100644 --- a/test/unittest/transport/TCPv4Tests.cpp +++ b/test/unittest/transport/TCPv4Tests.cpp @@ -1921,6 +1921,9 @@ TEST_F(TCPv4Tests, autofill_port) // This test verifies server's channel resources mapping keys uniqueness, where keys are clients locators. // Clients typically communicated its PID as its locator port. When having several clients in the same // process this lead to overwriting server's channel resources map elements. +// In order to ensure communication in TCP Discovery Server, a different entry is created in the server's +// channel resources map for each IP interface found, all of them using the same TCP channel. Thus, two +// clients will generate at least two entries in the server's channel resources map. TEST_F(TCPv4Tests, client_announced_local_port_uniqueness) { TCPv4TransportDescriptor recvDescriptor; @@ -1952,7 +1955,12 @@ TEST_F(TCPv4Tests, client_announced_local_port_uniqueness) std::this_thread::sleep_for(std::chrono::milliseconds(100)); - ASSERT_EQ(receiveTransportUnderTest.get_channel_resources().size(), 2u); + std::set> channels_created; + for (const auto& channel_resource : receiveTransportUnderTest.get_channel_resources()) + { + channels_created.insert(channel_resource.second); + } + ASSERT_EQ(channels_created.size(), 2u); } #ifndef _WIN32 diff --git a/tools/fds/server.cpp b/tools/fds/server.cpp index 29cc0f5e172..7c0270ab9c5 100644 --- a/tools/fds/server.cpp +++ b/tools/fds/server.cpp @@ -282,6 +282,8 @@ int fastdds_discovery_server( // Retrieve first TCP port option::Option* pO_tcp_port = options[TCP_PORT]; + bool udp_server_initialized = (pOp != nullptr) || (pO_port != nullptr); + /** * A locator has been initialized previously in [0.0.0.0] address using either the DEFAULT_ROS2_SERVER_PORT or the * port number set in the CLI. This locator must be used: @@ -296,6 +298,7 @@ int fastdds_discovery_server( // Add default locator in cases a) and b) participantQos.wire_protocol().builtin.metatrafficUnicastLocatorList.clear(); participantQos.wire_protocol().builtin.metatrafficUnicastLocatorList.push_back(locator4); + udp_server_initialized = true; } else if (nullptr == pOp && nullptr != pO_port) { @@ -544,6 +547,7 @@ int fastdds_discovery_server( } fastrtps::rtps::GuidPrefix_t guid_prefix = participantQos.wire_protocol().prefix; + participantQos.transport().use_builtin_transports = udp_server_initialized || options[XML_FILE] != nullptr; // Create the server int return_value = 0;