From e646cce9200c42469059e194b9a76370115fec7e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carlos=20Ferreira=20Gonz=C3=A1lez?= Date: Mon, 18 Nov 2024 09:14:36 +0100 Subject: [PATCH] Refs #22056: Backport TCP features --------- Signed-off-by: cferreiragonz --- src/cpp/rtps/security/SecurityManager.cpp | 3 - .../rtps/transport/TCPTransportInterface.cpp | 78 ++++++++++++++++++- .../rtps/transport/TCPTransportInterface.h | 12 ++- .../rtps/transport/tcp/RTCPMessageManager.cpp | 2 +- test/unittest/transport/TCPv4Tests.cpp | 10 ++- tools/fds/server.cpp | 4 + 6 files changed, 99 insertions(+), 10 deletions(-) diff --git a/src/cpp/rtps/security/SecurityManager.cpp b/src/cpp/rtps/security/SecurityManager.cpp index 1ee994464e6..3aae0bfd390 100644 --- a/src/cpp/rtps/security/SecurityManager.cpp +++ b/src/cpp/rtps/security/SecurityManager.cpp @@ -1174,7 +1174,6 @@ bool SecurityManager::create_participant_stateless_message_reader() ratt.endpoint.multicastLocatorList = pattr.builtin.metatrafficMulticastLocatorList; } ratt.endpoint.unicastLocatorList = pattr.builtin.metatrafficUnicastLocatorList; - ratt.endpoint.remoteLocatorList = pattr.builtin.initialPeersList; ratt.endpoint.external_unicast_locators = pattr.builtin.metatraffic_external_unicast_locators; ratt.endpoint.ignore_non_matching_locators = pattr.ignore_non_matching_locators; ratt.matched_writers_allocation = pattr.allocation.participants; @@ -1274,7 +1273,6 @@ bool SecurityManager::create_participant_volatile_message_secure_writer() watt.endpoint.unicastLocatorList = pattr.builtin.metatrafficUnicastLocatorList; watt.endpoint.external_unicast_locators = pattr.builtin.metatraffic_external_unicast_locators; watt.endpoint.ignore_non_matching_locators = pattr.ignore_non_matching_locators; - watt.endpoint.remoteLocatorList = pattr.builtin.initialPeersList; watt.endpoint.security_attributes().is_submessage_protected = true; watt.endpoint.security_attributes().plugin_endpoint_attributes = PLUGIN_ENDPOINT_SECURITY_ATTRIBUTES_FLAG_IS_SUBMESSAGE_ENCRYPTED; @@ -1327,7 +1325,6 @@ bool SecurityManager::create_participant_volatile_message_secure_reader() ratt.endpoint.unicastLocatorList = pattr.builtin.metatrafficUnicastLocatorList; ratt.endpoint.external_unicast_locators = pattr.builtin.metatraffic_external_unicast_locators; ratt.endpoint.ignore_non_matching_locators = pattr.ignore_non_matching_locators; - ratt.endpoint.remoteLocatorList = pattr.builtin.initialPeersList; ratt.endpoint.security_attributes().is_submessage_protected = true; ratt.endpoint.security_attributes().plugin_endpoint_attributes = PLUGIN_ENDPOINT_SECURITY_ATTRIBUTES_FLAG_IS_SUBMESSAGE_ENCRYPTED; diff --git a/src/cpp/rtps/transport/TCPTransportInterface.cpp b/src/cpp/rtps/transport/TCPTransportInterface.cpp index 907e9ed15c6..5a8693c2049 100644 --- a/src/cpp/rtps/transport/TCPTransportInterface.cpp +++ b/src/cpp/rtps/transport/TCPTransportInterface.cpp @@ -191,6 +191,7 @@ void TCPTransportInterface::clean() { std::vector> channels; + std::vector delete_channels; { std::unique_lock scopedLock(sockets_map_mutex_); @@ -200,10 +201,22 @@ void TCPTransportInterface::clean() for (auto& channel : channel_resources_) { - channels.push_back(channel.second); + if (std::find(channels.begin(), channels.end(), channel.second) == channels.end()) + { + channels.push_back(channel.second); + } + else + { + delete_channels.push_back(channel.first); + } } } + for (auto& delete_channel : delete_channels) + { + channel_resources_.erase(delete_channel); + } + for (auto& channel : channels) { if (channel->connection_established()) @@ -279,7 +292,7 @@ Locator TCPTransportInterface::local_endpoint_to_locator( return locator; } -void TCPTransportInterface::bind_socket( +ResponseCode TCPTransportInterface::bind_socket( std::shared_ptr& channel) { std::unique_lock scopedLock(sockets_map_mutex_); @@ -288,7 +301,29 @@ void TCPTransportInterface::bind_socket( auto it_remove = std::find(unbound_channel_resources_.begin(), unbound_channel_resources_.end(), channel); assert(it_remove != unbound_channel_resources_.end()); unbound_channel_resources_.erase(it_remove); - channel_resources_[channel->locator()] = channel; + + ResponseCode ret = RETCODE_OK; + const auto insert_ret = channel_resources_.insert( + decltype(channel_resources_)::value_type{channel->locator(), channel}); + if (false == insert_ret.second) + { + // There is an existing channel that can be used. Force the Client to close unnecessary socket + ret = RETCODE_SERVER_ERROR; + } + + std::vector local_interfaces; + // Check if the locator is from an owned interface to link all local interfaces to the channel + is_own_interface(channel->locator(), local_interfaces); + if (!local_interfaces.empty()) + { + Locator local_locator(channel->locator()); + for (auto& interface_it : local_interfaces) + { + IPLocator::setIPv4(local_locator, interface_it.locator); + channel_resources_.insert(decltype(channel_resources_)::value_type{local_locator, channel}); + } + } + return ret; } bool TCPTransportInterface::check_crc( @@ -923,7 +958,7 @@ bool TCPTransportInterface::CreateInitialConnect( std::lock_guard socketsLock(sockets_map_mutex_); // We try to find a SenderResource that has this locator. - // Note: This is done in this level because if we do in NetworkFactory level, we have to mantain what transport + // Note: This is done in this level because if we do it at NetworkFactory level, we have to mantain what transport // already reuses a SenderResource. for (auto& sender_resource : send_resource_list) { @@ -987,6 +1022,19 @@ bool TCPTransportInterface::CreateInitialConnect( send_resource_list.emplace_back( static_cast(new TCPSenderResource(*this, physical_locator))); + std::vector local_interfaces; + // Check if the locator is from an owned interface to link all local interfaces to the channel + is_own_interface(physical_locator, local_interfaces); + if (!local_interfaces.empty()) + { + Locator local_locator(physical_locator); + for (auto& interface_it : local_interfaces) + { + IPLocator::setIPv4(local_locator, interface_it.locator); + channel_resources_[local_locator] = channel; + } + } + return true; } @@ -2079,6 +2127,28 @@ void TCPTransportInterface::send_channel_pending_logical_ports( } } +void TCPTransportInterface::is_own_interface( + const Locator& locator, + std::vector& locNames) const +{ + std::vector local_interfaces; + get_ips(local_interfaces, true, false); + for (const auto& interface_it : local_interfaces) + { + if (IPLocator::compareAddress(locator, interface_it.locator) && is_interface_allowed(interface_it.name)) + { + locNames = local_interfaces; + // Remove interface of original locator from the list + locNames.erase(std::remove_if(locNames.begin(), locNames.end(), + [&interface_it](const fastdds::rtps::IPFinder::info_IP& locInterface) + { + return locInterface.locator == interface_it.locator; + }), locNames.end()); + break; + } + } +} + } // namespace rtps } // namespace fastdds } // namespace eprosima diff --git a/src/cpp/rtps/transport/TCPTransportInterface.h b/src/cpp/rtps/transport/TCPTransportInterface.h index 11a1f9f5af2..147a2fdb640 100644 --- a/src/cpp/rtps/transport/TCPTransportInterface.h +++ b/src/cpp/rtps/transport/TCPTransportInterface.h @@ -239,7 +239,7 @@ class TCPTransportInterface : public TransportInterface virtual ~TCPTransportInterface(); //! Stores the binding between the given locator and the given TCP socket. Server side. - void bind_socket( + ResponseCode bind_socket( std::shared_ptr&); //! Removes the listening socket for the specified port. @@ -525,6 +525,16 @@ class TCPTransportInterface : public TransportInterface */ void send_channel_pending_logical_ports( std::shared_ptr& channel); + + /** + * Method to check if a locator contains an interface that belongs to the same host. + * If it occurs, @c locNames will be updated with the list of interfaces of the host. + * @param [in] locator Locator to check. + * @param [in,out] locNames Vector of interfaces to be updated. + */ + void is_own_interface( + const Locator& locator, + std::vector& locNames) const; }; } // namespace rtps diff --git a/src/cpp/rtps/transport/tcp/RTCPMessageManager.cpp b/src/cpp/rtps/transport/tcp/RTCPMessageManager.cpp index 8813440fd39..9ad8f232332 100644 --- a/src/cpp/rtps/transport/tcp/RTCPMessageManager.cpp +++ b/src/cpp/rtps/transport/tcp/RTCPMessageManager.cpp @@ -458,7 +458,7 @@ ResponseCode RTCPMessageManager::processBindConnectionRequest( if (RETCODE_OK == code) { - mTransport->bind_socket(channel); + code = mTransport->bind_socket(channel); } sendData(channel, BIND_CONNECTION_RESPONSE, transaction_id, &payload, code); diff --git a/test/unittest/transport/TCPv4Tests.cpp b/test/unittest/transport/TCPv4Tests.cpp index b8649b06836..06ee3e8cb68 100644 --- a/test/unittest/transport/TCPv4Tests.cpp +++ b/test/unittest/transport/TCPv4Tests.cpp @@ -2001,6 +2001,9 @@ TEST_F(TCPv4Tests, autofill_port) // This test verifies server's channel resources mapping keys uniqueness, where keys are clients locators. // Clients typically communicated its PID as its locator port. When having several clients in the same // process this lead to overwriting server's channel resources map elements. +// In order to ensure communication in TCP Discovery Server, a different entry is created in the server's +// channel resources map for each IP interface found, all of them using the same TCP channel. Thus, two +// clients will generate at least two entries in the server's channel resources map. TEST_F(TCPv4Tests, client_announced_local_port_uniqueness) { TCPv4TransportDescriptor recvDescriptor; @@ -2032,7 +2035,12 @@ TEST_F(TCPv4Tests, client_announced_local_port_uniqueness) std::this_thread::sleep_for(std::chrono::milliseconds(100)); - ASSERT_EQ(receiveTransportUnderTest.get_channel_resources().size(), 2u); + std::set> channels_created; + for (const auto& channel_resource : receiveTransportUnderTest.get_channel_resources()) + { + channels_created.insert(channel_resource.second); + } + ASSERT_EQ(channels_created.size(), 2u); } #ifndef _WIN32 diff --git a/tools/fds/server.cpp b/tools/fds/server.cpp index 09098ddddfa..280747d6083 100644 --- a/tools/fds/server.cpp +++ b/tools/fds/server.cpp @@ -304,6 +304,8 @@ int fastdds_discovery_server( // Retrieve first TCP port option::Option* pO_tcp_port = options[TCP_PORT]; + bool udp_server_initialized = (pOp != nullptr) || (pO_port != nullptr); + /** * A locator has been initialized previously in [0.0.0.0] address using either the DEFAULT_ROS2_SERVER_PORT or the * port number set in the CLI. This locator must be used: @@ -318,6 +320,7 @@ int fastdds_discovery_server( // Add default locator in cases a) and b) participantQos.wire_protocol().builtin.metatrafficUnicastLocatorList.clear(); participantQos.wire_protocol().builtin.metatrafficUnicastLocatorList.push_back(locator4); + udp_server_initialized = true; } else if (nullptr == pOp && nullptr != pO_port) { @@ -569,6 +572,7 @@ int fastdds_discovery_server( } fastdds::rtps::GuidPrefix_t guid_prefix = participantQos.wire_protocol().prefix; + participantQos.transport().use_builtin_transports = udp_server_initialized || options[XML_FILE] != nullptr; // Create the server int return_value = 0;