diff --git a/nsrdb/cli.py b/nsrdb/cli.py
index a2d61a5b..409cfa2b 100755
--- a/nsrdb/cli.py
+++ b/nsrdb/cli.py
@@ -16,7 +16,6 @@
 from gaps.batch import BatchJob
 from gaps.cli.pipeline import pipeline as gaps_pipeline
 from rex import safe_json_load
-from rex.utilities.fun_utils import get_fun_call_str
 from rex.utilities.loggers import init_logger
 
 from nsrdb import __version__
@@ -458,34 +457,15 @@ def cloud_fill(ctx, config, verbose=False, pipeline_step=None):
     """Gap fill cloud properties in a collect-data-model output file,
     using legacy gap-fill method."""
 
-    config = BaseCLI.from_config_preflight(
+    BaseCLI.kickoff_multichunk(
         ctx=ctx,
         module_name=ModuleName.CLOUD_FILL,
+        func=NSRDB.gap_fill_clouds,
         config=config,
         verbose=verbose,
         pipeline_step=pipeline_step,
     )
 
-    log_file = config.get('log_file', None)
-    log_level = config.get('log_level', 'INFO')
-    log_arg_str = f'"nsrdb", log_level="{log_level}"'
-    config['n_chunks'] = config.get('n_chunks', 1)
-    name = ctx.obj['NAME']
-
-    for i_chunk in range(config['n_chunks']):
-        if log_file is not None:
-            log_file_i = log_file.replace('.log', f'_{i_chunk}.log')
-            log_arg_str_i = f'{log_arg_str}, log_file="{log_file_i}"'
-            ctx.obj['LOG_ARG_STR'] = log_arg_str_i
-        config['i_chunk'] = i_chunk
-        config['job_name'] = f'{name}_{i_chunk}'
-        ctx.obj['NAME'] = config['job_name']
-        ctx.obj['FUN_STR'] = get_fun_call_str(NSRDB.gap_fill_clouds, config)
-
-        BaseCLI.kickoff_job(
-            ctx=ctx, module_name=ModuleName.CLOUD_FILL, config=config
-        )
-
 
 @main.command()
 @click.option(
@@ -505,31 +485,15 @@ def cloud_fill(ctx, config, verbose=False, pipeline_step=None):
 def all_sky(ctx, config, verbose=False, pipeline_step=None):
     """Run all-sky physics model on collected data model output files."""
 
-    config = BaseCLI.from_config_preflight(
+    BaseCLI.kickoff_multichunk(
         ctx=ctx,
         module_name=ModuleName.ALL_SKY,
+        func=NSRDB.run_all_sky,
         config=config,
         verbose=verbose,
         pipeline_step=pipeline_step,
     )
 
-    log_arg_str = ctx.obj['LOG_ARG_BASE']
-    config['n_chunks'] = config.get('n_chunks', 1)
-    name = ctx.obj['NAME']
-
-    for i_chunk in range(config['n_chunks']):
-        log_file = ctx.obj['LOG_FILE'].replace('.log', f'{i_chunk}.log')
-        log_arg_str_i = f'{log_arg_str}, log_file="{log_file}"'
-        config['i_chunk'] = i_chunk
-        config['job_name'] = f'{name}_{i_chunk}'
-        ctx.obj['LOG_ARG_STR'] = log_arg_str_i
-        ctx.obj['NAME'] = config['job_name']
-        ctx.obj['FUN_STR'] = get_fun_call_str(NSRDB.run_all_sky, config)
-
-        BaseCLI.kickoff_job(
-            ctx=ctx, module_name=ModuleName.ALL_SKY, config=config
-        )
-
 
 @main.command()
 @click.option(
@@ -588,7 +552,6 @@ def collect_data_model(ctx, config, verbose=False, pipeline_step=None):
         pipeline_step=pipeline_step,
     )
 
-    log_arg_str = ctx.obj['LOG_ARG_BASE']
     config['n_chunks'] = config.get('n_chunks', 1)
     config['n_writes'] = config.get('n_writes', 1)
     config['final'] = config.get('final', False)
@@ -599,7 +562,6 @@ def collect_data_model(ctx, config, verbose=False, pipeline_step=None):
         else config.get('collect_files', n_files_default)
     )
     fnames = sorted(NSRDB.OUTS.keys())
-    name = ctx.obj['NAME']
 
     if config['final'] and config['n_chunks'] != 1:
         msg = 'collect-data-model was marked as final but n_chunks != 1'
@@ -609,20 +571,17 @@
     for i_chunk, i_fname in itertools.product(
         range(config['n_chunks']), i_files
     ):
-        log_file_i = ctx.obj['LOG_FILE'].replace('.log', f'_{i_fname}.log')
-        log_arg_str_i = f'{log_arg_str}, log_file="{log_file_i}"'
-        ctx.obj['LOG_ARG_STR'] = log_arg_str_i
-        config['final_file_name'] = name
+        log_id = f'{fnames[i_fname].split("_")[1]}_{i_chunk}'
         config['i_chunk'] = i_chunk
         config['i_fname'] = i_fname
-        config['job_name'] = (
-            f'{name}_{i_fname}_{fnames[i_fname].split("_")[1]}_{i_chunk}'
-        )
-        ctx.obj['NAME'] = config['job_name']
-        ctx.obj['FUN_STR'] = get_fun_call_str(NSRDB.collect_data_model, config)
+        config['job_name'] = f'{ctx.obj["NAME"]}_{i_fname}_{log_id}'
 
         BaseCLI.kickoff_job(
-            ctx=ctx, module_name=ModuleName.COLLECT_DATA_MODEL, config=config
+            ctx=ctx,
+            module_name=ModuleName.COLLECT_DATA_MODEL,
+            func=NSRDB.collect_data_model,
+            config=config,
+            log_id=log_id,
         )
 
 
@@ -652,21 +611,16 @@ def collect_final(ctx, config, verbose=False, pipeline_step=None):
         pipeline_step=pipeline_step,
     )
 
-    fnames = sorted(NSRDB.OUTS.keys())
-    log_arg_str = ctx.obj['LOG_ARG_BASE']
-    name = ctx.obj['NAME']
-    for i_fname in range(len(NSRDB.OUTS)):
-        log_file_i = ctx.obj['LOG_FILE'].replace('.log', f'_{i_fname}.log')
-        log_arg_str_i = f'{log_arg_str}, log_file="{log_file_i}"'
-        config['job_name'] = (
-            f'{name}_{i_fname}_{fnames[i_fname].split("_")[1]}'
-        )
+    for i_fname, fname in enumerate(sorted(NSRDB.OUTS.keys())):
+        log_id = fname.split('_')[1]
+        config['job_name'] = f'{ctx.obj["NAME"]}_{i_fname}_{log_id}'
         config['i_fname'] = i_fname
-        ctx.obj['LOG_ARG_STR'] = log_arg_str_i
-        ctx.obj['NAME'] = config['job_name']
-        ctx.obj['FUN_STR'] = get_fun_call_str(NSRDB.collect_final, config)
 
         BaseCLI.kickoff_job(
-            ctx=ctx, module_name=ModuleName.COLLECT_FINAL, config=config
+            ctx=ctx,
+            module_name=ModuleName.COLLECT_FINAL,
+            func=NSRDB.collect_final,
+            config=config,
+            log_id=log_id,
         )
 
@@ -689,15 +643,14 @@ def blend(ctx, config, verbose=False, pipeline_step=None):
     """Blend files from separate domains (e.g. east / west) into a single
     domain."""
 
-    config = BaseCLI.from_config_preflight(
+    BaseCLI.kickoff_single(
         ctx=ctx,
         module_name=ModuleName.BLEND,
+        func=NSRDB.blend_files,
         config=config,
         verbose=verbose,
         pipeline_step=pipeline_step,
     )
-    ctx.obj['FUN_STR'] = get_fun_call_str(NSRDB.blend_files, config)
-    BaseCLI.kickoff_job(ctx=ctx, module_name=ModuleName.BLEND, config=config)
 
 @main.command()
 @click.option(
@@ -717,19 +670,15 @@ def blend(ctx, config, verbose=False, pipeline_step=None):
 def collect_blended(ctx, config, verbose=False, pipeline_step=None):
     """Collect blended data chunks into a single file."""
 
-    config = BaseCLI.from_config_preflight(
-        module_name=ModuleName.COLLECT_BLENDED,
+    BaseCLI.kickoff_single(
         ctx=ctx,
+        module_name=ModuleName.COLLECT_BLENDED,
+        func=NSRDB.collect_blended,
         config=config,
         verbose=verbose,
         pipeline_step=pipeline_step,
     )
 
-    ctx.obj['FUN_STR'] = get_fun_call_str(NSRDB.collect_blended, config)
-    BaseCLI.kickoff_job(
-        ctx=ctx, module_name=ModuleName.COLLECT_BLENDED, config=config
-    )
-
 
 @main.command()
 @click.option(
@@ -753,17 +702,14 @@ def aggregate(ctx, config, verbose=False, pipeline_step=None):
     match resolution of low-resolution years (pre 2018)
     """
-    config = BaseCLI.from_config_preflight(
+    BaseCLI.kickoff_single(
         ctx=ctx,
         module_name=ModuleName.AGGREGATE,
+        func=NSRDB.aggregate_files,
         config=config,
         verbose=verbose,
         pipeline_step=pipeline_step,
     )
-    ctx.obj['FUN_STR'] = get_fun_call_str(NSRDB.aggregate_files, config)
-    BaseCLI.kickoff_job(
-        ctx=ctx, module_name=ModuleName.AGGREGATE, config=config
-    )
 
 
 @main.command()
 @click.option(
@@ -783,19 +729,15 @@ def aggregate(ctx, config, verbose=False, pipeline_step=None):
 def collect_aggregation(ctx, config, verbose=False, pipeline_step=None):
     """Collect aggregated data chunks."""
 
-    config = BaseCLI.from_config_preflight(
-        module_name=ModuleName.COLLECT_AGG,
+    BaseCLI.kickoff_single(
         ctx=ctx,
+        module_name=ModuleName.COLLECT_AGG,
+        func=NSRDB.collect_aggregation,
         config=config,
         verbose=verbose,
         pipeline_step=pipeline_step,
     )
 
-    ctx.obj['FUN_STR'] = get_fun_call_str(NSRDB.collect_aggregation, config)
-    BaseCLI.kickoff_job(
-        ctx=ctx, module_name=ModuleName.COLLECT_AGG, config=config
-    )
-
 
 @main.group(invoke_without_command=True)
 @click.option(
diff --git a/nsrdb/config/templates/config_nsrdb_post2017.json b/nsrdb/config/templates/config_nsrdb_post2017.json
index 79c5570c..6b833061 100755
--- a/nsrdb/config/templates/config_nsrdb_post2017.json
+++ b/nsrdb/config/templates/config_nsrdb_post2017.json
@@ -59,7 +59,6 @@
         "max_workers_regrid": 16,
         "mlclouds": true
     },
-    "debug_day": null,
    "direct": {
        "log_level": "INFO",
        "name": "%basename%_%satellite%_%extent%_%year%_%spatial%",
diff --git a/nsrdb/config/templates/config_nsrdb_pre2018.json b/nsrdb/config/templates/config_nsrdb_pre2018.json
index f5790321..95ba6185 100755
--- a/nsrdb/config/templates/config_nsrdb_pre2018.json
+++ b/nsrdb/config/templates/config_nsrdb_pre2018.json
@@ -57,7 +57,6 @@
         "max_workers_regrid": null,
         "mlclouds": true
     },
-    "debug_day": null,
    "direct": {
        "log_level": "INFO",
        "name": "%basename%_%satellite%_%extent%_%year%_%spatial%",
diff --git a/nsrdb/nsrdb.py b/nsrdb/nsrdb.py
index f2fd1734..3e6dbf46 100755
--- a/nsrdb/nsrdb.py
+++ b/nsrdb/nsrdb.py
@@ -39,6 +39,16 @@
 from nsrdb.gap_fill.cloud_fill import CloudGapFill
 from nsrdb.utilities.file_utils import clean_meta, pd_date_range, ts_freq_check
 
+PRE2018_CONFIG_TEMPLATE = os.path.join(
+    CONFIGDIR, 'templates/config_nsrdb_pre2018.json'
+)
+POST2017_CONFIG_TEMPLATE = os.path.join(
+    CONFIGDIR, 'templates/config_nsrdb_post2017.json'
+)
+PIPELINE_CONFIG_TEMPLATE = os.path.join(
+    CONFIGDIR, 'templates/config_pipeline.json'
+)
+
 logger = logging.getLogger(__name__)
 
 
@@ -584,16 +594,6 @@ def create_config_files(kwargs):
         user_input['start_doy'] = user_input['doy_range'][0]
         user_input['end_doy'] = user_input['doy_range'][1]
 
-    PRE2018_CONFIG_TEMPLATE = os.path.join(
-        CONFIGDIR, 'templates/config_nsrdb_pre2018.json'
-    )
-    POST2017_CONFIG_TEMPLATE = os.path.join(
-        CONFIGDIR, 'templates/config_nsrdb_post2017.json'
-    )
-    PIPELINE_CONFIG_TEMPLATE = os.path.join(
-        CONFIGDIR, 'templates/config_pipeline.json'
-    )
-
     run_name = '_'.join(
         str(user_input[k])
         for k in [
@@ -614,12 +614,13 @@ def create_config_files(kwargs):
         f'{pprint.pformat(user_input, indent=2)}'
     )
 
-    if int(user_input['year']) < 2018:
-        with open(PRE2018_CONFIG_TEMPLATE, encoding='utf-8') as s:
-            s = s.read()
-    else:
-        with open(POST2017_CONFIG_TEMPLATE, encoding='utf-8') as s:
-            s = s.read()
+    template = (
+        PRE2018_CONFIG_TEMPLATE
+        if int(user_input['year']) < 2018
+        else POST2017_CONFIG_TEMPLATE
+    )
+    with open(template, encoding='utf-8') as s:
+        s = s.read()
 
     for k, v in user_input.items():
         if isinstance(v, int):
@@ -1243,8 +1244,7 @@ def collect_data_model(
             this will collect the data to the out_dir/final/ directory
             instead of the out_dir/collect Directory.
         final_file_name : str | None
-            Final file name for filename outputs if this is the
-            terminal job.
+            Final file name for filename outputs if this is the terminal job.
""" nsrdb = cls(out_dir, year, grid, freq=freq, var_meta=var_meta) diff --git a/nsrdb/utilities/cli.py b/nsrdb/utilities/cli.py index f85be58c..a7b2edde 100644 --- a/nsrdb/utilities/cli.py +++ b/nsrdb/utilities/cli.py @@ -115,8 +115,7 @@ def from_config_preflight( ctx.obj['STATUS_DIR'] = status_dir ctx.obj['VERBOSE'] = verbose - ctx.obj['OUT_DIR'] = status_dir - ctx.obj['IMPORT_STR'] = IMPORT_STR + ctx.obj['OUT_DIR'] = config.get('outdir', status_dir) ctx.obj['PIPELINE_STEP'] = pipeline_step or module_name sanitized_mod = module_name.replace('-', '_') ctx.obj['LOG_DIR'] = os.path.join(status_dir, 'logs', sanitized_mod) @@ -127,14 +126,10 @@ def from_config_preflight( ctx.obj['LOG_FILE'] = config.get( 'log_file', os.path.join(ctx.obj['LOG_DIR'], name + '.log') ) - config_verbose = config.get('log_level', 'INFO') - log_arg_str = f'"nsrdb", log_level="{config_verbose}"' - config_verbose = config_verbose == 'DEBUG' - verbose = any([verbose, config_verbose, ctx.obj['VERBOSE']]) - ctx.obj['LOG_ARG_BASE'] = log_arg_str - ctx.obj['LOG_ARG_STR'] = ( - f'{log_arg_str}, log_file="{ctx.obj["LOG_FILE"]}"' - ) + log_level = config.get('log_level', 'INFO') + ctx.obj['LOG_ARG_STR'] = f'"nsrdb", log_level="{log_level}"' + log_level = log_level == 'DEBUG' + verbose = any([verbose, log_level, ctx.obj['VERBOSE']]) exec_kwargs = config.get('execution_control', {}) direct_args = config.get('direct', {}) cmd_args = config.get(module_name, {}) @@ -158,7 +153,7 @@ def from_config_preflight( logger.debug( f'Found execution kwargs {exec_kwargs} for {module_name} module' ) - + cmd_args['log_file'] = ctx.obj['LOG_FILE'] cmd_args['job_name'] = name cmd_args['status_dir'] = status_dir cmd_args['execution_control'] = exec_kwargs @@ -217,8 +212,7 @@ def kickoff_slurm_job( """ cls.check_module_name(module_name) pipeline_step = ctx.obj['PIPELINE_STEP'] - - name = ctx.obj['NAME'] + name = ctx.obj['JOB_NAME'] status_dir = ctx.obj['STATUS_DIR'] slurm_manager = ctx.obj.get('SLURM_MANAGER', None) if slurm_manager is None: @@ -273,7 +267,12 @@ def kickoff_slurm_job( pipeline_step=pipeline_step, job_name=name, replace=True, - job_attrs={'job_id': out, 'hardware': option}, + job_attrs={ + 'job_id': out, + 'outdir': ctx.obj['OUT_DIR'], + 'log_file': ctx.obj['LOG_FILE'], + 'hardware': option, + }, ) click.echo(msg) @@ -295,7 +294,7 @@ def kickoff_local_job(cls, ctx, module_name, cmd): """ cls.check_module_name(module_name) pipeline_step = ctx.obj['PIPELINE_STEP'] - name = ctx.obj['NAME'] + name = ctx.obj['JOB_NAME'] status_dir = ctx.obj['STATUS_DIR'] subprocess_manager = SubprocessManager @@ -380,7 +379,7 @@ def get_status_cmd(cls, config, pipeline_step): return cmd.replace('\\', '/') @classmethod - def kickoff_job(cls, ctx, module_name, config): + def kickoff_job(cls, ctx, module_name, func, config, log_id=None): """Run nsrdb module either locally or on HPC. Parameters @@ -389,19 +388,29 @@ def kickoff_job(cls, ctx, module_name, config): Click context object where ctx.obj is a dictionary module_name : str Module name string from :class:`nsrdb.utilities.ModuleName`. + func : Callable + Function used to run module. e.g. `NSRDB.run_data_model()` config : dict nsrdb config with all necessary args and kwargs to run given module. + log_id : str | None + String id to append to base log file if this job is part of a multi + job kickoff. None is used is this is just a single job. 
""" - import_str = ctx.obj['IMPORT_STR'] - log_arg_str = ctx.obj['LOG_ARG_STR'] - fun_str = ctx.obj['FUN_STR'] + log_file = ( + ctx.obj['LOG_FILE'] + if log_id is None + else ctx.obj['LOG_FILE'].replace('.log', f'_{log_id}.log') + ) + log_arg_str = f'{ctx.obj["LOG_ARG_STR"]}, log_file="{log_file}"' pipeline_step = ctx.obj['PIPELINE_STEP'] exec_kwargs = config.get('execution_control', {}) hardware_option = exec_kwargs.get('option', 'local') + config['log_file'] = log_file + fun_str = get_fun_call_str(func, config) cmd = ( - f"python -c '{import_str}\n" + f"python -c '{IMPORT_STR}\n" 't0 = time.time();\n' f'logger = init_logger({log_arg_str});\n' f'{fun_str};\n' @@ -410,11 +419,48 @@ def kickoff_job(cls, ctx, module_name, config): cmd += cls.get_status_cmd(config, pipeline_step) + ctx.obj['JOB_NAME'] = config['job_name'] if hardware_option == 'local': cls.kickoff_local_job(ctx, module_name, cmd) else: cls.kickoff_slurm_job(ctx, module_name, cmd, **exec_kwargs) + @classmethod + def kickoff_single( + cls, ctx, module_name, func, config, verbose, pipeline_step=None + ): + """Kick off single job. + + Parameters + ---------- + ctx : click.pass_context + Click context object where ctx.obj is a dictionary + module_name : str + Module name string from :class:`nsrdb.utilities.ModuleName`. + func : Callable + Function used to run module. e.g. `NSRDB.run_data_model()` + config : str | dict + Path to nsrdb config file or a dictionary with all necessary args + and kwargs to run given module + verbose : bool + Flag to turn on debug logging + pipeline_step : str, optional + Name of the pipeline step being run. If ``None``, the + ``pipeline_step`` will be set to the ``module_name``, + mimicking old reV behavior. By default, ``None``. + """ + + config = cls.from_config_preflight( + ctx=ctx, + module_name=module_name, + config=config, + verbose=verbose, + pipeline_step=pipeline_step, + ) + cls.kickoff_job( + ctx=ctx, module_name=module_name, func=func, config=config + ) + @classmethod def kickoff_multiday( cls, ctx, module_name, func, config, verbose, pipeline_step=None @@ -441,8 +487,8 @@ def kickoff_multiday( """ config_dict = cls.from_config_preflight( - module_name=module_name, ctx=ctx, + module_name=module_name, config=config, verbose=verbose, pipeline_step=pipeline_step, @@ -458,19 +504,61 @@ def kickoff_multiday( logger.error(msg) raise KeyError(msg) - name = ctx.obj['NAME'] - log_arg_str = ctx.obj['LOG_ARG_BASE'] - pipeline_step = ctx.obj['PIPELINE_STEP'] - for doy in doys: - log_file_d = ctx.obj['LOG_FILE'].replace('.log', f'_{doy}.log') - ctx.obj['LOG_ARG_STR'] = f'{log_arg_str}, log_file="{log_file_d}"' - date = NSRDB.doy_to_datestr(config_dict['year'], doy) config_dict['date'] = date - config_dict['job_name'] = f'{name}_{doy}_{date}' + config_dict['job_name'] = f'{ctx.obj["NAME"]}_{doy}_{date}' config_dict['doy'] = doy - ctx.obj['FUN_STR'] = get_fun_call_str(func, config_dict) - ctx.obj['NAME'] = config_dict['job_name'] - cls.kickoff_job(ctx, module_name, config_dict) + cls.kickoff_job( + ctx, + module_name=module_name, + func=func, + config=config_dict, + log_id=doy, + ) + + @classmethod + def kickoff_multichunk( + cls, ctx, module_name, func, config, verbose, pipeline_step=None + ): + """Kick off jobs for multiple chunks. + + Parameters + ---------- + ctx : click.pass_context + Click context object where ctx.obj is a dictionary + module_name : str + Module name string from :class:`nsrdb.utilities.ModuleName`. + func : Callable + Function used to run module. e.g. 
+        config : str | dict
+            Path to nsrdb config file or a dictionary with all necessary args
+            and kwargs to run given module
+        verbose : bool
+            Flag to turn on debug logging
+        pipeline_step : str, optional
+            Name of the pipeline step being run. If ``None``, the
+            ``pipeline_step`` will be set to the ``module_name``,
+            mimicking old reV behavior. By default, ``None``.
+        """
+        config = cls.from_config_preflight(
+            ctx=ctx,
+            module_name=module_name,
+            config=config,
+            verbose=verbose,
+            pipeline_step=pipeline_step,
+        )
+        config['n_chunks'] = config.get('n_chunks', 1)
+
+        for i_chunk in range(config['n_chunks']):
+            config['i_chunk'] = i_chunk
+            config['job_name'] = f'{ctx.obj["NAME"]}_{i_chunk}'
+
+            cls.kickoff_job(
+                ctx=ctx,
+                module_name=module_name,
+                func=func,
+                config=config,
+                log_id=i_chunk,
+            )
diff --git a/tests/cli/test_nsrdb_cli.py b/tests/cli/test_nsrdb_cli.py
index 866d35ab..0c7bba7c 100755
--- a/tests/cli/test_nsrdb_cli.py
+++ b/tests/cli/test_nsrdb_cli.py
@@ -178,31 +178,28 @@ def test_cli_steps(runner, modern_config):
     gap-fill, all-sky, and collection"""
     config_file, _ = modern_config
+    out_dir = os.path.dirname(config_file)
 
     result = runner.invoke(cli.data_model, ['-c', config_file])
     assert result.exit_code == 0, traceback.print_exception(*result.exc_info)
 
     result = runner.invoke(cli.ml_cloud_fill, ['-c', config_file])
     assert result.exit_code == 0, traceback.print_exception(*result.exc_info)
 
     # specific_humidity and cloud_fill_flag not included in ALL_VARS_ML
-    assert len(glob(f'{os.path.dirname(config_file)}/daily/*.h5')) == 2 + len(
-        DataModel.ALL_VARS_ML
-    )
+    assert len(glob(f'{out_dir}/daily/*.h5')) == 2 + len(DataModel.ALL_VARS_ML)
 
     result = runner.invoke(cli.daily_all_sky, ['-c', config_file])
     assert result.exit_code == 0, traceback.print_exception(*result.exc_info)
 
     # specific_humidity not included in OUTS or MLCLOUDS_VARS
-    assert len(glob(f'{os.path.dirname(config_file)}/daily/*.h5')) == 1 + len(
+    assert len(glob(f'{out_dir}/daily/*.h5')) == 1 + len(
         DataModel.MLCLOUDS_VARS
     ) + sum(len(v) for v in NSRDB.OUTS.values())
 
     result = runner.invoke(cli.collect_data_model, ['-c', config_file])
     assert result.exit_code == 0, traceback.print_exception(*result.exc_info)
-    assert len(glob(f'{os.path.dirname(config_file)}/final/*.h5')) == 7
+    assert len(glob(f'{out_dir}/final/*.h5')) == 7
-    status_files = glob(
-        os.path.dirname(config_file) + '/.gaps/jobstatus*.json'
-    )
+    status_files = glob(out_dir + '/.gaps/jobstatus*.json')
     status_dicts = [safe_json_load(sf) for sf in status_files]
     for sd in status_dicts:
         assert all('successful' in str(vals) for vals in sd.values())
@@ -211,7 +208,9 @@ def test_cli_steps(runner, modern_config):
 def test_cli_pipeline(runner, modern_config):
     """Test cli for pipeline, run using cli.pipeline"""
 
-    _, pipeline_file = modern_config
+    config_file, pipeline_file = modern_config
+    config = safe_json_load(config_file)
+    out_dir = os.path.dirname(pipeline_file)
 
     # data-model
     result = runner.invoke(cli.pipeline, ['-c', pipeline_file])
@@ -222,30 +221,34 @@ def test_cli_pipeline(runner, modern_config):
     assert result.exit_code == 0, traceback.print_exception(*result.exc_info)
 
     # specific_humidity and cloud_fill_flag not included in ALL_VARS_ML
-    assert len(
-        glob(f'{os.path.dirname(pipeline_file)}/daily/*.h5')
-    ) == 2 + len(DataModel.ALL_VARS_ML)
+    assert len(glob(f'{out_dir}/daily/*.h5')) == 2 + len(DataModel.ALL_VARS_ML)
 
     # all-sky
     result = runner.invoke(cli.pipeline, ['-c', pipeline_file])
     assert result.exit_code == 0, traceback.print_exception(*result.exc_info)
 
     # specific_humidity not included in OUTS or MLCLOUDS_VARS
-    assert len(
-        glob(f'{os.path.dirname(pipeline_file)}/daily/*.h5')
-    ) == 1 + len(DataModel.MLCLOUDS_VARS) + sum(
-        len(v) for v in NSRDB.OUTS.values()
-    )
+    assert len(glob(f'{out_dir}/daily/*.h5')) == 1 + len(
+        DataModel.MLCLOUDS_VARS
+    ) + sum(len(v) for v in NSRDB.OUTS.values())
 
     # data collection
     result = runner.invoke(cli.pipeline, ['-c', pipeline_file])
     assert result.exit_code == 0, traceback.print_exception(*result.exc_info)
-    assert len(glob(f'{os.path.dirname(pipeline_file)}/final/*.h5')) == 7
+    final_files = glob(f'{out_dir}/final/*.h5')
+    final_files = sorted([os.path.basename(f) for f in final_files])
+    assert (
+        sorted(
+            f.format(y=config['direct']['year'])
+            for f in sorted(NSRDB.OUTS.keys())
+        )
+        == final_files
+    )
 
     # final status file update
     result = runner.invoke(cli.pipeline, ['-c', pipeline_file])
-    status_file = glob(os.path.dirname(pipeline_file) + '/.gaps/*.json')[0]
+    status_file = glob(out_dir + '/.gaps/*.json')[0]
     status_dict = safe_json_load(status_file)
     assert all('successful' in str(vals) for vals in status_dict.values())
 
@@ -254,7 +257,9 @@ def test_cli_pipeline_legacy(runner, legacy_config):
     """Test cli for pipeline, run using cli.pipeline. Uses legacy gap-fill
     method"""
 
-    _, pipeline_file = legacy_config
+    config_file, pipeline_file = legacy_config
+    config = safe_json_load(config_file)
+    out_dir = os.path.dirname(pipeline_file)
 
     # data-model
     result = runner.invoke(cli.pipeline, ['-c', pipeline_file])
@@ -270,7 +275,7 @@ def test_cli_pipeline_legacy(runner, legacy_config):
     ) == 1 + len(DataModel.ALL_VARS)
 
     # collected data doesn't include all-sky files yet (irradiance / clearsky)
-    assert len(glob(f'{os.path.dirname(pipeline_file)}/collect/*.h5')) == 5
+    assert len(glob(f'{out_dir}/collect/*.h5')) == 5
 
     # gap-fill
     result = runner.invoke(cli.pipeline, ['-c', pipeline_file])
@@ -281,17 +286,28 @@ def test_cli_pipeline_legacy(runner, legacy_config):
     assert result.exit_code == 0, traceback.print_exception(*result.exc_info)
 
     # irrad and clearsky now in collect directory
-    assert len(glob(f'{os.path.dirname(pipeline_file)}/collect/*.h5')) == 7
+    assert (
+        len(glob(f'{out_dir}/collect/*.h5'))
+        == config['collect-data-model']['n_chunks'] * 7
+    )
 
     # final collection
     result = runner.invoke(cli.pipeline, ['-c', pipeline_file])
     assert result.exit_code == 0, traceback.print_exception(*result.exc_info)
-    assert len(glob(f'{os.path.dirname(pipeline_file)}/final/*.h5')) == 7
+    final_files = glob(f'{out_dir}/final/*.h5')
+    final_files = sorted([os.path.basename(f) for f in final_files])
+    assert (
+        sorted(
+            f.format(y=config['direct']['year'])
+            for f in sorted(NSRDB.OUTS.keys())
+        )
+        == final_files
+    )
 
     # final status file update
     result = runner.invoke(cli.pipeline, ['-c', pipeline_file])
-    status_file = glob(os.path.dirname(pipeline_file) + '/.gaps/*.json')[0]
+    status_file = glob(out_dir + '/.gaps/*.json')[0]
     status_dict = safe_json_load(status_file)
     assert all('successful' in str(vals) for vals in status_dict.values())