Source code for testing.validate_output

"""Compare application output to reference output."""

import logging
from pathlib import Path

import numpy as np
from astropy.table import Table

import simtools.utils.general as gen
from simtools.testing import assertions

_logger = logging.getLogger(__name__)


def validate_all_tests(config, request, config_file_model_version):
    """
    Validate test output for all integration tests.

    Parameters
    ----------
    config: dict
        Integration test configuration dictionary.
    request: request
        Request object.
    config_file_model_version: str
        Model version from the configuration file.
    """
    if request.config.getoption("--model_version") is None:
        validate_application_output(config)
    elif config_file_model_version is not None:
        _from_command_line = request.config.getoption("--model_version")
        _from_config_file = config_file_model_version
        if _from_command_line == _from_config_file:
            validate_application_output(config)
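
# A minimal sketch of how validate_all_tests might be driven from a pytest
# integration test (hypothetical test function and fixture names; the actual
# conftest wiring is not part of this module):
#
#     def test_applications_from_config(config, request):
#         validate_all_tests(config, request, config.get("MODEL_VERSION"))
#
# Output is validated when no --model_version option is given on the command
# line, or when the command-line value matches the one in the configuration
# file.
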
def validate_application_output(config):
    """
    Validate application output against expected output.

    Expected output is defined in the configuration file.

    Parameters
    ----------
    config: dict
        Dictionary with the configuration for the application test.
    """
    if "INTEGRATION_TESTS" not in config:
        return

    for integration_test in config["INTEGRATION_TESTS"]:
        _logger.info(f"Testing application output: {integration_test}")
        if "REFERENCE_OUTPUT_FILE" in integration_test:
            _validate_reference_output_file(config, integration_test)

        if "OUTPUT_FILE" in integration_test:
            _validate_output_path_and_file(config, integration_test)

        if "FILE_TYPE" in integration_test:
            assert assertions.assert_file_type(
                integration_test["FILE_TYPE"],
                Path(config["CONFIGURATION"]["OUTPUT_PATH"]).joinpath(
                    config["CONFIGURATION"]["OUTPUT_FILE"]
                ),
            )
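
# A minimal sketch of a configuration dictionary accepted by
# validate_application_output (keys as read above; file names, paths, and the
# tolerance value are illustrative only):
#
#     config = {
#         "CONFIGURATION": {
#             "OUTPUT_PATH": "simtools-output",
#             "OUTPUT_FILE": "result.ecsv",
#         },
#         "INTEGRATION_TESTS": [
#             {
#                 "REFERENCE_OUTPUT_FILE": "tests/resources/reference.ecsv",
#                 "TOLERANCE": 1.0e-4,
#                 "FILE_TYPE": "ecsv",
#             }
#         ],
#     }
#     validate_application_output(config)
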
def _validate_reference_output_file(config, integration_test):
    """Compare with reference output file."""
    assert compare_files(
        integration_test["REFERENCE_OUTPUT_FILE"],
        Path(config["CONFIGURATION"]["OUTPUT_PATH"]).joinpath(
            config["CONFIGURATION"]["OUTPUT_FILE"]
        ),
        integration_test.get("TOLERANCE", 1.0e-5),
        integration_test.get("TEST_COLUMNS", None),
    )


def _validate_output_path_and_file(config, integration_test):
    """Check that the output path and file exist."""
    _logger.info(f"PATH {config['CONFIGURATION']['OUTPUT_PATH']}")
    _logger.info(f"File {integration_test['OUTPUT_FILE']}")
    data_path = config["CONFIGURATION"].get(
        "DATA_DIRECTORY", config["CONFIGURATION"]["OUTPUT_PATH"]
    )
    output_file_path = Path(data_path) / integration_test["OUTPUT_FILE"]
    _logger.info(f"Checking path: {output_file_path}")
    assert output_file_path.exists()

    expected_output = [
        d["EXPECTED_OUTPUT"] for d in config["INTEGRATION_TESTS"] if "EXPECTED_OUTPUT" in d
    ]
    if expected_output and "log_hist" not in integration_test["OUTPUT_FILE"]:
        # Get the expected output from the configuration file
        expected_output = expected_output[0]
        _logger.info(
            f"Checking the output of {integration_test['OUTPUT_FILE']} "
            "complies with the expected output: "
            f"{expected_output}"
        )
        assert assertions.check_output_from_sim_telarray(output_file_path, expected_output)
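
# Sketch of an integration-test entry exercising the EXPECTED_OUTPUT branch of
# _validate_output_path_and_file (hypothetical file name and expected values;
# the accepted structure is whatever
# assertions.check_output_from_sim_telarray expects):
#
#     integration_test = {
#         "OUTPUT_FILE": "simtel_output.zst",
#         "EXPECTED_OUTPUT": {"pe_sum": [100, 9000]},
#     }
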
def compare_files(file1, file2, tolerance=1.0e-5, test_columns=None):
    """
    Compare two files of file type ecsv, json or yaml.

    Parameters
    ----------
    file1: str
        First file to compare.
    file2: str
        Second file to compare.
    tolerance: float
        Tolerance for comparing numerical values.
    test_columns: list
        List of columns to compare. If None, all columns are compared.

    Returns
    -------
    bool
        True if the files are equal, False otherwise.

    Raises
    ------
    ValueError
        If the file suffixes do not match.
    """
    _file1_suffix = Path(file1).suffix
    _file2_suffix = Path(file2).suffix
    if _file1_suffix != _file2_suffix:
        raise ValueError(f"File suffixes do not match: {file1} and {file2}")
    if _file1_suffix == ".ecsv":
        return compare_ecsv_files(file1, file2, tolerance, test_columns)
    if _file1_suffix in (".json", ".yaml", ".yml"):
        # json/yaml comparison uses its own (looser) default tolerance
        return compare_json_or_yaml_files(file1, file2)
    _logger.warning(f"Unknown file type for files: {file1} and {file2}")
    return False
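
# Usage sketch (illustrative file names):
#
#     compare_files("gamma_a.ecsv", "gamma_b.ecsv", tolerance=1.0e-4)
#     compare_files("parameter_a.json", "parameter_b.json")
#
# Mismatched suffixes, e.g. compare_files("a.ecsv", "b.json"), raise a
# ValueError.
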
def compare_json_or_yaml_files(file1, file2, tolerance=1.0e-2):
    """
    Compare two json or yaml files.

    Takes into account float comparison for sim_telarray string-embedded floats.

    Parameters
    ----------
    file1: str
        First file to compare.
    file2: str
        Second file to compare.
    tolerance: float
        Relative tolerance for comparing numerical values.

    Returns
    -------
    bool
        True if the files are equal, False otherwise.
    """
    data1 = gen.collect_data_from_file_or_dict(file1, in_dict=None)
    data2 = gen.collect_data_from_file_or_dict(file2, in_dict=None)

    _logger.debug(f"Comparing json/yaml files: {file1} and {file2}")

    if data1 == data2:
        return True
    if "value" in data1 and isinstance(data1["value"], str):
        value_list_1 = gen.convert_string_to_list(data1.pop("value"))
        value_list_2 = gen.convert_string_to_list(data2.pop("value"))
        return np.allclose(value_list_1, value_list_2, rtol=tolerance)
    return data1 == data2
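
# Illustration of the string-embedded float handling above (assuming
# gen.convert_string_to_list turns a whitespace-separated string into a list
# of numbers): two parameter files that differ only in the precision of their
# "value" strings, e.g.
#
#     value: "0.1 0.2 0.3"          (file1)
#     value: "0.10000001 0.2 0.3"   (file2)
#
# compare equal, since both strings are converted to numerical lists and
# compared with np.allclose at the default relative tolerance of 1.0e-2.
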
def compare_ecsv_files(file1, file2, tolerance=1.0e-5, test_columns=None):
    """
    Compare two ecsv files.

    The comparison is successful if:

    - same number of rows
    - numerical values in columns are close

    The comparison can be restricted to a subset of columns with some
    additional cuts applied. This is configured through the test_columns
    parameter, a list of dictionaries with the following key-value pairs:

    - TEST_COLUMN_NAME: column name to compare.
    - CUT_COLUMN_NAME: column used for filtering.
    - CUT_CONDITION: condition used for filtering.

    Parameters
    ----------
    file1: str
        First file to compare.
    file2: str
        Second file to compare.
    tolerance: float
        Tolerance for comparing numerical values.
    test_columns: list
        List of columns to compare. If None, all columns are compared.

    Returns
    -------
    bool
        True if the tables agree, False otherwise.
    """
    _logger.info(f"Comparing files: {file1} and {file2}")
    table1 = Table.read(file1, format="ascii.ecsv")
    table2 = Table.read(file2, format="ascii.ecsv")

    if test_columns is None:
        test_columns = [{"TEST_COLUMN_NAME": col} for col in table1.colnames]

    def generate_mask(table, column, condition):
        """Generate a boolean mask based on the condition (note the usage of eval)."""
        return (
            eval(f"table['{column}'] {condition}")  # pylint: disable=eval-used
            if condition
            else np.ones(len(table), dtype=bool)
        )

    for col_dict in test_columns:
        col_name = col_dict["TEST_COLUMN_NAME"]
        mask1 = generate_mask(
            table1, col_dict.get("CUT_COLUMN_NAME", ""), col_dict.get("CUT_CONDITION", "")
        )
        mask2 = generate_mask(
            table2, col_dict.get("CUT_COLUMN_NAME", ""), col_dict.get("CUT_CONDITION", "")
        )
        table1_masked, table2_masked = table1[mask1], table2[mask2]

        if len(table1_masked) != len(table2_masked):
            return False
        if np.issubdtype(table1_masked[col_name].dtype, np.floating):
            if not np.allclose(table1_masked[col_name], table2_masked[col_name], rtol=tolerance):
                return False

    return True
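
# Sketch of a test_columns specification restricting the comparison to a
# single column with a cut applied (hypothetical column names):
#
#     test_columns = [
#         {
#             "TEST_COLUMN_NAME": "trigger_rate",
#             "CUT_COLUMN_NAME": "energy",
#             "CUT_CONDITION": ">= 0.1",
#         }
#     ]
#     compare_ecsv_files("file1.ecsv", "file2.ecsv", test_columns=test_columns)
#
# The CUT_CONDITION string is applied with eval, so it must form a valid
# Python comparison against the cut column.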