From a179e4a5c211752754472d5a1a8444acf068f015 Mon Sep 17 00:00:00 2001 From: Rohit Ashiwal Date: Sun, 29 Oct 2023 23:40:00 +0530 Subject: [PATCH] fix-flaky: MultiCodecReindexIT The test spawns random number of data, client and manager nodes which sometimes lead to connect_exception when any of the test nodes fail to connect to the other node(s) leading to flakiness. Let's use only one data node as per the requirement of the test to reduce flakiness. Signed-off-by: Rohit Ashiwal --- .../index/codec/MultiCodecReindexIT.java | 195 ++++++++---------- 1 file changed, 87 insertions(+), 108 deletions(-) diff --git a/modules/reindex/src/internalClusterTest/java/org/opensearch/index/codec/MultiCodecReindexIT.java b/modules/reindex/src/internalClusterTest/java/org/opensearch/index/codec/MultiCodecReindexIT.java index 604c233ca49c4..88c97c6749d0f 100644 --- a/modules/reindex/src/internalClusterTest/java/org/opensearch/index/codec/MultiCodecReindexIT.java +++ b/modules/reindex/src/internalClusterTest/java/org/opensearch/index/codec/MultiCodecReindexIT.java @@ -22,9 +22,10 @@ import org.opensearch.index.reindex.ReindexRequestBuilder; import org.opensearch.index.reindex.ReindexTestCase; import org.opensearch.plugins.Plugin; +import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope; +import org.opensearch.test.OpenSearchIntegTestCase.Scope; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.Map; @@ -34,14 +35,10 @@ import java.util.stream.IntStream; import static java.util.stream.Collectors.toList; -import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_BLOCKS_METADATA; -import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_BLOCKS_READ; -import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_BLOCKS_WRITE; -import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_READ_ONLY; -import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_READ_ONLY_ALLOW_DELETE; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertNoFailures; +@ClusterScope(scope = Scope.SUITE, numDataNodes = 1, numClientNodes = 0, supportsDedicatedMasters = false) public class MultiCodecReindexIT extends ReindexTestCase { @Override @@ -50,7 +47,6 @@ protected Collection> nodePlugins() { } public void testReindexingMultipleCodecs() throws InterruptedException, ExecutionException { - internalCluster().ensureAtLeastNumDataNodes(1); Map codecMap = Map.of( "best_compression", "BEST_COMPRESSION", @@ -61,72 +57,94 @@ public void testReindexingMultipleCodecs() throws InterruptedException, Executio "lz4", "BEST_SPEED" ); - - for (Map.Entry codec : codecMap.entrySet()) { - assertReindexingWithMultipleCodecs(codec.getKey(), codec.getValue(), codecMap); - } - - } - - private void assertReindexingWithMultipleCodecs(String destCodec, String destCodecMode, Map codecMap) - throws ExecutionException, InterruptedException { - - final String index = "test-index" + destCodec; - final String destIndex = "dest-index" + destCodec; - - // creating source index - createIndex( - index, - Settings.builder() - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) - .put("index.codec", "default") - .put("index.merge.policy.max_merged_segment", "1b") - .build() - ); - ensureGreen(index); - final int nbDocs = randomIntBetween(2, 5); + final String index = "source-index"; + + { // Setup + // create source index + createIndex( + index, + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put("index.codec", "default") + .put("index.merge.policy.max_merged_segment", "1b") + .build() + ); + ensureGreen(index); + + // index using all codecs + for (Map.Entry codec : codecMap.entrySet()) { + useCodec(index, codec.getKey()); + + // perform index + indexRandom( + randomBoolean(), + false, + randomBoolean(), + IntStream.range(0, nbDocs) + .mapToObj(i -> client().prepareIndex(index).setId(UUID.randomUUID().toString()).setSource("num", i)) + .collect(toList()) + ); + + // perform flush + FlushResponse flushResponse = client().admin().indices().prepareFlush(index).setForce(true).execute().actionGet(); + assertNoFailures(flushResponse); - // indexing with all 4 codecs - for (Map.Entry codec : codecMap.entrySet()) { - useCodec(index, codec.getKey()); - ingestDocs(index, nbDocs); - } - - assertTrue( - getSegments(index).stream() - .flatMap(s -> s.getAttributes().values().stream()) - .collect(Collectors.toSet()) - .containsAll(codecMap.values()) - ); + // perform refresh + RefreshResponse refreshResponse = client().admin().indices().prepareRefresh(index).execute().actionGet(); + assertNoFailures(refreshResponse); + } - // creating destination index with destination codec - createIndex( - destIndex, - Settings.builder() - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) - .put("index.codec", destCodec) - .build() - ); + assertTrue( + getSegments(index).stream() + .flatMap(s -> s.getAttributes().values().stream()) + .collect(Collectors.toSet()) + .containsAll(codecMap.values()) + ); + } + { // Test reindex for each codec + for (Map.Entry codec : codecMap.entrySet()) { + final String destCodecMode = codec.getValue(); + final String destCodec = codec.getKey(); + + final String destIndex = "dest-index" + destCodec; + final int expectedResponseSize = codecMap.size() * nbDocs; + + // create destination index with selected codec + createIndex( + destIndex, + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put("index.codec", destCodec) + .build() + ); + ensureGreen(destIndex); + + // perform reindex + BulkByScrollResponse response = new ReindexRequestBuilder(client(), ReindexAction.INSTANCE).source(index) + .destination(destIndex) + .refresh(true) + .waitForActiveShards(ActiveShardCount.ONE) + .get(); + + // assertions + assertEquals(0, response.getNoops()); + assertEquals(1, response.getBatches()); + assertEquals(0, response.getDeleted()); + assertEquals(0, response.getVersionConflicts()); + assertEquals(0, response.getBulkFailures().size()); + assertEquals(0, response.getSearchFailures().size()); + + assertEquals(expectedResponseSize, response.getTotal()); + assertEquals(expectedResponseSize, response.getCreated()); + + assertTrue(response.getTook().getMillis() > 0); + assertTrue(getSegments(destIndex).stream().allMatch(segment -> segment.attributes.containsValue(destCodecMode))); + } + } - BulkByScrollResponse bulkResponse = new ReindexRequestBuilder(client(), ReindexAction.INSTANCE).source(index) - .destination(destIndex) - .refresh(true) - .waitForActiveShards(ActiveShardCount.ONE) - .get(); - - assertEquals(codecMap.size() * nbDocs, bulkResponse.getCreated()); - assertEquals(codecMap.size() * nbDocs, bulkResponse.getTotal()); - assertEquals(0, bulkResponse.getDeleted()); - assertEquals(0, bulkResponse.getNoops()); - assertEquals(0, bulkResponse.getVersionConflicts()); - assertEquals(1, bulkResponse.getBatches()); - assertTrue(bulkResponse.getTook().getMillis() > 0); - assertEquals(0, bulkResponse.getBulkFailures().size()); - assertEquals(0, bulkResponse.getSearchFailures().size()); - assertTrue(getSegments(destIndex).stream().allMatch(segment -> segment.attributes.containsValue(destCodecMode))); } private void useCodec(String index, String codec) throws ExecutionException, InterruptedException { @@ -142,46 +160,7 @@ private void useCodec(String index, String codec) throws ExecutionException, Int assertAcked(client().admin().indices().prepareOpen(index).setWaitForActiveShards(1)); } - private void flushAndRefreshIndex(String index) { - - // Request is not blocked - for (String blockSetting : Arrays.asList( - SETTING_BLOCKS_READ, - SETTING_BLOCKS_WRITE, - SETTING_READ_ONLY, - SETTING_BLOCKS_METADATA, - SETTING_READ_ONLY_ALLOW_DELETE - )) { - try { - enableIndexBlock(index, blockSetting); - // flush - FlushResponse flushResponse = client().admin().indices().prepareFlush(index).setForce(true).execute().actionGet(); - assertNoFailures(flushResponse); - - // refresh - RefreshResponse refreshResponse = client().admin().indices().prepareRefresh(index).execute().actionGet(); - assertNoFailures(refreshResponse); - } finally { - disableIndexBlock(index, blockSetting); - } - } - } - - private void ingestDocs(String index, int nbDocs) throws InterruptedException { - - indexRandom( - randomBoolean(), - false, - randomBoolean(), - IntStream.range(0, nbDocs) - .mapToObj(i -> client().prepareIndex(index).setId(UUID.randomUUID().toString()).setSource("num", i)) - .collect(toList()) - ); - flushAndRefreshIndex(index); - } - private ArrayList getSegments(String index) { - return new ArrayList<>( client().admin() .indices()