Source code for lidar_platform.classification.feature_selection

# -*- coding: utf-8 -*-
"""
Created on Thu Jul 28 10:33:50 2022

@author: Mathilde Letard
"""

import numpy as np
import pandas as pd
import sklearn
from sklearn import feature_selection

from . import cc_3dmasc
from .cc_3dmasc import get_acc_expe, feature_clean


def get_scales_feats(ds):
    """
    Split every descriptor name of a dataset into a scale and a feature name.

    Descriptor names are expected to look like ``<feature>@<scale>``. A name
    without ``@`` gets the scale 0. A name containing ``kNN`` also gets the
    scale 0 and is kept whole as its feature name.

    Parameters
    ----------
    ds : dictionary
        data dictionary whose 'names' entry lists the descriptor names
        (e.g. obtained using cc_3dmasc.load_sbf_features()).

    Returns
    -------
    numpy array :
        scale of each descriptor (one entry per descriptor).
    numpy array :
        feature name of each descriptor (one entry per descriptor).
    sequence :
        ds['names'] itself, i.e. the complete name of each descriptor.
    """
    descriptor_scales = []
    feature_names = []
    for descriptor in ds['names']:
        if 'kNN' in descriptor:
            # kNN descriptors are not split: whole name, scale 0
            descriptor_scales.append(0)
            feature_names.append(descriptor)
            continue
        parts = descriptor.split(sep='@')
        descriptor_scales.append(float(parts[1]) if len(parts) > 1 else 0)
        feature_names.append(parts[0])
    return np.array(descriptor_scales), np.array(feature_names), ds['names']
def nan_percentage(ds):
    """
    Get the fraction of NaN values of a multi-scale feature at each of its scales.

    This can be useful to better understand why a feature at a given scale is
    not contributing, or to identify relevant minimal scales to use.
    (reminder: 3DMASC outputs NaN for points for which the feature was
    impossible to compute - for ex. due to no neighbors in the specified
    sphere scale).

    Features are scanned in their order of appearance in the dataset and the
    scan stops at the first feature described at more than two scales. The
    columns of one feature are assumed to be contiguous.

    Parameters
    ----------
    ds : dictionary
        data dictionary containing features, labels, names obtained with
        load_sbf_features().

    Returns
    -------
    dictio_ft : dictionary
        - 'Scales': scales of the retained feature. If no feature has more
          than two scales, the scales of the last feature of the dataset
          (empty for a dataset without descriptors).
        - 'Percentage': fraction (between 0 and 1) of NaN values at each of
          these scales; empty list if no feature has more than two scales.
    """
    # NOTE(review): the NaN count is assumed to run once, after the scan, and
    # not once per remaining feature - confirm against the repository.
    scales, names, _ = get_scales_feats(ds)
    features = ds['features']
    # first column of every feature, in order of appearance
    starts = np.sort(np.unique(names, return_index=True)[1]).tolist()
    # None as final bound: the last feature runs to the last column
    bounds = starts + [None]
    scales_feat = scales[:0]  # defined even when the dataset has no descriptor
    percent = []
    for begin, end in zip(bounds[:-1], bounds[1:]):
        scales_feat = scales[begin:end]
        if len(scales_feat) > 2:
            feat_data = features[:, begin:end]
            # mean of a boolean mask = share of NaN points in each column
            percent = np.isnan(feat_data).mean(axis=0).tolist()
            break
    return {'Scales': scales_feat, 'Percentage': percent}
def info_score(ds):
    """
    Compute the mutual information between each descriptor and the labels.

    The features are first passed through feature_clean(), then scored with
    scikit-learn's mutual_info_classif against ds['labels']. This metric is
    used in the classifier optimization procedure.

    Parameters
    ----------
    ds : dictionary
        data dictionary containing features, labels, names obtained with
        load_sbf_features().

    Returns
    -------
    dictio_ft : dictionary
        - 'Features': ds['names'], the name of each descriptor.
        - 'MutualInfo': the mutual information score of each descriptor.
    """
    cleaned_features = feature_clean(ds['features'])
    scores = sklearn.feature_selection.mutual_info_classif(cleaned_features, ds['labels'])
    return {'Features': ds['names'], 'MutualInfo': scores}
