Skip to content

Commit

Permalink
Retry on dynamic index creation when an OpenSearchException is thrown (
Browse files Browse the repository at this point in the history
…#3541)

* Retry on dynamic index creation when an OpenSearchException is thrown

Signed-off-by: Chase Engelbrecht <[email protected]>

* Use correct exception type in unit tests

Signed-off-by: Chase Engelbrecht <[email protected]>

* Remove older cache imports

Signed-off-by: Chase Engelbrecht <[email protected]>

---------

Signed-off-by: Chase Engelbrecht <[email protected]>
Signed-off-by: Chase <[email protected]>
  • Loading branch information
engechas authored Nov 10, 2023
1 parent d2ad8ae commit 3543634
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ public void doOutput(final Collection<Record<Event>> records) {
String indexName = configuredIndexAlias;
try {
indexName = indexManager.getIndexName(event.formatString(indexName, expressionEvaluator));
} catch (IOException | EventKeyNotFoundException e) {
} catch (final Exception e) {
LOG.error("There was an exception when constructing the index name. Check the dlq if configured to see details about the affected Event: {}", e.getMessage());
dynamicIndexDroppedEvents.increment();
logFailureForDlqObjects(List.of(createDlqObjectFromEvent(event, indexName, e.getMessage())), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,26 @@

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.opensearch.client.opensearch._types.OpenSearchException;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSinkConfiguration;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

import static com.google.common.base.Preconditions.checkNotNull;

public class DynamicIndexManager extends AbstractIndexManager {
private final Cache<String, IndexManager> indexManagerCache;
private static final int CACHE_EXPIRE_AFTER_ACCESS_TIME_MINUTES = 30;
private static final int APPROXIMATE_INDEX_MANAGER_SIZE = 32;
private static final Logger LOG = LoggerFactory.getLogger(DynamicIndexManager.class);
private static final int INDEX_SETUP_RETRY_WAIT_TIME_MS = 1000;

private Cache<String, IndexManager> indexManagerCache;
final int CACHE_EXPIRE_AFTER_ACCESS_TIME_MINUTES = 30;
final int APPROXIMATE_INDEX_MANAGER_SIZE = 32;
private final long cacheSizeInKB = 1024;
protected RestHighLevelClient restHighLevelClient;
protected OpenSearchClient openSearchClient;
Expand Down Expand Up @@ -70,9 +78,27 @@ public String getIndexName(final String dynamicIndexAlias) throws IOException {
indexManager = indexManagerFactory.getIndexManager(
indexType, openSearchClient, restHighLevelClient, openSearchSinkConfiguration, templateStrategy, fullIndexAlias);
indexManagerCache.put(fullIndexAlias, indexManager);
indexManager.setupIndex();
setupIndexWithRetries(indexManager);
}
return indexManager.getIndexName(fullIndexAlias);
}

private void setupIndexWithRetries(final IndexManager indexManager) throws IOException {
boolean isIndexSetup = false;

while (!isIndexSetup) {
try {
indexManager.setupIndex();
isIndexSetup = true;
} catch (final OpenSearchException e) {
LOG.warn("Failed to setup dynamic index with an exception. ", e);
try {
Thread.sleep(INDEX_SETUP_RETRY_WAIT_TIME_MS);
} catch (final InterruptedException ex) {
LOG.warn("Interrupted while sleeping between index setup retries");
}
}
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mock;
import org.opensearch.client.opensearch._types.OpenSearchException;
import org.opensearch.client.IndicesClient;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.client.opensearch.OpenSearchClient;
Expand All @@ -28,6 +29,7 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -72,6 +74,9 @@ public class DynamicIndexManagerTests {
@Mock
private TemplateStrategy templateStrategy;

@Mock
private OpenSearchException openSearchException;

static final String EVENT_TYPE = "event";

@BeforeEach
Expand Down Expand Up @@ -165,4 +170,64 @@ public void missingDynamicIndexTest() throws IOException {
JacksonEvent event = JacksonEvent.builder().withEventType(EVENT_TYPE).withData(Map.of(RandomStringUtils.randomAlphabetic(10), DYNAMIC)).build();
assertThrows(EventKeyNotFoundException.class, () -> dynamicIndexManager.getIndexName(event.formatString(configuredIndexAlias)));
}

@Test
public void getIndexName_DoesNotRetryOnNonOpenSearchExceptions() throws IOException {
when(indexConfiguration.getIndexAlias()).thenReturn(INDEX_ALIAS);
String configuredIndexAlias = openSearchSinkConfiguration.getIndexConfiguration().getIndexAlias();
String expectedIndexAlias = INDEX_ALIAS.replace("${" + ID + "}", DYNAMIC);
innerIndexManager = mock(IndexManager.class);
when(mockIndexManagerFactory.getIndexManager(
IndexType.CUSTOM, openSearchClient, restHighLevelClient, openSearchSinkConfiguration, templateStrategy, expectedIndexAlias)).thenReturn(innerIndexManager);
doThrow(new RuntimeException())
.when(innerIndexManager).setupIndex();

JacksonEvent event = JacksonEvent.builder().withEventType(EVENT_TYPE).withData(Map.of(ID, DYNAMIC)).build();
assertThrows(RuntimeException.class, () -> dynamicIndexManager.getIndexName(event.formatString(configuredIndexAlias)));

verify(innerIndexManager, times(1)).setupIndex();
}

@Test
public void getIndexName_DoesRetryOnOpenSearchExceptions_UntilSuccess() throws IOException {
when(indexConfiguration.getIndexAlias()).thenReturn(INDEX_ALIAS);
when(clusterSettingsParser.getStringValueClusterSetting(any(GetClusterSettingsResponse.class), eq(IndexConstants.ISM_ENABLED_SETTING)))
.thenReturn("true");
String configuredIndexAlias = openSearchSinkConfiguration.getIndexConfiguration().getIndexAlias();
String expectedIndexAlias = INDEX_ALIAS.replace("${" + ID + "}", DYNAMIC);
innerIndexManager = mock(IndexManager.class);
when(mockIndexManagerFactory.getIndexManager(
IndexType.CUSTOM, openSearchClient, restHighLevelClient, openSearchSinkConfiguration, templateStrategy, expectedIndexAlias)).thenReturn(innerIndexManager);
doThrow(openSearchException)
.doThrow(openSearchException)
.doNothing()
.when(innerIndexManager).setupIndex();
when(innerIndexManager.getIndexName(expectedIndexAlias)).thenReturn(expectedIndexAlias);

JacksonEvent event = JacksonEvent.builder().withEventType(EVENT_TYPE).withData(Map.of(ID, DYNAMIC)).build();
final String indexName = dynamicIndexManager.getIndexName(event.formatString(configuredIndexAlias));
assertThat(expectedIndexAlias, equalTo(indexName));

verify(innerIndexManager, times(3)).setupIndex();
}

@Test
public void getIndexName_DoesRetryOnOpenSearchExceptions_UntilFailure() throws IOException {
when(indexConfiguration.getIndexAlias()).thenReturn(INDEX_ALIAS);
String configuredIndexAlias = openSearchSinkConfiguration.getIndexConfiguration().getIndexAlias();
String expectedIndexAlias = INDEX_ALIAS.replace("${" + ID + "}", DYNAMIC);
innerIndexManager = mock(IndexManager.class);
when(mockIndexManagerFactory.getIndexManager(
IndexType.CUSTOM, openSearchClient, restHighLevelClient, openSearchSinkConfiguration, templateStrategy, expectedIndexAlias)).thenReturn(innerIndexManager);
doThrow(openSearchException)
.doThrow(openSearchException)
.doThrow(new RuntimeException())
.when(innerIndexManager).setupIndex();
when(innerIndexManager.getIndexName(expectedIndexAlias)).thenReturn(expectedIndexAlias);

JacksonEvent event = JacksonEvent.builder().withEventType(EVENT_TYPE).withData(Map.of(ID, DYNAMIC)).build();
assertThrows(RuntimeException.class, () -> dynamicIndexManager.getIndexName(event.formatString(configuredIndexAlias)));

verify(innerIndexManager, times(3)).setupIndex();
}
}

0 comments on commit 3543634

Please sign in to comment.