diff --git a/dbeam-core/pom.xml b/dbeam-core/pom.xml index 5e6d5aaf..b26aa3a5 100644 --- a/dbeam-core/pom.xml +++ b/dbeam-core/pom.xml @@ -45,6 +45,11 @@ provided + + org.apache.beam + beam-sdks-java-io-amazon-web-services + + org.postgresql diff --git a/dbeam-core/src/main/java/com/spotify/dbeam/jobs/JdbcAvroJob.java b/dbeam-core/src/main/java/com/spotify/dbeam/jobs/JdbcAvroJob.java index 6b3fc5e1..0ee82f5e 100644 --- a/dbeam-core/src/main/java/com/spotify/dbeam/jobs/JdbcAvroJob.java +++ b/dbeam-core/src/main/java/com/spotify/dbeam/jobs/JdbcAvroJob.java @@ -38,6 +38,7 @@ import org.apache.beam.runners.direct.DirectOptions; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Create; @@ -91,7 +92,10 @@ public static JdbcAvroJob create(final String[] cmdLineArgs) public static PipelineOptions buildPipelineOptions(final String[] cmdLineArgs) { PipelineOptionsFactory.register(JdbcExportPipelineOptions.class); PipelineOptionsFactory.register(OutputOptions.class); - return PipelineOptionsFactory.fromArgs(cmdLineArgs).withValidation().create(); + final PipelineOptions pipelineOptions = PipelineOptionsFactory.fromArgs(cmdLineArgs) + .withValidation().create(); + FileSystems.setDefaultPipelineOptions(pipelineOptions); + return pipelineOptions; } public void prepareExport() throws Exception { diff --git a/dbeam-core/src/test/java/com/spotify/dbeam/jobs/BeamHelperTest.java b/dbeam-core/src/test/java/com/spotify/dbeam/jobs/BeamHelperTest.java index a8ecbfc6..f7238371 100644 --- a/dbeam-core/src/test/java/com/spotify/dbeam/jobs/BeamHelperTest.java +++ b/dbeam-core/src/test/java/com/spotify/dbeam/jobs/BeamHelperTest.java @@ -186,4 +186,5 @@ public MetricResults metrics() { exception.getMessage()); } } + } diff --git a/dbeam-core/src/test/java/com/spotify/dbeam/jobs/JdbcAvroJobTest.java b/dbeam-core/src/test/java/com/spotify/dbeam/jobs/JdbcAvroJobTest.java index c00ba880..d7806af0 100644 --- a/dbeam-core/src/test/java/com/spotify/dbeam/jobs/JdbcAvroJobTest.java +++ b/dbeam-core/src/test/java/com/spotify/dbeam/jobs/JdbcAvroJobTest.java @@ -49,6 +49,7 @@ import org.hamcrest.CoreMatchers; import org.junit.Assert; import org.junit.BeforeClass; +import org.junit.Ignore; import org.junit.Test; public class JdbcAvroJobTest { @@ -218,4 +219,25 @@ public void shouldIncrementCounterMetrics() { metering.incrementRecordCount(); metering.exposeWriteElapsed(); } + + @Test + @Ignore // Cannot run this test towards a Amazon S3 bucket. For manual testing only. + public void shouldRunJdbcAvroJobS3() throws IOException { + String outputPath = "s3://com.privacyone.bigdata/2020-05-28/18/"; + + JdbcAvroJob.main( + new String[] { + "--targetParallelism=1", // no need for more threads when testing + "--partition=2025-02-28", + "--skipPartitionCheck", + "--exportTimeout=PT1M", + "--connectionUrl=" + CONNECTION_URL, + "--username=", + "--passwordFile=" + passwordPath.toString(), + "--table=COFFEES", + "--awsRegion=eu-west-1", + "--output=" + outputPath, + "--avroCodec=zstandard1" + }); + } } diff --git a/e2e/e2e.sh b/e2e/e2e.sh index a17e6769..a93a45d3 100755 --- a/e2e/e2e.sh +++ b/e2e/e2e.sh @@ -74,6 +74,7 @@ DOCKER_PSQL_ARGS=( runDBeamDockerCon() { OUTPUT="$SCRIPT_PATH/results/testn/$(date +%FT%H%M%S)/" + #OUTPUT="s3://com.privacyone.bigdata/2020-05-28/18" time \ runFromJar \ --skipPartitionCheck \ diff --git a/pom.xml b/pom.xml index 1204ac9e..904100d6 100644 --- a/pom.xml +++ b/pom.xml @@ -194,16 +194,36 @@ beam-sdks-java-extensions-google-cloud-platform-core ${beam.version} - org.apache.beam - beam-runners-google-cloud-dataflow-java + beam-sdks-java-io-amazon-web-services ${beam.version} + + + com.amazonaws + aws-java-sdk-sqs + + + com.amazonaws + jmespath-java + + + + + org.apache.beam + beam-runners-google-cloud-dataflow-java + ${beam.version} + junit junit