Source code for caelus.run.cmd
# -*- coding: utf-8 -*-
"""\
Caelus Job Manager Interface
----------------------------
"""
import logging
import os
from ..config import config
from ..utils import osutils
from .hpc_queue import get_job_scheduler
_lgr = logging.getLogger(__name__)
[docs]
class CaelusCmd(object):
    """Command wrapper used to launch a CML executable.

    An instance gathers the executable name, its command line arguments, the
    case directory and the runtime environment.  It assembles the shell
    command (prefixed by the runner's MPI command for parallel runs) and
    hands it to the runner obtained from :func:`get_job_scheduler`.
    """

    def __init__(self, cml_exe, casedir=None, cml_env=None, output_file=None):
        """
        Args:
            cml_exe (str): Name of the binary to execute (e.g., blockMesh)
            casedir (path): Case directory; the current working directory
                is used when this is omitted
            cml_env (CMLEnv): Environment handed to the runner at call time
            output_file (str): Name of the log file; when omitted it is
                derived from the executable base name with a ``.log`` suffix
        """
        #: CPL configuration object
        self.cfg = config.get_config()
        #: Name of the CML program that will be launched
        self.cml_exe = cml_exe
        #: Directory in which the command is executed
        self.casedir = casedir or os.getcwd()
        #: CML environment assigned to the runner when the command is called
        self.cml_env = cml_env

        # Base name of the executable without directory or extension; only
        # used to build the default log file name.
        exe_stem = os.path.splitext(os.path.basename(cml_exe))[0]
        #: File that receives the output of the command
        self.output_file = output_file or "%s.log" % exe_stem

        # The runner must exist before ``num_mpi_ranks`` and
        # ``mpi_extra_args`` are assigned: both properties store their
        # values on the runner object.
        #: Runner object created by the job scheduler factory
        self.runner = get_job_scheduler()(self.cml_exe)
        #: Flag indicating whether this is a parallel run
        self.parallel = False
        self.num_mpi_ranks = 1
        #: Command line arguments appended after the CML executable
        self.cml_exe_args = ""
        #: Additional arguments for the MPI command
        self.mpi_extra_args = ""
        #: Value returned by the runner on submission to a job scheduler
        self.job_id = None

    @property
    def num_mpi_ranks(self):
        """Number of MPI ranks, as stored on the runner"""
        return self.runner.num_ranks

    @num_mpi_ranks.setter
    def num_mpi_ranks(self, nranks):
        rank_count = int(nranks)
        self.runner.num_ranks = rank_count
        # More than one rank switches the parallel flag on; the flag is
        # never switched off here.
        if rank_count > 1:
            self.parallel = True

    @property
    def mpi_extra_args(self):
        """Additional MPI arguments, as stored on the runner"""
        return self.runner.mpi_extra_args

    @mpi_extra_args.setter
    def mpi_extra_args(self, value):
        self.runner.mpi_extra_args = value

    def prepare_exe_cmd(self):
        """Build the invocation of the CML executable

        With a job scheduler the output redirection is appended to the
        command string itself; otherwise the log file name is stored on the
        runner as its ``stdout`` attribute.

        Returns:
            str: The executable followed by its arguments
        """
        exe_args = self.cml_exe_args
        if self.parallel:
            exe_args = " -parallel " + exe_args
        command = self.cml_exe + " " + exe_args

        runner = self.runner
        if not runner.is_job_scheduler():
            runner.stdout = self.output_file
            return command
        return command + " >& %s" % self.output_file

    def prepare_shell_cmd(self):
        """Build the full command line, including the MPI command if parallel

        Returns:
            str: Output of :meth:`prepare_exe_cmd`, preceded by the runner's
            MPI command and a space when the parallel flag is set
        """
        if not self.parallel:
            return self.prepare_exe_cmd()
        mpi_prefix = self.runner.prepare_mpi_cmd()
        return mpi_prefix + " " + self.prepare_exe_cmd()

    def __call__(self, wait=True, job_dependencies=None):
        """Execute the command from within the case directory

        Args:
            wait: Passed to the runner when it is not a job scheduler
            job_dependencies: Passed to the runner when it is a job scheduler

        Returns:
            ``0`` after submission to a job scheduler (the value returned by
            the runner is kept in ``job_id``); otherwise the value returned
            by the runner itself.
        """
        with osutils.set_work_dir(self.casedir):
            launcher = self.runner
            launcher.cml_env = self.cml_env
            launcher.script_body = self.prepare_shell_cmd()
            if not launcher.is_job_scheduler():
                return launcher(wait=wait)
            self.job_id = launcher(job_dependencies=job_dependencies)
            return 0