"""
PSF parameter optimisation and fitting routines for mirror alignment and reflection parameters.
This module provides functions for loading PSF data, generating random parameter sets,
running PSF simulations, calculating RMSD, and finding the best-fit parameters for a given
telescope model.
PSF (Point Spread Function) describes how a point source of light is spread out by the
optical system, and RMSD (Root Mean Squared Deviation) is used as the optimization metric
to quantify the difference between measured and simulated PSF curves.
"""
import logging
from collections import OrderedDict
import astropy.units as u
import numpy as np
from astropy.table import Table
from scipy import stats
from simtools.data_model import model_data_writer as writer
from simtools.ray_tracing.ray_tracing import RayTracing
from simtools.utils import general as gen
from simtools.visualization import plot_psf
from simtools.visualization.plot_psf import DEFAULT_FRACTION, get_psf_diameter_label
logger = logging.getLogger(__name__)
# Constants
RADIUS = "Radius"
CUMULATIVE_PSF = "Cumulative PSF"
KS_STATISTIC_NAME = "KS statistic"
def _create_log_header_and_format_value(title, tel_model, additional_info=None, value=None):
"""Create log header and format parameter values."""
if value is not None: # Format value mode
if isinstance(value, list):
return "[" + ", ".join([f"{v:.6f}" for v in value]) + "]"
if isinstance(value, int | float):
return f"{value:.6f}"
return str(value)
# Create header mode
header_lines = [f"# {title}", f"# Telescope: {tel_model.name}"]
if additional_info:
for key, val in additional_info.items():
header_lines.append(f"# {key}: {val}")
header_lines.extend(["#" + "=" * 65, ""])
return "\n".join(header_lines) + "\n"
[docs]
def calculate_rmsd(data, sim):
"""Calculate RMSD between measured and simulated cumulative PSF curves."""
return np.sqrt(np.mean((data - sim) ** 2))
[docs]
def calculate_ks_statistic(data, sim):
"""Calculate the KS statistic between measured and simulated cumulative PSF curves."""
return stats.ks_2samp(data, sim)
[docs]
def get_previous_values(tel_model):
"""
Retrieve current PSF parameter values from the telescope model.
Parameters
----------
tel_model : TelescopeModel
Telescope model object containing parameter configurations.
Returns
-------
dict
Dictionary containing current values of PSF optimization parameters:
- 'mirror_reflection_random_angle': Random reflection angle parameters
- 'mirror_align_random_horizontal': Horizontal alignment parameters
- 'mirror_align_random_vertical': Vertical alignment parameters
"""
return {
"mirror_reflection_random_angle": tel_model.get_parameter_value(
"mirror_reflection_random_angle"
),
"mirror_align_random_horizontal": tel_model.get_parameter_value(
"mirror_align_random_horizontal"
),
"mirror_align_random_vertical": tel_model.get_parameter_value(
"mirror_align_random_vertical"
),
}
def _run_ray_tracing_simulation(tel_model, site_model, args_dict, pars):
"""Run a ray tracing simulation with the given telescope parameters."""
if pars is None:
raise ValueError("No best parameters found")
tel_model.change_multiple_parameters(**pars)
ray = RayTracing(
telescope_model=tel_model,
site_model=site_model,
simtel_path=args_dict["simtel_path"],
zenith_angle=args_dict["zenith"] * u.deg,
source_distance=args_dict["src_distance"] * u.km,
off_axis_angle=[0.0] * u.deg,
)
ray.simulate(test=args_dict.get("test", False), force=True)
ray.analyze(force=True, use_rx=False)
im = ray.images()[0]
fraction = args_dict.get("fraction", DEFAULT_FRACTION)
return im.get_psf(fraction=fraction), im
[docs]
def run_psf_simulation(
tel_model,
site,
args_dict,
pars,
data_to_plot,
radius,
pdf_pages=None,
is_best=False,
use_ks_statistic=False,
):
"""
Run PSF simulation for given parameters and calculate optimization metric.
Parameters
----------
tel_model : TelescopeModel
Telescope model object to be configured with the test parameters.
site : Site
Site model object with environmental conditions.
args_dict : dict
Dictionary containing simulation configuration arguments.
pars : dict
Dictionary of parameter values to test in the simulation.
data_to_plot : dict
Dictionary containing measured PSF data under "measured" key.
radius : array-like
Radius values in cm for PSF evaluation.
pdf_pages : PdfPages, optional
PDF pages object for saving plots (default: None).
is_best : bool, optional
Flag indicating if this is the best parameter set (default: False).
use_ks_statistic : bool, optional
If True, use KS statistic as metric; if False, use RMSD (default: False).
Returns
-------
tuple of (float, float, float or None, array)
- psf_diameter: PSF containment diameter of the simulated PSF in cm
- metric: RMSD or KS statistic value
- p_value: p-value from KS test (None if using RMSD)
- simulated_data: Structured array with simulated cumulative PSF data
"""
psf_diameter, im = _run_ray_tracing_simulation(tel_model, site, args_dict, pars)
if radius is None:
raise ValueError("Radius data is not available.")
simulated_data = im.get_cumulative_data(radius * u.cm)
if use_ks_statistic:
ks_statistic, p_value = calculate_ks_statistic(
data_to_plot["measured"][CUMULATIVE_PSF], simulated_data[CUMULATIVE_PSF]
)
metric = ks_statistic
else:
metric = calculate_rmsd(
data_to_plot["measured"][CUMULATIVE_PSF], simulated_data[CUMULATIVE_PSF]
)
p_value = None
# Handle plotting if requested
if pdf_pages is not None and args_dict.get("plot_all", False):
data_to_plot["simulated"] = simulated_data
plot_psf.create_psf_parameter_plot(
data_to_plot,
pars,
psf_diameter,
metric,
is_best,
pdf_pages,
fraction=args_dict.get("fraction", DEFAULT_FRACTION),
p_value=p_value,
use_ks_statistic=use_ks_statistic,
)
del data_to_plot["simulated"]
return psf_diameter, metric, p_value, simulated_data
[docs]
def load_and_process_data(args_dict):
"""
Load and process PSF measurement data from ECSV file.
Parameters
----------
args_dict : dict
Dictionary containing command-line arguments with 'data' and 'model_path' keys.
Returns
-------
tuple of (OrderedDict, array)
- data_dict: OrderedDict with "measured" key containing structured array
of radius and cumulative PSF data
- radius: Array of radius values in cm
Raises
------
FileNotFoundError
If no data file is specified in args_dict.
"""
if args_dict["data"] is None:
raise FileNotFoundError("No data file specified for PSF optimization.")
data_file = gen.find_file(args_dict["data"], args_dict["model_path"])
table = Table.read(data_file, format="ascii.ecsv")
radius_column = next((col for col in table.colnames if "radius" in col.lower()), None)
integral_psf_column = next((col for col in table.colnames if "integral" in col.lower()), None)
# Create structured array with converted data
d_type = {"names": (RADIUS, CUMULATIVE_PSF), "formats": ("f8", "f8")}
data = np.zeros(len(table), dtype=d_type)
data[RADIUS] = table[radius_column].to(u.cm).value
data[CUMULATIVE_PSF] = table[integral_psf_column]
data[CUMULATIVE_PSF] /= np.max(np.abs(data[CUMULATIVE_PSF])) # Normalize to max = 1.0
return OrderedDict([("measured", data)]), data[RADIUS]
[docs]
def write_tested_parameters_to_file(
results, best_pars, best_psf_diameter, output_dir, tel_model, fraction=DEFAULT_FRACTION
):
"""
Write optimization results and tested parameters to a log file.
Parameters
----------
results : list
List of tuples containing (parameters, ks_statistic, p_value, psf_diameter, simulated_data)
for each tested parameter set.
best_pars : dict
Dictionary containing the best parameter values found.
best_psf_diameter : float
PSF containment diameter in cm for the best parameter set.
output_dir : Path
Directory where the log file will be written.
tel_model : TelescopeModel
Telescope model object for naming the output file.
fraction : float, optional
PSF containment fraction for labeling (default: 0.8).
Returns
-------
Path
Path to the created log file.
"""
param_file = output_dir.joinpath(f"psf_optimization_{tel_model.name}.log")
psf_label = get_psf_diameter_label(fraction)
with open(param_file, "w", encoding="utf-8") as f:
header = _create_log_header_and_format_value(
"PSF Parameter Optimization Log",
tel_model,
{"Total parameter sets tested": len(results)},
)
f.write(header)
f.write("PARAMETER TESTING RESULTS:\n")
for i, (pars, ks_statistic, p_value, psf_diameter, _) in enumerate(results):
status = "BEST" if pars is best_pars else "TESTED"
f.write(
f"[{status}] Set {i + 1:03d}: KS_stat={ks_statistic:.5f}, "
f"p_value={p_value:.5f}, {psf_label}={psf_diameter:.5f} cm\n"
)
for par, value in pars.items():
f.write(f" {par}: {value}\n")
f.write("\n")
f.write("OPTIMIZATION SUMMARY:\n")
f.write(f"Best KS statistic: {min(result[1] for result in results):.5f}\n")
f.write(f"Best {psf_label}: {best_psf_diameter:.5f} cm\n")
f.write("\nOPTIMIZED PARAMETERS:\n")
for par, value in best_pars.items():
f.write(f"{par}: {value}\n")
return param_file
def _add_units_to_psf_parameters(best_pars):
"""Add astropy units to PSF parameters based on their schemas."""
psf_pars_with_units = {}
for param_name, param_values in best_pars.items():
if param_name == "mirror_reflection_random_angle":
psf_pars_with_units[param_name] = [
param_values[0] * u.deg,
param_values[1] * u.dimensionless_unscaled,
param_values[2] * u.deg,
]
elif param_name in ["mirror_align_random_horizontal", "mirror_align_random_vertical"]:
psf_pars_with_units[param_name] = [
param_values[0] * u.deg,
param_values[1] * u.deg,
param_values[2] * u.dimensionless_unscaled,
param_values[3] * u.dimensionless_unscaled,
]
else:
psf_pars_with_units[param_name] = param_values
return psf_pars_with_units
[docs]
def export_psf_parameters(best_pars, telescope, parameter_version, output_dir):
"""
Export optimized PSF parameters as simulation model parameter files.
Parameters
----------
best_pars : dict
Dictionary containing the optimized parameter values.
telescope : str
Telescope name for the parameter files.
parameter_version : str
Version string for the parameter files.
output_dir : Path
Base directory for parameter file output.
Notes
-----
Creates individual JSON files for each optimized parameter with
units. Files are saved in the format:
{output_dir}/{telescope}/{parameter_name}-{parameter_version}.json
Raises
------
ValueError, KeyError, OSError
If parameter export fails due to invalid values, missing keys, or file I/O errors.
"""
try:
psf_pars_with_units = _add_units_to_psf_parameters(best_pars)
parameter_output_path = output_dir.parent / telescope
for parameter_name, parameter_value in psf_pars_with_units.items():
writer.ModelDataWriter.dump_model_parameter(
parameter_name=parameter_name,
value=parameter_value,
instrument=telescope,
parameter_version=parameter_version,
output_file=f"{parameter_name}-{parameter_version}.json",
output_path=parameter_output_path,
)
logger.info(f"simulation model parameter files exported to {output_dir}")
except (ValueError, KeyError, OSError) as e:
logger.error(f"Error exporting simulation parameters: {e}")
def _calculate_param_gradient(
tel_model,
site_model,
args_dict,
current_params,
data_to_plot,
radius,
current_rmsd,
param_name,
param_values,
epsilon,
use_ks_statistic,
):
"""
Calculate numerical gradient for a single parameter using finite differences.
The gradient is calculated using forward finite differences:
gradient = (f(x + epsilon) - f(x)) / epsilon
Parameters
----------
tel_model : TelescopeModel
The telescope model object containing the current parameter configuration.
site_model : SiteModel
The site model object with environmental conditions.
args_dict : dict
Dictionary containing simulation arguments and configuration options.
current_params : dict
Dictionary of current parameter values for all optimization parameters.
data_to_plot : dict
Dictionary containing measured PSF data with "measured" key.
radius : array-like
Radius values in cm for PSF evaluation.
current_rmsd : float
Current RMSD at the current parameter configuration.
param_name : str
Name of the parameter for which to calculate the gradient.
param_values : float or list
Current value(s) of the parameter. Can be a single value or list of values.
epsilon : float
Small perturbation value for finite difference calculation.
use_ks_statistic : bool
If True, calculate gradient with respect to KS statistic; if False, use RMSD.
Returns
-------
float or list
Gradient value(s) for the parameter. Returns a single float if param_values
is a single value, or a list of gradients if param_values is a list.
If a simulation fails during gradient calculation, a gradient of 0.0 is assigned
for that component to ensure the optimization can continue.
"""
param_gradients = []
values_list = param_values if isinstance(param_values, list) else [param_values]
for i, value in enumerate(values_list):
perturbed_params = {
k: v.copy() if isinstance(v, list) else v for k, v in current_params.items()
}
if isinstance(param_values, list):
perturbed_params[param_name][i] = value + epsilon
else:
perturbed_params[param_name] = value + epsilon
try:
_, perturbed_rmsd, _, _ = run_psf_simulation(
tel_model,
site_model,
args_dict,
perturbed_params,
data_to_plot,
radius,
pdf_pages=None,
is_best=False,
use_ks_statistic=use_ks_statistic,
)
param_gradients.append((perturbed_rmsd - current_rmsd) / epsilon)
except (ValueError, RuntimeError):
param_gradients.append(0.0)
return param_gradients[0] if not isinstance(param_values, list) else param_gradients
[docs]
def calculate_gradient(
tel_model,
site_model,
args_dict,
current_params,
data_to_plot,
radius,
current_rmsd,
epsilon=0.0005,
use_ks_statistic=False,
):
"""
Calculate numerical gradients for all optimization parameters.
Parameters
----------
tel_model : TelescopeModel
Telescope model object for simulations.
site_model : SiteModel
Site model object with environmental conditions.
args_dict : dict
Dictionary containing simulation configuration arguments.
current_params : dict
Dictionary of current parameter values for all optimization parameters.
data_to_plot : dict
Dictionary containing measured PSF data.
radius : array-like
Radius values in cm for PSF evaluation.
current_rmsd : float
Current RMSD or KS statistic value.
epsilon : float, optional
Perturbation value for finite difference calculation (default: 0.0005).
use_ks_statistic : bool, optional
If True, calculate gradients for KS statistic; if False, use RMSD (default: False).
Returns
-------
dict
Dictionary mapping parameter names to their gradient values.
For parameters with multiple components, gradients are returned as lists.
"""
gradients = {}
for param_name, param_values in current_params.items():
gradients[param_name] = _calculate_param_gradient(
tel_model,
site_model,
args_dict,
current_params,
data_to_plot,
radius,
current_rmsd,
param_name,
param_values,
epsilon,
use_ks_statistic,
)
return gradients
[docs]
def apply_gradient_step(current_params, gradients, learning_rate):
"""
Apply gradient descent step to update parameters.
Parameters
----------
current_params : dict
Dictionary of current parameter values.
gradients : dict
Dictionary of gradient values for each parameter.
learning_rate : float
Step size for the gradient descent update.
Returns
-------
dict
Dictionary of updated parameter values after applying the gradient step.
"""
new_params = {}
for param_name, param_values in current_params.items():
param_gradients = gradients[param_name]
if isinstance(param_values, list):
new_params[param_name] = [
value - learning_rate * gradient
for value, gradient in zip(param_values, param_gradients)
]
else:
new_params[param_name] = param_values - learning_rate * param_gradients
return new_params
def _perform_gradient_step_with_retries(
tel_model,
site_model,
args_dict,
current_params,
current_metric,
data_to_plot,
radius,
learning_rate,
max_retries=3,
):
"""
Attempt gradient descent step with adaptive learning rate reduction on rejection.
The learning rate reduction strategy follows these rules:
- If step is rejected: learning_rate *= 0.7
- If attempt number < number of max retries then try again
- If learning_rate drops below 1e-5: reset to 0.001
- If all retries fail: returns None values with step_accepted=False
This adaptive approach helps navigate local minima and ensures robust convergence
by automatically adjusting the step size based on optimization progress.
Parameters
----------
tel_model : TelescopeModel
Telescope model object containing the current parameter configuration.
site_model : SiteModel
Site model object with environmental conditions for ray tracing simulations.
args_dict : dict
Dictionary containing simulation configuration arguments and settings.
current_params : dict
Dictionary of current parameter values for all optimization parameters.
current_metric : float
Current optimization metric value (RMSD or KS statistic) to improve upon.
data_to_plot : dict
Dictionary containing measured PSF data under "measured" key for comparison.
radius : array-like
Radius values in cm for PSF evaluation and comparison.
learning_rate : float
Initial learning rate for the gradient descent step.
max_retries : int, optional
Maximum number of attempts with learning rate reduction (default: 3).
Returns
-------
tuple of (dict, float, float, float or None, array, bool, float)
- new_params: Updated parameter dictionary if step accepted, None if rejected
- new_psf_diameter: PSF containment diameter in cm for new parameters, None if step rejected
- new_metric: New optimization metric value, None if step rejected
- new_p_value: p-value from KS test if applicable, None otherwise
- new_simulated_data: Simulated PSF data array, None if step rejected
- step_accepted: Boolean indicating if any step was accepted
- final_learning_rate: Learning rate after potential reductions
"""
current_lr = learning_rate
for attempt in range(max_retries):
try:
gradients = calculate_gradient(
tel_model,
site_model,
args_dict,
current_params,
data_to_plot,
radius,
current_metric,
use_ks_statistic=False,
)
new_params = apply_gradient_step(current_params, gradients, current_lr)
new_psf_diameter, new_metric, new_p_value, new_simulated_data = run_psf_simulation(
tel_model,
site_model,
args_dict,
new_params,
data_to_plot,
radius,
pdf_pages=None,
is_best=False,
use_ks_statistic=False,
)
if new_metric < current_metric:
return (
new_params,
new_psf_diameter,
new_metric,
new_p_value,
new_simulated_data,
True,
current_lr,
)
logger.info(
f"Step rejected (RMSD {current_metric:.6f} -> {new_metric:.6f}), "
f"reducing learning rate {current_lr:.6f} -> {current_lr * 0.7:.6f}"
)
current_lr *= 0.7
if current_lr < 1e-5:
current_lr = 0.001
except (ValueError, RuntimeError, KeyError) as e:
logger.warning(f"Simulation failed on attempt {attempt + 1}: {e}")
continue
return None, None, None, None, None, False, current_lr
def _create_step_plot(
pdf_pages,
args_dict,
data_to_plot,
current_params,
new_psf_diameter,
new_metric,
new_p_value,
new_simulated_data,
):
"""Create plot for an accepted gradient step."""
if pdf_pages is None or not args_dict.get("plot_all", False) or new_simulated_data is None:
return
data_to_plot["simulated"] = new_simulated_data
plot_psf.create_psf_parameter_plot(
data_to_plot,
current_params,
new_psf_diameter,
new_metric,
False,
pdf_pages,
fraction=args_dict.get("fraction", DEFAULT_FRACTION),
p_value=new_p_value,
use_ks_statistic=False,
)
del data_to_plot["simulated"]
def _create_final_plot(
pdf_pages,
tel_model,
site_model,
args_dict,
best_params,
data_to_plot,
radius,
best_psf_diameter,
):
"""Create final plot for best parameters."""
if pdf_pages is None or best_params is None:
return
logger.info("Creating final plot for best parameters with both RMSD and KS statistic...")
_, best_ks_stat, best_p_value, best_simulated_data = run_psf_simulation(
tel_model,
site_model,
args_dict,
best_params,
data_to_plot,
radius,
pdf_pages=None,
is_best=False,
use_ks_statistic=True,
)
best_rmsd = calculate_rmsd(
data_to_plot["measured"][CUMULATIVE_PSF], best_simulated_data[CUMULATIVE_PSF]
)
data_to_plot["simulated"] = best_simulated_data
plot_psf.create_psf_parameter_plot(
data_to_plot,
best_params,
best_psf_diameter,
best_rmsd,
True,
pdf_pages,
fraction=args_dict.get("fraction", DEFAULT_FRACTION),
p_value=best_p_value,
use_ks_statistic=False,
second_metric=best_ks_stat,
)
del data_to_plot["simulated"]
pdf_pages.close()
logger.info("Cumulative PSF plots saved")
[docs]
def run_gradient_descent_optimization(
tel_model,
site_model,
args_dict,
data_to_plot,
radius,
rmsd_threshold,
learning_rate,
output_dir,
):
"""
Run gradient descent optimization to minimize PSF fitting metric.
Parameters
----------
tel_model : TelescopeModel
Telescope model object to be optimized.
site_model : SiteModel
Site model object with environmental conditions.
args_dict : dict
Dictionary containing simulation configuration arguments.
data_to_plot : dict
Dictionary containing measured PSF data under "measured" key.
radius : array-like
Radius values in cm for PSF evaluation.
rmsd_threshold : float
Convergence threshold for RMSD improvement.
learning_rate : float
Initial learning rate for gradient descent steps.
output_dir : Path
Directory for saving optimization plots and results.
Returns
-------
tuple of (dict, float, list)
- best_params: Dictionary of optimized parameter values
- best_psf_diameter: PSF containment diameter in cm for the best parameters
- results: List of (params, metric, p_value, psf_diameter, simulated_data)
for each iteration
Returns None values if optimization fails or no measurement data is provided.
"""
if data_to_plot is None or radius is None:
logger.error("No PSF measurement data provided. Cannot run optimization.")
return None, None, []
current_params = get_previous_values(tel_model)
pdf_pages = plot_psf.setup_pdf_plotting(args_dict, output_dir, tel_model.name)
results = []
# Evaluate initial parameters
current_psf_diameter, current_metric, current_p_value, simulated_data = run_psf_simulation(
tel_model,
site_model,
args_dict,
current_params,
data_to_plot,
radius,
pdf_pages=pdf_pages if args_dict.get("plot_all", False) else None,
is_best=False,
use_ks_statistic=False,
)
results.append(
(
current_params.copy(),
current_metric,
current_p_value,
current_psf_diameter,
simulated_data,
)
)
best_metric, best_params, best_psf_diameter = (
current_metric,
current_params.copy(),
current_psf_diameter,
)
logger.info(f"Initial RMSD: {current_metric:.6f}, PSF diameter: {current_psf_diameter:.6f} cm")
iteration = 0
max_total_iterations = 100
while iteration < max_total_iterations:
if current_metric <= rmsd_threshold:
logger.info(
f"Optimization converged: RMSD {current_metric:.6f} <= "
f"threshold {rmsd_threshold:.6f}"
)
break
iteration += 1
logger.info(f"Gradient descent iteration {iteration}")
step_result = _perform_gradient_step_with_retries(
tel_model,
site_model,
args_dict,
current_params,
current_metric,
data_to_plot,
radius,
learning_rate,
)
(
new_params,
new_psf_diameter,
new_metric,
new_p_value,
new_simulated_data,
step_accepted,
learning_rate,
) = step_result
if not step_accepted or new_params is None:
learning_rate *= 2.0
logger.info(f"No step accepted, increasing learning rate to {learning_rate:.6f}")
continue
# Step was accepted - update state
current_params, current_metric, current_psf_diameter = (
new_params,
new_metric,
new_psf_diameter,
)
results.append(
(current_params.copy(), current_metric, None, current_psf_diameter, new_simulated_data)
)
if current_metric < best_metric:
best_metric, best_params, best_psf_diameter = (
current_metric,
current_params.copy(),
current_psf_diameter,
)
_create_step_plot(
pdf_pages,
args_dict,
data_to_plot,
current_params,
new_psf_diameter,
new_metric,
new_p_value,
new_simulated_data,
)
logger.info(f" Accepted step: improved to {new_metric:.6f}")
_create_final_plot(
pdf_pages,
tel_model,
site_model,
args_dict,
best_params,
data_to_plot,
radius,
best_psf_diameter,
)
return best_params, best_psf_diameter, results
def _write_log_interpretation(f, use_ks_statistic):
"""Write interpretation section for the log file."""
if use_ks_statistic:
f.write(
"P-VALUE INTERPRETATION:\n p > 0.05: Distributions are statistically similar "
"(good fit)\n"
" p < 0.05: Distributions are significantly different (poor fit)\n"
" p < 0.01: Very significant difference (very poor fit)\n\n"
)
else:
f.write(
"RMSD INTERPRETATION:\n Lower RMSD values indicate better agreement between "
"measured and simulated PSF curves\n\n"
)
def _write_iteration_entry(
f,
iteration,
pars,
metric,
p_value,
psf_diameter,
use_ks_statistic,
metric_name,
total_iterations,
fraction=DEFAULT_FRACTION,
):
"""Write a single iteration entry."""
status = "FINAL" if iteration == total_iterations - 1 else f"ITER-{iteration:02d}"
if use_ks_statistic and p_value is not None:
significance = plot_psf.get_significance_label(p_value)
label = get_psf_diameter_label(fraction)
f.write(
f"[{status}] Iteration {iteration}: KS_stat={metric:.6f}, "
f"p_value={p_value:.6f} ({significance}), {label}={psf_diameter:.6f} cm\n"
)
else:
label = get_psf_diameter_label(fraction)
f.write(
f"[{status}] Iteration {iteration}: {metric_name}={metric:.6f}, "
f"{label}={psf_diameter:.6f} cm\n"
)
for par, value in pars.items():
f.write(f" {par}: {_create_log_header_and_format_value(None, None, None, value)}\n")
f.write("\n")
def _write_optimization_summary(
f, gd_results, best_pars, best_psf_diameter, metric_name, fraction=DEFAULT_FRACTION
):
"""Write optimization summary section."""
f.write("OPTIMIZATION SUMMARY:\n")
best_metric_from_results = min(metric for _, metric, _, _, _ in gd_results)
f.write(f"Best {metric_name.lower()}: {best_metric_from_results:.6f}\n")
label = get_psf_diameter_label(fraction)
f.write(
f"Best {label}: {best_psf_diameter:.6f} cm\n"
if best_psf_diameter is not None
else f"Best {label}: N/A\n"
)
f.write(f"Total iterations: {len(gd_results)}\n\nFINAL OPTIMIZED PARAMETERS:\n")
for par, value in best_pars.items():
f.write(f"{par}: {_create_log_header_and_format_value(None, None, None, value)}\n")
[docs]
def write_gradient_descent_log(
gd_results,
best_pars,
best_psf_diameter,
output_dir,
tel_model,
use_ks_statistic=False,
fraction=DEFAULT_FRACTION,
):
"""
Write gradient descent optimization progression to a log file.
Parameters
----------
gd_results : list
List of tuples containing (params, metric, p_value, psf_diameter, simulated_data)
for each optimization iteration.
best_pars : dict
Dictionary containing the best parameter values found.
best_psf_diameter : float
PSF containment diameter in cm for the best parameter set.
output_dir : Path
Directory where the log file will be written.
tel_model : TelescopeModel
Telescope model object for naming the output file.
use_ks_statistic : bool, optional
If True, log KS statistic values; if False, log RMSD values (default: False).
fraction : float, optional
PSF containment fraction for labeling (default: 0.8).
Returns
-------
Path
Path to the created log file.
"""
metric_name = "KS Statistic" if use_ks_statistic else "RMSD"
file_suffix = "ks" if use_ks_statistic else "rmsd"
param_file = output_dir.joinpath(f"psf_gradient_descent_{file_suffix}_{tel_model.name}.log")
with open(param_file, "w", encoding="utf-8") as f:
header = _create_log_header_and_format_value(
f"PSF Parameter Optimization - Gradient Descent Progression ({metric_name})",
tel_model,
{"Total iterations": len(gd_results)},
)
f.write(header)
f.write(
"GRADIENT DESCENT PROGRESSION:\n(Each entry shows the parameters chosen "
"at each iteration)\n\n"
)
_write_log_interpretation(f, use_ks_statistic)
for iteration, (pars, metric, p_value, psf_diameter, _) in enumerate(gd_results):
_write_iteration_entry(
f,
iteration,
pars,
metric,
p_value,
psf_diameter,
use_ks_statistic,
metric_name,
len(gd_results),
fraction,
)
_write_optimization_summary(
f, gd_results, best_pars, best_psf_diameter, metric_name, fraction
)
return param_file
[docs]
def analyze_monte_carlo_error(
tel_model, site_model, args_dict, data_to_plot, radius, n_simulations=500
):
"""
Analyze Monte Carlo uncertainty in PSF optimization metrics.
Runs multiple simulations with the same parameters to quantify the
statistical uncertainty in the optimization metric due to Monte Carlo
noise in the ray tracing simulations. Returns None values if no
measurement data is provided or all simulations fail.
Parameters
----------
tel_model : TelescopeModel
Telescope model object with current parameter configuration.
site_model : SiteModel
Site model object with environmental conditions.
args_dict : dict
Dictionary containing simulation configuration arguments.
data_to_plot : dict
Dictionary containing measured PSF data under "measured" key.
radius : array-like
Radius values in cm for PSF evaluation.
n_simulations : int, optional
Number of Monte Carlo simulations to run (default: 500).
Returns
-------
tuple of (float, float, list, float, float, list, float, float, list)
- mean_metric: Mean RMSD or KS statistic value
- std_metric: Standard deviation of metric values
- metric_values: List of all metric values from simulations
- mean_p_value: Mean p-value (None if using RMSD)
- std_p_value: Standard deviation of p-values (None if using RMSD)
- p_values: List of all p-values from simulations
- mean_psf_diameter: Mean PSF containment diameter in cm
- std_psf_diameter: Standard deviation of PSF diameter values
- psf_diameter_values: List of all PSF diameter values from simulations
"""
if data_to_plot is None or radius is None:
logger.error("No PSF measurement data provided. Cannot analyze Monte Carlo error.")
return None, None, []
initial_params = get_previous_values(tel_model)
for param_name, param_values in initial_params.items():
logger.info(f" {param_name}: {param_values}")
use_ks_statistic = args_dict.get("ks_statistic", False)
metric_values, p_values, psf_diameter_values = [], [], []
for i in range(n_simulations):
try:
psf_diameter, metric, p_value, _ = run_psf_simulation(
tel_model,
site_model,
args_dict,
initial_params,
data_to_plot,
radius,
use_ks_statistic=use_ks_statistic,
)
metric_values.append(metric)
psf_diameter_values.append(psf_diameter)
p_values.append(p_value)
except (ValueError, RuntimeError) as e:
logger.warning(f"WARNING: Simulation {i + 1} failed: {e}")
if not metric_values:
logger.error("All Monte Carlo simulations failed.")
return None, None, [], None, None, []
mean_metric, std_metric = np.mean(metric_values), np.std(metric_values, ddof=1)
mean_psf_diameter, std_psf_diameter = (
np.mean(psf_diameter_values),
np.std(psf_diameter_values, ddof=1),
)
if use_ks_statistic:
valid_p_values = [p for p in p_values if p is not None]
mean_p_value = np.mean(valid_p_values) if valid_p_values else None
std_p_value = np.std(valid_p_values, ddof=1) if valid_p_values else None
else:
mean_p_value = std_p_value = None
return (
mean_metric,
std_metric,
metric_values,
mean_p_value,
std_p_value,
p_values,
mean_psf_diameter,
std_psf_diameter,
psf_diameter_values,
)
[docs]
def write_monte_carlo_analysis(
mc_results, output_dir, tel_model, use_ks_statistic=False, fraction=DEFAULT_FRACTION
):
"""
Write Monte Carlo uncertainty analysis results to a log file.
Parameters
----------
mc_results : tuple
Tuple of Monte Carlo analysis results from analyze_monte_carlo_error().
output_dir : Path
Directory where the log file will be written.
tel_model : TelescopeModel
Telescope model object for naming the output file.
use_ks_statistic : bool, optional
If True, analyze KS statistic results; if False, analyze RMSD results (default: False).
fraction : float, optional
PSF containment fraction for labeling (default: 0.8).
Returns
-------
Path
Path to the created log file.
"""
(
mean_metric,
std_metric,
metric_values,
mean_p_value,
std_p_value,
p_values,
mean_psf_diameter,
std_psf_diameter,
psf_diameter_values,
) = mc_results
metric_name = "KS Statistic" if use_ks_statistic else "RMSD"
file_suffix = "ks" if use_ks_statistic else "rmsd"
mc_file = output_dir.joinpath(f"monte_carlo_{file_suffix}_analysis_{tel_model.name}.log")
psf_label = get_psf_diameter_label(fraction)
with open(mc_file, "w", encoding="utf-8") as f:
header = _create_log_header_and_format_value(
f"Monte Carlo {metric_name} Error Analysis",
tel_model,
{"Number of simulations": len(metric_values)},
)
f.write(header)
f.write(
f"MONTE CARLO SIMULATION RESULTS:\nNumber of successful simulations: "
f"{len(metric_values)}\n\n"
)
f.write(f"{metric_name.upper()} STATISTICS:\n")
f.write(
f"Mean {metric_name.lower()}: {mean_metric:.6f}\n"
f"Standard deviation: {std_metric:.6f}\n"
f"Minimum {metric_name.lower()}: {min(metric_values):.6f}\n"
f"Maximum {metric_name.lower()}: {max(metric_values):.6f}\n"
f"Relative error: {(std_metric / mean_metric) * 100:.2f}%\n\n"
)
if use_ks_statistic and mean_p_value is not None:
valid_p_values = [p for p in p_values if p is not None]
f.write(
f"P-VALUE STATISTICS:\nMean p-value: {mean_p_value:.6f}\n"
f"Standard deviation: {std_p_value:.6f}\n"
f"Minimum p-value: {min(valid_p_values):.6f}\n"
f"Maximum p-value: {max(valid_p_values):.6f}\n"
f"Relative error: {(std_p_value / mean_p_value) * 100:.2f}%\n"
)
good_fits = sum(1 for p in valid_p_values if p > 0.05)
fair_fits = sum(1 for p in valid_p_values if 0.01 < p <= 0.05)
poor_fits = sum(1 for p in valid_p_values if p <= 0.01)
f.write(
f"Good fits (p > 0.05): {good_fits}/{len(valid_p_values)} "
f"({100 * good_fits / len(valid_p_values):.1f}%)\n"
f"Fair fits (0.01 < p <= 0.05): {fair_fits}/{len(valid_p_values)} "
f"({100 * fair_fits / len(valid_p_values):.1f}%)\n"
f"Poor fits (p <= 0.01): {poor_fits}/{len(valid_p_values)} "
f"({100 * poor_fits / len(valid_p_values):.1f}%)\n\n"
)
f.write(
f"{psf_label} STATISTICS:\nMean {psf_label}: {mean_psf_diameter:.6f} cm\n"
f"Standard deviation: {std_psf_diameter:.6f} cm\n"
f"Minimum {psf_label}: {min(psf_diameter_values):.6f} cm\n"
f"Maximum {psf_label}: {max(psf_diameter_values):.6f} cm\n"
f"Relative error: {(std_psf_diameter / mean_psf_diameter) * 100:.2f}%\n\n"
)
f.write("INDIVIDUAL SIMULATION RESULTS:\n")
for i, (metric_val, p_value, psf_diameter) in enumerate(
zip(metric_values, p_values, psf_diameter_values)
):
if use_ks_statistic and p_value is not None:
if p_value > 0.05:
significance = "GOOD"
elif p_value > 0.01:
significance = "FAIR"
else:
significance = "POOR"
f.write(
f"Simulation {i + 1:2d}: {metric_name}={metric_val:.6f}, "
f"p_value={p_value:.6f} ({significance}), {psf_label}={psf_diameter:.6f} cm\n"
)
else:
f.write(
f"Simulation {i + 1:2d}: {metric_name}={metric_val:.6f}, "
f"{psf_label}={psf_diameter:.6f} cm\n"
)
return mc_file
def _handle_monte_carlo_analysis(
tel_model, site_model, args_dict, data_to_plot, radius, output_dir, use_ks_statistic
):
"""Handle Monte Carlo analysis if requested."""
if not args_dict.get("monte_carlo_analysis", False):
return False
mc_results = analyze_monte_carlo_error(tel_model, site_model, args_dict, data_to_plot, radius)
if mc_results[0] is not None:
mc_file = write_monte_carlo_analysis(
mc_results,
output_dir,
tel_model,
use_ks_statistic,
args_dict.get("fraction", DEFAULT_FRACTION),
)
logger.info(f"Monte Carlo analysis results written to {mc_file}")
mc_plot_file = output_dir.joinpath(f"monte_carlo_uncertainty_{tel_model.name}.pdf")
plot_psf.create_monte_carlo_uncertainty_plot(
mc_results, mc_plot_file, args_dict.get("fraction", DEFAULT_FRACTION), use_ks_statistic
)
return True
[docs]
def run_psf_optimization_workflow(tel_model, site_model, args_dict, output_dir):
"""Run the complete PSF parameter optimization workflow using gradient descent."""
data_to_plot, radius = load_and_process_data(args_dict)
use_ks_statistic = args_dict.get("ks_statistic", False)
if _handle_monte_carlo_analysis(
tel_model, site_model, args_dict, data_to_plot, radius, output_dir, use_ks_statistic
):
return
# Run gradient descent optimization
threshold = args_dict.get("rmsd_threshold")
learning_rate = args_dict.get("learning_rate")
best_pars, best_psf_diameter, gd_results = run_gradient_descent_optimization(
tel_model,
site_model,
args_dict,
data_to_plot,
radius,
rmsd_threshold=threshold,
learning_rate=learning_rate,
output_dir=output_dir,
)
# Check if optimization was successful
if not gd_results or best_pars is None:
logger.error("Gradient descent optimization failed. No valid results found.")
if radius is None:
logger.error(
"Possible cause: No PSF measurement data provided. "
"Use --data argument to provide PSF data."
)
return
plot_psf.create_optimization_plots(args_dict, gd_results, tel_model, data_to_plot, output_dir)
convergence_plot_file = output_dir.joinpath(
f"gradient_descent_convergence_{tel_model.name}.png"
)
plot_psf.create_gradient_descent_convergence_plot(
gd_results,
threshold,
convergence_plot_file,
args_dict.get("fraction", DEFAULT_FRACTION),
use_ks_statistic,
)
param_file = write_gradient_descent_log(
gd_results,
best_pars,
best_psf_diameter,
output_dir,
tel_model,
use_ks_statistic,
args_dict.get("fraction", DEFAULT_FRACTION),
)
logger.info(f"\nGradient descent progression written to {param_file}")
plot_psf.create_psf_vs_offaxis_plot(tel_model, site_model, args_dict, best_pars, output_dir)
if args_dict.get("write_psf_parameters", False):
logger.info("Exporting best parameters as model files...")
export_psf_parameters(
best_pars, args_dict.get("telescope"), args_dict.get("parameter_version"), output_dir
)