# Source code for simtel.simtel_config_reader

#!/usr/bin/python3
"""Read model parameters and configuration from sim_telarray configuration files."""

import logging
import re

import astropy.units as u
import numpy as np

import simtools.utils.general as gen
from simtools.utils import names

__all__ = ["SimtelConfigReader"]


def get_list_of_simtel_parameters(simtel_config_file):
    """
    Return list of simtel parameters found in simtel configuration file.

    Each line is split on commas and whitespace; the second token of a line is
    taken as the parameter name. Lines with fewer than two tokens (e.g., blank
    lines) carry no parameter name and are skipped.

    Parameters
    ----------
    simtel_config_file: str
        File name for sim_telarray configuration

    Returns
    -------
    list
        Sorted list of unique, lower-case parameter names found in the file.

    """
    simtel_parameter_set = set()
    with open(simtel_config_file, encoding="utf-8") as file:
        for line in file:
            parts_of_lines = re.split(r",\s*|\s+", line.strip())
            # blank or single-token lines have no parameter column
            if len(parts_of_lines) < 2:
                continue
            simtel_parameter_set.add(parts_of_lines[1].lower())
    return sorted(simtel_parameter_set)


class SimtelConfigReader:
    r"""
    Reads model parameters from configuration files and converts to the simtools representation.

    The output format are simtool-db-style json dicts.
    Model parameters are read from sim_telarray configuration files.
    The sim_telarray configuration can be generated using e.g., the following sim_telarray command:

    .. code-block:: console

        sim_telarray/bin/sim_telarray \
            -c sim_telarray/cfg/CTA/CTA-PROD6-LaPalma.cfg\
            -C limits=no-internal -C initlist=no-internal -C list=no-internal\
            -C typelist=no-internal -C maximum_telescopes=30\
            -DNSB_AUTOSCALE -DNECTARCAM -DHYPER_LAYOUT\
            -DNUM_TELESCOPES=30 /dev/null 2>|/dev/null | grep '(@cfg)' | sed 's/^(@cfg) //'

    Parameters
    ----------
    schema_file: str
        Schema file describing the model parameter.
    simtel_config_file: str or Path
        Path of the file to read from.
    simtel_telescope_name: str
        Telescope name (sim_telarray convention)
    parameter_name: str
        Parameter name (default: read from schema file)
    camera_pixels: int
        Number of camera pixels
    """

    def __init__(
        self,
        schema_file,
        simtel_config_file,
        simtel_telescope_name,
        parameter_name=None,
        camera_pixels=None,
    ):
        """Initialize SimtelConfigReader."""
        self._logger = logging.getLogger(__name__)
        self._logger.debug("Init SimtelConfigReader")
        self.schema_file = schema_file
        self.schema_dict = (
            gen.collect_data_from_file(file_name=self.schema_file)
            if self.schema_file is not None
            else None
        )
        # the schema (if given) takes precedence over the explicit parameter name
        self.parameter_name = self.schema_dict.get("name") if self.schema_dict else parameter_name
        try:
            self.simtel_parameter_name = names.get_simulation_software_name_from_parameter_name(
                self.parameter_name
            ).upper()
        except (KeyError, AttributeError):
            # no name mapping available: fall back to the parameter name itself
            self.simtel_parameter_name = self.parameter_name.upper()
        self.simtel_telescope_name = simtel_telescope_name
        self.camera_pixels = camera_pixels
        self.parameter_dict = self.read_simtel_config_file(
            simtel_config_file, simtel_telescope_name
        )

    def _should_skip_limits_check(self, data_type):
        """Check if limits should be skipped (limits of boolean parameters are not compared)."""
        return data_type == "limits" and self.parameter_dict.get("type") == "bool"

    def _get_schema_values(self, data_type):
        """
        Return schema values for limits, unit, or default.

        Parameters
        ----------
        data_type: str
            Schema field to read ('limits' is read from 'allowed_range').

        Returns
        -------
        object
            For 'limits': [min, max], or min only if no max is defined.
            Otherwise a single value (one data entry) or a list with one value per data entry.
            None if the requested entry is missing in the schema.

        Raises
        ------
        TypeError
            If no schema is defined (callers rely on this to detect a missing schema).
        """
        try:
            if data_type == "limits":
                _from_schema = [
                    self.schema_dict["data"][0]["allowed_range"].get("min"),
                    self.schema_dict["data"][0]["allowed_range"].get("max"),
                ]
                return _from_schema[0] if _from_schema[1] is None else _from_schema
            if len(self.schema_dict["data"]) == 1:
                return self.schema_dict["data"][0].get(data_type)
            return [data.get(data_type) for data in self.schema_dict["data"]]
        except (KeyError, IndexError):
            return None

    @staticmethod
    def _values_match(_from_simtel, _from_schema):
        """
        Check if values match (are close for floats).

        Convert where necessary astropy.Quantity to float.
        """
        if isinstance(_from_simtel, u.Quantity):
            _from_simtel = _from_simtel.value
        if isinstance(_from_simtel, np.ndarray) and len(_from_simtel) > 0:
            _from_simtel = np.array(
                [v.value if isinstance(v, u.Quantity) else v for v in _from_simtel]
            )
        try:
            if not isinstance(_from_schema, list | np.ndarray) and _from_simtel == _from_schema:
                return True
        except ValueError:
            # ambiguous truth value (array compared with scalar): fall through to isclose
            pass
        try:
            if np.all(np.isclose(_from_simtel, _from_schema)):
                return True
        except (TypeError, ValueError):
            # non-numeric or incompatible shapes: treated as mismatch
            pass
        return False

    def _log_mismatch_warning(self, data_type, _from_simtel, _from_schema):
        """Log mismatch warning."""
        self._logger.warning(f"Values for {data_type} do not match:")
        self._logger.warning(
            f"  from simtel: {self.simtel_parameter_name} {_from_simtel} ({type(_from_simtel)})"
        )
        self._logger.warning(
            f"  from schema: {self.parameter_name} {_from_schema} ({type(_from_schema)})"
        )

    def compare_simtel_config_with_schema(self):
        """
        Compare limits and defaults reported by simtel_array with schema.

        This is mostly for debugging purposes and includes simple printing.
        Check for differences in 'default' and 'limits' entries.
        Nothing is compared if no entries were found in the simtel configuration file.
        """
        if self.parameter_dict is None:
            # read_simtel_config_file found no entries for this parameter
            self._logger.warning(
                f"No simtel values found for {self.simtel_parameter_name}; "
                "nothing to compare with schema"
            )
            return

        for data_type in ["default", "limits"]:
            _from_simtel = self.parameter_dict.get(data_type)
            if self._should_skip_limits_check(data_type):
                continue

            _from_schema = self._get_schema_values(data_type)
            if isinstance(_from_schema, list):
                _from_schema = np.array(_from_schema, dtype=np.dtype(self.parameter_dict["type"]))

            if self._values_match(_from_simtel, _from_schema):
                self._logger.debug(f"Values for {data_type} match")
            else:
                self._log_mismatch_warning(data_type, _from_simtel, _from_schema)

    def read_simtel_config_file(self, simtel_config_file, simtel_telescope_name):
        """
        Read sim_telarray configuration file and return a dictionary with the parameter values.

        Lines with fewer than two tokens (e.g., blank lines) are ignored.

        Parameters
        ----------
        simtel_config_file: str or Path
            Path of the file to read from.
        simtel_telescope_name: str
            Telescope name (sim_telarray convention)

        Returns
        -------
        dict
            Dictionary with the parameter values (None if no entries are found).

        Raises
        ------
        FileNotFoundError
            If the configuration file does not exist.
        """
        self._logger.debug(
            f"Reading simtel config file {simtel_config_file} for parameter {self.parameter_name}"
        )
        matching_lines = {}
        try:
            with open(simtel_config_file, encoding="utf-8") as file:
                for line in file:
                    # split line into parts (space, tabs, comma separated)
                    parts_of_lines = re.split(r",\s*|\s+", line.strip())
                    # blank or single-token lines have no parameter column
                    if len(parts_of_lines) < 2:
                        continue
                    if self.simtel_parameter_name == parts_of_lines[1].upper():
                        matching_lines[parts_of_lines[0]] = parts_of_lines[2:]
        except FileNotFoundError:
            self._logger.error(f"File {simtel_config_file} not found.")
            raise
        if len(matching_lines) == 0:
            self._logger.info(f"No entries found for parameter {self.simtel_parameter_name}")
            return None

        _para_dict = {}
        # first: extract line type (required for conversions and dimension)
        _para_dict["type"], _para_dict["dimension"] = self._get_type_and_dimension_from_simtel_cfg(
            matching_lines["type"]
        )
        # then: extract other fields
        # (order of keys matter; not all field are present for all parameters)
        for key in ["default", simtel_telescope_name, "limits"]:
            try:
                _para_dict[key], _ = self._add_value_from_simtel_cfg(
                    matching_lines[key],
                    dtype=_para_dict.get("type"),
                    n_dim=_para_dict.get("dimension"),
                    default=_para_dict.get("default"),
                    is_limit=(key == "limits"),
                )
            except KeyError:
                pass

        return _para_dict

    def _resolve_all_in_column(self, column):
        """
        Resolve 'all' entries in a column.

        This needs to resolve the following cases:
        no 'all' in any entry; ['all:', '5'], ['all: 5'], ['all:5', '3:1']
        This function is fine-tuned to the simtel configuration output.
        Entries which are not strings (e.g., None) are passed through unchanged.

        Parameters
        ----------
        column: list
            List of strings to resolve.

        Returns
        -------
        list, dict
            List of resolved entries and dictionary of 'index:value' exceptions
            (index and value are kept as strings).
        """
        # don't do anything if all string items in column do not start with 'all'
        if not any(isinstance(item, str) and item.startswith("all") for item in column):
            return column, {}

        self._logger.debug(f"Resolving 'all' entries in column: {column}")
        # remove 'all:' entries
        column = [item for item in column if item not in ("all:", "all")]
        # resolve 'all:5' type entries
        column = [
            (
                item.split(":")[1].replace(" ", "")
                if isinstance(item, str) and item.startswith("all:")
                else item
            )
            for item in column
        ]
        # find 'index:value' type entries
        except_from_all = {}
        for item in column:
            if isinstance(item, str) and ":" in item:
                index, value = item.split(":")
                except_from_all[index] = value
        # finally remove entries containing ':'
        column = [item for item in column if not (isinstance(item, str) and ":" in item)]

        return column, except_from_all

    def _add_value_from_simtel_cfg(self, column, dtype=None, n_dim=1, default=None, is_limit=False):
        """
        Extract value(s) from simtel configuration file columns.

        This function is fine-tuned to the simtel configuration output.

        Parameters
        ----------
        column: list
            List of strings to extract value from.
        dtype: str
            Data type to convert value to.
        n_dim: int
            Length of array to be returned.
        default: object
            Default value to extend array to required length.
        is_limit: bool
            True for limits (no units are added to limits).

        Returns
        -------
        object, int
            Values extracted from column. If object is a list or array, return length of array.
        """
        # string represents a lists of values (space or comma separated)
        if len(column) == 1:
            column = column[0].split(",") if "," in column[0] else column[0].split(" ")
        self._logger.debug(
            f"Adding value from simtel config: {column} (n_dim={n_dim}, default={default})"
        )
        column = [None if item.lower() == "none" else item for item in column]
        column, except_from_all = self._resolve_all_in_column(column)
        # extend array to required length (simtel uses sometimes 'all:' for all entries)
        if n_dim > 1 and len(column) < n_dim:
            try:
                # skip formatting: black reformats and violates E203
                column += default[len(column):]  # fmt: skip
            except TypeError:
                # extend array to required length using previous value
                column.extend([column[-1]] * (n_dim - len(column)))
        for index, value in except_from_all.items():
            column[int(index)] = value
        if dtype == "bool":
            column = np.array([bool(int(item)) for item in column])

        column, ndim = self._process_column(column, dtype)
        if not is_limit:
            column = self._add_units(column)
        return column, ndim

    def _add_units(self, column):
        """
        Add units as given in schema file to column.

        Take into account array types and dimensionless units.
        Ensure that integer values are returned as integers
        (astropy converts values to floats when multiplying them with units).

        Parameters
        ----------
        column: object
            Value(s) prepared by _process_column (scalar, numpy array, or None).

        Returns
        -------
        object
            Value(s) with units applied. The input is returned unchanged if no schema
            or no unit is defined, or if the value is None. None is returned if a list
            of units does not fit the value(s).
        """
        try:
            unit = self._get_schema_values("unit")
        except TypeError:  # no schema defined
            return column
        # nothing to attach a unit to, or no unit to attach
        if column is None or unit is None or unit == "dimensionless":
            return column
        # a single unit applies to the whole value; this must be tested before any
        # length comparison (a string unit is never a sequence of per-element units)
        if isinstance(unit, str):
            column_with_unit = column * u.Unit(unit)
            if isinstance(column, int | np.integer):
                return u.Quantity(int(column_with_unit.value), unit, dtype=type(column))
            return column_with_unit
        # one unit per array element
        if isinstance(column, np.ndarray) and len(column) == len(unit):
            return np.array(
                [
                    col if un in (None, "dimensionless") else col * u.Unit(un)
                    for col, un in zip(column, unit)
                ],
                dtype=object,
            )
        return None

    def _process_column(self, column, dtype):
        """
        Process and return column prepared in _add_value_from_simtel_cfg.

        Parameters
        ----------
        column: list
            List of strings to process.
        dtype: str
            Data type to convert value to.

        Returns
        -------
        object, int
            Converted scalar (single entry) or numpy array (several entries) and
            the number of entries. (None, 1) for a single None entry;
            (None, None) for an empty column.
        """
        array_dtype = np.dtype(dtype) if dtype else None
        if len(column) == 1:
            if column[0] is not None:
                processed_value = np.array(column, dtype=array_dtype)[0]
                return processed_value, 1
            return None, 1
        if len(column) > 1:
            return np.array(column, dtype=array_dtype), len(column)
        return None, None

    def _get_type_and_dimension_from_simtel_cfg(self, column):
        """
        Return type and dimension from simtel configuration column.

        'Func' type from simtel is treated as string. Return number of camera pixel for
        a hard-wired set up parameters.

        Parameters
        ----------
        column: list
            List of strings to extract value from.

        Returns
        -------
        str, int
            Type and dimension.
        """
        simtel_type = column[0].lower()
        if simtel_type in ("text", "func"):
            return "str", 1
        if simtel_type == "ibool":
            return "bool", int(column[1])
        if self.camera_pixels is not None and self.simtel_parameter_name in ["NIGHTSKY_BACKGROUND"]:
            return str(np.dtype(simtel_type)), self.camera_pixels
        return str(np.dtype(simtel_type)), int(column[1])