#!/usr/bin/env python
import os.path as op
from collections import defaultdict
from ..stpipe import Pipeline
from stdatamodels.jwst import datamodels
from jwst.datamodels import ModelContainer
from ..model_blender import blendmeta
# step imports
from ..coron import stack_refs_step
from ..coron import align_refs_step
from ..coron import klip_step
from ..outlier_detection import outlier_detection_step
from ..resample import resample_step
__all__ = ['Coron3Pipeline']
def to_container(model):
"""Convert to a ModelContainer of ImageModels for each plane"""
container = ModelContainer()
for plane in range(model.shape[0]):
image = datamodels.ImageModel()
for attribute in [
'data', 'dq', 'err', 'zeroframe', 'area',
'var_poisson', 'var_rnoise', 'var_flat'
]:
try:
setattr(image, attribute, model.getarray_noinit(attribute)[plane])
except AttributeError:
pass
image.update(model)
try:
image.meta.wcs = model.meta.wcs
except AttributeError:
pass
container.append(image)
return container
[docs]
class Coron3Pipeline(Pipeline):
"""Class for defining Coron3Pipeline.
Coron3Pipeline: Apply all level-3 calibration steps to a
coronagraphic association of exposures. Included steps are:
#. stack_refs (assemble reference PSF inputs)
#. align_refs (align reference PSFs to target images)
#. klip (PSF subtraction using the KLIP algorithm)
#. outlier_detection (flag outliers)
#. resample (image combination and resampling)
"""
class_alias = "calwebb_coron3"
spec = """
suffix = string(default='i2d')
"""
# Define aliases to steps
step_defs = {
'stack_refs': stack_refs_step.StackRefsStep,
'align_refs': align_refs_step.AlignRefsStep,
'klip': klip_step.KlipStep,
'outlier_detection': outlier_detection_step.OutlierDetectionStep,
'resample': resample_step.ResampleStep
}
prefetch_references = False
# Main processing
[docs]
def process(self, user_input):
"""Primary method for performing pipeline.
Parameters
----------
user_input : str, Level3 Association, or ~jwst.datamodels.JwstDataModel
The exposure or association of exposures to process
"""
self.log.info('Starting calwebb_coron3 ...')
asn_exptypes = ['science', 'psf']
# Create a DM object using the association table
input_models = datamodels.open(user_input, asn_exptypes=asn_exptypes)
acid = input_models.meta.asn_table.asn_id
# Store the output file for future use
self.output_file = input_models.meta.asn_table.products[0].name
# Find all the member types in the product
members_by_type = defaultdict(list)
prod = input_models.meta.asn_table.products[0].instance
for member in prod['members']:
members_by_type[member['exptype'].lower()].append(member['expname'])
# Set up required output products and formats
self.outlier_detection.suffix = f'{acid}_crfints'
self.outlier_detection.save_results = self.save_results
self.resample.blendheaders = False
# Save the original outlier_detection.skip setting from the
# input, because it may get toggled off within loops for
# processing individual inputs
skip_outlier_detection = self.outlier_detection.skip
# Extract lists of all the PSF and science target members
psf_files = members_by_type['psf']
targ_files = members_by_type['science']
# Make sure we found some PSF and target members
if len(psf_files) == 0:
err_str1 = 'No reference PSF members found in association table.'
self.log.error(err_str1)
self.log.error('Calwebb_coron3 processing will be aborted')
return
if len(targ_files) == 0:
err_str1 = 'No science target members found in association table'
self.log.error(err_str1)
self.log.error('Calwebb_coron3 processing will be aborted')
return
for member in psf_files + targ_files:
self.prefetch(member)
# Assemble all the input psf files into a single ModelContainer
psf_models = ModelContainer()
for i in range(len(psf_files)):
psf_input = datamodels.CubeModel(psf_files[i])
psf_models.append(psf_input)
psf_input.close()
# Perform outlier detection on the PSFs.
if not skip_outlier_detection:
for model in psf_models:
self.outlier_detection(model)
# step may have been skipped for this model;
# turn back on for next model
self.outlier_detection.skip = False
else:
self.log.info('Outlier detection skipped for PSF\'s')
# Stack all the PSF images into a single CubeModel
psf_stack = self.stack_refs(psf_models)
psf_models.close()
# Save the resulting PSF stack
self.save_model(psf_stack, suffix='psfstack')
# Call the sequence of steps: outlier_detection, align_refs, and klip
# once for each input target exposure
resample_input = ModelContainer()
for target_file in targ_files:
with datamodels.open(target_file) as target:
# Remove outliers from the target
if not skip_outlier_detection:
target = self.outlier_detection(target)
# step may have been skipped for this model;
# turn back on for next model
self.outlier_detection.skip = False
# Call align_refs
psf_aligned = self.align_refs(target, psf_stack)
# Save the alignment results
self.save_model(
psf_aligned, output_file=target_file,
suffix='psfalign', acid=acid
)
# Call KLIP
psf_sub = self.klip(target, psf_aligned)
psf_aligned.close()
# Save the psf subtraction results
self.save_model(
psf_sub, output_file=target_file,
suffix='psfsub', acid=acid
)
# Split out the integrations into separate models
# in a ModelContainer to pass to `resample`
for model in to_container(psf_sub):
resample_input.append(model)
# Call the resample step to combine all psf-subtracted target images
result = self.resample(resample_input)
# Blend the science headers
try:
completed = result.meta.cal_step.resample
except AttributeError:
self.log.debug('Could not determine if resample was completed.')
self.log.debug('Presuming not.')
completed = 'SKIPPED'
if completed == 'COMPLETE':
self.log.debug(f'Blending metadata for {result}')
blendmeta.blendmodels(result, inputs=targ_files)
try:
result.meta.asn.pool_name = input_models.meta.asn_table.asn_pool
result.meta.asn.table_name = op.basename(user_input)
except AttributeError:
self.log.debug('Cannot set association information on final')
self.log.debug(f'result {result}')
# Save the final result
self.save_model(result, suffix=self.suffix)
# We're done
self.log.info('...ending calwebb_coron3')
return