Skip to content

Commit

Permalink
Merge pull request #1416 from shekhar316/vpa-service
Browse files Browse the repository at this point in the history
5. Adding Recommendation Updater Service [KRUIZE-VPA Integration]
  • Loading branch information
dinogun authored Dec 16, 2024
2 parents 5c30a41 + d28c8a4 commit 64b41a1
Show file tree
Hide file tree
Showing 9 changed files with 169 additions and 0 deletions.
7 changes: 7 additions & 0 deletions src/main/java/com/autotune/Autotune.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.autotune.analyzer.exceptions.MonitoringAgentNotFoundException;
import com.autotune.analyzer.exceptions.MonitoringAgentNotSupportedException;
import com.autotune.analyzer.performanceProfiles.MetricProfileCollection;
import com.autotune.analyzer.recommendations.updater.RecommendationUpdaterService;
import com.autotune.analyzer.utils.AnalyzerConstants;
import com.autotune.common.datasource.DataSourceCollection;
import com.autotune.common.datasource.DataSourceInfo;
Expand Down Expand Up @@ -133,6 +134,8 @@ public static void main(String[] args) {
checkAvailableDataSources();
// load available metric profiles from db
loadMetricProfilesFromDB();
// start updater service
startRecommendationUpdaterService();

}
// close the existing session factory before recreating
Expand Down Expand Up @@ -288,4 +291,8 @@ private static void executeDDLs(String ddlFileName) throws Exception {
LOGGER.info(DBConstants.DB_MESSAGES.DB_LIVELINESS_PROBE_SUCCESS);
}

/**
 * Starts the recommendation updater service by delegating to
 * {@link RecommendationUpdaterService#initiateUpdaterService()}.
 * Invoked from {@code main} after the metric profiles have been loaded.
 */
