Source code for data_model.metadata_collector
"""
Metadata collector for simtools.
This should be the only module in simtools with knowledge of the
implementation of the observatory metadata model.
"""
import datetime
import getpass
import logging
import uuid
from importlib.resources import files
from pathlib import Path
import simtools.constants
import simtools.utils.general as gen
import simtools.version
from simtools.data_model import metadata_model
from simtools.io_operations import io_handler
from simtools.utils import names
__all__ = ["MetadataCollector"]
[docs]
class MetadataCollector:
    """
    Collects metadata to describe the current simtools activity and its data products.

    Collect metadata from command line configuration, input data, environment,
    and schema descriptions. Depends on the CTAO top-level metadata definition.

    Two dictionaries store two different types of metadata:

    - top_level_meta: metadata for the current activity. It is initialized from the
      default metadata dictionary of the metadata model (keys converted to lower case)
      and filled by the ``_fill_<type>_meta`` methods during initialization.
    - input_metadata: metadata from input data (empty dictionary if no input
      metadata file is defined).

    Parameters
    ----------
    args_dict: dict
        Command line parameters. None (or any other false value) is replaced by an
        empty dictionary.
    metadata_file_name: str
        Name of metadata file (only required when args_dict is None). If not given,
        the file name is taken from the 'input_meta' or 'input' entry of args_dict.
    data_model_name: str
        Name of data model parameter
    observatory: str
        Name of observatory (default: "cta")
    clean_meta: bool
        Clean metadata from None values and empty lists (default: True)
    """
def __init__(
    self,
    args_dict,
    metadata_file_name=None,
    data_model_name=None,
    observatory="cta",
    clean_meta=True,
):
    """
    Set up the collector and gather all metadata immediately.

    The top-level metadata is seeded with the default metadata dictionary
    (lower-case keys), the input metadata is read from file, and all fill
    methods are executed. With clean_meta set, the collected top-level
    metadata is cleaned afterwards.
    """
    self._logger = logging.getLogger(__name__)
    self.observatory = observatory
    self.data_model_name = data_model_name
    self.schema_file = None
    self.schema_dict = None
    self.io_handler = io_handler.IOHandler()
    # any false value (None, empty dict) ends up as an empty dictionary
    self.args_dict = args_dict or {}

    default_meta = metadata_model.get_default_metadata_dict()
    self.top_level_meta = gen.change_dict_keys_case(data_dict=default_meta, lower_case=True)
    self.input_metadata = self._read_input_metadata_from_file(
        metadata_file_name=metadata_file_name
    )

    self.collect_meta_data()
    if not clean_meta:
        return
    self.top_level_meta = self.clean_meta_data(self.top_level_meta)
[docs]
def collect_meta_data(self):
"""Collect and verify product metadata for each main-level metadata type."""
meta_types = self.top_level_meta[self.observatory].keys()
for meta_type in meta_types:
try:
fill_method = getattr(self, f"_fill_{meta_type}_meta")
fill_method(self.top_level_meta[self.observatory][meta_type])
except AttributeError:
self._logger.debug(f"Method _fill_{meta_type}_meta not implemented")
[docs]
def get_top_level_metadata(self):
"""
Return top level metadata dictionary (with updated activity end time).
Returns
-------
dict
Top level metadata dictionary.
"""
try:
self.top_level_meta[self.observatory]["activity"][
"end"
] = datetime.datetime.now().isoformat(timespec="seconds")
except KeyError:
pass
return self.top_level_meta
[docs]
def get_data_model_schema_file_name(self):
"""
Return data model schema file name.
The schema file name is taken (in this order) from the command line,
from the metadata file, from the data model name, or from the input
metadata file.
Returns
-------
str
Name of schema file.
"""
# from command line
if self.args_dict.get("schema"):
self._logger.debug(f"Schema file from command line: {self.args_dict['schema']}")
return self.args_dict["schema"]
# from metadata
try:
url = self.top_level_meta[self.observatory]["product"]["data"]["model"]["url"]
if url:
self._logger.debug(f"Schema file from product metadata: {url}")
return url
except KeyError:
pass
# from data model name
if self.data_model_name:
self._logger.debug(f"Schema file from data model name: {self.data_model_name}")
return f"{files('simtools')}/schemas/model_parameters/{self.data_model_name}.schema.yml"
# from input metadata
try:
url = self.input_metadata[self.observatory]["product"]["data"]["model"]["url"]
self._logger.debug(f"Schema file from input metadata: {url}")
return url
except KeyError:
pass
self._logger.warning("No schema file found.")
return None
[docs]
def get_data_model_schema_dict(self):
    """
    Read the data model schema from the schema file.

    Returns
    -------
    dict
        Data model schema dictionary (empty dictionary if reading the schema
        file raises a TypeError).
    """
    try:
        schema = gen.collect_data_from_file(file_name=self.schema_file)
    except TypeError:
        self._logger.debug(f"No valid schema file provided ({self.schema_file}).")
        schema = {}
    return schema
