Skip to content

Commit

Permalink
Add memory and cpu polling to TestHarness, add --max-mem option
Browse files Browse the repository at this point in the history
  • Loading branch information
loganharbour committed Dec 20, 2024
1 parent e15d1e7 commit f1fab03
Show file tree
Hide file tree
Showing 6 changed files with 274 additions and 46 deletions.
89 changes: 53 additions & 36 deletions python/TestHarness/OutputInterface.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,20 @@
#* Licensed under LGPL 2.1, please see LICENSE for details
#* https://www.gnu.org/licenses/lgpl-2.1.html

import contextlib
import os
import json
import threading

class OutputInterface:
""" Helper class for writing output to either memory or a file """
def __init__(self):
def __init__(self, locking=False):
# The in-memory output, if any
self.output = ''
# The path to write output to, if any
self.separate_output_path = None
# Thread lock for the output (if enabled)
self.output_lock = threading.Lock() if locking else None

class BadOutputException(Exception):
""" Exception that is thrown when bad output is detected """
Expand All @@ -25,24 +29,35 @@ def __init__(self, errors):
message = 'Bad output detected: ' + ', '.join(errors)
super().__init__(message)

def getOutputLock(self):
"""
Gets the thread lock for this system, if any.
This is safe to use in a with statement even if locking
is not enabled.
"""
return self.output_lock if self.output_lock else contextlib.suppress()

def setSeparateOutputPath(self, separate_output_path):
""" Sets the path for writing output to """
self.separate_output_path = separate_output_path

# If we have any dangling output, write it
if self.output:
self.setOutput(self.output)
self.output = ''
with self.getOutputLock():
if self.output:
self.setOutput(self.output)
self.output = ''

def getSeparateOutputFilePath(self) -> str:
""" Gets the path that this output is writing to, if any """
return self.separate_output_path

def hasOutput(self) -> bool:
""" Whether or not this object has any content written """
if self.separate_output_path:
return os.path.isfile(self.separate_output_path)
return len(self.output) > 0
with self.getOutputLock():
if self.separate_output_path:
return os.path.isfile(self.separate_output_path)
return len(self.output) > 0

def getOutput(self, sanitize: bool = True) -> str:
"""
Expand All @@ -56,46 +71,48 @@ def getOutput(self, sanitize: bool = True) -> str:
on before the output is used.
"""
output = ''
if self.separate_output_path:
try:
output = open(self.separate_output_path, 'r').read()
except FileNotFoundError:
pass
else:
output = self.output

if sanitize:
_, sanitize_failures = self._sanitizeOutput(output)
if sanitize_failures:
raise self.BadOutputException(sanitize_failures)

return output
with self.getOutputLock():
if self.separate_output_path:
try:
output = open(self.separate_output_path, 'r').read()
except FileNotFoundError:
pass
else:
output = self.output

if sanitize:
_, sanitize_failures = self._sanitizeOutput(output)
if sanitize_failures:
raise self.BadOutputException(sanitize_failures)

return output

def setOutput(self, output: str):
""" Sets the output given some output string """
if not output:
return
if self.separate_output_path:
open(self.separate_output_path, 'w').write(output)
else:
self.output = output
with self.getOutputLock():
if self.separate_output_path:
open(self.separate_output_path, 'w').write(output)
else:
self.output = output

def appendOutput(self, output: str):
""" Appends to the output """
if not output:
return
if self.separate_output_path:
open(self.separate_output_path, 'a').write(output)
else:
self.output += output
with self.getOutputLock():
if self.separate_output_path:
open(self.separate_output_path, 'a').write(output)
else:
self.output += output

def clearOutput(self):
""" Clears the output """
if self.separate_output_path:
if os.path.exists(self.separate_output_path):
os.remove(self.separate_output_path)
else:
self.output = ''
with self.getOutputLock():
if self.separate_output_path:
if os.path.exists(self.separate_output_path):
os.remove(self.separate_output_path)
else:
self.output = ''

