diff --git a/src/copairs/compute.py b/src/copairs/compute.py index 954bf01..6986e58 100644 --- a/src/copairs/compute.py +++ b/src/copairs/compute.py @@ -2,19 +2,42 @@ import os from multiprocessing.pool import ThreadPool from pathlib import Path -from typing import Callable +from typing import Callable, Tuple, Optional, Union import numpy as np from tqdm.autonotebook import tqdm -def parallel_map(par_func, items): - """Execute par_func(i) for every i in items using ThreadPool and tqdm.""" +def parallel_map(par_func: Callable[[int], None], items: np.ndarray) -> None: + """Execute a function in parallel over a list of items. + + This function uses a thread pool to process items in parallel, with progress + tracking via `tqdm`. It is particularly useful for batch operations that benefit + from multithreading. + + Parameters: + ---------- + par_func : Callable + A function to execute for each item. It should accept a single argument + (an item index or value). + items : np.ndarray + An array or list of items to process. + """ + # Total number of items to process num_items = len(items) + + # Determine the number of threads to use, limited by CPU count pool_size = min(num_items, os.cpu_count()) + + # Calculate chunk size for dividing work among threads chunksize = num_items // pool_size + + # Use a thread pool to execute the function in parallel with ThreadPool(pool_size) as pool: + # Map the function to items with unordered execution for better efficiency tasks = pool.imap_unordered(par_func, items, chunksize=chunksize) + + # Display progress using tqdm for _ in tqdm(tasks, total=len(items), leave=False): pass @@ -22,18 +45,40 @@ def parallel_map(par_func, items): def batch_processing( pairwise_op: Callable[[np.ndarray, np.ndarray], np.ndarray], ): - """Decorator adding the batch_size param to run the function with - multithreading using a list of paired indices""" + """Decorator for adding batch processing to pairwise operations. + + This function wraps a pairwise operation to process data in batches, enabling + efficient computation and multithreading when working with large datasets. + + Parameters: + ---------- + pairwise_op : Callable + A function that computes pairwise operations (e.g., similarity or distance) + between two arrays of features. + + Returns: + ------- + Callable + A wrapped function that processes pairwise operations in batches. + + """ def batched_fn(feats: np.ndarray, pair_ix: np.ndarray, batch_size: int): + # Total number of pairs to process num_pairs = len(pair_ix) + + # Initialize an empty result array to store pairwise operation results result = np.empty(num_pairs, dtype=np.float32) def par_func(i): + # Extract the features for the current batch of pairs x_sample = feats[pair_ix[i : i + batch_size, 0]] y_sample = feats[pair_ix[i : i + batch_size, 1]] + + # Compute pairwise operations for the current batch result[i : i + len(x_sample)] = pairwise_op(x_sample, y_sample) + # Use multithreading to process the batches in parallel parallel_map(par_func, np.arange(0, num_pairs, batch_size)) return result @@ -42,52 +87,181 @@ def par_func(i): def pairwise_corr(x_sample: np.ndarray, y_sample: np.ndarray) -> np.ndarray: + """Compute the Pearson correlation coefficient for paired rows of two matrices. + + Parameters: + ---------- + x_sample : np.ndarray + A 2D array where each row represents a profile + y_sample : np.ndarray + A 2D array of the same shape as `x_sample`. + + Returns: + ------- + np.ndarray + A 1D array of Pearson correlation coefficients for each row pair in + `x_sample` and `y_sample`. """ - Compute pearson correlation between two matrices in a paired row-wise - fashion. `x_sample` and `y_sample` must be of the same shape. - """ + # Compute the mean for each row x_mean = x_sample.mean(axis=1, keepdims=True) y_mean = y_sample.mean(axis=1, keepdims=True) + # Center the rows by subtracting the mean x_center = x_sample - x_mean y_center = y_sample - y_mean + # Compute the numerator (dot product of centered vectors) numer = (x_center * y_center).sum(axis=1) + # Compute the denominator (product of vector magnitudes) denom = (x_center**2).sum(axis=1) * (y_center**2).sum(axis=1) denom = np.sqrt(denom) + # Calculate correlation coefficients corrs = numer / denom return corrs def pairwise_cosine(x_sample: np.ndarray, y_sample: np.ndarray) -> np.ndarray: + """Compute cosine similarity for paired rows of two matrices. + + Parameters: + ---------- + x_sample : np.ndarray + A 2D array where each row represents a profile. + y_sample : np.ndarray + A 2D array of the same shape as `x_sample`. + + Returns: + ------- + np.ndarray + A 1D array of cosine similarity scores for each row pair in `x_sample` and `y_sample`. + """ + # Normalize each row to unit vectors x_norm = x_sample / np.linalg.norm(x_sample, axis=1)[:, np.newaxis] y_norm = y_sample / np.linalg.norm(y_sample, axis=1)[:, np.newaxis] + + # Compute the dot product of normalized vectors c_sim = np.sum(x_norm * y_norm, axis=1) return c_sim def pairwise_abs_cosine(x_sample: np.ndarray, y_sample: np.ndarray) -> np.ndarray: + """Compute the absolute cosine similarity for paired rows of two matrices. + + Parameters: + ---------- + x_sample : np.ndarray + A 2D array where each row represents a profile. + y_sample : np.ndarray + A 2D array of the same shape as `x_sample`. + + Returns: + ------- + np.ndarray + Absolute values of cosine similarity scores. + """ return np.abs(pairwise_cosine(x_sample, y_sample)) def pairwise_euclidean(x_sample: np.ndarray, y_sample: np.ndarray) -> np.ndarray: + """ + Compute the inverse Euclidean distance for paired rows of two matrices. + + Parameters: + ---------- + x_sample : np.ndarray + A 2D array where each row represents a profile. + y_sample : np.ndarray + A 2D array of the same shape as `x_sample`. + + Returns: + ------- + np.ndarray + A 1D array of inverse Euclidean distance scores (scaled to range 0-1). + """ + # Compute Euclidean distance and scale to a range of 0 to 1 e_dist = np.sqrt(np.sum((x_sample - y_sample) ** 2, axis=1)) return 1 / (1 + e_dist) def pairwise_manhattan(x_sample: np.ndarray, y_sample: np.ndarray) -> np.ndarray: + """Compute the inverse Manhattan distance for paired rows of two matrices. + + Parameters: + ---------- + x_sample : np.ndarray + A 2D array where each row represents a profile. + y_sample : np.ndarray + A 2D array of the same shape as `x_sample`. + + Returns: + ------- + np.ndarray + A 1D array of inverse Manhattan distance scores (scaled to range 0-1). + """ m_dist = np.sum(np.abs(x_sample - y_sample), axis=1) return 1 / (1 + m_dist) def pairwise_chebyshev(x_sample: np.ndarray, y_sample: np.ndarray) -> np.ndarray: + """Compute the inverse Chebyshev distance for paired rows of two matrices. + + Parameters: + ---------- + x_sample : np.ndarray + A 2D array where each row represents a profile. + y_sample : np.ndarray + A 2D array of the same shape as `x_sample`. + + Returns: + ------- + np.ndarray + A 1D array of inverse Chebyshev distance scores (scaled to range 0-1). + """ c_dist = np.max(np.abs(x_sample - y_sample), axis=1) return 1 / (1 + c_dist) -def get_distance_fn(distance): +def get_distance_fn(distance: Union[str, Callable]) -> Callable: + """Retrieve a distance metric function based on a string identifier or custom callable. + + This function provides flexibility in specifying the distance metric to be used + for pairwise similarity or dissimilarity computations. Users can choose from a + predefined set of metrics or provide a custom callable. + + Parameters: + ---------- + distance : str or callable + The name of the distance metric or a custom callable function. Supported + string identifiers for predefined metrics are: + - "cosine": Cosine similarity. + - "abs_cosine": Absolute cosine similarity. + - "correlation": Pearson correlation coefficient. + - "euclidean": Inverse Euclidean distance (scaled to range 0-1). + - "manhattan": Inverse Manhattan distance (scaled to range 0-1). + - "chebyshev": Inverse Chebyshev distance (scaled to range 0-1). + + If a callable is provided, it must accept the paramters associated with each + callable function. + + Returns: + ------- + callable + A function implementing the specified distance metric. + + Raises: + ------- + ValueError: + If the provided `distance` is not a recognized string identifier or a valid callable. + + Example: + ------- + >>> distance_fn = get_distance_fn("cosine") + >>> similarity_scores = distance_fn(x_sample, y_sample) + """ + + # Dictionary of supported distance metrics distance_metrics = { "abs_cosine": pairwise_abs_cosine, "cosine": pairwise_cosine, @@ -97,6 +271,7 @@ def get_distance_fn(distance): "chebyshev": pairwise_chebyshev, } + # If a string is provided, look up the corresponding metric function if isinstance(distance, str): if distance not in distance_metrics: raise ValueError( @@ -104,137 +279,378 @@ def get_distance_fn(distance): ) distance_fn = distance_metrics[distance] elif callable(distance): + # If a callable is provided, use it directly distance_fn = distance else: + # Raise an error if neither a string nor a callable is provided raise ValueError("Distance must be either a string or a callable object.") + # Wrap the distance function for efficient batch processing return batch_processing(distance_fn) -def random_binary_matrix(n, m, k, rng): - """Generate a random binary matrix of n*m with exactly k values in 1 per row. - Args: - n: Number of rows. - m: Number of columns. - k: Number of 1's per row. +def random_binary_matrix( + n: int, m: int, k: int, rng: Optional[np.random.Generator] +) -> np.ndarray: + """Generate a random binary matrix with a fixed number of 1's per row. + + This function creates an `n x m` binary matrix where each row contains exactly + `k` ones, with the positions of the ones randomized using a specified random + number generator (RNG). + + Parameters: + ---------- + n : int + Number of rows in the matrix. + m : int + Number of columns in the matrix. + k : int + Number of 1's to be placed in each row. Must satisfy `k <= m`. + rng : Optional[np.random.Generator] + A NumPy random number generator instance used for shuffling the positions + of the ones in each row. If None, a new Generator will be created using + the default random seed. Returns: - A: Random binary matrix of n*m with exactly k values in 1 per row. + ------- + np.ndarray + A binary matrix of shape `(n, m)` with exactly `k` ones per row. """ + # Initialize the binary matrix with all zeros matrix = np.zeros((n, m), dtype=int) + + # Fill the first `k` elements of each row with ones matrix[:, :k] = 1 + + # Randomly shuffle each row to distribute the ones across the columns rng.permuted(matrix, axis=1, out=matrix) + return matrix -def average_precision(rel_k) -> np.ndarray: - """Compute average precision based on binary list sorted by relevance""" +def average_precision(rel_k: np.ndarray) -> np.ndarray: + """Compute the Average Precision (AP) for a binary list of relevance scores. + + Average Precision (AP) is a performance metric for ranking tasks, which calculates + the weighted mean of precision values at the positions where relevant items occur + in a sorted list. The relevance list should be binary (1 for relevant items, 0 + for non-relevant). + + Parameters: + ---------- + rel_k : np.ndarray + A 2D binary array where each row represents a ranked list of items, and each + element indicates the relevance of the item (1 for relevant, 0 for non-relevant). + + Returns: + ------- + np.ndarray + A 1D array of Average Precision (AP) scores, one for each row in the input array. + """ + + # Cumulative sum of relevance scores along the row (True Positives at each rank) tp = np.cumsum(rel_k, axis=1) + + # Total number of relevant items (last value in cumulative sum per row) num_pos = tp[:, -1] + + # Rank positions (1-based index for each column) k = np.arange(1, rel_k.shape[1] + 1) + + # Precision at each rank pr_k = tp / k + + # Calculate AP: Weighted sum of precision values, normalized by total relevant items ap = (pr_k * rel_k).sum(axis=1) / num_pos + return ap -def ap_contiguous(rel_k_list, counts): - """Compute average precision from a list of contiguous values""" +def ap_contiguous( + rel_k_list: np.ndarray, counts: np.ndarray +) -> Tuple[np.ndarray, np.ndarray]: + """Compute Average Precision (AP) scores from relevance labels. + + This function calculates Average Precision (AP) scores for each profile based on + relevance labels and their associated counts. It also returns configurations + indicating the number of positive and total pairs for each profile. + + Parameters: + ---------- + rel_k_list : np.ndarray + Array of relevance labels (1 for positive pairs, 0 for negative pairs), sorted + by descending similarity within profiles. + counts : np.ndarray + Array indicating how many times each profile appears in the rank list. + + Returns: + ------- + ap_scores : np.ndarray + Array of Average Precision scores for each profile. + null_confs : np.ndarray + Array of configurations, where each row corresponds to: + - Number of positive pairs (`num_pos`). + - Total number of pairs (`counts`). + """ + # Convert counts into cutoff indices to segment relevance labels cutoffs = to_cutoffs(counts) + # Calculate the number of positive pairs for each profile num_pos = np.add.reduceat(rel_k_list, cutoffs) + + # Compute the cumulative shift for handling contiguous true positives shift = np.empty_like(num_pos) shift[0], shift[1:] = 0, num_pos[:-1] + # Calculate cumulative true positives for each profile segment tp = rel_k_list.cumsum() - np.repeat(shift.cumsum(), counts) + + # Rank positions for each relevance label, adjusted by cutoff indices k = np.arange(1, len(rel_k_list) + 1) - np.repeat(cutoffs, counts) + # Compute precision at each rank (precision = TP / rank) pr_k = tp / k + + # Calculate average precision scores for each profile ap_scores = np.add.reduceat(pr_k * rel_k_list, cutoffs) / num_pos + + # Generate configurations (number of positive and total pairs) null_confs = np.stack([num_pos, counts], axis=1) + return ap_scores, null_confs -def random_ap(num_perm: int, num_pos: int, total: int, seed) -> np.ndarray: - """Compute multiple average_precision scores generated at random""" +def random_ap(num_perm: int, num_pos: int, total: int, seed: int): + """Generate random Average Precision (AP) scores to create a null distribution. + + This function computes multiple Average Precision (AP) scores based on randomly + generated binary relevance lists. It is useful for generating a null distribution + to assess the significance of observed AP scores. + + Parameters: + ---------- + num_perm : int + Number of random permutations (i.e., how many random relevance lists to generate). + num_pos : int + Number of positive samples (1's) in each relevance list. + total : int + Total number of samples (columns) in each relevance list. + seed : int + Seed for the random number generator to ensure reproducibility. + + Returns: + ------- + np.ndarray + A 1D array containing the Average Precision scores for each randomly + generated relevance list. + """ + # Initialize the random number generator rng = np.random.default_rng(seed) + + # Generate a binary matrix with `num_perm` rows and `total` columns, + # where each row contains exactly `num_pos` ones distributed randomly rel_k = random_binary_matrix(num_perm, total, num_pos, rng) + + # Compute Average Precision (AP) scores for each row of the binary matrix null_dist = average_precision(rel_k) + return null_dist -def null_dist_cached(num_pos, total, seed, null_size, cache_dir): +def null_dist_cached( + num_pos: int, total: int, seed: int, null_size: int, cache_dir: Path +) -> np.ndarray: + """Generate or retrieve a cached null distribution for a given configuration. + + This function calculates a null distribution for a specified number of positive + pairs (`num_pos`) and total pairs (`total`). It uses caching to store and + retrieve precomputed distributions, saving time and computational resources. + + Parameters: + ---------- + num_pos : int + Number of positive pairs in the configuration. + total : int + Total number of pairs (positive + negative) in the configuration. + seed : int + Random seed for reproducibility. + null_size : int + Number of samples to generate in the null distribution. + cache_dir : Path + Directory to store or retrieve cached null distributions. + + Returns: + ------- + np.ndarray + Null distribution for the specified configuration. + """ + # Check if a seed is provided to enable caching if seed is not None: + # Define the cache file name based on the configuration cache_file = cache_dir / f"n{total}_k{num_pos}.npy" + + # If the cache file exists, load the null distribution from it if cache_file.is_file(): null_dist = np.load(cache_file) else: + # If the cache file doesn't exist, compute the null distribution null_dist = random_ap(null_size, num_pos, total, seed) + + # Save the computed distribution to the cache np.save(cache_file, null_dist) else: + # If no seed is provided, compute the null distribution without caching null_dist = random_ap(null_size, num_pos, total, seed) + + # Return the null distribution (loaded or computed) return null_dist -def get_null_dists(confs, null_size, seed): +def get_null_dists(confs: np.ndarray, null_size: int, seed: int) -> np.ndarray: + """Generate null distributions for each configuration of positive and total pairs. + Parameters: + ---------- + confs : np.ndarray + Array where each row contains the number of positive pairs (`num_pos`) + and total pairs (`total`) for a specific configuration. + null_size : int + Number of samples to generate in the null distribution. + seed : int + Random seed for reproducibility. + + Returns: + ------- + np.ndarray + A 2D array where each row corresponds to a null distribution for a specific + configuration. + """ + # Define the directory for caching null distributions cache_dir = Path.home() / ".copairs" / f"seed{seed}" / f"ns{null_size}" cache_dir.mkdir(parents=True, exist_ok=True) + + # Number of configurations and random seeds for each configuration num_confs = len(confs) rng = np.random.default_rng(seed) seeds = rng.integers(8096, size=num_confs) + # Initialize an array to store null distributions null_dists = np.empty([len(confs), null_size], dtype=np.float32) + # Function to generate null distributions for each configuration def par_func(i): num_pos, total = confs[i] null_dists[i] = null_dist_cached(num_pos, total, seeds[i], null_size, cache_dir) + # Parallelize the generation of null distributions parallel_map(par_func, np.arange(num_confs)) + return null_dists def p_values(ap_scores: np.ndarray, null_confs: np.ndarray, null_size: int, seed: int): - """Calculate p values for an array of ap_scores and null configurations. It uses the path - folder to cache null calculations. + """Calculate p-values for an array of Average Precision (AP) scores + using a null distribution. - Parameters + Parameters: ---------- ap_scores : np.ndarray - Ap scores for which to calculate p value. + Array of observed AP scores for which to calculate p-values. null_confs : np.ndarray - Number of average precisions calculated. It serves as an indicator of - how relevant is the resultant score. + Configuration array indicating the relevance or context of each AP score. Used + to generate corresponding null distributions. null_size : int + Number of samples to generate in the null distribution for each configuration. seed : int - Random initializing value. - - Examples - -------- - FIXME: Add docs. - + Seed for the random number generator to ensure reproducibility of the null + distribution. + Returns: + ------- + np.ndarray + An array of p-values corresponding to the input AP scores. """ + # Identify unique configurations and their indices confs, rev_ix = np.unique(null_confs, axis=0, return_inverse=True) + + # Generate null distributions for each unique configuration null_dists = get_null_dists(confs, null_size, seed) + + # Sort null distributions for efficient p-value computation null_dists.sort(axis=1) + + # Initialize an array to store the p-values pvals = np.empty(len(ap_scores), dtype=np.float32) + + # Compute p-values for each AP score for i, (ap_score, ix) in enumerate(zip(ap_scores, rev_ix)): - # Reverse to get from hi to low + # Find the rank of the observed AP score in the sorted null distribution num = null_size - np.searchsorted(null_dists[ix], ap_score) + + # Calculate the p-value as the proportion of null scores >= observed score pvals[i] = (num + 1) / (null_size + 1) + return pvals def concat_ranges(start: np.ndarray, end: np.ndarray) -> np.ndarray: - """Create a 1-d array concatenating multiple ranges""" + """Create a 1D array by concatenating multiple integer ranges. + + This function generates a single concatenated array from multiple ranges defined + by the `start` and `end` arrays. Each range is inclusive of `start` and exclusive + of `end`. + + Parameters: + ---------- + start : np.ndarray + A 1D array of start indices for the ranges. + end : np.ndarray + A 1D array of end indices for the ranges. Must have the same shape as `start`. + + Returns: + ------- + np.ndarray + A 1D array containing the concatenated ranges. + """ + # Generate individual ranges using `range` for each pair of start and end slices = map(range, start, end) + + # Flatten the ranges into a single iterable slices = itertools.chain.from_iterable(slices) + + # Calculate the total length of the concatenated ranges count = (end - start).sum() + + # Create a 1D array from the concatenated ranges mask = np.fromiter(slices, dtype=np.int32, count=count) + return mask -def to_cutoffs(counts: np.ndarray): - """Convert a list of counts into cutoff indices.""" +def to_cutoffs(counts: np.ndarray) -> np.ndarray: + """Convert counts into cumulative cutoff indices. + + This function generates a 1D array of indices that mark the start of each segment + in a cumulative list. The first index is always `0`, and subsequent indices + correspond to the cumulative sum of counts up to the previous entry. + + Parameters: + ---------- + counts : np.ndarray + A 1D array of counts representing the size of each segment. + + Returns: + ------- + np.ndarray + A 1D array of cutoff indices where each value indicates the starting index + for the corresponding segment. + """ + # Initialize an empty array for cutoff indices cutoffs = np.empty_like(counts) - cutoffs[0], cutoffs[1:] = 0, counts.cumsum()[:-1] + + # Set the first cutoff to 0 (start of the first segment) + cutoffs[0] = 0 + + # Compute subsequent cutoffs using cumulative sums, excluding the last element + cutoffs[1:] = counts.cumsum()[:-1] + return cutoffs diff --git a/src/copairs/map/average_precision.py b/src/copairs/map/average_precision.py index 10b481a..1f63f3c 100644 --- a/src/copairs/map/average_precision.py +++ b/src/copairs/map/average_precision.py @@ -3,6 +3,7 @@ import numpy as np import pandas as pd +from typing import List from copairs import compute from copairs.matching import Matcher, UnpairedException @@ -12,92 +13,272 @@ logger = logging.getLogger("copairs") -def build_rank_lists(pos_pairs, neg_pairs, pos_sims, neg_sims): +def build_rank_lists( + pos_pairs: np.ndarray, + neg_pairs: np.ndarray, + pos_sims: np.ndarray, + neg_sims: np.ndarray, +): + """Build rank lists for calculating average precision. + + This function processes positive and negative pairs along with their similarity scores + to construct rank lists and determine unique profile indices with their associated counts. + + Parameters: + ---------- + pos_pairs : np.ndarray + Array of positive pair indices, where each pair is represented as a pair of integers. + + neg_pairs : np.ndarray + Array of negative pair indices, where each pair is represented as a pair of integers. + + pos_sims : np.ndarray + Array of similarity scores for positive pairs. + + neg_sims : np.ndarray + Array of similarity scores for negative pairs. + + Returns: + ------- + paired_ix : np.ndarray + Unique indices of profiles that appear in the rank lists. + + rel_k_list : np.ndarray + Array of relevance labels (1 for positive pairs, 0 for negative pairs) sorted by + decreasing similarity within each profile. + + counts : np.ndarray + Array of counts indicating how many times each profile index appears in the rank lists. + """ + + # Combine relevance labels: 1 for positive pairs, 0 for negative pairs labels = np.concatenate( [ - np.ones(pos_pairs.size, dtype=np.int32), - np.zeros(neg_pairs.size, dtype=np.int32), + np.ones(pos_pairs.size, dtype=np.int32), # Label positive pairs + np.zeros(neg_pairs.size, dtype=np.int32), # Label negative pairs ] ) + + # Flatten positive and negative pair indices for ranking ix = np.concatenate([pos_pairs.ravel(), neg_pairs.ravel()]) + + # Expand similarity scores to match the flattened pair indices sim_all = np.concatenate([np.repeat(pos_sims, 2), np.repeat(neg_sims, 2)]) + + # Sort by similarity (descending) and then by index (lexicographical order) + # `1 - sim_all` ensures higher similarity values appear first, prioritizing + # pairs with stronger similarity scores for ranking. + # `ix` acts as a secondary criterion, ensuring consistent ordering of pairs + # with equal similarity scores by their indices (lexicographical order). ix_sort = np.lexsort([1 - sim_all, ix]) + + # Create the rank list of relevance labels sorted by similarity and index rel_k_list = labels[ix_sort] + + # Find unique profile indices and count their occurrences in the pairs paired_ix, counts = np.unique(ix, return_counts=True) + return paired_ix, rel_k_list, counts def average_precision( - meta, - feats, - pos_sameby, - pos_diffby, - neg_sameby, - neg_diffby, - batch_size=20000, - distance="cosine", + meta: pd.DataFrame, + feats: pd.DataFrame, + pos_sameby: List[str], + pos_diffby: List[str], + neg_sameby: List[str], + neg_diffby: List[str], + batch_size: int = 20000, + distance: str = "cosine", ) -> pd.DataFrame: + """Calculate average precision (AP) scores for pairs of profiles based on their + similarity. + + This function identifies positive and negative pairs of profiles using metadata + rules, computes their similarity scores, and calculates average precision + scores for each profile. The results include the number of positive and total pairs + for each profile. + + Parameters: + ---------- + meta : pd.DataFrame + Metadata of the profiles, including columns used for defining pairs. + This DataFrame should include the columns specified in `pos_sameby`, + `pos_diffby`, `neg_sameby`, and `neg_diffby`. + + feats : np.ndarray + Feature matrix representing the profiles, where rows correspond to profiles + and columns to features. + + pos_sameby : list + Metadata columns used to define positive pairs. Two profiles are considered a + positive pair if they belong to the same group that is not a control group. + For example, replicate profiles of the same compound are positive pairs and + should share the same value in a column identifying compounds. + + pos_diffby : list + Metadata columns used to differentiate positive pairs. Positive pairs do not need + to differ in any metadata columns, so this is typically left empty. However, + if necessary (e.g., to account for batch effects), you can specify columns + such as batch identifiers. + + neg_sameby : list + Metadata columns used to define negative pairs. Typically left empty, as profiles + forming a negative pair (e.g., a compound and a DMSO/control) do not need to + share any metadata values. This ensures comparisons are made without enforcing + unnecessary constraints. + + neg_diffby : list + Metadata columns used to differentiate negative pairs. Two profiles are considered + a negative pair if one belongs to a compound group and the other to a DMSO/ + control group. They must differ in specified metadata columns, such as those + identifying the compound and the treatment index, to ensure comparisons are + only made between compounds and DMSO controls (not between different compounds). + + batch_size : int + The batch size for similarity computations to optimize memory usage. + Default is 20000. + + distance : str + The distance metric used for computing similarities. Default is "cosine". + + Returns: + ------- + pd.DataFrame + A DataFrame containing the following columns: + - 'average_precision': The calculated average precision score for each profile. + - 'n_pos_pairs': The number of positive pairs for each profile. + - 'n_total_pairs': The total number of pairs for each profile. + - Additional metadata columns from the input. + + Raises: + ------ + UnpairedException + If no positive or negative pairs are found in the dataset. + + Notes: + ------ + - Positive Pair Rules: + * Positive pairs are defined by `pos_sameby` (profiles share these metadata values) + and optionally differentiated by `pos_diffby` (profiles must differ in these metadata values if specified). + - Negative Pair Rules: + * Negative pairs are defined by `neg_diffby` (profiles differ in these metadata values) + and optionally constrained by `neg_sameby` (profiles share these metadata values if specified). + """ + + # Combine all metadata columns needed for pair definitions columns = flatten_str_list(pos_sameby, pos_diffby, neg_sameby, neg_diffby) + + # Validate and filter metadata to ensure the required columns are present and usable meta, columns = evaluate_and_filter(meta, columns) validate_pipeline_input(meta, feats, columns) + + # Get the distance function for similarity calculations (e.g., cosine) distance_fn = compute.get_distance_fn(distance) - # Critical!, otherwise the indexing wont work + # Reset metadata index for consistent indexing meta = meta.reset_index(drop=True).copy() + + # Initialize the Matcher object to find pairs based on metadata rules logger.info("Indexing metadata...") matcher = Matcher(meta, columns, seed=0) + # Identify positive pairs based on `pos_sameby` and `pos_diffby` logger.info("Finding positive pairs...") pos_pairs = matcher.get_all_pairs(sameby=pos_sameby, diffby=pos_diffby) pos_total = sum(len(p) for p in pos_pairs.values()) if pos_total == 0: raise UnpairedException("Unable to find positive pairs.") + + # Convert positive pairs to a NumPy array for efficient computation pos_pairs = np.fromiter( itertools.chain.from_iterable(pos_pairs.values()), dtype=np.dtype((np.int32, 2)), count=pos_total, ) + # Identify negative pairs based on `neg_sameby` and `neg_diffby` logger.info("Finding negative pairs...") neg_pairs = matcher.get_all_pairs(sameby=neg_sameby, diffby=neg_diffby) neg_total = sum(len(p) for p in neg_pairs.values()) if neg_total == 0: raise UnpairedException("Unable to find negative pairs.") + + # Convert negative pairs to a NumPy array for efficient computation neg_pairs = np.fromiter( itertools.chain.from_iterable(neg_pairs.values()), dtype=np.dtype((np.int32, 2)), count=neg_total, ) + # Compute similarities for positive pairs logger.info("Computing positive similarities...") pos_sims = distance_fn(feats, pos_pairs, batch_size) + # Compute similarities for negative pairs logger.info("Computing negative similarities...") neg_sims = distance_fn(feats, neg_pairs, batch_size) + # Build rank lists for calculating average precision logger.info("Building rank lists...") paired_ix, rel_k_list, counts = build_rank_lists( pos_pairs, neg_pairs, pos_sims, neg_sims ) + # Compute average precision scores and associated configurations logger.info("Computing average precision...") ap_scores, null_confs = compute.ap_contiguous(rel_k_list, counts) + # Add AP scores and pair counts to the metadata DataFrame logger.info("Creating result DataFrame...") meta["n_pos_pairs"] = 0 meta["n_total_pairs"] = 0 meta.loc[paired_ix, "average_precision"] = ap_scores meta.loc[paired_ix, "n_pos_pairs"] = null_confs[:, 0] meta.loc[paired_ix, "n_total_pairs"] = null_confs[:, 1] + logger.info("Finished.") return meta -def p_values(dframe: pd.DataFrame, null_size: int, seed: int): - """Compute p-values""" +def p_values(dframe: pd.DataFrame, null_size: int, seed: int) -> np.ndarray: + """Compute p-values for average precision scores based on a null distribution. + + This function calculates the p-values for each profile in the input DataFrame, + comparing their average precision scores (`average_precision`) against a null + distribution generated for their specific configurations (number of positive + and total pairs). Profiles with no positive pairs are excluded from the p-value calculation. + + Parameters + ---------- + dframe : pd.DataFrame + A DataFrame containing the following columns: + - `average_precision`: The AP scores for each profile. + - `n_pos_pairs`: Number of positive pairs for each profile. + - `n_total_pairs`: Total number of pairs (positive + negative) for each profile. + null_size : int + The number of samples to generate in the null distribution for significance testing. + seed : int + Random seed for reproducibility of the null distribution. + + Returns + ------- + np.ndarray + An array of p-values for each profile in the DataFrame. Profiles with no positive + pairs will have NaN as their p-value. + """ + # Create a mask to filter profiles with at least one positive pair mask = dframe["n_pos_pairs"] > 0 + + # Initialize the p-values array with NaN for all profiles pvals = np.full(len(dframe), np.nan, dtype=np.float32) + + # Extract the average precision scores and null configurations for valid profiles scores = dframe.loc[mask, "average_precision"].values null_confs = dframe.loc[mask, ["n_pos_pairs", "n_total_pairs"]].values + + # Compute p-values for profiles with valid configurations using the null distribution pvals[mask] = compute.p_values(scores, null_confs, null_size, seed) + + # Return the array of p-values, including NaN for invalid profiles return pvals diff --git a/src/copairs/map/filter.py b/src/copairs/map/filter.py index c2956da..c9603be 100644 --- a/src/copairs/map/filter.py +++ b/src/copairs/map/filter.py @@ -6,11 +6,36 @@ import pandas as pd -def validate_pipeline_input(meta, feats, columns): +def validate_pipeline_input( + meta: pd.DataFrame, feats: np.ndarray, columns: List[str] +) -> None: + """Validate the metadata and features for consistency and completeness. + + Parameters: + ---------- + meta : pd.DataFrame + The metadata DataFrame describing the profiles. + feats : np.ndarray + The feature matrix where rows correspond to profiles in the metadata. + columns : List[str] + List of column names in the metadata to validate for null values. + + Raises: + ------- + ValueError: + - If any of the specified metadata columns contain null values. + - If the number of rows in the metadata and features are not equal. + - If the feature matrix contains null values. + """ + # Check for null values in the specified metadata columns if meta[columns].isna().any(axis=None): raise ValueError("metadata columns should not have null values.") + + # Check if the number of rows in metadata matches the feature matrix if len(meta) != len(feats): - raise ValueError("meta and feats have different number of rows") + raise ValueError("Metadata and features must have the same number of rows.") + + # Check for null values in the feature matrix if np.isnan(feats).any(): raise ValueError("features should not have null values.") @@ -29,50 +54,139 @@ def flatten_str_list(*args): return columns -def evaluate_and_filter(df, columns) -> Tuple[pd.DataFrame, List[str]]: - """Evaluate queries and filter the dataframe""" +def evaluate_and_filter( + df: pd.DataFrame, columns: List[str] +) -> Tuple[pd.DataFrame, List[str]]: + """Evaluate query filters and filter the metadata DataFrame based on specified columns. + + This function processes column specifications, extracts any filter conditions, + applies these conditions to the metadata DataFrame, and returns the filtered metadata + along with the updated list of columns. + + Parameters: + ---------- + df : pd.DataFrame + The metadata DataFrame containing information about profiles to be filtered. + columns : List[str] + A list of metadata column names. + + Returns: + ------- + Tuple[pd.DataFrame, List[str]] + - The filtered metadata DataFrame. + - The updated list of columns after processing any filter specifications. + """ + # Extract query filters from the column specifications query_list, columns = extract_filters(columns, df.columns) + + # Apply the extracted filters to the metadata DataFrame df = apply_filters(df, query_list) + + # Return the filtered metadata DataFrame and the updated list of columns return df, columns -def extract_filters(columns, df_columns) -> Tuple[List[str], List[str]]: - """Extract and validate filters from columns""" +def extract_filters( + columns: List[str], df_columns: List[str] +) -> Tuple[List[str], List[str]]: + """Extract and validate query filters from selected metadata columns. + + Parameters: + ---------- + columns : List[str] + A list of selected metadata column names or query expressions. Query expressions + should follow a valid syntax (e.g., "metadata_column > 5" or "metadata_column == 'value'"). + df_columns : List[str] + All available metadata column names to validate against. + + Returns: + ------- + Tuple[List[str], List[str]] + - `queries_to_eval`: A list of valid query expressions to evaluate. + - `parsed_cols`: A list of valid metadata column names extracted from the input `columns`. + + Raises: + ------- + ValueError: + - If a metadata column or query expression is invalid (e.g., references a non-existent column). + - If duplicate queries are found for the same metadata column. + """ + # Initialize lists to store parsed metadata column names and query expressions parsed_cols = [] queries_to_eval = [] + # Iterate through each entry in the selected metadata columns for col in columns: if col in df_columns: + # If the entry is a valid metadata column name, add it to parsed_cols parsed_cols.append(col) continue + + # Use regex to extract metadata column names from query expressions column_names = re.findall(r"(\w+)\s*[=<>!]+", col) + # Validate the extracted metadata column names against all available metadata columns valid_column_names = [col for col in column_names if col in df_columns] if not valid_column_names: - raise ValueError(f"Invalid query or column name: {col}") + raise ValueError(f"Invalid query or metadata column name: {col}") + # Add valid query expressions and associated metadata columns queries_to_eval.append(col) parsed_cols.extend(valid_column_names) + # Check for duplicate metadata columns in the parsed list if len(parsed_cols) != len(set(parsed_cols)): raise ValueError(f"Duplicate queries for column: {col}") + # Return the queries to evaluate and the parsed metadata column names return queries_to_eval, parsed_cols -def apply_filters(df, query_list): - """Combine and apply filters to dataframe""" +def apply_filters(df: pd.DataFrame, query_list: List[str]) -> pd.DataFrame: + """Combine and apply query filters to a DataFrame. + + This function takes a list of query expressions and applies them to a DataFrame + to filter its rows. If no query expressions are provided, the original DataFrame + is returned unchanged. + + Parameters: + ---------- + df : pd.DataFrame + The DataFrame to which the filters will be applied. + query_list : List[str] + A list of query expressions (e.g., "column_name > 5"). These expressions + should follow the syntax supported by `pd.DataFrame.query`. + + Returns: + ------- + pd.DataFrame + The DataFrame filtered based on the provided query expressions. + + Raises: + ------- + ValueError: + - If the combined query results in an empty DataFrame. + - If the combined query expression is invalid. + """ + # If no queries are provided, return the original DataFrame unchanged if not query_list: return df + # Combine the query expressions into a single string using logical AND (&) combined_query = " & ".join(f"({query})" for query in query_list) + try: + # Apply the combined query to filter the DataFrame df_filtered = df.query(combined_query) + + # Raise an error if the filtered DataFrame is empty if df_filtered.empty: raise ValueError(f"No data matched the query: {combined_query}") except Exception as e: + # Handle any issues with the query expression and provide feedback raise ValueError( f"Invalid combined query expression: {combined_query}. Error: {e}" ) + # Return the filtered DataFrame return df_filtered diff --git a/src/copairs/map/map.py b/src/copairs/map/map.py index 2e354cc..7e16e62 100644 --- a/src/copairs/map/map.py +++ b/src/copairs/map/map.py @@ -13,24 +13,61 @@ def mean_average_precision( ap_scores: pd.DataFrame, sameby, null_size: int, threshold: float, seed: int ) -> pd.DataFrame: + """Calculate the Mean Average Precision (mAP) score and associated p-values. + + This function computes the Mean Average Precision (mAP) score by grouping profiles + based on the specified criteria (`sameby`). It calculates the significance of mAP + scores by comparing them to a null distribution and performs multiple testing + corrections. + + Parameters: + ---------- + ap_scores : pd.DataFrame + DataFrame containing individual Average Precision (AP) scores and pair statistics + (e.g., number of positive pairs `n_pos_pairs` and total pairs `n_total_pairs`). + sameby : list or str + Metadata column(s) used to group profiles for mAP calculation. + null_size : int + Number of samples in the null distribution for significance testing. + threshold : float + p-value threshold for identifying significant MaP scores. + seed : int + Random seed for reproducibility. + + Returns: + ------- + pd.DataFrame + DataFrame with the following columns: + - `mean_average_precision`: Mean AP score for each group. + - `p_value`: p-value comparing mAP to the null distribution. + - `corrected_p_value`: Adjusted p-value after multiple testing correction. + - `below_p`: Boolean indicating if the p-value is below the threshold. + - `below_corrected_p`: Boolean indicating if the corrected p-value is below the threshold. + """ + # Filter out invalid or incomplete AP scores ap_scores = ap_scores.query("~average_precision.isna() and n_pos_pairs > 0") ap_scores = ap_scores.reset_index(drop=True).copy() logger.info("Computing null_dist...") + # Extract configurations for null distribution generation null_confs = ap_scores[["n_pos_pairs", "n_total_pairs"]].values null_confs, rev_ix = np.unique(null_confs, axis=0, return_inverse=True) + + # Generate null distributions for each unique configuration null_dists = compute.get_null_dists(null_confs, null_size, seed=seed) ap_scores["null_ix"] = rev_ix + # Function to calculate the p-value for a mAP score based on the null distribution def get_p_value(params): map_score, indices = params null_dist = null_dists[rev_ix[indices]].mean(axis=0) num = (null_dist > map_score).sum() - p_value = (num + 1) / (null_size + 1) + p_value = (num + 1) / (null_size + 1) # Add 1 for stability return p_value logger.info("Computing p-values...") + # Group by the specified metadata column(s) and calculate mean AP map_scores = ap_scores.groupby(sameby, observed=True).agg( { "average_precision": ["mean", lambda x: list(x.index)], @@ -38,14 +75,18 @@ def get_p_value(params): ) map_scores.columns = ["mean_average_precision", "indices"] + # Compute p-values for each group using the null distributions params = map_scores[["mean_average_precision", "indices"]] map_scores["p_value"] = thread_map(get_p_value, params.values, leave=False) + + # Perform multiple testing correction on p-values reject, pvals_corrected, alphacSidak, alphacBonf = multipletests( map_scores["p_value"], method="fdr_bh" ) map_scores["corrected_p_value"] = pvals_corrected + + # Mark scores below the p-value threshold map_scores["below_p"] = map_scores["p_value"] < threshold map_scores["below_corrected_p"] = map_scores["corrected_p_value"] < threshold - map_scores.drop(columns=["indices"], inplace=True) - map_scores.reset_index(inplace=True) + return map_scores