add a run_command() method to Runners
AdeelH committed Nov 6, 2023
1 parent a7d56de commit 8aa44e2
Showing 6 changed files with 121 additions and 78 deletions.
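
As a rough sketch of what this change enables (illustrative only and not part of the commit; the no-argument constructor, the command, and the keyword values are assumptions), a caller can now submit an arbitrary command through a runner's run_command() instead of the removed module-level submit_job() helper:

# Hypothetical usage sketch; not from the commit.
from rastervision.aws_batch import AWSBatchRunner

runner = AWSBatchRunner()  # assumes the runner takes no constructor arguments
job_id = runner.run_command(
    ['python', '-m', 'my_pkg.my_script', '--foo', 'bar'],  # made-up command
    use_gpu=True,
    attempts=3,
)
print(f'submitted Batch job {job_id}')
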
1 change: 0 additions & 1 deletion rastervision_aws_batch/rastervision/aws_batch/__init__.py
@@ -18,5 +18,4 @@ def register_plugin(registry):
__all__ = [
'AWS_BATCH',
AWSBatchRunner.__name__,
submit_job.__name__,
]
149 changes: 73 additions & 76 deletions rastervision_aws_batch/rastervision/aws_batch/aws_batch_runner.py
@@ -15,80 +15,6 @@
AWS_BATCH = 'batch'


def submit_job(cmd: List[str],
job_name: str,
debug: bool = False,
profile: bool = False,
attempts: int = 5,
parent_job_ids: List[str] = None,
num_array_jobs: Optional[int] = None,
use_gpu: bool = False,
job_queue: Optional[str] = None,
job_def: Optional[str] = None) -> str:
"""Submit a job to run on AWS Batch.
Args:
cmd: Command to run in the Docker container for the remote job as list
of strings.
debug: If True, run the command using a ptvsd wrapper which sets up a
remote VS Code Python debugger server.
profile: If True, run the command using kernprof, a line profiler.
attempts: The number of times to try running the command which is
useful in case of failure.
parent_job_ids: Optional list of parent Batch job ids. The job created
by this will only run after the parent jobs complete successfully.
num_array_jobs: If set, make this a Batch array job with size equal to
num_array_jobs.
use_gpu: If True, run the job in a GPU-enabled queue.
job_queue: If set, use this job queue.
job_def: If set, use this job definition.
"""
import boto3

batch_config = rv_config.get_namespace_config(AWS_BATCH)

if job_queue is None:
if use_gpu:
job_queue = batch_config('gpu_job_queue')
else:
job_queue = batch_config('cpu_job_queue')

if job_def is None:
if use_gpu:
job_def = batch_config('gpu_job_def')
else:
job_def = batch_config('cpu_job_def')

if debug:
cmd = [
'python', '-m', 'ptvsd', '--host', '0.0.0.0', '--port', '6006',
'--wait', '-m'
] + cmd

if profile:
cmd = ['kernprof', '-v', '-l'] + cmd

kwargs = {
'jobName': job_name,
'jobQueue': job_queue,
'jobDefinition': job_def,
'containerOverrides': {
'command': cmd
},
'retryStrategy': {
'attempts': attempts
},
}
if parent_job_ids:
kwargs['dependsOn'] = [{'jobId': id} for id in parent_job_ids]
if num_array_jobs:
kwargs['arrayProperties'] = {'size': num_array_jobs}

client = boto3.client('batch')
job_id = client.submit_job(**kwargs)['jobId']
return job_id


class AWSBatchRunner(Runner):
"""Runs pipelines remotely using AWS Batch.
@@ -116,7 +42,7 @@ def run(self,
commands,
num_splits,
pipeline_run_name=pipeline_run_name)
job_id = submit_job(cmd=cmd, **args)
job_id = self.run_command(cmd=cmd, **args)

job_info = dict(
name=args['job_name'],
@@ -178,5 +104,76 @@ def build_cmd(self,

return cmd, args

def get_split_ind(self):
def get_split_ind(self) -> int:
return int(os.environ.get('AWS_BATCH_JOB_ARRAY_INDEX', 0))

def run_command(self,
cmd: List[str],
job_name: Optional[str] = None,
debug: bool = False,
attempts: int = 1,
parent_job_ids: Optional[List[str]] = None,
num_array_jobs: Optional[int] = None,
use_gpu: bool = False,
job_queue: Optional[str] = None,
job_def: Optional[str] = None,
**kwargs) -> str:
"""Submit a command as a job to AWS Batch.
Args:
cmd: Command to run in the Docker container for the remote job as
list of strings.
job_name: Optional job name. If None, is set to
"raster-vision-<uuid>".
debug: If True, run the command using a ptvsd wrapper which sets up
a remote VS Code Python debugger server. Defaults to False.
attempts: The number of times to try running the command, which is
useful in case of failure. Defaults to 1.
parent_job_ids: Optional list of parent Batch job IDs. The job
created by this will only run after the parent jobs complete
successfully. Defaults to None.
num_array_jobs: If set, make this a Batch array job with size equal
to num_array_jobs. Defaults to None.
use_gpu: If True, run the job in a GPU-enabled queue. Defaults to
False.
job_queue: If set, use this job queue. Defaults to None.
job_def: If set, use this job definition. Defaults to None.
**kwargs: Any other kwargs to pass to Batch when submitting the job.
Returns:
Batch job ID of the submitted job.
"""
import boto3

batch_config = rv_config.get_namespace_config(AWS_BATCH)
device = 'gpu' if use_gpu else 'cpu'

if job_name is None:
job_name = f'raster-vision-{uuid.uuid4()}'
if job_queue is None:
job_queue = batch_config(f'{device}_job_queue')
if job_def is None:
job_def = batch_config(f'{device}_job_def')

if debug:
cmd = [
'python', '-m', 'ptvsd', '--host', '0.0.0.0', '--port', '6006',
'--wait', '-m'
] + cmd

args = {
'jobName': job_name,
'jobQueue': job_queue,
'jobDefinition': job_def,
'containerOverrides': {
'command': cmd
},
'retryStrategy': {
'attempts': attempts
},
}
if parent_job_ids:
args['dependsOn'] = [{'jobId': id} for id in parent_job_ids]
if num_array_jobs:
args['arrayProperties'] = {'size': num_array_jobs}

client = boto3.client('batch')
job_id = client.submit_job(**args, **kwargs)['jobId']
return job_id
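
A hedged sketch of how the new keyword arguments map onto the underlying boto3 submit_job call. The commands and values are made up; timeout is a standard Batch submit_job field that would be forwarded via **kwargs.

# Sketch only; commands and values are illustrative.
from rastervision.aws_batch import AWSBatchRunner

runner = AWSBatchRunner()
array_id = runner.run_command(
    ['python', '-m', 'my_pkg.process_chip'],    # made-up command
    num_array_jobs=10,                           # becomes arrayProperties={'size': 10}
    attempts=3,                                  # becomes retryStrategy={'attempts': 3}
    timeout={'attemptDurationSeconds': 3600},    # forwarded to submit_job via **kwargs
)
merge_id = runner.run_command(
    ['python', '-m', 'my_pkg.merge_results'],    # made-up command
    parent_job_ids=[array_id],                   # becomes dependsOn=[{'jobId': array_id}]
)
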
(AWS SageMaker runner module; file path not captured in this view)
@@ -6,6 +6,7 @@
from rastervision.pipeline import rv_config_ as rv_config
from rastervision.pipeline.runner import Runner

from sagemaker import Session
from sagemaker.processing import Processor
from sagemaker.estimator import Estimator
from sagemaker.workflow.pipeline import Pipeline as SageMakerPipeline
@@ -188,3 +189,32 @@ def build_step(self, step_name: str, cmd: List[str], role: str,
step = ProcessingStep(step_name, step_args=step_args)

return step

def run_command(self,
cmd: List[str],
use_gpu: bool = False,
image_uri: Optional[str] = None,
instance_type: Optional[str] = None,
job_name: Optional[str] = None,
sagemaker_session: Optional[Session] = None) -> None:
config = rv_config.get_namespace_config(AWS_SAGEMAKER)
role = config('exec_role')
device = 'gpu' if use_gpu else 'cpu'

if image_uri is None:
image_uri = config(f'{device}_image')
if instance_type is None:
instance_type = config(f'{device}_instance_type')
if sagemaker_session is None:
sagemaker_session = Session()

processor = Processor(
role=role,
image_uri=image_uri,
instance_count=1,
instance_type=instance_type,
sagemaker_session=sagemaker_session,
entrypoint=cmd,
base_job_name=job_name,
)
processor.run()
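
For comparison, a possible call to the SageMaker variant. The import path and class name AWSSageMakerRunner are assumptions (neither appears in the captured hunks), and the command and job name are made up. Unlike the Batch implementation, this run_command() returns nothing and runs the command via a one-off sagemaker Processor.

# Sketch only; assumes AWS_SAGEMAKER config (exec_role, images, instance types) is set up.
from rastervision.aws_sagemaker import AWSSageMakerRunner  # assumed import path and class name

runner = AWSSageMakerRunner()
runner.run_command(
    ['python', '-m', 'my_pkg.my_script'],  # made-up command
    use_gpu=False,
    job_name='rv-example-job',             # made-up name
)
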
(in-process runner module; file path not captured in this view)
@@ -1,3 +1,4 @@
from typing import List
from rastervision.pipeline.cli import _run_command
from rastervision.pipeline.runner.runner import Runner

@@ -23,3 +24,7 @@ def run(self,
_run_command(cfg_json_uri, command, split_ind, num_splits)
else:
_run_command(cfg_json_uri, command, 0, 1)

def run_command(self, cmd: List[str]):
raise NotImplementedError(
'Use LocalRunner.run_command to run a command locally.')
(local runner module; file path not captured in this view)
@@ -55,7 +55,10 @@ def run(self,
makefile_path = join(dirname(cfg_json_uri), 'Makefile')
str_to_file(makefile, makefile_path)
makefile_path_local = download_if_needed(makefile_path)
process = Popen(['make', '-j', '-f', makefile_path_local])
return self.run_command(['make', '-j', '-f', makefile_path_local])

def run_command(self, cmd: List[str]):
process = Popen(cmd)
terminate_at_exit(process)
exitcode = process.wait()
if exitcode != 0:
9 changes: 9 additions & 0 deletions rastervision_pipeline/rastervision/pipeline/runner/runner.py
@@ -28,6 +28,15 @@ def run(self,
"""
pass

@abstractmethod
def run_command(self, cmd: List[str]):
"""Run a single command.
Args:
cmd: Command to run, as a list of strings.
"""

def get_split_ind(self) -> Optional[int]:
"""Get the split_ind for the process.
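
Finally, a minimal sketch of what the new abstract method asks of a Runner subclass. The class and its behavior are illustrative, not from the commit; run() is stubbed out since it is not the focus here.

# Illustrative subclass; only run_command() is meaningful here.
import subprocess
from typing import List

from rastervision.pipeline.runner import Runner


class ShellRunner(Runner):
    """Toy runner that executes each command as a local subprocess."""

    def run(self, *args, **kwargs):
        # Not the focus of this sketch; see the concrete runners above.
        raise NotImplementedError()

    def run_command(self, cmd: List[str]):
        # Satisfies the new abstract method: run one command, fail loudly on error.
        subprocess.run(cmd, check=True)
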
