#! /usr/bin/env python3
#
# Copyright 2018 California Institute of Technology
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# ISOFIT: Imaging Spectrometer Optimal FITting
# Author: David R Thompson, david.r.thompson@jpl.nasa.gov
#
import os
import json
import xxhash
import numpy as np
import scipy.linalg
from scipy.interpolate import RegularGridInterpolator
from os.path import expandvars, split, abspath
from typing import List
from collections import OrderedDict
### Variables ###
# Maximum size of our hash tables
max_table_size = 500
# small value used in finite difference derivatives
eps = 1e-5
### Classes ###
[docs]class VectorInterpolator:
""" Linear look up table interpolator. Support linear interpolation through radial space by expanding the look
up tables with sin and cos dimensions.
Args:
grid_input: list of lists of floats, indicating the gridpoint elements in each grid dimension
data_input: n dimensional array of radiative transfer engine outputs (each dimension size corresponds to the
given grid_input list length, with the last dimensions equal to the number of sensor channels)
lut_interp_types: a list indicating if each dimension is in radiance (r), degrees (r), or normal (n) units.
"""
def __init__(self, grid_input: List[List[float]], data_input: np.array, lut_interp_types: List[str]):
self.lut_interp_types = lut_interp_types
# Lists and arrays are mutable, so copy first
grid = grid_input.copy()
data = data_input.copy()
# expand grid dimensionality as needed
[radian_locations] = np.where(self.lut_interp_types == 'd')
[degree_locations] = np.where(self.lut_interp_types == 'r')
angle_locations = np.hstack([radian_locations, degree_locations])
angle_types = np.hstack(
[self.lut_interp_types[radian_locations],
self.lut_interp_types[degree_locations]])
for _angle_loc in range(len(angle_locations)):
angle_loc = angle_locations[_angle_loc]
# get original grid at given location
original_grid_subset = np.array(grid[angle_loc])
# convert for angular coordinates
if (angle_types[_angle_loc] == 'r'):
grid_subset_cosin = np.cos(original_grid_subset)
grid_subset_sin = np.sin(original_grid_subset)
elif (angle_types[_angle_loc] == 'd'):
grid_subset_cosin = np.cos(original_grid_subset / 180. * np.pi)
grid_subset_sin = np.sin(original_grid_subset / 180. * np.pi)
# handle the fact that the grid may no longer be in order
grid_subset_cosin_order = np.argsort(grid_subset_cosin)
grid_subset_sin_order = np.argsort(grid_subset_sin)
# convert current grid location, and add a second
grid[angle_loc] = grid_subset_cosin[grid_subset_cosin_order]
grid.insert(angle_loc+1, grid_subset_sin[grid_subset_sin_order])
# now copy the data to be interpolated through the extra dimension,
# at the specific angle_loc axes. We'll use broadcast_to to do
# this, but we need to do it on the last dimension. So start by
# temporarily moving the target axes there, then broadcasting
data = np.swapaxes(data, -1, angle_loc)
data_dim = list(np.shape(data))
data_dim.append(data_dim[-1])
data = data[..., np.newaxis] * np.ones(data_dim)
# Now we need to actually copy the data between the first two axes,
# as broadcast_to doesn't do this
for ind in range(data.shape[-1]):
data[..., ind] = data[..., :, ind]
# Now re-order the cosin dimension
data = data[..., grid_subset_cosin_order, :]
# Now re-order the sin dimension
data = data[..., grid_subset_sin_order]
# now re-arrange the axes so they're in the right order again,
dst_axes = np.arange(len(data.shape)-2).tolist()
dst_axes.insert(angle_loc, len(data.shape)-2)
dst_axes.insert(angle_loc+1, len(data.shape)-1)
dst_axes.remove(angle_loc)
dst_axes.append(angle_loc)
data = np.ascontiguousarray(np.transpose(data, axes=dst_axes))
# update the rest of the angle locations
angle_locations += 1
self.n = data.shape[-1]
grid_aug = grid + [np.arange(data.shape[-1])]
self.itp = RegularGridInterpolator(grid_aug, data,
bounds_error=False, fill_value=None)
def __call__(self, points):
x = np.zeros((self.n, len(points) + 1 +
np.sum(self.lut_interp_types != 'n')))
offset_count = 0
for i in range(len(points)):
if self.lut_interp_types[i] == 'n':
x[:, i + offset_count] = points[i]
elif self.lut_interp_types[i] == 'r':
x[:, i + offset_count] = np.cos(points[i])
x[:, i + 1 + offset_count] = np.sin(points[i])
offset_count += 1
elif self.lut_interp_types[i] == 'd':
x[:, i + offset_count] = np.cos(points[i] / 180. * np.pi)
x[:, i + 1 + offset_count] = np.sin(points[i] / 180. * np.pi)
offset_count += 1
# This last dimension is always an integer so no
# interpolation is performed. This is done only
# for performance reasons.
x[:, -1] = np.arange(self.n)
res = self.itp(x)
return res
[docs]def load_wavelen(wavelength_file: str):
"""Load a wavelength file, and convert to nanometers if needed.
Args:
wavelength_file: file to read wavelengths from
Returns:
(np.array, np.array): wavelengths, full-width-half-max
"""
q = np.loadtxt(wavelength_file)
if q.shape[1] > 2:
q = q[:, 1:3]
if q[0, 0] < 100:
q = q * 1000.0
wl, fwhm = q.T
return wl, fwhm
[docs]def emissive_radiance(emissivity: np.array, T: np.array, wl: np.array) -> (np.array, np.array):
"""Calcluate the radiance of a surface due to emission.
Args:
emissivity: surface emissivity.
T: surface temperature [K]
wl: emmissivity wavelengths [nm]
Returns:
np.array: surface upwelling radiance in uW $cm^{-2} sr^{-1} nm^{-nm}$
np.array: partial derivative of radiance with respect to temperature uW $cm^{-2} sr^{-1} nm^{-1} k^{-1}$
"""
c_1 = 1.88365e32/np.pi
c_2 = 14387690
J_per_eV = 1.60218e-19
wl_um = wl / 1000.0
ph_per_sec_cm2_sr_nm = c_1/(wl**4)/(np.exp(c_2/wl/T)-1.0) * emissivity
# photon energy in eV
eV_per_sec_cm2_sr_nm = 1.2398 * ph_per_sec_cm2_sr_nm/wl_um
W_per_cm2_sr_nm = J_per_eV * eV_per_sec_cm2_sr_nm
uW_per_cm2_sr_nm = W_per_cm2_sr_nm*1e6
dRdn_dT = c_1/(wl**4)*(-pow(np.exp(c_2/wl/T)-1.0, -2.0)) *\
np.exp(c_2/wl/T)*(-pow(T, -2)*c_2/wl) *\
emissivity/wl_um*1.2398*J_per_eV*1e6
return uW_per_cm2_sr_nm, dRdn_dT
[docs]def svd_inv(C: np.array, hashtable: OrderedDict = None):
"""Matrix inversion, based on decomposition. Built to be stable, and positive.
Args:
C: matrix to invert
hashtable: if used, the hashtable to store/retrieve results in/from
Return:
np.array: inverse of C
"""
return svd_inv_sqrt(C, hashtable)[0]
[docs]def svd_inv_sqrt(C: np.array, hashtable: OrderedDict = None) -> (np.array, np.array):
"""Matrix inversion, based on decomposition. Built to be stable, and positive.
Args:
C: matrix to invert
hashtable: if used, the hashtable to store/retrieve results in/from
Return:
(np.array, np.array): inverse of C and square root of the inverse of C
"""
# If we have a hash table, look for the precalculated solution
h = None
if hashtable is not None:
# If arrays are in Fortran ordering, they are not hashable.
if not C.flags['C_CONTIGUOUS']:
C = C.copy(order='C')
h = xxhash.xxh64_digest(C)
if h in hashtable:
return hashtable[h]
D, P = scipy.linalg.eigh(C)
for count in range(3):
if np.any(D < 0) or np.any(np.isnan(D)):
inv_eps = 1e-6 * (count-1)*10
D, P = scipy.linalg.eigh(
C + np.diag(np.ones(C.shape[0]) * inv_eps))
else:
break
if count == 2:
raise ValueError('Matrix inversion contains negative values,' +
'even after adding {} to the diagonal.'.format(inv_eps))
Ds = np.diag(1/np.sqrt(D))
L = P@Ds
Cinv_sqrt = L@P.T
Cinv = L@L.T
# If there is a hash table, cache our solution. Bound the total cache
# size by removing any extra items in FIFO order.
if hashtable is not None:
hashtable[h] = (Cinv, Cinv_sqrt)
while len(hashtable) > max_table_size:
hashtable.popitem(last=False)
return Cinv, Cinv_sqrt
[docs]def expand_path(directory: str, subpath: str) -> str:
"""Expand a path variable to an absolute path, if it is not one already.
Args:
directory: absolute location
subpath: path to expand
Returns:
str: expanded path
"""
if subpath.startswith('/'):
return subpath
return os.path.join(directory, subpath)
[docs]def recursive_replace(obj, key, val) -> None:
"""Find and replace a vector in a nested (mutable) structure.
Args:
obj: object to replace within
key: key to replace
val: value to replace with
"""
if isinstance(obj, dict):
if key in obj:
obj[key] = val
for item in obj.values():
recursive_replace(item, key, val)
elif any(isinstance(obj, t) for t in (list, tuple)):
for item in obj:
recursive_replace(item, key, val)
[docs]def get_absorption(wl: np.array, absfile: str) -> (np.array, np.array):
"""Calculate water and ice absorption coefficients using indices of
refraction, and interpolate them to new wavelengths (user specifies nm).
Args:
wl: wavelengths to interpolate to
absfile: file containing indices of refraction
Returns:
np.array: interpolated, wavelength-specific water absorption coefficients
np.array: interpolated, wavelength-specific ice absorption coefficients
"""
# read the indices of refraction
q = np.loadtxt(absfile, delimiter=',')
wl_orig_nm = q[:, 0]
wl_orig_cm = wl_orig_nm/1e9*1e2
water_imag = q[:, 2]
ice_imag = q[:, 4]
# calculate absorption coefficients in cm^-1
water_abscf = water_imag*np.pi*4.0/wl_orig_cm
ice_abscf = ice_imag*np.pi*4.0/wl_orig_cm
# interpolate to new wavelengths (user provides nm)
water_abscf_intrp = np.interp(wl, wl_orig_nm, water_abscf)
ice_abscf_intrp = np.interp(wl, wl_orig_nm, ice_abscf)
return water_abscf_intrp, ice_abscf_intrp
[docs]def recursive_reencode(j, shell_replace: bool = True):
"""Recursively re-encode a mutable object (ascii->str).
Args:
j: object to reencode
shell_replace: boolean helper for recursive calls
Returns:
Object: expanded, reencoded object
"""
if isinstance(j, dict):
for key, value in j.items():
j[key] = recursive_reencode(value)
return j
elif isinstance(j, list):
for i, k in enumerate(j):
j[i] = recursive_reencode(k)
return j
elif isinstance(j, tuple):
return tuple([recursive_reencode(k) for k in j])
else:
if shell_replace and isinstance(j, str):
try:
j = expandvars(j)
except IndexError:
pass
return j
[docs]def json_load_ascii(filename: str, shell_replace: bool = True) -> dict:
"""Load a hierarchical structure, convert all unicode to ASCII and
expand environment variables.
Args:
filename: json file to load from
shell_replace: boolean
Returns:
dict: encoded dictionary
"""
with open(filename, 'r') as fin:
j = json.load(fin)
return recursive_reencode(j, shell_replace)
[docs]def expand_all_paths(to_expand: dict, absdir: str):
"""Expand any dictionary entry containing the string 'file' into
an absolute path, if needed.
Args:
to_expand: dictionary to expand
absdir: path to expand with (absolute directory)
Returns:
dict: dictionary with expanded paths
"""
def recursive_expand(j):
if isinstance(j, dict):
for key, value in j.items():
if isinstance(key, str) and \
('file' in key or 'directory' in key or 'path' in key) and \
isinstance(value, str):
j[key] = expand_path(absdir, value)
else:
j[key] = recursive_expand(value)
return j
elif isinstance(j, list):
for i, k in enumerate(j):
j[i] = recursive_expand(k)
return j
elif isinstance(j, tuple):
return tuple([recursive_reencode(k) for k in j])
return j
return recursive_expand(to_expand)
[docs]def resample_spectrum(x: np.array, wl: np.array, wl2: np.array, fwhm2: np.array, fill: bool = False) -> np.array:
"""Resample a spectrum to a new wavelength / FWHM.
Assumes Gaussian SRFs.
Args:
x: radiance vector
wl: sample starting wavelengths
wl2: wavelengths to resample to
fwhm2: full-width-half-max at resample resolution
fill: boolean indicating whether to fill in extrapolated regions
Returns:
np.array: interpolated radiance vector
"""
H = np.array([spectral_response_function(wl, wi, fwhmi / 2.355)
for wi, fwhmi in zip(wl2, fwhm2)])
if fill is False:
return np.dot(H, x[:, np.newaxis]).ravel()
else:
xnew = np.dot(H, x[:, np.newaxis]).ravel()
good = np.isfinite(xnew)
for i, xi in enumerate(xnew):
if not good[i]:
nearest_good_ind = np.argmin(abs(wl2[good]-wl2[i]))
xnew[i] = xnew[nearest_good_ind]
return xnew
[docs]def load_spectrum(spectrum_file: str) -> (np.array, np.array):
"""Load a single spectrum from a text file with initial columns giving
wavelength and magnitude, respectively.
Args:
spectrum_file: file to load spectrum from
Returns:
np.array: spectrum values
np.array: wavelengths, if available in the file
"""
spectrum = np.loadtxt(spectrum_file)
if spectrum.ndim > 1:
spectrum = spectrum[:, :2]
wavelengths, spectrum = spectrum.T
if wavelengths[0] < 100:
wavelengths = wavelengths * 1000.0 # convert microns -> nm if needed
return spectrum, wavelengths
else:
return spectrum, None
[docs]def spectral_response_function(response_range: np.array, mu: float, sigma: float):
"""Calculate the spectral response function.
Args:
response_range: signal range to calculate over
mu: mean signal value
sigma: signal variation
Returns:
np.array: spectral response function
"""
u = (response_range-mu)/abs(sigma)
y = (1.0/(np.sqrt(2.0*np.pi)*abs(sigma)))*np.exp(-u*u/2.0)
srf = y/y.sum()
return srf
[docs]def combos(inds: List[List[float]]) -> np.array:
"""Return all combinations of indices in a list of index sublists.
For example, the call::
combos([[1, 2], [3, 4, 5]])
...[[1, 3], [2, 3], [1, 4], [2, 4], [1, 5], [2, 5]]
This is used for interpolation in the high-dimensional LUT.
Args:
inds: list of lists of values to expand
Returns:
np.array: meshgrid array of combinations
"""
n = len(inds)
cases = np.prod([len(i) for i in inds])
gridded_combinations = np.array(np.meshgrid(*inds)).reshape((n, cases)).T
return gridded_combinations
[docs]def conditional_gaussian(mu: np.array, C: np.array, window: np.array, remain: np.array, x: np.array) -> \
(np.array, np.array):
"""Define the conditional Gaussian distribution for convenience.
len(window)+len(remain)=len(x)
Args:
mu: mean values
C: matrix for conditioning
window: contains all indices not in remain
remain: contains indices of the observed part x1
x: values to condition with
Returns:
(np.array, np.array): conditional mean, conditional covariance
"""
w = np.array(window)[:,np.newaxis]
r = np.array(remain)[:,np.newaxis]
C11 = C[r, r.T]
C12 = C[r, w.T]
C21 = C[w, r.T]
C22 = C[w, w.T]
Cinv = svd_inv(C11)
conditional_mean = mu[window] + C21 @ Cinv @ (x-mu[remain])
conditional_cov = C22 - C21 @ Cinv @ C12
return conditional_mean, conditional_cov