Source code for data_model.metadata_model

"""
Definition of metadata model for input to and output of simtools.

Follows CTAO top-level data model definition.

* data products submitted to SimPipe ('input')
* data products generated by SimPipe ('output')

"""

import logging

import simtools.data_model.schema

_logger = logging.getLogger(__name__)


def get_default_metadata_dict(schema_file=None, observatory="CTA"):
    """
    Build the metadata dictionary populated with the schema default values.

    The schema is loaded with simtools.data_model.schema.load_schema and its
    'definitions' section is used to fill in the defaults.

    Parameters
    ----------
    schema_file: str
        Schema file (jsonschema format) used for validation
    observatory: str
        Observatory name

    Returns
    -------
    dict
        Reference schema dictionary.

    """
    definitions = simtools.data_model.schema.load_schema(schema_file)["definitions"]
    return _fill_defaults(definitions, observatory)
def _resolve_references(yaml_data, observatory="CTA"):
    """
    Resolve references in yaml data and expand the received dictionary accordingly.

    Parameters
    ----------
    yaml_data: dict
        Dictionary with yaml data.
    observatory: str
        Observatory name

    Returns
    -------
    dict
        Dictionary with resolved references.

    """

    def expand_ref(ref):
        # str.lstrip treats "#/" as a set of characters: every leading '#'
        # and '/' is removed, not only the literal prefix "#/".
        ref_path = ref.lstrip("#/")
        parts = ref_path.split("/")
        ref_data = yaml_data
        for part in parts:
            # The lookup starts at yaml_data itself, so the 'definitions' and
            # observatory path elements are skipped instead of being looked up.
            if part in ("definitions", observatory):
                continue
            # Unknown path elements resolve to an empty dictionary.
            ref_data = ref_data.get(part, {})
        return ref_data

    def resolve_dict(data):
        if "$ref" in data:
            ref = data["$ref"]
            resolved_data = expand_ref(ref)
            # Only dictionaries with more than one entry are resolved further;
            # everything else is returned as found.
            if isinstance(resolved_data, dict) and len(resolved_data) > 1:
                return _resolve_references_recursive(resolved_data)
            return resolved_data
        return {k: _resolve_references_recursive(v) for k, v in data.items()}

    def resolve_list(data):
        return [_resolve_references_recursive(item) for item in data]

    def _resolve_references_recursive(data):
        if isinstance(data, dict):
            return resolve_dict(data)
        if isinstance(data, list):
            return resolve_list(data)
        return data

    return _resolve_references_recursive(yaml_data)


def _fill_defaults(schema, observatory="CTA"):
    """
    Fill default values from json schema.

    Parameters
    ----------
    schema: dict
        Schema describing the input data.
    observatory: str
        Observatory name

    Returns
    -------
    dict
        Dictionary with default values.

    Raises
    ------
    KeyError
        If the observatory is not a key of the schema or if a (sub)schema
        has no 'properties' key.

    """
    defaults = {observatory: {}}
    # The observatory name must be forwarded: the resolver skips this path
    # element in references and would otherwise always assume "CTA".
    resolved_schema = _resolve_references(schema[observatory], observatory)
    _fill_defaults_recursive(resolved_schema, defaults[observatory])
    return defaults


def _fill_defaults_recursive(sub_schema, current_dict):
    """
    Recursively fill default values from the sub_schema into the current dictionary.

    Parameters
    ----------
    sub_schema: dict
        Sub schema describing part of the input data.
    current_dict: dict
        Current dictionary to fill with default values.

    Raises
    ------
    KeyError
        If the 'properties' key is missing in sub_schema.

    """
    if "properties" not in sub_schema:
        _raise_missing_properties_error()

    for prop, prop_schema in sub_schema["properties"].items():
        _process_property(prop, prop_schema, current_dict)


def _process_property(prop, prop_schema, current_dict):
    """
    Process each property and fill the default values accordingly.

    Properties with neither a 'default' nor a 'type' key are not added to
    the dictionary.

    Parameters
    ----------
    prop: str
        Property name.
    prop_schema: dict
        Schema of the property.
    current_dict: dict
        Current dictionary to fill with default values.

    """
    if "default" in prop_schema:
        # An explicit default takes precedence over the type-based handling.
        current_dict[prop] = prop_schema["default"]
    elif "type" in prop_schema:
        if prop_schema["type"] == "object":
            current_dict[prop] = {}
            _fill_defaults_recursive(prop_schema, current_dict[prop])
        elif prop_schema["type"] == "array":
            # Arrays are represented by a list with a single dictionary,
            # filled from the 'items' schema when that is a dictionary.
            current_dict[prop] = [{}]
            if "items" in prop_schema and isinstance(prop_schema["items"], dict):
                _fill_defaults_recursive(prop_schema["items"], current_dict[prop][0])


def _raise_missing_properties_error():
    """
    Raise an error when the 'properties' key is missing in the schema.

    Raises
    ------
    KeyError
        Always; the message is logged as an error before raising.

    """
    msg = "Missing 'properties' key in schema."
    _logger.error(msg)
    raise KeyError(msg)