Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not require enroot on head node (backport #323) #326

Open
wants to merge 9 commits into
base: releases/v0.8
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions src/cloudai/_core/base_installer.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,16 @@ def install(self, test_templates: Iterable[TestTemplate]) -> InstallStatusResult
else:
install_results[test_template.name] = result.message
done += 1
logging.info(
f"{done}/{total} Installation for {test_template.name} finished with status: "
msg = (
f"{done}/{total} Installation of {test_template.name}: "
f"{result.message if result.message else 'OK'}"
)
if result.success:
install_results[test_template.name] = "Success"
logging.info(msg)
else:
install_results[test_template.name] = result.message
logging.error(msg)
except Exception as e:
done += 1
logging.error(f"{done}/{total} Installation failed for {test_template.name}: {e}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ def is_installed(self) -> InstallStatusResult:
if docker_image_result.success:
return InstallStatusResult(success=True)
else:
if self.docker_image_cache_manager.cache_docker_images_locally:
if self.docker_image_cache_manager.system.cache_docker_images_locally:
expected_docker_image_path = os.path.join(
self.docker_image_cache_manager.install_path, self.SUBDIR_PATH, self.DOCKER_IMAGE_FILENAME
self.docker_image_cache_manager.system.install_path, self.SUBDIR_PATH, self.DOCKER_IMAGE_FILENAME
)
return InstallStatusResult(
success=False,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ def is_installed(self) -> InstallStatusResult:
if docker_image_result.success:
return InstallStatusResult(success=True)
else:
if self.docker_image_cache_manager.cache_docker_images_locally:
if self.docker_image_cache_manager.system.cache_docker_images_locally:
expected_docker_image_path = os.path.join(
self.docker_image_cache_manager.install_path, self.SUBDIR_PATH, self.DOCKER_IMAGE_FILENAME
self.docker_image_cache_manager.system.install_path, self.SUBDIR_PATH, self.DOCKER_IMAGE_FILENAME
)
return InstallStatusResult(
success=False,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ def is_installed(self) -> InstallStatusResult:
if docker_image_result.success:
return InstallStatusResult(success=True)
else:
if self.docker_image_cache_manager.cache_docker_images_locally:
if self.docker_image_cache_manager.system.cache_docker_images_locally:
expected_docker_image_path = os.path.join(
self.docker_image_cache_manager.install_path, self.SUBDIR_PATH, self.DOCKER_IMAGE_FILENAME
self.docker_image_cache_manager.system.install_path, self.SUBDIR_PATH, self.DOCKER_IMAGE_FILENAME
)
return InstallStatusResult(
success=False,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,7 @@ def __init__(self, system: SlurmSystem, env_vars: Dict[str, Any], cmd_args: Dict
self.install_path = self.slurm_system.install_path
self.default_env_vars.update(self.slurm_system.global_env_vars)

self.docker_image_cache_manager = DockerImageCacheManager(
self.slurm_system.install_path,
self.slurm_system.cache_docker_images_locally,
self.slurm_system.default_partition,
)
self.docker_image_cache_manager = DockerImageCacheManager(self.slurm_system)
docker_image_url_info = self.cmd_args.get("docker_image_url")
if docker_image_url_info is not None:
self.docker_image_url = docker_image_url_info.get("default")
Expand Down
6 changes: 1 addition & 5 deletions src/cloudai/systems/slurm/strategy/slurm_install_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,7 @@ def __init__(
super().__init__(system, env_vars, cmd_args)
self.slurm_system = cast(SlurmSystem, self.system)
self.install_path = self.slurm_system.install_path
self.docker_image_cache_manager = DockerImageCacheManager(
self.slurm_system.install_path,
self.slurm_system.cache_docker_images_locally,
self.slurm_system.default_partition,
)
self.docker_image_cache_manager = DockerImageCacheManager(self.slurm_system)
docker_image_url_info = self.cmd_args.get("docker_image_url")
if docker_image_url_info is not None:
self.docker_image_url = docker_image_url_info.get("default")
Expand Down
148 changes: 30 additions & 118 deletions src/cloudai/util/docker_image_cache_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
import os
import shutil
import subprocess
import tempfile

from cloudai.systems import SlurmSystem


class PrerequisiteCheckResult:
Expand Down Expand Up @@ -107,15 +108,11 @@ class DockerImageCacheManager:
Manages the caching of Docker images for installation strategies.

Attributes
install_path (str): The base installation path.
cache_docker_images_locally (bool): Whether to cache Docker image files locally.
partition_name (str): The partition name to use in the srun command.
system (SlurmSystem): The Slurm system configuration.
"""

def __init__(self, install_path: str, cache_docker_images_locally: bool, partition_name: str) -> None:
self.install_path = install_path
self.cache_docker_images_locally = cache_docker_images_locally
self.partition_name = partition_name
def __init__(self, system: SlurmSystem) -> None:
self.system = system

def ensure_docker_image(
self, docker_image_url: str, subdir_name: str, docker_image_filename: str
Expand All @@ -135,7 +132,7 @@ def ensure_docker_image(
if image_check_result.success:
return image_check_result

if self.cache_docker_images_locally:
if self.system.cache_docker_images_locally:
return self.cache_docker_image(docker_image_url, subdir_name, docker_image_filename)

return image_check_result
Expand All @@ -155,13 +152,14 @@ def check_docker_image_exists(
DockerImageCacheResult: Result of the Docker image existence check.
"""
logging.debug(
f"Checking if Docker image exists: docker_image_url={docker_image_url}, subdir_name={subdir_name}, "
f"Checking if Docker image exists: docker_image_url={docker_image_url}, "
f"subdir_name={subdir_name}, "
f"docker_image_filename={docker_image_filename}, "
f"cache_docker_images_locally={self.cache_docker_images_locally}"
f"cache_docker_images_locally={self.system.cache_docker_images_locally}"
)

# If not caching locally, return True. Defer checking URL accessibility to srun.
if not self.cache_docker_images_locally:
if not self.system.cache_docker_images_locally:
return DockerImageCacheResult(True, docker_image_url, "")

# Check if docker_image_url is a file path and exists
Expand All @@ -171,12 +169,12 @@ def check_docker_image_exists(
)

# Check if the cache file exists
if not os.path.exists(self.install_path):
message = f"Install path {self.install_path} does not exist."
if not os.path.exists(self.system.install_path):
message = f"Install path {self.system.install_path} does not exist."
logging.debug(message)
return DockerImageCacheResult(False, "", message)

subdir_path = os.path.join(self.install_path, subdir_name)
subdir_path = os.path.join(self.system.install_path, subdir_name)
if not os.path.exists(subdir_path):
message = f"Subdirectory path {subdir_path} does not exist."
logging.debug(message)
Expand Down Expand Up @@ -206,26 +204,26 @@ def cache_docker_image(
Returns:
DockerImageCacheResult: Result of the Docker image caching operation.
"""
subdir_path = os.path.join(self.install_path, subdir_name)
subdir_path = os.path.join(self.system.install_path, subdir_name)
docker_image_path = os.path.join(subdir_path, docker_image_filename)

if os.path.isfile(docker_image_path):
success_message = f"Cached Docker image already exists at {docker_image_path}."
logging.info(success_message)
return DockerImageCacheResult(True, docker_image_path, success_message)

prerequisite_check = self._check_prerequisites(docker_image_url)
prerequisite_check = self._check_prerequisites()
if not prerequisite_check:
logging.error(f"Prerequisite check failed: {prerequisite_check.message}")
return DockerImageCacheResult(False, "", prerequisite_check.message)

if not os.path.exists(self.install_path):
error_message = f"Install path {self.install_path} does not exist."
if not os.path.exists(self.system.install_path):
error_message = f"Install path {self.system.install_path} does not exist."
logging.error(error_message)
return DockerImageCacheResult(False, "", error_message)

if not os.access(self.install_path, os.W_OK):
error_message = f"No permission to write in install path {self.install_path}."
if not os.access(self.system.install_path, os.W_OK):
error_message = f"No permission to write in install path {self.system.install_path}."
logging.error(error_message)
return DockerImageCacheResult(False, "", error_message)

Expand All @@ -237,10 +235,10 @@ def cache_docker_image(
logging.error(error_message)
return DockerImageCacheResult(False, "", error_message)

enroot_import_cmd = (
f"srun --export=ALL --partition={self.partition_name} "
f"enroot import -o {docker_image_path} docker://{docker_image_url}"
)
srun_prefix = f"srun --export=ALL --partition={self.system.default_partition}"
if self.system.account:
srun_prefix += f" --account={self.system.account}"
enroot_import_cmd = f"{srun_prefix} enroot import -o {docker_image_path} docker://{docker_image_url}"
logging.debug(f"Importing Docker image: {enroot_import_cmd}")

try:
Expand All @@ -264,29 +262,17 @@ def cache_docker_image(
error_message = (
f"Failed to import Docker image from {docker_image_url}. Command: {enroot_import_cmd}. Error: {e}"
)
logging.error(error_message)
return DockerImageCacheResult(
False,
"",
(
f"Failed to import Docker image from {docker_image_url}. "
f"Command: {enroot_import_cmd}. "
f"Error: {e}. Please check the Docker image URL and ensure that it is accessible and set up with "
f"valid credentials."
),
)
logging.debug(error_message)
return DockerImageCacheResult(False, message=error_message)

def _check_prerequisites(self, docker_image_url: str) -> PrerequisiteCheckResult:
def _check_prerequisites(self) -> PrerequisiteCheckResult:
"""
Check prerequisites for caching Docker image.

Args:
docker_image_url (str): URL of the Docker image.

Returns:
Returns
PrerequisiteCheckResult: Result of the prerequisite check.
"""
required_binaries = ["enroot", "srun"]
required_binaries = ["srun"]
missing_binaries = [binary for binary in required_binaries if not shutil.which(binary)]

if missing_binaries:
Expand All @@ -296,82 +282,8 @@ def _check_prerequisites(self, docker_image_url: str) -> PrerequisiteCheckResult
False, f"{missing_binaries_str} are required for caching Docker images but are not installed."
)

docker_accessible = self._check_docker_image_accessibility(docker_image_url)
if not docker_accessible.success:
logging.error(f"Docker image URL {docker_image_url} is not accessible. Error: {docker_accessible.message}")
return docker_accessible

return PrerequisiteCheckResult(True, "All prerequisites are met.")

def _check_docker_image_accessibility(self, docker_image_url: str) -> PrerequisiteCheckResult:
"""
Check if the Docker image URL is accessible.

Args:
docker_image_url (str): URL of the Docker image.

Returns:
PrerequisiteCheckResult: Result of the Docker image accessibility check.
"""
with tempfile.TemporaryDirectory() as temp_dir:
docker_image_path = os.path.join(temp_dir, "docker_image.sqsh")
enroot_import_cmd = f"enroot import -o {docker_image_path} docker://{docker_image_url}"

logging.debug(f"Checking Docker image accessibility: {enroot_import_cmd}")

process = subprocess.Popen(enroot_import_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
try:
while True:
error_output = process.stderr.readline() if process.stderr else None
error_output = error_output.decode() if error_output else ""

if error_output:
if (
"Downloading" in error_output
or "Found all layers in cache" in error_output
or "Fetching image manifest list" in error_output
):
logging.debug(
f"Docker image URL, {docker_image_url}, is accessible. "
f"Command used: {enroot_import_cmd}. Found keyword: {error_output.strip()}"
)
process.terminate()
return PrerequisiteCheckResult(
True, f"Docker image URL, {docker_image_url}, is accessible."
)
if "[ERROR]" in error_output:
logging.error(
f"Failed to access Docker image URL, {docker_image_url}. "
f"Command used: {enroot_import_cmd}. Error: {error_output}"
)
process.terminate()
if "401 Unauthorized" in error_output:
detailed_message = (
f"Failed to access Docker image URL: {docker_image_url}. Error: {error_output}\n"
"This error indicates that access to the Docker image URL is unauthorized. "
"Please ensure you have the necessary permissions and have followed the "
"instructions in the README for setting up your credentials correctly."
)
return PrerequisiteCheckResult(False, detailed_message)
return PrerequisiteCheckResult(
False, f"Failed to access Docker image URL: {docker_image_url}. Error: {error_output}"
)
if process.poll() is not None:
break

logging.debug(f"Failed to access Docker image URL: {docker_image_url}. Unknown error.")
return PrerequisiteCheckResult(
False, f"Failed to access Docker image URL: {docker_image_url}. Unknown error."
)
finally:
# Ensure the temporary docker image file is removed
if os.path.exists(docker_image_path):
try:
os.remove(docker_image_path)
logging.debug(f"Temporary Docker image file removed: {docker_image_path}")
except OSError as e:
logging.error(f"Failed to remove temporary Docker image file {docker_image_path}. Error: {e}")

def uninstall_cached_image(self, subdir_name: str, docker_image_filename: str) -> DockerImageCacheResult:
"""
Uninstall the cached Docker image and remove the subdirectory if empty.
Expand All @@ -387,7 +299,7 @@ def uninstall_cached_image(self, subdir_name: str, docker_image_filename: str) -
if not result.success:
return result

subdir_path = os.path.join(self.install_path, subdir_name)
subdir_path = os.path.join(self.system.install_path, subdir_name)
if os.path.isdir(subdir_path):
try:
if not os.listdir(subdir_path):
Expand All @@ -414,7 +326,7 @@ def remove_cached_image(self, subdir_name: str, docker_image_filename: str) -> D
Returns:
DockerImageCacheResult: Result of the removal operation.
"""
docker_image_path = os.path.join(self.install_path, subdir_name, docker_image_filename)
docker_image_path = os.path.join(self.system.install_path, subdir_name, docker_image_filename)
if os.path.isfile(docker_image_path):
try:
os.remove(docker_image_path)
Expand Down
Loading
Loading