#! /usr/bin/env python3
#
# Copyright 2019 California Institute of Technology
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# ISOFIT: Imaging Spectrometer Optimal FITting
# Author: David R Thompson, david.r.thompson@jpl.nasa.gov
#
from os.path import split, abspath
import numpy as np
import scipy.io
from scipy.interpolate import interp1d
from sklearn.cluster import KMeans
from spectral.io import envi
from scipy.stats import norm
from ..core.common import expand_path, json_load_ascii
[docs]def surface_model(config_file: str) -> None:
"""The surface model tool contains everything you need to build basic
multicomponent (i.e. colleciton of Gaussian) surface priors for the
multicomponent surface model.
Args:
config_file: path to a JSON formatted surface model configuration
Returns:
None
"""
# Load configuration JSON into a local dictionary
configdir, _ = split(abspath(config_file))
config = json_load_ascii(config_file, shell_replace=True)
# Determine top level parameters
for q in ['output_model_file', 'sources', 'normalize', 'wavelength_file']:
if q not in config:
raise ValueError("Missing parameter: %s" % q)
wavelength_file = expand_path(configdir, config['wavelength_file'])
normalize = config['normalize']
reference_windows = config['reference_windows']
outfile = expand_path(configdir, config['output_model_file'])
# load wavelengths file, and change units to nm if needed
q = np.loadtxt(wavelength_file)
if q.shape[1] > 2:
q = q[:, 1:]
if q[0, 0] < 100:
q = q * 1000.0
wl = q[:, 0]
nchan = len(wl)
# build global reference windows
refwl = []
for wi, window in enumerate(reference_windows):
active_wl = np.logical_and(wl >= window[0], wl < window[1])
refwl.extend(wl[active_wl])
normind = np.array([np.argmin(abs(wl - w)) for w in refwl])
refwl = np.array(refwl, dtype=float)
# create basic model template
model = {
'normalize': normalize,
'wl': wl,
'means': [],
'covs': [],
'attribute_means': [],
'attribute_covs': [],
'attributes': [],
'refwl': refwl
}
# each "source" (i.e. spectral library) is treated separately
for si, source_config in enumerate(config['sources']):
# Determine source parameters
for q in ['input_spectrum_files', 'windows', 'n_components', 'windows']:
if q not in source_config:
raise ValueError(
'Source %i is missing a parameter: %s' % (si, q))
# Determine whether we should synthesize our own mixtures
if 'mixtures' in source_config:
mixtures = source_config['mixtures']
elif 'mixtures' in config:
mixtures = config['mixtures']
else:
mixtures = 0
# open input files associated with this source
infiles = [expand_path(configdir, fi) for fi in
source_config['input_spectrum_files']]
# associate attributes, if they exist. These will not be used
# in the retrieval, but can be used in post-analysis
if 'input_attribute_files' in source_config:
infiles_attributes = [expand_path(configdir, fi) for fi in
source_config['input_attribute_files']]
if len(infiles_attributes) != len(infiles):
raise IndexError('spectrum / attribute file mismatch')
else:
infiles_attributes = [None for fi in
source_config['input_spectrum_files']]
ncomp = int(source_config['n_components'])
windows = source_config['windows']
# load spectra
spectra, attributes = [],[]
for infile, attribute_file in zip(infiles, infiles_attributes):
hdrfile = infile + '.hdr'
rfl = envi.open(hdrfile, infile)
nl, nb, ns = [int(rfl.metadata[n])
for n in ('lines', 'bands', 'samples')]
swl = np.array([float(f) for f in rfl.metadata['wavelength']])
# Maybe convert to nanometers
if swl[0] < 100:
swl = swl * 1000.0
# Load library and adjust interleave, if needed
rfl_mm = rfl.open_memmap(interleave='source', writable=False)
if rfl.metadata['interleave'] == 'bip':
x = np.array(rfl_mm[:, :, :])
if rfl.metadata['interleave'] == 'bil':
x = np.array(rfl_mm[:, :, :]).transpose((0, 2, 1))
x = x.reshape(nl * ns, nb)
# import spectra and resample
for x1 in x:
p = interp1d(swl, x1, kind='linear', bounds_error=False,
fill_value='extrapolate')
spectra.append(p(wl))
# Load attributes
if attribute_file is not None:
hdrfile = attribute_file + '.hdr'
attr = envi.open(hdrfile, attribute_file)
nla, nba, nsa = [int(attr.metadata[n])
for n in ('lines', 'bands', 'samples')]
# Load library and adjust interleave, if needed
attr_mm = attr.open_memmap(interleave='source', writable=False)
if attr.metadata['interleave'] == 'bip':
x = np.array(attr_mm[:, :, :])
if attr.metadata['interleave'] == 'bil':
x = np.array(attr_mm[:, :, :]).transpose((0, 2, 1))
x = x.reshape(nla * nsa, nba)
model['attributes'] = attr.metadata['band names']
# import spectra and resample
for x1 in x:
attributes.append(x1)
if len(attributes)>0 and len(attributes) != len(spectra):
raise IndexError('Mismatch in number of spectra vs. attributes')
# calculate mixtures, if needed
if len(attributes)>0 and mixtures > 0:
raise ValueError('Synthetic mixtures w/ attributes is not advised')
n = float(len(spectra))
nmix = int(n * mixtures)
for mi in range(nmix):
s1, m1 = spectra[int(np.random.rand() * n)], np.random.rand()
s2, m2 = spectra[int(np.random.rand() * n)], 1.0 - m1
spectra.append(m1 * s1 + m2 * s2)
# Lists to arrays
spectra = np.array(spectra)
attributes = np.array(attributes)
# Flag bad data
use = np.all(np.isfinite(spectra), axis=1)
spectra = spectra[use, :]
if len(attributes)>0:
attributes = attributes[use,:]
# Accumulate total list of window indices
window_idx = -np.ones((nchan), dtype=int)
for wi, win in enumerate(windows):
active_wl = np.logical_and(wl >= win['interval'][0], wl < win['interval'][1])
window_idx[active_wl] = wi
# Two step model generation. First step is k-means clustering.
# This is more "stable" than Expectation Maximization with an
# unconstrained covariance matrix
kmeans = KMeans(init='k-means++', n_clusters=ncomp, n_init=10)
kmeans.fit(spectra)
Z = kmeans.predict(spectra)
# Build a combined dataset of attributes and spectra
if len(attributes)>0:
spectra_attr = np.concatenate((spectra,attributes), axis=1)
# Now fit the full covariance for each component
for ci in range(ncomp):
m = np.mean(spectra[Z == ci, :], axis=0)
C = np.cov(spectra[Z == ci, :], rowvar=False)
if len(attributes) > 0:
m_attr = np.mean(spectra_attr[Z == ci, :], axis=0)
C_attr = np.cov(spectra_attr[Z == ci, :], rowvar=False)
for i in range(nchan):
window = windows[window_idx[i]]
# Each spectral interval, or window, is constructed
# using one of several rules. We can draw the covariance
# directly from the data...
if window['correlation'] == 'EM':
C[i, i] = C[i, i] + float(window['regularizer'])
# Alternatively, we can use a band diagonal form,
# a Gaussian process that promotes local smoothness.
elif window['correlation'] == 'GP':
width = float(window['gp_width'])
magnitude = float(window['gp_magnitude'])
kernel = norm.pdf((wl-wl[i])/width)
kernel = kernel/kernel.sum() * magnitude
C[i, :] = kernel
C[:, i] = kernel
C[i, i] = C[i, i] + float(window['regularizer'])
# To minimize bias, leave the channels independent
# and uncorrelated
elif window['correlation'] == 'decorrelated':
ci = C[i, i]
C[:, i] = 0
C[i, :] = 0
C[i, i] = ci + float(window['regularizer'])
else:
raise ValueError(
'I do not recognize the method ' + window['correlation'])
# Normalize the component spectrum if desired
if normalize == 'Euclidean':
z = np.sqrt(np.sum(pow(m[normind], 2)))
elif normalize == 'RMS':
z = np.sqrt(np.mean(pow(m[normind], 2)))
elif normalize == 'None':
z = 1.0
else:
raise ValueError(
'Unrecognized normalization: %s\n' % normalize)
m = m / z
C = C / (z ** 2)
model['means'].append(m)
model['covs'].append(C)
if len(attributes)>0:
model['attribute_means'].append(m_attr)
model['attribute_covs'].append(C_attr)
model['means'] = np.array(model['means'])
model['covs'] = np.array(model['covs'])
model['attribute_means'] = np.array(model['attribute_means'])
model['attribute_covs'] = np.array(model['attribute_covs'])
scipy.io.savemat(outfile, model)