Source code for simtel.simtel_table_reader

#!/usr/bin/python3
"""Read tabular data in sim_telarray format and return as astropy table."""

import logging
import re

import astropy.units as u
from astropy.table import Table

from simtools.utils import general as gen

logger = logging.getLogger(__name__)


def _data_columns(parameter_name, n_columns, n_dim):
    """
    Get column information for a given parameter.

    Individual functions are adapted to the specific format of the sim_telarray tables.

    Parameters
    ----------
    parameter_name: str
        Model parameter name.
    n_columns: int
        Number of columns in the table.
    n_dim: list
        List of columns for n-dimensional tables defined by RPOL lines.

    Returns
    -------
    list, str
        List of columns for n-dimensional tables, description.
    """
    if parameter_name == "mirror_reflectivity":
        return _data_columns_mirror_reflectivity(n_columns, n_dim)
    if parameter_name in ("discriminator_pulse_shape", "fadc_pulse_shape"):
        return _data_columns_pulse_shape(n_columns)
    try:
        return globals()[f"_data_columns_{parameter_name}"]()
    except KeyError as exc:
        raise ValueError(
            f"Unsupported parameter for sim_telarray table reading: {parameter_name}"
        ) from exc


def _data_columns_atmospheric_profile():
    """Column representation for parameter atmospheric_profile."""
    return (
        [
            {"name": "altitude", "description": "Altitude", "unit": "km"},
            {"name": "density", "description": "Density", "unit": "g/cm^3"},
            {"name": "thickness", "description": "Thickness", "unit": "g/cm^2"},
            {
                "name": "refractive_index",
                "description": "Refractive index (n-1)",
                "unit": None,
            },
            {
                "name": "temperature",
                "description": "Temperature",
                "unit": "K",
            },
            {
                "name": "pressure",
                "description": "Pressure",
                "unit": "mbar",
            },
            {
                "name": "pw/w",
                "description": "Partial pressure of water vapor",
                "unit": None,
            },
        ],
        "Atmospheric profile",
    )


def _data_columns_pm_photoelectron_spectrum():
    """Column description for parameter pm_photoelectron_spectrum."""
    return (
        [
            {"name": "amplitude", "description": "Signal amplitude", "unit": None},
            {
                "name": "response",
                "description": "response without afterpulsing component",
                "unit": None,
            },
            {
                "name": "response_with_ap",
                "description": "response including afterpulsing component",
                "unit": None,
            },
        ],
        "Photoelectron spectrum",
    )


def _data_columns_quantum_efficiency():
    """Column description for parameter quantum_efficiency."""
    return (
        [
            {"name": "wavelength", "description": "Wavelength", "unit": "nm"},
            {
                "name": "efficiency",
                "description": "Quantum efficiency",
                "unit": None,
            },
            {
                "name": "efficiency_rms",
                "description": "Quantum efficiency (standard deviation)",
                "unit": None,
            },
        ],
        "Quantum efficiency",
    )


def _data_columns_camera_filter():
    """Column description for parameter camera_filter."""
    return (
        [
            {"name": "wavelength", "description": "Wavelength", "unit": "nm"},
            {
                "name": "transmission",
                "description": "Average transmission",
                "unit": None,
            },
        ],
        "Camera window transmission",
    )


def _data_columns_lightguide_efficiency_vs_wavelength():
    """Column description for parameter lightguide_efficiency_vs_wavelength."""
    return _data_columns_lightguide_efficiency_vs_incidence_angle()


def _data_columns_lightguide_efficiency_vs_incidence_angle():
    """Column description for (parameter lightguide_efficiency_vs_incidence_angle."""
    return (
        [
            {"name": "angle", "description": "Incidence angle", "unit": "deg"},
            {
                "name": "efficiency",
                "description": "Light guide efficiency",
                "unit": None,
            },
        ],
        "Light guide efficiency vs incidence angle",
    )


def _data_columns_mirror_reflectivity(n_columns, n_dim):
    """Column description for parameter mirror_reflectivity."""
    _columns = [
        {"name": "wavelength", "description": "Wavelength", "unit": "nm"},
    ]
    if n_dim:
        for angle in n_dim:
            _columns.append(
                {
                    "name": f"reflectivity_{angle}deg",
                    "description": f"Mirror reflectivity at {angle} deg",
                    "unit": None,
                },
            )
    else:
        _columns.append(
            {
                "name": "reflectivity",
                "description": "Mirror reflectivity",
                "unit": None,
            },
        )
        if n_columns == 3:
            _columns.append(
                {
                    "name": "reflectivity_rms",
                    "description": "Mirror reflectivity (standard deviation)",
                    "unit": None,
                },
            )
        if n_columns == 4:
            _columns.append(
                {
                    "name": "reflectivity_min",
                    "description": "Mirror reflectivity (min)",
                    "unit": None,
                },
            )
            _columns.append(
                {
                    "name": "reflectivity_max",
                    "description": "Mirror reflectivity (max)",
                    "unit": None,
                },
            )

    return _columns, "Mirror reflectivity"


