Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[22056] Fix TCP discovery server locators translation (backport #5382) (backport #5410) #5436

Merged
merged 1 commit into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions src/cpp/rtps/security/SecurityManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1174,7 +1174,6 @@ bool SecurityManager::create_participant_stateless_message_reader()
ratt.endpoint.multicastLocatorList = pattr.builtin.metatrafficMulticastLocatorList;
}
ratt.endpoint.unicastLocatorList = pattr.builtin.metatrafficUnicastLocatorList;
ratt.endpoint.remoteLocatorList = pattr.builtin.initialPeersList;
ratt.endpoint.external_unicast_locators = pattr.builtin.metatraffic_external_unicast_locators;
ratt.endpoint.ignore_non_matching_locators = pattr.ignore_non_matching_locators;
ratt.matched_writers_allocation = pattr.allocation.participants;
Expand Down Expand Up @@ -1274,7 +1273,6 @@ bool SecurityManager::create_participant_volatile_message_secure_writer()
watt.endpoint.unicastLocatorList = pattr.builtin.metatrafficUnicastLocatorList;
watt.endpoint.external_unicast_locators = pattr.builtin.metatraffic_external_unicast_locators;
watt.endpoint.ignore_non_matching_locators = pattr.ignore_non_matching_locators;
watt.endpoint.remoteLocatorList = pattr.builtin.initialPeersList;
watt.endpoint.security_attributes().is_submessage_protected = true;
watt.endpoint.security_attributes().plugin_endpoint_attributes =
PLUGIN_ENDPOINT_SECURITY_ATTRIBUTES_FLAG_IS_SUBMESSAGE_ENCRYPTED;
Expand Down Expand Up @@ -1327,7 +1325,6 @@ bool SecurityManager::create_participant_volatile_message_secure_reader()
ratt.endpoint.unicastLocatorList = pattr.builtin.metatrafficUnicastLocatorList;
ratt.endpoint.external_unicast_locators = pattr.builtin.metatraffic_external_unicast_locators;
ratt.endpoint.ignore_non_matching_locators = pattr.ignore_non_matching_locators;
ratt.endpoint.remoteLocatorList = pattr.builtin.initialPeersList;
ratt.endpoint.security_attributes().is_submessage_protected = true;
ratt.endpoint.security_attributes().plugin_endpoint_attributes =
PLUGIN_ENDPOINT_SECURITY_ATTRIBUTES_FLAG_IS_SUBMESSAGE_ENCRYPTED;
Expand Down
78 changes: 74 additions & 4 deletions src/cpp/rtps/transport/TCPTransportInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ void TCPTransportInterface::clean()

