#!/usr/bin/env python
from collections import defaultdict
import os.path as op
import numpy as np
from stdatamodels.jwst import datamodels
from jwst.datamodels import SourceModelContainer
from ..associations.lib.rules_level3_base import format_product
from ..exp_to_source import multislit_to_container
from ..master_background.master_background_step import split_container
from ..stpipe import Pipeline
from ..lib.exposure_types import is_moving_target
# step imports
from ..assign_mtwcs import assign_mtwcs_step
from ..cube_build import cube_build_step
from ..extract_1d import extract_1d_step
from ..master_background import master_background_step
from ..mrs_imatch import mrs_imatch_step
from ..outlier_detection import outlier_detection_step
from ..resample import resample_spec_step
from ..combine_1d import combine_1d_step
from ..photom import photom_step
from ..spectral_leak import spectral_leak_step
__all__ = ['Spec3Pipeline']
# Group exposure types
MULTISOURCE_MODELS = ['MultiSlitModel']
IFU_EXPTYPES = ['MIR_MRS', 'NRS_IFU']
SLITLESS_TYPES = ['NIS_SOSS', 'NIS_WFSS', 'NRC_WFSS']
[docs]
class Spec3Pipeline(Pipeline):
"""
Spec3Pipeline: Processes JWST spectroscopic exposures from Level 2b to 3.
Included steps are:
assign moving target wcs (assign_mtwcs)
master background subtraction (master_background)
MIRI MRS background matching (mrs_imatch)
outlier detection (outlier_detection)
2-D spectroscopic resampling (resample_spec)
3-D spectroscopic resampling (cube_build)
1-D spectral extraction (extract_1d)
Absolute Photometric Calibration (photom)
1-D spectral combination (combine_1d)
"""
class_alias = "calwebb_spec3"
spec = """
"""
# Define aliases to steps
step_defs = {
'assign_mtwcs': assign_mtwcs_step.AssignMTWcsStep,
'master_background': master_background_step.MasterBackgroundStep,
'mrs_imatch': mrs_imatch_step.MRSIMatchStep,
'outlier_detection': outlier_detection_step.OutlierDetectionStep,
'resample_spec': resample_spec_step.ResampleSpecStep,
'cube_build': cube_build_step.CubeBuildStep,
'extract_1d': extract_1d_step.Extract1dStep,
'photom': photom_step.PhotomStep,
'combine_1d': combine_1d_step.Combine1dStep,
'spectral_leak': spectral_leak_step.SpectralLeakStep
}
# Main processing
[docs]
def process(self, input):
"""Entrypoint for this pipeline
Parameters
----------
input: str, Level3 Association, or ~jwst.datamodels.JwstDataModel
The exposure or association of exposures to process
"""
self.log.info('Starting calwebb_spec3 ...')
asn_exptypes = ['science', 'background']
# Setup sub-step defaults
self.master_background.suffix = 'mbsub'
self.mrs_imatch.suffix = 'mrs_imatch'
self.outlier_detection.suffix = 'crf'
self.outlier_detection.save_results = self.save_results
self.resample_spec.suffix = 's2d'
self.resample_spec.save_results = self.save_results
self.cube_build.suffix = 's3d'
self.cube_build.save_results = self.save_results
self.extract_1d.suffix = 'x1d'
self.extract_1d.save_results = self.save_results
self.combine_1d.suffix = 'c1d'
self.combine_1d.save_results = self.save_results
self.spectral_leak.suffix = 'x1d'
self.spectral_leak.save_results = self.save_results
# Retrieve the inputs:
# could either be done via LoadAsAssociation and then manually
# load input members into models and ModelContainer, or just
# do a direct open of all members in ASN file, e.g.
input_models = datamodels.open(input, asn_exptypes=asn_exptypes)
# Immediately update the ASNTABLE keyword value in all inputs,
# so that all outputs get the new value
for model in input_models:
model.meta.asn.table_name = op.basename(input_models.asn_table_name)
# For the first round of development we will assume that the input
# is ALWAYS an ASN. There's no use case for anyone ever running a
# single exposure through.
# Once data are loaded, store a few things for future use;
# some of this is here only for the purpose of creating fake
# products until the individual tasks work and do it themselves
exptype = input_models[0].meta.exposure.type
model_type = input_models[0].meta.model_type
output_file = input_models.meta.asn_table.products[0].name
self.output_file = output_file
# Find all the member types in the product
members_by_type = defaultdict(list)
product = input_models.meta.asn_table.products[0].instance
for member in product['members']:
members_by_type[member['exptype'].lower()].append(member['expname'])
if is_moving_target(input_models):
self.log.info("Assigning WCS to a Moving Target exposure.")
input_models = self.assign_mtwcs(input_models)
# If background data are present, call the master background step
if members_by_type['background']:
source_models = self.master_background(input_models)
source_models.meta.asn_table = input_models.meta.asn_table
# If the step is skipped, do the container splitting that
# would've been done in master_background
if self.master_background.skip:
source_models, bkg_models = split_container(input_models)
# we don't need the background members
bkg_models.close()
del bkg_models
else:
# The input didn't contain any background members,
# so we use all the inputs in subsequent steps
source_models = input_models
# `sources` is the list of astronomical sources that need be
# processed. Each element is a ModelContainer, which contains
# models for all exposures that belong to a single source.
#
# For JWST spectral modes, the input associations can contain
# one of two types of collections. If the exposure type is
# considered single-source, then the association contains only
# exposures of that source.
#
# However, there are modes in which the exposures contain data
# from multiple sources. In that case, the data must be
# rearranged, collecting the exposures representing each
# source into its own ModelContainer. This produces a list of
# sources, each represented by a MultiExposureModel instead of
# a single ModelContainer.
sources = [source_models]
if model_type in MULTISOURCE_MODELS:
self.log.info('Convert from exposure-based to source-based data.')
sources = [
(name, model)
for name, model in multislit_to_container(source_models).items()
]
# Check for negative and large source_id values
if len(sources) > 99999:
self.log.critical("Data contain more than 100,000 sources;"
"filename does not support 6 digit source ids.")
raise Exception
available_src_ids = set(np.arange(99999) + 1)
used_src_ids = set()
for src in sources:
src_id, model = src
src_id = int(src_id)
used_src_ids.add(src_id)
if 0 < src_id <= 99999:
available_src_ids.remove(src_id)
hotfixed_sources = []
# now find and reset bad source_id values
for src in sources:
src_id, model = src
src_id = int(src_id)
# Replace ids that aren't positive 5-digit integers
if src_id < 0 or src_id > 99999:
src_id_new = available_src_ids.pop()
self.log.info(f"Source ID {src_id} falls outside allowed range.")
self.log.info(f"Reassigning {src_id} to {str(src_id_new).zfill(5)}.")
# Replace source_id for each model in the SourceModelContainers
for contained_model in model:
contained_model.source_id = src_id_new
src_id = src_id_new
hotfixed_sources.append((str(src_id), model))
sources = hotfixed_sources
# Process each source
for source in sources:
# If each source is a SourceModelContainer,
# the output name needs to be updated with the source ID, and potentially
# also the slit name (for NIRSpec fixed-slit only).
if isinstance(source, tuple):
source_id, result = source
if result[0].meta.exposure.type == "NRS_FIXEDSLIT":
slit_name = self._create_nrsfs_slit_name(result)
self.output_file = format_product(
output_file, source_id=source_id.lower(), slit_name=slit_name)
else:
self.output_file = format_product(
output_file, source_id=source_id.lower())
else:
result = source
# The MultiExposureModel is a required output.
if isinstance(result, SourceModelContainer):
self.save_model(result, 'cal')
# Call the skymatch step for MIRI MRS data
if exptype in ['MIR_MRS']:
result = self.mrs_imatch(result)
# Call outlier detection
if exptype not in SLITLESS_TYPES:
# Update the asn table name to the level 3 instance so that
# the downstream products have the correct table name since
# the _cal files are not saved they will not be updated
for cal_array in result:
cal_array.meta.asn.table_name = op.basename(input_models.asn_table_name)
result = self.outlier_detection(result)
# Resample time. Dependent on whether the data is IFU or not.
resample_complete = None
if exptype in IFU_EXPTYPES:
result = self.cube_build(result)
try:
resample_complete = result[0].meta.cal_step.cube_build
except AttributeError:
pass
else:
result = self.resample_spec(result)
try:
resample_complete = result.meta.cal_step.resample
except AttributeError:
pass
# Do 1-D spectral extraction
if exptype in SLITLESS_TYPES:
# For slitless data, extract 1D spectra and then combine them
if exptype in ['NIS_SOSS']:
# For NIRISS SOSS, don't save the extract_1d results,
# instead run photom on the extract_1d results and save
# those instead.
self.extract_1d.save_results = False
result = self.extract_1d(result)
# SOSS F277W may return None - don't bother with that.
if result is not None:
self.photom.save_results = self.save_results
self.photom.suffix = 'x1d'
result = self.photom(result)
else:
result = self.extract_1d(result)
result = self.combine_1d(result)
elif resample_complete is not None and resample_complete.upper() == 'COMPLETE':
# If 2D data were resampled and combined, just do a 1D extraction
if exptype in IFU_EXPTYPES:
self.extract_1d.search_output_file = False
if exptype in ['MIR_MRS']:
if not self.spectral_leak.skip:
self.extract_1d.save_results = False
self.spectral_leak.suffix = 'x1d'
self.spectral_leak.search_output_file = False
self.spectral_leak.save_results = self.save_results
result = self.extract_1d(result)
if exptype in ['MIR_MRS']:
result = self.spectral_leak(result)
else:
self.log.warning(
'Resampling was not completed. Skipping extract_1d.'
)
input_models.close()
self.log.info('Ending calwebb_spec3')
return
def _create_nrsfs_slit_name(self, source_models):
"""Create the complete slit_name product field for NIRSpec fixed-slit products
Each unique value of slit name within the list of input source models
is appended to the final slit name string.
"""
slit_names = []
slit_names.append(source_models[0].name.lower())
for i in range(len(source_models)):
name = source_models[i].name.lower()
if name not in slit_names:
slit_names.append(name)
slit_name = "-".join(slit_names) # append slit names using a dash separator
return slit_name