fix(config): missed some config spots. Will squash later
benatouba committed Nov 26, 2024
1 parent e7ee96b commit 6e4fe41
Showing 2 changed files with 100 additions and 67 deletions.
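Note on the pattern this commit applies: module-level class attributes (Config.slurm_use, SlurmConfig.cores) are replaced with instantiated configuration objects (main_config, slurm_config). A minimal sketch of that pattern, using illustrative names rather than COSIPY's actual config API:

    from dataclasses import dataclass

    @dataclass
    class MainConfig:
        # Stand-in fields; COSIPY's real config carries many more options.
        slurm_use: bool = False
        workers: int = 4

    # An instance can be built from a parsed config file at runtime,
    # whereas class attributes are fixed at import time.
    main_config = MainConfig(slurm_use=False)

    if main_config.slurm_use:
        print("would dispatch to SLURM")

Instantiation also explains the new "slurm_use and slurm_config is not None" guard in the diff below: the SLURM branch only runs when a SLURM config object was actually constructed.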
165 changes: 99 additions & 66 deletions COSIPY.py
@@ -74,39 +74,46 @@ def main():
     # Measure time
     start_time = datetime.now()
 
-    #-----------------------------------------------
+    # -----------------------------------------------
     # Create a client for distributed calculations
-    #-----------------------------------------------
-    if Config.slurm_use:
-        SlurmConfig()
+    # -----------------------------------------------
+    if main_config.slurm_use and slurm_config is not None:
         with SLURMCluster(
-            job_name=SlurmConfig.name,
-            cores=SlurmConfig.cores,
-            processes=SlurmConfig.cores,
-            memory=SlurmConfig.memory,
-            account=SlurmConfig.account,
-            job_extra_directives=SlurmConfig.slurm_parameters,
-            local_directory=SlurmConfig.local_directory,
+            job_name=slurm_config.name,
+            cores=slurm_config.cores,
+            processes=slurm_config.cores,
+            memory=slurm_config.memory,
+            account=slurm_config.account,
+            job_extra_directives=slurm_config.slurm_parameters,
+            local_directory=slurm_config.local_directory,
         ) as cluster:
-            cluster.scale(SlurmConfig.nodes * SlurmConfig.cores)
+            cluster.scale(slurm_config.nodes * slurm_config.cores)
             print(cluster.job_script())
             print("You are using SLURM!\n")
             print(cluster)
             run_cosipy(cluster, IO, DATA, RESULT, RESTART, futures)
 
     else:
-        with LocalCluster(scheduler_port=Config.local_port, n_workers=Config.workers, local_directory='logs/dask-worker-space', threads_per_worker=1, silence_logs=True) as cluster:
+        with LocalCluster(
+            scheduler_port=main_config.local_port,
+            n_workers=main_config.workers,
+            local_directory="logs/dask-worker-space",
+            threads_per_worker=1,
+            silence_logs=True,
+        ) as cluster:
             print(cluster)
             run_cosipy(cluster, IO, DATA, RESULT, RESTART, futures)
 
     print("\n")
     print_notice(msg="Write results ...")
     start_writing = datetime.now()
 
-    #-----------------------------------------------
+    # -----------------------------------------------
     # Write results and restart files
-    #-----------------------------------------------
-    timestamp = pd.to_datetime(str(IO.get_restart().time.values)).strftime('%Y-%m-%dT%H-%M')
+    # -----------------------------------------------
+    timestamp = pd.to_datetime(str(IO.get_restart().time.values)).strftime(
+        "%Y-%m-%dT%H-%M"
+    )
 
     encoding = dict()
     for var in IO.get_result().data_vars:
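The hunk above chooses between a SLURM-backed dask cluster and a local one. A rough, runnable sketch of the same dispatch, assuming dask.distributed and dask_jobqueue are installed (all values are placeholders, not COSIPY defaults):

    from dask.distributed import Client, LocalCluster

    use_slurm = False  # stand-in for main_config.slurm_use

    if use_slurm:
        # Needs a SLURM scheduler on the host; mirrors the SLURMCluster call above.
        from dask_jobqueue import SLURMCluster

        cluster = SLURMCluster(cores=20, processes=20, memory="60GB")
        cluster.scale(2 * 20)  # nodes * cores, as in cluster.scale() above
    else:
        cluster = LocalCluster(n_workers=4, threads_per_worker=1)

    with Client(cluster) as client:
        print(client)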
@@ -115,8 +122,8 @@ def main():
         # dtype = 'int16'
         # FillValue = -9999
         # scale_factor, add_offset = compute_scale_and_offset(dataMin, dataMax, 16)
-        #encoding[var] = dict(zlib=True, complevel=compression_level, dtype=dtype, scale_factor=scale_factor, add_offset=add_offset, _FillValue=FillValue)
-        encoding[var] = dict(zlib=True, complevel=Config.compression_level)
+        # encoding[var] = dict(zlib=True, complevel=compression_level, dtype=dtype, scale_factor=scale_factor, add_offset=add_offset, _FillValue=FillValue)
+        encoding[var] = dict(zlib=True, complevel=main_config.compression_level)
     output_netcdf = set_output_netcdf_path()
     output_path = create_data_directory(name="output")
     IO.get_result().to_netcdf(output_path / output_netcdf, encoding=encoding, mode="w")
@@ -128,8 +135,8 @@ def main():
         # dtype = 'int16'
         # FillValue = -9999
         # scale_factor, add_offset = compute_scale_and_offset(dataMin, dataMax, 16)
-        #encoding[var] = dict(zlib=True, complevel=compression_level, dtype=dtype, scale_factor=scale_factor, add_offset=add_offset, _FillValue=FillValue)
-        encoding[var] = dict(zlib=True, complevel=Config.compression_level)
+        # encoding[var] = dict(zlib=True, complevel=compression_level, dtype=dtype, scale_factor=scale_factor, add_offset=add_offset, _FillValue=FillValue)
+        encoding[var] = dict(zlib=True, complevel=main_config.compression_level)
 
     restart_path = create_data_directory(name='restart')
     IO.get_restart().to_netcdf(restart_path / f'restart_{timestamp}.nc', encoding=encoding)
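Both hunks above build one encoding entry per output variable so that xarray compresses each variable with zlib on write. A self-contained toy version (assumes xarray with a netCDF backend; the compression level is illustrative):

    import numpy as np
    import xarray as xr

    ds = xr.Dataset({"T2": ("time", np.random.rand(10))})
    # One entry per data variable, as in the loops above.
    encoding = {var: dict(zlib=True, complevel=2) for var in ds.data_vars}
    ds.to_netcdf("toy_output.nc", encoding=encoding, mode="w")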
@@ -140,44 +147,47 @@ def main():
     duration_run = datetime.now() - start_time
     duration_run_writing = datetime.now() - start_writing
 
-    #-----------------------------------------------
+    # -----------------------------------------------
     # Print out some information
-    #-----------------------------------------------
-    get_time_required(
-        action="write restart and output files", times=duration_run_writing
-    )
+    # -----------------------------------------------
+    get_time_required(action="write restart and output files", times=duration_run_writing)
     run_time = duration_run.total_seconds()
     print(f"\tTotal run duration: {run_time // 60.0:4g} minutes {run_time % 60.0:2g} seconds\n")
     print_notice(msg="\tSIMULATION WAS SUCCESSFUL")
 
 
 def run_cosipy(cluster, IO, DATA, RESULT, RESTART, futures):
-    Config()
-    Constants()
 
     with Client(cluster) as client:
         print_notice(msg="\tStarting clients and submitting jobs ...")
         print(cluster)
         print(client)
 
         # Get dimensions of the whole domain
-        # ny = DATA.sizes[Config.northing]
-        # nx = DATA.sizes[Config.easting]
+        # ny = DATA.sizes[config.northing]
+        # nx = DATA.sizes[config.easting]
 
         # cp = cProfile.Profile()
 
         # Get some information about the cluster/nodes
-        total_grid_points = DATA.sizes[Config.northing]*DATA.sizes[Config.easting]
-        if Config.slurm_use:
-            total_cores = SlurmConfig.cores * SlurmConfig.nodes
+        total_grid_points = DATA.sizes[main_config.northing] * DATA.sizes[main_config.easting]
+        if main_config.slurm_use and slurm_config is not None:
+            total_cores = slurm_config.cores * slurm_config.nodes
             points_per_core = total_grid_points // total_cores
             print(total_grid_points, total_cores, points_per_core)
 
         # Check if evaluation is selected:
-        if Config.stake_evaluation:
+        if main_config.stake_evaluation:
             # Read stake data (data must be given as cumulative changes)
-            df_stakes_loc = pd.read_csv(Config.stakes_loc_file, delimiter='\t', na_values='-9999')
-            df_stakes_data = pd.read_csv(Config.stakes_data_file, delimiter='\t', index_col='TIMESTAMP', na_values='-9999')
+            df_stakes_loc = pd.read_csv(
+                main_config.stakes_loc_file, delimiter="\t", na_values="-9999"
+            )
+            df_stakes_data = pd.read_csv(
+                main_config.stakes_data_file,
+                delimiter="\t",
+                index_col="TIMESTAMP",
+                na_values="-9999",
+            )
             df_stakes_data.index = pd.to_datetime(df_stakes_data.index)
 
             # Uncomment, if stake data is given as changes between measurements
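The reformatted pd.read_csv calls above keep the same semantics: tab-separated stake files with -9999 as the missing-value sentinel. A small self-contained check of that behavior (toy data, not COSIPY's file layout):

    import io

    import pandas as pd

    raw = "TIMESTAMP\tstake1\n2024-01-01\t-9999\n2024-01-02\t0.5\n"
    df = pd.read_csv(io.StringIO(raw), delimiter="\t", index_col="TIMESTAMP", na_values="-9999")
    df.index = pd.to_datetime(df.index)
    print(df["stake1"].isna().tolist())  # [True, False]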
@@ -188,7 +198,7 @@ def run_cosipy(cluster, IO, DATA, RESULT, RESTART, futures):
             df_val = df_stakes_data.copy()
 
             # reshape and stack coordinates
-            if Config.WRF:
+            if main_config.WRF:
                 coords = np.column_stack((DATA.lat.values.ravel(), DATA.lon.values.ravel()))
             else:
                 # in case lat/lon are 1D coordinates
@@ -214,8 +224,10 @@ def run_cosipy(cluster, IO, DATA, RESULT, RESTART, futures):
 
         # Distribute data and model to workers
         start_res = datetime.now()
-        for y,x in product(range(DATA.sizes[Config.northing]),range(DATA.sizes[Config.easting])):
-            if Config.stake_evaluation:
+        for y, x in product(
+            range(DATA.sizes[main_config.northing]), range(DATA.sizes[main_config.easting])
+        ):
+            if main_config.stake_evaluation:
                 stake_names = []
                 # Check if the grid cell contain stakes and store the stake names in a list
                 for idx, (stake_loc_y, stake_loc_x, stake_name) in enumerate(stakes_list):
@@ -224,13 +236,22 @@ def run_cosipy(cluster, IO, DATA, RESULT, RESTART, futures):
             else:
                 stake_names = None
 
-            if Config.WRF:
+            if main_config.WRF:
                 mask = DATA.MASK.sel(south_north=y, west_east=x)
                 # Provide restart grid if necessary
-                if (mask == 1) and (not Config.restart):
+                if (mask == 1) and (not main_config.restart):
                     check_for_nan(data=DATA.sel(south_north=y, west_east=x))
-                    futures.append(client.submit(cosipy_core, DATA.sel(south_north=y, west_east=x), y, x, stake_names=stake_names, stake_data=df_stakes_data))
-                elif (mask == 1) and (Config.restart):
+                    futures.append(
+                        client.submit(
+                            cosipy_core,
+                            DATA.sel(south_north=y, west_east=x),
+                            y,
+                            x,
+                            stake_names=stake_names,
+                            stake_data=df_stakes_data,
+                        )
+                    )
+                elif (mask == 1) and (main_config.restart):
                     check_for_nan(data=DATA.sel(south_north=y, west_east=x))
                     futures.append(
                         client.submit(
@@ -249,44 +270,51 @@ def run_cosipy(cluster, IO, DATA, RESULT, RESTART, futures):
             else:
                 mask = DATA.MASK.isel(lat=y, lon=x)
                 # Provide restart grid if necessary
-                if (mask == 1) and (not Config.restart):
-                    check_for_nan(data=DATA.isel(lat=y,lon=x))
-                    futures.append(client.submit(cosipy_core, DATA.isel(lat=y, lon=x), y, x, stake_names=stake_names, stake_data=df_stakes_data))
-                elif (mask == 1) and (Config.restart):
-                    check_for_nan(data=DATA.isel(lat=y,lon=x))
+                if (mask == 1) and (not main_config.restart):
+                    check_for_nan(data=DATA.isel(lat=y, lon=x))
                     futures.append(
                         client.submit(
                             cosipy_core,
                             DATA.isel(lat=y, lon=x),
                             y,
                             x,
-                            GRID_RESTART=IO.create_grid_restart().isel(
-                                lat=y, lon=x
-                            ),
                             stake_names=stake_names,
-                            stake_data=df_stakes_data
+                            stake_data=df_stakes_data,
                         )
                     )
+                elif (mask == 1) and (main_config.restart):
+                    check_for_nan(data=DATA.isel(lat=y, lon=x))
+                    futures.append(
+                        client.submit(
+                            cosipy_core,
+                            DATA.isel(lat=y, lon=x),
+                            y,
+                            x,
+                            GRID_RESTART=IO.create_grid_restart().isel(lat=y, lon=x),
+                            stake_names=stake_names,
+                            stake_data=df_stakes_data,
+                        )
+                    )
         # Finally, do the calculations and print the progress
         progress(futures)
 
-        #---------------------------------------
+        # ---------------------------------------
         # Guarantee that restart file is closed
-        #---------------------------------------
-        if Config.restart:
+        # ---------------------------------------
+        if main_config.restart:
             IO.get_grid_restart().close()
 
         # Create numpy arrays which aggregates all local results
         IO.create_global_result_arrays()
 
         # Create numpy arrays which aggregates all local results
         IO.create_global_restart_arrays()
 
         #---------------------------------------
         # Assign local results to global
         #---------------------------------------
         for future in as_completed(futures):
 
             # Get the results from the workers
             indY, indX, local_restart, RAIN, SNOWFALL, LWin, LWout, H, LE, B, \
                 QRR, MB, surfMB, Q, SNOWHEIGHT, TOTALHEIGHT, TS, ALBEDO, \
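The two hunks above submit one cosipy_core call per masked grid cell; the as_completed loop then collects results as workers finish. A self-contained sketch of that submit/gather pattern (dask.distributed assumed installed; work() is a hypothetical stand-in for cosipy_core):

    from dask.distributed import Client, LocalCluster, as_completed

    def work(y, x):
        # Hypothetical per-cell computation standing in for cosipy_core.
        return y, x, y * x

    with LocalCluster(n_workers=2, threads_per_worker=1) as cluster:
        with Client(cluster) as client:
            futures = [client.submit(work, y, x) for y in range(2) for x in range(2)]
            for future in as_completed(futures):
                y, x, value = future.result()
                print(y, x, value)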
@@ -311,22 +339,22 @@ def run_cosipy(cluster, IO, DATA, RESULT, RESTART, futures):
         # Write restart data to file
         IO.write_restart_to_file()
 
-        if Config.stake_evaluation:
+        if main_config.stake_evaluation:
             # Store evaluation of stake measurements to dataframe
-            stat = stat.rename('rmse')
+            stat = stat.rename("rmse")
             df_stat = pd.concat([df_stat, stat])
 
             for i in stake_names:
-                if Config.obs_type == 'mb':
+                if main_config.obs_type == "mb":
                     df_val[i] = df_eval.mb
-                if Config.obs_type == 'snowheight':
+                if main_config.obs_type == "snowheight":
                     df_val[i] = df_eval.snowheight
 
         # Measure time
-        end_res = datetime.now()-start_res
+        end_res = datetime.now() - start_res
         get_time_required(action="do calculations", times=end_res)
 
-        if Config.stake_evaluation:
+        if main_config.stake_evaluation:
             # Save the statistics and the mass balance simulations at the stakes to files
             output_path = create_data_directory(name="output")
             df_stat.to_csv(
@@ -341,15 +369,19 @@ def run_cosipy(cluster, IO, DATA, RESULT, RESTART, futures):
             )
 
 
-def create_data_directory(name: Union[Path, str]) -> Path:
+def create_data_directory(name: Path | str) -> Path:
     """Create a directory in the configured data folder.
 
     Args:
         name: Name of the directory to create. If a Path object is passed,
             the (base)name of the path is used.
 
     Returns:
         Path to the created directory.
     """
     if isinstance(name, Path):
         name = name.name
-    dir_path = Path(Config.data_path) / str(name)
+    dir_path = Path(main_config.data_path) / str(name)
     dir_path.mkdir(parents=True, exist_ok=True)
 
     return dir_path
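A standalone sketch of the updated Path | str handling (the "|" union syntax needs Python >= 3.10; make_dir is a hypothetical stand-in that takes an explicit base path instead of main_config.data_path):

    from pathlib import Path

    def make_dir(base: Path, name: Path | str) -> Path:
        if isinstance(name, Path):
            name = name.name  # keep only the basename, as in the helper above
        dir_path = base / str(name)
        dir_path.mkdir(parents=True, exist_ok=True)
        return dir_path

    print(make_dir(Path("data"), Path("runs/output")))  # -> data/output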
@@ -373,9 +405,10 @@ def set_output_netcdf_path() -> Path:
     Returns:
         The path to the output netCDF file.
     """
-    time_start = get_timestamp_label(timestamp=Config.time_start)
-    time_end = get_timestamp_label(timestamp=Config.time_end)
-    return Path(f"{Config.output_prefix}_{time_start}-{time_end}.nc")
+    time_start = get_timestamp_label(timestamp=main_config.time_start)
+    time_end = get_timestamp_label(timestamp=main_config.time_end)
+    return Path(f"{main_config.output_prefix}_{time_start}-{time_end}.nc")
 
 
+
 def start_logging():
2 changes: 1 addition & 1 deletion cosipy/cpkernel/cosipy_core.py
@@ -233,7 +233,7 @@ def cosipy_core(DATA, indY, indX, GRID_RESTART=None, stake_names=None, stake_dat
 
     # get seconds since start
     # timestamp = dt*t
-    # if Config.WRF_X_CSPY:
+    # if main_config.WRF_X_CSPY:
     #     timestamp = np.float64(DATA.CURR_SECS.values)
 
     # Calc fresh snow density
