diff --git a/rastervision_aws_batch/rastervision/aws_batch/__init__.py b/rastervision_aws_batch/rastervision/aws_batch/__init__.py index a856acc644..e0858279e8 100644 --- a/rastervision_aws_batch/rastervision/aws_batch/__init__.py +++ b/rastervision_aws_batch/rastervision/aws_batch/__init__.py @@ -18,5 +18,4 @@ def register_plugin(registry): __all__ = [ 'AWS_BATCH', AWSBatchRunner.__name__, - submit_job.__name__, ] diff --git a/rastervision_aws_batch/rastervision/aws_batch/aws_batch_runner.py b/rastervision_aws_batch/rastervision/aws_batch/aws_batch_runner.py index 565b1e2fbb..db5e33f71d 100644 --- a/rastervision_aws_batch/rastervision/aws_batch/aws_batch_runner.py +++ b/rastervision_aws_batch/rastervision/aws_batch/aws_batch_runner.py @@ -15,80 +15,6 @@ AWS_BATCH = 'batch' -def submit_job(cmd: List[str], - job_name: str, - debug: bool = False, - profile: str = False, - attempts: int = 5, - parent_job_ids: List[str] = None, - num_array_jobs: Optional[int] = None, - use_gpu: bool = False, - job_queue: Optional[str] = None, - job_def: Optional[str] = None) -> str: - """Submit a job to run on AWS Batch. - - Args: - cmd: Command to run in the Docker container for the remote job as list - of strings. - debug: If True, run the command using a ptvsd wrapper which sets up a - remote VS Code Python debugger server. - profile: If True, run the command using kernprof, a line profiler. - attempts: The number of times to try running the command which is - useful in case of failure. - parent_job_ids: Optional list of parent Batch job ids. The job created - by this will only run after the parent jobs complete successfully. - num_array_jobs: If set, make this a Batch array job with size equal to - num_array_jobs. - use_gpu: If True, run the job in a GPU-enabled queue. - job_queue: If set, use this job queue. - job_def: If set, use this job definition. 
- """ - import boto3 - - batch_config = rv_config.get_namespace_config(AWS_BATCH) - - if job_queue is None: - if use_gpu: - job_queue = batch_config('gpu_job_queue') - else: - job_queue = batch_config('cpu_job_queue') - - if job_def is None: - if use_gpu: - job_def = batch_config('gpu_job_def') - else: - job_def = batch_config('cpu_job_def') - - if debug: - cmd = [ - 'python', '-m', 'ptvsd', '--host', '0.0.0.0', '--port', '6006', - '--wait', '-m' - ] + cmd - - if profile: - cmd = ['kernprof', '-v', '-l'] + cmd - - kwargs = { - 'jobName': job_name, - 'jobQueue': job_queue, - 'jobDefinition': job_def, - 'containerOverrides': { - 'command': cmd - }, - 'retryStrategy': { - 'attempts': attempts - }, - } - if parent_job_ids: - kwargs['dependsOn'] = [{'jobId': id} for id in parent_job_ids] - if num_array_jobs: - kwargs['arrayProperties'] = {'size': num_array_jobs} - - client = boto3.client('batch') - job_id = client.submit_job(**kwargs)['jobId'] - return job_id - - class AWSBatchRunner(Runner): """Runs pipelines remotely using AWS Batch. @@ -116,7 +42,7 @@ def run(self, commands, num_splits, pipeline_run_name=pipeline_run_name) - job_id = submit_job(cmd=cmd, **args) + job_id = self.run_command(cmd=cmd, **args) job_info = dict( name=args['job_name'], @@ -178,5 +104,76 @@ def build_cmd(self, return cmd, args - def get_split_ind(self): + def get_split_ind(self) -> int: return int(os.environ.get('AWS_BATCH_JOB_ARRAY_INDEX', 0)) + + def run_command(self, + cmd: List[str], + job_name: Optional[str] = None, + debug: bool = False, + attempts: int = 1, + parent_job_ids: Optional[List[str]] = None, + num_array_jobs: Optional[int] = None, + use_gpu: bool = False, + job_queue: Optional[str] = None, + job_def: Optional[str] = None, + **kwargs) -> str: + """Submit a command as a job to AWS Batch. + + Args: + cmd: Command to run in the Docker container for the remote job as + list of strings. + job_name: Optional job name. If None, is set to + "raster-vision-". 
+ debug: If True, run the command using a ptvsd wrapper which sets up + a remote VS Code Python debugger server. Defaults to False. + attempts: The number of times to try running the command which is + useful in case of failure. Defaults to 1. + parent_job_ids: Optional list of parent Batch job IDs. The job + created by this will only run after the parent jobs complete + successfully. Defaults to None. + num_array_jobs: If set, make this a Batch array job with size equal + to num_array_jobs. Defaults to None. + use_gpu: If True, run the job in a GPU-enabled queue. Defaults to + False. + job_queue: If set, use this job queue. Defaults to None. + job_def: If set, use this job definition. Defaults to None. + **kwargs: Any other kwargs to pass to Batch when submitting job. + """ + import boto3 + + batch_config = rv_config.get_namespace_config(AWS_BATCH) + device = 'gpu' if use_gpu else 'cpu' + + if job_name is None: + job_name = f'raster-vision-{uuid.uuid4()}' + if job_queue is None: + job_queue = batch_config(f'{device}_job_queue') + if job_def is None: + job_def = batch_config(f'{device}_job_def') + + if debug: + cmd = [ + 'python', '-m', 'ptvsd', '--host', '0.0.0.0', '--port', '6006', + '--wait', '-m' + ] + cmd + + args = { + 'jobName': job_name, + 'jobQueue': job_queue, + 'jobDefinition': job_def, + 'containerOverrides': { + 'command': cmd + }, + 'retryStrategy': { + 'attempts': attempts + }, + } + if parent_job_ids: + args['dependsOn'] = [{'jobId': id} for id in parent_job_ids] + if num_array_jobs: + args['arrayProperties'] = {'size': num_array_jobs} + + client = boto3.client('batch') + job_id = client.submit_job(**args, **kwargs)['jobId'] + return job_id diff --git a/rastervision_aws_sagemaker/rastervision/aws_sagemaker/aws_sagemaker_runner.py b/rastervision_aws_sagemaker/rastervision/aws_sagemaker/aws_sagemaker_runner.py index 2e17ec7267..67ce651717 100644 --- a/rastervision_aws_sagemaker/rastervision/aws_sagemaker/aws_sagemaker_runner.py +++ 
b/rastervision_aws_sagemaker/rastervision/aws_sagemaker/aws_sagemaker_runner.py @@ -6,6 +6,7 @@ from rastervision.pipeline import rv_config_ as rv_config from rastervision.pipeline.runner import Runner +from sagemaker import Session from sagemaker.processing import Processor from sagemaker.estimator import Estimator from sagemaker.workflow.pipeline import Pipeline as SageMakerPipeline @@ -188,3 +189,32 @@ def build_step(self, step_name: str, cmd: List[str], role: str, step = ProcessingStep(step_name, step_args=step_args) return step + + def run_command(self, + cmd: List[str], + use_gpu: bool = False, + image_uri: Optional[str] = None, + instance_type: Optional[str] = None, + job_name: Optional[str] = None, + sagemaker_session: Optional[Session] = None) -> None: + config = rv_config.get_namespace_config(AWS_SAGEMAKER) + role = config('exec_role') + device = 'gpu' if use_gpu else 'cpu' + + if image_uri is None: + image_uri = config(f'{device}_image') + if instance_type is None: + instance_type = config(f'{device}_instance_type') + if sagemaker_session is None: + sagemaker_session = Session() + + processor = Processor( + role=role, + image_uri=image_uri, + instance_count=1, + instance_type=instance_type, + sagemaker_session=sagemaker_session, + entrypoint=cmd, + base_job_name=job_name, + ) + processor.run() diff --git a/rastervision_pipeline/rastervision/pipeline/runner/inprocess_runner.py b/rastervision_pipeline/rastervision/pipeline/runner/inprocess_runner.py index 404938d03a..9a5606821b 100644 --- a/rastervision_pipeline/rastervision/pipeline/runner/inprocess_runner.py +++ b/rastervision_pipeline/rastervision/pipeline/runner/inprocess_runner.py @@ -1,3 +1,4 @@ +from typing import List from rastervision.pipeline.cli import _run_command from rastervision.pipeline.runner.runner import Runner @@ -23,3 +24,7 @@ def run(self, _run_command(cfg_json_uri, command, split_ind, num_splits) else: _run_command(cfg_json_uri, command, 0, 1) + + def run_command(self, cmd: 
List[str]): + raise NotImplementedError( + 'Use LocalRunner.run_command to run a command locally.') diff --git a/rastervision_pipeline/rastervision/pipeline/runner/local_runner.py b/rastervision_pipeline/rastervision/pipeline/runner/local_runner.py index 981323f46d..fa810032ee 100644 --- a/rastervision_pipeline/rastervision/pipeline/runner/local_runner.py +++ b/rastervision_pipeline/rastervision/pipeline/runner/local_runner.py @@ -55,7 +55,10 @@ def run(self, makefile_path = join(dirname(cfg_json_uri), 'Makefile') str_to_file(makefile, makefile_path) makefile_path_local = download_if_needed(makefile_path) - process = Popen(['make', '-j', '-f', makefile_path_local]) + return self.run_command(['make', '-j', '-f', makefile_path_local]) + + def run_command(self, cmd: List[str]): + process = Popen(cmd) terminate_at_exit(process) exitcode = process.wait() if exitcode != 0: diff --git a/rastervision_pipeline/rastervision/pipeline/runner/runner.py b/rastervision_pipeline/rastervision/pipeline/runner/runner.py index 7c081bf289..2cea0caa6a 100644 --- a/rastervision_pipeline/rastervision/pipeline/runner/runner.py +++ b/rastervision_pipeline/rastervision/pipeline/runner/runner.py @@ -28,6 +28,15 @@ def run(self, """ pass + @abstractmethod + def run_command(self, cmd: List[str]): + """Run a single command. + + Args: + cmd: Command to run in the Docker container for the remote job as list + of strings. + """ + def get_split_ind(self) -> Optional[int]: """Get the split_ind for the process.