def inter_ft_corr_filter(features_set, features_score, threshold):
    """
    Prune a set of predictors by keeping only the most informative elements
    among correlated pairs.

    First, the linear correlation between all provided predictors is
    computed. Then, for every pair whose absolute correlation reaches the
    threshold, the predictor with the lowest information score is discarded.
    When both scores are equal, the first predictor of the pair is discarded
    and the second one is kept.

    Parameters
    ----------
    features_set : numpy array (n_points x n_predictors)
        array containing the value of each predictor evaluated for each point
        (e.g., "features" field of the dictionary obtained with
        load_sbf_features()).
    features_score : numpy array (n_predictors x 1)
        array containing the information score of each predictor (obtained
        with info_score()).
    threshold : float
        accepted value of correlation between predictors.

    Returns
    -------
    select : numpy array of int
        indices of the selected features in the original features_set array
        (column indices).
    """
    scores = np.asarray(features_score)
    abs_corr = np.abs(pd.DataFrame(features_set).corr().to_numpy())
    # Visit each unordered pair once (strict upper triangle). Scanning the
    # full symmetric matrix would see (i, j) and (j, i); with equal scores
    # argmin picks position 0 both times and both predictors get discarded.
    rows, cols = np.triu_indices(abs_corr.shape[0], k=1)
    correlated = abs_corr[rows, cols] >= threshold
    todel = []
    for first, second in zip(rows[correlated], cols[correlated]):
        pair = [first, second]
        todel.append(pair[np.argmin(scores[pair])])
    select = np.arange(features_set.shape[1])
    # explicit integer dtype so that an empty deletion list is a valid index
    return np.delete(select, np.array(todel, dtype=int))
def filter_corr_with_selected_ft(all_ft, candidate_ft, selected_ft, threshold):
    """
    Check compatibility of features considering their linear correlation with
    an existing set of features.

    This function evaluates which new predictors can be added to a set of
    previously selected predictors without reaching the maximum accepted
    inter-feature linear correlation coefficient.

    Parameters
    ----------
    all_ft : numpy array (n_points x n_predictors)
        array containing the value of each predictor for each point
        ("features" field of the dict obtained with load_sbf_features()).
    candidate_ft : list(int)
        column indices in all_ft of the predictors to consider. The already
        selected predictors must come first, followed by the new candidates.
    selected_ft : sequence
        already selected predictors. Only its length is used: it gives the
        number of leading entries of candidate_ft that are already selected.
    threshold : float
        accepted value of correlation between predictors.

    Returns
    -------
    valides : list(int)
        entries of candidate_ft (column indices in all_ft) that are not
        already selected and whose absolute correlation with every selected
        predictor is below the threshold.
    """
    candidates = np.asarray(candidate_ft)
    n_selected = len(selected_ft)
    abs_corr = np.abs(pd.DataFrame(all_ft[:, candidate_ft]).corr().to_numpy())
    # Only the self-correlation must be ignored. Zeroing whole rows
    # (half[i, ] = 0) wiped the matrix and disabled the filter entirely.
    np.fill_diagonal(abs_corr, 0)
    # reject a candidate if it is too correlated with any selected predictor
    rejected = (abs_corr[:n_selected, :] >= threshold).any(axis=0)
    # the selected predictors themselves are never returned
    rejected[:n_selected] = True
    return candidates[~rejected].tolist()
