Source code for visualization.plot_tables

#!/usr/bin/python3
"""Plot tabular data."""

import logging
from pathlib import Path

import numpy as np
from astropy.table import Table

import simtools.utils.general as gen
from simtools.constants import SCHEMA_PATH
from simtools.db import db_handler
from simtools.io_operations import legacy_data_handler
from simtools.visualization import visualize


[docs] def plot(config, output_file, db_config=None): """ Plot tabular data from data or from model parameter files. Parameters ---------- config: dict Configuration dictionary for plotting. output_file: str Output file. db_config: dict, optional Database configuration dictionary for accessing the model parameter database. """ data = read_table_data(config, db_config) fig = visualize.plot_1d( data, **config, ) visualize.save_figure(fig, output_file) return output_file
[docs] def read_table_data(config, db_config): """ Read table data from file or parameter database. Parameters ---------- config: dict Configuration dictionary for plotting. Returns ------- Dict Dict with table data (astropy tables). """ data = {} for _config in config["tables"]: if "parameter" in _config: table = _read_table_from_model_database(_config, db_config) elif "file_name" in _config: if "legacy" in _config.get("type", ""): table = legacy_data_handler.read_legacy_data_as_table( _config["file_name"], _config["type"] ) else: table = Table.read(_config["file_name"], format="ascii.ecsv") else: raise ValueError("No table data defined in configuration.") if _config.get("normalize_y"): table[_config["column_y"]] = ( table[_config["column_y"]] / table[_config["column_y"]].max() ) if _config.get("select_values"): table = _select_values_from_table( table, _config["select_values"]["column_name"], _config["select_values"]["value"], ) label = _config.get("label", f"{_config.get('column_x')} vs {_config.get('column_y')}") data[label] = gen.get_structure_array_from_table( table, [ _config["column_x"], _config["column_y"], _config.get("column_x_err"), _config.get("column_y_err"), ], ) return data
def _read_table_from_model_database(table_config, db_config): """ Read table data from model parameter database. Parameters ---------- table_config: dict Configuration dictionary for table data. Returns ------- Table Astropy table """ db = db_handler.DatabaseHandler(mongo_db_config=db_config) return db.export_model_file( parameter=table_config["parameter"], site=table_config["site"], array_element_name=table_config.get("telescope"), parameter_version=table_config.get("parameter_version"), model_version=table_config.get("model_version"), export_file_as_table=True, ) def _select_values_from_table(table, column_name, value): """Return a table with only the rows where column_name == value.""" return table[np.isclose(table[column_name], value)] def _filter_config_by_plot_type(config, plot_type): """Filter a configuration based on plot type.""" if plot_type != "all" and config.get("type") != plot_type: return False return True def _validate_config_columns(config, valid_columns, logger): """Validate that all required columns in a config exist and have valid data.""" for table_config in config.get("tables", []): required_cols = [table_config.get("column_x"), table_config.get("column_y")] if not all(col in valid_columns for col in required_cols if col): missing_cols = [col for col in required_cols if col not in valid_columns] logger.info( f"Skipping plot config {config.get('type')}: " f"Missing valid data in columns: {missing_cols}" ) return False return True def _get_valid_columns(table): """Return columns that exist and have valid data (not all NaN).""" valid_columns = [] for col in table.colnames: if not all(np.isnan(table[col])): valid_columns.append(col) return valid_columns
[docs] def generate_plot_configurations( parameter, parameter_version, site, telescope, output_path, plot_type, db_config ): """ Generate plot configurations for a model parameter from schema files. Parameters ---------- parameter: str Model parameter name. parameter_version: str Parameter version. site: str Site name. telescope: str Telescope name. output_path: str or Path Output path for the plots. plot_type: str Plot type or "all" for all plots. db_config: dict Database configuration. Returns ------- tuple Tuple containing a list of plot configurations and a list of output file names. Return None, if no plot configurations are found. """ logger = logging.getLogger(__name__) # Get schema configuration schema = gen.change_dict_keys_case( gen.collect_data_from_file( file_name=SCHEMA_PATH / "model_parameters" / f"{parameter}.schema.yml" ) ) configs = schema.get("plot_configuration") if not configs: return None # Get data table and determine valid columns table = _read_table_from_model_database( { "parameter": parameter, "site": site, "telescope": telescope, "parameter_version": parameter_version, }, db_config=db_config, ) valid_columns = _get_valid_columns(table) # Filter configs based on plot type and column validity valid_configs = [] for config in configs: if not _filter_config_by_plot_type(config, plot_type): continue if _validate_config_columns(config, valid_columns, logger): valid_configs.append(config) if not valid_configs: if plot_type != "all": logger.warning("No valid plot config found.") return None # Generate output files output_files = [] for _config in valid_configs: for _table in _config.get("tables", []): _table["parameter_version"] = parameter_version _table["site"] = site _table["telescope"] = telescope output_files.append( _generate_output_file_name( parameter=parameter, parameter_version=parameter_version, site=site, telescope=telescope, plot_type=_config.get("type"), output_path=output_path, ) ) return valid_configs, output_files
def _generate_output_file_name( parameter, parameter_version, site, telescope, plot_type, output_path=None, file_extension=".pdf", ): """Generate output file name based on table file and appendix.""" parts = [parameter, parameter_version, site] if telescope: parts.append(telescope) if plot_type != parameter: parts.append(plot_type) filename = "_".join(parts) + file_extension return Path(output_path) / filename if output_path else Path(filename)