Skip to content

Commit

Permalink
add type annotations
Browse files Browse the repository at this point in the history
  • Loading branch information
jnke2016 committed Jan 13, 2025
1 parent 6418b96 commit af3b31f
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 20 deletions.
30 changes: 24 additions & 6 deletions python/cugraph/cugraph/dask/sampling/biased_random_walks.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from dask.distributed import wait, default_client
import dask_cudf
import cudf
import cupy as cp
import operator as op
from cugraph.dask.common.part_utils import (
persist_dask_df_equal_parts_per_worker,
Expand All @@ -27,9 +28,14 @@
)

from cugraph.dask.comms import comms as Comms
from typing import Tuple, Union


def convert_to_cudf(cp_paths, number_map=None, is_vertex_paths=False):
def convert_to_cudf(
cp_paths: cp.ndarray,
number_map=None,
is_vertex_paths: bool = False
) -> cudf.Series:
"""
Creates cudf Series from cupy arrays from pylibcugraph wrapper
"""
Expand All @@ -48,7 +54,13 @@ def convert_to_cudf(cp_paths, number_map=None, is_vertex_paths=False):
return cudf.Series(cp_paths)


def _call_plc_biased_random_walks(sID, mg_graph_x, st_x, max_depth, random_state):
def _call_plc_biased_random_walks(
sID: bytes,
mg_graph_x,
st_x: cudf.Series,
max_depth: int,
random_state: int
) -> Tuple[cp.ndarray, cp.ndarray]:

