Source code for job_execution.job_manager

"""Interface to workload managers like gridengine or HTCondor."""

import logging
import subprocess
from pathlib import Path

import simtools.utils.general as gen

__all__ = ["JobExecutionError", "JobManager"]


[docs] class JobExecutionError(Exception): """Job execution error."""
[docs] class JobManager: """ Interface to workload managers like gridengine or HTCondor. Expects that jobs are described by shell scripts. Parameters ---------- submit_engine : str Job submission system. Default is local. test : bool Testing mode without sub submission. """ engines = { "gridengine": "qsub", "htcondor": "condor_submit", "local": "", "test_wms": "test_wms", # used for testing only } def __init__(self, submit_engine=None, submit_options=None, test=False): """Initialize JobManager.""" self._logger = logging.getLogger(__name__) self.submit_engine = submit_engine self.submit_options = submit_options self.test = test self.run_script = None self.run_out_file = None self.check_submission_system() @property def submit_engine(self): """Get the submit command.""" return self._submit_engine @submit_engine.setter def submit_engine(self, value): """ Set the submit command. Parameters ---------- value : str Name of submit engine. Raises ------ ValueError if invalid submit engine. """ self._submit_engine = value or "local" if self._submit_engine not in self.engines: raise ValueError(f"Invalid submit command: {self._submit_engine}")
[docs] def check_submission_system(self): """ Check that the requested workload manager exist on the system. Raises ------ JobExecutionError if workflow manager is not found. """ try: if self.submit_engine in (None, "local") or gen.program_is_executable( self.engines[self.submit_engine] ): return except KeyError: pass raise JobExecutionError(f"Submit engine {self.submit_engine} not found")
[docs] def submit(self, run_script=None, run_out_file=None, log_file=None): """ Submit a job described by a shell script. Parameters ---------- run_script: str Shell script describing the job to be submitted. run_out_file: str or Path Redirect output/error/job stream to this file (out,err,job suffix). log_file: str or Path The log file of the actual simulator (CORSIKA or sim_telarray). Provided in order to print the log excerpt in case of run time error. """ self.run_script = str(run_script) run_out_file = Path(run_out_file) self.run_out_file = str(run_out_file.parent.joinpath(run_out_file.stem)) self._logger.info(f"Submitting script {self.run_script}") self._logger.info(f"Job output stream {self.run_out_file + '.out'}") self._logger.info(f"Job error stream {self.run_out_file + '.err'}") self._logger.info(f"Job log stream {self.run_out_file + '.job'}") submit_result = 0 if self.submit_engine == "local": submit_result = self._submit_local(log_file) else: submit_result = getattr(self, f"_submit_{self.submit_engine}")() if submit_result != 0: raise JobExecutionError(f"Job submission failed with return code {submit_result}")
def _submit_local(self, log_file): """ Run a job script on the command line (no submission to a workload manager). Parameters ---------- log_file: str or Path The log file of the actual simulator (CORSIKA or sim_telarray). Provided in order to print the log excerpt in case of run time error. Returns ------- int Return code of the executed script """ self._logger.info("Running script locally") if self.test: self._logger.info("Testing (local)") return 0 result = None try: with ( open(f"{self.run_out_file}.out", "w", encoding="utf-8") as stdout, open(f"{self.run_out_file}.err", "w", encoding="utf-8") as stderr, ): result = subprocess.run( f"{self.run_script}", shell=True, check=True, text=True, stdout=stdout, stderr=stderr, ) except subprocess.CalledProcessError as exc: self._logger.error(gen.get_log_excerpt(f"{self.run_out_file}.err")) if log_file.exists() and gen.get_file_age(log_file) < 5: self._logger.error(gen.get_log_excerpt(log_file)) raise JobExecutionError("See excerpt from log file above\n") from exc return result.returncode if result else 0 def _submit_htcondor(self): """Submit a job described by a shell script to HTcondor.""" _condor_file = self.run_script + ".condor" lines = [ f"Executable = {self.run_script}", f"Output = {self.run_out_file}.out", f"Error = {self.run_out_file}.err", f"Log = {self.run_out_file}.job", ] if self.submit_options: lines.extend(option.lstrip() for option in self.submit_options.split(",")) lines.append("queue 1") try: with open(_condor_file, "w", encoding="utf-8") as file: file.write("\n".join(lines) + "\n") except FileNotFoundError as exc: self._logger.error(f"Failed creating condor submission file {_condor_file}") raise JobExecutionError from exc return self._execute(self.submit_engine, [self.engines[self.submit_engine], _condor_file]) def _submit_gridengine(self): """Submit a job described by a shell script to gridengine.""" this_sub_cmd = [ self.engines[self.submit_engine], "-o", self.run_out_file + ".out", "-e", self.run_out_file + ".err", self.run_script, ] return self._execute(self.submit_engine, this_sub_cmd) def _execute(self, engine, shell_command): """ Execute a shell command using a specific engine. Parameters ---------- engine : str Engine to use. shell_command : list List of shell command plus arguments. """ self._logger.info(f"Submitting script to {engine}") self._logger.debug(shell_command) result = None if not self.test: result = subprocess.run(shell_command, shell=True, check=True) else: self._logger.info(f"Testing ({engine}: {shell_command})") return result.returncode if result else 0