Source code for jwst.outlier_detection.outlier_detection_ifu

"""Class definition for performing outlier detection on IFU data."""

import logging

import numpy as np
from skimage.util import view_as_windows

from stdatamodels.jwst import datamodels
from stdatamodels.jwst.datamodels import dqflags
from .outlier_detection import OutlierDetection

log = logging.getLogger(__name__)
log.setLevel(logging.DEBUG)

__all__ = ["OutlierDetectionIFU"]


def medfilt(arr, kern_size):
    # scipy.signal.medfilt (and many other median filters) have undefined behavior
    # for nan inputs. See: https://github.com/scipy/scipy/issues/4800
    padded = np.pad(arr, [[k // 2] for k in kern_size])
    windows = view_as_windows(padded, kern_size, np.ones(len(kern_size), dtype='int'))
    return np.nanmedian(windows, axis=np.arange(-len(kern_size), 0))


[docs] class OutlierDetectionIFU(OutlierDetection): """Sub-class defined for performing outlier detection on IFU data. This is the controlling routine for the outlier detection process. It loads and sets the various input data and parameters needed to flag outliers. Pixel are flagged as outliers based on the MINIMUM difference a pixel has with its neighbor across all the input cal files. Notes ----- This routine performs the following operations:: 1. Extracts parameter settings from input ModelContainer and merges them with any user-provided values 2. Loop over cal files a. read in science data b. Store computed neighbor differences for all the pixels. The neighbor pixel differences are defined by the dispersion axis. For MIRI, with the dispersion axis along the y axis, the neighbors that are used to to find the differences are to the left and right of each pixel being examined. For NIRSpec, with the dispersion along the x axis, the neighbors that are used to find the differences are above and below the pixel being examined. 3. For each input file store the minimum of the pixel neighbor differences 4. Comparing all the differences from all the input data find the minimum neighbor difference 5. Normalize minimum difference to local median of difference array 6. select outliers by flagging those normailzed minimum values > threshold_percent 7. Updates input ImageModel DQ arrays with mask of detected outliers. """ def __init__(self, input_models, reffiles=None, **pars): """Initialize class for IFU data processing. Parameters ---------- input_models : ~jwst.datamodels.ModelContainer, str list of data models as ModelContainer or ASN file, one data model for each input 2-D ImageModel reffiles : dict of `~stdatamodels.jwst.datamodels.JwstDataModel` Dictionary of datamodels. Keys are reffile_types. """ OutlierDetection.__init__(self, input_models, reffiles=reffiles, **pars)
[docs] def create_optional_results_model(self, opt_info): """ Creates an OutlierOutputModel from the computed arrays from outlier detection on IFU data. Parameter --------- input_model: ~stdatamodels.jwst.datamodels.RampModel opt_info: tuple The output arrays needed for the OultierOutputModel. Return --------- opt_model : OutlierIFUOutputModel The optional OutlierIFUOutputModel to be returned from the outlier_detection_ifu step. """ (kernsize_x, kernsize_y, threshold_percent, diffarr, minarr, normarr, minnorm) = opt_info opt_model = datamodels.OutlierIFUOutputModel( diffarr=diffarr, minarr=minarr, normarr=normarr, minnorm=minnorm) opt_model.meta.kernel_xsize = kernsize_x opt_model.meta.kernel_ysize = kernsize_y opt_model.meta.threshold_percent = threshold_percent return opt_model
def _find_detector_parameters(self): """Find the size of data and the axis to form the differences (perpendicular to disaxis) """ if self.input_models[0].meta.instrument.name.upper() == 'MIRI': diffaxis = 1 elif self.input_models[0].meta.instrument.name.upper() == 'NIRSPEC': diffaxis = 0 ny, nx = self.inputs[0].data.shape return (diffaxis, ny, nx)
[docs] def do_detection(self): """Split data by detector to find outliers.""" # outlier_detection.py has the basic class that is used by all favors # of outlier detection. This class sets up self.inputs. We need to fill # in self.input_models with flagged outliers (this is what outlier_detection_step.py # returns). self.input_models = self.inputs self.build_suffix(**self.outlierpars) save_intermediate_results = \ self.outlierpars['save_intermediate_results'] kernel_size = self.outlierpars['kernel_size'] sizex, sizey = [int(val) for val in kernel_size.split()] kern_size = np.zeros(2, dtype=int) kern_size[0] = sizex kern_size[1] = sizey ifu_second_check = self.outlierpars['ifu_second_check'] # check if kernel size is an odd value if kern_size[0] % 2 == 0: log.info("X kernel size is given as an even number. This value must be an odd number. Increasing number by 1") kern_size[0] = kern_size[0] + 1 log.info("New x kernel size is {}: ".format(kern_size[0])) if kern_size[1] % 2 == 0: log.info("Y kernel size is given as an even number. This value must be an odd number. Increasing number by 1") kern_size[1] = kern_size[1] + 1 log.info("New y kernel size is {}: ".format(kern_size[1])) threshold_percent = self.outlierpars['threshold_percent'] (diffaxis, ny, nx) = self._find_detector_parameters() nfiles = len(self.input_models) detector = np.empty(nfiles, dtype='<U15') for i, model in enumerate(self.input_models): detector[i] = model.meta.instrument.detector.lower() exptype = self.input_models[0].meta.exposure.type log.info("Performing IFU outlier_detection for exptype {}".format( exptype)) # How many unique values of detector? uq_det = np.unique(detector) ndet = len(uq_det) for idet in range(ndet): indx = (np.where(detector == uq_det[idet]))[0] ndet_files = int(len(indx)) self.flag_outliers(idet, uq_det, ndet_files, diffaxis, nx, ny, kern_size, threshold_percent, save_intermediate_results, ifu_second_check)
[docs] def flag_outliers(self, idet, uq_det, ndet_files, diffaxis, nx, ny, kern_size, threshold_percent, save_intermediate_results, ifu_second_check): """ Flag outlier pixels on IFU. In general we are searching for pixels that are a form of a bad pixel but not in bad pixel mask, because the bad pixels vary with time. This program will flag the DQ of input images as DO_NOT_USE and OUTLIER and set the associated science pixel to a Nan. This routine only works on data from one detector. Parameters ---------- idet : int Integer indicating which detector we are working with uq_det : string array Array of (unique) detector names found input data n_det_files : int Number of files for the detector we are working on diffaxis : int The axis to form the adjacent pixel differences nx : int Size of input data on x axis ny : int Since of inut data on y axis threshold_percent : float Percent for flagging outliers. Flags pixels where the minimum difference between adjacent pixels for all the input data for a detector is above this percentage. The percentage is based on using all the pixels except a 4 X 4 row and column region around the detector that is often noisy. save_intermediate_results : boolean If True then save intermediate output data ifu_second_check : boolean If True then perform a secondary check searching for outliers. This will set outliers where ever the difference array of adjacent pixels is a Nan. """ # set up array to hold group differences diffarr = np.zeros([ndet_files, ny, nx]) j = 0 for i, model in enumerate(self.input_models): detector = model.meta.instrument.detector.lower() # only use data from the same detector if detector == uq_det[idet]: sci = model.data dq = model.dq bad = np.bitwise_and(dq, dqflags.pixel['DO_NOT_USE']).astype(bool) # set all science data that have DO_NOT_USE to NAN sci[bad] = np.nan # Compute left and right differences (MIRI dispersion axis = 1 along y axis) # For NIRSpec dispersion axis = 0 (along the x axis), these differences are top, bottom # prepend = 0 has the effect of keeping the same shape as sci and # for MIRI data (disp axis = 1) the first column = sci data # OR # for NIRSpec data (disp axis = 0) the first row = sci data leftdiff = np.diff(sci, axis=diffaxis, prepend=0) flip = np.flip(sci, axis=diffaxis) rightdiff = np.diff(flip, axis=diffaxis, prepend=0) rightdiff = np.flip(rightdiff, axis=diffaxis) # Combine left and right differences with minimum of the abs value # to avoid artifacts from bright edges comb = np.zeros([2, ny, nx]) comb[0, :, :] = np.abs(leftdiff) comb[1, :, :] = np.abs(rightdiff) combdiff = np.nanmin(comb, axis=0) diffarr[j, :, :] = combdiff j = j + 1 # minarr final minimum combined differences, size: ny X nx minarr = np.nanmin(diffarr, axis=0) # Normalise the differences to a local median image to deal with ultra-bright sources normarr = medfilt(minarr, kern_size) nfloor = np.nanmedian(minarr)/3 normarr[normarr < nfloor] = nfloor # Ensure we never divide by a tiny number minarr_norm = minarr / normarr # Percentile cut of the central region (cutting out weird detector edge effects) pctmin = np.nanpercentile(minarr_norm[4:ny-4, 4:nx-4], threshold_percent) log.debug("Flag pixels with values above {} {}: ".format(threshold_percent, pctmin)) # Flag everything above this percentile value. Using np.where here because we count # the number of pixels flagged using len(indx[0]) indx = minarr_norm > pctmin num_above = indx.sum() if save_intermediate_results: detector_name = uq_det[idet] opt_info = (kern_size[0], kern_size[1], threshold_percent, diffarr, minarr, normarr, minarr_norm) opt_model = self.create_optional_results_model(opt_info) opt_model.meta.filename = self.make_output_path( basepath=self.input_models.meta.asn_table.products[0].name, suffix=detector_name + '_outlier_output') log.info("Writing out intermediate outlier file {}".format(opt_model.meta.filename)) opt_model.save(opt_model.meta.filename) del diffarr # store some information if the second flagging step is to be done. if ifu_second_check: # store where the minarr is nan (neighbor pixels have nan so differences produces a nan) nanminarr = np.isnan(minarr) nanindx = np.where(nanminarr) # Update DQ flag for i in range(len(self.input_models)): detector = self.input_models[i].meta.instrument.detector.lower() # only use data from the same detector if detector == uq_det[idet]: model = self.input_models[i] sci = model.data dq = model.dq # There could be a large number of pixels with a sci value of NaN # but the dq flag of DO_NOT_USE has not been set. # This can occur in Non-science regions of the detector. check = np.where( np.logical_and(~np.bitwise_and(dq, dqflags.pixel['DO_NOT_USE']).astype(bool), np.isnan(sci))) log.debug("Number of pixels DQ was not set to DO_NOT_USE and Sci array was Nan{} ". format(len(check[0]))) # set all pixels with dq = DO_NOT_USE to have sci values of Nan bad = np.bitwise_and(dq, dqflags.pixel['DO_NOT_USE']).astype(bool) sci[bad] = np.nan # Basic setting outliers: flagging those at are found in from Percentage cut sci[indx] = np.nan dq[indx] = np.bitwise_or(dq[indx], dqflags.pixel['DO_NOT_USE']) dq[indx] = np.bitwise_or(dq[indx], dqflags.pixel['OUTLIER']) nadditional = 0 # Second level of setting outliers: flagging pixels were minarr was a Nan # This will also catch pixels that have a sci of Nan but the DQ flags did # not have DO_NOT_USE set if ifu_second_check: # For counting purposes, count the number of science values that were valid (not Nan) # after basic flagging in the nanminarr region that will now be flagged as a Nan. nadditional = (~np.isnan(sci[nanindx])).sum() sci[nanindx] = np.nan dq[nanindx] = np.bitwise_or(dq[nanindx], dqflags.pixel['DO_NOT_USE']) dq[nanindx] = np.bitwise_or(dq[nanindx], dqflags.pixel['OUTLIER']) log.info("Number of outlier pixels flagged main ifu outlier flagging: {} on detector {} ".format( len(indx[0]), uq_det[idet])) log.info("Number of outlier pixels flagged in second check: {} on detector {} ".format( nadditional, uq_det[idet])) total_bad = num_above + nadditional percent_cr = total_bad / (model.data.shape[0] * model.data.shape[1]) * 100 log.info(f"Total # pixels flagged as outliers: {total_bad} ({percent_cr:.2f}%)") # update model self.input_models[i] = model