Source code for lidar_platform.tools.las.las

# coding: utf-8
# Paul Leroy
# Baptiste Feldmann

import copy
from datetime import datetime, timezone
import logging
import mmap
import os
import struct
import time

import laspy
import numpy as np

from . import las_fmt
from .. import misc
from .las_fmt import unpack_vlr_waveform_packet_descriptor

logger = logging.getLogger(__name__)
logging.basicConfig()

# LAS full waveform
HEADER_WDP_BYTE = struct.pack("=H16sHQ32s", * (0, b'LASF_Spec', 65535, 0, b'WAVEFORM_DATA_PACKETS'))

# VLRS Geokey
CRS_KEY = {"Vertical": 4096,
           "Projected": 3072}

GEOKEY_STANDARD = {1: (1, 0, 4),
                   1024: (0, 1, 1),
                   3076: (0, 1, 9001),
                   4099: (0, 1, 9001)}


def get_waveform_packet_descriptors(las_data):
    waveform_packet_descriptors = {}
    vlrs = las_data.header.vlrs
    for vlr in vlrs:
        if 99 < vlr.record_id < 355:
            wavepacket_index = vlr.record_id - 99
            waveform_packet_descriptors[wavepacket_index] = unpack_vlr_waveform_packet_descriptor(vlr)

    return waveform_packet_descriptors


def read_waveform(file_object, wpd, offset=0, make_positive=False):
    number_of_samples = wpd['number_of_samples']
    bytes_per_sample = int(wpd['bits_per_sample'] / 8)
    count = number_of_samples * bytes_per_sample
    b = file_object.read(count)
    a = np.frombuffer(b, dtype=np.uint16, count=number_of_samples)
    time = np.arange(number_of_samples) * wpd['temporal_sample_spacing']
    waveform = a * wpd['digitizer_gain'] + wpd['digitizer_offset'] + offset
    if make_positive:
        waveform = np.abs(waveform)
    return time, waveform


[docs] class LasData(laspy.LasData): # subclass laspy.LasData def __init__(self, filename): las_data = laspy.read(filename) super().__init__(las_data.header, las_data.points) self.filename = filename self.wdp = os.path.splitext(filename)[0] + '.wdp' self.waveform_packet_descriptors = get_waveform_packet_descriptors(self) self.file_version = f'{las_data.header.version[0]}.{las_data.header.version[1]}'
[docs] def get_waveform(self, index, offset=0, make_positive=False): wpd = self.waveform_packet_descriptors[self.wavepacket_index[index]] # get the Wave Packet Descriptor if self.header.global_encoding.waveform_data_packets_internal: with open(self.filename, 'rb') as bf: bf.seek(int(self.header.start_of_waveform_data_packet_record + self.byte_offset_to_waveform_data[index])) return read_waveform(bf, wpd, offset=offset, make_positive=make_positive) elif self.header.global_encoding.waveform_data_packets_external: with open(self.wdp, 'rb') as bf: bf.seek(self.byte_offset_to_waveform_data[index]) return read_waveform(bf, wpd, offset=offset, make_positive=make_positive) else: print(f'[get_waveform] no waveform to return (index {index})') return None
[docs] def get_waveform_data_packet_header(self): with open(self.wdp, "rb") as f: evlr = las_fmt.unpack_evlr_header_waveform_data_packet(f.read(60)) return evlr
[docs] def get_number_of_samples(self, index): wavepacket_index = self.wavepacket_index[index] wpd = self.waveform_packets_descriptors[wavepacket_index] # get the waveform packet descriptor number_of_samples = wpd["number_of_samples"] # get the number of samples from the descriptor bytes_per_sample = int(wpd["bits_per_sample"] / 8) number_of_bytes = number_of_samples * bytes_per_sample return number_of_bytes
def read(filename): return LasData(filename) def create_file_and_get_appender(filename, las_data_in): las_data = laspy.create(point_format=las_data_in.point_format, file_version=las_data_in.header.version) las_data.header = las_data_in.header las_data.write(filename) appender = laspy.open(filename, mode='a') return appender def merge_las_with_wdp(to_merge, o='merged.las', odir=None): records = [] vlrs = [] record_id = 100 las_data_0 = read(to_merge[0]) # read with lidar_platform extension of laspy to have information on waveforms # create and initialize outputs if odir is None: odir = os.path.join(os.path.split(to_merge[0])[0], 'merge') os.makedirs(odir, exist_ok=True) out = os.path.join(odir, o) # name of the output file # create the output header, same version and point format as the first file to merge out_header = laspy.LasHeader(version=f'{las_data_0.header.version.major}.{las_data_0.header.version.minor}', point_format=las_data_0.header.point_format.id) out_header.scales = las_data_0.header.scales # copy scales out_header.x_offset = las_data_0.header.x_offset # copy x offset out_header.y_offset = las_data_0.header.y_offset # copy y offset out_header.z_offset = las_data_0.header.z_offset # copy z offset out_header.global_encoding.waveform_data_packets_external = True # create a record to store the points during merging point_record = laspy.ScaleAwarePointRecord.empty(header=out_header) # create and initialize wdp output file out_wdp = os.path.splitext(out)[0] + '.wdp' evlr_header = las_data_0.get_waveform_data_packet_header() # Read the EVLR header of the original wdp file with open(out_wdp, "wb") as f_out: # write the Extended Variable Length Record header f_out.write(las_fmt.pack_evlr_header_waveform_data_packet(evlr_header)) for k, file in enumerate(to_merge): las_data = laspy.read(file) if las_data.header.point_count == 0: print(f'WARNING {os.path.split(file)[-1]} is empty, nothing to merge') continue else: print(f'MERGE {las_data.header.point_count} points from {os.path.split(file)[-1]}') record_id_map = {} # process vlrs for idx, vlr in enumerate(las_data.vlrs): if type(vlr) is laspy.vlrs.known.WaveformPacketVlr: # check that the vlr is a waveform packet vlr record = vlr.record_data_bytes() try: index = records.index(record) # if vlr is already in the list, map the record ids record_id_map[vlr.record_id] = vlrs[index].record_id except ValueError: # if vlr is not already in the list, addit and map the record ids records.append(record) new_vlr = laspy.vlrs.known.WaveformPacketVlr(record_id) # create a new vlr new_vlr.parse_record_data(vlr.record_data_bytes()) # copy the data vlrs.append(new_vlr) # add the new vlr to the list record_id = record_id + 1 # update record_id record_id_map[vlr.record_id] = vlrs[-1].record_id print( f' vlr not already in the header of the merge, add it, ' f'record ids association: in file {vlr.record_id} <=> in merge {vlrs[-1].record_id}') # add the new vlr to the merge out_header.vlrs.append(new_vlr) # update wavepacket_index for wavepacket_index in np.unique(las_data.wavepacket_index): las_data.wavepacket_index[np.where(las_data.wavepacket_index == wavepacket_index)] = ( record_id_map[wavepacket_index + 99] - 99) # update byte offset to waveform data and append waveforms offset = os.path.getsize(out_wdp) las_data.wavepacket_offset = las_data.wavepacket_offset + offset las_data.point_source_id[:] = k wdp_to_merge = os.path.splitext(file)[0] + '.wdp' with open(wdp_to_merge, "rb") as f_in, open(out_wdp, "ab") as f_out: f_out.write(f_in.read()) # append the waveforms to the existing ones # offsets of the merge may be different from offsets of the file to merge, may need some adjustment if out_header.x_offset != las_data.header.x_offset: print(f' need to adapt x_offset from {las_data.header.x_offset} to the merge one {out_header.x_offset}') las_data.x = las_data.x - out_header.x_offset + las_data.header.x_offset if out_header.y_offset != las_data.header.y_offset: print(f' need to adapt y_offset from {las_data.header.y_offset} to the merge one {out_header.y_offset}') las_data.y = las_data.y - out_header.y_offset + las_data.header.y_offset if out_header.z_offset != las_data.header.z_offset: print(f' need to adapt z_offset from {las_data.header.z_offset} to the merge one {out_header.z_offset}') las_data.z = las_data.z - out_header.z_offset + las_data.header.z_offset # append the points current_size = len(point_record) point_record.resize(current_size + las_data.header.point_count) point_record.array[current_size:] = las_data.points.array with laspy.open(out, mode="w", header=out_header) as writer: writer.write_points(point_record) return out ######## # LEGACY ######## def filter_las(obj, select): """Filter lasdata Args: obj ('plateforme_lidar.utils.lasdata'): lasdata object select (list or int): list of boolean, list of integer or integer Returns: 'plateforme_lidar.utils.lasdata': filtering lasdata object """ if type(select) == list or type(select) == np.ndarray: if not len(select) == len(obj): select = np.array(select)[np.argsort(select)] obj_new = las_fmt.lasdata() obj_new['metadata'] = obj.metadata features = list(obj.__dict__.keys()) features.remove("metadata") for feature in features: setattr(obj_new, feature, getattr(obj, feature)[select]) return obj_new def merge_las(obj_list): """Merge lasdata The returned structure takes format of the first in the list All the extraFields aren't kept Args: obj_list (list): list of 'plateforme_lidar.utils.lasdata' type Returns: 'plateforme_lidar.utils.lasdata': lasdata merged """ merge = las_fmt.lasdata() merge['metadata'] = copy.deepcopy(obj_list[0].metadata) merge['metadata']['extraField'] = [] feature_list = list(obj_list[0].__dict__.keys()) [feature_list.remove(i) for i in obj_list[0].metadata["extraField"] + ["metadata"]] for feature in feature_list: merge[feature] = np.concatenate([i[feature] for i in obj_list], axis=0) return merge def filter_wdp(lines, select): """Filter WDP tab Args: lines (list): list of waveforms select (list): list of boolean or list of index Returns: list : list of extracted waveforms """ if len(select) == len(lines): select = np.where(select)[0] else: select = np.array(select)[np.argsort(select)] return [lines[i] for i in select] def update_byte_offset(las_obj, waveforms, byte_offset_start=60): """Update byte offset to waveform data Args: las_obj ('plateforme_lidar.utils.lasdata'): LAS dataset to update waveforms (list): list of waveforms byte_offset_start (int, optional): byte number of first line in WDP file. Defaults to 60. Raises: ValueError: Las file must have same number of points than waveforms ! """ new_offset = [byte_offset_start] sizes = np.uint16(las_obj.wavepacket_size) if len(las_obj) != len(waveforms): raise ValueError("Las file must have same number of points than waveforms !") for i in range(0, len(las_obj)): new_offset += [np.uint64(new_offset[i]+sizes[i])] las_obj.wavepacket_offset = new_offset[0:-1] def read_vlrs(vlrs): # read_bfe VLR in LAS file with laspy # Can only read_bfe waveform, bbox tile and projection vlrs record_id__tuple = {} for vlr in vlrs: if 99 < vlr.record_id < 355: # Waveform Packet Descriptor # Bits per Sample # Waveform Compression Type # Number of Samples # Temporal Sample Spacing # Digitizer Gain # Digitizer Offset record_id__tuple[vlr.record_id] = struct.unpack("=BBLLdd", vlr.record_data_bytes()) elif vlr.record_id == 10: # read_bfe bbox tile vlrs : # (level, index, implicit_lvl, reversible, buffer, min_x, max_x, min_y, max_y) record_id__tuple[vlr.record_id] = struct.unpack("=2IH2?4f", vlr.record_data) elif vlr.record_id == 34735: # GeoKeyDirectoryTag Record # Key values that define the coordinate system # wKeyDirectoryVersion # wKeyRevision # wMinorRevision # wNumberOfKeys # + wNumberOfKeys * (wKeyID, wTIFFTagLocation, wCount, wValue_Offset) geo_key_list = struct.unpack("=4H", vlr.record_data_bytes()[0:8]) wNumberOfKeys = geo_key_list[3] # Number of sets of 4 unsigned shorts to follow for i in range(0, wNumberOfKeys): temp = struct.unpack("=4H", vlr.record_data_bytes()[8 * (i + 1): 8 * (i + 1) + 8]) if temp[1] == 0 and temp[2] == 1: geo_key_list += temp record_id__tuple[vlr.record_id] = geo_key_list return record_id__tuple def pack_vlr(dictio): # write VLR in LAS file with LasPy # Can only write waveform, bbox tile and projection vlrs list_ = [] size = 0 if len(dictio) > 0: for i in dictio.keys(): if 99 < i < 355: # waveform vlrs # (Bits/sample, waveform compression type, nbr of samples, Temporal spacing, # digitizer gain, digitizer offset) temp = laspy.header.VLR(user_id="LASF_Spec", record_id=i, record_data=struct.pack("=BBLLdd", *dictio[i])) elif i == 10: # bbox tile vlrs : # (level,index,implicit_lvl,reversible,buffer,min_x,max_x,min_y,max_y) temp = laspy.header.VLR(user_id="LAStools", record_id=i, record_data=struct.pack("=2IH2?4f", *dictio[i])) elif i == 34735: # Projection # (KeyDirectoryVersion, KeyRevision, MinorRevision, NumberofKeys) # + n * (KeyId,TIFFTagLocation,Count,Value_offset) fmt = "=" + str(len(dictio[i])) + "H" temp = laspy.header.VLR(user_id="LASF_Projection", record_id=i, record_data=struct.pack(fmt, *dictio[i])) else: raise Exception("VLR.record_id unknown : " + str(i)) size += len(temp.record_data_bytes()) list_ += [temp] return list_, size def vlrs_keys(vlrs, geokey): """Add geokey in VLR Projection Args: vlrs (dict): lasdata.metadata['vlrs'] geokey (dict): geokey={"Vertical":epsg,"Projected":epsg} Returns: dict: updated vlrs """ vlrs_copy = vlrs.copy() if 34735 in vlrs.keys(): vlrs_dict = {} for i in range(0, int(len(vlrs[34735]) / 4)): num = i * 4 vlrs_dict[vlrs[34735][num]] = vlrs[34735][num + 1: num + 4] else: vlrs_dict = GEOKEY_STANDARD for i in list(geokey.keys()): vlrs_dict[CRS_KEY[i]] = [0, 1, geokey[i]] vlrs_sort = np.sort(list(vlrs_dict.keys())) vlrs_final = [] for i in vlrs_sort: vlrs_final += [i] vlrs_final += vlrs_dict[i] vlrs_final[3] = len(vlrs_sort) - 1 vlrs_copy[34735] = tuple(vlrs_final) return vlrs_copy class WriteLAS(object): def __init__(self, filepath, data, point_format=1, extra_fields=[], waveforms=[], parallel=True): """Write LAS 1.3 with laspy Args: filepath (str): output file path (extensions= .las or .laz) data ('plateforme_lidar.utils.lasdata'): lasdata object point_format (int, optional): data format id according to ASPRS convention (standard mode=1, fwf mode=4). Defaults to 1. extraField (list, optional): list of additional fields [(("name","type format"),listData),...] ex: [(("depth","float32"),numpy.ndarray),(("value","uint8"),numpy.ndarray)]. Defaults to []. waveforms (list, optional): list of waveforms to save in external WDP file. Make sure that point_format is compatible with wave packet (ie. 4,5,9 or 10). Default to [] """ # standard : point_format=1 ; fwf : point_format=4 print("[Writing LAS file]") self._start = time.time() self.output_data = data self.LAS_fmt = las_fmt.LASFormat() # new_header=self.createHeader("1.3",point_format) # pointFormat=laspy.PointFormat(point_format) # for extraField in extraFields: # pointFormat.add_extra_dimension(laspy.ExtraBytesParams(name=extraField["name"],type=extraField["type"],description="Extras_fields")) # new_points=laspy.PackedPointRecord(points,point_format=pointFormat) if point_format in [4, 5, 9, 10]: version = "1.4" backend = laspy.compression.LazBackend(2) else: version = "1.3" backend = laspy.compression.LazBackend(int(not parallel)) header = self.create_header(version, point_format) self.point_record = laspy.LasData(header=header, points=laspy.ScaleAwarePointRecord.zeros( len(self.output_data), header=header) ) for extraField in extra_fields: name_ = extraField[0][0] type_ = extraField[0][1] data_ = extraField[1] self.point_record.add_extra_dim(laspy.ExtraBytesParams(name=name_, type=getattr(np, type_), description="Extra_fields")) setattr(self.point_record, name_, data_) self.write_attr() self.point_record.write(filepath, laz_backend=backend) print("done") if len(waveforms) > 0 and point_format in [4, 5, 9, 10]: self.wave_data_packet(filepath, waveforms) def __repr__(self): return "Write " + str(len(self.output_data)) + " points in " + str(round(time.time() - self._start, 1)) + " sec" def wave_data_packet(self, filepath, waveforms): # write external waveforms in WDP file not compressed # Future improvement will make writing compressed possible nbrPoints = len(self.output_data) sizes = np.uint16(self.output_data.wavepacket_size) offsets = np.uint64(self.output_data.wavepacket_offset) pkt_desc_index = self.output_data.wavepacket_index vlrs = self.output_data.metadata['vlrs'] if not all(offsets[1::]==(offsets[0:-1]+sizes[0:-1])): raise ValueError("byte offset list is not continuous, re-compute your LAS dataset") start = time.time() print("[Writing waveform data packet] %d waveforms" %len(waveforms)) displayer = misc.Timing(nbrPoints, 20) with open(filepath[0:-4] + ".wdp", "wb") as wdpFile : wdpFile.write(HEADER_WDP_BYTE) for i in range(0, nbrPoints): msg = displayer.timer(i) if msg is not None: print("[Writing waveform data packet] " + msg) if len(waveforms[i]) != (sizes[i] / 2): raise ValueError("Size of waveform n°"+str(i)+" is not the same in LAS file") try: vlr_body = vlrs[pkt_desc_index[i] + 99] except: raise ValueError("Number of the wave packet desc index not in VLRS") length = int(vlr_body[2]) try: test = struct.pack(str(length)+'h',*np.int16(waveforms[i])) wdpFile.write(test) except: raise ValueError(str(length)) print("[Writing waveform data packet] done in %d sec" %(time.time() - start)) def create_header(self, version, point_format): header = laspy.LasHeader(version=version, point_format=point_format) scale = 0.001 if point_format in [4, 5, 9, 10]: header.global_encoding.waveform_data_packets_external = True header.system_identifier = self.LAS_fmt.identifier["system_identifier"] header.generating_software = self.LAS_fmt.identifier["generating_software"] header.vlrs, vlrs_size = pack_vlr(self.output_data.metadata['vlrs']) header.offset_to_point_data = 235 + vlrs_size header.mins = np.min(self.output_data.XYZ, axis=0) header.maxs = np.max(self.output_data.XYZ, axis=0) header.offsets = np.int64(header.mins * scale) / scale header.scales = np.array([scale] * 3) header.x_scale, header.y_scale, header.z_scale = header.scales header.x_offset, header.y_offset, header.z_offset = header.offsets header.x_min, header.y_min, header.z_min = header.mins header.x_max, header.y_max, header.z_max = header.maxs header.point_count = len(self.output_data) pt_return_count = [0] * 5 unique,counts = np.unique(self.output_data.return_number, return_counts=True) for i in unique: try: pt_return_count[i-1] = counts[i-1] except: pass header.number_of_points_by_return = pt_return_count return header def write_attr(self): # point_dtype=[('X','int32'),('Y','int32'),('Z','int32')] # +self.LAS_fmt.recordFormat[self.point_record.header.point_format.id] # write conventional fields coords_int = np.array((self.output_data.XYZ - self.point_record.header.offsets) / self.point_record.header.scales, dtype=np.int32) self.point_record.X = coords_int[:, 0] self.point_record.Y = coords_int[:, 1] self.point_record.Z = coords_int[:, 2] for i in self.LAS_fmt.record_format[self.point_record.header.point_format.id]: try: data = getattr(self.output_data, i[0]) setattr(self.point_record, i[0], getattr(np, i[1])(data)) except: print(f'[lastools.WriteLAS.writeAttr] {i[0]} {i[1]}') print("[lastools.WriteLAS.writeAttr] Warning: not possible to write attribute: " + i[0]) def read_bfe(filepath, extra_fields=False, parallel=True): """Read a LAS file with laspy Args: filepath (str): input LAS file (extensions: .las or .laz) extra_fields (bool, optional): True if you want to load additional fields. Defaults to False. parallel backend (str, optional): 'multi' for parallel backend, 'single' for single-thread mode, 'laszip' to use laszip for LAS fwf Returns: 'plateforme_lidar.utils.lasdata': lasdata object """ point_format = laspy.open(filepath, mode='r', laz_backend=laspy.compression.LazBackend(2)).header.point_format.id if point_format in [4, 5, 9, 10]: # Wave packets # Point Data Record Format 4 adds Wave Packets to Point Data Record Format 1 # Point Data Record Format 5 adds Wave Packets to Point Data Record Format 3 # Point Data Record Format 9 adds Wave Packets to Point Data Record Format 6 # Point Data Record Format 10 adds Wave Packets to Point Data Record Format 7 backend = laspy.compression.LazBackend(2) else: backend = laspy.compression.LazBackend(int(not parallel)) las_data = laspy.read(filepath, laz_backend=backend) gps_time_type = las_data.header.global_encoding.gps_time_type print(f'[las.read_bfe] gps_time_type read in header: {gps_time_type.name}') metadata = {"vlrs": read_vlrs(las_data.vlrs), "extraField": [], 'filepath': filepath} output = las_fmt.lasdata() LAS_fmt = las_fmt.LASFormat() for field, dtype in LAS_fmt.record_format[point_format]: try: output[field] = np.array(getattr(las_data, field), dtype=dtype) except: print("[LasPy] " + str(field) + " not found") output['XYZ'] = las_data.xyz if extra_fields: for extra_dimension_name in las_data.point_format.extra_dimension_names: name = extra_dimension_name.replace('(', '').replace(')', '').replace(' ', '_').lower() print(f'rename extra field {extra_dimension_name} to {name}') metadata['extraField'] += [name] output[name] = las_data[extra_dimension_name] if 'GpsTime' in las_data.point_format.extra_dimension_names: print('[lastools.ReadLAS] WARNING GpsTime found in extra_dimension_names (CloudCompare convention)') print('[lastools.ReadLAS] replace standard field gps_time by extra field GpsTime') output['gps_time'] = las_data['GpsTime'] output['metadata'] = metadata return output def read_wdp(las_data): """Reading waveforms in WDP file Args: las_data ('plateforme_lidar.utils.lasdata'): lasdata object Raises: ValueError: if for one point wave data packet descriptor is not in VLRS Returns: list: list of waveform (length of each waveform can be different) """ point_number = len(las_data) sizes = np.uint16(las_data.wavepacket_size) offset = np.uint64(las_data.wavepacket_offset) pkt_desc_index = las_data.wavepacket_index vlrs = las_data.metadata['vlrs'] start = time.time() print("[Reading waveform data packet] %d waveforms" %point_number) with open(las_data.metadata['filepath'][0:-4] + ".wdp", 'rb') as wdp: dataraw = mmap.mmap(wdp.fileno(), os.path.getsize(las_data.metadata['filepath'][0:-4] + ".wdp"), access=mmap.ACCESS_READ) lines = [] displayer = misc.Timing(point_number, 20) for i in range(0, point_number): msg = displayer.timer(i) if msg is not None: print("[Reading waveform data packet] " + msg) try: vlr_body = vlrs[pkt_desc_index[i] + 99] except: raise ValueError("Number of the wave packet desc index not in VLRS !") length = int(vlr_body[2]) line = np.array(struct.unpack(str(length) + 'h', dataraw[offset[i]: (offset[i] + sizes[i])])) lines += [np.round_(line * vlr_body[4] + vlr_body[5], decimals=2)] print("[Reading waveform data packet] done in %d sec" %(time.time()-start)) return lines def read_ortho_fwf(workspace, las_file): print("[Read waveform data packet] : ",end='\r') f = laspy.file.File(workspace + las_file) nbr_pts = int(f.header.count) percent = [int(0.2 * nbr_pts), int(0.4 * nbr_pts), int(0.6 * nbr_pts), int(0.8 * nbr_pts), int(0.95 * nbr_pts)] try : sizes = np.int_(f.waveform_packet_size) except : sizes = np.int_(f.points['point']['wavefm_pkt_size']) offset = np.int_(f.byte_offset_to_waveform_data) wdp = open(workspace + las_file[0:-4] + ".wdp", 'rb') data = mmap.mmap(wdp.fileno(), 0, access=mmap.ACCESS_READ) temp = read_vlrs(f.header.vlrs) if len(f.header.vlrs) == 1: vlr_body = temp[list(temp.keys())[0]] else: vlr_body = temp[f.header.vlrs[f.wave_packet_desc_index[0]].record_id] anchor_z = f.z_t*f.return_point_waveform_loc step_z = f.z_t[0]*vlr_body[3] lines = [] length = int(vlr_body[2]) prof = [np.round_(anchor_z-(step_z * c), decimals=2) for c in range(0, length)] prof = np.transpose(np.reshape(prof, np.shape(prof))) for i in range(0, nbr_pts): if i in percent: print("%d%%-" %(25+25*percent.index(i)), end='\r') line = np.array(struct.unpack(str(length) + 'h',data[offset[i]:offset[i] + sizes[i]])) lines += [np.round_(line*vlr_body[4] + vlr_body[5], decimals=2)] wdp.close() f.close() print("done !") return np.stack([lines,prof]), vlr_body[3], np.round_(step_z, decimals=2) offset_time = int(10 ** 9) sec_in_week = int(3600 * 24 * 7) gps_epoch_datetime = datetime(1980, 1, 6, tzinfo=timezone.utc) def get_week_number(standard_time, adjusted=False): """Compute the week number in GPS standard time Args: standard_time (float or list): timestamp in standard GPS time format Raises: ValueError: if there are GPS time from different week in list Returns: int : week number since GPS epoch starting """ if adjusted: standard_time = standard_time + offset_time if np.ndim(standard_time) == 0: week_number = int(standard_time // sec_in_week) else: week_num_first = min(standard_time) // sec_in_week week_num_last = max(standard_time) // sec_in_week if week_num_first == week_num_last: week_number = int(week_num_first) else: print(f'week_num_first {week_num_first}, week_num_last {week_num_last}') raise ValueError("[lastools.GPSTime._get_week_number] Time values aren't in same week") return week_number class GPSTime(object): def __init__(self, gpstime: list): """Manage GPS Time and convert between Adjusted Standard and Week GPS time GPS time start on 1980-01-06 00:00:00 UTC Time stamps are either stored in GPS week seconds or Adjusted Standard GPS Time (i.e., Standard GPS Time - 1 * 10**9 sec) depending on the "Global Encoding Bit 0" of the LAS Public Header Args: gpstime (list): GPS time """ self.gpstime = np.atleast_1d(gpstime) self.gps_time_type = self.get_gps_time_type() def __repr__(self): return self.gps_time_type def get_gps_time_type(self): if all(self.gpstime < sec_in_week): gps_time_type = laspy.header.GpsTimeType.WEEK_TIME elif all(self.gpstime < offset_time): gps_time_type = laspy.header.GpsTimeType.STANDARD else: raise ValueError("[lastools.GPSTime.get_format] Unexpected gps_time_type, neither WEEK_TIME nor STANDARD") return gps_time_type def adjusted_standard_2_week_time(self): """Conversion from Adjusted Standard GPS time format to week time Raises: ValueError: if your data aren't in Adjusted Standard GPS time Returns: int: week number list: list of GPS time in week time format """ if self.gps_time_type != laspy.header.GpsTimeType.STANDARD: raise ValueError("GPS time format is not " + laspy.header.GpsTimeType.STANDARD.name) else: temp = self.gpstime + offset_time week_number = self._get_week_number(temp) return week_number, temp % sec_in_week def week_time_2_adjusted_standard(self, date_in_week=[], week_number=0): """Conversion from week GPS time format to Adjusted Standard time Args: date_in_week (list, optional): date of project in format (year, month, day). Defaults to []. week_number (int, optional): week number. Defaults to 0. Raises: ValueError: if your data aren't in Week GPS time format ValueError: You have to give at least date_in_week or week_number Returns: list: list of Adjusted Standard GPS time """ if self.gps_time_type != laspy.header.GpsTimeType.WEEK_TIME: raise ValueError("GPS time format is not " + laspy.header.GpsTimeType.WEEK_TIME.name) elif len(date_in_week) > 0: date_datetime = datetime(*date_in_week, tzinfo=timezone.utc) week_number = get_week_number(date_datetime.timestamp() - gps_epoch_datetime.timestamp()) print(f'week_number {week_number}') elif week_number == 0: raise ValueError("You have to give date_in_week OR week_number") adjusted_standard_time = (self.gpstime + week_number * sec_in_week) - offset_time return adjusted_standard_time