diff --git a/environment.yaml b/environment.yaml
index b0b417a..be3bd1d 100644
--- a/environment.yaml
+++ b/environment.yaml
@@ -23,3 +23,4 @@ dependencies:
   - scikit-survival #prod
   - sphinx #dev
   - tqdm #exp
+  - mypy #dev
diff --git a/pysurv_dist/distance.py b/pysurv_dist/distance.py
index 45d954d..c88f762 100644
--- a/pysurv_dist/distance.py
+++ b/pysurv_dist/distance.py
@@ -1,18 +1,156 @@
-"""Implentation of survival distance score and clinical independence score."""
+"""Implementation of survival distance score and clinical independence score.
+
+The survival distance score was defined in this paper:
+
+https://psb.stanford.edu/psb-online/proceedings/psb20/Neums.pdf
+
+The goal of the survival distance score is to measure the variation in survival
+rate across time given a particular feature.
+
+"""
+from typing import List
+
+import numpy as np
+import pandas as pd
 from numpy.typing import ArrayLike
-from sksurv.utils import Surv
+from sklearn.linear_model import LinearRegression
+from sksurv.util import Surv, check_y_survival
+
+
+def _survival_distance_score(x: ArrayLike, y: Surv) -> float:
+    """Score a single feature ``x`` against the survival outcome ``y``.
+
+    NOTE: placeholder -- the per-time-point computation is not implemented
+    yet, so for a valid ``y`` this currently always returns 0.
+    """
+    score = 0
+    event, time = check_y_survival(y)
+    # check_y_survival returns numpy arrays, which have no ``.unique()``
+    # method; np.unique gives the distinct times in ascending order.
+    for t in np.unique(time):
+        # get mean / variance of feature
+        pass
+    return score
 
 
-def survival_distance_score(X: ArrayLike, y: Surv) -> dict:
+def survival_distance_score(X: pd.DataFrame, y: Surv) -> dict:
     """Calculate survival distance score."""
-    pass
+    scores = {}
+    for col in X:
+        scores[col] = _survival_distance_score(X[col], y)
+    return scores
+
+
+def _clinical_independence_score(x: pd.DataFrame, y: ArrayLike) -> float:
+    """Calculate the score for clinical independence.
+
+    Parameters
+    ----------
+    x : ArrayLike
+        Two-dimensional array of shape [n_rows, n_other_cols] holding all of
+        the other attributes we are using to assess the independence of y
+    y : ArrayLike
+        One-dimensional array, the attribute values we're fitting
+        the linear regression for
+
+    Returns
+    -------
+    score : float
+        cg = 1 - R^2, R^2 being the coefficient of determination
+    """
+    reg = LinearRegression()
+    reg.fit(x, y)
+    score = 1 - reg.score(x, y)
+    return score
+
+
+def clinical_independence_score(X: ArrayLike) -> ArrayLike:
+    """Calculate the clinical independence score.
+
+    Train a linear regression model for each feature in X against all other
+    features to measure how independent a given feature is compared to the
+    other features provided.
+
+    The clinical independence score for one feature is the following:
+    1 - R^2
+
+    The final result is a 1D array with clinical independence value for each
+    feature
+
+    If multi-dimensional array X is a numpy array, then we expect the shape of
+    the array to be [n_rows, n_cols]. A transposed array is not detected; one
+    score per column is computed for whatever layout is given.
+
+    Parameters
+    ----------
+    X: ArrayLike
+        All feature attributes
+
+    Returns
+    -------
+    ArrayLike
+        Clinical independence score for column in order of the features given
+    """
+    if isinstance(X, pd.DataFrame):
+        X = X.values
+
+    # Regress each column on all the others. Rows must stay as samples:
+    # LinearRegression.fit needs x and y to share their first dimension.
+    scores = [
+        _clinical_independence_score(x=np.delete(X, i, axis=1), y=X[:, i])
+        for i in range(X.shape[1])
+    ]
+    return np.array(scores)
+
+
+def combine_sds_ci_scores(
+    sds: List[float], ci: List[float], weight: float
+) -> List[float]:
+    """Combine clinical independence and survival distance scores.
+
+    Assume that the clinical independence and survival distance score arrays
+    are in the same order, i.e. element 0 references the same attribute.
+
+    Parameters
+    ----------
+    sds : List[float]
+        Survival distance scores
+    ci : List[float]
+        Clinical independence scores
+    weight : float
+        How much to weigh the survival distance over the clinical independence
+        score during combination.
+
+    Returns
+    -------
+    List[float]
+        List of combined survival distance and clinical independence scores
+    """
+    # combine
+    scores = []
+    return scores
+
+
+class SurvivalDistanceClinicalIndependenceTransformer:
+    """Apply survival distance scoring and clinical independence scoring."""
 
