Skip to content

Commit

Permalink
Shorten S3 prefix to meet the requirement of RDS export API (#4955)
Browse files Browse the repository at this point in the history
* Shorten prefix

Signed-off-by: Hai Yan <[email protected]>

* Add unit tests

Signed-off-by: Hai Yan <[email protected]>

---------

Signed-off-by: Hai Yan <[email protected]>
  • Loading branch information
oeyh authored Sep 18, 2024
1 parent 87c560a commit ed9f0c8
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 6 deletions.
1 change: 1 addition & 0 deletions data-prepper-pipeline-parser/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ group = 'org.opensearch.dataprepper.core'
dependencies {
implementation project(':data-prepper-api')
implementation project(':data-prepper-plugins:blocking-buffer')
implementation project(':data-prepper-plugins:rds-source')
implementation 'com.fasterxml.jackson.core:jackson-databind:2.12.3'
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
implementation 'org.apache.commons:commons-collections4:4.4'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@
import com.jayway.jsonpath.spi.json.JacksonJsonNodeJsonProvider;
import com.jayway.jsonpath.spi.mapper.JacksonMappingProvider;
import static java.lang.String.format;
import static org.opensearch.dataprepper.plugins.source.rds.RdsService.MAX_SOURCE_IDENTIFIER_LENGTH;

import org.opensearch.dataprepper.model.configuration.PipelineModel;
import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel;
import org.opensearch.dataprepper.model.configuration.SinkModel;
import org.opensearch.dataprepper.pipeline.parser.rule.RuleEvaluator;
import org.opensearch.dataprepper.pipeline.parser.rule.RuleEvaluatorResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.opensearch.dataprepper.plugins.source.rds.utils.IdentifierShortener;
import software.amazon.awssdk.arns.Arn;

import javax.xml.transform.TransformerException;
Expand Down Expand Up @@ -441,15 +444,17 @@ public String getSourceCoordinationIdentifierEnvVariable(String s3Prefix){
* @return the actual include_prefix
*/
public String getIncludePrefixForRdsSource(String s3Prefix) {
String envSourceCoordinationIdentifier = System.getenv(SOURCE_COORDINATION_IDENTIFIER_ENVIRONMENT_VARIABLE);
final String envSourceCoordinationIdentifier = System.getenv(SOURCE_COORDINATION_IDENTIFIER_ENVIRONMENT_VARIABLE);
final String shortenedSourceIdentifier = envSourceCoordinationIdentifier != null ?
IdentifierShortener.shortenIdentifier(envSourceCoordinationIdentifier, MAX_SOURCE_IDENTIFIER_LENGTH) : null;
if (s3Prefix == null && envSourceCoordinationIdentifier == null) {
return S3_BUFFER_PREFIX;
} else if (s3Prefix == null) {
return envSourceCoordinationIdentifier + S3_BUFFER_PREFIX;
return shortenedSourceIdentifier + S3_BUFFER_PREFIX;
} else if (envSourceCoordinationIdentifier == null) {
return s3Prefix + S3_BUFFER_PREFIX;
}
return s3Prefix + "/" + envSourceCoordinationIdentifier + S3_BUFFER_PREFIX;
return s3Prefix + "/" + shortenedSourceIdentifier + S3_BUFFER_PREFIX;
}

public String getAccountIdFromRole(final String roleArn) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.opensearch.dataprepper.plugins.source.rds.schema.SchemaManager;
import org.opensearch.dataprepper.plugins.source.rds.stream.BinlogClientFactory;
import org.opensearch.dataprepper.plugins.source.rds.stream.StreamScheduler;
import org.opensearch.dataprepper.plugins.source.rds.utils.IdentifierShortener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.rds.RdsClient;
Expand All @@ -45,6 +46,7 @@ public class RdsService {
*/
public static final int DATA_LOADER_MAX_JOB_COUNT = 1;
public static final String S3_PATH_DELIMITER = "/";
public static final int MAX_SOURCE_IDENTIFIER_LENGTH = 15;

private final RdsClient rdsClient;
private final S3Client s3Client;
Expand Down Expand Up @@ -162,10 +164,13 @@ private String getS3PathPrefix() {

final String s3PathPrefix;
if (sourceCoordinator.getPartitionPrefix() != null ) {
s3PathPrefix = s3UserPathPrefix + S3_PATH_DELIMITER + sourceCoordinator.getPartitionPrefix();
// The prefix will be used in RDS export, which has a limit of 60 characters.
s3PathPrefix = s3UserPathPrefix + S3_PATH_DELIMITER + IdentifierShortener.shortenIdentifier(sourceCoordinator.getPartitionPrefix(), MAX_SOURCE_IDENTIFIER_LENGTH);
} else {
s3PathPrefix = s3UserPathPrefix;
}
return s3PathPrefix;
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class LeaderScheduler implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(LeaderScheduler.class);
private static final int DEFAULT_EXTEND_LEASE_MINUTES = 3;
private static final Duration DEFAULT_LEASE_INTERVAL = Duration.ofMinutes(1);
private static final String S3_EXPORT_PREFIX = "rds-export";
private static final String S3_EXPORT_PREFIX = "rds";
private final EnhancedSourceCoordinator sourceCoordinator;
private final RdsSourceConfig sourceConfig;
private final String s3Prefix;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.rds.utils;

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

public class IdentifierShortener {

public static String shortenIdentifier(final String identifier, final int maxLength) {
if (identifier.length() <= maxLength) {
return identifier;
}

try {
// Create SHA-256 hash
MessageDigest digest = MessageDigest.getInstance("SHA-256");
byte[] encodedhash = digest.digest(identifier.getBytes(StandardCharsets.UTF_8));

// Convert byte array to Base64 string
String base64Hash = Base64.getUrlEncoder().withoutPadding().encodeToString(encodedhash);

// Return the first maxLength characters
return base64Hash.substring(0, Math.min(base64Hash.length(), maxLength));
} catch (final NoSuchAlgorithmException e) {
return identifier.substring(0, maxLength);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.opensearch.dataprepper.plugins.source.rds.export.ExportScheduler;
import org.opensearch.dataprepper.plugins.source.rds.leader.LeaderScheduler;
import org.opensearch.dataprepper.plugins.source.rds.stream.StreamScheduler;
import org.opensearch.dataprepper.plugins.source.rds.utils.IdentifierShortener;
import software.amazon.awssdk.services.rds.RdsClient;
import software.amazon.awssdk.services.rds.model.DBInstance;
import software.amazon.awssdk.services.rds.model.DescribeDbInstancesRequest;
Expand All @@ -46,6 +47,7 @@
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.opensearch.dataprepper.plugins.source.rds.RdsService.MAX_SOURCE_IDENTIFIER_LENGTH;
import static org.opensearch.dataprepper.plugins.source.rds.RdsService.S3_PATH_DELIMITER;

@ExtendWith(MockitoExtension.class)
Expand Down Expand Up @@ -150,7 +152,7 @@ void test_normal_service_start_when_stream_is_enabled() {
rdsService.start(buffer);
}

assertThat(s3PrefixArray[0], equalTo(s3Prefix + S3_PATH_DELIMITER + partitionPrefix));
assertThat(s3PrefixArray[0], equalTo(s3Prefix + S3_PATH_DELIMITER + IdentifierShortener.shortenIdentifier(partitionPrefix, MAX_SOURCE_IDENTIFIER_LENGTH)));
verify(executor).submit(any(LeaderScheduler.class));
verify(executor).submit(any(StreamScheduler.class));
verify(executor, never()).submit(any(ExportScheduler.class));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.rds.utils;

import org.junit.jupiter.api.Test;
import org.mockito.MockedStatic;

import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.UUID;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.mockito.Mockito.mockStatic;

class IdentifierShortenerTest {

@Test
void shortenIdentifier_when_input_is_shorter_than_max_then_return_original_input() {
final int maxLength = 10;
final String testInput = UUID.randomUUID().toString().substring(0, maxLength);

final String result = IdentifierShortener.shortenIdentifier(testInput, maxLength);

assertThat(result, equalTo(testInput));
}

@Test
void shortenIdentifier_when_input_is_longer_than_max_then_return_shortened_result() {
final int maxLength = 5;
final String testInput = UUID.randomUUID().toString();
assertThat(testInput.length(), greaterThan(maxLength));

final String result = IdentifierShortener.shortenIdentifier(testInput, maxLength);

assertThat(result.length(), lessThanOrEqualTo(maxLength));
}

@Test
void shortenIdentifier_when_NoSuchAlgorithmException_then_return_shortened_result() {
final int maxLength = 5;
final String testInput = UUID.randomUUID().toString();
assertThat(testInput.length(), greaterThan(maxLength));

try (MockedStatic<MessageDigest> messageDigestMockedStatic = mockStatic(MessageDigest.class)) {
messageDigestMockedStatic.when(() -> MessageDigest.getInstance("SHA-256"))
.thenThrow(new NoSuchAlgorithmException());
final String result = IdentifierShortener.shortenIdentifier(testInput, maxLength);
assertThat(result, equalTo(testInput.substring(0, maxLength)));
}


}
}

0 comments on commit ed9f0c8

Please sign in to comment.