Skip to content

Commit

Permalink
needed to break up tmy / agg / blend with separate collect methods so…
Browse files Browse the repository at this point in the history
… they can be called through cli.pipeline
  • Loading branch information
bnb32 committed Jul 7, 2024
1 parent 4afe5e3 commit 71d20c0
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 76 deletions.
226 changes: 155 additions & 71 deletions nsrdb/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -713,98 +713,156 @@ def collect_final(ctx, config, verbose=False, pipeline_step=None):
is_flag=True,
help='Flag to turn on debug logging. Default is False.',
)
@click.option(
'--collect',
is_flag=True,
help='Flag to collect blended chunks into a single final file.',
)
@click.pass_context
def blend(ctx, config, verbose=False, pipeline_step=None, collect=False):
def blend(ctx, config, verbose=False, pipeline_step=None):
"""Blend files from separate domains (e.g. east / west) into a single
domain."""

mod_name = ModuleName.COLLECT_BLEND if collect else ModuleName.BLEND

config = BaseCLI.from_config_preflight(
ctx=ctx,
module_name=mod_name,
module_name=ModuleName.BLEND,
config=config,
verbose=verbose,
pipeline_step=pipeline_step,
)

if collect:
file_tags = config.get('file_tag', 'all')
file_tags = (
file_tags
if file_tags != 'all'
else ['_'.join(k.split('_')[1:-1]) for k in NSRDB.OUTS]
)

file_tags = file_tags if isinstance(file_tags, list) else [file_tags]
for file_tag in file_tags:
log_id = file_tag
config['job_name'] = f'{ctx.obj["RUN_NAME"]}_{log_id}'
config['file_tag'] = file_tag
BaseCLI.kickoff_job(
ctx=ctx,
module_name=mod_name,
func=Collector.collect_dir,
module_name=ModuleName.BLEND,
func=Blender.run_full,
config=config,
log_id=log_id,
)

else:
file_tags = config.get('file_tag', 'all')
file_tags = (
file_tags
if file_tags != 'all'
else ['_'.join(k.split('_')[1:-1]) for k in NSRDB.OUTS]
)

file_tags = file_tags if isinstance(file_tags, list) else [file_tags]
for file_tag in file_tags:
log_id = file_tag
config['job_name'] = f'{ctx.obj["RUN_NAME"]}_{log_id}'
config['file_tag'] = file_tag
BaseCLI.kickoff_job(
ctx=ctx,
module_name=mod_name,
func=Blender.run_full,
config=config,
log_id=log_id,
)


@main.command()
@click.option(
'--config',
'-c',
type=str,
required=True,
help='Path to config file with kwargs for NSRDB.aggregate_files()',
help='Path to config file with kwargs for Collector.collect_dir()',
)
@click.option(
'-v',
'--verbose',
is_flag=True,
help='Flag to turn on debug logging. Default is False.',
)
@click.pass_context
def collect_blend(ctx, config, verbose=False, pipeline_step=None):
"""Collect blended files into a single file with multiple datasets."""

BaseCLI.kickoff_single(
ctx=ctx,
module_name=ModuleName.COLLECT_BLEND,
func=Collector.collect_dir,
config=config,
verbose=verbose,
pipeline_step=pipeline_step,
)


