"""Simulator class for managing simulations of showers and array of telescopes."""
import gzip
import logging
import re
import shutil
from collections import defaultdict
from pathlib import Path
import numpy as np
from astropy import units as u
from simtools.corsika.corsika_config import CorsikaConfig
from simtools.io import eventio_handler, io_handler, table_handler
from simtools.job_execution.job_manager import JobManager
from simtools.model.array_model import ArrayModel
from simtools.runners.corsika_runner import CorsikaRunner
from simtools.runners.corsika_simtel_runner import CorsikaSimtelRunner
from simtools.simtel.simtel_io_event_writer import SimtelIOEventDataWriter
from simtools.simtel.simulator_array import SimulatorArray
from simtools.testing.sim_telarray_metadata import assert_sim_telarray_metadata
from simtools.utils import general, names
from simtools.version import semver_to_int
[docs]
class Simulator:
"""
Simulator is managing the simulation of showers and of the array of telescopes.
It interfaces with simulation software packages (e.g., CORSIKA or sim_telarray).
A single run is simulated per instance, possibly for multiple model versions.
The configuration is set as a dict corresponding to the command line configuration groups
(especially simulation_software, simulation_model, simulation_parameters).
Parameters
----------
args_dict : dict
Configuration dictionary
(includes simulation_software, simulation_model, simulation_parameters groups).
label: str
Instance label.
extra_commands: str or list of str
Extra commands to be added to the run script before the run command.
db_config: dict
Database configuration.
"""
def __init__(
self,
args_dict,
label=None,
extra_commands=None,
db_config=None,
):
"""Initialize Simulator class."""
self.logger = logging.getLogger(__name__)
self.label = label
self.args_dict = args_dict
self.db_config = db_config
self.site = self.args_dict.get("site", None)
self.model_version = self.args_dict.get("model_version", None)
self.simulation_software = self.args_dict.get("simulation_software", "corsika_sim_telarray")
self.logger.debug(f"Init Simulator {self.simulation_software}")
self.run_mode = args_dict.get("run_mode", None)
self.io_handler = io_handler.IOHandler()
self.run_number = None
self._results = defaultdict(list)
self._test = None
self._extra_commands = extra_commands
self.sim_telarray_seeds = None
self._initialize_from_tool_configuration()
self.array_models, self.corsika_configurations = self._initialize_array_models()
self._simulation_runner = self._initialize_simulation_runner()
@property
def simulation_software(self):
"""The attribute simulation_software."""
return self._simulation_software
@simulation_software.setter
def simulation_software(self, simulation_software):
"""
Set and test simulation_software type.
Parameters
----------
simulation_software: choices: [sim_telarray, corsika, corsika_sim_telarray]
implemented are sim_telarray and CORSIKA or corsika_sim_telarray
(running CORSIKA and piping it directly to sim_telarray)
Raises
------
ValueError
"""
if simulation_software not in ["sim_telarray", "corsika", "corsika_sim_telarray"]:
raise ValueError(f"Invalid simulation software: {simulation_software}")
self._simulation_software = simulation_software.lower()
def _initialize_from_tool_configuration(self):
"""Initialize simulator from tool configuration."""
self._test = self.args_dict.get("test", False)
self.sim_telarray_seeds = {
"seed": self.args_dict.get("sim_telarray_instrument_seeds"),
"random_instrument_instances": self.args_dict.get(
"sim_telarray_random_instrument_instances"
),
"seed_file_name": "sim_telarray_instrument_seeds.txt", # name only; no directory
}
if self.args_dict.get("corsika_file"):
self.run_number = eventio_handler.get_corsika_run_number(self.args_dict["corsika_file"])
else:
self.run_number = self.args_dict.get("run_number_offset", 0) + self.args_dict.get(
"run_number", 1
)
def _initialize_array_models(self):
"""
Initialize array simulation models and CORSIKA config (one per model version).
Returns
-------
list
List of ArrayModel objects.
"""
versions = general.ensure_iterable(self.model_version)
array_model = []
corsika_configurations = []
for version in versions:
array_model.append(
ArrayModel(
label=self.label,
site=self.site,
layout_name=self.args_dict.get("array_layout_name"),
db_config=self.db_config,
model_version=version,
calibration_device_types=self._get_calibration_device_types(self.run_mode),
overwrite_model_parameters=self.args_dict.get("overwrite_model_parameters"),
simtel_path=self.args_dict.get("simtel_path"),
)
)
corsika_configurations.append(
CorsikaConfig(
array_model=array_model[-1],
label=self.label,
args_dict=self.args_dict,
db_config=self.db_config,
dummy_simulations=self._is_calibration_run(self.run_mode),
)
)
array_model[-1].sim_telarray_seeds = {
"seed": self._get_seed_for_random_instrument_instances(
self.sim_telarray_seeds["seed"],
version,
corsika_configurations[-1].zenith_angle,
corsika_configurations[-1].azimuth_angle,
),
"random_instrument_instances": self.sim_telarray_seeds[
"random_instrument_instances"
],
"seed_file_name": self.sim_telarray_seeds["seed_file_name"],
}
# 'corsika_sim_telarray' allows for multiple model versions (multipipe option)
corsika_configurations = (
corsika_configurations
if self.simulation_software == "corsika_sim_telarray"
else corsika_configurations[0]
)
return array_model, corsika_configurations
def _get_seed_for_random_instrument_instances(
self, seed, model_version, zenith_angle, azimuth_angle
):
"""
Generate seed for random instances of the instrument.
Parameters
----------
seed : str
Seed string given through configuration.
model_version: str
Model version.
zenith_angle: float
Zenith angle of the observation (in degrees).
azimuth_angle: float
Azimuth angle of the observation (in degrees).
Returns
-------
int
Seed for random instances of the instrument.
"""
if seed:
return int(seed.split(",")[0].strip())
def key_index(key):
try:
return list(names.site_names()).index(key) + 1
except ValueError:
return 1
seed = semver_to_int(model_version) * 10000000
seed = seed + key_index(self.site) * 1000000
seed = seed + (int)(zenith_angle) * 1000
return seed + (int)(azimuth_angle)
def _initialize_simulation_runner(self):
"""
Initialize corsika configuration and simulation runners.
Returns
-------
CorsikaRunner or SimulatorArray or CorsikaSimtelRunner
Simulation runner object.
"""
runner_class = {
"corsika": CorsikaRunner,
"sim_telarray": SimulatorArray,
"corsika_sim_telarray": CorsikaSimtelRunner,
}.get(self.simulation_software)
runner_args = {
"label": self.label,
"corsika_config": self.corsika_configurations,
"simtel_path": self.args_dict.get("simtel_path"),
"use_multipipe": runner_class is CorsikaSimtelRunner,
}
if runner_class is not SimulatorArray:
runner_args["keep_seeds"] = self.args_dict.get("corsika_test_seeds", False)
runner_args["curved_atmosphere_min_zenith_angle"] = self.args_dict.get(
"curved_atmosphere_min_zenith_angle", 65 * u.deg
)
if runner_class is not CorsikaRunner:
runner_args["sim_telarray_seeds"] = self.sim_telarray_seeds
if runner_class is CorsikaSimtelRunner:
runner_args["sequential"] = self.args_dict.get("sequential", False)
runner_args["calibration_config"] = (
self.args_dict if self._is_calibration_run(self.run_mode) else None
)
return runner_class(**runner_args)
[docs]
def simulate(self):
"""
Prepare and submit a run script as a job.
Writes submission scripts using the simulation runners and submits the
run script to the job manager. Collects generated files.
"""
run_script = self._simulation_runner.prepare_run_script(
run_number=self.run_number,
input_file=self._get_corsika_file(),
extra_commands=self._extra_commands,
)
job_manager = JobManager(test=self._test)
job_manager.submit(
run_script=run_script,
run_out_file=self._simulation_runner.get_file_name(
file_type="sub_log", run_number=self.run_number
),
log_file=self._simulation_runner.get_file_name(
file_type=("log"), run_number=self.run_number
),
)
self._fill_list_of_generated_files()
def _get_corsika_file(self):
"""
Get the CORSIKA input file if applicable (for sim_telarray simulations).
Returns
-------
Path, None
Path to the CORSIKA input file.
"""
if self.simulation_software == "sim_telarray":
return self.args_dict.get("corsika_file", None)
return None
def _fill_list_of_generated_files(self):
"""Fill a dictionary with lists of generated files."""
keys = [
"simtel_output",
"sub_out",
"log",
"input",
"histogram",
"corsika_log",
"corsika_output",
"event_data",
]
results = {key: [] for key in keys}
def get_file_name(name, **kwargs):
return str(self._simulation_runner.get_file_name(file_type=name, **kwargs))
results["sub_out"].append(get_file_name("sub_log", mode="out", run_number=self.run_number))
for i in range(len(self.array_models)):
results["simtel_output"].append(
get_file_name("simtel_output", run_number=self.run_number, model_version_index=i)
)
if "sim_telarray" in self.simulation_software:
for file_type in ("log", "histogram", "event_data"):
results[file_type].append(
get_file_name(
file_type,
simulation_software="sim_telarray",
run_number=self.run_number,
model_version_index=i,
)
)
if "corsika" in self.simulation_software:
for file_type in ("corsika_output", "corsika_log"):
results[file_type].append(
get_file_name(
file_type,
simulation_software="corsika",
run_number=self.run_number,
model_version_index=i,
)
)
for key in keys:
self._results[key].extend(results[key])
[docs]
def get_file_list(self, file_type="simtel_output"):
"""
Get list of files generated by simulations.
Not all file types are available for all simulation types.
Returns an empty list for an unknown file type.
Parameters
----------
file_type : str
File type to be listed.
Returns
-------
list
List with the full path of all output files.
"""
return self._results[file_type]
def _make_resources_report(self, input_file_list):
"""
Prepare a simple report on computing wall clock time used in the simulations.
Parameters
----------
input_file_list: str or list of str
Single file or list of files of shower simulations.
Returns
-------
str
string reporting on computing resources
"""
if len(self._results["sub_out"]) == 0 and input_file_list is None:
return "Mean wall time/run [sec]: np.nan"
runtime = []
_resources = {}
_resources = self._simulation_runner.get_resources(run_number=self.run_number)
if _resources.get("runtime"):
runtime.append(_resources["runtime"])
mean_runtime = np.mean(runtime)
resource_summary = f"Mean wall time/run [sec]: {mean_runtime}"
if "n_events" in _resources and _resources["n_events"] > 0:
resource_summary += f", #events/run: {_resources['n_events']}"
return resource_summary
[docs]
def report(self, input_file_list=None):
"""
Report on simulations and computing resources used.
Includes run time per run only at this point.
Parameters
----------
input_file_list: str or list of str
Single file or list of files of shower simulations.
"""
_corsika_config = self._get_first_corsika_config()
self.logger.info(
f"Production run complete for primary {_corsika_config.primary_particle} showers "
f"from {_corsika_config.azimuth_angle} azimuth and "
f"{_corsika_config.zenith_angle} zenith "
f"at {self.site} site, using {self.model_version} model."
)
self.logger.info(
f"Computing for {self.simulation_software} Simulations: "
f"{self._make_resources_report(input_file_list)}"
)
[docs]
def save_file_lists(self):
"""Save files lists for output and log files."""
for file_type in ["simtel_output", "log", "corsika_log", "histogram"]:
file_name = self.io_handler.get_output_directory().joinpath(f"{file_type}_files.txt")
file_list = self.get_file_list(file_type=file_type)
if all(element is not None for element in file_list) and len(file_list) > 0:
self.logger.info(f"Saving list of {file_type} files to {file_name}")
with open(file_name, "w", encoding="utf-8") as f:
for line in self.get_file_list(file_type=file_type):
f.write(f"{line}\n")
else:
self.logger.debug(f"No files to save for {file_type} files.")
[docs]
def save_reduced_event_lists(self):
"""
Save reduced event lists with event data on simulated and triggered events.
The files are saved with the same name as the sim_telarray output file
but with a 'hdf5' extension.
"""
if "sim_telarray" not in self.simulation_software:
self.logger.warning(
"Reduced event lists can only be saved for sim_telarray simulations."
)
return
input_files = self.get_file_list(file_type="simtel_output")
output_files = self.get_file_list(file_type="event_data")
for input_file, output_file in zip(input_files, output_files):
generator = SimtelIOEventDataWriter([input_file])
table_handler.write_tables(
tables=generator.process_files(),
output_file=Path(output_file),
overwrite_existing=True,
)
[docs]
def pack_for_register(self, directory_for_grid_upload=None):
"""
Pack simulation output files for registering on the grid.
Creates separate tarballs for each model version's log files.
Parameters
----------
directory_for_grid_upload: str
Directory for the tarball with output files.
"""
self.logger.info(
f"Packing the output files for registering on the grid ({directory_for_grid_upload})"
)
output_files = self.get_file_list(file_type="simtel_output")
log_files = self.get_file_list(file_type="log")
corsika_log_files = self.get_file_list(file_type="corsika_log")
histogram_files = self.get_file_list(file_type="histogram")
reduced_event_files = (
self.get_file_list(file_type="event_data")
if self.args_dict.get("save_reduced_event_lists")
else []
)
directory_for_grid_upload = (
Path(directory_for_grid_upload)
if directory_for_grid_upload
else self.io_handler.get_output_directory().joinpath("directory_for_grid_upload")
)
directory_for_grid_upload.mkdir(parents=True, exist_ok=True)
# If there are more than one model version,
# duplicate the corsika log file to have one for each model version with the "right name".
if len(self.array_models) > 1 and corsika_log_files:
self._copy_corsika_log_file_for_all_versions(corsika_log_files)
# Group files by model version
for model in self.array_models:
model_version = model.model_version
model_files = general.ensure_iterable(model.pack_model_files())
# Filter files for this model version
model_logs = [f for f in log_files if model_version in f]
model_hists = [f for f in histogram_files if model_version in f]
model_corsika_logs = [f for f in corsika_log_files if model_version in f]
if model_logs:
tar_file_name = Path(model_logs[0]).name.replace("log.gz", "log_hist.tar.gz")
tar_file_path = directory_for_grid_upload.joinpath(tar_file_name)
# Add all relevant model, log, histogram, and CORSIKA log files to the tarball
files_to_tar = model_logs + model_hists + model_corsika_logs + model_files
general.pack_tar_file(tar_file_path, files_to_tar)
for file_to_move in output_files + reduced_event_files:
source_file = Path(file_to_move)
destination_file = directory_for_grid_upload / source_file.name
if destination_file.exists():
self.logger.warning(f"Overwriting existing file: {destination_file}")
shutil.move(source_file, destination_file)
self.logger.info(f"Output files for the grid placed in {directory_for_grid_upload!s}")
def _copy_corsika_log_file_for_all_versions(self, corsika_log_files):
"""
Create copies of the CORSIKA log file for each model version.
Adds a header comment to each copy explaining its relationship to the original.
Parameters
----------
corsika_log_files: list
List containing the original CORSIKA log file path.
"""
original_log = Path(corsika_log_files[0])
# Find which model version the original log belongs to
original_version = next(
model.model_version
for model in self.array_models
if re.search(
rf"(?<![0-9A-Za-z]){re.escape(model.model_version)}(?![0-9A-Za-z])",
original_log.name,
)
)
for model in self.array_models:
if model.model_version == original_version:
continue
new_log = original_log.parent / original_log.name.replace(
original_version, model.model_version
)
with gzip.open(new_log, "wt", encoding="utf-8") as new_file:
# Write the header to the new file
header = (
f"###############################################################\n"
f"Copy of CORSIKA log file from model version {original_version}.\n"
f"Applicable also for {model.model_version} (same CORSIKA configuration,\n"
f"different sim_telarray model versions in the same run).\n"
f"###############################################################\n\n"
)
new_file.write(header)
# Copy the content of the original log file, ignoring invalid characters
with gzip.open(original_log, "rt", encoding="utf-8", errors="ignore") as orig_file:
for line in orig_file:
new_file.write(line)
corsika_log_files.append(str(new_log))
@staticmethod
def _is_calibration_run(run_mode):
"""
Check if this simulation is a calibration run.
Parameters
----------
run_mode: str
Run mode of the simulation.
Returns
-------
bool
True if it is a calibration run, False otherwise.
"""
return run_mode in [
"pedestals",
"pedestals_dark",
"pedestals_nsb_only",
"direct_injection",
]
@staticmethod
def _get_calibration_device_types(run_mode):
"""
Get the list of calibration device types based on the run mode.
Parameters
----------
run_mode: str
Run mode of the simulation.
Returns
-------
list
List of calibration device types.
"""
if run_mode == "direct_injection":
return ["flat_fielding"]
return []
def _get_first_corsika_config(self):
"""
Return first instance from list of CORSIKA configurations.
Most values stored in the CORSIKA configurations are identical,
with the exception of the simulation model version dependent parameters.
Returns
-------
CorsikaConfig
First CORSIKA configuration instance.
"""
try:
return (
self.corsika_configurations[0]
if isinstance(self.corsika_configurations, list)
else self.corsika_configurations
)
except (IndexError, TypeError) as exc:
raise ValueError("CORSIKA configuration not found for verification.") from exc
[docs]
def verify_simulations(self):
"""
Verify simulations.
This includes checking the number of simulated events.
"""
self.logger.info("Verifying simulations.")
_corsika_config = self._get_first_corsika_config()
expected_shower_events = _corsika_config.shower_events
expected_mc_events = _corsika_config.mc_events
self.logger.info(
f"Expected number of shower events: {expected_shower_events}, "
f"expected number of MC events: {expected_mc_events}"
)
if self.simulation_software in ["corsika_sim_telarray", "sim_telarray"]:
self._verify_simulated_events_in_sim_telarray(
expected_shower_events, expected_mc_events
)
if self.simulation_software == "corsika":
self._verify_simulated_events_corsika(expected_mc_events)
if self.args_dict.get("save_reduced_event_lists"):
self._verify_simulated_events_in_reduced_event_lists(expected_mc_events)
def _verify_simulated_events_corsika(self, expected_mc_events, tolerance=1.0e-3):
"""
Verify the number of simulated events in CORSIKA output files.
Allow for a small mismatch in the number of requested events.
Parameters
----------
expected_mc_events: int
Expected number of simulated MC events.
Raises
------
ValueError
If the number of simulated events does not match the expected number.
"""
def consistent(a, b, tol):
return abs(a - b) / max(a, b) <= tol
event_errors = []
for file in self.get_file_list(file_type="corsika_output"):
shower_events, _ = eventio_handler.get_simulated_events(file)
if shower_events != expected_mc_events:
if consistent(shower_events, expected_mc_events, tol=tolerance):
self.logger.warning(
f"Small mismatch in number of events in: {file}: "
f"shower events: {shower_events} (expected: {expected_mc_events})"
)
continue
event_errors.append(
f"Number of simulated MC events ({shower_events}) does not match "
f"the expected number ({expected_mc_events}) in CORSIKA {file}."
)
else:
self.logger.info(
f"Consistent number of events in: {file}: shower events: {shower_events}"
)
if event_errors:
self.logger.error("Inconsistent event counts found in CORSIKA output:")
for error in event_errors:
self.logger.error(f" - {error}")
error_message = "Inconsistent event counts found in CORSIKA output:\n" + "\n".join(
f" - {error}" for error in event_errors
)
raise ValueError(error_message)
def _verify_simulated_events_in_sim_telarray(self, expected_shower_events, expected_mc_events):
"""
Verify the number of simulated events.
Parameters
----------
expected_shower_events: int
Expected number of simulated shower events.
expected_mc_events: int
Expected number of simulated MC events.
Raises
------
ValueError
If the number of simulated events does not match the expected number.
"""
event_errors = []
for file in self.get_file_list(file_type="simtel_output"):
shower_events, mc_events = eventio_handler.get_simulated_events(file)
if (shower_events, mc_events) != (expected_shower_events, expected_mc_events):
event_errors.append(
f"Event mismatch: shower/MC events in {file}: {shower_events}/{mc_events}"
f" (expected: {expected_shower_events}/{expected_mc_events})"
)
else:
self.logger.info(
f"Consistent number of events in: {file}: "
f"shower events: {shower_events}, "
f"MC events: {mc_events}"
)
if event_errors:
self.logger.error("Inconsistent event counts found:")
for error in event_errors:
self.logger.error(f" - {error}")
error_message = "Inconsistent event counts found:\n" + "\n".join(
f" - {error}" for error in event_errors
)
raise ValueError(error_message)
def _verify_simulated_events_in_reduced_event_lists(self, expected_mc_events):
"""
Verify the number of simulated events in reduced event lists.
Parameters
----------
expected_mc_events: int
Expected number of simulated MC events.
Raises
------
ValueError
If the number of simulated events does not match the expected number.
"""
event_errors = []
for file in self.get_file_list(file_type="event_data"):
tables = table_handler.read_tables(file, ["SHOWERS"])
try:
mc_events = len(tables["SHOWERS"])
except KeyError as exc:
raise ValueError(f"SHOWERS table not found in reduced event list {file}.") from exc
if mc_events != expected_mc_events:
event_errors.append(
f"Number of simulated MC events ({mc_events}) does not match "
f"the expected number ({expected_mc_events}) in reduced event list {file}."
)
else:
self.logger.info(
f"Consistent number of events in reduced event list: {file}: MC events:"
f" {mc_events}"
)
if event_errors:
self.logger.error("Inconsistent event counts found in reduced event lists:")
for error in event_errors:
self.logger.error(f" - {error}")
error_message = "Inconsistent event counts found in reduced event lists:\n" + "\n".join(
f" - {error}" for error in event_errors
)
raise ValueError(error_message)