#! /usr/bin/env python3
#
# Copyright 2018 California Institute of Technology
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# ISOFIT: Imaging Spectrometer Optimal FITting
# Author: David R Thompson, david.r.thompson@jpl.nasa.gov
#
import os
import numpy as np
import scipy.io
import pylab as plt
from spectral.io import envi
import logging
from collections import OrderedDict
from .common import load_spectrum, eps, resample_spectrum
from isofit.inversion.inverse_simple import invert_simple, invert_algebraic
from .geometry import Geometry
from isofit.configs import Config
from isofit.core.forward import ForwardModel
### Variables ###
# Constants related to file I/O
typemap = {
np.uint8: 1,
np.int16: 2,
np.int32: 3,
np.float32: 4,
np.float64: 5,
np.complex64: 6,
np.complex128: 9,
np.uint16: 12,
np.uint32: 13,
np.int64: 14,
np.uint64: 15
}
max_frames_size = 100
flush_rate = 10
### Classes ###
[docs]class SpectrumFile:
"""A buffered file object that contains configuration information about formatting, etc."""
def __init__(self, fname, write=False, n_rows=None, n_cols=None, n_bands=None,
interleave=None, dtype=np.float32, wavelengths=None, fwhm=None,
band_names=None, bad_bands='[]', zrange='{0.0, 1.0}', flag=-9999.0,
ztitles='{Wavelength (nm), Magnitude}', map_info='{}'):
"""."""
self.frames = OrderedDict()
self.write = write
self.fname = os.path.abspath(fname)
self.wl = wavelengths
self.band_names = band_names
self.fwhm = fwhm
self.flag = flag
self.n_rows = n_rows
self.n_cols = n_cols
self.n_bands = n_bands
if self.fname.endswith('.txt'):
# The .txt suffix implies a space-separated ASCII text file of
# one or more data columns. This is cheap to load and store, so
# we do not defer read/write operations.
logging.debug('Inferred ASCII file format for %s' % self.fname)
self.format = 'ASCII'
if not self.write:
self.data, self.wl = load_spectrum(self.fname)
self.n_rows, self.n_cols, self.map_info = 1, 1, '{}'
if self.wl is not None:
self.n_bands = len(self.wl)
else:
self.n_bands = None
self.meta = {}
elif self.fname.endswith('.mat'):
# The .mat suffix implies a matlab-style file, i.e. a dictionary
# of 2D arrays and other matlab-like objects. This is typically
# only used for specific output products associated with single
# spectrum retrievals; there is no read option.
logging.debug('Inferred MATLAB file format for %s' % self.fname)
self.format = 'MATLAB'
if not self.write:
logging.error('Unsupported MATLAB file in input block')
raise IOError('MATLAB format in input block not supported')
else:
# Otherwise we assume it is an ENVI-format file, which is
# basically just a binary data cube with a detached human-
# readable ASCII header describing dimensions, interleave, and
# metadata. We buffer this data in self.frames, reading and
# writing individual rows of the cube on-demand.
logging.debug('Inferred ENVI file format for %s' % self.fname)
self.format = 'ENVI'
if not self.write:
# If we are an input file, the header must preexist.
if not os.path.exists(self.fname+'.hdr'):
logging.error('Could not find %s' % (self.fname+'.hdr'))
raise IOError('Could not find %s' % (self.fname+'.hdr'))
# open file and copy metadata, checking interleave format
self.file = envi.open(self.fname + '.hdr', fname)
self.meta = self.file.metadata.copy()
if self.meta['interleave'] not in ['bil', 'bip']:
logging.error('Unsupported interleave format.')
raise IOError('Unsupported interleave format.')
self.n_rows = int(self.meta['lines'])
self.n_cols = int(self.meta['samples'])
self.n_bands = int(self.meta['bands'])
if 'data ignore value' in self.meta:
self.flag = float(self.meta['data ignore value'])
else:
self.flag = -9999.0
else:
# If we are an output file, we may have to build the header
# from scratch. Hopefully the caller has supplied the
# necessary metadata details.
meta = {
'lines': n_rows,
'samples': n_cols,
'bands': n_bands,
'byte order': 0,
'header offset': 0,
'map info': map_info,
'file_type': 'ENVI Standard',
'sensor type': 'unknown',
'interleave': interleave,
'data type': typemap[dtype],
'wavelength units': 'nm',
'z plot range': zrange,
'z plot titles': ztitles,
'fwhm': fwhm,
'bbl': bad_bands,
'band names': band_names,
'wavelength': self.wl
}
for k, v in meta.items():
if v is None:
logging.error('Must specify %s' % (k))
raise IOError('Must specify %s' % (k))
if os.path.isfile(fname+'.hdr') is False:
self.file = envi.create_image(fname+'.hdr', meta, ext='',
force=True)
else:
self.file = envi.open(fname+'.hdr')
self.open_map_with_retries()
[docs] def open_map_with_retries(self):
"""Try to open a memory map, handling Beowulf I/O issues."""
self.memmap = None
for attempt in range(10):
self.memmap = self.file.open_memmap(interleave='source',
writable=self.write)
if self.memmap is not None:
return
raise IOError('could not open memmap for '+self.fname)
[docs] def get_frame(self, row):
"""The frame is a 2D array, essentially a list of spectra. The
self.frames list acts as a hash table to avoid storing the
entire cube in memory. So we read them or create them on an
as-needed basis. When the buffer flushes via a call to
flush_buffers, they will be deleted."""
if row not in self.frames:
if not self.write:
d = self.memmap[row, :, :]
if self.file.metadata['interleave'] == 'bil':
d = d.T
self.frames[row] = d.copy()
else:
self.frames[row] = np.nan * np.zeros((self.n_cols, self.n_bands))
return self.frames[row]
[docs] def write_spectrum(self, row, col, x):
"""We write a spectrum. If a binary format file, we simply change
the data cached in self.frames and defer file I/O until
flush_buffers is called."""
if self.format == 'ASCII':
# Multicolumn output for ASCII products
np.savetxt(self.fname, x, fmt='%10.6f')
elif self.format == 'MATLAB':
# Dictionary output for MATLAB products
scipy.io.savemat(self.fname, x)
else:
# Omit wavelength column for spectral products
frame = self.get_frame(row)
if x.ndim == 2:
x = x[:, -1]
frame[col, :] = x
[docs] def read_spectrum(self, row, col):
"""Get a spectrum from the frame list or ASCII file. Note that if
we are an ASCII file, we have already read the single spectrum and
return it as-is (ignoring the row/column indices)."""
if self.format == 'ASCII':
return self.data
else:
frame = self.get_frame(row)
return frame[col]
[docs] def flush_buffers(self):
"""Write to file, and refresh the memory map object."""
if self.format == 'ENVI':
if self.write:
for row, frame in self.frames.items():
valid = np.logical_not(np.isnan(frame[:, 0]))
if self.file.metadata['interleave'] == 'bil':
self.memmap[row, :, valid] = frame[valid, :].T
else:
self.memmap[row, valid, :] = frame[valid, :]
self.frames = OrderedDict()
del self.file
self.file = envi.open(self.fname+'.hdr', self.fname)
self.open_map_with_retries()
[docs]class IO:
"""..."""
def __init__(self, config: Config, forward: ForwardModel, inverse, active_rows, active_cols):
"""Initialization specifies retrieval subwindows for calculating
measurement cost distributions."""
self.input = config.input
self.output = config.output
self.iv = inverse
self.fm = forward
self.bbl = '[]'
self.radiance_correction = None
self.meas_wl = forward.instrument.wl_init
self.meas_fwhm = forward.instrument.fwhm_init
self.writes = 0
self.n_rows = 1
self.n_cols = 1
self.n_sv = len(self.fm.statevec)
self.n_chan = len(self.fm.instrument.wl_init)
self.simulation_mode = config.implementation.mode == 'simulation'
# Names of either the wavelength or statevector outputs
wl_names = [('Channel %i' % i) for i in range(self.n_chan)]
sv_names = self.fm.statevec.copy()
self.defined_outputs, self.defined_inputs = {}, {}
self.infiles, self.outfiles, self.map_info = {}, {}, '{}'
# Load input files and record relevant metadata
for element, element_name in zip(*self.input.get_elements()):
self.infiles[element_name] = SpectrumFile(element)
if (self.infiles[element_name].n_rows > self.n_rows) or \
(self.infiles[element_name].n_cols > self.n_cols):
self.n_rows = self.infiles[element_name].n_rows
self.n_cols = self.infiles[element_name].n_cols
for inherit in ['map info', 'bbl']:
if inherit in self.infiles[element_name].meta:
setattr(self, inherit.replace(' ', '_'),
self.infiles[element_name].meta[inherit])
for element, element_header, element_name in zip(*self.output.get_output_files()):
band_names, ztitle, zrange = element_header
if band_names == 'statevector':
band_names = sv_names
elif band_names == 'wavelength':
band_names = wl_names
else:
band_names = '{}'
n_bands = len(band_names)
self.outfiles[element_name] = SpectrumFile(
element,
write=True,
n_rows=self.n_rows,
n_cols=self.n_cols,
n_bands=n_bands,
interleave='bip',
dtype=np.float32,
wavelengths=self.meas_wl,
fwhm=self.meas_fwhm,
band_names=band_names,
bad_bands=self.bbl,
map_info=self.map_info,
zrange=zrange,
ztitles=ztitle
)
# Do we apply a radiance correction?
if self.input.radiometry_correction_file is not None:
filename = self.input.radiometry_correction_file
self.radiance_correction, wl = load_spectrum(filename)
# Last thing is to define the active image area
if active_rows is None:
active_rows = np.arange(self.n_rows)
if active_cols is None:
active_cols = np.arange(self.n_cols)
self.iter_inds = []
for row in active_rows:
for col in active_cols:
self.iter_inds.append([row, col])
self.iter_inds = np.array(self.iter_inds)
if self.simulation_mode:
self.fm.surface.rwl = np.array([float(x) for x in self.infiles['reflectance_file'].meta['wavelength']])
[docs] def get_components_at_index(self, index: int) -> (int, int, np.array, Geometry):
"""
Get the spectrum from the file at the specified index. Helper/
parallel enabling function.
:param index: reference location for iter_inds
:return: success: boolean flag indicating if data present
:return: r: row index
:return: c: column index
:return: meas: measured radiance file
:return: geom: set up specified geometry files
"""
# Determine the appropriate row, column index. and initialize the
# data dictionary with empty entries.
r, c = self.iter_inds[index]
data = dict([(i, None) for i in self.input.get_all_element_names()])
logging.debug('Row %i Column %i' % (r, c))
# Read data from any of the input files that are defined.
for source in self.infiles:
data[source] = self.infiles[source].read_spectrum(r, c)
if (index % flush_rate) == 0:
self.infiles[source].flush_buffers()
# Check for any bad data flags
for source in self.infiles:
if np.all(abs(data[source] - self.infiles[source].flag) < eps):
return False, r, c, None, None
if self.simulation_mode:
# If solving the inverse problem, the measurment is the surface reflectance
self.fm.surface.rfl = data['reflectance_file'].copy()
if self.fm.surface.wl is not None:
self.fm.surface.resample_reflectance()
meas = self.fm.surface.rfl
else:
# If solving the inverse problem, the measurment is the radiance
# We apply the calibration correciton here for simplicity.
meas = data['measured_radiance_file']
if data["radiometry_correction_file"] is not None:
meas = meas.copy() * data['radiometry_correction_file']
# We build the geometry object for this spectrum. For files not
# specified in the input configuration block, the associated entries
# will be 'None'. The Geometry object will use reasonable defaults.
geom = Geometry(obs=data['obs_file'],
glt=data['glt_file'],
loc=data['loc_file'])
return True, r, c, meas, geom
def __iter__(self):
""" Reset the iterator"""
self.iter = 0
return self
def __next__(self):
""" Get the next spectrum from the file. Turn the iteration number
into row/column indices and read from all input products."""
# Try to read data until we hit the end or find good values
success = False
while not success:
if self.iter == len(self.iter_inds):
self.flush_buffers()
raise StopIteration
# Determine the appropriate row, column index. and initialize the
# data dictionary with empty entries.
success, r, c, meas, geom = self.get_components_at_index(
self.iter)
self.iter = self.iter + 1
return r, c, meas, geom
[docs] def check_wavelengths(self, wl):
"""Make sure an input wavelengths align to the instrument definition."""
return (len(wl) == self.fm.instrument.wl) and \
all((wl-self.fm.instrument.wl) < 1e-2)
[docs] def flush_buffers(self):
"""Write all buffered output data to disk, and erase read buffers."""
for file_dictionary in [self.infiles, self.outfiles]:
for name, fi in file_dictionary.items():
fi.flush_buffers()
[docs] def write_spectrum(self, row, col, states, meas, geom, flush_immediately=False):
"""Write data from a single inversion to all output buffers."""
self.writes = self.writes + 1
if len(states) == 0:
# Write a bad data flag
atm_bad = np.zeros(len(self.fm.statevec)) * -9999.0
state_bad = np.zeros(len(self.fm.statevec)) * -9999.0
data_bad = np.zeros(self.fm.instrument.n_chan) * -9999.0
to_write = {
'estimated_state_file': state_bad,
'estimated_reflectance_file': data_bad,
'estimated_emission_file': data_bad,
'modeled_radiance_file': data_bad,
'apparent_reflectance_file': data_bad,
'path_radiance_file': data_bad,
'simulated_measurement_file': data_bad,
'algebraic_inverse_file': data_bad,
'atmospheric_coefficients_file': atm_bad,
'radiometry_correction_file': data_bad,
'spectral_calibration_file': data_bad,
'posterior_uncertainty_file': state_bad
}
else:
# The inversion returns a list of states, which are
# intepreted either as samples from the posterior (MCMC case)
# or as a gradient descent trajectory (standard case). For
# gradient descent the last spectrum is the converged solution.
if self.iv.mode == 'mcmc':
state_est = states.mean(axis=0)
else:
state_est = states[-1, :]
# Spectral calibration
wl, fwhm = self.fm.calibration(state_est)
cal = np.column_stack(
[np.arange(0, len(wl)), wl / 1000.0, fwhm / 1000.0])
# If there is no actual measurement, we use the simulated version
# in subsequent calculations. Naturally in these cases we're
# mostly just interested in the simulation result.
if meas is None:
meas = self.fm.calc_rdn(state_est, geom)
# Rodgers diagnostics
lamb_est, meas_est, path_est, S_hat, K, G = \
self.iv.forward_uncertainty(state_est, meas, geom)
# Simulation with noise
meas_sim = self.fm.instrument.simulate_measurement(meas_est, geom)
# Algebraic inverse and atmospheric optical coefficients
x_surface, x_RT, x_instrument = self.fm.unpack(state_est)
rfl_alg_opt, Ls, coeffs = invert_algebraic(self.fm.surface,
self.fm.RT, self.fm.instrument, x_surface, x_RT, x_instrument,
meas, geom)
rhoatm, sphalb, transm, solar_irr, coszen, transup = coeffs
L_atm = self.fm.RT.get_L_atm(x_RT, geom)
L_down_transmitted = self.fm.RT.get_L_down_transmitted(x_RT, geom)
atm = np.column_stack(list(coeffs[:4]) +
[np.ones((len(wl), 1)) * coszen])
# Upward emission & glint and apparent reflectance
Ls_est = self.fm.calc_Ls(state_est, geom)
apparent_rfl_est = lamb_est + Ls_est
# Radiometric calibration
factors = np.ones(len(wl))
if 'radiometry_correction_file' in self.outfiles:
if 'reference_reflectance_file' in self.infiles:
reference_file = self.infiles['reference_reflectance_file']
self.rfl_ref = reference_file.read_spectrum(row, col)
self.wl_ref = reference_file.wl
w, fw = self.fm.instrument.calibration(x_instrument)
resamp = resample_spectrum(self.rfl_ref, self.wl_ref,
w, fw, fill=True)
meas_est = self.fm.calc_meas(state_est, geom, rfl=resamp)
factors = meas_est / meas
else:
logging.warning('No reflectance reference')
# Assemble all output products
to_write = {
'estimated_state_file': state_est,
'estimated_reflectance_file': np.column_stack((self.fm.surface.wl, lamb_est)),
'estimated_emission_file': np.column_stack((self.fm.surface.wl, Ls_est)),
'modeled_radiance_file': np.column_stack((wl, meas_est)),
'apparent_reflectance_file': np.column_stack((self.fm.surface.wl, apparent_rfl_est)),
'path_radiance_file': np.column_stack((wl, path_est)),
'simulated_measurement_file': np.column_stack((wl, meas_sim)),
'algebraic_inverse_file': np.column_stack((self.fm.surface.wl, rfl_alg_opt)),
'atmospheric_coefficients_file': atm,
'radiometry_correction_file': factors,
'spectral_calibration_file': cal,
'posterior_uncertainty_file': np.sqrt(np.diag(S_hat))
}
for product in self.outfiles:
logging.debug('IO: Writing '+product)
self.outfiles[product].write_spectrum(row, col, to_write[product])
if (self.writes % flush_rate) == 0 or flush_immediately:
self.outfiles[product].flush_buffers()
# Special case! samples file is matlab format.
if self.output.mcmc_samples_file is not None:
logging.debug('IO: Writing mcmc_samples_file')
mdict = {'samples': states}
scipy.io.savemat(self.output.mcmc_samples_file, mdict)
# Special case! Data dump file is matlab format.
if self.output.data_dump_file is not None:
logging.debug('IO: Writing data_dump_file')
x = state_est
xall = states
Seps_inv, Seps_inv_sqrt = self.iv.calc_Seps(x, meas, geom)
meas_est_window = meas_est[self.iv.winidx]
meas_window = meas[self.iv.winidx]
xa, Sa, Sa_inv, Sa_inv_sqrt = self.iv.calc_prior(x, geom)
prior_resid = (x - xa).dot(Sa_inv_sqrt)
rdn_est = self.fm.calc_rdn(x, geom)
rdn_est_all = np.array([self.fm.calc_rdn(xtemp, geom)
for xtemp in states])
x_surface, x_RT, x_instrument = self.fm.unpack(x)
Kb = self.fm.Kb(x, geom)
xinit = invert_simple(self.fm, meas, geom)
Sy = self.fm.instrument.Sy(meas, geom)
cost_jac_prior = np.diagflat(x - xa).dot(Sa_inv_sqrt)
cost_jac_meas = Seps_inv_sqrt.dot(K[self.iv.winidx, :])
meas_Cov = self.fm.Seps(x, meas, geom)
lamb_est, meas_est, path_est, S_hat, K, G = \
self.iv.forward_uncertainty(state_est, meas, geom)
A = np.matmul(K, G)
# Form the MATLAB dictionary object and write to file
mdict = {
'K': K,
'G': G,
'S_hat': S_hat,
'prior_mu': xa,
'Ls': Ls,
'prior_Cov': Sa,
'meas': meas,
'rdn_est': rdn_est,
'rdn_est_all': rdn_est_all,
'x': x,
'xall': xall,
'x_surface': x_surface,
'x_RT': x_RT,
'x_instrument': x_instrument,
'meas_Cov': meas_Cov,
'wl': wl,
'fwhm': fwhm,
'lamb_est': lamb_est,
'coszen': coszen,
'cost_jac_prior': cost_jac_prior,
'Kb': Kb,
'A': A,
'cost_jac_meas': cost_jac_meas,
'winidx': self.iv.winidx,
'windows': self.iv.windows,
'prior_resid': prior_resid,
'noise_Cov': Sy,
'xinit': xinit,
'rhoatm': rhoatm,
'sphalb': sphalb,
'transm': transm,
'transup': transup,
'solar_irr': solar_irr,
'L_atm': L_atm,
'L_down_transmitted': L_down_transmitted
}
scipy.io.savemat(self.output.data_dump_file, mdict)
# Write plots, if needed
if len(states) > 0 and self.output.plot_directory is not None:
if 'reference_reflectance_file' in self.infiles:
reference_file = self.infiles['reference_reflectance_file']
self.rfl_ref = reference_file.read_spectrum(row, col)
self.wl_ref = reference_file.wl
for i, x in enumerate(states):
# Calculate intermediate solutions
lamb_est, meas_est, path_est, S_hat, K, G = \
self.iv.forward_uncertainty(state_est, meas, geom)
plt.cla()
red = [0.7, 0.2, 0.2]
wl, fwhm = self.fm.calibration(x)
xmin, xmax = min(wl), max(wl)
fig = plt.subplots(1, 2, figsize=(10, 5))
plt.subplot(1, 2, 1)
meas_est = self.fm.calc_meas(x, geom)
for lo, hi in self.iv.windows:
idx = np.where(np.logical_and(wl > lo, wl < hi))[0]
p1 = plt.plot(wl[idx], meas[idx], color=red, linewidth=2)
p2 = plt.plot(wl, meas_est, color='k', linewidth=1)
plt.title("Radiance")
plt.title("Measurement (Scaled DN)")
ymax = max(meas)*1.25
ymax = max(meas)+0.01
ymin = min(meas)-0.01
plt.text(500, ymax*0.92, "Measured", color=red)
plt.text(500, ymax*0.86, "Model", color='k')
plt.ylabel(r"$\mu$W nm$^{-1}$ sr$^{-1}$ cm$^{-2}$")
plt.ylabel("Intensity")
plt.xlabel("Wavelength (nm)")
plt.ylim([-0.001, ymax])
plt.ylim([ymin, ymax])
plt.xlim([xmin, xmax])
plt.subplot(1, 2, 2)
lamb_est = self.fm.calc_lamb(x, geom)
ymax = min(max(lamb_est)*1.25, 0.10)
for lo, hi in self.iv.windows:
# black line
idx = np.where(np.logical_and(wl > lo, wl < hi))[0]
p2 = plt.plot(wl[idx], lamb_est[idx], 'k', linewidth=2)
ymax = max(max(lamb_est[idx]*1.2), ymax)
# red line
if 'reference_reflectance_file' in self.infiles:
idx = np.where(np.logical_and(
self.wl_ref > lo, self.wl_ref < hi))[0]
p1 = plt.plot(self.wl_ref[idx], self.rfl_ref[idx],
color=red, linewidth=2)
ymax = max(max(self.rfl_ref[idx]*1.2), ymax)
# green and blue lines - surface components
if hasattr(self.fm.surface, 'components') and \
self.output.plot_surface_components:
idx = np.where(np.logical_and(self.fm.surface.wl > lo,
self.fm.surface.wl < hi))[0]
p3 = plt.plot(self.fm.surface.wl[idx],
self.fm.xa(x, geom)[idx], 'b', linewidth=2)
for j in range(len(self.fm.surface.components)):
z = self.fm.surface.norm(
lamb_est[self.fm.surface.idx_ref])
mu = self.fm.surface.components[j][0] * z
plt.plot(self.fm.surface.wl[idx], mu[idx], 'g:',
linewidth=1)
plt.text(500, ymax*0.86, "Remote estimate", color='k')
if 'reference_reflectance_file' in self.infiles:
plt.text(500, ymax*0.92, "In situ reference", color=red)
if hasattr(self.fm.surface, 'components') and \
self.output.plot_surface_components:
plt.text(500, ymax*0.80, "Prior mean state ",
color='b')
plt.text(500, ymax*0.74, "Surface components ",
color='g')
plt.ylim([-0.0010, ymax])
plt.xlim([xmin, xmax])
plt.title("Reflectance")
plt.title("Source Model")
plt.xlabel("Wavelength (nm)")
fn = os.path.join(self.output.plot_directory, ('frame_%i.png' % i))
plt.savefig(fn)
plt.close()