Source code for testing.assertions
"""Functions asserting certain conditions are met (used e.g., in integration tests)."""
import gzip
import json
import logging
import tarfile
from collections import defaultdict
from pathlib import Path
import numpy as np
import yaml
from simtools.simtel.simtel_io_metadata import read_sim_telarray_metadata
_logger = logging.getLogger(__name__)
[docs]
def assert_file_type(file_type, file_name):
"""
Assert that the file is of the given type.
Parameters
----------
file_type: str
File type (json, yaml).
file_name: str
File name.
"""
if file_type == "json":
try:
with open(file_name, encoding="utf-8") as file:
json.load(file)
return True
except (json.JSONDecodeError, FileNotFoundError):
return False
if file_type in ("yaml", "yml"):
if Path(file_name).suffix[1:] not in ("yaml", "yml"):
return False
try:
with open(file_name, encoding="utf-8") as file:
yaml.safe_load(file)
return True
except (yaml.YAMLError, FileNotFoundError):
return False
# no dedicated tests for other file types, checking suffix only
_logger.info(f"File type test is checking suffix only for {file_name} (suffix: {file_type}))")
return Path(file_name).suffix[1:] == file_type
[docs]
def assert_n_showers_and_energy_range(file):
"""
Assert the number of showers and the energy range.
The number of showers should be consistent with the required one (up to 1% tolerance)
and the energies simulated are required to be within the configured ones.
Parameters
----------
file: Path
Path to the sim_telarray file.
"""
from eventio.simtel.simtelfile import SimTelFile # pylint: disable=import-outside-toplevel
simulated_energies = []
simulation_config = {}
with SimTelFile(file, skip_non_triggered=False) as f:
simulation_config = f.mc_run_headers[0]
for event in f:
simulated_energies.append(event["mc_shower"]["energy"])
# The relative tolerance is set to 1% because ~0.5% shower simulations do not
# succeed, without resulting in an error. This tolerance therefore is not an issue.
consistent_n_showers = np.isclose(
len(np.unique(simulated_energies)), simulation_config["n_showers"], rtol=1e-2
)
consistent_energy_range = all(
simulation_config["E_range"][0] <= energy <= simulation_config["E_range"][1]
for energy in simulated_energies
)
return consistent_n_showers and consistent_energy_range
[docs]
def assert_expected_output(file, expected_output):
"""
Assert that the expected output is present in the sim_telarray file.
Parameters
----------
file: Path
Path to the sim_telarray file.
expected_output: dict
Expected output values.
"""
from eventio.simtel.simtelfile import SimTelFile # pylint: disable=import-outside-toplevel
item_to_check = defaultdict(list)
with SimTelFile(file) as f:
for event in f:
if "pe_sum" in expected_output:
item_to_check["pe_sum"].extend(
event["photoelectron_sums"]["n_pe"][event["photoelectron_sums"]["n_pe"] > 0]
)
if "trigger_time" in expected_output:
item_to_check["trigger_time"].extend(event["trigger_information"]["trigger_times"])
if "photons" in expected_output:
item_to_check["photons"].extend(
event["photoelectron_sums"]["photons_atm_qe"][
event["photoelectron_sums"]["photons"] > 0
]
)
for key, value in expected_output.items():
if len(item_to_check[key]) == 0:
_logger.error(f"No data found for {key}")
return False
if not value[0] < np.mean(item_to_check[key]) < value[1]:
_logger.error(
f"Mean of {key} is not in the expected range, got {np.mean(item_to_check[key])}"
)
return False
return True
[docs]
def check_output_from_sim_telarray(file, file_test):
"""
Check that the sim_telarray simulation result is reasonable and matches the expected output.
Parameters
----------
file: Path
Path to the sim_telarray file.
file_test: dict
File test description including expected output and metadata.
Raises
------
ValueError
If the file is not a zstd compressed file.
"""
if "expected_output" not in file_test and "expected_simtel_metadata" not in file_test:
_logger.debug(f"No expected output or metadata provided, skipping checks {file_test}")
return True
assert_output = assert_metadata = True
if "expected_output" in file_test:
assert_output = assert_expected_output(
file=file, expected_output=file_test["expected_output"]
)
if "expected_simtel_metadata" in file_test:
assert_metadata = assert_expected_simtel_metadata(
file=file, expected_metadata=file_test["expected_simtel_metadata"]
)
return assert_n_showers_and_energy_range(file=file) and assert_output and assert_metadata
def _find_patterns(text, patterns):
"""Find patterns in text."""
return {p for p in patterns if p in text}
def _read_log(member, tar):
"""Read and decode a gzipped log file from a tar archive."""
with tar.extractfile(member) as gz, gzip.open(gz, "rb") as f:
return f.read().decode("utf-8", "ignore")
[docs]
def check_simulation_logs(tar_file, file_test):
"""
Check log files of CORSIKA and sim_telarray for expected output.
Parameters
----------
tar_file: Path
Path to a log file tar package.
file_test: dict
File test description including expected log output.
Raises
------
ValueError
If the file is not a tar file.
"""
expected_log = file_test.get("expected_log_output", {})
wanted = expected_log.get("pattern", [])
forbidden = expected_log.get("forbidden_pattern", [])
if not (wanted or forbidden):
_logger.debug(f"No expected log output provided, skipping checks {file_test}")
return True
if not tarfile.is_tarfile(tar_file):
raise ValueError(f"File {tar_file} is not a tar file.")
found, bad = set(), set()
with tarfile.open(tar_file, "r:*") as tar:
for member in tar.getmembers():
if not member.name.endswith(".log.gz"):
continue
_logger.info(f"Scanning {member.name}")
text = _read_log(member, tar)
found |= _find_patterns(text, wanted)
bad |= _find_patterns(text, forbidden)
if bad:
_logger.error(f"Forbidden patterns found: {list(bad)}")
return False
missing = [p for p in wanted if p not in found]
if missing:
_logger.error(f"Missing expected patterns: {missing}")
return False
_logger.debug(f"All expected patterns found: {wanted}")
return True