Skip to content

Commit

Permalink
Improve performance and accuracy of anomaly detector. (linkedin#682)
Browse files Browse the repository at this point in the history
* Avoid excessive generation of broker failure anomalies when there are dead brokers not ready for a fix. An anomaly is not ready for a fix if (1) it does not meet the completeness requirements or (2) the load monitor is in an unexpected state.
* Avoid redundant and repeated hard goal presence sanity checks for goals used in anomaly detection.
  • Loading branch information
efeg authored Apr 30, 2019
1 parent db59db7 commit a9e094b
Show file tree
Hide file tree
Showing 13 changed files with 108 additions and 67 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ Control). The metrics reporter periodically samples the Kafka raw metrics on the
**Note**:
* Cruise Control will need some time to read the raw Kafka metrics from the cluster.
* The metrics of a newly started broker may take a few minutes to stabilize. Cruise Control will drop the inconsistent
metrics (e.g when topic bytes-in is higher than broker bytes-in), so first few snapshot windows may not have enough valid partitions.
metrics (e.g. when topic bytes-in is higher than broker bytes-in), so the first few windows may not have enough valid partitions.
### REST API ###
Cruise Control provides a [REST API](https://github.com/linkedin/cruise-control/wiki/REST-APIs) for users
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -982,15 +982,14 @@ private ModelCompletenessRequirements modelCompletenessRequirements(Collection<G
}

/**
* Check if the given goals meet the completeness requirements.
* Check if the completeness requirements are met for the given goals.
*
* @param goalNames Goal names (and empty list of names indicates all goals).
* @param goals A list of goals to check completeness for.
* @return True if completeness requirements are met for the given goals, false otherwise.
*/
public boolean meetCompletenessRequirements(List<String> goalNames) {
sanityCheckHardGoalPresence(goalNames, false);
Collection<Goal> goals = goalsByPriority(goalNames);
public boolean meetCompletenessRequirements(List<String> goals) {
MetadataClient.ClusterAndGeneration clusterAndGeneration = _loadMonitor.refreshClusterAndGeneration();
return goals.stream().allMatch(g -> _loadMonitor.meetCompletenessRequirements(
return goalsByPriority(goals).stream().allMatch(g -> _loadMonitor.meetCompletenessRequirements(
clusterAndGeneration, g.clusterModelCompletenessRequirements()));
}

Expand Down Expand Up @@ -1020,7 +1019,7 @@ private List<Goal> goalsByPriority(List<String> goals) {
* @param goals A list of goals.
* @param skipHardGoalCheck True if hard goal checking is not needed.
*/
private void sanityCheckHardGoalPresence(List<String> goals, boolean skipHardGoalCheck) {
public void sanityCheckHardGoalPresence(List<String> goals, boolean skipHardGoalCheck) {
if (goals != null && !goals.isEmpty() && !skipHardGoalCheck &&
!(goals.size() == 1 && goals.get(0).equals(PreferredLeaderElectionGoal.class.getSimpleName()))) {
sanityCheckNonExistingGoal(goals, AnalyzerUtils.getCaseInsensitiveGoalsByName(_config));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public static int compare(double d1, double d2, double epsilon) {
}

/**
* Get an list of goals sorted by highest to lowest default priority.
* Get the list of default goals sorted by highest to lowest default priority.
*/
public static List<Goal> getGoalMapByPriority(KafkaCruiseControlConfig config) {
return config.getConfiguredInstances(KafkaCruiseControlConfig.DEFAULT_GOALS_CONFIG, Goal.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import com.linkedin.kafka.cruisecontrol.monitor.task.LoadMonitorTaskRunner;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
Expand All @@ -44,7 +46,8 @@ public class AnomalyDetector {
Collections.emptyMap(),
true,
true,
true);
true,
Collections.emptyList());
private final KafkaCruiseControl _kafkaCruiseControl;
private final AnomalyNotifier _anomalyNotifier;
// Detectors
Expand All @@ -55,11 +58,11 @@ public class AnomalyDetector {
private final long _anomalyDetectionIntervalMs;
private final LinkedBlockingDeque<Anomaly> _anomalies;
private volatile boolean _shutdown;
private final Meter _brokerFailureRate;
private final Meter _goalViolationRate;
private final Meter _metricAnomalyRate;
private final Map<AnomalyType, Meter> _anomalyRateByType;
private final LoadMonitor _loadMonitor;
private final AnomalyDetectorState _anomalyDetectorState;
// TODO: Make this configurable.
private final List<String> _selfHealingGoals;

public AnomalyDetector(KafkaCruiseControlConfig config,
LoadMonitor loadMonitor,
Expand All @@ -72,15 +75,21 @@ public AnomalyDetector(KafkaCruiseControlConfig config,
AnomalyNotifier.class);
_loadMonitor = loadMonitor;
_kafkaCruiseControl = kafkaCruiseControl;
_goalViolationDetector = new GoalViolationDetector(config, _loadMonitor, _anomalies, time, _kafkaCruiseControl);
_brokerFailureDetector = new BrokerFailureDetector(config, _loadMonitor, _anomalies, time, _kafkaCruiseControl);
_selfHealingGoals = Collections.emptyList();
_kafkaCruiseControl.sanityCheckHardGoalPresence(_selfHealingGoals, false);
_goalViolationDetector = new GoalViolationDetector(config, _loadMonitor, _anomalies, time, _kafkaCruiseControl, _selfHealingGoals);
_brokerFailureDetector = new BrokerFailureDetector(config, _loadMonitor, _anomalies, time, _kafkaCruiseControl, _selfHealingGoals);
_metricAnomalyDetector = new MetricAnomalyDetector(config, _loadMonitor, _anomalies, _kafkaCruiseControl);
_detectorScheduler =
Executors.newScheduledThreadPool(4, new KafkaCruiseControlThreadFactory(METRIC_REGISTRY_NAME, false, LOG));
_shutdown = false;
_brokerFailureRate = dropwizardMetricRegistry.meter(MetricRegistry.name(METRIC_REGISTRY_NAME, "broker-failure-rate"));
_goalViolationRate = dropwizardMetricRegistry.meter(MetricRegistry.name(METRIC_REGISTRY_NAME, "goal-violation-rate"));
_metricAnomalyRate = dropwizardMetricRegistry.meter(MetricRegistry.name(METRIC_REGISTRY_NAME, "metric-anomaly-rate"));
_anomalyRateByType = new HashMap<>(AnomalyType.cachedValues().size());
_anomalyRateByType.put(AnomalyType.BROKER_FAILURE,
dropwizardMetricRegistry.meter(MetricRegistry.name(METRIC_REGISTRY_NAME, "broker-failure-rate")));
_anomalyRateByType.put(AnomalyType.GOAL_VIOLATION,
dropwizardMetricRegistry.meter(MetricRegistry.name(METRIC_REGISTRY_NAME, "goal-violation-rate")));
_anomalyRateByType.put(AnomalyType.METRIC_ANOMALY,
dropwizardMetricRegistry.meter(MetricRegistry.name(METRIC_REGISTRY_NAME, "metric-anomaly-rate")));
// Add anomaly detector state
int numCachedRecentAnomalyStates = config.getInt(KafkaCruiseControlConfig.NUM_CACHED_RECENT_ANOMALY_STATES_CONFIG);
_anomalyDetectorState = new AnomalyDetectorState(_anomalyNotifier.selfHealingEnabled(), numCachedRecentAnomalyStates);
Expand All @@ -107,12 +116,12 @@ public AnomalyDetector(KafkaCruiseControlConfig config,
_kafkaCruiseControl = kafkaCruiseControl;
_detectorScheduler = detectorScheduler;
_shutdown = false;
_brokerFailureRate = new Meter();
_goalViolationRate = new Meter();
_metricAnomalyRate = new Meter();
_anomalyRateByType = new HashMap<>(AnomalyType.cachedValues().size());
AnomalyType.cachedValues().forEach(anomalyType -> _anomalyRateByType.put(anomalyType, new Meter()));
_loadMonitor = loadMonitor;
// Add anomaly detector state
_anomalyDetectorState = new AnomalyDetectorState(new HashMap<>(AnomalyType.cachedValues().size()), 10);
_selfHealingGoals = Collections.emptyList();
}

public void startDetection() {
Expand Down Expand Up @@ -201,22 +210,20 @@ public void run() {
checkWithDelay(anomaly, _anomalyDetectionIntervalMs);
} else {
AnomalyNotificationResult notificationResult = null;
_anomalyRateByType.get(anomalyType).mark();
// Call the anomaly notifier to see if a fix is desired.
switch (anomalyType) {
case GOAL_VIOLATION:
GoalViolations goalViolations = (GoalViolations) anomaly;
notificationResult = _anomalyNotifier.onGoalViolation(goalViolations);
_goalViolationRate.mark();
break;
case BROKER_FAILURE:
BrokerFailures brokerFailures = (BrokerFailures) anomaly;
notificationResult = _anomalyNotifier.onBrokerFailure(brokerFailures);
_brokerFailureRate.mark();
break;
case METRIC_ANOMALY:
KafkaMetricAnomaly metricAnomaly = (KafkaMetricAnomaly) anomaly;
notificationResult = _anomalyNotifier.onMetricAnomaly(metricAnomaly);
_metricAnomalyRate.mark();
break;
default:
throw new IllegalStateException("Unrecognized anomaly type.");
Expand Down Expand Up @@ -284,8 +291,7 @@ private boolean isReadyToFix(Anomaly anomaly) {
LOG.info("Skipping {} because load monitor is in {} state.", skipMsg, loadMonitorTaskRunnerState);
_anomalyDetectorState.onAnomalyHandle(anomaly, AnomalyState.Status.LOAD_MONITOR_NOT_READY);
} else {
boolean meetCompletenessRequirements = _kafkaCruiseControl.meetCompletenessRequirements(Collections.emptyList());
if (meetCompletenessRequirements) {
if (_kafkaCruiseControl.meetCompletenessRequirements(_selfHealingGoals)) {
return true;
} else {
LOG.warn("Skipping {} because load completeness requirement is not met for goals.", skipMsg);
Expand All @@ -296,7 +302,8 @@ private boolean isReadyToFix(Anomaly anomaly) {
}

private void fixAnomaly(Anomaly anomaly) throws Exception {
if (isReadyToFix(anomaly)) {
boolean isReadyToFix = isReadyToFix(anomaly);
if (isReadyToFix) {
LOG.info("Fixing anomaly {}", anomaly);
boolean startedSuccessfully = false;
try {
Expand All @@ -308,14 +315,21 @@ private void fixAnomaly(Anomaly anomaly) throws Exception {
}
}

handlePostFixAnomaly(isReadyToFix);
}

private void handlePostFixAnomaly(boolean isReadyToFix) {
_anomalies.clear();
// We need to add the shutdown message in case the failure detector has shutdown.
if (_shutdown) {
_anomalies.addFirst(SHUTDOWN_ANOMALY);
} else {
// Explicitly detect broker failures after clearing the queue. This ensures that anomaly detector does not miss
// broker failures upon (1) fixing another anomaly, or (2) having broker failures that are not yet ready for fix.
// We don't need to worry about other anomaly types because they run periodically.
_detectorScheduler.schedule(_brokerFailureDetector::detectBrokerFailures,
isReadyToFix ? 0L : _anomalyDetectionIntervalMs, TimeUnit.MILLISECONDS);
}
// Explicitly detect broker failures after clear the queue.
// We don't need to worry about the goal violation because it is run periodically.
_brokerFailureDetector.detectBrokerFailures();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import org.slf4j.LoggerFactory;
import scala.collection.JavaConversions;

import static com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils.ZK_SESSION_TIMEOUT;
import static com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils.ZK_CONNECTION_TIMEOUT;
import static java.util.stream.Collectors.toSet;


Expand All @@ -48,15 +50,17 @@ public class BrokerFailureDetector {
private final boolean _allowCapacityEstimation;
private final boolean _excludeRecentlyDemotedBrokers;
private final boolean _excludeRecentlyRemovedBrokers;
private final List<String> _selfHealingGoals;

public BrokerFailureDetector(KafkaCruiseControlConfig config,
LoadMonitor loadMonitor,
Queue<Anomaly> anomalies,
Time time,
KafkaCruiseControl kafkaCruiseControl) {
KafkaCruiseControl kafkaCruiseControl,
List<String> selfHealingGoals) {
String zkUrl = config.getString(KafkaCruiseControlConfig.ZOOKEEPER_CONNECT_CONFIG);
ZkConnection zkConnection = new ZkConnection(zkUrl, 30000);
_zkClient = new ZkClient(zkConnection, 30000, new ZkStringSerializer());
ZkConnection zkConnection = new ZkConnection(zkUrl, ZK_SESSION_TIMEOUT);
_zkClient = new ZkClient(zkConnection, ZK_CONNECTION_TIMEOUT, new ZkStringSerializer());
// Do not support secure ZK at this point.
_zkUtils = new ZkUtils(_zkClient, zkConnection, false);
_failedBrokers = new HashMap<>();
Expand All @@ -68,6 +72,7 @@ public BrokerFailureDetector(KafkaCruiseControlConfig config,
_allowCapacityEstimation = config.getBoolean(KafkaCruiseControlConfig.ANOMALY_DETECTION_ALLOW_CAPACITY_ESTIMATION_CONFIG);
_excludeRecentlyDemotedBrokers = config.getBoolean(KafkaCruiseControlConfig.BROKER_FAILURE_EXCLUDE_RECENTLY_DEMOTED_BROKERS_CONFIG);
_excludeRecentlyRemovedBrokers = config.getBoolean(KafkaCruiseControlConfig.BROKER_FAILURE_EXCLUDE_RECENTLY_REMOVED_BROKERS_CONFIG);
_selfHealingGoals = selfHealingGoals;
}

void startDetection() {
Expand Down Expand Up @@ -157,12 +162,12 @@ private void parsePersistedFailedBrokers(String failedBrokerListString) {

private void reportBrokerFailures() {
if (!_failedBrokers.isEmpty()) {
Map<Integer, Long> failedBrokers = new HashMap<>(_failedBrokers);
_anomalies.add(new BrokerFailures(_kafkaCruiseControl,
failedBrokers,
failedBrokers(),
_allowCapacityEstimation,
_excludeRecentlyDemotedBrokers,
_excludeRecentlyRemovedBrokers));
_excludeRecentlyRemovedBrokers,
_selfHealingGoals));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@
import com.linkedin.kafka.cruisecontrol.detector.notifier.AnomalyType;
import com.linkedin.kafka.cruisecontrol.exception.KafkaCruiseControlException;
import com.linkedin.kafka.cruisecontrol.servlet.response.OptimizationResult;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import static com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils.toDateString;


/**
* The broker failures that have been detected.
Expand All @@ -28,19 +28,22 @@ public class BrokerFailures extends KafkaAnomaly {
private final boolean _excludeRecentlyDemotedBrokers;
private final boolean _excludeRecentlyRemovedBrokers;
private final String _anomalyId;
private final List<String> _selfHealingGoals;

public BrokerFailures(KafkaCruiseControl kafkaCruiseControl,
Map<Integer, Long> failedBrokers,
boolean allowCapacityEstimation,
boolean excludeRecentlyDemotedBrokers,
boolean excludeRecentlyRemovedBrokers) {
boolean excludeRecentlyRemovedBrokers,
List<String> selfHealingGoals) {
_kafkaCruiseControl = kafkaCruiseControl;
_failedBrokers = failedBrokers;
_allowCapacityEstimation = allowCapacityEstimation;
_excludeRecentlyDemotedBrokers = excludeRecentlyDemotedBrokers;
_excludeRecentlyRemovedBrokers = excludeRecentlyRemovedBrokers;
_anomalyId = String.format("%s-%s", ID_PREFIX, UUID.randomUUID().toString().substring(ID_PREFIX.length() + 1));
_optimizationResult = null;
_selfHealingGoals = selfHealingGoals;
}

/**
Expand All @@ -62,7 +65,7 @@ public boolean fix() throws KafkaCruiseControlException {
_optimizationResult = new OptimizationResult(_kafkaCruiseControl.decommissionBrokers(_failedBrokers.keySet(),
false,
false,
Collections.emptyList(),
_selfHealingGoals,
null,
new OperationProgress(),
_allowCapacityEstimation,
Expand All @@ -87,9 +90,7 @@ public boolean fix() throws KafkaCruiseControlException {
public String toString() {
StringBuilder sb = new StringBuilder().append("{\n");
_failedBrokers.forEach((key, value) -> {
Date date = new Date(value);
DateFormat format = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss");
sb.append("\tBroker ").append(key).append(" failed at ").append(format.format(date)).append("\n");
sb.append("\tBroker ").append(key).append(" failed at ").append(toDateString(value)).append("\n");
});
sb.append("}");
return sb.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,14 @@ public class GoalViolationDetector implements Runnable {
private final boolean _allowCapacityEstimation;
private final boolean _excludeRecentlyDemotedBrokers;
private final boolean _excludeRecentlyRemovedBrokers;
private final List<String> _selfHealingGoals;

public GoalViolationDetector(KafkaCruiseControlConfig config,
LoadMonitor loadMonitor,
Queue<Anomaly> anomalies,
Time time,
KafkaCruiseControl kafkaCruiseControl) {
KafkaCruiseControl kafkaCruiseControl,
List<String> selfHealingGoals) {
_loadMonitor = loadMonitor;
// Notice that we use a separate set of Goal instances for anomaly detector to avoid interference.
_goals = getDetectorGoalsMap(config);
Expand All @@ -71,6 +73,7 @@ public GoalViolationDetector(KafkaCruiseControlConfig config,
_excludeRecentlyDemotedBrokers = config.getBoolean(KafkaCruiseControlConfig.GOAL_VIOLATION_EXCLUDE_RECENTLY_DEMOTED_BROKERS_CONFIG);
_excludeRecentlyRemovedBrokers = config.getBoolean(KafkaCruiseControlConfig.GOAL_VIOLATION_EXCLUDE_RECENTLY_REMOVED_BROKERS_CONFIG);
_kafkaCruiseControl = kafkaCruiseControl;
_selfHealingGoals = selfHealingGoals;
}

private SortedMap<Integer, Goal> getDetectorGoalsMap(KafkaCruiseControlConfig config) {
Expand Down Expand Up @@ -137,7 +140,8 @@ public void run() {
AutoCloseable clusterModelSemaphore = null;
try {
GoalViolations goalViolations = new GoalViolations(_kafkaCruiseControl, _allowCapacityEstimation,
_excludeRecentlyDemotedBrokers, _excludeRecentlyRemovedBrokers);
_excludeRecentlyDemotedBrokers, _excludeRecentlyRemovedBrokers,
_selfHealingGoals);
long now = _time.milliseconds();
boolean newModelNeeded = true;
ClusterModel clusterModel = null;
Expand Down
Loading

0 comments on commit a9e094b

Please sign in to comment.