Skip to content

Commit

Permalink
Merge pull request #3999 from rapidsai/branch-23.12
Browse files Browse the repository at this point in the history
Forward-merge branch-23.12 to branch-24.02
  • Loading branch information
AyodeAwe authored Nov 20, 2023
2 parents 8e213ec + 8549b54 commit 0dbdc9b
Show file tree
Hide file tree
Showing 55 changed files with 784 additions and 1,226 deletions.
45 changes: 0 additions & 45 deletions cpp/include/cugraph/algorithms.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -464,51 +464,6 @@ k_truss_subgraph(raft::handle_t const& handle,
size_t number_of_vertices,
int k);

// FIXME: Internally distances is of int (signed 32-bit) data type, but current
// template uses data from VT, ET, WT from the legacy::GraphCSR View even if weights
// are not considered
/**
* @Synopsis Performs a breadth first search traversal of a graph starting from a vertex.
*
* @throws cugraph::logic_error with a custom message when an error occurs.
*
* @tparam VT Type of vertex identifiers. Supported value : int (signed,
* 32-bit)
* @tparam ET Type of edge identifiers. Supported value : int (signed,
* 32-bit)
* @tparam WT Type of edge weights. Supported values : int (signed, 32-bit)
*
* @param[in] handle Library handle (RAFT). If a communicator is set in the handle,
the multi GPU version will be selected.
* @param[in] graph cuGraph graph descriptor, should contain the connectivity
* information as a CSR
*
* @param[out] distances If set to a valid pointer, this is populated by distance of
* every vertex in the graph from the starting vertex
*
* @param[out] predecessors If set to a valid pointer, this is populated by bfs traversal
* predecessor of every vertex
*
* @param[out] sp_counters If set to a valid pointer, this is populated by bfs traversal
* shortest_path counter of every vertex
*
* @param[in] start_vertex The starting vertex for breadth first search traversal
*
* @param[in] directed Treat the input graph as directed
*
* @param[in] mg_batch If set to true use SG BFS path when comms are initialized.
*
*/
template <typename VT, typename ET, typename WT>
void bfs(raft::handle_t const& handle,
legacy::GraphCSRView<VT, ET, WT> const& graph,
VT* distances,
VT* predecessors,
double* sp_counters,
const VT start_vertex,
bool directed = true,
bool mg_batch = false);

