# -*- coding: utf-8 -*-
__author__ = "ajshajib"
import numpy as np
from copy import deepcopy
import matplotlib.pyplot as plt
from scipy.optimize import least_squares
from scipy.special import huber
from scipy.linalg import lstsq
from astropy.stats import sigma_clip
from scipy import stats
from statsmodels.stats.multitest import fdrcorrection
from tqdm.notebook import tqdm
from .util import Util
from matplotlib.gridspec import GridSpec
[docs]
class WiggleCleaner(object):
"""WiggleCleaner class for cleaning modulation/wiggles in spectral data."""
RED = "#e41a1c"
BLUE = "#377eb8"
GREEN = "#4daf4a"
PURPLE = "#984ea3"
ORANGE = "#ff7f00"
GREY = "#999999"
def __init__(
self,
wavelengths,
datacube,
noise_cube,
gaps=None,
symmetric_sharpening=False,
asymmetric_sharpening=False,
continuum_diff_polynomial_order=2,
):
"""Initialize the WiggleCleaner object.
:param wavelengths: Wavelengths
:type wavelengths: list or np.ndarray
:param datacube: 3D data cube containing the spectral data
:type datacube: np.ndarray
:param noise_cube: 3D data cube containing the noise associated with the
spectral data
:type noise_cube: np.ndarray
:param gaps: Gaps
:type gaps: list
:param symmetric_sharpening: If True, apply symmetric sharpening
:type symmetric_sharpening: bool
:param asymmetric_sharpening: If True, apply asymmetric sharpening
:type asymmetric_sharpening: bool
:param continuum_diff_polynomial_order: Continuum polynomial order for the
difference between the single spaxel and aperture spectra
:type continuum_diff_polynomial_order: int
:return: None
:rtype: None
"""
self._wavelengths = np.array(wavelengths)
self._datacube = datacube
self._noise_cube = noise_cube
self._gaps = np.array(gaps)
self._n_amplitude = -1
self._n_frequency = -1
self._continuum_diff_polynomial_order = continuum_diff_polynomial_order
self._symmetric_sharpening = symmetric_sharpening
self._asymmetric_sharpening = asymmetric_sharpening
self._amplitude_spline = None
self._frequency_spline = None
self._use_huber_loss = False
self._huber_delta = 1.35
self._outlier_rejection_method = None
# Initialize mask for handling gaps and outliers
if gaps is None:
self._gaps = []
self._gap_mask = np.ones_like(self._wavelengths)
else:
self.set_gaps(gaps)
self._outlier_mask = np.ones_like(self._wavelengths)
[docs]
def set_gaps(self, gaps):
"""Set the gaps to be ignored during the fitting process.
:param gaps: List of wavelength ranges to be ignored
:type gaps: list of tuples
:return: None
:rtype: None
"""
self._gaps = gaps
gap_mask = np.ones_like(self._wavelengths)
for g in self._gaps:
mask = (self._wavelengths > g[0]) & (self._wavelengths < g[1])
gap_mask[mask] = 0
self._gap_mask = np.array(gap_mask)
@property
def symmetric_sharpening(self):
"""Get the symmetric sharpening flag.
:return: Symmetric sharpening flag
:rtype: bool
"""
return self._symmetric_sharpening
@symmetric_sharpening.setter
def symmetric_sharpening(self, value):
"""Set the symmetric sharpening flag.
:param value: Symmetric sharpening flag
:type value: bool
:return: None
:rtype: None
"""
self._symmetric_sharpening = value
@property
def asymmetric_sharpening(self):
"""Get the asymmetric sharpening flag.
:return: Asymmetric sharpening flag
:rtype: bool
"""
return self._asymmetric_sharpening
@asymmetric_sharpening.setter
def asymmetric_sharpening(self, value):
"""Set the asymmetric sharpening flag.
:param value: Asymmetric sharpening flag
:type value: bool
:return: None
:rtype: None
"""
self._asymmetric_sharpening = value
@property
def scaled_w(self):
"""Scaled wavelengths.
:return: Scaled wavelengths
:rtype: np.ndarray
"""
return self.scale_wavelengths_negative1_to_1(self._wavelengths)
[docs]
def wiggle_func(self, xs, amplitude_params, frequency_params, phi, a_1=0, a_2=0):
"""Get the wiggle function.
:param xs: Scaled wavelengths
:type xs: np.ndarray
:param frequency_params: Frequency parameters
:type frequency_params: np.ndarray
:param amplitude_params: Amplitude parameters
:type amplitude_params: np.ndarray
:param phi: Phase
:type phi: float
:return: Wiggle function
:rtype: np.ndarray
"""
amplitude_spline = deepcopy(self._amplitude_spline)
frequency_spline = deepcopy(self._frequency_spline)
amplitude_spline.c = amplitude_params
frequency_spline.c = frequency_params
amplitude = amplitude_spline(xs)
frequency = frequency_spline(xs)
wave_function = (
np.sin(frequency * xs + phi)
+ a_1 * (np.sin(frequency * xs + phi) ** 2) # asymmetric sharpness
+ a_2 * np.sin(3 * (frequency * xs + phi)) # sharpness
)
return 1.0 + amplitude * wave_function
[docs]
def scale_wavelengths_negative1_to_1(self, w):
"""Scale the wavelengths to -1 to 1.
:param w: Wavelengths
:type w: np.ndarray
:return: Scaled wavelengths
:rtype: np.ndarray
"""
return (w - self._wavelengths[0]) / (
self._wavelengths[-1] - self._wavelengths[0]
) * 2 - 1
[docs]
def scale_wavelengths_to_0_1(self, w):
"""Scale the wavelengths to 0 to 1.
:param w: Wavelengths
:type w: np.ndarray
:return: Scaled wavelengths
:rtype: np.ndarray
"""
return (w - self._wavelengths[0]) / (
self._wavelengths[-1] - self._wavelengths[0]
)
[docs]
def wiggle_model(self, params):
"""Get the wiggle model given the parameters.
:param params: Parameters
:type params: np.ndarray
:return: Model
:rtype: np.ndarray
"""
n_amplitude, n_frequency = self.configure_polynomial_ns()
amplitude_params, frequency_params, phi_0 = self.split_params(
params, n_amplitude, n_frequency
)
a_1, a_2 = self.get_a1_a2(params)
wiggle_model = self.wiggle_func(
self.scaled_w,
amplitude_params,
frequency_params,
phi_0,
a_1=a_1,
a_2=a_2,
)
return wiggle_model
[docs]
def get_a1_a2(self, params):
"""Get the a_1 and a_2 parameters for asymmetric and symmetric sharpening from
the params array.
:param params: Parameters
:type params: np.ndarray
:return: a_1 and a_2 parameters
:rtype: Tuple[float, float]
"""
n_amplitude, n_frequency = self.configure_polynomial_ns()
if self._asymmetric_sharpening and not self._symmetric_sharpening:
a_1 = params[n_amplitude + n_frequency + 5]
a_2 = 0
elif self._symmetric_sharpening and not self._asymmetric_sharpening:
a_1 = 0
a_2 = params[n_amplitude + n_frequency + 5]
elif self._symmetric_sharpening and self._asymmetric_sharpening:
a_1 = params[n_amplitude + n_frequency + 5]
a_2 = params[n_amplitude + n_frequency + 6]
else:
a_1 = 0
a_2 = 0
return a_1, a_2
[docs]
def get_exponent_param(self, params):
"""Get the exponent parameter for the wavelength term in the power-law component
of the continuum.
:param params: Parameters
:type params: np.ndarray
:return: Exponent parameter
:rtype: float
"""
n_amplitude, n_frequency = self.configure_polynomial_ns()
add_more = 0
if self._asymmetric_sharpening and not self._symmetric_sharpening:
add_more = 1
elif self._symmetric_sharpening and not self._asymmetric_sharpening:
add_more = 1
elif self._symmetric_sharpening and self._asymmetric_sharpening:
add_more = 2
return params[n_amplitude + n_frequency + add_more + 5]
[docs]
def split_params(self, params, n_amplitude=None, n_frequency=None):
"""Split the parameters. Opposite of the set_params function.
:param params: Parameters
:type params: np.ndarray
:param n_amplitude: Number of amplitude parameters
:type n_amplitude: int
:param n_frequency: Number of frequency parameters
:type n_frequency: int
:return: amplitude parameters, frequency parameters, and phi_0
:rtype: Tuple
"""
n_amplitude, n_frequency = self.configure_polynomial_ns(
n_amplitude, n_frequency
)
amplitude_params = params[: n_amplitude + 2]
frequency_params = params[n_amplitude + 2 : n_amplitude + n_frequency + 4]
phi_0 = params[n_amplitude + n_frequency + 4]
return amplitude_params, frequency_params, phi_0
[docs]
def set_params(
self,
amplitude_params,
frequency_params,
phi_0,
n_amplitude=None,
n_frequency=None,
):
"""Set the parameters. Opposite function of the split_params function.
:param amplitude_params: Amplitude parameters
:type amplitude_params: np.ndarray
:param frequency_params: Frequency parameters
:type frequency_params: np.ndarray
:param phi_0: Phase
:type phi_0: float
:param n_amplitude: Number of amplitude parameters
:type n_amplitude: int
:param n_frequency: Number of frequency parameters
:type n_frequency: int
:return: Parameters
:rtype: np.ndarray
"""
if n_amplitude is None:
n_amplitude = self._n_amplitude
if n_frequency is None:
n_frequency = self._n_frequency
params = np.concatenate([amplitude_params, frequency_params, np.array([phi_0])])
return params
[docs]
def model_full_fit(
self,
params,
x,
y,
aperture_radius,
annulus_outer_radius=0,
annulus_inner_radius=0,
):
"""Compute the full model fit.
:param params: Parameters
:type params: np.ndarray
:param x: x coordinates
:type x: np.ndarray
:param y: y coordinates
:type y: np.ndarray
:param aperture_radius: Aperture radius
:type aperture_radius: float
:param annulus_outer_radius: Outer radius of the annulus
:type annulus_outer_radius: float
:param annulus_inner_radius: Inner radius of the annulus
:type annulus_inner_radius: float
:return: Full model, spectra, and total noise
:rtype: Tuple[np.ndarray, np.ndarray, np.ndarray]
"""
(
spectra,
noise,
aperture_spectra,
aperture_noise,
annulus_spectra,
annulus_noise,
) = self.get_spectra_set(
x, y, aperture_radius, annulus_outer_radius, annulus_inner_radius
)
wavelengths = self.scale_wavelengths_negative1_to_1(self._wavelengths)
# fit c_1 * aperture_spectra + c_3 * wavelengths**a + \sum_i c_i * wavelengths**i + c_N * annulus_spectra
# given non-linear parameter a, treat all c_i parameters as linear parameters and derive them using linear inversion
wiggle_model = self.wiggle_model(params)
A = self.get_design_matrix(
aperture_spectra,
annulus_spectra,
annulus_outer_radius,
self.get_exponent_param(params),
wavelengths,
)
A *= wiggle_model[:, np.newaxis]
# Solve the linear least squares problem with Tikhonov regularization (ridge regression)
coefficients = self.solve_ridge_regression(A, spectra)
full_model = A @ coefficients
fractional_variance = (noise / spectra) ** 2 + (
aperture_noise / aperture_spectra
) ** 2
if annulus_outer_radius > 0:
fractional_variance += (annulus_noise / annulus_spectra) ** 2
model_uncertainty = np.sqrt(fractional_variance * full_model**2)
return full_model, model_uncertainty, spectra, noise
[docs]
def get_design_matrix(
self,
aperture_spectra,
annulus_spectra,
annulus_outer_radius,
a,
wavelengths,
):
"""Get the design matrix for the full model fit.
:param a: Non-linear parameter, the exponent for the wavelength term in power-
law component of the continuum
:type a: float
:param annulus_outer_radius: Outer radius of the annulus
:type annulus_outer_radius: float
:param aperture_spectra: Aperture spectra
:type aperture_spectra: np.ndarray
:param annulus_spectra: Annulus spectra
:type annulus_spectra: np.ndarray
:param wavelengths: Wavelengths
:type wavelengths: np.ndarray
:return: Design matrix
:rtype: np.ndarray
"""
A = np.column_stack(
[
aperture_spectra,
np.ones_like(wavelengths),
]
)
for i in range(1, self._continuum_diff_polynomial_order + 1):
A = np.column_stack(
[
A,
wavelengths**i,
]
)
A = np.column_stack(
[
A,
self.scale_wavelengths_to_0_1(self._wavelengths) ** np.abs(a),
]
)
if annulus_outer_radius > 0:
# Add the annulus spectra to the design matrix
A = np.column_stack([A, annulus_spectra])
return A
[docs]
def residual_vector_full_fit(
self,
params,
x,
y,
aperture_radius,
annulus_outer_radius=0,
annulus_inner_radius=0,
):
"""Compute the residual vector for the full model fit.
:param params: Parameters
:type params: np.ndarray
:param x: x coordinates
:type x: np.ndarray
:param y: t coordinates
:type y: np.ndarray
:param aperture_radius: Aperture radius
:type aperture_radius: float
:param annulus_outer_radius: Outer radius of the annulus
:type annulus_outer_radius: float
:param annulus_inner_radius: Inner radius of the annulus
:type annulus_inner_radius: float
:return: Residual vector
:rtype: np.ndarray
"""
full_model, total_uncertainty, spectra, _ = self.model_full_fit(
params,
x,
y,
aperture_radius,
annulus_outer_radius,
annulus_inner_radius,
)
residual = (full_model - spectra) / total_uncertainty
residual = residual * self._gap_mask * self._outlier_mask
if self._use_huber_loss:
huber_loss = huber(self._huber_delta, residual)
residual = np.sqrt(np.abs(2 * huber_loss)) * np.sign(residual)
return residual
[docs]
def residual_vector(self, params, wiggle_signal, wiggle_noise):
"""Get the residual vector.
:param params: Parameters
:type params: np.ndarray
:param wiggle_signal: wiggle_signal
:type wiggle_signal: np.ndarray
:param wiggle_noise: wiggle_noise
:type wiggle_noise: np.ndarray
:return: Residual vector
:rtype: np.ndarray
"""
model = self.wiggle_model(params)
total_noise = wiggle_noise
residual = (model - wiggle_signal) / total_noise
residual = residual * self._gap_mask * self._outlier_mask
if self._use_huber_loss:
huber_loss = huber(self._huber_delta, residual)
residual = np.sqrt(np.abs(2 * huber_loss)) * np.sign(residual)
return residual
[docs]
def cost_function(self, params, wiggle_signal, wiggle_noise):
"""Compute the cost function (sum of squared residuals).
:param params: Parameters
:type params: np.ndarray
:param wiggle_signal: wiggle data
:type wiggle_signal: np.ndarray
:param wiggle_noise: wiggle_noise
:type wiggle_noise: np.ndarray
:return: cost
:rtype: float
"""
residual = self.residual_vector(params, wiggle_signal, wiggle_noise)
cost = np.sum(residual**2)
return cost
[docs]
def get_residual_func(self, wiggle_signal, wiggle_noise):
"""Get the residual function.
:param wiggle_signal: wiggle data
:type wiggle_signal: np.ndarray
:param wiggle_noise: wiggle noise
:type wiggle_noise: np.ndarray
:return: Residual function
:rtype: Callable
"""
def residual_func(params):
return self.residual_vector(params, wiggle_signal, wiggle_noise)
return residual_func
[docs]
def get_residual_func_phase_only(self, init_params, wiggle_signal, wiggle_noise):
"""Get the residual function with phase only.
:param params: Parameters
:type params: np.ndarray
:param init_params: Initial parameters
:type init_params: np.ndarray
:param wiggle_signal: wiggle_signal
:type wiggle_signal: np.ndarray
:param wiggle_noise: wiggle noise
:type wiggle_noise: np.ndarray
:return: Residual function
:rtype: Callable
"""
amplitude_params, _, _ = self.split_params(init_params)
def residual_func(params):
new_params = self.set_params(
amplitude_params,
params[:-1],
params[-1],
)
return self.residual_vector(new_params, wiggle_signal, wiggle_noise)
return residual_func
[docs]
def fit_wiggle(
self,
x,
y,
aperture_radius=4,
annulus_outer_radius=5,
annulus_inner_radius=3,
n_amplitude=None,
n_frequency=None,
specified_noise_level=0,
init_peak_detection_proximity_threshold=200,
do_interim_fit_phase_only=False,
extract_covariance=True,
outlier_rejection_method=None,
fdr_alpha=0.01,
fdr_outlier_max_fraction=0.1,
use_huber_loss=False,
huber_delta=1.35,
sigma_clip_sigma=5,
sigma_clip_max_iterations=5,
symmetric_sharpening=False,
asymmetric_sharpening=False,
plot=True,
verbose=True,
fit_full_model=True,
save_figure_dir=None,
):
"""Fit the wiggle data.
:param wiggle_signal: Wiggle data
:type wiggle_signal: np.ndarray
:param wiggle_noise: wiggle noise
:type wiggle_noise: np.ndarray
:param n_amplitude: Number of amplitude parameters
:type n_amplitude: int
:param n_frequency: Number of frequency parameters
:type n_frequency: int
:param specified_noise_level: User-defined noise level to be used instead of the
actual noise. Set to 0 to disable.
:type specified_noise_level: float
:param init_peak_detection_proximity_threshold: Proximity lower limit in
Angstrom for initial identifaction of peaks and troughs
:type init_peak_detection_proximity_threshold: float
:param plot: If True, plot the results
:type plot: bool
:param verbose: If True, print the results
:type verbose: bool
:param do_interim_fit_phase_only: If True, do an interim fit with phase only
:type do_interim_fit_phase_only: bool
:param outlier_rejection_method: Outlier rejection method, "fdr" or
"sigma_clip", set None to disable
:type outlier_rejection_method: str
:param fdr_alpha: False discovery rate (FDR) correction threshold, smaller value
will reject less outliers
:type fdr_alpha: float
:param fdr_outlier_max_fraction: Maximum fraction of outliers to reject using
FDR
:type fdr_outlier_max_fraction: float
:param use_huber_loss: If True, use Huber loss function for outlier rejection with FDR
:type use_huber_loss: bool
:param huber_delta: Delta for Huber loss function
:type huber_delta: float
:param sigma_clip_sigma: Sigma threshold for sigma clipping
:type sigma_clip_sigma: float
:param sigma_clip_max_iterations: Number of sigma clip iterations
:type sigma_clip_max_iterations: int
:param extract_uncertainty: If True, extract the uncertainties
:type extract_uncertainty: bool
:return: Fitted parameters
:rtype: np.ndarray
"""
assert (
fdr_outlier_max_fraction < 1
), "fdr_outlier_max_fraction must be less than 1"
wiggle_signal, wiggle_noise = self.get_wiggle_signal(
x, y, aperture_radius, annulus_outer_radius, annulus_inner_radius
)
self._outlier_rejection_method = outlier_rejection_method
self._use_huber_loss = use_huber_loss
self._huber_delta = huber_delta
self._symmetric_sharpening = symmetric_sharpening
self._asymmetric_sharpening = asymmetric_sharpening
self._outlier_mask = np.ones_like(self._wavelengths)
n_amplitude, n_frequency = self.configure_polynomial_ns(
n_amplitude, n_frequency
)
wiggle_noise = self.configure_noise(
wiggle_signal, wiggle_noise, specified_noise_level
)
amplitude_spline, frequency_spline, init_phi_0 = Util.get_init_params_spline(
wiggle_signal,
self.scaled_w,
n_amplitude=n_amplitude,
n_frequency=n_frequency,
init_peak_detection_proximity_threshold=init_peak_detection_proximity_threshold
/ np.mean(np.diff(self._wavelengths)),
plot=False,
)
self._amplitude_spline = deepcopy(amplitude_spline)
init_amplitude_params = deepcopy(amplitude_spline.c)
self._frequency_spline = deepcopy(frequency_spline)
init_frequency_params = deepcopy(frequency_spline.c)
wiggle_signal = np.array(wiggle_signal)
wiggle_noise = np.array(wiggle_noise)
x0 = self.set_params(
init_amplitude_params,
init_frequency_params,
init_phi_0,
n_amplitude=n_amplitude,
n_frequency=n_frequency,
)
# first do an interim fit to solve for initial phase only
if do_interim_fit_phase_only:
result = least_squares(
self.get_residual_func_phase_only(x0, wiggle_signal, wiggle_noise),
np.concatenate([init_frequency_params, x0]),
)
interim_frquency_params = result.x[:-1]
interim_phi_0 = result.x[-1]
x0 = self.set_params(
init_amplitude_params,
interim_frquency_params,
interim_phi_0,
n_amplitude=n_amplitude,
n_frequency=n_frequency,
)
# Add parameters for asymmetric and symmetric sharpening
if self._symmetric_sharpening and self._asymmetric_sharpening:
x0 = np.concatenate([x0, np.array([0, 0])])
elif self._symmetric_sharpening or self._asymmetric_sharpening:
x0 = np.concatenate([x0, np.array([0])])
# fitting with the extracted wiggle signal in any case, as the fitted parameters
# will be used as initial parameters when fitting the full spectra
if outlier_rejection_method != "fdr":
self._use_huber_loss = False
# initial "robust" regression (using Huber loss)
# here, fitting the wiggle signal that was extracted from the spectra using the template(s)
result = least_squares(
self.get_residual_func(wiggle_signal, wiggle_noise),
x0,
)
if self._outlier_rejection_method is not None:
residual = self.residual_vector(result.x, wiggle_signal, wiggle_noise)
clipped_pixels = self.reject_outliers(
residual,
num_params=len(result.x),
fdr_alpha=fdr_alpha,
fdr_outlier_max_fraction=fdr_outlier_max_fraction,
sigma_clip_sigma=sigma_clip_sigma,
sigma_clip_max_iterations=sigma_clip_max_iterations,
)
self._outlier_mask[clipped_pixels] = 0
self._use_huber_loss = False
result = least_squares(
self.get_residual_func(wiggle_signal, wiggle_noise),
result.x,
)
result_params = result.x
if fit_full_model:
# Add a parameter for the exponent of the wavelength term in the power-law component of the continuum
x0 = np.concatenate(
[
result_params,
np.array([1.5]),
]
)
# Reset outlier mask for full fit
self._outlier_mask = np.ones_like(self._wavelengths)
# Turning on Huber loss again for the full fit if user_huber_loss is True
if outlier_rejection_method == "fdr" and use_huber_loss:
self._use_huber_loss = True
result = least_squares(
self.residual_vector_full_fit,
x0,
args=(
x,
y,
aperture_radius,
annulus_outer_radius,
annulus_inner_radius,
),
)
result_params = result.x[:-1]
if self._outlier_rejection_method is not None:
residual = self.residual_vector_full_fit(
result.x,
x,
y,
aperture_radius,
annulus_outer_radius,
annulus_inner_radius,
)
clipped_pixels = self.reject_outliers(
residual,
num_params=len(result.x),
fdr_alpha=fdr_alpha,
fdr_outlier_max_fraction=fdr_outlier_max_fraction,
sigma_clip_sigma=sigma_clip_sigma,
sigma_clip_max_iterations=sigma_clip_max_iterations,
)
self._outlier_mask[clipped_pixels] = 0
# Turn off Huber loss for the final fit
self._use_huber_loss = False
result = least_squares(
self.get_residual_func(wiggle_signal, wiggle_noise),
result.x,
)
# Update wiggle signal based on the full fit
bestfit_model, model_uncertainty, spectra, noise = self.model_full_fit(
result.x,
x,
y,
aperture_radius,
annulus_outer_radius,
annulus_inner_radius,
)
wiggle_model = self.wiggle_model(result_params)
wiggle_signal = spectra / (bestfit_model / wiggle_model)
wiggle_noise = noise / (bestfit_model / wiggle_model)
if plot:
self.plot_full_fit_model(
bestfit_model,
model_uncertainty,
spectra,
noise,
wiggle_model,
save_figure_info={"x": x, "y": y, "dir": save_figure_dir},
)
if extract_covariance:
residuals = result.fun
jacobian = result.jac
# Get number of observations (m) and parameters (n)
m, n = jacobian.shape
# Check degrees of freedom
assert (
m > n
), "Number of observations must exceed number of parameters to estimate uncertainty."
# Calculate residual sum of squares and variance estimate
sum_of_squared_residuals = np.sum(residuals**2)
dof = m - n
sigma_squared = sum_of_squared_residuals / dof
# Compute covariance matrix using pseudoinverse for stability
cov_matrix = sigma_squared * np.linalg.pinv(jacobian.T @ jacobian)
if fit_full_model:
cov_matrix = cov_matrix[:-1, :-1]
else:
cov_matrix = None
if verbose:
print(
"Cost (from fitting extracted wiggle signal): ",
self.cost_function(result_params, wiggle_signal, wiggle_noise),
)
if fit_full_model:
print(
"Cost (from fitting full spectra): ",
np.sum(
self.residual_vector_full_fit(
result.x,
x,
y,
aperture_radius,
annulus_outer_radius,
annulus_inner_radius,
)
** 2
),
)
if plot:
self.plot_wiggle_model(
wiggle_signal,
wiggle_noise,
result_params,
cov_matrix=cov_matrix,
save_figure_info={"x": x, "y": y, "dir": save_figure_dir},
)
return result_params, cov_matrix, wiggle_signal, wiggle_noise
[docs]
def plot_full_fit_model(
self,
bestfit_model,
model_uncertainty,
spectra,
noise,
wiggle_model=None,
save_figure_info={"dir": None, "x": None, "y": None},
):
"""Plot the full fit model.
:param bestfit_model: Best-fit model
:type bestfit_model: np.ndarray
:param model_uncertainty: Model uncertainty
:type model_uncertainty: np.ndarray
:param spectra: Spectra
:type spectra: np.ndarray
:param noise: Noise
:type noise: np.ndarray
:return: None
:rtype: None
"""
fig = plt.figure(figsize=(12, 5))
gs = GridSpec(4, 1, height_ratios=[3, 0.05, 1, 0.05], hspace=0)
ax = fig.add_subplot(gs[0])
ax.plot(
self._wavelengths,
bestfit_model,
label="Best-fit model",
lw=1,
c=self.ORANGE,
)
ax.fill_between(
self._wavelengths,
bestfit_model - model_uncertainty,
bestfit_model + model_uncertainty,
lw=1,
color=self.ORANGE,
alpha=0.3,
)
ax.errorbar(
self._wavelengths,
spectra,
yerr=noise,
markersize=2,
label="Data",
lw=1,
c=self.BLUE,
)
if wiggle_model is not None:
ax.plot(
self._wavelengths,
spectra / wiggle_model,
label="Corrected spectra",
lw=1,
ls="--",
c="k",
)
ax.legend()
ax.tick_params(labelbottom=False) # Hide x tick labels on top axis
ax.set_ylabel("Flux")
ax.set_title("Full model fit including wiggles")
# Residuals subplot
ax2 = fig.add_subplot(gs[2], sharex=ax)
residuals = bestfit_model - spectra
ax2.errorbar(
self._wavelengths,
residuals,
yerr=noise,
# label="Residuals",
lw=1,
c=self.GREEN,
)
ax2.axhline(0, color="black", lw=0.5, ls="--")
ax2.set_ylabel("Residuals")
ax2.set_xlabel(r"Wavelengths ($\rm\AA$)")
# ax2.legend()
# plot vertical shaded regions for gaps
for g in self._gaps:
ax.axvspan(g[0], g[1], color="gray", alpha=0.3, lw=0)
ax2.axvspan(g[0], g[1], color="gray", alpha=0.3, lw=0)
# Remove space between axes
plt.subplots_adjust(hspace=0)
ax.set_xlim(
self._wavelengths[0],
self._wavelengths[-1],
)
ax2.set_xlim(
self._wavelengths[0],
self._wavelengths[-1],
)
if save_figure_info["dir"] is not None:
plt.savefig(
f"{save_figure_info['dir']}/wiggle_full_fit_{save_figure_info['x']}_{save_figure_info['y']}.png",
bbox_inches="tight",
)
plt.show()
[docs]
def reject_outliers(
self,
residual,
num_params=0,
fdr_alpha=0.01,
fdr_outlier_max_fraction=0.1,
sigma_clip_sigma=5,
sigma_clip_max_iterations=5,
):
"""Reject outliers using the selected method.
:param residual: Residuals
:type residual: np.ndarray
:param num_params: Number of parameters
:type num_params: int
:param fdr_q: FDR correction threshold
:type fdr_q: float
:param fdr_outlier_max_fraction: Maximum fraction of outliers to reject
:type fdr_outlier_max_fraction: float
:param huber_delta: Delta for Huber loss function
:type huber_delta: float
:param sigma: Sigma threshold for sigma clipping
:type sigma: float
:param sigma_clip_max_iterations: Maximum number of iterations for sigma
clipping
:type sigma_clip_max_iterations: int
:return: Indices of outliers
:rtype: np.ndarray
"""
outlier_pixels = np.zeros_like(residual, dtype=bool)
if self._outlier_rejection_method == "sigma_clip":
clipped = sigma_clip(
residual,
sigma=sigma_clip_sigma,
maxiters=sigma_clip_max_iterations,
masked=True,
)
outlier_pixels = clipped.mask
elif self._outlier_rejection_method == "fdr":
# standardized_residuals = residuals / mad(residuals)
# Compute two-tailed p-values (assuming ~t-distribution)
# Degrees of freedom approximation (n - p - 1)
df = len(residual) - num_params - 1
p_values = 2 * (1 - stats.t.cdf(np.abs(residual), df))
# Apply FDR correction (Benjamini-Hochberg)
reject, corrected_p = fdrcorrection(p_values, alpha=fdr_alpha)
# Limit rejection to a maximum of 30% of the data points with the highest p-values
max_reject = int(fdr_outlier_max_fraction * len(residual))
sorted_indices = np.argsort(p_values)
top_indices = sorted_indices[:max_reject]
outlier_pixels = np.zeros_like(residual, dtype=bool)
outlier_pixels[top_indices] = reject[top_indices]
else:
raise ValueError(
f"Unrecognized outlier rejection_method: {self._outlier_rejection_method}"
)
return outlier_pixels
[docs]
def plot_wiggle_model(
self,
wiggle_signal,
wiggle_noise,
result_params,
cov_matrix=None,
num_samples_uncertainty_region=1000,
save_figure_info={"dir": None, "x": None, "y": None},
):
"""Plot the model.
:param wiggle_signal: wiggle_signal
:type wiggle_signal: np.ndarray
:param wiggle_noise: wiggle noise
:type wiggle_noise: np.ndarray
:param result_params: Fitted parameters
:type result_params: np.ndarray
:param cov_matrix: Covariance matrix
:type cov_matrix: np.ndarray
:param num_samples_uncertainty_region: Number of samples for uncertainty region
:type num_samples_uncertainty_region: int
:return: None
:rtype: None
"""
fig = plt.figure(figsize=(12, 2.5))
ax = fig.add_subplot(111)
ax.errorbar(
self._wavelengths[(self._outlier_mask == 1) & (self._gap_mask == 1)],
wiggle_signal[(self._outlier_mask == 1) & (self._gap_mask == 1)],
yerr=wiggle_noise[(self._outlier_mask == 1) & (self._gap_mask == 1)],
label="Fitted points",
ls="None",
marker="o",
markersize=3,
alpha=0.3,
c=self.BLUE,
)
ax.errorbar(
self._wavelengths[(self._outlier_mask == 0) | (self._gap_mask == 0)],
wiggle_signal[(self._outlier_mask == 0) | (self._gap_mask == 0)],
yerr=wiggle_noise[(self._outlier_mask == 0) | (self._gap_mask == 0)],
label="Rejected outliers",
ls="None",
marker="o",
markersize=3,
alpha=0.4,
c=self.GREY,
)
model = self.wiggle_model(result_params)
line_color = self.ORANGE
ax.plot(
self._wavelengths,
model,
label="Model", # n_param({len(result_params)})",
lw=1,
c=line_color,
zorder=10,
)
if cov_matrix is not None:
model_uncertainty = self.get_model_uncertainty(
result_params, cov_matrix, num_samples_uncertainty_region
)
ax.fill_between(
self._wavelengths,
model - model_uncertainty,
model + model_uncertainty,
color=line_color,
alpha=0.4,
)
for g in self._gaps:
ax.axvspan(g[0], g[1], color=self.GREY, alpha=0.1)
ax.set_xlabel(r"Wavelengths ($\rm\AA$)")
ax.set_ylabel("Wiggle model")
ax.legend(loc="lower left", ncol=3)
ax.set_ylim(
np.min(wiggle_signal) * 0.7,
np.max(wiggle_signal) * 1.02,
)
# ax.set_ylim(np.min(wiggle_signal) * 0.95, np.max(wiggle_signal) * 1.05)
delta_lambda = self._wavelengths[1] - self._wavelengths[0]
ax.set_xlim(
self._wavelengths[0] - delta_lambda * 2,
self._wavelengths[-1] + delta_lambda * 2,
)
if save_figure_info["dir"] is not None:
fig.savefig(
f"{save_figure_info['dir']}/wiggle_model_{save_figure_info['x']}_{save_figure_info['y']}.png",
bbox_inches="tight",
)
plt.show()
[docs]
def get_model_uncertainty(
self, result_params, cov_matrix, num_samples_uncertainty_region=1000
):
"""Get the model uncertainty.
:param result_params: Fitted parameters
:type result_params: np.ndarray
:param cov_matrix: Covariance matrix
:type cov_matrix: np.ndarray
:param num_samples_uncertainty_region: Number of samples for uncertainty region
:type num_samples_uncertainty_region: int
:return: Model uncertainty at each wavelength
:rtype: np.ndarray
"""
models = []
for i in range(num_samples_uncertainty_region):
sampled_params = np.random.multivariate_normal(
result_params, cov_matrix, 1
)[0]
models.append(self.wiggle_model(sampled_params))
models = np.array(models)
model_up, model_down = np.percentile(models, [16, 84], axis=0)
model_uncertainty = (model_up - model_down) / 2
return model_uncertainty
[docs]
def get_wiggle_signal(
self,
x,
y,
aperture_radius=4,
annulus_outer_radius=0,
annulus_inner_radius=0,
plot=False,
):
"""Get the wiggle signal for preliminary assessments.
:param spaxel_x: Spaxel x
:type spaxel_x: int
:param spaxel_y: Spaxel y
:type spaxel_y: int
:param aperture: aperture size to sum the spectra to average out the wiggles
:type aperture: int
:return: Modulation wiggle_signal and noise
:rtype: Tuple of np.ndarray
"""
(
spectra,
noise,
aperture_spectra,
aperture_noise,
annulus_spectra,
annulus_noise,
) = self.get_spectra_set(
x, y, aperture_radius, annulus_outer_radius, annulus_inner_radius
)
wavelengths = self.scale_wavelengths_negative1_to_1(self._wavelengths)
# fit c_1 * aperture_spectra + c_3 * wavelengths**a + \sum_i c_i * wavelengths**i + c_N * annulus_spectra
# given non-linear parameter a, treat all c_i parameters as linear parameters and derive them using linear inversion
def model(a):
# Construct the design matrix for the current 'a'
A = self.get_design_matrix(
aperture_spectra,
annulus_spectra,
annulus_outer_radius,
a,
wavelengths,
)
coefficients = self.solve_ridge_regression(A, spectra)
return A @ coefficients, coefficients
def residual_vector(a):
model_spectra, _ = model(a)
return (model_spectra - spectra) / noise
result = least_squares(residual_vector, 0.5, bounds=(0, 6))
best_model, _ = model(result.x)
model_noise_fraction = (aperture_noise / aperture_spectra) ** 2
if annulus_outer_radius > annulus_inner_radius:
model_noise_fraction += (annulus_noise / annulus_spectra) ** 2
wiggle_signal = spectra / best_model
wiggle_noise = (
np.sqrt((noise / spectra) ** 2 + model_noise_fraction) * wiggle_signal
)
# replace non-positive noise with minimum non-negative value
min_positive_noise = np.nanmin(wiggle_noise[wiggle_noise >= 0])
wiggle_noise[wiggle_noise <= 0] = min_positive_noise
return wiggle_signal, wiggle_noise
[docs]
@staticmethod
def solve_ridge_regression(A, spectra):
"""Solve the ridge regression problem.
:param A: Design matrix
:type A: np.ndarray
:param spectra: Target spectra
:type spectra: np.ndarray
:return: Coefficients
:rtype: np.ndarray
"""
# Normalize columns to unit length
norms = np.linalg.norm(A, axis=0)
norms[norms == 0] = 1.0 # Avoid division by zero
A_normalized = A / norms
# Solve with Ridge Regression (Tikhonov regularization)
alpha = 1e-10 # Small regularization strength
B = np.vstack([A_normalized, np.sqrt(alpha) * np.eye(A_normalized.shape[1])])
target_extended = np.concatenate([spectra, np.zeros(A_normalized.shape[1])])
coef_normalized, _, _, _ = lstsq(B, target_extended)
# Rescale coefficients to original units
coefficients = coef_normalized / norms
return coefficients
[docs]
def get_spectra_set(
self, x, y, aperture_radius, annulus_outer_radius, annulus_inner_radius
):
"""Get the spectra set.
:param x: x coordinate
:type x: int
:param y: y coordinate
:type y: int
:param aperture_radius: Aperture radius
:type aperture_radius: int
:param annulus_outer_radius: annulus outer radius
:type annulus_outer_radius: int
:param annulus_inner_radius: annulus inner radius
:type annulus_inner_radius: int
:return: spectra, noise, aperture_spectra, aperture_noise, annulus_spectra,
annulus_noise
:rtype: Tuple of np.ndarray
"""
spectra = deepcopy(self._datacube[:, x, y])
noise = deepcopy(self._noise_cube[:, x, y])
# make circular mask around the pixel with radius s
mask = np.zeros_like(self._datacube[0], dtype=bool)
annulus_mask = np.zeros_like(self._datacube[0], dtype=bool)
for i in range(x - 2 * aperture_radius, x + 2 * aperture_radius):
for j in range(y - 2 * aperture_radius, y + 2 * aperture_radius):
if (i - x) ** 2 + (j - y) ** 2 <= aperture_radius**2:
mask[i, j] = True
if ((i - x) ** 2 + (j - y) ** 2 <= annulus_outer_radius**2) and (
(i - x) ** 2 + (j - y) ** 2 > annulus_inner_radius**2
):
annulus_mask[i, j] = True
aperture_spectra = np.nansum(self._datacube[:, mask], axis=(1))
aperture_noise = np.sqrt(np.nansum(self._noise_cube[:, mask] ** 2, axis=(1)))
annulus_spectra = np.nansum(self._datacube[:, annulus_mask], axis=(1)) + 0
annulus_noise = (
np.sqrt(np.nansum(self._noise_cube[:, annulus_mask] ** 2, axis=(1))) + 0
)
return (
spectra,
noise,
aperture_spectra,
aperture_noise,
annulus_spectra,
annulus_noise,
)
[docs]
def fit_wiggle_with_model_selection(
self,
x,
y,
aperture_radius=4,
annulus_outer_radius=0,
annulus_inner_radius=0,
n_amplitude=10,
n_frequency=1,
min_n_amplitude=None,
min_n_frequency=None,
specified_noise_level=0,
init_peak_detection_proximity_threshold=200,
plot=False,
selection_criteria="bic",
min_selection_difference=None,
extract_covariance=True,
outlier_rejection_method=None,
fdr_alpha=0.01,
fdr_outlier_max_fraction=0.1,
use_huber_loss=False,
huber_delta=1.35,
sigma_clip_sigma=5,
sigma_clip_max_iterations=3,
symmetric_sharpening=False,
asymmetric_sharpening=False,
fit_full_model=False,
):
"""Fit the wiggle signal with selecting amplitude polynomial order based on BIC.
:param x: x coordinate
:type x: int
:param y: y coordinate
:type y: int
:param aperture_radius: Aperture radius
:type aperture_radius: int
:param annulus_outer_radius: Annulus outer radius
:type annulus_outer_radius: int
:param annulus_inner_radius: Annulus inner radius
:type annulus_inner_radius: int
:param n_amplitude: Maximum number of amplitude parameters
:type n_amplitude: int
:param n_frequency: Number of frequency parameters
:type n_frequency: int
:param min_n_amplitude: Minimum number of amplitude parameters
:type min_n_amplitude: int
:param min_n_frequency: Minimum number of frequency parameters
:type min_n_frequency: int
:param specified_noise_level: Artificial noise level
:type specified_noise_level: float
:param init_peak_detection_proximity_threshold: Proximity lower limit in
Angstrom for initial identifaction of peaks and troughs
:type init_peak_detection_proximity_threshold: float
:param plot: If True, plot the results
:type plot: bool
:param selection_criteria: Selection criteria, "bic" or "chi2"
:type selection_criteria: str
:param sigma_clip_sigma: Sigma clip threshold
:type sigma_clip_sigma: float
:param sigma_clip_max_iterations: Number of sigma clip iterations
:type sigma_clip_max_iterations: int
:param combine_bic_weighted: If True, combine the BIC weighted by the number of
parameters
:type combine_bic_weighted: bool
:param extract_uncertainty: If True, extract the uncertainties
:type extract_uncertainty: bool
:param outlier_rejection_method: Outlier rejection method, "fdr" or
"sigma_clip", set None to disable
:type outlier_rejection_method: str
:param huber_delta: Delta for Huber loss function
:type huber_delta: float
:param fdr_alpha: False discovery rate (FDR) correction threshold, smaller value
will reject less outliers
:type fdr_alpha: float
:param fdr_outlier_max_fraction: Maximum fraction of outliers to reject using
FDR
:type fdr_outlier_max_fraction: float
:param sigma_clip_sigma: Sigma threshold for sigma clipping
:type sigma_clip_sigma: float
:param sigma_clip_max_iterations: Number of sigma clip iterations
:type sigma_clip_max_iterations: int
:param extract_uncertainty: If True, extract the uncertainties
:type extract_uncertainty: bool
:param symmetric_sharpening: If True, use symmetric sharpening
:type symmetric_sharpening: bool
:param asymmetric_sharpening: If True, use asymmetric sharpening
:type asymmetric_sharpening: bool
:param selection_criteria: Selection criteria for model selection, "bic" or "chi2"
:type selection_criteria: str
:param min_selection_difference: Minimum difference between the best and the next best model selection criteria
:type min_selection_difference: float
:param verbose: If True, print the results
:type verbose: bool
:param plot: If True, plot the results
:type plot: bool
:return: Fitted parameters
:rtype: np.ndarray
"""
print(
f"Computing {selection_criteria.upper()} for choices of n_amplitude and n_frequency..."
)
if min_n_amplitude is None:
min_n_amplitude = n_amplitude
elif min_n_amplitude < 2:
raise ValueError("min_n_amplitude must be at least 2")
if min_n_frequency is None:
min_n_frequency = n_frequency
elif min_n_frequency < 2:
raise ValueError("min_n_frequency must be at least 2")
best_metric = None
for k in tqdm(range(min_n_frequency, n_frequency + 1)):
for i in range(min_n_amplitude, n_amplitude + 1):
result_params, cov_matrix, wiggle_signal, wiggle_noise = (
self.fit_wiggle(
x,
y,
aperture_radius=aperture_radius,
annulus_outer_radius=annulus_outer_radius,
annulus_inner_radius=annulus_inner_radius,
n_amplitude=i,
n_frequency=k,
specified_noise_level=specified_noise_level,
init_peak_detection_proximity_threshold=init_peak_detection_proximity_threshold,
plot=False,
extract_covariance=extract_covariance,
outlier_rejection_method=outlier_rejection_method,
use_huber_loss=use_huber_loss,
huber_delta=huber_delta,
fdr_alpha=fdr_alpha,
fdr_outlier_max_fraction=fdr_outlier_max_fraction,
sigma_clip_sigma=sigma_clip_sigma,
sigma_clip_max_iterations=sigma_clip_max_iterations,
symmetric_sharpening=symmetric_sharpening,
asymmetric_sharpening=asymmetric_sharpening,
fit_full_model=fit_full_model,
verbose=False,
)
)
fit_metric = self.get_model_selection_metric(
wiggle_signal,
wiggle_noise,
result_params,
selection_criteria=selection_criteria,
)
if min_selection_difference is None:
if selection_criteria == "bic":
min_selection_difference = 10
elif selection_criteria == "chi2":
min_selection_difference = 25
if best_metric is None:
tqdm.write(
f"n_amplitude: {i}, n_frequency: {k}, {selection_criteria}: {fit_metric}"
)
best_n_amplitude = i
best_n_frequency = k
best_metric = fit_metric
elif fit_metric < best_metric - min_selection_difference:
tqdm.write(
f"n_amplitude: {i}, n_frequency: {k}, {selection_criteria}: {fit_metric}"
)
best_n_amplitude = i
best_n_frequency = k
best_metric = fit_metric
best_params, cov_matrix, wiggle_signal, wiggle_noise = self.fit_wiggle(
x,
y,
aperture_radius=aperture_radius,
annulus_outer_radius=annulus_outer_radius,
annulus_inner_radius=annulus_inner_radius,
n_amplitude=best_n_amplitude,
n_frequency=best_n_frequency,
specified_noise_level=specified_noise_level,
init_peak_detection_proximity_threshold=init_peak_detection_proximity_threshold,
extract_covariance=extract_covariance,
outlier_rejection_method=outlier_rejection_method,
use_huber_loss=use_huber_loss,
huber_delta=huber_delta,
fdr_alpha=fdr_alpha,
fdr_outlier_max_fraction=fdr_outlier_max_fraction,
sigma_clip_sigma=sigma_clip_sigma,
sigma_clip_max_iterations=sigma_clip_max_iterations,
symmetric_sharpening=symmetric_sharpening,
asymmetric_sharpening=asymmetric_sharpening,
plot=plot,
)
print("Best n_amplitude: ", best_n_amplitude)
print("Best n_frequency: ", best_n_frequency)
self._n_amplitude = best_n_amplitude
self._n_frequency = best_n_frequency
return best_params, cov_matrix
[docs]
def get_model_selection_metric(
self, wiggle_signal, wiggle_noise, result_params, selection_criteria="bic"
):
"""
Get the fit metric: BIC or chi^2.
:param wiggle_signal: wiggle_signal
:type wiggle_signal: np.ndarray
:param wiggle_noise: wiggle noise
:type wiggle_noise: np.ndarray
:param result_params: Fitted parameters
:type result_params: np.ndarray
:param selection_criteria: Selection criteria, "bic" or "chi2"
:type selection_criteria: str
:return: Fit metric
:rtype: float
"""
n_dof = np.sum(np.logical_or(self._gap_mask, self._outlier_mask))
k = len(result_params)
chi2 = self.cost_function(result_params, wiggle_signal, wiggle_noise)
if selection_criteria == "bic":
return chi2 + k * np.log(n_dof)
elif selection_criteria == "chi2":
return chi2
else:
raise ValueError(f"Invalid selection_criteria: {selection_criteria}")
[docs]
def is_wiggle_detected(
self,
wiggle_signal,
wiggle_noise,
result_params,
sigma_threshold=5,
variance_ratio_threshold=0.8,
verbose=True,
):
"""Check if wiggle is detected.
:param wiggle_signal: wiggle_signal
:type wiggle_signal: np.ndarray
:param wiggle_noise: wiggle_noise
:type wiggle_noise: np.ndarray
:param sigma_threshold: Sigma threshold
:type sigma_threshold: float
:param variance_ratio_threshold: Variance ratio threshold, set 0 to disable
:type variance_ratio_threshold: float
:param verbose: If True, print the results
:type verbose: bool
:return: True if wiggle is detected
:rtype: bool
"""
# residual for flat spectrum
null_residual = wiggle_signal - np.ones_like(wiggle_signal)
n_data = np.sum(self._gap_mask)
total_noise = wiggle_noise
chi2 = np.sum(
(
(null_residual**2 / total_noise**2) * self._gap_mask
) # * self._outlier_mask)
)
chi2_red = chi2 / n_data
model = self.wiggle_model(result_params)
model_residual = wiggle_signal - model
chi2_model_red = (
np.sum(
model_residual**2
/ total_noise**2
* self._gap_mask # * self._outlier_mask
)
/ n_data
)
sigma = np.sqrt(chi2_red - chi2_model_red)
model_values = model[np.logical_and(self._gap_mask, self._outlier_mask)]
residual_values = model_residual[
np.logical_and(self._gap_mask, self._outlier_mask)
]
model_variance = np.std(model_values)
residual_variance = np.std(residual_values)
if verbose:
print(f"Wiggle detection sigma: {sigma:.2f}")
print(
f"Model variance: {model_variance:.4f}, residual variance: {residual_variance:.4f}, ratio: {model_variance / residual_variance:.4f}"
)
return (sigma > sigma_threshold) and (
model_variance > variance_ratio_threshold * residual_variance
)
[docs]
def clean_cube(
self,
wiggle_detection_sigma_threshold=3,
wiggle_detection_variance_ratio_threshold=0.8,
n_amplitude=10,
n_frequency=1,
min_n_amplitude=None,
min_n_frequency=None,
n_amplitude_for_detection=None,
n_frequency_for_detection=None,
specified_noise_level=0,
cleaning_mask=None,
init_peak_detection_proximity_threshold=200,
aperture_radius=4,
annulus_outer_radius=0,
annulus_inner_radius=0,
conserve_flux=True,
outlier_rejection_method="fdr",
fdr_alpha=0.01,
fdr_outlier_max_fraction=0.1,
use_huber_loss=False,
huber_delta=1.35,
sigma_clip_sigma=5,
sigma_clip_max_iterations=5,
extract_uncertainty=True,
symmetric_sharpening=False,
asymmetric_sharpening=False,
selection_criteria="bic",
min_selection_difference=None,
num_samples_uncertainty_region=1000,
verbose=True,
plot=True,
fit_full_model=True,
):
"""Clean the datacube.
:param sigma_threshold: Sigma threshold
:type sigma_threshold: float
:param n_amplitude: Number of amplitude parameters
:type n_amplitude: int
:param n_frequency: Number of frequency parameters
:type n_frequency: int
:param min_n_amplitude: Minimum number of amplitude parameters
:type min_n_amplitude: int
:param min_n_frequency: Minimum number of frequency parameters
:type min_n_frequency: int
:param specified_noise_level: Artificial noise level in fraction of the spectrum
:type specified_noise_level: float
:param init_peak_detection_proximity_threshold: Proximity lower limit in Angstrom for initial identifaction of peaks and troughs
:type init_peak_detection_proximity_threshold: float
:param aperture_size: Aperture size. Spaxels at the edges with width less than this value will not be cleaned.
:type aperture_size: int
:param min_x: Minimum spaxel x to set lower limit for the cleaning process
:type min_x: int
:param max_x: Maximum spaxel x to set upper limit for the cleaning process
:type max_x: int
:param min_y: Minimum spaxel y to set lower limit for the cleaning process
:type min_y: int
:param max_y: Maximum spaxel y to set upper limit for the cleaning process
:type max_y: int
:param conserve_flux: If True, conserve flux in each spaxel
:type conserve_flux: bool
:param cleaning_mask: Mask for cleaning, if None, clean all spaxels
:type cleaning_mask: np.ndarray
:param outlier_rejection_method: Outlier rejection method, "fdr" or "sigma_clip", set None to disable
:type outlier_rejection_method: str
:param use_huber_loss: If True, use Huber loss function
:type use_huber_loss: bool
:param huber_delta: Delta for Huber loss function
:type huber_delta: float
:param fdr_alpha: False discovery rate (FDR) correction threshold, smaller value will reject less outliers
:type fdr_alpha: float
:param fdr_outlier_max_fraction: Maximum fraction of outliers to reject using FDR
:type fdr_outlier_max_fraction: float
:param sigma_clip_sigma: Sigma threshold for sigma clipping
:type sigma_clip_sigma: float
:param sigma_clip_max_iterations: Number of sigma clip iterations
:type sigma_clip_max_iterations: int
:param extract_uncertainty: If True, extract the uncertainties
:type extract_uncertainty: bool
:param symmetric_sharpening: If True, use symmetric sharpening
:type symmetric_sharpening: bool
:param asymmetric_sharpening: If True, use asymmetric sharpening
:type asymmetric_sharpening: bool
:param selection_criteria: Selection criteria for model selection, "bic" or "chi2"
:type selection_criteria: str
:param min_selection_difference: Minimum difference between the best and the next best model selection criteria
:type min_selection_difference: float
:param verbose: If True, print the results
:type verbose: bool
:param plot: If True, plot the results
:type plot: bool
:return: Cleaned datacube
:rtype: np.ndarray
:return: Cleaned datacube
:rtype: np.ndarray
"""
self.cleaned_datacube = np.copy(self._datacube)
self.cleaned_noisecube = np.copy(self._noise_cube)
if cleaning_mask is None:
cleaning_mask = np.ones_like(self._datacube[0], dtype=bool)
cleaned_mask = np.zeros_like(cleaning_mask, dtype=bool)
n_amplitude, n_frequency = self.configure_polynomial_ns(
n_amplitude, n_frequency
)
total_iterations = np.sum(cleaning_mask)
with tqdm(total=total_iterations, desc="Cleaning spaxels") as pbar:
for i in range(cleaning_mask.shape[0]):
for j in range(cleaning_mask.shape[1]):
if not cleaning_mask[i, j]:
continue
if n_amplitude_for_detection is None:
n_amplitude_for_detection = n_amplitude
if n_frequency_for_detection is None:
n_frequency_for_detection = n_frequency
if verbose:
print("###########################")
print(f"Fitting spaxel: {i}, {j}")
result_params, cov_matrix, wiggle_signal, wiggle_noise = (
self.fit_wiggle(
i,
j,
aperture_radius=aperture_radius,
annulus_outer_radius=annulus_outer_radius,
annulus_inner_radius=annulus_inner_radius,
n_amplitude=n_amplitude_for_detection,
n_frequency=n_frequency_for_detection,
specified_noise_level=specified_noise_level,
init_peak_detection_proximity_threshold=init_peak_detection_proximity_threshold,
outlier_rejection_method=outlier_rejection_method,
use_huber_loss=use_huber_loss,
huber_delta=huber_delta,
fdr_alpha=fdr_alpha,
fdr_outlier_max_fraction=fdr_outlier_max_fraction,
extract_covariance=extract_uncertainty,
sigma_clip_sigma=sigma_clip_sigma,
sigma_clip_max_iterations=sigma_clip_max_iterations,
symmetric_sharpening=symmetric_sharpening,
asymmetric_sharpening=asymmetric_sharpening,
plot=False,
fit_full_model=False, # do not fit the full model for detection
)
)
plt.show()
if self.is_wiggle_detected(
wiggle_signal,
wiggle_noise,
result_params,
sigma_threshold=wiggle_detection_sigma_threshold,
variance_ratio_threshold=wiggle_detection_variance_ratio_threshold,
verbose=verbose,
):
if verbose:
print(f"Wiggle detected. Cleaning spaxel: {i}, {j}.")
cleaned_mask[i, j] = 1
if min_n_amplitude is None or min_n_frequency is None:
# performing model fitting with the specified n_amplitude and n_frequency,
# if they are not equal to n_amplitude_for_detection and n_frequency_for_detection
result_params, cov_matrix, _, _ = self.fit_wiggle(
i,
j,
aperture_radius=aperture_radius,
annulus_outer_radius=annulus_outer_radius,
annulus_inner_radius=annulus_inner_radius,
n_amplitude=n_amplitude,
n_frequency=n_frequency,
specified_noise_level=specified_noise_level,
init_peak_detection_proximity_threshold=init_peak_detection_proximity_threshold,
outlier_rejection_method=outlier_rejection_method,
use_huber_loss=use_huber_loss,
huber_delta=huber_delta,
fdr_alpha=fdr_alpha,
fdr_outlier_max_fraction=fdr_outlier_max_fraction,
extract_covariance=extract_uncertainty,
sigma_clip_sigma=sigma_clip_sigma,
sigma_clip_max_iterations=sigma_clip_max_iterations,
symmetric_sharpening=symmetric_sharpening,
asymmetric_sharpening=asymmetric_sharpening,
plot=plot,
fit_full_model=fit_full_model,
)
else:
# performing model selection as min_n_amplitude and min_n_frequency are set
result_params, cov_matrix = (
self.fit_wiggle_with_model_selection(
i,
j,
aperture_radius=aperture_radius,
annulus_outer_radius=annulus_outer_radius,
annulus_inner_radius=annulus_inner_radius,
n_amplitude=n_amplitude,
n_frequency=n_frequency,
min_n_amplitude=min_n_amplitude,
min_n_frequency=min_n_frequency,
specified_noise_level=specified_noise_level,
init_peak_detection_proximity_threshold=init_peak_detection_proximity_threshold,
outlier_rejection_method=outlier_rejection_method,
use_huber_loss=use_huber_loss,
huber_delta=huber_delta,
fdr_alpha=fdr_alpha,
fdr_outlier_max_fraction=fdr_outlier_max_fraction,
extract_covariance=extract_uncertainty,
sigma_clip_sigma=sigma_clip_sigma,
sigma_clip_max_iterations=sigma_clip_max_iterations,
symmetric_sharpening=symmetric_sharpening,
asymmetric_sharpening=asymmetric_sharpening,
plot=plot,
fit_full_model=fit_full_model,
selection_criteria=selection_criteria,
min_selection_difference=min_selection_difference,
)
)
amplitude_params, frequency_params, phi = self.split_params(
result_params
)
wiggle_model = self.wiggle_func(
self.scaled_w,
amplitude_params,
frequency_params,
phi,
)
if cov_matrix is not None:
model_uncertainty = self.get_model_uncertainty(
result_params,
cov_matrix,
num_samples_uncertainty_region=num_samples_uncertainty_region,
)
integral = 1
integral_base = 1
if conserve_flux:
integral = np.trapezoid(
self._datacube[:, i, j] / wiggle_model,
self.scaled_w,
)
integral_base = np.trapezoid(
self._datacube[:, i, j], self.scaled_w
)
self.cleaned_datacube[:, i, j] = (
self._datacube[:, i, j]
/ wiggle_model
/ integral
* integral_base
)
self.cleaned_noisecube[:, i, j] = (
np.sqrt(
(self._noise_cube[:, i, j] / self._datacube[:, i, j])
** 2
+ (model_uncertainty / wiggle_model) ** 2
)
* self.cleaned_datacube[:, i, j]
)
else:
if verbose:
print(
f"No wiggle detected at spaxel: {i}, {j}, skipping..."
)
pbar.update(1)
if verbose:
print("Cleaning done!")
return self.cleaned_datacube, self.cleaned_noisecube, cleaned_mask