def get_n_uncorr_ft(ft_all, ft_select, ft_score, nf, threshold):
    """
    Iteratively complete an unfilled set of uncorrelated features.

    The predictors are ranked by information score. While fewer than nf
    predictors are selected and untested predictors remain, the best untested
    ones (as many as are still missing) are checked against the current
    selection and against each other, and the compatible ones are added.

    Parameters
    ----------
    ft_all : numpy array (n_points x n_predictors)
        array containing the value of each predictor for each point
        ("features" field of the dict obtained with load_sbf_features()).
    ft_select : list(int)
        index in ft_all of each predictor that already passed the selection.
        The list is extended in place.
    ft_score : numpy array (n_predictors x 1)
        array containing the information score of each predictor.
    nf : int
        number of different features to select.
    threshold : float
        accepted value of correlation between predictors.

    Returns
    -------
    ft_select : list(int)
        the same list object, completed with the indices of the newly
        selected predictors (column indices in ft_all).
    """
    ranking = np.argsort(ft_score)  # ascending scores: best predictors last
    remaining = ranking[-1 * nf:]
    n_tested = nf  # the nf best predictors were already examined by the caller
    n_selected = len(ft_select)
    while n_selected < nf and remaining.shape[0] > 0:
        remaining = ranking[:-1 * n_tested]
        missing = nf - n_selected
        # current selection first, then the best predictors not tested yet
        trial = ft_select + remaining[-1 * missing:].tolist()
        compatible = filter_corr_with_selected_ft(ft_all, trial, ft_select, threshold)
        kept = inter_ft_corr_filter(ft_all[:, compatible], ft_score[compatible], threshold)
        ft_select += np.array(compatible)[kept].tolist()
        n_selected = len(ft_select)
        n_tested += missing
    return ft_select
def n_best_uncorr_ft(ds, nf, corr_threshold):
    """
    Select nf uncorrelated features depending on their mutual information
    score and linear correlation.

    The nf best scored predictors are pruned with inter_ft_corr_filter().
    If fewer than nf remain, the set is completed with get_n_uncorr_ft().

    Parameters
    ----------
    ds : dictionary
        data dictionary containing features, labels, names obtained with
        cc_3dmasc.load_sbf_features().
    nf : int
        number of different features to select.
    corr_threshold : float
        accepted value of correlation between predictors.

    Returns
    -------
    select : numpy array or list(int)
        column indices of the selected features in ds['features']. A numpy
        array when the first pruning already yields nf predictors, otherwise
        the list returned by get_n_uncorr_ft().
    """
    scores = info_score(ds)['MutualInfo']
    features = ds['features']
    top_ranked = np.argsort(scores)[-1 * nf:]
    kept = inter_ft_corr_filter(features[:, top_ranked], scores[top_ranked], corr_threshold).tolist()
    select = np.array(top_ranked)[kept]
    if len(select) == nf:
        return select
    # some of the best predictors were correlated: look further down the ranking
    return get_n_uncorr_ft(features, select.tolist(), scores, nf, corr_threshold)
def n_best_uncorr_sc(ds, n_scales, corr_threshold):
    """
    Select n_scales uncorrelated scales depending on their mutual information
    score, linear correlation, and a voting process.

    For each feature, all available scales are pruned with
    n_best_uncorr_ft(). Every retained scale receives one vote. The n_scales
    most voted scales are kept; among scales with the same number of votes,
    the smallest ones are preferred. Scale 0 is always prepended to the
    result.

    Parameters
    ----------
    ds : dictionary
        data dictionary containing features, labels, names obtained with
        load_sbf_features().
    n_scales : int
        number of different scales to select.
    corr_threshold : float
        accepted value of correlation between predictors.

    Returns
    -------
    optim_ok : list(float)
        scale 0 followed by the selected scales, by increasing number of
        votes.
    freq_optim : list(float)
        number of votes obtained by each selected scale (no entry for the
        prepended scale 0).
    """
    scales, names, ds_names = get_scales_feats(ds)
    best_scales = []
    for feature in np.unique(names):
        f_id = np.where(names == feature)[0]
        search_ds = {'features': ds['features'][:, f_id],
                     'labels': ds['labels'],
                     'names': ds_names[f_id]}
        result = n_best_uncorr_ft(search_ds, n_scales, corr_threshold)
        # result indexes the columns of search_ds. Map it back through f_id:
        # scales[result] would read the scales of the first columns of ds.
        best_scales += scales[f_id[result]].tolist()
    sc, freq = np.unique(best_scales, return_counts=True)
    optim = []
    freq_optim = []
    for count in np.unique(freq):  # increasing number of votes
        tied = np.where(freq == count)[0]
        # among ties, decreasing scale: the smallest scales end up last
        ordered = tied[np.argsort(sc[tied])][::-1]
        optim += sc[ordered].tolist()
        freq_optim += [float(count)] * len(tied)
    optim_ok = [0] + optim[-1 * n_scales:]
    return optim_ok, freq_optim[-1 * n_scales:]
