#!/usr/bin/python3
"""Read model parameters and configuration from sim_telarray configuration files."""
import logging
import re
import numpy as np
import simtools.utils.general as gen
from simtools.data_model import validate_data
from simtools.data_model.model_data_writer import ModelDataWriter
from simtools.utils import names
__all__ = ["SimtelConfigReader"]
[docs]
class SimtelConfigReader:
"""
Reads model parameters from configuration files and converts to the simtools representation.
The output format is simtools-db-style JSON dictionaries.
Model parameters are read from sim_telarray configuration files.
The sim_telarray configuration can be generated using e.g., the following sim_telarray command:
.. code-block:: console
sim_telarray/bin/sim_telarray \
-c sim_telarray/cfg/CTA/CTA-PROD6-LaPalma.cfg\
-C limits=no-internal -C initlist=no-internal -C list=no-internal\
-C typelist=no-internal -C maximum_telescopes=30\
-DNSB_AUTOSCALE -DNECTARCAM -DHYPER_LAYOUT\
-DNUM_TELESCOPES=30 /dev/null 2>|/dev/null | grep '(@cfg)'
Parameters
----------
schema_file: str
Schema file describing the model parameter.
simtel_config_file: str or Path
Path of the file to read from.
simtel_telescope_name: str
Telescope name (sim_telarray convention)
parameter_name: str
Parameter name (default: read from schema file)
camera_pixels: int
Number of camera pixels
"""
def __init__(
self,
schema_file,
simtel_config_file,
simtel_telescope_name,
parameter_name=None,
camera_pixels=None,
):
"""Initialize SimtelConfigReader."""
self._logger = logging.getLogger(__name__)
self._logger.debug("Init SimtelConfigReader")
self.schema_file = schema_file
self.schema_dict = (
gen.collect_data_from_file_or_dict(file_name=self.schema_file, in_dict=None)
if self.schema_file is not None
else None
)
self.parameter_name = self.schema_dict.get("name") if self.schema_dict else parameter_name
self.simtel_parameter_name = self._get_simtel_parameter_name(self.parameter_name)
self.simtel_telescope_name = simtel_telescope_name
self.camera_pixels = camera_pixels
self.parameter_dict = self._read_simtel_config_file(
simtel_config_file, simtel_telescope_name
)
[docs]
def get_validated_parameter_dict(self, telescope_name, model_version=None):
    """
    Build the database-style parameter dictionary and return it validated.

    Parameters
    ----------
    telescope_name: str
        Telescope name (e.g., LSTN-01)
    model_version: str
        Model version string.

    Returns
    -------
    dict
        Model parameter dictionary as returned by the schema validation.
    """
    self._logger.debug(f"Getting validated parameter dictionary for {telescope_name}")

    # translate the type names 'str' and 'bool'; any other type is passed on unchanged
    simtel_type = self.parameter_dict.get("type")
    db_type = {"str": "string", "bool": "boolean"}.get(simtel_type, simtel_type)

    _json_dict = {}
    _json_dict["parameter"] = self.parameter_name
    _json_dict["instrument"] = telescope_name
    _json_dict["site"] = names.get_site_from_array_element_name(telescope_name)
    _json_dict["version"] = model_version
    # value is the entry read for the sim_telarray telescope name
    _json_dict["value"] = self.parameter_dict.get(self.simtel_telescope_name)
    _json_dict["unit"] = self._get_unit_from_schema()
    _json_dict["type"] = db_type
    _json_dict["applicable"] = self._check_parameter_applicability(telescope_name)
    _json_dict["file"] = self._parameter_is_a_file()
    return self._validate_parameter_dict(_json_dict)
