Skip to content

Commit

Permalink
fix: GCS data source returns part stream failure if source blob doesn…
Browse files Browse the repository at this point in the history
…'t exist (#81)

* GCS data source throwing runtime exception if source blob doesn't exist

- GcsDataSource.GoogleStoragePart openStream throws EdcException if the requested blob doesn't exist
- GcsDataSink transferParts handles EdcException, plus minor update in log messages

* chore: updated DEPENDENCIES

* fix: GCS DataSource checks source blob when creating part stream, not when opening the stream

* chore: updated DEPENDENCIES

* refactor: removed unused catch block, minor refactor of GCS data source test

- check for source blob existence has been moved to the source, sink not handling it anymore
- minor refactor of test, mock doesn't need class argument when type explicit

* chore: updated DEPENDENCIES

* chore: updated DEPENDENCIES from gh workflow
  • Loading branch information
man8pr authored Nov 27, 2023
1 parent 94f0196 commit 17be30b
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 11 deletions.
17 changes: 9 additions & 8 deletions DEPENDENCIES
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
maven/mavencentral/com.fasterxml.jackson.core/jackson-annotations/2.10.3, Apache-2.0, approved, CQ21280

Check warning on line 1 in DEPENDENCIES

View workflow job for this annotation

GitHub Actions / check / Dash-Verify-Licenses

Restricted Dependencies found

Some dependencies are marked 'restricted' - please review them
maven/mavencentral/com.fasterxml.jackson.core/jackson-annotations/2.14.1, Apache-2.0, approved, #5303
maven/mavencentral/com.fasterxml.jackson.core/jackson-annotations/2.14.2, Apache-2.0, approved, #5303
maven/mavencentral/com.fasterxml.jackson.core/jackson-annotations/2.15.2, Apache-2.0, approved, #7947
maven/mavencentral/com.fasterxml.jackson.core/jackson-annotations/2.15.3, Apache-2.0, approved, #7947
maven/mavencentral/com.fasterxml.jackson.core/jackson-annotations/2.16.0, Apache-2.0, approved, #11606
maven/mavencentral/com.fasterxml.jackson.core/jackson-core/2.14.2, Apache-2.0 AND MIT, approved, #4303
maven/mavencentral/com.fasterxml.jackson.core/jackson-core/2.15.2, MIT AND Apache-2.0, approved, #7932
maven/mavencentral/com.fasterxml.jackson.core/jackson-core/2.15.3, MIT AND Apache-2.0, approved, #7932
maven/mavencentral/com.fasterxml.jackson.core/jackson-core/2.16.0, Apache-2.0 AND MIT, approved, #11602
maven/mavencentral/com.fasterxml.jackson.core/jackson-databind/2.14.1, Apache-2.0, approved, #4105
maven/mavencentral/com.fasterxml.jackson.core/jackson-databind/2.14.2, Apache-2.0, approved, #4105
maven/mavencentral/com.fasterxml.jackson.core/jackson-databind/2.15.2, Apache-2.0, approved, #7934
maven/mavencentral/com.fasterxml.jackson.core/jackson-databind/2.15.3, Apache-2.0, approved, #7934
maven/mavencentral/com.fasterxml.jackson.core/jackson-databind/2.16.0, Apache-2.0, approved, #11605
maven/mavencentral/com.fasterxml.jackson.datatype/jackson-datatype-jsr310/2.14.2, Apache-2.0, approved, #4699
maven/mavencentral/com.fasterxml.jackson.datatype/jackson-datatype-jsr310/2.15.2, Apache-2.0, approved, #7930
maven/mavencentral/com.fasterxml.jackson.datatype/jackson-datatype-jsr310/2.15.3, Apache-2.0, approved, #7930
maven/mavencentral/com.fasterxml.jackson.datatype/jackson-datatype-jsr310/2.16.0, , restricted, clearlydefined
maven/mavencentral/com.fasterxml.jackson.module/jackson-module-jakarta-xmlbind-annotations/2.14.1, Apache-2.0, approved, #5308
maven/mavencentral/com.fasterxml.jackson.module/jackson-module-jakarta-xmlbind-annotations/2.15.3, Apache-2.0, approved, #9241
maven/mavencentral/com.fasterxml.jackson/jackson-bom/2.15.3, Apache-2.0, approved, #7929
maven/mavencentral/com.fasterxml.jackson.module/jackson-module-jakarta-xmlbind-annotations/2.16.0, , restricted, clearlydefined
maven/mavencentral/com.fasterxml.jackson/jackson-bom/2.16.0, , restricted, clearlydefined
maven/mavencentral/com.github.docker-java/docker-java-api/3.3.4, Apache-2.0, approved, #10346
maven/mavencentral/com.github.docker-java/docker-java-transport-zerodep/3.3.4, Apache-2.0 AND (Apache-2.0 AND BSD-3-Clause), approved, #7946
maven/mavencentral/com.github.docker-java/docker-java-transport/3.3.4, Apache-2.0, approved, #7942
Expand Down Expand Up @@ -122,9 +123,9 @@ maven/mavencentral/io.grpc/grpc-xds/1.56.1, Apache-2.0, approved, clearlydefined
maven/mavencentral/io.opencensus/opencensus-api/0.31.1, Apache-2.0, approved, clearlydefined
maven/mavencentral/io.opencensus/opencensus-contrib-http-util/0.31.1, Apache-2.0, approved, clearlydefined
maven/mavencentral/io.opencensus/opencensus-proto/0.2.0, Apache-2.0, approved, clearlydefined
maven/mavencentral/io.opentelemetry.instrumentation/opentelemetry-instrumentation-annotations/1.31.0, Apache-2.0, approved, #11085
maven/mavencentral/io.opentelemetry/opentelemetry-api/1.31.0, Apache-2.0, approved, #11087
maven/mavencentral/io.opentelemetry/opentelemetry-context/1.31.0, Apache-2.0, approved, #11088
maven/mavencentral/io.opentelemetry.instrumentation/opentelemetry-instrumentation-annotations/1.32.0, Apache-2.0, approved, #11684
maven/mavencentral/io.opentelemetry/opentelemetry-api/1.32.0, Apache-2.0, approved, #11682
maven/mavencentral/io.opentelemetry/opentelemetry-context/1.32.0, Apache-2.0, approved, #11683
maven/mavencentral/io.perfmark/perfmark-api/0.26.0, Apache-2.0, approved, clearlydefined
maven/mavencentral/jakarta.activation/jakarta.activation-api/2.1.0, EPL-2.0 OR BSD-3-Clause OR GPL-2.0-only with Classpath-exception-2.0, approved, ee4j.jaf
maven/mavencentral/jakarta.annotation/jakarta.annotation-api/2.1.1, EPL-2.0 OR GPL-2.0-only with Classpath-exception-2.0, approved, ee4j.ca
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,10 @@ protected StreamResult<Object> transferParts(List<DataSource.Part> parts) {
} catch (IOException e) {
monitor.severe("Cannot open the input part", e);
monitor.severe(e.toString());
return StreamResult.error("An error");
return StreamResult.error("Cannot open the input part");
} catch (Exception e) {
monitor.severe("Error writing data to the bucket", e);
return StreamResult.error("An error");
return StreamResult.error("Error writing data to the bucket");
}
}
return StreamResult.success();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,19 @@
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.Storage;
import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSource;
import org.eclipse.edc.connector.dataplane.spi.pipeline.StreamFailure;
import org.eclipse.edc.connector.dataplane.spi.pipeline.StreamResult;
import org.eclipse.edc.spi.EdcException;
import org.eclipse.edc.spi.monitor.Monitor;

