Running the JWST pipeline: Python Interface
Note
The use of the run method to run a pipeline or step is not recommended. By default, pipeline.run() uses the pipeline- and step-level coded defaults and ignores parameter files, unless parameters are explicitly overridden. Please see Advanced use - pipeline.run() vs. pipeline.call() for more details.
The Python interface is one of two options for running the pipeline. See here for an overview of the alternative command line interface.
Overview of Running the Pipeline in Python
When using the Python interface to the JWST pipeline, each pipeline and step is available as a module that can be imported into your Python session, configured (either directly as arguments/attributes or with a parameter file), and used to process input data. The following sections describe the necessary steps to run a pipeline or step in Python.
CRDS Environment Variables
The CRDS environment variables need to be defined before importing anything from jwst or crds to allow access to reference and parameter files. These environment variables can be set in the shell, or in a Python session by using os.environ. See Setting CRDS Environment Variables in Python for more information.
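For example, a minimal sketch of setting these variables in a Python session (the cache path is a placeholder for your local CRDS cache):
import os

# Define the CRDS variables *before* importing anything from jwst or crds.
os.environ["CRDS_PATH"] = "/path/to/crds_cache"  # placeholder path
os.environ["CRDS_SERVER_URL"] = "https://jwst-crds.stsci.edu"

# imports from jwst can now locate reference and parameter files
from jwst.pipeline import Detector1Pipeline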
Importing and Running Pipelines and Steps in Python
All full pipeline stages can be imported by name from the pipeline module:
from jwst.pipeline import Image3Pipeline
from jwst.pipeline import Spec2Pipeline
Individual pipeline steps can be imported by name from their respective module in jwst:
from jwst.saturation import SaturationStep
from jwst.ramp_fitting import RampFitStep
Details of all the available pipeline modules and their names can be found at Pipeline Modules.
Once imported, you can execute a pipeline or a step from within Python by using the .call() method of the class. The input can be either a string path to a file on disk or an open DataModel object. Note that the .run() class method is also available, but its use is discouraged and it should be used only with caution (see here for more information).
Example: Running a Pipeline or Step with Default Parameters and Reference Files
# running a full pipeline stage, input is path to file
from jwst.pipeline import Detector1Pipeline
result = Detector1Pipeline.call('jw00017001001_01101_00001_nrca1_uncal.fits')
# running a single pipeline step, input is datamodel object
from jwst.linearity import LinearityStep
import stdatamodels.jwst.datamodels as dm
input_model = dm.open('jw00001001001_01101_00001_mirimage_uncal.fits')
result = LinearityStep.call(input_model)
In the examples above, the returned value, result, is a DataModel containing the corrected data; no files are written out by default. See Controlling Output File Behavior for information on how to control the generation of output files.
Additionally, in both examples above no arguments other than the input data are passed to the call method, so the appropriate parameter files and reference files are chosen by CRDS based on the current context. The following section shows how to configure the pipeline to override these defaults.
Configuring a Pipeline/Step in Python
By default, when using the .call() method to run a pipeline/step, pipeline/step parameters and reference files are chosen by CRDS based on instrument, observing mode, date, etc. If set to the most current context, these represent the ‘best’ set of parameters and reference files for the dataset passed in, as determined by the JWST instrument teams.
To override parameter and reference file defaults, a pipeline/step can be configured for custom processing. Pipeline-level and step-level parameters can be changed, output file behavior can be set, reference files can be overridden, and pipeline steps can be skipped if desired. This section gives a general overview of how to configure the pipeline when running in Python, and the following sections elaborate on each of these options.
When running in Python, there are two ways to configure a pipeline/step:
1. By passing keyword arguments to a pipeline/step’s call method
2. By using a parameter file
A combination of both keyword arguments and custom parameter files can be used for configuration, but keep in mind the hierarchy of parameter precedence to know which value takes effect when a parameter is set in multiple locations.
Example: Configuring a pipeline/step with keyword arguments
# configuring a pipeline and the steps within the pipeline with keyword arguments
from jwst.pipeline import Detector1Pipeline
result = Detector1Pipeline.call('jw00017001001_01101_00001_nrca1_uncal.fits',
                                save_results=False,
                                steps={'jump': {'rejection_threshold': 12.0, 'save_results': True}})

# configuring a pipeline step with keyword arguments
from jwst.jump import JumpStep
result = JumpStep.call('jw00017001001_01101_00001_nrca1_uncal.fits',
                       save_results=True, rejection_threshold=12.0)
Both examples above show how to configure the jump detection step with the same settings: rejection_threshold set to 12.0, and save_results set to True to indicate that the result from the step should be written to an output file.
The first example shows the jump step being run inside a pipeline. Because a pipeline consists of many steps, parameters for a substep are specified within the steps argument, a nested dictionary keyed by each substep and again by each possible parameter for that substep. Pipeline-level arguments (in this case, save_results) are passed in individually as keyword arguments. Note that in this example, the ‘save_results’ argument within steps will override the pipeline-level ‘save_results’ argument.
The second example shows the same configuration of the jump step, but this time with the step run standalone. Here there is no steps dictionary argument, and all arguments can be passed directly to the step since they are now at the step level.
Example: Configuring a pipeline/step with a parameter file
To use a custom parameter file, set the config_file parameter:
# passing a custom parameter file to a pipeline
result = Detector1Pipeline.call("jw00017001001_01101_00001_nrca1_uncal.fits",
                                config_file='calwebb_detector1.asdf')
Again, note the parameter precedence rules: if an override parameter file passed in does not contain the full set of required parameters for a step, the remaining values will be obtained according to those rules and may come from the CRDS-chosen parameter file as well. If a custom parameter file is passed to config_file AND an argument is passed directly to the pipeline/step class, the value in the parameter file is overridden.
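For example, a minimal sketch of this precedence rule (file names are those from the earlier examples):
# the directly passed rejection_threshold overrides any value set for it
# in calwebb_detector1.asdf
from jwst.pipeline import Detector1Pipeline
result = Detector1Pipeline.call('jw00017001001_01101_00001_nrca1_uncal.fits',
                                config_file='calwebb_detector1.asdf',
                                steps={'jump': {'rejection_threshold': 12.0}})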
Setting Step Parameters on a Pipeline or Individual Step
All steps have parameters that can be set to change various aspects of how they execute (e.g., switching certain options in a step on or off, setting thresholds). By default, the values of these parameters are set in the CRDS-chosen parameter file (and, if absent there, defer to the coded defaults), but they can be overridden if desired.
As Arguments to a Pipeline / Step
As discussed above, when setting a step-level parameter on a step that is a substep of a pipeline, it must be passed within the steps argument dictionary. For example, to change the rejection_threshold parameter of the jump detection step when running the full Detector1Pipeline:
from jwst.pipeline import Detector1Pipeline
result = Detector1Pipeline.call('jw00017001001_01101_00001_nrca1_uncal.fits',
                                steps={'jump': {'rejection_threshold': 12.0}})
When running a single step, step-level parameters can be passed in directly as keyword arguments. For example, to change the rejection_threshold parameter for the jump detection step when running the step individually:
from jwst.jump import JumpStep
result = JumpStep.call('jw00017001001_01101_00001_nrca1_uncal.fits', rejection_threshold=12.0)
Using a Parameter File
Alternatively, if using a parameter file, edit the file to add the following snippet (in this example, to a file named my_config_file.asdf in the current working directory):
steps:
- class: jwst.jump.jump_step.JumpStep
  parameters:
    rejection_threshold: 12
And pass in the modified file to the config_file argument:
result = Detector1Pipeline.call('jw00017001001_01101_00001_nrca1_uncal.fits',
                                config_file='my_config_file.asdf')
Disabling all CRDS Step Parameters
Retrieval of Step parameters from CRDS can be completely disabled by setting the STPIPE_DISABLE_CRDS_STEPPARS environment variable to TRUE. This can be done in the shell, or in a Python session using os.environ:
os.environ["STPIPE_DISABLE_CRDS_STEPPARS"] = 'True'
Overriding Reference Files
To override the reference file for a step selected by CRDS:
As Arguments to a Pipeline / Step
To override a reference file for a step within a pipeline, for example the saturation step in the Detector1Pipeline, the override_saturation argument can be set in the saturation section of the steps argument.
# To override a reference file of a step within a pipeline
from jwst.pipeline import Detector1Pipeline
result = Detector1Pipeline.call('jw00017001001_01101_00001_nrca1_uncal.fits',
                                steps={"saturation": {"override_saturation": '/path/to/new_saturation_ref_file.fits'}})
Multiple reference file overrides can be provided, for example:
# To override a reference file for multiple steps within a pipeline
from jwst.pipeline import Detector1Pipeline
result = Detector1Pipeline.call('jw00017001001_01101_00001_nrca1_uncal.fits',
                                steps={"saturation": {"override_saturation": '/path/to/new_saturation_ref_file.fits'},
                                       "jump": {"override_jump": '/path/to/new_jump_ref_file.fits'}})
To override a reference file for a standalone step, “override_<stepname>” can be passed directly as a keyword argument to that step’s call method:
# To override a reference file when running a standalone step
from jwst.saturation import SaturationStep
SaturationStep.call('jw00017001001_01101_00001_nrca1_uncal.fits',
                    override_saturation='/path/to/new_saturation_ref_file.fits')
Using a Parameter File
If using a parameter file for configuration, to override a reference file edit the file to add the following snippet (in this example, to a file named my_config_file.asdf in the current working directory):
steps:
- class: jwst.saturation.saturation_step.SaturationStep
  parameters:
    override_saturation: '/path/to/new_saturation_ref_file.fits'
And pass in the modified file to the config_file argument:
result = Detector1Pipeline.call('jw00017001001_01101_00001_nrca1_uncal.fits',
                                config_file='my_config_file.asdf')
To use an entire set of past reference files from a previous CRDS mapping, see here.
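As a brief sketch (the linked page has full details), one way to pin reference file selection to a specific past context is the CRDS_CONTEXT environment variable, set before importing jwst; the .pmap name below is a placeholder:
import os

# Pin all reference and parameter file selection to a past CRDS context.
os.environ["CRDS_CONTEXT"] = "jwst_1140.pmap"  # placeholder context name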
Skipping a Pipeline Step
Note
Some steps in a pipeline expect certain previous steps to have been run beforehand, and therefore won’t run if that expected previous correction has not been applied. Proceed with caution when skipping steps.
When using the Python interface, if you wish to run a pipeline but skip one or more of the steps contained in that pipeline, this can be done in two different ways:
As Arguments to a Pipeline / Step
Every step in a pipeline has a skip parameter that, when set to True, will entirely skip that step. For example, to skip the saturation step in the Detector1Pipeline:
# To skip a step within a pipeline
from jwst.pipeline import Detector1Pipeline
result = Detector1Pipeline.call('jw00017001001_01101_00001_nrca1_uncal.fits',
                                steps={"saturation": {"skip": True}})
Using a Parameter File
The equivalent to the above example can be done by adding the following snippet to your parameter file (in this example, to a file named my_config_file.asdf in the current working directory):
steps:
- class: jwst.saturation.saturation_step.SaturationStep
  parameters:
    skip: true
And pass in the modified file to the config_file argument:
result = Detector1Pipeline.call('jw00017001001_01101_00001_nrca1_uncal.fits',
                                config_file='my_config_file.asdf')
Controlling Output File Behavior
By default, when running in Python, all outputs are returned in-memory (typically as a DataModel) and no output files are written, even for the final result of a pipeline. To control this behavior, and other aspects of output file generation like directory and file name, certain pipeline- and step-level parameters can be set.
Output file behavior can be modified with the save_results, output_file, and output_dir parameters.
Saving Final Pipeline Results
The save_results parameter, when set at the pipeline level, indicates that the final pipeline output products should be saved to a file. The output files will be written to the current working directory and named based on the input file name and the appropriate file suffix. Note that setting save_results at the pipeline level will not save the results from each step, only the final results from the full pipeline.
# To save the final results from a pipeline to a file
from jwst.pipeline import Detector1Pipeline
result = Detector1Pipeline.call('jw00017001001_01101_00001_nrca1_uncal.fits', save_results=True)
In this example, the following output files will be written in the current working directory:
jw00017001001_01101_00001_nrca1_trapsfilled.fits
jw00017001001_01101_00001_nrca1_rate.fits
jw00017001001_01101_00001_nrca1_rateints.fits
Changing Output File Name
Setting output_file at the pipeline level indicates that the pipeline’s final result should be saved (so setting save_results as well is redundant), and that a new file base name should be used, with the appropriate file suffix appended. For example, to save the final results from Detector1Pipeline with file names based on the string detector_1_final_result instead of jw00017001001_01101_00001_nrca1:
# saving the final results from running a pipeline with a custom output file basename
from jwst.pipeline import Detector1Pipeline
result = Detector1Pipeline.call('jw00017001001_01101_00001_nrca1_uncal.fits', output_file='detector_1_final_result')
In this example, the following output files will be written in the current working directory:
detector_1_final_result_trapsfilled.fits
detector_1_final_result_rate.fits
detector_1_final_result_rateints.fits
Changing Output File Directory
Setting output_dir at the pipeline level indicates that the pipeline’s final results should be saved (so setting save_results as well is redundant), and that the files should be written to the specified directory instead of the default, which is the current working directory. For example, to save the results from Detector1Pipeline in a subdirectory /calibrated:
# to save the final result of a pipeline in a different specified output directory
from jwst.pipeline import Detector1Pipeline
result = Detector1Pipeline.call('jw00017001001_01101_00001_nrca1_uncal.fits', output_dir='calibrated')
Saving Intermediate Step Results
When the save_results parameter is set at the step level (either within a pipeline, or on a standalone step), it indicates that the result from that step should be saved to a file.
To save the intermediate output from a step within a pipeline:
# To save the intermediate results of a step within a pipeline to a file
from jwst.pipeline import Detector1Pipeline
result = Detector1Pipeline.call('jw00017001001_01101_00001_nrca1_uncal.fits',
                                steps={"saturation": {"save_results": True}})
Similarly, when save_results is set on an individual step class, this will indicate that the final result from that step should be saved.
# To save the final results from SaturationStep when run standalone
from jwst.saturation import SaturationStep
SaturationStep.call('jw00017001001_01101_00001_nrca1_uncal.fits', save_results=True)
Setting Output File Name
Setting output_file at the step level indicates that the step’s result should be saved (so setting save_results as well is redundant), and that a new file base name should be used, with the appropriate file suffix appended. For example, to save the intermediate result from the saturation step when running Detector1Pipeline with a file name based on the string saturation_result instead of jw00017001001_01101_00001_nrca1:
# To save the intermediate results of a step within a pipeline to a file with a custom name
from jwst.pipeline import Detector1Pipeline
result = Detector1Pipeline.call('jw00017001001_01101_00001_nrca1_uncal.fits',
                                steps={"saturation": {"output_file": 'saturation_result'}})
Similarly, when output_file is set on an individual step class, this will indicate that the result from that step should be saved to a file with that base name and the appropriate suffix.
# To save the final results from SaturationStep with a custom output file name when run standalone
from jwst.saturation import SaturationStep
SaturationStep.call('jw00017001001_01101_00001_nrca1_uncal.fits', output_file="saturation_result")
Setting Output File Directory
Setting output_dir at the step level indicates that the step’s result should be saved (so setting save_results as well is redundant), and that the files should be saved in the specified directory instead of the current working directory. For example, to save the intermediate results of DarkCurrentStep when running Detector1Pipeline in a subdirectory /calibrated:
# to save the intermediate step result in a different specified output directory
from jwst.pipeline import Detector1Pipeline
result = Detector1Pipeline.call('jw00017001001_01101_00001_nrca1_uncal.fits',
                                steps={'dark_current': {'output_dir': 'calibrated'}})
Similarly, when output_dir is set on an individual step class, this will indicate that the result from that step should be saved to the specified directory:
# to save the final result of a step in a specified output directory when run standalone
from jwst.dark_current import DarkCurrentStep
result = DarkCurrentStep.call('jw00017001001_01101_00001_nrca1_uncal.fits', output_dir='calibrated')
Advanced use - pipeline.run() vs. pipeline.call()
Another option for running pipelines or steps is to use the run() method instead of the call() method. Using .run() is not recommended and is considered advanced use, but it is available to users.
The difference between .run() and .call() is in the retrieval and use of parameters from CRDS parameter files. When the .call() method is invoked, additional setup is done to retrieve parameter and reference files and reconcile those with any passed into the pipeline directly as arguments or in a custom parameter file. When .call() is invoked, a new instance of the pipeline/step class is created internally, and after parameters are determined, the .run() method of that internal instance is called. Because the actual processing occurs on this new instance, attributes cannot be set directly on the original pipeline/step class; they must be passed in as arguments to .call() or set in the parameter file.
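As a brief sketch of this distinction, attributes set on an instance are ignored by .call(), which processes the data on a fresh internal instance:
from jwst.pipeline import Detector1Pipeline

pipe = Detector1Pipeline()
pipe.jump.rejection_threshold = 12  # NOT applied by .call(); it runs a new internal instance
result = pipe.call('jw00017001001_01101_00001_nrca1_uncal.fits',
                   steps={'jump': {'rejection_threshold': 12.0}})  # this is applied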
In contrast, when using the .run() method directly on a pipeline/step, the additional logic to determine parameters and reference files is skipped. The pipeline instance is run as-is, and coded defaults for the pipeline and each intermediate step will be used unless explicitly overridden individually. Because the instance created is run directly on the data, attributes can be set directly:
from jwst.pipeline import Detector1Pipeline
pipe = Detector1Pipeline()
pipe.jump.rejection_threshold = 12
pipe.ramp_fit.skip = True
result = pipe.run('jw00017001001_01101_00001_nrca1_uncal.fits')
The pipe object created, and any attributes set on it, will persist, and this object can be reused within a Python session for processing data. Keep in mind that each individual step parameter must be set when using this method, or else the coded defaults will be used, which may be inappropriate for the dataset being processed.
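For example, a minimal sketch of reusing a configured instance across several exposures (the second file name is hypothetical):
from jwst.pipeline import Detector1Pipeline

pipe = Detector1Pipeline()
pipe.jump.rejection_threshold = 12
pipe.ramp_fit.skip = True

# the configured instance persists and can process multiple inputs
uncal_files = ['jw00017001001_01101_00001_nrca1_uncal.fits',
               'jw00017001001_01101_00002_nrca1_uncal.fits']  # second file is hypothetical
results = [pipe.run(f) for f in uncal_files]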
See Executing a pipeline or pipeline step via call() for more information.
Multiprocessing
Multiprocessing is supported to speed up certain computationally-intensive steps in the pipeline, including the jump detection, ramp fitting, and WFSS contamination correction steps. The examples below show how multiprocessing can be enabled for these steps, as well as how to set up multiprocessing to simultaneously run the entire pipeline on multiple observations.
Since the pipeline uses multiprocessing, it is critical that any code using the pipeline adhere to the guidelines described in the Python multiprocessing documentation. The pipeline uses the forkserver start method internally, and it is recommended that any multiprocessing scripts that use the pipeline use the same start method. As detailed in the Python documentation, this will require that code be “protected” with an if __name__ == '__main__': check as follows:
if __name__ == '__main__':
    [code used in multiprocessing]
There are a couple of scenarios to use multiprocessing with the pipeline:
1. Multiprocessing within a pipeline step. At the moment, the steps that support this are the jump, ramp_fitting, and wfss_contam steps. Multiprocessing in these steps is enabled with the optional maximum_cores parameter, which can be set to a numerical value given as a string, or to one of the words quarter, half, or all, or to none, which is the default value.
The following example turns on a step’s multiprocessing option. Notice that only one of the steps has multiprocessing turned on. We do not recommend simultaneously enabling multiprocessing in more than one step, as this will likely lead to running out of system memory.
# SampleScript1
import sys

from jwst.pipeline import Detector1Pipeline

uncal_file = 'jw0000_0000_uncal.fits'
output_dir = '/my_project'

def main():
    # enable multiprocessing in the ramp_fit step only
    parameter_dict = {"ramp_fit": {"maximum_cores": 'all'}}
    # .call() creates its own instance internally, so it can be invoked on the class
    Detector1Pipeline.call(uncal_file, save_results=True, steps=parameter_dict,
                           output_dir=output_dir)

if __name__ == '__main__':
    sys.exit(main())
2. Calling the pipeline using multiprocessing. The following example uses this option, setting up a log file for each run of the pipeline and a text file with the full traceback in case there is a crash. Notice that the import statement for the pipeline is within the multiprocessing block that gets called by every worker. This is to avoid a known memory leak.
# SampleScript2
import os
import sys
import traceback
import configparser
import multiprocessing
from glob import glob

def mk_stpipe_log_cfg(output_dir, log_name):
    """
    Create a configuration file with the name log_name, where
    the pipeline will write all output.
    Args:
        output_dir: str, path of the output directory
        log_name: str, name of the log to record screen output
    Returns:
        nothing
    """
    config = configparser.ConfigParser()
    config.add_section("*")
    config.set("*", "handler", "file:" + log_name)
    config.set("*", "level", "INFO")
    pipe_log_config = os.path.join(output_dir, "pipeline-log.cfg")
    with open(pipe_log_config, "w") as cfgfile:
        config.write(cfgfile)

def run_det1(uncal_file, output_dir):
    """
    Run the Detector1 pipeline on the given file.
    Args:
        uncal_file: str, name of uncalibrated file to run
        output_dir: str, path of the output directory
    Returns:
        nothing
    """
    log_name = os.path.basename(uncal_file).replace('.fits', '')
    mk_stpipe_log_cfg(output_dir, log_name + '.log')
    # import the pipeline inside the worker function to avoid a known memory leak
    from jwst.pipeline.calwebb_detector1 import Detector1Pipeline
    pipe_success = False
    try:
        Detector1Pipeline.call(uncal_file, output_dir=output_dir,
                               logcfg=os.path.join(output_dir, "pipeline-log.cfg"),
                               save_results=True)
        pipe_success = True
        print('\n * Pipeline finished for file: ', uncal_file, ' \n')
    except Exception:
        print('\n *** OH NO! The detector1 pipeline crashed! *** \n')
        # print_exc() returns None, so capture the traceback as a string instead
        pipe_crash_msg = traceback.format_exc()
    if not pipe_success:
        print('Printing file with full traceback')
        with open(log_name + '_pipecrash.txt', 'w') as crashfile:
            print(pipe_crash_msg, file=crashfile)

def main():
    input_data_dir = '/my_project_dir'
    output_dir = input_data_dir
    # get the files to run
    files_to_run = glob(os.path.join(input_data_dir, '*_uncal.fits'))
    print('Will run the pipeline on {} files'.format(len(files_to_run)))
    # the output list should be the same length as the files to run
    outptd = [output_dir for _ in range(len(files_to_run))]
    # get the cores to use
    cores2use = int(os.cpu_count() / 2)  # half of all available cores
    print('* Using ', cores2use, ' cores for multiprocessing.')
    # set the pool and run multiprocess
    with multiprocessing.Pool(cores2use) as pool:
        pool.starmap(run_det1, zip(files_to_run, outptd))
    print('\n * Finished multiprocessing! \n')

if __name__ == '__main__':
    sys.exit(main())
Warning
Although it is technically possible to call the pipeline with multiprocessing while also enabling this option in a step, we strongly recommend against it. This scenario would be the same as SampleScript2, except with the parameter dictionary parameter_dict from SampleScript1 added to the pipeline call. However, Python will crash if both multiprocessing options are set to use all of the cores, or even fewer, because a worker process is not permitted to spawn child processes. We recommend not enabling step multiprocessing for parallel pipeline runs, to avoid potentially running out of memory.