Source code for runners.simtools_runner

"""Tools for running applications in the simtools framework."""

import shutil
import subprocess
from pathlib import Path

import simtools.utils.general as gen
from simtools import dependencies
from simtools.io import ascii_handler


[docs] def run_applications(args_dict, db_config, logger): """ Run simtools applications step-by-step as defined in a configuration file. Parameters ---------- args_dict : dict Dictionary containing command line arguments. db_config : dict Database configuration logger : logging.Logger Logger for logging application output. """ configurations, runtime_environment, log_file = _read_application_configuration( args_dict["configuration_file"], args_dict.get("steps"), logger ) run_time = ( read_runtime_environment(runtime_environment) if not args_dict["ignore_runtime_environment"] else [] ) with log_file.open("w", encoding="utf-8") as file: file.write("Running simtools applications\n") file.write(dependencies.get_version_string(db_config, run_time)) for config in configurations: app = config.get("application") if not config.get("run_application"): logger.info(f"Skipping application: {app}") continue logger.info(f"Running application: {app}") stdout, stderr = run_application(run_time, app, config.get("configuration"), logger) file.write("=" * 80 + "\n") file.write(f"Application: {app}\nSTDOUT:\n{stdout}\nSTDERR:\n{stderr}\n")
[docs] def run_application(runtime_environment, application, configuration, logger): """ Run a simtools application and return stdout and stderr. Allow to specify a runtime environment (e.g., Docker) and a working directory. Parameters ---------- runtime_environment : list Command to run the application in the specified runtime environment. application : str Name of the application to run. configuration : dict Configuration for the application. logger : logging.Logger Logger for logging application output. Returns ------- tuple stdout and stderr from the application run. """ command = [application, *_convert_dict_to_args(configuration)] if runtime_environment: command = runtime_environment + command try: result = subprocess.run( command, check=True, capture_output=True, text=True, ) except subprocess.CalledProcessError as exc: logger.error(f"Error running application {application}: {exc.stderr}") raise exc return result.stdout, result.stderr
def _convert_dict_to_args(parameters): """ Convert a dictionary of parameters to a list of command line arguments. Parameters ---------- parameters : dict Dictionary containing parameters to convert. Returns ------- list List of command line arguments. """ args = [] for key, value in parameters.items(): if isinstance(value, bool): if value: args.append(f"--{key}") elif isinstance(value, list): args.extend([f"--{key}", *(str(item) for item in value)]) else: args.extend([f"--{key}", str(value)]) return args def _read_application_configuration(configuration_file, steps, logger): """ Read application configuration from file and modify for setting workflows. Strong assumptions on the structure of input and output files: - configuration file is expected to be in './input/<workflow directory>/<yaml file>' - output files will be written out to './output/<workflow directory>/' Replaces the placeholders in the configuration file with the actual values. Sets 'USE_PLAIN_OUTPUT_PATH' to True for all applications. Parameters ---------- configuration_file : str Configuration file name. steps : list List of steps to be executed (None: all steps). logger : Logger Logger object. Returns ------- dict Application configuration. dict: Runtime environment configuration. Path Path to the log file. """ job_configuration = ascii_handler.collect_data_from_file(configuration_file) configurations = job_configuration.get("applications") output_path, setting_workflow = _set_input_output_directories(configuration_file) logger.info(f"Setting workflow output path to {output_path}") for step_count, config in enumerate(configurations, start=1): config["run_application"] = step_count in steps if steps else True config = gen.change_dict_keys_case(config, True) config["configuration"] = _replace_placeholders_in_configuration( config.get("configuration", {}), output_path, setting_workflow, ) configurations[step_count - 1] = config return ( configurations, job_configuration.get("runtime_environment"), output_path / "simtools.log", ) def _replace_placeholders_in_configuration( configuration, output_path, setting_workflow, place_holder="__SETTING_WORKFLOW__" ): """ Replace placeholders in the configuration dictionary. Parameters ---------- configuration : dict Configuration dictionary. output_path : Path Path to the output directory. setting_workflow : str The setting workflow to replace the placeholder with. place_holder : str Placeholder to be replaced. Returns ------- dict Configuration dictionary with placeholders replaced. """ for key, value in configuration.items(): if isinstance(value, str): configuration[key] = value.replace(place_holder, setting_workflow) if isinstance(value, list): configuration[key] = [ item.replace(place_holder, setting_workflow) if isinstance(item, str) else item for item in value ] if output_path: configuration["use_plain_output_path"] = True configuration["output_path"] = str(output_path) return configuration def _set_input_output_directories(path): """ Set input and output directories based on the configuration file path. Tuned to simulation models setting workflows. Parameters ---------- path : str or Path Path to the configuration file. Returns ------- tuple The first part is the 'input' directory, the second part is the subdirectory name """ path = Path(path).resolve() try: input_index = path.parts.index("input") # Get all parts after 'input', excluding the filename subdirs = path.parts[input_index + 1 : -1] setting_workflow = "/".join(subdirs) workflow_dir = path.parts[input_index] except (ValueError, IndexError) as exc: raise ValueError(f"Could not find subdirectory under 'input': {exc}") from exc output_path = Path(str(workflow_dir).replace("input", "output")) / Path(setting_workflow) output_path.mkdir(parents=True, exist_ok=True) return output_path, "/".join(subdirs)
[docs] def read_runtime_environment(runtime_environment, workdir="/workdir/external/"): """ Read the runtime environment (e.g. docker runtime) and generate the required command. Parameters ---------- runtime_environment : str or None Path to the runtime environment configuration file. Returns ------- list Runtime command. """ if runtime_environment is None: return [] engine = runtime_environment.get("container_engine", "docker") if shutil.which(engine) is None: raise RuntimeError(f"Container engine '{engine}' not found.") cmd = [engine, "run", "--rm", "-v", f"{Path.cwd()}:{workdir}", "-w", workdir] if options := runtime_environment.get("options"): for opt in options: cmd.extend(opt.split()) if env := runtime_environment.get("env_file"): cmd += ["--env-file", env] if net := runtime_environment.get("network"): cmd += ["--network", net] cmd.append(runtime_environment["image"]) return cmd