Running the JWST pipeline: Python Interface

Note

The use of the run method to run a pipeline or step is not recommended. The pipeline.run() method uses the pipeline- and step-level coded defaults, ignoring parameter files, unless parameters are explicitly overridden. Please see Advanced use - pipeline.run() vs. pipeline.call() for more details.

The Python interface is one of two options for running the pipeline. See here for an overview of the alternative command line interface.

Overview of Running the Pipeline in Python

When using the Python interface to the JWST pipeline, each pipeline and step is available as a module that can be imported into your Python session, configured (either directly via arguments/attributes or with a parameter file), and used to process input data. The following sections describe the necessary steps to run a pipeline or step in Python.

CRDS Environment Variables

The CRDS environment variables need to be defined before importing anything from jwst or crds to allow access to reference and parameter files. These environment variables can be set in the shell, or in a Python session by using os.environ. See Setting CRDS Environment Variables in Python for more information.
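For example, a minimal setup in a Python session might look like the following (a sketch; the cache path is an assumption, so point CRDS_PATH at your own local cache directory):

import os

# local directory where CRDS will cache reference files (hypothetical path)
os.environ["CRDS_PATH"] = "/home/myuser/crds_cache"
# server from which JWST reference and parameter files are retrieved
os.environ["CRDS_SERVER_URL"] = "https://jwst-crds.stsci.edu"

# import jwst only after the environment variables are set
from jwst.pipeline import Detector1Pipeline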

Importing and Running Pipelines and Steps in Python

All full pipeline stages can be imported by name from the pipeline module:

from jwst.pipeline import Image3Pipeline
from jwst.pipeline import Spec2Pipeline

Individual pipeline steps can be imported by name from their respective module in jwst:

from jwst.saturation import SaturationStep
from jwst.ramp_fitting import RampFitStep

Details of all the available pipeline modules and their names can be found at Pipeline Modules.

Once imported, you can execute a pipeline or a step from within Python by using the .call() method of the class. The input can be either a string path to a file on disk or an open DataModel object. Note that the .run() method is also available, but its use is discouraged and it should be invoked only with caution (see here for more information).

Example: Running a Pipeline or Step with Default Parameters and Reference Files

# running a full pipeline stage, input is path to file
from jwst.pipeline import Detector1Pipeline
result = Detector1Pipeline.call('jw00017001001_01101_00001_nrca1_uncal.fits')

# running a single pipeline step, input is datamodel object
from jwst.linearity import LinearityStep
import stdatamodels.jwst.datamodels as dm
input_model = dm.open('jw00001001001_01101_00001_mirimage_uncal.fits')
result = LinearityStep.call(input_model)

In the examples above, the returned value, result, is a DataModel containing the corrected data; no files are written out by default. See Controlling Output File Behavior for information on how to control the generation of output files.
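If you wish to write an in-memory result to disk yourself, datamodels provide a save method (a minimal sketch; the output file name here is arbitrary):

result.save('my_calibrated_result.fits')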

Additionally, in both examples above, no arguments other than the input data are passed to the call method, so the appropriate parameter files and reference files are chosen by CRDS based on the current context. The following section shows how to configure the pipeline to override these defaults.

Configuring a Pipeline/Step in Python

By default when using the .call() method to run a pipeline/step, pipeline/step parameters and reference files are chosen by CRDS based on instrument, observing mode, date, etc. If set to the most current context, these represent the ‘best’ set of parameters and reference files for the dataset passed in, as determined by the JWST instrument teams.

To override parameter and reference file defaults, a pipeline/step can be configured for custom processing. Pipeline-level and step-level parameters can be changed, output file behavior can be set, reference files can be overridden, and pipeline steps can be skipped if desired. This section gives a general overview of how to configure the pipeline when running in Python, and the following sections elaborate on each of these options.

When running in Python, there are two ways to configure a Pipeline/Step:

  1. By passing in keyword arguments to a pipeline/step’s call method

  2. By using a parameter file

A combination of both keyword arguments and custom parameter files can be used for configuration, but keep in mind the hierarchy of parameter precedence, which determines the value that is used when a parameter is set in multiple locations.

Example: Configuring a pipeline/step with keyword arguments

# configuring a pipeline and the steps within the pipeline with keyword arguments
result = Detector1Pipeline.call('jw00017001001_01101_00001_nrca1_uncal.fits',
                                save_results=False,
                                steps={'jump': {'rejection_threshold': 12.0, 'save_results':True}})
# configuring a pipeline step with keyword arguments
from jwst.jump import JumpStep
result = JumpStep.call('jw00017001001_01101_00001_nrca1_uncal.fits',
                       save_results=True, rejection_threshold=12.0)

Both examples above show how to configure the jump detection step with the same settings - the rejection_threshold set to 12.0, and save_results set to True to indicate the result from the step should be written to an output file.

The first example shows when the jump step is run inside a pipeline - because a pipeline consists of many steps, parameters for a substep are specified within the steps argument, a nested dictionary keyed by each substep and again by each possible parameter for each substep. Pipeline-level arguments (in this case, save_results) are passed in individually as keyword arguments. Note that in this example, the ‘save_results’ argument within steps will override the pipeline-level ‘save_results’ argument.

The second example shows the same configuration to the jump step, but this time when the step is run standalone. Here, there is no steps dictionary argument and all arguments can be passed to the step directly since it is now at the step level.

Example: Configuring a pipeline/step with a parameter file

To use a custom parameter file, set the config_file parameter:

# passing a custom parameter file to a pipeline
result = Detector1Pipeline.call("jw00017001001_01101_00001_nrca1_uncal.fits",\
                                config_file='calwebb_detector1.asdf')

Again, note the parameter precedence rules. If an override parameter file passed in does not contain the full set of required parameters for a step, the remaining parameters are obtained according to those rules and may be taken from the CRDS-chosen parameter file as well. If a custom parameter file is passed to config_file AND an argument is passed directly to the pipeline/step class, the direct argument overrides the value in the parameter file.
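For example, the two configuration methods can be combined; the directly passed keyword takes precedence over any rejection_threshold value in the custom parameter file (a sketch; the threshold value is illustrative):

# the keyword argument overrides the value in calwebb_detector1.asdf
result = Detector1Pipeline.call('jw00017001001_01101_00001_nrca1_uncal.fits',
                                config_file='calwebb_detector1.asdf',
                                steps={'jump': {'rejection_threshold': 9.0}})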

Setting Step Parameters on a Pipeline or Individual Step

All steps have parameters that can be set to change various aspects of how they execute (e.g., switching on and off certain options in a step, setting thresholds). By default, the values of these parameters are set in the CRDS-chosen parameter file (and if absent, defer to the coded defaults), but they can be overridden if desired.

As Arguments to a Pipeline / Step

As discussed above, when setting a step-level parameter on a step that is a substep of a pipeline, it must be passed within the steps argument dictionary. For example, to change the rejection_threshold parameter of the jump detection step when running the full Detector1Pipeline:

from jwst.pipeline import Detector1Pipeline
result = Detector1Pipeline.call('jw00017001001_01101_00001_nrca1_uncal.fits',
                                 steps={'jump': {'rejection_threshold': 12.0}})

When running a single step, step-level parameters can be passed in directly as keyword arguments. For example, to change the parameter rejection_threshold for the jump detection step when running the step individually:

from jwst.jump import JumpStep
result = JumpStep.call('jw00017001001_01101_00001_nrca1_uncal.fits', rejection_threshold=12.0)

Using a Parameter File

Alternatively, if using a parameter file, edit the file to add the following snippet (in this example, to a file named my_config_file.asdf in the current working directory):

steps:
- class: jwst.jump.jump_step.JumpStep
  parameters:
    rejection_threshold: 12.0

And pass in the modified file to the config_file argument:

result = Detector1Pipeline.call('jw00017001001_01101_00001_nrca1_uncal.fits',
                                 config_file='my_config_file.asdf')

Disabling all CRDS Step Parameters

Retrieval of Step parameters from CRDS can be completely disabled by setting the STPIPE_DISABLE_CRDS_STEPPARS environment variable to TRUE. This can be done in the shell, or in a Python session using os.environ:

import os
os.environ["STPIPE_DISABLE_CRDS_STEPPARS"] = 'True'
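The shell equivalent (bash syntax) would be:

export STPIPE_DISABLE_CRDS_STEPPARS=True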

Overriding Reference Files

To override a reference file selected by CRDS for a step:

As Arguments to a Pipeline / Step

To override a reference file for a step within a pipeline, for example the saturation step in Detector1Pipeline, set the override_saturation argument in the saturation section of the steps argument.

# To override a reference file of a step within a pipeline
from jwst.pipeline import Detector1Pipeline
result = Detector1Pipeline.call('jw00017001001_01101_00001_nrca1_uncal.fits',
                                steps={"saturation": {"override_saturation": '/path/to/new_saturation_ref_file.fits'}})

Multiple reference file overrides can be provided, for example:

# To override reference files for multiple steps within a pipeline
from jwst.pipeline import Detector1Pipeline
result = Detector1Pipeline.call('jw00017001001_01101_00001_nrca1_uncal.fits',
                                steps={"saturation": {"override_saturation": '/path/to/new_saturation_ref_file.fits'},
                                       "jump": {"override_jump": '/path/to/new_jump_ref_file.fits'}})

To override a reference file for a standalone step, “override_<stepname>” can be passed directly as a keyword argument to that step’s call method:

# To override a reference file when running a standalone step
from jwst.saturation import SaturationStep
SaturationStep.call('jw00017001001_01101_00001_nrca1_uncal.fits',
                    override_saturation='/path/to/new_saturation_ref_file.fits')

Using a Parameter File

If using a parameter file for configuration, to override a reference file, edit the file to add the following snippet (in this example, to a file named my_config_file.asdf in the current working directory):

steps:
- class: jwst.saturation.saturation_step.SaturationStep
  parameters:
    override_saturation: '/path/to/new_saturation_ref_file.fits'

And pass in the modified file to the config_file argument:

result = Detector1Pipeline.call('jw00017001001_01101_00001_nrca1_uncal.fits',
                                 config_file='my_config_file.asdf')

To use an entire set of past reference files from a previous CRDS mapping, see here.
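As a brief illustration, the CRDS context can be pinned to a past mapping through the CRDS_CONTEXT environment variable, set before importing jwst (the .pmap name below is a hypothetical placeholder; substitute the mapping you need):

import os
os.environ["CRDS_CONTEXT"] = "jwst_1100.pmap"   # hypothetical mapping name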

Skipping a Pipeline Step

Note

Some steps in a pipeline expect certain previous steps to have been run beforehand, and therefore won’t run if that expected previous correction has not been applied. Proceed with caution when skipping steps.

When using the Python interface, if you wish to run a pipeline but skip one or more of the steps contained in that pipeline, this can be done in two different ways:

As Arguments to a Pipeline / Step

Every step in a pipeline has a skip parameter that, when set to True, will cause that step to be skipped entirely. For example, to skip the saturation step in the Detector1Pipeline:

# To skip a step within a pipeline
from jwst.pipeline import Detector1Pipeline
result = Detector1Pipeline.call('jw00017001001_01101_00001_nrca1_uncal.fits', steps={"saturation": {"skip": True}})

Using a Parameter File

The equivalent to the above example can be done by adding the following snippet to your parameter file (in this example, to a file named my_config_file.asdf in the current working directory):

steps:
- class: jwst.saturation.saturation_step.SaturationStep
  parameters:
    skip: true

And pass in the modified file to the config_file argument:

result = Detector1Pipeline.call('jw00017001001_01101_00001_nrca1_uncal.fits',
                                 config_file='my_config_file.asdf')

Controlling Output File Behavior

By default, when running in Python, all outputs are returned in memory (typically as a DataModel) and no output files are written - even the final result of a pipeline. To control this behavior, and other aspects of output file generation like directory and file name, certain pipeline and step-level parameters can be set.

Output file behavior can be modified with the save_results, output_file, and output_dir parameters.

Saving Final Pipeline Results

The save_results parameter, when set at the pipeline-level, indicates that the final pipeline output products should be saved to a file. The output files will be written to the current working directory and named based on the input file name and the appropriate file suffix. Note that setting save_results at the pipeline-level will not save the results from each step, only the final results from the full pipeline.

# To save the final results from a pipeline to a file
 from jwst.pipeline import Detector1Pipeline
 result = Detector1Pipeline.call('jw00017001001_01101_00001_nrca1_uncal.fits', save_results=True)
In this example, the following output files will be written in the current working directory:
  • jw00017001001_01101_00001_nrca1_trapsfilled.fits

  • jw00017001001_01101_00001_nrca1_rate.fits

  • jw00017001001_01101_00001_nrca1_rateints.fits

Changing Output File Name

Setting output_file at the pipeline-level indicates that the pipeline’s final result should be saved (so also setting save_results is redundant), and that a new file base name should be used with the appropriate file suffix appended. For example, to save the final results from Detector1Pipeline with file names based on the string detector_1_final_result instead of jw00017001001_01101_00001_nrca1:

# saving the final results from running a pipeline with a custom output file basename
 from jwst.pipeline import Detector1Pipeline
 result = Detector1Pipeline.call('jw00017001001_01101_00001_nrca1_uncal.fits', output_file='detector_1_final_result')

In this example, the following output files will be written in the current working directory:

  • detector_1_final_result_trapsfilled.fits

  • detector_1_final_result_rate.fits

  • detector_1_final_result_rateints.fits

Changing Output File Directory

Setting output_dir at the pipeline-level indicates that the pipeline’s final results should be saved (so also setting save_results is redundant), and that the files should be placed in the specified directory instead of the current working directory, which is the default. For example, to save the final results of Detector1Pipeline in a subdirectory /calibrated:

# to save the final result of a pipeline in a different specified output directory
 from jwst.pipeline import Detector1Pipeline
 result = Detector1Pipeline.call('jw00017001001_01101_00001_nrca1_uncal.fits', output_dir='calibrated')

Saving Intermediate Step Results

When the save_results parameter is set at the step-level (either within a pipeline, or on a standalone step), it indicates that the result from that step should be saved to a file.

To save the intermediate output from a step within a pipeline:

# To save the intermediate results of a step within a pipeline to a file
from jwst.pipeline import Detector1Pipeline
result = Detector1Pipeline.call('jw00017001001_01101_00001_nrca1_uncal.fits',
                                steps={"saturation": {"save_results": True}})

Similarly, when save_results is set on an individual step class, this will indicate that the final result from that step should be saved.

# To save the final results from SaturationStep when run standalone
from jwst.saturation import SaturationStep
SaturationStep.call('jw00017001001_01101_00001_nrca1_uncal.fits', save_results=True)

Setting Output File Name

Setting output_file at the step-level indicates that the step’s result should be saved (so also setting save_results is redundant), and that a new file base name should be used with the appropriate file suffix appended. For example, to save the intermediate result from the saturation step when running Detector1Pipeline with a file name based on the string saturation_result instead of jw00017001001_01101_00001_nrca1:

# To save the intermediate results of a step within a pipeline to a file with a custom name
from jwst.pipeline import Detector1Pipeline
result = Detector1Pipeline.call('jw00017001001_01101_00001_nrca1_uncal.fits',
                                steps={"saturation": {"output_file": 'saturation_result'}})

Similarly, when output_file is set on an individual step class, this will indicate that the result from that step should be saved to a file with that basename and the appropriate suffix.

# To save the final results from SaturationStep with a custom output file name when run standalone
from jwst.saturation import SaturationStep
SaturationStep.call('jw00017001001_01101_00001_nrca1_uncal.fits', output_file="saturation_result")

Setting Output File Directory

Setting output_dir at the step-level indicates that the step’s result should be saved (so also setting save_results is redundant), and that the files should be saved in the specified directory instead of the current working directory. For example, to save the intermediate results of DarkCurrentStep when running Detector1Pipeline in a subdirectory /calibrated:

# to save the intermediate step result in a different specified output directory
from jwst.pipeline import Detector1Pipeline
result = Detector1Pipeline.call('jw00017001001_01101_00001_nrca1_uncal.fits',
                                steps={'dark_current': {'output_dir': 'calibrated'}})

Similarly, when output_dir is set on an individual step class, this will indicate that the result from that step should be saved to the specified directory:

# to save the final result of a standalone step in a specified output directory
from jwst.dark_current import DarkCurrentStep
result = DarkCurrentStep.call('jw00017001001_01101_00001_nrca1_uncal.fits', output_dir='calibrated')

Advanced use - pipeline.run() vs. pipeline.call()

Another option for running pipelines or steps is to use the run() method instead of the call() method. Using .run() is not recommended and is considered advanced use, but it remains an option for users.

The difference between .run() and .call() is in the retrieval and use of parameters from CRDS parameter files. When the .call() method is invoked, additional setup is done to retrieve parameter and reference files and to reconcile those with any parameters passed into the pipeline directly as arguments or in a custom parameter file. When .call() is invoked, a new instance of the pipeline/step class is created internally, and after parameters are determined, the .run() method of that internal instance is called. Because the actual processing occurs on this new instance, attributes cannot be set directly on the original pipeline/step object; they must be passed in as arguments to .call() or set in the parameter file.
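To illustrate the distinction (a minimal sketch of the behavior described above):

from jwst.pipeline import Detector1Pipeline

pipe = Detector1Pipeline()
pipe.jump.rejection_threshold = 12   # NOT applied by .call(); .call() runs a fresh internal instance
result = pipe.call('jw00017001001_01101_00001_nrca1_uncal.fits')

# with .call(), the parameter must instead be passed as an argument
result = Detector1Pipeline.call('jw00017001001_01101_00001_nrca1_uncal.fits',
                                steps={'jump': {'rejection_threshold': 12.0}})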

In contrast, when using the .run() method directly on a pipeline/step, the additional logic to determine parameters and reference files is skipped. The pipeline instance is being run as-is, and coded defaults for the pipeline and each intermediate step will be used unless explicitly overridden individually. Because the instance created is being run directly on the data, attributes can be set directly:

from jwst.pipeline import Detector1Pipeline
pipe = Detector1Pipeline()
pipe.jump.rejection_threshold = 12
pipe.ramp_fit.skip = True
result = pipe.run('jw00017001001_01101_00001_nrca1_uncal.fits')

The pipe object and the attributes set on it will persist, and the object can be reused within a Python session for processing data. Keep in mind that each individual step parameter must be set when using this method, or else the coded defaults will be used, which may be inappropriate for the dataset being processed.
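For example, the configured instance can be applied to several exposures in turn (the file names below are placeholders):

for uncal in ['exposure1_uncal.fits', 'exposure2_uncal.fits']:
    result = pipe.run(uncal)   # same configuration reused for each file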

See Executing a pipeline or pipeline step via call() for more information.

Multiprocessing

Multiprocessing is supported to speed up certain computationally intensive steps in the pipeline, including the jump detection, ramp fitting, and WFSS contamination correction steps. The examples below show how multiprocessing can be enabled for these steps, as well as how to set up multiprocessing to simultaneously run the entire pipeline on multiple observations.

Since the pipeline uses multiprocessing, it is critical that any code using the pipeline adhere to the guidelines described in the Python multiprocessing documentation. The pipeline uses the forkserver start method internally, and it is recommended that any multiprocessing scripts that use the pipeline use the same start method. As detailed in the Python documentation, this will require that code be “protected” with an if __name__ == '__main__': check as follows:

if __name__ == '__main__':
    [code used in multiprocessing]

There are two scenarios for using multiprocessing with the pipeline:

1. Multiprocessing within a pipeline step. At the moment, the steps that support this are the jump, ramp_fitting, and wfss_contam steps. Multiprocessing in these steps is enabled with the optional maximum_cores parameter, which can be set to a numerical value given as a string, or to one of the words quarter, half, all, or none, the last being the default value.

The following example turns on a step’s multiprocessing option. Notice that only one of the steps has multiprocessing turned on. We do not recommend enabling multiprocessing in more than one step at a time, as this will likely lead to running out of system memory.

# SampleScript1

import sys
from jwst.pipeline import Detector1Pipeline

uncal_file = 'jw0000_0000_uncal.fits'
output_dir = '/my_project'

def main():
    det1 = Detector1Pipeline()
    parameter_dict = {"ramp_fit": {"maximum_cores": 'all'}}
    det1.call(uncal_file, save_results=True, steps=parameter_dict, output_dir=output_dir)

if __name__ == '__main__':
    sys.exit(main())

2. Calling the pipeline using multiprocessing. The following example uses this option, setting up a log file for each run of the pipeline and a text file with the full traceback in case there is a crash. Notice that the import statement for the pipeline is within the multiprocessing block that gets called by every worker; this is to avoid a known memory leak.

# SampleScript2

import os, sys
import traceback
import configparser
import multiprocessing
from glob import glob

def mk_stpipe_log_cfg(output_dir, log_name):
    """
    Create a configuration file with the name log_name, where
    the pipeline will write all output.
    Args:
        output_dir: str, path of the output directory
        log_name: str, name of the log to record screen output
    Returns:
        nothing
    """
    config = configparser.ConfigParser()
    config.add_section("*")
    config.set("*", "handler", "file:" + log_name)
    config.set("*", "level", "INFO")
    pipe_log_config = os.path.join(output_dir, "pipeline-log.cfg")
    with open(pipe_log_config, "w") as f:
        config.write(f)

def run_det1(uncal_file, output_dir):
    """
    Run the Detector1 pipeline on the given file.
    Args:
        uncal_file: str, name of uncalibrated file to run
        output_dir: str, path of the output directory
    Returns:
        nothing
    """
    log_name = os.path.basename(uncal_file).replace('.fits', '')
    mk_stpipe_log_cfg(output_dir, log_name+'.log')
    from jwst.pipeline.calwebb_detector1 import Detector1Pipeline
    pipe_success = False
    try:
        det1 = Detector1Pipeline()
        det1.call(uncal_file, output_dir=output_dir, logcfg=os.path.join(output_dir, "pipeline-log.cfg"), save_results=True)
        pipe_success = True
        print('\n * Pipeline finished for file: ', uncal_file, ' \n')
    except Exception:
        print('\n *** OH NO! The detector1 pipeline crashed! *** \n')
        pipe_crash_msg = traceback.format_exc()
    if not pipe_success:
        print('Saving file with full traceback')
        with open(log_name + '_pipecrash.txt', 'w') as crashfile:
            print(pipe_crash_msg, file=crashfile)

def main():
    input_data_dir = '/my_project_dir'
    output_dir = input_data_dir

    # get the files to run
    files_to_run = glob(os.path.join(input_data_dir, '*_uncal.fits'))
    print('Will run the pipeline on {} files'.format(len(files_to_run)))

    # the output list should be the same length as the files to run
    outptd = [output_dir for _ in range(len(files_to_run))]

    # get the cores to use
    cores2use = int(os.cpu_count()/2)   # half of all available cores
    print('* Using ', cores2use, ' cores for multiprocessing.')

    # set the pool and run multiprocess
    with multiprocessing.Pool(cores2use) as pool:
        pool.starmap(run_det1, zip(files_to_run, outptd))

    print('\n * Finished multiprocessing! \n')

if __name__ == '__main__':
    sys.exit(main())

Warning

Although it is technically possible to call the pipeline with multiprocessing while also enabling this option in a step, we strongly recommend against doing so. This scenario would be the same as SampleScript2 except with adding and calling the parameter dictionary parameter_dict from SampleScript1. However, Python will crash if both multiprocessing options are set to use all of the cores, or even fewer, because a worker process is not permitted to spawn child processes. To avoid this, and to avoid potentially running out of system memory, do not enable step-level multiprocessing for parallel pipeline runs.