"""Ray tracing simulations and analysis."""
import gzip
import logging
import shlex
import shutil
import subprocess
from copy import copy
from math import pi, tan
from pathlib import Path
import astropy.io.ascii
import astropy.units as u
import matplotlib.pyplot as plt
import numpy as np
from astropy.table import QTable
import simtools.utils.general as gen
from simtools.io_operations import io_handler
from simtools.model.model_utils import compute_telescope_transmission
from simtools.model.telescope_model import TelescopeModel
from simtools.psf_analysis import PSFImage
from simtools.simtel.simulator_ray_tracing import SimulatorRayTracing
from simtools.utils import names, value_conversion
from simtools.visualization import visualize
__all__ = ["RayTracing"]
INVALID_KEY_TO_PLOT = "Invalid key to plot"
[docs]
class RayTracing:
"""
Ray tracing simulations and analysis.
Parameters
----------
telescope_model: TelescopeModel
Instance of the TelescopeModel class.
label: str
Instance label.
simtel_path: str (or Path)
Location of sim_telarray installation.
config_data: dict.
Dict containing the configurable parameters.
config_file: str or Path
Path of the yaml file containing the configurable parameters.
"""
YLABEL = {
"d80_cm": r"$D_{80}$",
"d80_deg": r"$D_{80}$",
"eff_area": "Eff. mirror area",
"eff_flen": "Eff. focal length",
}
def __init__(
self,
telescope_model,
simtel_path,
label=None,
config_data=None,
config_file=None,
):
"""Initialize RayTracing class."""
self._logger = logging.getLogger(__name__)
self._logger.debug("Initializing RayTracing class")
self._simtel_path = Path(simtel_path)
self._io_handler = io_handler.IOHandler()
self._telescope_model = self._validate_telescope_model(telescope_model)
self.config = value_conversion.validate_config_data(
gen.collect_data_from_file_or_dict(config_file, config_data),
SimulatorRayTracing.ray_tracing_default_configuration(False),
)
# Due to float representation, round the off-axis angles so the values in results table
# are the same as provided.
self.config = self.config._replace(off_axis_angle=np.around(self.config.off_axis_angle, 5))
self.label = label if label is not None else self._telescope_model.label
self._output_directory = self._io_handler.get_output_directory(
label=self.label, sub_dir="ray-tracing"
)
# Loading relevant attributes in case of single mirror mode.
if self.config.single_mirror_mode:
# Recalculating source distance.
self._logger.debug(
"Single mirror mode is activated - "
"source distance is being recalculated to 2 * flen"
)
mir_flen = self._telescope_model.get_parameter_value("mirror_focal_length")
self._source_distance = 2 * float(mir_flen) * u.cm.to(u.km) # km
# Setting mirror numbers.
if self.config.mirror_numbers[0] == "all":
self._mirror_numbers = list(
range(0, self._telescope_model.mirrors.number_of_mirrors)
)
else:
self._mirror_numbers = self.config.mirror_numbers
else:
self._source_distance = self.config.source_distance
self._psf_images = None
self._results = None
self._has_results = False
# Results file
file_name_results = names.generate_file_name(
file_type="ray-tracing",
suffix=".ecsv",
site=self._telescope_model.site,
telescope_model_name=self._telescope_model.name,
source_distance=self._source_distance,
zenith_angle=self.config.zenith_angle,
label=self.label,
)
self._output_directory.joinpath("results").mkdir(parents=True, exist_ok=True)
self._file_results = self._output_directory.joinpath("results").joinpath(file_name_results)
[docs]
@classmethod
def from_kwargs(cls, **kwargs):
"""
Build a RayTracing object from kwargs only.
The configurable parameters can be given as kwargs, instead of using the
config_data or config_file arguments.
Parameters
----------
kwargs
Containing the arguments and the configurable parameters.
Returns
-------
Instance of this class.
"""
args, config_data = gen.separate_args_and_config_data(
expected_args=[
"telescope_model",
"label",
"simtel_path",
],
**kwargs,
)
return cls(**args, config_data=config_data)
def __repr__(self):
"""Return string representation of RayTracing class."""
return f"RayTracing(label={self.label})\n"
def _validate_telescope_model(self, tel):
"""Validate TelescopeModel."""
if isinstance(tel, TelescopeModel):
self._logger.debug("RayTracing contains a valid TelescopeModel")
return tel
msg = "Invalid TelescopeModel"
self._logger.error(msg)
raise ValueError(msg)
[docs]
def simulate(self, test=False, force=False):
"""
Simulate RayTracing using SimulatorRayTracing.
Parameters
----------
test: bool
Test flag will make it faster by simulating much fewer photons.
force: bool
Force flag will remove existing files and simulate again.
"""
all_mirrors = self._mirror_numbers if self.config.single_mirror_mode else [0]
for this_off_axis in self.config.off_axis_angle:
for this_mirror in all_mirrors:
self._logger.info(
f"Simulating RayTracing for off_axis={this_off_axis}, mirror={this_mirror}"
)
simtel = SimulatorRayTracing(
simtel_path=self._simtel_path,
telescope_model=self._telescope_model,
test=test,
config_data={
"zenith_angle": self.config.zenith_angle * u.deg,
"source_distance": self._source_distance * u.km,
"off_axis_angle": this_off_axis * u.deg,
"mirror_numbers": this_mirror,
"use_random_focal_length": self.config.use_random_focal_length,
"single_mirror_mode": self.config.single_mirror_mode,
},
force_simulate=force,
)
simtel.run(test=test)
photons_file_name = names.generate_file_name(
file_type="photons",
suffix=".lis",
site=self._telescope_model.site,
telescope_model_name=self._telescope_model.name,
source_distance=self._source_distance,
zenith_angle=self.config.zenith_angle,
off_axis_angle=this_off_axis,
mirror_number=this_mirror if self.config.single_mirror_mode else None,
label=self.label,
)
photons_file = self._output_directory.joinpath(photons_file_name)
self._logger.debug("Using gzip to compress the photons file.")
with open(photons_file, "rb") as f_in:
with gzip.open(
photons_file.with_suffix(photons_file.suffix + ".gz"), "wb"
) as f_out:
shutil.copyfileobj(f_in, f_out)
photons_file.unlink()
[docs]
def analyze(
self,
export=True,
force=False,
use_rx=False,
no_tel_transmission=False,
containment_fraction=0.8,
):
"""
Raytracing analysis.
Involves the following: read simtel files, compute psfs and eff areas and store the
results in _results.
Parameters
----------
export: bool
If True, results will be exported to a file automatically. Alternatively,
export_results function can be used.
force: bool
If True, existing results files will be removed and analysis will be done again.
use_rx: bool
If True, calculations are done using the rx binary provided by sim_telarray. If False,
calculations are done internally, by the module psf_analysis.
no_tel_transmission: bool
If True, the telescope transmission is not applied.
containment_fraction: float
Containment fraction for PSF containment calculation. Allowed values are in the
interval [0,1]
"""
do_analyze = not self._file_results.exists() or force
if not do_analyze:
self._read_results()
focal_length = float(self._telescope_model.get_parameter_value("focal_length"))
tel_transmission_pars = self._get_telescope_transmission_params(no_tel_transmission)
cm_to_deg = 180.0 / pi / focal_length
self._psf_images = {}
# Call the helper function to process off-axis angles and mirrors
_rows = self._process_off_axis_and_mirror(
self._get_all_mirrors(),
focal_length,
tel_transmission_pars,
cm_to_deg,
do_analyze,
use_rx,
containment_fraction,
)
if do_analyze:
self._store_results(_rows)
self._has_results = True
if export:
self.export_results()
def _process_off_axis_and_mirror(
self,
all_mirrors,
focal_length,
tel_transmission_pars,
cm_to_deg,
do_analyze,
use_rx,
containment_fraction,
):
"""
Process off-axis angles and mirrors for RayTracing analysis.
Parameters
----------
all_mirrors: list
List of mirror numbers to analyze.
focal_length: float
Focal length of the telescope.
tel_transmission_pars: list
Telescope transmission parameters.
cm_to_deg: float
Conversion factor from centimeters to degrees.
do_analyze: bool
Flag indicating whether to perform analysis or not.
use_rx: bool
Flag indicating whether to use the RX method for analysis.
containment_fraction: float
Containment fraction for PSF containment calculation.
Returns
-------
list
List of results for each combination of off-axis angle and mirror.
"""
_rows = []
for this_off_axis in self.config.off_axis_angle:
for this_mirror in all_mirrors:
self._logger.debug(f"Analyzing RayTracing for off_axis={this_off_axis}")
if self.config.single_mirror_mode:
self._logger.debug(f"mirror_number={this_mirror}")
photons_file = self._get_photons_file(this_off_axis, this_mirror)
tel_transmission = compute_telescope_transmission(
tel_transmission_pars, this_off_axis
)
image = self._create_psf_image(photons_file, focal_length, this_off_axis)
if do_analyze:
_current_results = self._analyze_image(
image,
photons_file,
this_off_axis,
use_rx,
cm_to_deg,
containment_fraction,
tel_transmission,
)
if self.config.single_mirror_mode:
_current_results += (this_mirror,)
_rows.append(_current_results)
return _rows
def _get_telescope_transmission_params(self, no_tel_transmission):
"""
Get telescope transmission parameters.
Parameters
----------
no_tel_transmission: bool
Flag indicating whether to apply telescope transmission or not.
Returns
-------
list
Telescope transmission parameters.
"""
return (
self._telescope_model.get_parameter_value("telescope_transmission")
if not no_tel_transmission
else [1, 0, 0, 0]
)
def _get_all_mirrors(self):
"""
Get number of mirrors.
Returns
-------
list
List of number of mirrors.
"""
return self._mirror_numbers if self.config.single_mirror_mode else [0]
def _get_photons_file(self, this_off_axis, this_mirror):
"""
Get path to photons file for a given off-axis angle and mirror.
Parameters
----------
this_off_axis: float
Off-axis angle.
this_mirror: int or None
Mirror number.
Returns
-------
Path
Path to the photons file.
"""
photons_file_name = names.generate_file_name(
file_type="photons",
suffix=".lis",
site=self._telescope_model.site,
telescope_model_name=self._telescope_model.name,
source_distance=self._source_distance,
zenith_angle=self.config.zenith_angle,
off_axis_angle=this_off_axis,
mirror_number=this_mirror if self.config.single_mirror_mode else None,
label=self.label,
)
return self._output_directory.joinpath(photons_file_name + ".gz")
def _create_psf_image(self, photons_file, focal_length, this_off_axis):
"""
Create PSF image from photons file.
Parameters
----------
photons_file: Path
Path to the photons file.
focal_length: float
Focal length of the telescope.
this_off_axis: float
Off-axis angle.
Returns
-------
PSFImage
PSF image object.
"""
image = PSFImage(focal_length, None)
image.read_photon_list_from_simtel_file(photons_file)
self._psf_images[this_off_axis] = copy(image)
return image
def _analyze_image(
self,
image,
photons_file,
this_off_axis,
use_rx,
cm_to_deg,
containment_fraction,
tel_transmission,
):
"""
Analyze PSF image.
Parameters
----------
image: PSFImage
PSF image object.
photons_file: Path
Path to the photons file.
this_off_axis: float
Off-axis angle.
use_rx: bool
Flag indicating whether to use the RX method for analysis.
cm_to_deg: float
Conversion factor from centimeters to degrees.
containment_fraction: float
Containment fraction for PSF containment calculation.
tel_transmission: float
Telescope transmission factor.
Returns
-------
tuple
Tuple containing analyzed results.
"""
if use_rx:
containment_diameter_cm, centroid_x, centroid_y, eff_area = self._process_rx(
photons_file
)
containment_diameter_deg = containment_diameter_cm * cm_to_deg
image.set_psf(containment_diameter_cm, fraction=containment_fraction, unit="cm")
image.centroid_x = centroid_x
image.centroid_y = centroid_y
image.set_effective_area(eff_area * tel_transmission)
else:
containment_diameter_cm = image.get_psf(containment_fraction, "cm")
containment_diameter_deg = image.get_psf(containment_fraction, "deg")
centroid_x = image.centroid_x
eff_area = image.get_effective_area() * tel_transmission
eff_flen = np.nan if this_off_axis == 0 else centroid_x / tan(this_off_axis * pi / 180.0)
return (
this_off_axis * u.deg,
containment_diameter_cm * u.cm,
containment_diameter_deg * u.deg,
eff_area * u.m * u.m,
eff_flen * u.cm,
)
def _store_results(self, _rows):
"""
Store analysis results.
Parameters
----------
_rows: list
List of rows containing analysis results.
"""
_columns = ["Off-axis angle"]
_columns.extend(list(self.YLABEL.keys()))
if self.config.single_mirror_mode:
_columns.append("mirror_number")
self._results = QTable(rows=_rows, names=_columns)
def _process_rx(self, file, containment_fraction=0.8):
"""
Process sim_telarray photon list with rx binary and return results.
Parameters
----------
file: str or Path
Photon list file.
containment_fraction: float
Containment fraction for PSF containment calculation. Allowed values are in the
interval [0,1]
Returns
-------
(containment_diameter_cm, x_mean, y_mean, eff_area)
"""
try:
rx_output = subprocess.Popen( # pylint: disable=consider-using-with
shlex.split(
f"{self._simtel_path}/sim_telarray/bin/rx -f {containment_fraction:.2f} -v"
),
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
)
with gzip.open(file, "rb") as _stdin:
with rx_output.stdin:
shutil.copyfileobj(_stdin, rx_output.stdin)
try:
rx_output = rx_output.communicate()[0].splitlines()[-1:][0].split()
except IndexError:
self._logger.error(f"Invalid output from rx: {rx_output}")
raise
except FileNotFoundError:
self._logger.error(f"Photon list file not found: {file}")
raise
containment_diameter_cm = 2 * float(rx_output[0])
x_mean = float(rx_output[1])
y_mean = float(rx_output[2])
eff_area = float(rx_output[5])
return containment_diameter_cm, x_mean, y_mean, eff_area
[docs]
def export_results(self):
"""Export results to a csv file."""
if not self._has_results:
self._logger.error("Cannot export results because it does not exist")
else:
self._logger.info(f"Exporting results to {self._file_results}")
astropy.io.ascii.write(self._results, self._file_results, format="ecsv", overwrite=True)
def _read_results(self):
"""Read existing results file and store it in _results."""
self._results = astropy.io.ascii.read(self._file_results, format="ecsv")
self._has_results = True
[docs]
def plot(self, key, save=False, **kwargs):
"""
Plot key vs off-axis angle and save the figure in pdf.
Parameters
----------
key: str
d80_cm, d80_deg, eff_area or eff_flen
save: bool
If True, figure will be saved.
**kwargs:
kwargs for plt.plot
Raises
------
KeyError
If key is not among the valid options.
"""
self._logger.info(f"Plotting {key} vs off-axis angle")
try:
plot = visualize.plot_table(
self._results["Off-axis angle", key], self.YLABEL[key], no_legend=True, **kwargs
)
except KeyError as exc:
msg = INVALID_KEY_TO_PLOT
self._logger.error(msg)
raise exc
if save:
plot_file_name = names.generate_file_name(
file_type="ray-tracing",
suffix=".pdf",
extra_label=key,
site=self._telescope_model.site,
telescope_model_name=self._telescope_model.name,
source_distance=self._source_distance,
zenith_angle=self.config.zenith_angle,
label=self.label,
)
self._output_directory.joinpath("figures").mkdir(exist_ok=True)
plot_file = self._output_directory.joinpath("figures").joinpath(plot_file_name)
self._logger.info(f"Saving fig in {plot_file}")
plot.savefig(plot_file)
[docs]
def plot_histogram(self, key, **kwargs):
"""
Plot histogram of key.
Parameters
----------
key: str
d80_cm, d80_deg, eff_area or eff_flen
**kwargs:
kwargs for plt.hist
Raises
------
KeyError
If key is not among the valid options.
"""
if key not in self.YLABEL:
msg = INVALID_KEY_TO_PLOT
self._logger.error(msg)
raise KeyError(msg)
ax = plt.gca()
ax.hist([r.value for r in self._results[key]], **kwargs)
[docs]
def get_mean(self, key):
"""
Get mean value of key.
Parameters
----------
key: str
d80_cm, d80_deg, eff_area or eff_flen
Returns
-------
float
Mean value of key.
Raises
------
KeyError
If key is not among the valid options.
"""
if key not in self.YLABEL:
msg = INVALID_KEY_TO_PLOT
self._logger.error(msg)
raise KeyError(msg)
return np.mean(self._results[key])
[docs]
def get_std_dev(self, key):
"""
Get std dev of key.
Parameters
----------
key: str
d80_cm, d80_deg, eff_area or eff_flen
Returns
-------
float
Std deviation of key.
Raises
------
KeyError
If key is not among the valid options.
"""
if key not in self.YLABEL:
msg = INVALID_KEY_TO_PLOT
self._logger.error(msg)
raise KeyError(msg)
return np.std(self._results[key])
[docs]
def images(self):
"""
Get list of PSFImages.
Returns
-------
List of PSFImage's
"""
images = []
for this_off_axis in self.config.off_axis_angle:
if self._psf_images and this_off_axis in self._psf_images:
images.append(self._psf_images[this_off_axis])
if len(images) == 0:
self._logger.error("No image found")
return None
return images