def rf_ft_selection(trads, testds, n_scales, n_features, eval_sc, threshold=0.85, step=1):
    """
    Perform iterative feature selection using the random forest embedded
    feature importance as criteria.

    First, n_features features and n_scales scales are selected based on
    their linear correlations and mutual information. A classifier is trained
    on this set. The set is then iteratively reduced by discarding the `step`
    predictors having the lowest random forest feature importance. At each
    step, the model is trained again to update the feature importance
    ranking.

    The 'features' entries of trads and testds are replaced by their
    feature_clean() version in the dictionaries passed by the caller.

    Parameters
    ----------
    trads : dictionary
        training data dictionary containing features, labels, names obtained
        with load_sbf_features().
    testds : dictionary
        test data dictionary containing features, labels, names obtained with
        load_sbf_features().
    n_scales : int
        number of different scales to select at the beginning of the process.
    n_features : int
        number of different features to select at the beginning of the
        process.
    eval_sc : float
        scale at which to evaluate each feature's information score at the
        beginning of the process.
    threshold : float
        accepted value of correlation between predictors.
    step : int
        number of predictors discarded at each iteration.

    Returns
    -------
    dictio : dictionary
        contains the resulting predictors set and associated parameters and
        metrics at each iteration. 'Feats' and 'Scales' start with one extra
        entry describing the initial selection; 'Freq' is left empty.
    """
    trads['features'] = feature_clean(trads['features'])
    testds['features'] = feature_clean(testds['features'])
    scales, names, ds_names = get_scales_feats(trads)
    dictio = {'Complexity': [], 'Feats': [], 'Scales': [], 'Indices': [], 'Freq': [],
              'OA': [], 'Fscore': [], 'Confidence': [], 'Recall': [], 'Precision': [],
              'Class_UA': [], 'Class_PA': [], 'Class_Fscore': [], 'Class_confidence': [],
              'Class_recall': [], 'Class_precision': [],
              'Labels': np.unique(trads['labels'])}

    # feature search: descriptors at the evaluation scale, then those at scale 0
    at_eval_scale = np.where(scales == eval_sc)[0].tolist()
    scale_free = np.where(scales == 0)[0].tolist()
    search_set = np.array(at_eval_scale + scale_free)
    search_ft_ds = {'features': trads['features'][:, search_set],
                    'labels': trads['labels'],
                    'names': names[search_set]}
    sel = n_best_uncorr_ft(search_ft_ds, n_features, threshold)

    # scale search: the selected features at all their scales
    scale_search_set = []
    for sn in search_ft_ds['names'][sel]:
        scale_search_set.extend(np.where((sn == names))[0].tolist())
    search_sc_ds = {'features': trads['features'][:, scale_search_set],
                    'labels': trads['labels'],
                    'names': ds_names[scale_search_set]}
    sel_sc, freq_sel_sc = n_best_uncorr_sc(search_sc_ds, n_scales, threshold)

    # indices of the selected features at the selected scales
    candidate_scales = scales[scale_search_set]
    id_es = []
    for es in sel_sc:
        id_es.extend(np.where(candidate_scales == es)[0].tolist())
    scales_selected = np.array(scale_search_set)[id_es]
    kept_scales = scales[scales_selected]
    idx_used = []
    for ss in sel_sc:
        idx_used.extend(scales_selected[np.where(kept_scales == float(ss))[0]].tolist())

    # initial model on the complete selection: only its importances are used
    reduced_tra = {'features': trads['features'][:, idx_used], 'labels': trads['labels']}
    reduced_test = {'features': testds['features'][:, idx_used], 'labels': testds['labels']}
    (accuracy, fscore, confid, recall, precision, uas, pas, fscores, confc, recalls,
     precisions, labels, feat_imp, classifier, lab_pred) = get_acc_expe(reduced_tra, reduced_test, plot=False)
    print(search_ft_ds['names'])
    print(sel)
    dictio['Feats'].append(search_ft_ds['names'][sel])
    dictio['Scales'].append(sel_sc)
    trads['features'] = feature_clean(trads['features'])
    testds['features'] = feature_clean(testds['features'])

    # predictors ranked by increasing importance
    argimp = np.argsort(feat_imp.flatten())
    id_sort = np.array(idx_used)[argimp]
    for _ in range(0, argimp.shape[0] - step, step):
        id_select = id_sort[step:]  # drop the `step` least important predictors
        reduced_tr = {'features': trads['features'][:, id_select], 'labels': trads['labels']}
        reduced_te = {'features': testds['features'][:, id_select], 'labels': testds['labels']}
        (accuracy, fscore, confid, recall, precision, uas, pas, fscores, confc, recalls,
         precisions, labels, feat_imp, classifier, lab_pred) = get_acc_expe(reduced_tr, reduced_te, plot=False)
        iteration_record = (
            ('Complexity', len(id_select)),
            ('Indices', id_select),
            ('Feats', names[id_select]),
            ('Scales', scales[id_select]),
            ('OA', accuracy),
            ('Fscore', fscore),
            ('Confidence', confid),
            ('Recall', recall),
            ('Precision', precision),
            ('Class_UA', uas),
            ('Class_PA', pas),
            ('Class_Fscore', fscores),
            ('Class_confidence', confc),
            ('Class_recall', recalls),
            ('Class_precision', precisions),
        )
        for key, value in iteration_record:
            dictio[key].append(value)
        # re-rank the remaining predictors with the freshly trained model
        argimp = np.argsort(feat_imp.flatten())
        id_sort = np.array(id_select)[argimp]
    return dictio
