Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Safety valve to seperate the DSE execution with benchmarking execution #334

Draft
wants to merge 23 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
d945201
agent environment intergation with runner
srivatsankrishnan Jan 11, 2025
1be4398
more fixes
srivatsankrishnan Jan 11, 2025
4df4ab9
Remove Farma gym dependies for more control over types + other fixes …
srivatsankrishnan Jan 11, 2025
e6905f7
vulture fix
srivatsankrishnan Jan 11, 2025
177694f
remove farma gym dependencies + update the pytest for cloudai_gym
srivatsankrishnan Jan 11, 2025
b10dbfb
remove farma gym from pyproject
srivatsankrishnan Jan 11, 2025
15be693
fix the copyright headers checks
srivatsankrishnan Jan 11, 2025
d5d1e14
use iterators to avoid indexing errors.
srivatsankrishnan Jan 11, 2025
96ab055
helper method for manipulating the TestRun object directly
srivatsankrishnan Jan 11, 2025
8cab450
Modifcations for storing dse results
srivatsankrishnan Jan 11, 2025
0acf43e
add dse_iteration to TestRun object
srivatsankrishnan Jan 11, 2025
55c203a
safety valve to seperate the dse execution with benchmarking execution
srivatsankrishnan Jan 12, 2025
be56e31
add copyright headers
srivatsankrishnan Jan 12, 2025
71a12c4
remove
srivatsankrishnan Jan 12, 2025
669bd8d
remove empty line
srivatsankrishnan Jan 12, 2025
b055d4d
Merge branch 'main' into safety-valve
srivatsankrishnan Jan 12, 2025
02bb3e0
Removing the agent's configuration and instead query from the environ…
srivatsankrishnan Jan 14, 2025
382e424
Fixed typo in NeMoRunTestDefinition docstr (#336)
itamar-rauch Jan 13, 2025
e16873d
Mount NCCL_TOPO_FILE in NCCL test (#337)
TaekyungHeo Jan 14, 2025
fd3b6c9
Fix the testing code
srivatsankrishnan Jan 14, 2025
9fadfce
fix the configurator structure
srivatsankrishnan Jan 14, 2025
4cb3f02
update the test_agent
srivatsankrishnan Jan 15, 2025
996d6fd
Merge branch 'main' into safety-valve
srivatsankrishnan Jan 15, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
name = "cloudai"
dynamic = ["version"]
dependencies = [
"gymnasium @ git+https://github.com/Farama-Foundation/Gymnasium/@v1.0.0a2",
"bokeh==3.4.1",
"pandas==2.2.1",
"tbparse==0.0.8",
Expand Down
1 change: 0 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
gymnasium @ git+https://github.com/Farama-Foundation/Gymnasium/@v1.0.0a2
bokeh==3.4.1
pandas==2.2.1
tbparse==0.0.8
Expand Down
23 changes: 12 additions & 11 deletions src/cloudai/_core/base_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,20 +74,19 @@ def __init__(self, mode: str, system: System, test_scenario: TestScenario):

def setup_output_directory(self, base_output_path: Path) -> Path:
"""
Set up and return the output directory path for the runner instance.
Set up and return the base output directory path for the runner instance.

Args:
base_output_path (Path): The base output directory.

Returns:
Path: The path to the output directory.
Path: The path to the base output directory.
"""
if not base_output_path.exists():
base_output_path.mkdir()
current_time = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
output_subpath = base_output_path / f"{self.test_scenario.name}_{current_time}"
output_subpath.mkdir()
return output_subpath
base_output__path_with_name = base_output_path / self.test_scenario.name
if not base_output__path_with_name.exists():
base_output__path_with_name = base_output__path_with_name
base_output__path_with_name.mkdir(parents=True, exist_ok=True)
return base_output__path_with_name

def register_signal_handlers(self):
"""Register signal handlers for handling termination-related signals."""
Expand Down Expand Up @@ -264,10 +263,12 @@ def get_job_output_path(self, tr: TestRun) -> Path:
job_output_path = Path() # avoid reportPossiblyUnboundVariable from pyright

try:
test_output_path = self.output_path / tr.name
test_output_path.mkdir()
iteration_path = self.output_path / f"iteration_{tr.dse_iteration}"
iteration_path.mkdir(parents=True, exist_ok=True)
test_output_path = iteration_path / f"{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}" / tr.name
test_output_path.mkdir(parents=True, exist_ok=True)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not agree with the proposed directory structure. We need to discuss this with all core members: Srinivas, Srivatsan, and Andrei.

My proposal

  • For non-DSE jobs -> Keep the current directory structure.
results
  |--test_scenario.name
      |--Test.1
            |---0
                  |--sbatch
                  |--run.sh
  • For DSE jobs -> Follow Srivatsan's proposal, but switch the levels between iteration and test scenario name.
results
|--test_scenario.name
  |--iteration_1
      |--Test.1
            |---0
                  |--sbatch
                  |--run.sh
  |--iteration_2
      |--Test.1
            |---0
                  |--sbatch
                  |--run.sh

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest that we keep the structure the same for both regular and dse cases:

results
|--test_scenario.name_<datetime>
   |--Test.1-dseid.X
      |--0
         |--sbatch
         |--run.sh

job_output_path = test_output_path / str(tr.current_iteration)
job_output_path.mkdir()
job_output_path.mkdir(parents=True, exist_ok=True)
except PermissionError as e:
raise PermissionError(f"Cannot create directory {job_output_path}: {e}") from e

Expand Down
102 changes: 102 additions & 0 deletions src/cloudai/_core/configurator/base_gym.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES
# Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from abc import ABC, abstractmethod
from typing import Any, Dict, Optional, Tuple


class BaseGym(ABC):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we merge BaseGym and CloudAIGym into single object? Why do we need both?

"""Base class for CloudAI Gym environments."""

def __init__(self):
"""Initialize the CloudAIGym environment."""
self.action_space = self.define_action_space()
self.observation_space = self.define_observation_space()

@abstractmethod
def define_action_space(self) -> Dict[str, Any]:
"""
Define the action space for the environment.

Returns:
Dict[str, Any]: The action space.
"""
pass

@abstractmethod
def define_observation_space(self) -> list:
"""
Define the observation space for the environment.

Returns:
list: The observation space.
"""
pass

@abstractmethod
def reset(
self, seed: Optional[int] = None, _options: Optional[dict[str, Any]] = None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please rename _options to options.

) -> Tuple[list, dict[str, Any]]:
"""
Reset the environment.

Args:
seed (Optional[int]): Seed for the environment's random number generator.
options (Optional[dict]): Additional options for reset.

Returns:
Tuple: A tuple containing:
- observation (list): Initial observation.
- info (dict): Additional info for debugging.
"""
pass

@abstractmethod
def step(self, action: Any) -> Tuple[list, float, bool, dict]:
"""
Execute one step in the environment.

Args:
action (Any): Action chosen by the agent.

Returns:
Tuple: A tuple containing:
- observation (list): Updated system state.
- reward (float): Reward for the action taken.
- done (bool): Whether the episode is done.
- info (dict): Additional info for debugging.
"""
pass

@abstractmethod
def render(self, mode: str = "human"):
"""
Render the current state of the environment.

Args:
mode (str): The mode to render with. Default is "human".
"""
pass

@abstractmethod
def seed(self, seed: Optional[int] = None):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Why seed is optional here? Can user avoid it if there is no seed?
  2. In which case user would need to set a seed using this function and not reset(seed=X, _options=None)?

"""
Set the seed for the environment's random number generator.

Args:
seed (Optional[int]): Seed for the environment's random number generator.
"""
pass
87 changes: 46 additions & 41 deletions src/cloudai/_core/configurator/cloudai_gym.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,23 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Any, Optional, Tuple
from typing import Any, Dict, Optional, Tuple

import gymnasium as gym
import numpy as np
from gymnasium import spaces

from cloudai import System
from cloudai._core.configurator.base_gym import BaseGym
from cloudai._core.test_scenario import TestRun, TestScenario
from cloudai.systems import SlurmSystem


class CloudAIGymEnv(gym.Env):
class CloudAIGymEnv(BaseGym):
"""
Custom Gym environment for CloudAI integration.

Uses the TestRun object and actual runner methods to execute jobs.
"""

def __init__(self, test_run: TestRun, system: SlurmSystem, test_scenario: TestScenario):
def __init__(self, test_run: TestRun, system: System, test_scenario: TestScenario):
"""
Initialize the Gym environment using the TestRun object.

Expand All @@ -40,47 +39,40 @@ def __init__(self, test_run: TestRun, system: SlurmSystem, test_scenario: TestSc
system (SlurmSystem): The system configuration for running the tests.
test_scenario (TestScenario): The test scenario configuration.
"""
super(CloudAIGymEnv, self).__init__()
self.test_run = test_run
self.system = system
self.test_scenario = test_scenario
super().__init__()

self.action_space = self.extract_action_space(self.test_run.test.cmd_args)
self.observation_space = self.define_observation_space()

def extract_action_space(self, cmd_args: dict) -> spaces.Dict:
def define_action_space(self) -> Dict[str, Any]:
"""
Extract the action space from the cmd_args dictionary.

Args:
cmd_args (dict): The command arguments dictionary from the TestRun object.
Define the action space for the environment.

Returns:
spaces.Dict: A dictionary containing the action space variables and their feasible values.
Dict[str, Any]: The action space.
"""
action_space = {}
for key, value in cmd_args.items():
for key, value in self.test_run.test.cmd_args.items():
if isinstance(value, list):
action_space[key] = spaces.Discrete(len(value))
action_space[key] = len(value)
elif isinstance(value, dict):
for sub_key, sub_value in value.items():
if isinstance(sub_value, list):
action_space[f"{key}.{sub_key}"] = spaces.Discrete(len(sub_value))
return spaces.Dict(action_space)
action_space[f"{key}.{sub_key}"] = len(sub_value)
return action_space

def define_observation_space(self) -> spaces.Space:
def define_observation_space(self) -> list:
"""
Define the observation space for the environment.

Returns:
spaces.Space: The observation space.
list: The observation space.
"""
return spaces.Box(low=0, high=1, shape=(1,), dtype=np.float32)
return [0.0]

def reset(
self,
*,
seed: Optional[int] = None,
options: Optional[dict[str, Any]] = None,
) -> Tuple[np.ndarray, dict[str, Any]]:
self, seed: Optional[int] = None, _options: Optional[dict[str, Any]] = None
) -> Tuple[list, dict[str, Any]]:
"""
Reset the environment and reinitialize the TestRun.

Expand All @@ -90,37 +82,37 @@ def reset(

Returns:
Tuple: A tuple containing:
- observation (np.ndarray): Initial observation.
- observation (list): Initial observation.
- info (dict): Additional info for debugging.
"""
super().reset(seed=seed, options=options)
if seed is not None:
np.random.seed(seed)
self.test_run.current_iteration = 0
observation = np.array([0.0], dtype=np.float32)
observation = [0.0]
info = {}
return observation, info

def step(self, action: np.ndarray) -> tuple:
def step(self, action: Any) -> Tuple[list, float, bool, dict]:
"""
Execute one step in the environment.

Args:
action (np.ndarray): Action chosen by the agent.
action (Any): Action chosen by the agent.

Returns:
tuple: A tuple containing:
- observation (np.ndarray): Updated system state.
Tuple: A tuple containing:
- observation (list): Updated system state.
- reward (float): Reward for the action taken.
- done (bool): Whether the episode is done.
- info (dict): Additional info for debugging.
"""
observation = self.get_observation(action)
reward = 0.0
reward = self.compute_reward()
done = False
info = {}

return observation, reward, done, info

def render(self, mode="human"):
def render(self, mode: str = "human"):
"""
Render the current state of the TestRun.

Expand All @@ -129,6 +121,16 @@ def render(self, mode="human"):
"""
print(f"Step {self.test_run.current_iteration}: Parameters {self.test_run.test.cmd_args}")

def seed(self, seed: Optional[int] = None):
"""
Set the seed for the environment's random number generator.

Args:
seed (Optional[int]): Seed for the environment's random number generator.
"""
if seed is not None:
np.random.seed(seed)

def compute_reward(self) -> float:
"""
Compute a reward based on the TestRun result.
Expand All @@ -138,12 +140,15 @@ def compute_reward(self) -> float:
"""
return 0.0

def get_observation(self, action) -> np.ndarray:
def get_observation(self, action: Any) -> list:
"""
Get the observation from the TestRun object.

Args:
action (Any): Action taken by the agent.

Returns:
np.ndarray: A scalar value representing the observation.
list: The observation.
"""
obs = action * 0.5
return np.array([obs], dtype=np.float32)
return [obs]
1 change: 1 addition & 0 deletions src/cloudai/_core/test_scenario.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class TestRun:
output_path: Path = Path("")
iterations: int = 1
current_iteration: int = 0
dse_iteration: int = 0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why can't we use existing iterations field?

time_limit: Optional[str] = None
sol: Optional[float] = None
weight: float = 0.0
Expand Down
Loading
Loading