Source code for visualization.simtel_event_plots

#!/usr/bin/python3
"""Plots for light emission (flasher/calibration) sim_telarray events."""

import logging

import astropy.units as u
import matplotlib.pyplot as plt
import numpy as np
from ctapipe.calib import CameraCalibrator
from ctapipe.io import EventSource
from ctapipe.visualization import CameraDisplay
from scipy import signal as _signal

__all__ = [
    "plot_simtel_event_image",
    "plot_simtel_integrated_pedestal_image",
    "plot_simtel_integrated_signal_image",
    "plot_simtel_peak_timing",
    "plot_simtel_step_traces",
    "plot_simtel_time_traces",
    "plot_simtel_waveform_matrix",
]

_logger = logging.getLogger(__name__)

# Reusable literal constants (duplicated from visualize to avoid circular deps)
AXES_FRACTION = "axes fraction"
NO_R1_WAVEFORMS_MSG = "No R1 waveforms available in event"
TIME_NS_LABEL = "time [ns]"
R1_SAMPLES_LABEL = "R1 samples [d.c.]"


def _compute_integration_window(
    peak_idx: int, n_samp: int, half_width: int, mode: str, offset: int | None
) -> tuple[int, int]:
    """Return [a, b) window bounds for integration for signal/pedestal modes."""
    hw = int(half_width)
    win_len = 2 * hw + 1
    if mode == "signal":
        a = max(0, peak_idx - hw)
        b = min(n_samp, peak_idx + hw + 1)
        return a, b

    g = int(offset) if offset is not None else 16
    start = peak_idx + g
    if start + win_len <= n_samp:
        return start, start + win_len
    start = max(0, peak_idx - g - win_len)
    a = start
    b = min(n_samp, start + win_len)
    if a >= b:
        return 0, min(n_samp, win_len)
    return a, b


def _format_integrated_title(
    tel_label: str, et_name: str, half_width: int, mode: str, offset: int | None
) -> str:
    win_len = 2 * int(half_width) + 1
    if mode == "signal":
        return f"{tel_label} integrated signal (win {win_len}) ({et_name})"
    g = int(offset) if offset is not None else 16
    return f"{tel_label} integrated pedestal (win {win_len}, offset {g}) ({et_name})"


def _select_event_by_type(source):
    """
    Build an event selector for a ctapipe EventSource.

    Parameters
    ----------
    source : ctapipe.io.EventSource
        Iterable event source.

    Returns
    -------
    callable
        A function ``select_event(event_index: int | None) -> Any`` that returns
        the first event (when ``event_index`` is None) or the event at the given
        index. Returns ``None`` if no event is available or the index is out of range.
    """

    def select_event(event_index=None):
        if event_index is None:
            for ev in source:
                return ev
        else:
            for idx, ev in enumerate(source):
                if idx == event_index:
                    return ev
        _logger.warning("No events available from source or event_index out of range")
        return None

    return select_event


def _time_axis_from_readout(readout, n_samp):
    """
    Compute time axis in nanoseconds from a camera readout.

    Parameters
    ----------
    readout : Any
        Camera readout providing ``sampling_rate`` as an astropy Quantity.
    n_samp : int
        Number of samples per trace.

    Returns
    -------
    numpy.ndarray
        Array of shape ``(n_samp,)`` with time in nanoseconds.
    """
    try:
        dt = (1 / readout.sampling_rate).to(u.ns).value
    except (AttributeError, TypeError):
        dt = 1.0
    if not np.isfinite(dt) or dt <= 0:
        dt = 1.0
    return np.arange(int(n_samp)) * float(dt)


