Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Vyu/tdi 43726 upgrade hadoop for s3 #4

Open
wants to merge 6 commits into
base: tlnd-2.7.3.3-SNAPSHOT
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions hadoop-project/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@

<!-- jackson versions -->
<jackson.version>1.9.13</jackson.version>
<jackson2.version>2.2.3</jackson2.version>
<jackson2.version>2.8.3</jackson2.version>

<!-- ProtocolBuffer version, used to verify the protoc version and -->
<!-- define the protobuf JAR version -->
Expand Down Expand Up @@ -435,17 +435,17 @@
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.2.5</version>
<version>4.5.9</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
<version>4.2.5</version>
<version>4.4.11</version>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.4</version>
<version>1.11</version>
</dependency>
<dependency>
<groupId>commons-net</groupId>
Expand Down Expand Up @@ -595,7 +595,7 @@
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<version>1.1.3</version>
<version>1.2</version>
<exclusions>
<exclusion>
<groupId>avalon-framework</groupId>
Expand Down Expand Up @@ -651,7 +651,7 @@
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk</artifactId>
<version>1.7.4</version>
<version>1.11.729</version>
</dependency>
<dependency>
<groupId>org.apache.mina</groupId>
Expand Down Expand Up @@ -947,7 +947,7 @@
<artifactId>azure-storage</artifactId>
<version>2.0.0</version>
</dependency>

</dependencies>
</dependencyManagement>

Expand Down
18 changes: 16 additions & 2 deletions hadoop-tools/hadoop-aws/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
<relativePath>../../hadoop-project</relativePath>
</parent>
<artifactId>hadoop-aws-tlnd</artifactId>
<version>2.7.3.3</version>
<version>2.7.3.11</version>
<name>Apache Hadoop Amazon Web Services support</name>
<description>
This module contains code to support integration with Amazon Web Services.
Expand All @@ -39,6 +39,7 @@
<!-- the command in jenkins for auto deploy : mvn clean -U -V deploy -Denforcer.skip=true -->
<hadoop.version_for_build>2.7.3</hadoop.version_for_build>
<talend.nexus.url>https://artifacts-oss.talend.com</talend.nexus.url>
<enforcer.skip>true</enforcer.skip>
</properties>

<distributionManagement>
Expand Down Expand Up @@ -241,7 +242,14 @@
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-s3</artifactId>
<version>1.10.6</version>
<version>1.11.729</version>
<scope>compile</scope>
</dependency>

<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-sts</artifactId>
<version>1.11.729</version>
<scope>compile</scope>
</dependency>

Expand All @@ -251,5 +259,11 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.8</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hadoop.fs.s3a;

import com.amazonaws.auth.*;
import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.SSEAwsKeyManagementParams;
Expand All @@ -41,10 +42,7 @@
import com.amazonaws.AmazonServiceException;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.Protocol;
import com.amazonaws.auth.AWSCredentialsProviderChain;
import com.amazonaws.auth.AWSCredentials;

import com.amazonaws.auth.InstanceProfileCredentialsProvider;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.CannedAccessControlList;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
Expand Down Expand Up @@ -73,6 +71,8 @@
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.s3a.sts.STSCredentialsProvider;
import org.apache.hadoop.fs.s3a.sts.Constants4STS;
import org.apache.hadoop.util.Progressable;

