Skip to content

Commit

Permalink
Refactor and enhance job execution error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
TaekyungHeo committed May 31, 2024
1 parent c76358e commit 45696ba
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 7 deletions.
5 changes: 4 additions & 1 deletion src/cloudai/_core/base_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,22 @@ class BaseJob:
Attributes
id (int): The unique identifier of the job.
test (Test): The test instance associated with this job.
output_path (str): The path where the job's output is stored.
terminated_by_dependency (bool): Flag to indicate if the job was terminated due to a dependency.
"""

def __init__(self, job_id: int, test: Test):
def __init__(self, job_id: int, test: Test, output_path: str):
"""
Initialize a BaseJob instance.
Args:
job_id (int): The unique identifier of the job.
output_path (str): The path where the job's output is stored.
test (Test): The test instance associated with the job.
"""
self.id = job_id
self.test = test
self.output_path = output_path
self.terminated_by_dependency = False

def increment_iteration(self):
Expand Down
32 changes: 29 additions & 3 deletions src/cloudai/_core/base_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
from typing import Dict, List, Optional

from .base_job import BaseJob
from .exceptions import JobSubmissionError
from .exceptions import JobFailureError, JobSubmissionError
from .job_status_result import JobStatusResult
from .system import System
from .test import Test
from .test_scenario import TestScenario
Expand Down Expand Up @@ -287,11 +288,36 @@ async def monitor_jobs(self) -> int:
self.logger.debug("Monitoring jobs.")
for job in list(self.jobs):
if self.is_job_completed(job):
completed_jobs_count += 1
await self.handle_job_completion(job)
if self.mode == "dry-run":
completed_jobs_count += 1
else:
job_status_result = self.get_job_status(job)
if job_status_result.is_successful:
completed_jobs_count += 1
await self.handle_job_completion(job)
else:
error_message = (
f"Job {job.id} for test {job.test.section_name} failed: {job_status_result.error_message}"
)
self.logger.error(error_message)
print(error_message, file=sys.stdout)
await self.shutdown()
raise JobFailureError(job.test.section_name, error_message, job_status_result.error_message)

return completed_jobs_count

def get_job_status(self, job: BaseJob) -> JobStatusResult:
"""
Retrieve the job status from a specified output directory.
Args:
job (BaseJob): The job to be checked.
Returns:
JobStatusResult: The result containing the job status and an optional error message.
"""
return job.test.get_job_status(job.output_path)

async def handle_job_completion(self, completed_job: BaseJob):
"""
Handle the completion of a job, including dependency management and iteration control.
Expand Down
39 changes: 39 additions & 0 deletions src/cloudai/_core/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,42 @@ class JobIdRetrievalError(JobSubmissionError):
"""

pass


class JobFailureError(Exception):
"""
Exception raised for errors that occur during job execution.
Attributes
test_name (str): The name of the test that failed.
message (str): A custom message describing the error.
details (str): Additional details about the job failure.
"""

def __init__(self, test_name: str, message: str, details: str = ""):
"""
Initialize a JobFailureError instance.
Args:
test_name (str): The name of the test associated with the job.
message (str): A custom message describing the error.
details (str): Additional details about the job failure.
"""
super().__init__(message)
self.test_name = test_name
self.message = message
self.details = details.strip()

def __str__(self):
"""
Return a formatted string representation of the JobFailureError instance.
Returns
str: A formatted string with detailed error information.
"""
return (
f"\nERROR: Job Execution Failed\n"
f"\tTest Name: {self.test_name}\n"
f"\tMessage: {self.message}\n"
f"\tDetails: '{self.details}'\n"
)
1 change: 0 additions & 1 deletion src/cloudai/_core/job_status_result.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
# limitations under the License.



class JobStatusResult:
"""
Encapsulates the result of a job status retrieval.
Expand Down
2 changes: 1 addition & 1 deletion src/cloudai/runner/slurm/slurm_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def _submit_test(self, test: Test) -> SlurmJob:
stderr=stderr,
message="Failed to retrieve job ID from command output.",
)
return SlurmJob(job_id, test)
return SlurmJob(job_id, test, job_output_path)

def is_job_running(self, job: BaseJob) -> bool:
"""
Expand Down
2 changes: 1 addition & 1 deletion src/cloudai/runner/standalone/standalone_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def _submit_test(self, test: Test) -> StandaloneJob:
stderr="",
message="Failed to retrieve job ID from command output.",
)
return StandaloneJob(job_id, test)
return StandaloneJob(job_id, test, job_output_path)

def is_job_running(self, job: BaseJob) -> bool:
"""
Expand Down

0 comments on commit 45696ba

Please sign in to comment.