[docs] def plot_simtel_event_image(filename, distance=None, event_index=None): """ Read a sim_telarray file and plot the DL1 image for one event via ctapipe. Parameters ---------- filename : str | pathlib.Path Path to the ``.simtel`` file. distance : astropy.units.Quantity | float | None, optional Distance to annotate in the plot. If a Quantity, interpreted in meters. If not provided, no unit conversion is attempted. event_index : int | None, optional Zero-based index of the event to plot. If None, the first event is used. Returns ------- matplotlib.figure.Figure | None The created figure, or ``None`` if no suitable event/image is available. """ source = EventSource(filename, max_events=None) event = _select_event_by_type(source)(event_index=event_index) if not event: _logger.warning("No event found in the file.") return None tel_ids = sorted(getattr(event.r1, "tel", {}).keys()) if not tel_ids: _logger.warning("First event has no R1 telescope data") return None tel_id = tel_ids[0] calib = CameraCalibrator(subarray=source.subarray) calib(event) geometry = source.subarray.tel[tel_id].camera.geometry image = event.dl1.tel[tel_id].image fig, ax = plt.subplots(dpi=300) tel = source.subarray.tel[tel_id] tel_label = getattr(tel, "name", f"CT{tel_id}") title = f"{tel_label}, run {event.index.obs_id} event {event.index.event_id}" disp = CameraDisplay(geometry, image=image, norm="symlog", ax=ax) disp.cmap = "RdBu_r" disp.add_colorbar(fraction=0.02, pad=-0.1) disp.set_limits_percent(100) ax.set_title(title, pad=20) try: d_str = f"{distance.to(u.m)}" except (AttributeError, TypeError, ValueError): d_str = str(distance) ax.annotate( f"tel type: {source.subarray.tel[tel_id].type.name}\n" f"optics: {source.subarray.tel[tel_id].optics.name}\n" f"camera: {source.subarray.tel[tel_id].camera_name}\n" f"distance: {d_str}", xy=(0, 0), xytext=(0.1, 1), xycoords=AXES_FRACTION, va="top", size=7, ) ax.annotate( f"dl1 image,\ntotal ADC counts: {np.round(np.sum(image))}\n", xy=(0, 0), xytext=(0.75, 1), xycoords=AXES_FRACTION, va="top", ha="left", size=7, ) ax.set_axis_off() fig.tight_layout() return fig
[docs] def plot_simtel_time_traces( filename, tel_id: int | None = None, n_pixels: int = 3, event_index: int | None = None, ): """ Plot R1 time traces for a few pixels of one event. Parameters ---------- filename : str | pathlib.Path Path to the ``.simtel`` file. tel_id : int | None, optional Telescope ID to use. If None, the first telescope with R1 data is chosen. n_pixels : int, optional Number of pixels with highest signal to plot. Default is 3. event_index : int | None, optional Zero-based index of the event to plot. If None, the first event is used. Returns ------- matplotlib.figure.Figure | None The created figure, or ``None`` if R1 waveforms are unavailable. """ source = EventSource(filename, max_events=None) event = _select_event_by_type(source)(event_index=event_index) r1_tel_ids = sorted(getattr(event.r1, "tel", {}).keys()) if r1_tel_ids: tel_id = tel_id or r1_tel_ids[0] else: dl1_tel_ids = sorted(getattr(event.dl1, "tel", {}).keys()) tel_id = tel_id or dl1_tel_ids[0] calib = CameraCalibrator(subarray=source.subarray) try: calib(event) image = event.dl1.tel[tel_id].image except (RuntimeError, ValueError, KeyError, AttributeError): image = None waveforms = getattr(event.r1.tel.get(tel_id, None), "waveform", None) if waveforms is None: _logger.warning(NO_R1_WAVEFORMS_MSG) return None w = np.asarray(waveforms) if w.ndim == 3: w = w[0] _, n_samp = w.shape if image is not None: pix_ids = np.argsort(image)[-n_pixels:][::-1] else: integrals = w.sum(axis=1) pix_ids = np.argsort(integrals)[-n_pixels:][::-1] readout = source.subarray.tel[tel_id].camera.readout t = _time_axis_from_readout(readout, n_samp) fig, ax = plt.subplots(dpi=300) for pid in pix_ids: ax.plot(t, w[pid], label=f"pix {int(pid)}", drawstyle="steps-mid") ax.set_xlabel(TIME_NS_LABEL) ax.set_ylabel(R1_SAMPLES_LABEL) et_name = getattr(getattr(event.trigger, "event_type", None), "name", "?") tel = source.subarray.tel[tel_id] tel_label = getattr(tel, "name", f"CT{tel_id}") ax.set_title(f"{tel_label} waveforms ({et_name})") ax.legend(loc="best", fontsize=7) fig.tight_layout() return fig
[docs] def plot_simtel_waveform_matrix( filename, tel_id: int | None = None, vmax: float | None = None, event_index: int | None = None, pixel_step: int | None = None, ): """ Create a pseudocolor image of R1 waveforms (sample index vs. pixel id). Parameters ---------- filename : str | pathlib.Path Path to the ``.simtel`` file. tel_id : int | None, optional Telescope ID to use. If None, the first telescope with R1 data is chosen. vmax : float | None, optional Upper limit for color normalization. If None, determined automatically. event_index : int | None, optional Zero-based index of the event to plot. If None, the first event is used. pixel_step : int | None, optional Step between plotted pixel ids (e.g., 1 plots all, 2 plots every second pixel). Returns ------- matplotlib.figure.Figure | None The created figure, or ``None`` if R1 waveforms are unavailable. """ source = EventSource(filename, max_events=None) event = _select_event_by_type(source)(event_index=event_index) r1_tel_ids = sorted(getattr(event.r1, "tel", {}).keys()) if r1_tel_ids: tel_id = tel_id or r1_tel_ids[0] else: _logger.warning("Event has no R1 data for waveform plot") return None waveforms = getattr(event.r1.tel.get(tel_id, None), "waveform", None) if waveforms is None: _logger.warning(NO_R1_WAVEFORMS_MSG) return None w = np.asarray(waveforms) if w.ndim == 3: w = w[0] n_pix, n_samp = w.shape step = max(1, int(pixel_step)) if pixel_step is not None else 1 pix_idx = np.arange(n_pix)[::step] w_sel = w[pix_idx] readout = source.subarray.tel[tel_id].camera.readout t = _time_axis_from_readout(readout, n_samp) fig, ax = plt.subplots(dpi=300) mesh = ax.pcolormesh(t, pix_idx, w_sel, shading="auto", vmax=vmax) cbar = fig.colorbar(mesh, ax=ax) cbar.set_label(R1_SAMPLES_LABEL) et_name = getattr(getattr(event.trigger, "event_type", None), "name", "?") tel = source.subarray.tel[tel_id] tel_label = getattr(tel, "name", f"CT{tel_id}") ax.set_title(f"{tel_label} waveform matrix ({et_name})") ax.set_xlabel(TIME_NS_LABEL) ax.set_ylabel("pixel id") fig.tight_layout() return fig
[docs] def plot_simtel_step_traces( filename, tel_id: int | None = None, pixel_step: int = 100, max_pixels: int | None = None, event_index: int | None = None, ): """ Plot step-style R1 traces for regularly sampled pixels (0, N, 2N, ...). Parameters ---------- filename : str | pathlib.Path Path to the ``.simtel`` file. tel_id : int | None, optional Telescope ID to use. If None, the first telescope with R1 data is chosen. pixel_step : int, optional Interval between pixel indices to plot. Default is 100. max_pixels : int | None, optional Maximum number of pixels to plot. If None, plot all selected by ``pixel_step``. event_index : int | None, optional Zero-based index of the event to plot. If None, the first event is used. Returns ------- matplotlib.figure.Figure | None The created figure, or ``None`` if R1 waveforms are unavailable. """ source = EventSource(filename, max_events=None) event = _select_event_by_type(source)(event_index=event_index) r1_tel_ids = sorted(getattr(event.r1, "tel", {}).keys()) if r1_tel_ids: tel_id = tel_id or r1_tel_ids[0] else: _logger.warning("Event has no R1 data for traces plot") return None waveforms = getattr(event.r1.tel.get(tel_id, None), "waveform", None) if waveforms is None: _logger.warning(NO_R1_WAVEFORMS_MSG) return None w = np.asarray(waveforms) if w.ndim == 3: w = w[0] n_pix, n_samp = w.shape readout = source.subarray.tel[tel_id].camera.readout t = _time_axis_from_readout(readout, n_samp) pix_ids = np.arange(0, n_pix, max(1, pixel_step)) if max_pixels is not None: pix_ids = pix_ids[:max_pixels] fig, ax = plt.subplots(dpi=300) for pid in pix_ids: ax.plot(t, w[int(pid)], label=f"pix {int(pid)}", drawstyle="steps-mid") ax.set_xlabel(TIME_NS_LABEL) ax.set_ylabel(R1_SAMPLES_LABEL) et_name = getattr(getattr(event.trigger, "event_type", None), "name", "?") tel = source.subarray.tel[tel_id] tel_label = getattr(tel, "name", f"CT{tel_id}") ax.set_title(f"{tel_label} step traces ({et_name})") ax.legend(loc="best", fontsize=7, ncol=2) fig.tight_layout() return fig
def _detect_peaks(trace, peak_width, signal_mod): """ Detect peak indices using CWT if available, otherwise ``find_peaks``. Parameters ---------- trace : numpy.ndarray One-dimensional waveform samples for a single pixel. peak_width : int Characteristic peak width (samples) for CWT. signal_mod : module SciPy ``signal``-like module providing ``find_peaks_cwt``/``find_peaks``. Returns ------- numpy.ndarray Array of integer indices of detected peaks (possibly empty). """ peaks = [] try: if hasattr(signal_mod, "find_peaks_cwt"): peaks = signal_mod.find_peaks_cwt(trace, widths=np.array([peak_width])) if not np.any(peaks): peaks, _ = signal_mod.find_peaks(trace, prominence=np.max(trace) * 0.1) except (ValueError, TypeError): peaks = [] return np.asarray(peaks, dtype=int) if np.size(peaks) else np.array([], dtype=int) def _collect_peak_samples(w, sum_threshold, peak_width, signal_mod): """ Compute peak-sample indices per pixel from waveform matrix. Parameters ---------- w : numpy.ndarray Waveform array of shape ``(n_pix, n_samples)`` (or ``(1, n_pix, n_samples)``). sum_threshold : float Minimum sum over samples for a pixel to be considered. peak_width : int Characteristic peak width (samples) for CWT. signal_mod : module SciPy ``signal``-like module providing peak finding routines. Returns ------- tuple[numpy.ndarray | None, numpy.ndarray | None, int] ``(peak_samples, pix_ids, found_count)`` where ``peak_samples`` are the selected peak indices per considered pixel, ``pix_ids`` are the pixel indices that passed ``sum_threshold``, and ``found_count`` is the number of pixels with at least one detected peak. Returns ``(None, None, 0)`` if no pixels passed the threshold. """ n_pix, _ = w.shape sums = w.sum(axis=1) has_signal = sums > float(sum_threshold) pix_ids = np.arange(n_pix)[has_signal] if pix_ids.size == 0: return None, None, 0 peak_samples = [] found_count = 0 for pid in pix_ids: trace = w[int(pid)] pks = _detect_peaks(trace, peak_width, signal_mod) if pks.size: found_count += 1 peak_idx = int(pks[np.argmax(trace[pks])]) else: peak_idx = int(np.argmax(trace)) peak_samples.append(peak_idx) return np.asarray(peak_samples), pix_ids, found_count def _histogram_edges(n_samp, timing_bins): """ Compute contiguous histogram bin edges for sample indices. Parameters ---------- n_samp : int Number of samples per trace. timing_bins : int | None Number of histogram bins. If None, use unit-width bins. Returns ------- numpy.ndarray Array of bin edges spanning the sample index range. """ if timing_bins and timing_bins > 0: return np.linspace(-0.5, n_samp - 0.5, int(timing_bins) + 1) return np.arange(-0.5, n_samp + 0.5, 1.0) def _draw_peak_hist( ax, peak_samples, edges, mean_sample, std_sample, tel_label, et_name, considered, found_count, ): """ Draw a histogram of peak samples with overlays and annotations. Parameters ---------- ax : matplotlib.axes.Axes Target axes to draw into. Peak sample indices per pixel. edges : numpy.ndarray Histogram bin edges. mean_sample : float Mean of peak sample indices. std_sample : float Standard deviation of peak sample indices. tel_label : str Telescope label used in the title. et_name : str Event type name used in the title. considered : int Number of pixels considered (passed threshold). found_count : int Number of pixels with at least one detected peak. Returns ------- None """ counts, edges = np.histogram(peak_samples, bins=edges) ax.bar(edges[:-1], counts, width=np.diff(edges), color="#5B90DC", align="edge") ax.set_xlim(edges[0], edges[-1]) ax.set_xlabel("peak sample") ax.set_ylabel("N pixels") ax.axvline( mean_sample, color="#D8153C", linestyle="--", label=f"mean={mean_sample:.2f}", ) ax.axvspan( mean_sample - std_sample, mean_sample + std_sample, color="#D8153C", alpha=0.2, label=f"std={std_sample:.2f}", ) ax.set_title(f"{tel_label} peak timing ({et_name})") ax.text( 0.98, 0.95, f"considered: {considered}\npeaks found: {found_count}", transform=ax.transAxes, ha="right", va="top", fontsize=7, bbox={ "boxstyle": "round,pad=0.2", "facecolor": "white", "alpha": 0.6, "linewidth": 0.0, }, ) ax.legend(fontsize=7)
[docs] def plot_simtel_peak_timing( filename, tel_id: int | None = None, sum_threshold: float = 10.0, peak_width: int = 8, examples: int = 3, timing_bins: int | None = None, return_stats: bool = False, event_index: int | None = None, ): """ Peak finding per pixel; report mean/std of peak sample and plot a histogram. Parameters ---------- filename : str | pathlib.Path Path to the ``.simtel`` file. tel_id : int | None, optional Telescope ID to use. If None, the first telescope with R1 data is chosen. sum_threshold : float, optional Minimum sum over samples for a pixel to be considered. Default is 10.0. peak_width : int, optional Characteristic peak width (samples) for CWT. Default is 8. examples : int, optional Number of example pixel traces to overlay. Default is 3. timing_bins : int | None, optional Number of histogram bins. If None, use unit-width bins. return_stats : bool, optional If True, also return a statistics dictionary. Default is False. event_index : int | None, optional Zero-based index of the event to plot. If None, the first event is used. Returns ------- matplotlib.figure.Figure | tuple[matplotlib.figure.Figure, dict] | None The created figure, or ``None`` if R1 waveforms are unavailable. If ``return_stats`` is True, a tuple ``(fig, stats)`` is returned, where ``stats`` has keys ``{"considered", "found", "mean", "std"}``. """ source = EventSource(filename, max_events=None) event = _select_event_by_type(source)(event_index=event_index) r1_tel_ids = sorted(getattr(event.r1, "tel", {}).keys()) if r1_tel_ids: tel_id = tel_id or r1_tel_ids[0] else: _logger.warning("Event has no R1 data for peak timing plot") return None waveforms = getattr(event.r1.tel.get(tel_id, None), "waveform", None) if waveforms is None: _logger.warning(NO_R1_WAVEFORMS_MSG) return None w = np.asarray(waveforms) if w.ndim == 3: w = w[0] _, n_samp = w.shape peak_samples, pix_ids, found_count = _collect_peak_samples( w, sum_threshold, peak_width, _signal ) if peak_samples is None or pix_ids is None: _logger.warning("No pixels exceeded sum_threshold for peak timing") return None mean_sample = float(np.mean(peak_samples)) std_sample = float(np.std(peak_samples)) fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(10, 4), dpi=300) edges = _histogram_edges(n_samp, timing_bins) et_name = getattr(getattr(event.trigger, "event_type", None), "name", "?") tel = source.subarray.tel[tel_id] tel_label = getattr(tel, "name", f"CT{tel_id}") _draw_peak_hist( ax1, peak_samples, edges, mean_sample, std_sample, tel_label, et_name, pix_ids.size, found_count, ) readout = source.subarray.tel[tel_id].camera.readout t = _time_axis_from_readout(readout, n_samp) ex_ids = pix_ids[: max(1, int(examples))] for pid in ex_ids: trace = w[int(pid)] pks = _detect_peaks(trace, peak_width, _signal) ax2.plot(t, trace, drawstyle="steps-mid", label=f"pix {int(pid)}") if pks.size: ax2.scatter(t[pks], trace[pks], s=10) ax2.set_xlabel(TIME_NS_LABEL) ax2.set_ylabel(R1_SAMPLES_LABEL) ax2.legend(fontsize=7) fig.tight_layout() if return_stats: stats = { "considered": int(pix_ids.size), "found": int(found_count), "mean": float(mean_sample), "std": float(std_sample), } return fig, stats return fig
def _prepare_waveforms_for_image(filename, tel_id, context_no_r1, event_index=None): """ Fetch R1 waveforms for one event/telescope and return prepared arrays. Parameters ---------- filename : str | pathlib.Path Path to the ``.simtel`` file. tel_id : int | None Telescope ID to use. If None, the first telescope with R1 data is chosen. context_no_r1 : str Short description used in warnings when no R1 data is available. event_index : int | None, optional Zero-based index of the event to use. If None, the first event is used. Returns ------- tuple | None ``(w, n_pix, n_samp, source, event, tel_id)`` where ``w`` is a ``numpy.ndarray`` of shape ``(n_pix, n_samples)``, ``n_pix`` and ``n_samp`` are integers, and ``source``, ``event`` and ``tel_id`` are the ctapipe objects used. Returns ``None`` on failure. """ source = EventSource(filename, max_events=None) event = _select_event_by_type(source)(event_index=event_index) r1_tel_ids = sorted(getattr(event.r1, "tel", {}).keys()) if r1_tel_ids: tel_id = tel_id or r1_tel_ids[0] else: _logger.warning(f"Event has no R1 data for {context_no_r1}") return None waveforms = getattr(event.r1.tel.get(tel_id, None), "waveform", None) if waveforms is None: _logger.warning(NO_R1_WAVEFORMS_MSG) return None w = np.asarray(waveforms) if w.ndim == 3: w = w[0] n_pix, n_samp = w.shape return w, n_pix, n_samp, source, event, tel_id
[docs] def plot_simtel_integrated_signal_image( filename, tel_id: int | None = None, half_width: int = 8, event_index: int | None = None, ): """Plot camera image of integrated signal per pixel around the flasher peak.""" return _plot_simtel_integrated_image( filename=filename, tel_id=tel_id, half_width=half_width, event_index=event_index, mode="signal", )
[docs] def plot_simtel_integrated_pedestal_image( filename, tel_id: int | None = None, half_width: int = 8, offset: int = 16, event_index: int | None = None, ): """Plot camera image of integrated pedestal per pixel away from the flasher peak.""" return _plot_simtel_integrated_image( filename=filename, tel_id=tel_id, half_width=half_width, event_index=event_index, mode="pedestal", offset=offset, )
def _plot_simtel_integrated_image( filename, tel_id: int | None, half_width: int, event_index: int | None, mode: str, offset: int | None = None, ): """Shared implementation for integrated signal/pedestal images. mode: "signal" or "pedestal". For "pedestal", ``offset`` is used. """ context = "integrated-signal image" if mode == "signal" else "integrated-pedestal image" prepared = _prepare_waveforms_for_image(filename, tel_id, context, event_index=event_index) if prepared is None: return None w, n_pix, n_samp, source, event, tel_id = prepared img = np.zeros(n_pix, dtype=float) for pid in range(n_pix): trace = w[pid] peak_idx = int(np.argmax(trace)) a, b = _compute_integration_window(peak_idx, n_samp, half_width, mode, offset) img[pid] = float(np.sum(trace[a:b])) geometry = source.subarray.tel[tel_id].camera.geometry fig, ax = plt.subplots(dpi=300) disp = CameraDisplay(geometry, image=img, norm="lin", ax=ax) disp.cmap = "viridis" if mode == "signal" else "cividis" disp.add_colorbar(fraction=0.02, pad=-0.1) disp.set_limits_percent(100) et_name = getattr(getattr(event.trigger, "event_type", None), "name", "?") tel = source.subarray.tel[tel_id] tel_label = getattr(tel, "name", f"CT{tel_id}") ax.set_title(_format_integrated_title(tel_label, et_name, half_width, mode, offset), pad=20) ax.set_axis_off() fig.tight_layout() return fig