diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/sourcecoordination/enhanced/EnhancedLeaseBasedSourceCoordinator.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/sourcecoordination/enhanced/EnhancedLeaseBasedSourceCoordinator.java index 686ec4e2ef..f64dfdeb52 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/sourcecoordination/enhanced/EnhancedLeaseBasedSourceCoordinator.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/sourcecoordination/enhanced/EnhancedLeaseBasedSourceCoordinator.java @@ -118,7 +118,7 @@ public Optional acquireAvailablePartition(String partit LOG.debug("Try to acquire an available {} partition", partitionType); Optional sourceItem = coordinationStore.tryAcquireAvailablePartition(this.sourceIdentifier + "|" + partitionType, hostName, DEFAULT_LEASE_TIMEOUT); if (sourceItem.isEmpty()) { - LOG.info("Partition owner {} failed to acquire a partition, no available {} partitions now", hostName, partitionType); + LOG.debug("Partition owner {} failed to acquire a partition, no available {} partitions now", hostName, partitionType); return Optional.empty(); } diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBService.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBService.java index 107032c373..18308f33fc 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBService.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBService.java @@ -102,7 +102,7 @@ public void start(Buffer> buffer) { Runnable fileLoaderScheduler = new DataFileScheduler(coordinator, loaderFactory, pluginMetrics); ShardConsumerFactory consumerFactory = new ShardConsumerFactory(coordinator, dynamoDbStreamsClient, pluginMetrics, shardManager, buffer); - Runnable streamScheduler = new StreamScheduler(coordinator, consumerFactory, shardManager); + Runnable streamScheduler = new StreamScheduler(coordinator, consumerFactory, shardManager, pluginMetrics); // May consider start or shutdown the scheduler on demand // Currently, event after the exports are done, the related scheduler will not be shutdown diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/ExportRecordConverter.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/ExportRecordConverter.java index b53d5df2b6..70a6cbcf31 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/ExportRecordConverter.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/ExportRecordConverter.java @@ -26,8 +26,8 @@ public class ExportRecordConverter extends RecordConverter { private static final String ITEM_KEY = "Item"; - static final String EXPORT_RECORD_SUCCESS_COUNT = "exportRecordSuccess"; - static final String EXPORT_RECORD_ERROR_COUNT = "exportRecordErrors"; + static final String EXPORT_RECORDS_PROCESSED_COUNT = "exportRecordsProcessed"; + static final String EXPORT_RECORDS_PROCESSING_ERROR_COUNT = "exportRecordProcessingErrors"; IonObjectMapper MAPPER = new IonObjectMapper(); @@ -40,8 +40,8 @@ public class ExportRecordConverter extends RecordConverter { public ExportRecordConverter(Buffer> buffer, TableInfo tableInfo, PluginMetrics pluginMetrics) { super(buffer, tableInfo); this.pluginMetrics = pluginMetrics; - this.exportRecordSuccessCounter = pluginMetrics.counter(EXPORT_RECORD_SUCCESS_COUNT); - this.exportRecordErrorCounter = pluginMetrics.counter(EXPORT_RECORD_ERROR_COUNT); + this.exportRecordSuccessCounter = pluginMetrics.counter(EXPORT_RECORDS_PROCESSED_COUNT); + this.exportRecordErrorCounter = pluginMetrics.counter(EXPORT_RECORDS_PROCESSING_ERROR_COUNT); } diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/MetadataKeyAttributes.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/MetadataKeyAttributes.java index 2a221298f0..0286627ba6 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/MetadataKeyAttributes.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/MetadataKeyAttributes.java @@ -6,7 +6,7 @@ package org.opensearch.dataprepper.plugins.source.dynamodb.converter; public class MetadataKeyAttributes { - static final String COMPOSITE_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE = "_id"; + static final String PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE = "primary_key"; static final String PARTITION_KEY_METADATA_ATTRIBUTE = "partition_key"; diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/RecordConverter.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/RecordConverter.java index 3b5816778b..cedf7fb0f1 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/RecordConverter.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/RecordConverter.java @@ -17,7 +17,7 @@ import java.util.List; import java.util.Map; -import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.COMPOSITE_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE; +import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE; import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.EVENT_TABLE_NAME_METADATA_ATTRIBUTE; import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.EVENT_TIMESTAMP_METADATA_ATTRIBUTE; import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.PARTITION_KEY_METADATA_ATTRIBUTE; @@ -86,7 +86,7 @@ public Record convertToEvent(Map data, Instant eventCreat } eventMetadata.setAttribute(STREAM_EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE, mapStreamEventNameToBulkAction(streamEventName)); - eventMetadata.setAttribute(COMPOSITE_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE, getId(data)); + eventMetadata.setAttribute(PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE, getId(data)); eventMetadata.setAttribute(PARTITION_KEY_METADATA_ATTRIBUTE, getPartitionKey(data)); eventMetadata.setAttribute(SORT_KEY_METADATA_ATTRIBUTE, getSortKey(data)); diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverter.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverter.java index 2ba6fdf45e..bdfe6dcbe1 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverter.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverter.java @@ -26,8 +26,8 @@ public class StreamRecordConverter extends RecordConverter { private static final Logger LOG = LoggerFactory.getLogger(StreamRecordConverter.class); - static final String CHANGE_EVENT_SUCCESS_COUNT = "changeEventSuccess"; - static final String CHANGE_EVENT_ERROR_COUNT = "changeEventErrors"; + static final String CHANGE_EVENTS_PROCESSED_COUNT = "changeEventsProcessed"; + static final String CHANGE_EVENTS_PROCESSING_ERROR_COUNT = "changeEventsProcessingErrors"; private static final ObjectMapper MAPPER = new ObjectMapper(); @@ -42,8 +42,8 @@ public class StreamRecordConverter extends RecordConverter { public StreamRecordConverter(Buffer> buffer, TableInfo tableInfo, PluginMetrics pluginMetrics) { super(buffer, tableInfo); this.pluginMetrics = pluginMetrics; - this.changeEventSuccessCounter = pluginMetrics.counter(CHANGE_EVENT_SUCCESS_COUNT); - this.changeEventErrorCounter = pluginMetrics.counter(CHANGE_EVENT_ERROR_COUNT); + this.changeEventSuccessCounter = pluginMetrics.counter(CHANGE_EVENTS_PROCESSED_COUNT); + this.changeEventErrorCounter = pluginMetrics.counter(CHANGE_EVENTS_PROCESSING_ERROR_COUNT); } @Override diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileScheduler.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileScheduler.java index be1d1fc175..854bf0c7ce 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileScheduler.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileScheduler.java @@ -22,6 +22,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiConsumer; @@ -34,7 +35,8 @@ public class DataFileScheduler implements Runnable { private static final int MAX_JOB_COUNT = 2; private static final int DEFAULT_LEASE_INTERVAL_MILLIS = 30_000; - static final String EXPORT_FILE_SUCCESS_COUNT = "exportFileSuccess"; + static final String EXPORT_S3_OBJECTS_PROCESSED_COUNT = "exportS3ObjectsProcessed"; + static final String ACTIVE_EXPORT_S3_OBJECT_CONSUMERS_GAUGE = "activeExportS3ObjectConsumers"; private final EnhancedSourceCoordinator coordinator; @@ -47,6 +49,7 @@ public class DataFileScheduler implements Runnable { private final Counter exportFileSuccessCounter; + private final AtomicLong activeExportS3ObjectConsumersGauge; public DataFileScheduler(EnhancedSourceCoordinator coordinator, DataFileLoaderFactory loaderFactory, PluginMetrics pluginMetrics) { @@ -57,7 +60,8 @@ public DataFileScheduler(EnhancedSourceCoordinator coordinator, DataFileLoaderFa executor = Executors.newFixedThreadPool(MAX_JOB_COUNT); - this.exportFileSuccessCounter = pluginMetrics.counter(EXPORT_FILE_SUCCESS_COUNT); + this.exportFileSuccessCounter = pluginMetrics.counter(EXPORT_S3_OBJECTS_PROCESSED_COUNT); + this.activeExportS3ObjectConsumersGauge = pluginMetrics.gauge(ACTIVE_EXPORT_S3_OBJECT_CONSUMERS_GAUGE, new AtomicLong()); } private void processDataFilePartition(DataFilePartition dataFilePartition) { @@ -81,8 +85,10 @@ public void run() { final Optional sourcePartition = coordinator.acquireAvailablePartition(DataFilePartition.PARTITION_TYPE); if (sourcePartition.isPresent()) { + activeExportS3ObjectConsumersGauge.incrementAndGet(); DataFilePartition dataFilePartition = (DataFilePartition) sourcePartition.get(); processDataFilePartition(dataFilePartition); + activeExportS3ObjectConsumersGauge.decrementAndGet(); } } try { diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ExportScheduler.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ExportScheduler.java index 6c9d9108c2..b885ad0102 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ExportScheduler.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ExportScheduler.java @@ -51,8 +51,8 @@ public class ExportScheduler implements Runnable { private static final String FAILED_STATUS = "Failed"; static final String EXPORT_JOB_SUCCESS_COUNT = "exportJobSuccess"; - static final String EXPORT_JOB_ERROR_COUNT = "exportJobErrors"; - static final String EXPORT_FILES_TOTAL_COUNT = "exportFilesTotal"; + static final String EXPORT_JOB_FAILURE_COUNT = "exportJobFailure"; + static final String EXPORT_S3_OBJECTS_TOTAL_COUNT = "exportS3ObjectsTotal"; static final String EXPORT_RECORDS_TOTAL_COUNT = "exportRecordsTotal"; private final PluginMetrics pluginMetrics; @@ -68,9 +68,9 @@ public class ExportScheduler implements Runnable { private final ExportTaskManager exportTaskManager; private final Counter exportJobSuccessCounter; - private final Counter exportJobErrorCounter; + private final Counter exportJobFailureCounter; - private final Counter exportFilesTotalCounter; + private final Counter exportS3ObjectsTotalCounter; private final Counter exportRecordsTotalCounter; public ExportScheduler(EnhancedSourceCoordinator enhancedSourceCoordinator, DynamoDbClient dynamoDBClient, ManifestFileReader manifestFileReader, PluginMetrics pluginMetrics) { @@ -83,8 +83,8 @@ public ExportScheduler(EnhancedSourceCoordinator enhancedSourceCoordinator, Dyna executor = Executors.newCachedThreadPool(); exportJobSuccessCounter = pluginMetrics.counter(EXPORT_JOB_SUCCESS_COUNT); - exportJobErrorCounter = pluginMetrics.counter(EXPORT_JOB_ERROR_COUNT); - exportFilesTotalCounter = pluginMetrics.counter(EXPORT_FILES_TOTAL_COUNT); + exportJobFailureCounter = pluginMetrics.counter(EXPORT_JOB_FAILURE_COUNT); + exportS3ObjectsTotalCounter = pluginMetrics.counter(EXPORT_S3_OBJECTS_TOTAL_COUNT); exportRecordsTotalCounter = pluginMetrics.counter(EXPORT_RECORDS_TOTAL_COUNT); @@ -186,7 +186,7 @@ private void createDataFilePartitions(String exportArn, String bucketName, Map sourcePartition = coordinator.acquireAvailablePartition(StreamPartition.PARTITION_TYPE); if (sourcePartition.isPresent()) { + activeChangeEventConsumers.incrementAndGet(); StreamPartition streamPartition = (StreamPartition) sourcePartition.get(); processStreamPartition(streamPartition); + activeChangeEventConsumers.decrementAndGet(); } } diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/ExportRecordConverterTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/ExportRecordConverterTest.java index d3f7175867..838bb9f0ab 100644 --- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/ExportRecordConverterTest.java +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/ExportRecordConverterTest.java @@ -33,8 +33,8 @@ import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; -import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.ExportRecordConverter.EXPORT_RECORD_ERROR_COUNT; -import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.ExportRecordConverter.EXPORT_RECORD_SUCCESS_COUNT; +import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.ExportRecordConverter.EXPORT_RECORDS_PROCESSING_ERROR_COUNT; +import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.ExportRecordConverter.EXPORT_RECORDS_PROCESSED_COUNT; @ExtendWith(MockitoExtension.class) class ExportRecordConverterTest { @@ -73,8 +73,8 @@ void setup() { tableInfo = new TableInfo(tableArn, metadata); - given(pluginMetrics.counter(EXPORT_RECORD_SUCCESS_COUNT)).willReturn(exportRecordSuccess); - given(pluginMetrics.counter(EXPORT_RECORD_ERROR_COUNT)).willReturn(exportRecordErrors); + given(pluginMetrics.counter(EXPORT_RECORDS_PROCESSED_COUNT)).willReturn(exportRecordSuccess); + given(pluginMetrics.counter(EXPORT_RECORDS_PROCESSING_ERROR_COUNT)).willReturn(exportRecordErrors); } diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverterTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverterTest.java index f51d14b70f..1b9b161ecc 100644 --- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverterTest.java +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverterTest.java @@ -38,8 +38,8 @@ import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; -import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.StreamRecordConverter.CHANGE_EVENT_ERROR_COUNT; -import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.StreamRecordConverter.CHANGE_EVENT_SUCCESS_COUNT; +import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.StreamRecordConverter.CHANGE_EVENTS_PROCESSING_ERROR_COUNT; +import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.StreamRecordConverter.CHANGE_EVENTS_PROCESSED_COUNT; @ExtendWith(MockitoExtension.class) class StreamRecordConverterTest { @@ -77,8 +77,8 @@ void setup() { tableInfo = new TableInfo(tableArn, metadata); - given(pluginMetrics.counter(CHANGE_EVENT_SUCCESS_COUNT)).willReturn(changeEventSuccessCounter); - given(pluginMetrics.counter(CHANGE_EVENT_ERROR_COUNT)).willReturn(changeEventErrorCounter); + given(pluginMetrics.counter(CHANGE_EVENTS_PROCESSED_COUNT)).willReturn(changeEventSuccessCounter); + given(pluginMetrics.counter(CHANGE_EVENTS_PROCESSING_ERROR_COUNT)).willReturn(changeEventErrorCounter); } diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileSchedulerTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileSchedulerTest.java index 33acaf9003..f87ce693b3 100644 --- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileSchedulerTest.java +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileSchedulerTest.java @@ -25,14 +25,17 @@ import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicLong; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.BDDMockito.given; import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.opensearch.dataprepper.plugins.source.dynamodb.export.DataFileScheduler.EXPORT_FILE_SUCCESS_COUNT; +import static org.opensearch.dataprepper.plugins.source.dynamodb.export.DataFileScheduler.ACTIVE_EXPORT_S3_OBJECT_CONSUMERS_GAUGE; +import static org.opensearch.dataprepper.plugins.source.dynamodb.export.DataFileScheduler.EXPORT_S3_OBJECTS_PROCESSED_COUNT; @ExtendWith(MockitoExtension.class) class DataFileSchedulerTest { @@ -52,6 +55,9 @@ class DataFileSchedulerTest { @Mock private Counter exportFileSuccess; + @Mock + private AtomicLong activeExportS3ObjectConsumers; + @Mock private DataFileLoaderFactory loaderFactory; @@ -77,8 +83,6 @@ void setup() { DataFileProgressState state = new DataFileProgressState(); state.setLoaded(0); state.setTotal(100); -// lenient().when(dataFilePartition.getProgressState()).thenReturn(Optional.of(state)); - dataFilePartition = new DataFilePartition(exportArn, bucketName, manifestKey, Optional.of(state)); // Mock Global Table Info @@ -90,7 +94,7 @@ void setup() { .sortKeyAttributeName("SK") .streamArn(streamArn) .build(); -// Map tableState = metadata; + lenient().when(tableInfoGlobalState.getProgressState()).thenReturn(Optional.of(metadata.toMap())); @@ -99,7 +103,8 @@ void setup() { lenient().when(coordinator.getPartition(exportArn)).thenReturn(Optional.of(exportInfoGlobalState)); lenient().when(exportInfoGlobalState.getProgressState()).thenReturn(Optional.of(loadStatus.toMap())); - given(pluginMetrics.counter(EXPORT_FILE_SUCCESS_COUNT)).willReturn(exportFileSuccess); + given(pluginMetrics.counter(EXPORT_S3_OBJECTS_PROCESSED_COUNT)).willReturn(exportFileSuccess); + given(pluginMetrics.gauge(eq(ACTIVE_EXPORT_S3_OBJECT_CONSUMERS_GAUGE), any(AtomicLong.class))).willReturn(activeExportS3ObjectConsumers); lenient().when(coordinator.createPartition(any(EnhancedSourcePartition.class))).thenReturn(true); lenient().doNothing().when(coordinator).completePartition(any(EnhancedSourcePartition.class)); diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ExportSchedulerTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ExportSchedulerTest.java index dcd44a26aa..2a1506643f 100644 --- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ExportSchedulerTest.java +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ExportSchedulerTest.java @@ -43,8 +43,8 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; -import static org.opensearch.dataprepper.plugins.source.dynamodb.export.ExportScheduler.EXPORT_FILES_TOTAL_COUNT; -import static org.opensearch.dataprepper.plugins.source.dynamodb.export.ExportScheduler.EXPORT_JOB_ERROR_COUNT; +import static org.opensearch.dataprepper.plugins.source.dynamodb.export.ExportScheduler.EXPORT_S3_OBJECTS_TOTAL_COUNT; +import static org.opensearch.dataprepper.plugins.source.dynamodb.export.ExportScheduler.EXPORT_JOB_FAILURE_COUNT; import static org.opensearch.dataprepper.plugins.source.dynamodb.export.ExportScheduler.EXPORT_JOB_SUCCESS_COUNT; import static org.opensearch.dataprepper.plugins.source.dynamodb.export.ExportScheduler.EXPORT_RECORDS_TOTAL_COUNT; @@ -108,8 +108,8 @@ void setup() { when(exportPartition.getProgressState()).thenReturn(Optional.of(state)); given(pluginMetrics.counter(EXPORT_JOB_SUCCESS_COUNT)).willReturn(exportJobSuccess); - given(pluginMetrics.counter(EXPORT_JOB_ERROR_COUNT)).willReturn(exportJobErrors); - given(pluginMetrics.counter(EXPORT_FILES_TOTAL_COUNT)).willReturn(exportFilesTotal); + given(pluginMetrics.counter(EXPORT_JOB_FAILURE_COUNT)).willReturn(exportJobErrors); + given(pluginMetrics.counter(EXPORT_S3_OBJECTS_TOTAL_COUNT)).willReturn(exportFilesTotal); given(pluginMetrics.counter(EXPORT_RECORDS_TOTAL_COUNT)).willReturn(exportRecordsTotal); ExportSummary summary = mock(ExportSummary.class); diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamSchedulerTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamSchedulerTest.java index 60ce8dd254..4b8bd46210 100644 --- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamSchedulerTest.java +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamSchedulerTest.java @@ -10,6 +10,7 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.StreamPartition; @@ -51,6 +52,9 @@ class StreamSchedulerTest { @Mock private ShardConsumerFactory consumerFactory; + @Mock + private PluginMetrics pluginMetrics; + private final String tableName = UUID.randomUUID().toString(); private final String tableArn = "arn:aws:dynamodb:us-west-2:123456789012:table/" + tableName; @@ -88,7 +92,7 @@ void setup() { public void test_normal_run() throws InterruptedException { given(coordinator.acquireAvailablePartition(StreamPartition.PARTITION_TYPE)).willReturn(Optional.of(streamPartition)).willReturn(Optional.empty()); - scheduler = new StreamScheduler(coordinator, consumerFactory, shardManager); + scheduler = new StreamScheduler(coordinator, consumerFactory, shardManager, pluginMetrics); ExecutorService executor = Executors.newSingleThreadExecutor(); executor.submit(scheduler);