Skip to content

Commit

Permalink
Replace docker image retrieval with DockerImageCacheManager
Browse files Browse the repository at this point in the history
  • Loading branch information
TaekyungHeo committed Jun 3, 2024
1 parent c5568a2 commit b59db56
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 296 deletions.
120 changes: 14 additions & 106 deletions src/cloudai/schema/test_template/nccl_test/slurm_install_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import shutil
import subprocess
from typing import Any, Dict

from cloudai._core.install_status_result import InstallStatusResult
from cloudai._core.system import System
from cloudai.systems.slurm.strategy import SlurmInstallStrategy
from cloudai.util import CommandShell


class NcclTestSlurmInstallStrategy(SlurmInstallStrategy):
Expand All @@ -35,118 +28,33 @@ class NcclTestSlurmInstallStrategy(SlurmInstallStrategy):
SUBDIR_PATH = "nccl-test"
DOCKER_IMAGE_FILENAME = "nccl_test.sqsh"

def __init__(
self,
system: System,
env_vars: Dict[str, Any],
cmd_args: Dict[str, Any],
) -> None:
super().__init__(system, env_vars, cmd_args)

def is_installed(self) -> InstallStatusResult:
docker_image_path = os.path.join(self.install_path, self.SUBDIR_PATH, self.DOCKER_IMAGE_FILENAME)
if os.path.isfile(docker_image_path):
docker_image_result = self.docker_image_cache_manager.check_docker_image_exists(
self.docker_image_url, self.SUBDIR_PATH, self.DOCKER_IMAGE_FILENAME
)
if docker_image_result.success:
return InstallStatusResult(success=True)
else:
return InstallStatusResult(
success=False,
message=(
"Docker image for NCCL test is not installed. "
f"Tried to find Docker image at: {docker_image_path}. "
"Please ensure the Docker image is present at the specified location."
),
)
return InstallStatusResult(success=False, message=docker_image_result.message)

def install(self) -> InstallStatusResult:
install_status = self.is_installed()
if install_status.success:
return InstallStatusResult(success=True)

docker_image_dir_path = os.path.join(self.install_path, self.SUBDIR_PATH)
os.makedirs(docker_image_dir_path, exist_ok=True)
docker_image_path = os.path.join(docker_image_dir_path, self.DOCKER_IMAGE_FILENAME)

# Remove existing Docker image if it exists
shell = CommandShell()
remove_cmd = f"rm -f {docker_image_path}"
process = shell.execute(remove_cmd)
stdout, stderr = process.communicate()
if process.returncode != 0:
return InstallStatusResult(
success=False,
message=(
f"Failed to remove existing Docker image at {docker_image_path}. "
"NCCL tests tried to download a new Docker image, but an existing Docker image was found. "
"CloudAI tried to remove it but failed. "
f"Command run: {remove_cmd}. "
f"Error: {stderr}"
),
)

# Import new Docker image using enroot
docker_image_url_info = self.cmd_args.get("docker_image_url")
docker_image_url = docker_image_url_info.get("default") if docker_image_url_info else None
if docker_image_url is None:
return InstallStatusResult(
success=False,
message=(
"docker_image_url not found in the test schema or its value is not valid. "
"You should have a valid Docker image URL to the NCCL test Docker image."
),
)