def _data_columns_pulse_shape(n_columns):
    """Column description for parameters discriminator_pulse_shape, fadc_pulse_shape."""
    _columns = [
        {"name": "time", "description": "Time", "unit": "ns"},
        {
            "name": "amplitude",
            "description": "Amplitude",
            "unit": None,
        },
    ]
    if n_columns == 3:
        _columns.append(
            {
                "name": "amplitude (low gain)",
                "description": "Amplitude (low gain)",
                "unit": None,
            },
        )

    return _columns, "Pulse shape"


def _data_columns_nsb_reference_spectrum():
    """Column description for parameter nsb_reference_spectrum."""
    return (
        [
            {"name": "wavelength", "description": "Wavelength", "unit": "nm"},
            {
                "name": "differential photon rate",
                "description": "Differential photon rate",
                "unit": "1.e9 / (nm s m^2 sr)",
            },
        ],
        "NSB reference spectrum",
    )


[docs] def read_simtel_table(parameter_name, file_path): """ Read sim_telarray table file for a given parameter. Parameters ---------- parameter_name: str Model parameter name. file_path: Path Name (full path) of the sim_telarray table file. Returns ------- Table Astropy table. """ if parameter_name == "atmospheric_transmission": return _read_simtel_data_for_atmospheric_transmission(file_path) rows, meta_from_simtel, n_columns, n_dim = _read_simtel_data(file_path) columns_info, description = _data_columns(parameter_name, n_columns, n_dim) rows = _adjust_columns_length(rows, len(columns_info)) metadata = { "Name": parameter_name, "File": str(file_path), "Description:": description, "Context_from_sim_telarray": meta_from_simtel, } table = Table(rows=rows, names=[col["name"] for col in columns_info]) for col, info in zip(table.colnames, columns_info): table[col].unit = info.get("unit") table[col].description = info.get("description") table.meta.update(metadata) return table
def _adjust_columns_length(rows, n_columns): """ Adjust row lengths to match the specified column count. - Truncate rows with extra columns beyond the specified count 'n_columns'. - Pad shorter rows with zeros. """ return [row[:n_columns] + [0.0] * max(0, n_columns - len(row)) for row in rows] def _read_simtel_data(file_path): """ Read data, comments, and (if available) axis definition from sim_telarray table. Parameters ---------- file_path: Path Path to the sim_telarray table file. Returns ------- str, str, int, str data, metadata (comments), number of columns (max value), n-dimensional axis description. """ logger.debug(f"Reading sim_telarray table from {file_path}") meta_lines = [] data_lines = [] n_dim_axis = None r_pol_axis = None lines = gen.read_file_encoded_in_utf_or_latin(file_path) for line in lines: stripped = line.strip() if "@RPOL@" in stripped: # RPOL description for N-dimensional tables match = re.search(r"#@RPOL@\[(\w+)=\]", stripped) if match: r_pol_axis = match.group(1) elif r_pol_axis and r_pol_axis in stripped: # N-dimensional axis description n_dim_axis = stripped.split("=")[1].split() elif stripped.startswith("#"): # Metadata meta_lines.append(stripped.lstrip("#").strip()) elif stripped: # Data data_lines.append(stripped.split("%%%")[0].split("#")[0].strip()) # Remove comments rows = [[float(part) for part in line.split()] for line in data_lines] n_columns = max(len(row) for row in rows) if rows else 0 return rows, "\n".join(meta_lines), n_columns, n_dim_axis def _read_simtel_data_for_atmospheric_transmission(file_path): """ Read data and comments from sim_telarray table for atmospheric_transmission. Parameters ---------- file_path: Path Path to the sim_telarray table file. Returns ------- astropy table Table with atmospheric transmission. """ lines = lines = gen.read_file_encoded_in_utf_or_latin(file_path) observatory_level, height_bins = _read_header_line_for_atmospheric_transmission( lines, file_path ) wavelengths = [] heights = [] extinctions = [] meta_lines = [] for line in lines: if line.startswith("#") or not line.strip(): meta_lines.append(line.lstrip("#").strip()) continue parts = line.split() try: wl = float(parts[0]) for i, height in enumerate(height_bins): extinction_value = float(parts[i + 1]) if extinction_value == 99999.0: continue wavelengths.append(wl) heights.append(height) extinctions.append(extinction_value) except (ValueError, IndexError): logger.debug(f"Skipping malformed line: {line.strip()}") table = Table() table["wavelength"] = wavelengths table["altitude"] = heights table["extinction"] = extinctions table.meta.update( { "Name": "atmospheric_transmission", "File": str(file_path), "Description": "Atmospheric transmission", "Context_from_sim_telarray": "\n".join(meta_lines), "observatory_level": observatory_level, } ) return table def _read_header_line_for_atmospheric_transmission(lines, file_path): """Reader observatory level and height bins from header line for atmospheric transmission.""" header_line = None observatory_level = None for line in lines: if "H2=" in line and "H1=" in line: match_h2 = re.search(r"H2=\s*([\d.]+)", line) if match_h2: observatory_level = float(match_h2.group(1)) * u.km if "H1=" in line: header_line = line.split("H1=")[-1].strip() break if header_line is None: raise ValueError(f"Header with 'H1=' not found file {file_path}") height_bins = [float(x.replace(",", "")) for x in header_line.split()] return observatory_level, height_bins