"""Light emission simulation (e.g. illuminators or flashers)."""
import logging
import shutil
import stat
import subprocess
from pathlib import Path
import astropy.units as u
import numpy as np
from simtools.io import io_handler
from simtools.model.model_utils import initialize_simulation_models
from simtools.runners.simtel_runner import SimtelRunner
from simtools.utils.general import clear_default_sim_telarray_cfg_directories
from simtools.utils.geometry import fiducial_radius_from_shape
[docs]
class SimulatorLightEmission(SimtelRunner):
"""
Light emission simulation (e.g. illuminators or flashers).
Uses the sim_telarray LightEmission package to simulate the light emission.
Parameters
----------
light_emission_config : dict, optional
Configuration for the light emission (e.g. number of events, model names)
label : str, optional
Label for the simulation
"""
def __init__(self, light_emission_config, db_config=None, label=None):
"""Initialize SimulatorLightEmission."""
self._logger = logging.getLogger(__name__)
self.io_handler = io_handler.IOHandler()
super().__init__(
simtel_path=light_emission_config.get("simtel_path"), label=label, corsika_config=None
)
self.output_directory = self.io_handler.get_output_directory()
self.telescope_model, self.site_model, self.calibration_model = (
initialize_simulation_models(
label=label,
db_config=db_config,
site=light_emission_config.get("site"),
telescope_name=light_emission_config.get("telescope"),
calibration_device_name=light_emission_config.get("light_source"),
model_version=light_emission_config.get("model_version"),
)
)
self.telescope_model.write_sim_telarray_config_file(additional_models=self.site_model)
self.light_emission_config = self._initialize_light_emission_configuration(
light_emission_config
)
def _initialize_light_emission_configuration(self, config):
"""Initialize light emission configuration."""
if self.calibration_model.get_parameter_value("flasher_type"):
config["light_source_type"] = self.calibration_model.get_parameter_value(
"flasher_type"
).lower()
config["flasher_photons"] = (
self.calibration_model.get_parameter_value("flasher_photons")
if not config.get("test", False)
else 1e8
)
if config.get("light_source_position") is not None:
config["light_source_position"] = (
np.array(config["light_source_position"], dtype=float) * u.m
)
return config
[docs]
def simulate(self):
"""
Simulate light emission.
Returns
-------
Path
The output simtel file path.
"""
run_script = self.prepare_script()
log_path = Path(self.output_directory) / "logfile.log"
with open(log_path, "w", encoding="utf-8") as fh:
subprocess.run(
run_script,
shell=False,
check=False,
text=True,
stdout=fh,
stderr=fh,
)
out = Path(self._get_simulation_output_filename())
if not out.exists():
self._logger.warning(f"Expected sim_telarray output not found: {out}")
return out
[docs]
def prepare_script(self):
"""
Build and return bash run script containing the light-emission command.
Returns
-------
Path
Full path of the run script.
"""
script_dir = self.output_directory.joinpath("scripts")
script_dir.mkdir(parents=True, exist_ok=True)
app_name = self._get_light_emission_application_name()
script_file = script_dir / f"{app_name}-light_emission.sh"
self._logger.debug(f"Run bash script - {script_file}")
target_out = Path(self._get_simulation_output_filename())
if target_out.exists():
raise FileExistsError(
f"sim_telarray output file exists, cancelling simulation: {target_out}"
)
lines = [
"#!/usr/bin/env bash\n",
f"{self._make_light_emission_script()}\n\n",
(
f"[ -s '{self.output_directory}/{app_name}.iact.gz' ] || "
f"{{ echo 'LightEmission did not produce IACT file' >&2; exit 1; }}\n\n"
),
f"{self._make_simtel_script()}\n\n",
f"rm -f '{self.output_directory}/{app_name}.iact.gz'\n\n",
]
script_file.write_text("".join(lines), encoding="utf-8")
script_file.chmod(script_file.stat().st_mode | stat.S_IXUSR | stat.S_IXGRP)
return script_file
def _get_prefix(self):
prefix = self.light_emission_config.get("output_prefix", "")
if prefix is not None:
return f"{prefix}_"
return ""
def _get_light_emission_application_name(self):
"""
Return the LightEmission application and mode from type.
Returns
-------
str
app_name
"""
if self.light_emission_config["light_source_type"] == "flat_fielding":
return "ff-1m"
# default to illuminator xyzls, mode from setup
return "xyzls"
def _get_telescope_pointing(self):
"""
Return telescope pointing based on light source type.
For flat_fielding sims, avoid calibration pointing entirely; default angles to (0,0).
Returns
-------
tuple
The telescope pointing angles (theta, phi).
"""
if self.light_emission_config["light_source_type"] == "flat_fielding":
return 0.0, 0.0
if self.light_emission_config.get("light_source_position") is not None:
self._logger.info("Using fixed (vertical up) telescope pointing.")
return 0.0, 0.0
_, angles = self._calibration_pointing_direction()
return angles[0], angles[1]
def _calibration_pointing_direction(self, x_cal=None, y_cal=None, z_cal=None):
"""
Calculate the pointing of the calibration device towards the telescope.
This is for calibration devices not installed on telescopes (e.g. illuminators).
Returns
-------
list
The pointing vector from the calibration device to the telescope.
"""
if x_cal is None or y_cal is None or z_cal is None:
x_cal, y_cal, z_cal = self.calibration_model.get_parameter_value_with_unit(
"array_element_position_ground"
)
x_cal, y_cal, z_cal = [coord.to(u.m).value for coord in (x_cal, y_cal, z_cal)]
cal_vect = np.array([x_cal, y_cal, z_cal])
x_tel, y_tel, z_tel = self.telescope_model.get_parameter_value_with_unit(
"array_element_position_ground"
)
x_tel, y_tel, z_tel = [coord.to(u.m).value for coord in (x_tel, y_tel, z_tel)]
tel_vect = np.array([x_tel, y_tel, z_tel])
direction_vector = tel_vect - cal_vect
# pointing vector from calibration device to telescope
pointing_vector = np.round(direction_vector / np.linalg.norm(direction_vector), 6)
# Calculate telescope theta and phi angles
tel_theta = 180 - np.round(
np.rad2deg(np.arccos(direction_vector[2] / np.linalg.norm(direction_vector))), 6
)
tel_phi = 180 - np.round(
np.rad2deg(np.arctan2(direction_vector[1], direction_vector[0])), 6
)
# Calculate source beam theta and phi angles
direction_vector_inv = direction_vector * -1
source_theta = np.round(
np.rad2deg(np.arccos(direction_vector_inv[2] / np.linalg.norm(direction_vector_inv))),
6,
)
source_phi = np.round(
np.rad2deg(np.arctan2(direction_vector_inv[1], direction_vector_inv[0])), 6
)
return pointing_vector.tolist(), [tel_theta, tel_phi, source_theta, source_phi]
def _write_telescope_position_file(self):
"""
Write the telescope positions to a telescope_position file.
The file will contain lines in the format: x y z r in cm
Returns
-------
Path
The path to the generated telescope_position file.
"""
x_tel, y_tel, z_tel = self.telescope_model.get_parameter_value_with_unit(
"array_element_position_ground"
)
x_tel, y_tel, z_tel = [coord.to(u.cm).value for coord in (x_tel, y_tel, z_tel)]
radius = self.telescope_model.get_parameter_value_with_unit("telescope_sphere_radius")
radius = radius.to(u.cm).value # Convert radius to cm
telescope_position_file = self.output_directory.joinpath("telescope_position.dat")
telescope_position_file.write_text(f"{x_tel} {y_tel} {z_tel} {radius}\n", encoding="utf-8")
return telescope_position_file
def _prepare_flasher_atmosphere_files(self, config_directory, model_id=1):
"""
Prepare canonical atmosphere aliases for ff-1m and return model id.
The ff-1m tool requires atmosphere files atmprof1.dat or atm_profile_model_1.dat and
as configuration parameter the atmosphere id ('--atmosphere id').
"""
src_path = config_directory / self.site_model.get_parameter_value("atmospheric_profile")
self._logger.debug(f"Using atmosphere profile: {src_path}")
for name in (f"atmprof{model_id}.dat", f"atm_profile_model_{model_id}.dat"):
dst = config_directory / name
try:
if dst.exists() or dst.is_symlink():
dst.unlink()
try:
dst.symlink_to(src_path)
except OSError:
shutil.copy2(src_path, dst)
except OSError as copy_err:
self._logger.warning(f"Failed to create atmosphere alias {dst.name}: {copy_err}")
return model_id
def _make_light_emission_script(self):
"""
Create the light emission script to run the light emission package.
Require the specified pre-compiled light emission package application
in the sim_telarray/LightEmission/ path.
Returns
-------
str
The commands to run the Light Emission package
"""
config_directory = self.io_handler.get_model_configuration_directory(
model_version=self.site_model.model_version
)
app_name = self._get_light_emission_application_name()
corsika_observation_level = self.site_model.get_parameter_value_with_unit(
"corsika_observation_level"
)
parts = [str(self._simtel_path / "sim_telarray/LightEmission") + f"/{app_name}"]
parts.extend(self._get_site_command(app_name, config_directory, corsika_observation_level))
parts.extend(self._get_light_source_command())
if self.light_emission_config["light_source_type"] == "illuminator":
parts += [
"-A",
(
f"{config_directory}/"
f"{self.telescope_model.get_parameter_value('atmospheric_profile')}"
),
]
parts += [f"-o {self.output_directory}/{app_name}.iact.gz", "\n"]
return " ".join(parts)
def _get_site_command(self, app_name, config_directory, corsika_observation_level):
"""Return site command with altitude, atmosphere and telescope_position handling."""
if app_name in ("ff-1m",):
atmo_id = self._prepare_flasher_atmosphere_files(config_directory)
return [
"-I.",
f"-I{self._simtel_path / 'sim_telarray/cfg'}",
f"-I{config_directory}",
f"--altitude {corsika_observation_level.to(u.m).value}",
f"--atmosphere {atmo_id}",
]
# default path (not used for flasher now, but kept for completeness)
return [
f"-h {corsika_observation_level.to(u.m).value} ",
f"--telpos-file {self._write_telescope_position_file()}",
]
def _get_light_source_command(self):
"""Return light-source specific command options."""
if self.light_emission_config["light_source_type"] == "flat_fielding":
return self._add_flasher_command_options()
if self.light_emission_config["light_source_type"] == "illuminator":
return self._add_illuminator_command_options()
raise ValueError(
f"Unknown light_source_type '{self.light_emission_config['light_source_type']}'"
)
def _add_flasher_command_options(self):
"""Add flasher options for all telescope types (ff-1m style)."""
flasher_xyz = self.calibration_model.get_parameter_value_with_unit("flasher_position")
camera_radius = fiducial_radius_from_shape(
self.telescope_model.get_parameter_value_with_unit("camera_body_diameter")
.to(u.cm)
.value,
self.telescope_model.get_parameter_value("camera_body_shape"),
)
flasher_wavelength = self.calibration_model.get_parameter_value_with_unit(
"flasher_wavelength"
)
dist_cm = self.calculate_distance_focal_plane_calibration_device().to(u.cm).value
angular_distribution = self._get_angular_distribution_string_for_sim_telarray()
return [
f"--events {self.light_emission_config['number_of_events']}",
f"--photons {self.light_emission_config['flasher_photons']}",
f"--bunchsize {self.calibration_model.get_parameter_value('flasher_bunch_size')}",
f"--xy {flasher_xyz[0].to(u.cm).value},{flasher_xyz[1].to(u.cm).value}",
f"--distance {dist_cm}",
f"--camera-radius {camera_radius}",
f"--spectrum {int(flasher_wavelength.to(u.nm).value)}",
f"--lightpulse {self._get_pulse_shape_string_for_sim_telarray()}",
f"--angular-distribution {angular_distribution}",
]
def _add_illuminator_command_options(self):
"""Get illuminator-specific command options for light emission script."""
pos = self.light_emission_config.get("light_source_position")
if pos is None:
pos = self.calibration_model.get_parameter_value_with_unit(
"array_element_position_ground"
)
x_cal, y_cal, z_cal = pos
if self.light_emission_config.get("light_source_pointing"):
pointing_vector = self.light_emission_config["light_source_pointing"]
else:
pointing_vector = self._calibration_pointing_direction(x_cal, y_cal, z_cal)[0]
flasher_wavelength = self.calibration_model.get_parameter_value_with_unit(
"flasher_wavelength"
)
angular_distribution = self._get_angular_distribution_string_for_sim_telarray()
return [
f"-x {x_cal.to(u.cm).value}",
f"-y {y_cal.to(u.cm).value}",
f"-z {z_cal.to(u.cm).value}",
f"-d {','.join(map(str, pointing_vector))}",
f"-n {self.light_emission_config['flasher_photons']}",
f"-s {int(flasher_wavelength.to(u.nm).value)}",
f"-p {self._get_pulse_shape_string_for_sim_telarray()}",
f"-a {angular_distribution}",
]
def _make_simtel_script(self):
"""
Return the command to run sim_telarray using the output from the previous step.
Returns
-------
str
The command to run sim_telarray
"""
theta, phi = self._get_telescope_pointing()
simtel_bin = self._simtel_path.joinpath("sim_telarray/bin/sim_telarray/")
parts = [
f"{simtel_bin}",
f"-I{self.telescope_model.config_file_directory}",
f"-I{simtel_bin}",
f"-c {self.telescope_model.config_file_path}",
"-DNUM_TELESCOPES=1",
super().get_config_option(
"altitude",
self.site_model.get_parameter_value_with_unit("corsika_observation_level")
.to(u.m)
.value,
),
super().get_config_option(
"atmospheric_transmission",
self.site_model.get_parameter_value("atmospheric_transmission"),
),
super().get_config_option("TRIGGER_TELESCOPES", "1"),
super().get_config_option("TELTRIG_MIN_SIGSUM", "2"),
super().get_config_option("PULSE_ANALYSIS", "-30"),
super().get_config_option("MAXIMUM_TELESCOPES", 1),
super().get_config_option("telescope_theta", f"{theta}"),
super().get_config_option("telescope_phi", f"{phi}"),
]
if self.light_emission_config["light_source_type"] == "flat_fielding":
parts.append(super().get_config_option("Bypass_Optics", "1"))
app_name = self._get_light_emission_application_name()
pref = self._get_prefix()
parts += [
super().get_config_option("power_law", "2.68"),
super().get_config_option("input_file", f"{self.output_directory}/{app_name}.iact.gz"),
super().get_config_option(
"output_file", f"{self.output_directory}/{pref}{app_name}.simtel.zst"
),
super().get_config_option(
"histogram_file", f"{self.output_directory}/{pref}{app_name}.ctsim.hdata\n"
),
]
return clear_default_sim_telarray_cfg_directories(" ".join(parts))
def _get_simulation_output_filename(self):
"""Get the filename of the simulation output."""
app_name = self._get_light_emission_application_name()
pref = self._get_prefix()
return f"{self.output_directory}/{pref}{app_name}.simtel.zst"
[docs]
def calculate_distance_focal_plane_calibration_device(self):
"""
Calculate distance between focal plane and calibration device.
For flasher-type light sources. Flasher position is given in mirror coordinates,
with positive z pointing towards the camera, so the distance is focal_length - flasher_z.
Returns
-------
astropy.units.Quantity
Distance between calibration device and focal plane.
"""
focal_length = self.telescope_model.get_parameter_value_with_unit("focal_length").to(u.m)
flasher_z = self.calibration_model.get_parameter_value_with_unit("flasher_position")[2].to(
u.m
)
return focal_length - flasher_z
def _get_angular_distribution_string_for_sim_telarray(self):
"""
Get the angular distribution string for sim_telarray.
Returns
-------
str
The angular distribution string.
"""
option_string = self.calibration_model.get_parameter_value(
"flasher_angular_distribution"
).lower()
width = self.calibration_model.get_parameter_value_with_unit(
"flasher_angular_distribution_width"
)
return f"{option_string}:{width.to(u.deg).value}" if width is not None else option_string
def _get_pulse_shape_string_for_sim_telarray(self):
"""
Get the pulse shape string for sim_telarray.
Returns
-------
str
The pulse shape string.
"""
option_string = self.calibration_model.get_parameter_value("flasher_pulse_shape").lower()
width = self.calibration_model.get_parameter_value_with_unit("flasher_pulse_width")
return f"{option_string}:{width.to(u.ns).value}" if width is not None else option_string