Skip to content

Commit

Permalink
Merge pull request #1262 from bhilbert4/pipeline-skip-run-steps
Browse files Browse the repository at this point in the history
Pipeline skip already run steps
  • Loading branch information
mfixstsci authored Jul 31, 2023
2 parents 568e314 + 9196d96 commit 65fb43e
Show file tree
Hide file tree
Showing 5 changed files with 209 additions and 89 deletions.
6 changes: 5 additions & 1 deletion jwql/instrument_monitors/common_monitors/dark_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -711,7 +711,11 @@ def process(self, file_list):
logging.info("\t\tAdding {} to calibration set".format(filename))
pipeline_files.append(filename)

outputs = run_parallel_pipeline(pipeline_files, "dark", "rate", self.instrument)
# Specify that we want to skip the dark current correction step
step_args = {'dark_current': {'skip': True}}

# Call the pipeline
outputs = run_parallel_pipeline(pipeline_files, "dark", ["rate"], self.instrument, step_args=step_args)
for filename in file_list:
processed_file = filename.replace("_dark", "_rate")
if processed_file not in slope_files and os.path.isfile(processed_file):
Expand Down
28 changes: 12 additions & 16 deletions jwql/instrument_monitors/pipeline_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from jwst.dq_init import DQInitStep
from jwst.dark_current import DarkCurrentStep
from jwst.firstframe import FirstFrameStep
from jwst.gain_scale import GainScaleStep
from jwst.group_scale import GroupScaleStep
from jwst.ipc import IPCStep
from jwst.jump import JumpStep
Expand All @@ -34,6 +35,7 @@
from jwst.pipeline.calwebb_detector1 import Detector1Pipeline
from jwst.ramp_fitting import RampFitStep
from jwst.refpix import RefPixStep
from jwst.reset import ResetStep
from jwst.rscd import RscdStep
from jwst.saturation import SaturationStep
from jwst.superbias import SuperBiasStep
Expand All @@ -43,16 +45,17 @@

# Define the fits header keyword that accompanies each step
PIPE_KEYWORDS = {'S_GRPSCL': 'group_scale', 'S_DQINIT': 'dq_init', 'S_SATURA': 'saturation',
'S_REFPIX': 'refpix', 'S_SUPERB': 'superbias',
'S_REFPIX': 'refpix', 'S_SUPERB': 'superbias', 'S_RESET': 'reset',
'S_PERSIS': 'persistence', 'S_DARK': 'dark_current', 'S_LINEAR': 'linearity',
'S_FRSTFR': 'firstframe', 'S_LASTFR': 'lastframe', 'S_RSCD': 'rscd',
'S_JUMP': 'jump', 'S_RAMP': 'rate'}
'S_JUMP': 'jump', 'S_RAMP': 'rate', 'S_GANSCL': 'gain_scale', 'S_IPC': 'ipc'}

PIPELINE_STEP_MAPPING = {'dq_init': DQInitStep, 'dark_current': DarkCurrentStep,
'firstframe': FirstFrameStep, 'group_scale': GroupScaleStep,
'ipc': IPCStep, 'jump': JumpStep, 'lastframe': LastFrameStep,
'linearity': LinearityStep, 'persistence': PersistenceStep,
'rate': RampFitStep, 'refpix': RefPixStep, 'rscd': RscdStep,
'firstframe': FirstFrameStep, 'gain_scale': GainScaleStep,
'group_scale': GroupScaleStep, 'ipc': IPCStep, 'jump': JumpStep,
'lastframe': LastFrameStep, 'linearity': LinearityStep,
'persistence': PersistenceStep, 'rate': RampFitStep,
'refpix': RefPixStep, 'reset': ResetStep, 'rscd': RscdStep,
'saturation': SaturationStep, 'superbias': SuperBiasStep}

# Readout patterns that have nframes != a power of 2. These readout patterns
Expand Down Expand Up @@ -165,12 +168,8 @@ def get_pipeline_steps(instrument):

# Order is important in 'steps' lists below!!
if instrument == 'MIRI':
steps = ['group_scale', 'dq_init', 'saturation', 'ipc', 'firstframe', 'lastframe',
'linearity', 'rscd', 'dark_current', 'refpix', 'persistence', 'jump', 'rate']
# No persistence correction for MIRI
steps.remove('persistence')
# MIRI is limited to one frame per group
steps.remove('group_scale')
steps = ['group_scale', 'dq_init', 'saturation', 'ipc', 'firstframe', 'lastframe', 'reset',
'linearity', 'rscd', 'dark_current', 'refpix', 'jump', 'rate', 'gain_scale']
else:
steps = ['group_scale', 'dq_init', 'saturation', 'ipc', 'superbias', 'refpix', 'linearity',
'persistence', 'dark_current', 'jump', 'rate']
Expand All @@ -186,13 +185,10 @@ def get_pipeline_steps(instrument):
# IPC correction currently not done for any instrument
steps.remove('ipc')