[docs]
def export_parameter_dict_to_json(self, file_name, dict_to_write):
    """
    Export parameter dictionary to json.

    The entries 'value', 'unit' and 'limits' are converted in place with
    gen.convert_list_to_string before writing. Each entry is converted
    independently, so a missing key (e.g., no 'limits') does not prevent the
    conversion of the other entries.

    Parameters
    ----------
    file_name: str or Path
        File name to export to.
    dict_to_write: dict
        Dictionary to export (modified in place).
    """
    # (key, extra positional arguments for convert_list_to_string)
    conversions = (("value", ()), ("unit", (True,)), ("limits", ()))
    for key, extra_args in conversions:
        if key in dict_to_write:
            dict_to_write[key] = gen.convert_list_to_string(dict_to_write[key], *extra_args)

    self._logger.info(f"Exporting parameter dictionary to {file_name}")
    ModelDataWriter.write_dict_to_model_parameter_json(
        file_name=file_name, data_dict=dict_to_write
    )
def _should_skip_limits_check(self, data_type):
"""Check if limits should be skipped."""
return data_type == "limits" and self.parameter_dict.get("type") == "bool"
def _get_schema_values(self, data_type):
"""Check schema values for limits and defaults."""
try:
if data_type == "limits":
_from_schema = [
self.schema_dict["data"][0]["allowed_range"].get("min"),
self.schema_dict["data"][0]["allowed_range"].get("max"),
]
return _from_schema[0] if _from_schema[1] is None else _from_schema
if len(self.schema_dict["data"]) == 1:
return self.schema_dict["data"][0]["default"]
return [data.get("default") for data in self.schema_dict["data"]]
except (KeyError, IndexError):
return None
@staticmethod
def _values_match(_from_simtel, _from_schema):
"""Check if values match (are close for floats)."""
try:
if not isinstance(_from_schema, list | np.ndarray) and _from_simtel == _from_schema:
return True
except ValueError:
pass
try:
if np.all(np.isclose(_from_simtel, _from_schema)):
return True
except (TypeError, ValueError):
pass
return False
def _log_mismatch_warning(self, data_type, _from_simtel, _from_schema):
"""Log mismatch warning."""
self._logger.warning(f"Values for {data_type} do not match:")
self._logger.warning(
f" from simtel: {self.simtel_parameter_name} {_from_simtel} ({type(_from_simtel)})"
)
self._logger.warning(
f" from schema: {self.parameter_name} {_from_schema} ({type(_from_schema)})"
)
[docs]
def compare_simtel_config_with_schema(self):
    """
    Compare 'default' and 'limits' entries read from sim_telarray with the schema.

    Intended for debugging: matching values are reported at debug level,
    differences are reported as warnings. Nothing is returned.
    """
    for data_type in ("default", "limits"):
        if self._should_skip_limits_check(data_type):
            continue
        simtel_value = self.parameter_dict.get(data_type)
        schema_value = self._get_schema_values(data_type)
        # lists from the schema are compared as arrays of the simtel data type
        if isinstance(schema_value, list):
            simtel_dtype = np.dtype(self.parameter_dict["type"])
            schema_value = np.array(schema_value, dtype=simtel_dtype)
        if not self._values_match(simtel_value, schema_value):
            self._log_mismatch_warning(data_type, simtel_value, schema_value)
            continue
        self._logger.debug(f"Values for {data_type} match")
