Source code for job_execution.job_manager
"""Interface to workload managers like gridengine or HTCondor."""
import logging
import os
from pathlib import Path
import simtools.utils.general as gen
__all__ = ["JobManager", "JobExecutionError"]
[docs]
class JobExecutionError(Exception):
    """Raised when a job script fails to run or a job submission cannot be prepared."""
[docs]
class JobManager:
    """
    Interface to workload managers like gridengine or HTCondor.

    Expects that jobs are described by shell scripts.

    Parameters
    ----------
    submit_engine : str
        Job submission system (one of the keys of ``engines``). Default is local.
    submit_options : str
        Comma-separated list of additional lines written to the HTCondor
        submission file. Not used by the other engines.
    test : bool
        Testing mode without job submission.
    """

    # Maps the engine name to the command used to submit a job.
    engines = {
        "gridengine": "qsub",
        "htcondor": "condor_submit",
        "local": "",
        "test_wms": "test_wms",  # used for testing only
    }

    def __init__(self, submit_engine=None, submit_options=None, test=False):
        """Initialize JobManager."""
        self._logger = logging.getLogger(__name__)
        self.submit_engine = submit_engine
        self.submit_options = submit_options
        self.test = test
        self.run_script = None
        self.run_out_file = None

        self.check_submission_system()

    @property
    def submit_engine(self):
        """Get the name of the submit engine."""
        return self._submit_engine

    @submit_engine.setter
    def submit_engine(self, value):
        """
        Set the submit engine.

        Parameters
        ----------
        value : str
            Name of submit engine. None selects local execution.

        Raises
        ------
        ValueError
            if invalid submit engine.
        """
        if value is None:
            value = "local"
        if value not in self.engines:
            raise ValueError(f"Invalid submit command: {value}")
        self._submit_engine = value

    def check_submission_system(self):
        """
        Check that the requested workload manager exists on the system.

        Nothing is checked for local execution. A missing submit command is
        reported with a warning; no exception is raised, so that the
        behavior seen by existing callers does not change.
        """
        if self.submit_engine is None or self.submit_engine == "local":
            return

        submit_command = self.engines[self.submit_engine]
        if gen.program_is_executable(submit_command):
            return

        self._logger.warning(
            f"Submit command '{submit_command}' for {self.submit_engine} not found on this system"
        )

    def submit(self, run_script=None, run_out_file=None, log_file=None):
        """
        Submit a job described by a shell script.

        Parameters
        ----------
        run_script: str
            Shell script describing the job to be submitted.
        run_out_file: str or Path
            Redirect output/error/job stream to this file (out,err,job suffix).
            Required in practice: it is converted with ``Path(run_out_file)``.
        log_file: str or Path
            The log file of the actual simulator (CORSIKA or sim_telarray).
            Provided in order to print the log excerpt in case of run time error.
            Only used for local execution.
        """
        self.run_script = str(run_script)
        run_out_file = Path(run_out_file)
        # Strip the file suffix; the stream suffixes (.out, .err, .job) are appended later.
        self.run_out_file = str(run_out_file.parent.joinpath(run_out_file.stem))

        self._logger.info(f"Submitting script {self.run_script}")
        self._logger.info(f"Job output stream {self.run_out_file + '.out'}")
        self._logger.info(f"Job error stream {self.run_out_file + '.err'}")
        self._logger.info(f"Job log stream {self.run_out_file + '.job'}")

        # Engines without a branch below (i.e. "test_wms") do not submit anything.
        if self.submit_engine == "gridengine":
            self._submit_gridengine()
        elif self.submit_engine == "htcondor":
            self._submit_htcondor()
        elif self.submit_engine == "local":
            self._submit_local(log_file)

    def _submit_local(self, log_file):
        """
        Run a job script on the command line (no submission to a workload manager).

        Parameters
        ----------
        log_file: str or Path or None
            The log file of the actual simulator (CORSIKA or sim_telarray).
            Provided in order to print the log excerpt in case of run time error.

        Raises
        ------
        JobExecutionError
            if the script finishes with a non-zero status.
        """
        self._logger.info("Running script locally")

        # NOTE(review): the paths are inserted unquoted into a shell command line;
        # paths containing spaces or shell metacharacters will not work.
        shell_command = f"{self.run_script} > {self.run_out_file}.out 2> {self.run_out_file}.err"

        if self.test:
            self._logger.info("Testing (local)")
            return

        sys_output = os.system(shell_command)
        if sys_output == 0:
            return

        self._logger.error(gen.get_log_excerpt(f"{self.run_out_file}.err"))

        # log_file is optional and may be given as a string; normalize it before
        # using Path methods so that the JobExecutionError below is always reached.
        if log_file is not None:
            log_file = Path(log_file)
            # Only a recently modified log file is reported.
            # NOTE(review): the unit of the age threshold is defined by gen.get_file_age.
            if log_file.exists() and gen.get_file_age(log_file) < 5:
                self._logger.error(gen.get_log_excerpt(log_file))

        raise JobExecutionError("See excerpt from log file above\n")

    def _submit_htcondor(self):
        """
        Submit a job described by a shell script to HTcondor.

        Writes the submission file ``<run_script>.condor`` and passes it to the
        HTCondor submit command.

        Raises
        ------
        JobExecutionError
            if the submission file cannot be created.
        """
        _condor_file = self.run_script + ".condor"

        try:
            with open(_condor_file, "w", encoding="utf-8") as file:
                file.write(f"Executable = {self.run_script}\n")
                file.write(f"Output = {self.run_out_file + '.out'}\n")
                file.write(f"Error = {self.run_out_file + '.err'}\n")
                file.write(f"Log = {self.run_out_file + '.job'}\n")
                if self.submit_options:
                    # One submission file line per comma-separated option.
                    for option in self.submit_options.split(","):
                        file.write(option.lstrip() + "\n")
                file.write("queue 1\n")
        except FileNotFoundError as exc:
            self._logger.error(f"Failed creating condor submission file {_condor_file}")
            raise JobExecutionError from exc

        self._execute(self.submit_engine, self.engines[self.submit_engine] + " " + _condor_file)

    def _submit_gridengine(self):
        """Submit a job described by a shell script to gridengine."""
        this_sub_cmd = self.engines[self.submit_engine]
        this_sub_cmd = this_sub_cmd + " -o " + self.run_out_file + ".out"
        this_sub_cmd = this_sub_cmd + " -e " + self.run_out_file + ".err"
        self._execute(self.submit_engine, this_sub_cmd + " " + self.run_script)

    def _execute(self, engine, shell_command):
        """
        Execute a shell command using a specific engine.

        In testing mode the command is logged only and not executed.

        Parameters
        ----------
        engine : str
            Engine to use.
        shell_command : str
            Shell command to execute.
        """
        self._logger.info(f"Submitting script to {engine}")
        self._logger.debug(shell_command)

        if self.test:
            self._logger.info(f"Testing ({engine})")
            self._logger.info(shell_command)
            return

        status = os.system(shell_command)
        if status != 0:
            # Report failed submissions instead of silently dropping the status.
            self._logger.error(
                f"Submission to {engine} returned non-zero status {status}: {shell_command}"
            )