diff --git a/map2loop/mapdata.py b/map2loop/mapdata.py
index b90b170f..ba08c2dc 100644
--- a/map2loop/mapdata.py
+++ b/map2loop/mapdata.py
@@ -56,7 +56,9 @@ class MapData:
         A link to the config structure which is defined in config.py
     """
 
-    def __init__(self, tmp_path: str = "", verbose_level: VerboseLevel = VerboseLevel.ALL):
+    def __init__(
+        self, tmp_path: str = "", verbose_level: VerboseLevel = VerboseLevel.ALL
+    ):
         """
         The initialiser for the map data
 
@@ -124,13 +126,15 @@ def set_bounding_box(self, bounding_box):
         """
         # Convert tuple bounding_box to dict else assign directly
         if type(bounding_box) is tuple:
-            self.bounding_box = {'minx': bounding_box[0],
-                                 'maxx': bounding_box[1],
-                                 'miny': bounding_box[2],
-                                 'maxy': bounding_box[3]}
+            self.bounding_box = {
+                "minx": bounding_box[0],
+                "maxx": bounding_box[1],
+                "miny": bounding_box[2],
+                "maxy": bounding_box[3],
+            }
             if len(bounding_box) == 6:
-                self.bounding_box['top'] = bounding_box[4]
-                self.bounding_box['base'] = bounding_box[5]
+                self.bounding_box["top"] = bounding_box[4]
+                self.bounding_box["base"] = bounding_box[5]
         elif type(bounding_box) is dict:
             self.bounding_box = bounding_box
         else:
@@ -138,11 +142,11 @@ def set_bounding_box(self, bounding_box):
 
         # Check for map based bounding_box and add depth boundaries
         if len(self.bounding_box) == 4:
-            self.bounding_box['top'] = 0
-            self.bounding_box['base'] = 2000
+            self.bounding_box["top"] = 0
+            self.bounding_box["base"] = 2000
 
         # Check that bounding_box has all the right keys
-        for i in ['minx', 'maxx', 'miny', 'maxy', 'top', 'base']:
+        for i in ["minx", "maxx", "miny", "maxy", "top", "base"]:
             if i not in self.bounding_box:
                 raise KeyError(f"bounding_box dictionary does not contain {i} key")
 
@@ -156,7 +160,7 @@ def set_bounding_box(self, bounding_box):
         self.bounding_box_polygon = geopandas.GeoDataFrame(
             index=[0],
             crs=self.working_projection,
-            geometry=[shapely.Polygon(zip(lon_point_list, lat_point_list))]
+            geometry=[shapely.Polygon(zip(lon_point_list, lat_point_list))],
         )
         self.recreate_bounding_box_str()
 
@@ -224,7 +228,9 @@ def get_filename(self, datatype: Datatype):
         return None
 
     @beartype.beartype
-    def set_config_filename(self, filename: str, legacy_format: bool = False, lower: bool = False):
+    def set_config_filename(
+        self, filename: str, legacy_format: bool = False, lower: bool = False
+    ):
         """
         Set the config filename and update the config structure
 
@@ -301,14 +307,22 @@ def set_filenames_from_australian_state(self, state: str):
         Raises:
             ValueError: state string not in state list ['WA', 'SA', 'QLD', 'NSW', 'TAS', 'VIC', 'ACT', 'NT']
         """
-        if state in ['WA', 'SA', 'QLD', 'NSW', 'TAS', 'VIC', 'ACT', 'NT']:
-            self.set_filename(Datatype.GEOLOGY, AustraliaStateUrls.aus_geology_urls[state])
-            self.set_filename(Datatype.STRUCTURE, AustraliaStateUrls.aus_structure_urls[state])
+        if state in ["WA", "SA", "QLD", "NSW", "TAS", "VIC", "ACT", "NT"]:
+            self.set_filename(
+                Datatype.GEOLOGY, AustraliaStateUrls.aus_geology_urls[state]
+            )
+            self.set_filename(
+                Datatype.STRUCTURE, AustraliaStateUrls.aus_structure_urls[state]
+            )
             self.set_filename(Datatype.FAULT, AustraliaStateUrls.aus_fault_urls[state])
             self.set_filename(Datatype.FOLD, AustraliaStateUrls.aus_fold_urls[state])
             self.set_filename(Datatype.DTM, "au")
             lower = state == "SA"
-            self.set_config_filename(AustraliaStateUrls.aus_config_urls[state], legacy_format=True, lower=lower)
+            self.set_config_filename(
+                AustraliaStateUrls.aus_config_urls[state],
+                legacy_format=True,
+                lower=lower,
+            )
             self.set_colour_filename(AustraliaStateUrls.aus_clut_urls[state])
         else:
             raise ValueError(f"Australian state {state} not in state url database")
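For callers, set_bounding_box accepts either a 4- or 6-element tuple (minx, maxx, miny, maxy[, top, base]) or an equivalent dict, and map-only extents pick up default depth limits of top=0 and base=2000. A minimal standalone sketch of that contract (to_bounding_box_dict is a hypothetical helper, not part of MapData):

# Sketch of the set_bounding_box tuple-to-dict contract, outside the class.
def to_bounding_box_dict(bounding_box):
    if isinstance(bounding_box, tuple):
        keys = ("minx", "maxx", "miny", "maxy", "top", "base")
        bounding_box = dict(zip(keys, bounding_box))  # zip stops at the shorter
    # Map-only extents get default depth boundaries
    bounding_box.setdefault("top", 0)
    bounding_box.setdefault("base", 2000)
    for key in ("minx", "maxx", "miny", "maxy", "top", "base"):
        if key not in bounding_box:
            raise KeyError(f"bounding_box dictionary does not contain {key} key")
    return bounding_box

print(to_bounding_box_dict((100.0, 200.0, 50.0, 150.0)))
# {'minx': 100.0, 'maxx': 200.0, 'miny': 50.0, 'maxy': 150.0, 'top': 0, 'base': 2000}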
@@ -369,9 +383,7 @@ def load_map_data(self, datatype: Datatype):
             self.filenames[datatype] is None
             or self.data_states[datatype] == Datastate.UNNAMED
         ):
-            print(
-                f"Datatype {datatype.name} is not set and so cannot be loaded\n"
-            )
+            print(f"Datatype {datatype.name} is not set and so cannot be loaded\n")
             self.data[datatype] = self.get_empty_dataframe(datatype)
             self.dirtyflags[datatype] = False
             self.data_states[datatype] = Datastate.COMPLETE
@@ -388,9 +400,7 @@ def load_map_data(self, datatype: Datatype):
                 print(
                     f"Failed to open {datatype.name} file called '{self.filenames[datatype]}'\n"
                 )
-                print(
-                    f"Cannot continue as {datatype.name} was not loaded\n"
-                )
+                print(f"Cannot continue as {datatype.name} was not loaded\n")
                 return
         if self.data_states[datatype] == Datastate.LOADED:
             # Reproject geopanda to required CRS
@@ -423,14 +433,14 @@ def get_empty_dataframe(self, datatype: Datatype):
         data = None
         if datatype == Datatype.FAULT:
             data = geopandas.GeoDataFrame(
-                columns=['geometry', 'ID', 'NAME', 'DIPDIR', 'DIP'],
-                crs=self.working_projection
-                )
+                columns=["geometry", "ID", "NAME", "DIPDIR", "DIP"],
+                crs=self.working_projection,
+            )
         elif datatype == Datatype.FOLD:
             data = geopandas.GeoDataFrame(
-                columns=['geometry', 'ID', 'NAME', 'SYNCLINE'],
-                crs=self.working_projection
-                )
+                columns=["geometry", "ID", "NAME", "SYNCLINE"],
+                crs=self.working_projection,
+            )
         return data
 
     @beartype.beartype
@@ -449,7 +459,7 @@ def open_http_query(url: str):
         try:
             request = urllib.Request(url, headers={"Accept-Encoding": "gzip"})
             response = urllib.request.urlopen(request, timeout=30)
-            if response.info().get('Content-Encoding') == 'gzip':
+            if response.info().get("Content-Encoding") == "gzip":
                 return GzipFile(fileobj=BytesIO(response.read()))
             else:
                 return response
@@ -479,7 +489,9 @@ def __retrieve_tif(self, filename: str):
         self.__check_and_create_tmp_path()
         # For gdal debugging use exceptions
         gdal.UseExceptions()
-        bb_ll = tuple(self.bounding_box_polygon.to_crs("EPSG:4326").geometry.total_bounds)
+        bb_ll = tuple(
+            self.bounding_box_polygon.to_crs("EPSG:4326").geometry.total_bounds
+        )
         # try:
         if filename.lower() == "aus" or filename.lower() == "au":
             url = "http://services.ga.gov.au/gis/services/DEM_SRTM_1Second_over_Bathymetry_Topography/MapServer/WCSServer?"
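open_http_query advertises gzip to the server and only wraps the response in a GzipFile when the reply is actually compressed. A self-contained sketch of the same pattern, written against the standard urllib.request spelling (the diff's urllib.Request relies on how the module imports urllib):

# Standalone sketch of a gzip-aware HTTP fetch, mirroring open_http_query.
import urllib.request
from gzip import GzipFile
from io import BytesIO

def fetch(url: str):
    request = urllib.request.Request(url, headers={"Accept-Encoding": "gzip"})
    response = urllib.request.urlopen(request, timeout=30)
    if response.info().get("Content-Encoding") == "gzip":
        # Server honoured the header: decompress transparently
        return GzipFile(fileobj=BytesIO(response.read()))
    return response  # plain body, readable as-is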
@@ -501,24 +513,29 @@ def __retrieve_tif(self, filename: str):
             tif = gdal.Open(tmp_file)
         elif filename == "hawaii":
             import netCDF4
+
             bbox_str = f"[({str(bb_ll[1])}):1:({str(bb_ll[3])})][({str(bb_ll[0])}):1:({str(bb_ll[2])})]"
             filename = f"https://pae-paha.pacioos.hawaii.edu/erddap/griddap/srtm30plus_v11_land.nc?elev{bbox_str}"
             f = urllib.request.urlopen(filename)
             ds = netCDF4.Dataset("in-mem-file", mode="r", memory=f.read())
-            spatial = [ds.geospatial_lon_min,
-                       ds.geospatial_lon_resolution,
-                       0,
-                       ds.geospatial_lat_min,
-                       0,
-                       ds.geospatial_lat_resolution]
+            spatial = [
+                ds.geospatial_lon_min,
+                ds.geospatial_lon_resolution,
+                0,
+                ds.geospatial_lat_min,
+                0,
+                ds.geospatial_lat_resolution,
+            ]
             srs = osr.SpatialReference()
             srs.ImportFromEPSG(4326)
             driver = gdal.GetDriverByName("GTiff")
-            tif = driver.Create(f"/vsimem/{str(uuid4())}",
-                                xsize=ds.dimensions['longitude'].size,
-                                ysize=ds.dimensions['latitude'].size,
-                                bands=1,
-                                eType=gdal.GDT_Float32)
+            tif = driver.Create(
+                f"/vsimem/{str(uuid4())}",
+                xsize=ds.dimensions["longitude"].size,
+                ysize=ds.dimensions["latitude"].size,
+                bands=1,
+                eType=gdal.GDT_Float32,
+            )
             tif.SetGeoTransform(spatial)
             tif.SetProjection(srs.ExportToWkt())
             tif.GetRasterBand(1).WriteArray(numpy.flipud(ds.variables["elev"][:][:]))
@@ -548,9 +565,7 @@ def load_raster_map_data(self, datatype: Datatype):
             self.filenames[datatype] is None
             or self.data_states[datatype] == Datastate.UNNAMED
         ):
-            print(
-                f"Datatype {datatype.name} is not set and so cannot be loaded\n"
-            )
+            print(f"Datatype {datatype.name} is not set and so cannot be loaded\n")
         elif self.dirtyflags[datatype] is True:
             if self.data_states[datatype] == Datastate.UNLOADED:
                 # Load data from file
@@ -560,24 +575,26 @@ def load_raster_map_data(self, datatype: Datatype):
                 # Reproject raster to required CRS
                 try:
                     self.data[datatype] = gdal.Warp(
-                        '',
+                        "",
                         self.data[datatype],
                         srcSRS=self.data[datatype].GetProjection(),
                         dstSRS=self.working_projection,
-                        format='VRT',
-                        outputType=gdal.GDT_Float32
+                        format="VRT",
+                        outputType=gdal.GDT_Float32,
                     )
                 except Exception:
                     print(f"Warp failed for {datatype.name}\n")
                 self.data_states[datatype] = Datastate.REPROJECTED
             if self.data_states[datatype] == Datastate.REPROJECTED:
                 # Clip raster image to bounding polygon
-                bounds = [self.bounding_box['minx'],
-                          self.bounding_box['maxy'],
-                          self.bounding_box['maxx'],
-                          self.bounding_box['miny']]
+                bounds = [
+                    self.bounding_box["minx"],
+                    self.bounding_box["maxy"],
+                    self.bounding_box["maxx"],
+                    self.bounding_box["miny"],
+                ]
                 self.data[datatype] = gdal.Translate(
-                    '',
+                    "",
                     self.data[datatype],
                     format="VRT",
                     outputType=gdal.GDT_Float32,
@@ -621,14 +638,21 @@ def parse_structure_map(self) -> tuple:
             tuple: A tuple of (bool: success/fail, str: failure message)
         """
         # Check type and size of loaded structure map
-        if self.raw_data[Datatype.STRUCTURE] is None or type(self.raw_data[Datatype.STRUCTURE]) is not geopandas.GeoDataFrame:
+        if (
+            self.raw_data[Datatype.STRUCTURE] is None
+            or type(self.raw_data[Datatype.STRUCTURE]) is not geopandas.GeoDataFrame
+        ):
             return (True, "Structure map is not loaded or valid")
 
         if len(self.raw_data[Datatype.STRUCTURE]) < 2:
-            print("Stucture map does not enough orientations to complete calculations (need at least 2), projection may be inconsistent")
+            print(
+                "Structure map does not have enough orientations to complete calculations (need at least 2), projection may be inconsistent"
+            )
 
         # Create new geodataframe
-        structure = geopandas.GeoDataFrame(self.raw_data[Datatype.STRUCTURE]["geometry"])
+        structure = geopandas.GeoDataFrame(
+            self.raw_data[Datatype.STRUCTURE]["geometry"]
+        )
         config = self.config.structure_config
 
         # Parse dip direction and dip columns
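The six-element spatial list assembled from the netCDF metadata above is a GDAL geotransform: (origin_x, pixel_width, row_rotation, origin_y, column_rotation, pixel_height). A small sketch of how a geotransform maps pixel indices to georeferenced coordinates; the resolution values here are illustrative only:

import numpy

# Illustrative geotransform, not from a real dataset: (x0, dx, rx, y0, ry, dy)
gt = (110.0, 0.00833, 0.0, -45.0, 0.0, 0.00833)

def pixel_to_geo(gt, px: int, py: int):
    # Same affine mapping GDAL applies: pixel/line -> x/y
    x = gt[0] + gt[1] * px + gt[2] * py
    y = gt[3] + gt[4] * px + gt[5] * py
    return x, y

print(pixel_to_geo(gt, 0, 0))    # origin: (110.0, -45.0)
print(pixel_to_geo(gt, 10, 10))  # ten cells in: about (110.0833, -44.9167)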
@@ -638,9 +662,13 @@ def parse_structure_map(self) -> tuple:
                     lambda row: (row[config["dipdir_column"]] + 90.0) % 360.0, axis=1
                 )
             else:
-                structure["DIPDIR"] = self.raw_data[Datatype.STRUCTURE][config["dipdir_column"]]
+                structure["DIPDIR"] = self.raw_data[Datatype.STRUCTURE][
+                    config["dipdir_column"]
+                ]
         else:
-            print(f"Structure map does not contain dipdir_column '{config['dipdir_column']}'")
+            print(
+                f"Structure map does not contain dipdir_column '{config['dipdir_column']}'"
+            )
 
         if config["dip_column"] in self.raw_data[Datatype.STRUCTURE]:
             structure["DIP"] = self.raw_data[Datatype.STRUCTURE][config["dip_column"]]
@@ -649,18 +677,28 @@ def parse_structure_map(self) -> tuple:
 
         # Add bedding and overturned booleans
         if config["overturned_column"] in self.raw_data[Datatype.STRUCTURE]:
-            structure["OVERTURNED"] = self.raw_data[Datatype.STRUCTURE][config["overturned_column"]].astype(str).str.contains(config["overturned_text"])
+            structure["OVERTURNED"] = (
+                self.raw_data[Datatype.STRUCTURE][config["overturned_column"]]
+                .astype(str)
+                .str.contains(config["overturned_text"])
+            )
         else:
             structure["OVERTURNED"] = False
 
         if config["description_column"] in self.raw_data[Datatype.STRUCTURE]:
-            structure["BEDDING"] = self.raw_data[Datatype.STRUCTURE][config["description_column"]].astype(str).str.contains(config["bedding_text"])
+            structure["BEDDING"] = (
+                self.raw_data[Datatype.STRUCTURE][config["description_column"]]
+                .astype(str)
+                .str.contains(config["bedding_text"])
+            )
         else:
             structure["BEDDING"] = False
 
         # Add object id
         if config["objectid_column"] in self.raw_data[Datatype.STRUCTURE]:
-            structure["ID"] = self.raw_data[Datatype.STRUCTURE][config["objectid_column"]]
+            structure["ID"] = self.raw_data[Datatype.STRUCTURE][
+                config["objectid_column"]
+            ]
         else:
             structure["ID"] = numpy.arange(len(structure))
 
@@ -676,7 +714,10 @@ def parse_geology_map(self) -> tuple:
             tuple: A tuple of (bool: success/fail, str: failure message)
         """
         # Check type of loaded geology map
-        if self.raw_data[Datatype.GEOLOGY] is None or type(self.raw_data[Datatype.GEOLOGY]) is not geopandas.GeoDataFrame:
+        if (
+            self.raw_data[Datatype.GEOLOGY] is None
+            or type(self.raw_data[Datatype.GEOLOGY]) is not geopandas.GeoDataFrame
+        ):
             return (True, "Geology map is not loaded or valid")
 
         # Create new geodataframe
@@ -685,13 +726,17 @@ def parse_geology_map(self) -> tuple:
 
         # Parse unit names and codes
        if config["unitname_column"] in self.raw_data[Datatype.GEOLOGY]:
-            geology["UNITNAME"] = self.raw_data[Datatype.GEOLOGY][config["unitname_column"]].astype(str)
+            geology["UNITNAME"] = self.raw_data[Datatype.GEOLOGY][
+                config["unitname_column"]
+            ].astype(str)
         else:
             msg = f"Geology map does not contain unitname_column {config['unitname_column']}"
             print(msg)
             return (True, msg)
         if config["alt_unitname_column"] in self.raw_data[Datatype.GEOLOGY]:
-            geology["CODE"] = self.raw_data[Datatype.GEOLOGY][config["alt_unitname_column"]].astype(str)
+            geology["CODE"] = self.raw_data[Datatype.GEOLOGY][
+                config["alt_unitname_column"]
+            ].astype(str)
         else:
             msg = f"Geology map does not contain alt_unitname_column {config['alt_unitname_column']}"
             print(msg)
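The OVERTURNED and BEDDING flags above use the same pandas idiom: cast the source column to string, then mark rows whose text contains a configured marker. A tiny self-contained example with invented descriptions:

import pandas as pd

# Invented sample data; the real column comes from the structure shapefile.
df = pd.DataFrame({"DESCRIPTION": ["overturned bedding", "bedding", None]})
df["OVERTURNED"] = df["DESCRIPTION"].astype(str).str.contains("overturned")
print(df["OVERTURNED"].tolist())  # [True, False, False] (None becomes "None")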
geology["GROUP"] = self.raw_data[Datatype.GEOLOGY][ + config["group_column"] + ].astype(str) else: geology["GROUP"] = "" if config["supergroup_column"] in self.raw_data[Datatype.GEOLOGY]: - geology["SUPERGROUP"] = self.raw_data[Datatype.GEOLOGY][config["supergroup_column"]].astype(str) + geology["SUPERGROUP"] = self.raw_data[Datatype.GEOLOGY][ + config["supergroup_column"] + ].astype(str) else: geology["SUPERGROUP"] = "" # Parse description and rocktype columns for sill and intrusive flags if config["description_column"] in self.raw_data[Datatype.GEOLOGY]: - geology["SILL"] = self.raw_data[Datatype.GEOLOGY][config["description_column"]].astype(str).str.contains(config["sill_text"]) - geology["DESCRIPTION"] = self.raw_data[Datatype.GEOLOGY][config["description_column"]].astype(str) + geology["SILL"] = ( + self.raw_data[Datatype.GEOLOGY][config["description_column"]] + .astype(str) + .str.contains(config["sill_text"]) + ) + geology["DESCRIPTION"] = self.raw_data[Datatype.GEOLOGY][ + config["description_column"] + ].astype(str) else: geology["SILL"] = False geology["DESCRIPTION"] = "" if config["rocktype_column"] in self.raw_data[Datatype.GEOLOGY]: - geology["INTRUSIVE"] = self.raw_data[Datatype.GEOLOGY][config["rocktype_column"]].astype(str).str.contains(config["intrusive_text"]) - geology["ROCKTYPE1"] = self.raw_data[Datatype.GEOLOGY][config["rocktype_column"]].astype(str) + geology["INTRUSIVE"] = ( + self.raw_data[Datatype.GEOLOGY][config["rocktype_column"]] + .astype(str) + .str.contains(config["intrusive_text"]) + ) + geology["ROCKTYPE1"] = self.raw_data[Datatype.GEOLOGY][ + config["rocktype_column"] + ].astype(str) else: geology["INSTRUSIVE"] = False geology["ROCKTYPE1"] = "" if config["alt_rocktype_column"] in self.raw_data[Datatype.GEOLOGY]: - geology["ROCKTYPE2"] = self.raw_data[Datatype.GEOLOGY][config["alt_rocktype_column"]].astype(str) + geology["ROCKTYPE2"] = self.raw_data[Datatype.GEOLOGY][ + config["alt_rocktype_column"] + ].astype(str) else: geology["ROCKTYPE2"] = "" @@ -731,11 +794,15 @@ def parse_geology_map(self) -> tuple: # Parse age columns if config["minage_column"] in self.raw_data[Datatype.GEOLOGY]: - geology["MIN_AGE"] = self.raw_data[Datatype.GEOLOGY][config["minage_column"]].astype(numpy.float64) + geology["MIN_AGE"] = self.raw_data[Datatype.GEOLOGY][ + config["minage_column"] + ].astype(numpy.float64) else: geology["MIN_AGE"] = 0.0 if config["maxage_column"] in self.raw_data[Datatype.GEOLOGY]: - geology["MAX_AGE"] = self.raw_data[Datatype.GEOLOGY][config["maxage_column"]].astype(numpy.float64) + geology["MAX_AGE"] = self.raw_data[Datatype.GEOLOGY][ + config["maxage_column"] + ].astype(numpy.float64) else: geology["MAX_AGE"] = 100000.0 @@ -754,7 +821,9 @@ def parse_geology_map(self) -> tuple: geology["UNITNAME"] = geology["UNITNAME"].str.replace("[ -/?]", "_", regex=True) geology["CODE"] = geology["CODE"].str.replace("[ -/?]", "_", regex=True) geology["GROUP"] = geology["GROUP"].str.replace("[ -/?]", "_", regex=True) - geology["SUPERGROUP"] = geology["SUPERGROUP"].str.replace("[ -/?]", "_", regex=True) + geology["SUPERGROUP"] = geology["SUPERGROUP"].str.replace( + "[ -/?]", "_", regex=True + ) # Mask out ignored unit_names/codes (ie. 
@@ -776,7 +845,10 @@ def parse_fault_map(self) -> tuple:
             tuple: A tuple of (bool: success/fail, str: failure message)
         """
         # Check type of loaded fault map
-        if self.raw_data[Datatype.FAULT] is None or type(self.raw_data[Datatype.FAULT]) is not geopandas.GeoDataFrame:
+        if (
+            self.raw_data[Datatype.FAULT] is None
+            or type(self.raw_data[Datatype.FAULT]) is not geopandas.GeoDataFrame
+        ):
             return (True, "Fault map is not loaded or valid")
 
         # Create new geodataframe
@@ -784,20 +856,31 @@ def parse_fault_map(self) -> tuple:
         config = self.config.fault_config
 
         if config["structtype_column"] in self.raw_data[Datatype.FAULT]:
-            faults["FEATURE"] = self.raw_data[Datatype.FAULT][config["structtype_column"]]
-            faults = faults[faults["FEATURE"].astype(str).str.contains(config["fault_text"])]
+            faults["FEATURE"] = self.raw_data[Datatype.FAULT][
+                config["structtype_column"]
+            ]
+            faults = faults[
+                faults["FEATURE"].astype(str).str.contains(config["fault_text"])
+            ]
             if self.verbose_level > VerboseLevel.NONE:
-                if len(faults) < len(self.raw_data[Datatype.GEOLOGY]) and len(faults) == 0:
+                if (
+                    len(faults) < len(self.raw_data[Datatype.GEOLOGY])
+                    and len(faults) == 0
+                ):
                     msg = f"Fault map reduced to 0 faults as structtype_column ({config['structtype_column']}) does not contain any row with fault_text \"{config['fault_text']}\""
                     print(msg)
 
         if config["name_column"] in self.raw_data[Datatype.FAULT]:
-            faults["NAME"] = self.raw_data[Datatype.FAULT][config["name_column"]].astype(str)
+            faults["NAME"] = self.raw_data[Datatype.FAULT][
+                config["name_column"]
+            ].astype(str)
         else:
             faults["NAME"] = "Fault_" + faults.index.astype(str)
 
         if config["dip_column"] in self.raw_data[Datatype.FAULT]:
-            faults["DIP"] = self.raw_data[Datatype.FAULT][config["dip_column"]].astype(numpy.float64)
+            faults["DIP"] = self.raw_data[Datatype.FAULT][config["dip_column"]].astype(
+                numpy.float64
+            )
         else:
             faults["DIP"] = numpy.nan  # config["dip_null_value"]
 
@@ -807,15 +890,21 @@ def parse_fault_map(self) -> tuple:
         # Parse the dip direction for the fault
         if config["dipdir_flag"] != "alpha":
             if config["dipdir_column"] in self.raw_data[Datatype.FAULT]:
-                faults["DIPDIR"] = self.raw_data[Datatype.FAULT][config["dipdir_column"]].astype(numpy.float64)
+                faults["DIPDIR"] = self.raw_data[Datatype.FAULT][
+                    config["dipdir_column"]
+                ].astype(numpy.float64)
             else:
                 faults["DIPDIR"] = numpy.nan
         else:
             # Take the geoDataSeries of the dipdir estimates (assume it's a string description)
             if config["dip_estimate_column"] in self.raw_data[Datatype.FAULT]:
-                dipdir_text_estimates = self.raw_data[Datatype.FAULT][config["dip_estimate_column"]].astype(str)
+                dipdir_text_estimates = self.raw_data[Datatype.FAULT][
+                    config["dip_estimate_column"]
+                ].astype(str)
             elif config["dipdir_column"] in self.raw_data[Datatype.FAULT]:
-                dipdir_text_estimates = self.raw_data[Datatype.FAULT][config["dipdir_column"]].astype(str)
+                dipdir_text_estimates = self.raw_data[Datatype.FAULT][
+                    config["dipdir_column"]
+                ].astype(str)
             else:
                 dipdir_text_estimates = None
                 faults["DIPDIR"] = numpy.nan
@@ -831,12 +920,12 @@ def parse_fault_map(self) -> tuple:
                     "east": 90.0,
                     "south": 180.0,
                     "west": 270.0,
-                    }
+                }
                 for direction in direction_map:
-                    dipdir_text_estimates = dipdir_text_estimates.astype(str).str.replace(
-                        f".*{direction}.*",
-                        direction_map[direction],
-                        regex=True
+                    dipdir_text_estimates = dipdir_text_estimates.astype(
+                        str
+                    ).str.replace(
+                        f".*{direction}.*", direction_map[direction], regex=True
                     )
                 # Catch all for any field that still contains anything that isn't a number
                 dipdir_text_estimates = dipdir_text_estimates.astype(str).str.replace(
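The direction_map substitution above converts free-text dip-direction estimates into bearings by regex replacement. pandas Series.str.replace requires string replacements, so a standalone sketch of the idea keeps the mapped values as strings and leaves the final numeric cast to the catch-all step that follows:

import pandas as pd

# Illustrative mapping and data; str.replace needs string replacement values.
direction_map = {"north": "0.0", "east": "90.0", "south": "180.0", "west": "270.0"}
estimates = pd.Series(["dips north", "east side down", "unknown"])
for direction, bearing in direction_map.items():
    estimates = estimates.str.replace(f".*{direction}.*", bearing, regex=True)
print(estimates.tolist())  # ['0.0', '90.0', 'unknown']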
@@ -846,13 +935,25 @@ def parse_fault_map(self) -> tuple:
 
         # Add object id
         if config["objectid_column"] in self.raw_data[Datatype.FAULT]:
-            faults["ID"] = self.raw_data[Datatype.FAULT][config["objectid_column"]].astype(int)
+            faults["ID"] = self.raw_data[Datatype.FAULT][
+                config["objectid_column"]
+            ].astype(int)
         else:
             faults["ID"] = faults.index
 
         if len(faults):
-            faults["NAME"] = faults.apply(lambda fault: "Fault_" + str(fault["ID"]) if fault["NAME"].lower() == "nan" else fault["NAME"], axis=1)
-            faults["NAME"] = faults.apply(lambda fault: "Fault_" + str(fault["ID"]) if fault["NAME"].lower() == "none" else fault["NAME"], axis=1)
+            faults["NAME"] = faults.apply(
+                lambda fault: "Fault_" + str(fault["ID"])
+                if fault["NAME"].lower() == "nan"
+                else fault["NAME"],
+                axis=1,
+            )
+            faults["NAME"] = faults.apply(
+                lambda fault: "Fault_" + str(fault["ID"])
+                if fault["NAME"].lower() == "none"
+                else fault["NAME"],
+                axis=1,
+            )
             faults["NAME"] = faults["NAME"].str.replace(" -/?", "_", regex=True)
 
         self.data[Datatype.FAULT] = faults
@@ -867,7 +968,10 @@ def parse_fold_map(self) -> tuple:
             tuple: A tuple of (bool: success/fail, str: failure message)
         """
         # Check type of loaded fold map
-        if self.raw_data[Datatype.FOLD] is None or type(self.raw_data[Datatype.FOLD]) is not geopandas.GeoDataFrame:
+        if (
+            self.raw_data[Datatype.FOLD] is None
+            or type(self.raw_data[Datatype.FOLD]) is not geopandas.GeoDataFrame
+        ):
             return (True, "Fold map is not loaded or valid")
 
         # Create new geodataframe
@@ -875,22 +979,37 @@ def parse_fold_map(self) -> tuple:
         config = self.config.fold_config
 
         if config["structtype_column"] in self.raw_data[Datatype.FOLD]:
-            folds[config["structtype_column"]] = self.raw_data[Datatype.FOLD][config["structtype_column"]]
-            folds = folds[folds[config["structtype_column"]].astype(str).str.contains(config["fold_text"])]
+            folds[config["structtype_column"]] = self.raw_data[Datatype.FOLD][
+                config["structtype_column"]
+            ]
+            folds = folds[
+                folds[config["structtype_column"]]
+                .astype(str)
+                .str.contains(config["fold_text"])
+            ]
             if self.verbose_level > VerboseLevel.NONE:
-                if len(folds) < len(self.raw_data[Datatype.GEOLOGY]) and len(folds) == 0:
+                if (
+                    len(folds) < len(self.raw_data[Datatype.GEOLOGY])
+                    and len(folds) == 0
+                ):
                     msg = f"Fold map reduced to 0 folds as structtype_column ({config['structtype_column']}) does not contain any row with fold_text \"{config['fold_text']}\""
                     print(msg)
 
         if config["foldname_column"] in self.raw_data[Datatype.FOLD]:
-            folds["NAME"] = self.raw_data[Datatype.FOLD][config["foldname_column"]].astype(str)
+            folds["NAME"] = self.raw_data[Datatype.FOLD][
+                config["foldname_column"]
+            ].astype(str)
         else:
             folds["NAME"] = numpy.arange(len(folds))
             folds["NAME"] = "Fold_" + folds["NAME"].astype(str)
 
         # Extract syncline from description field
         if config["description_column"] in self.raw_data[Datatype.FOLD]:
-            folds["SYNCLINE"] = self.raw_data[Datatype.FOLD][config["description_column"]].astype(str).str.contains(config["synform_text"])
+            folds["SYNCLINE"] = (
+                self.raw_data[Datatype.FOLD][config["description_column"]]
+                .astype(str)
+                .str.contains(config["synform_text"])
+            )
         else:
             folds["SYNCLINE"] = False
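The two apply calls in the fault-name backfill above catch names that arrived as the literal strings 'nan' or 'none'. The same rule can be expressed in one pass; a compact sketch with made-up rows:

import pandas as pd

faults = pd.DataFrame({"ID": [1, 2, 3], "NAME": ["Big Fault", "nan", "None"]})
faults["NAME"] = faults.apply(
    lambda f: "Fault_" + str(f["ID"])
    if f["NAME"].lower() in ("nan", "none")
    else f["NAME"],
    axis=1,
)
print(faults["NAME"].tolist())  # ['Big Fault', 'Fault_2', 'Fault_3']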
@@ -917,14 +1036,18 @@ def set_working_projection_on_map_data(self, datatype: Datatype):
         elif type(self.raw_data[datatype]) is geopandas.GeoDataFrame:
             if self.data_states[datatype] >= Datastate.LOADED:
                 if self.raw_data[datatype].crs is None:
-                    print(f"No projection on original map data, assigning to working_projection {self.working_projection}")
+                    print(
+                        f"No projection on original map data, assigning to working_projection {self.working_projection}"
+                    )
                     self.raw_data[datatype].crs = self.working_projection
                 else:
                     self.raw_data[datatype].to_crs(
                         crs=self.working_projection, inplace=True
                     )
         else:
-            print(f"Type of {datatype.name} map not a GeoDataFrame so cannot change map crs projection")
+            print(
+                f"Type of {datatype.name} map is not a GeoDataFrame so cannot change map crs projection"
+            )
 
     @beartype.beartype
     def save_all_map_data(self, output_dir: str, extension: str = ".csv"):
@@ -983,7 +1106,10 @@ def get_raw_map_data(self, datatype: Datatype):
         Returns:
             geopandas.GeoDataFrame: The raw data
         """
-        if self.data_states[datatype] != Datastate.COMPLETE or self.dirtyflags[datatype] is True:
+        if (
+            self.data_states[datatype] != Datastate.COMPLETE
+            or self.dirtyflags[datatype] is True
+        ):
             self.load_map_data(datatype)
         return self.raw_data[datatype]
 
@@ -999,7 +1125,10 @@ def get_map_data(self, datatype: Datatype):
         Returns:
             geopandas.GeoDataFrame: The dataframe
         """
-        if self.data_states[datatype] != Datastate.COMPLETE or self.dirtyflags[datatype] is True:
+        if (
+            self.data_states[datatype] != Datastate.COMPLETE
+            or self.dirtyflags[datatype] is True
+        ):
             self.load_map_data(datatype)
         return self.data[datatype]
 
@@ -1051,7 +1180,9 @@ def calculate_bounding_box_and_projection(self):
             dict, str: The bounding box and projection of the geology shapefile
         """
         if self.filenames[Datatype.GEOLOGY] is None:
-            print("Could not open geology file as none set, no bounding box or projection available")
+            print(
+                "Could not open geology file as none set, no bounding box or projection available"
+            )
             return None, None
         temp_geology_filename = self.filenames[Datatype.GEOLOGY]
         temp_geology_filename = temp_geology_filename.replace("{BBOX_STR}", "")
@@ -1066,7 +1197,9 @@ def calculate_bounding_box_and_projection(self):
                 "maxy": bounds[3],
             }, geology_data.crs
         except Exception:
-            print(f"Could not open geology file {temp_geology_filename} so no bounding box or projection found")
+            print(
+                f"Could not open geology file {temp_geology_filename} so no bounding box or projection found"
+            )
             return None, None
 
     @beartype.beartype
@@ -1086,9 +1219,22 @@ def export_wkt_format_files(self):
         if type(self.data[Datatype.GEOLOGY]) is not geopandas.GeoDataFrame:
             print("Cannot export geology data as it is not a GeoDataFrame")
         elif self.data_states[Datatype.GEOLOGY] != Datastate.COMPLETE:
-            print(f"Cannot export geology data as it only loaded to {self.data_states[Datatype.GEOLOGY].name} status")
+            print(
+                f"Cannot export geology data as it is only loaded to {self.data_states[Datatype.GEOLOGY].name} status"
+            )
         else:
-            columns = ["geometry", "ID", "UNITNAME", "GROUP", "MIN_AGE", "MAX_AGE", "CODE", "ROCKTYPE1", "ROCKTYPE2", "DESCRIPTION"]
+            columns = [
+                "geometry",
+                "ID",
+                "UNITNAME",
+                "GROUP",
+                "MIN_AGE",
+                "MAX_AGE",
+                "CODE",
+                "ROCKTYPE1",
+                "ROCKTYPE2",
+                "DESCRIPTION",
+            ]
             geology = self.get_map_data(Datatype.GEOLOGY)[columns].copy()
             geology.reset_index(inplace=True, drop=True)
             geology.rename(
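set_working_projection_on_map_data distinguishes two operations: assigning .crs merely declares a projection for coordinates that had none, while to_crs transforms the coordinates themselves. A minimal geopandas sketch of the difference, using an invented point and an arbitrary projected CRS:

import geopandas
from shapely.geometry import Point

gdf = geopandas.GeoDataFrame(geometry=[Point(115.0, -30.0)])
gdf.crs = "EPSG:4326"                 # label only: coordinates unchanged
projected = gdf.to_crs("EPSG:28350")  # transform: coordinates now in metres
print(projected.geometry[0])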
@@ -1100,18 +1246,28 @@ def export_wkt_format_files(self):
             geology["GROUP"] = geology["GROUP"].replace("None", "")
             geology["ROCKTYPE1"] = geology["ROCKTYPE1"].replace("", "None")
             geology["ROCKTYPE2"] = geology["ROCKTYPE2"].replace("", "None")
-            geology.to_csv(os.path.join(self.tmp_path, "map2model_data", "geology_wkt.csv"), sep="\t", index=False)
+            geology.to_csv(
+                os.path.join(self.tmp_path, "map2model_data", "geology_wkt.csv"),
+                sep="\t",
+                index=False,
+            )
 
         # Check faults data status and export to a WKT format file
         self.load_map_data(Datatype.FAULT)
         if type(self.data[Datatype.FAULT]) is not geopandas.GeoDataFrame:
             print("Cannot export fault data as it is not a GeoDataFrame")
         elif self.data_states[Datatype.FAULT] != Datastate.COMPLETE:
-            print(f"Cannot export fault data as it only loaded to {self.data_states[Datatype.FAULT].name} status")
+            print(
+                f"Cannot export fault data as it is only loaded to {self.data_states[Datatype.FAULT].name} status"
+            )
         else:
             faults = self.get_map_data(Datatype.FAULT).copy()
             faults.rename(columns={"geometry": "WKT"}, inplace=True)
-            faults.to_csv(os.path.join(self.tmp_path, "map2model_data", "faults_wkt.csv"), sep="\t", index=False)
+            faults.to_csv(
+                os.path.join(self.tmp_path, "map2model_data", "faults_wkt.csv"),
+                sep="\t",
+                index=False,
+            )
 
     @beartype.beartype
     def get_value_from_raster(self, datatype: Datatype, x, y):
@@ -1135,13 +1291,17 @@ def get_value_from_raster(self, datatype: Datatype, x, y):
             return None
 
         inv_geotransform = gdal.InvGeoTransform(data.GetGeoTransform())
-        px = int(inv_geotransform[0] + inv_geotransform[1] * x + inv_geotransform[2] * y)
-        py = int(inv_geotransform[3] + inv_geotransform[4] * x + inv_geotransform[5] * y)
+        px = int(
+            inv_geotransform[0] + inv_geotransform[1] * x + inv_geotransform[2] * y
+        )
+        py = int(
+            inv_geotransform[3] + inv_geotransform[4] * x + inv_geotransform[5] * y
+        )
         # Clamp values to the edges of raster if past boundary, similar to GL_CLIP
         px = max(px, 0)
-        px = min(px, data.RasterXSize-1)
+        px = min(px, data.RasterXSize - 1)
         py = max(py, 0)
-        py = min(py, data.RasterYSize-1)
+        py = min(py, data.RasterYSize - 1)
         val = data.ReadAsArray(px, py, 1, 1)[0][0]
         return val
 
@@ -1163,8 +1323,12 @@ def __value_from_raster(self, inv_geotransform, data, x: float, y: float):
         Returns:
             float or int: The value at the point specified
         """
-        px = int(inv_geotransform[0] + inv_geotransform[1] * x + inv_geotransform[2] * y)
-        py = int(inv_geotransform[3] + inv_geotransform[4] * x + inv_geotransform[5] * y)
+        px = int(
+            inv_geotransform[0] + inv_geotransform[1] * x + inv_geotransform[2] * y
+        )
+        py = int(
+            inv_geotransform[3] + inv_geotransform[4] * x + inv_geotransform[5] * y
+        )
         # Clamp values to the edges of raster if past boundary, similar to GL_CLIP
         px = max(px, 0)
         px = min(px, data.shape[0] - 1)
@@ -1187,7 +1351,7 @@ def get_value_from_raster_df(self, datatype: Datatype, df: pandas.DataFrame):
             pandas.DataFrame: The modified dataframe
         """
         if len(df) <= 0:
-            df['Z'] = []
+            df["Z"] = []
             return df
         data = self.get_map_data(datatype)
         if data is None:
@@ -1197,13 +1361,12 @@ def get_value_from_raster_df(self, datatype: Datatype, df: pandas.DataFrame):
         inv_geotransform = gdal.InvGeoTransform(data.GetGeoTransform())
         data_array = numpy.array(data.GetRasterBand(1).ReadAsArray().T)
 
-        df['Z'] = df.apply(lambda row:
-                           self.__value_from_raster(inv_geotransform,
-                                                    data_array,
-                                                    row['X'],
-                                                    row['Y']),
-                           axis=1
-                           )
+        df["Z"] = df.apply(
+            lambda row: self.__value_from_raster(
+                inv_geotransform, data_array, row["X"], row["Y"]
+            ),
+            axis=1,
+        )
         return df
 
     @beartype.beartype
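get_value_from_raster and __value_from_raster both apply the inverse geotransform to turn map coordinates into pixel indices, then clamp the indices to the raster edge. A standalone numpy sketch of the same lookup over a synthetic 4x4 raster:

import numpy

# Synthetic 4x4 raster with unit pixels anchored at (0, 0); illustrative only.
data = numpy.arange(16, dtype=float).reshape(4, 4)
inv_gt = (0.0, 1.0, 0.0, 0.0, 0.0, 1.0)  # inverse geotransform: identity here

def value_at(x: float, y: float) -> float:
    px = int(inv_gt[0] + inv_gt[1] * x + inv_gt[2] * y)
    py = int(inv_gt[3] + inv_gt[4] * x + inv_gt[5] * y)
    px = min(max(px, 0), data.shape[0] - 1)  # clamp to raster edges
    py = min(max(py, 0), data.shape[1] - 1)
    return data[px][py]

print(value_at(2.5, 1.2))    # in bounds -> 9.0
print(value_at(99.0, -5.0))  # clamped to a corner -> 12.0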
@@ -1217,26 +1380,34 @@ def extract_all_contacts(self, save_contacts=True):
         if self.get_map_data(Datatype.FAULT) is not None:
             faults = self.get_map_data(Datatype.FAULT).copy()
             faults["geometry"] = faults.buffer(50)
-            geology = geopandas.overlay(geology, faults, how="difference", keep_geom_type=False)
+            geology = geopandas.overlay(
+                geology, faults, how="difference", keep_geom_type=False
+            )
         units = geology["UNITNAME"].unique()
         column_names = ["UNITNAME_1", "UNITNAME_2", "geometry"]
-        contacts = geopandas.GeoDataFrame(crs=geology.crs, columns=column_names, data=None)
+        contacts = geopandas.GeoDataFrame(
+            crs=geology.crs, columns=column_names, data=None
+        )
         while len(units) > 1:
             unit1 = units[0]
             units = units[1:]
             for unit2 in units:
                 if unit1 != unit2:
-                    join = geopandas.overlay(geology[geology["UNITNAME"] == unit1],
-                                             geology[geology["UNITNAME"] == unit2],
-                                             keep_geom_type=False)[column_names]
-                    join['geometry'] = join.buffer(1)
-                    buffered = geology[geology["UNITNAME"] == unit2][["geometry"]].copy()
+                    join = geopandas.overlay(
+                        geology[geology["UNITNAME"] == unit1],
+                        geology[geology["UNITNAME"] == unit2],
+                        keep_geom_type=False,
+                    )[column_names]
+                    join["geometry"] = join.buffer(1)
+                    buffered = geology[geology["UNITNAME"] == unit2][
+                        ["geometry"]
+                    ].copy()
                     buffered["geometry"] = buffered.boundary
                     end = geopandas.overlay(buffered, join, keep_geom_type=False)
                     if len(end):
                         contacts = pandas.concat([contacts, end], ignore_index=True)
         # contacts["TYPE"] = "UNKNOWN"
-        contacts['length'] = [row.length for row in contacts['geometry']]
+        contacts["length"] = [row.length for row in contacts["geometry"]]
         if save_contacts:
             self.contacts = contacts
         return contacts
@@ -1253,13 +1424,19 @@ def extract_basal_contacts(self, stratigraphic_column: list, save_contacts=True)
         units = stratigraphic_column
         basal_contacts = self.contacts.copy()
         basal_contacts["ID"] = basal_contacts.apply(
-            lambda row: min(units.index(row["UNITNAME_1"]), units.index(row["UNITNAME_2"])), axis=1
+            lambda row: min(
+                units.index(row["UNITNAME_1"]), units.index(row["UNITNAME_2"])
+            ),
+            axis=1,
         )
         basal_contacts["basal_unit"] = basal_contacts.apply(
             lambda row: units[row["ID"]], axis=1
         )
         basal_contacts["distance"] = basal_contacts.apply(
-            lambda row: abs(units.index(row["UNITNAME_1"]) - units.index(row["UNITNAME_2"])), axis=1
+            lambda row: abs(
+                units.index(row["UNITNAME_1"]) - units.index(row["UNITNAME_2"])
+            ),
+            axis=1,
        )
         basal_contacts["type"] = basal_contacts.apply(
             lambda row: "ABNORMAL" if abs(row["distance"]) > 1 else "BASAL", axis=1
@@ -1283,13 +1460,26 @@ def colour_units(self, stratigraphic_units: pandas.DataFrame, random: bool = Fal
         Returns:
             pandas.DataFrame: The modified units
         """
+
         if self.colour_filename is None:
             return stratigraphic_units
-        colour_lookup = pandas.read_csv(self.colour_filename, sep=",")
+        try:
+            colour_lookup = pandas.read_csv(self.colour_filename, sep=",")
+        except FileNotFoundError:
+            print(f"Colour Lookup file {self.colour_filename} not found")
+            return stratigraphic_units
         colour_lookup["colour"] = colour_lookup["colour"].str.upper()
         if "UNITNAME" in colour_lookup.columns and "colour" in colour_lookup.columns:
-            stratigraphic_units = stratigraphic_units.merge(colour_lookup, left_on="name", right_on="UNITNAME", suffixes=("_old", ""), how="left").fillna("#000000")
+            stratigraphic_units = stratigraphic_units.merge(
+                colour_lookup,
+                left_on="name",
+                right_on="UNITNAME",
+                suffixes=("_old", ""),
+                how="left",
+            ).fillna("#000000")
             stratigraphic_units.drop(columns=["UNITNAME", "colour_old"], inplace=True)
         else:
-            print(f"Colour Lookup file {self.colour_filename} does not contain 'UNITNAME' or 'colour' field")
+            print(
+                f"Colour Lookup file {self.colour_filename} does not contain 'UNITNAME' or 'colour' field"
+            )
         return stratigraphic_units
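The colour lookup merge above left-joins on unit name and falls back to black for any unit missing from the CSV. A small self-contained example of that behaviour (invented units and colours; the suffixes handling from the real method is omitted):

import pandas as pd

units = pd.DataFrame({"name": ["granite", "basalt"]})
lookup = pd.DataFrame({"UNITNAME": ["granite"], "colour": ["#FF0000"]})
merged = units.merge(
    lookup, left_on="name", right_on="UNITNAME", how="left"
).fillna("#000000")
merged.drop(columns=["UNITNAME"], inplace=True)
print(merged.to_dict("records"))
# [{'name': 'granite', 'colour': '#FF0000'}, {'name': 'basalt', 'colour': '#000000'}]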