def _read_simtel_config_file(self, simtel_config_file, simtel_telescope_name):
"""
Read sim_telarray configuration file and return a dictionary with the parameter values.
Parameters
----------
simtel_config_file: str or Path
Path of the file to read from.
simtel_telescope_name: str
Telescope name (sim_telarray convention)
Returns
-------
dict
Dictionary with the parameter values.
"""
self._logger.debug(
f"Reading simtel config file {simtel_config_file} "
f"for parameter {self.parameter_name}"
)
matching_lines = {}
try:
with open(simtel_config_file, encoding="utf-8") as file:
for line in file:
# split line into parts (space, tabs, comma separated)
parts_of_lines = re.split(r",\s*|\s+", line.strip())
if self.simtel_parameter_name == parts_of_lines[1].upper():
matching_lines[parts_of_lines[0]] = parts_of_lines[2:]
except FileNotFoundError as exc:
self._logger.error(f"File {simtel_config_file} not found.")
raise exc
if len(matching_lines) == 0:
self._logger.info(f"No entries found for parameter {self.simtel_parameter_name}")
return None
_para_dict = {}
# first: extract line type (required for conversions and dimension)
_para_dict["type"], _para_dict["dimension"] = self._get_type_and_dimension_from_simtel_cfg(
matching_lines["type"]
)
# then: extract other fields
# (order of keys matter; not all field are present for all parameters)
for key in ["default", simtel_telescope_name, "limits"]:
try:
_para_dict[key], _ = self._add_value_from_simtel_cfg(
matching_lines[key],
dtype=_para_dict.get("type"),
n_dim=_para_dict.get("dimension"),
default=_para_dict.get("default"),
)
except KeyError:
pass
return _para_dict
def _resolve_all_in_column(self, column):
"""
Resolve 'all' entries in a column.
This needs to resolve the following cases:
no 'all' in any entry; ['all:', '5'], ['all: 5'], ['all:5', '3:1']
This function is fine-tuned to the simtel configuration output.
Parameters
----------
column: list
List of strings to resolve.
Returns
-------
list
List of resolved strings.
"""
# don't do anything if all string items in column do not start with 'all'
if not any(isinstance(item, str) and item.startswith("all") for item in column):
return column, {}
self._logger.debug(f"Resolving 'all' entries in column: {column}")
# remove 'all:' entries
column = [item for item in column if item not in ("all:", "all")]
# resolve 'all:5' type entries
column = [
item.split(":")[1].replace(" ", "") if item.startswith("all:") else item
for item in column
]
# find 'index:value' type entries
except_from_all = {}
for item in column:
if ":" in item:
index, value = item.split(":")
except_from_all[index] = value
# finally remove entries containing ':'
column = [item for item in column if ":" not in item]
return column, except_from_all
def _add_value_from_simtel_cfg(self, column, dtype=None, n_dim=1, default=None):
    """
    Extract value(s) from simtel configuration file columns.

    This function is fine-tuned to the simtel configuration output.
    Processing steps: split a single-string column into its items, replace
    'none' entries by None, resolve 'all' entries, pad the column to n_dim
    entries, apply 'index:value' exceptions, convert bool entries, and
    finally convert to the requested data type.

    Parameters
    ----------
    column: list
        List of strings to extract value from.
    dtype: str
        Data type to convert value to.
    n_dim: int
        Length of array to be returned.
    default: object
        Default value to extend array to required length.

    Returns
    -------
    object, int
        Values extracted from column and number of entries
        (as returned by _process_column).
    """
    # a single string may represent a list of values (comma or space separated)
    if len(column) == 1:
        column = column[0].split(",") if "," in column[0] else column[0].split(" ")
    self._logger.debug(
        f"Adding value from simtel config: {column} (n_dim={n_dim}, default={default})"
    )
    # 'none' (any case) is translated to None
    column = [None if item.lower() == "none" else item for item in column]
    # 'index:value' entries are returned separately and applied after padding
    column, except_from_all = self._resolve_all_in_column(column)
    # extend array to required length (simtel uses sometimes 'all:' for all entries)
    if n_dim > 1 and len(column) < n_dim:
        try:
            # skip formatting: black reformats and violates E203
            # NOTE(review): if 'default' is a numpy array, 'list += ndarray' is presumably
            # evaluated by numpy as element-wise add (not as list extension), which would
            # raise for string entries and end in the TypeError fallback below - confirm
            column += default[len(column):]  # fmt: skip
        except TypeError:
            # extend array to required length using previous value
            # (this is also the path taken for default=None)
            column.extend([column[-1]] * (n_dim - len(column)))
    # overwrite single entries given as 'index:value'
    for index, value in except_from_all.items():
        column[int(index)] = value
    # bool entries are converted through int (e.g., '0' -> False, '1' -> True)
    if dtype == "bool":
        column = np.array([bool(int(item)) for item in column])
    return self._process_column(column, dtype)
