From bfd87e63412f1edef5ce77b5023cf85cbeae95e7 Mon Sep 17 00:00:00 2001
From: Ruslan Altynnikov
Date: Thu, 4 Jun 2020 15:10:14 +0200
Subject: [PATCH 1/3] Add support for Amazon S3 filesystem.

---
 dbeam-core/pom.xml                          |  5 +++++
 .../com/spotify/dbeam/jobs/JdbcAvroJob.java |  6 +++++-
 .../spotify/dbeam/jobs/BeamHelperTest.java  |  1 +
 .../spotify/dbeam/jobs/JdbcAvroJobTest.java | 19 +++++++++++++++++++
 e2e/e2e.sh                                  |  3 ++-
 pom.xml                                     |  6 +++++-
 6 files changed, 37 insertions(+), 3 deletions(-)

diff --git a/dbeam-core/pom.xml b/dbeam-core/pom.xml
index 5e6d5aaf..b26aa3a5 100644
--- a/dbeam-core/pom.xml
+++ b/dbeam-core/pom.xml
@@ -45,6 +45,11 @@
       <scope>provided</scope>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-io-amazon-web-services</artifactId>
+    </dependency>
+
     <dependency>
       <groupId>org.postgresql</groupId>
diff --git a/dbeam-core/src/main/java/com/spotify/dbeam/jobs/JdbcAvroJob.java b/dbeam-core/src/main/java/com/spotify/dbeam/jobs/JdbcAvroJob.java
index 6b3fc5e1..0ee82f5e 100644
--- a/dbeam-core/src/main/java/com/spotify/dbeam/jobs/JdbcAvroJob.java
+++ b/dbeam-core/src/main/java/com/spotify/dbeam/jobs/JdbcAvroJob.java
@@ -38,6 +38,7 @@
 import org.apache.beam.runners.direct.DirectOptions;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
@@ -91,7 +92,10 @@ public static JdbcAvroJob create(final String[] cmdLineArgs)
   public static PipelineOptions buildPipelineOptions(final String[] cmdLineArgs) {
     PipelineOptionsFactory.register(JdbcExportPipelineOptions.class);
     PipelineOptionsFactory.register(OutputOptions.class);
-    return PipelineOptionsFactory.fromArgs(cmdLineArgs).withValidation().create();
+    final PipelineOptions pipelineOptions = PipelineOptionsFactory.fromArgs(cmdLineArgs)
+        .withValidation().create();
+    FileSystems.setDefaultPipelineOptions(pipelineOptions);
+    return pipelineOptions;
   }
 
   public void prepareExport() throws Exception {
diff --git a/dbeam-core/src/test/java/com/spotify/dbeam/jobs/BeamHelperTest.java b/dbeam-core/src/test/java/com/spotify/dbeam/jobs/BeamHelperTest.java
index a8ecbfc6..f7238371 100644
--- a/dbeam-core/src/test/java/com/spotify/dbeam/jobs/BeamHelperTest.java
+++ b/dbeam-core/src/test/java/com/spotify/dbeam/jobs/BeamHelperTest.java
@@ -186,4 +186,5 @@ public MetricResults metrics() {
           exception.getMessage());
     }
   }
+
 }
diff --git a/dbeam-core/src/test/java/com/spotify/dbeam/jobs/JdbcAvroJobTest.java b/dbeam-core/src/test/java/com/spotify/dbeam/jobs/JdbcAvroJobTest.java
index c00ba880..0e3d3a37 100644
--- a/dbeam-core/src/test/java/com/spotify/dbeam/jobs/JdbcAvroJobTest.java
+++ b/dbeam-core/src/test/java/com/spotify/dbeam/jobs/JdbcAvroJobTest.java
@@ -218,4 +218,23 @@ public void shouldIncrementCounterMetrics() {
     metering.incrementRecordCount();
     metering.exposeWriteElapsed();
   }
+
+  //@Test
+  public void shouldRunJdbcAvroJobS3() throws IOException {
+    String outputPath = "s3://com.privacyone.bigdata/2020-05-28/18";
+
+    JdbcAvroJob.main(
+        new String[] {
+          "--targetParallelism=1", // no need for more threads when testing
+          "--partition=2025-02-28",
+          "--skipPartitionCheck",
+          "--exportTimeout=PT1M",
+          "--connectionUrl=" + CONNECTION_URL,
+          "--username=",
+          "--passwordFile=" + passwordPath.toString(),
+          "--table=COFFEES",
+          "--output=" + outputPath,
+          "--avroCodec=zstandard1"
+        });
+  }
 }
diff --git a/e2e/e2e.sh b/e2e/e2e.sh
index a17e6769..bba84b19 100755
--- a/e2e/e2e.sh
+++ b/e2e/e2e.sh
@@ -73,7 +73,8 @@ DOCKER_PSQL_ARGS=(
 )
 runDBeamDockerCon() {
-  OUTPUT="$SCRIPT_PATH/results/testn/$(date +%FT%H%M%S)/"
+  ## OUTPUT="$SCRIPT_PATH/results/testn/$(date +%FT%H%M%S)/"
+  OUTPUT="s3://com.privacyone.bigdata/2020-05-28/18"
   time \
     runFromJar \
       --skipPartitionCheck \
diff --git a/pom.xml b/pom.xml
index 1204ac9e..8f2ee5ae 100644
--- a/pom.xml
+++ b/pom.xml
@@ -194,7 +194,11 @@
         <artifactId>beam-sdks-java-extensions-google-cloud-platform-core</artifactId>
         <version>${beam.version}</version>
       </dependency>
-
+      <dependency>
+        <groupId>org.apache.beam</groupId>
+        <artifactId>beam-sdks-java-io-amazon-web-services</artifactId>
+        <version>${beam.version}</version>
+      </dependency>
       <dependency>
         <groupId>org.apache.beam</groupId>
         <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>

From 287a0ad8bc43ed148eb78c484d50a1095ff25c08 Mon Sep 17 00:00:00 2001
From: Ruslan Altynnikov
Date: Mon, 8 Jun 2020 17:12:01 +0200
Subject: [PATCH 2/3] Add support for Amazon S3 filesystem.

---
 .../spotify/dbeam/jobs/JdbcAvroJobTest.java |  7 ++++--
 pom.xml                                     | 24 +++++++++++++++++----
 2 files changed, 25 insertions(+), 6 deletions(-)

diff --git a/dbeam-core/src/test/java/com/spotify/dbeam/jobs/JdbcAvroJobTest.java b/dbeam-core/src/test/java/com/spotify/dbeam/jobs/JdbcAvroJobTest.java
index 0e3d3a37..d7806af0 100644
--- a/dbeam-core/src/test/java/com/spotify/dbeam/jobs/JdbcAvroJobTest.java
+++ b/dbeam-core/src/test/java/com/spotify/dbeam/jobs/JdbcAvroJobTest.java
@@ -49,6 +49,7 @@
 import org.hamcrest.CoreMatchers;
 import org.junit.Assert;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Test;
 
 public class JdbcAvroJobTest {
@@ -219,9 +220,10 @@ public void shouldIncrementCounterMetrics() {
     metering.exposeWriteElapsed();
   }
 
-  //@Test
+  @Test
+  @Ignore // Cannot run this test against an Amazon S3 bucket. For manual testing only.
   public void shouldRunJdbcAvroJobS3() throws IOException {
-    String outputPath = "s3://com.privacyone.bigdata/2020-05-28/18";
+    String outputPath = "s3://com.privacyone.bigdata/2020-05-28/18/";
 
     JdbcAvroJob.main(
         new String[] {
@@ -233,6 +235,7 @@ public void shouldRunJdbcAvroJobS3() throws IOException {
           "--username=",
           "--passwordFile=" + passwordPath.toString(),
           "--table=COFFEES",
+          "--awsRegion=eu-west-1",
           "--output=" + outputPath,
           "--avroCodec=zstandard1"
         });
diff --git a/pom.xml b/pom.xml
index 8f2ee5ae..904100d6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -198,16 +198,32 @@
         <groupId>org.apache.beam</groupId>
         <artifactId>beam-sdks-java-io-amazon-web-services</artifactId>
         <version>${beam.version}</version>
+        <exclusions>
+          <exclusion>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-java-sdk-sqs</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>jmespath-java</artifactId>
+          </exclusion>
+        </exclusions>
       </dependency>
       <dependency>
         <groupId>org.apache.beam</groupId>
         <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
         <version>${beam.version}</version>
       </dependency>
-      <dependency>
-        <groupId>junit</groupId>
-        <artifactId>junit</artifactId>
-      </dependency>
       <dependency>
         <groupId>junit</groupId>

From 0ecdfe520ee77a2244d84b76d98040bd98fbe299 Mon Sep 17 00:00:00 2001
From: Ruslan Altynnikov
Date: Mon, 8 Jun 2020 17:16:28 +0200
Subject: [PATCH 3/3] Comment out S3 output target.

---
 e2e/e2e.sh | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/e2e/e2e.sh b/e2e/e2e.sh
index bba84b19..a93a45d3 100755
--- a/e2e/e2e.sh
+++ b/e2e/e2e.sh
@@ -73,8 +73,8 @@ DOCKER_PSQL_ARGS=(
 )
 
 runDBeamDockerCon() {
-  ## OUTPUT="$SCRIPT_PATH/results/testn/$(date +%FT%H%M%S)/"
-  OUTPUT="s3://com.privacyone.bigdata/2020-05-28/18"
+  OUTPUT="$SCRIPT_PATH/results/testn/$(date +%FT%H%M%S)/"
+  #OUTPUT="s3://com.privacyone.bigdata/2020-05-28/18"
   time \
     runFromJar \
       --skipPartitionCheck \
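Note: a minimal sketch of how the S3 output added by this series could be exercised by hand, since the JUnit test is @Ignore'd. The shaded-jar path, JDBC URL, username, password file, bucket, and region below are assumptions (loosely carried over from the test and e2e script), not values fixed by the patches; only the flags themselves come from the changed code. Credentials are expected to be picked up from the default AWS provider chain.

    # Build dbeam-core, then run a one-off export straight to S3.
    # Assumes AWS credentials are available (e.g. AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY)
    # and that the target bucket already exists; all concrete values are placeholders.
    mvn -pl dbeam-core -am package -DskipTests
    java -cp dbeam-core/target/dbeam-core-shaded.jar \
      com.spotify.dbeam.jobs.JdbcAvroJob \
      --connectionUrl=jdbc:postgresql://localhost:5432/dbeam_test \
      --username=dbeam_user \
      --passwordFile=/path/to/password_file \
      --table=COFFEES \
      --partition=2020-05-28 \
      --skipPartitionCheck \
      --awsRegion=eu-west-1 \
      --avroCodec=zstandard1 \
      --output=s3://com.privacyone.bigdata/2020-05-28/18/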