From 4b2da7278633af14216ed02016f41df1fedfc51f Mon Sep 17 00:00:00 2001 From: KAWACHI Takashi Date: Wed, 9 Apr 2014 07:52:38 +0900 Subject: [PATCH 1/3] Append token= to CREDENTIAL param of COPY command --- .../redshift/RedshiftBasicEmitter.java | 20 ++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/connectors/redshift/RedshiftBasicEmitter.java b/src/main/java/com/amazonaws/services/kinesis/connectors/redshift/RedshiftBasicEmitter.java index d991bf7..b403a1e 100644 --- a/src/main/java/com/amazonaws/services/kinesis/connectors/redshift/RedshiftBasicEmitter.java +++ b/src/main/java/com/amazonaws/services/kinesis/connectors/redshift/RedshiftBasicEmitter.java @@ -24,6 +24,8 @@ import java.util.List; import java.util.Properties; +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSSessionCredentials; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -57,6 +59,7 @@ public class RedshiftBasicEmitter extends S3Emitter { private final Properties loginProperties; private final String accessKey; private final String secretKey; + private final String sessionToken; public RedshiftBasicEmitter(KinesisConnectorConfiguration configuration) { super(configuration); @@ -67,8 +70,14 @@ public RedshiftBasicEmitter(KinesisConnectorConfiguration configuration) { loginProperties = new Properties(); loginProperties.setProperty("user", configuration.REDSHIFT_USERNAME); loginProperties.setProperty("password", configuration.REDSHIFT_PASSWORD); - accessKey = configuration.AWS_CREDENTIALS_PROVIDER.getCredentials().getAWSAccessKeyId(); - secretKey = configuration.AWS_CREDENTIALS_PROVIDER.getCredentials().getAWSSecretKey(); + AWSCredentials credentials = configuration.AWS_CREDENTIALS_PROVIDER.getCredentials(); + accessKey = credentials.getAWSAccessKeyId(); + secretKey = credentials.getAWSSecretKey(); + if (credentials instanceof AWSSessionCredentials) { + sessionToken = ((AWSSessionCredentials) credentials).getSessionToken(); + } else { + sessionToken = null; + } } @Override @@ -116,7 +125,12 @@ private String generateCopyStatement(String s3File) { exec.append("COPY " + redshiftTable + " "); exec.append("FROM 's3://" + s3bucket + "/" + s3File + "' "); exec.append("CREDENTIALS 'aws_access_key_id=" + accessKey); - exec.append(";aws_secret_access_key=" + secretKey + "' "); + exec.append(";aws_secret_access_key=" + secretKey); + if (sessionToken != null) { + exec.append(";token=" + sessionToken); + } + exec.append("' "); + exec.append("DELIMITER '" + redshiftDelimiter + "'"); exec.append(";"); return exec.toString(); From 9370ecb3364cb59f69760f2936350c43d11211c0 Mon Sep 17 00:00:00 2001 From: KAWACHI Takashi Date: Wed, 16 Apr 2014 23:55:48 +0900 Subject: [PATCH 2/3] Append token in RedshiftManifestEmitter --- .../redshift/RedshiftBasicEmitter.java | 2 +- .../redshift/RedshiftManifestEmitter.java | 16 ++++++++++++++-- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/connectors/redshift/RedshiftBasicEmitter.java b/src/main/java/com/amazonaws/services/kinesis/connectors/redshift/RedshiftBasicEmitter.java index b403a1e..8b2ffe9 100644 --- a/src/main/java/com/amazonaws/services/kinesis/connectors/redshift/RedshiftBasicEmitter.java +++ b/src/main/java/com/amazonaws/services/kinesis/connectors/redshift/RedshiftBasicEmitter.java @@ -127,7 +127,7 @@ private String generateCopyStatement(String s3File) { exec.append("CREDENTIALS 'aws_access_key_id=" + accessKey); exec.append(";aws_secret_access_key=" + secretKey); if (sessionToken != null) { - exec.append(";token=" + sessionToken); + exec.append(";token=").append(sessionToken); } exec.append("' "); diff --git a/src/main/java/com/amazonaws/services/kinesis/connectors/redshift/RedshiftManifestEmitter.java b/src/main/java/com/amazonaws/services/kinesis/connectors/redshift/RedshiftManifestEmitter.java index 871243e..05bdbfa 100644 --- a/src/main/java/com/amazonaws/services/kinesis/connectors/redshift/RedshiftManifestEmitter.java +++ b/src/main/java/com/amazonaws/services/kinesis/connectors/redshift/RedshiftManifestEmitter.java @@ -25,6 +25,8 @@ import java.util.List; import java.util.Properties; +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSSessionCredentials; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -73,6 +75,7 @@ public class RedshiftManifestEmitter implements IEmitter { private final char dataDelimiter; private final String accessKey; private final String secretKey; + private final String sessionToken; private final String s3Endpoint; private final AmazonS3Client s3Client; private final boolean copyMandatory; @@ -91,8 +94,14 @@ public RedshiftManifestEmitter(KinesisConnectorConfiguration configuration) { if (s3Endpoint != null) { s3Client.setEndpoint(s3Endpoint); } - accessKey = configuration.AWS_CREDENTIALS_PROVIDER.getCredentials().getAWSAccessKeyId(); - secretKey = configuration.AWS_CREDENTIALS_PROVIDER.getCredentials().getAWSSecretKey(); + AWSCredentials credentials = configuration.AWS_CREDENTIALS_PROVIDER.getCredentials(); + accessKey = credentials.getAWSAccessKeyId(); + secretKey = credentials.getAWSSecretKey(); + if (credentials instanceof AWSSessionCredentials) { + sessionToken = ((AWSSessionCredentials) credentials).getSessionToken(); + } else { + sessionToken = null; + } loginProps = new Properties(); loginProps.setProperty("user", configuration.REDSHIFT_USERNAME); loginProps.setProperty("password", configuration.REDSHIFT_PASSWORD); @@ -277,6 +286,9 @@ private void redshiftCopy(Connection conn, List records) throws IOExcept redshiftCopy.append("aws_access_key_id=" + accessKey); redshiftCopy.append(";"); redshiftCopy.append("aws_secret_access_key=" + secretKey); + if (sessionToken != null) { + redshiftCopy.append(";token=").append(sessionToken); + } redshiftCopy.append("' "); redshiftCopy.append("DELIMITER '" + dataDelimiter + "' "); redshiftCopy.append("MANIFEST"); From fcfea03e3a70de4ef9a0ec228db8e30f2eb34670 Mon Sep 17 00:00:00 2001 From: KAWACHI Takashi Date: Tue, 22 Apr 2014 10:06:23 +0900 Subject: [PATCH 3/3] Get credential token on every COPY command --- .../connectors/redshift/CredentialsUtil.java | 33 +++++++++++++++++++ .../redshift/RedshiftBasicEmitter.java | 23 +++---------- .../redshift/RedshiftManifestEmitter.java | 23 +++---------- 3 files changed, 42 insertions(+), 37 deletions(-) create mode 100644 src/main/java/com/amazonaws/services/kinesis/connectors/redshift/CredentialsUtil.java diff --git a/src/main/java/com/amazonaws/services/kinesis/connectors/redshift/CredentialsUtil.java b/src/main/java/com/amazonaws/services/kinesis/connectors/redshift/CredentialsUtil.java new file mode 100644 index 0000000..fec411a --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/connectors/redshift/CredentialsUtil.java @@ -0,0 +1,33 @@ +package com.amazonaws.services.kinesis.connectors.redshift; + +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.AWSSessionCredentials; + +/** + * Utility class to handle credentials. + */ +public class CredentialsUtil { + + /** + * Build a credential argument for Redshift COPY command. + * + * @param provider Credential provider. + * @return credential + */ + public static String buildCredential(AWSCredentialsProvider provider) { + AWSCredentials credentials = provider.getCredentials(); + StringBuilder builder = new StringBuilder(); + builder + .append("aws_access_key_id=") + .append(credentials.getAWSAccessKeyId()) + .append(";aws_secret_access_key=") + .append(credentials.getAWSSecretKey()); + if (credentials instanceof AWSSessionCredentials) { + builder + .append(";token=") + .append(((AWSSessionCredentials) credentials).getSessionToken()); + } + return builder.toString(); + } +} diff --git a/src/main/java/com/amazonaws/services/kinesis/connectors/redshift/RedshiftBasicEmitter.java b/src/main/java/com/amazonaws/services/kinesis/connectors/redshift/RedshiftBasicEmitter.java index 8b2ffe9..28e20da 100644 --- a/src/main/java/com/amazonaws/services/kinesis/connectors/redshift/RedshiftBasicEmitter.java +++ b/src/main/java/com/amazonaws/services/kinesis/connectors/redshift/RedshiftBasicEmitter.java @@ -24,8 +24,7 @@ import java.util.List; import java.util.Properties; -import com.amazonaws.auth.AWSCredentials; -import com.amazonaws.auth.AWSSessionCredentials; +import com.amazonaws.auth.AWSCredentialsProvider; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -57,9 +56,7 @@ public class RedshiftBasicEmitter extends S3Emitter { private final String redshiftURL; private final char redshiftDelimiter; private final Properties loginProperties; - private final String accessKey; - private final String secretKey; - private final String sessionToken; + private final AWSCredentialsProvider credentialsProvider; public RedshiftBasicEmitter(KinesisConnectorConfiguration configuration) { super(configuration); @@ -70,14 +67,7 @@ public RedshiftBasicEmitter(KinesisConnectorConfiguration configuration) { loginProperties = new Properties(); loginProperties.setProperty("user", configuration.REDSHIFT_USERNAME); loginProperties.setProperty("password", configuration.REDSHIFT_PASSWORD); - AWSCredentials credentials = configuration.AWS_CREDENTIALS_PROVIDER.getCredentials(); - accessKey = credentials.getAWSAccessKeyId(); - secretKey = credentials.getAWSSecretKey(); - if (credentials instanceof AWSSessionCredentials) { - sessionToken = ((AWSSessionCredentials) credentials).getSessionToken(); - } else { - sessionToken = null; - } + credentialsProvider = configuration.AWS_CREDENTIALS_PROVIDER; } @Override @@ -124,11 +114,8 @@ private String generateCopyStatement(String s3File) { StringBuilder exec = new StringBuilder(); exec.append("COPY " + redshiftTable + " "); exec.append("FROM 's3://" + s3bucket + "/" + s3File + "' "); - exec.append("CREDENTIALS 'aws_access_key_id=" + accessKey); - exec.append(";aws_secret_access_key=" + secretKey); - if (sessionToken != null) { - exec.append(";token=").append(sessionToken); - } + exec.append("CREDENTIALS '"); + exec.append(CredentialsUtil.buildCredential(credentialsProvider)); exec.append("' "); exec.append("DELIMITER '" + redshiftDelimiter + "'"); diff --git a/src/main/java/com/amazonaws/services/kinesis/connectors/redshift/RedshiftManifestEmitter.java b/src/main/java/com/amazonaws/services/kinesis/connectors/redshift/RedshiftManifestEmitter.java index 05bdbfa..9b8efab 100644 --- a/src/main/java/com/amazonaws/services/kinesis/connectors/redshift/RedshiftManifestEmitter.java +++ b/src/main/java/com/amazonaws/services/kinesis/connectors/redshift/RedshiftManifestEmitter.java @@ -25,8 +25,7 @@ import java.util.List; import java.util.Properties; -import com.amazonaws.auth.AWSCredentials; -import com.amazonaws.auth.AWSSessionCredentials; +import com.amazonaws.auth.AWSCredentialsProvider; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -73,9 +72,7 @@ public class RedshiftManifestEmitter implements IEmitter { private final String fileTable; private final String fileKeyColumn; private final char dataDelimiter; - private final String accessKey; - private final String secretKey; - private final String sessionToken; + private final AWSCredentialsProvider credentialsProvider; private final String s3Endpoint; private final AmazonS3Client s3Client; private final boolean copyMandatory; @@ -94,14 +91,7 @@ public RedshiftManifestEmitter(KinesisConnectorConfiguration configuration) { if (s3Endpoint != null) { s3Client.setEndpoint(s3Endpoint); } - AWSCredentials credentials = configuration.AWS_CREDENTIALS_PROVIDER.getCredentials(); - accessKey = credentials.getAWSAccessKeyId(); - secretKey = credentials.getAWSSecretKey(); - if (credentials instanceof AWSSessionCredentials) { - sessionToken = ((AWSSessionCredentials) credentials).getSessionToken(); - } else { - sessionToken = null; - } + credentialsProvider = configuration.AWS_CREDENTIALS_PROVIDER; loginProps = new Properties(); loginProps.setProperty("user", configuration.REDSHIFT_USERNAME); loginProps.setProperty("password", configuration.REDSHIFT_PASSWORD); @@ -283,12 +273,7 @@ private void redshiftCopy(Connection conn, List records) throws IOExcept redshiftCopy.append("COPY " + dataTable + " "); redshiftCopy.append("FROM 's3://" + s3Bucket + "/" + manifestFile + "' "); redshiftCopy.append("CREDENTIALS '"); - redshiftCopy.append("aws_access_key_id=" + accessKey); - redshiftCopy.append(";"); - redshiftCopy.append("aws_secret_access_key=" + secretKey); - if (sessionToken != null) { - redshiftCopy.append(";token=").append(sessionToken); - } + redshiftCopy.append(CredentialsUtil.buildCredential(credentialsProvider)); redshiftCopy.append("' "); redshiftCopy.append("DELIMITER '" + dataDelimiter + "' "); redshiftCopy.append("MANIFEST");