[docs]
def get_site(self, from_input_meta=False):
"""
Get site entry from metadata. Allow to get from collected or from input metadata.
Parameters
----------
from_input_meta: bool
Get site from input metadata (default: False)
Returns
-------
str
Site name
"""
try:
_site = (
self.top_level_meta[self.observatory]["instrument"]["site"]
if not from_input_meta
else self.input_metadata[self.observatory]["instrument"]["site"]
)
if _site is not None:
return names.validate_site_name(_site)
except KeyError:
pass
return None
def _fill_contact_meta(self, contact_dict):
"""
Fill contact metadata fields. Get user name from system level if not given.
Parameters
----------
contact_dict: dict
Dictionary for contact metadata fields.
"""
if contact_dict.get("name", None) is None:
contact_dict["name"] = getpass.getuser()
def _fill_context_meta(self, context_dict):
"""
Fill context metadata fields with product metadata from input data.
Parameters
----------
context_dict: dict
Dictionary for context metadata fields.
"""
try: # wide try..except as for some cases we expect that there is no product metadata
reduced_product_meta = {
key: value
for key, value in self.input_metadata[self.observatory]["product"].items()
if key in {"description", "id", "creation_time", "valid", "format", "filename"}
}
self._fill_context_sim_list(context_dict["associated_data"], reduced_product_meta)
except (KeyError, TypeError):
self._logger.debug("No input product metadata appended to associated data.")
def _read_input_metadata_from_file(self, metadata_file_name=None):
"""
Read and validate input metadata from file.
In case of an ecsv file including a table, the metadata is read from the table meta data.
Returns empty dict in case no file is given.
Parameter
---------
metadata_file_name: str or Path
Name of metadata file.
Returns
-------
dict
Metadata dictionary.
Raises
------
gen.InvalidConfigDataError, FileNotFoundError
if metadata cannot be read from file.
KeyError:
if metadata does not exist
"""
metadata_file_name = (
self.args_dict.get("input_meta", None) or self.args_dict.get("input", None)
if metadata_file_name is None
else metadata_file_name
)
if metadata_file_name is None:
self._logger.debug("No input metadata file defined.")
return {}
self._logger.debug("Reading meta data from %s", metadata_file_name)
if Path(metadata_file_name).suffix in (".yaml", ".yml", ".json"):
_input_metadata = self._read_input_metadata_from_yml_or_json(metadata_file_name)
elif Path(metadata_file_name).suffix == ".ecsv":
_input_metadata = self._read_input_metadata_from_ecsv(metadata_file_name)
else:
self._logger.error("Unknown metadata file format: %s", metadata_file_name)
raise gen.InvalidConfigDataError
metadata_model.validate_schema(_input_metadata, None)
return gen.change_dict_keys_case(
self._process_metadata_from_file(_input_metadata),
lower_case=True,
)
def _read_input_metadata_from_ecsv(self, metadata_file_name):
    """
    Read input metadata from the table metadata of an ecsv file.

    Only the entry stored under the upper-case observatory name is returned
    (wrapped in a dictionary with the same key). Read errors are logged and
    re-raised.
    """
    from astropy.table import Table  # pylint: disable=C0415

    try:
        observatory_key = self.observatory.upper()
        table_meta = Table.read(metadata_file_name).meta
        return {observatory_key: table_meta[observatory_key]}
    except (FileNotFoundError, KeyError, AttributeError) as exc:
        self._logger.error(
            "Failed reading metadata for %s from %s", self.observatory, metadata_file_name
        )
        raise exc