/**
* @brief Compute Hungarian algorithm on a weighted bipartite graph
*
Expand Down
8 changes: 1 addition & 7 deletions cpp/include/cugraph/utilities/device_comm.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2022, NVIDIA CORPORATION.
* Copyright (c) 2020-2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -806,9 +806,6 @@ device_sendrecv(raft::comms::comms_t const& comm,
size_t constexpr tuple_size =
thrust::tuple_size<typename thrust::iterator_traits<InputIterator>::value_type>::value;

// FIXME: NCCL 2.7 supports only one ncclSend and one ncclRecv for a source rank and destination
// rank inside ncclGroupStart/ncclGroupEnd, so we cannot place this inside
// ncclGroupStart/ncclGroupEnd, this restriction will be lifted in NCCL 2.8
detail::device_sendrecv_tuple_iterator_element_impl<InputIterator,
OutputIterator,
size_t{0},
Expand Down Expand Up @@ -866,9 +863,6 @@ device_multicast_sendrecv(raft::comms::comms_t const& comm,
size_t constexpr tuple_size =
thrust::tuple_size<typename thrust::iterator_traits<InputIterator>::value_type>::value;

// FIXME: NCCL 2.7 supports only one ncclSend and one ncclRecv for a source rank and destination
// rank inside ncclGroupStart/ncclGroupEnd, so we cannot place this inside
// ncclGroupStart/ncclGroupEnd, this restriction will be lifted in NCCL 2.8
detail::device_multicast_sendrecv_tuple_iterator_element_impl<InputIterator,
OutputIterator,
size_t{0},
Expand Down
98 changes: 74 additions & 24 deletions cpp/include/cugraph/utilities/host_scalar_comm.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2022, NVIDIA CORPORATION.
* Copyright (c) 2020-2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -254,19 +254,11 @@ template <typename T>
std::enable_if_t<std::is_arithmetic<T>::value, std::vector<T>> host_scalar_allgather(
raft::comms::comms_t const& comm, T input, cudaStream_t stream)
{
std::vector<size_t> rx_counts(comm.get_size(), size_t{1});
std::vector<size_t> displacements(rx_counts.size(), size_t{0});
std::iota(displacements.begin(), displacements.end(), size_t{0});
rmm::device_uvector<T> d_outputs(rx_counts.size(), stream);
rmm::device_uvector<T> d_outputs(comm.get_size(), stream);
raft::update_device(d_outputs.data() + comm.get_rank(), &input, 1, stream);
// FIXME: better use allgather
comm.allgatherv(d_outputs.data() + comm.get_rank(),
d_outputs.data(),
rx_counts.data(),
displacements.data(),
stream);
std::vector<T> h_outputs(rx_counts.size());
raft::update_host(h_outputs.data(), d_outputs.data(), rx_counts.size(), stream);
comm.allgather(d_outputs.data() + comm.get_rank(), d_outputs.data(), size_t{1}, stream);
std::vector<T> h_outputs(d_outputs.size());
raft::update_host(h_outputs.data(), d_outputs.data(), d_outputs.size(), stream);
auto status = comm.sync_stream(stream);
CUGRAPH_EXPECTS(status == raft::comms::status_t::SUCCESS, "sync_stream() failure.");
return h_outputs;
Expand All @@ -277,11 +269,6 @@ std::enable_if_t<cugraph::is_thrust_tuple_of_arithmetic<T>::value, std::vector<T
host_scalar_allgather(raft::comms::comms_t const& comm, T input, cudaStream_t stream)
{
size_t constexpr tuple_size = thrust::tuple_size<T>::value;
std::vector<size_t> rx_counts(comm.get_size(), tuple_size);
std::vector<size_t> displacements(rx_counts.size(), size_t{0});
for (size_t i = 0; i < displacements.size(); ++i) {
displacements[i] = i * tuple_size;
}
std::vector<int64_t> h_tuple_scalar_elements(tuple_size);
rmm::device_uvector<int64_t> d_allgathered_tuple_scalar_elements(comm.get_size() * tuple_size,
stream);
Expand All @@ -292,12 +279,10 @@ host_scalar_allgather(raft::comms::comms_t const& comm, T input, cudaStream_t st
h_tuple_scalar_elements.data(),
tuple_size,
stream);
// FIXME: better use allgather
comm.allgatherv(d_allgathered_tuple_scalar_elements.data() + comm.get_rank() * tuple_size,
d_allgathered_tuple_scalar_elements.data(),
rx_counts.data(),
displacements.data(),
stream);
comm.allgather(d_allgathered_tuple_scalar_elements.data() + comm.get_rank() * tuple_size,
d_allgathered_tuple_scalar_elements.data(),
tuple_size,
stream);
std::vector<int64_t> h_allgathered_tuple_scalar_elements(comm.get_size() * tuple_size);
raft::update_host(h_allgathered_tuple_scalar_elements.data(),
d_allgathered_tuple_scalar_elements.data(),
Expand All @@ -318,6 +303,71 @@ host_scalar_allgather(raft::comms::comms_t const& comm, T input, cudaStream_t st
return ret;
}

template <typename T>
std::enable_if_t<std::is_arithmetic<T>::value, T> host_scalar_scatter(
raft::comms::comms_t const& comm,
std::vector<T> const& inputs, // relevant only in root
int root,
cudaStream_t stream)
{
CUGRAPH_EXPECTS(
((comm.get_rank() == root) && (inputs.size() == static_cast<size_t>(comm.get_size()))) ||
((comm.get_rank() != root) && (inputs.size() == 0)),
"inputs.size() should match with comm.get_size() in root and should be 0 otherwise.");
rmm::device_uvector<T> d_outputs(comm.get_size(), stream);
if (comm.get_rank() == root) {
raft::update_device(d_outputs.data(), inputs.data(), inputs.size(), stream);
}
comm.bcast(d_outputs.data(), d_outputs.size(), root, stream);
T h_output{};
raft::update_host(&h_output, d_outputs.data() + comm.get_rank(), 1, stream);
auto status = comm.sync_stream(stream);
CUGRAPH_EXPECTS(status == raft::comms::status_t::SUCCESS, "sync_stream() failure.");
return h_output;
}

template <typename T>
std::enable_if_t<cugraph::is_thrust_tuple_of_arithmetic<T>::value, T> host_scalar_scatter(
raft::comms::comms_t const& comm,
std::vector<T> const& inputs, // relevant only in root
int root,
cudaStream_t stream)
{
CUGRAPH_EXPECTS(
((comm.get_rank() == root) && (inputs.size() == static_cast<size_t>(comm.get_size()))) ||
((comm.get_rank() != root) && (inputs.size() == 0)),
"inputs.size() should match with comm.get_size() in root and should be 0 otherwise.");
size_t constexpr tuple_size = thrust::tuple_size<T>::value;
rmm::device_uvector<int64_t> d_scatter_tuple_scalar_elements(comm.get_size() * tuple_size,
stream);
if (comm.get_rank() == root) {
for (int i = 0; i < comm.get_size(); ++i) {
std::vector<int64_t> h_tuple_scalar_elements(tuple_size);
detail::update_vector_of_tuple_scalar_elements_from_tuple_impl<T, size_t{0}, tuple_size>()
.update(h_tuple_scalar_elements, inputs[i]);
raft::update_device(d_scatter_tuple_scalar_elements.data() + i * tuple_size,
h_tuple_scalar_elements.data(),
tuple_size,
stream);
}
}
comm.bcast(
d_scatter_tuple_scalar_elements.data(), d_scatter_tuple_scalar_elements.size(), root, stream);
std::vector<int64_t> h_tuple_scalar_elements(tuple_size);
raft::update_host(h_tuple_scalar_elements.data(),
d_scatter_tuple_scalar_elements.data() + comm.get_rank() * tuple_size,
tuple_size,
stream);
auto status = comm.sync_stream(stream);
CUGRAPH_EXPECTS(status == raft::comms::status_t::SUCCESS, "sync_stream() failure.");

T ret{};
detail::update_tuple_from_vector_of_tuple_scalar_elements_impl<T, size_t{0}, tuple_size>().update(
ret, h_tuple_scalar_elements);

return ret;
}

// Return value is valid only in root (return value may better be std::optional in C++17 or later)
template <typename T>
std::enable_if_t<std::is_arithmetic<T>::value, std::vector<T>> host_scalar_gather(
Expand Down
5 changes: 0 additions & 5 deletions cpp/include/cugraph/utilities/shuffle_comm.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ compute_tx_rx_counts_offsets_ranks(raft::comms::comms_t const& comm,

rmm::device_uvector<size_t> d_rx_value_counts(comm_size, stream_view);

// FIXME: this needs to be replaced with AlltoAll once NCCL 2.8 is released.
std::vector<size_t> tx_counts(comm_size, size_t{1});
std::vector<size_t> tx_offsets(comm_size);
std::iota(tx_offsets.begin(), tx_offsets.end(), size_t{0});
Expand Down Expand Up @@ -835,7 +834,6 @@ auto shuffle_values(raft::comms::comms_t const& comm,
allocate_dataframe_buffer<typename thrust::iterator_traits<TxValueIterator>::value_type>(
rx_offsets.size() > 0 ? rx_offsets.back() + rx_counts.back() : size_t{0}, stream_view);

// FIXME: this needs to be replaced with AlltoAll once NCCL 2.8 is released
// (if num_tx_dst_ranks == num_rx_src_ranks == comm_size).
device_multicast_sendrecv(comm,
tx_value_first,
Expand Down Expand Up @@ -889,7 +887,6 @@ auto groupby_gpu_id_and_shuffle_values(raft::comms::comms_t const& comm,
allocate_dataframe_buffer<typename thrust::iterator_traits<ValueIterator>::value_type>(
rx_offsets.size() > 0 ? rx_offsets.back() + rx_counts.back() : size_t{0}, stream_view);

// FIXME: this needs to be replaced with AlltoAll once NCCL 2.8 is released
// (if num_tx_dst_ranks == num_rx_src_ranks == comm_size).
device_multicast_sendrecv(comm,
tx_value_first,
Expand Down Expand Up @@ -946,7 +943,6 @@ auto groupby_gpu_id_and_shuffle_kv_pairs(raft::comms::comms_t const& comm,
allocate_dataframe_buffer<typename thrust::iterator_traits<ValueIterator>::value_type>(
rx_keys.size(), stream_view);

// FIXME: this needs to be replaced with AlltoAll once NCCL 2.8 is released
// (if num_tx_dst_ranks == num_rx_src_ranks == comm_size).
device_multicast_sendrecv(comm,
tx_key_first,
Expand All @@ -959,7 +955,6 @@ auto groupby_gpu_id_and_shuffle_kv_pairs(raft::comms::comms_t const& comm,
rx_src_ranks,
stream_view);

// FIXME: this needs to be replaced with AlltoAll once NCCL 2.8 is released
// (if num_tx_dst_ranks == num_rx_src_ranks == comm_size).
device_multicast_sendrecv(comm,
tx_value_first,
Expand Down
2 changes: 0 additions & 2 deletions cpp/src/centrality/katz_centrality_impl.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,6 @@ void katz_centrality(
CUGRAPH_EXPECTS(epsilon >= 0.0, "Invalid input argument: epsilon should be non-negative.");

if (do_expensive_check) {
// FIXME: should I check for betas?

if (has_initial_guess) {
auto num_negative_values =
count_if_v(handle, pull_graph_view, katz_centralities, [] __device__(auto, auto val) {
Expand Down
46 changes: 41 additions & 5 deletions cpp/src/community/detail/common_methods.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ struct is_bitwise_comparable<cuco::pair<int32_t, float>> : std::true_type {};
namespace cugraph {
namespace detail {

// a workaround for cudaErrorInvalidDeviceFunction error when device lambda is used
// FIXME: a workaround for cudaErrorInvalidDeviceFunction error when device lambda is used
template <typename vertex_t, typename weight_t>
struct key_aggregated_edge_op_t {
weight_t total_edge_weight{};
Expand Down Expand Up @@ -80,7 +80,7 @@ struct key_aggregated_edge_op_t {
}
};

// a workaround for cudaErrorInvalidDeviceFunction error when device lambda is used
// FIXME: a workaround for cudaErrorInvalidDeviceFunction error when device lambda is used
template <typename vertex_t, typename weight_t>
struct reduce_op_t {
using type = thrust::tuple<vertex_t, weight_t>;
Expand All @@ -100,7 +100,28 @@ struct reduce_op_t {
}
};

// a workaround for cudaErrorInvalidDeviceFunction error when device lambda is used
// FIXME: a workaround for cudaErrorInvalidDeviceFunction error when device lambda is used
template <typename vertex_t, typename weight_t>
struct count_updown_moves_op_t {
bool up_down{};
__device__ auto operator()(thrust::tuple<vertex_t, thrust::tuple<vertex_t, weight_t>> p) const
{
vertex_t old_cluster = thrust::get<0>(p);
auto new_cluster_gain_pair = thrust::get<1>(p);
vertex_t new_cluster = thrust::get<0>(new_cluster_gain_pair);
weight_t delta_modularity = thrust::get<1>(new_cluster_gain_pair);

auto result_assignment =
(delta_modularity > weight_t{0})
? (((new_cluster > old_cluster) != up_down) ? old_cluster : new_cluster)
: old_cluster;

return (delta_modularity > weight_t{0})
? (((new_cluster > old_cluster) != up_down) ? false : true)
: false;
}
};
// FIXME: a workaround for cudaErrorInvalidDeviceFunction error when device lambda is used
template <typename vertex_t, typename weight_t>
struct cluster_update_op_t {
bool up_down{};
Expand All @@ -115,7 +136,7 @@ struct cluster_update_op_t {
}
};

// a workaround for cudaErrorInvalidDeviceFunction error when device lambda is used
// FIXME: a workaround for cudaErrorInvalidDeviceFunction error when device lambda is used
template <typename vertex_t, typename weight_t>
struct return_edge_weight_t {
__device__ auto operator()(
Expand All @@ -125,7 +146,7 @@ struct return_edge_weight_t {
}
};

// a workaround for cudaErrorInvalidDeviceFunction error when device lambda is used
// FIXME: a workaround for cudaErrorInvalidDeviceFunction error when device lambda is used
template <typename vertex_t, typename weight_t>
struct return_one_t {
__device__ auto operator()(
Expand Down Expand Up @@ -394,6 +415,21 @@ rmm::device_uvector<vertex_t> update_clustering_by_delta_modularity(
detail::reduce_op_t<vertex_t, weight_t>{},
cugraph::get_dataframe_buffer_begin(output_buffer));

int nr_moves = thrust::count_if(
handle.get_thrust_policy(),
thrust::make_zip_iterator(thrust::make_tuple(
next_clusters_v.begin(), cugraph::get_dataframe_buffer_begin(output_buffer))),
thrust::make_zip_iterator(
thrust::make_tuple(next_clusters_v.end(), cugraph::get_dataframe_buffer_end(output_buffer))),
detail::count_updown_moves_op_t<vertex_t, weight_t>{up_down});

if (multi_gpu) {
nr_moves = host_scalar_allreduce(
handle.get_comms(), nr_moves, raft::comms::op_t::SUM, handle.get_stream());
}

if (nr_moves == 0) { up_down = !up_down; }

thrust::transform(handle.get_thrust_policy(),
next_clusters_v.begin(),
next_clusters_v.end(),
Expand Down
Loading

0 comments on commit 0dbdc9b

Please sign in to comment.