return pylibcugraph_biased_random_walks(
resource_handle=ResourceHandle(Comms.get_handle(sID).getHandle()),
Expand All @@ -60,8 +72,12 @@ def _call_plc_biased_random_walks(sID, mg_graph_x, st_x, max_depth, random_state


def biased_random_walks(
input_graph, start_vertices=None, max_depth=None, random_state=None
):
input_graph,
start_vertices: Union[int, list, cudf.Series, cudf.DataFrame, cudf.Series
] = None,
max_depth: int = 1,
random_state: int = None
) -> Tuple[Union[dask_cudf.Series, dask_cudf.DataFrame], dask_cudf.Series, int]:
"""
compute random walks under the biased sampling framework for each nodes in
'start_vertices' and returns a padded result along with the maximum path length.
Expand All @@ -77,8 +93,10 @@ def biased_random_walks(
the random walks. In case of multi-column vertices it should be
a cudf.DataFrame
max_depth : int
The maximum depth of the random walks
max_depth: int
The maximum depth of the random walks. If not specified, the maximum
depth is set to 1.
Must be a positive integer
random_state: int, optional
Random seed to use when making sampling calls.
Expand Down
37 changes: 27 additions & 10 deletions python/cugraph/cugraph/dask/sampling/node2vec_random_walks.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,26 @@
from dask.distributed import wait, default_client
import dask_cudf
import cudf
import cupy as cp
import operator as op
from cugraph.dask.common.part_utils import (
persist_dask_df_equal_parts_per_worker,
)

from pylibcugraph import ResourceHandle

from pylibcugraph import (
ResourceHandle,
node2vec_random_walks as pylibcugraph_node2vec_random_walks,
)

from cugraph.dask.comms import comms as Comms
from typing import Tuple, Union


def convert_to_cudf(cp_paths, number_map=None, is_vertex_paths=False):
def convert_to_cudf(
cp_paths: cp.ndarray,
number_map=None,
is_vertex_paths: bool = False
) -> cudf.Series:
"""
Creates cudf Series from cupy arrays from pylibcugraph wrapper
"""
Expand All @@ -49,8 +54,14 @@ def convert_to_cudf(cp_paths, number_map=None, is_vertex_paths=False):


def _call_plc_node2vec_random_walks(
sID, mg_graph_x, st_x, max_depth, p, q, random_state
):
sID: bytes,
mg_graph_x,
st_x: cudf.Series,
max_depth: int,
p: float,
q: float,
random_state: int
) -> Tuple[cp.ndarray, cp.ndarray]:

return pylibcugraph_node2vec_random_walks(
resource_handle=ResourceHandle(Comms.get_handle(sID).getHandle()),
Expand All @@ -63,10 +74,15 @@ def _call_plc_node2vec_random_walks(
)


# FIXME: Add type anotation
def node2vec_random_walks(
input_graph, start_vertices=None, max_depth=None, p=1.0, q=1.0, random_state=None
):
input_graph,
start_vertices: Union[int, list, cudf.Series, cudf.DataFrame, cudf.Series
] = None,
max_depth: int = 1,
p: float = 1.0,
q: float = 1.0,
random_state: int = None
) -> Tuple[Union[dask_cudf.Series, dask_cudf.DataFrame], dask_cudf.Series, int]:
"""
compute random walks under the node2vec sampling framework for each nodes in
'start_vertices' and returns a padded result along with the maximum path length.
Expand All @@ -82,9 +98,10 @@ def node2vec_random_walks(
the random walks. In case of multi-column vertices it should be
a cudf.DataFrame. Only supports int32 currently.
max_depth: int, optional (default=1)
max_depth: int
The maximum depth of the random walks. If not specified, the maximum
depth is set to 1.
Must be a positive integer
p: float, optional (default=1.0, [0 < p])
Return factor, which represents the likelihood of backtracking to
Expand Down Expand Up @@ -158,7 +175,7 @@ def node2vec_random_walks(
Comms.get_session_id(),
input_graph._plc_graph[w],
start_v[0] if start_v else cudf.Series(dtype=start_vertices_type),
max_depth,
max_depth if isinstance(max_depth, int) else 1,
p=p,
q=q,
random_state=random_state,
Expand Down
24 changes: 20 additions & 4 deletions python/cugraph/cugraph/dask/sampling/uniform_random_walks.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from dask.distributed import wait, default_client
import dask_cudf
import cudf
import cupy as cp
import operator as op
from cugraph.dask.common.part_utils import (
persist_dask_df_equal_parts_per_worker,
Expand All @@ -27,9 +28,14 @@
)

from cugraph.dask.comms import comms as Comms
from typing import Tuple, Union


def convert_to_cudf(cp_paths, number_map=None, is_vertex_paths=False):
def convert_to_cudf(
cp_paths: cp.ndarray,
number_map=None,
is_vertex_paths: bool = False
) -> cudf.Series:
"""
Creates cudf Series from cupy arrays from pylibcugraph wrapper
"""
Expand All @@ -48,7 +54,13 @@ def convert_to_cudf(cp_paths, number_map=None, is_vertex_paths=False):
return cudf.Series(cp_paths)


def _call_plc_uniform_random_walks(sID, mg_graph_x, st_x, max_depth, random_state):
def _call_plc_uniform_random_walks(
sID: bytes,
mg_graph_x,
st_x: cudf.Series,
max_depth: int,
random_state: int
) -> Tuple[cp.ndarray, cp.ndarray]:

return pylibcugraph_uniform_random_walks(
resource_handle=ResourceHandle(Comms.get_handle(sID).getHandle()),
Expand All @@ -60,8 +72,12 @@ def _call_plc_uniform_random_walks(sID, mg_graph_x, st_x, max_depth, random_stat


def uniform_random_walks(
input_graph, start_vertices=None, max_depth=None, random_state=None
):
input_graph,
start_vertices: Union[int, list, cudf.Series, cudf.DataFrame, cudf.Series
] = None,
max_depth: int = 1,
random_state: int = None
) -> Tuple[Union[dask_cudf.Series, dask_cudf.DataFrame], dask_cudf.Series, int]:
"""
compute random walks under the uniform sampling framework for each nodes in
'start_vertices' and returns a padded result along with the maximum path length.
Expand Down

0 comments on commit af3b31f

Please sign in to comment.