enroot_import_cmd = (
f"srun"
f" --export=ALL"
f" --partition={self.slurm_system.default_partition}"
f" enroot import -o {docker_image_path} docker://{docker_image_url}"
docker_image_result = self.docker_image_cache_manager.ensure_docker_image(
self.docker_image_url, self.SUBDIR_PATH, self.DOCKER_IMAGE_FILENAME
)

try:
subprocess.run(enroot_import_cmd, shell=True, check=True)
except subprocess.CalledProcessError as e:
return InstallStatusResult(
success=False,
message=(
f"Failed to import Docker image from {docker_image_url}. "
"CloudAI failed to import the Docker image. "
f"Command run: {enroot_import_cmd}. "
f"Error: {e}. "
"Please check the Docker image URL and ensure that it is accessible and set up "
"with valid credentials."
),
)
if not docker_image_result.success:
return InstallStatusResult(success=False, message=docker_image_result.message)

return InstallStatusResult(success=True)

def uninstall(self) -> InstallStatusResult:
docker_image_path = os.path.join(self.install_path, self.SUBDIR_PATH, self.DOCKER_IMAGE_FILENAME)

if os.path.isfile(docker_image_path):
try:
os.remove(docker_image_path)
except OSError as e:
return InstallStatusResult(
success=False,
message=(
f"Failed to remove Docker image at {docker_image_path}. "
f"Error: {e}. "
"Please check the file permissions and ensure the file is not in use."
),
)

nccl_test_dir = os.path.join(self.install_path, self.SUBDIR_PATH)
if os.path.isdir(nccl_test_dir) and not os.listdir(nccl_test_dir):
try:
shutil.rmtree(nccl_test_dir)
except OSError as e:
return InstallStatusResult(
success=False,
message=(
f"Failed to remove nccl-test directory at {nccl_test_dir}. "
f"Error: {e}. "
"Please check the directory permissions and ensure it is not in use."
),
)
docker_image_result = self.docker_image_cache_manager.uninstall_cached_image(
self.SUBDIR_PATH, self.DOCKER_IMAGE_FILENAME
)
if not docker_image_result.success:
return InstallStatusResult(success=False, message=docker_image_result.message)

return InstallStatusResult(success=True)
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,14 @@

import logging
import os
import shutil
import subprocess
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Dict, List

from cloudai._core.install_status_result import InstallStatusResult
from cloudai._core.system import System
from cloudai.systems.slurm import SlurmNodeState, SlurmSystem
from cloudai.systems.slurm import SlurmNodeState
from cloudai.systems.slurm.strategy import SlurmInstallStrategy


Expand Down Expand Up @@ -98,14 +97,12 @@ def _configure_logging(self):

def is_installed(self) -> InstallStatusResult:
subdir_path = os.path.join(self.install_path, self.SUBDIR_PATH)
docker_image_path = os.path.join(subdir_path, self.DOCKER_IMAGE_FILENAME)
repo_path = os.path.join(subdir_path, self.REPOSITORY_NAME)
repo_installed = os.path.isdir(repo_path)

if not os.path.isfile(self.docker_image_url):
docker_image_installed = os.path.isfile(docker_image_path)
else:
docker_image_installed = True
docker_image_installed = self.docker_image_cache_manager.check_docker_image_exists(
self.docker_image_url, self.SUBDIR_PATH, self.DOCKER_IMAGE_FILENAME
).success

data_dir_path = self.default_cmd_args["data_dir"]
datasets_ready = self._check_datasets_on_nodes(data_dir_path)
Expand Down Expand Up @@ -159,30 +156,22 @@ def install(self) -> InstallStatusResult:

try:
self._clone_repository(subdir_path)
if not os.path.isfile(self.docker_image_url):
self._setup_docker_image(self.slurm_system, subdir_path)
docker_image_result = self.docker_image_cache_manager.ensure_docker_image(
self.docker_image_url, self.SUBDIR_PATH, self.DOCKER_IMAGE_FILENAME
)
if not docker_image_result.success:
raise RuntimeError(docker_image_result.message)
except RuntimeError as e:
return InstallStatusResult(success=False, message=str(e))

return InstallStatusResult(success=True)

def uninstall(self) -> InstallStatusResult:
subdir_path = os.path.join(self.install_path, self.SUBDIR_PATH)
docker_image_path = os.path.join(subdir_path, self.DOCKER_IMAGE_FILENAME)
try:
if os.path.isfile(docker_image_path):
os.remove(docker_image_path)
if os.path.exists(subdir_path):
shutil.rmtree(subdir_path)
except Exception as e:
return InstallStatusResult(
success=False,
message=(
f"Failed to remove Docker image or directory at {subdir_path}. "
f"Error: {e}. "
"Please check the file permissions and ensure the file is not in use."
),
)
docker_image_result = self.docker_image_cache_manager.uninstall_cached_image(
self.SUBDIR_PATH, self.DOCKER_IMAGE_FILENAME
)
if not docker_image_result.success:
return InstallStatusResult(success=False, message=docker_image_result.message)

return InstallStatusResult(success=True)

Expand Down Expand Up @@ -303,72 +292,3 @@ def _clone_repository(self, subdir_path: str) -> None:
result = subprocess.run(checkout_cmd, cwd=repo_path, capture_output=True, text=True)
if result.returncode != 0:
raise RuntimeError(f"Failed to checkout commit: {result.stderr}")

def _setup_docker_image(self, slurm_system: "SlurmSystem", subdir_path: str) -> None:
    """
    Download the Docker image and copy it onto idle Slurm nodes.

    The image is fetched with ``curl`` into ``subdir_path`` and then copied,
    one ``srun`` job per node, onto every idle node of the default partition.
    When the partition has no idle nodes the per-node step is skipped.

    Args:
        slurm_system (SlurmSystem): Slurm system instance.
        subdir_path (str): Subdirectory where Docker image will be installed.

    Raises:
        RuntimeError: If Docker image download or installation fails.
    """
    docker_image_path = os.path.join(subdir_path, self.DOCKER_IMAGE_FILENAME)
    self.logger.info("Downloading Docker image from %s to %s", self.docker_image_url, docker_image_path)
    download_cmd = [
        "curl",
        # Without --fail an HTTP error page is saved as the image and curl exits 0.
        "--fail",
        # Keep the progress meter out of stderr so the error message stays readable.
        "--silent",
        "--show-error",
        "--location",
        "-o",
        docker_image_path,
        self.docker_image_url,
    ]
    try:
        result = subprocess.run(download_cmd, capture_output=True, text=True)
    except OSError as e:
        # E.g. curl is not installed; callers only handle RuntimeError.
        raise RuntimeError(f"Failed to download Docker image: {e}") from e
    if result.returncode != 0:
        # Best effort: do not leave a partial file that a later existence
        # check would mistake for a valid image.
        try:
            os.remove(docker_image_path)
        except OSError:
            pass
        raise RuntimeError(f"Failed to download Docker image: {result.stderr}")

    partition_nodes = slurm_system.get_partition_nodes(slurm_system.default_partition)
    idle_nodes = [node.name for node in partition_nodes if node.state == SlurmNodeState.IDLE]
    if not idle_nodes:
        self.logger.info(
            "There are no idle nodes in the default partition to install the Docker image. "
            "Skipping Docker image installation."
        )
        return

    # NOTE(review): the same path is passed as both copy source and copy
    # destination, so the per-node command copies the file onto itself.
    # Confirm the intended destination before relying on this step.
    with ThreadPoolExecutor(max_workers=len(idle_nodes)) as executor:
        futures = {
            executor.submit(
                self._install_docker_image_on_node,
                node,
                docker_image_path,
                docker_image_path,
            ): node
            for node in idle_nodes
        }
        for future in as_completed(futures):
            node = futures[future]
            try:
                installed = future.result()
            except Exception as e:
                # Future.result() re-raises whatever the worker raised;
                # normalize it to the documented exception type.
                self.logger.error("Failed to install Docker image on node: %s", node)
                raise RuntimeError(f"Failed to install Docker image on node: {node}") from e
            if installed is not True:
                self.logger.error("Failed to install Docker image on node: %s", node)
                raise RuntimeError(f"Failed to install Docker image on node: {node}")

def _install_docker_image_on_node(self, node: str, image_source_path: str, image_dest_path: str) -> bool:
    """
    Install Docker image on a specific node.

    Runs ``cp`` on the target node through a single-task ``srun`` job.
    Failures are logged and reported through the return value; they are
    never raised.

    Args:
        node (str): Node name.
        image_source_path (str): Source path of the Docker image.
        image_dest_path (str): Destination path for the Docker image on the node.

    Returns:
        bool: True if installation is successful, False otherwise.
    """
    self.logger.info("Installing Docker image on node %s", node)
    install_cmd = [
        "srun",
        "--nodes=1",
        "--ntasks=1",
        f"--nodelist={node}",
        "cp",
        image_source_path,
        image_dest_path,
    ]
    try:
        result = subprocess.run(install_cmd, capture_output=True, text=True)
    except OSError as e:
        # srun missing or not executable: honor the documented bool contract
        # instead of letting the exception escape from the worker.
        self.logger.error("Could not run srun for node %s: %s", node, e)
        return False

    if result.returncode != 0:
        # Surface stderr so the reason for the failure is not lost.
        self.logger.error(
            "Copying Docker image on node %s failed with exit code %d: %s",
            node,
            result.returncode,
            result.stderr.strip(),
        )
        return False
    return True
Loading

0 comments on commit b59db56

Please sign in to comment.