diff --git a/src/cloudai/__init__.py b/src/cloudai/__init__.py index a82083f8..9d3dba60 100644 --- a/src/cloudai/__init__.py +++ b/src/cloudai/__init__.py @@ -18,6 +18,7 @@ from ._core.base_job import BaseJob from ._core.base_runner import BaseRunner from ._core.base_system_parser import BaseSystemParser +from ._core.cloudai_gym import CloudAIGym from ._core.command_gen_strategy import CommandGenStrategy from ._core.exceptions import ( JobIdRetrievalError, @@ -42,6 +43,7 @@ from ._core.test_scenario_parser import TestScenarioParser from ._core.test_template import TestTemplate from ._core.test_template_strategy import TestTemplateStrategy +from .environment.example_environment import ExampleEnv from .installer.kubernetes_installer import KubernetesInstaller from .installer.slurm_installer import SlurmInstaller from .installer.standalone_installer import StandaloneInstaller @@ -239,6 +241,7 @@ Registry().add_test_definition("JaxToolboxNemotron", NemotronTestDefinition) Registry().add_test_definition("SlurmContainer", SlurmContainerTestDefinition) +Registry().add_gym("example_gym", CloudAIGym) __all__ = [ "BaseInstaller", @@ -272,4 +275,6 @@ "TestScenarioParsingError", "TestTemplate", "TestTemplateStrategy", + "ExampleEnv", + "CloudAIGym", ] diff --git a/src/cloudai/_core/cloudai_gym.py b/src/cloudai/_core/cloudai_gym.py new file mode 100644 index 00000000..c0de9cba --- /dev/null +++ b/src/cloudai/_core/cloudai_gym.py @@ -0,0 +1,57 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from abc import ABC, abstractmethod +from typing import Any, Dict, Tuple + + +class CloudAIGym(ABC): + """A generic custom Gym environment for CloudAI.""" + + def __init__(self, action_space: Any, observation_space: Any, max_steps: int = 100): + """ + Initialize the environment. + + Args: + action_space: Defines the set of valid actions for the environment. + observation_space: Describes the space of possible observations (states). + max_steps: Maximum number of steps in an episode. + """ + self.action_space: Any = action_space + self.observation_space: Any = observation_space + self.max_steps: int = max_steps + self.state: Any = None + self.current_step: int = 0 + + @abstractmethod + def step(self, action: Any) -> Tuple[Any, float, bool, Dict[str, Any]]: + """Perform one step in the environment.""" + pass + + @abstractmethod + def reset(self) -> Any: + """Reset the environment to its initial state.""" + pass + + @abstractmethod + def render(self) -> None: + """Render the environment.""" + pass + + @abstractmethod + def is_valid_action(self, action: Any) -> bool: + """Contraint checks for the action.""" + pass diff --git a/src/cloudai/_core/registry.py b/src/cloudai/_core/registry.py index 548d8653..05de49d2 100644 --- a/src/cloudai/_core/registry.py +++ b/src/cloudai/_core/registry.py @@ -18,6 +18,7 @@ from .base_installer import BaseInstaller from .base_runner import BaseRunner +from .cloudai_gym import CloudAIGym from .grading_strategy import GradingStrategy from .job_id_retrieval_strategy import JobIdRetrievalStrategy from .job_status_retrieval_strategy import JobStatusRetrievalStrategy @@ -69,6 +70,7 @@ class Registry(metaclass=Singleton): installers_map: Dict[str, Type[BaseInstaller]] = {} systems_map: Dict[str, Type[System]] = {} test_definitions_map: Dict[str, Type[TestDefinition]] = {} + gyms_map: Dict[str, Type[CloudAIGym]] = {} # Add this line def add_runner(self, name: str, value: Type[BaseRunner]) -> None: """ @@ -100,6 +102,36 @@ def update_runner(self, name: str, value: Type[BaseRunner]) -> None: raise ValueError(f"Invalid runner implementation for '{name}', should be subclass of 'BaseRunner'.") self.runners_map[name] = value + def add_gym(self, name: str, value: Type[CloudAIGym]) -> None: # Add this method + """ + Add a new gym implementation mapping. + + Args: + name (str): The name of the gym. + value (Type[CloudAIGym]): The gym implementation. + + Raises: + ValueError: If the gym implementation already exists. + """ + if name in self.gyms_map: + raise ValueError(f"Duplicating implementation for '{name}', use 'update()' for replacement.") + self.update_gym(name, value) + + def update_gym(self, name: str, value: Type[CloudAIGym]) -> None: # Add this method + """ + Create or replace gym implementation mapping. + + Args: + name (str): The name of the gym. + value (Type[CloudAIGym]): The gym implementation. + + Raises: + ValueError: If value is not a subclass of CloudAIGym. + """ + if not issubclass(value, CloudAIGym): + raise ValueError(f"Invalid gym implementation for '{name}', should be subclass of 'CloudAIGym'.") + self.gyms_map[name] = value + def add_strategy( self, strategy_interface: Type[ diff --git a/src/cloudai/environment/__init__.py b/src/cloudai/environment/__init__.py new file mode 100644 index 00000000..23bfc249 --- /dev/null +++ b/src/cloudai/environment/__init__.py @@ -0,0 +1,15 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/src/cloudai/environment/example_environment.py b/src/cloudai/environment/example_environment.py new file mode 100644 index 00000000..286ca6d8 --- /dev/null +++ b/src/cloudai/environment/example_environment.py @@ -0,0 +1,112 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Dict, Tuple + +import numpy as np + +from src.cloudai._core.cloudai_gym import CloudAIGym + + +class ExampleEnv(CloudAIGym): + """A sample environment that extends CloudAIGym.""" + + def __init__(self, max_steps: int = 10): + """ + Initialize the environment. + + Args: + max_steps (int): Maximum number of steps allowed in the environment. + """ + # Define the action and observation structures + action_space = { + "num_cores": (0, 15), # Integer range: [0, 14] + "freq": (0.5, 3.0), # Float range: [0.5, 3.0] + "mem_type": (0, 2), # Integer range: [0, 2] + "mem_size": (0, 64), # Integer range: [0, 64] + } + observation_space = { + "energy": (0.0, 1.0), + "area": (0.0, 1.0), + "latency": (0.0, 1.0), + } + + super().__init__(action_space, observation_space, max_steps) + + # Additional environment attributes + self.ideal = np.array([4, 2.0, 1, 32]) # Ideal values + self.reset() + + def is_valid_action(self, action: dict) -> bool: + """ + Validate whether the action falls within the defined bounds. + + Args: + action (dict): A dictionary representing the action. + + Returns: + bool: True if valid, False otherwise. + + """ + for key, value in action.items(): + if key not in self.action_space: + return False + lower, upper = self.action_space[key] + if not (lower <= value <= upper): + return False + return True + + def step(self, action: dict) -> Tuple[np.ndarray, float, bool, Dict]: + """Execute a step in the environment.""" + if not self.is_valid_action(action): + raise ValueError(f"Invalid action: {action}") + + # Unpack action components + num_cores = action["num_cores"] + freq = action["freq"] + mem_type = action["mem_type"] + mem_size = action["mem_size"] + + # Update state + self.energy += num_cores * 1 + freq * 2 + mem_size * 3 + self.area += num_cores * 2 + freq * 3 + mem_size * 1 + self.latency += num_cores * 3 + freq * 3 + mem_size * 1 + + # Update observation + self.observation = np.array([self.energy, self.area, self.latency]) + + # Calculate reward as negative L2-norm to the ideal values + action_array = np.array([num_cores, freq, mem_type, mem_size]) + reward: float = -float(np.linalg.norm(action_array - self.ideal)) + + # Check if the episode is done + self.current_step += 1 + done = self.current_step >= self.max_steps + + return self.observation, reward, done, {} + + def reset(self) -> Tuple[np.ndarray, Dict]: + """Reset the environment to its initial state.""" + self.energy = 0 + self.area = 0 + self.latency = 0 + self.current_step = 0 + self.observation = np.array([self.energy, self.area, self.latency]) + return self.observation, {} + + def render(self) -> None: + """Render the current state of the environment.""" + print(f"Energy: {self.energy}, Area: {self.area}, Latency: {self.latency}") diff --git a/tests/test_example_env.py b/tests/test_example_env.py new file mode 100644 index 00000000..2f5af02e --- /dev/null +++ b/tests/test_example_env.py @@ -0,0 +1,84 @@ +import numpy as np +import pytest + +from src.cloudai.environment.example_environment import ExampleEnv + + +# Fixture to initialize the environment +@pytest.fixture +def env(): + """ + Fixture to provide a fresh instance of ParameterConvergeEnv. + Automatically resets the environment before each test. + """ + environment = ExampleEnv(max_steps=10) + environment.reset() + return environment + + +def test_environment_initialization(env): + """ + Test that the environment initializes correctly. + """ + observation, info = env.reset() + assert isinstance(observation, np.ndarray), "Observation should be a numpy array." + assert observation.shape == (3,), "Observation should have three elements." + assert env.current_step == 0, "Step count should initialize to 0." + assert not env.done, "Environment should not be done immediately after reset." + + +@pytest.mark.parametrize( + "action, expected_done", + [ + ({"num_cores": 4, "freq": 2.0, "mem_type": 1, "mem_size": 32}, False), + ({"num_cores": 10, "freq": 1.5, "mem_type": 0, "mem_size": 16}, False), + ], +) +def test_step_execution(env, action, expected_done): + """ + Test that a valid action updates the environment correctly. + """ + observation, reward, done, truncated, info = env.step(action) + assert isinstance(observation, np.ndarray), "Observation should be a numpy array." + assert isinstance(reward, float), "Reward should be a float." + assert reward < 0, "Reward should be negative for L2-norm distance." + assert done == expected_done, "Done flag should match expected behavior." + assert not truncated, "Truncated should always be False." + + +@pytest.mark.parametrize( + "action", + [ + {"num_cores": 20, "freq": 5.0, "mem_type": 5, "mem_size": 100}, # All invalid + {"num_cores": 15, "freq": 0.0, "mem_type": -1, "mem_size": 65}, # Boundary invalid + ], +) +def test_invalid_action(env, action): + """ + Test that invalid actions raise a ValueError. + """ + with pytest.raises(ValueError): + env.step(action) + + +def test_episode_completion(env): + """ + Test that the environment ends the episode after max_steps. + """ + action = {"num_cores": 4, "freq": 2.0, "mem_type": 1, "mem_size": 32} + done = False + for _ in range(10): # Perform max_steps + observation, reward, done, truncated, info = env.step(action) + assert done, "Environment should be done after reaching max_steps." + observation, info = env.reset() + assert env.current_step == 0, "Step count should reset to 0 after reset." + assert not env.done, "Environment should not be done after reset." + + +def test_reset_behavior(env): + """ + Test that resetting the environment restores it to the initial state. + """ + observation, info = env.reset() + assert np.allclose(observation, [0, 0, 0]), "Initial observation should be zeros." + assert env.current_step == 0, "Step count should be reset to 0."