# Source code for alex.components.nlg.tectotpl.tool.cluster

#!/usr/bin/env python
# coding=utf-8
#
#

from __future__ import unicode_literals
import os
import commands
import string
import random
import codecs
import sys
import re
import time
from alex.components.nlg.tectotpl.core.util import first
import collections

"""\
Interface for running any Python code as a job on the cluster
(using the qsub/qstat/qacct commands).

Tested with Sun Grid Engine.
"""

__author__ = "Ondřej Dušek"
__date__ = "2012"


class Job(object):
    """\
    This represents a piece of code as a job on the cluster, holds
    information about the job and is able to retrieve job metadata.

    The most important method is submit(), which submits the given
    piece of code to the cluster.

    Important attributes (some may be set in the constructor or at job
    submission, but all except `name` may be set between construction
    and launch):
    ------------------------------------------------------------------
    name         -- job name on the cluster (and the name of the created
                    Python script); generated if not given to the
                    constructor, read-only afterwards
    code         -- the Python code to be run (needs to have imports and
                    sys.path set properly)
    header       -- the header of the created Python script (may contain
                    imports etc.)
    memory       -- the amount of memory (in GB) to reserve for this job
                    on the cluster
    cores        -- the number of cores needed for this job
    work_dir     -- the working directory where the job script will be
                    created and run (will be created on launch)
    dependencies -- Jobs / job ids this job depends on (must be submitted
                    before submitting this job); given to the constructor
                    or managed via add_dependency()/remove_dependency()

    In addition, the following values may be queried for each job at
    runtime or later:
    ------------------------------------------------------------------
    submitted    -- True if the job has been submitted to the cluster.
    state        -- current job state ('qw' = queued, 'r' = running,
                    'f' = finished, only if the job was submitted)
    host         -- the machine where the job is running (short name)
    jobid        -- the numeric id of the job in the cluster (NB: type
                    is string!)
    report       -- job report using the qacct command (dictionary,
                    available only after the job has finished)
    exit_status  -- numeric job exit status (if the job is finished)
    """

    # default job header
    DEFAULT_HEADER = """\
#!/usr/bin/env python
# coding=utf8
from __future__ import unicode_literals
"""
    # job state 'FINISHED' symbol
    FINISH = 'f'
    # job name prefix
    NAME_PREFIX = 'pyjob_'
    # job directory prefix
    DIR_PREFIX = '_clrun-'
    # legal chars for generated job names
    JOBNAME_LEGAL_CHARS = string.ascii_letters + string.digits
    # default number of cores
    DEFAULT_CORES = 1
    # default memory size (in GBs)
    DEFAULT_MEMORY = 4
    # only 1 job status query per second
    TIME_QUERY_DELAY = 1
    # qsub multicore command
    QSUB_MULTICORE_CMD = '-pe smp {0}'
    # qsub memory command
    QSUB_MEMORY_CMD = '-hard -l mem_free={0} -l act_mem_free={0}' + \
        ' -l h_vmem={0}'
    # job status polling delay for wait() in seconds
    TIME_POLL_DELAY = 60

    def __init__(self, code=None, header=DEFAULT_HEADER, name=None,
                 work_dir=None, dependencies=None):
        """\
        Constructor. May provide some running options -- the desired
        Python code to be run, the headers of the resulting script
        (default provided), the job name, working directory and
        dependencies (see add_dependency() for accepted values).

        Everything except the name can be changed later via the
        corresponding attributes/methods. If no working directory is
        given, a fresh one under the current directory is chosen.
        """
        self.header = header
        self.code = code
        self.memory = self.DEFAULT_MEMORY
        self.cores = self.DEFAULT_CORES
        self.__jobid = None
        self.__host = None
        self.__state = None
        self.__report = None
        # 0 = "never queried": the first state access after submission
        # must reach qstat instead of returning the (empty) cache
        self.__state_last_query = 0
        self.__dependencies = []
        if dependencies is not None:
            self.add_dependency(dependencies)
        self.__name = name if name is not None else self.__generate_name()
        self.submitted = False
        self.work_dir = (work_dir if work_dir is not None
                         else self.__get_work_dir())

    def submit(self, memory=None, cores=None, work_dir=None):
        """\
        Submit the job to the cluster.

        The memory, cores and work_dir arguments, if given, override the
        pre-set values. The job code and header must be set in advance.
        All jobs on which this job is dependent must already be
        submitted, otherwise a RuntimeError is raised before anything is
        written to disk.

        The script is written as <name>.py into the working directory
        (created if needed) and submitted from there with qsub. The
        original current directory is restored afterwards, even if the
        submission fails. Raises RuntimeError if qsub fails or its
        output contains no job id.
        """
        if cores is not None:
            self.cores = cores
        if memory is not None:
            self.memory = memory
        if work_dir is not None:
            self.work_dir = work_dir
        # validate everything before touching the filesystem
        dependency_string = self.__get_dependency_string()
        script_text = self.get_script_text()
        # create working directory if necessary
        if not os.path.isdir(self.work_dir):
            os.mkdir(self.work_dir)
        cwd = os.getcwdu()
        os.chdir(self.work_dir)
        try:
            # create the script
            with codecs.open(self.name + '.py', 'w', 'UTF-8') as script_fh:
                script_fh.write(script_text + '\n')
            # submit the script
            command = ('qsub ' + self.__get_resource_requests() +
                       ' ' + dependency_string +
                       ' -V -cwd -j y -S ' + sys.executable + ' ' +
                       self.name + '.py')
            output = self.__try_command(command)
        finally:
            # never leave the caller in the job's working directory
            os.chdir(cwd)
        match = re.search('([0-9]+)', output)
        if match is None:
            raise RuntimeError('Cannot find the job id in qsub output: ' +
                               output)
        self.__jobid = match.group(0)
        # forget anything cached for a previous submission of this object
        self.__state = None
        self.__host = None
        self.__report = None
        self.__state_last_query = 0
        self.submitted = True

    @property
    def state(self):
        """\
        Retrieve information about current job state.

        Returns None if the job has not been submitted. qstat is queried
        at most once per TIME_QUERY_DELAY seconds (cached value returned
        in between); once the job is finished, it is not queried again.
        Also stores the host the job is running on, if applicable.
        """
        # job hasn't been submitted -- no point in retrieving state
        if not self.submitted:
            return None
        # a finished job stays finished
        if self.__state == self.FINISH:
            return self.__state
        # state caching
        now = time.time()
        if now < self.__state_last_query + self.TIME_QUERY_DELAY:
            return self.__state
        self.__state_last_query = now
        # actually retrieve the state
        state, host = self.__get_job_state()
        self.__state = state
        if state != self.FINISH:
            self.__host = host
        return self.__state

    @property
    def report(self):
        """\
        Access to qacct report (dictionary of qacct keys to values).

        Returns None until the job has been submitted and has finished.
        Please note that running the qacct command takes a few seconds,
        so the first access to the report is rather slow; the result is
        cached afterwards.
        """
        # no stats until the job has finished
        if not self.submitted or self.state != self.FINISH:
            return None
        # the report is retrieved only once
        if self.__report is None:
            output = self.__try_command('qacct -j ' + self.jobid)
            report = {}
            # the first line of the qacct output is skipped
            for line in output.split("\n")[1:]:
                fields = line.split(None, 1)
                if not fields:
                    continue  # blank line
                report[fields[0]] = fields[1].strip() \
                    if len(fields) > 1 else ''
            # only cache a fully parsed report
            self.__report = report
        return self.__report

    @property
    def exit_status(self):
        """\
        Retrieve the exit status of the job via the qacct report.

        Raises RuntimeError if the job has not been submitted, or if it
        is still running and the exit status is not known.
        """
        if not self.submitted:
            raise RuntimeError('Job ' + self.name +
                               ' has not been submitted.')
        report = self.report
        if report is None:
            raise RuntimeError('Job ' + str(self.jobid) +
                               ' is probably still running')
        return int(report['exit_status'])

    def wait(self, poll_delay=None):
        """\
        Waits for the job to finish.

        Raises RuntimeError if the job has not been submitted (it would
        never finish) or did not finish successfully (non-zero exit
        status). The poll_delay variable controls how often (in seconds)
        the job state is checked; TIME_POLL_DELAY is used if it is not
        given.
        """
        if not self.submitted:
            raise RuntimeError('Job ' + self.name +
                               ' has not been submitted.')
        poll_delay = poll_delay if poll_delay else self.TIME_POLL_DELAY
        while self.state != self.FINISH:
            time.sleep(poll_delay)
        if self.exit_status != 0:
            raise RuntimeError('Job ' + self.name + ' (' + self.jobid +
                               ') did not finish successfully.')

    def add_dependency(self, dependency):
        """\
        Adds a dependency on the given Job(s).

        Accepts a Job, a job id (string, or int which is stored as a
        string), or an iterable of any of these (handled recursively).
        Raises ValueError for any other type.
        """
        # strings are checked before Iterable so they are not split
        if isinstance(dependency, (Job, basestring)):
            self.__dependencies.append(dependency)
        elif isinstance(dependency, int):
            self.__dependencies.append(str(dependency))
        elif isinstance(dependency, collections.Iterable):
            for dep_elem in dependency:
                self.add_dependency(dep_elem)
        else:
            raise ValueError('Unknown dependency type!')

    def remove_dependency(self, dependency):
        """\
        Removes the given Job(s) from the dependencies list.

        Accepts the same kinds of values as add_dependency(). Raises
        ValueError if a dependency cannot be found or has an unknown
        type.
        """
        # single element removed
        if isinstance(dependency, (Job, basestring, int)):
            if isinstance(dependency, int):
                target = str(dependency)
            else:
                target = dependency
            rem = first(lambda d: d == target, self.__dependencies)
            if rem is None:
                raise ValueError('Cannot find dependency!')
            self.__dependencies.remove(rem)
        elif isinstance(dependency, collections.Iterable):
            for dep_elem in dependency:
                self.remove_dependency(dep_elem)
        else:
            raise ValueError('Unknown dependency type!')

    @property
    def host(self):
        """\
        Retrieve information about the host this job is/was running on
        (short host name), or None if unknown / not submitted.
        """
        # no point if the job has not been submitted
        if not self.submitted:
            return None
        # return a cached value
        if self.__host is not None:
            return self.__host
        # reading the state property refreshes the cached host
        _ = self.state
        return self.__host

    @property
    def name(self):
        """\
        Return the job name.
        """
        return self.__name

    @property
    def jobid(self):
        """\
        Return the job id (string), or None if not submitted.
        """
        return self.__jobid

    def get_script_text(self):
        """\
        Join headers and code to create a meaningful Python script.

        The code is indented into a main() function that is called when
        the script is run as __main__. Empty (or whitespace-only) code
        yields a main() that does nothing. Raises TypeError if no code
        has been set.
        """
        if self.code is None:
            raise TypeError('Job code has not been set.')
        if self.code.strip():
            body = re.sub('^', '    ', self.code, flags=re.MULTILINE)
        else:
            # an empty function body would make the script unparseable
            body = '    pass'
        text = self.header
        text += "\ndef main():\n"
        text += body + "\n\n"
        text += "if __name__ == '__main__':\n    main()\n"
        return text

    def __generate_name(self):
        """\
        Generate a job name: NAME_PREFIX + 5 random legal characters.
        """
        return self.NAME_PREFIX + ''.join(
            random.choice(self.JOBNAME_LEGAL_CHARS) for _ in range(5))

    def __get_work_dir(self):
        """\
        Generate a working directory path under the current directory
        that does not exist yet (DIR_PREFIX + name + 3-digit counter).
        """
        num = 1
        workdir = None
        while workdir is None or os.path.exists(workdir):
            workdir = os.path.join(os.getcwdu(), self.DIR_PREFIX +
                                   self.name + '-' + str(num).zfill(3))
            num += 1
        return workdir

    def __get_resource_requests(self):
        """\
        Generate qsub resource requests based on the memory and core
        setting.
        """
        res = self.QSUB_MEMORY_CMD.format(str(self.memory) + 'G')
        if self.cores > 1:
            res = self.QSUB_MULTICORE_CMD.format(self.cores) + ' ' + res
        return res

    def __get_dependency_string(self):
        """\
        Generate qsub dependency string based on the list of
        dependencies. Raises RuntimeError if a Job dependency has not
        been submitted yet (it would have no job id).
        """
        if not self.__dependencies:
            return ''
        if not all(dep.submitted for dep in self.__dependencies
                   if isinstance(dep, Job)):
            raise RuntimeError('Job has unsubmitted dependencies!')
        return '-hold_jid ' + ','.join(
            dep.jobid if isinstance(dep, Job) else dep
            for dep in self.__dependencies)

    def __try_command(self, cmd):
        """\
        Try to run a command (through the shell) and return its output.
        If the command fails, throw a RuntimeError.
        """
        status, output = commands.getstatusoutput(cmd)
        if status != 0:
            raise RuntimeError('Command \'' + cmd + '\' failed. Status: ' +
                               str(status) + ', Output: ' + output)
        return output

    def __get_job_state(self):
        """\
        Parse the qstat command and try to retrieve the current job
        state and the machine it is running on.

        Returns a (state, host) tuple; host is None when no queue
        instance is listed (e.g. queued jobs). A job that is no longer
        listed by qstat is reported as (FINISH, None).
        """
        # get state of job assuming it is in the queue
        output = self.__try_command('qstat')
        # find the line whose first column is exactly this job's id; a
        # substring search would also match e.g. job 1234 for job 123.
        # str.split() also drops the leading padding of the job-ID column.
        fields = None
        for line in output.split("\n"):
            line_fields = line.split()
            if len(line_fields) > 4 and line_fields[0] == self.jobid:
                fields = line_fields
                break
        # job does not exist anymore
        if fields is None:
            return self.FINISH, None
        state = fields[4]
        host = None
        # the queue column ('queue@host.domain') is only present for
        # jobs that have been scheduled; otherwise column 8 is 'slots'
        if len(fields) > 7 and '@' in fields[7]:
            host = fields[7].split('@', 1)[1].split('.', 1)[0]
        return state, host

    def __eq__(self, other):
        """\
        Comparison: based on ids or reference if ids are None.

        Comparison with non-Job objects is delegated back to Python
        (NotImplemented), which ends up as False.
        """
        if not isinstance(other, Job):
            return NotImplemented
        if self.__jobid is not None and other.__jobid is not None:
            return self.__jobid == other.__jobid
        return self is other

    def __ne__(self, other):
        """\
        Negation of __eq__ (Python 2 does not derive it automatically).
        """
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result

    # NOTE(review): no __hash__ is defined; under Python 2 the default
    # identity hash stays, which is inconsistent with id-based equality
    # -- TODO decide whether Jobs need to be usable in sets/dict keys.

    def __str__(self):
        """\
        String representation returns the class name, job name and
        working directory.
        """
        return self.__class__.__name__ + ': ' + \
            self.name + ' (' + self.work_dir + ')'