Source code for isofit.radiative_transfer.modtran

#! /usr/bin/env python3
#
#  Copyright 2018 California Institute of Technology
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
#
# ISOFIT: Imaging Spectrometer Optimal FITting
# Author: David R Thompson, david.r.thompson@jpl.nasa.gov
#

from sys import platform
import os
import logging
import re
import json
from copy import deepcopy
import numpy as np
import scipy.stats
import scipy.interpolate

from ..radiative_transfer.look_up_tables import TabularRT, FileExistsError
from ..core.common import json_load_ascii, recursive_replace

from ..core.common import VectorInterpolator
from isofit.configs.sections.radiative_transfer_config import RadiativeTransferEngineConfig
from isofit.configs import Config
import subprocess


### Variables ###

eps = 1e-5  # used for finite difference derivative calculations

### Classes ###


[docs]class ModtranRT(TabularRT): """A model of photon transport including the atmosphere.""" def __init__(self, engine_config: RadiativeTransferEngineConfig, full_config: Config): """.""" # Specify which of the potential MODTRAN LUT parameters are angular, which will be handled differently self.angular_lut_keys_degrees = ['OBSZEN', 'TRUEAZ', 'viewzen', 'viewaz', 'solzen', 'solaz'] self.angular_lut_keys_radians = [] super().__init__(engine_config, full_config) # Flag to determine if MODTRAN should operate with reflectivity = 1 # (enabling thermal_upwelling and thermal_downwelling to be determined - see comments below) self.treat_as_emissive = False if self.wl[0] > 2500: self.treat_as_emissive = True self.modtran_dir = self.find_basedir(engine_config) flt_name = 'wavelengths_{}_{}_{}.flt'.format( engine_config.engine_name, self.wl[0], self.wl[-1]) self.filtpath = os.path.join(self.lut_dir, flt_name) self.template = deepcopy(json_load_ascii(engine_config.template_file)['MODTRAN']) # Insert aerosol templates, if specified if engine_config.aerosol_model_file is not None: self.template[0]['MODTRANINPUT']['AEROSOLS'] = \ deepcopy(json_load_ascii(engine_config.aerosol_template_file)) # Insert aerosol data, if specified if engine_config.aerosol_model_file is not None: aer_data = np.loadtxt(engine_config.aerosol_model_file) self.aer_wl = aer_data[:, 0] aer_data = np.transpose(aer_data[:, 1:]) self.naer = int(len(aer_data)/3) aer_absc, aer_extc, aer_asym = [], [], [] for i in range(self.naer): aer_extc.append(aer_data[i*3]) aer_absc.append(aer_data[i*3+1]) aer_asym.append(aer_data[i*3+2]) self.aer_absc = np.array(aer_absc) self.aer_extc = np.array(aer_extc) self.aer_asym = np.array(aer_asym) self.modtran_lut_names = ['rhoatm', 'transm', 'sphalb', 'transup'] if self.treat_as_emissive: self.modtran_lut_names = ['thermal_upwelling', 'thermal_downwelling'] + self.modtran_lut_names self.last_point_looked_up = np.zeros(self.n_point) self.last_point_lookup_values = np.zeros(self.n_point) # Build the lookup table self.build_lut()
[docs] def find_basedir(self, config): """Seek out a modtran base directory.""" if config.engine_base_dir is not None: return config.engine_base_dir try: return os.getenv('MODTRAN_DIR') except KeyError: logging.error('I could not find the MODTRAN base directory') raise KeyError('I could not find the MODTRAN base directory')
[docs] def load_tp6(self, infile): """Load a '.tp6' file. This contains the solar geometry. We Return cosine of mean solar zenith.""" with open(infile, 'r') as f: ts, te = -1, -1 # start and end indices lines = [] while len(lines) == 0 or len(lines[-1]) > 0: try: lines.append(f.readline()) except UnicodeDecodeError: pass for i, line in enumerate(lines): if "SINGLE SCATTER SOLAR" in line: ts = i+5 if ts >= 0 and len(line) < 5: te = i break if ts < 0: logging.error('%s is missing solar geometry' % infile) raise ValueError('%s is missing solar geometry' % infile) szen = np.array([float(lines[i].split()[3]) for i in range(ts, te)]).mean() return szen
[docs] def load_chn(self, infile, coszen): """Load a '.chn' output file and parse critical coefficient vectors. These are: * wl - wavelength vector * sol_irr - solar irradiance * sphalb - spherical sky albedo at surface * transm - diffuse and direct irradiance along the sun-ground-sensor path * transup - transmission along the ground-sensor path only !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! Be careful with these! They are to be used only by the modtran_tir functions because MODTRAN must be run with a reflectivity of 1 for them to be used in the RTM defined in radiative_transfer.py. * thermal_upwelling - atmospheric path radiance * thermal_downwelling - sky-integrated thermal path radiance reflected off the ground and back into the sensor. !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! We parse them one wavelength at a time.""" with open(infile) as f: sols, transms, sphalbs, wls, rhoatms, transups = \ [], [], [], [], [], [] thermal_upwellings, thermal_downwellings = [], [] lines = f.readlines() for i, line in enumerate(lines): if i < 5: continue toks = line.strip().split(' ') toks = re.findall(r"[\S]+", line.strip()) wl, wid = float(toks[0]), float(toks[8]) # nm solar_irr = float(toks[18]) * 1e6 * \ np.pi / wid / coszen # uW/nm/sr/cm2 rdnatm = float(toks[4]) * 1e6 # uW/nm/sr/cm2 rhoatm = rdnatm * np.pi / (solar_irr * coszen) sphalb = float(toks[23]) transm = float(toks[22]) + float(toks[21]) transup = float(toks[24]) # Be careful with these! See note in function comments above thermal_emission = float(toks[11]) thermal_scatter = float(toks[12]) thermal_upwelling = (thermal_emission + thermal_scatter) / \ wid * 1e6 # uW/nm/sr/cm2 # Be careful with these! See note in function comments above # grnd_rflt already includes ground-to-sensor transmission grnd_rflt = float(toks[16]) thermal_downwelling = grnd_rflt / wid * 1e6 # uW/nm/sr/cm2 sols.append(solar_irr) transms.append(transm) sphalbs.append(sphalb) rhoatms.append(rhoatm) transups.append(transup) thermal_upwellings.append(thermal_upwelling) thermal_downwellings.append(thermal_downwelling) wls.append(wl) params = [np.array(i) for i in [wls, sols, rhoatms, transms, sphalbs, transups, thermal_upwellings, thermal_downwellings]] return tuple(params)
[docs] def ext550_to_vis(self, ext550): """.""" return np.log(50.0) / (ext550 + 0.01159)
[docs] def modtran_driver(self, overrides): """Write a MODTRAN 6.0 input file.""" param = deepcopy(self.template) if hasattr(self, 'aer_absc'): fracs = np.zeros((self.naer)) # Perform overrides for key, val in overrides.items(): recursive_replace(param, key, val) if key.startswith('AER'): i = int(key.split('_')[-1]) fracs[i] = val elif key == 'EXT550' or key == 'AOT550' or key == 'AOD550': # MODTRAN 6.0 convention treats negative visibility as AOT550 recursive_replace(param, 'VIS', -val) elif key == 'FILTNM': param[0]['MODTRANINPUT']['SPECTRAL']['FILTNM'] = val elif key in ['ITYPE', 'H1ALT', 'IDAY', 'IPARM', 'PARM1', 'PARM2', 'GMTIME', 'TRUEAZ', 'OBSZEN']: param[0]['MODTRANINPUT']['GEOMETRY'][key] = val # For custom aerosols, specify final extinction and absorption # MODTRAN 6.0 convention treats negative visibility as AOT550 if hasattr(self, 'aer_absc'): total_aot = fracs.sum() recursive_replace(param, 'VIS', -total_aot) total_extc = self.aer_extc.T.dot(fracs) total_absc = self.aer_absc.T.dot(fracs) norm_fracs = fracs/(fracs.sum()) total_asym = self.aer_asym.T.dot(norm_fracs) # Normalize to 550 nm total_extc550 = scipy.interpolate.interp1d(self.aer_wl, total_extc)(0.55) lvl0 = param[0]['MODTRANINPUT']['AEROSOLS']['IREGSPC'][0] lvl0['NARSPC'] = len(self.aer_wl) lvl0['VARSPC'] = [float(v) for v in self.aer_wl] lvl0['ASYM'] = [float(v) for v in total_asym] lvl0['EXTC'] = [float(v) / total_extc550 for v in total_extc] lvl0['ABSC'] = [float(v) / total_extc550 for v in total_absc] return json.dumps({"MODTRAN": param}), param
[docs] def build_lut(self, rebuild=False): """Each LUT is associated with a source directory. We build a lookup table by: (1) defining the LUT dimensions, state vector names, and the grid of values; (2) running modtran if needed, with each MODTRAN run defining a different point in the LUT; and (3) loading the LUTs, one per key atmospheric coefficient vector, into memory as VectorInterpolator objects. """ # Regenerate MODTRAN input wavelength file if not os.path.exists(self.filtpath): self.wl2flt(self.wl, self.fwhm, self.filtpath) # Check that the H2OSTR value, if present, is not too high. # MODTRAN caps the value at 5x profile specified value or 100% RH, as # defined in PDF-page 52 of the MODTRAN user guide. if 'H2OSTR' in self.lut_names: if 'H2OOPT' in self.template[0]['MODTRANINPUT']['ATMOSPHERE'].keys() and self.template[0]['MODTRANINPUT']['ATMOSPHERE']['H2OOPT'] == '+': logging.info('H2OOPT found in MODTRAN template - ignoring H2O upper bound') else: # Only do this check if we don't have a LUT provided: need_to_rebuild = np.any([not self.required_results_exist(x) for x in self.get_lut_filenames()]) if need_to_rebuild: # Define a realistic point, based on lut grid point = np.array([x[-1] for x in self.lut_grids]) # Set the H2OSTR value as arbitrarily high - 50 g/cm2 in this case point[self.lut_names.index('H2OSTR')] = 50 filebase = os.path.join(os.path.dirname(self.files[-1]), 'H2O_bound_test') cmd = self.rebuild_cmd(point, filebase) # Run MODTRAN for up to 10 seconds - this should be plenty of time cwd = os.getcwd() if os.path.isdir(self.lut_dir) is False: os.mkdir(self.lut_dir) os.chdir(self.lut_dir) try: subprocess.call(cmd, shell=True, timeout=10) except: pass os.chdir(cwd) max_water = None with open(os.path.join(self.lut_dir,filebase + '.tp6'), errors='ignore') as tp6file: for count, line in enumerate(tp6file): if 'The water column is being set to the maximum' in line: max_water = line.split(',')[1].strip() max_water = float(max_water.split(' ')[0]) break if max_water is None: logging.error('Could not find MODTRAN H2O upper bound in file {}'.format(filebase + '.tp6')) raise KeyError('Could not find MODTRAN H2O upper bound') if np.max(self.lut_grids[self.lut_names.index('H2OSTR')]) > max_water: logging.error('MODTRAN max H2OSTR with current profile is {}, while H2O lut_grid is {}. Either adjust MODTRAN profile or lut_grid. To over-ride MODTRANs maximum allowable value, set H2OOPT to "+"'.format(max_water, self.lut_grids[self.lut_names.index('H2OSTR')])) raise KeyError('MODTRAN H2O lut grid is invalid - see logs for details.') TabularRT.build_lut(self, rebuild) mod_outputs = [] for point, fn in zip(self.points, self.files): mod_outputs.append(self.load_rt(fn)) self.wl = mod_outputs[0]['wl'] self.solar_irr = mod_outputs[0]['sol'] self.coszen = np.cos(mod_outputs[0]['solzen'] * np.pi / 180.0) dims_aug = self.lut_dims + [self.n_chan] for key in self.modtran_lut_names: temp = np.zeros(dims_aug, dtype=float) for mod_output, point in zip(mod_outputs, self.points): ind = [np.where(g == p)[0] for g, p in zip(self.lut_grids, point)] ind = tuple(ind) temp[ind] = mod_output[key] self.luts[key] = VectorInterpolator(self.lut_grids, temp, self.lut_interp_types)
[docs] def rebuild_cmd(self, point, fn): """.""" if not fn: logging.error("Function is not defined.") raise SystemExit("Function is not defined.") vals = dict([(n, v) for n, v in zip(self.lut_names, point)]) vals['DISALB'] = True vals['NAME'] = fn vals['FILTNM'] = os.path.normpath(self.filtpath) modtran_config_str, modtran_config = self.modtran_driver(dict(vals)) # Check rebuild conditions: LUT is missing or from a different config infilename = 'LUT_'+fn+'.json' infilepath = os.path.join(self.lut_dir, 'LUT_'+fn+'.json') if not self.required_results_exist(fn): rebuild = True else: # We compare the two configuration files, ignoring names and # wavelength paths which tend to be non-portable with open(infilepath, 'r') as fin: current_config = json.load(fin)['MODTRAN'] current_config[0]['MODTRANINPUT']['NAME'] = '' modtran_config[0]['MODTRANINPUT']['NAME'] = '' current_config[0]['MODTRANINPUT']['SPECTRAL']['FILTNM'] = '' modtran_config[0]['MODTRANINPUT']['SPECTRAL']['FILTNM'] = '' current_str = json.dumps(current_config) modtran_str = json.dumps(modtran_config) rebuild = (modtran_str.strip() != current_str.strip()) if not rebuild: raise FileExistsError('File exists') # write_config_file with open(infilepath, 'w') as f: f.write(modtran_config_str) # Specify location of the proper MODTRAN 6.0 binary for this OS xdir = { 'linux': 'linux', 'darwin': 'macos', 'windows': 'windows' } # If self.modtran_dir is not defined, raise an exception # This occurs e.g., when MODTRAN is not installed if not self.modtran_dir: logging.error("MODTRAN directory not defined in config file.") raise SystemExit("MODTRAN directory not defined in config file.") # Generate the CLI path cmd = os.path.join(self.modtran_dir, 'bin', xdir[platform], 'mod6c_cons ' + infilename) return cmd
[docs] def required_results_exist(self, fn): infilename = os.path.join(self.lut_dir, 'LUT_'+fn+'.json') outchnname = os.path.join(self.lut_dir, fn+'.chn') outtp6name = os.path.join(self.lut_dir, fn+'.tp6') if os.path.isfile(infilename) and os.path.isfile(outchnname) and os.path.isfile(outtp6name): return True else: return False
[docs] def load_rt(self, fn): """.""" tp6file = os.path.join(self.lut_dir, fn+'.tp6') solzen = self.load_tp6(tp6file) coszen = np.cos(solzen * np.pi / 180.0) chnfile = os.path.join(self.lut_dir, fn+'.chn') params = self.load_chn(chnfile, coszen) # Be careful with the two thermal values! They can only be used in # the modtran_tir functions as they require the modtran reflectivity # be set to 1 in order to use them in the RTM in radiative_transfer.py. # Don't add these to the VSWIR functions! names = ['wl', 'sol', 'rhoatm', 'transm', 'sphalb', 'transup'] # Don't include the thermal terms in VSWIR runs to avoid incorrect usage if self.treat_as_emissive: names = names + ['thermal_upwelling', 'thermal_downwelling'] results_dict = {name: param for name, param in zip(names, params)} results_dict['solzen'] = solzen results_dict['coszen'] = coszen return results_dict
def _lookup_lut(self, point): if np.all(np.equal(point, self.last_point_looked_up)): return self.last_point_lookup_values else: ret = {} for key, lut in self.luts.items(): ret[key] = np.array(lut(point)).ravel() self.last_point_looked_up = point self.last_point_lookup_values = ret return ret
[docs] def get(self, x_RT, geom): point = np.zeros((self.n_point,)) for point_ind, name in enumerate(self.lut_grid_config): if name in self.statevector_names: ix = self.statevector_names.index(name) point[point_ind] = x_RT[ix] elif name == "OBSZEN": point[point_ind] = geom.OBSZEN elif name == "GNDALT": point[point_ind] = geom.GNDALT elif name == "viewzen": point[point_ind] = geom.observer_zenith elif name == "viewaz": point[point_ind] = geom.observer_azimuth elif name == "solaz": point[point_ind] = geom.solar_azimuth elif name == "solzen": point[point_ind] = geom.solar_zenith elif name == "TRUEAZ": point[point_ind] = geom.TRUEAZ elif name == 'phi': point[point_ind] = geom.phi elif name == 'umu': point[point_ind] = geom.umu else: # If a variable is defined in the lookup table but not # specified elsewhere, we will default to the minimum point[point_ind] = min(self.lut_grid_config[name]) return self._lookup_lut(point)
[docs] def get_L_atm(self, x_RT, geom): if self.treat_as_emissive: return self.get_L_atm_tir(x_RT, geom) else: return self.get_L_atm_vswir(x_RT, geom)
[docs] def get_L_atm_vswir(self, x_RT, geom): r = self.get(x_RT, geom) rho = r['rhoatm'] rdn = rho/np.pi*(self.solar_irr*self.coszen) return rdn
[docs] def get_L_atm_tir(self, x_RT, geom): r = self.get(x_RT, geom) return r['thermal_upwelling']
[docs] def get_L_down_transmitted(self, x_RT, geom): if self.treat_as_emissive: return self.get_L_down_transmitted_tir(x_RT, geom) else: return self.get_L_down_transmitted_vswir(x_RT, geom)
[docs] def get_L_down_transmitted_vswir(self, x_RT, geom): r = self.get(x_RT, geom) rdn = (self.solar_irr*self.coszen) / np.pi * r['transm'] return rdn
[docs] def get_L_down_transmitted_tir(self, x_RT, geom): """thermal_downwelling already includes the transmission factor. Also assume there is no multiple scattering for TIR. """ r = self.get(x_RT, geom) return r['thermal_downwelling']
[docs] def wl2flt(self, wls, fwhms, outfile): """Helper function to generate Gaussian distributions around the center wavelengths.""" I = None sigmas = fwhms/2.355 span = 2.0 * (wls[1]-wls[0]) # nm steps = 101 with open(outfile, 'w') as fout: fout.write('Nanometer data for sensor\n') for wl, fwhm, sigma in zip(wls, fwhms, sigmas): ws = wl + np.linspace(-span, span, steps) vs = scipy.stats.norm.pdf(ws, wl, sigma) vs = vs/vs[int(steps/2)] wns = 10000.0/(ws/1000.0) fout.write('CENTER: %6.2f NM FWHM: %4.2f NM\n' % (wl, fwhm)) for w, v, wn in zip(ws, vs, wns): fout.write(' %9.4f %9.7f %9.2f\n' % (w, v, wn))