Skip to content

Commit

Permalink
Add aggregate metrics for ddb source export and stream (#3724) (#3728)
Browse files Browse the repository at this point in the history
Signed-off-by: Taylor Gray <[email protected]>
(cherry picked from commit 1af1ce9)

Co-authored-by: Taylor Gray <[email protected]>
  • Loading branch information
1 parent 23434e2 commit 3e1ea94
Show file tree
Hide file tree
Showing 14 changed files with 324 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,16 @@ public static PluginMetrics fromNames(final String componentId, final String com
.add(componentId).toString());
}

/**
 * Static factory that builds a {@link PluginMetrics} directly from a caller-supplied prefix.
 * The prefix is handed unchanged to the private constructor, which stores it on the new instance.
 *
 * @param metricsPrefix the prefix the returned instance will use for its metrics
 * @return a new {@link PluginMetrics} holding {@code metricsPrefix}
 */
public static PluginMetrics fromPrefix(final String metricsPrefix) {
    final PluginMetrics prefixedMetrics = new PluginMetrics(metricsPrefix);
    return prefixedMetrics;
}

/**
 * Stores the supplied prefix on the new instance. The constructor is private, so
 * instances are created through the static factory methods of this class
 * (for example {@code fromPrefix}).
 *
 * @param metricsPrefix the value assigned to the {@code metricsPrefix} field
 */