private static void startRecommendationUpdaterService() {
RecommendationUpdaterService.initiateUpdaterService();
}
}
10 changes: 10 additions & 0 deletions src/main/java/com/autotune/analyzer/kruizeObject/KruizeObject.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ public final class KruizeObject implements ExperimentTypeAware {
private String datasource;
@SerializedName(KruizeConstants.JSONKeys.EXPERIMENT_TYPE) //TODO: to be used in future
private AnalyzerConstants.ExperimentType experimentType;
@SerializedName("default_updater")
private String defaultUpdater;
private String namespace; // TODO: Currently adding it at this level with an assumption that there is only one entry in k8s object needs to be changed
private String mode; //Todo convert into Enum
@SerializedName("target_cluster")
Expand Down Expand Up @@ -310,6 +312,14 @@ public void setExperimentType(AnalyzerConstants.ExperimentType experimentType) {
}


/**
 * Returns the updater configured for this experiment (serialized as
 * {@code default_updater}).
 *
 * @return the updater name, or {@code null} if none has been set
 */
public String getDefaultUpdater() {
return defaultUpdater;
}

/**
 * Sets the updater to use for this experiment (serialized as
 * {@code default_updater}).
 *
 * @param defaultUpdater the updater name to store; stored as-is without validation
 */
public void setDefaultUpdater(String defaultUpdater) {
this.defaultUpdater = defaultUpdater;
}

@Override
public String toString() {
// Creating a temporary cluster name as we allow null for cluster name now
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*******************************************************************************
* Copyright (c) 2024 Red Hat, IBM Corporation and others.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*******************************************************************************/

package com.autotune.analyzer.recommendations.updater;

import com.autotune.analyzer.experiment.ExperimentInterface;
import com.autotune.analyzer.experiment.ExperimentInterfaceImpl;
import com.autotune.analyzer.kruizeObject.KruizeObject;
import com.autotune.analyzer.recommendations.updater.vpa.VpaUpdaterImpl;
import com.autotune.analyzer.utils.AnalyzerConstants;
import com.autotune.analyzer.utils.AnalyzerErrorConstants;
import com.autotune.database.service.ExperimentDBService;
import com.autotune.database.table.KruizeExperimentEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class RecommendationUpdaterService {

private static final Logger LOGGER = LoggerFactory.getLogger(RecommendationUpdaterService.class);

public static void initiateUpdaterService() {
try {
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();

LOGGER.info(AnalyzerConstants.RecommendationUpdaterConstants.InfoMsgs.STARTING_SERVICE);
executorService.scheduleAtFixedRate(() -> {
try {
RecommendationUpdaterImpl updater = new RecommendationUpdaterImpl();
Map<String, KruizeObject> experiments = getAutoModeExperiments();
for (Map.Entry<String, KruizeObject> experiment : experiments.entrySet()) {
KruizeObject kruizeObject = updater.generateResourceRecommendationsForExperiment(experiment.getValue().getExperimentName());
// TODO:// add default updater in kruizeObject and check if GPU recommendations are present
if (kruizeObject.getDefaultUpdater() == null) {
kruizeObject.setDefaultUpdater(AnalyzerConstants.RecommendationUpdaterConstants.SupportedUpdaters.VPA);
}

if (kruizeObject.getDefaultUpdater().equalsIgnoreCase(AnalyzerConstants.RecommendationUpdaterConstants.SupportedUpdaters.VPA)) {
VpaUpdaterImpl vpaUpdater = VpaUpdaterImpl.getInstance();
vpaUpdater.applyResourceRecommendationsForExperiment(kruizeObject);
}
}
} catch (Exception e) {
LOGGER.error(e.getMessage());
}
}, AnalyzerConstants.RecommendationUpdaterConstants.DEFAULT_INITIAL_DELAY,
AnalyzerConstants.RecommendationUpdaterConstants.DEFAULT_SLEEP_INTERVAL,
TimeUnit.SECONDS);
} catch (Exception e) {
LOGGER.error(AnalyzerErrorConstants.RecommendationUpdaterErrors.UPDTAER_SERVICE_START_ERROR + e.getMessage());
}
}

private static Map<String, KruizeObject> getAutoModeExperiments() {
try {
LOGGER.debug(AnalyzerConstants.RecommendationUpdaterConstants.InfoMsgs.CHECKING_AUTO_EXP);
Map<String, KruizeObject> mainKruizeExperimentMap = new ConcurrentHashMap<>();
new ExperimentDBService().loadAllLMExperiments(mainKruizeExperimentMap);
// filter map to only include entries where mode is auto or recreate
Map<String, KruizeObject> filteredMap = mainKruizeExperimentMap.entrySet().stream()
.filter(entry -> {
String mode = entry.getValue().getMode();
return AnalyzerConstants.AUTO.equalsIgnoreCase(mode) || AnalyzerConstants.RECREATE.equalsIgnoreCase(mode);
})
.collect(Collectors.toConcurrentMap(Map.Entry::getKey, Map.Entry::getValue));
return filteredMap;
} catch (Exception e) {
LOGGER.error(e.getMessage());
return new HashMap<>();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,8 @@ private RecommendationUpdaterConstants() {

}

// Period, in seconds, between successive runs of the recommendation updater task.
public static final int DEFAULT_SLEEP_INTERVAL = 60;
// Delay, in seconds, before the first run of the recommendation updater task.
public static final int DEFAULT_INITIAL_DELAY = 30;
public static final class SupportedUpdaters {
public static final String VPA = "vpa";

Expand Down Expand Up @@ -719,6 +721,8 @@ public static final class InfoMsgs {
public static final String VPA_PATCHED = "VPA object with name %s is patched successfully with recommendations.";
public static final String CREATEING_VPA = "Creating VPA with name: %s";
public static final String CREATED_VPA = "Created VPA with name: %s";
public static final String STARTING_SERVICE = "Starting recommendation updater.";
public static final String CHECKING_AUTO_EXP = "Searching for experiments with auto or recreate mode.";
private InfoMsgs() {

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ private RecommendationUpdaterErrors() {

}

// Logged when RecommendationUpdaterService fails to create or schedule its executor.
// NOTE(review): the identifier is misspelled ("UPDTAER"); renaming it requires updating every caller.
public static final String UPDTAER_SERVICE_START_ERROR = "Error occurred while initializing RecommendationUpdaterService.";
public static final String UNSUPPORTED_UPDATER_TYPE = "Updater type %s is not supported.";
public static final String GENERATE_RECOMMNEDATION_FAILED = "Failed to generate recommendations for experiment: {}";
public static final String UPDATER_NOT_INSTALLED = "Updater is not installed.";
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/com/autotune/database/dao/ExperimentDAO.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ public interface ExperimentDAO {
// If Kruize object restarts load all experiment which are in inprogress
public List<KruizeExperimentEntry> loadAllExperiments() throws Exception;

// Loads local monitoring (LM) experiment entries, e.g. when Kruize restarts.
// NOTE(review): intended to cover in-progress experiments; confirm whether implementations filter by status.
public List<KruizeLMExperimentEntry> loadAllLMExperiments() throws Exception;

// If Kruize object restarts load all results from the experiments which are in inprogress
List<KruizeResultsEntry> loadAllResults() throws Exception;

Expand Down
23 changes: 23 additions & 0 deletions src/main/java/com/autotune/database/dao/ExperimentDAOImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -732,6 +732,29 @@ public List<KruizeExperimentEntry> loadAllExperiments() throws Exception {
return entries;
}

/**
 * Loads all local monitoring experiment entries from the database.
 *
 * <p>The call is timed, and the timer is tagged with {@code status=success} or
 * {@code status=failure} depending on whether the query completed.
 *
 * @return the entries returned by the {@code SELECT_FROM_LM_EXPERIMENTS} query
 * @throws Exception if the session cannot be opened or the query fails; the
 *                   original failure is attached as the cause
 */
@Override
public List<KruizeLMExperimentEntry> loadAllLMExperiments() throws Exception {
    // TODO: load only experimentStatus=inprogress, playback may not require completed experiments
    List<KruizeLMExperimentEntry> entries = null;
    String statusValue = "failure";
    Timer.Sample timerLoadAllExp = Timer.start(MetricsConfig.meterRegistry());
    try (Session session = KruizeHibernateUtil.getSessionFactory().openSession()) {
        entries = session.createQuery(SELECT_FROM_LM_EXPERIMENTS, KruizeLMExperimentEntry.class).list();
        // TODO: remove native sql query and transient
        //getExperimentTypeInKruizeExperimentEntry(entries);
        statusValue = "success";
    } catch (Exception e) {
        // Pass the throwable to the logger and chain it as the cause so the stack trace is not lost.
        LOGGER.error("Not able to load experiment due to {}", e.getMessage(), e);
        throw new Exception("Error while loading exsisting experiments from database due to : " + e.getMessage(), e);
    } finally {
        if (null != timerLoadAllExp) {
            MetricsConfig.timerLoadAllExp = MetricsConfig.timerBLoadAllExp.tag("status", statusValue).register(MetricsConfig.meterRegistry());
            timerLoadAllExp.stop(MetricsConfig.timerLoadAllExp);
        }
    }
    return entries;
}

@Override
public List<KruizeResultsEntry> loadAllResults() throws Exception {
// TODO: load only experimentStatus=inProgress , playback may not require completed experiments
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ public class DBConstants {

public static final class SQLQUERY {
public static final String SELECT_FROM_EXPERIMENTS = "from KruizeExperimentEntry";
public static final String SELECT_FROM_LM_EXPERIMENTS = "from KruizeLMExperimentEntry";
public static final String SELECT_FROM_EXPERIMENTS_BY_EXP_NAME = "from KruizeExperimentEntry k WHERE k.experiment_name = :experimentName";
public static final String SELECT_FROM_LM_EXPERIMENTS_BY_EXP_NAME = "from KruizeLMExperimentEntry k WHERE k.experiment_name = :experimentName";
public static final String SELECT_FROM_RESULTS = "from KruizeResultsEntry";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,32 @@ public void loadAllExperiments(Map<String, KruizeObject> mainKruizeExperimentMap
}
}

/**
 * Loads all local monitoring experiments from the database, converts them to
 * {@code KruizeObject}s and adds them to the supplied experiment map.
 *
 * <p>Does nothing when the database returns no entries or when the entries
 * convert to an empty list of API objects.
 *
 * @param mainKruizeExperimentMap map that receives the loaded experiments
 * @throws Exception if loading fails, or if every experiment fails to convert
 */
public void loadAllLMExperiments(Map<String, KruizeObject> mainKruizeExperimentMap) throws Exception {
    ExperimentInterface experimentInterface = new ExperimentInterfaceImpl();
    List<KruizeLMExperimentEntry> lmEntries = experimentDAO.loadAllLMExperiments();
    if (null == lmEntries || lmEntries.isEmpty()) {
        return;
    }

    List<CreateExperimentAPIObject> apiObjects = DBHelpers.Converters.KruizeObjectConverters.convertLMExperimentEntryToCreateExperimentAPIObject(lmEntries);
    if (null == apiObjects || apiObjects.isEmpty()) {
        return;
    }

    List<KruizeObject> convertedExperiments = new ArrayList<>();
    int conversionFailures = 0;
    for (CreateExperimentAPIObject apiObject : apiObjects) {
        KruizeObject converted = Converters.KruizeObjectConverters.convertCreateExperimentAPIObjToKruizeObject(apiObject);
        if (null == converted) {
            conversionFailures++;
            continue;
        }
        convertedExperiments.add(converted);
    }

    // apiObjects is non-empty here, so equal counts mean every conversion failed.
    if (conversionFailures == apiObjects.size()) {
        throw new Exception("None of the experiments are able to load from DB.");
    }
    experimentInterface.addExperimentToLocalStorage(mainKruizeExperimentMap, convertedExperiments);
}

public void loadAllResults(Map<String, KruizeObject> mainKruizeExperimentMap) throws Exception {
ExperimentInterface experimentInterface = new ExperimentInterfaceImpl();
KruizeObject kruizeObject;
Expand Down

0 comments on commit 64b41a1

Please sign in to comment.