import static org.apache.hadoop.fs.s3a.Constants.*;
Expand Down Expand Up @@ -167,6 +167,8 @@ public void initialize(URI name, Configuration conf) throws IOException {
String accessKey = conf.get(ACCESS_KEY, null);
String secretKey = conf.get(SECRET_KEY, null);

Boolean specifySTS = conf.getBoolean(Constants4STS.STS_SPECIFY,false);

String userInfo = name.getUserInfo();
if (userInfo != null) {
int index = userInfo.indexOf(':');
Expand All @@ -181,13 +183,17 @@ public void initialize(URI name, Configuration conf) throws IOException {
AWSCredentials credentials = null;
try {
credentials = new AWSCredentialsProviderChain(
new BasicAWSCredentialsProvider(accessKey, secretKey),
new InstanceProfileCredentialsProvider()
new BasicAWSCredentialsProvider(accessKey, secretKey),
new InstanceProfileCredentialsProvider()
).getCredentials();
} catch (AmazonClientException e) {
credentials = new AnonymousAWSCredentialsProvider().getCredentials();
}


if (specifySTS) {
STSCredentialsProvider stsCredentialsProvider = new STSCredentialsProvider(conf);
credentials = stsCredentialsProvider.getSTSCredentials(credentials);
}

bucket = name.getHost();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package org.apache.hadoop.fs.s3a.sts;

/**
 * Configuration property names for the STS assume-role support added for TDI-43726.
 *
 * <p>All keys share the {@code tpd.s3.sts.} prefix. Most optional settings come in pairs: a
 * boolean {@code specify*} switch and the value that is only applied when the switch is
 * {@code true}. The keys are read by {@code STSCredentialsProvider}.
 */
public class Constants4STS {

  // ---------------------------------------------------------------- master switch

  /** Boolean switch: when {@code true}, the base credentials are exchanged via STS AssumeRole. */
  public static final String STS_SPECIFY = "tpd.s3.sts.specifySTS";

  // ---------------------------------------------------------------- role and client settings

  /** ARN of the role to assume. */
  public static final String STS_ROLE_ARN = "tpd.s3.sts.roleARN";

  /** Session name sent with the AssumeRole request. */
  public static final String STS_ROLE_SESSION_NAME = "tpd.s3.sts.roleSessionName";

  /** Region used for the STS client (and for signing when a custom endpoint is set). */
  public static final String STS_SIGNING_REGION = "tpd.s3.sts.signingRegion";

  // ---------------------------------------------------------------- optional: external id

  /** Boolean switch enabling {@link #STS_ROLE_EXTERNAL_ID}. */
  public static final String STS_SPECIFY_ROLE_EXTERNAL_ID = "tpd.s3.sts.specifyRoleExternalId";

  /** External id sent with the AssumeRole request. */
  public static final String STS_ROLE_EXTERNAL_ID = "tpd.s3.sts.roleExternalId";

  // ---------------------------------------------------------------- optional: custom endpoint

  /** Boolean switch enabling {@link #STS_ENDPOINT}. */
  public static final String STS_SPECIFY_ENDPOINT = "tpd.s3.sts.specifyEndpoint";

  /** STS service endpoint to use instead of the region's default one. */
  public static final String STS_ENDPOINT = "tpd.s3.sts.stsEndpoint";

  // ---------------------------------------------------------------- optional: session duration

  /** Boolean switch enabling {@link #STS_SESSION_DURATION}. */
  public static final String STS_SPECIFY_SESSION_DURATION = "tpd.s3.sts.specifySessionDuration";

  /** Session duration as an integer; the consumer multiplies it by 60 to obtain seconds. */
  public static final String STS_SESSION_DURATION = "tpd.s3.sts.sessionDuration";

  // ---------------------------------------------------------------- optional: MFA

  /** Boolean switch enabling {@link #STS_SERIAL_NUMBER}. */
  public static final String STS_SPECIFY_SERIAL_NUM = "tpd.s3.sts.specifySerialNum";

  /** Serial number sent with the AssumeRole request. */
  public static final String STS_SERIAL_NUMBER = "tpd.s3.sts.serialNumber";

  /** Boolean switch enabling {@link #STS_TOKEN_CODE}. */
  public static final String STS_SPECIFY_TOKEN_CODE = "tpd.s3.sts.specifyTokenCode";

  /** Token code sent with the AssumeRole request. */
  public static final String STS_TOKEN_CODE = "tpd.s3.sts.tokenCode";

  // ---------------------------------------------------------------- optional: session tags

  /** Boolean switch enabling {@link #STS_TAGS}. */
  public static final String STS_SPECIFY_TAGS = "tpd.s3.sts.specifyTags";

  /**
   * Session tags, encoded as {@code key,value,transitive} triples separated by {@code ;},
   * for example {@code key1,val1,true;key2,val2,false}.
   */
  public static final String STS_TAGS = "tpd.s3.sts.tags";

  // ---------------------------------------------------------------- optional: session policies

  /** Boolean switch enabling {@link #STS_POLICY_JSON}. */
  public static final String STS_SPECIFY_POLICY_JSON = "tpd.s3.sts.specifyPolicyJson";

  /** Inline session policy document sent with the AssumeRole request. */
  public static final String STS_POLICY_JSON = "tpd.s3.sts.policyJson";

  /** Boolean switch enabling {@link #STS_POLICY_ARNS}. */
  public static final String STS_SPECIFY_POLICY_ARNS = "tpd.s3.sts.specifyPolicyARNs";

  /** Comma-separated list of managed policy ARNs sent with the AssumeRole request. */
  public static final String STS_POLICY_ARNS = "tpd.s3.sts.policyARNs";

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package org.apache.hadoop.fs.s3a.sts;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicSessionCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder;
import com.amazonaws.services.securitytoken.model.*;
import org.apache.hadoop.conf.Configuration;

import static org.apache.hadoop.fs.s3a.sts.Constants4STS.*;

import java.util.ArrayList;
import java.util.List;

public class STSCredentialsProvider {

private final String roleARN;

private final String roleSessionName;

private final String signingRegion;

private final boolean specifyRoleExternalId;

private final String roleExternalId;

private final boolean specifySTSEndpoint;

private final String stsEndpoint;

private final boolean specifySessionDuration;

private final int sessionDuration;

private final boolean specifySerialNum;

private final String serialNumber;

private final boolean specifyTokenCode;

private final String tokenCode;

private final boolean specifyTags;

private final String tags;

private final boolean specifyPolicyJson;

private final String policyJson;

private final boolean specifyPolicyARNs;

private final String policyARNs;

public STSCredentialsProvider(Configuration conf) {
roleARN = conf.get(STS_ROLE_ARN,null);
roleSessionName = conf.get(STS_ROLE_SESSION_NAME,null);
signingRegion = conf.get(STS_SIGNING_REGION,"us-east-1");
specifyRoleExternalId = conf.getBoolean(STS_SPECIFY_ROLE_EXTERNAL_ID,false);
roleExternalId = conf.get(STS_ROLE_EXTERNAL_ID,null);
specifySTSEndpoint = conf.getBoolean(STS_SPECIFY_ENDPOINT,false);
stsEndpoint = conf.get(STS_ENDPOINT,null);
specifySessionDuration = conf.getBoolean(STS_SPECIFY_SESSION_DURATION,false);
sessionDuration = conf.getInt(STS_SESSION_DURATION,15);
specifySerialNum = conf.getBoolean(STS_SPECIFY_SERIAL_NUM,false);
serialNumber = conf.get(STS_SERIAL_NUMBER,null);
specifyTokenCode = conf.getBoolean(STS_SPECIFY_TOKEN_CODE,false);
tokenCode = conf.get(STS_TOKEN_CODE,null);
specifyTags = conf.getBoolean(STS_SPECIFY_TAGS,false);
specifyPolicyJson = conf.getBoolean(STS_SPECIFY_POLICY_JSON,false);
policyJson = conf.get(STS_POLICY_JSON,null);
specifyPolicyARNs = conf.getBoolean(STS_SPECIFY_POLICY_ARNS,false);
tags = conf.get(STS_TAGS, null);
policyARNs = conf.get(STS_POLICY_ARNS, null);
}

public AWSCredentials getSTSCredentials(AWSCredentials basicCredentials) {
AWSSecurityTokenServiceClientBuilder stsClientBuilder = AWSSecurityTokenServiceClientBuilder.standard()
.withCredentials(new AWSStaticCredentialsProvider(basicCredentials));
if (specifySTSEndpoint) {
stsClientBuilder
.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(stsEndpoint, signingRegion));
} else {
stsClientBuilder.withRegion(signingRegion);
}

AssumeRoleRequest assumeRoleRequest = new AssumeRoleRequest().withRoleArn(roleARN).withRoleSessionName(roleSessionName);

if (specifyRoleExternalId) {
assumeRoleRequest.withExternalId(roleExternalId);
}

if (specifySessionDuration) {
assumeRoleRequest.withDurationSeconds(sessionDuration * 60);
}

if (specifySerialNum) {
assumeRoleRequest.withSerialNumber(serialNumber);
}

if (specifyTokenCode) {
assumeRoleRequest.withTokenCode(tokenCode);
}

if (specifyTags) {
List<Tag> tagList = new ArrayList<Tag>();
List<String> tranTagKeys = new ArrayList<String>();

String[] tagArray = tags.split(";");

for (String tagInfo : tagArray) {
String[] t = tagInfo.split(",");
Tag tag = new Tag().withKey(t[0]).withValue(t[1]);

tagList.add(tag);

if ("true".equalsIgnoreCase(t[2]))
tranTagKeys.add(t[0]);
}

assumeRoleRequest.withTags(tagList);
assumeRoleRequest.withTransitiveTagKeys(tranTagKeys);
}

if (specifyPolicyJson) {
assumeRoleRequest.withPolicy(policyJson);
}

if (specifyPolicyARNs) {
List<PolicyDescriptorType> policyARNList = new ArrayList<PolicyDescriptorType>();
String[] policyArray = policyARNs.split(",");

for (String arn : policyArray) {
policyARNList.add(new PolicyDescriptorType().withArn(arn));
}
assumeRoleRequest.withPolicyArns(policyARNList);
}

AssumeRoleResult assumeRoleResult = stsClientBuilder.build().assumeRole(assumeRoleRequest);
Credentials assumeRoleCred = assumeRoleResult.getCredentials();
BasicSessionCredentials roleSessionCred = new BasicSessionCredentials(assumeRoleCred.getAccessKeyId(),
assumeRoleCred.getSecretAccessKey(), assumeRoleCred.getSessionToken());

return roleSessionCred;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package org.apache.hadoop.fs.s3a.sts;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.BasicAWSCredentials;
import org.apache.hadoop.conf.Configuration;
import org.junit.Ignore;
import org.junit.Test;

import static org.apache.hadoop.fs.s3a.sts.Constants4STS.*;

@Ignore("ignore now as no accout in test env")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be best to add a mock-based test.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As this is only an ignored integration test, it cannot automatically catch regressions.

public class STSCredentialsProviderTest {

@Test
public void testSTS() {
Configuration conf = new Configuration();
conf.setBoolean(STS_SPECIFY, true);
conf.set(STS_ROLE_ARN, System.getProperty("s3.roleARN"));
conf.set(STS_ROLE_SESSION_NAME, "test_STS");
conf.setInt(STS_SESSION_DURATION, 20);
conf.set(STS_SIGNING_REGION, "us-east-1");
conf.setBoolean(STS_SPECIFY_ENDPOINT, true);
conf.set(STS_ENDPOINT, "sts.us-east-1.amazonaws.com");
conf.setBoolean(STS_SPECIFY_TAGS, true);
conf.set(STS_TAGS, "key1,val1,true;key2,val2,false");
conf.setBoolean(STS_SPECIFY_POLICY_ARNS, true);
conf.set(STS_POLICY_ARNS, "arn1,arn2,arn3");

STSCredentialsProvider stsCred = new STSCredentialsProvider(conf);

AWSCredentials credentials =
new BasicAWSCredentials(System.getProperty("s3.accesskey"), System.getProperty("s3.secretkey"));

stsCred.getSTSCredentials(credentials);
}
}