From 774e42abf1f9acfb1f508cbd5b919aa2749aa709 Mon Sep 17 00:00:00 2001 From: jmcvey3 <53623232+jmcvey3@users.noreply.github.com> Date: Thu, 22 Feb 2024 10:25:14 -0800 Subject: [PATCH] General cleanup and organization --- mhkit/dolfyn/io/rdi.py | 247 +++++++++++++++++------------------- mhkit/dolfyn/io/rdi_defs.py | 4 +- mhkit/dolfyn/io/rdi_lib.py | 29 +---- 3 files changed, 120 insertions(+), 160 deletions(-) diff --git a/mhkit/dolfyn/io/rdi.py b/mhkit/dolfyn/io/rdi.py index eedc79dcf..8842a4d1d 100644 --- a/mhkit/dolfyn/io/rdi.py +++ b/mhkit/dolfyn/io/rdi.py @@ -64,10 +64,10 @@ def read_rdi( # Reads into a dictionary of dictionaries using netcdf naming conventions # Should be easier to debug - with _RDIReader( + rdr = _RDIReader( filename, debug_level=debug_level, vmdas_search=vmdas_search, winriver=winriver - ) as ldr: - datNB, datBB = ldr.load_data(nens=nens) + ) + datNB, datBB = rdr.load_data(nens=nens) dats = [dat for dat in [datNB, datBB] if dat is not None] @@ -293,6 +293,44 @@ def read_hdr(self): self.read_hdrseg() return True + def checkheader(self): + if self._debug_level > 1: + logging.info(" ###In checkheader.") + fd = self.f + valid = False + if self._debug_level >= 0: + logging.info("pos {}".format(self.f.pos)) + numbytes = fd.read_i16(1) + if numbytes > 0: + fd.seek(numbytes - 2, 1) + cfgid = fd.read_ui8(2) + if cfgid is None: + if self._debug_level > 1: + logging.info("EOF") + return False + if len(cfgid) == 2: + fd.seek(-numbytes - 2, 1) + if cfgid[0] == 127 and cfgid[1] in [127, 121]: + if cfgid[1] == 121 and self._debug7f79 is None: + self._debug7f79 = True + if self._debug_level > 1: + logging.warning("7f79!!!") + valid = True + else: + fd.seek(-2, 1) + if self._debug_level > 1: + logging.info(" ###Leaving checkheader.") + return valid + + def read_hdrseg(self): + fd = self.f + hdr = self.hdr + hdr["nbyte"] = fd.read_i16(1) + spare = fd.read_ui8(1) + ndat = fd.read_ui8(1) + hdr["dat_offsets"] = fd.read_ui16(ndat) + self._nbyte = 4 + ndat * 2 + def check_for_double_buffer(self): """ VMDAS will record two buffers in NB or NB/BB mode, so we need to @@ -507,6 +545,16 @@ def read_buffer(self): return True + def print_progress(self): + self.progress = self.f.tell() + if self._debug_level > 1: + logging.debug( + " pos %0.0fmb/%0.0fmb\n" + % (self.f.tell() / 1048576.0, self._filesize / 1048576.0) + ) + if (self.f.tell() - self.progress) < 1048576: + return + def search_buffer(self): """ Check to see if the next bytes indicate the beginning of a @@ -544,54 +592,6 @@ def search_buffer(self): ) return True - def checkheader(self): - if self._debug_level > 1: - logging.info(" ###In checkheader.") - fd = self.f - valid = False - if self._debug_level >= 0: - logging.info("pos {}".format(self.f.pos)) - numbytes = fd.read_i16(1) - if numbytes > 0: - fd.seek(numbytes - 2, 1) - cfgid = fd.read_ui8(2) - if cfgid is None: - if self._debug_level > 1: - logging.info("EOF") - return False - if len(cfgid) == 2: - fd.seek(-numbytes - 2, 1) - if cfgid[0] == 127 and cfgid[1] in [127, 121]: - if cfgid[1] == 121 and self._debug7f79 is None: - self._debug7f79 = True - if self._debug_level > 1: - logging.warning("7f79!!!") - valid = True - else: - fd.seek(-2, 1) - if self._debug_level > 1: - logging.info(" ###Leaving checkheader.") - return valid - - def read_hdrseg(self): - fd = self.f - hdr = self.hdr - hdr["nbyte"] = fd.read_i16(1) - spare = fd.read_ui8(1) - ndat = fd.read_ui8(1) - hdr["dat_offsets"] = fd.read_ui16(ndat) - self._nbyte = 4 + ndat * 2 - - def print_progress(self): - self.progress = self.f.tell() - if self._debug_level > 1: - logging.debug( - " pos %0.0fmb/%0.0fmb\n" - % (self.f.tell() / 1048576.0, self._filesize / 1048576.0) - ) - if (self.f.tell() - self.progress) < 1048576: - return - def print_pos(self, byte_offset=-1): """Print the position in the file, used for debugging.""" if self._debug_level >= 2: @@ -603,35 +603,11 @@ def print_pos(self, byte_offset=-1): f" pos: {self.f.tell()}, pos_: {self._pos}, nbyte: {self._nbyte}, k: {k}, byte_offset: {byte_offset}" ) - def check_offset(self, offset, readbytes): - fd = self.f - if offset != 4 and self._fixoffset == 0: - if self._debug_level > 0: - if fd.tell() == self._filesize: - logging.error( - " EOF reached unexpectedly - discarding this last ensemble\n" - ) - else: - logging.debug( - " Adjust location by {:d} (readbytes={:d},hdr['nbyte']={:d})\n".format( - offset, readbytes, self.hdr["nbyte"] - ) - ) - self._fixoffset = offset - 4 - fd.seek(4 + self._fixoffset, 1) - - def remove_end(self, iens): - dat = self.outd - if self._debug_level > 0: - logging.info(" Encountered end of file. Cleaning up data.") - for nm in self.vars_read: - defs._setd(dat, nm, defs._get(dat, nm)[..., :iens]) - def read_dat(self, id): function_map = { 0: (self.read_fixed, []), # 0000 1st profile fixed leader - 1: (self.read_fixed, [True]), # 0001 - # 0010 Surface layer fixed leader (RiverPro & StreamPro) + 1: (self.read_fixed, [True]), # 0001 2nd profile fixed leader + # 0010 Surface layer profile fixed leader (RiverPro & StreamPro) 16: (self.read_fixed_sl, []), # 0080 1st profile variable leader 128: (self.read_var, [0]), @@ -745,6 +721,7 @@ def read_fixed(self, bb=False): # Check if n_cells has increased (for winriver transect files) if hasattr(self, "ensemble"): self.n_cells_diff = self.cfg["n_cells"] - self.ensemble["n_cells"] + # Increase n_cells if greater than 0 if self.n_cells_diff > 0: self.ensemble = defs._ensemble(self.n_avg, self.cfg["n_cells"]) if self._debug_level >= 1: @@ -798,15 +775,10 @@ def read_cfgseg(self, bb=False): logging.warning(f"Number of cells set to {cfg['n_cells']}") cfg["pings_per_ensemble"] = fd.read_ui16(1) # Check if cell size has changed - cell_size = fd.read_ui16(1) * 0.01 - if "cell_size" not in cfg: - cfg["cell_size"] = cell_size - self.cs_diff = cell_size - if self._debug_level >= 1: - logging.warning(f"Cell size set to {cfg['cell_size']}") - elif cell_size != cfg["cell_size"]: - self.cs_diff = cell_size - cfg["cell_size"] - cfg["cell_size"] = cell_size + cs = fd.read_ui16(1) * 0.01 + if ("cell_size" not in cfg) or (cs != cfg["cell_size"]): + self.cs_diff = cs if "cell_size" not in cfg else (cs - cfg["cell_size"]) + cfg["cell_size"] = cs if self._debug_level >= 1: logging.warning(f"Cell size set to {cfg['cell_size']}") cfg["blank_dist"] = fd.read_ui16(1) * 0.01 @@ -1211,12 +1183,12 @@ def read_winriver2(self): ens.fix_gps[k] = self.f.read_ui8(1) # gps fix type/quality ens.n_sat_gps[k] = self.f.read_ui8(1) # of satellites # horizontal dilution of precision - ens.hdop_gps[k] = self.f.read_float(1) - ens.elevation_gps[k] = self.f.read_float(1) # altitude + ens.hdop_gps[k] = self.f.read_f32(1) + ens.elevation_gps[k] = self.f.read_f32(1) # altitude m = self.f.reads(1) # altitude unit, 'm' - h_geoid = self.f.read_float(1) # height of geoid + h_geoid = self.f.read_f32(1) # height of geoid m2 = self.f.reads(1) # geoid unit, 'm' - ens.rtk_age_gps[k] = self.f.read_float(1) + ens.rtk_age_gps[k] = self.f.read_f32(1) station_id = self.f.read_ui16(1) self.vars_read += [ "time_gps", @@ -1245,13 +1217,13 @@ def read_winriver2(self): ) return "FAIL" self.f.seek(1, 1) - true_track = self.f.read_float(1) + true_track = self.f.read_f32(1) t = self.f.reads(1) # 'T' - magn_track = self.f.read_float(1) + magn_track = self.f.read_f32(1) m = self.f.reads(1) # 'M' - speed_knot = self.f.read_float(1) + speed_knot = self.f.read_f32(1) kts = self.f.reads(1) # 'N' - speed_kph = self.f.read_float(1) + speed_kph = self.f.read_f32(1) kph = self.f.reads(1) # 'K' mode = self.f.reads(1) # knots -> m/s @@ -1275,11 +1247,11 @@ def read_winriver2(self): ) return "FAIL" self.f.seek(1, 1) - depth_ft = self.f.read_float(1) + depth_ft = self.f.read_f32(1) ft = self.f.reads(1) # 'f' - depth_m = self.f.read_float(1) + depth_m = self.f.read_f32(1) m = self.f.reads(1) # 'm' - depth_fathom = self.f.read_float(1) + depth_fathom = self.f.read_f32(1) f = self.f.reads(1) # 'F' ens.dist_nmea[k] = depth_m self.vars_read += ["dist_nmea"] @@ -1359,6 +1331,30 @@ def skip_nocode(self, id): if self._debug_level >= 0: logging.debug(f"Skipping ID code {id}\n") + def check_offset(self, offset, readbytes): + fd = self.f + if offset != 4 and self._fixoffset == 0: + if self._debug_level > 0: + if fd.tell() == self._filesize: + logging.error( + " EOF reached unexpectedly - discarding this last ensemble\n" + ) + else: + logging.debug( + " Adjust location by {:d} (readbytes={:d},hdr['nbyte']={:d})\n".format( + offset, readbytes, self.hdr["nbyte"] + ) + ) + self._fixoffset = offset - 4 + fd.seek(4 + self._fixoffset, 1) + + def remove_end(self, iens): + dat = self.outd + if self._debug_level > 0: + logging.info(" Encountered end of file. Cleaning up data.") + for nm in self.vars_read: + defs._setd(dat, nm, defs._get(dat, nm)[..., :iens]) + def save_profiles(self, dat, nm, en, iens): ds = defs._get(dat, nm) @@ -1372,17 +1368,18 @@ def save_profiles(self, dat, nm, en, iens): if "_sl" in nm: # This works here b/c the max number of surface layer cells # is smaller than the min number of normal profile cells used. - # Extra SL cells created here are trimmed off in self.cleanup. + # Extra nan cells created after this if-statement + # are trimmed off in self.cleanup. bn = bn[: self.cfg["n_cells_sl"]] else: # Set bn to current ping size bn = bn[: self.cfg["n_cells"]] - # If n_cells increases, we need to increment defs + # If n_cells has increased, we also need to increment defs if self.n_cells_diff > 0: a = np.empty((self.n_cells_diff, ds.shape[1], ds.shape[2])) * np.nan ds = np.append(ds, a.astype(ds.dtype), axis=0) defs._setd(dat, nm, ds) - # If the shape decreases, set extra cells to nan instead of + # If the number of cells decreases, set extra cells to nan instead of # whatever is stuck in memory if ds.shape[0] != bn.shape[0]: n_cells = ds.shape[0] - bn.shape[0] @@ -1406,7 +1403,7 @@ def cleanup(self, cfg, dat): if len(self.cs) > 1: bins_to_merge = cell_sizes.max() / cell_sizes idx_start = cs[:, 0].astype(int) - idx_end = np.append(cs[1:, 0], dat["data_vars"]["number"].size).astype(int) + idx_end = np.append(cs[1:, 0], self._nens).astype(int) dv = dat["data_vars"] for var in dv: @@ -1416,54 +1413,50 @@ def cleanup(self, cfg, dat): # For each cell size change, reshape and bin-average for id1, id2, b in zip(idx_start, idx_end, bins_to_merge): array = np.transpose(dv[var][..., id1:id2]) - bin_arr = np.transpose( - np.mean(self.reshape(array, n_bin=b), axis=-1) - ) + bin_arr = np.transpose(np.mean(self.reshape(array, b), axis=-1)) new_var[: len(bin_arr), :, id1:id2] = bin_arr # Reset data. This often leaves nan data at farther ranges dv[var] = new_var # Set cell size and range + cfg["n_cells"] = self.ensemble["n_cells"] cfg["cell_size"] = cell_sizes.max() dat["coords"]["range"] = ( - cfg["bin1_dist_m"] + np.arange(self.ensemble["n_cells"]) * cfg["cell_size"] + cfg["bin1_dist_m"] + np.arange(cfg["n_cells"]) * cfg["cell_size"] ) + # Save configuration data as attributes for nm in cfg: dat["attrs"][nm] = cfg[nm] + # Clean up surface layer profiles if "surface_layer" in cfg: # RiverPro/StreamPro dat["coords"]["range_sl"] = ( cfg["bin1_dist_m_sl"] + np.arange(0, self.n_cells_sl) * cfg["cell_size_sl"] ) - # Trim surface layer profile to max cells used + # Trim off extra nan data dv = dat["data_vars"] for var in dv: if "sl" in var: dv[var] = dv[var][: self.n_cells_sl] dat["attrs"]["rotate_vars"].append("vel_sl") - def _outshape(self, inshape, n_pad=0, n_bin=None): - """Returns `outshape` (the 'reshape'd shape) for an `inshape` array.""" - return list(inshape[:-1]) + [int(inshape[-1] // n_bin), int(n_bin + n_pad)] - - def reshape(self, arr, n_pad=0, n_bin=None): - """Reshape the array `arr` to shape (...,n,n_bin+n_pad).""" + def reshape(self, arr, n_bin=None): + """ + Reshape the array `arr` to shape (...,n,n_bin). + """ - npd0 = int(n_pad // 2) - npd1 = int((n_pad + 1) // 2) - shp = self._outshape(arr.shape, n_pad=0, n_bin=n_bin) out = np.zeros( - self._outshape(arr.shape, n_pad=n_pad, n_bin=n_bin), dtype=arr.dtype + list(arr.shape[:-1]) + [int(arr.shape[-1] // n_bin), int(n_bin)], + dtype=arr.dtype, ) + shp = out.shape if np.mod(n_bin, 1) == 0: # n_bin needs to be int n_bin = int(n_bin) # If n_bin is an integer, we can do this simply. - out[..., npd0 : n_bin + npd0] = (arr[..., : (shp[-2] * shp[-1])]).reshape( - shp, order="C" - ) + out[..., :n_bin] = (arr[..., : (shp[-2] * shp[-1])]).reshape(shp, order="C") else: inds = (np.arange(np.prod(shp[-2:])) * n_bin // int(n_bin)).astype(int) # If there are too many indices, drop one bin @@ -1472,11 +1465,8 @@ def reshape(self, arr, n_pad=0, n_bin=None): shp[-2] -= 1 out = out[..., 1:, :] n_bin = int(n_bin) - out[..., npd0 : n_bin + npd0] = (arr[..., inds]).reshape(shp, order="C") + out[..., :n_bin] = (arr[..., inds]).reshape(shp, order="C") n_bin = int(n_bin) - if n_pad != 0: - out[..., 1:, :npd0] = out[..., :-1, n_bin : n_bin + npd0] - out[..., :-1, -npd1:] = out[..., 1:, npd0 : npd0 + npd1] return out @@ -1497,18 +1487,9 @@ def finalize(self, dat): ): da["fs"] = round(np.diff(dat["coords"]["time"]).mean() ** -1, 2) else: - da["fs"] = (da["sec_between_ping_groups"] * da["pings_per_ensemble"]) ** ( - -1 - ) - da["n_cells"] = self.ensemble["n_cells"] + da["fs"] = 1 / (da["sec_between_ping_groups"] * da["pings_per_ensemble"]) for nm in defs.data_defs: shp = defs.data_defs[nm][0] if len(shp) and shp[0] == "nc" and defs._in_group(dat, nm): defs._setd(dat, nm, np.swapaxes(defs._get(dat, nm), 0, 1)) - - def __enter__(self): - return self - - def __exit__(self, type, value, traceback): - self.f.close() diff --git a/mhkit/dolfyn/io/rdi_defs.py b/mhkit/dolfyn/io/rdi_defs.py index f7c249c50..a91148a53 100644 --- a/mhkit/dolfyn/io/rdi_defs.py +++ b/mhkit/dolfyn/io/rdi_defs.py @@ -407,7 +407,5 @@ def __init__(self, navg, n_cells): np.zeros(_get_size(nm, n=navg, ncell=n_cells), dtype=data_defs[nm][2]), ) - def clean_data( - self, - ): + def clean_data(self): self["vel"][self["vel"] == -32.768] = np.NaN diff --git a/mhkit/dolfyn/io/rdi_lib.py b/mhkit/dolfyn/io/rdi_lib.py index 6e07f214f..df0851a0f 100644 --- a/mhkit/dolfyn/io/rdi_lib.py +++ b/mhkit/dolfyn/io/rdi_lib.py @@ -26,21 +26,9 @@ class bin_reader: } @property - def pos( - self, - ): + def pos(self): return self.f.tell() - def __enter__( - self, - ): - return self - - def __exit__( - self, - ): - self.close() - def __init__(self, fname, endian="<", checksum_size=None, debug_level=0): """ Default to little-endian '<'... @@ -59,9 +47,7 @@ def __init__(self, fname, endian="<", checksum_size=None, debug_level=0): self.cs = checksum_size self.debug_level = debug_level - def checksum( - self, - ): + def checksum(self): """ The next byte(s) are the expected checksum. Perform the checksum. """ @@ -71,9 +57,7 @@ def checksum( else: raise Exception("CheckSum not requested for this file") - def tell( - self, - ): + def tell(self): return self.f.tell() def seek(self, pos, rel=1): @@ -106,15 +90,12 @@ def read(self, n, frmt): def read_ui8(self, n): return self.read(n, "B") - def read_float(self, n): + def read_f32(self, n): return self.read(n, "f") - def read_double(self, n): + def read_f64(self, n): return self.read(n, "d") - read_f32 = read_float - read_f64 = read_double - def read_i8(self, n): return self.read(n, "b")