def _read_input_metadata_from_yml_or_json(self, metadata_file_name):
    """
    Read input metadata from yml or json file.

    If the file content has exactly one of the top-level keys 'Metadata',
    'metadata', or 'METADATA', the value of that key is returned. More than one
    of these keys is an error. Otherwise the complete file content is returned.
    """
    try:
        content = gen.collect_data_from_file(file_name=metadata_file_name)
        wrapper_keys = {"Metadata", "metadata", "METADATA"}.intersection(content)
        n_wrapper_keys = len(wrapper_keys)
        if n_wrapper_keys > 1:
            self._logger.error("More than one metadata entry found in %s", metadata_file_name)
            raise gen.InvalidConfigDataError
        if n_wrapper_keys == 1:
            content = content[wrapper_keys.pop()]
    except (gen.InvalidConfigDataError, FileNotFoundError) as exc:
        self._logger.error("Failed reading metadata from %s", metadata_file_name)
        raise exc
    return content
def _fill_product_meta(self, product_dict):
"""
Fill metadata for data products fields.
If a schema file is given for the data products, try and read product:data:model metadata
from there.
Parameters
----------
product_dict: dict
Dictionary describing data product.
Raises
------
KeyError
if relevant fields are not defined in top level metadata dictionary.
"""
self.schema_file = self.get_data_model_schema_file_name()
self.schema_dict = self.get_data_model_schema_dict()
product_dict["id"] = str(uuid.uuid4())
product_dict["creation_time"] = datetime.datetime.now().isoformat(timespec="seconds")
product_dict["description"] = self.schema_dict.get("description", None)
# DATA:CATEGORY
product_dict["data"]["category"] = "SIM"
product_dict["data"]["level"] = "R1"
product_dict["data"]["type"] = "Service"
try:
product_dict["data"]["association"] = self.schema_dict["instrument"]["class"]
except KeyError:
pass
# DATA:MODEL
helper_dict = {"name": "name", "version": "version", "type": "meta_schema"}
for key, value in helper_dict.items():
product_dict["data"]["model"][key] = self.schema_dict.get(value, None)
product_dict["data"]["model"]["url"] = self.schema_file
product_dict["format"] = self.args_dict.get("output_file_format", None)
product_dict["filename"] = str(self.args_dict.get("output_file", None))
def _fill_instrument_meta(self, instrument_dict):
"""
Fill instrument metadata fields.
Note inconsistency in command line arguments for 'ID',
which is either 'instrument' or 'telescope'.
Parameters
----------
instrument_dict: dict
Dictionary for instrument metadata fields.
"""
instrument_dict["site"] = self.args_dict.get("site", None)
instrument_dict["ID"] = self.args_dict.get("instrument") or self.args_dict.get(
"telescope", None
)
if instrument_dict["ID"]:
instrument_dict["class"] = names.get_collection_name_from_array_element_name(
instrument_dict["ID"]
)
def _fill_process_meta(self, process_dict):
"""
Fill process fields in metadata.
Parameters
----------
process_dict: dict
Dictionary for process metadata fields.
"""
process_dict["type"] = "simulation"
def _fill_activity_meta(self, activity_dict):
    """
    Fill activity (software) related metadata.

    Name and id are taken from the command line parameters ('label' and
    'activity_id'); start and end are both set to the current time.

    Parameters
    ----------
    activity_dict: dict
        Dictionary for top-level activity metadata.
    """
    args = self.args_dict
    activity_dict["name"] = args.get("label", None)
    activity_dict["type"] = "software"
    activity_dict["id"] = args.get("activity_id", "UNDEFINED_ACTIVITY_ID")

    # end time is initialized with the start time
    start_time = datetime.datetime.now().isoformat(timespec="seconds")
    activity_dict["start"] = start_time
    activity_dict["end"] = start_time

    software = activity_dict["software"]
    software["name"] = "simtools"
    software["version"] = simtools.version.__version__
