Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Native environment in CloudAI #312

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/cloudai/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from ._core.base_job import BaseJob
from ._core.base_runner import BaseRunner
from ._core.base_system_parser import BaseSystemParser
from ._core.cloudai_gym import CloudAIGym
from ._core.command_gen_strategy import CommandGenStrategy
from ._core.exceptions import (
JobIdRetrievalError,
Expand All @@ -42,6 +43,7 @@
from ._core.test_scenario_parser import TestScenarioParser
from ._core.test_template import TestTemplate
from ._core.test_template_strategy import TestTemplateStrategy
from .environment.example_environment import ExampleEnv
from .installer.kubernetes_installer import KubernetesInstaller
from .installer.slurm_installer import SlurmInstaller
from .installer.standalone_installer import StandaloneInstaller
Expand Down Expand Up @@ -239,6 +241,7 @@
Registry().add_test_definition("JaxToolboxNemotron", NemotronTestDefinition)
Registry().add_test_definition("SlurmContainer", SlurmContainerTestDefinition)

Registry().add_gym("example_gym", CloudAIGym)

__all__ = [
"BaseInstaller",
Expand Down Expand Up @@ -272,4 +275,6 @@
"TestScenarioParsingError",
"TestTemplate",
"TestTemplateStrategy",
"ExampleEnv",
"CloudAIGym",
]
57 changes: 57 additions & 0 deletions src/cloudai/_core/cloudai_gym.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES
# Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from abc import ABC, abstractmethod
from typing import Any, Dict, Tuple


class CloudAIGym(ABC):
"""A generic custom Gym environment for CloudAI."""

def __init__(self, action_space: Any, observation_space: Any, max_steps: int = 100):
"""
Initialize the environment.

Args:
action_space: Defines the set of valid actions for the environment.
observation_space: Describes the space of possible observations (states).
max_steps: Maximum number of steps in an episode.
"""
self.action_space: Any = action_space
self.observation_space: Any = observation_space
self.max_steps: int = max_steps
self.state: Any = None
self.current_step: int = 0

@abstractmethod
def step(self, action: Any) -> Tuple[Any, float, bool, Dict[str, Any]]:
"""Perform one step in the environment."""
pass

@abstractmethod
def reset(self) -> Any:
"""Reset the environment to its initial state."""
pass

@abstractmethod
def render(self) -> None:
"""Render the environment."""
pass

@abstractmethod
def is_valid_action(self, action: Any) -> bool:
"""Contraint checks for the action."""
pass
32 changes: 32 additions & 0 deletions src/cloudai/_core/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

from .base_installer import BaseInstaller
from .base_runner import BaseRunner
from .cloudai_gym import CloudAIGym
from .grading_strategy import GradingStrategy
from .job_id_retrieval_strategy import JobIdRetrievalStrategy
from .job_status_retrieval_strategy import JobStatusRetrievalStrategy
Expand Down Expand Up @@ -69,6 +70,7 @@ class Registry(metaclass=Singleton):
installers_map: Dict[str, Type[BaseInstaller]] = {}
systems_map: Dict[str, Type[System]] = {}
test_definitions_map: Dict[str, Type[TestDefinition]] = {}
gyms_map: Dict[str, Type[CloudAIGym]] = {} # Add this line

def add_runner(self, name: str, value: Type[BaseRunner]) -> None:
"""
Expand Down Expand Up @@ -100,6 +102,36 @@ def update_runner(self, name: str, value: Type[BaseRunner]) -> None:
raise ValueError(f"Invalid runner implementation for '{name}', should be subclass of 'BaseRunner'.")
self.runners_map[name] = value

def add_gym(self, name: str, value: Type[CloudAIGym]) -> None: # Add this method
"""
Add a new gym implementation mapping.

Args:
name (str): The name of the gym.
value (Type[CloudAIGym]): The gym implementation.

Raises:
ValueError: If the gym implementation already exists.
"""
if name in self.gyms_map:
raise ValueError(f"Duplicating implementation for '{name}', use 'update()' for replacement.")
self.update_gym(name, value)

def update_gym(self, name: str, value: Type[CloudAIGym]) -> None: # Add this method
"""
Create or replace gym implementation mapping.

Args:
name (str): The name of the gym.
value (Type[CloudAIGym]): The gym implementation.

Raises:
ValueError: If value is not a subclass of CloudAIGym.
"""
if not issubclass(value, CloudAIGym):
raise ValueError(f"Invalid gym implementation for '{name}', should be subclass of 'CloudAIGym'.")
self.gyms_map[name] = value

def add_strategy(
self,
strategy_interface: Type[
Expand Down
15 changes: 15 additions & 0 deletions src/cloudai/environment/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES
# Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
112 changes: 112 additions & 0 deletions src/cloudai/environment/example_environment.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES
# Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Dict, Tuple

import numpy as np

from src.cloudai._core.cloudai_gym import CloudAIGym


class ExampleEnv(CloudAIGym):
"""A sample environment that extends CloudAIGym."""

def __init__(self, max_steps: int = 10):
"""
Initialize the environment.

Args:
max_steps (int): Maximum number of steps allowed in the environment.
"""
# Define the action and observation structures
action_space = {
"num_cores": (0, 15), # Integer range: [0, 14]
"freq": (0.5, 3.0), # Float range: [0.5, 3.0]
"mem_type": (0, 2), # Integer range: [0, 2]
"mem_size": (0, 64), # Integer range: [0, 64]
}
observation_space = {
"energy": (0.0, 1.0),
"area": (0.0, 1.0),
"latency": (0.0, 1.0),
}

super().__init__(action_space, observation_space, max_steps)

# Additional environment attributes
self.ideal = np.array([4, 2.0, 1, 32]) # Ideal values
self.reset()

def is_valid_action(self, action: dict) -> bool:
"""
Validate whether the action falls within the defined bounds.

Args:
action (dict): A dictionary representing the action.

Returns:
bool: True if valid, False otherwise.

"""
for key, value in action.items():
if key not in self.action_space:
return False
lower, upper = self.action_space[key]
if not (lower <= value <= upper):
return False
return True

def step(self, action: dict) -> Tuple[np.ndarray, float, bool, Dict]:
"""Execute a step in the environment."""
if not self.is_valid_action(action):
raise ValueError(f"Invalid action: {action}")

# Unpack action components
num_cores = action["num_cores"]
freq = action["freq"]
mem_type = action["mem_type"]
mem_size = action["mem_size"]

# Update state
self.energy += num_cores * 1 + freq * 2 + mem_size * 3
self.area += num_cores * 2 + freq * 3 + mem_size * 1
self.latency += num_cores * 3 + freq * 3 + mem_size * 1

# Update observation
self.observation = np.array([self.energy, self.area, self.latency])

# Calculate reward as negative L2-norm to the ideal values
action_array = np.array([num_cores, freq, mem_type, mem_size])
reward: float = -float(np.linalg.norm(action_array - self.ideal))

# Check if the episode is done
self.current_step += 1
done = self.current_step >= self.max_steps

return self.observation, reward, done, {}

def reset(self) -> Tuple[np.ndarray, Dict]:
"""Reset the environment to its initial state."""
self.energy = 0
self.area = 0
self.latency = 0
self.current_step = 0
self.observation = np.array([self.energy, self.area, self.latency])
return self.observation, {}

def render(self) -> None:
"""Render the current state of the environment."""
print(f"Energy: {self.energy}, Area: {self.area}, Latency: {self.latency}")
84 changes: 84 additions & 0 deletions tests/test_example_env.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import numpy as np
import pytest

from src.cloudai.environment.example_environment import ExampleEnv


# Fixture to initialize the environment
@pytest.fixture
def env():
"""
Fixture to provide a fresh instance of ParameterConvergeEnv.
Automatically resets the environment before each test.
"""
environment = ExampleEnv(max_steps=10)
environment.reset()
return environment


def test_environment_initialization(env):
"""
Test that the environment initializes correctly.
"""
observation, info = env.reset()
assert isinstance(observation, np.ndarray), "Observation should be a numpy array."
assert observation.shape == (3,), "Observation should have three elements."
assert env.current_step == 0, "Step count should initialize to 0."
assert not env.done, "Environment should not be done immediately after reset."


@pytest.mark.parametrize(
"action, expected_done",
[
({"num_cores": 4, "freq": 2.0, "mem_type": 1, "mem_size": 32}, False),
({"num_cores": 10, "freq": 1.5, "mem_type": 0, "mem_size": 16}, False),
],
)
def test_step_execution(env, action, expected_done):
"""
Test that a valid action updates the environment correctly.
"""
observation, reward, done, truncated, info = env.step(action)
assert isinstance(observation, np.ndarray), "Observation should be a numpy array."
assert isinstance(reward, float), "Reward should be a float."
assert reward < 0, "Reward should be negative for L2-norm distance."
assert done == expected_done, "Done flag should match expected behavior."
assert not truncated, "Truncated should always be False."


@pytest.mark.parametrize(
"action",
[
{"num_cores": 20, "freq": 5.0, "mem_type": 5, "mem_size": 100}, # All invalid
{"num_cores": 15, "freq": 0.0, "mem_type": -1, "mem_size": 65}, # Boundary invalid
],
)
def test_invalid_action(env, action):
"""
Test that invalid actions raise a ValueError.
"""
with pytest.raises(ValueError):
env.step(action)


def test_episode_completion(env):
"""
Test that the environment ends the episode after max_steps.
"""
action = {"num_cores": 4, "freq": 2.0, "mem_type": 1, "mem_size": 32}
done = False
for _ in range(10): # Perform max_steps
observation, reward, done, truncated, info = env.step(action)
assert done, "Environment should be done after reaching max_steps."
observation, info = env.reset()
assert env.current_step == 0, "Step count should reset to 0 after reset."
assert not env.done, "Environment should not be done after reset."


def test_reset_behavior(env):
"""
Test that resetting the environment restores it to the initial state.
"""
observation, info = env.reset()
assert np.allclose(observation, [0, 0, 0]), "Initial observation should be zeros."
assert env.current_step == 0, "Step count should be reset to 0."
Loading