diff --git a/README.md b/README.md index 26f196695..5c8182f00 100644 --- a/README.md +++ b/README.md @@ -92,7 +92,7 @@ Control). The metrics reporter periodically samples the Kafka raw metrics on the **Note**: * Cruise Control will need some time to read the raw Kafka metrics from the cluster. * The metrics of a newly up broker may take a few minutes to get stable. Cruise Control will drop the inconsistent -metrics (e.g when topic bytes-in is higher than broker bytes-in), so first few snapshot windows may not have enough valid partitions. +metrics (e.g., when topic bytes-in is higher than broker bytes-in), so the first few windows may not have enough valid partitions. ### REST API ### Cruise Control provides a [REST API](https://github.com/linkedin/cruise-control/wiki/REST-APIs) for users diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControl.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControl.java index af2b95dfb..c5fe013b8 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControl.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControl.java @@ -982,15 +982,14 @@ private ModelCompletenessRequirements modelCompletenessRequirements(Collection goalNames) { - sanityCheckHardGoalPresence(goalNames, false); - Collection goals = goalsByPriority(goalNames); + public boolean meetCompletenessRequirements(List goals) { MetadataClient.ClusterAndGeneration clusterAndGeneration = _loadMonitor.refreshClusterAndGeneration(); - return goals.stream().allMatch(g -> _loadMonitor.meetCompletenessRequirements( + return goalsByPriority(goals).stream().allMatch(g -> _loadMonitor.meetCompletenessRequirements( clusterAndGeneration, g.clusterModelCompletenessRequirements())); } @@ -1020,7 +1019,7 @@ private List goalsByPriority(List goals) { * @param goals A list of goals. * @param skipHardGoalCheck True if hard goal checking is not needed. */ - private void sanityCheckHardGoalPresence(List goals, boolean skipHardGoalCheck) { + public void sanityCheckHardGoalPresence(List goals, boolean skipHardGoalCheck) { if (goals != null && !goals.isEmpty() && !skipHardGoalCheck && !(goals.size() == 1 && goals.get(0).equals(PreferredLeaderElectionGoal.class.getSimpleName()))) { sanityCheckNonExistingGoal(goals, AnalyzerUtils.getCaseInsensitiveGoalsByName(_config)); diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/analyzer/AnalyzerUtils.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/analyzer/AnalyzerUtils.java index c113963da..a90d8a98f 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/analyzer/AnalyzerUtils.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/analyzer/AnalyzerUtils.java @@ -142,7 +142,7 @@ public static int compare(double d1, double d2, double epsilon) { } /** - * Get an list of goals sorted by highest to lowest default priority. + * Get the list of default goals sorted by highest to lowest default priority.
*/ public static List getGoalMapByPriority(KafkaCruiseControlConfig config) { return config.getConfiguredInstances(KafkaCruiseControlConfig.DEFAULT_GOALS_CONFIG, Goal.class); diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/AnomalyDetector.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/AnomalyDetector.java index 5dcf60165..ba3220cfd 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/AnomalyDetector.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/AnomalyDetector.java @@ -20,6 +20,8 @@ import com.linkedin.kafka.cruisecontrol.monitor.task.LoadMonitorTaskRunner; import java.util.Collections; import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.Random; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingDeque; @@ -44,7 +46,8 @@ public class AnomalyDetector { Collections.emptyMap(), true, true, - true); + true, + Collections.emptyList()); private final KafkaCruiseControl _kafkaCruiseControl; private final AnomalyNotifier _anomalyNotifier; // Detectors @@ -55,11 +58,11 @@ public class AnomalyDetector { private final long _anomalyDetectionIntervalMs; private final LinkedBlockingDeque _anomalies; private volatile boolean _shutdown; - private final Meter _brokerFailureRate; - private final Meter _goalViolationRate; - private final Meter _metricAnomalyRate; + private final Map _anomalyRateByType; private final LoadMonitor _loadMonitor; private final AnomalyDetectorState _anomalyDetectorState; + // TODO: Make this configurable. + private final List _selfHealingGoals; public AnomalyDetector(KafkaCruiseControlConfig config, LoadMonitor loadMonitor, @@ -72,15 +75,21 @@ public AnomalyDetector(KafkaCruiseControlConfig config, AnomalyNotifier.class); _loadMonitor = loadMonitor; _kafkaCruiseControl = kafkaCruiseControl; - _goalViolationDetector = new GoalViolationDetector(config, _loadMonitor, _anomalies, time, _kafkaCruiseControl); - _brokerFailureDetector = new BrokerFailureDetector(config, _loadMonitor, _anomalies, time, _kafkaCruiseControl); + _selfHealingGoals = Collections.emptyList(); + _kafkaCruiseControl.sanityCheckHardGoalPresence(_selfHealingGoals, false); + _goalViolationDetector = new GoalViolationDetector(config, _loadMonitor, _anomalies, time, _kafkaCruiseControl, _selfHealingGoals); + _brokerFailureDetector = new BrokerFailureDetector(config, _loadMonitor, _anomalies, time, _kafkaCruiseControl, _selfHealingGoals); _metricAnomalyDetector = new MetricAnomalyDetector(config, _loadMonitor, _anomalies, _kafkaCruiseControl); _detectorScheduler = Executors.newScheduledThreadPool(4, new KafkaCruiseControlThreadFactory(METRIC_REGISTRY_NAME, false, LOG)); _shutdown = false; - _brokerFailureRate = dropwizardMetricRegistry.meter(MetricRegistry.name(METRIC_REGISTRY_NAME, "broker-failure-rate")); - _goalViolationRate = dropwizardMetricRegistry.meter(MetricRegistry.name(METRIC_REGISTRY_NAME, "goal-violation-rate")); - _metricAnomalyRate = dropwizardMetricRegistry.meter(MetricRegistry.name(METRIC_REGISTRY_NAME, "metric-anomaly-rate")); + _anomalyRateByType = new HashMap<>(AnomalyType.cachedValues().size()); + _anomalyRateByType.put(AnomalyType.BROKER_FAILURE, + dropwizardMetricRegistry.meter(MetricRegistry.name(METRIC_REGISTRY_NAME, "broker-failure-rate"))); + _anomalyRateByType.put(AnomalyType.GOAL_VIOLATION, + dropwizardMetricRegistry.meter(MetricRegistry.name(METRIC_REGISTRY_NAME, "goal-violation-rate"))); + 
_anomalyRateByType.put(AnomalyType.METRIC_ANOMALY, + dropwizardMetricRegistry.meter(MetricRegistry.name(METRIC_REGISTRY_NAME, "metric-anomaly-rate"))); // Add anomaly detector state int numCachedRecentAnomalyStates = config.getInt(KafkaCruiseControlConfig.NUM_CACHED_RECENT_ANOMALY_STATES_CONFIG); _anomalyDetectorState = new AnomalyDetectorState(_anomalyNotifier.selfHealingEnabled(), numCachedRecentAnomalyStates); @@ -107,12 +116,12 @@ public AnomalyDetector(KafkaCruiseControlConfig config, _kafkaCruiseControl = kafkaCruiseControl; _detectorScheduler = detectorScheduler; _shutdown = false; - _brokerFailureRate = new Meter(); - _goalViolationRate = new Meter(); - _metricAnomalyRate = new Meter(); + _anomalyRateByType = new HashMap<>(AnomalyType.cachedValues().size()); + AnomalyType.cachedValues().forEach(anomalyType -> _anomalyRateByType.put(anomalyType, new Meter())); _loadMonitor = loadMonitor; // Add anomaly detector state _anomalyDetectorState = new AnomalyDetectorState(new HashMap<>(AnomalyType.cachedValues().size()), 10); + _selfHealingGoals = Collections.emptyList(); } public void startDetection() { @@ -201,22 +210,20 @@ public void run() { checkWithDelay(anomaly, _anomalyDetectionIntervalMs); } else { AnomalyNotificationResult notificationResult = null; + _anomalyRateByType.get(anomalyType).mark(); // Call the anomaly notifier to see if a fix is desired. switch (anomalyType) { case GOAL_VIOLATION: GoalViolations goalViolations = (GoalViolations) anomaly; notificationResult = _anomalyNotifier.onGoalViolation(goalViolations); - _goalViolationRate.mark(); break; case BROKER_FAILURE: BrokerFailures brokerFailures = (BrokerFailures) anomaly; notificationResult = _anomalyNotifier.onBrokerFailure(brokerFailures); - _brokerFailureRate.mark(); break; case METRIC_ANOMALY: KafkaMetricAnomaly metricAnomaly = (KafkaMetricAnomaly) anomaly; notificationResult = _anomalyNotifier.onMetricAnomaly(metricAnomaly); - _metricAnomalyRate.mark(); break; default: throw new IllegalStateException("Unrecognized anomaly type."); @@ -284,8 +291,7 @@ private boolean isReadyToFix(Anomaly anomaly) { LOG.info("Skipping {} because load monitor is in {} state.", skipMsg, loadMonitorTaskRunnerState); _anomalyDetectorState.onAnomalyHandle(anomaly, AnomalyState.Status.LOAD_MONITOR_NOT_READY); } else { - boolean meetCompletenessRequirements = _kafkaCruiseControl.meetCompletenessRequirements(Collections.emptyList()); - if (meetCompletenessRequirements) { + if (_kafkaCruiseControl.meetCompletenessRequirements(_selfHealingGoals)) { return true; } else { LOG.warn("Skipping {} because load completeness requirement is not met for goals.", skipMsg); @@ -296,7 +302,8 @@ private boolean isReadyToFix(Anomaly anomaly) { } private void fixAnomaly(Anomaly anomaly) throws Exception { - if (isReadyToFix(anomaly)) { + boolean isReadyToFix = isReadyToFix(anomaly); + if (isReadyToFix) { LOG.info("Fixing anomaly {}", anomaly); boolean startedSuccessfully = false; try { @@ -308,14 +315,21 @@ private void fixAnomaly(Anomaly anomaly) throws Exception { } } + handlePostFixAnomaly(isReadyToFix); + } + + private void handlePostFixAnomaly(boolean isReadyToFix) { _anomalies.clear(); // We need to add the shutdown message in case the failure detector has shutdown. if (_shutdown) { _anomalies.addFirst(SHUTDOWN_ANOMALY); + } else { + // Explicitly detect broker failures after clearing the queue. 
This ensures that anomaly detector does not miss + // broker failures upon (1) fixing another anomaly, or (2) having broker failures that are not yet ready for fix. + // We don't need to worry about other anomaly types because they run periodically. + _detectorScheduler.schedule(_brokerFailureDetector::detectBrokerFailures, + isReadyToFix ? 0L : _anomalyDetectionIntervalMs, TimeUnit.MILLISECONDS); } - // Explicitly detect broker failures after clear the queue. - // We don't need to worry about the goal violation because it is run periodically. - _brokerFailureDetector.detectBrokerFailures(); } } } diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/BrokerFailureDetector.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/BrokerFailureDetector.java index ce8b86c6e..467b0c06d 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/BrokerFailureDetector.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/BrokerFailureDetector.java @@ -28,6 +28,8 @@ import org.slf4j.LoggerFactory; import scala.collection.JavaConversions; +import static com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils.ZK_SESSION_TIMEOUT; +import static com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils.ZK_CONNECTION_TIMEOUT; import static java.util.stream.Collectors.toSet; @@ -48,15 +50,17 @@ public class BrokerFailureDetector { private final boolean _allowCapacityEstimation; private final boolean _excludeRecentlyDemotedBrokers; private final boolean _excludeRecentlyRemovedBrokers; + private final List _selfHealingGoals; public BrokerFailureDetector(KafkaCruiseControlConfig config, LoadMonitor loadMonitor, Queue anomalies, Time time, - KafkaCruiseControl kafkaCruiseControl) { + KafkaCruiseControl kafkaCruiseControl, + List selfHealingGoals) { String zkUrl = config.getString(KafkaCruiseControlConfig.ZOOKEEPER_CONNECT_CONFIG); - ZkConnection zkConnection = new ZkConnection(zkUrl, 30000); - _zkClient = new ZkClient(zkConnection, 30000, new ZkStringSerializer()); + ZkConnection zkConnection = new ZkConnection(zkUrl, ZK_SESSION_TIMEOUT); + _zkClient = new ZkClient(zkConnection, ZK_CONNECTION_TIMEOUT, new ZkStringSerializer()); // Do not support secure ZK at this point. 
_zkUtils = new ZkUtils(_zkClient, zkConnection, false); _failedBrokers = new HashMap<>(); @@ -68,6 +72,7 @@ public BrokerFailureDetector(KafkaCruiseControlConfig config, _allowCapacityEstimation = config.getBoolean(KafkaCruiseControlConfig.ANOMALY_DETECTION_ALLOW_CAPACITY_ESTIMATION_CONFIG); _excludeRecentlyDemotedBrokers = config.getBoolean(KafkaCruiseControlConfig.BROKER_FAILURE_EXCLUDE_RECENTLY_DEMOTED_BROKERS_CONFIG); _excludeRecentlyRemovedBrokers = config.getBoolean(KafkaCruiseControlConfig.BROKER_FAILURE_EXCLUDE_RECENTLY_REMOVED_BROKERS_CONFIG); + _selfHealingGoals = selfHealingGoals; } void startDetection() { @@ -157,12 +162,12 @@ private void parsePersistedFailedBrokers(String failedBrokerListString) { private void reportBrokerFailures() { if (!_failedBrokers.isEmpty()) { - Map failedBrokers = new HashMap<>(_failedBrokers); _anomalies.add(new BrokerFailures(_kafkaCruiseControl, - failedBrokers, + failedBrokers(), _allowCapacityEstimation, _excludeRecentlyDemotedBrokers, - _excludeRecentlyRemovedBrokers)); + _excludeRecentlyRemovedBrokers, + _selfHealingGoals)); } } diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/BrokerFailures.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/BrokerFailures.java index cda8f427f..f18881a9b 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/BrokerFailures.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/BrokerFailures.java @@ -9,13 +9,13 @@ import com.linkedin.kafka.cruisecontrol.detector.notifier.AnomalyType; import com.linkedin.kafka.cruisecontrol.exception.KafkaCruiseControlException; import com.linkedin.kafka.cruisecontrol.servlet.response.OptimizationResult; -import java.text.DateFormat; -import java.text.SimpleDateFormat; import java.util.Collections; -import java.util.Date; +import java.util.List; import java.util.Map; import java.util.UUID; +import static com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils.toDateString; + /** * The broker failures that have been detected. 
@@ -28,12 +28,14 @@ public class BrokerFailures extends KafkaAnomaly { private final boolean _excludeRecentlyDemotedBrokers; private final boolean _excludeRecentlyRemovedBrokers; private final String _anomalyId; + private final List _selfHealingGoals; public BrokerFailures(KafkaCruiseControl kafkaCruiseControl, Map failedBrokers, boolean allowCapacityEstimation, boolean excludeRecentlyDemotedBrokers, - boolean excludeRecentlyRemovedBrokers) { + boolean excludeRecentlyRemovedBrokers, + List selfHealingGoals) { _kafkaCruiseControl = kafkaCruiseControl; _failedBrokers = failedBrokers; _allowCapacityEstimation = allowCapacityEstimation; @@ -41,6 +43,7 @@ public BrokerFailures(KafkaCruiseControl kafkaCruiseControl, _excludeRecentlyRemovedBrokers = excludeRecentlyRemovedBrokers; _anomalyId = String.format("%s-%s", ID_PREFIX, UUID.randomUUID().toString().substring(ID_PREFIX.length() + 1)); _optimizationResult = null; + _selfHealingGoals = selfHealingGoals; } /** @@ -62,7 +65,7 @@ public boolean fix() throws KafkaCruiseControlException { _optimizationResult = new OptimizationResult(_kafkaCruiseControl.decommissionBrokers(_failedBrokers.keySet(), false, false, - Collections.emptyList(), + _selfHealingGoals, null, new OperationProgress(), _allowCapacityEstimation, @@ -87,9 +90,7 @@ public boolean fix() throws KafkaCruiseControlException { public String toString() { StringBuilder sb = new StringBuilder().append("{\n"); _failedBrokers.forEach((key, value) -> { - Date date = new Date(value); - DateFormat format = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss"); - sb.append("\tBroker ").append(key).append(" failed at ").append(format.format(date)).append("\n"); + sb.append("\tBroker ").append(key).append(" failed at ").append(toDateString(value)).append("\n"); }); sb.append("}"); return sb.toString(); diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/GoalViolationDetector.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/GoalViolationDetector.java index bfb3ea20f..287629a44 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/GoalViolationDetector.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/GoalViolationDetector.java @@ -55,12 +55,14 @@ public class GoalViolationDetector implements Runnable { private final boolean _allowCapacityEstimation; private final boolean _excludeRecentlyDemotedBrokers; private final boolean _excludeRecentlyRemovedBrokers; + private final List _selfHealingGoals; public GoalViolationDetector(KafkaCruiseControlConfig config, LoadMonitor loadMonitor, Queue anomalies, Time time, - KafkaCruiseControl kafkaCruiseControl) { + KafkaCruiseControl kafkaCruiseControl, + List selfHealingGoals) { _loadMonitor = loadMonitor; // Notice that we use a separate set of Goal instances for anomaly detector to avoid interference. 
_goals = getDetectorGoalsMap(config); @@ -71,6 +73,7 @@ public GoalViolationDetector(KafkaCruiseControlConfig config, _excludeRecentlyDemotedBrokers = config.getBoolean(KafkaCruiseControlConfig.GOAL_VIOLATION_EXCLUDE_RECENTLY_DEMOTED_BROKERS_CONFIG); _excludeRecentlyRemovedBrokers = config.getBoolean(KafkaCruiseControlConfig.GOAL_VIOLATION_EXCLUDE_RECENTLY_REMOVED_BROKERS_CONFIG); _kafkaCruiseControl = kafkaCruiseControl; + _selfHealingGoals = selfHealingGoals; } private SortedMap getDetectorGoalsMap(KafkaCruiseControlConfig config) { @@ -137,7 +140,8 @@ public void run() { AutoCloseable clusterModelSemaphore = null; try { GoalViolations goalViolations = new GoalViolations(_kafkaCruiseControl, _allowCapacityEstimation, - _excludeRecentlyDemotedBrokers, _excludeRecentlyRemovedBrokers); + _excludeRecentlyDemotedBrokers, _excludeRecentlyRemovedBrokers, + _selfHealingGoals); long now = _time.milliseconds(); boolean newModelNeeded = true; ClusterModel clusterModel = null; diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/GoalViolations.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/GoalViolations.java index baddaab7b..09800f8dc 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/GoalViolations.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/GoalViolations.java @@ -33,11 +33,13 @@ public class GoalViolations extends KafkaAnomaly { private final boolean _excludeRecentlyDemotedBrokers; private final boolean _excludeRecentlyRemovedBrokers; private final String _anomalyId; + private final List _selfHealingGoals; public GoalViolations(KafkaCruiseControl kafkaCruiseControl, boolean allowCapacityEstimation, boolean excludeRecentlyDemotedBrokers, - boolean excludeRecentlyRemovedBrokers) { + boolean excludeRecentlyRemovedBrokers, + List selfHealingGoals) { _kafkaCruiseControl = kafkaCruiseControl; _allowCapacityEstimation = allowCapacityEstimation; _violatedGoalsByFixability = new HashMap<>(); @@ -45,6 +47,7 @@ public GoalViolations(KafkaCruiseControl kafkaCruiseControl, _excludeRecentlyRemovedBrokers = excludeRecentlyRemovedBrokers; _anomalyId = String.format("%s-%s", ID_PREFIX, UUID.randomUUID().toString().substring(ID_PREFIX.length() + 1)); _optimizationResult = null; + _selfHealingGoals = selfHealingGoals; } /** @@ -74,7 +77,7 @@ public boolean fix() throws KafkaCruiseControlException { if (_violatedGoalsByFixability.get(false) == null) { try { // Fix the fixable goal violations with rebalance operation. 
- _optimizationResult = new OptimizationResult(_kafkaCruiseControl.rebalance(Collections.emptyList(), + _optimizationResult = new OptimizationResult(_kafkaCruiseControl.rebalance(_selfHealingGoals, false, null, new OperationProgress(), diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/monitor/LoadMonitor.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/monitor/LoadMonitor.java index d57830d7f..dab0332d3 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/monitor/LoadMonitor.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/monitor/LoadMonitor.java @@ -526,12 +526,11 @@ public MetadataClient.ClusterAndGeneration refreshClusterAndGeneration() { */ public boolean meetCompletenessRequirements(MetadataClient.ClusterAndGeneration clusterAndGeneration, ModelCompletenessRequirements requirements) { - int availableNumSnapshots = + int numValidWindows = _partitionMetricSampleAggregator.validWindows(clusterAndGeneration, - requirements.minMonitoredPartitionsPercentage()) - .size(); - int requiredSnapshot = requirements.minRequiredNumWindows(); - return availableNumSnapshots >= requiredSnapshot; + requirements.minMonitoredPartitionsPercentage()).size(); + int requiredNumValidWindows = requirements.minRequiredNumWindows(); + return numValidWindows >= requiredNumValidWindows; } /** diff --git a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/detector/AnomalyDetectorTest.java b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/detector/AnomalyDetectorTest.java index a76ef027e..10c0b8001 100644 --- a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/detector/AnomalyDetectorTest.java +++ b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/detector/AnomalyDetectorTest.java @@ -91,7 +91,8 @@ public void testDelayedCheck() throws InterruptedException { try { anomalyDetector.startDetection(); anomalies.add(new BrokerFailures(mockKafkaCruiseControl, Collections.singletonMap(0, 100L), - false, true, true)); + false, true, true, + Collections.emptyList())); while (!anomalies.isEmpty()) { // just wait for the anomalies to be drained. } @@ -161,7 +162,7 @@ public void testFix() throws InterruptedException, KafkaCruiseControlException { EasyMock.eq(true), EasyMock.eq(Collections.emptySet()))) .andReturn(null); - EasyMock.expect(mockKafkaCruiseControl.meetCompletenessRequirements(EasyMock.anyObject())).andReturn(true); + EasyMock.expect(mockKafkaCruiseControl.meetCompletenessRequirements(Collections.emptyList())).andReturn(true); EasyMock.replay(mockAnomalyNotifier); EasyMock.replay(mockBrokerFailureDetector); @@ -178,7 +179,8 @@ public void testFix() throws InterruptedException, KafkaCruiseControlException { try { anomalyDetector.startDetection(); GoalViolations violations = new GoalViolations(mockKafkaCruiseControl, true, - true, true); + true, true, + Collections.emptyList()); violations.addViolation("RackAwareGoal", true); anomalies.add(violations); while (!anomalies.isEmpty()) { @@ -253,7 +255,8 @@ public void testExecutionInProgress() throws InterruptedException { try { anomalyDetector.startDetection(); anomalies.add(new GoalViolations(mockKafkaCruiseControl, true, - true, true)); + true, true, + Collections.emptyList())); while (!anomalies.isEmpty()) { // Just wait for the anomalies to be drained. 
} diff --git a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/detector/BrokerFailureDetectorTest.java b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/detector/BrokerFailureDetectorTest.java index 00a2bc67e..02fb98d32 100644 --- a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/detector/BrokerFailureDetectorTest.java +++ b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/detector/BrokerFailureDetectorTest.java @@ -72,8 +72,10 @@ public void testFailureDetection() throws Exception { assertEquals("The failed broker should be 0 and time should be 100L", Collections.singletonMap(brokerId, 100L), brokerFailures.failedBrokers()); + // Ensure that broker failure is detected as long as the broker is down. + detector.detectBrokerFailures(); + assertEquals("One broker failure should have been detected before timeout.", 1, anomalies.size()); // Bring the broker back - System.out.println("Starting brokers."); restartDeadBroker(brokerId); detector.detectBrokerFailures(); assertTrue(detector.failedBrokers().isEmpty()); @@ -137,7 +139,8 @@ private BrokerFailureDetector createBrokerFailureDetector(Queue anomali mockLoadMonitor, anomalies, time, - mockKafkaCruiseControl); + mockKafkaCruiseControl, + Collections.emptyList()); } private void killBroker(int index) throws Exception { diff --git a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/detector/notifier/SelfHealingNotifierTest.java b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/detector/notifier/SelfHealingNotifierTest.java index c67430ff6..c38562c9d 100644 --- a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/detector/notifier/SelfHealingNotifierTest.java +++ b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/detector/notifier/SelfHealingNotifierTest.java @@ -47,7 +47,8 @@ public void testOnBrokerFailure() { failedBrokers, allowCapacityEstimation, EXCLUDE_RECENTLY_DEMOTED_BROKERS, - EXCLUDE_RECENTLY_REMOVED_BROKERS)); + EXCLUDE_RECENTLY_REMOVED_BROKERS, + Collections.emptyList())); assertEquals(AnomalyNotificationResult.Action.CHECK, result.action()); assertEquals(SelfHealingNotifier.DEFAULT_ALERT_THRESHOLD_MS + failureTime1 - mockTime.milliseconds(), result.delay()); @@ -59,7 +60,8 @@ public void testOnBrokerFailure() { failedBrokers, allowCapacityEstimation, EXCLUDE_RECENTLY_DEMOTED_BROKERS, - EXCLUDE_RECENTLY_REMOVED_BROKERS)); + EXCLUDE_RECENTLY_REMOVED_BROKERS, + Collections.emptyList())); assertEquals(AnomalyNotificationResult.Action.CHECK, result.action()); assertEquals(1, result.delay()); assertFalse(anomalyNotifier._alertCalled.get(AnomalyType.BROKER_FAILURE)); @@ -71,7 +73,8 @@ public void testOnBrokerFailure() { failedBrokers, allowCapacityEstimation, EXCLUDE_RECENTLY_DEMOTED_BROKERS, - EXCLUDE_RECENTLY_REMOVED_BROKERS)); + EXCLUDE_RECENTLY_REMOVED_BROKERS, + Collections.emptyList())); assertEquals(AnomalyNotificationResult.Action.CHECK, result.action()); assertEquals(SelfHealingNotifier.DEFAULT_AUTO_FIX_THRESHOLD_MS + failureTime1 - mockTime.milliseconds(), result.delay()); @@ -84,7 +87,8 @@ public void testOnBrokerFailure() { failedBrokers, allowCapacityEstimation, EXCLUDE_RECENTLY_DEMOTED_BROKERS, - EXCLUDE_RECENTLY_REMOVED_BROKERS)); + EXCLUDE_RECENTLY_REMOVED_BROKERS, + Collections.emptyList())); assertEquals(AnomalyNotificationResult.Action.CHECK, result.action()); assertEquals(1, result.delay()); assertTrue(anomalyNotifier._alertCalled.get(AnomalyType.BROKER_FAILURE)); @@ -97,7 +101,8 @@ public void testOnBrokerFailure() 
{ failedBrokers, allowCapacityEstimation, EXCLUDE_RECENTLY_DEMOTED_BROKERS, - EXCLUDE_RECENTLY_REMOVED_BROKERS)); + EXCLUDE_RECENTLY_REMOVED_BROKERS, + Collections.emptyList())); assertEquals(AnomalyNotificationResult.Action.FIX, result.action()); assertEquals(-1L, result.delay()); assertTrue(anomalyNotifier._alertCalled.get(AnomalyType.BROKER_FAILURE)); @@ -135,7 +140,8 @@ public void testSelfHealingDisabled() { failedBrokers, true, EXCLUDE_RECENTLY_DEMOTED_BROKERS, - EXCLUDE_RECENTLY_REMOVED_BROKERS)); + EXCLUDE_RECENTLY_REMOVED_BROKERS, + Collections.emptyList())); assertEquals(AnomalyNotificationResult.Action.IGNORE, result.action()); assertTrue(anomalyNotifier._alertCalled.get(AnomalyType.BROKER_FAILURE)); assertFalse(anomalyNotifier._autoFixTriggered.get(AnomalyType.BROKER_FAILURE)); @@ -145,7 +151,8 @@ public void testSelfHealingDisabled() { result = anomalyNotifier.onGoalViolation(new GoalViolations(mockKafkaCruiseControl, true, EXCLUDE_RECENTLY_DEMOTED_BROKERS, - EXCLUDE_RECENTLY_REMOVED_BROKERS)); + EXCLUDE_RECENTLY_REMOVED_BROKERS, + Collections.emptyList())); assertEquals(AnomalyNotificationResult.Action.IGNORE, result.action()); assertTrue(anomalyNotifier._alertCalled.get(AnomalyType.GOAL_VIOLATION)); assertFalse(anomalyNotifier._autoFixTriggered.get(AnomalyType.GOAL_VIOLATION)); diff --git a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/detector/notifier/SlackSelfHealingNotifierTest.java b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/detector/notifier/SlackSelfHealingNotifierTest.java index f0be534cd..46a8752de 100644 --- a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/detector/notifier/SlackSelfHealingNotifierTest.java +++ b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/detector/notifier/SlackSelfHealingNotifierTest.java @@ -6,6 +6,7 @@ import com.linkedin.kafka.cruisecontrol.KafkaCruiseControl; import com.linkedin.kafka.cruisecontrol.detector.BrokerFailures; +import java.util.Collections; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; import org.easymock.EasyMock; @@ -36,7 +37,9 @@ public static void setup() { Map failedBrokers = new HashMap<>(); failedBrokers.put(1, 200L); failedBrokers.put(2, 400L); - failures = new BrokerFailures(mockKafkaCruiseControl, failedBrokers, true, true, true); + failures = new BrokerFailures(mockKafkaCruiseControl, failedBrokers, true, + true, true, + Collections.emptyList()); } @Test
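
Two notes for reviewers follow, each with a small illustrative sketch (not code from this patch). First, the metering change in `AnomalyDetector`: the three per-type `Meter` fields are collapsed into a single map keyed by `AnomalyType`, and the meter is marked once before the type-specific `switch` instead of inside each branch. Below is a minimal, self-contained version of that pattern; the nested `AnomalyType` enum is a local stand-in for `com.linkedin.kafka.cruisecontrol.detector.notifier.AnomalyType`, and the `"AnomalyDetector"` registry name is a placeholder for `METRIC_REGISTRY_NAME`.

```java
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;

import java.util.HashMap;
import java.util.Map;

public class AnomalyRateMeters {
  // Local stand-in for com.linkedin.kafka.cruisecontrol.detector.notifier.AnomalyType.
  enum AnomalyType { GOAL_VIOLATION, BROKER_FAILURE, METRIC_ANOMALY }

  private final Map<AnomalyType, Meter> _anomalyRateByType;

  AnomalyRateMeters(MetricRegistry registry) {
    _anomalyRateByType = new HashMap<>(AnomalyType.values().length);
    for (AnomalyType type : AnomalyType.values()) {
      // Derives the same metric names used in the patch, e.g. "broker-failure-rate".
      String meterName = type.name().toLowerCase().replace('_', '-') + "-rate";
      _anomalyRateByType.put(type, registry.meter(MetricRegistry.name("AnomalyDetector", meterName)));
    }
  }

  // One mark() call keyed by type replaces the per-type mark() calls that previously
  // lived inside each branch of the anomaly-handling switch statement.
  void markAnomaly(AnomalyType type) {
    _anomalyRateByType.get(type).mark();
  }
}
```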
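Second, the self-healing goal wiring: `_selfHealingGoals` (currently hard-coded to an empty list, with a TODO to make it configurable) is passed from `AnomalyDetector` into the detectors and the anomalies they create, so the same list is used both by `meetCompletenessRequirements` in `isReadyToFix` and by the `fix()` calls (`rebalance` / `decommissionBrokers`). The sketch below is a hypothetical, trimmed-down view of that flow; the facade interface and its method names are made up for illustration and only stand in for the real signatures.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical facade used only to illustrate the wiring; the real methods are
// KafkaCruiseControl#meetCompletenessRequirements(List) and the rebalance/decommissionBrokers
// calls made from GoalViolations#fix() and BrokerFailures#fix().
interface CruiseControlFacade {
  boolean meetCompletenessRequirements(List<String> goals);
  void fixWithGoals(List<String> goals);
}

public class SelfHealingGoalsWiring {
  // Mirrors the patch: hard-coded to an empty list for now (TODO: make configurable).
  // An empty list falls back to the configured default goals, preserving prior behavior.
  private final List<String> _selfHealingGoals = Collections.emptyList();
  private final CruiseControlFacade _cruiseControl;

  SelfHealingGoalsWiring(CruiseControlFacade cruiseControl) {
    _cruiseControl = cruiseControl;
  }

  void maybeSelfHeal() {
    // The same goal list gates the fix (completeness check) and drives it, so a future
    // config option only needs to be plumbed through in one place.
    if (_cruiseControl.meetCompletenessRequirements(_selfHealingGoals)) {
      _cruiseControl.fixWithGoals(_selfHealingGoals);
    }
  }
}
```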