def _merge_config_dicts(self, dict_high, dict_low, add_new_fields=False):
"""
Merge two config dicts and replace values in dict_high which are Nonetype.
Priority to dict_high in case of conflicting entries.
Parameters
----------
dict_high: dict
Dictionary into which values are merged.
dict_low: dict
Dictionary from which values are taken for merging.
add_new_fields: bool
If true: add fields from dict_low to dict_high, if they don't exist in dict_high
"""
try:
for k in dict_low:
if k in dict_high:
if isinstance(dict_low[k], dict):
self._merge_config_dicts(dict_high[k], dict_low[k], add_new_fields)
elif dict_high[k] is None:
dict_high[k] = dict_low[k]
elif dict_high[k] != dict_low[k] and dict_low[k] is not None:
self._logger.debug(
f"Conflicting entries between dict: {dict_high[k]} vs {dict_low[k]} "
f"(use {dict_high[k]})"
)
elif add_new_fields:
dict_high[k] = dict_low[k]
except TypeError as exc:
raise TypeError("Error merging dictionaries") from exc
def _fill_context_sim_list(self, meta_list, new_entry_dict):
"""
Fill list-type entries into metadata.
Take into account the first list entry is the default value filled with Nones.
Parameters
----------
meta_list: list
List of metadata entries.
new_entry_dict: dict
New metadata entry to be added to meta_list.
Returns
-------
list
Updated meta list.
"""
if len(new_entry_dict) == 0:
return []
try:
if self._all_values_none(meta_list[0]):
meta_list[0] = new_entry_dict
else:
meta_list.append(new_entry_dict)
except (TypeError, IndexError):
meta_list = [new_entry_dict]
return meta_list
def _process_metadata_from_file(self, meta_dict):
    """
    Process metadata from file to ensure compatibility with metadata model.

    Keys are changed to lower case and line feeds are removed from the product
    description. A missing or non-string description is left as it is.

    Parameters
    ----------
    meta_dict: dict
        Input metadata dictionary.

    Returns
    -------
    dict
        Metadata dictionary.
    """
    lowered_meta = gen.change_dict_keys_case(meta_dict, True)
    try:
        description = lowered_meta[self.observatory]["product"]["description"]
        cleaned_description = self._remove_line_feed(description)
        lowered_meta[self.observatory]["product"]["description"] = cleaned_description
    except (KeyError, AttributeError):
        # no description available (or not a string): nothing to clean
        pass
    return lowered_meta
@staticmethod
def _remove_line_feed(string):
"""
Remove all line feeds from a string.
Parameters
----------
str
input string
Returns
-------
str
with line feeds removed
"""
return string.replace("\n", " ").replace("\r", "").replace(" ", " ")
def _copy_list_type_metadata(self, context_dict, _input_metadata, key):
"""
Copy list-type metadata from file.
Very fine tuned.
Parameters
----------
context_dict: dict
Dictionary for top level metadata (context level)
_input_metadata: dict
Dictionary for metadata from file.
key: str
Key for metadata entry.
"""
try:
for document in _input_metadata["context"][key]:
self._fill_context_sim_list(context_dict[key], document)
except KeyError:
pass
def _all_values_none(self, input_dict):
"""
Check recursively if all values in a dictionary are None.
Parameters
----------
input_dict: dict
Input dictionary.
Returns
-------
bool
True if all entries are None.
"""
if not isinstance(input_dict, dict):
return input_dict is None
return all(self._all_values_none(value) for value in input_dict.values())
[docs]
def clean_meta_data(self, meta_dict):
"""
Clean metadata dictionary from None values and empty lists.
Parameters
----------
meta_dict: dict
Metadata dictionary.
"""
def clean_list(value):
nested_list = [
self.clean_meta_data(item) if isinstance(item, dict) else item for item in value
]
return [item for item in nested_list if item not in (None, "", [], {})]
cleaned = {}
for key, value in meta_dict.items():
if value in (None, []):
continue
if isinstance(value, dict):
nested = self.clean_meta_data(value)
if nested: # Only add if not empty
cleaned[key] = nested
elif isinstance(value, list):
nested_list = clean_list(value)
if nested_list: # Only add if not empty
cleaned[key] = nested_list
else:
cleaned[key] = value
return cleaned