-
Notifications
You must be signed in to change notification settings - Fork 15
Add compare and ranking function for FEMaps #174
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 17 commits
639fbbe
1ab4380
b0dbba7
117aa88
08826ed
739ba99
7e383ce
9af5c5a
054fea3
8b610e9
fb486eb
30a028b
2269b75
d036057
ef8d230
404a5d3
179126a
d70c014
e6dfd0a
5a86140
8354a91
c80a7af
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,304 @@ | ||
| import string | ||
| from collections import defaultdict | ||
| from typing import Literal | ||
|
|
||
| import numpy as np | ||
| import pandas as pd | ||
|
|
||
| from cinnabar import FEMap | ||
| from cinnabar.stats import _AVAILABLE_STATS | ||
|
|
||
|
|
||
| def compare_and_rank_results( | ||
| femap: FEMap, | ||
| prediction_type: Literal["nodewise", "edgewise"] = "edgewise", | ||
| rank_metric: Literal["MUE", "RMSE", "RAE", "R2", "rho", "KTAU", "PI"] = "MUE", | ||
| metrics_to_compute: list[Literal["MUE", "RMSE", "RAE", "R2", "rho", "KTAU", "PI"]] | None = None, | ||
| num_bootstraps: int = 1_000, | ||
| confidence_level: float = 0.95, | ||
| alpha: float = 0.05, | ||
| ) -> tuple[pd.DataFrame, pd.DataFrame]: | ||
| """ | ||
| Compare and rank multiple result sources on a single FEMap based on the chosen performance metric and return an ordered table of | ||
| results using the compact letter display (CLD) system. | ||
|
|
||
| Parameters | ||
| ---------- | ||
| femap : FEMap | ||
| An FEMap instance with results from multiple sources to compare. | ||
| prediction_type: {"nodewise", "edgewise"}, default "edgewise" | ||
| The type of prediction in the FEMap to evaluate. | ||
| rank_metric : {"MUE", "RMSE", "RAE", "R2", "rho", "KTAU", "PI"}, default "MUE" | ||
| The metric used to rank the models. | ||
| metrics_to_compute : list[{"MUE", "RMSE", "RAE", "R2", "rho", "KTAU", "PI"}] | None, default None | ||
| A list of metrics to compute for each model. If ``None``, all metrics appropriate for the ``prediction_type`` will be computed. | ||
| num_bootstraps : int, default 1000 | ||
| The number of bootstrap samples to use for estimating confidence intervals. | ||
| confidence_level : float, default 0.95 | ||
| The confidence level for the intervals. | ||
| alpha : float, default 0.05 | ||
| The significance level (Type I error probability) for determining statistical significance | ||
| in pairwise comparisons. Lower values are more conservative and require stronger evidence for significance. | ||
|
|
||
| Note | ||
| ---- | ||
| - The comparison method uses a joint bootstrapping procedure that generates a distribution of differences in the rank metric and checks for significant differences using a method inspired by. [1]_ | ||
| - Each source must be evaluated on the same set of edges. | ||
| - Prediction types "nodewise" and "edgewise" correspond to DGs and edgewise DDGs respectively. | ||
| - When we have more than 2 models, we apply multiple testing correction to the pairwise comparisons using the ``Holm`` | ||
| method to control the family-wise error rate in a low number of comparisons. For more information see https://en.wikipedia.org/wiki/Holm%E2%80%93Bonferroni_method. | ||
|
|
||
| Returns | ||
| ------- | ||
| tuple[pd.DataFrame, pd.DataFrame] | ||
| A tuple containing two DataFrames: | ||
| - The first DataFrame contains the computed metrics for each model. | ||
| - The second DataFrame contains the pairwise comparison results between models. | ||
|
|
||
| References | ||
| ---------- | ||
| .. [1] https://www.nature.com/articles/s42004-025-01428-y | ||
| """ | ||
|
|
||
| # get the predictions and experimental values from the FEMaps and align them into a DataFrame | ||
| predictions_by_key: dict[tuple[str, str] | str, dict[str, float]] = defaultdict(dict) | ||
| if prediction_type == "nodewise": | ||
| abs_df = femap.get_absolute_dataframe() | ||
| # get the computational sources we want to compare | ||
| sources = abs_df[abs_df["computational"] == True]["source"].unique().tolist() | ||
|
jthorton marked this conversation as resolved.
Outdated
|
||
|
|
||
| if not sources: | ||
| raise ValueError( | ||
| "The FEMap contains no computed absolute values. " | ||
| "Call generate_absolute_values() first or add calculated absolute measurements directly to run nodewise comparisons." | ||
| ) | ||
|
|
||
| # for each row add the node prediction | ||
| for _, row in abs_df.iterrows(): | ||
| node_label = row["label"] | ||
| if not row["computational"]: | ||
| source_label = "exp" | ||
| else: | ||
| source_label = f"{row['source']}_calc" | ||
| predictions_by_key[node_label][source_label] = row["DG (kcal/mol)"] | ||
|
|
||
| elif prediction_type == "edgewise": | ||
| rel_df = femap.get_relative_dataframe() | ||
| sources = rel_df[rel_df["computational"] == True]["source"].unique() | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. On line 68 you to
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes good catch, changed to |
||
| for _, row in rel_df.iterrows(): | ||
| key = (row["labelA"], row["labelB"]) | ||
| if not row["computational"]: | ||
| source_label = "exp" | ||
| else: | ||
| source_label = f"{row['source']}_calc" | ||
| predictions_by_key[key][source_label] = row["DDG (kcal/mol)"] | ||
| else: | ||
| raise ValueError(f"Invalid prediction_type: {prediction_type}") | ||
|
|
||
| predictions_df = pd.DataFrame(predictions_by_key.values()) | ||
|
|
||
| # check that we have experimental values for all edges | ||
| if "exp" not in predictions_df.columns: | ||
| raise ValueError("Experimental values are required to rank the results.") | ||
|
|
||
| # check that we have the same number of values for all sources | ||
| for source in sources: | ||
| # check if any values for this source are missing | ||
| if any(predictions_df[f"{source}_calc"].isna()): | ||
| raise ValueError( | ||
| f"Missing predictions for source {source}, all sources must have the same number of predictions." | ||
| ) | ||
|
|
||
| # set metrics to compute based on best practices if not provided | ||
| if metrics_to_compute is None: | ||
| if prediction_type == "edgewise": | ||
| metrics_to_compute = ["MUE", "RMSE"] | ||
| elif prediction_type == "nodewise": | ||
| metrics_to_compute = ["MUE", "RMSE", "RAE", "R2", "rho", "KTAU", "PI"] | ||
|
|
||
| # we must compute the rank metric however it is possible to miss it | ||
| if rank_metric not in metrics_to_compute: | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe not here, but we should think about how best to nudge users away from doing bad things - i.e. maybe we can add a warning that tells folks that it's a bad idea to rank using correlation metrics if you're doing edgewise comparisons.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah I agree, hopefully the docs guide them in the right direction but a warning would help! |
||
| metrics_to_compute.append(rank_metric) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [nit] This does an in-place modification of the input kwarg. It's probably safe, but always better to avoid in case someone is using a pre-defined variable somewhere outside of the method call.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed to use a copy. |
||
|
|
||
| # check we can compute all requested metrics | ||
| for metric in metrics_to_compute: | ||
| if metric not in _AVAILABLE_STATS: | ||
| raise ValueError(f"Metric {metric} is not available.") | ||
|
|
||
| metrics_by_source = dict((source, dict((metric, []) for metric in metrics_to_compute)) for source in sources) | ||
|
jthorton marked this conversation as resolved.
Outdated
|
||
| pairwise_metrics = {} | ||
| # compute bootstrap metrics for each model using the same bootstrap samples, joint bootstrap | ||
| for _ in range(num_bootstraps): | ||
| bootstrap_sample = predictions_df.sample(frac=1.0, replace=True) | ||
| for source in sources: | ||
| y_true = bootstrap_sample["exp"].values | ||
| y_pred = bootstrap_sample[f"{source}_calc"].values | ||
| for metric in metrics_to_compute: | ||
| value = _AVAILABLE_STATS[metric](y_true, y_pred) | ||
| metrics_by_source[source][metric].append(value) | ||
|
|
||
| # compute pairwise differences for ranking metric | ||
| for i, source_i in enumerate(sources): | ||
| for j, source_j in enumerate(sources): | ||
| if j <= i: | ||
| continue | ||
| diffs = np.array(metrics_by_source[source_i][rank_metric]) - np.array( | ||
| metrics_by_source[source_j][rank_metric] | ||
| ) | ||
| pairwise_metrics[(source_i, source_j)] = diffs | ||
|
|
||
| # summarize all metrics | ||
| summary_data = [] | ||
| for source in sources: | ||
| row = {"Model": source} | ||
| for metric in metrics_to_compute: | ||
| # compute the sample metric | ||
| x = predictions_df[f"{source}_calc"].values | ||
| y = predictions_df["exp"].values | ||
| sample_value = _AVAILABLE_STATS[metric](y, x) | ||
| bootstrap_values = np.array(metrics_by_source[source][metric]) | ||
| lower = np.percentile(bootstrap_values, (1 - confidence_level) / 2 * 100) | ||
| upper = np.percentile(bootstrap_values, (1 + confidence_level) / 2 * 100) | ||
| row[f"{metric}"] = sample_value | ||
| row[f"{metric}_CI_Lower"] = lower | ||
| row[f"{metric}_CI_Upper"] = upper | ||
| summary_data.append(row) | ||
| summary_df = pd.DataFrame(summary_data) | ||
|
|
||
| # summarize pairwise comparisons with corrected p-values | ||
| comparison_data = [] | ||
| for (source_i, source_j), diffs in pairwise_metrics.items(): | ||
| lower = np.percentile(diffs, (1 - confidence_level) / 2 * 100) | ||
| upper = np.percentile(diffs, (1 + confidence_level) / 2 * 100) | ||
| # calculate the p-value as the fraction of bootstrap samples that cross zero | ||
| # use a 2-tailed test | ||
| # inspired by https://www.nature.com/articles/s42004-025-01428-y | ||
| p_value = 2 * min(np.mean(diffs < 0), np.mean(diffs > 0)) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If I understand this correctly, there is a chance that the p value ends up being 0 when you have a clear winner. Is this something that would be misleading to users?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thats right, I did find mention of a method which adds 1 to the numerator and denominator to avoid exact zero answers but I am not confident that this is the best practice. One option is to push users to use the reported metric differences and CI around those to interpret the results when this happens, I could add a warning to the docs about it? |
||
| comparison_data.append( | ||
| { | ||
| "Model 1": source_i, | ||
| "Model 2": source_j, | ||
| f"Diff in {rank_metric}": summary_df[summary_df["Model"] == source_i][rank_metric].values[0] | ||
| - summary_df[summary_df["Model"] == source_j][rank_metric].values[0], | ||
| "CI Lower": lower, | ||
| "CI Upper": upper, | ||
| "p-value": p_value, | ||
| "significant": p_value < alpha, | ||
| } | ||
| ) | ||
| comparison_df = pd.DataFrame(comparison_data) | ||
|
|
||
| # if we have more than 2 models, apply multiple testing correction | ||
| if len(sources) > 2: | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [nit] If you don't have > 2, you don't have a
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Having the output be stable would be great but I don't want people to think a correction was applied when it wasn't. I think this is something we could easily add in future though if people want it? |
||
| from statsmodels.stats.multitest import multipletests | ||
|
IAlibay marked this conversation as resolved.
|
||
|
|
||
| p_values = comparison_df["p-value"].values | ||
| reject, pvals_corrected, _, _ = multipletests(p_values, alpha=alpha, method="holm") | ||
| comparison_df["p-value corrected"] = pvals_corrected | ||
| # add corrected significance | ||
| comparison_df["significant"] = pvals_corrected < alpha | ||
|
|
||
| # rank the models and apply the CLD | ||
| # the order depends on whether lower or higher is better for the rank metric | ||
| if rank_metric in ["MUE", "RMSE", "RAE"]: | ||
| ascending = True # lower is better | ||
| else: | ||
| ascending = False # higher is better | ||
| ordered_labels = summary_df.sort_values(by=rank_metric, ascending=ascending).Model.tolist() | ||
| cld_assignment = _apply_cld(comparison_df, ordered_labels) | ||
| summary_df["CLD"] = summary_df["Model"].map(cld_assignment) | ||
|
|
||
| return summary_df, comparison_df | ||
|
|
||
|
|
||
| def _build_significance_lookup(pairwise_df: pd.DataFrame): | ||
| """ | ||
| Build a lookup dictionary for significance between model pairs. | ||
|
|
||
| Parameters | ||
| ---------- | ||
| pairwise_df : pd.DataFrame | ||
| DataFrame containing pairwise comparison results with a 'significant' column. | ||
|
|
||
| Returns | ||
| ------- | ||
| dict[frozenset, bool] | ||
| A dictionary mapping frozensets of model pairs to their significance (True if significant, False otherwise). | ||
| """ | ||
| sig = {} | ||
| for _, row in pairwise_df.iterrows(): | ||
| m1, m2 = row["Model 1"], row["Model 2"] | ||
| sig[frozenset((m1, m2))] = row.significant | ||
| return sig | ||
|
|
||
|
|
||
| def _apply_cld(comparison_df: pd.DataFrame, ordered_labels: list[str]) -> dict[str, str]: | ||
| """ | ||
| Apply the Compact Letter Display (CLD) system to the pairwise comparison results using the insert-absorb method. | ||
|
|
||
| Parameters | ||
| ---------- | ||
| comparison_df : pd.DataFrame | ||
| DataFrame containing pairwise comparison results with a 'significant' column. | ||
| ordered_labels : list[str] | ||
| List of model labels sorted by metric performance (best to worst). | ||
|
|
||
| Returns | ||
| ------- | ||
| dict[str, str] | ||
| A dictionary mapping each model label to its assigned CLD letters. | ||
| """ | ||
|
|
||
| sig = _build_significance_lookup(comparison_df) | ||
|
|
||
| # Each letter is represented as a set of methods which are not significantly different | ||
| letters: list[set[str]] = [] | ||
|
|
||
| def is_compatible(meth, let): | ||
| """Check if method can join this letter set""" | ||
| for other in let: | ||
| if sig.get(frozenset((meth, other)), False): | ||
| return False | ||
| return True | ||
|
|
||
| def absorb(lets): | ||
| """Remove letters that are strict subsets of others""" | ||
| absorbed = [] | ||
| for i, a in enumerate(lets): | ||
| redundant = False | ||
| for j, b in enumerate(lets): | ||
| if i != j and a < b: # a is a strict subset of b | ||
| redundant = True | ||
| break | ||
| if not redundant: | ||
| absorbed.append(a) | ||
| return absorbed | ||
|
|
||
| # For each method in order, try to insert into existing letters or create new letter | ||
| for method in ordered_labels: | ||
| inserted = False | ||
|
|
||
| # Try inserting into existing letters | ||
| for letter in letters: | ||
| if is_compatible(method, letter): | ||
| letter.add(method) | ||
| inserted = True | ||
|
|
||
| # If no insertion possible, create new letter | ||
| if not inserted: | ||
| letters.append({method}) | ||
|
|
||
| # Absorb step | ||
| letters = absorb(letters) | ||
|
|
||
| # Convert to method → string mapping | ||
| alphabet = string.ascii_lowercase | ||
| if len(letters) > len(alphabet): | ||
| raise ValueError("Too many letters required for CLD") | ||
|
|
||
| result = {m: "" for m in ordered_labels} | ||
| for i, letter in enumerate(letters): | ||
| for m in letter: | ||
| result[m] += alphabet[i] | ||
|
|
||
| return result | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nit] Some bounds checks on some of these arguments could be useful to avoid users passing bad things and getting odd failures.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added checks and tests.