-
Notifications
You must be signed in to change notification settings - Fork 387
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Implement experimental AWS SageMaker support (#1968)
* Create SageMaker runner and submit jobs to pipeline * Suddenly working? * Pretty-print * Update Dockerfile * Handle training steps, processing steps differently * Successful launch spot training job * Enable GPU jobs * Use Estimator * No need for temporary scripts * GPU and dependencies for splits * update RV version in requirements * use pipeline_run_name in job name * add use_spot_instances to config template and fix formatting * remove comment * fix docstring * refactor * rename "sagemaker" to "aws_sagemaker" for consistency with "aws_batch" * add use_spot_instances to rv config schema * refactor * add to API docs * update docs * add unit tests * replace ScriptProcessor with Processor * shorten default pipeline run name * "inst" --> "instance" * bump sagemaker version --------- Co-authored-by: James McClain <[email protected]> Co-authored-by: Adeel Hassan <[email protected]>
- Loading branch information
1 parent
7106652
commit e56f93d
Showing
13 changed files
with
384 additions
and
15 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
include requirements.txt |
25 changes: 25 additions & 0 deletions
25
rastervision_aws_sagemaker/rastervision/aws_sagemaker/__init__.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
# flake8: noqa | ||
|
||
|
||
def register_plugin(registry): | ||
from rastervision.aws_sagemaker.aws_sagemaker_runner import ( | ||
AWS_SAGEMAKER, AWSSageMakerRunner) | ||
registry.set_plugin_version('rastervision.aws_sagemaker', 0) | ||
registry.add_runner(AWS_SAGEMAKER, AWSSageMakerRunner) | ||
registry.add_rv_config_schema(AWS_SAGEMAKER, [ | ||
'exec_role', | ||
'cpu_image', | ||
'cpu_instance_type', | ||
'gpu_image', | ||
'gpu_instance_type', | ||
'use_spot_instances', | ||
]) | ||
|
||
|
||
import rastervision.pipeline | ||
from rastervision.aws_sagemaker.aws_sagemaker_runner import * | ||
|
||
__all__ = [ | ||
'AWS_SAGEMAKER', | ||
AWSSageMakerRunner.__name__, | ||
] |
190 changes: 190 additions & 0 deletions
190
rastervision_aws_sagemaker/rastervision/aws_sagemaker/aws_sagemaker_runner.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,190 @@ | ||
from typing import TYPE_CHECKING, List, Optional, Union | ||
import logging | ||
from pprint import pprint | ||
|
||
import boto3 | ||
from rastervision.pipeline import rv_config_ as rv_config | ||
from rastervision.pipeline.runner import Runner | ||
|
||
from sagemaker.processing import Processor | ||
from sagemaker.estimator import Estimator | ||
from sagemaker.workflow.pipeline import Pipeline as SageMakerPipeline | ||
from sagemaker.workflow.pipeline_context import PipelineSession | ||
from sagemaker.workflow.steps import ProcessingStep, TrainingStep | ||
|
||
if TYPE_CHECKING: | ||
from rastervision.pipeline.pipeline import Pipeline | ||
from sagemaker.workflow.pipeline_context import _JobStepArguments | ||
|
||
log = logging.getLogger(__name__) | ||
|
||
AWS_SAGEMAKER = 'sagemaker' | ||
|
||
|
||
class AWSSageMakerRunner(Runner): | ||
"""Runs pipelines remotely using AWS SageMaker. | ||
Requires Everett configuration of form: | ||
.. code-block:: ini | ||
[SAGEMAKER] | ||
exec_role= | ||
cpu_image= | ||
cpu_instance_type= | ||
gpu_image= | ||
gpu_instance_type= | ||
use_spot_instances= | ||
""" | ||
|
||
def run(self, | ||
cfg_json_uri: str, | ||
pipeline: 'Pipeline', | ||
commands: List[str], | ||
num_splits: int = 1, | ||
cmd_prefix: List[str] = [ | ||
'python', '-m', 'rastervision.pipeline.cli' | ||
], | ||
pipeline_run_name: str = 'rv'): | ||
config = rv_config.get_namespace_config(AWS_SAGEMAKER) | ||
exec_role = config('exec_role') | ||
|
||
sagemaker_pipeline = self.build_pipeline( | ||
cfg_json_uri, | ||
pipeline, | ||
commands, | ||
num_splits, | ||
cmd_prefix=cmd_prefix, | ||
pipeline_run_name=pipeline_run_name) | ||
|
||
# Submit the pipeline to SageMaker | ||
iam_client = boto3.client('iam') | ||
role_arn = iam_client.get_role(RoleName=exec_role)['Role']['Arn'] | ||
sagemaker_pipeline.upsert(role_arn=role_arn) | ||
execution = sagemaker_pipeline.start() | ||
|
||
pprint(execution.describe()) | ||
|
||
def build_pipeline(self, | ||
cfg_json_uri: str, | ||
pipeline: 'Pipeline', | ||
commands: List[str], | ||
num_splits: int = 1, | ||
cmd_prefix: List[str] = [ | ||
'python', '-m', 'rastervision.pipeline.cli' | ||
], | ||
pipeline_run_name: str = 'rv') -> SageMakerPipeline: | ||
"""Build a SageMaker Pipeline with each command as a step within it.""" | ||
|
||
config = rv_config.get_namespace_config(AWS_SAGEMAKER) | ||
exec_role = config('exec_role') | ||
cpu_image = config('cpu_image') | ||
cpu_instance_type = config('cpu_instance_type') | ||
gpu_image = config('gpu_image') | ||
gpu_instance_type = config('gpu_instance_type') | ||
use_spot_instances = config('use_spot_instances').lower() == 'yes' | ||
sagemaker_session = PipelineSession() | ||
|
||
steps = [] | ||
|
||
for command in commands: | ||
use_gpu = command in pipeline.gpu_commands | ||
job_name = f'{pipeline_run_name}-{command}' | ||
|
||
cmd = cmd_prefix[:] | ||
|
||
if rv_config.get_verbosity() > 1: | ||
num_vs = rv_config.get_verbosity() - 1 | ||
# produces a string like "-vvv..." | ||
verbosity_opt_str = f'-{"v" * num_vs}' | ||
cmd += [verbosity_opt_str] | ||
|
||
cmd.extend(['run_command', cfg_json_uri, command]) | ||
|
||
if command in pipeline.split_commands and num_splits > 1: | ||
# If the step can be split, then split it into parts | ||
# that do not depend on each other (can run in | ||
# parallel). | ||
_steps = [] | ||
for i in range(num_splits): | ||
cmd += [ | ||
'--split-ind', | ||
str(i), '--num-splits', | ||
str(num_splits) | ||
] | ||
step = self.build_step( | ||
step_name=f'{job_name}_{i+1}of{num_splits}', | ||
cmd=cmd, | ||
role=exec_role, | ||
image_uri=gpu_image if use_gpu else cpu_image, | ||
instance_type=(gpu_instance_type | ||
if use_gpu else cpu_instance_type), | ||
use_spot_instances=use_spot_instances, | ||
sagemaker_session=sagemaker_session, | ||
use_gpu=use_gpu) | ||
step.add_depends_on(steps) | ||
_steps.append(step) | ||
steps.extend(_steps) | ||
else: | ||
# If the step can not be split, then submit it as-is. | ||
step = self.build_step( | ||
step_name=job_name, | ||
cmd=cmd, | ||
role=exec_role, | ||
image_uri=gpu_image if use_gpu else cpu_image, | ||
instance_type=(gpu_instance_type | ||
if use_gpu else cpu_instance_type), | ||
use_spot_instances=use_spot_instances, | ||
sagemaker_session=sagemaker_session, | ||
use_gpu=use_gpu) | ||
step.add_depends_on(steps) | ||
steps.append(step) | ||
|
||
# Submit the pipeline to SageMaker | ||
sagemaker_pipeline = SageMakerPipeline( | ||
name=pipeline_run_name, | ||
steps=steps, | ||
sagemaker_session=sagemaker_session, | ||
) | ||
return sagemaker_pipeline | ||
|
||
def build_step(self, step_name: str, cmd: List[str], role: str, | ||
image_uri: str, instance_type: str, | ||
use_spot_instances: bool, | ||
sagemaker_session: PipelineSession, | ||
use_gpu: bool) -> Union[TrainingStep, ProcessingStep]: | ||
"""Build an TrainingStep if use_gpu=True, otherwise a ProcessingStep. | ||
""" | ||
if use_gpu: | ||
# For GPU-enabled steps, create an "Estimator". | ||
# Formally this should probably not be used for prediction in | ||
# this way, but it is expedient (especially given default | ||
# service quotas, and other stuff). | ||
step_estimator = Estimator( | ||
container_entry_point=cmd, | ||
image_uri=image_uri, | ||
instance_count=1, | ||
instance_type=instance_type, | ||
max_retry_attempts=1, | ||
role=role, | ||
sagemaker_session=sagemaker_session, | ||
use_spot=use_spot_instances, | ||
) | ||
step_args: Optional['_JobStepArguments'] = step_estimator.fit( | ||
wait=False) | ||
step = TrainingStep(step_name, step_args=step_args) | ||
else: | ||
# For non-GPU-enabled steps, create a ScriptProcessor. | ||
step_processor = Processor( | ||
role=role, | ||
image_uri=image_uri, | ||
instance_count=1, | ||
instance_type=instance_type, | ||
sagemaker_session=sagemaker_session, | ||
entrypoint=cmd, | ||
) | ||
step_args: Optional['_JobStepArguments'] = step_processor.run( | ||
wait=False) | ||
step = ProcessingStep(step_name, step_args=step_args) | ||
|
||
return step |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
rastervision_pipeline==0.21.4-dev | ||
sagemaker==2.196.0 |
Oops, something went wrong.