import java.io.InputStream;
import java.nio.channels.Channels;
import java.util.List;
import java.util.Objects;
import java.util.stream.Stream;

import static org.eclipse.edc.connector.dataplane.spi.pipeline.StreamFailure.Reason.GENERAL_ERROR;
import static org.eclipse.edc.connector.dataplane.spi.pipeline.StreamResult.failure;
import static org.eclipse.edc.connector.dataplane.spi.pipeline.StreamResult.success;

public class GcsDataSource implements DataSource {
Expand All @@ -38,11 +42,17 @@ public class GcsDataSource implements DataSource {

@Override
public StreamResult<Stream<Part>> openPartStream() {
var blobId = BlobId.of(bucketName, blobName);
var blob = storageClient.get(blobId);
if (blob == null || !blob.exists()) {
return failure(new StreamFailure(List.of(String.format("Error accessing bucket %s or blob %s", bucketName, blobName)), GENERAL_ERROR));
}

try {
part = new GoogleStoragePart(storageClient, bucketName, blobName);
return success(Stream.of(part));
} catch (Exception e) {
monitor.severe(String.format("Error accessing bucket %s or blob %s in project %s", bucketName, blobName, storageClient.getOptions().getProjectId()), e);
monitor.severe(String.format("Error accessing bucket %s or blob %s", bucketName, blobName), e);
throw new EdcException(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright (c) 2023 Google LLC
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Google LCC - Initial implementation
*
*/

package org.eclipse.edc.connector.dataplane.gcp.storage;

import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.Storage;
import org.eclipse.edc.spi.monitor.Monitor;
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class GcsDataSourceTest {

Monitor monitor = mock();
Storage storageClient = mock();
String bucketName = "TestBucketName";
String blobName = "TestBlobName";
GcsDataSource dataSource = GcsDataSource.Builder.newInstance()
.storageClient(storageClient)
.monitor(monitor)
.bucketName(bucketName)
.blobName(blobName)
.build();

BlobId blobId = BlobId.of(bucketName, blobName);


@Test
void openPartStream_failsIfBlobIsNull() {
when(storageClient.get(blobId))
.thenReturn(null);

var partStream = dataSource.openPartStream();

assertThat(partStream.failed()).isTrue();
}

@Test
void openPartStream_failsIfBlobDoesntExist() {
var blob = mock(Blob.class);

when(blob.exists())
.thenReturn(false);

when(storageClient.get(blobId))
.thenReturn(blob);

var partStream = dataSource.openPartStream();

assertThat(partStream.failed()).isTrue();
}

@Test
void openPartStream_succeedsIfBlobExists() {
var blob = mock(Blob.class);

when(blob.exists())
.thenReturn(true);

when(storageClient.get(blobId))
.thenReturn(blob);

var partStream = dataSource.openPartStream();

assertThat(partStream.succeeded()).isTrue();
}
}

0 comments on commit 17be30b

Please sign in to comment.