From c646a421048cef76f8df4142a8f6871cff7b53d1 Mon Sep 17 00:00:00 2001 From: jeffnvidia Date: Mon, 5 Aug 2024 16:01:54 +0300 Subject: [PATCH 1/6] remove allocated nodes from slurm_system --- src/cloudai/systems/slurm/slurm_system.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/src/cloudai/systems/slurm/slurm_system.py b/src/cloudai/systems/slurm/slurm_system.py index 203c8d975..8598daa5f 100644 --- a/src/cloudai/systems/slurm/slurm_system.py +++ b/src/cloudai/systems/slurm/slurm_system.py @@ -519,16 +519,11 @@ def group_nodes_by_state( grouped_nodes = { SlurmNodeState.IDLE: [], SlurmNodeState.COMPLETING: [], - SlurmNodeState.ALLOCATED: [], } for node in self.groups[partition_name][group_name]: if node.state in grouped_nodes: - # Exclude nodes allocated to the current user - if node.state == SlurmNodeState.ALLOCATED and node.user == current_user: - continue - if node.state in grouped_nodes: - grouped_nodes[node.state].append(node) + grouped_nodes[node.state].append(node) return grouped_nodes @@ -553,6 +548,9 @@ def allocate_nodes( # Allocate nodes based on priority: idle, then completing, then allocated allocated_nodes = [] available_states = [SlurmNodeState.IDLE, SlurmNodeState.COMPLETING, SlurmNodeState.ALLOCATED] + for state in grouped_nodes: + while grouped_nodes[state] and len(allocated_nodes) < number_of_nodes: + allocated_nodes.append(grouped_nodes[state].pop(0)) if isinstance(number_of_nodes, str) and number_of_nodes == "max_avail": for state in available_states: From 6330d7386cf5e41ad2c89c35034bf17d859a863b Mon Sep 17 00:00:00 2001 From: jeffnvidia Date: Tue, 6 Aug 2024 13:00:49 +0300 Subject: [PATCH 2/6] remove useless package --- src/cloudai/systems/slurm/slurm_system.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/cloudai/systems/slurm/slurm_system.py b/src/cloudai/systems/slurm/slurm_system.py index 8598daa5f..86805a987 100644 --- a/src/cloudai/systems/slurm/slurm_system.py +++ b/src/cloudai/systems/slurm/slurm_system.py @@ -14,7 +14,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import getpass import logging import re from pathlib import Path From 9c44cc19a3523ff897cfc40d0dbb5493d3caad0e Mon Sep 17 00:00:00 2001 From: jeffnvidia Date: Wed, 18 Sep 2024 17:27:33 +0300 Subject: [PATCH 3/6] pending jobs no matter what --- src/cloudai/systems/slurm/slurm_system.py | 26 ++++++++++------------- tests/test_slurm_system.py | 16 ++++++++------ 2 files changed, 20 insertions(+), 22 deletions(-) diff --git a/src/cloudai/systems/slurm/slurm_system.py b/src/cloudai/systems/slurm/slurm_system.py index 86805a987..f9e5473ed 100644 --- a/src/cloudai/systems/slurm/slurm_system.py +++ b/src/cloudai/systems/slurm/slurm_system.py @@ -467,10 +467,9 @@ def get_available_nodes_from_group( available nodes. """ self.validate_partition_and_group(partition_name, group_name) - current_user = getpass.getuser() self.update_node_states() - grouped_nodes = self.group_nodes_by_state(partition_name, group_name, current_user) + grouped_nodes = self.group_nodes_by_state(partition_name, group_name) allocated_nodes = self.allocate_nodes(grouped_nodes, number_of_nodes, group_name) # Log allocation details @@ -501,9 +500,7 @@ def validate_partition_and_group(self, partition_name: str, group_name: str) -> if group_name not in self.groups[partition_name]: raise ValueError(f"Group '{group_name}' not found in partition '{partition_name}'.") - def group_nodes_by_state( - self, partition_name: str, group_name: str, current_user: str - ) -> Dict[SlurmNodeState, List[SlurmNode]]: + def group_nodes_by_state(self, partition_name: str, group_name: str) -> Dict[SlurmNodeState, List[SlurmNode]]: """ Group nodes by their states, excluding nodes allocated to the current user. @@ -518,6 +515,7 @@ def group_nodes_by_state( grouped_nodes = { SlurmNodeState.IDLE: [], SlurmNodeState.COMPLETING: [], + SlurmNodeState.ALLOCATED: [], } for node in self.groups[partition_name][group_name]: @@ -546,29 +544,27 @@ def allocate_nodes( """ # Allocate nodes based on priority: idle, then completing, then allocated allocated_nodes = [] - available_states = [SlurmNodeState.IDLE, SlurmNodeState.COMPLETING, SlurmNodeState.ALLOCATED] - for state in grouped_nodes: - while grouped_nodes[state] and len(allocated_nodes) < number_of_nodes: - allocated_nodes.append(grouped_nodes[state].pop(0)) - if isinstance(number_of_nodes, str) and number_of_nodes == "max_avail": - for state in available_states: - allocated_nodes.extend(grouped_nodes[state]) - + allocated_nodes.extend(grouped_nodes[SlurmNodeState.IDLE]) + allocated_nodes.extend(grouped_nodes[SlurmNodeState.COMPLETING]) if len(allocated_nodes) == 0: raise ValueError(f"No available nodes in group '{group_name}'.") elif isinstance(number_of_nodes, int): - for state in available_states: + for state in grouped_nodes: while grouped_nodes[state] and len(allocated_nodes) < number_of_nodes: allocated_nodes.append(grouped_nodes[state].pop(0)) if len(allocated_nodes) < number_of_nodes: raise ValueError( - "Requested number of nodes ({}) exceeds the number of " "available nodes in group '{}'.".format( + "Requested number of nodes ({}) exceeds the number of nodes in group '{}'.".format( number_of_nodes, group_name ) ) + else: + raise ValueError( + f"number of nodes should either be an int or 'max_avail', number of nodes : {number_of_nodes}" + ) return allocated_nodes diff --git a/tests/test_slurm_system.py b/tests/test_slurm_system.py index 39a6188d2..bf8a790c6 100644 --- a/tests/test_slurm_system.py +++ b/tests/test_slurm_system.py @@ -160,8 +160,7 @@ def grouped_nodes() -> dict[SlurmNodeState, list[SlurmNode]]: SlurmNodeState.COMPLETING: [ SlurmNode(name="node04", partition=partition_name, state=SlurmNodeState.COMPLETING) ], - SlurmNodeState.ALLOCATED: [], - SlurmNodeState.DOWN: [SlurmNode(name="node05", partition=partition_name, state=SlurmNodeState.DOWN)], + SlurmNodeState.ALLOCATED: [SlurmNode(name="node05", partition=partition_name, state=SlurmNodeState.ALLOCATED)], } return grouped_nodes @@ -178,9 +177,11 @@ def test_allocate_nodes_max_avail(slurm_system: SlurmSystem, grouped_nodes: dict ] returned_node_names = [node.name for node in available_nodes] - assert set(returned_node_names) == set(expected_node_names), "Should return all available nodes except DOWN nodes" - down_node_name = grouped_nodes[SlurmNodeState.DOWN][0].name - assert down_node_name not in returned_node_names, "DOWN node should not be included" + assert set(returned_node_names) == set( + expected_node_names + ), "Should return all available nodes except ALLOCATED nodes" + allocated_node_name = grouped_nodes[SlurmNodeState.ALLOCATED][0].name + assert allocated_node_name not in returned_node_names, "ALLOCATED node should not be included" def test_allocate_nodes_num_nodes_integers( @@ -200,11 +201,12 @@ def test_allocate_nodes_exceeding_limit( slurm_system: SlurmSystem, grouped_nodes: dict[SlurmNodeState, list[SlurmNode]] ): group_name = "group_name" + num_nodes = 5 with pytest.raises( ValueError, match=re.escape( - f"Requested number of nodes (4) exceeds the number of available nodes in group '{group_name}'." + f"Requested number of nodes ({num_nodes}) exceeds the number of nodes in group '{group_name}'." ), ): - slurm_system.allocate_nodes(grouped_nodes, 4, group_name) + slurm_system.allocate_nodes(grouped_nodes, num_nodes, group_name) From d14aaf32a3c79e9655145d8085ca92a128d4a21a Mon Sep 17 00:00:00 2001 From: jeffnvidia Date: Mon, 5 Aug 2024 18:27:14 +0300 Subject: [PATCH 4/6] add fetching for reservations add parsing output --- src/cloudai/systems/slurm/slurm_system.py | 63 ++++++++++++++++++++--- 1 file changed, 55 insertions(+), 8 deletions(-) diff --git a/src/cloudai/systems/slurm/slurm_system.py b/src/cloudai/systems/slurm/slurm_system.py index f9e5473ed..6b6728154 100644 --- a/src/cloudai/systems/slurm/slurm_system.py +++ b/src/cloudai/systems/slurm/slurm_system.py @@ -512,15 +512,27 @@ def group_nodes_by_state(self, partition_name: str, group_name: str) -> Dict[Slu Returns: Dict[SlurmNodeState, List[SlurmNode]]: A dictionary grouping nodes by their state. """ - grouped_nodes = { - SlurmNodeState.IDLE: [], - SlurmNodeState.COMPLETING: [], - SlurmNodeState.ALLOCATED: [], - } + + if "reservation" in self.extra_srun_args: + reservation_key = "--reservation " + reservation_name = self.extra_srun_args.split(reservation_key, 1)[1].split(" ", 1)[0] + reserved_nodes = self.get_reservation(reservation_name) + grouped_nodes = { + SlurmNodeState.RESERVED: [], + } + for node in self.groups[partition_name][group_name]: + if node.state in grouped_nodes and node.name in reserved_nodes: + grouped_nodes[node.state].append(node) + else: + grouped_nodes = { + SlurmNodeState.IDLE: [], + SlurmNodeState.COMPLETING: [], + SlurmNodeState.ALLOCATED: [], + } - for node in self.groups[partition_name][group_name]: - if node.state in grouped_nodes: - grouped_nodes[node.state].append(node) + for node in self.groups[partition_name][group_name]: + if node.state in grouped_nodes: + grouped_nodes[node.state].append(node) return grouped_nodes @@ -621,6 +633,17 @@ def get_sinfo(self) -> str: """ sinfo_output, _ = self.fetch_command_output("sinfo") return sinfo_output + + def get_reservation(self, reservation_name) -> str: + """ + Fetch the output from the 'scontrol show reservation' command. + + Returns + str: The stdout from the 'scontrol show reservation' command execution. + """ + reservation_output, _ = self.fetch_command_output("scontrol show reservation") + reserved_nodes = self.parse_reservation_output(reservation_output, reservation_name) + return reserved_nodes def fetch_command_output(self, command: str) -> Tuple[str, str]: """ @@ -697,6 +720,30 @@ def parse_sinfo_output(self, sinfo_output: str, node_user_map: Dict[str, str]) - node.state = state_enum node.user = node_user_map.get(node_name, "N/A") break + + def parse_reservation_output(self, reservation_output: str, reservation_name) -> Dict[str, str]: + """ + Parse the output from the 'squeue' command to map nodes to users. + + The expected format of scontrol show reservation is lines of 'node_spec|user', where node_spec can include comma-separated + node names or ranges. + + Args: + scontrol show reservation (str): The raw output from the squeue command. + + Returns: + Dict[str, str]: A dictionary mapping node names to usernames. + """ + print("reservation output : ", reservation_output) + print("res : ", reservation_output.split("ReservationName")) + for reservation in reservation_output.split("ReservationName"): + if reservation_name in reservation: + nodes = reservation.split("Nodes=")[1].split(" ")[0] + node_list = self.parse_node_list(nodes) + print("nodes :", nodes) + print("node_list : ", node_list) + + return node_list def convert_state_to_enum(self, state_str: str) -> SlurmNodeState: """ From b4b84b70824d6c57f265b135f5696370ca634fea Mon Sep 17 00:00:00 2001 From: jeffnvidia Date: Tue, 6 Aug 2024 11:52:41 +0300 Subject: [PATCH 5/6] reformat --- src/cloudai/systems/slurm/slurm_system.py | 26 ++++++++++------------- 1 file changed, 11 insertions(+), 15 deletions(-) diff --git a/src/cloudai/systems/slurm/slurm_system.py b/src/cloudai/systems/slurm/slurm_system.py index 6b6728154..93fc564a9 100644 --- a/src/cloudai/systems/slurm/slurm_system.py +++ b/src/cloudai/systems/slurm/slurm_system.py @@ -452,6 +452,7 @@ def get_available_nodes_from_group( Prioritizes nodes by their current state, preferring idle nodes first, then completing nodes, and finally allocated nodes, while excluding nodes that are down and allocated nodes to the current user. + If a reservation was queried, then cloudAI will take from the reserved nodes according to the reservation name. Args: partition_name (str): The name of the partition. @@ -633,8 +634,8 @@ def get_sinfo(self) -> str: """ sinfo_output, _ = self.fetch_command_output("sinfo") return sinfo_output - - def get_reservation(self, reservation_name) -> str: + + def get_reservation(self, reservation_name: str) -> str: """ Fetch the output from the 'scontrol show reservation' command. @@ -642,8 +643,7 @@ def get_reservation(self, reservation_name) -> str: str: The stdout from the 'scontrol show reservation' command execution. """ reservation_output, _ = self.fetch_command_output("scontrol show reservation") - reserved_nodes = self.parse_reservation_output(reservation_output, reservation_name) - return reserved_nodes + return reservation_output def fetch_command_output(self, command: str) -> Tuple[str, str]: """ @@ -720,28 +720,24 @@ def parse_sinfo_output(self, sinfo_output: str, node_user_map: Dict[str, str]) - node.state = state_enum node.user = node_user_map.get(node_name, "N/A") break - - def parse_reservation_output(self, reservation_output: str, reservation_name) -> Dict[str, str]: + + def parse_reservation_output(self, reservation_output: str, reservation_name: str) -> List[str]: """ - Parse the output from the 'squeue' command to map nodes to users. + Parse the output from the 'scontrol show reservation' command to get the nodes of a specific reservation. - The expected format of scontrol show reservation is lines of 'node_spec|user', where node_spec can include comma-separated - node names or ranges. + The expected format of scontrol show reservation is lines of 'ReservationName=... /n Nodes=...'. Args: - scontrol show reservation (str): The raw output from the squeue command. + reservation_output (str): The raw output from the scontrol show reservation command. + reservation_name (str) : The name of the reservation the user wants to use. Returns: - Dict[str, str]: A dictionary mapping node names to usernames. + List[str]: A list of the nodes related to the reservation. """ - print("reservation output : ", reservation_output) - print("res : ", reservation_output.split("ReservationName")) for reservation in reservation_output.split("ReservationName"): if reservation_name in reservation: nodes = reservation.split("Nodes=")[1].split(" ")[0] node_list = self.parse_node_list(nodes) - print("nodes :", nodes) - print("node_list : ", node_list) return node_list From 7f0a794aa7653aac007f7de74d422e97b218c22b Mon Sep 17 00:00:00 2001 From: jeffnvidia Date: Tue, 6 Aug 2024 13:46:30 +0300 Subject: [PATCH 6/6] simplify the code --- src/cloudai/systems/slurm/slurm_system.py | 47 +++++++++++++---------- 1 file changed, 26 insertions(+), 21 deletions(-) diff --git a/src/cloudai/systems/slurm/slurm_system.py b/src/cloudai/systems/slurm/slurm_system.py index 93fc564a9..a7269cd02 100644 --- a/src/cloudai/systems/slurm/slurm_system.py +++ b/src/cloudai/systems/slurm/slurm_system.py @@ -448,11 +448,7 @@ def get_available_nodes_from_group( self, partition_name: str, group_name: str, number_of_nodes: Union[int, str] ) -> List[SlurmNode]: """ - Retrieve a specific number of potentially available nodes from a group within a partition. - - Prioritizes nodes by their current state, preferring idle nodes first, then completing nodes, and finally - allocated nodes, while excluding nodes that are down and allocated nodes to the current user. - If a reservation was queried, then cloudAI will take from the reserved nodes according to the reservation name. + Return the reserved nodes corresponding to the given reservation name. Args: partition_name (str): The name of the partition. @@ -461,11 +457,7 @@ def get_available_nodes_from_group( Could also be 'all' to retrieve all the nodes from the group. Returns: - List[SlurmNode]: Objects that are potentially available for use. - - Raises: - ValueError: If the partition or group is not found, or if the requested number of nodes exceeds the - available nodes. + Dict[str, str]: Names of nodes within the specified group and partition and reservation. """ self.validate_partition_and_group(partition_name, group_name) self.update_node_states() @@ -473,16 +465,27 @@ def get_available_nodes_from_group( grouped_nodes = self.group_nodes_by_state(partition_name, group_name) allocated_nodes = self.allocate_nodes(grouped_nodes, number_of_nodes, group_name) - # Log allocation details - logging.info( - "Allocated nodes from group '{}' in partition '{}': {}".format( - group_name, - partition_name, - [node.name for node in allocated_nodes], - ) - ) + def _get_available_nodes(self, partition_name: str, group_name: str): + """ + Return the available nodes sorted into idle and completing. - return allocated_nodes + Args: + partition_name (str): The name of the partition. + group_name (str): The name of the group. + + Returns: + Dict[str, str]: Names of nodes within the specified group and partition and reservation. + """ + grouped_nodes = { + SlurmNodeState.IDLE: [], + SlurmNodeState.COMPLETING: [], + } + + for node in self.groups[partition_name][group_name]: + if node.state in grouped_nodes: + grouped_nodes[node.state].append(node) + + return grouped_nodes def validate_partition_and_group(self, partition_name: str, group_name: str) -> None: """ @@ -738,8 +741,10 @@ def parse_reservation_output(self, reservation_output: str, reservation_name: st if reservation_name in reservation: nodes = reservation.split("Nodes=")[1].split(" ")[0] node_list = self.parse_node_list(nodes) - - return node_list + return node_list + raise ValueError( + 'wrong reservation specified \n. Reservation should be in the form "--reservation reservation_name"' + ) def convert_state_to_enum(self, state_str: str) -> SlurmNodeState: """