diff --git a/config/params_template.env b/config/params_template.env index 8312bea9..4818326e 100644 --- a/config/params_template.env +++ b/config/params_template.env @@ -37,6 +37,7 @@ export healed_hand_hydrocondition=true #### apply bathymetry adjustment to rating curve #### export bathymetry_adjust=True +export ai_toggle=0 #### estimating bankfull stage in SRCs #### # Toggle to run identify_bankfull routine (True=on; False=off) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 4a955fd3..9f7ea5f4 100755 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -1,6 +1,37 @@ All notable changes to this project will be documented in this file. We follow the [Semantic Versioning 2.0.0](http://semver.org/) format. +## vx.x.x.x - 2025-01-08 - [PR#1340](https://github.com/NOAA-OWP/inundation-mapping/pull/1340) + +This PR focuses on adjusting rating curves by using bathymetric data and optimized channel roughness values. The bathymetry data includes eHydro surveys and AI-based datasets created for all NWM streams. New manning roughness values were developed for each feature-id using a differential evolution objective function (OF). The OF minimizes the number of the false_positives and false_negatives cells in our flood inundation maps where we have test cases across the CONUS. + +Even though the Python scripts of roughness manning number optimization were not included in this branch, optimized roughness values can be found here: `/fim-data/inputs/rating_curve/variable_roughness/mannings_optz_fe_clusters_so3.csv`. Detailed python scripts also can be found here: `/fim-data/outputs/heidi-mannN-optimization/projects/bathy_mannN_projects/dev-bathymetric-adjustment-mannN-optz/`. + +### Changes +- `src/bathymetric-adjustment.py`: `correct_rating_for_ai_based_bathymetry` function was added to the script. This function processes AI-based bathymetry data and adjusts rating curves using this data. 
Also `apply_src_adjustment_for_bathymetry` function was added to prioritize USACE eHydro over the AI-based bathymetry dataset. The multi-processing function `multi_process_hucs` was updated based on the latest code. Also, an ai_toggle parameter was added to `apply_src_adjustment_for_bathymetry` and `process_bathy_adjustment` functions. When ai_toggle = 1, the SRCs will be adjusted with the AI-based bathymetry data. The default value, ai_toggle = 0, means no AI-based bathymetry data is included. + +- `src/bash_variables.env`: New variables and their paths were added. Also, a new input file with the nwm feature_ids and optimized channel roughness and overbank roughness attributes was created and stored here: +`/fim-data/inputs/rating_curve/variable_roughness/mannings_optz_fe_clusters_so3.csv` +The locations of these files were also added to the `bash_variables.env`. +Please note that when ai_toggle = 1, the manning roughness values should be switched to `vmann_input_file=${inputsDir}/rating_curve/variable_roughness/mannings_optz_fe_clusters_so3.csv` in the current version. + +Here is a list of new/updated input files: + +1. `/fim-data/inputs/rating_curve/variable_roughness/mannings_optz_fe_clusters_so3.csv` +This CSV file contains the new optimized roughness values. It will replace this file: +`vmann_input_file=${inputsDir}/rating_curve/variable_roughness/mannings_global_nwm3.csv` + +2. `bathy_file_aibased=${inputsDir}/bathymetry/ml_outputs_v1.01.parquet` +This file contains the ml-bathymetry and manning roughness values data. + +3. `bathy_file_ehydro=${inputsDir}/bathymetry/final_bathymetry_ehydro.gpkg` +We already had this file, the name of the variable has changed from `bathymetry_file` to `bathy_file_ehydro`, and it was updated. + +- `fim_post_processing.sh`: New arguments were added. Please note that the default value for ai_toggle = 0 is included here. + +

+ + ## v4.5.13.7 - 2025-01-10 - [PR#1379](https://github.com/NOAA-OWP/inundation-mapping/pull/1379) There are many sites in non-CONUS regions (AK, PR, HI) where we would like to run CatFIM but they are being excluded because they are not NWM forecast points. This update brings back the double API pull and adds in some code to filter out duplicate (and NULL) lids from the metadata lists. @@ -148,8 +179,6 @@ As part of this PR, small modification was applied to bridge_inundation.py.