{
std::vector<std::shared_ptr<TCPChannelResource>> channels;
std::vector<eprosima::fastdds::rtps::Locator> delete_channels;

{
std::unique_lock<std::mutex> scopedLock(sockets_map_mutex_);
Expand All @@ -200,10 +201,22 @@ void TCPTransportInterface::clean()

for (auto& channel : channel_resources_)
{
channels.push_back(channel.second);
if (std::find(channels.begin(), channels.end(), channel.second) == channels.end())
{
channels.push_back(channel.second);
}
else
{
delete_channels.push_back(channel.first);
}
}
}

for (auto& delete_channel : delete_channels)
{
channel_resources_.erase(delete_channel);
}

for (auto& channel : channels)
{
if (channel->connection_established())
Expand Down Expand Up @@ -279,7 +292,7 @@ Locator TCPTransportInterface::local_endpoint_to_locator(
return locator;
}

void TCPTransportInterface::bind_socket(
ResponseCode TCPTransportInterface::bind_socket(
std::shared_ptr<TCPChannelResource>& channel)
{
std::unique_lock<std::mutex> scopedLock(sockets_map_mutex_);
Expand All @@ -288,7 +301,29 @@ void TCPTransportInterface::bind_socket(
auto it_remove = std::find(unbound_channel_resources_.begin(), unbound_channel_resources_.end(), channel);
assert(it_remove != unbound_channel_resources_.end());
unbound_channel_resources_.erase(it_remove);
channel_resources_[channel->locator()] = channel;

ResponseCode ret = RETCODE_OK;
const auto insert_ret = channel_resources_.insert(
decltype(channel_resources_)::value_type{channel->locator(), channel});
if (false == insert_ret.second)
{
// There is an existing channel that can be used. Force the Client to close unnecessary socket
ret = RETCODE_SERVER_ERROR;
}

std::vector<fastdds::rtps::IPFinder::info_IP> local_interfaces;
// Check if the locator is from an owned interface to link all local interfaces to the channel
is_own_interface(channel->locator(), local_interfaces);
if (!local_interfaces.empty())
{
Locator local_locator(channel->locator());
for (auto& interface_it : local_interfaces)
{
IPLocator::setIPv4(local_locator, interface_it.locator);
channel_resources_.insert(decltype(channel_resources_)::value_type{local_locator, channel});
}
}
return ret;
}

bool TCPTransportInterface::check_crc(
Expand Down Expand Up @@ -923,7 +958,7 @@ bool TCPTransportInterface::CreateInitialConnect(
std::lock_guard<std::mutex> socketsLock(sockets_map_mutex_);

// We try to find a SenderResource that has this locator.
// Note: This is done in this level because if we do in NetworkFactory level, we have to mantain what transport
// Note: This is done in this level because if we do it at NetworkFactory level, we have to mantain what transport
// already reuses a SenderResource.
for (auto& sender_resource : send_resource_list)
{
Expand Down Expand Up @@ -987,6 +1022,19 @@ bool TCPTransportInterface::CreateInitialConnect(
send_resource_list.emplace_back(
static_cast<SenderResource*>(new TCPSenderResource(*this, physical_locator)));

std::vector<fastdds::rtps::IPFinder::info_IP> local_interfaces;
// Check if the locator is from an owned interface to link all local interfaces to the channel
is_own_interface(physical_locator, local_interfaces);
if (!local_interfaces.empty())
{
Locator local_locator(physical_locator);
for (auto& interface_it : local_interfaces)
{
IPLocator::setIPv4(local_locator, interface_it.locator);
channel_resources_[local_locator] = channel;
}
}

return true;
}

Expand Down Expand Up @@ -2079,6 +2127,28 @@ void TCPTransportInterface::send_channel_pending_logical_ports(
}
}

void TCPTransportInterface::is_own_interface(
const Locator& locator,
std::vector<fastdds::rtps::IPFinder::info_IP>& locNames) const
{
std::vector<fastdds::rtps::IPFinder::info_IP> local_interfaces;
get_ips(local_interfaces, true, false);
for (const auto& interface_it : local_interfaces)
{
if (IPLocator::compareAddress(locator, interface_it.locator) && is_interface_allowed(interface_it.name))
{
locNames = local_interfaces;
// Remove interface of original locator from the list
locNames.erase(std::remove_if(locNames.begin(), locNames.end(),
[&interface_it](const fastdds::rtps::IPFinder::info_IP& locInterface)
{
return locInterface.locator == interface_it.locator;
}), locNames.end());
break;
}
}
}

} // namespace rtps
} // namespace fastdds
} // namespace eprosima
12 changes: 11 additions & 1 deletion src/cpp/rtps/transport/TCPTransportInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ class TCPTransportInterface : public TransportInterface
virtual ~TCPTransportInterface();

//! Stores the binding between the given locator and the given TCP socket. Server side.
void bind_socket(
ResponseCode bind_socket(
std::shared_ptr<TCPChannelResource>&);

//! Removes the listening socket for the specified port.
Expand Down Expand Up @@ -525,6 +525,16 @@ class TCPTransportInterface : public TransportInterface
*/
void send_channel_pending_logical_ports(
std::shared_ptr<TCPChannelResource>& channel);

/**
* Method to check if a locator contains an interface that belongs to the same host.
* If it occurs, @c locNames will be updated with the list of interfaces of the host.
* @param [in] locator Locator to check.
* @param [in,out] locNames Vector of interfaces to be updated.
*/
void is_own_interface(
const Locator& locator,
std::vector<fastdds::rtps::IPFinder::info_IP>& locNames) const;
};

} // namespace rtps
Expand Down
2 changes: 1 addition & 1 deletion src/cpp/rtps/transport/tcp/RTCPMessageManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ ResponseCode RTCPMessageManager::processBindConnectionRequest(

if (RETCODE_OK == code)
{
mTransport->bind_socket(channel);
code = mTransport->bind_socket(channel);
}

sendData(channel, BIND_CONNECTION_RESPONSE, transaction_id, &payload, code);
Expand Down
10 changes: 9 additions & 1 deletion test/unittest/transport/TCPv4Tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2001,6 +2001,9 @@ TEST_F(TCPv4Tests, autofill_port)
// This test verifies server's channel resources mapping keys uniqueness, where keys are clients locators.
// Clients typically communicated its PID as its locator port. When having several clients in the same
// process this lead to overwriting server's channel resources map elements.
// In order to ensure communication in TCP Discovery Server, a different entry is created in the server's
// channel resources map for each IP interface found, all of them using the same TCP channel. Thus, two
// clients will generate at least two entries in the server's channel resources map.
TEST_F(TCPv4Tests, client_announced_local_port_uniqueness)
{
TCPv4TransportDescriptor recvDescriptor;
Expand Down Expand Up @@ -2032,7 +2035,12 @@ TEST_F(TCPv4Tests, client_announced_local_port_uniqueness)

std::this_thread::sleep_for(std::chrono::milliseconds(100));

ASSERT_EQ(receiveTransportUnderTest.get_channel_resources().size(), 2u);
std::set<std::shared_ptr<TCPChannelResource>> channels_created;
for (const auto& channel_resource : receiveTransportUnderTest.get_channel_resources())
{
channels_created.insert(channel_resource.second);
}
ASSERT_EQ(channels_created.size(), 2u);
}

#ifndef _WIN32
Expand Down
4 changes: 4 additions & 0 deletions tools/fds/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,8 @@ int fastdds_discovery_server(
// Retrieve first TCP port
option::Option* pO_tcp_port = options[TCP_PORT];

bool udp_server_initialized = (pOp != nullptr) || (pO_port != nullptr);

/**
* A locator has been initialized previously in [0.0.0.0] address using either the DEFAULT_ROS2_SERVER_PORT or the
* port number set in the CLI. This locator must be used:
Expand All @@ -318,6 +320,7 @@ int fastdds_discovery_server(
// Add default locator in cases a) and b)
participantQos.wire_protocol().builtin.metatrafficUnicastLocatorList.clear();
participantQos.wire_protocol().builtin.metatrafficUnicastLocatorList.push_back(locator4);
udp_server_initialized = true;
}
else if (nullptr == pOp && nullptr != pO_port)
{
Expand Down Expand Up @@ -569,6 +572,7 @@ int fastdds_discovery_server(
}

fastdds::rtps::GuidPrefix_t guid_prefix = participantQos.wire_protocol().prefix;
participantQos.transport().use_builtin_transports = udp_server_initialized || options[XML_FILE] != nullptr;

// Create the server
int return_value = 0;
Expand Down
Loading