"""Simulation using the light emission package for calibration devices."""
import logging
import shutil
import stat
import subprocess
from pathlib import Path
import astropy.units as u
import numpy as np
from simtools.io import io_handler
from simtools.runners.simtel_runner import SimtelRunner
from simtools.utils.general import clear_default_sim_telarray_cfg_directories
__all__ = ["SimulatorLightEmission"]
[docs]
class SimulatorLightEmission(SimtelRunner):
"""
Interface with sim_telarray to perform light emission package simulations.
The light emission package is used to simulate an artificial light source, used for calibration.
"""
def __init__(
self,
*,
telescope_model,
calibration_model=None,
flasher_model=None,
site_model=None,
light_emission_config=None,
light_source_setup=None,
simtel_path=None,
light_source_type=None,
label=None,
test=False,
):
"""Initialize SimtelRunner.
Parameters
----------
telescope_model : TelescopeModel
Model of the telescope to be simulated
calibration_model : CalibrationModel, optional
Model of the calibration device to be simulated
flasher_model : FlasherModel, optional
Model of the flasher device to be simulated
site_model : SiteModel, optional
Model of the site
light_emission_config : dict, optional
Configuration for the light emission
light_source_setup : str, optional
Setup for light source positioning ("variable" or "layout")
simtel_path : Path, optional
Path to the sim_telarray installation
light_source_type : str, optional
Type of light source: 'illuminator', or 'flasher'
label : str, optional
Label for the simulation
test : bool, optional
Whether this is a test run
"""
super().__init__(simtel_path=simtel_path, label=label, corsika_config=None)
self._logger = logging.getLogger(__name__)
self._telescope_model = telescope_model
self._calibration_model = calibration_model
self._flasher_model = flasher_model
self._site_model = site_model
self.light_emission_config = light_emission_config or {}
self.light_source_setup = light_source_setup
self.light_source_type = light_source_type or "illuminator"
self.test = test
# IO
self.io_handler = io_handler.IOHandler()
self.output_directory = self.io_handler.get_output_directory(self.label)
self.number_events = self.light_emission_config["number_events"]
# photons per run
if self._calibration_model is not None:
self.photons_per_run = (
self._calibration_model.get_parameter_value("photons_per_run")
if not self.test
else 1e8
)
elif self._flasher_model is not None:
self.photons_per_run = (
self._flasher_model.get_parameter_value("photons_per_flasher")
if not self.test
else 1e8
)
else:
self.photons_per_run = 1e8
# Ensure sim_telarray config exists on disk
if hasattr(self._telescope_model, "write_sim_telarray_config_file"):
self._telescope_model.write_sim_telarray_config_file(additional_model=site_model)
# Runtime variables
self.distance = None
def _get_prefix(self) -> str:
prefix = self.light_emission_config.get("output_prefix", "")
if prefix is not None:
return f"{prefix}_"
return ""
def _infer_application(self) -> tuple[str, str]:
"""Infer the LightEmission application and mode from type/setup.
Returns
-------
tuple[str, str]
(app_name, mode)
"""
if self.light_source_type == "flasher":
return ("ff-1m", "flasher")
# default to illuminator xyzls, mode from setup
mode = self.light_source_setup or "layout"
return ("xyzls", mode)
[docs]
@staticmethod
def light_emission_default_configuration():
"""
Get default light emission configuration.
Returns
-------
dict
Default configuration light emission.
"""
return {
"zenith_angle": {
"len": 1,
"unit": u.Unit("deg"),
"default": 0.0 * u.deg,
"names": ["zenith", "theta"],
},
"azimuth_angle": {
"len": 1,
"unit": u.Unit("deg"),
"default": 0.0 * u.deg,
"names": ["azimuth", "phi"],
},
"source_distance": {
"len": 1,
"unit": u.Unit("m"),
"default": 1000 * u.m,
"names": ["sourcedist", "srcdist"],
},
"off_axis_angle": {
"len": 1,
"unit": u.Unit("deg"),
"default": 0 * u.deg,
"names": ["off_axis"],
},
"fadc_bins": {
"len": 1,
"unit": u.dimensionless_unscaled,
"default": 128,
"names": ["fadc_bins"],
},
}
[docs]
@staticmethod
def flasher_default_configuration():
"""
Get default flasher configuration.
Returns
-------
dict
Default configuration for flasher devices.
"""
return {
"number_events": {
"len": 1,
"unit": None,
"default": 1,
"names": ["number_events"],
},
"photons_per_flasher": {
"len": 1,
"unit": None,
"default": 2.5e6,
"names": ["photons"],
},
"bunch_size": {
"len": 1,
"unit": None,
"default": 1.0,
"names": ["bunchsize"],
},
"flasher_position": {
"len": 2,
"unit": u.Unit("cm"),
"default": [0.0, 0.0] * u.cm,
"names": ["xy", "position"],
},
"flasher_depth": {
"len": 1,
"unit": u.Unit("cm"),
"default": 60 * u.cm,
"names": ["depth", "distance"],
},
"flasher_inclination": {
"len": 1,
"unit": u.Unit("deg"),
"default": 0.0 * u.deg,
"names": ["inclination"],
},
"spectrum": {
"len": 1,
"unit": u.Unit("nm"),
"default": 400 * u.nm,
"names": ["wavelength"],
},
"lightpulse": {
"len": 1,
"unit": None,
"default": "Simple:0",
"names": ["pulse"],
},
"angular_distribution": {
"len": 1,
"unit": None,
"default": "isotropic",
"names": ["angular"],
},
"flasher_pattern": {
"len": 1,
"unit": None,
"default": "all",
"names": ["fire", "pattern"],
},
}
[docs]
def calibration_pointing_direction(self):
"""
Calculate the pointing of the calibration device towards the telescope.
Returns
-------
list
The pointing vector from the calibration device to the telescope.
"""
x_cal, y_cal, z_cal = self._calibration_model.get_parameter_value_with_unit(
"array_element_position_ground"
)
x_cal, y_cal, z_cal = [coord.to(u.m).value for coord in (x_cal, y_cal, z_cal)]
cal_vect = np.array([x_cal, y_cal, z_cal])
x_tel, y_tel, z_tel = self._telescope_model.get_parameter_value_with_unit(
"array_element_position_ground"
)
x_tel, y_tel, z_tel = [coord.to(u.m).value for coord in (x_tel, y_tel, z_tel)]
tel_vect = np.array([x_tel, y_tel, z_tel])
direction_vector = tel_vect - cal_vect
# pointing vector from calibration device to telescope
pointing_vector = np.round(direction_vector / np.linalg.norm(direction_vector), 6)
# Calculate telescope theta and phi angles
tel_theta = 180 - np.round(
np.rad2deg(np.arccos(direction_vector[2] / np.linalg.norm(direction_vector))), 6
)
tel_phi = 180 - np.round(
np.rad2deg(np.arctan2(direction_vector[1], direction_vector[0])), 6
)
# Calculate source beam theta and phi angles
direction_vector_inv = direction_vector * -1
source_theta = np.round(
np.rad2deg(np.arccos(direction_vector_inv[2] / np.linalg.norm(direction_vector_inv))),
6,
)
source_phi = np.round(
np.rad2deg(np.arctan2(direction_vector_inv[1], direction_vector_inv[0])), 6
)
return pointing_vector.tolist(), [tel_theta, tel_phi, source_theta, source_phi]
def _write_telpos_file(self):
"""
Write the telescope positions to a telpos file.
The file will contain lines in the format: x y z r in cm
Returns
-------
Path
The path to the generated telpos file.
"""
telpos_file = self.output_directory.joinpath("telpos.dat")
x_tel, y_tel, z_tel = self._telescope_model.get_parameter_value_with_unit(
"array_element_position_ground"
)
x_tel, y_tel, z_tel = [coord.to(u.cm).value for coord in (x_tel, y_tel, z_tel)]
radius = self._telescope_model.get_parameter_value_with_unit("telescope_sphere_radius")
radius = radius.to(u.cm).value # Convert radius to cm
with telpos_file.open("w", encoding="utf-8") as file:
file.write(f"{x_tel} {y_tel} {z_tel} {radius}\n")
return telpos_file
def _prepare_flasher_atmosphere_files(self, config_directory: Path) -> int:
"""Prepare canonical atmosphere aliases for ff-1m and return model id 1."""
atmo_name = self._site_model.get_parameter_value("atmospheric_profile")
self._logger.debug(f"Using atmosphere profile: {atmo_name}")
src_path = config_directory.joinpath(atmo_name)
for canonical in ("atmprof1.dat", "atm_profile_model_1.dat"):
dst = config_directory.joinpath(canonical)
if dst.exists() or dst.is_symlink():
try:
dst.unlink()
except OSError:
pass
try:
dst.symlink_to(src_path)
except OSError:
try:
shutil.copy2(src_path, dst)
except OSError as copy_err:
self._logger.warning(
f"Failed to create atmosphere alias {dst.name}: {copy_err}"
)
return 1
def _make_light_emission_script(self):
"""
Create the light emission script to run the light emission package.
Require the specified pre-compiled light emission package application
in the sim_telarray/LightEmission/ path.
Returns
-------
str
The commands to run the Light Emission package
"""
x_tel, y_tel, z_tel = self._telescope_model.get_parameter_value_with_unit(
"array_element_position_ground"
)
config_directory = self.io_handler.get_model_configuration_directory(
label=self.label, model_version=self._site_model.model_version
)
telpos_file = self._write_telpos_file()
app_name, _ = self._infer_application()
parts: list[str] = []
# application path
parts.append(str(self._simtel_path.joinpath("sim_telarray/LightEmission/")))
parts.append(f"/{app_name}")
corsika_observation_level = self._site_model.get_parameter_value_with_unit(
"corsika_observation_level"
)
parts.append(
self._build_altitude_atmo_block(
app_name, config_directory, corsika_observation_level, telpos_file
)
)
parts.append(self._build_source_specific_block(x_tel, y_tel, z_tel, config_directory))
if self.light_source_type == "illuminator":
parts.append(f" -A {config_directory}/")
parts.append(f"{self._telescope_model.get_parameter_value('atmospheric_profile')}")
parts.append(f" -o {self.output_directory}/{app_name}.iact.gz")
parts.append("\n")
return "".join(parts)
def _build_altitude_atmo_block(
self, app_name, config_directory: Path, corsika_observation_level, telpos_file: Path
) -> str:
"""Return CLI segment for altitude/atmosphere and telpos handling."""
if app_name in ("ff-1m",):
seg = []
seg.append(" -I.")
seg.append(f" -I{self._simtel_path.joinpath('sim_telarray/cfg')}")
seg.append(f" -I{config_directory}")
seg.append(f" --altitude {corsika_observation_level.to(u.m).value}")
atmo_id = self._prepare_flasher_atmosphere_files(config_directory)
seg.append(f" --atmosphere {atmo_id}")
return "".join(seg)
# default path (not used for flasher now, but kept for completeness)
return f" -h {corsika_observation_level.to(u.m).value} --telpos-file {telpos_file}"
def _build_source_specific_block(self, _x_tel, _y_tel, _z_tel, _config_directory: Path) -> str:
"""Return CLI segment for light-source specific flags."""
if self.light_source_type == "flasher":
return self._add_flasher_command_options("")
if self.light_source_type == "illuminator":
return self._add_illuminator_command_options("")
self._logger.warning("Unknown light_source_type '%s'", self.light_source_type)
return ""
def _add_flasher_command_options(self, command):
"""Add flasher-specific options to the script (uniform ff-1m)."""
return self._add_flasher_options(command)
def _add_flasher_options(self, command):
"""Add flasher options for all telescope types (ff-1m style)."""
# For MST/LST we used to use ff-1m; now apply same for all telescopes
flasher_xy = self._flasher_model.get_parameter_value_with_unit("flasher_position")
flasher_distance = self._flasher_model.get_parameter_value_with_unit("flasher_depth")
# Camera radius required for application, Radius of fiducial sphere enclosing camera
camera_radius = (
self._telescope_model.get_parameter_value_with_unit("camera_body_diameter")
.to(u.cm)
.value
/ 2
)
spectrum = self._flasher_model.get_parameter_value_with_unit("spectrum")
pulse = self._flasher_model.get_parameter_value("lightpulse")
angular = self._flasher_model.get_parameter_value("angular_distribution")
bunch_size = self._flasher_model.get_parameter_value("bunch_size")
# Convert to plain numbers for CLI
fx = flasher_xy[0].to(u.cm).value
fy = flasher_xy[1].to(u.cm).value
dist_cm = flasher_distance.to(u.cm).value
spec_nm = int(spectrum.to(u.nm).value)
command += f" --events {self.number_events}"
command += f" --photons {self.photons_per_run}"
command += f" --bunchsize {bunch_size}"
command += f" --xy {fx},{fy}"
command += f" --distance {dist_cm}"
command += f" --camera-radius {camera_radius}"
command += f" --spectrum {spec_nm}"
command += f" --lightpulse {pulse}"
command += f" --angular-distribution {angular}"
return command
def _add_illuminator_command_options(self, command):
"""
Add illuminator-specific command options to the light emission script.
Parameters
----------
command : str
The command string to add options to
Returns
-------
str
The updated command string
"""
if self.light_source_setup == "variable":
command += f" -x {self.light_emission_config['x_pos']['default'].to(u.cm).value}"
command += f" -y {self.light_emission_config['y_pos']['default'].to(u.cm).value}"
command += f" -z {self.light_emission_config['z_pos']['default'].to(u.cm).value}"
command += (
f" -d {','.join(map(str, self.light_emission_config['direction']['default']))}"
)
command += f" -n {self.photons_per_run}"
elif self.light_source_setup == "layout":
x_cal, y_cal, z_cal = self._calibration_model.get_parameter_value_with_unit(
"array_element_position_ground"
)
command += f" -x {x_cal.to(u.cm).value}"
command += f" -y {y_cal.to(u.cm).value}"
command += f" -z {z_cal.to(u.cm).value}"
pointing_vector = self.calibration_pointing_direction()[0]
command += f" -d {','.join(map(str, pointing_vector))}"
command += f" -n {self.photons_per_run}"
self._logger.info(f"Photons per run: {self.photons_per_run} ")
laser_wavelength = self._calibration_model.get_parameter_value_with_unit(
"laser_wavelength"
)
command += f" -s {int(laser_wavelength.to(u.nm).value)}"
led_pulse_sigtime = self._calibration_model.get_parameter_value_with_unit(
"led_pulse_sigtime"
)
command += f" -p Gauss:{led_pulse_sigtime.to(u.ns).value}"
command += " -a isotropic"
return command
def _make_simtel_script(self):
"""
Return the command to run sim_telarray using the output from the previous step.
Returns
-------
str
The command to run sim_telarray
"""
# For flasher sims, avoid calibration pointing entirely; default angles to (0,0)
if self.light_source_type == "flasher":
angles = [0, 0]
else:
_, angles = self.calibration_pointing_direction()
simtel_bin = self._simtel_path.joinpath("sim_telarray/bin/sim_telarray/")
# Build command without prefix; caller will add SIM_TELARRAY_CONFIG_PATH once
command = f"{simtel_bin} "
command += f"-I{self._telescope_model.config_file_directory} "
command += f"-I{simtel_bin} "
command += f"-c {self._telescope_model.config_file_path} "
self._remove_line_from_config(self._telescope_model.config_file_path, "array_triggers")
self._remove_line_from_config(self._telescope_model.config_file_path, "axes_offsets")
command += "-DNUM_TELESCOPES=1 "
command += super().get_config_option(
"altitude",
self._site_model.get_parameter_value_with_unit("corsika_observation_level")
.to(u.m)
.value,
)
command += super().get_config_option(
"atmospheric_transmission",
self._site_model.get_parameter_value("atmospheric_transmission"),
)
command += super().get_config_option("TRIGGER_TELESCOPES", "1")
command += super().get_config_option("TELTRIG_MIN_SIGSUM", "2")
command += super().get_config_option("PULSE_ANALYSIS", "-30")
command += super().get_config_option("MAXIMUM_TELESCOPES", 1)
if self.light_source_type == "variable":
command += super().get_config_option("telescope_theta", 0)
command += super().get_config_option("telescope_phi", 0)
else:
command += super().get_config_option("telescope_theta", f"{angles[0]}")
command += super().get_config_option("telescope_phi", f"{angles[1]}")
# For flasher runs, bypass reflections on primary mirror
if self.light_source_type == "flasher":
command += super().get_config_option("Bypass_Optics", "1")
command += super().get_config_option("power_law", "2.68")
app_name, app_mode = self._infer_application()
pref = self._get_prefix()
command += super().get_config_option(
"input_file", f"{self.output_directory}/{app_name}.iact.gz"
)
dist_suffix = ""
if self.light_source_setup == "variable":
try:
dist_val = int(self._get_distance_for_plotting().to_value(u.m))
dist_suffix = f"_d_{dist_val}"
except Exception: # pylint:disable=broad-except
dist_suffix = ""
command += super().get_config_option(
"output_file",
f"{self.output_directory}/{pref}{app_name}_{app_mode}{dist_suffix}.simtel.zst",
)
command += super().get_config_option(
"histogram_file",
f"{self.output_directory}/{pref}{app_name}_{app_mode}{dist_suffix}.ctsim.hdata\n",
)
# Remove the default sim_telarray configuration directories
return clear_default_sim_telarray_cfg_directories(command)
def _remove_line_from_config(self, file_path, line_prefix):
"""
Remove lines starting with a specific prefix from the config.
Parameters
----------
file_path : Path
The path to the configuration file.
line_prefix : str
The prefix of lines to be removed.
"""
file_path = Path(file_path)
with file_path.open("r", encoding="utf-8") as file:
lines = file.readlines()
with file_path.open("w", encoding="utf-8") as file:
for line in lines:
if not line.startswith(line_prefix):
file.write(line)
[docs]
def prepare_script(self):
"""
Build and return bash run script containing the light-emission command.
Returns
-------
Path
Full path of the run script.
"""
self._logger.debug("Creating run bash script")
_script_dir = self.output_directory.joinpath("scripts")
_script_dir.mkdir(parents=True, exist_ok=True)
_script_file = _script_dir.joinpath(f"{self._infer_application()[0]}-lightemission.sh")
self._logger.debug(f"Run bash script - {_script_file}")
target_out = Path(self._get_simulation_output_filename())
if target_out.exists():
msg = f"Simtel output file exists already, cancelling simulation: {target_out}"
self._logger.error(msg)
raise FileExistsError(msg)
command_le = self._make_light_emission_script()
command_simtel = self._make_simtel_script()
with _script_file.open("w", encoding="utf-8") as file:
file.write("#!/usr/bin/env bash\n")
file.write(f"{command_le}\n\n")
app_name, _ = self._infer_application()
file.write(
f"[ -s '{self.output_directory}/{app_name}.iact.gz' ] || "
f"{{ echo 'LightEmission did not produce IACT file' >&2; exit 1; }}\n\n"
)
file.write(f"{command_simtel}\n\n")
# Cleanup intermediate IACT file at the end of the run
file.write(f"rm -f '{self.output_directory}/{app_name}.iact.gz'\n\n")
_script_file.chmod(_script_file.stat().st_mode | stat.S_IXUSR | stat.S_IXGRP)
return _script_file
def _get_simulation_output_filename(self):
"""Get the filename of the simulation output."""
dist_suffix = ""
if self.light_source_setup == "variable":
try:
dist_val = int(self._get_distance_for_plotting().to_value(u.m))
dist_suffix = f"_d_{dist_val}"
except Exception: # pylint:disable=broad-except
dist_suffix = ""
app_name, app_mode = self._infer_application()
pref = self._get_prefix()
return f"{self.output_directory}/{pref}{app_name}_{app_mode}{dist_suffix}.simtel.zst"
def _get_distance_for_plotting(self):
"""Get the distance to be used for plotting as an astropy Quantity.
For flasher runs, use the flasher_depth (cm) from the flasher model.
For illuminator runs, use the configured z_pos quantity.
Otherwise, fall back to self.distance if set, or 0 m.
"""
if self.light_source_type == "flasher" and self._flasher_model is not None:
return self._flasher_model.get_parameter_value_with_unit("flasher_depth").to(u.m)
def _as_meters(val):
if isinstance(val, u.Quantity):
return val.to(u.m)
try:
return float(val) * u.m
except (TypeError, ValueError):
return None
cfg = self.light_emission_config or {}
z = cfg.get("z_pos")
if isinstance(z, dict):
z_def = z.get("default")
z_val = z_def[0] if isinstance(z_def, list | tuple) and z_def else z_def
z_q = _as_meters(z_val)
if z_q is not None:
return z_q
d_q = _as_meters(getattr(self, "distance", None))
if d_q is not None:
return d_q
return 0 * u.m
[docs]
def run_simulation(self) -> Path:
"""Run the light emission simulation and return the output simtel file path."""
run_script = self.prepare_script()
log_path = Path(self.output_directory) / "logfile.log"
with open(log_path, "w", encoding="utf-8") as fh:
subprocess.run(
run_script,
shell=False,
check=False,
text=True,
stdout=fh,
stderr=fh,
)
out = Path(self._get_simulation_output_filename())
if not out.exists():
self._logger.warning(f"Expected simtel output not found: {out}")
return out
[docs]
def distance_list(self, arg):
"""
Convert distance list to astropy quantities.
Parameters
----------
arg: list
List of distances.
Returns
-------
values: list
List of distances as astropy quantities.
"""
try:
return [float(x) * u.m for x in arg]
except ValueError as exc:
raise ValueError("Distances must be numeric values") from exc
[docs]
def update_light_emission_config(self, key: str, value):
"""
Update the light emission configuration.
Parameters
----------
key : str
The key in the configuration to update.
value : Any
The new value to set for the key.
"""
if key in self.light_emission_config:
self.light_emission_config[key]["default"] = value
else:
raise KeyError(f"Key '{key}' not found in light emission configuration.")
[docs]
def calculate_distance_telescope_calibration_device(self):
"""Calculate distance(s) between telescope and calibration device."""
if self.light_source_setup == "layout":
# Layout positions: Use DB coordinates
x_cal, y_cal, z_cal = self._calibration_model.get_parameter_value_with_unit(
"array_element_position_ground"
)
x_cal, y_cal, z_cal = [coord.to(u.m).value for coord in (x_cal, y_cal, z_cal)]
x_tel, y_tel, z_tel = self._telescope_model.get_parameter_value_with_unit(
"array_element_position_ground"
)
x_tel, y_tel, z_tel = [coord.to(u.m).value for coord in (x_tel, y_tel, z_tel)]
tel_vect = np.array([x_tel, y_tel, z_tel])
cal_vect = np.array([x_cal, y_cal, z_cal])
distance = np.linalg.norm(cal_vect - tel_vect)
self._logger.info(f"Distance between telescope and calibration device: {distance} m")
return [distance * u.m]
# Variable positions: Calculate distances for all positions
x_tel = self.light_emission_config["x_pos"]["default"].to(u.m).value
y_tel = self.light_emission_config["y_pos"]["default"].to(u.m).value
z_positions = self.light_emission_config["z_pos"]["default"]
distances = []
for z in z_positions:
tel_vect = np.array([x_tel, y_tel, z.to(u.m).value])
cal_vect = np.array([0, 0, 0]) # Calibration device at origin
distances.append(np.linalg.norm(cal_vect - tel_vect) * u.m)
return distances
[docs]
def simulate_variable_distances(self, args_dict):
"""Simulate light emission for variable distances and return output files list."""
if args_dict["distances_ls"] is not None:
self.update_light_emission_config(
"z_pos", self.distance_list(args_dict["distances_ls"])
)
self._logger.info(
f"Simulating for distances: {self.light_emission_config['z_pos']['default']}"
)
outputs: list[Path] = []
distances = self.calculate_distance_telescope_calibration_device()
for current_distance, z_pos in zip(
distances, self.light_emission_config["z_pos"]["default"]
):
self.update_light_emission_config("z_pos", z_pos)
self.distance = current_distance
outputs.append(self.run_simulation())
return outputs
[docs]
def simulate_layout_positions(self, args_dict): # pylint: disable=unused-argument
"""Simulate light emission for layout positions and return output files list."""
# args_dict kept for API symmetry; explicitly mark as unused
del args_dict
self.distance = self.calculate_distance_telescope_calibration_device()[0]
# Single distance for layout
return [self.run_simulation()]