Source code for lidar_platform.classification.cc_3dmasc

# -*- coding: utf-8 -*-
"""
Created on Fri Aug  5 18:12:03 2022

@author: Mathilde Letard, Baptiste Feldmann
"""
import os.path
import sys

import cv2
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import plotly.graph_objects as go
import sklearn
from sklearn import metrics

from ..tools import cc, sbf

#  definition of classes names and label values (here, internal conventions of the Rennes LiDAR platform)
classes = {2: 'Ground',
           3: 'Low_veg',
           4: 'Interm_veg',
           5: 'High_veg.',
           6: 'Building',
           9: 'Water',
           11: 'Artificial_ground',
           13: 'Power_Line',
           14: 'Surf_zone',
           15: 'Water_Column',
           16: 'Bathymetry',
           18: 'Sandy_seabed',
           19: 'Rocky_seabed',
           23: 'Bare_ground',
           24: 'Pebble',
           25: 'Rock',
           28: 'Car',
           29: 'Swimming_pools'}


[docs] def load_sbf_features(sbf_filepath, params_filepath, labels=False, coords=False): """ Read an SBF file containing a point cloud with 3DMASC features. Parameters --------- sbf_filepath : str, absolute path to core-points SBF file (containing the features) params_filepath : str, txt parameters file for 3DMASC labels : bool (default=False), to train a model, set it to True (you need to read the labels) coords : bool (default=False), set to True if you want to get the coordinates too Returns -------- data : dict, 'features' : numpy.array of containing the features present in the SBF file 'names' : list of str, name of each feature column 'labels' : list of int, class labels 'coords' : numpy.array of point coordinates """ convention = {"NumberOfReturns": "Number Of Returns", "ReturnNumber": "Return Number"} sbf_data = sbf.read(sbf_filepath) sf_dict = sbf_data.get_name_index_dict() for sfn in sf_dict.keys(): sfn = sfn.replace(' ', '_') root, ext = os.path.splitext(params_filepath) f = open(root + "_feature_sources.txt", "r") features = f.readlines() sf_to_load = [] loaded_sf_names = [] for feature in features[1:]: feature_name = feature.split(sep=':')[1] feature_name = feature_name.split("\n")[0] base = feature_name.split('_')[0] if feature_name in convention.keys(): sf_to_load.append(sf_dict[convention[feature_name]]) loaded_sf_names.append(convention[feature_name]) elif base in convention.keys(): sf_to_load.append(sf_dict[feature_name.replace(base, convention[base])]) loaded_sf_names.append(feature_name.replace(base, convention[base])) elif "kNN" in feature_name: sf_to_load.append(sf_dict['"' + feature_name + '"']) loaded_sf_names.append(feature_name) else: sf_to_load.append(sf_dict[feature_name]) loaded_sf_names.append(feature_name) pc = sbf_data.xyz sf = sbf_data.sf data = {"features": sf[:, sf_to_load], "names": np.array(loaded_sf_names)} if labels: data["labels"] = sf[:, sf_dict['Classification']] if coords: data['coords'] = pc return data
[docs] def feature_clean(features): """ Delete NaN and Inf values in the features set (no normalization, just NaN and Inf values cleaning) Parameters ---------- features : numpy array input features dataset (for ex., the "features" field of a dict obtained with load_sbf_features. Returns ------- dataset : numpy array a dataset containing no more NaN of Inf values. """ for i in range(0, len(features[0, :])): col = features[:, i] if all(np.isnan(col)): newcol = np.array([-9999] * len(col)) else: maxi = max(col[np.isfinite(col)]) newcol = col newcol[np.isinf(col)] = maxi newcol[np.isnan(newcol)] = -9999 features[:, i] = newcol return features
[docs] def train(trads, model=0): """ Train a random forest model for point cloud features classification. This function handles two types of RF models: scikit-learn (parallelized computing), and OpenCV (same as in CloudCompare, but not parallelized). Parameters ---------- trads : dictionary of numpy arrays training features dictionary. model : int (0 or 1) type of model. 0 = scikit-learn random forest, 1 = OpenCV random forest Returns ------- model : sklearn RandomForestClassifier or OpenCV RTrees trained random forest model ready for use. """ if model == 0: classifier = sklearn.ensemble.RandomForestClassifier(n_estimators=150,criterion='gini', max_features="sqrt", max_depth=None, oob_score=True, n_jobs=-1, verbose=0) classifier.fit(feature_clean(trads['features']), trads['labels']) elif model == 1: labels = np.array(trads['labels']).astype('int32') classifier = cv2.ml.RTrees_create() classifier.setMaxDepth(25) classifier.setActiveVarCount(0) classifier.setCalculateVarImportance(True) classifier.setMinSampleCount(1) term_type, n_trees, epsilon = cv2.TERM_CRITERIA_MAX_ITER, 150, sys.float_info.epsilon classifier.setTermCriteria((term_type, n_trees, epsilon)) train_data = cv2.ml.TrainData_create(samples=np.array(trads['features']).astype('float32'), layout=cv2.ml.ROW_SAMPLE, responses=labels) classifier.train(trainData=train_data) else: print('Invalid model type') return return classifier
[docs] def test(testds, classifier, model=0): """ Test the random forest model obtained. The model is tested on the test dataset, and classification metrics are computed. Parameters ---------- testds : dict of numpy arrays test features dictionary. classifier : sklearn RandomForestClassifier or OpenCV RTrees trained random forest model ready for use. model : int (0 or 1) type of the model. 0 = scikit-learn random forest, 1 = OpenCV random forest Returns ------- labels_pred : numpy array labels predicted by the model for each set of features. confid_pred : numpy array prediction confidence for each set of features. feat_imptce : numpy array importance of each feature (as computed in sklearn's RandomForestClassifier). oa : float overall accuracy of the classifier fs : float F1-score of the classifier averaged on all classes """ labels = testds['labels'] if model == 0: feat_imptce = classifier.feature_importances_ labels_pred = classifier.predict(feature_clean(testds['features'])) conf = classifier.predict_proba(feature_clean(testds['features'])) elif model == 1: feat_imptce = classifier.getVarImportance() _ret, responses = classifier.predict(testds['features'].astype('float32')) votes = classifier.getVotes(testds['features'].astype('float32'), 0) conf = votes[1:, :] conf = np.max(conf, axis=-1)/150 labels_pred = responses else: print('Invalid model type') return oa = metrics.accuracy_score(labels, labels_pred) fs = metrics.f1_score(labels, labels_pred, average='macro') return labels_pred, conf, feat_imptce, oa, fs
[docs] def get_acc_expe(trads, testds, plot=True, save=False, model=0): """ Train a random forest model for point cloud features classification and get metrics describing its performances. Parameters ---------- trads : dictionary of numpy arrays training features dictionary. testds : dict of numpy arrays test features dictionary. save : bool defines if plot must be saved. plot : bool defines if plot must be opened. model : int (0 or 1) type of model. 0 = scikit-learn random forest, 1 = OpenCV random forest Returns ------- accuracy : float Overall Accuracy of classifier fscore : float F1-score (averaged on all classes) numpy.mean(confid_pred) : float Mean prediction confidence recall : float Recall (averaged on all classes) precision : float Precision (averaged on all classes) uas : numpy.array(float) User's accuracies (per class) pas : numpy.array(float) Producer's accuracies (per class) fscores : numpy.array(float) F1-score per class confc : numpy.array(float) Mean prediction confidence per class recalls : numpy.array(float) Recall per class precisions : numpy.array(float) Precision per class labels : numpy.array(float) labels feat_imptce : numpy.array(float) feature importance values classifier : sklearn RandomForestClassifier or OpenCV RTrees classifier labels_pred : np.array(int) model predictions """ classifier = train(trads, model) labels_pred, confid_pred, feat_imptce, oa, fs = test(testds, classifier, model) test_labels = testds['labels'] accuracy = metrics.accuracy_score(test_labels, labels_pred) precision = metrics.precision_score(test_labels, labels_pred, average='macro') recall = metrics.recall_score(test_labels, labels_pred, average='macro') fscore = metrics.f1_score(test_labels, labels_pred, average='macro') mat_pa = metrics.confusion_matrix(test_labels, labels_pred, normalize='true') mat_ua = metrics.confusion_matrix(test_labels, labels_pred, normalize='pred') confmat = metrics.confusion_matrix(test_labels, labels_pred, normalize=None) labels = np.unique(test_labels) figure = go.Figure(data=go.Heatmap( z=confmat, x=[classes[l] for l in labels], y=[classes[l] for l in labels], text=confmat, texttemplate="%{text}", textfont={"size": 20}, colorscale=[(0, "rgb(250,250,250)"), (0.25, 'darkseagreen'), (1.0, "seagreen")], hoverongaps=False)) figure.update_layout( title_text="Confusion matrix", font_size=18, ) if save: figure.write_html('confusion_matrix.html', auto_open=False) if plot: figure.write_html('confusion_matrix.html', auto_open=True) uas = [] pas = [] for i in range(mat_pa.shape[0]): pas.append(mat_pa[i, i]) uas.append(mat_ua[i, i]) labels = np.unique(test_labels) confc = [] for l in labels: confc.append(np.mean(confid_pred[np.where((labels_pred == l))[0]])) precisions = metrics.precision_score(test_labels, labels_pred, average=None) recalls = metrics.recall_score(test_labels, labels_pred, average=None) fscores = metrics.f1_score(test_labels, labels_pred, average=None) return accuracy, fscore, np.mean(confid_pred), recall, precision, uas, pas, fscores, confc, recalls, precisions, labels, feat_imptce, classifier, labels_pred
[docs] def plot_feat_imp(feat_imp, trads, save=False, plot=True): """ Plot the random forest feature importance. Parameters ---------- feat_imp : numpy.array() array containing feature importances values. trads : dictionary of numpy arrays training features dictionary. save : bool defines if plot must be saved. plot : bool defines if plot must be opened. """ fig = go.Figure() fig.add_trace( go.Bar(name='Feature Importances', x=trads['names'], y=feat_imp, marker_color='navy'), ) fig.update_layout( title_text="<b>Feature importances<b>" ) fig.update_xaxes(tickangle=45, showgrid=True, type='category', title_text='<b>Feature<b>') fig.update_yaxes(title_text="<b>RF Importance</b>") if plot: fig.write_html( 'feature_importances.html', auto_open=True) if save: fig.write_image('feature_importances.jpg', scale=3)
[docs] def plot_corr_mat(trads, plot=True, save=False): """ Visualize correlation between features as a heatmap. Parameters ---------- trads : dictionary of numpy arrays training features dictionary. save : bool defines if plot must be saved. plot : bool defines if plot must be opened. """ feats_df = pd.DataFrame(trads['features'], columns=trads['names']) corr_mat = feats_df.corr() figure = go.Figure(data=go.Heatmap( z=corr_mat, x=trads['names'], y=trads['names'], text=np.round(corr_mat*100, 0), texttemplate="%{text}", textfont={"size": 20}, colorscale='magma', hoverongaps=False)) figure.update_layout( title_text="Correlation matrix", ) if save: figure.write_html('correlation_matrix.html', auto_open=False) if plot: figure.write_html('correlation_matrix.html', auto_open=True) return corr_mat
[docs] def get_shap_expl(classifier, testds, save=True): """ Get the shap summary plot of a random forest classifier trained on the given dataset (only works with scikit-learn models). Parameters ---------- classifier : scikit-learn RandomForestClassifier trained classifier. testds : dict, 'features' : numpy.array of computed features 'names' : list of str, name of each column feature 'labels' : list of int, class labels training dataset. save : bool whether to save the resulting plot. """ labels = np.unique(testds['labels']) cn = [] for l in labels: cn.append(classes[int(l)]) explainer = shap.TreeExplainer(classifier) fig = plt.figure() fig.set_size_inches(32, 18) shap_values = explainer.shap_values(testds['features'], approximate=True) shap.summary_plot(shap_values, testds['features'], feature_names=testds['names'], class_names=cn, max_display=testds['features'].shape[1], plot_type="bar") if save: plt.savefig('SHAP_explainer.jpg', bbox_inches='tight') return shap_values
[docs] def classif_errors_confidence(pred, true, confid_pred): """ Get statistics about the prediction probability obtained by misclassified points. Args: pred: numpy array array containing the predicted labels true: numpy array array containing the true labels confid_pred: numpy array array containing the prediction probabilities Returns: dict dictionary with mean, median, min, max, and std of prediction probability for misclassified points. """ idx_err = np.where((pred != true))[0] err_confid = confid_pred[idx_err] err_stats = {'Mean_confidence': np.mean(err_confid), 'Median_confidence': np.median(err_confid), 'Min_confidence': np.min(err_confid), 'Max_confidence': np.max(err_confid), 'Std_confidence': np.std(err_confid)} return err_stats
[docs] def apply_confidence_threshold(pred, true, confid_pred, threshold): """ Compute the classification accuracy when discarding points classified with a prediction probability below a given threshold. Args: pred: numpy array array containing predicted labels true: numpy array array containing true labels confid_pred: numpy array array containing the prediction probabilities threshold: float prediction probability threshold to apply Returns: float new classification accuracy """ idx_ok = np.where(confid_pred >= threshold)[0] accuracy = metrics.accuracy_score(true[idx_ok], pred[idx_ok]) return accuracy
[docs] def confidence_filtering_report(pred, true, confid_pred): """ Get a report of the evolution of classification accuracy depending on the prediction probability threshold applied. All points with prediction probabilities below a given threshold are not consided for accuracy computation. Args: pred: numpy array array containing predicted labels true: numpy array array containing true labels confid_pred: numpy array array containing prediction probabilities Returns: dict dictionary containing the accuracy and the percentage of remaining points for the following prediction probability thresholds: 0.5, 0.6, 0.7, 0.8, 0.9, 0.95 """ thresholded_acc = {0.5: '', 0.6: '', 0.7: '', 0.8: '', 0.9: '', 0.95: ''} percent = {0.5: '', 0.6: '', 0.7: '', 0.8: '', 0.9: '', 0.95: ''} for t in thresholded_acc.keys(): idx_ok = np.where(confid_pred >= t)[0] accuracy = metrics.accuracy_score(true[idx_ok], pred[idx_ok]) thresholded_acc[t] = accuracy percent[t] = len(idx_ok) / len(pred) return thresholded_acc, percent