@staticmethod
def _sanitizeOutput(output):
Expand Down
53 changes: 52 additions & 1 deletion python/TestHarness/TestHarness.py
Original file line number Diff line number Diff line change
Expand Up @@ -729,6 +729,10 @@ def cleanup(self):
summary += fatal_error
print(util.colorText(summary, "", html=True, colored=self.options.colored, code=self.options.code))
else:
# Fill summary footer
summary = ''

# Number of tests, their status, and timing
num_nonzero_timing = sum(1 if float(tup[0].getTiming()) > 0 else 0 for tup in self.test_table)
if num_nonzero_timing > 0:
timing_max = max(float(tup[0].getTiming()) for tup in self.test_table)
Expand All @@ -739,6 +743,13 @@ def cleanup(self):
summary = f'Ran {self.num_passed + self.num_failed} tests in {stats["time_total"]:.1f} seconds.'
summary += f' Average test time {timing_avg:.1f} seconds,'
summary += f' maximum test time {timing_max:.1f} seconds.'
# Memory usage, if available
max_memory = [tup[0].getMaxMemoryUsage() for tup in self.test_table if tup[0].getMaxMemoryUsage() not in [None, 0]]
if max_memory:
max_max_memory = max(max_memory)
avg_max_memory = sum(max_memory) / len(max_memory)
summary += f'\nEstimated maximum test memory usage maximum {util.humanMemory(max_max_memory)}, '
summary += f'average {util.humanMemory(avg_max_memory)}.'
print(summary)

# Get additional results from the scheduler
Expand Down Expand Up @@ -809,6 +820,25 @@ def cleanup(self):
print(str(group[0]).ljust((self.options.term_cols - (len(group[1]) + 4)), ' '), f'[{group[1]}s]')
print('\n')

if self.options.largest_jobs:
valued_tups = [tup for tup in self.test_table if tup[0].getMaxMemoryUsage() not in [None, 0]]
sorted_tups = sorted(valued_tups, key=lambda tup: tup[0].getMaxMemoryUsage(), reverse=True)

print('\n%d largest jobs:' % self.options.largest_jobs)
print(('-' * (self.options.term_cols)))

# Copy the current options and force timing to be true so that
# we get memory when we call formatResult() below
options_with_timing = copy.deepcopy(self.options)
options_with_timing.timing = True

for tup in sorted_tups[0:self.options.largest_jobs]:
job = tup[0]
if not job.isSkip() and job.getMaxMemoryUsage() > 0:
print(util.formatResult(job, options_with_timing, caveats=True))
if len(sorted_tups) == 0:
print('No jobs were completed or no jobs contained resource usage.')

all_jobs = self.scheduler.retrieveJobs()

# Gather and print the jobs with race conditions after the jobs are finished
Expand Down Expand Up @@ -1065,6 +1095,7 @@ def parseCLArgs(self, argv):
parser.add_argument('-l', '--load-average', action='store', type=float, dest='load', help='Do not run additional tests if the load average is at least LOAD')
parser.add_argument('-t', '--timing', action='store_true', dest='timing', help='Report Timing information for passing tests')
parser.add_argument('--longest-jobs', action='store', dest='longest_jobs', type=int, default=0, help='Print the longest running jobs upon completion')
parser.add_argument('--largest-jobs', action='store', dest='largest_jobs', type=int, default=0, help='Print the largest (by max memory usage) jobs upon completion')
parser.add_argument('-s', '--scale', action='store_true', dest='scaling', help='Scale problems that have SCALE_REFINE set')
parser.add_argument('-i', nargs=1, action='store', type=str, dest='input_file_name', help='The test specification file to look for (default: tests)')
parser.add_argument('--libmesh_dir', nargs=1, action='store', type=str, dest='libmesh_dir', help='Currently only needed for bitten code coverage')
Expand Down Expand Up @@ -1131,6 +1162,10 @@ def parseCLArgs(self, argv):
hpcgroup.add_argument('--hpc-no-hold', nargs=1, action='store', type=bool, default=False, dest='hpc_no_hold', help='Do not pre-create hpc jobs to be held')
hpcgroup.add_argument('--pbs-queue', nargs=1, action='store', dest='hpc_queue', type=str, metavar='', help='Submit jobs to the specified queue')

