"""Set Telescope Pointing from Observatory Engineering Telemetry
Calculate and update the pointing-related and world coordinate system-related
keywords. Given a time period, usually defined by an exposure, the engineering
mnemonic database is queried for observatory orientation. The orientation
defines the sky coordinates a particular point on the observatory is pointed to.
Then, using a set of matrix transformations, the sky coordinates of the
reference pixel of a desired aperture is calculated.
The transformations are defined by the Technical Reference JWST-STScI-003222,
SM-12. This document has undergone a number of revisions. The current version
implemented is based on an internal email version Rev. C, produced 2021-11.
There are a number of algorithms, or *methods*, that have been implemented.
Most represent the historical refinement of the algorithm. Until the technical
reference is finalized, all methods will remain in the code. The default,
state-of-the art algorithm is represented by method ``OPS_TR_202111``,
implemented by
`~jwst.lib.set_telescope_pointing.calc_transforms_ops_tr_202111`.
Interface
=========
The primary usage is through the command line interface
`set_telescope_pointing.py`. Operating on a list of JWST Level 1b exposures,
this command updates the world coordinate system keywords with the values
necessary to translate from aperture pixel to sky coordinates.
Access to the JWST Engineering Mnemonic database is required. See the
:ref:`Engineering Database Interface<engdb>` for more information.
Programmatically, the command line is implemented by the function
`~jwst.lib.set_telescope_pointing.add_wcs`, which calls the basic function
`~jwst.lib.set_telescope_pointing.calc_wcs`. The available methods are defined
by `~jwst.lib.set_telescope_pointing.Methods`.
There are two data structures used to maintain the state of the transformation.
`~jwst.lib.set_telescope_pointing.TransformParameters` contains the parameters
needed to perform the transformations.
`~jwst.lib.set_telescope_pointing.Transforms` contains the calculated
transformation matrices.
Transformation Matrices
-----------------------
All the transformation matrices, as defined by
`~jwst.lib.set_telescope_pointing.Transforms`, are Direction Cosine Matrices
(DCM). A DCM contains the Euler rotation angles that represent the sky
coordinates for a particular frame-of-reference. The initial DCM is provided
through the engineering telemetry and represents where in the sky either the
Fine Guidance Sensor (FGS) or star tracker is pointed to. Then, through a set
of transformations, the DCM for the reference point of the target aperture
is calculated.
"""
import functools
import sys
import asdf
from collections import defaultdict, namedtuple
from copy import copy
import dataclasses
from enum import Enum
import logging
from math import (cos, sin, sqrt)
import typing
from astropy import units as U
from astropy.table import Table
from astropy.time import Time
import numpy as np
from scipy.interpolate import interp1d
from stdatamodels.jwst import datamodels
from .exposure_types import IMAGING_TYPES, FGS_GUIDE_EXP_TYPES
from .set_velocity_aberration import compute_va_effects_vector
from .siafdb import SIAF, SiafDb
from ..assign_wcs.util import update_s_region_keyword, calc_rotation_matrix
from ..assign_wcs.pointing import v23tosky
from ..lib.engdb_tools import ENGDB_Service
from ..lib.pipe_utils import is_tso
__all__ = [
'Methods',
'TransformParameters',
'Transforms',
'WCSRef',
'add_wcs',
'calc_transforms',
'calc_transforms_ops_tr_202111',
'calc_wcs',
'calc_wcs_over_time',
'update_wcs',
]
# Setup logging
logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())
DEBUG_FULL = logging.DEBUG - 1
LOGLEVELS = [logging.INFO, logging.DEBUG, DEBUG_FULL]
# Datamodels that can be updated, normally
EXPECTED_MODELS = (datamodels.Level1bModel, datamodels.ImageModel, datamodels.CubeModel)
# Exposure types that can be updated, normally
TYPES_TO_UPDATE = set(list(IMAGING_TYPES) + FGS_GUIDE_EXP_TYPES)
# Mnemonics for each transformation method.
# dict where value indicates whether the mnemonic is required or not.
COURSE_TR_202111_MNEMONICS = {
'SA_ZATTEST1': True,
'SA_ZATTEST2': True,
'SA_ZATTEST3': True,
'SA_ZATTEST4': True,
'SA_ZRFGS2J11': True,
'SA_ZRFGS2J12': True,
'SA_ZRFGS2J13': True,
'SA_ZRFGS2J21': True,
'SA_ZRFGS2J22': True,
'SA_ZRFGS2J23': True,
'SA_ZRFGS2J31': True,
'SA_ZRFGS2J32': True,
'SA_ZRFGS2J33': True,
'SA_ZADUCMDX': False,
'SA_ZADUCMDY': False,
'SA_ZFGGSCMDX': False,
'SA_ZFGGSCMDY': False,
'SA_ZFGDETID': False,
}
TRACK_TR_202111_MNEMONICS = {
**COURSE_TR_202111_MNEMONICS,
'SA_ZFGGSPOSX': False,
'SA_ZFGGSPOSY': False,
}
FGS_ACQ_EXP_TYPES = ['fgs_acq1', 'fgs_acq2']
FGS_ACQ_MNEMONICS = {
'IFGS_ACQ_DETXCOR': True,
'IFGS_ACQ_DETYCOR': True,
'IFGS_ACQ_DETXSIZ': True,
'IFGS_ACQ_DETYSIZ': True,
'IFGS_ACQ_XPOSG': True,
'IFGS_ACQ_YPOSG': True,
}
FGS_GUIDED_EXP_TYPES = ['fgs_fineguide', 'fgs_track']
FGS_GUIDED_MNEMONICS = {
'IFGS_TFGGS_X',
'IFGS_TFGGS_Y',
'IFGS_TFGDET_XCOR',
'IFGS_TFGDET_YCOR',
'IFGS_TFGDET_XSIZ',
'IFGS_TFGDET_YSIZ',
}
FGS_ID_EXP_TYPES = ['fgs_id-image', 'fgs_id-stack']
FGS_ID_MNEMONICS = {
'IFGS_ID_XPOSG',
'IFGS_ID_YPOSG',
'IFGS_ID_DETXCOR',
'IFGS_ID_DETYCOR',
'IFGS_ID_DETXSIZ',
'IFGS_ID_DETYSIZ',
}
# FGS ACQ1/ACQ2 modes, a dedicated range of mmenonics need to be present.
# These define those ranges. Key is the ACQ exposure type.
FGS_ACQ_MINVALUES = {
'fgs_acq1': 1,
'fgs_acq2': 4
}
FGS_ACQ_SLICES = {
'fgs_acq1': slice(0, 3),
'fgs_acq2': slice(3, 8)
}
FGS_ACQ_WINDOW_INDEX = {
'fgs_acq1': 0,
'fgs_acq2': -1
}
# The available methods for transformation
[docs]
class Methods(Enum):
"""Available methods to calculate V1 and aperture WCS information
Current state-of-art is OPS_TR_202111. This method chooses either COARSE_TR_202111 or
TRACK_TR_202111 depending on the guidance mode, as specified by header keyword PCS_MODE.
"""
#: COARSE tracking mode algorithm, TR version 2021-11.
COARSE_TR_202111 = ('coarse_tr_202111', 'calc_transforms_coarse_tr_202111', 'calc_wcs_tr_202111', COURSE_TR_202111_MNEMONICS)
#: Method to use in OPS to use TR version 2021-11
OPS_TR_202111 = ('ops_tr_202111', 'calc_transforms_ops_tr_202111', 'calc_wcs_tr_202111', TRACK_TR_202111_MNEMONICS)
#: TRACK and FINEGUIDE mode algorithm, TR version 2021-11
TRACK_TR_202111 = ('track_tr_202111', 'calc_transforms_track_tr_202111', 'calc_wcs_tr_202111', TRACK_TR_202111_MNEMONICS)
# Aliases
#: Algorithm to use by default. Used by Operations.
default = OPS_TR_202111
#: Default algorithm under PCS_MODE COARSE.
COARSE = COARSE_TR_202111
#: Default algorithm for use by Operations.
OPS = OPS_TR_202111
#: Default algorithm under PCS_MODE TRACK/FINEGUIDE/MOVING.
TRACK = TRACK_TR_202111
def __new__(cls: object, value: str, func_name: str, calc_func: str, mnemonics: dict):
obj = object.__new__(cls)
obj._value_ = value
obj._func_name = func_name
obj._calc_func = calc_func
obj._mnemonics = mnemonics
return obj
@property
def calc_func(self):
"""Function associated with the method"""
return globals()[self._calc_func]
@property
def func(self):
"""Function associated with the method"""
return globals()[self._func_name]
@property
def mnemonics(self):
return self._mnemonics
def __str__(self):
return self.value
# FGS id to aperture name
FGSId2Aper = {1: 'FGS1_FULL_OSS', 2: 'FGS2_FULL_OSS'}
# FGS Ids
FGSIDS = [1, 2]
# Definition of th J3 Ideal Y-Angle
J3IDLYANGLE = -1.25 # Degrees
# Conversion from seconds to MJD
SECONDS2MJD = 1 / 24 / 60 / 60
# Default transformation matrices
FGS12SIFOV_DEFAULT = np.array(
[[0.9999994955442, 0.0000000000000, 0.0010044457459],
[0.0000011174826, 0.9999993811310, -0.0011125359826],
[-0.0010044451243, 0.0011125365439, 0.9999988766756]]
)
J2FGS_MATRIX_DEFAULT = np.array([
[-0.0010044400033, 0.9999994955442, 0.0000033964915],
[0.0033814583568, 0.0000000000000, 0.9999942828533],
[0.9999937784005, 0.0010044457459, -0.0033814566510]
])
SIFOV2V_DEFAULT = np.array(
[[0.99999742598, 0., 0.00226892608],
[0., 1., 0.],
[-0.00226892608, 0., 0.99999742598]]
)
# Define the transformation matrices to move between the Idealized Coordinate System (ICS)
# and the Idealized Coordinate System (Idl). ICS is the spacecraft-centric system used by
# all frames up through the V-frame. Idl is used by the instruments.
# Reference: Eqs. 1 & 2 from Technical Report JWST-STScI-003222, SM-12, Rev. C, 2021-11
M_idl2ics = MX2Z = np.array(
[[0, 1, 0],
[0, 0, 1],
[1, 0, 0]]
)
M_ics2idl = MZ2X = np.array(
[[0, 0, 1],
[1, 0, 0],
[0, 1, 0]]
)
# Degree, radian, angle transformations
R2D = 180. / np.pi
D2R = np.pi / 180.
A2R = D2R / 3600.
R2A = 3600. * R2D
PI2 = np.pi * 2.
# Pointing container
# Attributes are as follows. Except for the observation time, all values
# are retrieved from the engineering data.
# q : Quaternion of the FGS.
# j2fgs_matrix : J-frame to FGS transformation.
# fsmcorr : Fine Steering Mirror position.
# obstime : Mid-point time of the observation at which all other values have been calculated.
# gs_commanded : Guide star position as originally commanded.
# fgsid : FGS in use, 1 or 2.
# gs_position : X/Y guide star position in the FGS.
Pointing = namedtuple('Pointing', ['q', 'j2fgs_matrix', 'fsmcorr', 'obstime', 'gs_commanded', 'fgsid', 'gs_position'])
Pointing.__new__.__defaults__ = ((None,) * 5)
# Guide Star ACQ pointing container
# Attributes are as follows. All values are retrieved from the engineering.
# position : X/Y position of the guide star within the acquisition window of the FGS.
# corner : X/Y corner of the acquisition window within the FGS.
# size : X/Y size of the acquisition window.
GuideStarPosition = namedtuple('GuideStarPosition', ['position', 'corner', 'size'])
GuideStarPosition.__new__.__defaults__ = ((None,) * 3)
# Transforms
# WCS reference container
WCSRef = namedtuple('WCSRef', ['ra', 'dec', 'pa'])
WCSRef.__new__.__defaults__ = (None, None, None)
[docs]
def add_wcs(filename, allow_any_file=False, force_level1bmodel=False,
default_pa_v3=0., siaf_path=None, prd=None, engdb_url=None,
fgsid=None, tolerance=60, allow_default=False, reduce_func=None,
dry_run=False, save_transforms=None, **transform_kwargs):
"""Add WCS information to a JWST DataModel.
Telescope orientation is attempted to be obtained from
the engineering database. Failing that, a default pointing
is used based on proposal target.
The file is updated in-place.
Parameters
----------
filename : str
The path to a data file.
allow_any_file : bool
Attempt to add the WCS information to any type of file.
The default, `False`, only allows modifications of files that contain
known datamodels of `Level1bmodel`, `ImageModel`, or `CubeModel`.
force_level1bmodel : bool
If not `allow_any_file`, and the input file model is unknown,
open the input file as a Level1bModel regardless.
default_pa_v3 : float
The V3 position angle to use if the pointing information
is not found.
siaf_path : str or file-like object or None
The path to the SIAF database. See `SiafDb` for more information.
prd : str
The PRD version from the `pysiaf` to use.
`siaf_path` overrides this value.
engdb_url : str or None
URL of the engineering telemetry database REST interface.
fgsid : int or None
When in COARSE mode, the FGS to use as the guider reference.
If None, use what is provided in telemetry.
tolerance : int
If no telemetry can be found during the observation,
the time, in seconds, beyond the observation time to
search for telemetry.
allow_default : bool
If telemetry cannot be determine, use existing
information in the observation's header.
reduce_func : func or None
Reduction function to use on values.
dry_run : bool
Do not write out the modified file.
save_transforms : Path-like or None
File to save the calculated transforms to.
transform_kwargs : dict
Keyword arguments used by matrix calculation routines.
Notes
-----
This function adds absolute pointing information to the JWST
datamodels provided. By default, only Stage 1 and Stage 2a exposures are
allowed to be updated. These have the suffixes of "uncal", "rate", and
"rateints" representing datamodels Level1bModel, ImageModel, and CubeModel.
Any higher level product, from Stage 2b and beyond, that has had the
`assign_wcs` step applied, have improved WCS information. Running
this task on such files will potentially corrupt the WCS.
It starts by populating the headers with values from the SIAF database.
It adds the following keywords to all files:
V2_REF (arcseconds)
V3_REF (arcseconds)
VPARITY (+1 or -1)
V3I_YANG (decimal degrees)
The keywords computed and added to all files are:
RA_V1
DEC_V1
PA_V3
RA_REF
DEC_REF
ROLL_REF
S_REGION
In addition the following keywords are computed and added to IMAGING_MODES only:
CRVAL1
CRVAL2
PC1_1
PC1_2
PC2_1
PC2_2
It does not currently place the new keywords in any particular location
in the header other than what is required by the standard.
"""
logger.info('Updating WCS info for file %s', filename)
try:
model = datamodels.open(filename, guess=allow_any_file)
except TypeError:
if force_level1bmodel:
logger.warning(f'Input {filename} is an unknown model, opening as a Level1bModel.')
model = datamodels.Level1bModel(filename)
else:
raise
try:
if type(model) not in EXPECTED_MODELS:
logger.warning(f'Input {model} is not of an expected type (uncal, rate, rateints)'
'\n Updating pointing may have no effect or detrimental effects on the WCS information,'
'\n especially if the input is the result of Level2b or higher calibration.')
if not allow_any_file:
raise TypeError(f'Input model {model} is not one of {EXPECTED_MODELS} and `allow_any_file` is `False`.'
'\n\tFailing WCS processing.')
t_pars, transforms = update_wcs(
model,
default_pa_v3=default_pa_v3,
siaf_path=siaf_path,
prd=prd,
engdb_url=engdb_url,
fgsid=fgsid,
tolerance=tolerance,
allow_default=allow_default,
reduce_func=reduce_func,
**transform_kwargs
)
try:
if model.meta.target.type.lower() == 'moving':
update_mt_kwds(model)
except AttributeError:
pass
model.meta.model_type = None
if dry_run:
logger.info('Dry run requested; results are not saved.')
else:
logger.info('Saving updated model %s', filename)
model.save(filename)
if transforms and save_transforms:
logger.info('Saving transform matrices to %s', save_transforms)
transforms.write_to_asdf(save_transforms)
finally:
model.close()
logger.info('...update completed')
def update_mt_kwds(model):
"""Add/update the Moving target header keywords
If the target type is "moving_target" check for the moving target position
table. If this is available calculate the moving target position keywords
and insert or update MT_RA & MT_DEC.
"""
if model.hasattr('moving_target'):
time_mt = Time(model.moving_target.time, format='isot')
time_mt = [t.mjd for t in time_mt]
exp_midpt_mjd = model.meta.exposure.mid_time
# check to see if the midpoint of the observation is contained within
# the timerange of the MT table
if time_mt[0] <= exp_midpt_mjd <= time_mt[-1]:
ra = model.moving_target.mt_apparent_RA
dec = model.moving_target.mt_apparent_Dec
f_ra = interp1d(time_mt, ra)
f_dec = interp1d(time_mt, dec)
model.meta.wcsinfo.mt_ra = f_ra(exp_midpt_mjd).item(0)
model.meta.wcsinfo.mt_dec = f_dec(exp_midpt_mjd).item(0)
model.meta.target.ra = f_ra(exp_midpt_mjd).item(0)
model.meta.target.dec = f_dec(exp_midpt_mjd).item(0)
else:
logger.info('Exposure midpoint %s is not in the moving_target '
'table range of %s to %s', exp_midpt_mjd, time_mt[0], time_mt[-1])
return
else:
logger.info("Moving target position table not found in the file")
return
logger.info("Moving target RA and Dec updated.")
return model
[docs]
def update_wcs(model, default_pa_v3=0., default_roll_ref=0., siaf_path=None, prd=None, engdb_url=None,
fgsid=None, tolerance=60, allow_default=False,
reduce_func=None, **transform_kwargs):
"""Update WCS pointing information
Given a `jwst.datamodels.JwstDataModel`, determine the simple WCS parameters
from the SIAF keywords in the model and the engineering parameters
that contain information about the telescope pointing.
It presumes all the accessed keywords are present (see first block).
Parameters
----------
model : `~jwst.datamodels.JwstDataModel`
The model to update.
default_roll_ref : float
If pointing information cannot be retrieved,
use this as the roll ref angle.
siaf_path : str or Path-like object
The path to the SIAF database. See `SiafDb` for more information.
prd : str
The PRD version from the `pysiaf` to use.
`siaf_path` overrides this value.
engdb_url : str or None
URL of the engineering telemetry database REST interface.
fgsid : int or None
When in COARSE mode, the FGS to use as the guider reference.
If None, use what is provided in telemetry.
tolerance : int
If no telemetry can be found during the observation,
the time, in seconds, beyond the observation time to
search for telemetry.
allow_default : bool
If telemetry cannot be determine, use existing
information in the observation's header.
reduce_func : func or None
Reduction function to use on values.
transform_kwargs : dict
Keyword arguments used by matrix calculation routines.
Returns
-------
t_pars, transforms : TransformParameters, Transforms
The parameters and transforms calculated. May be
None for either if telemetry calculations were not
performed. In particular, FGS GUIDER data does
not need `transforms`.
"""
t_pars = transforms = None # Assume telemetry is not used.
if not prd:
prd = model.meta.prd_software_version
siaf_db = SiafDb(source=siaf_path, prd=prd)
# Get model attributes
useafter = model.meta.observation.date
# Configure transformation parameters.
t_pars = t_pars_from_model(
model,
default_pa_v3=default_pa_v3, engdb_url=engdb_url,
tolerance=tolerance, allow_default=allow_default,
reduce_func=reduce_func, siaf_db=siaf_db, useafter=useafter,
**transform_kwargs
)
if fgsid:
t_pars.fgsid = fgsid
# Populate header with SIAF information.
if t_pars.siaf is None:
if t_pars.exp_type not in FGS_GUIDE_EXP_TYPES:
raise ValueError('Insufficient SIAF information found in header.')
else:
populate_model_from_siaf(model, t_pars.siaf)
# Calculate WCS.
if t_pars.exp_type in FGS_GUIDE_EXP_TYPES:
update_wcs_from_fgs_guiding(
model, t_pars, default_roll_ref=default_roll_ref
)
transforms = None
else:
transforms = update_wcs_from_telem(model, t_pars)
return t_pars, transforms
def update_wcs_from_fgs_guiding(model, t_pars, default_roll_ref=0.0, default_vparity=1, default_v3yangle=0.0):
""" Update WCS pointing from header information
For Fine Guidance guiding observations, nearly everything
in the `wcsinfo` meta information is already populated,
except for the PC matrix and CRVAL*. This function updates the PC
matrix based on the rest of the `wcsinfo`.
CRVAL* values are taken from GS_RA/GS_DEC.
Parameters
----------
model : `~jwst.datamodels.JwstDataModel`
The model to update.
t_pars : `TransformParameters`
The transformation parameters. Parameters are updated during processing.
default_roll_ref : float
If pointing information cannot be retrieved,
use this as the V3 position angle.
default_vparity : int
The default `VIdlParity` to use and should
be either "1" or "-1". "1" is the
default since FGS guiding will be using the
OSS aperture.
default_v3yangle : float
Default SIAF Y-angle.
"""
logger.info('Updating WCS for Fine Guidance.')
crpix1, crpix2, crval1, crval2, pc_matrix = calc_wcs_guiding(
model, t_pars, default_roll_ref, default_vparity, default_v3yangle
)
logger.info('WCS info:'
f'\n\tcrpix1: {crpix1} crpix2: {crpix2}'
f'\n\tcrval1: {crval1} crval2: {crval2}'
f'\n\tpc_matrix: {pc_matrix}')
model.meta.wcsinfo.crpix1 = crpix1
model.meta.wcsinfo.crpix2 = crpix2
model.meta.wcsinfo.crval1 = crval1
model.meta.wcsinfo.crval2 = crval2
(
model.meta.wcsinfo.pc1_1,
model.meta.wcsinfo.pc1_2,
model.meta.wcsinfo.pc2_1,
model.meta.wcsinfo.pc2_2
) = pc_matrix
def update_wcs_from_telem(model, t_pars: TransformParameters):
"""Update WCS pointing information
Given a `jwst.datamodels.JwstDataModel`, determine the simple WCS parameters
from the SIAF keywords in the model and the engineering parameters
that contain information about the telescope pointing.
It presumes all the accessed keywords are present (see first block).
Parameters
----------
model : `~jwst.datamodels.JwstDataModel`
The model to update. The update is done in-place.
t_pars : `TransformParameters`
The transformation parameters. Parameters are updated during processing.
Returns
-------
transforms : Transforms or None
If available, the transformation matrices.
"""
logger.info('Updating wcs from telemetry.')
transforms = None # Assume no transforms are calculated.
# Setup default WCS info if actual pointing and calculations fail.
wcsinfo = WCSRef(
model.meta.target.ra,
model.meta.target.dec,
t_pars.default_pa_v3
)
vinfo = wcsinfo
# Get the pointing information
try:
t_pars.update_pointing()
except ValueError as exception:
if not t_pars.allow_default:
raise
else:
logger.warning(
'Cannot retrieve valid telescope pointing.'
' Default pointing parameters will be used.'
)
logger.warning('Exception is %s', exception)
logger.info("Setting ENGQLPTG keyword to PLANNED")
model.meta.visit.engdb_pointing_quality = "PLANNED"
t_pars.pointing = None
else:
logger.info('Successful read of engineering quaternions:')
logger.info('\tPointing: %s', t_pars.pointing)
# If pointing is available, attempt to calculate WCS information
if t_pars.pointing is not None:
try:
wcsinfo, vinfo, transforms = calc_wcs(t_pars)
pointing_engdb_quality = f'CALCULATED_{t_pars.method.value.upper()}'
logger.info('Setting ENGQLPTG keyword to %s', pointing_engdb_quality)
model.meta.visit.engdb_pointing_quality = pointing_engdb_quality
except Exception as e:
logger.warning(
'WCS calculation has failed and will be skipped.'
'Default pointing parameters will be used.'
)
logger.warning('Exception is %s', e)
if not t_pars.allow_default:
raise
else:
logger.info("Setting ENGQLPTG keyword to PLANNED")
model.meta.visit.engdb_pointing_quality = "PLANNED"
logger.info('Aperture WCS info: %s', wcsinfo)
logger.info('V1 WCS info: %s', vinfo)
# Update V1 pointing
model.meta.pointing.ra_v1 = vinfo.ra
model.meta.pointing.dec_v1 = vinfo.dec
model.meta.pointing.pa_v3 = vinfo.pa
# Update Aperture pointing
model.meta.aperture.position_angle = wcsinfo.pa
model.meta.wcsinfo.ra_ref = wcsinfo.ra
model.meta.wcsinfo.dec_ref = wcsinfo.dec
model.meta.wcsinfo.roll_ref = pa_to_roll_ref(wcsinfo.pa, t_pars.siaf)
if model.meta.exposure.type.lower() in TYPES_TO_UPDATE:
model.meta.wcsinfo.crval1 = wcsinfo.ra
model.meta.wcsinfo.crval2 = wcsinfo.dec
(
model.meta.wcsinfo.pc1_1,
model.meta.wcsinfo.pc1_2,
model.meta.wcsinfo.pc2_1,
model.meta.wcsinfo.pc2_2
) = calc_rotation_matrix(
np.deg2rad(model.meta.wcsinfo.roll_ref),
np.deg2rad(model.meta.wcsinfo.v3yangle),
vparity=t_pars.siaf.vparity
)
# Calculate S_REGION with the footprint
# information
try:
update_s_region(model, t_pars.siaf)
except Exception as e:
logger.warning('Calculation of S_REGION failed and will be skipped.')
logger.warning('Exception is %s', e)
# If TARG_RA/TARG_DEC still 0/0 (e.g. pure parallels with no defined target),
# populate with RA_REF/DEC_REF values
if (model.meta.target.ra == 0.0 and model.meta.target.dec == 0.0) and (
'PARALLEL' in model.meta.visit.type):
logger.warning('No target location specified for parallel observation:'
'copying reference point RA/Dec to TARG_RA/TARG_DEC.')
model.meta.target.ra = model.meta.wcsinfo.ra_ref
model.meta.target.dec = model.meta.wcsinfo.dec_ref
return transforms
def update_s_region(model, siaf):
"""Update ``S_REGION`` sky footprint information.
The ``S_REGION`` keyword is intended to store the spatial footprint of
an observation using the VO standard STCS representation.
Parameters
----------
model : `~jwst.datamodels.JwstDataModel`
The model to update in-place.
siaf : namedtuple
The ``SIAF`` tuple with values populated from the PRD database.
"""
vertices = siaf.vertices_idl
xvert = vertices[:4]
yvert = vertices[4:]
logger.info("Vertices for aperture %s: %s", model.meta.aperture.name, vertices)
# Execute IdealToV2V3, followed by V23ToSky
from stdatamodels.jwst.transforms.models import IdealToV2V3
vparity = model.meta.wcsinfo.vparity
v3yangle = model.meta.wcsinfo.v3yangle
# V2_ref and v3_ref should be in arcsec
idltov23 = IdealToV2V3(
v3yangle,
model.meta.wcsinfo.v2_ref, model.meta.wcsinfo.v3_ref,
vparity
)
v2, v3 = idltov23(xvert, yvert) # in arcsec
# hardcode wrapping angles for V2 and RA here. Could be made more
# flexible if needed.
v23tosky_tr = v23tosky(model, wrap_v2_at=180, wrap_lon_at=360)
ra_vert, dec_vert = v23tosky_tr(v2, v3)
# Do not do any sorting, use the vertices in the SIAF order.
footprint = np.array([ra_vert, dec_vert]).T
update_s_region_keyword(model, footprint)
[docs]
def calc_wcs_over_time(obsstart, obsend, t_pars: TransformParameters):
"""Calculate V1 and WCS over a time period
Parameters
----------
obsstart, obsend : float
MJD observation start/end times
t_pars : `TransformParameters`
The transformation parameters. Parameters are updated during processing.
Returns
-------
obstimes, wcsinfos, vinfos : [astropy.time.Time[,...]], [WCSRef[,...]], [WCSRef[,...]]
A 3-tuple is returned with the WCS pointings for
the aperture and the V1 axis
"""
# Setup structures
obstimes = list()
wcsinfos = list()
vinfos = list()
# Calculate WCS
try:
pointings = get_pointing(obsstart, obsend, engdb_url=t_pars.engdb_url,
tolerance=t_pars.tolerance, reduce_func=t_pars.reduce_func)
except ValueError:
logger.warning("Cannot get valid engineering mnemonics from engineering database")
raise
if not isinstance(pointings, list):
pointings = [pointings]
for pointing in pointings:
t_pars.pointing = pointing
wcsinfo, vinfo, transforms = calc_wcs(t_pars)
obstimes.append(pointing.obstime)
wcsinfos.append(wcsinfo)
vinfos.append(vinfo)
return obstimes, wcsinfos, vinfos
[docs]
def calc_wcs(t_pars: TransformParameters):
"""Given observatory orientation and target aperture, calculate V1 and Reference Pixel sky coordinates
Parameters
----------
t_pars : `TransformParameters`
The transformation parameters. Parameters are updated during processing.
Returns
-------
wcsinfo, vinfo, transforms : WCSRef, WCSRef, Transforms
A 3-tuple is returned with the WCS pointing for
the aperture and the V1 axis, and the transformation matrices.
"""
if t_pars.siaf is None:
t_pars.siaf = SIAF()
# Calculate transforms
transforms = calc_transforms(t_pars)
# Calculate the wcs information
wcsinfo, vinfo = t_pars.method.calc_func(transforms)
# That's all folks
return wcsinfo, vinfo, transforms
def calc_wcs_tr_202111(transforms: Transforms):
"""Given observatory orientation and target aperture, calculate V1 and Reference Pixel sky coordinates
A refactor of `calc_wcs_orig` to use the standard `calc_wcs_from_matrix` instead of the specific `calc_aperture_wcs`.
Parameters
----------
transforms : Transforms
The transformation matrices.
Returns
-------
wcsinfo, vinfo: WCSRef, WCSRef
A 2-tuple is returned with the WCS pointing for
the aperture and the V1 axis.
"""
# Calculate the V1 WCS information
vinfo = calc_wcs_from_matrix(transforms.m_eci2v)
# Calculate the Aperture WCS
wcsinfo = calc_wcs_from_matrix(transforms.m_eci2siaf)
# That's all folks
return wcsinfo, vinfo
def calc_transforms_coarse_tr_202111(t_pars: TransformParameters):
"""Modified COARSE calculation
This implements Eq. 45 from Technical Report JWST-STScI-003222, SM-12. Rev. C, 2021-11
From Section 4:
In COARSE mode the measured attitude of the J-frame of the spacecraft is
determined by the star tracker and inertial gyroscopes attitude
measurements and is converted to an estimated guide star inertial attitude
using the equations in section 3.2. The V-frame attitude then is determined
using the equation below.
One modification from the TR is the calculation of M_eci2siaf. The transformation includes
the rotation from ICS to Ideal.
Parameters
----------
t_pars : TransformParameters
The transformation parameters. Parameters are updated during processing.
Returns
-------
transforms : Transforms
The list of coordinate matrix transformations
Notes
-----
The matrix transform pipeline to convert from ECI J2000 observatory
quaternion pointing to aperture ra/dec/roll information
is given by the following formula. Each term is a 3x3 matrix:
M_eci_to_siaf =
transpose(M_v_to_fgsx) *
transpose(M_fgsx_to_gs) *
M_x_to_z *
M_eci_to_gs
where
M_fgsx_to_v = FGSx to V-frame
M_gs_to_fgsx = Guide star to FGSx
M_eci_to_gs = ECI to Guide star
"""
logger.info('Calculating transforms using TR 202111 COARSE Tracking with SIAF modification method...')
t_pars.method = Methods.COARSE_TR_202111
# Choose the FGS to use.
# Default to using FGS1 if not specified and FGS1 is not the science instrument.
fgsid = t_pars.fgsid
if t_pars.detector is not None:
detector = t_pars.detector.lower()
if detector in ['guider1', 'guider2']:
fgsid = 1
if detector == 'guider1':
fgsid = 2
logger.info(f'COARSE mode using detector {detector} implies use of FGS{fgsid}')
if fgsid not in FGSIDS:
fgsid = 1
t_pars.fgsid = fgsid
logger.info('Using FGS%s.', t_pars.fgsid)
# Determine the M_eci_to_gs matrix. Since this is a full train, the matrix
# is returned as part of the full Transforms object. Many of the required
# matrices are already determined as part of this calculation.
t = calc_m_eci2gs(t_pars)
# Determine the M_fgsx_to_v matrix
siaf = t_pars.siaf_db.get_wcs(FGSId2Aper[t_pars.fgsid])
t.m_v2fgsx = calc_v2siaf_matrix(siaf)
# Determine M_eci_to_v frame.
t.m_eci2v = np.linalg.multi_dot([np.transpose(t.m_v2fgsx), np.transpose(t.m_fgsx2gs), M_idl2ics, t.m_eci2gs])
logger.debug('M_eci2v: %s', t.m_eci2v)
# Calculate the SIAF transform matrix
t.m_v2siaf = calc_v2siaf_matrix(t_pars.siaf)
# Calculate full transformation
t.m_eci2siaf = np.linalg.multi_dot([M_ics2idl, t.m_v2siaf, t.m_eci2v])
logger.debug('m_eci2siaf: %s', t.m_eci2siaf)
return t
def calc_transforms_track_tr_202111(t_pars: TransformParameters):
"""Calculate transforms for TRACK/FINEGUIDE guiding
This implements Eq. 46 from Technical Report JWST-STScI-003222, SM-12, Rev. C, 2021-11
From Section 5:
Under guide star control the guide star position is measured relative to
the V-frame. The V3 position angle at the guide star is derived from the
measured J-frame attitude. Then the corrected guide star catalog position
is used to determine the inertial V-frame attitude on the sky.
One modification from the TR is the calculation of M_eci2siaf. The transformation includes
the rotation from ICS to Ideal.
Parameters
----------
t_pars : TransformParameters
The transformation parameters. Parameters are updated during processing.
Returns
-------
transforms : Transforms
The list of coordinate matrix transformations
Notes
-----
The matrix transform pipeline to convert from ECI J2000 observatory
quaternion pointing to aperture ra/dec/roll information
is given by the following formula. Each term is a 3x3 matrix:
M_eci_to_siaf = # Complete transformation
M_v_to_siaf * # V to SIAF
M_eci_to_v # ECI to V
where
M_eci_to_v = Conversion of the attitude to a DCM
"""
logger.info('Calculating transforms using TR 202111 TRACK/FINEGUIDE Tracking method...')
t_pars.method = Methods.TRACK_TR_202111
t = Transforms(override=t_pars.override_transforms) # Shorthand the resultant transforms
# Check on telemetry for FGS ID. If invalid, use either user-specified or default to 1.
fgsid = t_pars.pointing.fgsid
if fgsid not in FGSIDS:
logger.warning(f'Method {t_pars.method} requires a valid FGS ID in telementry.'
'\nHowever telemetry reports an invalid id of {fgsid}')
if t_pars.fgsid in FGSIDS:
fgsid = t_pars.fgsid
logger.warning(f'Using user-specified ID of {fgsid}')
else:
fgsid = 1
logger.warning(f'Using FGS{fgsid} as the default for the guiding FGS')
t_pars.fgsid = fgsid
# Determine V3PA@GS
v3pags = calc_v3pags(t_pars)
t_pars.guide_star_wcs = WCSRef(t_pars.guide_star_wcs.ra, t_pars.guide_star_wcs.dec, v3pags)
# Transform the guide star location in ideal detector coordinates to the telescope/V23 frame.
gs_pos_v23 = trans_fgs2v(t_pars.fgsid, t_pars.pointing.gs_position, t_pars.siaf_db)
# Calculate the M_eci2v matrix. This is the attitude matrix of the observatory
# relative to the guide star.
t.m_eci2v = calc_attitude_matrix(t_pars.guide_star_wcs, v3pags, gs_pos_v23)
# Calculate the SIAF transform matrix
t.m_v2siaf = calc_v2siaf_matrix(t_pars.siaf)
# Calculate the full ECI to SIAF transform matrix
t.m_eci2siaf = np.linalg.multi_dot([M_ics2idl, t.m_v2siaf, t.m_eci2v])
logger.debug('m_eci2siaf: %s', t.m_eci2siaf)
return t
def calc_gs2gsapp(m_eci2gsics, jwst_velocity):
"""Calculate the Velocity Aberration correction
This implements Eq. 40 from Technical Report JWST-STScI-003222, SM-12, Rev. C, 2021-11
From Section 3.2.5:
The velocity aberration correction is applied in the direction of the guide
star. The matrix that translates from ECI to the apparent guide star ICS
frame is M_(ECI→GSAppICS), where the GS Apparent position vector is along
the z-axis in the guide star ICS frame.
Parameters
----------
m_eci2gsics : numpy.array(3, 3)
The the ECI to Guide Star transformation matrix, in the ICS frame.
jwst_velocity : numpy.array([dx, dy, dz])
The barycentric velocity of JWST.
Returns
-------
m_gs2gsapp : numpy.array(3, 3)
The velocity aberration correction matrix.
"""
# Check velocity. If present, negate the velocity since
# the desire is to remove the correction.
if jwst_velocity is None or any(jwst_velocity == None): # noqa Syntax needed for numpy arrays.
logger.warning('Velocity: %s contains None. Cannot calculate aberration. Returning identity matrix', jwst_velocity)
return np.identity(3)
velocity = -1 * jwst_velocity
# Eq. 35: Guide star position vector
uz = np.array([0., 0., 1.])
u_gseci = np.dot(np.transpose(m_eci2gsics), uz)
# Eq. 36: Compute the apparent shift due to velocity aberration.
try:
scale_factor, u_gseci_app = compute_va_effects_vector(*velocity, u_gseci)
except TypeError:
logger.warning('Failure in computing velocity aberration. Returning identity matrix.')
logger.warning('Exception: %s', sys.exc_info())
return np.identity(3)
# Eq. 39: Rotate from ICS into the guide star frame.
u_gs_app = np.dot(m_eci2gsics, u_gseci_app)
# Eq. 40: Compute the M_gs2gsapp matrix
u_prod = np.cross(uz, u_gs_app)
u_prod_mag = np.linalg.norm(u_prod)
a_hat = u_prod / u_prod_mag
m_a_hat = np.array([[0., -a_hat[2], a_hat[1]],
[a_hat[2], 0., -a_hat[0]],
[-a_hat[1], a_hat[0], 0.]])
theta = np.arcsin(u_prod_mag)
m_gs2gsapp = np.identity(3) \
- (m_a_hat * np.sin(theta)) \
+ (2 * m_a_hat**2 * np.sin(theta / 2.)**2)
logger.debug('m_gs2gsapp: %s', m_gs2gsapp)
return m_gs2gsapp
def calc_attitude_matrix(wcs, yangle, position):
"""Calculate the DCM attitude from known positions and roll angles.
This implements Appendix A from Technical Report JWST-STScI-003222, SM-12. 2021-07
Parameters
----------
wcs : WCSRef
The guide star position
yangle : float
The IdlYangle of the point in question.
position : numpy.array(2)
The position in Ideal frame.
Returns
-------
m : np.array(3,3)
The transformation matrix
"""
# Convert to radians
ra = wcs.ra * D2R
dec = wcs.dec * D2R
yangle_ra = yangle * D2R
pos_rads = position * A2R
v2 = pos_rads[0]
v3 = pos_rads[1]
# Create the matrices
r1 = dcm(ra, dec, yangle_ra)
r2 = np.array([
[cos(v2) * cos(v3), -sin(v2), -cos(v2) * sin(v3)],
[sin(v2) * cos(v3), cos(v2), -sin(v2) * sin(v3)],
[sin(v3), 0., cos(v3)]
])
# Final transformation
m = np.dot(r2, r1)
logger.debug('attitude DCM: %s', m)
return m
def calc_wcs_from_matrix(m):
"""Calculate the WCS information from a DCM.
Parameters
----------
m : np.array((3, 3))
The DCM matrix to extract WCS information from
Returns
-------
wcs : WCSRef
The WCS.
"""
# V1 RA/Dec is the first row of the transform
v1_ra, v1_dec = vector_to_angle(m[0])
wcs = WCSRef(v1_ra, v1_dec, None)
# V3 is the third row of the transformation
v3_ra, v3_dec = vector_to_angle(m[2])
v3wcs = WCSRef(v3_ra, v3_dec, None)
# Calculate the V3 position angle
v1_pa = calc_position_angle(wcs, v3wcs)
# Convert to degrees
wcs = WCSRef(
ra=wcs.ra * R2D,
dec=wcs.dec * R2D,
pa=v1_pa * R2D
)
logger.debug('wcs: %s', wcs)
return wcs
def calc_eci2j_matrix(q):
"""Calculate ECI to J-frame matrix from quaternions
This implements Eq. 24 from Technical Report JWST-STScI-003222, SM-12. Rev. C, 2021-11
From Section 3.2.1:
The M_(ECI→J) DCM is derived from the spacecraft Attitude Control System
(ACS) attitude quaternion telemetry using the transformation in SE-20,
Appendix B to transform the attitude quaternion into a DCM.
Parameters
----------
q : np.array(q1, q2, q3, q4)
Array of quaternions from the engineering database
Returns
-------
transform : np.array((3, 3))
The transform matrix representing the transformation
from observatory orientation to J-Frame
"""
q1, q2, q3, q4 = q
transform = np.array(
[[1. - 2. * q2 * q2 - 2. * q3 * q3,
2. * (q1 * q2 + q3 * q4),
2. * (q3 * q1 - q2 * q4)],
[2. * (q1 * q2 - q3 * q4),
1. - 2. * q3 * q3 - 2. * q1 * q1,
2. * (q2 * q3 + q1 * q4)],
[2. * (q3 * q1 + q2 * q4),
2. * (q2 * q3 - q1 * q4),
1. - 2. * q1 * q1 - 2. * q2 * q2]]
)
logger.debug('quaternion: %s', transform)
return transform
def calc_j2fgs1_matrix(j2fgs_matrix, transpose=True):
"""Calculate the J-frame to FGS1 transformation
This implements Eq. 25 from Technical Report JWST-STScI-003222, SM-12. Rev. C, 2021-11
From Section 3.2.2:
The M_(J→FGS1ICS) DCM is derived from the transpose of the SC ACS telemetry
Parameters
----------
j2fgs_matrix : n.array((9,))
Matrix parameters from the engineering database.
If all zeros, a predefined matrix is used.
transpose : bool
Transpose the resulting matrix.
Returns
-------
transform : np.array((3, 3))
The transformation matrix
Notes
-----
The parameter `transpose` is defaulted to `True` because the
matrix, as defined in the engineering telemetry, is actually for
FGS1-to-J-frame. However, all documentation has always
referred to this J-to-FGS1.
"""
if np.isclose(j2fgs_matrix, 0.).all():
logger.warning('J-Frame to FGS1 engineering parameters are all zero.')
logger.warning('Using default matrix')
transform = J2FGS_MATRIX_DEFAULT
else:
logger.info(
'Using J-Frame to FGS1 engineering parameters'
' for the J-Frame to FGS1 transformation.'
)
transform = np.array(j2fgs_matrix).reshape((3, 3))
if transpose:
logger.info('Transposing the J-Frame to FGS matrix.')
transform = transform.transpose()
logger.debug('j2fgs1: %s', transform)
return transform
def calc_sifov_fsm_delta_matrix(fsmcorr, fsmcorr_version='latest', fsmcorr_units='arcsec'):
"""Calculate Fine Steering Mirror correction matrix
Parameters
----------
fsmcorr : np.array((2,))
The FSM correction parameters:
0: SA_ZADUCMDX
1: SA_ZADUCMDY
fsmcorr_version : str
The version of the FSM correction calculation to use.
Versions available:
latest: The state-of-art. Currently `v2`
v2: Update 201708 to use actual spherical calculations
v1: Original linear approximation
fsmcorr_units : str
The units of the FSM correction values. Default is `arcsec`.
Returns
-------
transform : np.array((3, 3))
The transformation matrix
"""
version = fsmcorr_version.lower()
units = fsmcorr_units.lower()
logger.debug('Using version %s', version)
logger.debug('Using units %s', units)
x = fsmcorr[0] # SA_ZADUCMDX
y = fsmcorr[1] # SA_ZADUCMDY
# If FSMCORR values are in arcsec, convert to radians
if units == 'arcsec':
x *= D2R / 3600.
y *= D2R / 3600.
# `V1`: Linear approximation calculation
if version == 'v1':
transform = np.array(
[
[1., x / 22.01, y / 21.68],
[-x / 22.01, 1., 0.],
[-y / 21.68, 0., 1.]
]
)
# Default or `V2`: Direct spherical calculation
# Note: With the "0.0" in the lower middle Y transform
else:
if version not in ('latest', 'v2'):
logger.warning(
'Unknown version "%s" specified. Using the latest (spherical) calculation.', version
)
m_x_partial = np.array(
[
[1., 0., 0.],
[0., cos(x), sin(x)],
[0., -sin(x), cos(x)]
]
)
m_y_partial = np.array(
[
[cos(y), 0., -sin(y)],
[0., 1., 0.],
[sin(y), 0., cos(y)]
]
)
transform = np.dot(m_x_partial, m_y_partial)
logger.debug('fsm_delta_matrix: %s', transform)
return transform
def calc_sifov2v_matrix():
"""Calculate the SI-FOV to V-Frame matrix
This is currently defined as the inverse Euler rotation
about an angle of 7.8 arcmin. Here returns the pre-calculate
matrix.
"""
return SIFOV2V_DEFAULT
def calc_v2siaf_matrix(siaf):
"""Calculate the SIAF transformation matrix
This implements Eq. 12 from Technical Report JWST-STScI-003222, SM-12, Rev. C, 2021-11
From Section 3.1:
The V to SIAF parameters V3IdlYang, V2Ref, V3Ref, and VIdlParity are
defined and their usage explained in SIAF2017. The parameter values for
each aperture are specified in the Project Reference Database (PRD).
Parameters
----------
siaf : SIAF
The SIAF parameters, where angles are in arcseconds/degrees
Returns
-------
transform : np.array((3, 3))
The V1 to SIAF transformation matrix
"""
v2, v3, v3idlyang, vparity = (siaf.v2_ref, siaf.v3_ref,
siaf.v3yangle, siaf.vparity)
mat = dcm(v2 * A2R, v3 * A2R, v3idlyang * D2R)
pmat = np.array([[0., vparity, 0.],
[0., 0., 1.],
[1., 0., 0.]])
transform = np.dot(pmat, mat)
logger.debug('transform: %s', transform)
return transform
def calc_position_angle(point, ref):
"""Calculate position angle from reference to point
Algorithm implemented is from JWST Technical Report JWST-STScI-001550, SM-12,
2017-11-08, Rev A., Section 5.2, page 29, final equation:
tan(pa) = cos(dec_r) * sin(ra_r - ra_p) / (sin(dec_r)cos(dec_p) - cos(dec_r)sin(dec_p)cos(ra_r-ra_p))
where
pa : position angle
*_r : reference
*_p : point
Typically the reference is the V3 RA/DEC and point is the object RA/DEC
Parameters
----------
point : WCSRef
The POINT wcs parameters, in radians
ref : WCSRef
The TARGET wcs parameters, in radians
Returns
-------
point_pa : float
The POINT position angle, in radians
"""
y = cos(ref.dec) * sin(ref.ra - point.ra)
x = sin(ref.dec) * cos(point.dec) - \
cos(ref.dec) * sin(point.dec) * cos((ref.ra - point.ra))
point_pa = np.arctan2(y, x)
if point_pa < 0:
point_pa += PI2
if point_pa >= PI2:
point_pa -= PI2
logger.debug('Given reference: %s, point: %s, then PA: %s', ref, point, point_pa)
return point_pa
def get_pointing(obsstart, obsend, mnemonics_to_read=TRACK_TR_202111_MNEMONICS,
engdb_url=None, tolerance=60, reduce_func=None):
"""
Get telescope pointing engineering data.
Parameters
----------
obsstart, obsend : float
MJD observation start/end times
engdb_url : str or None
URL of the engineering telemetry database REST interface.
mnemonics_to_read: {str: bool[,...]}
The mnemonics to read. Key is the mnemonic name.
Value is a boolean indicating whether the mnemonic
is required to have values or not.
tolerance : int
If no telemetry can be found during the observation,
the time, in seconds, beyond the observation time to
search for telemetry.
reduce_func : func or None
Reduction function to use on values.
If None, the average pointing is returned.
Returns
-------
pointing : Pointing or [Pointing(, ...)]
The engineering pointing parameters.
If the `result_type` is `all`, a list
of pointings will be returned
Raises
------
ValueError
Cannot retrieve engineering information
Notes
-----
For the moment, the first found values will be used.
This will need be re-examined when more information is
available.
"""
if reduce_func is None:
reduce_func = pointing_from_average
logger.info('Determining pointing between observations times (mjd):')
logger.info('obsstart: %s obsend: %s', obsstart, obsend)
logger.info('Telemetry search tolerance: %s', tolerance)
logger.info('Reduction function: %s', reduce_func)
mnemonics = get_mnemonics(obsstart, obsend, mnemonics_to_read=mnemonics_to_read,
tolerance=tolerance, engdb_url=engdb_url)
reduced = reduce_func(mnemonics_to_read, mnemonics)
logger.log(DEBUG_FULL, 'Mnemonics found:')
logger.log(DEBUG_FULL, '%s', mnemonics)
logger.info('Reduced set of pointings:')
logger.info('%s', reduced)
return reduced
def vector_to_angle(v):
"""Returns tuple of spherical angles from unit direction Vector
This implements Eq. 10 & 11 from Technical Report JWST-STScI-003222, SM-12, Rev. C, 2021-11
From Section 3:
The Direction Cosine Matrix (DCM) that provides the transformation of a
unit pointing vector defined in inertial frame (ECI J2000) coordinates to a
unit vector defined in the science aperture Ideal frame coordinates is
defined as:
Parameters
----------
v : [v0, v1, v2]
Returns
-------
alpha, delta : float, float
The spherical angles, in radians
"""
alpha = np.arctan2(v[1], v[0])
delta = np.arcsin(v[2])
if alpha < 0.:
alpha += 2. * np.pi
return alpha, delta
def angle_to_vector(alpha, delta):
"""Convert spherical angles to unit vector
This implements Eq. 9 from Technical Report JWST-STScI-003222, SM-12, Rev. C, 2021-11
From Section 3:
Parameters
----------
alpha, delta : float
Spherical angles in radians
Returns
-------
v : [float, float, float]
Unit vector
"""
v0 = cos(delta) * cos(alpha)
v1 = cos(delta) * sin(alpha)
v2 = sin(delta)
return [v0, v1, v2]
def compute_local_roll(pa_v3, ra_ref, dec_ref, v2_ref, v3_ref):
"""
Computes the position angle of V3 (measured N to E)
at the center af an aperture.
Parameters
----------
pa_v3 : float
Position angle of V3 at (V2, V3) = (0, 0) [in deg]
v2_ref, v3_ref : float
Reference point in the V2, V3 frame [in arcsec]
ra_ref, dec_ref : float
RA and DEC corresponding to V2_REF and V3_REF, [in deg]
Returns
-------
new_roll : float
The value of ROLL_REF (in deg)
"""
v2 = np.deg2rad(v2_ref / 3600)
v3 = np.deg2rad(v3_ref / 3600)
ra_ref = np.deg2rad(ra_ref)
dec_ref = np.deg2rad(dec_ref)
pa_v3 = np.deg2rad(pa_v3)
M = np.array(
[[cos(ra_ref) * cos(dec_ref),
-sin(ra_ref) * cos(pa_v3) + cos(ra_ref) * sin(dec_ref) * sin(pa_v3),
-sin(ra_ref) * sin(pa_v3) - cos(ra_ref) * sin(dec_ref) * cos(pa_v3)],
[sin(ra_ref) * cos(dec_ref),
cos(ra_ref) * cos(pa_v3) + sin(ra_ref) * sin(dec_ref) * sin(pa_v3),
cos(ra_ref) * sin(pa_v3) - sin(ra_ref) * sin(dec_ref) * cos(pa_v3)],
[sin(dec_ref),
-cos(dec_ref) * sin(pa_v3),
cos(dec_ref) * cos(pa_v3)]]
)
return _roll_angle_from_matrix(M, v2, v3)
def _roll_angle_from_matrix(matrix, v2, v3):
X = -(matrix[2, 0] * np.cos(v2) + matrix[2, 1] * np.sin(v2)) * np.sin(v3) + matrix[2, 2] * np.cos(v3)
Y = (matrix[0, 0] * matrix[1, 2] - matrix[1, 0] * matrix[0, 2]) * np.cos(v2) + \
(matrix[0, 1] * matrix[1, 2] - matrix[1, 1] * matrix[0, 2]) * np.sin(v2)
new_roll = np.rad2deg(np.arctan2(Y, X))
if new_roll < 0:
new_roll += 360
return new_roll
def get_mnemonics(obsstart, obsend, tolerance, mnemonics_to_read=TRACK_TR_202111_MNEMONICS, engdb_url=None):
"""Retrieve pointing mnemonics from the engineering database
Parameters
----------
mnemonics_to_read : {str: bool[,...]}
The mnemonics to fetch. key is the mnemonic and
value is whether it is required to be found.
obsstart, obsend : float
MJD observation start/end times
tolerance : int
If no telemetry can be found during the observation,
the time, in seconds, beyond the observation time to
search for telemetry.
engdb_url : str or None
URL of the engineering telemetry database REST interface.
Returns
-------
mnemonics : {mnemonic: [value[,...]][,...]}
The values for each pointing mnemonic
Raises
------
ValueError
Cannot retrieve engineering information
"""
try:
engdb = ENGDB_Service(base_url=engdb_url)
except Exception as exception:
raise ValueError(
'Cannot open engineering DB connection'
'\nException: {}'.format(exception)
)
logger.info(
'Querying engineering DB: %s', engdb.base_url
)
# Construct the mnemonic values structure.
mnemonics = {
mnemonic: None
for mnemonic in mnemonics_to_read
}
# Retrieve the mnemonics from the engineering database.
# Check for whether the bracket values are used and
# within tolerance.
for mnemonic in mnemonics:
try:
mnemonics[mnemonic] = engdb.get_values(
mnemonic, obsstart, obsend,
time_format='mjd', include_obstime=True,
include_bracket_values=False
)
except Exception as exception:
raise ValueError(f'Cannot retrieve {mnemonic} from engineering.') from exception
# If more than two points exist, throw off the bracket values.
# Else, ensure the bracket values are within the allowed time.
if len(mnemonics[mnemonic]) < 2:
logger.warning('Mnemonic %s has no telemetry within the observation time.', mnemonic)
logger.warning('Attempting to use bracket values within %s seconds', tolerance)
mnemonics[mnemonic] = engdb.get_values(
mnemonic, obsstart, obsend,
time_format='mjd', include_obstime=True,
include_bracket_values=True
)
tolerance_mjd = tolerance * SECONDS2MJD
allowed_start = obsstart - tolerance_mjd
allowed_end = obsend + tolerance_mjd
allowed = [
value
for value in mnemonics[mnemonic]
if allowed_start <= value.obstime.mjd <= allowed_end
]
if not len(allowed):
raise ValueError(
'No telemetry exists for mnemonic {} within {} and {}'.format(
mnemonic,
Time(allowed_start, format='mjd').isot,
Time(allowed_end, format='mjd').isot
)
)
mnemonics[mnemonic] = allowed
# All mnemonics must have some values.
if not all([len(mnemonic) for mnemonic in mnemonics.values()]):
raise ValueError('Incomplete set of pointing mnemonics')
return mnemonics
def all_pointings(mnemonics_to_read, mnemonics):
"""V1 of making pointings
Parameters
==========
mnemonics_to_read: {str: bool[,...]}
The mnemonics to read. Key is the mnemonic name.
Value is a boolean indicating whether the mnemonic
is required to have values or not.
mnemonics : {mnemonic: [value[,...]][,...]}
The values for each pointing mnemonic
Returns
=======
pointings : [Pointing[,...]]
List of pointings.
"""
pointings = []
filled = fill_mnemonics_chronologically(mnemonics)
for obstime, mnemonics_at_time in filled.items():
# Fill out the matrices
q = np.array([
mnemonics_at_time['SA_ZATTEST1'].value,
mnemonics_at_time['SA_ZATTEST2'].value,
mnemonics_at_time['SA_ZATTEST3'].value,
mnemonics_at_time['SA_ZATTEST4'].value,
])
j2fgs_matrix = np.array([
mnemonics_at_time['SA_ZRFGS2J11'].value,
mnemonics_at_time['SA_ZRFGS2J12'].value,
mnemonics_at_time['SA_ZRFGS2J13'].value,
mnemonics_at_time['SA_ZRFGS2J21'].value,
mnemonics_at_time['SA_ZRFGS2J22'].value,
mnemonics_at_time['SA_ZRFGS2J23'].value,
mnemonics_at_time['SA_ZRFGS2J31'].value,
mnemonics_at_time['SA_ZRFGS2J32'].value,
mnemonics_at_time['SA_ZRFGS2J33'].value,
])
fsmcorr = np.array([
mnemonics_at_time['SA_ZADUCMDX'].value,
mnemonics_at_time['SA_ZADUCMDY'].value,
])
gs_commanded = np.array([
mnemonics_at_time['SA_ZFGGSCMDX'].value,
mnemonics_at_time['SA_ZFGGSCMDY'].value
])
gs_position = None
if all(k in mnemonics for k in ('SA_ZFGGSPOSX', 'SA_ZFGGSPOSY')):
gs_position = np.array([
mnemonics_at_time['SA_ZFGGSPOSX'].value,
mnemonics_at_time['SA_ZFGGSPOSY'].value
])
fgsid = mnemonics_at_time['SA_ZFGDETID'].value
pointing = Pointing(q=q, obstime=obstime, j2fgs_matrix=j2fgs_matrix,
fsmcorr=fsmcorr, gs_commanded=gs_commanded,
fgsid=fgsid, gs_position=gs_position)
pointings.append(pointing)
if not len(pointings):
raise ValueError('No non-zero quaternion found.')
return pointings
def populate_model_from_siaf(model, siaf):
"""
Populate the WCS keywords of a Level1bModel from the SIAF.
Parameters
----------
model : `~jwst.datamodels.Level1bModel`
Input data as Level1bModel.
siaf : namedtuple
The WCS keywords read in from the SIAF.
"""
# Update values from the SIAF for all exposures.
model.meta.wcsinfo.v2_ref = siaf.v2_ref
model.meta.wcsinfo.v3_ref = siaf.v3_ref
model.meta.wcsinfo.v3yangle = siaf.v3yangle
model.meta.wcsinfo.vparity = siaf.vparity
# For imaging modes, also update the basic FITS WCS keywords
if model.meta.exposure.type.lower() in TYPES_TO_UPDATE:
logger.info('Setting basic FITS WCS keywords for imaging')
model.meta.wcsinfo.ctype1 = 'RA---TAN'
model.meta.wcsinfo.ctype2 = 'DEC--TAN'
model.meta.wcsinfo.wcsaxes = 2
model.meta.wcsinfo.cunit1 = "deg"
model.meta.wcsinfo.cunit2 = "deg"
model.meta.wcsinfo.crpix1 = siaf.crpix1
model.meta.wcsinfo.crpix2 = siaf.crpix2
model.meta.wcsinfo.cdelt1 = siaf.cdelt1 / 3600 # in deg
model.meta.wcsinfo.cdelt2 = siaf.cdelt2 / 3600 # in deg
model.meta.coordinates.reference_frame = "ICRS"
# For TSO exposures, also populate XREF_SCI/YREF_SCI keywords,
# which are used by the Cal pipeline to determine the
# location of the source.
# Note that we use a combination of the is_tso function and
# a check on EXP_TYPE, because there are rare corner cases
# where EXP_TIME=NRC_TSGRISM, TSOVISIT=False, NINTS=1, which
# normally return False, but we want to treat it as TSO anyway.
if is_tso(model) or model.meta.exposure.type.lower() in ['nrc_tsimage', 'nrc_tsgrism']:
logger.info('TSO exposure:')
logger.info(' setting xref_sci to %s', siaf.crpix1)
logger.info(' setting yref_sci to %s', siaf.crpix2)
model.meta.wcsinfo.siaf_xref_sci = siaf.crpix1
model.meta.wcsinfo.siaf_yref_sci = siaf.crpix2
def first_pointing(mnemonics_to_read, mnemonics):
"""Return first pointing
Parameters
==========
mnemonics_to_read: {str: bool[,...]}
The mnemonics to read. Key is the mnemonic name.
Value is a boolean indicating whether the mnemonic
is required to have values or not.
mnemonics : {mnemonic: [value[,...]][,...]}
The values for each pointing mnemonic
Returns
=======
pointing : Pointing
First pointing.
"""
pointings = all_pointings(mnemonics_to_read, mnemonics)
return pointings[0]
def pointing_from_average(mnemonics_to_read, mnemonics):
"""Determine single pointing from average of available pointings
Parameters
==========
mnemonics_to_read: {str: bool[,...]}
The mnemonics to read. Key is the mnemonic name.
Value is a boolean indicating whether the mnemonic
is required to have values or not.
mnemonics : {mnemonic: [value[,...]][,...]}
The values for each pointing mnemonic
Returns
=======
pointing : Pointing
Pointing from average.
"""
# Get average observation time.
times = [
eng_param.obstime.unix
for key in mnemonics
for eng_param in mnemonics[key]
if eng_param.obstime.unix != 0.0
]
if len(times) > 0:
obstime = Time(np.average(times), format='unix')
else:
raise ValueError("No valid times in range")
# Get averages for all the mnemonics.
mnemonic_averages = {}
zero_mnemonics = []
for mnemonic in mnemonics:
values = [
eng_param.value
for eng_param in mnemonics[mnemonic]
]
# Weed out mnemonic entries that are zero, though some are OK to be zero.
if mnemonics_to_read[mnemonic]:
good_mnemonic = []
for this_value in values:
if this_value != 0.0:
good_mnemonic.append(this_value)
if len(good_mnemonic) > 0:
mnemonic_averages[mnemonic] = np.average(good_mnemonic)
else:
zero_mnemonics.append(mnemonic)
else:
mnemonic_averages[mnemonic] = np.average(values)
# Raise exception if there are mnemonics with only zeros in the time range
if len(zero_mnemonics):
logger.warning("The following engineering mnemonics only contained zeros in the requested time interval:")
badmnemonicsstring = ' '.join(zero_mnemonics)
logger.info(badmnemonicsstring)
raise ValueError("Bad telemetry values")
# Fill out the pointing matrices.
q = np.array([
mnemonic_averages['SA_ZATTEST1'],
mnemonic_averages['SA_ZATTEST2'],
mnemonic_averages['SA_ZATTEST3'],
mnemonic_averages['SA_ZATTEST4']
])
j2fgs_matrix = np.array([
mnemonic_averages['SA_ZRFGS2J11'],
mnemonic_averages['SA_ZRFGS2J12'],
mnemonic_averages['SA_ZRFGS2J13'],
mnemonic_averages['SA_ZRFGS2J21'],
mnemonic_averages['SA_ZRFGS2J22'],
mnemonic_averages['SA_ZRFGS2J23'],
mnemonic_averages['SA_ZRFGS2J31'],
mnemonic_averages['SA_ZRFGS2J32'],
mnemonic_averages['SA_ZRFGS2J33']
])
fsmcorr = np.array([
mnemonic_averages['SA_ZADUCMDX'],
mnemonic_averages['SA_ZADUCMDY']
])
gs_commanded = np.array([
mnemonic_averages['SA_ZFGGSCMDX'],
mnemonic_averages['SA_ZFGGSCMDY']
])
gs_position = None
if all(k in mnemonic_averages for k in ('SA_ZFGGSPOSX', 'SA_ZFGGSPOSY')):
gs_position = np.array([
mnemonic_averages['SA_ZFGGSPOSX'],
mnemonic_averages['SA_ZFGGSPOSY']
])
# For FGS ID, just take the first one.
fgsid = mnemonics['SA_ZFGDETID'][0].value
pointing = Pointing(obstime=obstime, q=q, j2fgs_matrix=j2fgs_matrix,
fsmcorr=fsmcorr, gs_commanded=gs_commanded,
fgsid=fgsid, gs_position=gs_position)
# That's all folks
return pointing
def fill_mnemonics_chronologically(mnemonics, filled_only=True):
"""Return time-ordered mnemonic list with progressive values
The different set of mnemonics used for observatory orientation
appear at different cadences. This routine creates a time-ordered dictionary
with all the mnemonics for each time found in the engineering. For mnemonics
missing for a particular time, the last previous value is used.
Parameters
----------
mnemonics : {mnemonic: [value[,...]]}
filled_only : bool
Only return a matrix where observation times have all the mnemonics defined.
Returns
-------
filled_by_time : {obstime: {mnemonic: value}}
"""
# Collect all information by observation time and order.
by_obstime = defaultdict(dict)
n_mnemonics = len(mnemonics)
for mnemonic, values in mnemonics.items():
for value in values:
by_obstime[value.obstime][mnemonic] = value
by_obstime = sorted(by_obstime.items())
# Created the filled matrix
filled = dict()
last_obstime = dict()
for obstime, mnemonics_at_time in by_obstime:
last_obstime.update(mnemonics_at_time)
if len(last_obstime) >= n_mnemonics or not filled_only:
# Engineering data may be present, but all zeros.
# Filter out this situation also.
if filled_only:
values = [
value.value
for value in last_obstime.values()
]
if not any(values):
continue
filled[obstime] = copy(last_obstime)
return filled
def fill_mnemonics_chronologically_table(mnemonics, filled_only=True):
"""Return time-ordered mnemonic list with progressive values
The different set of mnemonics used for observatory orientation
appear at different cadences. This routine creates a time-ordered dictionary
with all the mnemonics for each time found in the engineering. For mnemonics
missing for a particular time, the last previous value is used.
Parameters
----------
mnemonics : {mnemonic: [value[,...]]}
filled_only : bool
Only return a matrix where observation times have all the mnemonics defined.
Returns
-------
filled_by_time : astropy.table.Table
"""
filled = fill_mnemonics_chronologically(mnemonics, filled_only=filled_only)
names = [mnemonic for mnemonic in mnemonics]
names = ['time'] + names
time_idx = 0
values = [[] for _ in names]
for time in filled:
values[time_idx].append(time)
for mnemonic in filled[time]:
idx = names.index(mnemonic)
values[idx].append(filled[time][mnemonic].value)
t = Table(values, names=names)
return t
def calc_estimated_gs_wcs(t_pars: TransformParameters):
"""Calculate the estimated guide star RA/DEC/Y-angle
This implements Eq. 18, 19, 20 from Technical Report JWST-STScI-003222, SM-12, Rev. C, 2021-11
From Section 3.2:
Parameters
----------
t_pars : TransformParameters
The transformation parameters. Parameters are updated during processing.
Returns
-------
gs_wcs : WCSRef
Estimated RA, Dec, and Y-angle. All in degrees.
"""
# Determine the ECI to Guide star transformation
t = calc_m_eci2gs(t_pars)
m_eci2gs = t.m_eci2gs
# Determine the wcs
wcs = calc_wcs_from_matrix(m_eci2gs)
logger.debug('wcs: %s', wcs)
return wcs
def calc_v3pags(t_pars: TransformParameters):
"""Calculate the V3 Position Angle at the Guide Star
This implements Eq. 21 from Technical Report JWST-STScI-003222, SM-12, Rev. C, 2021-11
Parameters
----------
t_pars : TransformParameters
The transformation parameters. Parameters are updated during processing.
Returns
-------
v3pags : float
The V3 Position Angle at the Guide Star, in degrees
Notes
-----
Modification for `jwst` release post-1.11.3: The commanded position of the guide
star is always relative to FGS1. Hence, the aperture to use for the SIAF
transformation is always FGS1.
"""
# Determine Guides Star estimated WCS information.
gs_wcs = calc_estimated_gs_wcs(t_pars)
# Retrieve the Ideal Y-angle for FGS1
fgs_siaf = t_pars.siaf_db.get_wcs(FGSId2Aper[1], useafter=t_pars.useafter)
# Calculate V3PAGS
v3pags = gs_wcs.pa - fgs_siaf.v3yangle
logger.debug('v3pags: %s', v3pags)
return v3pags
def calc_m_eci2gs(t_pars: TransformParameters):
"""Calculate the M_eci2gs matrix as per TR presented in 2021-07
This implements Eq. 16 & 17 from Technical Report JWST-STScI-003222, SM-12, Rev. C, 2021-11
From Section 3.2:
The equation is formed by inverting the equation in Section 5.9.1.2 of
SE-20 which converts from the attitude specified at the Guide Star into the
commanded spacecraft J-frame attitude. With the inversion, the commanded
J-frame attitude quaternion is replaced in this equation by the matrix
derived from the measured J-frame attitude quaternion.
Parameters
----------
t_pars : TransformParameters
The transformation parameters. Parameters are updated during processing.
Returns
-------
transforms : Transforms
The calculated transforms. The target transform is
`transforms.m_eci2gs`. See the notes for other transforms
used and calculated.
Notes
-----
The transform train needed to calculate M_eci_to_gs is
M_eci_to_gs =
M_z_to_x *
M_gsics_to_gsappics *
M_fgs1ics_to_gsics *
M_j_to_fgs1ics *
M_eci_to_j
where
M_eci_to_gs = ECI to Guide Star Ideal Frame
M_gsics_to_gsappics = Velocity Aberration correction
M_fgs1ics_to_gsics = Convert from the FGS1 ICS frame to Guide Star ICS frame
M_j_to_fgs1ics = Convert from J frame to FGS1 ICS frame
M_eci_to_j = ECI (quaternion) to J-frame
Modification for `jwst` release post-1.11.3: The commanded position of the guide
star is always relative to FGS1. Hence, the aperture to use is always FGS1.
The formulae above have been modified appropriately.
However, in the code, note that the transformations go to FGS1, but then
is suddenly referred to thereafter as FGSX. The assumption to make is that X is always 1,
for FGS1.
"""
# Initial state of the transforms
t = Transforms(override=t_pars.override_transforms)
t.m_eci2j = calc_eci2j_matrix(t_pars.pointing.q)
t.m_j2fgs1 = calc_j2fgs1_matrix(t_pars.pointing.j2fgs_matrix, t_pars.j2fgs_transpose)
t.m_fgsx2gs = calc_m_fgsx2gs(t_pars.pointing.gs_commanded)
# Apply the Velocity Aberration. To do so, the M_eci2gsics matrix must be created. This
# is used to calculate the aberration matrix.
# Also, since the aberration is to be removed, the velocity is negated.
m_eci2gsics = np.linalg.multi_dot([t.m_fgsx2gs, t.m_j2fgs1, t.m_eci2j])
logger.debug('m_eci2gsics: %s', m_eci2gsics)
t.m_gs2gsapp = calc_gs2gsapp(m_eci2gsics, t_pars.jwst_velocity)
# Put it all together
t.m_eci2gs = np.linalg.multi_dot([M_ics2idl, t.m_gs2gsapp, m_eci2gsics])
logger.debug('m_eci2gs: %s', t.m_eci2gs)
# That's all folks
return t
def calc_m_fgs12fgsx(fgsid, siaf_db):
"""Calculate the FGS1 to FGSx matrix
This implements Eq. 27 from Technical Report JWST-STScI-003222, SM-12, Rev. C, 2021-11
From Section 3.2.3:
A selected guide star being used, could be in FGS 1 or FGS 2. The JWST ACS
always uses the FGS 1 ICS frame to calculate the commanded spacecraft
J-frame attitude and is used in the attitude control loop. If the specified
guide star is in FGS 2, its position will be converted to the FGS 1 ICS
using an on board FGS2 to FGS1 k-constant matrix. Here we are creating the
FGS1 ICS to FGSj ICS DCM which converts from the FGS1 ICS frame to the FGSj
ICS frame using SIAF parameters for the FGSs.
Parameters
----------
fgsid : [1, 2]
The id of the FGS in use.
siaf_db : SiafDb
The SIAF database.
Returns
-------
m_fgs12fgsx : numpy.array(3, 3)
The DCM to transform from FGS1 ICS frame to the desired FGS frame
"""
# If the in-use FGS is FGS1, no transformation is necessary.
# Simply return the identity matrix.
if fgsid == 1:
m_fgs12fgsx = np.identity(3)
logger.debug('FGS1 is in use, the identity matrix is returned: %s', m_fgs12fgsx)
return m_fgs12fgsx
if fgsid != 2:
raise ValueError(f'fgsid == {fgsid} is invalid. Must be 1 or 2')
# FGS2 is in use. Calculate the transform from FGS1 to FGS2
fgs1_siaf = siaf_db.get_wcs(FGSId2Aper[1])
fgs2_siaf = siaf_db.get_wcs(FGSId2Aper[2])
m_fgs1 = calc_v2siaf_matrix(fgs1_siaf)
m_fgs2 = calc_v2siaf_matrix(fgs2_siaf)
m_fgs12fgsx = np.dot(m_fgs2, m_fgs1.transpose())
logger.debug('m_fgs12fgsx: %s', m_fgs12fgsx)
return m_fgs12fgsx
def calc_m_fgsx2gs(gs_commanded):
"""Calculate the FGS1 to commanded Guide Star frame
This implements Eq. 29 from Technical Report JWST-STScI-003222, SM-12, Rev. C, 2021-11
From Section 3.2.4.
Parameters
----------
gs_commanded : numpy.array(2)
The Guide Star commanded position, in arcseconds
Returns
-------
m_fgsx2gs : numpy.array(3, 3)
The DCM transform from FGSx (1 or 2) to Guide Star ICS frame
"""
m_gs2fgsx = calc_m_gs2fgsx(gs_commanded)
m_fgsx2gs = m_gs2fgsx.transpose()
logger.debug('m_fgsx2gs: %s', m_fgsx2gs)
return m_fgsx2gs
def calc_m_gs2fgsx(gs_commanded):
"""Calculate the Guides Star frame to FGSx ICS frame
This implements Eq. 30 from Technical Report JWST-STScI-003222, SM-12, Rev. C, 2021-11
From Section 3.2.4.
Parameters
----------
gs_commanded : numpy.array(2)
The commanded position of the guide stars, in arcseconds
Returns
-------
m_gs2fgsx : numpy.array(3, 3)
The guide star to FGSx transformation
"""
in_rads = gs_commanded * A2R
x, y = in_rads
m_x = np.array([
[cos(-x), 0., -sin(-x)],
[0., 1., 0.],
[sin(-x), 0., cos(-x)]
])
m_y = np.array([
[1.0, 0.0, 0.0],
[0., cos(y), sin(y)],
[0., -sin(y), cos(y)]
])
m_gs2fgsx = np.dot(m_y, m_x)
logger.debug('m_gs2fgsx: %s', m_gs2fgsx)
return m_gs2fgsx
def trans_fgs2v(fgsid, ideal, siaf_db):
"""Transform an Ideal coordinate to V coordinates
Parameters
----------
fgsid : [1, 2]
The FGS in use.
ideal : numpy.array(2)
The Ideal coordinates in arcseconds
siaf_db : SiafDb
The SIAF database.
Returns
-------
v : numpy.array(2)
The V-frame coordinates in arcseconds
"""
ideal_rads = ideal * A2R
ideal_vec = cart_to_vector(ideal_rads)
siaf = siaf_db.get_wcs(FGSId2Aper[fgsid])
m_v2fgs = calc_v2siaf_matrix(siaf)
v_vec = np.dot(m_v2fgs.transpose(), ideal_vec)
v_rads = np.array(vector_to_angle(v_vec))
v = v_rads * R2A
logger.debug('FGS%s %s -> V %s', fgsid, ideal, v)
return v
def cart_to_vector(coord):
"""Convert Cartesian to a unit vector
This implements Eq. 6 from Technical Report JWST-STScI-003222, SM-12, Rev. C, 2021-11
From Section 3:
The Direction Cosine Matrix (DCM) that provides the transformation of a
unit pointing vector defined in inertial frame (ECI J2000) coordinates to a
unit vector defined in the science aperture Ideal frame coordinates is
defined as...
Parameters
----------
coord : numpy.array(2)
The Cartesian coordinate.
Returns
-------
vector : numpy.array(3)
The vector version
"""
vector = np.array([
coord[0],
coord[1],
sqrt(1 - coord[0]**2 - coord[1]**2)
])
return vector
def pa_to_roll_ref(pa: float, siaf: SIAF):
"""Calculate Roll from the position angle of the given aperture.
Parameters
----------
pa : float
Position angle of the aperture, in degrees.
siaf : SIAF
The SIAF of the aperturn
Returns
-------
roll_ref : float
The roll reference, in degrees
"""
return pa - siaf.v3yangle
def t_pars_from_model(model, **t_pars_kwargs):
"""Initialize TransformParameters from a DataModel
Parameters
----------
model : DataModel
Data model to initialize from.
t_pars_kwargs : dict
Keyword arguments used to initialize the TransformParameters object
before reading from the model meta information.
Returns
-------
t_par : TransformParameters
The initialized parameters.
"""
t_pars = TransformParameters(**t_pars_kwargs)
# Retrieve SIAF information
if t_pars.siaf is None:
siaf = None
useafter = None
if t_pars.siaf_db is not None:
aperture_name = model.meta.aperture.name.upper()
useafter = model.meta.observation.date
if aperture_name != "UNKNOWN":
logger.info("Updating WCS for aperture %s", aperture_name)
# Special case. With aperture MIRIM_TAMRS, the siaf definition is
# for the subarray of interest. However, the whole detector is
# read out. Hence, need to convert pixel coordinates to be detector-based.
to_detector = False
if aperture_name == 'MIRIM_TAMRS':
to_detector = True
siaf = t_pars.siaf_db.get_wcs(aperture_name, to_detector=to_detector, useafter=useafter)
t_pars.siaf = siaf
t_pars.useafter = useafter
logger.debug('SIAF: %s', t_pars.siaf)
# Instrument details
t_pars.detector = model.meta.instrument.detector
try:
exp_type = model.meta.exposure.type.lower()
except AttributeError:
exp_type = None
t_pars.exp_type = exp_type
# observation parameters
if t_pars.exp_type in FGS_GUIDE_EXP_TYPES:
t_pars.obsstart = Time(model.meta.observation.date_beg, format='isot').mjd
t_pars.obsend = Time(model.meta.observation.date_end, format='isot').mjd
else:
t_pars.obsstart = model.meta.exposure.start_time
t_pars.obsend = model.meta.exposure.end_time
logger.debug('Observation time: %s - %s', t_pars.obsstart, t_pars.obsend)
# Get Guide Star information
t_pars.guide_star_wcs = WCSRef(
model.meta.guidestar.gs_ra,
model.meta.guidestar.gs_dec,
model.meta.guidestar.gs_v3_pa_science
)
t_pars.pcs_mode = model.meta.guidestar.gs_pcs_mode
logger.debug('guide_star_wcs from model: %s', t_pars.guide_star_wcs)
logger.debug('PCS_MODE: %s', t_pars.pcs_mode)
# Get jwst velocity
t_pars.jwst_velocity = np.array([
model.meta.ephemeris.velocity_x_bary,
model.meta.ephemeris.velocity_y_bary,
model.meta.ephemeris.velocity_z_bary,
])
logger.debug('JWST Velocity: %s', t_pars.jwst_velocity)
# Set the transform and WCS calculation method.
t_pars.method = method_from_pcs_mode(t_pars.pcs_mode)
# Set pointing reduction function if not already set.
if not t_pars.reduce_func:
t_pars.reduce_func = get_reduce_func_from_exptype(t_pars.exp_type)
return t_pars
def dcm(alpha, delta, angle):
"""Construct the Direction Cosine Matrix (DCM)
Typical usage is passing of (RA, DEC, PositionAngle).
All values must be in radians.
Parameters
----------
alpha : float
First coordinate in radians.
delta : float
Second coordinate in radians.
angle : float
Position angle in radians.
Returns
-------
dcm : nd.array((3, 3))
The 3x3 direction cosine matrix
"""
dcm = np.array(
[[cos(delta) * cos(alpha),
cos(delta) * sin(alpha),
sin(delta)],
[-cos(angle) * sin(alpha) + sin(angle) * sin(delta) * cos(alpha),
cos(angle) * cos(alpha) + sin(angle) * sin(delta) * sin(alpha),
-sin(angle) * cos(delta)],
[-sin(angle) * sin(alpha) - cos(angle) * sin(delta) * cos(alpha),
sin(angle) * cos(alpha) - cos(angle) * sin(delta) * sin(alpha),
cos(angle) * cos(delta)]])
return dcm
# Determine calculation method from tracking mode.
def method_from_pcs_mode(pcs_mode):
"""Determine transform/wcs calculation method from PCS_MODE
Pointing Control System Mode (PCS_MODE) contains the string representing
which mode the JWST tracking system is in. The orientation calculation
changes depending on the mode in use.
Parameters
----------
pcs_mode : str
The PCS mode in use.
Returns
-------
method : Methods
The orientation calculation method to use.
Raises
------
ValueError
If `pcs_mode` does not uniquely define the method to use.
"""
if pcs_mode is None or pcs_mode in ['NONE', 'COARSE']:
return Methods.COARSE_TR_202111
elif pcs_mode in ['FINEGUIDE', 'MOVING', 'TRACK']:
return Methods.TRACK_TR_202111
else:
raise ValueError(
f'Invalid PCS_MODE: {pcs_mode}. Should be one of ["NONE", "COARSE", "FINEGUIDE", "MOVING", "TRACK"]'
)
def get_reduce_func_from_exptype(exp_type):
"""Determine preferred pointing reduction based on exposure type
Parameters
----------
exp_type : str
The exposure type.
Returns
-------
reduce_func : func
The preferred reduction function.
"""
if exp_type in FGS_ACQ_EXP_TYPES:
reduce_func = functools.partial(gs_position_acq, exp_type=exp_type)
elif exp_type in FGS_GUIDED_EXP_TYPES:
reduce_func = gs_position_fgtrack
elif exp_type in FGS_ID_EXP_TYPES:
reduce_func = gs_position_id
else:
reduce_func = pointing_from_average
return reduce_func
def gs_position_acq(mnemonics_to_read, mnemonics, exp_type='fgs_acq1'):
"""Get the guide star position from guide star telemetry for FGS
The ACQ1 and ACQ2 exposures share nearly the same time range of mnemonics
with ACQ2 being slightly longer. As such, the information needed by
both types are interleaved in the engineering.
ACQ1 needs the first three values of the IFGS_ACQ_*POSG mnemonics.
ACQ2 needs the next five values of the IFGS_ACQ_*POSG mnemonics.
The corresponding values of the acquisition box location and size
are also different between ACQ1 and ACQ2. The values to retrieve
need be coordinated with the corresponding *POSG values.
The *POSG values are in arcseconds and are relative to the full
FGS array. They need to be converted to pixels and shifted
to be with respect to the actual acquisition box in the exposure.
Parameters
==========
mnemonics_to_read: {str: bool[,...]}
The mnemonics to read. Key is the mnemonic name.
Value is a boolean indicating whether the mnemonic
is required to have values or not.
mnemonics : {mnemonic: [value[,...]][,...]}
The values for each pointing mnemonic
exp_type: str
The exposure type being dealt with. Either
'fgs_acq1' or 'fgs_acq2'
Returns
=======
gs_position : GuideStarPosition
The guide star position
"""
exp_type = exp_type.lower()
if exp_type not in FGS_ACQ_EXP_TYPES:
raise ValueError(f'exp_type {exp_type} not one of {FGS_ACQ_EXP_TYPES}')
ordered = fill_mnemonics_chronologically_table(mnemonics)
logger.debug('%s available mnemonics:', exp_type)
logger.debug('%s', ordered)
valid = ordered[ordered['IFGS_ACQ_XPOSG'] != 0.0]
if len(valid) < FGS_ACQ_MINVALUES[exp_type]:
raise ValueError(
f'exp_type {exp_type} IFGS_ACQ_XPOSG only has {len(valid)} locations. Requires {FGS_ACQ_MINVALUES[exp_type]}'
)
# Setup parameters depending on ACQ1 vs ACQ2
if exp_type == 'fgs_acq1':
subarray = valid[FGS_ACQ_WINDOW_INDEX[exp_type]]
posg_slice = FGS_ACQ_SLICES[exp_type]
else:
subarray = valid[FGS_ACQ_WINDOW_INDEX[exp_type]]
posg_slice = FGS_ACQ_SLICES[exp_type]
position = (np.average(valid['IFGS_ACQ_XPOSG'][posg_slice]),
np.average(valid['IFGS_ACQ_YPOSG'][posg_slice]))
corner = (subarray['IFGS_ACQ_DETXCOR'], subarray['IFGS_ACQ_DETYCOR'])
size = (subarray['IFGS_ACQ_DETXSIZ'], subarray['IFGS_ACQ_DETYSIZ'])
gs_position = GuideStarPosition(position=position, corner=corner, size=size)
return gs_position
def gs_position_fgtrack(mnemonics_to_read, mnemonics):
"""Get the guide star position from guide star telemetry for FGS FINEGUIDE/TRACK
For FGS FINEGUIDE and TRACK modes, the position of the guide star is
given by mnemonics IFGS_TFGGS_[X|Y] in arcseconds (Ideal system).
The values are generally valid throughout the whole length of the observation.
The average is used as the result. Ignore values where both coordinates are
exactly zero.
The position is relative to the full detector. However, the science data is cropped
to an 8x8 box. The location of the box is defined by the IFGS_TFGDET_* mnemonics.
These do not change during the duration of the exposure, so the first values are used.
Parameters
==========
mnemonics_to_read: {str: bool[,...]}
The mnemonics to read. Key is the mnemonic name.
Value is a boolean indicating whether the mnemonic
is required to have values or not.
mnemonics : {mnemonic: [value[,...]][,...]}
The values for each pointing mnemonic
Returns
=======
gs_position : GuideStarPosition
The guide star position
"""
# Remove the zero positions.
ordered = fill_mnemonics_chronologically_table(mnemonics)
logger.debug('Guide Star track available mnemonics:')
logger.debug('%s', ordered)
valid_flags = (ordered['IFGS_TFGGS_X'] != 0.0) | (ordered['IFGS_TFGGS_Y'] != 0.0)
valid = ordered[valid_flags]
if len(valid) < 1:
raise ValueError('Fine guide track mode, no valid engineering is found.')
# Get the positions
position = (np.average(valid['IFGS_TFGGS_X']),
np.average(valid['IFGS_TFGGS_Y']))
corner = (valid['IFGS_TFGDET_XCOR'][0], valid['IFGS_TFGDET_YCOR'][0])
size = (valid['IFGS_TFGDET_XSIZ'][0], valid['IFGS_TFGDET_YSIZ'][0])
gs_position = GuideStarPosition(position=position, corner=corner, size=size)
return gs_position
def gs_position_id(mnemonics_to_read, mnemonics):
"""Get the guide star position from guide star telemetry for FGS ID
For FGS ID, the position of the guide star is given by mnemonics
IFGS_ID_[X|Y] in arcseconds (Ideal system). This mode is when the desired guide star is identified.
As such, there is no position to report. However, when completed, the position is then reported for approximately
five seconds after the end of the exposure. The input mnemonic array needs to have this. The first non-zero
report is used.
There is a box defined by the IFGS_ID_DET* mnemonics.
Parameters
==========
mnemonics_to_read: {str: bool[,...]}
The mnemonics to read. Key is the mnemonic name.
Value is a boolean indicating whether the mnemonic
is required to have values or not.
mnemonics : {mnemonic: [value[,...]][,...]}
The values for each pointing mnemonic
Returns
=======
gs_position : GuideStarPosition
The guide star position
"""
# Remove the zero positions.
ordered = fill_mnemonics_chronologically_table(mnemonics)
logger.debug('Guide STar ID mode available mnemonics:')
logger.debug('%s', ordered)
valid_flags = (ordered['IFGS_ID_XPOSG'] != 0.0) | (ordered['IFGS_ID_YPOSG'] != 0.0)
valid = ordered[valid_flags]
if len(valid) < 1:
raise ValueError('Guide Star ID mode, no valid engineering is found.')
# Get the positions
position = (valid['IFGS_ID_XPOSG'][0], valid['IFGS_ID_YPOSG'][0])
corner = (valid['IFGS_ID_DETXCOR'][0], valid['IFGS_ID_DETYCOR'][0])
size = (valid['IFGS_ID_DETXSIZ'][0], valid['IFGS_ID_DETYSIZ'][0])
gs_position = GuideStarPosition(position=position, corner=corner, size=size)
return gs_position
def gs_ideal_to_subarray(gs_position, aperture, flip=False):
"""Calculate pixel position for the guide star in the acquisition subarray
Parameters
----------
gs_position : GuideStarPosition
The guide star position telemetry.
aperture : pysiaf.Aperture
The aperture in use.
flip : bool
Institute the magic flip. Necessary for ACQ1/ACQ2 exposures.
Returns
-------
x, y : float
The pixel position relative to the subarray
"""
position_pixel = aperture.idl_to_det(*gs_position.position)
position_subarray = (position_pixel[0] - gs_position.corner[0],
position_pixel[1] - gs_position.corner[1])
# The magic flip. For FGS1, both axes must be reversed. For FGS2, only the X-axis changed.
x, y = position_subarray
if flip:
if aperture.AperName.startswith('FGS1'):
x = gs_position.size[0] - position_subarray[0]
y = gs_position.size[1] - position_subarray[1]
else:
x = gs_position.size[0] - position_subarray[0]
y = position_subarray[1]
return x, y
def calc_wcs_guiding(model, t_pars, default_roll_ref=0.0, default_vparity=1, default_v3yangle=0.0):
"""Calculate WCS info for FGS guiding
For Fine Guidance guiding observations, nearly everything
in the `wcsinfo` meta information is already populated,
except for the PC matrix and CRVAL*. This function updates the PC
matrix based on the rest of the `wcsinfo`.
CRVAL* values are taken from GS_RA/GS_DEC.
Parameters
----------
model : `~jwst.datamodels.DataModel`
The model to update.
t_pars : `TransformParameters`
The transformation parameters. Parameters are updated during processing.
default_roll_ref : float
If pointing information cannot be retrieved,
use this as the V3 position angle.
default_vparity : int
The default `VIdlParity` to use and should
be either "1" or "-1". "1" is the
default since FGS guiding will be using the
OSS aperture.
default_v3yangle : float
Default SIAF Y-angle.
Returns
-------
crpix1, crpix2, crval1, crval2, pc1_1, pc1_2, pc2_1, pc2_2 : float
The WCS info.
"""
# Determine reference pixel
# Retrieve the appropriate mnemonics that represent the X/Y position of guide star
# in the image.
if t_pars.exp_type in ['fgs_acq1', 'fgs_acq2']:
mnemonics_to_read = FGS_ACQ_MNEMONICS
elif t_pars.exp_type in ['fgs_fineguide', 'fgs_track']:
mnemonics_to_read = FGS_GUIDED_MNEMONICS
elif t_pars.exp_type in FGS_ID_EXP_TYPES:
mnemonics_to_read = FGS_ID_MNEMONICS
else:
raise ValueError(f'Exposure type {t_pars.exp_type} cannot be processed as an FGS product.')
# Getting the timing to extract from the engineering database is complicated by
# the fact that the observation times and the processing of the guide star
# acquisition do not always exactly match. Extend the end time by two seconds.
#
# For ID modes, the mnemonics are valid only after the exposure is completed.
# The time to examine is within 10 seconds after the end of exposure
obsstart = t_pars.obsstart
obsend = t_pars.obsend
if t_pars.exp_type in FGS_ID_EXP_TYPES:
obsstart = obsend
obsend = (Time(obsend, format='mjd') + (10 * U.second)).mjd
elif t_pars.exp_type in FGS_ACQ_EXP_TYPES:
obsend = (Time(obsend, format='mjd') + (2 * U.second)).mjd
try:
gs_position = get_pointing(obsstart, obsend,
mnemonics_to_read=mnemonics_to_read,
engdb_url=t_pars.engdb_url,
tolerance=t_pars.tolerance, reduce_func=t_pars.reduce_func)
except ValueError as exception:
logger.warning('Cannot determine guide star position from engineering. Defaulting to CRPIX = 0')
logger.warning('Engineering failure due to exception: %s', exception)
crpix1 = crpix2 = 0
else:
apername = f'FGS{t_pars.detector[-1]}_FULL_OSS'
aperture = t_pars.siaf_db.get_aperture(apername, t_pars.useafter)
crpix1, crpix2 = gs_ideal_to_subarray(gs_position, aperture, flip=True)
# Determine PC matrix
# Get position angle
try:
roll_ref = model.meta.wcsinfo.roll_ref if model.meta.wcsinfo.roll_ref is not None else default_roll_ref
except AttributeError:
logger.warning('Keyword `ROLL_REF` not found. Using %s as default value', default_roll_ref)
roll_ref = default_roll_ref
roll_ref = np.deg2rad(roll_ref)
# Get VIdlParity
try:
vparity = model.meta.wcsinfo.vparity
except AttributeError:
logger.warning('Keyword "VPARITY" not found. Using %s as default value', default_vparity)
vparity = default_vparity
try:
v3i_yang = model.meta.wcsinfo.v3yangle
except AttributeError:
logger.warning('Keyword "V3I_YANG" not found. Using %s as default value.', default_v3yangle)
v3i_yang = default_v3yangle
pc_matrix = calc_rotation_matrix(roll_ref, np.deg2rad(v3i_yang), vparity=vparity)
# Determine reference sky values
crval1 = model.meta.guidestar.gs_ra
crval2 = model.meta.guidestar.gs_dec
return crpix1, crpix2, crval1, crval2, pc_matrix