Source code for data_model.schema

"""Module providing functionality to read and validate dictionaries using schema."""

import logging
from pathlib import Path

import jsonschema
from referencing import Registry, Resource

import simtools.utils.general as gen
from simtools.constants import (
    METADATA_JSON_SCHEMA,
    MODEL_PARAMETER_METASCHEMA,
    MODEL_PARAMETER_SCHEMA_PATH,
    SCHEMA_PATH,
)
from simtools.data_model import format_checkers
from simtools.dependencies import get_software_version
from simtools.io import ascii_handler
from simtools.utils import names
from simtools.version import check_version_constraint

_logger = logging.getLogger(__name__)


def get_model_parameter_schema_files(schema_directory=MODEL_PARAMETER_SCHEMA_PATH):
    """
    Return list of parameters and schema files located in schema file directory.

    Parameters
    ----------
    schema_directory: Path or str
        Directory searched recursively for '*.schema.yml' files.

    Returns
    -------
    list
        List of parameters found in schema file directory.
    list
        List of schema files found in schema file directory.

    Raises
    ------
    FileNotFoundError
        If no schema files are found in the directory.
    """
    schema_files = sorted(Path(schema_directory).rglob("*.schema.yml"))
    if not schema_files:
        raise FileNotFoundError(f"No schema files found in {schema_directory}")

    parameters = []
    for schema_file in schema_files:
        # reading parameter 'name' only - first document in schema file should be ok
        schema_dict = ascii_handler.collect_data_from_file(file_name=schema_file, yaml_document=0)
        parameters.append(schema_dict.get("name"))
    return parameters, schema_files
def get_model_parameter_schema_file(parameter):
    """
    Return schema file path for a given model parameter.

    Parameters
    ----------
    parameter: str
        Model parameter name.

    Returns
    -------
    Path
        Schema file path.

    Raises
    ------
    FileNotFoundError
        If no schema file exists for the given parameter.
    """
    schema_file = MODEL_PARAMETER_SCHEMA_PATH / f"{parameter}.schema.yml"
    if not schema_file.exists():
        raise FileNotFoundError(f"Schema file not found: {schema_file}")
    return schema_file
def get_model_parameter_schema_version(schema_version=None):
    """
    Validate and return schema versions.

    If no schema_version is given, the most recent version is provided.

    Parameters
    ----------
    schema_version: str
        Schema version.

    Returns
    -------
    str
        Schema version.

    Raises
    ------
    ValueError
        If the requested schema version is not found in the metaschema file.
    """
    schemas = ascii_handler.collect_data_from_file(MODEL_PARAMETER_METASCHEMA)

    # first document in the metaschema file is assumed to be the most recent
    if schema_version is None and schemas:
        return schemas[0].get("schema_version")
    if any(schema.get("schema_version") == schema_version for schema in schemas):
        return schema_version

    raise ValueError(f"Schema version {schema_version} not found in {MODEL_PARAMETER_METASCHEMA}.")
def validate_dict_using_schema(
    data, schema_file=None, json_schema=None, ignore_software_version=False, offline=False
):
    """
    Validate a data dictionary against a schema.

    Parameters
    ----------
    data: dict
        Dictionary to be validated.
    schema_file: str or Path, optional
        Path to the schema file used for validation (ignored if json_schema is given).
    json_schema: dict, optional
        Schema used for validation.
    ignore_software_version: bool
        If True, ignore software version check.
    offline: bool
        If True, skip the existence check of 'meta_schema_url' (no network access).

    Returns
    -------
    dict or None
        The validated data, or None if no schema was provided.

    Raises
    ------
    jsonschema.exceptions.ValidationError
        If validation fails.
    """
    if json_schema is None and schema_file is None:
        _logger.warning(f"No schema provided for validation of {data}")
        return None
    if json_schema is None:
        json_schema = load_schema(schema_file, get_schema_version_from_data(data))

    validate_deprecation_and_version(data, ignore_software_version=ignore_software_version)

    # registry resolves 'file:/' references to schemas below SCHEMA_PATH
    validator = jsonschema.Draft6Validator(
        schema=json_schema,
        format_checker=format_checkers.format_checker,
        registry=Registry(retrieve=_retrieve_yaml_schema_from_uri),
    )
    try:
        validator.validate(instance=data)
    except jsonschema.exceptions.ValidationError as exc:
        _logger.error(f"Validation failed using schema: {json_schema} for data: {data}")
        raise exc
    if not offline:
        _validate_meta_schema_url(data)

    _logger.debug(f"Successful validation of data using schema ({json_schema.get('name')})")
    return data
def _validate_meta_schema_url(data): """Validate meta_schema_url if present in data.""" if not isinstance(data, dict): return if data.get("meta_schema_url") is not None and not gen.url_exists(data["meta_schema_url"]): raise FileNotFoundError(f"Meta schema URL does not exist: {data['meta_schema_url']}") def _retrieve_yaml_schema_from_uri(uri): """Load schema from a file URI.""" path = SCHEMA_PATH / Path(uri.removeprefix("file:/")) contents = ascii_handler.collect_data_from_file(file_name=path) return Resource.from_contents(contents)
def get_schema_version_from_data(data, observatory="cta"):
    """
    Get schema version from data dictionary.

    Parameters
    ----------
    data: dict
        Data dictionary.
    observatory: str
        Observatory name used for the metadata-style reference lookup (default: "cta").

    Returns
    -------
    str
        Schema version. If not found, returns 'latest'.
    """
    schema_version = data.get("schema_version") or data.get("SCHEMA_VERSION")
    if schema_version:
        return schema_version

    # fall back to the reference version in metadata-style dictionaries,
    # accepting both upper- and lower-case key conventions
    reference_version = data.get(observatory.upper(), {}).get("REFERENCE", {}).get(
        "VERSION"
    ) or data.get(observatory.lower(), {}).get("reference", {}).get("version")
    if reference_version:
        return reference_version
    return "latest"
def load_schema(schema_file=None, schema_version="latest"):
    """
    Load parameter schema from file.

    Parameters
    ----------
    schema_file: str
        Path to schema file.
    schema_version: str
        Schema version.

    Returns
    -------
    schema: dict
        Schema dictionary.

    Raises
    ------
    FileNotFoundError
        If schema file is not found.
    """
    schema_file = schema_file or METADATA_JSON_SCHEMA

    # try the path as given first, then relative to the schema directory
    for path in (schema_file, SCHEMA_PATH / schema_file):
        try:
            schema = ascii_handler.collect_data_from_file(file_name=path)
            break
        except FileNotFoundError:
            continue
    else:
        raise FileNotFoundError(f"Schema file not found: {schema_file}")
    _logger.debug(f"Loading schema from {schema_file} for schema version {schema_version}")

    schema = _get_schema_for_version(schema, schema_file, schema_version)
    _add_array_elements("InstrumentTypeElement", schema)

    return schema