def get_n_optimal_sc_ft(train_ds, test_ds, n_scales, n_features, eval_sc, threshold):
    """
    Get the best n_features and n_scales for classification based on
    inter-feature correlation and information score.

    Features are first selected among the descriptors at the evaluation scale
    (and at scale 0), then the scales are selected among all the scales of
    these features. A classifier is finally trained and evaluated on the
    resulting predictors.

    Parameters
    ----------
    train_ds : dictionary
        training data dictionary containing features, labels, names obtained
        with load_sbf_features().
    test_ds : dictionary
        test data dictionary containing features, labels, names obtained with
        load_sbf_features().
    n_scales : int
        number of different scales to select.
    n_features : int
        number of different features to select.
    eval_sc : float
        scale at which to evaluate each feature's information score.
    threshold : float
        accepted value of correlation between predictors.

    Returns
    -------
    dictio : dictionary
        contains the resulting predictors set and associated parameters and
        metrics.
        - 'Feats': complete names of the selected predictors
        - 'Scales': scale of each selected predictor
        - 'feat_imp': feature importance values
        - 'Indices': indices of the selected predictors in the initial array
          of features
        - 'Freq': number of votes obtained by each selected scale
        - 'OA': Overall Accuracy of classifier
        - 'Fscore': F1-score (averaged on all classes)
        - 'Confidence': Confidence (averaged on all classes)
        - 'Recall': Recall (averaged on all classes)
        - 'Precision': Precision (averaged on all classes)
        - 'Class_UA': User's accuracies (per class)
        - 'Class_PA': Producer's accuracies (per class)
        - 'Class_Fscore': F1-score per class
        - 'Class_confidence': class confidence
        - 'Class_recall': Recall per class
        - 'Class_precision': Precision per class
        - 'Labels': labels
    """
    scales, names, ds_names = get_scales_feats(train_ds)

    # feature search: descriptors at the evaluation scale, then those at scale 0
    at_eval_scale = np.where(scales == eval_sc)[0].tolist()
    scale_free = np.where(scales == 0)[0].tolist()
    search_set = np.array(at_eval_scale + scale_free)
    search_ft_ds = {'features': train_ds['features'][:, search_set],
                    'labels': train_ds['labels'],
                    'names': names[search_set]}
    sel = n_best_uncorr_ft(search_ft_ds, n_features, threshold)

    # scale search: the selected features at all their scales
    scale_search_set = []
    for sn in search_ft_ds['names'][sel]:
        scale_search_set.extend(np.where((sn == names))[0].tolist())
    search_sc_ds = {'features': train_ds['features'][:, scale_search_set],
                    'labels': train_ds['labels'],
                    'names': ds_names[scale_search_set]}
    sel_sc, freq_sel_sc = n_best_uncorr_sc(search_sc_ds, n_scales, threshold)

    # indices of the selected features at the selected scales
    id_es = []
    for es in sel_sc:
        print(es)
        id_es.extend(np.where(scales[scale_search_set] == es)[0].tolist())
    scales_selected = np.array(scale_search_set)[id_es]
    idx_used = []
    for ss in sel_sc:
        idx_used.extend(scales_selected[np.where(scales[scales_selected] == float(ss))[0]].tolist())

    reduced_tra = {'features': train_ds['features'][:, idx_used], 'labels': train_ds['labels']}
    reduced_test = {'features': test_ds['features'][:, idx_used], 'labels': test_ds['labels']}
    (accuracy, fscore, confid, recall, precision, uas, pas, fscores, confc, recalls,
     precisions, labels, feat_imp, classifier, labels_pred) = get_acc_expe(reduced_tra, reduced_test, plot=False)

    dictio = {
        'Feats': ds_names[idx_used],
        'Scales': np.array(scales[idx_used]),
        'feat_imp': feat_imp,
        'Indices': np.array(idx_used),
        'Freq': np.array(freq_sel_sc),
        'OA': accuracy,
        'Fscore': fscore,
        'Confidence': confid,
        'Recall': recall,
        'Precision': precision,
        'Class_UA': np.array(uas),
        'Class_PA': np.array(pas),
        'Class_Fscore': np.array(fscores),
        'Class_confidence': np.array(confc),
        'Class_recall': np.array(recalls),
        'Class_precision': np.array(precisions),
        'Labels': np.array(labels),
    }
    return dictio
