Skip to content

Commit

Permalink
Fix TCP discovery server locators translation (#5410)
Browse files Browse the repository at this point in the history
---------

Signed-off-by: cferreiragonz <[email protected]>
(cherry picked from commit d71913b)

# Conflicts:
#	src/cpp/rtps/transport/TCPTransportInterface.cpp
#	test/unittest/transport/TCPv4Tests.cpp
#	tools/fds/server.cpp
  • Loading branch information
cferreiragonz authored and mergify[bot] committed Nov 28, 2024
1 parent 8935858 commit 798346a
Show file tree
Hide file tree
Showing 6 changed files with 220 additions and 8 deletions.
3 changes: 0 additions & 3 deletions src/cpp/rtps/security/SecurityManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1156,7 +1156,6 @@ bool SecurityManager::create_participant_stateless_message_reader()
ratt.endpoint.multicastLocatorList = pattr.builtin.metatrafficMulticastLocatorList;
}
ratt.endpoint.unicastLocatorList = pattr.builtin.metatrafficUnicastLocatorList;
ratt.endpoint.remoteLocatorList = pattr.builtin.initialPeersList;
ratt.endpoint.external_unicast_locators = pattr.builtin.metatraffic_external_unicast_locators;
ratt.endpoint.ignore_non_matching_locators = pattr.ignore_non_matching_locators;
ratt.matched_writers_allocation = pattr.allocation.participants;
Expand Down Expand Up @@ -1256,7 +1255,6 @@ bool SecurityManager::create_participant_volatile_message_secure_writer()
watt.endpoint.unicastLocatorList = pattr.builtin.metatrafficUnicastLocatorList;
watt.endpoint.external_unicast_locators = pattr.builtin.metatraffic_external_unicast_locators;
watt.endpoint.ignore_non_matching_locators = pattr.ignore_non_matching_locators;
watt.endpoint.remoteLocatorList = pattr.builtin.initialPeersList;
watt.endpoint.security_attributes().is_submessage_protected = true;
watt.endpoint.security_attributes().plugin_endpoint_attributes =
PLUGIN_ENDPOINT_SECURITY_ATTRIBUTES_FLAG_IS_SUBMESSAGE_ENCRYPTED;
Expand Down Expand Up @@ -1309,7 +1307,6 @@ bool SecurityManager::create_participant_volatile_message_secure_reader()
ratt.endpoint.unicastLocatorList = pattr.builtin.metatrafficUnicastLocatorList;
ratt.endpoint.external_unicast_locators = pattr.builtin.metatraffic_external_unicast_locators;
ratt.endpoint.ignore_non_matching_locators = pattr.ignore_non_matching_locators;
ratt.endpoint.remoteLocatorList = pattr.builtin.initialPeersList;
ratt.endpoint.security_attributes().is_submessage_protected = true;
ratt.endpoint.security_attributes().plugin_endpoint_attributes =
PLUGIN_ENDPOINT_SECURITY_ATTRIBUTES_FLAG_IS_SUBMESSAGE_ENCRYPTED;
Expand Down
191 changes: 188 additions & 3 deletions src/cpp/rtps/transport/TCPTransportInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ void TCPTransportInterface::clean()