private PluginMetrics(final String metricsPrefix) {
this.metricsPrefix = metricsPrefix;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import java.util.Collections;
import java.util.StringJoiner;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;

import static java.util.Collections.emptyList;
Expand All @@ -41,6 +42,18 @@ void setUp() {
objectUnderTest = PluginMetrics.fromPluginSetting(pluginSetting);
}

/**
 * Verifies that a counter created from a prefix-based {@link PluginMetrics} is named
 * {@code <prefix><DELIMITER>counter}.
 */
@Test
public void testCounterWithMetricsPrefix() {
    // A random prefix keeps the expected name unique to this test run.
    final String randomPrefix = UUID.randomUUID().toString();
    final String expectedName = randomPrefix + MetricNames.DELIMITER + "counter";

    objectUnderTest = PluginMetrics.fromPrefix(randomPrefix);
    final String actualName = objectUnderTest.counter("counter").getId().getName();

    assertEquals(expectedName, actualName);
}

@Test
public void testCounter() {
final Counter counter = objectUnderTest.counter("counter");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.opensearch.dataprepper.plugins.source.dynamodb.stream.ShardConsumerFactory;
import org.opensearch.dataprepper.plugins.source.dynamodb.stream.StreamScheduler;
import org.opensearch.dataprepper.plugins.source.dynamodb.utils.BackoffCalculator;
import org.opensearch.dataprepper.plugins.source.dynamodb.utils.DynamoDBSourceAggregateMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
Expand Down Expand Up @@ -54,6 +55,8 @@ public class DynamoDBService {

private final PluginMetrics pluginMetrics;

private final DynamoDBSourceAggregateMetrics dynamoDBSourceAggregateMetrics;

private final AcknowledgementSetManager acknowledgementSetManager;


Expand All @@ -66,14 +69,15 @@ public DynamoDBService(final EnhancedSourceCoordinator coordinator,
this.pluginMetrics = pluginMetrics;
this.acknowledgementSetManager = acknowledgementSetManager;
this.dynamoDBSourceConfig = sourceConfig;
this.dynamoDBSourceAggregateMetrics = new DynamoDBSourceAggregateMetrics();

// Initialize AWS clients
dynamoDbClient = clientFactory.buildDynamoDBClient();
dynamoDbStreamsClient = clientFactory.buildDynamoDbStreamClient();
s3Client = clientFactory.buildS3Client();

// A shard manager is responsible for retrieving the shard information from streams.
shardManager = new ShardManager(dynamoDbStreamsClient);
shardManager = new ShardManager(dynamoDbStreamsClient, dynamoDBSourceAggregateMetrics);
tableConfigs = sourceConfig.getTableConfigs();
executor = Executors.newFixedThreadPool(4);
}
Expand All @@ -89,12 +93,12 @@ public void start(Buffer<Record<Event>> buffer) {

LOG.info("Start running DynamoDB service");
ManifestFileReader manifestFileReader = new ManifestFileReader(new S3ObjectReader(s3Client));
Runnable exportScheduler = new ExportScheduler(coordinator, dynamoDbClient, manifestFileReader, pluginMetrics);
Runnable exportScheduler = new ExportScheduler(coordinator, dynamoDbClient, manifestFileReader, pluginMetrics, dynamoDBSourceAggregateMetrics);

DataFileLoaderFactory loaderFactory = new DataFileLoaderFactory(coordinator, s3Client, pluginMetrics, buffer);
Runnable fileLoaderScheduler = new DataFileScheduler(coordinator, loaderFactory, pluginMetrics, acknowledgementSetManager, dynamoDBSourceConfig);

ShardConsumerFactory consumerFactory = new ShardConsumerFactory(coordinator, dynamoDbStreamsClient, pluginMetrics, buffer);
ShardConsumerFactory consumerFactory = new ShardConsumerFactory(coordinator, dynamoDbStreamsClient, pluginMetrics, dynamoDBSourceAggregateMetrics, buffer);
Runnable streamScheduler = new StreamScheduler(coordinator, consumerFactory, pluginMetrics, acknowledgementSetManager, dynamoDBSourceConfig, new BackoffCalculator(dynamoDBSourceConfig.getTableConfigs().get(0).getExportConfig() != null));
// leader scheduler will handle the initialization
Runnable leaderScheduler = new LeaderScheduler(coordinator, dynamoDbClient, shardManager, tableConfigs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ private void processDataFilePartition(DataFilePartition dataFilePartition) {
} else {
runLoader.whenComplete((v, ex) -> {
if (ex != null) {
LOG.error("There was an exception while processing an S3 data file: {}", ex);
coordinator.giveUpPartition(dataFilePartition);
}
numOfWorkers.decrementAndGet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.state.ExportProgressState;
import org.opensearch.dataprepper.plugins.source.dynamodb.model.ExportSummary;
import org.opensearch.dataprepper.plugins.source.dynamodb.model.LoadStatus;
import org.opensearch.dataprepper.plugins.source.dynamodb.utils.DynamoDBSourceAggregateMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
Expand Down Expand Up @@ -74,11 +75,15 @@ public class ExportScheduler implements Runnable {
private final Counter exportS3ObjectsTotalCounter;
private final Counter exportRecordsTotalCounter;

public ExportScheduler(EnhancedSourceCoordinator enhancedSourceCoordinator, DynamoDbClient dynamoDBClient, ManifestFileReader manifestFileReader, PluginMetrics pluginMetrics) {
public ExportScheduler(final EnhancedSourceCoordinator enhancedSourceCoordinator,
final DynamoDbClient dynamoDBClient,
final ManifestFileReader manifestFileReader,
final PluginMetrics pluginMetrics,
final DynamoDBSourceAggregateMetrics dynamoDBSourceAggregateMetrics) {
this.enhancedSourceCoordinator = enhancedSourceCoordinator;
this.dynamoDBClient = dynamoDBClient;
this.pluginMetrics = pluginMetrics;
this.exportTaskManager = new ExportTaskManager(dynamoDBClient);
this.exportTaskManager = new ExportTaskManager(dynamoDBClient, dynamoDBSourceAggregateMetrics);

this.manifestFileReader = manifestFileReader;
executor = Executors.newCachedThreadPool();
Expand Down Expand Up @@ -213,7 +218,7 @@ private void createDataFilePartitions(final String exportArn,

private void closeExportPartitionWithError(ExportPartition exportPartition) {
LOG.error("The export from DynamoDb to S3 failed, it will be retried");
exportJobFailureCounter.increment(1);
exportJobFailureCounter.increment();
ExportProgressState exportProgressState = exportPartition.getProgressState().get();
// Clear current Arn, so that a new export can be submitted.
exportProgressState.setExportArn(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.dataprepper.plugins.source.dynamodb.export;

import org.opensearch.dataprepper.plugins.source.dynamodb.utils.DynamoDBSourceAggregateMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.exception.SdkException;
Expand All @@ -14,6 +15,7 @@
import software.amazon.awssdk.services.dynamodb.model.ExportFormat;
import software.amazon.awssdk.services.dynamodb.model.ExportTableToPointInTimeRequest;
import software.amazon.awssdk.services.dynamodb.model.ExportTableToPointInTimeResponse;
import software.amazon.awssdk.services.dynamodb.model.InternalServerErrorException;
import software.amazon.awssdk.services.dynamodb.model.S3SseAlgorithm;

import java.time.Instant;
Expand All @@ -25,10 +27,12 @@ public class ExportTaskManager {
private static final ExportFormat DEFAULT_EXPORT_FORMAT = ExportFormat.ION;

private final DynamoDbClient dynamoDBClient;
private final DynamoDBSourceAggregateMetrics dynamoAggregateMetrics;


public ExportTaskManager(DynamoDbClient dynamoDBClient) {
public ExportTaskManager(final DynamoDbClient dynamoDBClient,
final DynamoDBSourceAggregateMetrics dynamoAggregateMetrics) {
this.dynamoDBClient = dynamoDBClient;
this.dynamoAggregateMetrics = dynamoAggregateMetrics;
}

public String submitExportJob(String tableArn, String bucket, String prefix, String kmsKeyId, Instant exportTime) {
Expand All @@ -46,12 +50,17 @@ public String submitExportJob(String tableArn, String bucket, String prefix, Str


try {
dynamoAggregateMetrics.getExportApiInvocations().increment();
ExportTableToPointInTimeResponse response = dynamoDBClient.exportTableToPointInTime(req);

String exportArn = response.exportDescription().exportArn();
String status = response.exportDescription().exportStatusAsString();
LOG.debug("Export Job submitted with ARN {} and status {}", exportArn, status);
return exportArn;
} catch (final InternalServerErrorException e) {
dynamoAggregateMetrics.getExport5xxErrors().increment();
LOG.error("Failed to submit an export job with error: {}", e.getMessage());
return null;
} catch (SdkException e) {
LOG.error("Failed to submit an export job with error " + e.getMessage());
return null;
Expand All @@ -64,11 +73,15 @@ public String getExportManifest(String exportArn) {

String manifestKey = null;
try {
dynamoAggregateMetrics.getExportApiInvocations().increment();
DescribeExportResponse resp = dynamoDBClient.describeExport(request);
manifestKey = resp.exportDescription().exportManifest();

} catch (final InternalServerErrorException e) {
dynamoAggregateMetrics.getExport5xxErrors().increment();
LOG.error("Unable to get manifest file for export {}: {}", exportArn, e.getMessage());
} catch (SdkException e) {
LOG.error("Unable to get manifest file for export " + exportArn);
LOG.error("Unable to get manifest file for export {}: {}", exportArn, e.getMessage());
}
return manifestKey;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package org.opensearch.dataprepper.plugins.source.dynamodb.leader;

import org.opensearch.dataprepper.plugins.source.dynamodb.utils.DynamoDBSourceAggregateMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.dynamodb.model.DescribeStreamRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeStreamResponse;
import software.amazon.awssdk.services.dynamodb.model.InternalServerErrorException;
import software.amazon.awssdk.services.dynamodb.model.Shard;
import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient;

Expand Down Expand Up @@ -43,10 +45,13 @@ public class ShardManager {


private final DynamoDbStreamsClient streamsClient;
private final DynamoDBSourceAggregateMetrics dynamoDBSourceAggregateMetrics;


public ShardManager(final DynamoDbStreamsClient streamsClient) {
public ShardManager(final DynamoDbStreamsClient streamsClient,
final DynamoDBSourceAggregateMetrics dynamoDBSourceAggregateMetrics) {
this.streamsClient = streamsClient;
this.dynamoDBSourceAggregateMetrics = dynamoDBSourceAggregateMetrics;
streamMap = new HashMap<>();
endingSequenceNumberMap = new HashMap<>();
}
Expand Down Expand Up @@ -148,22 +153,30 @@ private List<Shard> listShards(String streamArn, String lastEvaluatedShardId) {
long startTime = System.currentTimeMillis();
// Get all the shard IDs from the stream.
List<Shard> shards = new ArrayList<>();
do {
DescribeStreamRequest req = DescribeStreamRequest.builder()
.streamArn(streamArn)
.limit(MAX_SHARD_COUNT)
.exclusiveStartShardId(lastEvaluatedShardId)
.build();

DescribeStreamResponse describeStreamResult = streamsClient.describeStream(req);
shards.addAll(describeStreamResult.streamDescription().shards());
try {
do {
DescribeStreamRequest req = DescribeStreamRequest.builder()
.streamArn(streamArn)
.limit(MAX_SHARD_COUNT)
.exclusiveStartShardId(lastEvaluatedShardId)
.build();

// If LastEvaluatedShardId is set,
// at least one more page of shard IDs to retrieve
lastEvaluatedShardId = describeStreamResult.streamDescription().lastEvaluatedShardId();
dynamoDBSourceAggregateMetrics.getStreamApiInvocations().increment();
DescribeStreamResponse describeStreamResult = streamsClient.describeStream(req);
shards.addAll(describeStreamResult.streamDescription().shards());

// If LastEvaluatedShardId is set,
// at least one more page of shard IDs to retrieve
lastEvaluatedShardId = describeStreamResult.streamDescription().lastEvaluatedShardId();

} while (lastEvaluatedShardId != null);

} while (lastEvaluatedShardId != null);
} catch(final InternalServerErrorException e) {
LOG.error("Received an internal server exception from DynamoDB while listing shards: {}", e.getMessage());
dynamoDBSourceAggregateMetrics.getStream5xxErrors().increment();
return shards;
}

long endTime = System.currentTimeMillis();
LOG.info("Listing shards (DescribeStream call) took {} milliseconds with {} shards found", endTime - startTime, shards.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.source.dynamodb.converter.StreamRecordConverter;
import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableInfo;
import org.opensearch.dataprepper.plugins.source.dynamodb.utils.DynamoDBSourceAggregateMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.dynamodb.model.GetRecordsRequest;
import software.amazon.awssdk.services.dynamodb.model.GetRecordsResponse;
import software.amazon.awssdk.services.dynamodb.model.InternalServerErrorException;
import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient;

import java.time.Duration;
Expand Down Expand Up @@ -101,6 +103,8 @@ public class ShardConsumer implements Runnable {

private final String shardId;

private final DynamoDBSourceAggregateMetrics dynamoDBSourceAggregateMetrics;

private long recordsWrittenToBuffer;

private ShardConsumer(Builder builder) {
Expand All @@ -117,10 +121,14 @@ private ShardConsumer(Builder builder) {
this.shardAcknowledgmentTimeout = builder.dataFileAcknowledgmentTimeout;
this.shardId = builder.shardId;
this.recordsWrittenToBuffer = 0;
this.dynamoDBSourceAggregateMetrics = builder.dynamoDBSourceAggregateMetrics;
}

public static Builder builder(final DynamoDbStreamsClient dynamoDbStreamsClient, final PluginMetrics pluginMetrics, final Buffer<Record<Event>> buffer) {
return new Builder(dynamoDbStreamsClient, pluginMetrics, buffer);
public static Builder builder(final DynamoDbStreamsClient dynamoDbStreamsClient,
final PluginMetrics pluginMetrics,
final DynamoDBSourceAggregateMetrics dynamoDBSourceAggregateMetrics,
final Buffer<Record<Event>> buffer) {
return new Builder(dynamoDbStreamsClient, pluginMetrics, dynamoDBSourceAggregateMetrics, buffer);
}


Expand All @@ -130,6 +138,8 @@ static class Builder {

private final PluginMetrics pluginMetrics;

private final DynamoDBSourceAggregateMetrics dynamoDBSourceAggregateMetrics;

private final Buffer<Record<Event>> buffer;

private TableInfo tableInfo;
Expand All @@ -149,9 +159,13 @@ static class Builder {
private AcknowledgementSet acknowledgementSet;
private Duration dataFileAcknowledgmentTimeout;

public Builder(final DynamoDbStreamsClient dynamoDbStreamsClient, final PluginMetrics pluginMetrics, final Buffer<Record<Event>> buffer) {
public Builder(final DynamoDbStreamsClient dynamoDbStreamsClient,
final PluginMetrics pluginMetrics,
final DynamoDBSourceAggregateMetrics dynamoDBSourceAggregateMetrics,
final Buffer<Record<Event>> buffer) {
this.dynamoDbStreamsClient = dynamoDbStreamsClient;
this.pluginMetrics = pluginMetrics;
this.dynamoDBSourceAggregateMetrics = dynamoDBSourceAggregateMetrics;
this.buffer = buffer;
}

Expand Down Expand Up @@ -303,9 +317,13 @@ private GetRecordsResponse callGetRecords(String shardIterator) {
.build();

try {
dynamoDBSourceAggregateMetrics.getStreamApiInvocations().increment();
GetRecordsResponse response = dynamoDbStreamsClient.getRecords(req);
return response;
} catch (Exception e) {
} catch(final InternalServerErrorException ex) {
dynamoDBSourceAggregateMetrics.getStream5xxErrors().increment();
throw new RuntimeException(ex.getMessage());
} catch (final Exception e) {
throw new RuntimeException(e.getMessage());
}

Expand Down
Loading

0 comments on commit 3e1ea94

Please sign in to comment.