Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DRAFT : Reservation with groups #168

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 82 additions & 41 deletions src/cloudai/systems/slurm/slurm_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import getpass
import logging
import re
from pathlib import Path
Expand Down Expand Up @@ -449,10 +448,7 @@ def get_available_nodes_from_group(
self, partition_name: str, group_name: str, number_of_nodes: Union[int, str]
) -> List[SlurmNode]:
"""
Retrieve a specific number of potentially available nodes from a group within a partition.

Prioritizes nodes by their current state, preferring idle nodes first, then completing nodes, and finally
allocated nodes, while excluding nodes that are down and allocated nodes to the current user.
Return the reserved nodes corresponding to the given reservation name.

Args:
partition_name (str): The name of the partition.
Expand All @@ -461,29 +457,35 @@ def get_available_nodes_from_group(
Could also be 'all' to retrieve all the nodes from the group.

Returns:
List[SlurmNode]: Objects that are potentially available for use.

Raises:
ValueError: If the partition or group is not found, or if the requested number of nodes exceeds the
available nodes.
Dict[str, str]: Names of nodes within the specified group and partition and reservation.
"""
self.validate_partition_and_group(partition_name, group_name)
current_user = getpass.getuser()
self.update_node_states()

grouped_nodes = self.group_nodes_by_state(partition_name, group_name, current_user)
grouped_nodes = self.group_nodes_by_state(partition_name, group_name)
allocated_nodes = self.allocate_nodes(grouped_nodes, number_of_nodes, group_name)

# Log allocation details
logging.info(
"Allocated nodes from group '{}' in partition '{}': {}".format(
group_name,
partition_name,
[node.name for node in allocated_nodes],
)
)
def _get_available_nodes(self, partition_name: str, group_name: str):
"""
Return the available nodes sorted into idle and completing.

return allocated_nodes
Args:
partition_name (str): The name of the partition.
group_name (str): The name of the group.

Returns:
Dict[str, str]: Names of nodes within the specified group and partition and reservation.
"""
grouped_nodes = {
SlurmNodeState.IDLE: [],
SlurmNodeState.COMPLETING: [],
}

for node in self.groups[partition_name][group_name]:
if node.state in grouped_nodes:
grouped_nodes[node.state].append(node)

return grouped_nodes

def validate_partition_and_group(self, partition_name: str, group_name: str) -> None:
"""
Expand All @@ -502,9 +504,7 @@ def validate_partition_and_group(self, partition_name: str, group_name: str) ->
if group_name not in self.groups[partition_name]:
raise ValueError(f"Group '{group_name}' not found in partition '{partition_name}'.")

def group_nodes_by_state(
self, partition_name: str, group_name: str, current_user: str
) -> Dict[SlurmNodeState, List[SlurmNode]]:
def group_nodes_by_state(self, partition_name: str, group_name: str) -> Dict[SlurmNodeState, List[SlurmNode]]:
"""
Group nodes by their states, excluding nodes allocated to the current user.

