From 6e4fe41b0d805629230db2ea85442a14bef12003 Mon Sep 17 00:00:00 2001
From: Benjamin Schmidt
Date: Tue, 26 Nov 2024 17:38:32 +0100
Subject: [PATCH] fix(config): missed some config spots. Will squash later

---
 COSIPY.py                      | 165 ++++++++++++++++++++-------------
 cosipy/cpkernel/cosipy_core.py |   2 +-
 2 files changed, 100 insertions(+), 67 deletions(-)

diff --git a/COSIPY.py b/COSIPY.py
index 013fb0a..1ce6082 100644
--- a/COSIPY.py
+++ b/COSIPY.py
@@ -74,28 +74,33 @@ def main():
 
     # Measure time
     start_time = datetime.now()
 
-    #-----------------------------------------------
+    # -----------------------------------------------
     # Create a client for distributed calculations
-    #-----------------------------------------------
-    if Config.slurm_use:
-        SlurmConfig()
+    # -----------------------------------------------
+    if main_config.slurm_use and slurm_config is not None:
         with SLURMCluster(
-            job_name=SlurmConfig.name,
-            cores=SlurmConfig.cores,
-            processes=SlurmConfig.cores,
-            memory=SlurmConfig.memory,
-            account=SlurmConfig.account,
-            job_extra_directives=SlurmConfig.slurm_parameters,
-            local_directory=SlurmConfig.local_directory,
+            job_name=slurm_config.name,
+            cores=slurm_config.cores,
+            processes=slurm_config.cores,
+            memory=slurm_config.memory,
+            account=slurm_config.account,
+            job_extra_directives=slurm_config.slurm_parameters,
+            local_directory=slurm_config.local_directory,
         ) as cluster:
-            cluster.scale(SlurmConfig.nodes * SlurmConfig.cores)
+            cluster.scale(slurm_config.nodes * slurm_config.cores)
             print(cluster.job_script())
             print("You are using SLURM!\n")
             print(cluster)
             run_cosipy(cluster, IO, DATA, RESULT, RESTART, futures)
 
     else:
-        with LocalCluster(scheduler_port=Config.local_port, n_workers=Config.workers, local_directory='logs/dask-worker-space', threads_per_worker=1, silence_logs=True) as cluster:
+        with LocalCluster(
+            scheduler_port=main_config.local_port,
+            n_workers=main_config.workers,
+            local_directory="logs/dask-worker-space",
+            threads_per_worker=1,
+            silence_logs=True,
+        ) as cluster:
             print(cluster)
             run_cosipy(cluster, IO, DATA, RESULT, RESTART, futures)
 
@@ -103,10 +108,12 @@ def main():
     print_notice(msg="Write results ...")
     start_writing = datetime.now()
 
-    #-----------------------------------------------
+    # -----------------------------------------------
     # Write results and restart files
-    #-----------------------------------------------
-    timestamp = pd.to_datetime(str(IO.get_restart().time.values)).strftime('%Y-%m-%dT%H-%M')
+    # -----------------------------------------------
+    timestamp = pd.to_datetime(str(IO.get_restart().time.values)).strftime(
+        "%Y-%m-%dT%H-%M"
+    )
 
     encoding = dict()
     for var in IO.get_result().data_vars:
@@ -115,8 +122,8 @@ def main():
         # dtype = 'int16'
         # FillValue = -9999
         # scale_factor, add_offset = compute_scale_and_offset(dataMin, dataMax, 16)
-        #encoding[var] = dict(zlib=True, complevel=compression_level, dtype=dtype, scale_factor=scale_factor, add_offset=add_offset, _FillValue=FillValue)
-        encoding[var] = dict(zlib=True, complevel=Config.compression_level)
+        # encoding[var] = dict(zlib=True, complevel=compression_level, dtype=dtype, scale_factor=scale_factor, add_offset=add_offset, _FillValue=FillValue)
+        encoding[var] = dict(zlib=True, complevel=main_config.compression_level)
     output_netcdf = set_output_netcdf_path()
     output_path = create_data_directory(name="output")
     IO.get_result().to_netcdf(output_path / output_netcdf, encoding=encoding, mode="w")
@@ -128,8 +135,8 @@ def main():
         # dtype = 'int16'
         # FillValue = -9999
         # scale_factor, add_offset = compute_scale_and_offset(dataMin, dataMax, 16)
-        #encoding[var] = dict(zlib=True, complevel=compression_level, dtype=dtype, scale_factor=scale_factor, add_offset=add_offset, _FillValue=FillValue)
-        encoding[var] = dict(zlib=True, complevel=Config.compression_level)
+        # encoding[var] = dict(zlib=True, complevel=compression_level, dtype=dtype, scale_factor=scale_factor, add_offset=add_offset, _FillValue=FillValue)
+        encoding[var] = dict(zlib=True, complevel=main_config.compression_level)
 
     restart_path = create_data_directory(name='restart')
     IO.get_restart().to_netcdf(restart_path / f'restart_{timestamp}.nc', encoding=encoding)
@@ -140,20 +147,16 @@ def main():
     duration_run = datetime.now() - start_time
     duration_run_writing = datetime.now() - start_writing
 
-    #-----------------------------------------------
+    # -----------------------------------------------
     # Print out some information
-    #-----------------------------------------------
-    get_time_required(
-        action="write restart and output files", times=duration_run_writing
-    )
+    # -----------------------------------------------
+    get_time_required(action="write restart and output files", times=duration_run_writing)
     run_time = duration_run.total_seconds()
     print(f"\tTotal run duration: {run_time // 60.0:4g} minutes {run_time % 60.0:2g} seconds\n")
     print_notice(msg="\tSIMULATION WAS SUCCESSFUL")
 
 
 def run_cosipy(cluster, IO, DATA, RESULT, RESTART, futures):
-    Config()
-    Constants()
 
     with Client(cluster) as client:
         print_notice(msg="\tStarting clients and submitting jobs ...")
@@ -161,23 +164,30 @@ def run_cosipy(cluster, IO, DATA, RESULT, RESTART, futures):
         print(client)
 
         # Get dimensions of the whole domain
-        # ny = DATA.sizes[Config.northing]
-        # nx = DATA.sizes[Config.easting]
+        # ny = DATA.sizes[main_config.northing]
+        # nx = DATA.sizes[main_config.easting]
 
         # cp = cProfile.Profile()
 
         # Get some information about the cluster/nodes
-        total_grid_points = DATA.sizes[Config.northing]*DATA.sizes[Config.easting]
-        if Config.slurm_use:
-            total_cores = SlurmConfig.cores * SlurmConfig.nodes
+        total_grid_points = DATA.sizes[main_config.northing] * DATA.sizes[main_config.easting]
+        if main_config.slurm_use and slurm_config is not None:
+            total_cores = slurm_config.cores * slurm_config.nodes
             points_per_core = total_grid_points // total_cores
             print(total_grid_points, total_cores, points_per_core)
 
         # Check if evaluation is selected:
-        if Config.stake_evaluation:
+        if main_config.stake_evaluation:
             # Read stake data (data must be given as cumulative changes)
-            df_stakes_loc = pd.read_csv(Config.stakes_loc_file, delimiter='\t', na_values='-9999')
-            df_stakes_data = pd.read_csv(Config.stakes_data_file, delimiter='\t', index_col='TIMESTAMP', na_values='-9999')
+            df_stakes_loc = pd.read_csv(
+                main_config.stakes_loc_file, delimiter="\t", na_values="-9999"
+            )
+            df_stakes_data = pd.read_csv(
+                main_config.stakes_data_file,
+                delimiter="\t",
+                index_col="TIMESTAMP",
+                na_values="-9999",
+            )
             df_stakes_data.index = pd.to_datetime(df_stakes_data.index)
 
             # Uncomment, if stake data is given as changes between measurements
@@ -188,7 +198,7 @@ def run_cosipy(cluster, IO, DATA, RESULT, RESTART, futures):
             df_val = df_stakes_data.copy()
 
             # reshape and stack coordinates
-            if Config.WRF:
+            if main_config.WRF:
                 coords = np.column_stack((DATA.lat.values.ravel(), DATA.lon.values.ravel()))
             else:
                 # in case lat/lon are 1D coordinates
@@ -214,8 +224,10 @@ def run_cosipy(cluster, IO, DATA, RESULT, RESTART, futures):
 
         # Distribute data and model to workers
         start_res = datetime.now()
-        for y,x in product(range(DATA.sizes[Config.northing]),range(DATA.sizes[Config.easting])):
-            if Config.stake_evaluation:
+        for y, x in product(
+            range(DATA.sizes[main_config.northing]), range(DATA.sizes[main_config.easting])
+        ):
+            if main_config.stake_evaluation:
                 stake_names = []
                 # Check if the grid cell contain stakes and store the stake names in a list
                 for idx, (stake_loc_y, stake_loc_x, stake_name) in enumerate(stakes_list):
@@ -224,13 +236,22 @@ def run_cosipy(cluster, IO, DATA, RESULT, RESTART, futures):
             else:
                 stake_names = None
 
-            if Config.WRF:
+            if main_config.WRF:
                 mask = DATA.MASK.sel(south_north=y, west_east=x)
                 # Provide restart grid if necessary
-                if (mask == 1) and (not Config.restart):
+                if (mask == 1) and (not main_config.restart):
                     check_for_nan(data=DATA.sel(south_north=y, west_east=x))
-                    futures.append(client.submit(cosipy_core, DATA.sel(south_north=y, west_east=x), y, x, stake_names=stake_names, stake_data=df_stakes_data))
-                elif (mask == 1) and (Config.restart):
+                    futures.append(
+                        client.submit(
+                            cosipy_core,
+                            DATA.sel(south_north=y, west_east=x),
+                            y,
+                            x,
+                            stake_names=stake_names,
+                            stake_data=df_stakes_data,
+                        )
+                    )
+                elif (mask == 1) and (main_config.restart):
                     check_for_nan(data=DATA.sel(south_north=y, west_east=x))
                     futures.append(
                         client.submit(
@@ -249,36 +270,44 @@ def run_cosipy(cluster, IO, DATA, RESULT, RESTART, futures):
             else:
                 mask = DATA.MASK.isel(lat=y, lon=x)
                 # Provide restart grid if necessary
-                if (mask == 1) and (not Config.restart):
-                    check_for_nan(data=DATA.isel(lat=y,lon=x))
-                    futures.append(client.submit(cosipy_core, DATA.isel(lat=y, lon=x), y, x, stake_names=stake_names, stake_data=df_stakes_data))
-                elif (mask == 1) and (Config.restart):
-                    check_for_nan(data=DATA.isel(lat=y,lon=x))
+                if (mask == 1) and (not main_config.restart):
+                    check_for_nan(data=DATA.isel(lat=y, lon=x))
                     futures.append(
                         client.submit(
                             cosipy_core,
                             DATA.isel(lat=y, lon=x),
                             y,
                             x,
-                            GRID_RESTART=IO.create_grid_restart().isel(
-                                lat=y, lon=x
-                            ),
                             stake_names=stake_names,
-                            stake_data=df_stakes_data
+                            stake_data=df_stakes_data,
+                        )
+                    )
+                elif (mask == 1) and (main_config.restart):
+                    check_for_nan(data=DATA.isel(lat=y, lon=x))
+                    futures.append(
+                        client.submit(
+                            cosipy_core,
+                            DATA.isel(lat=y, lon=x),
+                            y,
+                            x,
+                            GRID_RESTART=IO.create_grid_restart().isel(lat=y, lon=x),
+                            stake_names=stake_names,
+                            stake_data=df_stakes_data,
                         )
                     )
 
         # Finally, do the calculations and print the progress
        progress(futures)
 
-        #---------------------------------------
+        # ---------------------------------------
         # Guarantee that restart file is closed
-        #---------------------------------------
-        if Config.restart:
+        # ---------------------------------------
+        if main_config.restart:
             IO.get_grid_restart().close()
 
         # Create numpy arrays which aggregates all local results
         IO.create_global_result_arrays()
 
+        # Create numpy arrays which aggregate all local restart data
         IO.create_global_restart_arrays()
 
@@ -286,7 +315,6 @@ def run_cosipy(cluster, IO, DATA, RESULT, RESTART, futures):
         # Assign local results to global
         #---------------------------------------
         for future in as_completed(futures):
-
             # Get the results from the workers
             indY, indX, local_restart, RAIN, SNOWFALL, LWin, LWout, H, LE, B, \
                 QRR, MB, surfMB, Q, SNOWHEIGHT, TOTALHEIGHT, TS, ALBEDO, \
@@ -311,22 +339,22 @@ def run_cosipy(cluster, IO, DATA, RESULT, RESTART, futures):
             # Write restart data to file
             IO.write_restart_to_file()
 
-            if Config.stake_evaluation:
+            if main_config.stake_evaluation:
                 # Store evaluation of stake measurements to dataframe
-                stat = stat.rename('rmse')
+                stat = stat.rename("rmse")
                 df_stat = pd.concat([df_stat, stat])
 
                 for i in stake_names:
-                    if Config.obs_type == 'mb':
+                    if main_config.obs_type == "mb":
                         df_val[i] = df_eval.mb
-                    if Config.obs_type == 'snowheight':
+                    if main_config.obs_type == "snowheight":
                         df_val[i] = df_eval.snowheight
 
         # Measure time
-        end_res = datetime.now()-start_res
+        end_res = datetime.now() - start_res
         get_time_required(action="do calculations", times=end_res)
 
-    if Config.stake_evaluation:
+    if main_config.stake_evaluation:
         # Save the statistics and the mass balance simulations at the stakes to files
         output_path = create_data_directory(name="output")
         df_stat.to_csv(
@@ -341,15 +369,19 @@ def run_cosipy(cluster, IO, DATA, RESULT, RESTART, futures):
         )
 
 
-def create_data_directory(name: Union[Path, str]) -> Path:
+def create_data_directory(name: Path | str) -> Path:
     """Create a directory in the configured data folder.
 
+    Args:
+        name: Name of the directory to create. If a Path object is passed, the (base)name
+            of the path is used.
+
     Returns:
         Path to the created directory.
     """
     if isinstance(name, Path):
         name = name.name
-    dir_path = Path(Config.data_path) / str(name)
+    dir_path = Path(main_config.data_path) / str(name)
     dir_path.mkdir(parents=True, exist_ok=True)
     return dir_path
 
@@ -373,9 +405,10 @@ def set_output_netcdf_path() -> Path:
     Returns:
         The path to the output netCDF file.
     """
-    time_start = get_timestamp_label(timestamp=Config.time_start)
-    time_end = get_timestamp_label(timestamp=Config.time_end)
-    return Path(f"{Config.output_prefix}_{time_start}-{time_end}.nc")
+    time_start = get_timestamp_label(timestamp=main_config.time_start)
+    time_end = get_timestamp_label(timestamp=main_config.time_end)
+    return Path(f"{main_config.output_prefix}_{time_start}-{time_end}.nc")
+
 
 
 def start_logging():
diff --git a/cosipy/cpkernel/cosipy_core.py b/cosipy/cpkernel/cosipy_core.py
index 9a51977..ff40139 100644
--- a/cosipy/cpkernel/cosipy_core.py
+++ b/cosipy/cpkernel/cosipy_core.py
@@ -233,7 +233,7 @@ def cosipy_core(DATA, indY, indX, GRID_RESTART=None, stake_names=None, stake_dat
         # get seconds since start
         # timestamp = dt*t
 
-        # if Config.WRF_X_CSPY:
+        # if main_config.WRF_X_CSPY:
         #     timestamp = np.float64(DATA.CURR_SECS.values)
 
         # Calc fresh snow density
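Note: the diff reads `main_config` and `slurm_config` as ready-made, module-level
instances (e.g. imported at the top of COSIPY.py), but their definition lies outside
this patch. The sketch below is a minimal illustration of the shape the patch appears
to assume; the class names, fields, defaults, and loading mechanism are assumptions
for illustration, not the project's actual config module:

    # Hypothetical sketch of the config instances this patch assumes.
    # Field names mirror the attributes accessed in COSIPY.py; the
    # defaults and class layout are assumptions, not COSIPY's real code.
    from dataclasses import dataclass, field
    from typing import List, Optional

    @dataclass
    class MainConfig:
        slurm_use: bool = False
        workers: Optional[int] = None   # None lets dask choose a worker count
        local_port: int = 8786
        northing: str = "lat"           # name of the y dimension in DATA
        easting: str = "lon"            # name of the x dimension in DATA
        WRF: bool = False
        restart: bool = False
        stake_evaluation: bool = False
        stakes_loc_file: str = ""
        stakes_data_file: str = ""
        obs_type: str = "snowheight"    # 'mb' or 'snowheight'
        compression_level: int = 2
        data_path: str = "./data"
        time_start: str = ""
        time_end: str = ""
        output_prefix: str = "output"

    @dataclass
    class SlurmConfig:
        name: str = "cosipy"
        cores: int = 20
        nodes: int = 1
        memory: str = "60GB"
        account: str = ""
        local_directory: str = "logs/dask-worker-space"
        slurm_parameters: List[str] = field(default_factory=list)

    # Module-level singletons, imported by COSIPY.py:
    main_config = MainConfig()
    slurm_config: Optional[SlurmConfig] = (
        SlurmConfig() if main_config.slurm_use else None
    )

Under this reading, guarding SLURM access with
`main_config.slurm_use and slurm_config is not None` (as the patch does) keeps the
local-cluster path working when no SLURM configuration was loaded at all.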