@main.command()
@click.option(
'--collect',
'--config',
'-c',
type=str,
required=True,
help='Path to config file with kwargs for NSRDB.aggregate_files()',
)
@click.option(
'-v',
'--verbose',
is_flag=True,
help='Flag to collect aggregated chunks into a single final file.',
help='Flag to turn on debug logging. Default is False.',
)
@click.pass_context
def aggregate(ctx, config, verbose=False, pipeline_step=None, collect=False):
def aggregate(ctx, config, verbose=False, pipeline_step=None):
"""Aggregate data files to a lower resolution.
NOTE: Used to create data files from high-resolution years (2018+) which
match resolution of low-resolution years (pre 2018)
"""
func = Collector.collect_dir if collect else Manager.run_chunk
mod_name = (
ModuleName.COLLECT_AGGREGATE if collect else ModuleName.AGGREGATE

BaseCLI.kickoff_multichunk(
ctx=ctx,
module_name=ModuleName.AGGREGATE,
func=Manager.run_chunk,
config=config,
verbose=verbose,
pipeline_step=pipeline_step,
)
kickoff_func = (
BaseCLI.kickoff_single if collect else BaseCLI.kickoff_multichunk


@main.command()
@click.option(
'--config',
'-c',
type=str,
required=True,
help='Path to config file with kwargs for Collector.collect_dir()',
)
@click.option(
'-v',
'--verbose',
is_flag=True,
help='Flag to turn on debug logging. Default is False.',
)
@click.pass_context
def collect_aggregate(ctx, config, verbose=False, pipeline_step=None):
"""Collect aggregate data files into a single file with multiple
datasets."""

BaseCLI.kickoff_single(
ctx=ctx,
module_name=ModuleName.COLLECT_AGGREGATE,
func=Collector.collect_dir,
config=config,
verbose=verbose,
pipeline_step=pipeline_step,
)

kickoff_func(

def _run_or_collect_tmy(
ctx, config, verbose=False, pipeline_step=None, collect=False
):
mod_name = ModuleName.COLLECT_TMY if collect else ModuleName.TMY
config = BaseCLI.from_config_preflight(
ctx=ctx,
module_name=mod_name,
func=func,
config=config,
verbose=verbose,
pipeline_step=pipeline_step,
)
tmy_types = config.pop('tmy_types', ['tmy', 'tgy', 'tdy'])
fn_out = config.pop('fn_out', 'tmy.h5')
out_dir = config['out_dir']
for tmy_type in tmy_types:
func = TmyRunner.collect if collect else TmyRunner.tmy
config['tmy_type'] = tmy_type
config['out_dir'] = os.path.join(out_dir, f'{tmy_type}/')
config['job_name'] = f'{ctx.obj["RUN_NAME"]}_{tmy_type}'
config['fn_out'] = fn_out.replace('.h5', f'_{tmy_type}.h5')
BaseCLI.kickoff_job(
ctx=ctx,
module_name=mod_name,
func=func,
config=config,
log_id=tmy_type,
)


@main.command()
Expand All @@ -822,13 +880,8 @@ def aggregate(ctx, config, verbose=False, pipeline_step=None, collect=False):
is_flag=True,
help='Flag to turn on debug logging. Default is False.',
)
@click.option(
'--collect',
is_flag=True,
help='Flag to collect tmy chunks into a single final file.',
)
@click.pass_context
def tmy(ctx, config, verbose=False, pipeline_step=None, collect=False):
def tmy(ctx, config, verbose=False, pipeline_step=None):
"""Create tmy files for given input files.
You would call the nsrdb tmy module using::
Expand All @@ -853,30 +906,58 @@ def tmy(ctx, config, verbose=False, pipeline_step=None, collect=False):
}
""" # noqa : D301

mod_name = ModuleName.COLLECT_TMY if collect else ModuleName.TMY
config = BaseCLI.from_config_preflight(
ctx=ctx,
module_name=mod_name,
config=config,
_run_or_collect_tmy(
ctx,
config,
verbose=verbose,
pipeline_step=pipeline_step,
collect=False,
)


@main.command()
@click.option(
'--config',
'-c',
type=str,
required=True,
help='Path to config file with kwargs for TmyRunner.collect()',
)
@click.option(
'-v',
'--verbose',
is_flag=True,
help='Flag to turn on debug logging. Default is False.',
)
@click.pass_context
def collect_tmy(ctx, config, verbose=False, pipeline_step=None):
"""Collect the previously generated tmy file chunks.
You would call the nsrdb collect-tmy module using::
$ python -m nsrdb.cli -c config.json collect-tmy
A typical config.json file might look like this::
\b
{
"tmy": {},
"collect-tmy": {"purge_chunks": True},
"direct": {
"sites_per_worker": 50,
"site_slice": [0, 100],
"tmy_types": ['tmy', 'tdy', 'tgy'],
"nsrdb_base_fp": './nsrdb_*_{}.h5',
"years": [2000, ..., 2022],
"out_dir": './",
"fn_out": 'tmy_2000_2022.h5'
}
}
""" # noqa : D301

_run_or_collect_tmy(
ctx, config, verbose=verbose, pipeline_step=pipeline_step, collect=True
)
tmy_types = config.pop('tmy_types', ['tmy', 'tgy', 'tdy'])
fn_out = config.pop('fn_out', 'tmy.h5')
out_dir = config['out_dir']
for tmy_type in tmy_types:
func = TmyRunner.collect if collect else TmyRunner.tmy
config['tmy_type'] = tmy_type
config['out_dir'] = os.path.join(out_dir, f'{tmy_type}/')
config['job_name'] = f'{ctx.obj["RUN_NAME"]}_{tmy_type}'
config['fn_out'] = fn_out.replace('.h5', f'_{tmy_type}.h5')
BaseCLI.kickoff_job(
ctx=ctx,
module_name=mod_name,
func=func,
config=config,
log_id=tmy_type,
)


@main.group(invoke_without_command=True)
Expand Down Expand Up @@ -973,6 +1054,9 @@ def batch(
Pipeline.COMMANDS[ModuleName.COLLECT_DATA_MODEL] = collect_data_model
Pipeline.COMMANDS[ModuleName.COLLECT_FINAL] = collect_final
Pipeline.COMMANDS[ModuleName.TMY] = tmy
Pipeline.COMMANDS[ModuleName.COLLECT_BLEND] = collect_blend
Pipeline.COMMANDS[ModuleName.COLLECT_AGGREGATE] = collect_aggregate
Pipeline.COMMANDS[ModuleName.COLLECT_TMY] = collect_tmy


if __name__ == '__main__':
Expand Down
2 changes: 1 addition & 1 deletion nsrdb/create_configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -592,7 +592,7 @@ def _collect_aggregate(cls, kwargs):
config.update(kwargs)

meta_file = f'nsrdb_meta_{config["final_spatial"]}.csv'
meta_file = os.path.join(config['meta_dir'], meta_file)
config['meta_final'] = os.path.join(config['meta_dir'], meta_file)
collect_dir = f'nsrdb_{config["final_spatial"]}_{config["final_freq"]}'
collect_tag = f'{config["basename"]}_'
config['collect_dir'] = collect_dir
Expand Down
2 changes: 1 addition & 1 deletion tests/cli/test_agg_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ def test_agg_cli(runner):
with open(config_file, 'w') as f:
f.write(json.dumps(config))

result = runner.invoke(cli.aggregate, ['-c', config_file, '--collect'])
result = runner.invoke(cli.collect_aggregate, ['-c', config_file])
assert result.exit_code == 0, traceback.print_exception(
*result.exc_info
)
Expand Down
2 changes: 1 addition & 1 deletion tests/cli/test_blend_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ def test_blend_cli(runner):
with open(config_file, 'w') as f:
f.write(json.dumps(config))

result = runner.invoke(cli.blend, ['-c', config_file, '--collect'])
result = runner.invoke(cli.collect_blend, ['-c', config_file])
assert result.exit_code == 0, traceback.print_exception(
*result.exc_info
)
Expand Down
4 changes: 2 additions & 2 deletions tests/cli/test_tmy_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def test_tmy_cli(runner, tmpdir_factory):

result = runner.invoke(cli.tmy, ['-c', config_file, '-v'])
assert result.exit_code == 0, traceback.print_exception(*result.exc_info)
result = runner.invoke(cli.tmy, ['-c', config_file, '--collect', '-v'])
result = runner.invoke(cli.collect_tmy, ['-c', config_file, '-v'])
assert result.exit_code == 0, traceback.print_exception(*result.exc_info)
for tmy_type in tmy_types:
assert os.path.exists(file_pattern.format(tmy_type=tmy_type))
Expand Down Expand Up @@ -102,7 +102,7 @@ def test_tmy_regression(runner, tmpdir_factory, tmy_file):

result = runner.invoke(cli.tmy, ['-c', config_file, '-v'])
assert result.exit_code == 0, traceback.print_exception(*result.exc_info)
result = runner.invoke(cli.tmy, ['-c', config_file, '--collect', '-v'])
result = runner.invoke(cli.collect_tmy, ['-c', config_file, '-v'])
assert result.exit_code == 0, traceback.print_exception(*result.exc_info)

for tmy_type in tmy_types:
Expand Down

0 comments on commit 71d20c0

Please sign in to comment.