"""CORSIKA configuration."""
import logging
from pathlib import Path
import numpy as np
from astropy import units as u
import simtools.utils.general as gen
from simtools.corsika.primary_particle import PrimaryParticle
from simtools.io_operations import io_handler
from simtools.model.model_parameter import ModelParameter
__all__ = [
"CorsikaConfig",
"InvalidCorsikaInputError",
]
[docs]
class CorsikaConfig:
"""
Configuration for the CORSIKA air shower simulation software.
Follows closely the CORSIKA definitions and output format (see CORSIKA manual).
The configuration is set as a dict corresponding to the command line configuration groups
(especially simulation_software, simulation configuration, simulation parameters).
Parameters
----------
array_model : ArrayModel
Array model.
args_dict : dict
Configuration dictionary.
db_config : dict
MongoDB configuration.
label : str
Instance label.
"""
def __init__(self, array_model, args_dict, db_config=None, label=None):
"""Initialize CorsikaConfig."""
self._logger = logging.getLogger(__name__)
self._logger.debug("Init CorsikaConfig")
self.label = label
self.zenith_angle = None
self.azimuth_angle = None
self._run_number = None
self.config_file_path = None
# The following uses the setter defined below, that is why the args_dict is passed
self.primary_particle = args_dict
self.io_handler = io_handler.IOHandler()
self.array_model = array_model
self.config = self.fill_corsika_configuration(args_dict, db_config)
self._is_file_updated = False
def __repr__(self):
"""CorsikaConfig class representation."""
return (
f"<class {self.__class__.__name__}> "
f"(site={self.array_model.site}, "
f"layout={self.array_model.layout_name}, label={self.label})"
)
@property
def primary_particle(self):
"""Primary particle."""
return self._primary_particle
@primary_particle.setter
def primary_particle(self, args_dict):
"""
Set primary particle from input dictionary.
This is to make sure that when setting the primary particle,
we get the full PrimaryParticle object expected.
Parameters
----------
args_dict: dict
Configuration dictionary
"""
self._primary_particle = self._set_primary_particle(args_dict)
[docs]
def fill_corsika_configuration(self, args_dict, db_config=None):
"""
Fill CORSIKA configuration.
Dictionary keys are CORSIKA parameter names.
Values are converted to CORSIKA-consistent units.
Parameters
----------
args_dict : dict
Configuration dictionary.
db_config: dict
Database configuration.
Returns
-------
dict
Dictionary with CORSIKA parameters.
"""
if args_dict is None:
return {}
self._logger.debug("Setting CORSIKA parameters ")
self._is_file_updated = False
self.azimuth_angle = int(args_dict["azimuth_angle"].to("deg").value)
self.zenith_angle = args_dict["zenith_angle"].to("deg").value
self._logger.debug(
f"Setting CORSIKA parameters from database ({args_dict['model_version']})"
)
config = {}
config["USER_INPUT"] = self._corsika_configuration_from_user_input(args_dict)
if db_config is None: # all following parameter require DB
return config
db_model_parameters = ModelParameter(
mongo_db_config=db_config, model_version=args_dict["model_version"]
)
parameters_from_db = db_model_parameters.get_simulation_software_parameters("corsika")
config["INTERACTION_FLAGS"] = self._corsika_configuration_interaction_flags(
parameters_from_db
)
config["CHERENKOV_EMISSION_PARAMETERS"] = self._corsika_configuration_cherenkov_parameters(
parameters_from_db
)
config["DEBUGGING_OUTPUT_PARAMETERS"] = self._corsika_configuration_debugging_parameters()
config["IACT_PARAMETERS"] = self._corsika_configuration_iact_parameters(parameters_from_db)
return config
def _corsika_configuration_from_user_input(self, args_dict):
"""
Get CORSIKA configuration from user input.
Parameters
----------
args_dict : dict
Configuration dictionary.
Returns
-------
dict
Dictionary with CORSIKA parameters.
"""
return {
"EVTNR": [args_dict["event_number_first_shower"]],
"NSHOW": [args_dict["nshow"]],
"PRMPAR": [self.primary_particle.corsika7_id],
"ESLOPE": [args_dict["eslope"]],
"ERANGE": [
args_dict["energy_range"][0].to("GeV").value,
args_dict["energy_range"][1].to("GeV").value,
],
"THETAP": [
float(args_dict["zenith_angle"].to("deg").value),
float(args_dict["zenith_angle"].to("deg").value),
],
"PHIP": [
self._rotate_azimuth_by_180deg(
args_dict["azimuth_angle"].to("deg").value,
correct_for_geomagnetic_field_alignment=args_dict[
"correct_for_b_field_alignment"
],
),
self._rotate_azimuth_by_180deg(
args_dict["azimuth_angle"].to("deg").value,
correct_for_geomagnetic_field_alignment=args_dict[
"correct_for_b_field_alignment"
],
),
],
"VIEWCONE": [
args_dict["view_cone"][0].to("deg").value,
args_dict["view_cone"][1].to("deg").value,
],
"CSCAT": [
args_dict["core_scatter"][0],
args_dict["core_scatter"][1].to("cm").value,
0.0,
],
}
def _corsika_configuration_interaction_flags(self, parameters_from_db):
"""
Return CORSIKA interaction flags / parameters.
Parameters
----------
parameters_from_db : dict
CORSIKA parameters from the database.
Returns
-------
interaction_parameters : dict
Dictionary with CORSIKA interaction parameters.
"""
parameters = {}
parameters["FIXHEI"] = self._input_config_first_interaction_height(
parameters_from_db["corsika_first_interaction_height"]
)
parameters["FIXCHI"] = [
self._input_config_corsika_starting_grammage(
parameters_from_db["corsika_starting_grammage"]
)
]
parameters["TSTART"] = ["T"]
parameters["ECUTS"] = self._input_config_corsika_particle_kinetic_energy_cutoff(
parameters_from_db["corsika_particle_kinetic_energy_cutoff"]
)
parameters["MUADDI"] = ["F"]
parameters["MUMULT"] = ["T"]
parameters["LONGI"] = self._input_config_corsika_longitudinal_parameters(
parameters_from_db["corsika_longitudinal_shower_development"]
)
parameters["MAXPRT"] = ["10"]
parameters["ECTMAP"] = ["1.e6"]
self._logger.debug(f"Interaction parameters: {parameters}")
return parameters
def _input_config_first_interaction_height(self, entry):
"""Return FIXHEI parameter CORSIKA format."""
return [f"{entry['value']*u.Unit(entry['unit']).to('cm'):.2f}", "0"]
def _input_config_corsika_starting_grammage(self, entry):
"""Return FIXCHI parameter CORSIKA format."""
return f"{entry['value']*u.Unit(entry['unit']).to('g/cm2')}"
def _input_config_corsika_particle_kinetic_energy_cutoff(self, entry):
"""Return ECUTS parameter CORSIKA format."""
e_cuts = gen.convert_string_to_list(entry["value"])
return [
f"{e_cuts[0]*u.Unit(entry['unit']).to('GeV')} "
f"{e_cuts[1]*u.Unit(entry['unit']).to('GeV')} "
f"{e_cuts[2]*u.Unit(entry['unit']).to('GeV')} "
f"{e_cuts[3]*u.Unit(entry['unit']).to('GeV')}"
]
def _input_config_corsika_longitudinal_parameters(self, entry):
"""Return LONGI parameter CORSIKA format."""
return ["T", f"{entry['value']*u.Unit(entry['unit']).to('g/cm2')}", "F", "F"]
def _corsika_configuration_cherenkov_parameters(self, parameters_from_db):
"""
Return CORSIKA Cherenkov emission parameters.
Parameters
----------
parameters_from_db : dict
CORSIKA parameters from the database.
Returns
-------
dict
Dictionary with CORSIKA Cherenkov emission parameters.
"""
parameters = {}
parameters["CERSIZ"] = [parameters_from_db["corsika_cherenkov_photon_bunch_size"]["value"]]
parameters["CERFIL"] = "0"
parameters["CWAVLG"] = self._input_config_corsika_cherenkov_wavelength(
parameters_from_db["corsika_cherenkov_photon_wavelength_range"]
)
self._logger.debug(f"Cherenkov parameters: {parameters}")
return parameters
def _input_config_corsika_cherenkov_wavelength(self, entry):
"""Return CWAVLG parameter CORSIKA format."""
wavelength_range = gen.convert_string_to_list(entry["value"])
return [
f"{wavelength_range[0]*u.Unit(entry['unit']).to('nm')}",
f"{wavelength_range[1]*u.Unit(entry['unit']).to('nm')}",
]
def _corsika_configuration_iact_parameters(self, parameters_from_db):
"""
Return CORSIKA IACT parameters.
Parameters
----------
parameters_from_db : dict
CORSIKA parameters from the database.
Returns
-------
dict
Dictionary with CORSIKA IACT parameters.
"""
parameters = {}
parameters["SPLIT_AUTO"] = [parameters_from_db["corsika_iact_split_auto"]["value"]]
parameters["IO_BUFFER"] = [
self._input_config_io_buff(parameters_from_db["corsika_iact_io_buffer"])
]
parameters["MAX_BUNCHES"] = [parameters_from_db["corsika_iact_max_bunches"]["value"]]
self._logger.debug(f"IACT parameters: {parameters}")
return parameters
def _corsika_configuration_debugging_parameters(self):
"""Return CORSIKA debugging output parameters."""
return {
"DEBUG": ["F", 6, "F", 1000000],
"DATBAS": ["yes"],
"DIRECT": ["/dev/null"],
}
def _input_config_io_buff(self, entry):
"""Return IO_BUFFER parameter CORSIKA format."""
return f"{entry['value']}{entry['unit']}"
def _rotate_azimuth_by_180deg(self, az, correct_for_geomagnetic_field_alignment=True):
"""
Convert azimuth angle to the CORSIKA coordinate system.
Parameters
----------
az: float
Azimuth angle in degrees.
correct_for_geomagnetic_field_alignment: bool
Whether to correct for the geomagnetic field alignment.
Returns
-------
float
Azimuth angle in degrees in the CORSIKA coordinate system.
"""
b_field_declination = 0
if correct_for_geomagnetic_field_alignment:
b_field_declination = self.array_model.site_model.get_parameter_value("geomag_rotation")
return (az + 180 + b_field_declination) % 360
@property
def primary(self):
"""Primary particle name."""
return self.primary_particle.name
def _set_primary_particle(self, args_dict):
"""
Set primary particle from input dictionary.
Parameters
----------
args_dict: dict
Input dictionary.
Returns
-------
PrimaryParticle
Primary particle.
"""
if not args_dict or args_dict.get("primary_id_type") is None:
return PrimaryParticle()
return PrimaryParticle(
particle_id_type=args_dict.get("primary_id_type"), particle_id=args_dict.get("primary")
)
[docs]
def get_config_parameter(self, par_name):
"""
Get value of CORSIKA configuration parameter.
Parameters
----------
par_name: str
Name of the parameter as used in the CORSIKA input file (e.g. PRMPAR, THETAP ...).
Raises
------
KeyError
When par_name is not a valid parameter name.
Returns
-------
list
Value(s) of the parameter.
"""
par_value = []
for _, values in self.config.items():
if par_name in values:
par_value = values[par_name]
if len(par_value) == 0:
self._logger.error(f"Parameter {par_name} is not a CORSIKA config parameter")
raise KeyError
return par_value if len(par_value) > 1 else par_value[0]
[docs]
def print_config_parameter(self):
"""Print CORSIKA config parameters for inspection."""
for parameter_type, parameter_dict in self.config.items():
print(f"Parameter type: {parameter_type}\n")
for par, value in parameter_dict.items():
print(f"{par} = {value}")
@staticmethod
def _get_text_single_line(pars, line_begin=""):
"""
Return one parameter per line for each input parameter.
Parameters
----------
pars: dict
Dictionary with the parameters to be written in the file.
Returns
-------
str
Text with the parameters.
"""
text = ""
for par, values in pars.items():
line = line_begin + par + " "
for v in values:
line += str(v) + " "
line += "\n"
text += line
return text
[docs]
def get_corsika_config_file_name(self, file_type, run_number=None):
"""
Get a CORSIKA config style file name for various configuration file types.
Parameters
----------
file_type: str
The type of file (determines the file suffix).
Choices are config_tmp, config or output_generic.
run_number: int
Run number.
Returns
-------
str
for file_type="config_tmp":
Return CORSIKA input file name for one specific run.
This is the input file after being pre-processed by sim_telarray (pfp).
for file_type="config":
Return generic CORSIKA config input file name.
for file_type="output_generic"
Return generic file name for the TELFIL option in the CORSIKA inputs file.
for file_type="multipipe"
Return multipipe "file name" for the TELFIL option in the CORSIKA inputs file.
Raises
------
ValueError
If file_type is unknown or if the run number is not given for file_type==config_tmp.
"""
file_label = f"_{self.label}" if self.label is not None else ""
_vc_low = self.get_config_parameter("VIEWCONE")[0]
_vc_high = self.get_config_parameter("VIEWCONE")[1]
view_cone = (
f"_cone{int(_vc_low):d}-{int(_vc_high):d}" if _vc_low != 0 or _vc_high != 0 else ""
)
base_name = (
f"{self.primary_particle.name}_{self.array_model.site}_{self.array_model.layout_name}_"
f"za{int(self.get_config_parameter('THETAP')[0]):03}-"
f"azm{self.azimuth_angle:03}deg"
f"{view_cone}{file_label}"
)
if file_type == "config_tmp":
if run_number is None:
raise ValueError("Must provide a run number for a temporary CORSIKA config file")
return f"corsika_config_run{run_number:06}_{base_name}.txt"
if file_type == "config":
return f"corsika_config_{base_name}.input"
if file_type == "output_generic":
# The XXXXXX will be replaced by the run number after the pfp step with sed
return (
f"runXXXXXX_{base_name}_{self.array_model.site}_"
f"{self.array_model.layout_name}{file_label}.zst"
)
if file_type == "multipipe":
return f"multi_cta-{self.array_model.site}-{self.array_model.layout_name}.cfg"
raise ValueError(f"The requested file type ({file_type}) is unknown")
[docs]
def set_output_file_and_directory(self, use_multipipe=False):
"""
Set output file names and directories.
Parameters
----------
use_multipipe: bool
Whether to set the CORSIKA Inputs file to pipe
the output directly to sim_telarray. Defines directory names.
Returns
-------
str
Output file name.
"""
sub_dir = "corsika_simtel" if use_multipipe else "corsika"
config_file_name = self.get_corsika_config_file_name(file_type="config")
file_directory = self.io_handler.get_output_directory(label=self.label, sub_dir=sub_dir)
self._logger.debug(f"Creating directory {file_directory}")
file_directory.mkdir(parents=True, exist_ok=True)
self.config_file_path = file_directory.joinpath(config_file_name)
return self.get_corsika_config_file_name(file_type="output_generic")
def _write_seeds(self, file, use_test_seeds=False):
"""
Generate and write seeds in the CORSIKA input file.
Parameters
----------
file: stream
File where the telescope positions will be written.
"""
random_seed = self.get_config_parameter("PRMPAR") + self.run_number
rng = np.random.default_rng(random_seed)
corsika_seeds = [534, 220, 1104, 382]
if not use_test_seeds:
corsika_seeds = [int(rng.uniform(0, 1e7)) for _ in range(4)]
for s in corsika_seeds:
file.write(f"SEED {s} 0 0\n")
[docs]
def get_corsika_telescope_list(self):
"""
List of telescope positions in the format required for the CORSIKA input file.
Returns
-------
str
Piece of text to be added to the CORSIKA input file.
"""
corsika_input_list = ""
for telescope_name, telescope in self.array_model.telescope_model.items():
positions = telescope.get_parameter_value_with_unit("array_element_position_ground")
corsika_input_list += "TELESCOPE"
for pos in positions:
corsika_input_list += f"\t {pos.to('cm').value:.3f}"
sphere_radius = telescope.get_parameter_value_with_unit("telescope_sphere_radius").to(
"cm"
)
corsika_input_list += f"\t {sphere_radius:.3f}"
corsika_input_list += f"\t # {telescope_name}\n"
return corsika_input_list
@property
def run_number(self):
"""Set run number."""
return self._run_number
@run_number.setter
def run_number(self, run_number):
"""
Set run number and validate it.
Parameters
----------
run_number: int
Run number.
"""
self._run_number = self.validate_run_number(run_number)
[docs]
def validate_run_number(self, run_number):
"""
Validate run number and return it.
Return run number from configuration if None.
Parameters
----------
run_number: int
Run number.
Returns
-------
int
Run number.
Raises
------
ValueError
If run_number is not a valid value (e.g., < 1).
"""
if run_number is None:
return self.run_number
if not float(run_number).is_integer() or run_number < 1 or run_number > 999999:
msg = f"Invalid type of run number ({run_number}) - it must be an uint < 1000000."
self._logger.error(msg)
raise ValueError(msg)
return run_number