- - ## v4.5.11.3 - 2024-10-25 - [PR#1320](https://github.com/NOAA-OWP/inundation-mapping/pull/1320) The fix: During the post processing scan for the word "error" or "warning", it was only finding records which had either of those two words as stand alone words and not part of bigger phrases. ie); "error" was found, but not "fielderror". Added wildcards and it is now fixed. diff --git a/fim_post_processing.sh b/fim_post_processing.sh index 91db004d..04431af5 100755 --- a/fim_post_processing.sh +++ b/fim_post_processing.sh @@ -161,15 +161,18 @@ Tcount ## RUN BATHYMETRY ADJUSTMENT ROUTINE ## if [ "$bathymetry_adjust" = "True" ]; then - l_echo $startDiv"Performing Bathymetry Adjustment routine" - Tstart + echo -e $startDiv"Performing Bathymetry Adjustment routine" # Run bathymetry adjustment routine + aibathy_toggle=${ai_toggle} #:-0} + Tstart python3 $srcDir/bathymetric_adjustment.py \ -fim_dir $outputDestDir \ - -bathy $bathymetry_file \ + -bathy_ehydro $bathy_file_ehydro \ + -bathy_aibased $bathy_file_aibased \ -buffer $wbd_buffer \ -wbd $inputsDir/wbd/WBD_National_EPSG_5070_WBDHU8_clip_dem_domain.gpkg \ - -j $jobLimit + -j $jobLimit \ + -ait $aibathy_toggle Tcount fi diff --git a/src/bash_variables.env b/src/bash_variables.env index fff5d785..0a4ebec5 100644 --- a/src/bash_variables.env +++ b/src/bash_variables.env @@ -31,7 +31,9 @@ export input_nwm_lakes_Alaska=${inputsDir}/nwm_hydrofabric/nwm_ export input_WBD_gdb=${inputsDir}/wbd/WBD_National_EPSG_5070_WBDHU8_clip_dem_domain.gpkg export input_WBD_gdb_Alaska=${inputsDir}/wbd/WBD_National_South_Alaska.gpkg export input_calib_points_dir=${inputsDir}/rating_curve/water_edge_database/calibration_points/ -export bathymetry_file=${inputsDir}/bathymetry/bathymetric_adjustment_data.gpkg +export bathy_file_ehydro=${inputsDir}/bathymetry/final_bathymetry_ehydro.gpkg +export bathy_file_aibased=${inputsDir}/bathymetry/ml_outputs_v1.01.parquet +export mannN_file_aibased=${inputsDir}/bathymetry/ml_outputs_v1.01.parquet export 
osm_bridges=${inputsDir}/osm/bridges/241002/osm_all_bridges.gpkg # input file location with nwm feature_id and recurrence flow values @@ -39,6 +41,7 @@ export bankfull_flows_file=${inputsDir}/rating_curve/bankfull_flow # input file location with nwm feature_id and channel roughness and overbank roughness attributes export vmann_input_file=${inputsDir}/rating_curve/variable_roughness/mannings_global_nwm3.csv +# export vmann_input_file=${inputsDir}/rating_curve/variable_roughness/mannings_optz_fe_clusters_so3.csv # input file location with nwm feature_id and recurrence flow values export nwm_recur_file=${inputsDir}/rating_curve/nwm_recur_flows/nwm3_17C_recurrence_flows_cfs.csv diff --git a/src/bathymetric_adjustment.py b/src/bathymetric_adjustment.py old mode 100755 new mode 100644 index 6e1fe687..46925934 --- a/src/bathymetric_adjustment.py +++ b/src/bathymetric_adjustment.py @@ -6,18 +6,18 @@ import sys import traceback from argparse import ArgumentParser -from concurrent.futures import ProcessPoolExecutor +from concurrent.futures import ProcessPoolExecutor, as_completed from os.path import join import geopandas as gpd import pandas as pd -from utils.shared_functions import progress_bar_handler - -def correct_rating_for_bathymetry(fim_dir, huc, bathy_file, verbose): +# ------------------------------------------------------- +# Adjusting synthetic rating curves using 'USACE eHydro' bathymetry data +def correct_rating_for_ehydro_bathymetry(fim_dir, huc, bathy_file_ehydro, verbose): """Function for correcting synthetic rating curves. It will correct each branch's - SRCs in serial based on the feature_ids in the input bathy_file. + SRCs in serial based on the feature_ids in the input eHydro bathy_file. Parameters ---------- @@ -25,8 +25,8 @@ def correct_rating_for_bathymetry(fim_dir, huc, bathy_file, verbose): Directory path for fim_pipeline output. huc : str HUC-8 string. - bathy_file : str - Path to bathymetric adjustment geopackage, e.g. 
+ bathy_file_ehydro : str + Path to eHydro bathymetric adjustment geopackage, e.g. "/data/inputs/bathymetry/bathymetry_adjustment_data.gpkg". verbose : bool Verbose printing. @@ -37,12 +37,12 @@ def correct_rating_for_bathymetry(fim_dir, huc, bathy_file, verbose): """ - log_text = f'Calculating bathymetry adjustment: {huc}\n' + log_text = f'Calculating eHydro bathymetry adjustment: {huc}\n' # Load wbd and use it as a mask to pull the bathymetry data fim_huc_dir = join(fim_dir, huc) wbd8_clp = gpd.read_file(join(fim_huc_dir, 'wbd8_clp.gpkg'), engine="pyogrio", use_arrow=True) - bathy_data = gpd.read_file(bathy_file, mask=wbd8_clp, engine="fiona") + bathy_data = gpd.read_file(bathy_file_ehydro, mask=wbd8_clp, engine="fiona") bathy_data = bathy_data.rename(columns={'ID': 'feature_id'}) # Get src_full from each branch @@ -55,14 +55,14 @@ def correct_rating_for_bathymetry(fim_dir, huc, bathy_file, verbose): # Update src parameters with bathymetric data for src in src_all_branches: - src_df = pd.read_csv(src) + src_df = pd.read_csv(src, low_memory=False) if 'Bathymetry_source' in src_df.columns: src_df = src_df.drop(columns='Bathymetry_source') branch = re.search(r'branches/(\d{10}|0)/', src).group()[9:-1] log_text += f' Branch: {branch}\n' if bathy_data.empty: - log_text += ' There were no bathymetry feature_ids for this branch' + log_text += ' There were no eHydro bathymetry feature_ids for this branch' src_df['Bathymetry_source'] = [""] * len(src_df) src_df.to_csv(src, index=False) return log_text @@ -86,13 +86,15 @@ def correct_rating_for_bathymetry(fim_dir, huc, bathy_file, verbose): 'Bathymetry_source' ].first() src_df = src_df.merge(reconciled_bathy_data, on='feature_id', how='left', validate='many_to_one') - # Exit if there are no recalculations to be made - if ~src_df['Bathymetry_source'].any(axis=None): - log_text += ' No matching feature_ids in this branch\n' - continue + + # # Exit if there are no recalculations to be made + # if 
~src_df['Bathymetry_source'].any(axis=None): + # log_text += ' No matching feature_ids in this branch\n' + # continue src_df['missing_xs_area_m2'] = src_df['missing_xs_area_m2'].fillna(0.0) src_df['missing_wet_perimeter_m'] = src_df['missing_wet_perimeter_m'].fillna(0.0) + # Add missing hydraulic geometry into base parameters src_df['Volume (m3)'] = src_df['Volume (m3)'] + ( src_df['missing_xs_area_m2'] * (src_df['LENGTHKM'] * 1000) @@ -111,6 +113,7 @@ def correct_rating_for_bathymetry(fim_dir, huc, bathy_file, verbose): * pow(src_df['SLOPE'], 0.5) / src_df['ManningN'] ) + # Force zero stage to have zero discharge src_df.loc[src_df['Stage'] == 0, ['Discharge (m3s-1)']] = 0 # Calculate number of adjusted HydroIDs @@ -119,10 +122,279 @@ def correct_rating_for_bathymetry(fim_dir, huc, bathy_file, verbose): # Write src back to file src_df.to_csv(src, index=False) log_text += f' Successfully recalculated {count} HydroIDs\n' + + return log_text + + +# ------------------------------------------------------- +# Adjusting synthetic rating curves using 'AI-based' bathymetry data +def correct_rating_for_ai_bathymetry(fim_dir, huc, strm_order, bathy_file_aibased): + """ + Function for correcting synthetic rating curves. It will correct each branch's + SRCs in serial based on the feature_ids in the input AI-based bathy_file. + + Parameters + ---------- + fim_dir : str + Directory path for fim_pipeline output. + huc : str + HUC-8 string. + strm_order : int + stream order on or higher for which you want to apply AI-based bathymetry data. + default = 4 + bathy_file_aibased : str + Path to AI-based bathymetric adjustment file, e.g. + "/data/inputs/bathymetry/ml_outputs_v1.01.parquet". + verbose : bool + Verbose printing. 
+ + Returns + ---------- + log_text : str + + """ + log_text = f'Calculating AI-based bathymetry adjustment: {huc}\n' + print(f'Calculating AI-based bathymetry adjustment: {huc}\n') + + # Load AI-based bathymetry data + ml_bathy_data = pd.read_parquet(bathy_file_aibased, engine='pyarrow') + ml_bathy_data_df = ml_bathy_data[ + ['COMID', 'owp_tw_inchan', 'owp_inchan_channel_area', 'owp_inchan_channel_perimeter'] + ] + + fim_huc_dir = join(fim_dir, huc) + + path_nwm_streams = join(fim_huc_dir, "nwm_subset_streams.gpkg") + nwm_stream = gpd.read_file(path_nwm_streams) + + wbd8 = gpd.read_file(join(fim_huc_dir, 'wbd.gpkg'), engine="pyogrio", use_arrow=True) + nwm_stream_clp = nwm_stream.clip(wbd8) + + ml_bathy_data_df = ml_bathy_data_df.merge( + nwm_stream_clp[['ID', 'order_']], left_on='COMID', right_on='ID' + ) + aib_bathy_data_df = ml_bathy_data_df.drop(columns=['ID']) + + aib_bathy_data_df = aib_bathy_data_df.rename(columns={'COMID': 'feature_id'}) + aib_bathy_data_df = aib_bathy_data_df.rename(columns={'owp_inchan_channel_area': 'missing_xs_area_m2'}) + + # Calculating missing_wet_perimeter_m and adding it to aib_bathy_data_gdf + missing_wet_perimeter_m = ( + aib_bathy_data_df['owp_inchan_channel_perimeter'] - aib_bathy_data_df['owp_tw_inchan'] + ) + aib_bathy_data_df['missing_wet_perimeter_m'] = missing_wet_perimeter_m + aib_bathy_data_df['Bathymetry_source'] = "AI_Based" + + # Excluding streams with order lower than desired order (default = 4) + aib_bathy_data_df.loc[ + aib_bathy_data_df["order_"] < strm_order, + ["missing_xs_area_m2", "missing_wet_perimeter_m", "Bathymetry_source"], + ] = 0 + aib_df0 = aib_bathy_data_df[ + ['feature_id', 'missing_xs_area_m2', 'missing_wet_perimeter_m', 'Bathymetry_source'] + ] + + # test = aib_df[aib_df.duplicated(subset='feature_id', keep=False)] + aib_df = aib_df0.drop_duplicates(subset=['feature_id'], keep='first') + aib_df.index = range(len(aib_df)) + print(f'Adjusting SRCs only with EHydro Bathymetry Data: {huc}\n') + + # 
Get src_full from each branch + src_all_branches_path = [] + branches = os.listdir(join(fim_huc_dir, 'branches')) + for branch in branches: + src_full = join(fim_huc_dir, 'branches', str(branch), f'src_full_crosswalked_{branch}.csv') + if os.path.isfile(src_full): + src_all_branches_path.append(src_full) + + # Update src parameters with bathymetric data + for src in src_all_branches_path: + src_df = pd.read_csv(src, low_memory=False) + # print(src_df.loc[~src_df['Bathymetry_source'].isna()]['Bathymetry_source']) + + src_name = os.path.basename(src) + branch = src_name.split(".")[0].split("_")[-1] + log_text += f' Branch: {branch}\n' + + # Merge in missing ai bathy data and fill Nans + if 'missing_xs_area_m2' not in src_df.columns: + src_df.drop(columns=["Bathymetry_source"], inplace=True) + src_df = src_df.merge(aib_df, on='feature_id', how='left', validate='many_to_one') + + src_df['missing_xs_area_m2'] = src_df['missing_xs_area_m2'].fillna(0.0) + src_df['missing_wet_perimeter_m'] = src_df['missing_wet_perimeter_m'].fillna(0.0) + + # Add missing hydraulic geometry into base parameters + src_df['Volume (m3)'] = src_df['Volume (m3)'] + ( + src_df['missing_xs_area_m2'] * (src_df['LENGTHKM'] * 1000) + ) + src_df['BedArea (m2)'] = src_df['BedArea (m2)'] + ( + src_df['missing_wet_perimeter_m'] * (src_df['LENGTHKM'] * 1000) + ) + # Recalc discharge with adjusted geometries + src_df['WettedPerimeter (m)'] = src_df['WettedPerimeter (m)'] + src_df['missing_wet_perimeter_m'] + src_df['WetArea (m2)'] = src_df['WetArea (m2)'] + src_df['missing_xs_area_m2'] + src_df['HydraulicRadius (m)'] = src_df['WetArea (m2)'] / src_df['WettedPerimeter (m)'] + src_df['HydraulicRadius (m)'] = src_df['HydraulicRadius (m)'].fillna(0) + src_df['Discharge (m3s-1)'] = ( + src_df['WetArea (m2)'] + * pow(src_df['HydraulicRadius (m)'], 2.0 / 3) + * pow(src_df['SLOPE'], 0.5) + / src_df['ManningN'] + ) + # Force zero stage to have zero discharge + src_df.loc[src_df['Stage'] == 0, ['Discharge (m3s-1)']] 
= 0 + + # Write src back to file + src_df.to_csv(src, index=False) + + else: + src_df = src_df.merge(aib_df, on='feature_id', how='left', validate='many_to_one') + # checked + + src_df.loc[src_df["Bathymetry_source_x"].isna(), ["missing_xs_area_m2_x"]] = src_df[ + "missing_xs_area_m2_y" + ] + src_df.loc[src_df["Bathymetry_source_x"].isna(), ["missing_wet_perimeter_m_x"]] = src_df[ + "missing_wet_perimeter_m_y" + ] + src_df.loc[src_df["Bathymetry_source_x"].isna(), ["Bathymetry_source_x"]] = src_df[ + "Bathymetry_source_y" + ] + # checked + + src_df.drop( + columns=["missing_xs_area_m2_y", "missing_wet_perimeter_m_y", "Bathymetry_source_y"], + inplace=True, + ) + src_df = src_df.rename(columns={'missing_xs_area_m2_x': 'missing_xs_area_m2'}) + src_df = src_df.rename(columns={'missing_wet_perimeter_m_x': 'missing_wet_perimeter_m'}) + src_df = src_df.rename(columns={'Bathymetry_source_x': 'Bathymetry_source'}) + + src_df['missing_xs_area_m2'] = src_df['missing_xs_area_m2'].fillna(0.0) + src_df['missing_wet_perimeter_m'] = src_df['missing_wet_perimeter_m'].fillna(0.0) + + # Add missing hydraulic geometry into base parameters + Volume_m3 = src_df['Volume (m3)'] + (src_df['missing_xs_area_m2'] * (src_df['LENGTHKM'] * 1000)) + src_df.loc[src_df["Bathymetry_source"] == "AI_Based", ["Volume (m3)"]] = Volume_m3 + # src_df['Volume (m3)'] = src_df['Volume (m3)'] + ( + # src_df['missing_xs_area_m2'] * (src_df['LENGTHKM'] * 1000)) + + BedArea_m2 = src_df['BedArea (m2)'] + ( + src_df['missing_wet_perimeter_m'] * (src_df['LENGTHKM'] * 1000) + ) + src_df.loc[src_df["Bathymetry_source"] == "AI_Based", ["BedArea (m2)"]] = BedArea_m2 + # src_df['BedArea (m2)'] = src_df['BedArea (m2)'] + ( + # src_df['missing_wet_perimeter_m'] * (src_df['LENGTHKM'] * 1000)) + + # Recalc discharge with adjusted geometries + WettedPerimeter_m = src_df['WettedPerimeter (m)'] + src_df['missing_wet_perimeter_m'] + src_df.loc[src_df["Bathymetry_source"] == "AI_Based", ["WettedPerimeter (m)"]] = 
WettedPerimeter_m + # src_df['WettedPerimeter (m)'] = src_df['WettedPerimeter (m)'] + src_df['missing_wet_perimeter_m'] + Wetarea_m2 = src_df['WetArea (m2)'] + src_df['missing_xs_area_m2'] + src_df.loc[src_df["Bathymetry_source"] == "AI_Based", ["WetArea (m2)"]] = Wetarea_m2 + # src_df['WetArea (m2)'] = src_df['WetArea (m2)'] + src_df['missing_xs_area_m2'] + HydraulicRadius_m = src_df['WetArea (m2)'] / src_df['WettedPerimeter (m)'] + src_df.loc[src_df["Bathymetry_source"] == "AI_Based", ["HydraulicRadius (m)"]] = HydraulicRadius_m + # src_df['HydraulicRadius (m)'] = src_df['WetArea (m2)'] / src_df['WettedPerimeter (m)'] + src_df['HydraulicRadius (m)'] = src_df['HydraulicRadius (m)'].fillna(0) + + dicharge_cms = ( + src_df['WetArea (m2)'] + * pow(src_df['HydraulicRadius (m)'], 2.0 / 3) + * pow(src_df['SLOPE'], 0.5) + / src_df['ManningN'] + ) + src_df.loc[src_df["Bathymetry_source"] == "AI_Based", ["Discharge (m3s-1)"]] = dicharge_cms + # src_df['Discharge (m3s-1)'] = ( + # src_df['WetArea (m2)'] + # * pow(src_df['HydraulicRadius (m)'], 2.0 / 3) + # * pow(src_df['SLOPE'], 0.5) + # / src_df['ManningN'] + # ) + + # Force zero stage to have zero discharge + src_df.loc[src_df['Stage'] == 0, ['Discharge (m3s-1)']] = 0 + + # Write src back to file + src_df.to_csv(src, index=False) + return log_text -def multi_process_hucs(fim_dir, bathy_file, wbd_buffer, wbd, output_suffix, number_of_jobs, verbose): +# -------------------------------------------------------- +# Apply src_adjustment_for_bathymetry +def apply_src_adjustment_for_bathymetry( + fim_dir, huc, strm_order, bathy_file_ehydro, bathy_file_aibased, ai_toggle, verbose, log_file_path +): + """ + Function for applying both eHydro & AI-based bathymetry adjustment to synthetic rating curves. 
+ + Note: Any failure in here will be logged when it can be but will not abort the Multi-Proc + + Parameters + ---------- + Please refer to correct_rating_for_ehydro_bathymetry and + correct_rating_for_ai_based_bathymetry functions parameters. + + Returns + ---------- + log_text : str + """ + log_text = "" + try: + if os.path.exists(bathy_file_ehydro): + msg = f"Correcting rating curve for ehydro bathy for huc : {huc}" + log_text += msg + '\n' + print(msg) + log_text += correct_rating_for_ehydro_bathymetry(fim_dir, huc, bathy_file_ehydro, verbose) + else: + print(f'USACE eHydro bathymetry file does not exist for huc: {huc}') + + except Exception: + log_text += f"An error has occurred while processing ehydro bathy for huc {huc}" + log_text += traceback.format_exc() + + try: + with open(log_file_path, "a") as log_file: + log_file.write(log_text + '\n') + except Exception: + print(f"Error trying to write to the log file of {log_file_path}") + + if ai_toggle == 1: + try: + if os.path.exists(bathy_file_aibased): + msg = f"Correcting rating curve for AI-based bathy for huc : {huc}" + log_text += msg + '\n' + print(msg + '\n') + + log_text += correct_rating_for_ai_bathymetry( + fim_dir, huc, strm_order, bathy_file_aibased + ) # , ai_toggle + else: + print(f'AI-based bathymetry file does not exist for huc : {huc}') + + except Exception: + log_text += f"An error has occurred while processing AI-based bathy for huc {huc}" + log_text += traceback.format_exc() + + with open(log_file_path, "a") as log_file: + log_file.write(log_text + '\n') + + +# ------------------------------------------------------- +def process_bathy_adjustment( + fim_dir, + strm_order, + bathy_file_ehydro, + bathy_file_aibased, + wbd_buffer, + wbd, + output_suffix, + number_of_jobs, + ai_toggle, + verbose, +): """Function for correcting synthetic rating curves. It will correct each branch's SRCs in serial based on the feature_ids in the input bathy_file. 
@@ -130,9 +402,15 @@ def multi_process_hucs(fim_dir, bathy_file, wbd_buffer, wbd, output_suffix, numb ---------- fim_dir : str Directory path for fim_pipeline output. - bathy_file : str - Path to bathymetric adjustment geopackage, e.g. + strm_order : int + stream order on or higher for which you want to apply AI-based bathymetry data. + default = 4 + bathy_file_eHydro : str + Path to eHydro bathymetric adjustment geopackage, e.g. "/data/inputs/bathymetry/bathymetry_adjustment_data.gpkg". + bathy_file_aibased : str + Path to AI-based bathymetric adjustment file, e.g. + "/data/inputs/bathymetry/ml_outputs_v1.01.parquet". wbd_buffer : int Distance in meters to buffer wbd dataset when searching for relevant HUCs. wbd : str @@ -146,80 +424,102 @@ def multi_process_hucs(fim_dir, bathy_file, wbd_buffer, wbd, output_suffix, numb Verbose printing. """ - # Set up log file - print( - 'Writing progress to log file here: ' - + str(join(fim_dir, 'logs', 'bathymetric_adjustment' + output_suffix + '.log')) - ) + log_file_path = os.path.join(fim_dir, 'logs', 'bathymetric_adjustment' + output_suffix + '.log') + print(f'Writing progress to log file here: {log_file_path}') print('This may take a few minutes...') ## Create a time var to log run time - begin_time = dt.datetime.now() + begin_time = dt.datetime.now(dt.timezone.utc) ## Initiate log file - log_file = open(join(fim_dir, 'logs', 'bathymetric_adjustment' + output_suffix + '.log'), "w") - log_file.write('START TIME: ' + str(begin_time) + '\n') - log_file.write('#########################################################\n\n') + with open(log_file_path, "w") as log_file: + log_file.write('START TIME: ' + str(begin_time) + '\n') + log_file.write('#########################################################\n\n') + # Let log_text build up starting here until the bottom. 
+ log_text = "" # Exit program if the bathymetric data doesn't exist - if not os.path.exists(bathy_file): - statement = f'The input bathymetry file {bathy_file} does not exist. Exiting...' - log_file.write(statement) - print(statement) - sys.exit(0) - - # Find applicable HUCs to apply bathymetric adjustment - # NOTE: This block can be removed if we have estimated bathymetry data for - # the whole domain later. + if ai_toggle == 1: + if not all([os.path.exists(bathy_file_ehydro), os.path.exists(bathy_file_aibased)]): + statement = f'The input bathymetry files {bathy_file_ehydro} and {bathy_file_aibased} do not exist. Exiting...' + with open(log_file_path, "w") as log_file: + log_file.write(statement) + print(statement) + sys.exit(0) + else: + if not os.path.exists(bathy_file_ehydro): + statement = f'The input bathymetry file {bathy_file_ehydro} does not exist. Exiting...' + with open(log_file_path, "w") as log_file: + log_file.write(statement) + print(statement) + sys.exit(0) + + # Find applicable HUCs to apply ehydro bathymetric adjustment fim_hucs = [h for h in os.listdir(fim_dir) if re.match(r'\d{8}', h)] - bathy_gdf = gpd.read_file(bathy_file, engine="pyogrio", use_arrow=True) + bathy_gdf = gpd.read_file(bathy_file_ehydro, engine="pyogrio", use_arrow=True) buffered_bathy = bathy_gdf.geometry.buffer(wbd_buffer) # We buffer the bathymetric data to get adjacent wbd = gpd.read_file( wbd, mask=buffered_bathy, engine="fiona" ) # HUCs that could also have bathymetric reaches included hucs_with_bathy = wbd.HUC8.to_list() hucs = [h for h in fim_hucs if h in hucs_with_bathy] - log_file.write(f"Identified {len(hucs)} HUCs that have bathymetric data: {hucs}\n") - print(f"Identified {len(hucs)} HUCs that have bathymetric data\n") + hucs.sort() + msg = f"Identified {len(hucs)} HUCs that have USACE eHydro bathymetric data: {hucs}\n" + log_text += msg + print(msg) + + if ai_toggle == 1: + msg = f"AI-Based bathymetry data is applied on streams with order {strm_order} or 
higher\n" + log_text += msg + print(msg) - # Set up multiprocessor with ProcessPoolExecutor(max_workers=number_of_jobs) as executor: - # Loop through all test cases, build the alpha test arguments, and submit them to the process pool - executor_dict = {} - for huc in hucs: - arg_keeper = {'fim_dir': fim_dir, 'huc': huc, 'bathy_file': bathy_file, 'verbose': verbose} - future = executor.submit(correct_rating_for_bathymetry, **arg_keeper) - executor_dict[future] = huc - - # Send the executor to the progress bar and wait for all tasks to finish - progress_bar_handler(executor_dict, f"Running BARC on {len(hucs)} HUCs") - # Get the returned logs and write to the log file - for future in executor_dict.keys(): - try: - log_file.write(future.result()) - except Exception as ex: - print(f"ERROR: {executor_dict[future]} BARC failed for some reason") - log_file.write(f"ERROR --> {executor_dict[future]} BARC failed (details: *** {ex} )\n") - traceback.print_exc(file=log_file) + # Loop through all hucs, build the arguments, and submit them to the process pool + futures = {} + for huc in fim_hucs: + args = { + 'fim_dir': fim_dir, + 'huc': huc, + 'strm_order': strm_order, + 'bathy_file_ehydro': bathy_file_ehydro, + 'bathy_file_aibased': bathy_file_aibased, + 'ai_toggle': ai_toggle, + 'verbose': verbose, + 'log_file_path': log_file_path, + } + future = executor.submit(apply_src_adjustment_for_bathymetry, **args) + futures[future] = future + + for future in as_completed(futures): + if future is not None: + if future.exception(): + raise future.exception() ## Record run time and close log file - end_time = dt.datetime.now() - log_file.write('END TIME: ' + str(end_time) + '\n') + end_time = dt.datetime.now(dt.timezone.utc) + log_text += 'END TIME: ' + str(end_time) + '\n' tot_run_time = end_time - begin_time - log_file.write('TOTAL RUN TIME: ' + str(tot_run_time)) + log_text += 'TOTAL RUN TIME: ' + str(tot_run_time).split('.')[0] log_file.close() if __name__ == '__main__': + """ 
Parameters ---------- fim_dir : str Directory path for fim_pipeline output. Log file will be placed in fim_dir/logs/bathymetric_adjustment.log. - bathy_file : str - Path to bathymetric adjustment geopackage, e.g. + strm_order : int + stream order on or higher for which you want to apply AI-based bathymetry data. + default = 4 + bathy_file_ehydro : str + Path to eHydro bathymetric adjustment geopackage, e.g. "/data/inputs/bathymetry/bathymetry_adjustment_data.gpkg". + bathy_file_aibased : str + Path to AI-based bathymetric adjustment file, e.g. + "/data/inputs/bathymetry/ml_outputs_v1.01.parquet". wbd_buffer : int Distance in meters to buffer wbd dataset when searching for relevant HUCs. wbd : str @@ -228,23 +528,38 @@ def multi_process_hucs(fim_dir, bathy_file, wbd_buffer, wbd, output_suffix, numb output_suffix : str Optional. Output filename suffix. Defaults to no suffix. number_of_jobs : int - Optional. Number of CPU cores to parallelize HUC processing. Defaults to 8. + Optional. Number of CPU cores to parallelize HUC processing. Defaults to 1. verbose : bool Optional flag for enabling verbose printing. 
Sample Usage ---------- - python3 /foss_fim/src/bathymetric_adjustment.py -fim_dir /outputs/fim_run_dir -bathy /data/inputs/bathymetry/bathymetry_adjustment_data.gpkg + python3 /foss_fim/src/bathymetric_adjustment.py -fim_dir /outputs/fim_run_dir + -bathy_eHydro /data/inputs/bathymetry/bathymetric_adjustment_data.gpkg + -bathy_aibased /data/inputs/bathymetry/ml_outputs_v1.01.parquet -buffer 5000 -wbd /data/inputs/wbd/WBD_National_EPSG_5070_WBDHU8_clip_dem_domain.gpkg -j $jobLimit - """ - parser = ArgumentParser(description="Bathymetric Adjustment") parser.add_argument('-fim_dir', '--fim-dir', help='FIM output dir', required=True, type=str) parser.add_argument( - '-bathy', - '--bathy_file', - help="Path to geopackage with preprocessed bathymetic data", + '-sor', + '--strm_order', + help="stream order on or higher for which AI-based bathymetry data is applied", + default=4, + required=False, + type=int, + ) + parser.add_argument( + '-bathy_ehydro', + '--bathy_file_ehydro', + help="Path to geopackage with preprocessed eHydro bathymetic data", + required=True, + type=str, + ) + parser.add_argument( + '-bathy_aibased', + '--bathy_file_aibased', + help="Path to parquet file with preprocessed AI-based bathymetic data", required=True, type=str, ) @@ -273,9 +588,17 @@ def multi_process_hucs(fim_dir, bathy_file, wbd_buffer, wbd, output_suffix, numb parser.add_argument( '-j', '--number-of-jobs', - help='OPTIONAL: number of workers (default=8)', + help='OPTIONAL: number of workers (default=1)', required=False, - default=8, + default=1, + type=int, + ) + parser.add_argument( + '-ait', + '--ai_toggle', + help='Toggle to apply ai_based bathymetry, ait = 1', + required=False, + default=0, type=int, ) parser.add_argument( @@ -290,11 +613,25 @@ def multi_process_hucs(fim_dir, bathy_file, wbd_buffer, wbd, output_suffix, numb args = vars(parser.parse_args()) fim_dir = args['fim_dir'] - bathy_file = args['bathy_file'] + strm_order = args['strm_order'] + bathy_file_ehydro = 
args['bathy_file_ehydro'] + bathy_file_aibased = args['bathy_file_aibased'] wbd_buffer = int(args['wbd_buffer']) wbd = args['wbd'] output_suffix = args['output_suffix'] number_of_jobs = args['number_of_jobs'] + ai_toggle = args['ai_toggle'] verbose = bool(args['verbose']) - multi_process_hucs(fim_dir, bathy_file, wbd_buffer, wbd, output_suffix, number_of_jobs, verbose) + process_bathy_adjustment( + fim_dir, + strm_order, + bathy_file_ehydro, + bathy_file_aibased, + wbd_buffer, + wbd, + output_suffix, + number_of_jobs, + ai_toggle, + verbose, + ) diff --git a/src/delineate_hydros_and_produce_HAND.sh b/src/delineate_hydros_and_produce_HAND.sh index 16f68f3b..9e1fbbc6 100755 --- a/src/delineate_hydros_and_produce_HAND.sh +++ b/src/delineate_hydros_and_produce_HAND.sh @@ -218,6 +218,17 @@ if [ -f $tempCurrentBranchDataDir/LandSea_subset_$current_branch_id.tif ]; then --outfile=$tempCurrentBranchDataDir/"rem_zeroed_masked_$current_branch_id.tif" fi +# ## HEAL HAND -- REMOVES HYDROCONDITIONING ARTIFACTS ## +# if [ "$healed_hand_hydrocondition" = true ]; then +# echo -e $startDiv"Healed HAND to Remove Hydro-conditioning Artifacts $hucNumber $current_branch_id" +# gdal_calc.py --quiet --type=Float32 --overwrite --co "COMPRESS=LZW" --co "BIGTIFF=YES" --co "TILED=YES" \ +# -R $tempCurrentBranchDataDir/rem_zeroed_masked_$current_branch_id.tif \ +# -D $tempCurrentBranchDataDir/dem_meters_$current_branch_id.tif \ +# -T $tempCurrentBranchDataDir/dem_thalwegCond_$current_branch_id.tif \ +# --calc="R+(D-T)" --NoDataValue=$ndv \ +# --outfile=$tempCurrentBranchDataDir/"rem_zeroed_masked_$current_branch_id.tif" +# fi + ## HYDRAULIC PROPERTIES ## echo -e $startDiv"Sample reach averaged parameters $hucNumber $current_branch_id" $taudemDir/catchhydrogeo -hand $tempCurrentBranchDataDir/rem_zeroed_masked_$current_branch_id.tif \