Skip to content

Commit

Permalink
Merge branch 'NVIDIA:main' into nemo-k8s
Browse files Browse the repository at this point in the history
  • Loading branch information
TaekyungHeo authored Sep 30, 2024
2 parents c074ebb + 0aed180 commit a5141a7
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
# limitations under the License.

import logging
import os
import subprocess
from pathlib import Path
from typing import Any, Dict
Expand Down Expand Up @@ -81,11 +80,6 @@ def install(self) -> InstallStatusResult:
if install_status.success:
return InstallStatusResult(success=True, message="NeMo-Launcher is already installed.")

try:
self._check_install_path_access()
except PermissionError as e:
return InstallStatusResult(success=False, message=str(e))

subdir_path = self.system.install_path / self.SUBDIR_PATH
subdir_path.mkdir(parents=True, exist_ok=True)

Expand Down Expand Up @@ -119,19 +113,6 @@ def uninstall(self) -> InstallStatusResult:

return InstallStatusResult(success=True)

def _check_install_path_access(self):
    """
    Verify that the install path is an existing, writable directory.

    Raises:
        PermissionError: If the install path does not exist, is not a directory, or is not writable
            according to ``os.access``.
    """
    install_path = self.system.install_path

    if not install_path.exists():
        raise PermissionError(f"Install path {install_path} does not exist.")

    # A path that exists but is not a directory is not a permission problem, so it gets its own message.
    # The exception type stays PermissionError so existing `except PermissionError` handlers keep working.
    if not install_path.is_dir():
        raise PermissionError(f"Install path {install_path} is not a directory.")

    if not os.access(install_path, os.W_OK):
        raise PermissionError(f"No permission to write in install path {install_path}.")

def _clone_repository(self, subdir_path: Path) -> None:
"""
Clones NeMo-Launcher repository into specified path if it does not already exist.
Expand Down
29 changes: 11 additions & 18 deletions src/cloudai/systems/slurm/slurm_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import getpass
import logging
import re
from pathlib import Path
Expand Down Expand Up @@ -468,10 +467,9 @@ def get_available_nodes_from_group(
available nodes.
"""
self.validate_partition_and_group(partition_name, group_name)
current_user = getpass.getuser()
self.update_node_states()

grouped_nodes = self.group_nodes_by_state(partition_name, group_name, current_user)
grouped_nodes = self.group_nodes_by_state(partition_name, group_name)
allocated_nodes = self.allocate_nodes(grouped_nodes, number_of_nodes, group_name)

# Log allocation details
Expand Down Expand Up @@ -502,9 +500,7 @@ def validate_partition_and_group(self, partition_name: str, group_name: str) ->
if group_name not in self.groups[partition_name]:
raise ValueError(f"Group '{group_name}' not found in partition '{partition_name}'.")

def group_nodes_by_state(
self, partition_name: str, group_name: str, current_user: str
) -> Dict[SlurmNodeState, List[SlurmNode]]:
def group_nodes_by_state(self, partition_name: str, group_name: str) -> Dict[SlurmNodeState, List[SlurmNode]]:
"""
Group nodes by their states.
Expand All @@ -524,11 +520,7 @@ def group_nodes_by_state(

for node in self.groups[partition_name][group_name]:
if node.state in grouped_nodes:
# Exclude nodes allocated to the current user
if node.state == SlurmNodeState.ALLOCATED and node.user == current_user:
continue
if node.state in grouped_nodes:
grouped_nodes[node.state].append(node)
grouped_nodes[node.state].append(node)

return grouped_nodes

Expand All @@ -552,26 +544,27 @@ def allocate_nodes(
"""
# Allocate nodes based on priority: idle, then completing, then allocated
allocated_nodes = []
available_states = [SlurmNodeState.IDLE, SlurmNodeState.COMPLETING, SlurmNodeState.ALLOCATED]

if isinstance(number_of_nodes, str) and number_of_nodes == "max_avail":
for state in available_states:
allocated_nodes.extend(grouped_nodes[state])

allocated_nodes.extend(grouped_nodes[SlurmNodeState.IDLE])
allocated_nodes.extend(grouped_nodes[SlurmNodeState.COMPLETING])
if len(allocated_nodes) == 0:
raise ValueError(f"No available nodes in group '{group_name}'.")

elif isinstance(number_of_nodes, int):
for state in available_states:
for state in grouped_nodes:
while grouped_nodes[state] and len(allocated_nodes) < number_of_nodes:
allocated_nodes.append(grouped_nodes[state].pop(0))

if len(allocated_nodes) < number_of_nodes:
raise ValueError(
"Requested number of nodes ({}) exceeds the number of " "available nodes in group '{}'.".format(
"Requested number of nodes ({}) exceeds the number of nodes in group '{}'.".format(
number_of_nodes, group_name
)
)
else:
raise ValueError(
f"number of nodes should either be an int or 'max_avail', number of nodes : {number_of_nodes}"
)

return allocated_nodes

Expand Down
16 changes: 9 additions & 7 deletions tests/test_slurm_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,7 @@ def grouped_nodes() -> dict[SlurmNodeState, list[SlurmNode]]:
SlurmNodeState.COMPLETING: [
SlurmNode(name="node04", partition=partition_name, state=SlurmNodeState.COMPLETING)
],
SlurmNodeState.ALLOCATED: [],
SlurmNodeState.DOWN: [SlurmNode(name="node05", partition=partition_name, state=SlurmNodeState.DOWN)],
SlurmNodeState.ALLOCATED: [SlurmNode(name="node05", partition=partition_name, state=SlurmNodeState.ALLOCATED)],
}

return grouped_nodes
Expand All @@ -178,9 +177,11 @@ def test_allocate_nodes_max_avail(slurm_system: SlurmSystem, grouped_nodes: dict
]
returned_node_names = [node.name for node in available_nodes]

assert set(returned_node_names) == set(expected_node_names), "Should return all available nodes except DOWN nodes"
down_node_name = grouped_nodes[SlurmNodeState.DOWN][0].name
assert down_node_name not in returned_node_names, "DOWN node should not be included"
assert set(returned_node_names) == set(
expected_node_names
), "Should return all available nodes except ALLOCATED nodes"
allocated_node_name = grouped_nodes[SlurmNodeState.ALLOCATED][0].name
assert allocated_node_name not in returned_node_names, "ALLOCATED node should not be included"


def test_allocate_nodes_num_nodes_integers(
Expand All @@ -200,11 +201,12 @@ def test_allocate_nodes_exceeding_limit(
slurm_system: SlurmSystem, grouped_nodes: dict[SlurmNodeState, list[SlurmNode]]
):
group_name = "group_name"
num_nodes = 5

with pytest.raises(
ValueError,
match=re.escape(
f"Requested number of nodes (4) exceeds the number of available nodes in group '{group_name}'."
f"Requested number of nodes ({num_nodes}) exceeds the number of nodes in group '{group_name}'."
),
):
slurm_system.allocate_nodes(grouped_nodes, 4, group_name)
slurm_system.allocate_nodes(grouped_nodes, num_nodes, group_name)

0 comments on commit a5141a7

Please sign in to comment.