{
std::vector<std::shared_ptr<TCPChannelResource>> channels;
std::vector<eprosima::fastdds::rtps::Locator> delete_channels;

{
std::unique_lock<std::mutex> scopedLock(sockets_map_mutex_);
Expand All @@ -178,10 +179,22 @@ void TCPTransportInterface::clean()

for (auto& channel : channel_resources_)
{
channels.push_back(channel.second);
if (std::find(channels.begin(), channels.end(), channel.second) == channels.end())
{
channels.push_back(channel.second);
}
else
{
delete_channels.push_back(channel.first);
}
}
}

for (auto& delete_channel : delete_channels)
{
channel_resources_.erase(delete_channel);
}

for (auto& channel : channels)
{
if (channel->connection_established())
Expand Down Expand Up @@ -258,7 +271,7 @@ Locator TCPTransportInterface::local_endpoint_to_locator(
return locator;
}

void TCPTransportInterface::bind_socket(
ResponseCode TCPTransportInterface::bind_socket(
std::shared_ptr<TCPChannelResource>& channel)
{
std::unique_lock<std::mutex> scopedLock(sockets_map_mutex_);
Expand All @@ -267,7 +280,29 @@ void TCPTransportInterface::bind_socket(
auto it_remove = std::find(unbound_channel_resources_.begin(), unbound_channel_resources_.end(), channel);
assert(it_remove != unbound_channel_resources_.end());
unbound_channel_resources_.erase(it_remove);
channel_resources_[channel->locator()] = channel;

ResponseCode ret = RETCODE_OK;
const auto insert_ret = channel_resources_.insert(
decltype(channel_resources_)::value_type{channel->locator(), channel});
if (false == insert_ret.second)
{
// There is an existing channel that can be used. Force the Client to close unnecessary socket
ret = RETCODE_SERVER_ERROR;
}

std::vector<fastdds::rtps::IPFinder::info_IP> local_interfaces;
// Check if the locator is from an owned interface to link all local interfaces to the channel
is_own_interface(channel->locator(), local_interfaces);
if (!local_interfaces.empty())
{
Locator local_locator(channel->locator());
for (auto& interface_it : local_interfaces)
{
IPLocator::setIPv4(local_locator, interface_it.locator);
channel_resources_.insert(decltype(channel_resources_)::value_type{local_locator, channel});
}
}
return ret;
}

bool TCPTransportInterface::check_crc(
Expand Down Expand Up @@ -872,6 +907,134 @@ bool TCPTransportInterface::OpenOutputChannel(
return true;
}

<<<<<<< HEAD
=======
bool TCPTransportInterface::OpenOutputChannels(
SendResourceList& send_resource_list,
const LocatorSelectorEntry& locator_selector_entry)
{
bool success = false;
if (locator_selector_entry.remote_guid == fastdds::rtps::c_Guid_Unknown)
{
// Only unicast is used in TCP
for (size_t i = 0; i < locator_selector_entry.state.unicast.size(); ++i)
{
size_t index = locator_selector_entry.state.unicast[i];
success |= CreateInitialConnect(send_resource_list, locator_selector_entry.unicast[index]);
}
}
else
{
for (size_t i = 0; i < locator_selector_entry.state.unicast.size(); ++i)
{
size_t index = locator_selector_entry.state.unicast[i];
success |= OpenOutputChannel(send_resource_list, locator_selector_entry.unicast[index]);
}
}
return success;
}

bool TCPTransportInterface::CreateInitialConnect(
SendResourceList& send_resource_list,
const Locator& locator)
{
if (!IsLocatorSupported(locator))
{
return false;
}

uint16_t logical_port = IPLocator::getLogicalPort(locator);
if (0 == logical_port)
{
return false;
}

Locator physical_locator = IPLocator::toPhysicalLocator(locator);

std::lock_guard<std::mutex> socketsLock(sockets_map_mutex_);

// We try to find a SenderResource that has this locator.
// Note: This is done in this level because if we do it at NetworkFactory level, we have to mantain what transport
// already reuses a SenderResource.
for (auto& sender_resource : send_resource_list)
{
TCPSenderResource* tcp_sender_resource = TCPSenderResource::cast(*this, sender_resource.get());

if (tcp_sender_resource && (physical_locator == tcp_sender_resource->locator() ||
(IPLocator::hasWan(locator) &&
IPLocator::WanToLanLocator(physical_locator) ==
tcp_sender_resource->locator())))
{
// Add logical port to channel if it's not there yet
auto channel_resource = channel_resources_.find(physical_locator);

// Maybe as WAN?
if (channel_resource == channel_resources_.end() && IPLocator::hasWan(locator))
{
Locator wan_locator = IPLocator::WanToLanLocator(locator);
channel_resource = channel_resources_.find(IPLocator::toPhysicalLocator(wan_locator));
}

if (channel_resource != channel_resources_.end())
{
channel_resource->second->add_logical_port(logical_port, rtcp_message_manager_.get());
}
else
{
std::lock_guard<std::mutex> channelPendingLock(channel_pending_logical_ports_mutex_);
channel_pending_logical_ports_[physical_locator].insert(logical_port);
}

statistics_info_.add_entry(locator);
return true;
}
}

// At this point, if there is no SenderResource to reuse, this is the first try to open a channel for this locator.
// There is no need to check if a channel already exists for this locator because this method is called only when
// a new connection is required.

EPROSIMA_LOG_INFO(RTCP, "Called to CreateInitialConnect @ " << IPLocator::to_string(locator));

// Create a TCP_CONNECT_TYPE channel
std::shared_ptr<TCPChannelResource> channel(
#if TLS_FOUND
(configuration()->apply_security) ?
static_cast<TCPChannelResource*>(
new TCPChannelResourceSecure(this, io_service_, ssl_context_,
physical_locator, configuration()->maxMessageSize)) :
#endif // if TLS_FOUND
static_cast<TCPChannelResource*>(
new TCPChannelResourceBasic(this, io_service_, physical_locator,
configuration()->maxMessageSize))
);

EPROSIMA_LOG_INFO(RTCP, "CreateInitialConnect: [CONNECT] @ " << IPLocator::to_string(locator));

channel_resources_[physical_locator] = channel;
channel->connect(channel_resources_[physical_locator]);
channel->add_logical_port(logical_port, rtcp_message_manager_.get());
statistics_info_.add_entry(locator);
send_resource_list.emplace_back(
static_cast<SenderResource*>(new TCPSenderResource(*this, physical_locator)));

std::vector<fastdds::rtps::IPFinder::info_IP> local_interfaces;
// Check if the locator is from an owned interface to link all local interfaces to the channel
is_own_interface(physical_locator, local_interfaces);
if (!local_interfaces.empty())
{
Locator local_locator(physical_locator);
for (auto& interface_it : local_interfaces)
{
IPLocator::setIPv4(local_locator, interface_it.locator);
channel_resources_[local_locator] = channel;
}
}

return true;
}

>>>>>>> d71913b7a (Fix TCP discovery server locators translation (#5410))
bool TCPTransportInterface::OpenInputChannel(
const Locator& locator,
TransportReceiverInterface* receiver,
Expand Down Expand Up @@ -1941,6 +2104,28 @@ void TCPTransportInterface::send_channel_pending_logical_ports(
}
}

void TCPTransportInterface::is_own_interface(
const Locator& locator,
std::vector<fastdds::rtps::IPFinder::info_IP>& locNames) const
{
std::vector<fastdds::rtps::IPFinder::info_IP> local_interfaces;
get_ips(local_interfaces, true, false);
for (const auto& interface_it : local_interfaces)
{
if (IPLocator::compareAddress(locator, interface_it.locator) && is_interface_allowed(interface_it.name))
{
locNames = local_interfaces;
// Remove interface of original locator from the list
locNames.erase(std::remove_if(locNames.begin(), locNames.end(),
[&interface_it](const fastdds::rtps::IPFinder::info_IP& locInterface)
{
return locInterface.locator == interface_it.locator;
}), locNames.end());
break;
}
}
}

} // namespace rtps
} // namespace fastrtps
} // namespace eprosima
12 changes: 11 additions & 1 deletion src/cpp/rtps/transport/TCPTransportInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ class TCPTransportInterface : public TransportInterface
virtual ~TCPTransportInterface();

//! Stores the binding between the given locator and the given TCP socket. Server side.
void bind_socket(
ResponseCode bind_socket(
std::shared_ptr<TCPChannelResource>&);

//! Removes the listening socket for the specified port.
Expand Down Expand Up @@ -497,6 +497,16 @@ class TCPTransportInterface : public TransportInterface
*/
void send_channel_pending_logical_ports(
std::shared_ptr<TCPChannelResource>& channel);

/**
* Method to check if a locator contains an interface that belongs to the same host.
* If it occurs, @c locNames will be updated with the list of interfaces of the host.
* @param [in] locator Locator to check.
* @param [in,out] locNames Vector of interfaces to be updated.
*/
void is_own_interface(
const Locator& locator,
std::vector<fastdds::rtps::IPFinder::info_IP>& locNames) const;
};

} // namespace rtps
Expand Down
2 changes: 1 addition & 1 deletion src/cpp/rtps/transport/tcp/RTCPMessageManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ ResponseCode RTCPMessageManager::processBindConnectionRequest(

if (RETCODE_OK == code)
{
mTransport->bind_socket(channel);
code = mTransport->bind_socket(channel);
}

sendData(channel, BIND_CONNECTION_RESPONSE, transaction_id, &payload, code);
Expand Down
12 changes: 12 additions & 0 deletions test/unittest/transport/TCPv4Tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1847,6 +1847,9 @@ TEST_F(TCPv4Tests, autofill_port)
// This test verifies server's channel resources mapping keys uniqueness, where keys are clients locators.
// Clients typically communicated its PID as its locator port. When having several clients in the same
// process this lead to overwriting server's channel resources map elements.
// In order to ensure communication in TCP Discovery Server, a different entry is created in the server's
// channel resources map for each IP interface found, all of them using the same TCP channel. Thus, two
// clients will generate at least two entries in the server's channel resources map.
TEST_F(TCPv4Tests, client_announced_local_port_uniqueness)
{
TCPv4TransportDescriptor recvDescriptor;
Expand Down Expand Up @@ -1878,7 +1881,16 @@ TEST_F(TCPv4Tests, client_announced_local_port_uniqueness)

std::this_thread::sleep_for(std::chrono::milliseconds(100));

<<<<<<< HEAD
ASSERT_EQ(receiveTransportUnderTest.get_channel_resources().size(), 2);
=======
std::set<std::shared_ptr<TCPChannelResource>> channels_created;
for (const auto& channel_resource : receiveTransportUnderTest.get_channel_resources())
{
channels_created.insert(channel_resource.second);
}
ASSERT_EQ(channels_created.size(), 2u);
>>>>>>> d71913b7a (Fix TCP discovery server locators translation (#5410))
}

#ifndef _WIN32
Expand Down
8 changes: 8 additions & 0 deletions tools/fds/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,8 @@ int fastdds_discovery_server(
// Retrieve first TCP port
option::Option* pO_tcp_port = options[TCP_PORT];

bool udp_server_initialized = (pOp != nullptr) || (pO_port != nullptr);

/**
* A locator has been initialized previously in [0.0.0.0] address using either the DEFAULT_ROS2_SERVER_PORT or the
* port number set in the CLI. This locator must be used:
Expand All @@ -296,6 +298,7 @@ int fastdds_discovery_server(
// Add default locator in cases a) and b)
participantQos.wire_protocol().builtin.metatrafficUnicastLocatorList.clear();
participantQos.wire_protocol().builtin.metatrafficUnicastLocatorList.push_back(locator4);
udp_server_initialized = true;
}
else if (nullptr == pOp && nullptr != pO_port)
{
Expand Down Expand Up @@ -543,7 +546,12 @@ int fastdds_discovery_server(
}
}

<<<<<<< HEAD
fastrtps::rtps::GuidPrefix_t guid_prefix = participantQos.wire_protocol().prefix;
=======
fastdds::rtps::GuidPrefix_t guid_prefix = participantQos.wire_protocol().prefix;
participantQos.transport().use_builtin_transports = udp_server_initialized || options[XML_FILE] != nullptr;
>>>>>>> d71913b7a (Fix TCP discovery server locators translation (#5410))

// Create the server
int return_value = 0;
Expand Down

0 comments on commit 798346a

Please sign in to comment.