#!/usr/bin/python3
r"""
Derives the mirror alignment parameters using cumulative PSF measurement.
This includes parameters mirror_reflection_random_angle, \
mirror_align_random_horizontal and mirror_align_random_vertical.
The telescope zenith angle and the source distance can be set by command line arguments.
The measured cumulative PSF should be provided by using the command line argument data. \
A file name is expected, in which the file should contain 3 columns: radial distance in mm, \
differential value of photon intensity and its integral value.
The derivation is performed through a random search. A number of random combination of the \
parameters are tested and the best ones are selected based on the minimum value of \
the Root Mean Squared Deviation between data and simulations. The range in which the \
parameter are drawn uniformly are defined based on the previous value on the telescope model.
The assumption are:
a) mirror_align_random_horizontal and mirror_align_random_vertical are the same.
b) mirror_align_random_horizontal/vertical have no dependence on the zenith angle.
One example of the plot generated by this applications are shown below.
.. _derive_psf_parameters_plot:
.. image:: images/derive_psf_parameters.png
:width: 49 %
Command line arguments
----------------------
site (str, required)
North or South.
telescope (str, required)
Telescope model name (e.g. LST-1, SST-D, ...).
model_version (str, optional)
Model version.
src_distance (float, optional)
Source distance in km.
zenith (float, optional)
Zenith angle in deg.
data (str, optional)
Name of the data file with the measured cumulative PSF.
plot_all (activation mode, optional)
If activated, plots will be generated for all values tested during tuning.
fixed (activation mode, optional)
Keep the first entry of mirror_reflection_random_angle fixed.
test (activation mode, optional)
If activated, application will be faster by simulating fewer photons.
Example
-------
LSTN-01 5.0.0
Runtime < 3 min.
Get PSF data from the DB:
.. code-block:: console
simtools-db-get-file-from-db --file_name PSFcurve_data_v2.txt
Run the application:
.. code-block:: console
simtools-derive-psf-parameters --site North --telescope LSTN-01 \\
--model_version 6.0.0 --data PSFcurve_data_v2.txt --plot_all --test
The output is saved in simtools-output/derive_psf_parameters.
Expected final print-out message:
.. code-block:: console
Best parameters:
mirror_reflection_random_angle = [0.006, 0.133, 0.005]
mirror_align_random_horizontal = [0.005, 28.0, 0.0, 0.0]
mirror_align_random_vertical = [0.005, 28.0, 0.0, 0.0]
"""
import logging
from collections import OrderedDict
import astropy.units as u
import matplotlib.pyplot as plt
import numpy as np
from matplotlib.backends.backend_pdf import PdfPages
import simtools.utils.general as gen
from simtools.configuration import configurator
from simtools.io_operations import io_handler
from simtools.model.telescope_model import TelescopeModel
from simtools.ray_tracing.ray_tracing import RayTracing
from simtools.visualization import visualize
[docs]
def load_data(data_file):
"""
Load data from a text file containing cumulative PSF measurements.
Parameters
----------
data_file : str
Name of the data file with the measured cumulative PSF.
Returns
-------
numpy.ndarray
Loaded and processed data from the file.
"""
radius_cm = "Radius [cm]"
cumulative_psf = "Cumulative PSF"
d_type = {"names": (radius_cm, cumulative_psf), "formats": ("f8", "f8")}
data = np.loadtxt(data_file, dtype=d_type, usecols=(0, 2))
data[radius_cm] *= 0.1
data[cumulative_psf] /= np.max(np.abs(data[cumulative_psf]))
return data
def _parse():
config = configurator.Configurator(
description=(
"Derive mirror_reflection_random_angle, mirror_align_random_horizontal "
"and mirror_align_random_vertical using cumulative PSF measurement."
)
)
config.parser.add_argument(
"--src_distance",
help="Source distance in km",
type=float,
default=10,
)
config.parser.add_argument("--zenith", help="Zenith angle in deg", type=float, default=20)
config.parser.add_argument(
"--data", help="Data file name with the measured PSF vs radius [cm]", type=str
)
config.parser.add_argument(
"--plot_all",
help=(
"On: plot cumulative PSF for all tested combinations, "
"Off: plot it only for the best set of values"
),
action="store_true",
)
config.parser.add_argument(
"--fixed",
help=("Keep the first entry of mirror_reflection_random_angle fixed."),
action="store_true",
)
return config.initialize(db_config=True, simulation_model="telescope")
[docs]
def add_parameters(
all_parameters,
mirror_reflection,
mirror_align,
mirror_reflection_fraction=0.15,
mirror_reflection_2=0.035,
):
"""
Transform and add parameters to the all_parameters list.
Parameters
----------
mirror_reflection : float
The random angle of mirror reflection.
mirror_align : float
The random angle for mirror alignment (both horizontal and vertical).
mirror_reflection_fraction : float, optional
The fraction of the mirror reflection. Default is 0.15.
mirror_reflection_2 : float, optional
A secondary random angle for mirror reflection. Default is 0.035.
Returns
-------
None
Updates the all_parameters list in place.
"""
# If we want to start from values different than the ones currently in the model:
# align = 0.0046
# pars_to_change = {
# 'mirror_reflection_random_angle': '0.0075 0.125 0.0037',
# 'mirror_align_random_horizontal': f'{align} 28 0 0',
# 'mirror_align_random_vertical': f'{align} 28 0 0',
# }
# tel_model.change_multiple_parameters(**pars_to_change)
pars = {
"mirror_reflection_random_angle": [
mirror_reflection,
mirror_reflection_fraction,
mirror_reflection_2,
],
"mirror_align_random_horizontal": [mirror_align, 28.0, 0.0, 0.0],
"mirror_align_random_vertical": [mirror_align, 28.0, 0.0, 0.0],
}
all_parameters.append(pars)
[docs]
def get_previous_values(tel_model, logger):
"""
Retrieve previous parameter values from the telescope model.
Parameters
----------
tel_model : TelescopeModel
Telescope model object.
logger : logging.Logger
Logger object for logging messages.
Returns
-------
tuple
Tuple containing the previous values of mirror_reflection_random_angle (first entry),
mirror_reflection_fraction, second entry), mirror_reflection_random_angle (third entry),
and mirror_align_random_horizontal/vertical.
"""
split_par = tel_model.get_parameter_value("mirror_reflection_random_angle")
mrra_0, mfr_0, mrra2_0 = split_par[0], split_par[1], split_par[2]
mar_0 = tel_model.get_parameter_value("mirror_align_random_horizontal")[0]
logger.debug(
"Previous parameter values:\n"
f"MRRA = {mrra_0!s}\n"
f"MRF = {mfr_0!s}\n"
f"MRRA2 = {mrra2_0!s}\n"
f"MAR = {mar_0!s}\n"
)
return mrra_0, mfr_0, mrra2_0, mar_0
[docs]
def generate_random_parameters(
all_parameters, n_runs, args_dict, mrra_0, mfr_0, mrra2_0, mar_0, logger
):
"""
Generate random parameters for tuning.
Parameters
----------
all_parameters : list
List to store all parameter sets.
n_runs : int
Number of random parameter combinations to test.
args_dict : dict
Dictionary containing parsed command-line arguments.
mrra_0 : float
Initial value of mirror_reflection_random_angle.
mfr_0 : float
Initial value of mirror_reflection_fraction.
mrra2_0 : float
Initial value of the second mirror_reflection_random_angle.
mar_0 : float
Initial value of mirror_align_random_horizontal/vertical.
logger : logging.Logger
Logger object for logging messages.
"""
# Range around the previous values are hardcoded
# Number of runs is hardcoded
if args_dict["fixed"]:
logger.debug("fixed=True - First entry of mirror_reflection_random_angle is kept fixed.")
for _ in range(n_runs):
mrra_range = 0.004 if not args_dict["fixed"] else 0
mrf_range = 0.1
mrra2_range = 0.03
mar_range = 0.005
rng = np.random.default_rng()
mrra = rng.uniform(max(mrra_0 - mrra_range, 0), mrra_0 + mrra_range)
mrf = rng.uniform(max(mfr_0 - mrf_range, 0), mfr_0 + mrf_range)
mrra2 = rng.uniform(max(mrra2_0 - mrra2_range, 0), mrra2_0 + mrra2_range)
mar = rng.uniform(max(mar_0 - mar_range, 0), mar_0 + mar_range)
add_parameters(all_parameters, mrra, mar, mrf, mrra2)
[docs]
def load_and_process_data(args_dict):
"""
Load and process data if specified in the command-line arguments.
Returns
-------
- data_to_plot: OrderedDict containing loaded and processed data.
- radius: Radius data from loaded data (if available).
"""
data_to_plot = OrderedDict()
radius = None
if args_dict["data"] is not None:
data_file = gen.find_file(args_dict["data"], args_dict["model_path"])
data_to_plot["measured"] = load_data(data_file)
radius = data_to_plot["measured"]["Radius [cm]"]
return data_to_plot, radius
[docs]
def calculate_rmsd(data, sim):
"""Calculate Root Mean Squared Deviation to be used as metric to find the best parameters."""
return np.sqrt(np.mean((data - sim) ** 2))
[docs]
def run_pars(tel_model, args_dict, pars, data_to_plot, radius, pdf_pages):
"""
Run the tuning for one set of parameters, add a plot to the pdfPages and return RMSD and D80.
Plotting is optional (if plot=True).
"""
cumulative_psf = "Cumulative PSF"
if pars is not None:
tel_model.change_multiple_parameters(**pars)
else:
raise ValueError("No best parameters found")
ray = RayTracing(
telescope_model=tel_model,
simtel_path=args_dict["simtel_path"],
zenith_angle=args_dict["zenith"] * u.deg,
source_distance=args_dict["src_distance"] * u.km,
off_axis_angle=[0.0] * u.deg,
)
ray.simulate(test=args_dict["test"], force=True)
ray.analyze(force=True, use_rx=False)
# Plotting cumulative PSF
im = ray.images()[0]
d80 = im.get_psf()
if radius is not None:
# Simulated cumulative PSF
data_to_plot["simulated"] = im.get_cumulative_data(radius * u.cm)
else:
raise ValueError("Radius data is not available.")
rmsd = calculate_rmsd(
data_to_plot["measured"][cumulative_psf], data_to_plot["simulated"][cumulative_psf]
)
if args_dict["plot_all"]:
fig = visualize.plot_1d(
data_to_plot,
plot_difference=True,
no_markers=True,
)
ax = fig.get_axes()[0]
ax.set_ylim(0, 1.05)
ax.set_title(
f"refl_rnd={pars['mirror_reflection_random_angle']}, "
f"align_rnd={pars['mirror_align_random_vertical']}"
)
ax.text(
0.8,
0.3,
f"D80 = {d80:.3f} cm\nRMSD = {rmsd:.4f}",
verticalalignment="center",
horizontalalignment="center",
transform=ax.transAxes,
)
plt.tight_layout()
pdf_pages.savefig(fig)
plt.clf()
return d80, rmsd
[docs]
def find_best_parameters(all_parameters, tel_model, args_dict, data_to_plot, radius, pdf_pages):
"""
Find the best parameters from all parameter sets.
Returns
-------
- Tuple of best parameters and their D80 value.
"""
min_rmsd = 100
best_pars = None
for pars in all_parameters:
_, rmsd = run_pars(tel_model, args_dict, pars, data_to_plot, radius, pdf_pages)
if rmsd < min_rmsd:
min_rmsd = rmsd
best_pars = pars
return best_pars, min_rmsd
def main(): # noqa: D103
args_dict, db_config = _parse()
label = "tune_psf"
logger = logging.getLogger()
logger.setLevel(gen.get_log_level_from_user(args_dict["log_level"]))
# Output directory to save files related directly to this app
_io_handler = io_handler.IOHandler()
output_dir = _io_handler.get_output_directory(label, sub_dir="application-plots")
tel_model = TelescopeModel(
site=args_dict["site"],
telescope_name=args_dict["telescope"],
mongo_db_config=db_config,
model_version=args_dict["model_version"],
label=label,
)
all_parameters = []
mrra_0, mfr_0, mrra2_0, mar_0 = get_previous_values(tel_model, logger)
n_runs = 5 if args_dict["test"] else 50
generate_random_parameters(
all_parameters, n_runs, args_dict, mrra_0, mfr_0, mrra2_0, mar_0, logger
)
data_to_plot, radius = load_and_process_data(args_dict)
# Preparing figure name
plot_file_name = "_".join((label, tel_model.name + ".pdf"))
plot_file = output_dir.joinpath(plot_file_name)
pdf_pages = PdfPages(plot_file)
best_pars, _ = find_best_parameters(
all_parameters, tel_model, args_dict, data_to_plot, radius, pdf_pages
)
# Rerunning and plotting the best pars
run_pars(tel_model, args_dict, best_pars, data_to_plot, radius, pdf_pages)
plt.close()
pdf_pages.close()
# Printing the results
print("Best parameters:")
for par, value in best_pars.items():
print(f"{par} = {value}")
if __name__ == "__main__":
main()