Source code for job_execution.job_manager
"""Interface to workload managers to run jobs on a compute node."""
import logging
import subprocess
from pathlib import Path
import simtools.utils.general as gen
__all__ = ["JobExecutionError", "JobManager"]
[docs]
class JobExecutionError(Exception):
"""Job execution error."""
[docs]
class JobManager:
"""
Job manager for submitting jobs to a compute node.
Expects that jobs can be described by shell scripts.
Parameters
----------
test : bool
Testing mode without sub submission.
"""
def __init__(self, test=False):
"""Initialize JobManager."""
self._logger = logging.getLogger(__name__)
self.test = test
self.run_script = None
self.run_out_file = None
[docs]
def submit(self, run_script=None, run_out_file=None, log_file=None):
"""
Submit a job described by a shell script.
Parameters
----------
run_script: str
Shell script describing the job to be submitted.
run_out_file: str or Path
Redirect output/error/job stream to this file (out,err,job suffix).
log_file: str or Path
The log file of the actual simulator (CORSIKA or sim_telarray).
Provided in order to print the log excerpt in case of run time error.
"""
self.run_script = str(run_script)
run_out_file = Path(run_out_file)
self.run_out_file = str(run_out_file.parent.joinpath(run_out_file.stem))
self._logger.info(f"Submitting script {self.run_script}")
self._logger.info(f"Job output stream {self.run_out_file + '.out'}")
self._logger.info(f"Job error stream {self.run_out_file + '.err'}")
self._logger.info(f"Job log stream {self.run_out_file + '.job'}")
submit_result = self.submit_local(log_file)
if submit_result != 0:
raise JobExecutionError(f"Job submission failed with return code {submit_result}")
[docs]
def submit_local(self, log_file):
"""
Run a job script on the command line.
Parameters
----------
log_file: str or Path
The log file of the actual simulator (CORSIKA or sim_telarray).
Provided in order to print the log excerpt in case of run time error.
Returns
-------
int
Return code of the executed script
"""
self._logger.info("Running script locally")
if self.test:
self._logger.info("Testing (local)")
return 0
result = None
try:
with (
open(f"{self.run_out_file}.out", "w", encoding="utf-8") as stdout,
open(f"{self.run_out_file}.err", "w", encoding="utf-8") as stderr,
):
result = subprocess.run(
f"{self.run_script}",
shell=True,
check=True,
text=True,
stdout=stdout,
stderr=stderr,
)
except subprocess.CalledProcessError as exc:
self._logger.error(gen.get_log_excerpt(f"{self.run_out_file}.err"))
if log_file.exists() and gen.get_file_age(log_file) < 5:
self._logger.error(gen.get_log_excerpt(log_file))
raise JobExecutionError("See excerpt from log file above\n") from exc
return result.returncode if result else 0