Source code for simtel.simtel_io_event_reader

"""Read reduced datasets in form of astropy tables from file."""

import logging
from dataclasses import dataclass, field

import astropy.units as u
import numpy as np
from astropy.coordinates import AltAz, angular_separation
from ctapipe.coordinates import GroundFrame, TiltedGroundFrame

from simtools.corsika.primary_particle import PrimaryParticle
from simtools.io_operations import io_table_handler


[docs] @dataclass class ShowerEventData: """Container for shower event data.""" shower_id: list[np.uint32] = field(default_factory=list) event_id: list[np.uint32] = field(default_factory=list) file_id: list[np.uint32] = field(default_factory=list) simulated_energy: list[np.float64] = field(default_factory=list) x_core: list[np.float64] = field(default_factory=list) y_core: list[np.float64] = field(default_factory=list) shower_azimuth: list[np.float64] = field(default_factory=list) shower_altitude: list[np.float64] = field(default_factory=list) area_weight: list[np.float64] = field(default_factory=list) x_core_shower: list[np.float64] = field(default_factory=list) y_core_shower: list[np.float64] = field(default_factory=list) core_distance_shower: list[np.float64] = field(default_factory=list)
[docs] @dataclass class TriggeredEventData: """Container for triggered event data.""" shower_id: list[np.uint32] = field(default_factory=list) event_id: list[np.uint32] = field(default_factory=list) file_id: list[np.uint32] = field(default_factory=list) array_altitude: list[float] = field(default_factory=list) array_azimuth: list[float] = field(default_factory=list) telescope_list: list[np.ndarray] = field(default_factory=list) angular_distance: list[float] = field(default_factory=list)
[docs] class SimtelIOEventDataReader: """Read reduced MC data set stored in astropy tables.""" def __init__(self, event_data_file, telescope_list=None): """Initialize SimtelIOEventDataReader.""" self._logger = logging.getLogger(__name__) self.telescope_list = telescope_list self.data_sets = self.read_table_list(event_data_file)
[docs] def read_table_list(self, event_data_file): """ Read available tables from the event data file. Rearrange dictionary with tables names into a list of dictionaries under the assumption that the file contains the tables "SHOWERS", "TRIGGERS", and "FILE_INFO". Parameters ---------- event_data_file : str Path to the event data file. Returns ------- list List of dictionaries containing the data from the tables. """ dataset_dict = io_table_handler.read_table_list( event_data_file, ["SHOWERS", "TRIGGERS", "FILE_INFO"], include_indexed_tables=True, ) data_sets = [] for i in range(len(dataset_dict["SHOWERS"])): data_sets.append( { "SHOWERS": dataset_dict["SHOWERS"][i], "TRIGGERS": dataset_dict["TRIGGERS"][i], "FILE_INFO": dataset_dict["FILE_INFO"][i], } ) return data_sets
def _table_to_shower_data(self, table): """ Convert tables with shower event data and add derived quantities. Parameters ---------- table : astropy.table.Table Table "SHOWERS" Returns ------- ShowerEventData An instance of ShowerEventData with the data from the table. """ shower_data = ShowerEventData() for col in table.colnames: setattr(shower_data, col, np.array(table[col].data)) if table[col].unit: setattr(shower_data, f"{col}_unit", table[col].unit) shower_data.x_core_shower, shower_data.y_core_shower = ( self._transform_to_shower_coordinates( shower_data.x_core, shower_data.y_core, shower_data.shower_azimuth, shower_data.shower_altitude, ) ) shower_data.core_distance_shower = np.sqrt( shower_data.x_core_shower**2 + shower_data.y_core_shower**2 ) return shower_data def _table_to_triggered_data(self, table): """ Convert table with triggered event data. Converts telescope lists from comma-separated string to numpy array. Parameters ---------- table : astropy.table.Table Table "TRIGGERS" Returns ------- TriggeredEventData An instance of TriggeredEventData with the data from the table. """ triggered_data = TriggeredEventData() for col in table.colnames: if col == "telescope_list": arrays = [ np.array(list(map(int, tel_list.split(","))), dtype=np.int16) for tel_list in table[col] ] triggered_data.telescope_list = arrays else: data = np.array(table[col].data) setattr(triggered_data, col, data) if table[col].unit: setattr(triggered_data, f"{col}_unit", table[col].unit) return triggered_data def _get_triggered_shower_data( self, shower_data, triggered_file_id, triggered_event_id, triggered_shower_id ): """ Get shower data corresponding to triggered events. Matches triggered events with showers based on shower_id, event_id, and file_id. Parameters ---------- shower_data : ShowerEventData The shower data containing all showers. triggered_file_id : list List of file IDs for triggered events. triggered_event_id : list List of event IDs for triggered events. triggered_shower_id : list List of shower IDs for triggered events. Returns ------- ShowerEventData An instance of ShowerEventData containing only the triggered showers. """ triggered_shower = ShowerEventData() matched_indices = [] for tr_shower_id, tr_event_id, tr_file_id in zip( triggered_shower_id, triggered_event_id, triggered_file_id ): mask = ( (shower_data.shower_id == tr_shower_id) & (shower_data.event_id == tr_event_id) & (shower_data.file_id == tr_file_id) ) matched_idx = np.where(mask)[0] if len(matched_idx) == 1: matched_indices.append(matched_idx[0]) else: self._logger.warning( f"Found {len(matched_idx)} matches for shower {tr_shower_id}" f" event {tr_event_id} file {tr_file_id}" ) for attr in vars(shower_data): if not attr.endswith("_unit"): value = getattr(shower_data, attr) if isinstance(value, list | np.ndarray): setattr(triggered_shower, attr, np.array(value)[matched_indices]) return triggered_shower
[docs] def read_event_data(self, event_data_file, table_name_map=None): """ Read event data and file info tables from file and apply transformations. Allows to map tables names to their actual names in the file (e.g., "SHOWER" to "SHOWER_1"). Parameters ---------- event_data_file : str Path to the event data file. table_name_map : dict, optional Mapping of table names to their actual names in the file. Defaults to using the standard names "SHOWERS", "TRIGGERS", and "FILE_INFO". Returns ------- tuple A tuple with file info table, shower, triggered shower, and triggered event data. """ table_name_map = table_name_map or {} def get_name(key): return table_name_map.get(key, key) tables = io_table_handler.read_tables( event_data_file, table_names=[get_name(k) for k in ("SHOWERS", "TRIGGERS", "FILE_INFO")], ) shower_data = self._table_to_shower_data(tables[get_name("SHOWERS")]) triggered_data = self._table_to_triggered_data(tables[get_name("TRIGGERS")]) triggered_shower = self._get_triggered_shower_data( shower_data, tables[get_name("TRIGGERS")]["file_id"], tables[get_name("TRIGGERS")]["event_id"], tables[get_name("TRIGGERS")]["shower_id"], ) triggered_data.angular_distance = ( angular_separation( triggered_shower.shower_azimuth * u.rad, triggered_shower.shower_altitude * u.rad, triggered_data.array_azimuth * u.rad, triggered_data.array_altitude * u.rad, ) .to(u.deg) .value ) if self.telescope_list: triggered_data, triggered_shower = self._filter_by_telescopes( triggered_data, triggered_shower ) self._logger.info(f"Number of triggered events: {len(triggered_data.array_altitude)}") return ( tables[get_name("FILE_INFO")], shower_data, triggered_shower, triggered_data, )
def _filter_by_telescopes(self, triggered_data, triggered_shower): """Filter trigger data and triggered shower data by the specified telescope list.""" mask = np.array( [ any(tel in event for tel in self.telescope_list) for event in triggered_data.telescope_list ] ) filtered_triggered_data = TriggeredEventData( shower_id=triggered_data.shower_id[mask], event_id=triggered_data.event_id[mask], file_id=triggered_data.file_id[mask], array_altitude=triggered_data.array_altitude[mask], array_azimuth=triggered_data.array_azimuth[mask], telescope_list=[triggered_data.telescope_list[i] for i in np.arange(len(mask))[mask]], angular_distance=triggered_data.angular_distance[mask], ) filtered_triggered_shower_data = self._get_triggered_shower_data( triggered_shower, filtered_triggered_data.file_id, filtered_triggered_data.event_id, filtered_triggered_data.shower_id, ) return filtered_triggered_data, filtered_triggered_shower_data def _transform_to_shower_coordinates(self, x_core, y_core, shower_azimuth, shower_altitude): """ Transform core positions from ground coordinates to shower coordinates. Parameters ---------- x_core : np.ndarray Core x positions in ground coordinates. y_core : np.ndarray Core y positions in ground coordinates. shower_azimuth : np.ndarray Shower azimuth angles. shower_altitude : np.ndarray Shower altitude angles. Returns ------- tuple Core positions in shower coordinates (x, y). """ ground = GroundFrame(x=x_core * u.m, y=y_core * u.m, z=np.zeros_like(x_core) * u.m) shower_frame = ground.transform_to( TiltedGroundFrame( pointing_direction=AltAz(az=shower_azimuth * u.rad, alt=shower_altitude * u.rad) ) ) return shower_frame.x.value, shower_frame.y.value
[docs] def get_reduced_simulation_file_info(self, simulation_file_info): """ Return reduced simulation file info assuming single-valued parameters. Applies rounding and uniqueness functions extract representative values for zenith, azimuth, and NSB level. Assumes all files share identical simulation parameters except for file names. Returns particle name instead of ID. Logs a warning if multiple unique values are found. Parameters ---------- simulation_file_info : astropy.table.Table Dictionary containing simulation file info. Returns ------- dict Dictionary containing the reduced simulation file info. """ particle_id = np.unique(simulation_file_info["particle_id"].data) keys = [ "zenith", "azimuth", "nsb_level", "energy_min", "energy_max", "viewcone_min", "viewcone_max", "core_scatter_min", "core_scatter_max", ] float_arrays = {} for key in keys: if key == "energy_min": float_arrays[key] = np.unique(np.round(simulation_file_info[key].data, decimals=3)) else: float_arrays[key] = np.unique(np.round(simulation_file_info[key].data, decimals=2)) if any(len(arr) > 1 for arr in (particle_id, *(float_arrays[key] for key in keys))): self._logger.warning("Simulation file info has non-unique values.") reduced_info = { "primary_particle": PrimaryParticle( particle_id_type="corsika7_id", particle_id=int(particle_id[0]), ).name, } for key in keys: value = float(float_arrays[key][0]) if simulation_file_info[key].unit is not None: value = value * simulation_file_info[key].unit reduced_info[key] = value return reduced_info