# Initialize using PIPE_KEYWORDS so the steps will be in the right order
# Initialize using OrderedDict so the steps will be in the right order
required_steps = OrderedDict({})
for key in steps:
required_steps[key] = True
for key in PIPE_KEYWORDS.values():
if key not in required_steps.keys():
required_steps[key] = False

return required_steps

Expand Down
155 changes: 115 additions & 40 deletions jwql/shared_tasks/run_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from collections import OrderedDict
from copy import deepcopy
from glob import glob
import json
import os
import shutil
import sys
Expand All @@ -27,13 +28,13 @@
from jwst.saturation import SaturationStep
from jwst.superbias import SuperBiasStep

from jwql.instrument_monitors.pipeline_tools import PIPELINE_STEP_MAPPING, get_pipeline_steps
from jwql.instrument_monitors.pipeline_tools import PIPELINE_STEP_MAPPING, completed_pipeline_steps, get_pipeline_steps
from jwql.utils.logging_functions import configure_logging
from jwql.utils.permissions import set_permissions
from jwql.utils.utils import copy_files, ensure_dir_exists, get_config, filesystem_path


def run_pipe(input_file, short_name, work_directory, instrument, outputs, max_cores='all'):
def run_pipe(input_file, short_name, work_directory, instrument, outputs, max_cores='all', step_args={}):
"""Run the steps of ``calwebb_detector1`` on the input file, saving the result of each
step as a separate output file, then return the name-and-path of the file as reduced
in the reduction directory.
Expand All @@ -43,27 +44,64 @@ def run_pipe(input_file, short_name, work_directory, instrument, outputs, max_co
status_file_name = short_name + "_status.txt"
status_file = os.path.join(work_directory, status_file_name)
uncal_file = os.path.join(work_directory, input_file_basename)

with open(status_file, 'a+') as status_f:
status_f.write("Running run_pipe\n")
status_f.write("\t input_file_basename is {} ({})\n".format(input_file_basename, type(input_file_basename)))
status_f.write("\t start_dir is {} ({})\n".format(start_dir, type(start_dir)))
status_f.write("\t uncal_file is {} ({})\n".format(uncal_file, type(uncal_file)))

status_f.write(f"\t outputs is {outputs}\n")

try:
copy_files([input_file], work_directory)
set_permissions(uncal_file)

steps = get_pipeline_steps(instrument)

# If the input file is a file other than uncal.fits, then we may only need to run a
# subset of steps. Check the completed steps in the input file. Find the latest step
# that has been completed, and skip that plus all prior steps
if 'uncal.fits' not in input_file:
completed_steps = completed_pipeline_steps(input_file)

# Reverse the boolean value, so that now steps answers the question: "Do we need
# to run this step?""
for step in steps:
steps[step] = not completed_steps[step]

# Make sure we don't run steps out of order. Find the latest step that has been
# run, and only run subsequent steps. This protects against cases where some early
# step was not run. In that case, we don't want to go back and run it because running
# pipeline steps out of order doesn't work.
last_run = 'group_scale' # initialize to the first step
for step in steps:
if not steps[step]:
last_run = deepcopy(step)

for step in steps:
if step == last_run:
break
if step != last_run:
steps[step] = False

# Set any steps the user specifically asks to skip
for step, step_dict in step_args.items():
if 'skip' in step_dict:
if step_dict['skip']:
steps[step] = False

# Run each specified step
first_step_to_be_run = True
for step_name in steps:
sys.stderr.write("Running step {}\n".format(step_name))
with open(status_file, 'a+') as status_f:
status_f.write("Running step {}\n".format(step_name))
kwargs = {}
if step_name in step_args:
kwargs = step_args[step_name]
if step_name in ['jump', 'rate']:
kwargs = {'maximum_cores': max_cores}
kwargs['maximum_cores'] = max_cores
if steps[step_name]:
sys.stderr.write("Running step {}\n".format(step_name))
with open(status_file, 'a+') as status_f:
status_f.write("Running step {}\n".format(step_name))
output_file_name = short_name + "_{}.fits".format(step_name)
output_file = os.path.join(work_directory, output_file_name)
# skip already-done steps
Expand Down Expand Up @@ -94,7 +132,13 @@ def run_pipe(input_file, short_name, work_directory, instrument, outputs, max_co
# If the dither_points entry is not populated, then ignore this change
pass
model[0].save(output_file)

if 'rateints' in outputs:
outbase = os.path.basename(output_file)
outbase = outbase.replace('rate', 'rateints')
output_file = os.path.join(work_directory, outbase)
model[1].save(output_file)
with open(status_file, 'a+') as status_f:
status_f.write(f"Saved rateints model to {output_file}\n")
done = True
for output in outputs:
output_name = "{}_{}.fits".format(short_name, output)
Expand All @@ -104,19 +148,24 @@ def run_pipe(input_file, short_name, work_directory, instrument, outputs, max_co
if done:
sys.stderr.write("Done pipeline.\n")
break
else:
sys.stderr.write("Skipping step {}\n".format(step_name))
with open(status_file, 'a+') as status_f:
status_f.write("Skipping step {}\n".format(step_name))

except Exception as e:
with open(status_file, "a+") as status_f:
status_f.write("EXCEPTION\n")
status_f.write("{}\n".format(e))
status_f.write("FAILED")
sys.exit(1)

with open(status_file, "a+") as status_f:
status_f.write("SUCCEEDED")
# Done.


def run_save_jump(input_file, short_name, work_directory, instrument, ramp_fit=True, save_fitopt=True, max_cores='all'):
def run_save_jump(input_file, short_name, work_directory, instrument, ramp_fit=True, save_fitopt=True, max_cores='all', step_args={}):
"""Call ``calwebb_detector1`` on the provided file, running all
steps up to the ``ramp_fit`` step, and save the result. Optionally
run the ``ramp_fit`` step and save the resulting slope file as well.
Expand All @@ -130,7 +179,7 @@ def run_save_jump(input_file, short_name, work_directory, instrument, ramp_fit=T
sys.stderr.write("Starting pipeline\n")
with open(status_file, 'a+') as status_f:
status_f.write("Starting pipeline\n")

try:
copy_files([input_file], work_directory)
set_permissions(uncal_file)
Expand All @@ -157,51 +206,73 @@ def run_save_jump(input_file, short_name, work_directory, instrument, ramp_fit=T
# and use the run() method so that we can set parameters
# progammatically.
model = Detector1Pipeline()
params = {}

# Always true
if instrument == 'nircam':
model.refpix.odd_even_rows = False
params['refpix'] = dict(odd_even_rows=False)

# Default CR rejection threshold is too low
model.jump.rejection_threshold = 15
params['jump']['rejection_threshold'] = 15

# Turn off IPC step until it is put in the right place
model.ipc.skip = True

model.jump.save_results = True
model.jump.output_dir = work_directory
model.jump.maximum_cores = max_cores
jump_output = uncal_file.replace('uncal', 'jump')
# Set up to save jump step output
params['jump']['save_results'] = True
params['jump']['output_dir'] = work_directory
params['jump']['maximum_cores'] = max_cores
jump_output = short_name + '_jump.fits'

# Check to see if the jump version of the requested file is already
# present
run_jump = not os.path.isfile(jump_output)

if ramp_fit:
model.ramp_fit.save_results = True
model.ramp_fit.maximum_cores = max_cores
# model.save_results = True
model.output_dir = work_directory
# pipe_output = os.path.join(output_dir, input_file_only.replace('uncal', 'rate'))
pipe_output = uncal_file.replace('uncal', '0_ramp_fit')
params['ramp_fit'] = dict(save_results=True, maximum_cores=max_cores)

pipe_output = os.path.join(work_directory, short_name + '_0_ramp_fit.fits')
run_slope = not os.path.isfile(pipe_output)
if save_fitopt:
model.ramp_fit.save_opt = True
fitopt_output = uncal_file.replace('uncal', 'fitopt')
params['ramp_fit']['save_opt'] = True
fitopt_output = os.path.join(work_directory, short_name + '_fitopt.fits')
run_fitopt = not os.path.isfile(fitopt_output)
else:
model.ramp_fit.save_opt = False
params['ramp_fit']['save_opt'] = False
fitopt_output = None
run_fitopt = False
else:
model.ramp_fit.skip = True
params['ramp_fit']['skip'] = True
pipe_output = None
fitopt_output = None
run_slope = False
run_fitopt = False

# If the input file is dark.fits rather than uncal.fits, then skip
# all of the pipeline steps that are run prior to dark subtraction
if 'dark.fits' in input_file:
if instrument.lower() == 'miri':
steps_to_skip = ['group_scale', 'dq_init', 'saturation', 'ipc', 'firstframe',
'lastframe', 'reset', 'linearity', 'rscd']
else:
steps_to_skip = ['group_scale', 'dq_init', 'saturation', 'ipc', 'superbias',
'refpix', 'linearity']
for step in steps_to_skip:
step_dict = dict(skip=True)
if step in params:
params[step] = params[step].update(step_dict)
else:
params[step] = dict(skip=True)
else:
# Turn off IPC step until it is put in the right place
params['ipc'] = dict(skip=True)

# Include any user-specified parameters
for step_name in step_args:
if step_name in params:
params[step_name] = params[step_name].update(step_args[step_name])
else:
params[step_name] = step_args[step_name]

if run_jump or (ramp_fit and run_slope) or (save_fitopt and run_fitopt):
model.run(datamodel)
model.call(datamodel, output_dir=work_directory, steps=params)
else:
print(("Files with all requested calibration states for {} already present in "
"output directory. Skipping pipeline call.".format(uncal_file)))
Expand All @@ -211,7 +282,7 @@ def run_save_jump(input_file, short_name, work_directory, instrument, ramp_fit=T
status_f.write("{}\n".format(e))
status_f.write("FAILED")
sys.exit(1)

with open(status_file, "a+") as status_f:
status_f.write("{}\n".format(jump_output))
status_f.write("{}\n".format(pipe_output))
Expand All @@ -236,6 +307,7 @@ def run_save_jump(input_file, short_name, work_directory, instrument, ramp_fit=T
out_help = 'Comma-separated list of output extensions (for cal only, otherwise just "all")'
name_help = 'Input file name with no path or extensions'
cores_help = 'Maximum cores to use (default "all")'
step_args_help = 'Step-specific parameter value nested dictionary'
parser = argparse.ArgumentParser(description='Run local calibration')
parser.add_argument('pipe', metavar='PIPE', type=str, help=pipe_help)
parser.add_argument('outputs', metavar='OUTPUTS', type=str, help=out_help)
Expand All @@ -244,6 +316,7 @@ def run_save_jump(input_file, short_name, work_directory, instrument, ramp_fit=T
parser.add_argument('input_file', metavar='FILE', type=str, help=file_help)
parser.add_argument('short_name', metavar='NAME', type=str, help=name_help)
parser.add_argument('max_cores', metavar='CORES', type=str, help=cores_help)
parser.add_argument('--step_args', metavar='STEP_ARGS', type=json.loads, default='{}', help=step_args_help)

with open(general_status_file, "a+") as status_file:
status_file.write("Created argument parser at {}\n".format(time.ctime()))
Expand All @@ -258,14 +331,15 @@ def run_save_jump(input_file, short_name, work_directory, instrument, ramp_fit=T

with open(general_status_file, "a+") as status_file:
status_file.write("Finished parsing args at {}\n".format(time.ctime()))

input_file = args.input_file
instrument = args.instrument
short_name = args.short_name
working_path = args.working_path
pipe_type = args.pipe
outputs = args.outputs

step_args = args.step_args

status_file = os.path.join(working_path, short_name+"_status.txt")
with open(status_file, 'w') as out_file:
out_file.write("Starting Process\n")
Expand All @@ -275,23 +349,24 @@ def run_save_jump(input_file, short_name, work_directory, instrument, ramp_fit=T
out_file.write("\tinstrument is {} ({})\n".format(instrument, type(instrument)))
out_file.write("\tinput_file is {} ({})\n".format(input_file, type(input_file)))
out_file.write("\tshort_name is {} ({})\n".format(short_name, type(short_name)))

out_file.write("\tstep_args is {} ({})\n".format(step_args, type(step_args)))

if not os.path.isfile(args.input_file):
raise FileNotFoundError("No input file {}".format(args.input_file))

if pipe_type not in ['jump', 'cal']:
raise ValueError("Unknown calibration type {}".format(pipe_type))

try:
if pipe_type == 'jump':
with open(status_file, 'a+') as out_file:
out_file.write("Running jump pipeline.\n")
run_save_jump(input_file, short_name, working_path, instrument, ramp_fit=True, save_fitopt=True, max_cores=args.max_cores)
run_save_jump(input_file, short_name, working_path, instrument, ramp_fit=True, save_fitopt=True, max_cores=args.max_cores, step_args=args.step_args)
elif pipe_type == 'cal':
with open(status_file, 'a+') as out_file:
out_file.write("Running cal pipeline.\n")
outputs = outputs.split(",")
run_pipe(input_file, short_name, working_path, instrument, outputs, max_cores=args.max_cores)
run_pipe(input_file, short_name, working_path, instrument, outputs, max_cores=args.max_cores, step_args=args.step_args)
except Exception as e:
with open(status_file, 'a+') as out_file:
out_file.write("Exception when starting pipeline.\n")
Expand Down
Loading

0 comments on commit 65fb43e

Please sign in to comment.