Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reduce load on SGE with qacct command #2

Open
wants to merge 11 commits into
base: 3.21.x
Choose a base branch
from
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
*.pyc
.*
/src/*.egg-info
/build
/dist
Expand Down
6 changes: 3 additions & 3 deletions src/toil/batchSystems/abstractGridEngineBatchSystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ def killJobs(self):
while killList:
for jobID in list(killList):
batchJobID = self.getBatchSystemID(jobID)
if with_retries(self.getJobExitCode, batchJobID) is not None:
if with_retries(self.getJobExitCode, batchJobID, jobID) is not None:
logger.debug('Adding jobID %s to killedJobsQueue', jobID)
self.killedJobsQueue.put(jobID)
killList.remove(jobID)
Expand All @@ -207,7 +207,7 @@ def checkOnJobs(self):
activity = False
for jobID in list(self.runningJobs):
batchJobID = self.getBatchSystemID(jobID)
status = with_retries(self.getJobExitCode, batchJobID)
status = with_retries(self.getJobExitCode, batchJobID, jobID)
if status is not None:
activity = True
self.updatedJobsQueue.put((jobID, status))
Expand Down Expand Up @@ -285,7 +285,7 @@ def killJob(self, jobID):
raise NotImplementedError()

@abstractmethod
def getJobExitCode(self, batchJobID):
def getJobExitCode(self, batchJobID, jobID):
"""
Returns job exit code. Implementation-specific; called by
AbstractGridEngineWorker.checkOnJobs()
Expand Down
15 changes: 11 additions & 4 deletions src/toil/batchSystems/gridengine.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def submitJob(self, subLine):
result = int(process.stdout.readline().decode('utf-8').strip())
return result

def getJobExitCode(self, sgeJobID):
def getJobExitCode(self, sgeJobID, jobID):
# the task is set as part of the job ID if using getBatchSystemID()
job, task = (sgeJobID, None)
if '.' in sgeJobID:
Expand All @@ -80,13 +80,19 @@ def getJobExitCode(self, sgeJobID):
args.extend(["-t", str(task)])

logger.debug("Running %r", args)
iscurrtoiljob = False
process = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
for line in process.stdout:
if line.startswith("failed") and int(line.split()[1]) == 1:
return 1
elif line.startswith("exit_status"):
if line.startswith("jobname") and line.split()[1] == "toil_job_" + str(jobID):
iscurrtoiljob = True
if iscurrtoiljob and line.startswith("exit_status"):
logger.debug('Exit Status: %r', line.split()[1])
return int(line.split()[1])
#elif line.startswith("exit_status"):
# logger.debug('Exit Status: %r', line.split()[1])
# return int(line.split()[1])
return None

"""
Expand All @@ -104,7 +110,8 @@ def prepareQsub(self, cpu, mem, jobID):
reqline = list()
sgeArgs = os.getenv('TOIL_GRIDENGINE_ARGS')
if mem is not None:
memStr = str(old_div(mem, 1024)) + 'K'
memStr = str(int(math.ceil(mem/(1024*math.ceil(cpu))))) + 'K'
# memStr = str(old_div(mem, 1024)) + 'K'
if not self.boss.config.manualMemArgs:
# for UGE instead of SGE; see #2309
reqline += ['vf=' + memStr, 'h_vmem=' + memStr]
Expand Down Expand Up @@ -137,7 +144,7 @@ def prepareQsub(self, cpu, mem, jobID):

@classmethod
def getWaitDuration(cls):
return 1
return 15

@classmethod
def obtainSystemConstants(cls):
Expand Down
6 changes: 3 additions & 3 deletions src/toil/batchSystems/lsf.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def getJobExitCode(self, lsfJobID):
process = subprocess.Popen(args, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
started = 0
for line in process.stdout:
for line in process.stdout.readlines():
if "Done successfully" in line:
logger.debug("bjobs detected job completed for job: "
"{}".format(job))
Expand Down Expand Up @@ -132,7 +132,7 @@ def getJobExitCode(self, lsfJobID):
args = ["bacct", "-l", str(job)]
process = subprocess.Popen(args, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
for line in process.stdout:
for line in process.stdout.readlines():
if line.find("Completed <done>") > -1:
logger.debug("Detected job completed for job: "
"{}".format(job))
Expand Down Expand Up @@ -212,7 +212,7 @@ def obtainSystemConstants(cls):

maxCPU = 0
maxMEM = MemoryString("0")
for line in p.stdout:
for line in p.stdout.readlines():
items = line.strip().split()
if len(items) < num_columns:
RuntimeError("lshosts output has a varying number of "
Expand Down