Source code for isofit.core.isofit

#! /usr/bin/env python
#
#  Copyright 2018 California Institute of Technology
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
#
# ISOFIT: Imaging Spectrometer Optimal FITting
# Authors: David R Thompson, david.r.thompson@jpl.nasa.gov
#          Philip G Brodrick, philip.brodrick@jpl.nasa.gov
#          Adam Erickson, adam.m.erickson@nasa.gov
#

import os
import logging
import time

import multiprocessing
import numpy as np
import ray

from isofit.core.forward import ForwardModel
from isofit.inversion.inverse import Inversion
from isofit.inversion.inverse_mcmc import MCMCInversion
from isofit.core.fileio import IO
from isofit.configs import configs


[docs]class Isofit: """Initialize the Isofit class. Args: config_file: isofit configuration file in JSON or YAML format row_column: The user can specify * a single number, in which case it is interpreted as a row * a comma-separated pair, in which case it is interpreted as a row/column tuple (i.e. a single spectrum) * a comma-separated quartet, in which case it is interpreted as a row, column range in the order (line_start, line_end, sample_start, sample_end) all values are inclusive. If none of the above, the whole cube will be analyzed. level: logging level (ERROR, WARNING, INFO, DEBUG) logfile: file to write output logs to """ def __init__(self, config_file, row_column='', level='INFO', logfile=None): # Explicitly set the number of threads to be 1, so we more effectively #run in parallel os.environ["MKL_NUM_THREADS"] = "1" # Set logging level self.loglevel = level self.logfile = logfile logging.basicConfig(format='%(levelname)s:%(message)s', level=self.loglevel, filename=self.logfile) self.rows = None self.cols = None self.config = None self.fm = None self.iv = None self.io = None self.states = None # Load configuration file self.config = configs.create_new_config(config_file) self.config.get_config_errors() # Initialize ray for parallel execution rayargs = {'address': self.config.implementation.ip_head, 'redis_password': self.config.implementation.redis_password, 'local_mode': self.config.implementation.n_cores == 1} # We can only set the num_cpus if running on a single-node if self.config.implementation.ip_head is None and self.config.implementation.redis_password is None: rayargs['num_cpus'] = self.config.implementation.n_cores ray.init(**rayargs) if len(row_column) > 0: ranges = row_column.split(',') if len(ranges) == 1: self.rows, self.cols = [int(ranges[0])], None if len(ranges) == 2: row_start, row_end = ranges self.rows, self.cols = range( int(row_start), int(row_end)), None elif len(ranges) == 4: row_start, row_end, col_start, col_end = ranges line_start, line_end, samp_start, samp_end = ranges self.rows = range(int(row_start), int(row_end)) self.cols = range(int(col_start), int(col_end)) # Build the forward model and inversion objects self._init_nonpicklable_objects() self.io = IO(self.config, self.fm, self.iv, self.rows, self.cols) def _init_nonpicklable_objects(self) -> None: """ Internal function to initialize objects that cannot be pickled """ self.fm = ForwardModel(self.config) if self.config.implementation.mode == 'mcmc_inversion': self.iv = MCMCInversion(self.config, self.fm) elif self.config.implementation.mode in ['inversion', 'simulation']: self.iv = Inversion(self.config, self.fm) else: # This should never be reached due to configuration checking raise AttributeError('Config implementation mode node valid') def _clear_nonpicklable_objects(self): """ Internal function to clean objects that cannot be pickled """ self.fm = None self.iv = None @ray.remote def _run_set_of_spectra(self, index_start: int, index_stop: int) -> None: """Internal function to run a chunk of spectra Args: index_start: spectral index to start execution at index_stop: spectral index to stop execution at """ logging.basicConfig(format='%(levelname)s:%(message)s', level=self.loglevel, filename=self.logfile) self._init_nonpicklable_objects() io = IO(self.config, self.fm, self.iv, self.rows, self.cols) for index in range(index_start, index_stop): success, row, col, meas, geom = io.get_components_at_index( index) # Only run through the inversion if we got some data if success: if meas is not None and all(meas < -49.0): # Bad data flags self.states = [] else: # The inversion returns a list of states, which are # intepreted either as samples from the posterior (MCMC case) # or as a gradient descent trajectory (standard case). For # a trajectory, the last spectrum is the converged solution. self.states = self.iv.invert(meas, geom) # Write the spectra to disk io.write_spectrum(row, col, self.states, meas, geom, flush_immediately=True) if (index - index_start) % 100 == 0: logging.info( 'Core at start index {} completed inversion {}/{}'.format(index_start, index-index_start, index_stop-index_start))
[docs] def run(self): """ Iterate over all spectra, reading and writing through the IO object to handle formatting, buffering, and deferred write-to-file. The idea is to avoid reading the entire file into memory, or hitting the physical disk too often. These are our main class variables. """ n_iter = len(self.io.iter_inds) self._clear_nonpicklable_objects() self.io = None if self.config.implementation.n_cores is None: n_workers = min(multiprocessing.cpu_count(), n_iter) else: n_workers = min(self.config.implementation.n_cores, n_iter) start_time = time.time() logging.info('Beginning inversions using {} cores'.format(n_workers)) # Divide up spectra to run into chunks index_sets = np.linspace(0, n_iter, num=n_workers+1, dtype=int) # Run spectra, in either serial or parallel depending on n_workers results = ray.get([self._run_set_of_spectra.remote(self, index_sets[l], index_sets[l + 1]) for l in range(len(index_sets)-1)]) total_time = time.time() - start_time logging.info('Inversions complete. {} s total, {} spectra/s, {} spectra/s/core'.format( round(total_time,2), round(n_iter/total_time,4), round(n_iter/total_time/n_workers,4)))