def get_best_rf_select_iter(dictio_rf_select, trads, testds, wait, threshold):
    """
    Get an optimized set of features and scales by analyzing the variations
    of OA when performing random forest feature importance-based iterative
    selection.

    The OA series is scanned with a window of `wait` iterations. While the
    window and the current best OA stay within 0.01 of each other, the best
    iteration follows the window maximum. Once a larger variation has been
    met, an iteration only replaces the best one if its OA is closer than
    `threshold` to the current best OA. A classifier is then trained again on
    the predictors of the retained iteration.

    Parameters
    ----------
    dictio_rf_select : dictionary
        obtained when performing rf_ft_selection. The first entry of 'Feats'
        and 'Scales' is skipped.
    trads : dictionary
        training data dictionary containing features, labels, names obtained
        with load_sbf_features().
    testds : dictionary
        test data dictionary containing features, labels, names obtained with
        load_sbf_features().
    wait : int
        number of iterations to take into account for monitoring.
    threshold : float
        maximal absolute OA difference to the current best OA for an
        iteration to become the new best one.

    Returns
    -------
    dictio_results : dictionary
        contains the resulting predictors set and associated parameters and
        metrics.
        - 'Best_it': number of recorded iterations minus the index of the
          best iteration
        - 'Features': optimized set of features
        - 'Scales': scales related to the optimized features
        - 'Feat_names': unique feature names
        - 'Feat_imp_mean': mean of feature importance (per feature name)
        - 'Scales_name': unique scales
        - 'Scales_freq': scale frequency (per scale)
        - 'Scales_imp': mean of feature importance (per scale)
        - 'OA': Overall Accuracy
        - 'Fscore': F1-score
        - 'Confid': confidence
        - 'Recall': recall
        - 'Precision': precision
        - 'UAs': User's accuracies (per class)
        - 'PAs': Producer's accuracies (per class)
        - 'Class_fscores': F1-score per class
        - 'Class_conf': confidence per class
        - 'labels': labels
    classifier : sklearn.ensemble.RandomForestClassifier
        theoretically optimal classifier (trained only with the selected
        features/scales).
    """
    data = dictio_rf_select
    oa = data['OA']
    feats = data['Feats'][1:]
    scales = data['Scales'][1:]
    indexes = data['Indices']

    bi = -1  # default: last recorded iteration
    best_oa = oa[0]
    near = False
    # NOTE(review): the nesting of the branches below was rebuilt from a
    # flattened listing - confirm against the repository.
    for i in range(wait, len(oa) - wait, 1):
        start = i - wait
        window = oa[start:i]
        window_max = np.max(window)
        var = max(best_oa, window_max) - min(best_oa, np.min(window))
        if np.abs(var) <= 0.01:
            # stable window: follow its maximum, but once a variation has
            # been met only if it stays close to the current best OA
            if not near or np.abs(window_max - best_oa) < threshold:
                best_oa = window_max
                bi = start + np.argmax(window)
        else:
            near = True
            # latest iteration of the window still close to the best OA
            last_best = np.where(np.abs(np.array(window) - best_oa) < threshold)[0]
            if len(last_best) > 0:
                best_oa = oa[start + last_best[-1]]
                bi = start + last_best[-1]

    final_feats = feats[bi]
    final_scales = scales[bi]
    final_idx = indexes[bi]
    reduced_tr = {'features': trads['features'][:, final_idx], 'labels': trads['labels']}
    reduced_te = {'features': testds['features'][:, final_idx], 'labels': testds['labels']}
    (accuracy, fscore, confid, recall, precision, uas, pas, fscores, confc, recalls,
     precisions, labels, feat_imp, classifier, labels_pred) = get_acc_expe(reduced_tr, reduced_te)

    # mean importance per feature name, then per scale
    features = np.unique(final_feats)
    feature_imptces = [np.mean(feat_imp[np.where(final_feats == f)[0]]) for f in features]
    echelles, freq_echelles = np.unique(final_scales, return_counts=True)
    scales_imptces = [np.mean(feat_imp[np.where(final_scales == s)[0]]) for s in echelles]
    fnames = [str(final_feats[k]) + str(final_scales[k]) for k in range(len(final_feats))]

    # report the producer's accuracy of each class present in the test labels
    labels = np.unique((reduced_te['labels']))
    cn = [cc_3dmasc.classes[int(l)] for l in labels]
    for i, label in enumerate(labels):
        print(label, cn[i], pas[i])

    dictio_results = {
        'Best_it': len(oa) - bi,
        'Features': np.array(final_feats),
        'Scales': np.array(final_scales),
        'Feat_names': np.array(features),
        'Feat_imp_mean': np.array(feature_imptces),
        'Scales_name': np.array(echelles),
        'Scales_freq': np.array(freq_echelles),
        'Scales_imp': scales_imptces,
        'OA': accuracy,
        'Fscore': fscore,
        'Confid': confid,
        'Recall': recall,
        'Precision': precision,
        'UAs': np.array(uas),
        'PAs': np.array(pas),
        'Class_fscores': np.array(fscores),
        'Class_conf': np.array(confc),
        'labels': labels,
    }
    return dictio_results, classifier