def _process_column(self, column, dtype):
"""
Process and return column prepared in _add_value_from_simtel_cfg.
Parameters
----------
column: list
List of strings to process.
dtype: str
Data type to convert value to.
"""
if len(column) == 1:
if column[0] is not None:
array_dtype = np.dtype(dtype) if dtype else None
processed_value = np.array(column, dtype=array_dtype)[0]
return processed_value, 1
return None, 1
if len(column) > 1:
return np.array(column, dtype=np.dtype(dtype) if dtype else None), len(column)
return None, None
def _get_type_and_dimension_from_simtel_cfg(self, column):
"""
Return type and dimension from simtel configuration column.
'Func' type from simtel is treated as string. Return number
of camera pixel for a hard-wired set up parameters.
Parameters
----------
column: list
List of strings to extract value from.
Returns
-------
str, int
Type and dimension.
"""
if column[0].lower() == "text" or column[0].lower() == "func":
return "str", 1
if column[0].lower() == "ibool":
return "bool", int(column[1])
if self.camera_pixels is not None and self.simtel_parameter_name in ["NIGHTSKY_BACKGROUND"]:
return str(np.dtype(column[0].lower())), self.camera_pixels
return str(np.dtype(column[0].lower())), int(column[1])
def _get_simtel_parameter_name(self, parameter_name):
"""
Return parameter name as used in sim_telarray.
This is documented in the schema file.
Parameters
----------
parameter_name: str
Model parameter name (as used in simtools)
Returns
-------
str
Parameter name as used in sim_telarray.
"""
try:
for sim_soft in self.schema_dict["simulation_software"]:
if sim_soft["name"] == "sim_telarray":
return sim_soft["internal_parameter_name"].upper()
except (KeyError, TypeError):
pass
return parameter_name.upper()
def _check_parameter_applicability(self, telescope_name):
"""
Check if a parameter is applicable for a given telescope using schema files.
First check for exact telescope name, if not listed in the schema
use telescope type.
Parameters
----------
telescope_name: str
Telescope name (e.g., LSTN-01)
Returns
-------
bool
True if parameter is applicable to telescope.
"""
try:
if telescope_name in self.schema_dict["instrument"]["type"]:
return True
except KeyError as exc:
self._logger.error("Schema file does not contain 'instrument:type' key.")
raise exc
return (
names.get_array_element_type_from_name(telescope_name)
in self.schema_dict["instrument"]["type"]
)
def _parameter_is_a_file(self):
"""
Check if parameter is a file.
Returns
-------
bool
True if parameter is a file.
"""
try:
return self.schema_dict["data"][0]["type"] == "file"
except (KeyError, IndexError):
pass
return False
def _get_unit_from_schema(self):
"""
Return unit(s) from schema dict.
Returns
-------
str or list
Parameter unit(s)
"""
try:
unit_list = []
for data in self.schema_dict["data"]:
unit_list.append(data["unit"] if data["unit"] != "dimensionless" else None)
return unit_list if len(unit_list) > 1 else unit_list[0]
except (KeyError, IndexError):
pass
return None
def _validate_parameter_dict(self, parameter_dict):
    """
    Run the data validator on a parameter dictionary using the schema file.

    Parameters
    ----------
    parameter_dict: dict
        Dictionary to validate.

    Returns
    -------
    dict
        Dictionary held by the validator after validation and transformation.
    """
    self._logger.debug(
        f"Validating parameter dictionary {parameter_dict} using {self.schema_file}"
    )
    # the exact data type is not enforced by the validator
    validator_arguments = {
        "schema_file": self.schema_file,
        "data_dict": parameter_dict,
        "check_exact_data_type": False,
    }
    validator = validate_data.DataValidator(**validator_arguments)
    validator.validate_and_transform()
    return validator.data_dict