From 71d20c0aafcc91499ef650808f371368cda4fc31 Mon Sep 17 00:00:00 2001 From: bnb32 Date: Sun, 7 Jul 2024 12:33:06 -0600 Subject: [PATCH] needed to break up tmy / agg / blend with separate collect methods so they can be called through cli.pipeline --- nsrdb/cli.py | 226 +++++++++++++++++++++++++----------- nsrdb/create_configs.py | 2 +- tests/cli/test_agg_cli.py | 2 +- tests/cli/test_blend_cli.py | 2 +- tests/cli/test_tmy_cli.py | 4 +- 5 files changed, 160 insertions(+), 76 deletions(-) diff --git a/nsrdb/cli.py b/nsrdb/cli.py index b8ee36e9..adb5c6b2 100755 --- a/nsrdb/cli.py +++ b/nsrdb/cli.py @@ -713,55 +713,39 @@ def collect_final(ctx, config, verbose=False, pipeline_step=None): is_flag=True, help='Flag to turn on debug logging. Default is False.', ) -@click.option( - '--collect', - is_flag=True, - help='Flag to collect blended chunks into a single final file.', -) @click.pass_context -def blend(ctx, config, verbose=False, pipeline_step=None, collect=False): +def blend(ctx, config, verbose=False, pipeline_step=None): """Blend files from separate domains (e.g. 
east / west) into a single domain.""" - mod_name = ModuleName.COLLECT_BLEND if collect else ModuleName.BLEND - config = BaseCLI.from_config_preflight( ctx=ctx, - module_name=mod_name, + module_name=ModuleName.BLEND, config=config, verbose=verbose, pipeline_step=pipeline_step, ) - if collect: + file_tags = config.get('file_tag', 'all') + file_tags = ( + file_tags + if file_tags != 'all' + else ['_'.join(k.split('_')[1:-1]) for k in NSRDB.OUTS] + ) + + file_tags = file_tags if isinstance(file_tags, list) else [file_tags] + for file_tag in file_tags: + log_id = file_tag + config['job_name'] = f'{ctx.obj["RUN_NAME"]}_{log_id}' + config['file_tag'] = file_tag BaseCLI.kickoff_job( ctx=ctx, - module_name=mod_name, - func=Collector.collect_dir, + module_name=ModuleName.BLEND, + func=Blender.run_full, config=config, + log_id=log_id, ) - else: - file_tags = config.get('file_tag', 'all') - file_tags = ( - file_tags - if file_tags != 'all' - else ['_'.join(k.split('_')[1:-1]) for k in NSRDB.OUTS] - ) - - file_tags = file_tags if isinstance(file_tags, list) else [file_tags] - for file_tag in file_tags: - log_id = file_tag - config['job_name'] = f'{ctx.obj["RUN_NAME"]}_{log_id}' - config['file_tag'] = file_tag - BaseCLI.kickoff_job( - ctx=ctx, - module_name=mod_name, - func=Blender.run_full, - config=config, - log_id=log_id, - ) - @main.command() @click.option( @@ -769,7 +753,7 @@ def blend(ctx, config, verbose=False, pipeline_step=None, collect=False): '-c', type=str, required=True, - help='Path to config file with kwargs for NSRDB.aggregate_files()', + help='Path to config file with kwargs for Collector.collect_dir()', ) @click.option( '-v', @@ -777,34 +761,108 @@ def blend(ctx, config, verbose=False, pipeline_step=None, collect=False): is_flag=True, help='Flag to turn on debug logging. 
Default is False.', ) +@click.pass_context +def collect_blend(ctx, config, verbose=False, pipeline_step=None): + """Collect blended files into a single file with multiple datasets.""" + + BaseCLI.kickoff_single( + ctx=ctx, + module_name=ModuleName.COLLECT_BLEND, + func=Collector.collect_dir, + config=config, + verbose=verbose, + pipeline_step=pipeline_step, + ) + + +@main.command() @click.option( - '--collect', + '--config', + '-c', + type=str, + required=True, + help='Path to config file with kwargs for NSRDB.aggregate_files()', +) +@click.option( + '-v', + '--verbose', is_flag=True, - help='Flag to collect aggregated chunks into a single final file.', + help='Flag to turn on debug logging. Default is False.', ) @click.pass_context -def aggregate(ctx, config, verbose=False, pipeline_step=None, collect=False): +def aggregate(ctx, config, verbose=False, pipeline_step=None): """Aggregate data files to a lower resolution. NOTE: Used to create data files from high-resolution years (2018+) which match resolution of low-resolution years (pre 2018) """ - func = Collector.collect_dir if collect else Manager.run_chunk - mod_name = ( - ModuleName.COLLECT_AGGREGATE if collect else ModuleName.AGGREGATE + + BaseCLI.kickoff_multichunk( + ctx=ctx, + module_name=ModuleName.AGGREGATE, + func=Manager.run_chunk, + config=config, + verbose=verbose, + pipeline_step=pipeline_step, ) - kickoff_func = ( - BaseCLI.kickoff_single if collect else BaseCLI.kickoff_multichunk + + +@main.command() +@click.option( + '--config', + '-c', + type=str, + required=True, + help='Path to config file with kwargs for Collector.collect_dir()', +) +@click.option( + '-v', + '--verbose', + is_flag=True, + help='Flag to turn on debug logging. 
Default is False.', +) +@click.pass_context +def collect_aggregate(ctx, config, verbose=False, pipeline_step=None): + """Collect aggregate data files into a single file with multiple + datasets.""" + + BaseCLI.kickoff_single( + ctx=ctx, + module_name=ModuleName.COLLECT_AGGREGATE, + func=Collector.collect_dir, + config=config, + verbose=verbose, + pipeline_step=pipeline_step, ) - kickoff_func( + +def _run_or_collect_tmy( + ctx, config, verbose=False, pipeline_step=None, collect=False +): + mod_name = ModuleName.COLLECT_TMY if collect else ModuleName.TMY + config = BaseCLI.from_config_preflight( ctx=ctx, module_name=mod_name, - func=func, config=config, verbose=verbose, pipeline_step=pipeline_step, ) + tmy_types = config.pop('tmy_types', ['tmy', 'tgy', 'tdy']) + fn_out = config.pop('fn_out', 'tmy.h5') + out_dir = config['out_dir'] + for tmy_type in tmy_types: + func = TmyRunner.collect if collect else TmyRunner.tmy + config['tmy_type'] = tmy_type + config['out_dir'] = os.path.join(out_dir, f'{tmy_type}/') + config['job_name'] = f'{ctx.obj["RUN_NAME"]}_{tmy_type}' + config['fn_out'] = fn_out.replace('.h5', f'_{tmy_type}.h5') + BaseCLI.kickoff_job( + ctx=ctx, + module_name=mod_name, + func=func, + config=config, + log_id=tmy_type, + ) @main.command() @@ -822,13 +880,8 @@ def aggregate(ctx, config, verbose=False, pipeline_step=None, collect=False): is_flag=True, help='Flag to turn on debug logging. Default is False.', ) -@click.option( - '--collect', - is_flag=True, - help='Flag to collect tmy chunks into a single final file.', -) @click.pass_context -def tmy(ctx, config, verbose=False, pipeline_step=None, collect=False): +def tmy(ctx, config, verbose=False, pipeline_step=None): """Create tmy files for given input files. 
You would call the nsrdb tmy module using:: @@ -853,30 +906,58 @@ def tmy(ctx, config, verbose=False, pipeline_step=None, collect=False): } """ # noqa : D301 - mod_name = ModuleName.COLLECT_TMY if collect else ModuleName.TMY - config = BaseCLI.from_config_preflight( - ctx=ctx, - module_name=mod_name, - config=config, + _run_or_collect_tmy( + ctx, + config, verbose=verbose, pipeline_step=pipeline_step, + collect=False, + ) + + +@main.command() +@click.option( + '--config', + '-c', + type=str, + required=True, + help='Path to config file with kwargs for TmyRunner.collect()', +) +@click.option( + '-v', + '--verbose', + is_flag=True, + help='Flag to turn on debug logging. Default is False.', +) +@click.pass_context +def collect_tmy(ctx, config, verbose=False, pipeline_step=None): + """Collect the previously generated tmy file chunks. + + You would call the nsrdb collect-tmy module using:: + + $ python -m nsrdb.cli -c config.json collect-tmy + + A typical config.json file might look like this:: + + \b + { + "tmy": {}, + "collect-tmy": {"purge_chunks": True}, + "direct": { + "sites_per_worker": 50, + "site_slice": [0, 100], + "tmy_types": ['tmy', 'tdy', 'tgy'], + "nsrdb_base_fp": './nsrdb_*_{}.h5', + "years": [2000, ..., 2022], + "out_dir": './', + "fn_out": 'tmy_2000_2022.h5' + } + } + """ # noqa : D301 + + _run_or_collect_tmy( + ctx, config, verbose=verbose, pipeline_step=pipeline_step, collect=True ) - tmy_types = config.pop('tmy_types', ['tmy', 'tgy', 'tdy']) - fn_out = config.pop('fn_out', 'tmy.h5') - out_dir = config['out_dir'] - for tmy_type in tmy_types: - func = TmyRunner.collect if collect else TmyRunner.tmy - config['tmy_type'] = tmy_type - config['out_dir'] = os.path.join(out_dir, f'{tmy_type}/') - config['job_name'] = f'{ctx.obj["RUN_NAME"]}_{tmy_type}' - config['fn_out'] = fn_out.replace('.h5', f'_{tmy_type}.h5') - BaseCLI.kickoff_job( - ctx=ctx, - module_name=mod_name, - func=func, - config=config, - log_id=tmy_type, - ) 
@main.group(invoke_without_command=True) @@ -973,6 +1054,9 @@ def batch( Pipeline.COMMANDS[ModuleName.COLLECT_DATA_MODEL] = collect_data_model Pipeline.COMMANDS[ModuleName.COLLECT_FINAL] = collect_final Pipeline.COMMANDS[ModuleName.TMY] = tmy +Pipeline.COMMANDS[ModuleName.COLLECT_BLEND] = collect_blend +Pipeline.COMMANDS[ModuleName.COLLECT_AGGREGATE] = collect_aggregate +Pipeline.COMMANDS[ModuleName.COLLECT_TMY] = collect_tmy if __name__ == '__main__': diff --git a/nsrdb/create_configs.py b/nsrdb/create_configs.py index a746d391..32c97af7 100755 --- a/nsrdb/create_configs.py +++ b/nsrdb/create_configs.py @@ -592,7 +592,7 @@ def _collect_aggregate(cls, kwargs): config.update(kwargs) meta_file = f'nsrdb_meta_{config["final_spatial"]}.csv' - meta_file = os.path.join(config['meta_dir'], meta_file) + config['meta_final'] = os.path.join(config['meta_dir'], meta_file) collect_dir = f'nsrdb_{config["final_spatial"]}_{config["final_freq"]}' collect_tag = f'{config["basename"]}_' config['collect_dir'] = collect_dir diff --git a/tests/cli/test_agg_cli.py b/tests/cli/test_agg_cli.py index c2f89c06..9460c792 100755 --- a/tests/cli/test_agg_cli.py +++ b/tests/cli/test_agg_cli.py @@ -116,7 +116,7 @@ def test_agg_cli(runner): with open(config_file, 'w') as f: f.write(json.dumps(config)) - result = runner.invoke(cli.aggregate, ['-c', config_file, '--collect']) + result = runner.invoke(cli.collect_aggregate, ['-c', config_file]) assert result.exit_code == 0, traceback.print_exception( *result.exc_info ) diff --git a/tests/cli/test_blend_cli.py b/tests/cli/test_blend_cli.py index 787e8b14..d6533aa9 100755 --- a/tests/cli/test_blend_cli.py +++ b/tests/cli/test_blend_cli.py @@ -118,7 +118,7 @@ def test_blend_cli(runner): with open(config_file, 'w') as f: f.write(json.dumps(config)) - result = runner.invoke(cli.blend, ['-c', config_file, '--collect']) + result = runner.invoke(cli.collect_blend, ['-c', config_file]) assert result.exit_code == 0, traceback.print_exception( 
*result.exc_info ) diff --git a/tests/cli/test_tmy_cli.py b/tests/cli/test_tmy_cli.py index e24fc6e1..3e8df39b 100755 --- a/tests/cli/test_tmy_cli.py +++ b/tests/cli/test_tmy_cli.py @@ -69,7 +69,7 @@ def test_tmy_cli(runner, tmpdir_factory): result = runner.invoke(cli.tmy, ['-c', config_file, '-v']) assert result.exit_code == 0, traceback.print_exception(*result.exc_info) - result = runner.invoke(cli.tmy, ['-c', config_file, '--collect', '-v']) + result = runner.invoke(cli.collect_tmy, ['-c', config_file, '-v']) assert result.exit_code == 0, traceback.print_exception(*result.exc_info) for tmy_type in tmy_types: assert os.path.exists(file_pattern.format(tmy_type=tmy_type)) @@ -102,7 +102,7 @@ def test_tmy_regression(runner, tmpdir_factory, tmy_file): result = runner.invoke(cli.tmy, ['-c', config_file, '-v']) assert result.exit_code == 0, traceback.print_exception(*result.exc_info) - result = runner.invoke(cli.tmy, ['-c', config_file, '--collect', '-v']) + result = runner.invoke(cli.collect_tmy, ['-c', config_file, '-v']) assert result.exit_code == 0, traceback.print_exception(*result.exc_info) for tmy_type in tmy_types: