"""Generate a reduced dataset from given simulation event list and save the output to file."""
import logging
from dataclasses import dataclass, field
import numpy as np
import tables
from eventio import EventIOFile
from eventio.simtel import (
ArrayEvent,
MCEvent,
MCRunHeader,
MCShower,
TrackingPosition,
TriggerInformation,
)
from simtools.utils.geometry import calculate_circular_mean
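# Default HDF5 compression: zlib level 5 with byte shuffling, assumed to be a
# reasonable size/speed trade-off for these event tables.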
DEFAULT_FILTERS = tables.Filters(complevel=5, complib="zlib", shuffle=True, bitshuffle=False)
@dataclass
class ShowerEventData:
"""Shower event data."""
simulated_energy: list = field(default_factory=list)
x_core: list = field(default_factory=list)
y_core: list = field(default_factory=list)
shower_azimuth: list = field(default_factory=list)
shower_altitude: list = field(default_factory=list)
shower_id: list = field(default_factory=list)
area_weight: list = field(default_factory=list)
x_core_shower: list = field(default_factory=list)
y_core_shower: list = field(default_factory=list)
core_distance_shower: list = field(default_factory=list)
@dataclass
class TriggeredEventData:
"""Triggered event data."""
triggered_id: list = field(default_factory=list)
array_altitudes: list = field(default_factory=list)
array_azimuths: list = field(default_factory=list)
trigger_telescope_list_list: list = field(default_factory=list)
angular_distance: list = field(default_factory=list)
class SimtelIOEventDataWriter:
"""
    Generate a reduced dataset from a given simulation event list and save the output to file.
Attributes
----------
input_files : list
List of input file paths to process.
output_file : str
Path to the output file.
max_files : int, optional
Maximum number of files to process.
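
    Examples
    --------
    A minimal usage sketch (file names are illustrative)::

        writer = SimtelIOEventDataWriter(
            input_files=["gamma_run1.simtel.zst"],
            output_file="reduced_events.hdf5",
            max_files=10,
        )
        writer.process_files()
        shower_data, triggered_data = writer.get_event_data()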
"""
def __init__(self, input_files, output_file, max_files=100):
"""Initialize class."""
self._logger = logging.getLogger(__name__)
self.input_files = input_files
self.output_file = output_file
        try:
            self.max_files = min(max_files, len(input_files))
        except TypeError as exc:
            raise TypeError("No input files provided.") from exc
self.shower = None
self.n_use = None
self.shower_id_offset = 0
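        # Offset added to shower IDs so they remain unique across batched writes.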
self.event_data = ShowerEventData()
self.triggered_data = TriggeredEventData()
self.file_names = []
def process_files(self):
"""Process the input files and store them in an file."""
self.shower_id_offset = 0
for i, file in enumerate(self.input_files[: self.max_files], start=1):
self._logger.info(f"Processing file {i}/{self.max_files}: {file}")
self._process_file(file)
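            # Write after the first file (this creates the output tables) and whenever
            # at least 1e7 events are buffered in memory, then reset the buffers.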
if i == 1 or len(self.event_data.simulated_energy) >= 1e7:
self._write_data(mode="w" if i == 1 else "a")
self.shower_id_offset += len(self.event_data.simulated_energy)
self._reset_data()
self._write_data(mode="a")
def get_event_data(self):
"""
Return shower and triggered event data.
Returns
-------
ShowerEventData, TriggeredEventData
Shower and triggered event data.
"""
return self.event_data, self.triggered_data
def _process_file(self, file):
"""Process a single file and update data lists."""
with EventIOFile(file) as f:
for eventio_object in f:
if isinstance(eventio_object, MCRunHeader):
self._process_mc_run_header(eventio_object)
elif isinstance(eventio_object, MCShower):
self._process_mc_shower(eventio_object)
elif isinstance(eventio_object, MCEvent):
self._process_mc_event(eventio_object)
elif isinstance(eventio_object, ArrayEvent):
self._process_array_event(eventio_object)
self.file_names.append(str(file))
def _process_mc_run_header(self, eventio_object):
"""Process MC run header and update data lists."""
mc_head = eventio_object.parse()
self.n_use = mc_head["n_use"] # reuse factor n_use needed to extend the values below
self._logger.info(f"Shower reuse factor: {self.n_use} (viewcone: {mc_head['viewcone']})")
def _process_mc_shower(self, eventio_object):
"""
Process MC shower and update shower event list.
        Entries are duplicated 'self.n_use' times to match the number of simulated
        events with different core positions.
"""
self.shower = eventio_object.parse()
self.event_data.simulated_energy.extend([self.shower["energy"]] * self.n_use)
self.event_data.shower_azimuth.extend([self.shower["azimuth"]] * self.n_use)
self.event_data.shower_altitude.extend([self.shower["altitude"]] * self.n_use)
def _process_mc_event(self, eventio_object):
"""Process MC event and update shower event list."""
event = eventio_object.parse()
self.event_data.shower_id.append(event["shower_num"])
self.event_data.x_core.append(event["xcore"])
self.event_data.y_core.append(event["ycore"])
self.event_data.area_weight.append(event["aweight"])
def _process_array_event(self, eventio_object):
"""Process array event and update triggered event list."""
tracking_positions = []
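        # Collect the per-telescope tracking positions; they are averaged after the loop.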
        for obj in eventio_object:
if isinstance(obj, TriggerInformation):
self._process_trigger_information(obj)
if isinstance(obj, TrackingPosition):
tracking_position = obj.parse()
tracking_positions.append(
{
"altitude": tracking_position["altitude_raw"],
"azimuth": tracking_position["azimuth_raw"],
}
)
if tracking_positions:
self._process_tracking_positions(tracking_positions)
def _process_tracking_positions(self, tracking_positions):
"""
Process collected tracking positions and update triggered event list.
        Use the mean tracking position of all triggered telescopes (circular mean for azimuth).
"""
altitudes = [pos["altitude"] for pos in tracking_positions]
azimuths = [pos["azimuth"] for pos in tracking_positions]
self.triggered_data.array_altitudes.append(np.mean(altitudes))
self.triggered_data.array_azimuths.append(calculate_circular_mean(azimuths))
def _process_trigger_information(self, trigger_info):
"""Process trigger information and update triggered event list."""
trigger_info = trigger_info.parse()
telescopes = trigger_info["triggered_telescopes"]
if len(telescopes) > 0:
# add offset to obtain unique shower IDs among all files
self.triggered_data.triggered_id.append(self.shower["shower"] + self.shower_id_offset)
self.triggered_data.trigger_telescope_list_list.append(
np.array(telescopes, dtype=np.int16)
)
def _table_descriptions(self):
"""HDF5 table descriptions for shower data, triggered data, and file names."""
shower_data_desc = {
"shower_id": tables.Int32Col(),
"simulated_energy": tables.Float32Col(),
"x_core": tables.Float32Col(),
"y_core": tables.Float32Col(),
"area_weight": tables.Float32Col(),
"shower_azimuth": tables.Float32Col(),
"shower_altitude": tables.Float32Col(),
}
triggered_data_desc = {
"triggered_id": tables.Int32Col(),
"array_altitudes": tables.Float32Col(),
"array_azimuths": tables.Float32Col(),
"telescope_list_index": tables.Int32Col(), # Index into VLArray
}
file_names_desc = {
"file_names": tables.StringCol(256),
}
return shower_data_desc, triggered_data_desc, file_names_desc
def _tables(self, output_file, data_group, mode="a"):
"""Create or get HDF5 tables."""
descriptions = self._table_descriptions()
table_names = ["reduced_data", "triggered_data", "file_names"]
table_dict = {}
for name, desc in zip(table_names, descriptions):
path = f"/data/{name}"
table_dict[name] = (
output_file.create_table(
data_group, name, desc, name.replace("_", " ").title(), filters=DEFAULT_FILTERS
)
if mode == "w" or path not in output_file
else output_file.get_node(path)
)
return table_dict["reduced_data"], table_dict["triggered_data"], table_dict["file_names"]
def _write_event_data(self, reduced_table):
"""Fill event data tables."""
if len(self.event_data.simulated_energy) == 0:
return
row = reduced_table.row
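        # Guard against per-field lists shorter than simulated_energy; missing entries default to 0.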
for i, energy in enumerate(self.event_data.simulated_energy):
row["shower_id"] = (
self.event_data.shower_id[i] if i < len(self.event_data.shower_id) else 0
)
row["simulated_energy"] = energy
row["x_core"] = self.event_data.x_core[i] if i < len(self.event_data.x_core) else 0
row["y_core"] = self.event_data.y_core[i] if i < len(self.event_data.y_core) else 0
row["area_weight"] = (
self.event_data.area_weight[i] if i < len(self.event_data.area_weight) else 0
)
row["shower_azimuth"] = (
self.event_data.shower_azimuth[i] if i < len(self.event_data.shower_azimuth) else 0
)
row["shower_altitude"] = (
self.event_data.shower_altitude[i]
if i < len(self.event_data.shower_altitude)
else 0
)
row.append()
reduced_table.flush()
    def _write_triggered_data(self, triggered_table, vlarray):
"""Fill triggered event data tables."""
        # Variable-length telescope lists are stored in the VLArray created by
        # _write_data; each table row stores an index into it.
if len(self.triggered_data.triggered_id) == 0:
return
row = triggered_table.row
start_idx = vlarray.nrows
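        # Rows in this batch reference entries appended below, starting at the
        # current end of the VLArray.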
for i, triggered_id in enumerate(self.triggered_data.triggered_id):
row["triggered_id"] = triggered_id
row["array_altitudes"] = (
self.triggered_data.array_altitudes[i]
if i < len(self.triggered_data.array_altitudes)
else 0
)
row["array_azimuths"] = (
self.triggered_data.array_azimuths[i]
if i < len(self.triggered_data.array_azimuths)
else 0
)
row["telescope_list_index"] = start_idx + i # Index into the VLArray
row.append()
vlarray.append(
self.triggered_data.trigger_telescope_list_list[i]
if i < len(self.triggered_data.trigger_telescope_list_list)
else []
)
triggered_table.flush()
def _write_data(self, mode="a"):
"""Write data to HDF5 file."""
with tables.open_file(self.output_file, mode=mode) as f:
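            # Create the /data group on the first write; reattach to it when appending.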
data_group = (
f.create_group("/", "data", "Data group")
if mode == "w" or "/data" not in f
else f.get_node("/data")
)
reduced_table, triggered_table, file_names_table = self._tables(f, data_group, mode)
self._write_event_data(reduced_table)
vlarray = (
f.create_vlarray(
data_group,
"trigger_telescope_list_list",
tables.Int16Atom(),
"List of triggered telescope IDs",
)
if mode == "w" or "/data/trigger_telescope_list_list" not in f
else f.get_node("/data/trigger_telescope_list_list")
)
            self._write_triggered_data(triggered_table, vlarray)
if self.file_names:
file_names_table.append([[name] for name in self.file_names])
file_names_table.flush()
def _reset_data(self):
"""Reset data structures for batch processing."""
self.event_data = ShowerEventData()
self.triggered_data = TriggeredEventData()
self.file_names = []