+    def __init__(
+        self, clinical_weight: float, corr_threshold: float = 0.6
+    ) -> None:
+        self.clinical_weight = clinical_weight
+        self.corr_threshold = corr_threshold
+        self.survival_scores = np.array([])
+        self.clinical_scores = np.array([])
+        self.combined_scores = np.array([])
 
-def clinical_indpendence_score(X: ArrayLike, y: Surv, **kwargs) -> dict:
-    """Calculate the clinical indpendence score."""
-    pass
+    def fit(self, df: ArrayLike) -> ArrayLike:
+        """Fit transformer to input data."""
+        pass
 
+    def transform(self, df: ArrayLike) -> ArrayLike:
+        """Transform data by combining correlated columns."""
+        pass
 
-def combine_sds_ci_scores(sds: dict, ci: dict, weight: float):
-    """Combine clinical independence and survival distance scores."""
-    pass
+    def fit_transform(self, df: ArrayLike) -> ArrayLike:
+        """Fit and transform input data."""
+        pass
diff --git a/pysurv_dist/selection.py b/pysurv_dist/selection.py
index cbb3731..d34be2d 100644
--- a/pysurv_dist/selection.py
+++ b/pysurv_dist/selection.py
@@ -9,7 +9,7 @@ from sksurv.utils import Surv
 
 
 from .distance import (
-    clinical_indpendence_score,
+    clinical_independence_score,
     combine_sds_ci_scores,
     survival_distance_score,
 )
@@ -33,7 +33,7 @@ def score_function(X: ArrayLike, y: Surv) -> ArrayLike:
     # TODO: validate y, make sure that y has boolean and float/int tuple
     # using sksurv checks
     sds = survival_distance_score(X, y)
-    ci = clinical_indpendence_score(X, y)
+    ci = clinical_independence_score(X)
     combined = combine_sds_ci_scores(sds=sds, ci=ci)
     return combined
 
diff --git a/tests/test_distance.py b/tests/test_distance.py
new file mode 100644
index 0000000..0c8bbb5
--- /dev/null
+++ b/tests/test_distance.py
@@ -0,0 +1,63 @@
+from unittest import TestCase
+from unittest.mock import patch
+
+import numpy as np
+import pandas as pd
+
+from pysurv_dist import distance
+
+
+class TestClinicalIndependenceScore(TestCase):
+    def setUp(self) -> None:
+        self.data = pd.DataFrame(
+            {
+                "x1": range(10),
+                "x2": [20, 22, 23.5, 24, 25, 28, 28, 29, 29.5, 30],
+                "x3": range(50, 60),
+            }
+        )
+        self.random_data = pd.DataFrame(
+            {
+                "x1": [1, 5, 10, 250, 55, 36, 1, 36, 42, 78],
+                "x2": [20, 21.1, 21.9, 23, 24, 25, 26, 27, 28, 29],
+            }
+        )
+
+    @patch("pysurv_dist.distance._clinical_independence_score")
+    def test_happy_path_dataframe(self, mock_cg):
+        expected_scores = np.array([1, 2, 3])
+        mock_cg.side_effect = expected_scores
+        scores = distance.clinical_independence_score(self.data)
+        np.testing.assert_array_equal(scores, expected_scores)
+
+    @patch("pysurv_dist.distance._clinical_independence_score")
+    def test_happy_path_array(self, mock_cg):
+        expected_scores = np.array([1, 2, 3])
+        mock_cg.side_effect = expected_scores
+        scores = distance.clinical_independence_score(self.data.values)
+        np.testing.assert_array_equal(scores, expected_scores)
+
+    @patch("pysurv_dist.distance._clinical_independence_score")
+    def test_array_transposed(self, mock_cg):
+        expected_scores = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
+        mock_cg.side_effect = expected_scores
+        scores = distance.clinical_independence_score(self.data.values.T)
+        np.testing.assert_array_equal(scores, expected_scores)
+
+    def test__clinical_independence_score(self):
+        score = distance._clinical_independence_score(
+            self.data[["x1"]], self.data["x2"]
+        )
+        expected_score = 0.04324611
+        self.assertAlmostEqual(score, expected_score, 8)
+
+    def test__clinical_independence_score_random(self):
+        score = distance._clinical_independence_score(
+            self.random_data[["x1"]], self.random_data["x2"]
+        )
+        expected_score = 0.994
+        self.assertAlmostEqual(score, expected_score, 3)
+
+    def test_unmocked_returns_one_score_per_column(self):
+        scores = distance.clinical_independence_score(self.data)
+        self.assertEqual(scores.shape, (self.data.shape[1],))