"""
PSF parameter optimisation and fitting routines for mirror alignment and reflection parameters.
This module provides functions for loading PSF data, generating random parameter sets,
running PSF simulations, calculating RMSD, and finding the best-fit parameters for a given
telescope model.
PSF (Point Spread Function) describes how a point source of light is spread out by the
optical system, and RMSD (Root Mean Squared Deviation) is used as the optimization metric
to quantify the difference between measured and simulated PSF curves.
"""
import logging
from collections import OrderedDict
import astropy.units as u
import matplotlib.pyplot as plt
import numpy as np
from matplotlib.backends.backend_pdf import PdfPages
from simtools.data_model import model_data_writer as writer
from simtools.model import model_utils
from simtools.ray_tracing.ray_tracing import RayTracing
from simtools.utils import general as gen
from simtools.visualization import visualize
logger = logging.getLogger(__name__)
# Constants
RADIUS_CM = "Radius [cm]"
CUMULATIVE_PSF = "Cumulative PSF"
MRRA_RANGE_DEFAULT = 0.004 # Mirror reflection random angle range
MRF_RANGE_DEFAULT = 0.1 # Mirror reflection fraction range
MRRA2_RANGE_DEFAULT = 0.03 # Second mirror reflection random angle range
MAR_RANGE_DEFAULT = 0.005 # Mirror alignment random range
MAX_OFFSET_DEFAULT = 4.5 # Maximum off-axis angle in degrees
OFFSET_STEPS_DEFAULT = 0.1 # Step size for off-axis angle sampling
[docs]
def load_psf_data(data_file):
"""
Load data from a text file containing cumulative PSF measurements.
Parameters
----------
data_file : str
Name of the data file with the measured cumulative PSF.
Expected format:
Column 0: radial distance in mm
Column 2: cumulative PSF values
Returns
-------
numpy.ndarray
Loaded and processed data with radius in cm and normalized cumulative PSF.
"""
d_type = {"names": (RADIUS_CM, CUMULATIVE_PSF), "formats": ("f8", "f8")}
data = np.loadtxt(data_file, dtype=d_type, usecols=(0, 2))
data[RADIUS_CM] *= 0.1 # Convert from mm to cm
data[CUMULATIVE_PSF] /= np.max(np.abs(data[CUMULATIVE_PSF])) # Normalize to max = 1.0
return data
[docs]
def calculate_rmsd(data, sim):
"""Calculate Root Mean Squared Deviation to be used as metric to find the best parameters."""
return np.sqrt(np.mean((data - sim) ** 2))
[docs]
def add_parameters(
all_parameters,
mirror_reflection,
mirror_align,
mirror_reflection_fraction=0.15,
mirror_reflection_2=0.035,
):
"""
Transform and add parameters to the all_parameters list.
Parameters
----------
mirror_reflection : float
The random angle of mirror reflection.
mirror_align : float
The random angle for mirror alignment (both horizontal and vertical).
mirror_reflection_fraction : float, optional
The fraction of the mirror reflection. Default is 0.15.
mirror_reflection_2 : float, optional
A secondary random angle for mirror reflection. Default is 0.035.
Returns
-------
None
Updates the all_parameters list in place.
"""
pars = {
"mirror_reflection_random_angle": [
mirror_reflection,
mirror_reflection_fraction,
mirror_reflection_2,
],
"mirror_align_random_horizontal": [mirror_align, 28.0, 0.0, 0.0],
"mirror_align_random_vertical": [mirror_align, 28.0, 0.0, 0.0],
}
all_parameters.append(pars)
[docs]
def get_previous_values(tel_model):
"""
Retrieve previous parameter values from the telescope model.
Parameters
----------
tel_model : TelescopeModel
Telescope model object.
Returns
-------
tuple
Tuple containing the previous values of mirror_reflection_random_angle (first entry),
mirror_reflection_fraction, second entry), mirror_reflection_random_angle (third entry),
and mirror_align_random_horizontal/vertical.
"""
split_par = tel_model.get_parameter_value("mirror_reflection_random_angle")
mrra_0, mfr_0, mrra2_0 = split_par[0], split_par[1], split_par[2]
mar_0 = tel_model.get_parameter_value("mirror_align_random_horizontal")[0]
logger.debug(
"Previous parameter values:\n"
f"MRRA = {mrra_0!s}\n"
f"MRF = {mfr_0!s}\n"
f"MRRA2 = {mrra2_0!s}\n"
f"MAR = {mar_0!s}\n"
)
return mrra_0, mfr_0, mrra2_0, mar_0
[docs]
def generate_random_parameters(
all_parameters, n_runs, args_dict, mrra_0, mfr_0, mrra2_0, mar_0, tel_model
):
"""
Generate random parameters for tuning.
The parameter ranges around the previous values are configurable via module constants.
Parameters
----------
all_parameters : list
List to store all parameter sets.
n_runs : int
Number of random parameter combinations to test.
args_dict : dict
Dictionary containing parsed command-line arguments.
mrra_0 : float
Initial value of mirror_reflection_random_angle.
mfr_0 : float
Initial value of mirror_reflection_fraction.
mrra2_0 : float
Initial value of the second mirror_reflection_random_angle.
mar_0 : float
Initial value of mirror_align_random_horizontal/vertical.
tel_model : TelescopeModel
Telescope model object to check if it's a dual mirror telescope.
"""
if args_dict["fixed"]:
logger.debug("fixed=True - First entry of mirror_reflection_random_angle is kept fixed.")
is_dual_mirror = model_utils.is_two_mirror_telescope(tel_model.name)
if is_dual_mirror:
mar_fixed_value = 0.0
else:
mar_fixed_value = None
for _ in range(n_runs):
mrra_range = MRRA_RANGE_DEFAULT if not args_dict["fixed"] else 0
mrf_range = MRF_RANGE_DEFAULT
mrra2_range = MRRA2_RANGE_DEFAULT
mar_range = MAR_RANGE_DEFAULT
rng = np.random.default_rng(seed=args_dict.get("random_seed"))
mrra = rng.uniform(max(mrra_0 - mrra_range, 0), mrra_0 + mrra_range)
mrf = rng.uniform(max(mfr_0 - mrf_range, 0), mfr_0 + mrf_range)
mrra2 = rng.uniform(max(mrra2_0 - mrra2_range, 0), mrra2_0 + mrra2_range)
# Set mar to 0 for dual mirror telescopes, otherwise use random value
if mar_fixed_value is not None:
mar = mar_fixed_value
else:
mar = rng.uniform(max(mar_0 - mar_range, 0), mar_0 + mar_range)
add_parameters(all_parameters, mrra, mar, mrf, mrra2)
def _run_ray_tracing_simulation(tel_model, site_model, args_dict, pars):
"""
Run a ray tracing simulation with the given telescope parameters.
Parameters
----------
tel_model : TelescopeModel
Telescope model object.
site_model : SiteModel
Site model object.
args_dict : dict
Dictionary containing parsed command-line arguments.
pars : dict
Parameter set dictionary.
Returns
-------
tuple
(d80, simulated_data) - D80 value and simulated data from ray tracing.
"""
if pars is not None:
tel_model.change_multiple_parameters(**pars)
else:
raise ValueError("No best parameters found")
ray = RayTracing(
telescope_model=tel_model,
site_model=site_model,
simtel_path=args_dict["simtel_path"],
zenith_angle=args_dict["zenith"] * u.deg,
source_distance=args_dict["src_distance"] * u.km,
off_axis_angle=[0.0] * u.deg,
)
ray.simulate(test=args_dict.get("test", False), force=True)
ray.analyze(force=True, use_rx=False)
im = ray.images()[0]
d80 = im.get_psf()
return d80, im
def _create_psf_simulation_plot(data_to_plot, pars, d80, rmsd, is_best, pdf_pages):
"""
Create a plot for PSF simulation results.
Parameters
----------
data_to_plot : dict
Data dictionary for plotting.
pars : dict
Parameter set dictionary.
d80 : float
D80 value.
rmsd : float
RMSD value.
is_best : bool
Whether this is the best parameter set.
pdf_pages : PdfPages
PDF pages object for saving plots.
"""
fig = visualize.plot_1d(
data_to_plot,
plot_difference=True,
no_markers=True,
)
ax = fig.get_axes()[0]
ax.set_ylim(0, 1.05)
ax.set_ylabel(CUMULATIVE_PSF)
title_prefix = "* " if is_best else ""
ax.set_title(
f"{title_prefix}refl_rnd = "
f"{pars['mirror_reflection_random_angle'][0]:.5f}, "
f"{pars['mirror_reflection_random_angle'][1]:.5f}, "
f"{pars['mirror_reflection_random_angle'][2]:.5f}\n"
f"align_rnd = {pars['mirror_align_random_vertical'][0]:.5f}, "
f"{pars['mirror_align_random_vertical'][1]:.5f}, "
f"{pars['mirror_align_random_vertical'][2]:.5f}, "
f"{pars['mirror_align_random_vertical'][3]:.5f}"
)
d80_color = "red" if is_best else "black"
d80_weight = "bold" if is_best else "normal"
d80_text = f"D80 = {d80:.5f} cm"
ax.text(
0.5,
0.3,
f"{d80_text}\nRMSD = {rmsd:.4f}",
verticalalignment="center",
horizontalalignment="left",
transform=ax.transAxes,
color=d80_color,
weight=d80_weight,
bbox={"boxstyle": "round,pad=0.3", "facecolor": "yellow", "alpha": 0.7}
if is_best
else None,
)
if is_best:
fig.text(
0.02,
0.02,
"* Best parameter set (lowest RMSD)",
fontsize=8,
style="italic",
color="red",
)
pdf_pages.savefig(fig, bbox_inches="tight")
plt.clf()
[docs]
def run_psf_simulation(
tel_model,
site_model,
args_dict,
pars,
data_to_plot,
radius,
pdf_pages=None,
is_best=False,
return_simulated_data=False,
):
"""
Run the simulation for one set of parameters and return D80, RMSD.
Parameters
----------
tel_model : TelescopeModel
Telescope model object.
site_model : SiteModel
Site model object.
args_dict : dict
Dictionary containing parsed command-line arguments.
pars : dict
Parameter set dictionary.
data_to_plot : dict
Data dictionary for plotting.
radius : array-like
Radius data.
pdf_pages : PdfPages, optional
PDF pages object for plotting. If None, no plotting is done.
is_best : bool, optional
Whether this is the best parameter set for highlighting in plots.
return_simulated_data : bool, optional
If True, returns simulated data as third element in return tuple.
Returns
-------
tuple
(d80, rmsd) if return_simulated_data=False
(d80, rmsd, simulated_data) if return_simulated_data=True
"""
d80, im = _run_ray_tracing_simulation(tel_model, site_model, args_dict, pars)
if radius is None:
raise ValueError("Radius data is not available.")
simulated_data = im.get_cumulative_data(radius * u.cm)
rmsd = calculate_rmsd(data_to_plot["measured"][CUMULATIVE_PSF], simulated_data[CUMULATIVE_PSF])
# Handle plotting if requested
if pdf_pages is not None and args_dict.get("plot_all", False):
data_to_plot["simulated"] = simulated_data
_create_psf_simulation_plot(data_to_plot, pars, d80, rmsd, is_best, pdf_pages)
del data_to_plot["simulated"]
return (d80, rmsd, simulated_data) if return_simulated_data else (d80, rmsd)
[docs]
def load_and_process_data(args_dict):
"""
Load and process data if specified in the command-line arguments.
Returns
-------
- data_to_plot: OrderedDict containing loaded and processed data.
- radius: Radius data from loaded data (if available).
"""
data_to_plot = OrderedDict()
radius = None
if args_dict["data"] is not None:
data_file = gen.find_file(args_dict["data"], args_dict["model_path"])
data_to_plot["measured"] = load_psf_data(data_file)
radius = data_to_plot["measured"][RADIUS_CM]
return data_to_plot, radius
def _create_plot_for_parameters(pars, rmsd, d80, simulated_data, data_to_plot, is_best, pdf_pages):
"""
Create a single plot for a parameter set.
Parameters
----------
pars : dict
Parameter set dictionary
rmsd : float
RMSD value for this parameter set
d80 : float
D80 value for this parameter set
simulated_data : array
Simulated data for plotting
data_to_plot : dict
Data dictionary for plotting
is_best : bool
Whether this is the best parameter set
pdf_pages : PdfPages
PDF pages object to save the plot
"""
original_simulated = data_to_plot.get("simulated")
data_to_plot["simulated"] = simulated_data
fig = visualize.plot_1d(
data_to_plot,
plot_difference=True,
no_markers=True,
)
ax = fig.get_axes()[0]
ax.set_ylim(0, 1.05)
ax.set_ylabel(CUMULATIVE_PSF)
title_prefix = "* " if is_best else ""
ax.set_title(
f"{title_prefix}reflection = "
f"{pars['mirror_reflection_random_angle'][0]:.5f}, "
f"{pars['mirror_reflection_random_angle'][1]:.5f}, "
f"{pars['mirror_reflection_random_angle'][2]:.5f}\n"
f"align_vertical = {pars['mirror_align_random_vertical'][0]:.5f}, "
f"{pars['mirror_align_random_vertical'][1]:.5f}, "
f"{pars['mirror_align_random_vertical'][2]:.5f}, "
f"{pars['mirror_align_random_vertical'][3]:.5f}\n"
f"align_horizontal = {pars['mirror_align_random_horizontal'][0]:.5f}, "
f"{pars['mirror_align_random_horizontal'][1]:.5f}, "
f"{pars['mirror_align_random_horizontal'][2]:.5f}, "
f"{pars['mirror_align_random_horizontal'][3]:.5f}"
)
d80_color = "red" if is_best else "black"
d80_weight = "bold" if is_best else "normal"
ax.text(
0.5,
0.3,
f"D80 = {d80:.5f} cm\nRMSD = {rmsd:.4f}",
verticalalignment="center",
horizontalalignment="left",
transform=ax.transAxes,
color=d80_color,
weight=d80_weight,
bbox={"boxstyle": "round,pad=0.3", "facecolor": "yellow", "alpha": 0.7}
if is_best
else None,
)
if is_best:
fig.text(
0.02,
0.02,
"* Best parameter set (lowest RMSD)",
fontsize=8,
style="italic",
color="red",
)
pdf_pages.savefig(fig, bbox_inches="tight")
plt.clf()
if original_simulated is not None:
data_to_plot["simulated"] = original_simulated
def _create_all_plots(results, best_pars, data_to_plot, pdf_pages):
"""
Create plots for all parameter sets if requested.
Parameters
----------
results : list
List of (pars, rmsd, d80, simulated_data) tuples
best_pars : dict
Best parameter set for highlighting
data_to_plot : dict
Data dictionary for plotting
pdf_pages : PdfPages
PDF pages object to save plots
"""
logger.info("Creating plots for all parameter sets...")
for i, (pars, rmsd, d80, simulated_data) in enumerate(results):
is_best = pars is best_pars
logger.info(f"Creating plot {i + 1}/{len(results)}{' (BEST)' if is_best else ''}")
_create_plot_for_parameters(
pars, rmsd, d80, simulated_data, data_to_plot, is_best, pdf_pages
)
[docs]
def find_best_parameters(
all_parameters, tel_model, site_model, args_dict, data_to_plot, radius, pdf_pages=None
):
"""
Find the best parameters by running simulations for all parameter sets.
Loop over all parameter sets, run the simulation, compute RMSD,
and return the best parameters and their RMSD.
"""
best_rmsd = float("inf")
best_pars = None
best_d80 = None
results = [] # Store (pars, rmsd, d80, simulated_data)
logger.info(f"Running {len(all_parameters)} simulations...")
for i, pars in enumerate(all_parameters):
try:
logger.info(f"Running simulation {i + 1}/{len(all_parameters)}")
d80, rmsd, simulated_data = run_psf_simulation(
tel_model,
site_model,
args_dict,
pars,
data_to_plot,
radius,
return_simulated_data=True,
pdf_pages=None,
)
except (ValueError, RuntimeError) as e:
logger.warning(f"Simulation failed for parameters {pars}: {e}")
continue
results.append((pars, rmsd, d80, simulated_data))
if rmsd < best_rmsd:
best_rmsd = rmsd
best_pars = pars
best_d80 = d80
logger.info(f"Best RMSD found: {best_rmsd:.5f}")
# Create all plots if requested
if pdf_pages is not None and args_dict.get("plot_all", False) and results:
_create_all_plots(results, best_pars, data_to_plot, pdf_pages)
return best_pars, best_d80, results
[docs]
def create_d80_vs_offaxis_plot(tel_model, site_model, args_dict, best_pars, output_dir):
"""
Create D80 vs off-axis angle plot using the best parameters.
Parameters
----------
tel_model : TelescopeModel
Telescope model object.
site_model : SiteModel
Site model object.
args_dict : dict
Dictionary containing parsed command-line arguments.
best_pars : dict
Best parameter set.
output_dir : Path
Output directory for saving plots.
"""
logger.info("Creating D80 vs off-axis angle plot with best parameters...")
# Apply best parameters to telescope model
tel_model.change_multiple_parameters(**best_pars)
# Create off-axis angle array
max_offset = args_dict.get("max_offset", MAX_OFFSET_DEFAULT)
offset_steps = args_dict.get("offset_steps", OFFSET_STEPS_DEFAULT)
off_axis_angles = np.linspace(
0,
max_offset,
int(max_offset / offset_steps) + 1,
)
ray = RayTracing(
telescope_model=tel_model,
site_model=site_model,
simtel_path=args_dict["simtel_path"],
zenith_angle=args_dict["zenith"] * u.deg,
source_distance=args_dict["src_distance"] * u.km,
off_axis_angle=off_axis_angles * u.deg,
)
logger.info(f"Running ray tracing for {len(off_axis_angles)} off-axis angles...")
ray.simulate(test=args_dict.get("test", False), force=True)
ray.analyze(force=True)
for key in ["d80_cm", "d80_deg"]:
plt.figure(figsize=(10, 6), tight_layout=True)
ray.plot(key, marker="o", linestyle="-", color="blue", linewidth=2, markersize=6)
plt.title(
f"PSF D80 vs Off-axis Angle - {tel_model.name}\n"
f"Best Parameters: \n"
f"reflection=[{best_pars['mirror_reflection_random_angle'][0]:.4f},"
f"{best_pars['mirror_reflection_random_angle'][1]:.4f},"
f"{best_pars['mirror_reflection_random_angle'][2]:.4f}],\n"
f"align_horizontal={best_pars['mirror_align_random_horizontal'][0]:.4f}\n"
f"align_vertical={best_pars['mirror_align_random_vertical'][0]:.4f}\n"
)
plt.xlabel("Off-axis Angle (degrees)")
plt.ylabel("D80 (cm)" if key == "d80_cm" else "D80 (degrees)")
plt.ylim(bottom=0)
plt.xticks(rotation=45)
plt.xlim(0, max_offset)
plt.grid(True, alpha=0.3)
plot_file_name = f"tune_psf_{tel_model.name}_best_params_{key}.pdf"
plot_file = output_dir.joinpath(plot_file_name)
visualize.save_figure(plt, plot_file, log_title=f"D80 vs off-axis ({key})")
plt.close("all")
[docs]
def write_tested_parameters_to_file(results, best_pars, best_d80, output_dir, tel_model):
"""
Write all tested parameters and their metrics to a text file.
Parameters
----------
results : list
List of (pars, rmsd, d80, simulated_data) tuples
best_pars : dict
Best parameter set
best_d80 : float
Best D80 value
output_dir : Path
Output directory path
tel_model : TelescopeModel
Telescope model object for filename generation
"""
param_file = output_dir.joinpath(f"psf_optimization_{tel_model.name}.log")
with open(param_file, "w", encoding="utf-8") as f:
f.write("# PSF Parameter Optimization Log\n")
f.write(f"# Telescope: {tel_model.name}\n")
f.write(f"# Total parameter sets tested: {len(results)}\n")
f.write("#" + "=" * 60 + "\n\n")
f.write("PARAMETER TESTING RESULTS:\n")
for i, (pars, rmsd, d80, _) in enumerate(results):
is_best = pars is best_pars
status = "BEST" if is_best else "TESTED"
f.write(f"[{status}] Set {i + 1:03d}: RMSD={rmsd:.5f}, D80={d80:.5f} cm\n")
for par, value in pars.items():
f.write(f" {par}: {value}\n")
f.write("\n")
f.write("OPTIMIZATION SUMMARY:\n")
f.write(f"Best RMSD: {min(result[1] for result in results):.5f}\n")
f.write(f"Best D80: {best_d80:.5f} cm\n")
f.write("\nOPTIMIZED PARAMETERS:\n")
for par, value in best_pars.items():
f.write(f"{par}: {value}\n")
return param_file
def _add_units_to_psf_parameters(best_pars):
"""
Add proper astropy units to PSF parameters based on their schemas.
Parameters
----------
best_pars : dict
Dictionary with PSF parameter names as keys and values as lists
Returns
-------
dict
Dictionary with same keys but values converted to astropy quantities with units
"""
psf_pars_with_units = {}
for param_name, param_values in best_pars.items():
if param_name == "mirror_reflection_random_angle":
psf_pars_with_units[param_name] = [
param_values[0] * u.deg,
param_values[1] * u.dimensionless_unscaled,
param_values[2] * u.deg,
]
elif param_name in ["mirror_align_random_horizontal", "mirror_align_random_vertical"]:
psf_pars_with_units[param_name] = [
param_values[0] * u.deg,
param_values[1] * u.deg,
param_values[2] * u.dimensionless_unscaled,
param_values[3] * u.dimensionless_unscaled,
]
else:
psf_pars_with_units[param_name] = param_values
return psf_pars_with_units
[docs]
def export_psf_parameters(best_pars, tel_model, parameter_version, output_dir):
"""
Export PSF parameters as simulation model parameter files.
Parameters
----------
best_pars : dict
Best parameter set
tel_model : TelescopeModel
Telescope model object
parameter_version : str
Parameter version string
output_dir : Path
Output directory path
"""
try:
logger.info("Exporting best PSF parameters as simulation model parameter files")
psf_pars_with_units = _add_units_to_psf_parameters(best_pars)
parameter_output_path = output_dir / tel_model.name
for parameter_name, parameter_value in psf_pars_with_units.items():
writer.ModelDataWriter.dump_model_parameter(
parameter_name=parameter_name,
value=parameter_value,
instrument=tel_model.name,
parameter_version=parameter_version,
output_file=f"{parameter_name}-{parameter_version}.json",
output_path=parameter_output_path,
use_plain_output_path=True,
)
logger.info(f"simulation model parameter files exported to {output_dir}")
except ImportError as e:
logger.warning(f"Could not export simulation parameters: {e}")
except (ValueError, KeyError, OSError) as e:
logger.error(f"Error exporting simulation parameters: {e}")
[docs]
def run_psf_optimization_workflow(tel_model, site_model, args_dict, output_dir):
"""
Run the complete PSF parameter optimization workflow.
This function consolidates the main optimization logic to make the application lighter.
Parameters
----------
tel_model : TelescopeModel
Telescope model object
site_model : SiteModel
Site model object
args_dict : dict
Dictionary containing parsed command-line arguments
output_dir : Path
Output directory path
Returns
-------
None
All results are saved to files and printed to console
"""
# Generate parameter sets
all_parameters = []
mrra_0, mfr_0, mrra2_0, mar_0 = get_previous_values(tel_model)
n_runs = args_dict.get("n_runs")
generate_random_parameters(
all_parameters, n_runs, args_dict, mrra_0, mfr_0, mrra2_0, mar_0, tel_model
)
data_to_plot, radius = load_and_process_data(args_dict)
# Preparing figure name and PDF pages for plotting
plot_file_name = "_".join(("tune_psf", tel_model.name + ".pdf"))
plot_file = output_dir.joinpath(plot_file_name)
pdf_pages = PdfPages(plot_file)
# Find best parameters
best_pars, best_d80, results = find_best_parameters(
all_parameters, tel_model, site_model, args_dict, data_to_plot, radius, pdf_pages
)
plt.close()
pdf_pages.close()
# Write all tested parameters and their metrics to a file
param_file = write_tested_parameters_to_file(
results, best_pars, best_d80, output_dir, tel_model
)
print(f"\nParameter results written to {param_file}")
# Automatically create D80 vs off-axis angle plot for best parameters
create_d80_vs_offaxis_plot(tel_model, site_model, args_dict, best_pars, output_dir)
print("D80 vs off-axis angle plots created successfully")
print("\nBest parameters:")
for par, value in best_pars.items():
print(f"{par} = {value}")
# Export best parameters as simulation model parameter files (if flag is provided)
if args_dict.get("write_psf_parameters", False):
export_psf_parameters(
best_pars,
tel_model,
args_dict.get("parameter_version", "0.0.0"),
output_dir.parent,
)