Source code for production_configuration.derive_corsika_limits_grid

"""Derive CORSIKA limits for a grid of parameters."""

import datetime
import logging

import numpy as np
from astropy.table import Column, Table

import simtools.utils.general as gen
from simtools.data_model.metadata_collector import MetadataCollector
from simtools.io_operations import io_handler
from simtools.production_configuration.derive_corsika_limits import LimitCalculator

_logger = logging.getLogger(__name__)


[docs] def generate_corsika_limits_grid(args_dict): """ Generate CORSIKA limits for a grid of parameters. Requires at least one event data file per parameter set. Parameters ---------- args_dict : dict Dictionary containing command line arguments. """ event_data_files = gen.collect_data_from_file(args_dict["event_data_files"])["files"] telescope_configs = gen.collect_data_from_file(args_dict["telescope_ids"])["telescope_configs"] results = [] for file_path in event_data_files: for array_name, telescope_ids in telescope_configs.items(): _logger.info(f"Processing file: {file_path} with telescope config: {array_name}") result = _process_file( file_path, array_name, telescope_ids, args_dict["loss_fraction"], args_dict["plot_histograms"], ) result["layout"] = array_name results.append(result) write_results(results, args_dict)
def _process_file(file_path, array_name, telescope_ids, loss_fraction, plot_histograms): """ Compute limits for a single file. Parameters ---------- file_path : str Path to the event data file. array_name : str Name of the telescope array configuration. telescope_ids : list[int] List of telescope IDs to filter the events. loss_fraction : float Fraction of events to be lost. plot_histograms : bool Whether to plot histograms. Returns ------- dict Dictionary containing the computed limits and metadata. """ calculator = LimitCalculator(file_path, array_name=array_name, telescope_list=telescope_ids) limits = calculator.compute_limits(loss_fraction) if plot_histograms: calculator.plot_data(io_handler.IOHandler().get_output_directory()) return limits
[docs] def write_results(results, args_dict): """ Write the computed limits as astropy table to file. Parameters ---------- results : list[dict] List of computed limits. args_dict : dict Dictionary containing command line arguments. """ table = _create_results_table(results, args_dict["loss_fraction"]) output_dir = io_handler.IOHandler().get_output_directory("corsika_limits") output_file = output_dir / args_dict["output_file"] table.write(output_file, format="ascii.ecsv", overwrite=True) _logger.info(f"Results saved to {output_file}") MetadataCollector.dump(args_dict, output_file)
def _create_results_table(results, loss_fraction): """ Convert list of simulation results to an astropy Table with metadata. Round values to appropriate precision and add metadata. Parameters ---------- results : list[dict] Computed limits per file and telescope configuration. loss_fraction : float Fraction of lost events (added to metadata). Returns ------- astropy.table.Table Table with computed limits. """ cols = [ "primary_particle", "array_name", "telescope_ids", "zenith", "azimuth", "nsb_level", "lower_energy_limit", "upper_radius_limit", "viewcone_radius", ] columns = {name: [] for name in cols} units = {} for res in results: _process_result_row(res, cols, columns, units) table_cols = _create_table_columns(cols, columns, units) table = Table(table_cols) table.meta.update( { "created": datetime.datetime.now().isoformat(), "description": "Lookup table for CORSIKA limits computed from simulations.", "loss_fraction": loss_fraction, } ) return table def _process_result_row(res, cols, columns, units): """Process a single result row and add values to columns.""" for k in cols: val = res.get(k, None) if val is not None: val = _round_value(k, val) _logger.debug(f"Adding {k}: {val} to column data") if hasattr(val, "unit"): columns[k].append(val.value) units[k] = val.unit else: columns[k].append(val) if k not in units: units[k] = None def _round_value(key, val): """Round value based on key type.""" if key == "lower_energy_limit": return np.floor(val * 1e3) / 1e3 if key == "upper_radius_limit": return np.ceil(val / 25) * 25 if key == "viewcone_radius": return np.ceil(val / 0.25) * 0.25 return val def _create_table_columns(cols, columns, units): """Create table columns with appropriate data types.""" table_cols = [] for k in cols: col_data = columns[k] if any(isinstance(v, list | tuple) for v in col_data): col = Column(data=col_data, name=k, unit=units.get(k), dtype=object) else: col = Column(data=col_data, name=k, unit=units.get(k)) table_cols.append(col) return table_cols