HDFS-11353. Improve the unit tests relevant to DataNode volume failure testing. Contributed by Yiqun Lin.
linyiqun committed Feb 2, 2017
1 parent 327c998 commit 3433f57
Showing 5 changed files with 72 additions and 46 deletions.
----------------------------------------------------------------------
@@ -36,10 +36,13 @@
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
import org.apache.hadoop.test.GenericTestUtils;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import com.google.common.base.Supplier;

import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.any;
@@ -257,4 +260,27 @@ public static FsVolumeImpl getVolume(DataNode dn, File basePath) throws
}
return null;
}

/**
* Trigger an asynchronous disk check and wait for the DataNode to detect
* the disk failure.
*
* @param dn the DataNode to check
* @param volume the volume on which to trigger the disk check
* @throws Exception if the failure is not detected within the timeout
*/
public static void waitForDiskError(DataNode dn, FsVolumeSpi volume)
throws Exception {
LOG.info("Starting to wait for datanode to detect disk failure.");
final long lastDiskErrorCheck = dn.getLastDiskErrorCheck();
dn.checkDiskErrorAsync(volume);
// Wait up to 10 seconds for the checkDiskError thread to finish and
// discover volume failures.
GenericTestUtils.waitFor(new Supplier<Boolean>() {

@Override
public Boolean get() {
return dn.getLastDiskErrorCheck() != lastDiskErrorCheck;
}
}, 100, 10000);
}
}
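
For reference, a minimal sketch of how a test can use the new helper. This snippet is illustrative only: the test name is hypothetical, and cluster and dataDir are assumed to come from a MiniDFSCluster test fixture like the ones set up in the test classes below; they are not part of this commit.

@Test
public void testVolumeFailureDetection() throws Exception {
  // Hypothetical test sketch; `cluster` and `dataDir` are assumed fixtures.
  final File dn0Vol1 = new File(dataDir, "data1");
  final DataNode dn0 = cluster.getDataNodes().get(0);

  // Simulate a disk failure on the chosen volume ...
  DataNodeTestUtils.injectDataDirFailure(dn0Vol1);
  // ... then block until the DataNode's async disk checker has detected it,
  // instead of hand-rolling a Thread.sleep() polling loop in each test.
  DataNodeTestUtils.waitForDiskError(dn0,
      DataNodeTestUtils.getVolume(dn0, dn0Vol1));
}
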
----------------------------------------------------------------------
@@ -906,8 +906,7 @@ public void testAddBackRemovedVolume()
*/
@Test(timeout=60000)
public void testDirectlyReloadAfterCheckDiskError()
throws IOException, TimeoutException, InterruptedException,
ReconfigurationException {
throws Exception {
// The test uses DataNodeTestUtils#injectDataDirFailure() to simulate
// volume failures which is currently not supported on Windows.
assumeNotWindows();
@@ -926,11 +925,7 @@ public void testDirectlyReloadAfterCheckDiskError()

DataNodeTestUtils.injectDataDirFailure(dirToFail);
// Call and wait for the DataNode to detect disk failure.
long lastDiskErrorCheck = dn.getLastDiskErrorCheck();
dn.checkDiskErrorAsync(failedVolume);
while (dn.getLastDiskErrorCheck() == lastDiskErrorCheck) {
Thread.sleep(100);
}
DataNodeTestUtils.waitForDiskError(dn, failedVolume);

createFile(new Path("/test1"), 32, (short)2);
assertEquals(used, failedVolume.getDfsUsed());
----------------------------------------------------------------------
@@ -34,23 +34,23 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.TrueFileFilter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.ReconfigurationException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.FsTracer;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.BlockReader;
import org.apache.hadoop.hdfs.client.impl.BlockReaderFactory;
import org.apache.hadoop.hdfs.ClientContext;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.RemotePeerFactory;
import org.apache.hadoop.hdfs.client.impl.BlockReaderFactory;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.Block;
@@ -73,19 +73,16 @@
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.test.GenericTestUtils;

import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.TrueFileFilter;

import com.google.common.base.Supplier;

import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Supplier;

/**
* Fine-grained testing of block files and locations after volume failure.
*/
@@ -111,6 +108,10 @@ private class BlockLocs {
// block id to BlockLocs
final Map<String, BlockLocs> block_map = new HashMap<String, BlockLocs> ();

// Specify the timeout for the entire test class.
@Rule
public Timeout timeout = new Timeout(120 * 1000);

@Before
public void setUp() throws Exception {
// bring up a cluster of 2
@@ -225,7 +226,7 @@ public Boolean get() {
*/
@Test(timeout=150000)
public void testFailedVolumeBeingRemovedFromDataNode()
throws InterruptedException, IOException, TimeoutException {
throws Exception {
// The test uses DataNodeTestUtils#injectDataDirFailure() to simulate
// volume failures which is currently not supported on Windows.
assumeNotWindows();
@@ -237,7 +238,8 @@ public void testFailedVolumeBeingRemovedFromDataNode()
File dn0Vol1 = new File(dataDir, "data" + (2 * 0 + 1));
DataNodeTestUtils.injectDataDirFailure(dn0Vol1);
DataNode dn0 = cluster.getDataNodes().get(0);
checkDiskErrorSync(dn0, DataNodeTestUtils.getVolume(dn0, dn0Vol1));
DataNodeTestUtils.waitForDiskError(dn0,
DataNodeTestUtils.getVolume(dn0, dn0Vol1));

// Verify dn0Vol1 has been completely removed from DN0.
// 1. dn0Vol1 is removed from DataStorage.
@@ -284,35 +286,22 @@ public void testFailedVolumeBeingRemovedFromDataNode()
assertFalse(dataDirStrs[0].contains(dn0Vol1.getAbsolutePath()));
}

private static void checkDiskErrorSync(DataNode dn, FsVolumeSpi volume)
throws InterruptedException {
final long lastDiskErrorCheck = dn.getLastDiskErrorCheck();
dn.checkDiskErrorAsync(volume);
// Wait 10 seconds for checkDiskError thread to finish and discover volume
// failures.
int count = 100;
while (count > 0 && dn.getLastDiskErrorCheck() == lastDiskErrorCheck) {
Thread.sleep(100);
count--;
}
assertTrue("Disk checking thread does not finish in 10 seconds",
count > 0);
}

/**
* Test that the DataNode stops when the number of failed volumes exceeds
* dfs.datanode.failed.volumes.tolerated.
*/
@Test(timeout=10000)
public void testDataNodeShutdownAfterNumFailedVolumeExceedsTolerated()
throws InterruptedException, IOException {
throws Exception {
// make both data directories to fail on dn0
final File dn0Vol1 = new File(dataDir, "data" + (2 * 0 + 1));
final File dn0Vol2 = new File(dataDir, "data" + (2 * 0 + 2));
DataNodeTestUtils.injectDataDirFailure(dn0Vol1, dn0Vol2);
DataNode dn0 = cluster.getDataNodes().get(0);
checkDiskErrorSync(dn0, DataNodeTestUtils.getVolume(dn0, dn0Vol1));
checkDiskErrorSync(dn0, DataNodeTestUtils.getVolume(dn0, dn0Vol2));
DataNodeTestUtils.waitForDiskError(dn0,
DataNodeTestUtils.getVolume(dn0, dn0Vol1));
DataNodeTestUtils.waitForDiskError(dn0,
DataNodeTestUtils.getVolume(dn0, dn0Vol2));

// DN0 should stop after the number of failed disks exceeds the tolerated
// value (1).
@@ -324,7 +313,7 @@ public void testDataNodeShutdownAfterNumFailedVolumeExceedsTolerated()
*/
@Test
public void testVolumeFailureRecoveredByHotSwappingVolume()
throws InterruptedException, ReconfigurationException, IOException {
throws Exception {
final File dn0Vol1 = new File(dataDir, "data" + (2 * 0 + 1));
final File dn0Vol2 = new File(dataDir, "data" + (2 * 0 + 2));
final DataNode dn0 = cluster.getDataNodes().get(0);
@@ -333,7 +322,8 @@ public void testVolumeFailureRecoveredByHotSwappingVolume()

// Fail dn0Vol1 first.
DataNodeTestUtils.injectDataDirFailure(dn0Vol1);
checkDiskErrorSync(dn0, DataNodeTestUtils.getVolume(dn0, dn0Vol1));
DataNodeTestUtils.waitForDiskError(dn0,
DataNodeTestUtils.getVolume(dn0, dn0Vol1));

// Hot swap out the failed volume.
String dataDirs = dn0Vol2.getPath();
@@ -352,7 +342,8 @@ public void testVolumeFailureRecoveredByHotSwappingVolume()
// Fail dn0Vol2. Now since dn0Vol1 has been fixed, DN0 has sufficient
// resources, thus it should keep running.
DataNodeTestUtils.injectDataDirFailure(dn0Vol2);
checkDiskErrorSync(dn0, DataNodeTestUtils.getVolume(dn0, dn0Vol2));
DataNodeTestUtils.waitForDiskError(dn0,
DataNodeTestUtils.getVolume(dn0, dn0Vol2));
assertTrue(dn0.shouldRun());
}

@@ -362,7 +353,7 @@ public void testVolumeFailureRecoveredByHotSwappingVolume()
*/
@Test
public void testTolerateVolumeFailuresAfterAddingMoreVolumes()
throws InterruptedException, ReconfigurationException, IOException {
throws Exception {
final File dn0Vol1 = new File(dataDir, "data" + (2 * 0 + 1));
final File dn0Vol2 = new File(dataDir, "data" + (2 * 0 + 2));
final File dn0VolNew = new File(dataDir, "data_new");
@@ -379,12 +370,14 @@ public void testTolerateVolumeFailuresAfterAddingMoreVolumes()

// Fail dn0Vol1 first and hot swap it.
DataNodeTestUtils.injectDataDirFailure(dn0Vol1);
checkDiskErrorSync(dn0, DataNodeTestUtils.getVolume(dn0, dn0Vol1));
DataNodeTestUtils.waitForDiskError(dn0,
DataNodeTestUtils.getVolume(dn0, dn0Vol1));
assertTrue(dn0.shouldRun());

// Fail dn0Vol2, now dn0 should stop, because we only tolerate 1 disk failure.
DataNodeTestUtils.injectDataDirFailure(dn0Vol2);
checkDiskErrorSync(dn0, DataNodeTestUtils.getVolume(dn0, dn0Vol2));
DataNodeTestUtils.waitForDiskError(dn0,
DataNodeTestUtils.getVolume(dn0, dn0Vol2));
assertFalse(dn0.shouldRun());
}

----------------------------------------------------------------------
@@ -52,7 +52,9 @@
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;

/**
* Test reporting of DN volume failure counts and metrics.
@@ -80,6 +82,10 @@ public class TestDataNodeVolumeFailureReporting {
// a datanode to be considered dead by the namenode.
final int WAIT_FOR_DEATH = 15000;

// Specify the timeout for the entire test class.
@Rule
public Timeout timeout = new Timeout(120 * 1000);

@Before
public void setUp() throws Exception {
// These tests use DataNodeTestUtils#injectDataDirFailure() to simulate
Expand Down Expand Up @@ -204,13 +210,13 @@ public void testSuccessiveVolumeFailures() throws Exception {
DFSTestUtil.createFile(fs, file3, 1024, (short)3, 1L);
DFSTestUtil.waitReplication(fs, file3, (short)2);

// The DN should consider itself dead
DFSTestUtil.waitForDatanodeDeath(dns.get(2));

// And report two failed volumes
checkFailuresAtDataNode(dns.get(2), 2, true, dn3Vol1.getAbsolutePath(),
dn3Vol2.getAbsolutePath());

// The DN should consider itself dead
DFSTestUtil.waitForDatanodeDeath(dns.get(2));

// The NN considers the DN dead
DFSTestUtil.waitForDatanodeStatus(dm, 2, 1, 2,
origCapacity - (4*dnCapacity), WAIT_FOR_HEARTBEATS);
----------------------------------------------------------------------
@@ -39,7 +39,9 @@
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;

/**
* Test the ability of a DN to tolerate volume failures.
@@ -58,6 +60,10 @@ public class TestDataNodeVolumeFailureToleration {
// a datanode to be considered dead by the namenode.
final int WAIT_FOR_DEATH = 15000;

// Specify the timeout for the entire test class.
@Rule
public Timeout timeout = new Timeout(120 * 1000);

@Before
public void setUp() throws Exception {
conf = new HdfsConfiguration();