diff --git a/examples/knn_interpretability.py b/examples/knn_interpretability.py new file mode 100644 index 00000000..a75a1b97 --- /dev/null +++ b/examples/knn_interpretability.py @@ -0,0 +1,70 @@ +# -*- coding: utf-8 -*- +"""Example of using kNN for outlier detection with interpretability +Sample wise interpretation is provided here. +""" +# Author: Alaa Abdelwahab +# License: BSD 2 clause + +from __future__ import division +from __future__ import print_function + +import os +import sys + +# temporary solution for relative imports in case pyod is not installed +# if pyod is installed, no need to use the following line +sys.path.append( + os.path.abspath(os.path.join(os.path.dirname("__file__"), '..'))) + +from scipy.io import loadmat +from sklearn.model_selection import train_test_split + +from pyod.models.knn import KNN +from pyod.utils.utility import standardizer + +if __name__ == "__main__": + # Define data file and read X and y + mat_file = 'cardio.mat' + + mat = loadmat(os.path.join('data', mat_file)) + X = mat['X'] + y = mat['y'].ravel() + + X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.4, + random_state=1) + + # standardizing data for processing + X_train_norm, X_test_norm = standardizer(X_train, X_test) + + # train kNN detector + clf_name = 'KNN' + clf = KNN(n_neighbors=10, method='mean') + clf.fit(X_train) + + # get the prediction labels and outlier scores of the training data + y_train_pred = clf.labels_ # binary labels (0: inliers, 1: outliers) + y_train_scores = clf.decision_scores_ # raw outlier scores + + print('Training data has %d samples' % X_train.shape[0]) + print('Training data has %d features' % X_train.shape[1]) + + # Explain the first sample + print('\nExplaining sample 0:') + print('True label:', 'Outlier' if y_train[0] == 1 else 'Inlier') + print('Predicted label:', 'Outlier' if y_train_pred[0] == 1 else 'Inlier') + print('Outlier score: %.4f' % y_train_scores[0]) + print('\nGenerating dimensional outlier explanation...') + + clf.explain_outlier(0) + + # The horizontal bar chart shows per-feature average distances to k-neighbors. + # Features with bars exceeding the cutoff lines (dashed/dash-dot vertical lines) + # are the primary contributors to the sample being classified as an outlier. + + # Example with custom cutoffs + print('\n' + '='*60) + print('Example with custom cutoff bands (80th and 95th percentiles):') + print('='*60) + + clf.explain_outlier(0, cutoffs=[0.80, 0.95]) + diff --git a/pyod/models/knn.py b/pyod/models/knn.py index 2c0e7d4e..c028756a 100644 --- a/pyod/models/knn.py +++ b/pyod/models/knn.py @@ -7,6 +7,7 @@ from warnings import warn +import matplotlib.pyplot as plt import numpy as np from sklearn.neighbors import BallTree from sklearn.neighbors import NearestNeighbors @@ -167,6 +168,8 @@ def __init__(self, contamination=0.1, n_neighbors=5, method='largest', metric_params=self.metric_params, n_jobs=self.n_jobs, **kwargs) + # Cache for dimensional scores to avoid recomputation + self._cached_dimensional_scores = {} # {columns_tuple: scores_array} def fit(self, X, y=None): """Fit detector. y is ignored in unsupervised methods. @@ -189,6 +192,12 @@ def fit(self, X, y=None): X = check_array(X) self._set_n_classes(y) + # Store training data for explainability + self.X_train_ = X + + # Clear cache when fitting new data + self._cached_dimensional_scores = {} + self.neigh_.fit(X) # In certain cases, _tree does not exist for NearestNeighbors @@ -275,3 +284,207 @@ def _get_dist_by_method(self, dist_arr): return np.mean(dist_arr, axis=1) elif self.method == 'median': return np.median(dist_arr, axis=1) + + def get_outlier_explainability_scores(self, ind, columns=None): + """Compute per-feature outlier explainability scores. + + Calculates the average absolute distance to k-nearest neighbors + for each feature dimension. + + Parameters + ---------- + ind : int + Index of the sample in training data. + + columns : list, optional (default=None) + Specific feature indices. If None, use all features. + + Returns + ------- + scores : numpy array of shape (n_features,) + Average absolute distance to k-neighbors per dimension. + """ + check_is_fitted(self, ['X_train_', 'tree_']) + + sample = self.X_train_[ind:ind+1, :] + _, neighbor_indices = self.tree_.query(sample, k=self.n_neighbors) + neighbors = self.X_train_[neighbor_indices[0], :] + + if columns is None: + dim_distances = np.abs(neighbors - self.X_train_[ind, :]) + else: + dim_distances = np.abs(neighbors[:, columns] - self.X_train_[ind, columns]) + + return np.mean(dim_distances, axis=0) + + + def explain_outlier(self, ind, columns=None, cutoffs=None, + feature_names=None, file_name=None, + file_type=None, max_features_per_plot=20, + compute_cutoffs=True): # pragma: no cover + """Plot dimensional outlier graph for a given data point. + + Parameters + ---------- + ind : int + The index of the data point to explain. + + columns : list, optional + Specify a list of features/dimensions for plotting. If not + specified, use all features. + + cutoffs : list of floats in (0., 1), optional (default=[0.95, 0.99]) + The significance cutoff bands of the dimensional outlier graph. + + feature_names : list of strings, optional + The display names of all columns of the dataset, + to show on the y-axis of the plot. + + file_name : string, optional + The name to save the figure. + + file_type : string, optional + The file type to save the figure. + + max_features_per_plot : int, optional (default=20) + Maximum number of features per plot. Splits into multiple plots if exceeded. + + compute_cutoffs : bool, optional (default=True) + If True, computes dimensional scores for all samples to generate cutoff bands. + If False, only computes dimensional score for the target sample (much faster). + When True, results are cached for subsequent calls. + + Returns + ------- + scores : numpy array + The per-feature outlier scores for the specified sample. + """ + check_is_fitted(self, ['X_train_', 'tree_', 'labels_']) + + if columns is None: + columns = list(range(self.X_train_.shape[1])) + + cutoffs = [1 - self.contamination, 0.99] if cutoffs is None else cutoffs + + # Compute dimensional scores for target sample + dim_scores = self.get_outlier_explainability_scores(ind, columns) + + # Compute cutoff bands if requested + if compute_cutoffs: + cache_key = tuple(columns) + if cache_key in self._cached_dimensional_scores: + all_scores = self._cached_dimensional_scores[cache_key] + else: + all_scores = np.zeros((self.X_train_.shape[0], len(columns))) + for i in range(self.X_train_.shape[0]): + all_scores[i, :] = self.get_outlier_explainability_scores(i, columns) + self._cached_dimensional_scores[cache_key] = all_scores + + cutoff_bands = {c: np.quantile(all_scores, q=c, axis=0) for c in cutoffs} + else: + cutoff_bands = None + + # Set feature names + if feature_names is None: + feature_names = [f'Feature {i}' for i in columns] + + # Split into chunks if needed + n_features = len(columns) + chunks = [] + for start in range(0, n_features, max_features_per_plot): + end = min(start + max_features_per_plot, n_features) + chunk_cutoffs = None + if cutoff_bands: + chunk_cutoffs = {c: cutoff_bands[c][start:end] for c in cutoffs} + chunks.append((columns[start:end], dim_scores[start:end], + feature_names[start:end], chunk_cutoffs)) + + # Plot each chunk + for chunk_idx, (chunk_cols, chunk_scores, chunk_names, chunk_cutoffs) in enumerate(chunks): + self._plot_explanation_chunk(ind, chunk_cols, chunk_scores, chunk_names, + chunk_cutoffs, chunk_idx, len(chunks), + file_name, file_type) + + return dim_scores + + + def _plot_explanation_chunk(self, ind, columns, scores, names, cutoff_bands, + idx, total, file_name, file_type): + """Helper to plot one feature chunk.""" + n = len(columns) + fig, ax = plt.subplots(figsize=(10, max(6, n * 0.4))) + y_pos = np.arange(n) + + # Determine colors based on cutoffs + if cutoff_bands: + cutoffs_sorted = sorted(cutoff_bands.keys()) + colors = [] + for i, score in enumerate(scores): + if len(cutoffs_sorted) >= 2: + if score >= cutoff_bands[cutoffs_sorted[-1]][i]: + colors.append('#d62728') # Red + elif score >= cutoff_bands[cutoffs_sorted[0]][i]: + colors.append('#ff7f0e') # Orange + else: + colors.append('#1f77b4') # Blue + else: + colors.append('#d62728' if score >= cutoff_bands[cutoffs_sorted[0]][i] else '#1f77b4') + else: + colors = ['#000000'] * n # Black when no cutoffs + + # Plot bars + bars = ax.barh(y_pos, scores, color=colors, alpha=0.7, + edgecolor='black', linewidth=0.5) + + # Add cutoff lines + if cutoff_bands: + cutoffs_sorted = sorted(cutoff_bands.keys()) + styles = ['--', '-.', ':'] + line_colors = ['#ff7f0e', '#d62728', '#8c564b'] + for cutoff, style, color in zip(cutoffs_sorted, styles, line_colors): + for i, val in enumerate(cutoff_bands[cutoff]): + ax.plot([val, val], [i-0.4, i+0.4], style, color=color, linewidth=2, + label=f'{cutoff:.2f} Cutoff' if i == 0 else "") + + # Formatting + ax.set_yticks(y_pos) + ax.set_yticklabels(names) + ax.set_xlabel('Average Distance to k-Neighbors', fontsize=12) + ax.set_ylabel('Features', fontsize=12) + + label = 'Outlier' if self.labels_[ind] == 1 else 'Inlier' + overall_knn_score = self.decision_scores_[ind] + + if total > 1: + title = (f'Outlier score breakdown for sample #{ind+1} ({label})\n' + f'k={self.n_neighbors} | Overall KNN={overall_knn_score:.3f} | ' + f'Features {columns[0]+1}-{columns[-1]+1} (Part {idx+1}/{total})') + else: + title = (f'Outlier score breakdown for sample #{ind+1} ({label})\n' + f'k={self.n_neighbors}, method={self.method} | Overall KNN={overall_knn_score:.3f}') + ax.set_title(title, fontsize=14, fontweight='bold') + + # Value labels + for bar, score in zip(bars, scores): + ax.text(bar.get_width(), bar.get_y() + bar.get_height()/2, + f' {score:.3f}', ha='left', va='center', fontsize=9) + + # Legend + if cutoff_bands: + handles, labels = ax.get_legend_handles_labels() + by_label = dict(zip(labels, handles)) + if by_label: + ax.legend(by_label.values(), by_label.keys(), loc='best', framealpha=0.9) + + ax.grid(axis='x', alpha=0.3, linestyle=':') + plt.tight_layout() + + # Save file + if file_name: + name = f'{file_name}_part{idx+1}of{total}' if total > 1 else file_name + if file_type: + plt.savefig(f'{name}.{file_type}', dpi=300, bbox_inches='tight') + else: + plt.savefig(f'{name}.png', dpi=300, bbox_inches='tight') + + plt.show() \ No newline at end of file diff --git a/pyod/test/test_knn.py b/pyod/test/test_knn.py index 1f23b523..a32060f5 100644 --- a/pyod/test/test_knn.py +++ b/pyod/test/test_knn.py @@ -135,6 +135,49 @@ def test_predict_rank_normalized(self): assert_array_less(pred_ranks, 1.01) assert_array_less(-0.1, pred_ranks) + def test_get_outlier_explainability_scores(self): + """Test get_outlier_explainability_scores() method. + + Validates that the method returns correct dimensional scores + for known outlier and inlier samples. + """ + # Create a simple 2D dataset where outliers are obvious + # Point [10, 0] is an outlier in Dimension 0 (X), but normal in Dimension 1 (Y) + X_train = np.array([[0, 0], [1, 0], [0, 1], [1, 1], [10, 0]]) + + # Fit model + clf = KNN(n_neighbors=3, contamination=0.2) + clf.fit(X_train) + + # Test explaining the outlier (index 4: [10, 0]) + scores = clf.get_outlier_explainability_scores(ind=4) + + # Check shape: should have 1 score per feature + assert_equal(scores.shape[0], X_train.shape[1]) + + # Check that scores are non-negative (distances cannot be negative) + assert_array_less(-1e-10, scores) + + # For the outlier [10, 0], dimension 0 should have much higher score than dimension 1 + # because it's far from neighbors in X but close in Y + assert (scores[0] > scores[1]), \ + "Outlier in dimension 0 should have higher score than dimension 1" + + # Test explaining an inlier (index 0: [0, 0]) + scores_inlier = clf.get_outlier_explainability_scores(ind=0) + assert_equal(scores_inlier.shape[0], X_train.shape[1]) + assert_array_less(-1e-10, scores_inlier) + + # Test with specific columns parameter + scores_cols = clf.get_outlier_explainability_scores(ind=4, columns=[0]) + assert_equal(scores_cols.shape[0], 1) + assert_array_less(-1e-10, scores_cols) + + # Test with multiple columns + scores_multi = clf.get_outlier_explainability_scores(ind=4, columns=[0, 1]) + assert_equal(scores_multi.shape[0], 2) + assert_array_less(-1e-10, scores_multi) + def test_model_clone(self): clone_clf = clone(self.clf)