Source code for lidar_platform.tools.sbf.sbf

import configparser
import logging
import os
import struct

import numpy as np

logger = logging.getLogger(__name__)


def shift_array(array, shift, config=None, debug=False):
    newArray = array.astype(np.float64)
    # apply the shift read in the SBF file
    newArray += np.array(shift).reshape(1, -1)
    # apply GlobalShift if any
    if config is not None:
        try:
            globalShift = eval(config['SBF']['GlobalShift'])
            logger.debug(f'use GlobalShift {globalShift}')
            newArray += np.array(globalShift).reshape(1, -1)
        except:
            pass
    return newArray


def read_sbf_header(sbf, verbose=False):
    config = configparser.ConfigParser()
    config.optionxform = str
    with open(sbf) as f:
        config.read_file(f)
        if 'SBF' not in config:
            print('sbf badly formatted, no [SBF] section')
        else:
            return config


def write_sbf(sbf, xyz,
              sf=None, config=None, add_index=False, normals=None, global_shift=None):

    path_to_sbf = sbf
    path_to_sbf_data = sbf + '.data'
    n_points = xyz.shape[0]
    if sf is not None:
        sf = sf.reshape(n_points, -1)
        SFCount = sf.shape[1]
    else:
        SFCount = 0

    if global_shift is not None:
        global_shift_str = f'{global_shift[0]}, {global_shift[1]}, {global_shift[2]}'

    # write .sbf
    Points = xyz.shape[0]
    if config is None:  # if there is no config, build one
        dict_SF = {f'SF{k + 1}': f'{k + 1}' for k in range(SFCount)}
        config = configparser.ConfigParser()
        config.optionxform = str
        config['SBF'] = {'Points': str(Points),
                         'SFCount': str(SFCount),
                         **dict_SF}
        if global_shift is not None:
            config['SBF']['GlobalShift'] = global_shift_str
    else:
        config['SBF']['Points'] = str(Points)  # enforce the coherence of the number of points
        config['SBF']['SFCount'] = str(SFCount)
        if global_shift is not None:
            if 'GlobalShift' in config['SBF']:
                config_global_shift = config['SBF']['GlobalShift']
                print(f'[write_sbf] warning: global_shift parameter {global_shift}')
                print(f'            will overwrite the config one {config_global_shift}')
                config['SBF']['GlobalShift'] = global_shift_str

    if add_index is True:
        if 'SFCount' in config['SBF']:
            SFCount += 1
        else:
            SFCount = 1
        config['SBF']['SFcount'] = str(SFCount)
        config['SBF'][f'SF{SFCount}'] = 'index'

    if normals is not None:
        if 'SFCount' in config['SBF']:
            SFCount += 3
        else:
            SFCount = 3
        config['SBF']['SFcount'] = str(SFCount)
        config['SBF'][f'SF{SFCount + 1}'] = 'Nx'
        config['SBF'][f'SF{SFCount + 2}'] = 'Ny'
        config['SBF'][f'SF{SFCount + 3}'] = 'Nz'

    # write .sbf configuration file
    with open(path_to_sbf, 'w') as sbf:
        config.write(sbf)

    # remove GlobalShift
    if 'GlobalShift' in config['SBF']:
        globalShift = eval(config['SBF']['GlobalShift'])
        xyz_orig = xyz - np.array(globalShift).reshape(1, -1)
    else:
        xyz_orig = xyz
    # compute sbf internal shift
    shift = np.mean(xyz_orig, axis=0).astype(float)
    # build the array that will effectively be stored (32 bits float)
    a = np.zeros((Points, SFCount + 3)).astype('>f4')  # big-endian float32
    # set xyz
    a[:, :3] = (xyz_orig - shift).astype('>f4')  # big-endian float32

    # subtract shifts if any configured
    if SFCount != 0:
        shifted_sf = sf.copy()
        for k in range(SFCount):
            i_sf = k + 1
            for item in config['SBF'][f'SF{i_sf}'].split(','):
                if 's=' in item:  # apply offset if any
                    sf_shift = float(item.replace('"', '').split('s=')[1])
                    if verbose:
                        print(f'[read_sbf] subtract shift from scalar field SF{i_sf} for storage: {sf_shift}')
                    shifted_sf[:, k] -= sf_shift
        # set scalar fields
        a[:, 3:] = shifted_sf.astype('>f4')

    if add_index is True:
        b = np.zeros((Points, SFCount + 1)).astype('>f4')
        b[:, :-1] = a
        b[:, -1] = np.arange(Points).astype('>f4')
        a = b

    # write .sbf.data
    with open(path_to_sbf_data, 'wb') as sbf_data:
        # 0-1 SBF header flag
        flag = bytearray([42, 42])
        sbf_data.write(flag)
        # 2-9 Point count (Np)
        sbf_data.write(struct.pack('>Q', Points))
        # 10-11 ScalarField count (Ns)
        sbf_data.write(struct.pack('>H', SFCount))
        # 12-19 X coordinate shift
        sbf_data.write(struct.pack('>d', shift[0]))
        # 20-27 Y coordinate shift
        sbf_data.write(struct.pack('>d', shift[1]))
        # 28-35 Z coordinate shift
        sbf_data.write(struct.pack('>d', shift[2]))
        # 36-63 Reserved for later
        sbf_data.write(bytes(63 - 36 + 1))
        sbf_data.write(a)


