From f295919b8281b3f4c8453086f75e1acf30fa7d31 Mon Sep 17 00:00:00 2001 From: Taekyung Heo <7621438+TaekyungHeo@users.noreply.github.com> Date: Mon, 16 Sep 2024 19:34:31 -0400 Subject: [PATCH 1/2] Use default namespace in K8s job, runner, and system Co-authored-by: Andrei Maslennikov --- .../runner/kubernetes/kubernetes_job.py | 12 +- .../runner/kubernetes/kubernetes_runner.py | 13 +- .../systems/kubernetes/kubernetes_system.py | 183 ++++++++---------- 3 files changed, 88 insertions(+), 120 deletions(-) diff --git a/src/cloudai/runner/kubernetes/kubernetes_job.py b/src/cloudai/runner/kubernetes/kubernetes_job.py index 99738a435..f96f2b5eb 100644 --- a/src/cloudai/runner/kubernetes/kubernetes_job.py +++ b/src/cloudai/runner/kubernetes/kubernetes_job.py @@ -28,15 +28,12 @@ class KubernetesJob(BaseJob): mode (str): The mode of the job (e.g., 'run', 'dry-run'). system (System): The system in which the job is running. test_run (TestRun): The test instance associated with this job. - namespace (str): The namespace of the job. name (str): The name of the job. kind (str): The kind of the job. output_path (Path): The path where the job's output is stored. """ - def __init__( - self, mode: str, system: System, test_run: TestRun, namespace: str, name: str, kind: str, output_path: Path - ): + def __init__(self, mode: str, system: System, test_run: TestRun, name: str, kind: str, output_path: Path): """ Initialize a KubernetesJob instance. @@ -44,14 +41,12 @@ def __init__( mode (str): The mode of the job (e.g., 'run', 'dry-run'). system (System): The system in which the job is running. test_run (TestRun): The test instance associated with this job. - namespace (str): The namespace of the job. name (str): The name of the job. kind (str): The kind of the job. output_path (Path): The path where the job's output is stored. """ super().__init__(mode, system, test_run, output_path) self.id = name - self.namespace = namespace self.name = name self.kind = kind @@ -62,7 +57,4 @@ def __repr__(self) -> str: Returns str: String representation of the job. """ - return ( - f"KubernetesJob(name={self.name}, namespace={self.namespace}, test={self.test_run.test.name}, " - f"kind={self.kind})" - ) + return f"KubernetesJob(name={self.name}, test={self.test_run.test.name}, kind={self.kind})" diff --git a/src/cloudai/runner/kubernetes/kubernetes_runner.py b/src/cloudai/runner/kubernetes/kubernetes_runner.py index e727727c5..2308ced7d 100644 --- a/src/cloudai/runner/kubernetes/kubernetes_runner.py +++ b/src/cloudai/runner/kubernetes/kubernetes_runner.py @@ -42,13 +42,12 @@ def _submit_test(self, tr: TestRun) -> KubernetesJob: job_spec = tr.test.gen_json(job_output_path, job_name, tr.time_limit, tr.num_nodes, tr.nodes) job_kind = job_spec.get("kind", "").lower() logging.info(f"Generated JSON string for test {tr.test.section_name}: {job_spec}") - job_namespace = "" if self.mode == "run": k8s_system: KubernetesSystem = cast(KubernetesSystem, self.system) - job_name, job_namespace = k8s_system.create_job(job_spec) + job_name = k8s_system.create_job(job_spec) - return KubernetesJob(self.mode, self.system, tr, job_namespace, job_name, job_kind, job_output_path) + return KubernetesJob(self.mode, self.system, tr, job_name, job_kind, job_output_path) async def job_completion_callback(self, job: BaseJob) -> None: """ @@ -59,8 +58,8 @@ async def job_completion_callback(self, job: BaseJob) -> None: """ k8s_system: KubernetesSystem = cast(KubernetesSystem, self.system) k_job = cast(KubernetesJob, job) - k8s_system.store_logs_for_job(k_job.namespace, k_job.name, k_job.output_path) - k8s_system.delete_job(k_job.namespace, k_job.name, k_job.kind) + k8s_system.store_logs_for_job(k_job.name, k_job.output_path) + k8s_system.delete_job(k_job.name, k_job.kind) def kill_job(self, job: BaseJob) -> None: """ @@ -71,5 +70,5 @@ def kill_job(self, job: BaseJob) -> None: """ k8s_system: KubernetesSystem = cast(KubernetesSystem, self.system) k_job = cast(KubernetesJob, job) - k8s_system.store_logs_for_job(k_job.namespace, k_job.name, k_job.output_path) - k8s_system.delete_job(k_job.namespace, k_job.name, k_job.kind) + k8s_system.store_logs_for_job(k_job.name, k_job.output_path) + k8s_system.delete_job(k_job.name, k_job.kind) diff --git a/src/cloudai/systems/kubernetes/kubernetes_system.py b/src/cloudai/systems/kubernetes/kubernetes_system.py index fd1579081..3a926a3ae 100644 --- a/src/cloudai/systems/kubernetes/kubernetes_system.py +++ b/src/cloudai/systems/kubernetes/kubernetes_system.py @@ -17,7 +17,7 @@ import logging import time from pathlib import Path -from typing import Any, Dict, List, Tuple, cast +from typing import Any, Dict, List, cast from kubernetes import client, config from kubernetes.client import ApiException, CustomObjectsApi, V1DeleteOptions, V1Job @@ -120,7 +120,7 @@ def is_job_running(self, job: BaseJob) -> bool: bool: True if the job is running, False otherwise. """ k_job: KubernetesJob = cast(KubernetesJob, job) - return self._is_job_running(k_job.namespace, k_job.name, k_job.kind) + return self._is_job_running(k_job.name, k_job.kind) def is_job_completed(self, job: BaseJob) -> bool: """ @@ -133,29 +133,25 @@ def is_job_completed(self, job: BaseJob) -> bool: bool: True if the job is completed, False otherwise. """ k_job: KubernetesJob = cast(KubernetesJob, job) - return not self._is_job_running(k_job.namespace, k_job.name, k_job.kind) + return not self._is_job_running(k_job.name, k_job.kind) - def _is_job_running(self, job_namespace: str, job_name: str, job_kind: str) -> bool: + def _is_job_running(self, job_name: str, job_kind: str) -> bool: """ Check if a job is currently running. Args: - job_namespace (str): The namespace of the job. job_name (str): The name of the job. job_kind (str): The kind of the job ('MPIJob' or 'Job'). Returns: bool: True if the job is running, False if the job has completed or is not found. """ - logging.debug( - f"Checking for job '{job_name}' of kind '{job_kind}' in namespace '{job_namespace}' to determine if " - "it is running." - ) + logging.debug(f"Checking for job '{job_name}' of kind '{job_kind}' to determine if it is running.") if "mpijob" in job_kind.lower(): - return self._is_mpijob_running(job_namespace, job_name) + return self._is_mpijob_running(job_name) elif "job" in job_kind.lower(): - return self._is_batch_job_running(job_namespace, job_name) + return self._is_batch_job_running(job_name) else: error_message = ( f"Unsupported job kind: '{job_kind}'. Supported kinds are 'MPIJob' for MPI workloads and 'Job' for " @@ -164,12 +160,11 @@ def _is_job_running(self, job_namespace: str, job_name: str, job_kind: str) -> b logging.error(error_message) raise ValueError(error_message) - def _is_mpijob_running(self, job_namespace: str, job_name: str) -> bool: + def _is_mpijob_running(self, job_name: str) -> bool: """ Check if an MPIJob is currently running. Args: - job_namespace (str): The namespace of the MPIJob. job_name (str): The name of the MPIJob. Returns: @@ -177,7 +172,11 @@ def _is_mpijob_running(self, job_namespace: str, job_name: str) -> bool: """ try: mpijob = self.custom_objects_api.get_namespaced_custom_object( - group="kubeflow.org", version="v2beta1", namespace=job_namespace, plural="mpijobs", name=job_name + group="kubeflow.org", + version="v2beta1", + namespace=self.default_namespace, + plural="mpijobs", + name=job_name, ) assert isinstance(mpijob, dict) @@ -202,38 +201,32 @@ def _is_mpijob_running(self, job_namespace: str, job_name: str) -> bool: except ApiException as e: if e.status == 404: - logging.debug( - f"MPIJob '{job_name}' not found in namespace '{job_namespace}'. It may have completed and been " - "removed from the system." - ) + logging.debug(f"MPIJob '{job_name}' not found. It may have completed and been removed from the system.") return False else: error_message = ( - f"Error occurred while retrieving status for MPIJob '{job_name}' in namespace '{job_namespace}'. " + f"Error occurred while retrieving status for MPIJob '{job_name}'. " f"Error code: {e.status}. Message: {e.reason}. Please check the job name, namespace, and " "Kubernetes API server." ) logging.error(error_message) raise - def _is_batch_job_running(self, job_namespace: str, job_name: str) -> bool: + def _is_batch_job_running(self, job_name: str) -> bool: """ Check if a batch job is currently running. Args: - job_namespace (str): The namespace of the batch job. job_name (str): The name of the batch job. Returns: bool: True if the batch job is running, False if the job has completed or is not found. """ try: - k8s_job: Any = self.batch_v1.read_namespaced_job_status(name=job_name, namespace=job_namespace) + k8s_job: Any = self.batch_v1.read_namespaced_job_status(name=job_name, namespace=self.default_namespace) if not (hasattr(k8s_job, "status") and hasattr(k8s_job.status, "conditions")): - logging.debug( - f"Job '{job_name}' in namespace '{job_namespace}' does not have expected status attributes." - ) + logging.debug(f"Job '{job_name}' does not have expected status attributes.") return False conditions = k8s_job.status.conditions or [] @@ -253,14 +246,12 @@ def _is_batch_job_running(self, job_namespace: str, job_name: str) -> bool: except ApiException as e: if e.status == 404: logging.debug( - f"Batch job '{job_name}' not found in namespace '{job_namespace}'." - "It may have completed and been removed from the system." + f"Batch job '{job_name}' not found." "It may have completed and been removed from the system." ) return False else: logging.error( - f"Error occurred while retrieving status for batch job '{job_name}' in namespace " - "'{job_namespace}'." + f"Error occurred while retrieving status for batch job '{job_name}' " f"Error code: {e.status}. Message: {e.reason}. Please check the job name, namespace, " "and Kubernetes API server." ) @@ -274,21 +265,20 @@ def kill(self, job: BaseJob) -> None: job (BaseJob): The job to be terminated. """ k_job: KubernetesJob = cast(KubernetesJob, job) - self.delete_job(k_job.namespace, k_job.name, k_job.kind) + self.delete_job(k_job.name, k_job.kind) - def delete_job(self, namespace: str, job_name: str, job_kind: str) -> None: + def delete_job(self, job_name: str, job_kind: str) -> None: """ - Delete a job in the specified namespace. + Delete a job. Args: - namespace (str): The namespace of the job. job_name (str): The name of the job. job_kind (str): The kind of the job ('MPIJob' or 'Job'). """ if "mpijob" in job_kind.lower(): - self._delete_mpi_job(namespace, job_name) + self._delete_mpi_job(job_name) elif "job" in job_kind.lower(): - self._delete_batch_job(namespace, job_name) + self._delete_batch_job(job_name) else: error_message = ( f"Unsupported job kind: '{job_kind}'. Supported kinds are 'MPIJob' for MPI workloads and 'Job' for " @@ -297,57 +287,53 @@ def delete_job(self, namespace: str, job_name: str, job_kind: str) -> None: logging.error(error_message) raise ValueError(error_message) - def _delete_mpi_job(self, namespace: str, job_name: str) -> None: + def _delete_mpi_job(self, job_name: str) -> None: """ - Delete an MPIJob in the specified namespace. + Delete an MPIJob. Args: - namespace (str): The namespace of the job. job_name (str): The name of the job. """ - logging.debug(f"Deleting MPIJob '{job_name}' in namespace '{namespace}'") + logging.debug(f"Deleting MPIJob '{job_name}'") try: self.custom_objects_api.delete_namespaced_custom_object( group="kubeflow.org", version="v2beta1", - namespace=namespace, + namespace=self.default_namespace, plural="mpijobs", name=job_name, body=V1DeleteOptions(propagation_policy="Foreground", grace_period_seconds=5), ) - logging.debug(f"MPIJob '{job_name}' deleted successfully in namespace '{namespace}'") + logging.debug(f"MPIJob '{job_name}' deleted successfully") except ApiException as e: if e.status == 404: - logging.debug( - f"MPIJob '{job_name}' not found in namespace '{namespace}'. " "It may have already been deleted." - ) + logging.debug(f"MPIJob '{job_name}' not found. " "It may have already been deleted.") else: logging.error( - f"An error occurred while attempting to delete MPIJob '{job_name}' in namespace '{namespace}'. " + f"An error occurred while attempting to delete MPIJob '{job_name}'. " f"Error code: {e.status}. Message: {e.reason}. Please verify the job name, namespace, " "and Kubernetes API server." ) raise - def _delete_batch_job(self, namespace: str, job_name: str) -> None: + def _delete_batch_job(self, job_name: str) -> None: """ - Delete a batch job in the specified namespace. + Delete a batch job. Args: - namespace (str): The namespace of the job. job_name (str): The name of the job. """ - logging.debug(f"Deleting batch job '{job_name}' in namespace '{namespace}'") + logging.debug(f"Deleting batch job '{job_name}'") api_response = self.batch_v1.delete_namespaced_job( name=job_name, - namespace=namespace, + namespace=self.default_namespace, body=V1DeleteOptions(propagation_policy="Foreground", grace_period_seconds=5), ) api_response = cast(V1Job, api_response) logging.debug(f"Batch job '{job_name}' deleted with status: {api_response.status}") - def create_job(self, job_spec: Dict[Any, Any], timeout: int = 60, interval: int = 1) -> Tuple[str, str]: + def create_job(self, job_spec: Dict[Any, Any], timeout: int = 60, interval: int = 1) -> str: """ Create a job in the Kubernetes system in a blocking manner. @@ -357,36 +343,35 @@ def create_job(self, job_spec: Dict[Any, Any], timeout: int = 60, interval: int interval (int): The time to wait between checks, in seconds. Returns: - Tuple[str, str]: The job name and namespace. + str: The job name. Raises: ValueError: If the job specification does not contain a valid 'kind' field. TimeoutError: If the job is not observable within the timeout period. """ logging.debug(f"Creating job with spec: {job_spec}") - job_name, namespace = self._create_job(self.default_namespace, job_spec) + job_name = self._create_job(job_spec) # Wait for the job to be observable by Kubernetes start_time = time.time() while time.time() - start_time < timeout: - if self._is_job_observable(namespace, job_name, job_spec.get("kind", "")): - logging.debug(f"Job '{job_name}' is now observable in namespace '{namespace}'.") - return job_name, namespace - logging.debug(f"Waiting for job '{job_name}' to become observable in namespace '{namespace}'...") + if self._is_job_observable(job_name, job_spec.get("kind", "")): + logging.debug(f"Job '{job_name}' is now observable.") + return job_name + logging.debug(f"Waiting for job '{job_name}' to become observable...") time.sleep(interval) - raise TimeoutError(f"Job '{job_name}' in namespace '{namespace}' was not observable within {timeout} seconds.") + raise TimeoutError(f"Job '{job_name}' was not observable within {timeout} seconds.") - def _create_job(self, namespace: str, job_spec: Dict[Any, Any]) -> Tuple[str, str]: + def _create_job(self, job_spec: Dict[Any, Any]) -> str: """ - Submit a job to the specified namespace. + Submit a job. Args: - namespace (str): The namespace where the job will be created. job_spec (Dict[Any, Any]): The job specification. Returns: - Tuple[str, str]: The job name and namespace. + str: The job name. Raises: ValueError: If the job specification does not contain a valid 'kind' field. @@ -395,9 +380,9 @@ def _create_job(self, namespace: str, job_spec: Dict[Any, Any]) -> Tuple[str, st kind = job_spec.get("kind", "").lower() if "mpijob" in kind: - return self._create_mpi_job(namespace, job_spec) + return self._create_mpi_job(job_spec) elif ("batch" in api_version) and ("job" in kind): - return self._create_batch_job(namespace, job_spec) + return self._create_batch_job(job_spec) else: error_message = ( f"Unsupported job kind: '{job_spec.get('kind')}'.\n" @@ -408,105 +393,99 @@ def _create_job(self, namespace: str, job_spec: Dict[Any, Any]) -> Tuple[str, st logging.error(error_message) raise ValueError(error_message) - def _create_batch_job(self, namespace: str, job_spec: Dict[Any, Any]) -> Tuple[str, str]: + def _create_batch_job(self, job_spec: Dict[Any, Any]) -> str: """ - Submit a batch job to the specified namespace. + Submit a batch job. Args: - namespace (str): The namespace where the job will be created. job_spec (Dict[Any, Any]): The job specification. Returns: - Tuple[str, str]: The job name and namespace. + str: The job name. """ - logging.debug(f"Creating job in namespace '{namespace}'") - api_response = self.batch_v1.create_namespaced_job(body=job_spec, namespace=namespace) + logging.debug("Creating job") + api_response = self.batch_v1.create_namespaced_job(body=job_spec, namespace=self.default_namespace) if not isinstance(api_response, V1Job) or api_response.metadata is None: raise ValueError("Job creation failed or returned an unexpected type") job_name: str = api_response.metadata.name - job_namespace: str = api_response.metadata.namespace logging.debug(f"Job '{job_name}' created with status: {api_response.status}") - return job_name, job_namespace + return job_name - def _create_mpi_job(self, namespace: str, job_spec: Dict[Any, Any]) -> Tuple[str, str]: + def _create_mpi_job(self, job_spec: Dict[Any, Any]) -> str: """ - Submit an MPIJob to the specified namespace. + Submit an MPIJob. Args: - namespace (str): The namespace where the MPIJob will be created. job_spec (Dict[Any, Any]): The MPIJob specification. Returns: - Tuple[str, str]: The job name and namespace. + str: The job name. """ - logging.debug(f"Creating MPIJob in namespace '{namespace}'") + logging.debug("Creating MPIJob") api_response = self.custom_objects_api.create_namespaced_custom_object( group="kubeflow.org", version="v2beta1", - namespace=namespace, + namespace=self.default_namespace, plural="mpijobs", body=job_spec, ) job_name: str = api_response["metadata"]["name"] - job_namespace: str = api_response["metadata"]["namespace"] logging.debug(f"MPIJob '{job_name}' created with status: {api_response.get('status')}") - return job_name, job_namespace + return job_name - def _is_job_observable(self, namespace: str, job_name: str, job_kind: str) -> bool: + def _is_job_observable(self, job_name: str, job_kind: str) -> bool: """ Check if a job is observable by the Kubernetes client. Args: - namespace (str): The namespace of the job. job_name (str): The name of the job. job_kind (str): The kind of the job (e.g., 'Job', 'MPIJob'). Returns: bool: True if the job is observable, False otherwise. """ - logging.debug(f"Checking if job '{job_name}' of kind '{job_kind}' in namespace '{namespace}' is observable.") + logging.debug(f"Checking if job '{job_name}' of kind '{job_kind}' is observable.") if "mpijob" in job_kind.lower(): - return self._is_mpijob_observable(namespace, job_name) + return self._is_mpijob_observable(job_name) elif "job" in job_kind.lower(): - return self._is_batch_job_observable(namespace, job_name) + return self._is_batch_job_observable(job_name) else: logging.error(f"Unsupported job kind: '{job_kind}'") return False - def _is_mpijob_observable(self, namespace: str, job_name: str) -> bool: + def _is_mpijob_observable(self, job_name: str) -> bool: """ Check if an MPIJob is observable by the Kubernetes client. Args: - namespace (str): The namespace of the MPIJob. job_name (str): The name of the MPIJob. Returns: bool: True if the MPIJob is observable, False otherwise. """ - logging.debug(f"Attempting to observe MPIJob '{job_name}' in namespace '{namespace}'.") + logging.debug(f"Attempting to observe MPIJob '{job_name}'.") try: api_instance = CustomObjectsApi() mpijob = api_instance.get_namespaced_custom_object( group="kubeflow.org", version="v2beta1", - namespace=namespace, + namespace=self.default_namespace, plural="mpijobs", name=job_name, ) if mpijob: - logging.debug(f"MPIJob '{job_name}' found in namespace '{namespace}' with details: {mpijob}.") + logging.debug(f"MPIJob '{job_name}' found with details: {mpijob}.") return True else: - logging.debug(f"MPIJob '{job_name}' in namespace '{namespace}' is not yet observable.") + logging.debug(f"MPIJob '{job_name}' is not yet observable.") return False except ApiException as e: if e.status == 404: - logging.debug(f"MPIJob '{job_name}' not found in namespace '{namespace}'.") + logging.debug(f"MPIJob '{job_name}' not found.") return False else: logging.error( @@ -515,23 +494,22 @@ def _is_mpijob_observable(self, namespace: str, job_name: str) -> bool: ) raise - def _is_batch_job_observable(self, namespace: str, job_name: str) -> bool: + def _is_batch_job_observable(self, job_name: str) -> bool: """ Check if a batch job is observable by the Kubernetes client. Args: - namespace (str): The namespace of the batch job. job_name (str): The name of the batch job. Returns: bool: True if the batch job is observable, False otherwise. """ - logging.debug(f"Attempting to observe batch job '{job_name}' in namespace '{namespace}'.") + logging.debug(f"Attempting to observe batch job '{job_name}'.") try: - return self.batch_v1.read_namespaced_job_status(name=job_name, namespace=namespace) is not None + return self.batch_v1.read_namespaced_job_status(name=job_name, namespace=self.default_namespace) is not None except ApiException as e: if e.status == 404: - logging.debug(f"Batch job '{job_name}' not found in namespace '{namespace}'.") + logging.debug(f"Batch job '{job_name}' not found.") return False else: logging.error( @@ -595,18 +573,17 @@ def list_namespaces(self) -> List[str]: namespaces = self.core_v1.list_namespace().items return [ns.metadata.name for ns in namespaces] - def store_logs_for_job(self, namespace: str, job_name: str, output_dir: Path) -> None: + def store_logs_for_job(self, job_name: str, output_dir: Path) -> None: """ Retrieve and store logs for all pods associated with a given job. Args: - namespace (str): The namespace where the job is running. job_name (str): The name of the job. output_dir (Path): The directory where logs will be saved. """ - pod_names = self.get_pod_names_for_job(namespace, job_name) + pod_names = self.get_pod_names_for_job(self.default_namespace, job_name) if not pod_names: - logging.warning(f"No pods found for job '{job_name}' in namespace '{namespace}'") + logging.warning(f"No pods found for job '{job_name}' in namespace '{self.default_namespace}'") return output_dir.mkdir(parents=True, exist_ok=True) @@ -616,7 +593,7 @@ def store_logs_for_job(self, namespace: str, job_name: str, output_dir: Path) -> with stdout_file_path.open("w") as stdout_file: for pod_name in pod_names: try: - logs = self.core_v1.read_namespaced_pod_log(name=pod_name, namespace=namespace) + logs = self.core_v1.read_namespaced_pod_log(name=pod_name, namespace=self.default_namespace) log_file_path = output_dir / f"{pod_name}.txt" with log_file_path.open("w") as log_file: @@ -626,7 +603,7 @@ def store_logs_for_job(self, namespace: str, job_name: str, output_dir: Path) -> stdout_file.write(logs + "\n") except client.ApiException as e: - logging.error(f"Error retrieving logs for pod '{pod_name}' in namespace '{namespace}': {e}") + logging.error(f"Error retrieving logs for pod '{pod_name}': {e}") logging.info(f"All logs concatenated and saved to '{stdout_file_path}'") From 61e8c589bbd35b977eaf60d7f18f00ce00fc0931 Mon Sep 17 00:00:00 2001 From: Taekyung Heo <7621438+TaekyungHeo@users.noreply.github.com> Date: Mon, 16 Sep 2024 20:26:26 -0400 Subject: [PATCH 2/2] Move `docker_image_url` from the system config to the sleep template config Co-authored-by: Srinivas Sridharan --- conf/common/system/kubernetes_cluster.toml | 1 - conf/common/test_template/sleep.toml | 4 ++++ .../test_template/sleep/kubernetes_json_gen_strategy.py | 2 +- src/cloudai/systems/kubernetes/kubernetes_system.py | 5 +---- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/conf/common/system/kubernetes_cluster.toml b/conf/common/system/kubernetes_cluster.toml index d4ce8adf3..ef1675899 100644 --- a/conf/common/system/kubernetes_cluster.toml +++ b/conf/common/system/kubernetes_cluster.toml @@ -20,7 +20,6 @@ kube_config_path = "" install_path = "./install" output_path = "./results" -default_image = "ubuntu:22.04" default_namespace = "default" [global_env_vars] diff --git a/conf/common/test_template/sleep.toml b/conf/common/test_template/sleep.toml index de7be196c..e9377d348 100644 --- a/conf/common/test_template/sleep.toml +++ b/conf/common/test_template/sleep.toml @@ -20,3 +20,7 @@ name = "Sleep" [cmd_args.seconds] type = "int" default = "5" + + [cmd_args.docker_image_url] + type = "str" + default = "ubuntu:22.04" diff --git a/src/cloudai/schema/test_template/sleep/kubernetes_json_gen_strategy.py b/src/cloudai/schema/test_template/sleep/kubernetes_json_gen_strategy.py index b953cffc5..cb5565bd5 100644 --- a/src/cloudai/schema/test_template/sleep/kubernetes_json_gen_strategy.py +++ b/src/cloudai/schema/test_template/sleep/kubernetes_json_gen_strategy.py @@ -52,7 +52,7 @@ def gen_json( { "args": ["sleep " + sec], "command": ["/bin/bash", "-c"], - "image": kubernetes_system.default_image, + "image": self.final_cmd_args["docker_image_url"], "name": "task", } ], diff --git a/src/cloudai/systems/kubernetes/kubernetes_system.py b/src/cloudai/systems/kubernetes/kubernetes_system.py index 3a926a3ae..39ff82957 100644 --- a/src/cloudai/systems/kubernetes/kubernetes_system.py +++ b/src/cloudai/systems/kubernetes/kubernetes_system.py @@ -34,7 +34,6 @@ class KubernetesSystem(BaseModel, System): Attributes kube_config_path (Path): Path to the Kubernetes config file. default_namespace (str): The default Kubernetes namespace for jobs. - default_image (str): Default Docker image to be used for jobs. """ model_config = ConfigDict(extra="forbid", arbitrary_types_allowed=True) @@ -44,7 +43,6 @@ class KubernetesSystem(BaseModel, System): output_path: Path kube_config_path: Path default_namespace: str - default_image: str scheduler: str = "kubernetes" global_env_vars: Dict[str, Any] = {} _core_v1: client.CoreV1Api @@ -97,8 +95,7 @@ def __repr__(self) -> str: f"System Name: {self.name}\n" f"Scheduler Type: {self.scheduler}\n" f"Kube Config Path: {self.kube_config_path}\n" - f"Default Namespace: {self.default_namespace}\n" - f"Default Docker Image: {self.default_image}" + f"Default Namespace: {self.default_namespace}" ) def update(self) -> None: