diff --git a/config.py b/config.py index 38d489f..e6ff35f 100644 --- a/config.py +++ b/config.py @@ -1,6 +1,7 @@ # Constants (User configurable) APP_NAME = 'DistributedCP' # Used to generate derivative names unique to the application. +LOG_GROUP_NAME = APP_NAME # DOCKER REGISTRY INFORMATION: DOCKERHUB_TAG = 'cellprofiler/distributed-cellprofiler:2.0.0_4.1.3' @@ -32,10 +33,14 @@ # SQS QUEUE INFORMATION: SQS_QUEUE_NAME = APP_NAME + 'Queue' SQS_MESSAGE_VISIBILITY = 1*60 # Timeout (secs) for messages in flight (average time to be processed) -SQS_DEAD_LETTER_QUEUE = 'arn:aws:sqs:some-region:111111100000:DeadMessages' +SQS_DEAD_LETTER_QUEUE = 'user_DeadMessages' -# LOG GROUP INFORMATION: -LOG_GROUP_NAME = APP_NAME +# MONITORING +AUTO_MONITOR = 'True' + +# CLOUDWATCH DASHBOARD CREATION +CREATE_DASHBOARD = 'True' # Create a dashboard in Cloudwatch for run +CLEAN_DASHBOARD = 'True' # Automatically remove dashboard at end of run with Monitor # REDUNDANCY CHECKS CHECK_IF_DONE_BOOL = 'False' #True or False- should it check if there are a certain number of non-empty files and delete the job if yes? diff --git a/documentation/DCP-documentation/overview_2.md b/documentation/DCP-documentation/overview_2.md index bf8ccc8..6f1c8a1 100644 --- a/documentation/DCP-documentation/overview_2.md +++ b/documentation/DCP-documentation/overview_2.md @@ -37,7 +37,7 @@ Any time they don't have a job they go back to SQS. If SQS tells them there are no visible jobs then they shut themselves down. * When an instance finishes a job it sends a message to SQS and removes that job from the queue. -## What does this look like? +## What does an instance configuration look like? ![Example Instance Configuration](images/sample_DCP_config_1.png) @@ -65,4 +65,14 @@ How long a job takes to run and how quickly you need the data may also affect ho * Running a few large Docker containers (as opposed to many small ones) increases the amount of memory all the copies of your software are sharing, decreasing the likelihood you'll run out of memory if you stagger your job start times. However, you're also at a greater risk of running out of hard disk space. -Keep an eye on all of the logs the first few times you run any workflow and you'll get a sense of whether your resources are being utilized well or if you need to do more tweaking. \ No newline at end of file +Keep an eye on all of the logs the first few times you run any workflow and you'll get a sense of whether your resources are being utilized well or if you need to do more tweaking. + + ## What does this look like on AWS? + The following five are the primary resources that Distributed-CellProfiler interacts with. + After you have finished [preparing for Distributed-CellProfiler](step_0_prep), you do not need to directly interact with any of these services outside of Distributed-CellProfiler. + If you would like a granular view of what Distributed-CellProfiler is doing while it runs, you can open each console in a separate tab in your browser and watch their individual behaviors, though this is not necessary, especially if you run the [monitor command](step_4_monitor.md) and/or have DS automatically create a Dashboard for you (see [Configuration](step_1_configuration.md)). 
+ * [S3 Console](https://console.aws.amazon.com/s3) + * [EC2 Console](https://console.aws.amazon.com/ec2/) + * [ECS Console](https://console.aws.amazon.com/ecs/) + * [SQS Console](https://console.aws.amazon.com/sqs/) + * [CloudWatch Console](https://console.aws.amazon.com/cloudwatch/) \ No newline at end of file diff --git a/documentation/DCP-documentation/step_0_prep.md b/documentation/DCP-documentation/step_0_prep.md index a1c87dc..36f8281 100644 --- a/documentation/DCP-documentation/step_0_prep.md +++ b/documentation/DCP-documentation/step_0_prep.md @@ -1,92 +1,81 @@ # Step 0: Prep +There are two classes of AWS resources that Distributed-CellProfiler interacts with: 1) infrastructure that is made once per AWS account to enable any Distributed-CellProfiler implementation to run and 2) infrastructure that is made and destroyed with every run. +This section describes the creation of the first class of AWS infrastructure and only needs to be followed once per account. -Distributed-CellProfiler runs many parallel jobs in EC2 instances that are automatically managed by ECS. -To get jobs started, a control node to submit jobs and monitor progress is needed. -This section describes what you need in AWS and in the control node to get started. -This guide only needs to be followed once per account. -(Though we recommend each user has their own control node, further control nodes can be created from an AMI after this guide has been followed to completion once.) - - -## 1. AWS Configuration +## AWS Configuration +The AWS resources involved in running Distributed-CellProfiler are configured using the [AWS Web Console](https://aws.amazon.com/console/) and a setup script we provide ([setup_AWS.py](../../setup_AWS.py)). +You need an active AWS account configured to proceed. +Login into your AWS account, and make sure the following list of resources is created: -The AWS resources involved in running Distributed-CellProfiler can be primarily configured using the [AWS Web Console](https://aws.amazon.com/console/). -The architecture of Distributed-CellProfiler is based in the [worker pattern](https://aws.amazon.com/blogs/compute/better-together-amazon-ecs-and-aws-lambda/) for distributed systems. -We have adapted and simplified that architecture for Distributed-CellProfiler. - -You need an active account configured to proceed. Login into your AWS account, and make sure the following list of resources is created: - -### 1.1 Access keys -* Get [security credentials](http://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_access-keys.html) for your account. +### 1.1 Manually created resources +* **Security Credentials**: Get [security credentials](http://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_access-keys.html) for your account. Store your credentials in a safe place that you can access later. -* You will probably need an ssh key to login into your EC2 instances (control or worker nodes). +* **SSH Key**: You will probably need an ssh key to login into your EC2 instances (control or worker nodes). [Generate an SSH key](http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-key-pairs.html) and store it in a safe place for later use. If you'd rather, you can generate a new key pair to use for this during creation of the control node; make sure to `chmod 600` the private key when you download it. - -### 1.2 Roles and permissions -* You can use your default VPC, subnet, and security groups; you should add an inbound SSH connection from your IP address to your security group. 
-* [Create an ecsInstanceRole](http://docs.aws.amazon.com/AmazonECS/latest/developerguide/instance_IAM_role.html) with appropriate permissions (An S3 bucket access policy CloudWatchFullAccess, CloudWatchActionEC2Access, AmazonEC2ContainerServiceforEC2Role policies, ec2.amazonaws.com as a Trusted Entity) -* [Create an aws-ec2-spot-fleet-tagging-role](http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/spot-fleet-requests.html) with appropriate permissions (just needs AmazonEC2SpotFleetTaggingRole); ensure that in the "Trust Relationships" tab it says "spotfleet.amazonaws.com" rather than "ec2.amazonaws.com" (edit this if necessary). -In the current interface, it's easiest to click "Create role", select "EC2" from the main service list, then select "EC2- Spot Fleet Tagging". +* **SSH Connection**: You can use your default AWS account VPC, subnet, and security groups. +You should add an inbound SSH connection from your IP address to your security group. + +### 1.2 Automatically created resources +* Run setup_AWS by entering `python setup_AWS.py` from your command line. +It will automatically create: + * an [ecsInstanceRole](http://docs.aws.amazon.com/AmazonECS/latest/developerguide/instance_IAM_role.html) with appropriate permissions. + This role is used by the EC2 instances generated by your spot fleet request and coordinated by ECS. + * an [aws-ec2-spot-fleet-tagging-role](http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/spot-fleet-requests.html) with appropriate permissions. + This role grants the Spot Fleet the permissions to request, launch, terminate, and tag instances. + * an SNS topic that is used for triggering the auto-Monitor. + * a Monitor lambda function that is used for auto-monitoring of your runs (see [Step 4: Monitor](step_4_monitor.md) for more information). ### 1.3 Auxiliary Resources +*You can certainly configure Distributed-CellProfiler for use without S3, but most DS implementations use S3 for storage.* * [Create an S3 bucket](http://docs.aws.amazon.com/AmazonS3/latest/gsg/CreatingABucket.html) and upload your data to it. -* Add permissions to your bucket so that [logs can be exported to it](https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/S3ExportTasksConsole.html) (Step 3, first code block) -* [Create an SQS](http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSGettingStartedGuide/CreatingQueue.html) queue for unprocessable-messages to be dumped into (aka a DeadLetterQueue). - -### 1.4 Primary Resources -The following five are the primary resources that Distributed-CellProfiler interacts with. -After you have finished preparing for Distributed-CellProfiler (this guide), you do not need to directly interact with any of these services outside of Distributed-CellProfiler. -If you would like a granular view of [what Distributed-CellProfiler is doing while it runs](overview_2.md), you can open each console in a separate tab in your browser and watch their individual behaviors, though this is not necessary, especially if you run the [monitor command](step_4_monitor.md) and/or enable auto-Dashboard creation in your [configuration](step_1_configuration.md). 
-* [S3 Console](https://console.aws.amazon.com/s3) -* [EC2 Console](https://console.aws.amazon.com/ec2/) -* [ECS Console](https://console.aws.amazon.com/ecs/) -* [SQS Console](https://console.aws.amazon.com/sqs/) -* [CloudWatch Console](https://console.aws.amazon.com/cloudwatch/) - -### 1.5 Spot Limits +Add permissions to your bucket so that [logs can be exported to it](https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/S3ExportTasksConsole.html) (Step 3, first code block). + +### 1.4 Increase Spot Limits AWS initially [limits the number of spot instances](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-spot-limits.html) you can use at one time; you can request more through a process in the linked documentation. Depending on your workflow (your scale and how you group your jobs), this may not be necessary. -## 2. The Control Node -The control node can be your local machine if it is configured properly, or it can also be a small instance in AWS. +## The Control Node +The control node is a machine that is used for running the Distributed-CellProfiler scripts. +It can be your local machine, if it is configured properly, or it can also be a small instance in AWS. We prefer to have a small EC2 instance dedicated to controlling our Distributed-CellProfiler workflows for simplicity of access and configuration. -To login in an EC2 machine you need an ssh key that can be generated in the web console. +To login in an EC2 machine you need an SSH key that can be generated in the web console. Each time you launch an EC2 instance you have to confirm having this key (which is a .pem file). This machine is needed only for submitting jobs, and does not have any special computational requirements, so you can use a micro instance to run basic scripts to proceed. +(Though we recommend each user has their own control node, further control nodes can be created from an AMI after this guide has been followed to completion once.) The control node needs the following tools to successfully run Distributed-CellProfiler. -Here we assume you are using the command line in a Linux machine, but you are free to try other operating systems too. +These instructions assume you are using the command line in a Linux machine, but you are free to try other operating systems too. -### 2.1 Make your own control node +### Create Control Node from Scratch +#### 2.1 Install Python 3.8 or higher and pip +Most scripts are written in Python and support Python 3.8 and 3.9. +Follow installation instructions for your platform to install Python. +pip should be included with the installation of Python 3.8 or 3.9, but if you do not have it installed, install pip. -#### 2.1.1 Clone this repo +#### 2.2 Clone this repository and install requirements You will need the scripts in Distributed-CellProfiler locally available in your control node.
sudo apt-get install git git clone https://github.com/DistributedScience/Distributed-CellProfiler.git cd Distributed-CellProfiler/ git pull -- -#### 2.1.2 Python 3.8 or higher and pip -Most scripts are written in Python and support Python 3.8 and 3.9. -Follow installation instructions for your platform to install python and, if needed, pip. -After Python has been installed, you need to install the requirements for Distributed-CellProfiler following this steps: - -
- cd Distributed-CellProfiler/files + # install requirements + cd files sudo pip install -r requirements.txt-#### 2.1.3 AWS CLI +#### 2.3 Install AWS CLI The command line interface is the main mode of interaction between the local node and the resources in AWS. -Follow AWS instructions to install [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html). -Then set up AWS CLI with: +You need to install [awscli](http://docs.aws.amazon.com/cli/latest/userguide/installing.html) for Distributed-CellProfiler to work properly: +
+ sudo pip install awscli --ignore-installed six + sudo pip install --upgrade awscli aws configure-When running the last step, you will need to enter your AWS credentials. +When running the last step (`aws configure`), you will need to enter your AWS credentials. Make sure to set the region correctly (i.e. us-west-1 or eu-east-1, not eu-west-2a), and set the default file type to json. #### 2.1.4 s3fs-fuse (optional) @@ -94,17 +83,18 @@ Make sure to set the region correctly (i.e. us-west-1 or eu-east-1, not eu-west- It does not have all the performance of a real file system, but allows you to easily access all the files in your s3 bucket. Follow the instructions at the link to mount your bucket. -#### 2.1.5 Parallel (optional) -Parallel is an optional Linux tool that you can install on your control node for generating job files using the `batches.sh` scripting tool. -If you use other ways of generating job files (e.g. `run_batch_general.py`) you do not need parallel. -To install parallel, run: -
- sudo apt-get install parallel -- -#### 2.1.6 Create a Control Node AMI (optional) +### Create Control Node from AMI (optional) Once you've set up the other software (and gotten a job running, so you know everything is set up correctly), you can use Amazon's web console to set this up as an Amazon Machine Instance, or AMI, to replicate the current state of the hard drive. Create future control nodes using this AMI so that you don't need to repeat the above installation. -### 2.2 Use a pre-made AMI -You can use our [Cytominer-VM](https://github.com/cytomining/cytominer-vm) and add your own security keys; it has extra things you may not need, such as R, but it can be very handy! +## Removing long-term infrastructure +If you decide that you never want to run Distributed-CellProfiler again and would like to remove the long-term infrastructure, follow these steps. + +### Remove Roles, Lambda Monitor, and Monitor SNS +
+python setup_AWS.py destroy ++ +### Remove EC2 Control node +If you made your control node as an EC2 instance, while in the AWS console, select the instance. +Select `Instance state` => `Terminate instance`. \ No newline at end of file diff --git a/documentation/DCP-documentation/step_1_configuration.md b/documentation/DCP-documentation/step_1_configuration.md index a577c76..e507076 100644 --- a/documentation/DCP-documentation/step_1_configuration.md +++ b/documentation/DCP-documentation/step_1_configuration.md @@ -55,6 +55,7 @@ This can safely be set to 0 for workflows that don't require much memory or exec * **SQS_MESSAGE_VISIBILITY:** How long each job is hidden from view before being allowed to be tried again. We recommend setting this to slightly longer than the average amount of time it takes an individual job to process- if you set it too short, you may waste resources doing the same job multiple times; if you set it too long, your instances may have to wait around a long while to access a job that was sent to an instance that stalled or has since been terminated. * **SQS_DEAD_LETTER_QUEUE:** The name of the queue to send jobs to if they fail to process correctly multiple times; this keeps a single bad job (such as one where a single file has been corrupted) from keeping your cluster active indefinitely. +This queue will be automatically made if it doesn't exist already. See [Step 0: Prep](step_0_prep.med) for more information. *** @@ -65,6 +66,18 @@ See [Step 0: Prep](step_0_prep.med) for more information. *** + ### MONITORING + * **AUTO_MONITOR:** Whether or not to have Auto-Monitor automatically monitor your jobs. + +*** + +### CLOUDWATCH DASHBOARD CREATION + +* **CREATE_DASHBOARD:** Create a Cloudwatch Dashboard that plots run metrics? +* **CLEAN_DASHBOARD:** Automatically clean up the Cloudwatch Dashboard at the end of the run? + + *** + ### REDUNDANCY CHECKS * **CHECK_IF_DONE_BOOL:** Whether or not to check the output folder before proceeding. diff --git a/documentation/DCP-documentation/step_4_monitor.md b/documentation/DCP-documentation/step_4_monitor.md index ee9dd3a..fcd78e2 100644 --- a/documentation/DCP-documentation/step_4_monitor.md +++ b/documentation/DCP-documentation/step_4_monitor.md @@ -2,53 +2,68 @@ Your workflow is now submitted. Distributed-CellProfiler will keep an eye on a few things for you at this point without you having to do anything else. - * Each instance is labeled with your APP_NAME, so that you can easily find your instances if you want to look at the instance metrics on the Running Instances section of the [EC2 web interface](https://console.aws.amazon.com/ec2/v2/home) to monitor performance. - * You can also look at the whole-cluster CPU and memory usage statistics related to your APP_NAME in the [ECS web interface](https://console.aws.amazon.com/ecs/home). - * Each instance will have an alarm placed on it so that if CPU usage dips below 1% for 15 consecutive minutes (almost always the result of a crashed machine), the instance will be automatically terminated and a new one will take its place. - * Each individual job processed will create a log of the CellProfiler output, and each Docker container will create a log showing CPU, memory, and disk usage. -If you choose to run the monitor script, Distributed-CellProfiler can be even more helpful. 
-The monitor can be run by entering `python run.py monitor files/APP_NAMESpotFleetRequestId.json`; the JSON file containing all the information Distributed-CellProfiler needs will have been automatically created when you sent the instructions to start your cluster in the previous step.
-
-(**Note:** You should run the monitor inside [Screen](https://www.gnu.org/software/screen/), [tmux](https://tmux.github.io/), or another comparable service to keep a network disconnection from killing your monitor; this is particularly critical the longer your run takes.)
+If you choose to run the Monitor script, Distributed-CellProfiler can be even more helpful.
 
-***
+## Running Monitor
 
-## Monitor functions
+### Manually running Monitor
+Monitor can be run by entering `python run.py monitor files/APP_NAMESpotFleetRequestId.json`.
+While the optimal time to initiate Monitor is as soon as you have triggered a run, since it downscales infrastructure as it is no longer needed, you can run Monitor at any point in time and it will clean up whatever infrastructure remains.
 
-### While your analysis is running
+**Note:** You should run the monitor inside [Screen](https://www.gnu.org/software/screen/), [tmux](https://tmux.github.io/), or another comparable service to keep a network disconnection from killing your monitor; this is particularly critical the longer your run takes.
 
-* Checks your queue once per minute to see how many jobs are currently processing and how many remain to be processed.
+### Using Auto-Monitor
+Instead of manually triggering Monitor, you can have a version of Monitor automatically initiate after you [start your cluster](step_3_start_cluster.md) by setting `AUTO_MONITOR = 'True'` in your [config file](step_1_configuration.md).
+Auto-Monitor is an AWS Lambda function that is triggered by alarms placed on the SQS queue.
+Read more about the [SQS Queue](SQS_QUEUE_information.md) to better understand the alarm metrics.
 
-* Once per hour, it deletes the alarms for any instances that have been terminated in the last 24 hours (because of spot prices rising above your maximum bid, machine crashes, etc).
+## Monitor functions
 
-### When the number of jobs in your queue goes to 0
+### While your analysis is running
+* Scales down the spot fleet request to match the number of remaining jobs WITHOUT force terminating them.
+This happens every 10 minutes with manual Monitor, or whenever there are no Visible Messages in your queue with Auto-Monitor.
+* Deletes the alarms for any instances that have been terminated in the last 24 hours (because of spot prices rising above your maximum bid, machine crashes, etc).
+This happens every hour with manual Monitor, or whenever there are no Visible Messages in your queue with Auto-Monitor.
+### When your queue is totally empty (there are no Visible or Not Visible messages)
 * Downscales the ECS service associated with your APP_NAME.
-
 * Deletes all the alarms associated with your spot fleet (both the currently running and the previously terminated instances).
-
 * Shuts down your spot fleet to keep you from incurring charges after your analysis is over.
-
 * Gets rid of the queue, service, and task definition created for this analysis.
-
 * Exports all the logs from your analysis onto your S3 bucket.
-
-* Deletes your Cloudwatch Dashboard if you created it and set CLEAN_DASHBOARD to True.
-
-***
+* Deletes your Cloudwatch Dashboard if you created it and set `CLEAN_DASHBOARD = 'True'` in your [config file](step_1_configuration.md).
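+
+Under the hood, the downscaling described above amounts to matching the spot fleet's target capacity to the number of jobs still in flight.
+A simplified sketch, using boto3, of the kind of call Monitor makes (function and variable names here are illustrative; the real Monitor adds scheduling, logging, and error handling):
+
+```
+import boto3
+
+sqs = boto3.client("sqs")
+ec2 = boto3.client("ec2")
+
+def match_fleet_to_remaining_jobs(queue_name, fleet_id):
+    """Shrink the spot fleet target to the number of jobs still in flight."""
+    queue_url = sqs.get_queue_url(QueueName=queue_name)["QueueUrl"]
+    attributes = sqs.get_queue_attributes(
+        QueueUrl=queue_url,
+        AttributeNames=["ApproximateNumberOfMessagesNotVisible"],
+    )["Attributes"]
+    in_flight = int(attributes["ApproximateNumberOfMessagesNotVisible"])
+    fleet = ec2.describe_spot_fleet_instances(SpotFleetRequestId=fleet_id)
+    if in_flight < len(fleet["ActiveInstances"]):
+        # "noTermination" lets running machines finish their current job;
+        # the fleet simply stops replacing instances as they exit.
+        ec2.modify_spot_fleet_request(
+            SpotFleetRequestId=fleet_id,
+            TargetCapacity=in_flight,
+            ExcessCapacityTerminationPolicy="noTermination",
+        )
+```
+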
 ## Cheapest mode
 
-You can run the monitor in an optional "cheapest" mode, which will downscale the number of requested machines (but not RUNNING machines) to one 15 minutes after the monitor is engaged.
+If you are manually triggering Monitor, you can run the monitor in an optional "cheapest" mode, which will downscale the number of requested machines (but not RUNNING machines) to one machine 15 minutes after the monitor is engaged.
 You can engage cheapest mode by adding `True` as a final configurable parameter when starting the monitor, aka `python run.py monitor files/APP_NAMESpotFleetRequestId.json True`
 
-Cheapest mode is cheapest because it will remove all but 1 machine as soon as that machine crashes and/or runs out of jobs to do; this can save you money, particularly in multi-CPU Dockers running long jobs.
-
-This mode is optional because running this way involves some inherent risks.
+Cheapest mode is cheapest because it will remove all but 1 machine as soon as that machine crashes and/or runs out of jobs to do; this can save you money, particularly in multi-CPU Dockers running long jobs.
+This mode is optional because running this way involves some inherent risks.
 If machines stall out due to processing errors, they will not be replaced, meaning your job will take overall longer.
 Additionally, if there is limited capacity for your requested configuration when you first start (e.g. you want 200 machines but AWS says it can currently only allocate you 50), more machines will not be added if and when they become available in cheapest mode as they would in normal mode.
+
+***
+
+## Monitor file
+
+The JSON monitor file containing all the information Distributed-CellProfiler needs will have been automatically created when you sent the instructions to start your cluster in the [previous step](step_3_start_cluster.md).
+The file itself is quite simple and contains the following information:
+
+```
+{"MONITOR_FLEET_ID" : "sfr-9999ef99-99fc-9d9d-9999-9999999e99ab",
+"MONITOR_APP_NAME" : "2021_12_13_Project_Analysis",
+"MONITOR_ECS_CLUSTER" : "default",
+"MONITOR_QUEUE_NAME" : "2021_12_13_Project_AnalysisQueue",
+"MONITOR_BUCKET_NAME" : "bucket-name",
+"MONITOR_LOG_GROUP_NAME" : "2021_12_13_Project_Analysis",
+"MONITOR_START_TIME" : "1649187798951",
+"CLEAN_DASHBOARD" : "True"}
+```
+
+For any Distributed-CellProfiler run where you have run [`startCluster`](step_3_start_cluster.md) more than once, the most recent values will overwrite the older values in the monitor file.
+Therefore, if you have started multiple spot fleets (which you might do in different subnets if you are having trouble getting enough capacity in your spot fleet, for example), Monitor will only clean up the latest request unless you manually edit the `MONITOR_FLEET_ID` to match the spot fleet you have kept.
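+
+If you do need to point Monitor at a different fleet, you can edit the file by hand or with a couple of lines of Python; a minimal sketch (the path and fleet ID below are placeholders, not values from a real run):
+
+```
+import json
+
+monitor_path = "files/2021_12_13_Project_AnalysisSpotFleetRequestId.json"
+with open(monitor_path) as f:
+    monitor = json.load(f)
+
+# Substitute the ID of the spot fleet request you want Monitor to manage.
+monitor["MONITOR_FLEET_ID"] = "sfr-1111aa11-11aa-1a1a-1111-111111111abc"
+
+with open(monitor_path, "w") as f:
+    json.dump(monitor, f, indent=1)
+```
+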
\ No newline at end of file diff --git a/lambda_function.py b/lambda_function.py new file mode 100644 index 0000000..e14ba94 --- /dev/null +++ b/lambda_function.py @@ -0,0 +1,187 @@ +import boto3 +import datetime +import botocore +import json + +s3 = boto3.client("s3") +ecs = boto3.client("ecs") +ec2 = boto3.client("ec2") +cloudwatch = boto3.client("cloudwatch") +sqs = boto3.client("sqs") + +bucket = "BUCKET_NAME" + + +def killdeadAlarms(fleetId, monitorapp, project): + checkdates = [ + datetime.datetime.now().strftime("%Y-%m-%d"), + (datetime.datetime.now() - datetime.timedelta(days=1)).strftime("%Y-%m-%d"), + ] + todel = [] + for eachdate in checkdates: + datedead = ec2.describe_spot_fleet_request_history( + SpotFleetRequestId=fleetId, StartTime=eachdate + ) + for eachevent in datedead["HistoryRecords"]: + if eachevent["EventType"] == "instanceChange": + if eachevent["EventInformation"]["EventSubType"] == "terminated": + todel.append(eachevent["EventInformation"]["InstanceId"]) + todel = [f"{project}_{x}" for x in todel] + cloudwatch.delete_alarms(AlarmNames=todel) + print("Old alarms deleted") + + +def seeIfLogExportIsDone(logExportId): + while True: + result = cloudwatch.describe_export_tasks(taskId=logExportId) + if result["exportTasks"][0]["status"]["code"] != "PENDING": + if result["exportTasks"][0]["status"]["code"] != "RUNNING": + print(result["exportTasks"][0]["status"]["code"]) + break + time.sleep(30) + + +def downscaleSpotFleet(queue, spotFleetID): + response = sqs.get_queue_url(QueueName=queue) + queueUrl = response["QueueUrl"] + response = sqs.get_queue_attributes( + QueueUrl=queueUrl, + AttributeNames=[ + "ApproximateNumberOfMessages", + "ApproximateNumberOfMessagesNotVisible", + ], + ) + visible = int(response["Attributes"]["ApproximateNumberOfMessages"]) + nonvisible = int(response["Attributes"]["ApproximateNumberOfMessagesNotVisible"]) + status = ec2.describe_spot_fleet_instances(SpotFleetRequestId=spotFleetID) + if nonvisible < len(status["ActiveInstances"]): + result = ec2.modify_spot_fleet_request( + ExcessCapacityTerminationPolicy="noTermination", + TargetCapacity=str(nonvisible), + SpotFleetRequestId=spotFleetID, + ) + + +def lambda_handler(event, lambda_context): + # Triggered any time SQS queue ApproximateNumberOfMessagesVisible = 0 + # OR ApproximateNumberOfMessagesNotVisible = 0 + messagestring = event["Records"][0]["Sns"]["Message"] + messagedict = json.loads(messagestring) + queueId = messagedict["Trigger"]["Dimensions"][0]["value"] + project = queueId.rsplit("_", 1)[0] + + # Download monitor file + monitor_file_name = f"{queueId.split('Queue')[0]}SpotFleetRequestId.json" + monitor_local_name = f"/tmp/{monitor_file_name}" + monitor_on_bucket_name = f"monitors/{monitor_file_name}" + + with open(monitor_local_name, "wb") as f: + try: + s3.download_fileobj(bucket, monitor_on_bucket_name, f) + except botocore.exceptions.ClientError as error: + print("Error retrieving monitor file.") + return + with open(monitor_local_name, "r") as input: + monitorInfo = json.load(input) + + monitorcluster = monitorInfo["MONITOR_ECS_CLUSTER"] + monitorapp = monitorInfo["MONITOR_APP_NAME"] + fleetId = monitorInfo["MONITOR_FLEET_ID"] + loggroupId = monitorInfo["MONITOR_LOG_GROUP_NAME"] + starttime = monitorInfo["MONITOR_START_TIME"] + CLEAN_DASHBOARD = monitorInfo["CLEAN_DASHBOARD"] + print(f"Monitor triggered for {monitorcluster} {monitorapp} {fleetId} {loggroupId}") + + # If no visible messages, downscale machines + if "ApproximateNumberOfMessagesVisible" in 
event["Records"][0]["Sns"]["Message"]: + print("No visible messages. Tidying as we go.") + killdeadAlarms(fleetId, monitorapp, project) + downscaleSpotFleet(queueId, fleetId) + + # If no messages in progress, cleanup + if "ApproximateNumberOfMessagesNotVisible" in event["Records"][0]["Sns"]["Message"]: + print("No messages in progress. Cleaning up.") + ecs.update_service( + cluster=monitorcluster, + service=f"{monitorapp}Service", + desiredCount=0, + ) + print("Service has been downscaled") + + # Delete the alarms from active machines and machines that have died. + active_dictionary = ec2.describe_spot_fleet_instances( + SpotFleetRequestId=fleetId + ) + active_instances = [] + for instance in active_dictionary["ActiveInstances"]: + active_instances.append(instance["InstanceId"]) + cloudwatch.delete_alarms(AlarmNames=active_instances) + killdeadAlarms(fleetId, monitorapp, project) + + # Read spot fleet id and terminate all EC2 instances + ec2.cancel_spot_fleet_requests( + SpotFleetRequestIds=[fleetId], TerminateInstances=True + ) + print("Fleet shut down.") + + # Remove SQS queue, ECS Task Definition, ECS Service + ECS_TASK_NAME = monitorapp + "Task" + ECS_SERVICE_NAME = monitorapp + "Service" + + print("Deleting existing queue.") + queueoutput = sqs.list_queues(QueueNamePrefix=queueId) + try: + if len(queueoutput["QueueUrls"]) == 1: + queueUrl = queueoutput["QueueUrls"][0] + else: # In case we have "AnalysisQueue" and "AnalysisQueue1" and only want to delete the first of those + for eachUrl in queueoutput["QueueUrls"]: + if eachUrl.split("/")[-1] == queueName: + queueUrl = eachUrl + sqs.delete_queue(QueueUrl=queueUrl) + except KeyError: + print("Can't find queue to delete.") + + print("Deleting service") + try: + ecs.delete_service(cluster=monitorcluster, service=ECS_SERVICE_NAME) + except: + print("Couldn't delete service.") + + print("De-registering task") + taskArns = ecs.list_task_definitions() + for eachtask in taskArns["taskDefinitionArns"]: + fulltaskname = eachtask.split("/")[-1] + ecs.deregister_task_definition(taskDefinition=fulltaskname) + + print("Removing cluster if it's not the default and not otherwise in use") + if monitorcluster != "default": + result = ecs.describe_clusters(clusters=[monitorcluster]) + if ( + sum( + [ + result["clusters"][0]["pendingTasksCount"], + result["clusters"][0]["runningTasksCount"], + result["clusters"][0]["activeServicesCount"], + ] + ) + == 0 + ): + ecs.delete_cluster(cluster=monitorcluster) + + # Remove alarms that triggered monitor + print("Removing alarms that triggered Monitor") + cloudwatch.delete_alarms( + AlarmNames=[ + f"ApproximateNumberOfMessagesVisibleisZero_{monitorapp}", + f"ApproximateNumberOfMessagesNotVisibleisZero_{monitorapp}", + ] + ) + + # Remove Cloudwatch dashboard if created and cleanup desired + if CLEAN_DASHBOARD.lower() == "true": + dashboard_list = cloudwatch.list_dashboards() + for entry in dashboard_list["DashboardEntries"]: + if monitorapp in entry["DashboardName"]: + cloudwatch.delete_dashboards( + DashboardNames=[entry["DashboardName"]] + ) diff --git a/run.py b/run.py index f143ff1..4da71d4 100644 --- a/run.py +++ b/run.py @@ -10,11 +10,12 @@ from email.mime.text import MIMEText # Back compatability with old config versions -SOURCE_BUCKET = False -UPLOAD_FLAGS = False -UPDATE_PLUGINS = False -CREATE_DASHBOARD = False -CLEAN_DASHBOARD = False +SOURCE_BUCKET = 'False' +UPLOAD_FLAGS = 'False' +UPDATE_PLUGINS = 'False' +CREATE_DASHBOARD = 'False' +CLEAN_DASHBOARD = 'False' +AUTO_MONITOR = 'False' from config import 
*
@@ -123,7 +124,7 @@ def generate_task_definition(AWS_PROFILE):
         {"name": "NECESSARY_STRING", "value": NECESSARY_STRING},
         {"name": "DOWNLOAD_FILES", "value": DOWNLOAD_FILES},
     ]
-    if SOURCE_BUCKET:
+    if SOURCE_BUCKET.lower()=='true':
         task_definition['containerDefinitions'][0]['environment'] += [
             {
                 'name': 'SOURCE_BUCKET',
                 'value': SOURCE_BUCKET
             },
@@ -133,13 +134,13 @@ def generate_task_definition(AWS_PROFILE):
                 'name': 'DESTINATION_BUCKET',
                 'value': DESTINATION_BUCKET
             }]
-    if UPLOAD_FLAGS:
+    if UPLOAD_FLAGS.lower()=='true':
         task_definition['containerDefinitions'][0]['environment'] += [
             {
                 'name': 'UPLOAD_FLAGS',
                 'value': UPLOAD_FLAGS
             }]
-    if UPDATE_PLUGINS:
+    if UPDATE_PLUGINS.lower()=='true':
         task_definition["containerDefinitions"][0]["environment"] += [
             {"name": "UPDATE_PLUGINS", "value": str(UPDATE_PLUGINS)},
             {"name": "PLUGINS_COMMIT", "value": str(PLUGINS_COMMIT)},
@@ -589,7 +590,8 @@ def startCluster():
 
     # Step 3: Make the monitor
     starttime=str(int(time.time()*1000))
-    createMonitor=open('files/' + APP_NAME + 'SpotFleetRequestId.json','w')
+    monitor_file_name=f'files/{APP_NAME}SpotFleetRequestId.json'
+    createMonitor=open(monitor_file_name,'w')
     createMonitor.write('{"MONITOR_FLEET_ID" : "'+requestInfo['SpotFleetRequestId']+'",\n')
     createMonitor.write('"MONITOR_APP_NAME" : "'+APP_NAME+'",\n')
     createMonitor.write('"MONITOR_ECS_CLUSTER" : "'+ECS_CLUSTER+'",\n')
@@ -597,8 +599,16 @@ def startCluster():
     createMonitor.write('"MONITOR_BUCKET_NAME" : "'+AWS_BUCKET+'",\n')
     createMonitor.write('"MONITOR_LOG_GROUP_NAME" : "'+LOG_GROUP_NAME+'",\n')
-    createMonitor.write('"MONITOR_START_TIME" : "'+ starttime+'"}\n')
+    createMonitor.write('"MONITOR_START_TIME" : "'+ starttime+'",\n')
+    createMonitor.write('"CLEAN_DASHBOARD" : "'+ CLEAN_DASHBOARD+'"}\n')
     createMonitor.close()
 
+    # Upload monitor file to S3 so it can be read by Auto-Monitor lambda function
+    if AUTO_MONITOR.lower()=='true':
+        s3 = boto3.client("s3")
+        json_on_bucket_name = f'monitors/{APP_NAME}SpotFleetRequestId.json' # match path set in lambda function
+        with open(monitor_file_name, "rb") as a:
+            s3.put_object(Body=a, Bucket=AWS_BUCKET, Key=json_on_bucket_name)
+
     # Step 4: Create a log group for this app and date if one does not already exist
     logclient=boto3.client('logs')
     loggroupinfo=logclient.describe_log_groups(logGroupNamePrefix=LOG_GROUP_NAME)
@@ -643,11 +653,42 @@ def startCluster():
             SpotFleetRequestId=requestInfo["SpotFleetRequestId"]
         )
     print("Spot fleet successfully created. 
Your job should start in a few minutes.") + print(f"Your monitor file is available at {monitor_file_name}") - if CREATE_DASHBOARD: + if CREATE_DASHBOARD.lower()=='true': print ("Creating CloudWatch dashboard for run metrics") create_dashboard(requestInfo) + if AUTO_MONITOR.lower()=='true': + # Create alarms that will trigger Monitor based on SQS queue metrics + cloudwatch = boto3.client("cloudwatch") + metricnames = [ + "ApproximateNumberOfMessagesNotVisible", + "ApproximateNumberOfMessagesVisible", + ] + sns = boto3.client("sns") + MonitorARN = sns.create_topic(Name="Monitor")['TopicArn'] # returns ARN since topic already exists + for metric in metricnames: + response = cloudwatch.put_metric_alarm( + AlarmName=f'{metric}isZero_{APP_NAME}', + ActionsEnabled=True, + OKActions=[], + AlarmActions=[MonitorARN], + InsufficientDataActions=[], + MetricName=metric, + Namespace="AWS/SQS", + Statistic="Average", + Dimensions=[ + {"Name": "QueueName", "Value": f'{APP_NAME}Queue'} + ], + Period=300, + EvaluationPeriods=1, + DatapointsToAlarm=1, + Threshold=0, + ComparisonOperator="LessThanOrEqualToThreshold", + TreatMissingData="missing", + ) + ################################# # SERVICE 4: MONITOR JOB ################################# @@ -684,6 +725,11 @@ def monitor(cheapest=False): while queue.pendingLoad(): if time.time() - startcountdown > 900: downscaleSpotFleet(queue, fleetId, ec2, manual=1) + # Print spot fleet metrics. + spot_fleet_info = ec2.describe_spot_fleet_requests(SpotFleetRequestIds=[fleetId]) + target = spot_fleet_info['SpotFleetRequestConfigs'][0]['SpotFleetRequestConfig']['TargetCapacity'] + fulfilled = spot_fleet_info['SpotFleetRequestConfigs'][0]['SpotFleetRequestConfig']['FulfilledCapacity'] + print(f'Spot fleet has {target} requested instances. 
{fulfilled} are currently fulfilled.') break time.sleep(MONITOR_TIME) @@ -752,7 +798,7 @@ def monitor(cheapest=False): print("Removing cluster if it's not the default and not otherwise in use") removeClusterIfUnused(monitorcluster, ecs) # Remove Cloudwatch dashboard if created and cleanup desired - if CREATE_DASHBOARD and CLEAN_DASHBOARD: + if CREATE_DASHBOARD.lower()=='true' and CLEAN_DASHBOARD.lower()=='true': clean_dashboard(monitorapp) #Step 6: Export the logs to S3 @@ -782,4 +828,4 @@ def monitor(cheapest=False): elif sys.argv[1] == 'startCluster': startCluster() elif sys.argv[1] == 'monitor': - monitor() \ No newline at end of file + monitor() diff --git a/setup_AWS.py b/setup_AWS.py new file mode 100644 index 0000000..8a4d555 --- /dev/null +++ b/setup_AWS.py @@ -0,0 +1,192 @@ +import sys +import boto3 +import json + +iam = boto3.client("iam") +sns = boto3.client("sns") +lmbda = boto3.client("lambda") + +ecsInstanceRole_policy_list = [ + "arn:aws:iam::aws:policy/AmazonS3FullAccess", + "arn:aws:iam::aws:policy/CloudWatchFullAccess", + "arn:aws:iam::aws:policy/CloudWatchActionsEC2Access", + "arn:aws:iam::aws:policy/service-role/AmazonEC2ContainerServiceforEC2Role", + "arn:aws:iam::aws:policy/service-role/AmazonEC2ContainerServiceRole", +] +LambdaFullAccess_policy_list = [ + "arn:aws:iam::aws:policy/AWSLambda_FullAccess", + "arn:aws:iam::aws:policy/AmazonSNSFullAccess", + "arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole", + "arn:aws:iam::aws:policy/service-role/AWSLambdaSQSQueueExecutionRole", + "arn:aws:iam::aws:policy/AWSLambdaExecute", + "arn:aws:iam::aws:policy/AmazonECS_FullAccess", + "arn:aws:iam::aws:policy/service-role/AmazonEC2SpotFleetTaggingRole", + "arn:aws:iam::aws:policy/AmazonS3FullAccess", + "arn:aws:iam::aws:policy/AmazonSQSFullAccess", + "arn:aws:iam::aws:policy/CloudWatchFullAccess" +] + + +def setup(): + # Create ECS Instance Role + assume_role_policy_document = json.dumps( + { + "Version": "2008-10-17", + "Statement": [ + { + "Sid": "", + "Effect": "Allow", + "Principal": {"Service": "ec2.amazonaws.com"}, + "Action": "sts:AssumeRole", + } + ], + } + ) + try: + iam.create_role( + RoleName="ecsInstanceRole", + AssumeRolePolicyDocument=assume_role_policy_document, + ) + for arn in ecsInstanceRole_policy_list: + iam.attach_role_policy( + PolicyArn=arn, + RoleName="ecsInstanceRole", + ) + print ('Created ecsInstanceRole.') + except iam.exceptions.EntityAlreadyExistsException: + print ('Skipping creation of ecsInstanceRole. Already exists.') + + + # Create EC2 Spot Fleet Tagging Role + assume_role_policy_document = json.dumps( + { + "Version": "2012-10-17", + "Statement": [ + { + "Sid": "", + "Effect": "Allow", + "Principal": {"Service": "spotfleet.amazonaws.com"}, + "Action": "sts:AssumeRole", + } + ], + } + ) + try: + iam.create_role( + RoleName="aws-ec2-spot-fleet-tagging-role", + AssumeRolePolicyDocument=assume_role_policy_document, + ) + iam.attach_role_policy( + PolicyArn="arn:aws:iam::aws:policy/service-role/AmazonEC2SpotFleetTaggingRole", + RoleName="aws-ec2-spot-fleet-tagging-role", + ) + print ('Created aws-ec2-spot-fleet-tagging-role.') + except iam.exceptions.EntityAlreadyExistsException: + print ('Skipping creation of aws-ec2-spot-fleet-tagging-role. 
Already exists.') + + # Create Lambda Full Access Role + assume_role_policy_document = json.dumps( + { + "Version": "2012-10-17", + "Statement": [ + { + "Sid": "", + "Effect": "Allow", + "Principal": {"Service": "lambda.amazonaws.com"}, + "Action": "sts:AssumeRole", + } + ], + } + ) + try: + iam.create_role( + RoleName="LambdaFullAccess", + AssumeRolePolicyDocument=assume_role_policy_document, + ) + for arn in LambdaFullAccess_policy_list: + iam.attach_role_policy( + PolicyArn=arn, + RoleName="LambdaFullAccess", + ) + print ('Created LambdaFullAccess role.') + except iam.exceptions.EntityAlreadyExistsException: + print ('Skipping creation of LambdaFullAccess role. Already exists.') + + # Create SNS Monitor topic + MonitorTopic = sns.create_topic(Name="Monitor") + print ('(Re-)Created Monitor SNS Topic.') + + # Create Monitor Lambda function + LambdaFullAccess = iam.get_role(RoleName="LambdaFullAccess") + + fxn = open("lambda_function.zip", "rb").read() + try: + MonitorFunction = lmbda.create_function( + FunctionName="Monitor", + Runtime="python3.9", + Role=LambdaFullAccess["Role"]["Arn"], + Handler="lambda_function.lambda_handler", + Code={ + "ZipFile": fxn, + }, + Description="Auto-monitor DS runs", + Timeout=900, + MemorySize=3008, + Publish=True, + PackageType="Zip", + TracingConfig={"Mode": "PassThrough"}, + Architectures=["x86_64"], + EphemeralStorage={"Size": 512} + ) + # Subscribe Monitor Lambda to Monitor Topic + sns.subscribe( + TopicArn=MonitorTopic["TopicArn"], + Protocol="lambda", + Endpoint=MonitorFunction["FunctionArn"], + ) + print ('Created Monitor Lambda Function.') + except lmbda.exceptions.ResourceConflictException: + print ('Skipping creation of Monitor Lambda Function. Already exists.') + try: + lmbda.add_permission( + FunctionName='Monitor', + StatementId='InvokeBySNS', + Action='lambda:InvokeFunction', + Principal='sns.amazonaws.com') + except lmbda.exceptions.ResourceConflictException: + print ('Monitor Lambda Function already has SNS invoke permission.') + +def destroy(): + # Delete roles + for arn in ecsInstanceRole_policy_list: + iam.detach_role_policy(RoleName="ecsInstanceRole", PolicyArn=arn) + iam.delete_role(RoleName="ecsInstanceRole") + + iam.detach_role_policy( + RoleName="aws-ec2-spot-fleet-tagging-role", + PolicyArn="arn:aws:iam::aws:policy/service-role/AmazonEC2SpotFleetTaggingRole", + ) + iam.delete_role(RoleName="aws-ec2-spot-fleet-tagging-role") + + for arn in LambdaFullAccess_policy_list: + iam.detach_role_policy(RoleName="LambdaFullAccess", PolicyArn=arn) + iam.delete_role(RoleName="LambdaFullAccess") + + # Delete Monitor Lambda function + lmbda.delete_function(FunctionName="Monitor") + + # Delete Monitor SNS topic + # create_topic is idempotent so we use it to return ARN since topic already exists + MonitorTopic = sns.create_topic(Name="Monitor") + sns.delete_topic(TopicArn=MonitorTopic["TopicArn"]) + + +if __name__ == "__main__": + if len(sys.argv) == 1: + setup() + else: + if sys.argv[1] == "destroy": + destroy() + else: + print("Use: setup_AWS.py or setup_AWS.py destroy") + sys.exit()
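
Note: `setup_AWS.py` above loads a pre-built `lambda_function.zip` from the working directory when it creates the Monitor Lambda function. If you are building that archive yourself from the `lambda_function.py` added in this change, a minimal packaging sketch (assuming both files sit in the repository root; the repository may instead ship a pre-built archive):

```
# Sketch: package lambda_function.py into the lambda_function.zip archive
# that setup_AWS.py reads when creating the Monitor Lambda function.
import zipfile

with zipfile.ZipFile("lambda_function.zip", "w", zipfile.ZIP_DEFLATED) as zf:
    zf.write("lambda_function.py")
```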