def is_int(str_):
    try:
        int(str_)
        return True
    except ValueError:
        return False


[docs] class SbfData: def __init__(self, filename, verbose=True): self.Np = None self.filename = filename self.xyz = None self.sf = None self.config = None self.read_sbf(verbose=verbose) # set xyz, sf and config self.sf_names = self.get_sf_names() self.name_index = self.get_name_index_dict() for name, index in self.name_index.items(): self[name] = self.sf[:, index] self.__setattr__('x', self.xyz[:, 0]) self.__setattr__('y', self.xyz[:, 1]) self.__setattr__('z', self.xyz[:, 2]) def __setitem__(self, key, value): self.__dict__[key] = value def __getitem__(self, key): return self.__dict__[key]
[docs] def read_sbf(self, verbose=False): config = read_sbf_header(self.filename, verbose=verbose) # READ .sbf header ################ # READ .sbf.data # be careful, sys.byteorder is probably 'little' (different from Cloud Compare) sbf_data = self.filename + '.data' with open(sbf_data, 'rb') as f: bytes_ = f.read(64) # 0-1 SBF header flag flag = bytes_[0:2] # 2-9 Point count (Np) Np = struct.unpack('>Q', bytes_[2:10])[0] # 10-11 ScalarField count (Ns) Ns = struct.unpack('>H', bytes_[10:12])[0] if verbose is True: print(f'flag {flag}, Np {Np}, Ns {Ns}') # 12-19 X coordinate shift x_shift = struct.unpack('>d', bytes_[12:20])[0] # 20-27 Y coordinate shift y_shift = struct.unpack('>d', bytes_[20:28])[0] # 28-35 Z coordinate shift z_shift = struct.unpack('>d', bytes_[28:36])[0] # 36-63 Reserved for later if verbose is True: print(f'shift ({x_shift, y_shift, z_shift})') print(bytes_[37:]) print(len(bytes_[37:])) array = np.fromfile(f, dtype='>f4').reshape(Np, Ns + 3).astype(np.float64) shift = np.array((x_shift, y_shift, z_shift)).reshape(1, 3) self.Np = Np # shift point cloud xyz = shift_array(array[:, :3], shift, config) # get scalar fields if any and handle shifts, aka "s=value", if any if Ns != 0: for k in range(1, Ns + 1): for item in config['SBF'][f'SF{k}'].split(','): if 's=' in item: # apply offset if any shift = float(item.replace('"', '').split('s=')[1]) if verbose is True: print(f'[read_sbf] add shift to scalar field SF{k}: {shift}') array[:, 2 + k] += shift sf = array[:, 3:] else: sf = None self.xyz = xyz self.sf = sf self.config = config
[docs] def get_name_index_dict(self): dict_ = {self.config['SBF'][name].split(',')[0]: int(name.split('SF')[1]) - 1 # remove s= and p= information for name in self.config['SBF'] if len(name.split('SF')) == 2 and is_int(name.split('SF')[1])} return dict_
[docs] def get_sf_names(self): list_ = [self.config['SBF'][name] for name in self.config['SBF'] if len(name.split('SF')) == 2 and is_int(name.split('SF')[1])] list_ = [name.split(',')[0] for name in list_] # remove s= and p= information return list_
[docs] def remove_sf(self, name): # remove the scalar field from the sf array index = self.name_index[name] new_sf = np.delete(self.sf, index, axis=1) # copy the configuration new_config = configparser.ConfigParser() new_config.optionxform = str new_config.read_dict(self.config) sf_index = index + 1 sf_count = int(self.config['SBF']['SFCount']) # update the configuration new_config['SBF']['SFCount'] = str(sf_count - 1) # decrease the counter of scalar fields new_config.remove_option('SBF', f'SF{sf_count}') # remove the last option for idx in range(1, sf_index): new_config['SBF'][f'SF{idx}'] = self.config['SBF'][f'SF{idx}'] for idx in range(sf_index, sf_count): new_config['SBF'][f'SF{idx}'] = self.config['SBF'][f'SF{idx + 1}'] self.sf = new_sf self.set_config(new_config)
[docs] def set_config(self, config): self.config = config self.name_index = self.get_name_index_dict() self.sf_names = self.get_sf_names()
[docs] def add_sf(self, name, sf_to_add): sf_count = int(self.config['SBF']['SFCount']) self.config['SBF'][f'SF{sf_count + 1}'] = name self.config['SBF']['SFCount'] = str(sf_count + 1) # add 1 to sf count self.sf = np.c_[self.sf, sf_to_add] # add the column to the array
[docs] def rename_sf(self, name, new_name): index = self.name_index[name] self.config['SBF'][f'SF{index + 1}'] = new_name
[docs] def merge(self, sbf_data): if sbf_data.pc.shape[1] != 3: raise ValueError('[SbfData.merge] number of columns of pc shall be 3') if sbf_data.sf.shape[1] != self.sf.shape[1]: raise ValueError('[SbfData.merge] number of scalar fields shall be the same for merging') self.xyz = np.r_[self.xyz, sbf_data.xyz] self.sf = np.r_[self.sf, sbf_data.sf] print(f'[SbfData.merge] {len(sbf_data.pc)} points added, new total = {self.xyz.shape[0]}') self.config['SBF']['Points'] = str(self.xyz.shape[0])
def read_sbf(filename, verbose=False): return SbfData(filename, verbose=verbose) def open_sbf(filename): return read_sbf_header(filename)