Source code for ray_tracing.psf_analysis

"""
Module to analyse psf images (e.g. results from ray tracing simulations).

Main functionalities are: computing centroids, psf containers etc.

"""

import gzip
import logging
import shlex
import shutil
import subprocess
from math import fabs, pi, sqrt
from pathlib import Path

import astropy.units as u
import matplotlib.pyplot as plt
import numpy as np

from simtools.utils.general import collect_kwargs, set_default_kwargs

__all__ = ["PSFImage"]


[docs] class PSFImage: """ Image composed of list of photon positions (2D). Load photon list from sim_telarray file and compute centroids, psf containers, effective area, as well as plot the image as a 2D histogram. Internal units: photon positions in cm internally. Parameters ---------- focal_length: float Focal length of the system in cm. If not given, PSF can only be computed in cm. total_scattered_area: float Scatter area of all photons in cm^2. If not given, effective area cannot be computed. containment_fraction: float Containment fraction for PSF calculation. simtel_path: str Path to sim_telarray installation. """ __PSF_RADIUS = "Radius [cm]" __PSF_CUMULATIVE = "Cumulative PSF" def __init__( self, focal_length=None, total_scattered_area=None, containment_fraction=None, simtel_path=None, ): """Initialize PSFImage class.""" self._logger = logging.getLogger(__name__) self.simtel_path = simtel_path self._total_photons = None self._number_of_detected_photons = None self._effective_area = None self.photon_pos_x = [] self.photon_pos_y = [] self.photon_r = [] self.centroid_x = None self.centroid_y = None self._total_area = total_scattered_area self._stored_psf = {} try: self._cm_to_deg = 180.0 / pi / focal_length if focal_length is not None else None except ZeroDivisionError: self._cm_to_deg = None self._logger.warning("Focal length is zero; no conversion from cm to deg possible.") self._containment_fraction = containment_fraction
[docs] def process_photon_list(self, photon_file, use_rx): """ Read and process a photon list file generated by sim_telarray. Parameters ---------- photons_file: str Name of sim_telarray file with photon list. use_rx: bool Use the RX method for analysis. """ if use_rx: self._process_simtel_file_using_rx(photon_file) else: self.read_photon_list_from_simtel_file(photon_file)
def _process_simtel_file_using_rx(self, photon_file): """ Process a simtel file with photon lists using the RX method. Parameters ---------- photons_file: str Name of sim_telarray file with photon list. """ try: rx_output = subprocess.Popen( # pylint: disable=consider-using-with shlex.split( f"{self.simtel_path}/sim_telarray/bin/rx -f {self._containment_fraction:.2f} -v" ), stdin=subprocess.PIPE, stdout=subprocess.PIPE, ) with gzip.open(photon_file, "rb") as _stdin: with rx_output.stdin: shutil.copyfileobj(_stdin, rx_output.stdin) try: rx_output = rx_output.communicate()[0].splitlines()[-1:][0].split() except IndexError as e: raise IndexError(f"Unexpected output format from rx: {rx_output}") from e except FileNotFoundError as e: raise FileNotFoundError(f"Photon list file not found: {photon_file}") from e try: self.set_psf(2 * float(rx_output[0]), fraction=self._containment_fraction, unit="cm") self.centroid_x = float(rx_output[1]) self.centroid_y = float(rx_output[2]) self._effective_area = float(rx_output[5]) except IndexError as e: raise IndexError(f"Unexpected output format from rx: {rx_output}") from e except ValueError as e: raise ValueError(f"Invalid output format from rx: {rx_output}") from e
[docs] def read_photon_list_from_simtel_file(self, photons_file): """ Read photon list file generated by sim_telarray and store the photon positions (2D). Parameters ---------- photons_file: str Name of sim_telarray file with photon list. Raises ------ RuntimeError If photon positions X and Y are not compatible or are empty. """ self._logger.info(f"Reading sim_telarray file {photons_file}") self._total_photons = 0 if Path(photons_file).suffix == ".gz": file_open_function = gzip.open else: file_open_function = open with file_open_function(photons_file, "rb") as f: for line in f: self._process_simtel_line(line) if not self._is_photon_positions_ok(): msg = "Problems reading Simtel file - invalid data" self._logger.error(msg) raise RuntimeError(msg) self.centroid_x = np.mean(self.photon_pos_x) self.centroid_y = np.mean(self.photon_pos_y) self._number_of_detected_photons = len(self.photon_pos_x) self._effective_area = ( self._number_of_detected_photons * self._total_area / self._total_photons ) self.photon_r = np.sort( np.sqrt( (self.photon_pos_x - self.centroid_x) ** 2 + (self.photon_pos_y - self.centroid_y) ** 2 ) )
def _is_photon_positions_ok(self): """ Verify if the photon positions are ok. Returns ------- bool True if photon positions are ok, False if they are not. """ cond1 = len(self.photon_pos_x) != 0 cond2 = len(self.photon_pos_y) != 0 cond3 = len(self.photon_pos_x) == len(self.photon_pos_y) return cond1 and cond2 and cond3 def _process_simtel_line(self, line): """ Supporting function to read_photon_list_from_simtel_file. Parameters ---------- line: str A line from the photon list file generated by sim_telarray. """ words = line.split() if b"falling on an area of" in line: self._total_photons += int(words[4]) total_area_in_file = float(words[14]) if self._total_area is None: self._total_area = total_area_in_file elif total_area_in_file != self._total_area: self._logger.warning( "Conflicting value of the total area found" f" {self._total_area} != {total_area_in_file}" " - Keeping the original value" ) elif b"#" in line or len(words) == 0: # Skipping comments pass else: # Storing photon position from cols 2 and 3 self.photon_pos_x.append(float(words[2])) self.photon_pos_y.append(float(words[3]))
[docs] def get_effective_area(self, tel_transmission=1.0): """ Return effective area pre calculated. Parameters ---------- telescope_transmission : float Telescope transmission parameter. Returns ------- float Pre-calculated effective area. None if it could not be calculated (e.g because the total scattering area was not set). """ if "_effective_area" in self.__dict__ and self._effective_area is not None: return self._effective_area * tel_transmission self._logger.error("Effective Area could not be calculated") return None
[docs] def set_effective_area(self, value): """ Set effective area. Parameters ---------- value: float Effective area """ self._effective_area = value
[docs] def get_psf(self, fraction=0.8, unit="cm"): """ Return PSF. Parameters ---------- fraction: float Fraction of photons within the containing radius. unit: str 'cm' or 'deg'. 'deg' will not work if focal length was not set. Returns ------- float: Containing diameter for a certain intensity fraction (PSF). """ if unit == "deg" and self._cm_to_deg is None: self._logger.error("PSF cannot be computed in deg because focal length is not set") return None if fraction not in self._stored_psf: self._compute_psf(fraction) unit_factor = 1 if unit == "cm" else self._cm_to_deg return self._stored_psf[fraction] * unit_factor
[docs] def set_psf(self, value, fraction=0.8, unit="cm"): """ Set PSF calculated from other methods. Parameters ---------- value: float PSF value to be set fraction: float Fraction of photons within the containing radius. unit: str 'cm' or 'deg'. 'deg' will not work if focal length was not set. """ if unit == "deg" and self._cm_to_deg is None: self._logger.error("PSF cannot be set in deg because focal length is not set") return unit_factor = 1 if unit == "cm" else 1.0 / self._cm_to_deg self._stored_psf[fraction] = value * unit_factor
def _compute_psf(self, fraction): """ Compute and store PSF. Parameters ---------- fraction: float Fraction of photons within the containing radius """ self._stored_psf[fraction] = self._find_psf(fraction) def _find_psf(self, fraction): """ Try to find PSF by a smart algorithm first. If it fails, _find_radius_by_scanning is called and do it by brute force. Parameters ---------- fraction: float Fraction of photons within the containing radius. Returns ------- float: Diameter of the circular container with a certain fraction of the photons. """ self._logger.debug(f"Finding PSF for fraction = {fraction}") x_pos_sq = [i**2 for i in self.photon_pos_x] y_pos_sq = [i**2 for i in self.photon_pos_y] x_pos_sig = sqrt(np.mean(x_pos_sq) - self.centroid_x**2) y_pos_sig = sqrt(np.mean(y_pos_sq) - self.centroid_y**2) radius_sig = sqrt(x_pos_sig**2 + y_pos_sig**2) target_number = fraction * self._number_of_detected_photons current_radius = 1.5 * radius_sig start_number = self._sum_photons_in_radius(current_radius) scale = 0.5 * sqrt(current_radius * current_radius / start_number) delta_number = start_number - target_number n_iter = 0 max_iter = 1000 tolerance = self._number_of_detected_photons / 1000.0 found_radius = False while not found_radius and n_iter < max_iter: n_iter += 1 dr = -delta_number * scale / sqrt(target_number) while current_radius + dr < 0: dr *= 0.5 current_radius += dr current_number = self._sum_photons_in_radius(current_radius) delta_number = current_number - target_number found_radius = fabs(delta_number) < tolerance if found_radius: return 2 * current_radius self._logger.warning("Could not find PSF efficiently - trying by scanning") return 2 * self._find_radius_by_scanning(target_number, radius_sig) def _find_radius_by_scanning(self, target_number, radius_sig): """ Find radius by scanning, aka brute force. Parameters ---------- target_number: float Number of photons inside the diameter to be found. radius_sig: float Sigma of the radius to be used as scale. Returns ------- float: Radius of the circle with target_number photons inside. """ self._logger.debug("Finding PSF by scanning") def scan(dr, rad_min, rad_max): """ Scan the image from rad_min to rad_max until it finds target_number photons inside. Scanning is done in steps of dr. Returns ------- (float, float, float): Average radius, min radius, max radius of the interval where target_number photons are inside. Raises ------ RuntimeError if radius is not found (found_radius is False) """ r0, r1 = rad_min, rad_min + dr s0, s1 = 0, 0 found_radius = False while not found_radius: s0, s1 = self._sum_photons_in_radius(r0), self._sum_photons_in_radius(r1) if s0 < target_number <= s1: found_radius = True break if r1 > rad_max: break r0 += dr r1 += dr if found_radius: return (r0 + r1) / 2, r0, r1 self._logger.error("Could not find PSF by scanning") raise RuntimeError # Run scan few times with smaller dr to optimize search. # Step 0 radius, rad_min, rad_max = scan(0.1 * radius_sig, 0, 4 * radius_sig) # Step 1 radius, rad_min, rad_max = scan(0.005 * radius_sig, rad_min, rad_max) return radius def _sum_photons_in_radius(self, radius): """Return the number of photons inside a certain radius.""" return np.searchsorted(self.photon_r, radius)
[docs] def get_image_data(self, centralized=True): """ Provide image data (2D photon positions in cm) as lists. Parameters ---------- centralized: bool Centroid of the image is set to (0, 0) if True. Returns ------- (x, y), the photons positions in cm. """ if centralized: x_pos_data = np.array(self.photon_pos_x) - self.centroid_x y_pos_data = np.array(self.photon_pos_y) - self.centroid_y else: x_pos_data = np.array(self.photon_pos_x) y_pos_data = np.array(self.photon_pos_y) d_type = {"names": ("X", "Y"), "formats": ("f8", "f8")} result = np.recarray((len(x_pos_data),), dtype=d_type) result.X = x_pos_data result.Y = y_pos_data return result
[docs] def plot_image(self, centralized=True, file_name=None, **kwargs): """ Plot 2D image as histogram (in cm). Parameters ---------- centralized: bool Centroid of the image is set to (0, 0) if True. **kwargs: image_* for the histogram plot and psf_* for the psf circle. """ data = self.get_image_data(centralized) kwargs = set_default_kwargs( kwargs, image_bins=80, image_cmap=plt.cm.gist_heat_r, psf_color="k", psf_fill=False, psf_lw=2, psf_ls="--", ) kwargs_for_image = collect_kwargs("image", kwargs) kwargs_for_psf = collect_kwargs("psf", kwargs) ax = plt.gca() ax.set_xlabel("X Position (cm)") ax.set_ylabel("Y Position (cm)") ax.hist2d(data["X"], data["Y"], **kwargs_for_image) ax.set_aspect("equal", "datalim") # PSF circle (80%) center = (0, 0) if centralized else (self.centroid_x, self.centroid_y) circle = plt.Circle(center, self.get_psf(0.8) / 2, **kwargs_for_psf) ax.add_artist(circle) ax.axhline(0, color="k", linestyle="--", zorder=3, linewidth=0.5) ax.axvline(0, color="k", linestyle="--", zorder=3, linewidth=0.5) if file_name is not None: plt.savefig(file_name) plt.close()
[docs] def get_cumulative_data(self, radius=None): """ Provide cumulative data (intensity vs radius). Parameters ---------- radius: array Array with radius calculate the cumulative PSF in distance units. Returns ------- (radius, intensity) """ if radius is not None: radius_all = radius.to(u.cm).value if isinstance(radius, u.Quantity) else radius else: radius_all = list(np.linspace(0, 1.6 * self.get_psf(0.8), 30)) intensity = [] for rad in radius_all: intensity.append(self._sum_photons_in_radius(rad) / self._number_of_detected_photons) d_type = { "names": (self.__PSF_RADIUS, self.__PSF_CUMULATIVE), "formats": ("f8", "f8"), } result = np.recarray((len(radius_all),), dtype=d_type) result[self.__PSF_RADIUS] = radius_all result[self.__PSF_CUMULATIVE] = intensity return result
[docs] def plot_cumulative(self, file_name=None, d80=None, **kwargs): """Plot cumulative data (intensity vs radius). Parameters ---------- **kwargs: image_* for the histogram plot and psf_* for the psf circle. """ data = self.get_cumulative_data() ax = plt.gca() plt.tight_layout(pad=1.5) ax.set_xlabel("Radius (cm)") ax.set_ylabel("Contained light %") plt.plot(data[self.__PSF_RADIUS], data[self.__PSF_CUMULATIVE], **kwargs) plt.axvline(x=self.get_psf(0.8) / 2, color="b", linestyle="--", linewidth=1) if d80 is not None: plt.axvline(x=d80 / 2.0, color="r", linestyle="--", linewidth=1) if file_name is not None: plt.savefig(file_name) plt.close()