def _get_schema_for_version(schema, schema_file, schema_version): """ Get schema for a specific version. Allow for 'latest' version to return the most recent schema. Parameters ---------- schema: dict or list Schema dictionary or list of dictionaries. schema_file: str Path to schema file. schema_version: str or None Schema version to retrieve. If 'latest', the most recent version is returned. Returns ------- dict Schema dictionary for the specified version. """ if schema_version is None: raise ValueError(f"Schema version not given in {schema_file}.") if isinstance(schema, list): # schema file with several schemas defined if len(schema) == 0: raise ValueError(f"No schemas found in {schema_file}.") if schema_version == "latest": schema_version = schema[0].get("schema_version") schema = next((doc for doc in schema if doc.get("schema_version") == schema_version), None) if schema is None: raise ValueError(f"Schema version {schema_version} not found in {schema_file}.") if schema_version not in (None, "latest") and schema_version != schema.get("schema_version"): _logger.warning( f"Schema version {schema_version} does not match {schema.get('schema_version')}" ) return schema def _get_array_element_list(): """Build complete list of array elements including design types.""" elements = set(names.array_elements().keys()) for array_element in names.array_elements(): for design_type in names.array_element_design_types(array_element): elements.add(f"{array_element}-{design_type}") return sorted(elements) def _add_array_elements(key, schema): """ Add list of array elements to schema. Avoids having to list all array elements in multiple schema. Assumes an element [key]['enum'] is a list of elements. Parameters ---------- key: str Key in schema dictionary schema: dict Schema dictionary Returns ------- dict Schema dictionary with added array elements. 
""" array_elements = _get_array_element_list() def update_enum(sub_schema): if "enum" in sub_schema and isinstance(sub_schema["enum"], list): sub_schema["enum"] = list(set(sub_schema["enum"] + array_elements)) else: sub_schema["enum"] = array_elements def recursive_search(sub_schema, target_key): if target_key in sub_schema: update_enum(sub_schema[target_key]) return for v in sub_schema.values(): if isinstance(v, dict): recursive_search(v, target_key) recursive_search(schema, key) return schema
def validate_deprecation_and_version(data, software_name=None, ignore_software_version=False):
    """
    Check if data contains deprecated parameters or version mismatches.

    Parameters
    ----------
    data: dict
        Data dictionary to check.
    software_name: str or None
        Name of the software to check version against. If None, use complete list.
    ignore_software_version: bool
        If True, ignore software version check.

    Raises
    ------
    ValueError
        If an installed software version does not match its constraint and
        ignore_software_version is False.
    """
    if not isinstance(data, dict):
        return
    data_name = data.get("name", "<unknown>")
    if data.get("deprecated", False):
        note = data.get("deprecation_note", "(no deprecation note provided)")
        _logger.warning(f"Data for {data_name} is deprecated. Note: {note}")

    for sw in data.get("simulation_software", []):
        name, constraint = sw.get("name"), sw.get("version")
        if not name or not constraint:
            # entries without both name and version constraint are skipped
            continue
        if software_name is not None and name.lower() != software_name.lower():
            continue
        software_version = get_software_version(name)
        if check_version_constraint(software_version, constraint):
            _logger.debug(
                f"{data_name}: version {software_version} of {name} matches "
                f"constraint {constraint}."
            )
            continue
        msg = f"{data_name}: version {software_version} of {name} does not match {constraint}."
        if ignore_software_version:
            _logger.warning(f"{msg}, but version check is ignored.")
        else:
            raise ValueError(msg)
def validate_schema_from_files(
    file_directory, file_name=None, schema_file=None, ignore_software_version=False
):
    """
    Validate a schema file or several files in a directory.

    Files to be validated are taken from file_directory and file_name pattern.
    The schema is either given as command line argument, read from the meta_schema_url
    or from the metadata section of the data dictionary.

    Parameters
    ----------
    file_directory : str or Path, optional
        Directory with files to be validated.
    file_name : str or Path, optional
        File name pattern to be validated.
    schema_file : str, optional
        Schema file name provided directly.
    ignore_software_version : bool
        If True, ignore software version check.

    Raises
    ------
    FileNotFoundError
        If a file cannot be read.
    ValueError
        If validation of a file fails.
    """
    if file_directory and file_name:
        file_list = sorted(Path(file_directory).rglob(file_name))
    else:
        # single-file mode; no files at all if file_name is not given either
        file_list = [Path(file_name)] if file_name else []

    for _file_name in file_list:
        try:
            data = ascii_handler.collect_data_from_file(file_name=_file_name)
        except FileNotFoundError as exc:
            raise FileNotFoundError(f"Error reading schema file from {_file_name}") from exc
        # files may contain one dictionary or a list of dictionaries
        data = data if isinstance(data, list) else [data]
        try:
            for data_dict in data:
                validate_dict_using_schema(
                    data_dict,
                    _get_schema_file_name(schema_file, _file_name, data_dict),
                    ignore_software_version=ignore_software_version,
                )
        except Exception as exc:
            raise ValueError(f"Validation of file {_file_name} failed") from exc
        _logger.info(f"Successful validation of file {_file_name}")
def _get_schema_file_name(schema_file=None, file_name=None, data_dict=None): """ Get schema file name from metadata, data dict, or from file. Parameters ---------- schema_file : str, optional Schema file name provided directly. file_name : str or Path, optional File name to extract schema information from. data_dict : dict, optional Dictionary with metaschema information. Returns ------- str or None Schema file name. """ if schema_file is not None: return schema_file if data_dict and (url := data_dict.get("meta_schema_url")): return url if file_name: return _extract_schema_from_file(file_name) return None def _extract_schema_url_from_metadata_dict(metadata, observatory="cta"): """Extract schema URL from metadata dictionary.""" for key in (observatory, observatory.lower()): url = metadata.get(key, {}).get("product", {}).get("data", {}).get("model", {}).get("url") if url: return url return None def _extract_schema_from_file(file_name, observatory="cta"): """ Extract schema file name from a metadata or data file. Parameters ---------- file_name : str or Path File name to extract schema information from. observatory : str Observatory name (default: "cta"). Returns ------- str or None Schema file name or None if not found. """ try: metadata = ascii_handler.collect_data_from_file(file_name=file_name, yaml_document=0) except FileNotFoundError: return None return _extract_schema_url_from_metadata_dict(metadata, observatory)