Expand All @@ -516,17 +516,25 @@ def group_nodes_by_state(
Returns:
Dict[SlurmNodeState, List[SlurmNode]]: A dictionary grouping nodes by their state.
"""
grouped_nodes = {
SlurmNodeState.IDLE: [],
SlurmNodeState.COMPLETING: [],
SlurmNodeState.ALLOCATED: [],
}

if "reservation" in self.extra_srun_args:
reservation_key = "--reservation "
reservation_name = self.extra_srun_args.split(reservation_key, 1)[1].split(" ", 1)[0]
reserved_nodes = self.get_reservation(reservation_name)
grouped_nodes = {
SlurmNodeState.RESERVED: [],
}
for node in self.groups[partition_name][group_name]:
if node.state in grouped_nodes and node.name in reserved_nodes:
grouped_nodes[node.state].append(node)
else:
grouped_nodes = {
SlurmNodeState.IDLE: [],
SlurmNodeState.COMPLETING: [],
SlurmNodeState.ALLOCATED: [],
}

for node in self.groups[partition_name][group_name]:
if node.state in grouped_nodes:
# Exclude nodes allocated to the current user
if node.state == SlurmNodeState.ALLOCATED and node.user == current_user:
continue
for node in self.groups[partition_name][group_name]:
if node.state in grouped_nodes:
grouped_nodes[node.state].append(node)

Expand All @@ -552,26 +560,27 @@ def allocate_nodes(
"""
# Allocate nodes based on priority: idle, then completing, then allocated
allocated_nodes = []
available_states = [SlurmNodeState.IDLE, SlurmNodeState.COMPLETING, SlurmNodeState.ALLOCATED]

if isinstance(number_of_nodes, str) and number_of_nodes == "max_avail":
for state in available_states:
allocated_nodes.extend(grouped_nodes[state])

allocated_nodes.extend(grouped_nodes[SlurmNodeState.IDLE])
allocated_nodes.extend(grouped_nodes[SlurmNodeState.COMPLETING])
if len(allocated_nodes) == 0:
raise ValueError(f"No available nodes in group '{group_name}'.")

elif isinstance(number_of_nodes, int):
for state in available_states:
for state in grouped_nodes:
while grouped_nodes[state] and len(allocated_nodes) < number_of_nodes:
allocated_nodes.append(grouped_nodes[state].pop(0))

if len(allocated_nodes) < number_of_nodes:
raise ValueError(
"Requested number of nodes ({}) exceeds the number of " "available nodes in group '{}'.".format(
"Requested number of nodes ({}) exceeds the number of nodes in group '{}'.".format(
number_of_nodes, group_name
)
)
else:
raise ValueError(
f"number of nodes should either be an int or 'max_avail', number of nodes : {number_of_nodes}"
)

return allocated_nodes

Expand Down Expand Up @@ -629,6 +638,16 @@ def get_sinfo(self) -> str:
sinfo_output, _ = self.fetch_command_output("sinfo")
return sinfo_output

def get_reservation(self, reservation_name: str) -> str:
    """
    Fetch the output from the 'scontrol show reservation' command.

    Args:
        reservation_name (str): Name of the reservation to query. When empty, the command is run without a
            name and lists every reservation.

    Returns:
        str: The stdout from the 'scontrol show reservation' command execution.
    """
    import shlex  # local import: only this method builds a command from caller-supplied text

    command = "scontrol show reservation"
    if reservation_name:
        # The argument used to be ignored, so callers always received the listing of every reservation.
        # Quote the name because the command is handed on as a single string.
        command = f"{command} {shlex.quote(reservation_name)}"

    reservation_output, _ = self.fetch_command_output(command)
    return reservation_output

def fetch_command_output(self, command: str) -> Tuple[str, str]:
"""
Execute a system command and return its output.
Expand Down Expand Up @@ -705,6 +724,28 @@ def parse_sinfo_output(self, sinfo_output: str, node_user_map: Dict[str, str]) -
node.user = node_user_map.get(node_name, "N/A")
break

def parse_reservation_output(self, reservation_output: str, reservation_name: str) -> List[str]:
    """
    Parse the output from the 'scontrol show reservation' command to get the nodes of a specific reservation.

    The expected format of scontrol show reservation is one block per reservation, each starting with
    'ReservationName=<name>' and containing a whitespace-separated 'Nodes=<node list>' field.

    Args:
        reservation_output (str): The raw output from the scontrol show reservation command.
        reservation_name (str): The name of the reservation the user wants to use.

    Returns:
        List[str]: A list of the nodes related to the reservation, as produced by ``self.parse_node_list``.

    Raises:
        ValueError: If no reservation with exactly this name is present in the output, or if the matching
            reservation has no 'Nodes=' field.
    """
    nodes_key = "Nodes="

    # Everything before the first 'ReservationName=' is preamble, hence the [1:].
    for block in reservation_output.split("ReservationName=")[1:]:
        # Splitting on any whitespace keeps newlines and tabs out of the field values.
        fields = block.split()

        # Compare the whole name: a substring test would let 'res1' select 'res10'.
        if not fields or fields[0] != reservation_name:
            continue

        for field in fields[1:]:
            if field.startswith(nodes_key):
                return self.parse_node_list(field[len(nodes_key):])

        raise ValueError(f"Reservation '{reservation_name}' has no 'Nodes=' field in the scontrol output.")

    raise ValueError(
        'wrong reservation specified \n. Reservation should be in the form "--reservation reservation_name"'
    )

def convert_state_to_enum(self, state_str: str) -> SlurmNodeState:
"""
Convert a Slurm node state string to its corresponding enum member.
Expand Down
16 changes: 9 additions & 7 deletions tests/test_slurm_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,7 @@ def grouped_nodes() -> dict[SlurmNodeState, list[SlurmNode]]:
SlurmNodeState.COMPLETING: [
SlurmNode(name="node04", partition=partition_name, state=SlurmNodeState.COMPLETING)
],
SlurmNodeState.ALLOCATED: [],
SlurmNodeState.DOWN: [SlurmNode(name="node05", partition=partition_name, state=SlurmNodeState.DOWN)],
SlurmNodeState.ALLOCATED: [SlurmNode(name="node05", partition=partition_name, state=SlurmNodeState.ALLOCATED)],
}

return grouped_nodes
Expand All @@ -178,9 +177,11 @@ def test_allocate_nodes_max_avail(slurm_system: SlurmSystem, grouped_nodes: dict
]
returned_node_names = [node.name for node in available_nodes]

assert set(returned_node_names) == set(expected_node_names), "Should return all available nodes except DOWN nodes"
down_node_name = grouped_nodes[SlurmNodeState.DOWN][0].name
assert down_node_name not in returned_node_names, "DOWN node should not be included"
assert set(returned_node_names) == set(
expected_node_names
), "Should return all available nodes except ALLOCATED nodes"
allocated_node_name = grouped_nodes[SlurmNodeState.ALLOCATED][0].name
assert allocated_node_name not in returned_node_names, "ALLOCATED node should not be included"


def test_allocate_nodes_num_nodes_integers(
Expand All @@ -200,11 +201,12 @@ def test_allocate_nodes_exceeding_limit(
slurm_system: SlurmSystem, grouped_nodes: dict[SlurmNodeState, list[SlurmNode]]
):
group_name = "group_name"
num_nodes = 5

with pytest.raises(
ValueError,
match=re.escape(
f"Requested number of nodes (4) exceeds the number of available nodes in group '{group_name}'."
f"Requested number of nodes ({num_nodes}) exceeds the number of nodes in group '{group_name}'."
),
):
slurm_system.allocate_nodes(grouped_nodes, 4, group_name)
slurm_system.allocate_nodes(grouped_nodes, num_nodes, group_name)
Loading