Source code for ray_tracing.incident_angles

"""Calculate photon incident angles on focal plane and primary/secondary mirrors.

Parses the imaging list (``.lis``) produced by sim_telarray_debug_trace and uses
Angle of incidence at focal surface, with respect to the optical axis [deg],
Angle of incidence on to primary mirror [deg], and
Angle of incidence on to secondary mirror [deg] (if available).
"""

import logging
import math
import re
import subprocess
from pathlib import Path

import astropy.units as u
from astropy.table import QTable

from simtools.data_model.metadata_collector import MetadataCollector
from simtools.model.model_utils import initialize_simulation_models


[docs] class IncidentAnglesCalculator: """Run a PSF-style sim_telarray job and compute incident angles at mirrors or focal surfaces. Parameters ---------- simtel_path : str or pathlib.Path Path to the sim_telarray installation directory (containing ``sim_telarray/bin``). db_config : dict Database configuration passed to ``initialize_simulation_models``. config_data : dict Simulation configuration (e.g. ``site``, ``telescope``, ``model_version``, ``off_axis_angle``, ``source_distance``, ``number_of_photons``). output_dir : str or pathlib.Path Output directory where logs, scripts, photons files and results are written. label : str, optional Label used to name outputs; defaults to ``incident_angles_<telescope>`` when omitted. Notes ----- Additional options are read from ``config_data`` when present: - ``perfect_mirror`` (bool, default False) - ``calculate_primary_secondary_angles`` (bool, default True) """ # Use fixed zenith angle (degrees) for incident-angle simulations. ZENITH_ANGLE_DEG = 0 def __init__( self, simtel_path, db_config, config_data, output_dir, label=None, ): self.logger = logging.getLogger(__name__) self._simtel_path = Path(simtel_path) self.config_data = config_data self.output_dir = Path(output_dir) self.label = label or f"incident_angles_{config_data['telescope']}" cfg = config_data self.perfect_mirror = cfg.get("perfect_mirror", False) self.calculate_primary_secondary_angles = cfg.get( "calculate_primary_secondary_angles", True ) self.results = None self.output_dir.mkdir(parents=True, exist_ok=True) self.logs_dir = self.output_dir / "logs" self.scripts_dir = self.output_dir / "scripts" self.photons_dir = self.output_dir / "photons_files" self.results_dir = self.output_dir / "incident_angles" for d in (self.logs_dir, self.scripts_dir, self.photons_dir, self.results_dir): d.mkdir(parents=True, exist_ok=True) self.logger.info( "Initializing models for %s, %s", config_data["site"], config_data["telescope"], ) self.telescope_model, self.site_model, _ = initialize_simulation_models( label=self.label, db_config=db_config, site=config_data["site"], telescope_name=config_data["telescope"], model_version=config_data["model_version"], ) def _label_suffix(self): """Build a filename suffix including telescope and off-axis angle. Returns ------- str Suffix of the form ``"<label>_<telescope>_off<angle>"`` where ``<angle>`` is formatted without trailing zeros. """ tel = str(self.config_data.get("telescope", "TEL")) off = float(self.config_data.get("off_axis_angle", 0.0 * u.deg).to_value(u.deg)) return f"{self.label}_{tel}_off{off:g}"
[docs] def run(self): """Run sim_telarray, parse the imaging list, and return an angle table. Returns ------- astropy.table.QTable Table containing at least the ``angle_incidence_focal`` column and, when configured, primary/secondary angles and hit geometry. """ self.telescope_model.write_sim_telarray_config_file(additional_models=self.site_model) photons_file, stars_file, log_file = self._prepare_psf_io_files() run_script = self._write_run_script(photons_file, stars_file, log_file) self._run_script(run_script, log_file) data = self._compute_incidence_angles_from_imaging_list(photons_file) self.results = QTable() self.results["angle_incidence_focal"] = data["angle_incidence_focal_deg"] * u.deg if self.calculate_primary_secondary_angles: field_map = { "angle_incidence_primary_deg": ("angle_incidence_primary", u.deg), "angle_incidence_secondary_deg": ("angle_incidence_secondary", u.deg), "primary_hit_radius_m": ("primary_hit_radius", u.m), "secondary_hit_radius_m": ("secondary_hit_radius", u.m), "primary_hit_x_m": ("primary_hit_x", u.m), "primary_hit_y_m": ("primary_hit_y", u.m), "secondary_hit_x_m": ("secondary_hit_x", u.m), "secondary_hit_y_m": ("secondary_hit_y", u.m), } for key, (name, unit) in field_map.items(): if key in data: self.results[name] = data[key] * unit self._save_results() return self.results
[docs] def run_for_offsets(self, offsets): """Run the simulation for multiple off-axis angles. For each off-axis angle provided, run a full simulation, labeling output files accordingly. Parameters ---------- offsets : Iterable[float] Off-axis angles in degrees. Returns ------- dict[float, astropy.table.QTable] Mapping from off-axis angle (deg) to the resulting table. """ results_by_offset = {} base_off = self.config_data.get("off_axis_angle", 0.0 * u.deg) for off in offsets: self.config_data["off_axis_angle"] = float(off) * u.deg self.logger.info(f"Running for off-axis angle {off:g} deg") tbl = self.run() results_by_offset[float(off)] = tbl.copy() self.config_data["off_axis_angle"] = base_off return results_by_offset
def _prepare_psf_io_files(self): """Prepare photons, stars, and log file paths for a PSF-style incident angle simulation. Returns ------- tuple[pathlib.Path, pathlib.Path, pathlib.Path] Paths to the photons file, stars file, and log file. """ suffix = self._label_suffix() photons_file = self.photons_dir / f"incident_angles_photons_{suffix}.lis" stars_file = self.photons_dir / f"incident_angles_stars_{suffix}.lis" log_file = self.logs_dir / f"incident_angles_{suffix}.log" if photons_file.exists(): try: photons_file.unlink() except OSError as err: self.logger.error(f"Failed to remove existing photons file {photons_file}: {err}") with photons_file.open("w", encoding="utf-8") as pf: pf.write(f"#{'=' * 50}\n") pf.write("# Imaging list for Incident Angle simulations\n") pf.write(f"#{'=' * 50}\n") pf.write(f"# config_file = {self.telescope_model.config_file_path}\n") pf.write(f"# zenith_angle [deg] = {self.ZENITH_ANGLE_DEG}\n") pf.write( f"# off_axis_angle [deg] = {self.config_data['off_axis_angle'].to_value(u.deg)}\n" ) pf.write(f"# source_distance [km] = {self.config_data['source_distance']}\n") with stars_file.open("w", encoding="utf-8") as sf: zen = self.ZENITH_ANGLE_DEG dist = float(self.config_data["source_distance"]) sf.write(f"0. {90.0 - zen} 1.0 {dist}\n") return photons_file, stars_file, log_file def _write_run_script(self, photons_file, stars_file, log_file): """Generate a run script for sim_telarray with the provided configuration and inputs. Parameters ---------- photons_file, stars_file, log_file : pathlib.Path Input/output files for the run. Returns ------- pathlib.Path Path to the generated shell script. """ script_path = self.scripts_dir / f"run_incident_angles_{self._label_suffix()}.sh" simtel_bin = self._simtel_path / "sim_telarray/bin/sim_telarray_debug_trace" corsika_dummy = self._simtel_path / "sim_telarray/run9991.corsika.gz" theta = self.ZENITH_ANGLE_DEG off = float(self.config_data["off_axis_angle"].to_value(u.deg)) star_photons = self.config_data["number_of_photons"] def cfg(par, val): return f"-C {par}={val}" opts = [ f"-c {self.telescope_model.config_file_path}", f"-I{self.telescope_model.config_file_directory}", ] if self.perfect_mirror: opts += [ "-DPERFECT_DISH=1", "-C random_focal_length=0", "-C mirror_reflection_random_angle=0", "-C mirror_align_random_distance=0", "-C mirror_align_random_horizontal=0,28,0,0", "-C mirror_align_random_vertical=0,28,0,0", ] opts += [ cfg("IMAGING_LIST", str(photons_file)), cfg("stars", str(stars_file)), cfg("altitude", self.site_model.get_parameter_value("corsika_observation_level")), cfg("telescope_theta", theta + off), cfg("star_photons", star_photons), cfg("telescope_phi", 0), cfg("camera_transmission", 1.0), cfg("nightsky_background", "all:0."), cfg("trigger_current_limit", "1e10"), cfg("telescope_random_angle", 0), cfg("telescope_random_error", 0), cfg("convergent_depth", 0), cfg("maximum_telescopes", 1), cfg("show", "all"), cfg("camera_filter", "none"), ] command = f"{simtel_bin} {' '.join(opts)} {corsika_dummy}" with script_path.open("w", encoding="utf-8") as sh: sh.write("#!/usr/bin/env bash\n\n") sh.write("set -e\nset -o pipefail\n\n") sh.write(f"exec > '{log_file}' 2>&1\n\n") sh.write(f"{command}\n") script_path.chmod(script_path.stat().st_mode | 0o110) return script_path def _run_script(self, script_path, log_file): """Execute the script and log output; raise an error if execution fails. Parameters ---------- script_path : pathlib.Path Path to the script to execute. log_file : pathlib.Path Destination log file. """ self.logger.info("Executing %s (logging to %s)", script_path, log_file) try: subprocess.check_call([str(script_path)]) except subprocess.CalledProcessError as exc: raise RuntimeError(f"Incident angles run failed, see log: {log_file}") from exc def _compute_incidence_angles_from_imaging_list(self, photons_file): """Compute incidence angles from an imaging list file. Column positions may differ between telescope types and sim_telarray versions. Header lines (``# Column N: ...``) are parsed to find indices; otherwise legacy positions (1-based) are used: focal=26, primary=32, secondary=36, primary X/Y = 29/30, secondary X/Y = 33/34. Parameters ---------- photons_file : pathlib.Path Path to the imaging list file (``.lis``). Returns ------- dict[str, list[float]] Parsed columns in degrees/meters as plain Python lists. Always contains ``angle_incidence_focal_deg``; additional keys are present when primary/secondary angles are enabled. """ col_idx = self._find_column_indices(photons_file) focal = [] # Initialize optional arrays once based on the configuration primary = secondary = radius_m = secondary_radius_m = None primary_hit_x_m = primary_hit_y_m = secondary_hit_x_m = secondary_hit_y_m = None if self.calculate_primary_secondary_angles: primary, secondary = [], [] radius_m, secondary_radius_m = [], [] primary_hit_x_m, primary_hit_y_m = [], [] secondary_hit_x_m, secondary_hit_y_m = [], [] for parts in self._iter_data_rows(photons_file): self._append_values( parts, col_idx, focal, primary, secondary, radius_m, secondary_radius_m, primary_hit_x_m, primary_hit_y_m, secondary_hit_x_m, secondary_hit_y_m, ) result = {"angle_incidence_focal_deg": focal} if self.calculate_primary_secondary_angles: result["angle_incidence_primary_deg"] = primary result["angle_incidence_secondary_deg"] = secondary result["primary_hit_radius_m"] = radius_m result["secondary_hit_radius_m"] = secondary_radius_m result["primary_hit_x_m"] = primary_hit_x_m result["primary_hit_y_m"] = primary_hit_y_m result["secondary_hit_x_m"] = secondary_hit_x_m result["secondary_hit_y_m"] = secondary_hit_y_m return result def _find_column_indices(self, photons_file): """Return 0-based column indices found from headers as a dict. Returns a mapping with keys ``'focal'`` and, when applicable, ``'primary'``, ``'secondary'``, ``'prim_x'``, ``'prim_y'``, ``'sec_x'``, ``'sec_y'``. Parameters ---------- photons_file : pathlib.Path Imaging list file whose headers may define column numbers. Returns ------- dict[str, int] 0-based indices for the required columns. """ indices = self._default_column_indices() col_pat = re.compile(r"^\s*#\s*Column\s+(\d{1,4})\s*:(.*)$", re.IGNORECASE) with photons_file.open("r", encoding="utf-8") as fh: for raw in fh: s = raw.strip() if not s or not s.startswith("#"): continue m = col_pat.match(s) if not m: continue num = int(m.group(1)) desc = m.group(2).strip().lower() self._update_indices_from_header_desc(desc, num, indices) return indices def _default_column_indices(self): """Return default 0-based indices matching SST-like photon files. Fallbacks (1-based): focal=26, primary=32, secondary=36, primary X/Y=29/30, secondary X/Y=33/34. Returns ------- dict[str, int] Default index mapping. When primary/secondary angles are disabled, only ``'focal'`` is included. """ idx = {"focal": 25} if self.calculate_primary_secondary_angles: idx.update( { "primary": 31, "secondary": 35, "prim_x": 28, "prim_y": 29, "sec_x": 32, "sec_y": 33, } ) return idx def _update_indices_from_header_desc(self, desc, num, indices): """Update indices dict in-place based on a header description and column number. Parameters ---------- desc : str Header description text (lower-cased). num : int 1-based column number from the header. indices : dict[str, int] Mapping to update in-place. """ # Angles if "angle of incidence" in desc: if "focal surface" in desc: indices["focal"] = num - 1 return if self.calculate_primary_secondary_angles: if "primary mirror" in desc: indices["primary"] = num - 1 return if "secondary mirror" in desc: indices["secondary"] = num - 1 return # Reflection points (X/Y) if not self.calculate_primary_secondary_angles or "reflection point" not in desc: return self._set_reflection_index_if_match(desc, num, indices) @staticmethod def _contains_axis(desc, axis): """Check whether a description contains a stand-alone axis label. Parameters ---------- desc : str Header description string. axis : str Either ``"x"`` or ``"y"``. Returns ------- bool True when the token is present as a separate word; False otherwise. """ desc_l = desc.lower() axis_l = axis.lower() return bool(re.search(r"(^|\s)" + re.escape(axis_l) + r"(\s|$)", desc_l)) def _set_reflection_index_if_match(self, desc, num, indices): """Set reflection point indices for primary/secondary mirrors if the header matches. Parameters ---------- desc : str Header description string (lower-cased). num : int 1-based column number from the header. indices : dict[str, int] Mapping to update in-place with keys ``prim_x``, ``prim_y``, ``sec_x``, or ``sec_y``. """ is_primary = "primary mirror" in desc is_secondary = "secondary mirror" in desc if not is_primary and not is_secondary: return is_x = self._contains_axis(desc, "x") is_y = self._contains_axis(desc, "y") if not (is_x or is_y): return key_prefix = "prim" if is_primary else "sec" key = f"{key_prefix}_{'x' if is_x else 'y'}" indices[key] = num - 1 @staticmethod def _parse_float(parts, idx): """Try to parse a float from ``parts[idx]``. Parameters ---------- parts : list[str] Tokenized row. idx : int | None Index into ``parts``. Returns ------- tuple[bool, float] Tuple of ``(ok, value)`` where ``ok`` is False when parsing fails or the index is out of range; in that case ``value`` is 0.0. """ if idx is None or idx < 0 or idx >= len(parts): return False, 0.0 try: return True, float(parts[idx]) except ValueError: return False, 0.0 @staticmethod def _parse_float_with_nan(parts, idx): """Parse a float or return NaN when missing/invalid. Parameters ---------- parts : list[str] Tokenized row. idx : int | None Index into ``parts``. Returns ------- float Parsed float value, or ``nan`` when unavailable/invalid. """ if idx is None or idx < 0 or idx >= len(parts): return float("nan") try: return float(parts[idx]) except ValueError: return float("nan") @staticmethod def _iter_data_rows(photons_file): """Iterate over tokenized, non-empty, non-comment rows. Parameters ---------- photons_file : pathlib.Path Imaging list file to read. Returns ------- Iterator[list[str]] Iterator over tokenized rows. """ with photons_file.open("r", encoding="utf-8") as fh: for line in fh: if not line.strip() or line.lstrip().startswith("#"): continue yield line.split() def _append_values( self, parts, col_idx, focal, primary, secondary, radius_m, secondary_radius_m, primary_hit_x_m, primary_hit_y_m, secondary_hit_x_m, secondary_hit_y_m, ): """Append parsed values from parts into target arrays if valid. Parameters ---------- parts : list[str] Tokenized input row. col_idx : dict[str, int] Column indices used to read values. focal, primary, secondary : list | None Output arrays to append into. radius_m, secondary_radius_m : list | None Output arrays for radii in meters. primary_hit_x_m, primary_hit_y_m, secondary_hit_x_m, secondary_hit_y_m : list | None Output arrays for hit coordinates in meters. """ foc_ok, foc_val = self._parse_float(parts, col_idx.get("focal")) if not foc_ok: return focal.append(foc_val) if not self.calculate_primary_secondary_angles: return self._append_primary_secondary_angles(parts, col_idx, primary, secondary) self._append_primary_hit_geometry( parts, col_idx, radius_m, primary_hit_x_m, primary_hit_y_m ) self._append_secondary_hit_geometry( parts, col_idx, secondary_radius_m, secondary_hit_x_m, secondary_hit_y_m ) def _append_primary_secondary_angles(self, parts, col_idx, primary, secondary): """Append primary/secondary angle values (or NaN) if arrays are provided. Parameters ---------- parts : list[str] Tokenized input row. col_idx : dict[str, int] Indices for angle columns. primary, secondary : list | None Output arrays to append into. """ if primary is not None: primary.append(self._parse_float_with_nan(parts, col_idx.get("primary"))) if secondary is not None: secondary.append(self._parse_float_with_nan(parts, col_idx.get("secondary"))) def _append_primary_hit_geometry( self, parts, col_idx, radius_m, primary_hit_x_m, primary_hit_y_m ): """Append primary-mirror hit geometry (radius and x/y in meters). Parameters ---------- parts : list[str] Tokenized input row. col_idx : dict[str, int] Indices for hit coordinate columns. radius_m, primary_hit_x_m, primary_hit_y_m : list | None Output arrays to append into. """ x_ok, x_cm = self._parse_float(parts, col_idx.get("prim_x")) y_ok, y_cm = self._parse_float(parts, col_idx.get("prim_y")) if x_ok and y_ok: x_m, y_m = x_cm / 100.0, y_cm / 100.0 r_m = math.hypot(x_cm, y_cm) / 100.0 else: x_m = y_m = r_m = math.nan if radius_m is not None: radius_m.append(r_m) if primary_hit_x_m is not None: primary_hit_x_m.append(x_m) if primary_hit_y_m is not None: primary_hit_y_m.append(y_m) def _append_secondary_hit_geometry( self, parts, col_idx, secondary_radius_m, secondary_hit_x_m, secondary_hit_y_m ): """Append secondary-mirror hit geometry (radius and x/y in meters). Parameters ---------- parts : list[str] Tokenized input row. col_idx : dict[str, int] Indices for hit coordinate columns. secondary_radius_m, secondary_hit_x_m, secondary_hit_y_m : list | None Output arrays to append into. """ sx_ok, sx_cm = self._parse_float(parts, col_idx.get("sec_x")) sy_ok, sy_cm = self._parse_float(parts, col_idx.get("sec_y")) if sx_ok and sy_ok: x_m, y_m = sx_cm / 100.0, sy_cm / 100.0 r_m = math.hypot(sx_cm, sy_cm) / 100.0 else: x_m = y_m = r_m = math.nan if secondary_radius_m is not None: secondary_radius_m.append(r_m) if secondary_hit_x_m is not None: secondary_hit_x_m.append(x_m) if secondary_hit_y_m is not None: secondary_hit_y_m.append(y_m) @staticmethod def _match_header_column(col_pat, raw): """Parse a header line for a known angle column. Parameters ---------- col_pat : Pattern[str] Compiled regular expression matching ``# Column N`` prefix. raw : str Raw header line. Returns ------- tuple[str, int] | None ``(kind, column_number)`` when recognized, otherwise ``None``. """ s = raw.strip() if s and ":" in s: prefix, desc = s.split(":", 1) m = col_pat.match(prefix) if m: num = int(m.group(1)) desc = desc.strip().lower() if "angle of incidence at focal surface" in desc and "optical axis" in desc: return "focal", num if re.search(r"angle of incidence\s+on(to)?\s+primary mirror", desc): return "primary", num if re.search(r"angle of incidence\s+on(to)?\s+secondary mirror", desc): return "secondary", num return None def _save_results(self): """Save the results to an ECSV file with metadata.""" if self.results is None or len(self.results) == 0: self.logger.warning("No results to save") return output_file = self.results_dir / f"incident_angles_{self._label_suffix()}.ecsv" self.results.write(output_file, format="ascii.ecsv", overwrite=True) MetadataCollector.dump( args_dict=self.config_data, output_file=output_file.with_suffix(".yml"), )