# Options for resource limits
resourcegroup = parser.add_argument_group('Resource Options', 'Options for controlling resource limits')
resourcegroup.add_argument('--max-memory', dest='max_memory', action='store', type=str, default=None, help='Set maximum memory allowed per slot (default none, ex: 100MB)')

# Try to find the terminal size if we can
# Try/except here because the terminal size could fail w/o a display
term_cols = None
Expand All @@ -1142,7 +1177,7 @@ def parseCLArgs(self, argv):

# Optionally load in the environment controlled values
term_cols = int(os.getenv('MOOSE_TERM_COLS', term_cols))
term_format = os.getenv('MOOSE_TERM_FORMAT', 'njcst')
term_format = os.getenv('MOOSE_TERM_FORMAT', 'njcsmt')

# Terminal options
termgroup = parser.add_argument_group('Terminal Options', 'Options for controlling the formatting of terminal output')
Expand Down Expand Up @@ -1234,6 +1269,22 @@ def checkAndUpdateCLArgs(self):
if not self.options.input_file_name:
self.options.input_file_name = 'tests'

# Resource usage collection
has_psutil = True
try:
import psutil
except:
has_psutil = False
# Set max_memory in bytes if set
if self.options.max_memory is not None:
try:
self.options.max_memory = util.convertMemoryToBytes(self.options.max_memory)
except:
print(f'ERROR: Failed to parse --max-memory="{self.options.max_memory}"')
sys.exit(1)
if not has_psutil:
print(f'ERROR: --max-memory cannot be used because the python module "psutil" is not available')

def postRun(self, specs, timing):
return

Expand Down
53 changes: 51 additions & 2 deletions python/TestHarness/runners/Runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,14 @@
#* Licensed under LGPL 2.1, please see LICENSE for details
#* https://www.gnu.org/licenses/lgpl-2.1.html

import os, json
import os, threading, time, traceback
from collections import namedtuple
from TestHarness import OutputInterface, util

class Runner(OutputInterface):
# Helper struct for storing information about sampled resource usage
ResourceUsage = namedtuple('ResourceUsage', 'time mem_bytes')

"""
Base class for running a process via a command.
Expand All @@ -19,7 +23,8 @@ class Runner(OutputInterface):
or externally (i.e., PBS, slurm, etc on HPC)
"""
def __init__(self, job, options):
OutputInterface.__init__(self)
# Output is locking so that the resource thread can concurrently write
OutputInterface.__init__(self, locking=True)

# The job that this runner is for
self.job = job
Expand Down Expand Up @@ -109,3 +114,47 @@ def readOutput(self, stream):
if output and output[-1] != '\n':
output += '\n'
return output

def getResourceUsage(self):
"""
To be overridden by derived Runners that support resource usage collection
Should return a list of ResourceUsage objects
"""
return None

def getMaxMemoryUsage(self):
"""
Get the max memory usage (in bytes) of the spawned process if it was
able to be captured
"""
resource_usage = self.getResourceUsage()
if not resource_usage: # runner doesn't support it
return None
max_mem = 0
for usage in resource_usage:
max_mem = max(max_mem, usage.mem_bytes)
return max_mem

def checkResourceUsage(self, usage):
"""
Checks the resource usage to ensure that it does not go over
limits. Will kill the job if so.
Usage should be a ResourceUsage object
"""
# Scale all of the requirements on a per-slot basis
slots = self.job.getSlots()

# Check for memory overrun if max is set
if self.options.max_memory is not None:
allowed_mem = slots * self.options.max_memory
if usage.mem_bytes > allowed_mem:
usage_human = util.humanMemory(usage.mem_bytes)
allowed_human = util.humanMemory(allowed_mem)
output = util.outputHeader('Process killed due to resource oversubscription')
output += f'Memory usage {usage_human} exceeded {allowed_human}'
self.appendOutput(output)
self.job.setStatus(self.job.error, 'EXCEEDED MEM')
self.kill()
return
Loading

0 comments on commit f1fab03

Please sign in to comment.