Skip to content

Commit

Permalink
feat(s3stream): add S3 API compatibility for BOS DeleteObjects (#1028)
Browse files Browse the repository at this point in the history
* feat(s3stream): add S3 API compatibility for BOS DeleteObjects

* feat(s3stream): add S3 API compatibility for BOS DeleteObjects

single import

* feat(s3stream): add S3 API compatibility for BOS DeleteObjects

single import

* feat(s3stream): add S3 API compatibility for BOS DeleteObjects

fix code review.
1. add unit test to verify logic.

2. auto check if the oss provider works in not expected mode to check the relate handle logic.

* feat(s3stream): add S3 API compatibility for BOS DeleteObjects

* feat(s3stream): add S3 API compatibility for BOS DeleteObjects

* feat(s3stream): add S3 API compatibility for BOS DeleteObjects

* feat(s3stream): add S3 API compatibility for BOS DeleteObjects

* feat(s3stream): add S3 API compatibility for BOS DeleteObjects

* feat(s3stream): add S3 API compatibility for BOS DeleteObjects
  • Loading branch information
lifepuzzlefun authored Mar 28, 2024
1 parent 69c4c20 commit ceceb8e
Show file tree
Hide file tree
Showing 2 changed files with 287 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,14 @@
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -52,6 +54,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
Expand All @@ -76,11 +79,13 @@
import software.amazon.awssdk.services.s3.model.Delete;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse;
import software.amazon.awssdk.services.s3.model.DeletedObject;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.NoSuchBucketException;
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.S3Error;
import software.amazon.awssdk.services.s3.model.S3Exception;
import software.amazon.awssdk.services.s3.model.Tagging;
import software.amazon.awssdk.services.s3.model.UploadPartCopyRequest;
Expand All @@ -96,6 +101,7 @@ public class DefaultS3Operator implements S3Operator {
private static final int DEFAULT_CONCURRENCY_PER_CORE = 25;
private static final int MIN_CONCURRENCY = 50;
private static final int MAX_CONCURRENCY = 1000;
public static final String S3_API_NO_SUCH_KEY = "NoSuchKey";
public final float maxMergeReadSparsityRate;
private final int currentIndex;
private final String bucket;
Expand Down Expand Up @@ -124,6 +130,8 @@ public class DefaultS3Operator implements S3Operator {
private final HashedWheelTimer timeoutDetect = new HashedWheelTimer(
ThreadUtils.createThreadFactory("s3-timeout-detect", true), 1, TimeUnit.SECONDS, 100);

private boolean deleteObjectsReturnSuccessKeys;

public DefaultS3Operator(String endpoint, String region, String bucket, boolean forcePathStyle,
List<AwsCredentialsProvider> credentialsProviders, boolean tagging) {
this(endpoint, region, bucket, forcePathStyle, credentialsProviders, tagging, null, null, false);
Expand Down Expand Up @@ -438,28 +446,93 @@ public CompletableFuture<Void> delete(String path) {
});
}

@Override
public CompletableFuture<List<String>> delete(List<String> objectKeys) {
TimerUtil timerUtil = new TimerUtil();
private CompletableFuture<DeleteObjectsResponse> deleteObjects(List<String> objectKeys) {
ObjectIdentifier[] toDeleteKeys = objectKeys.stream().map(key ->
ObjectIdentifier.builder()
.key(key)
.build()
ObjectIdentifier.builder()
.key(key)
.build()
).toArray(ObjectIdentifier[]::new);

DeleteObjectsRequest request = DeleteObjectsRequest.builder()
.bucket(bucket)
.delete(Delete.builder().objects(toDeleteKeys).build())
.build();
// TODO: handle not exist object, should we regard it as deleted or ignore it.
return this.writeS3Client.deleteObjects(request).thenApply(resp -> {
.bucket(bucket)
.delete(Delete.builder().objects(toDeleteKeys).build())
.build();

return this.writeS3Client.deleteObjects(request);
}

@Override
public CompletableFuture<List<String>> delete(List<String> objectKeys) {
TimerUtil timerUtil = new TimerUtil();
return deleteObjects(objectKeys)
.thenApply(resp -> handleDeleteObjectsResponse(objectKeys, timerUtil, resp, deleteObjectsReturnSuccessKeys))
.exceptionally(ex -> {
S3OperationStats.getInstance().deleteObjectsStats(false)
.record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
LOGGER.info("[ControllerS3Operator]: Delete objects failed, count: {}, cost: {}, ex: {}",
objectKeys.size(), timerUtil.elapsedAs(TimeUnit.NANOSECONDS), ex.getMessage());
return Collections.emptyList();
});
}

static List<String> handleDeleteObjectsResponse(List<String> objectKeys,
TimerUtil timerUtil,
DeleteObjectsResponse response,
boolean deleteObjectsReturnSuccessKeys) {
Set<String> successDeleteKeys = new HashSet<>();
int errDeleteCount = 0;
boolean hasUnExpectedResponse = false;

if (deleteObjectsReturnSuccessKeys) {
response.deleted().stream().map(DeletedObject::key).forEach(successDeleteKeys::add);

// expect NoSuchKey is not response because s3 api won't return this in errors.
for (S3Error error : response.errors()) {
LOGGER.error("[ControllerS3Operator]: Delete objects for key [{}] error code [{}] message [{}]",
error.key(), error.code(), error.message());
errDeleteCount++;
}

} else {
// deleteObjects not return successKeys think as all success.
successDeleteKeys.addAll(objectKeys);


for (S3Error error : response.errors()) {
if (S3_API_NO_SUCH_KEY.equals(error.code())) {
// ignore for delete objects.
continue;
}

if (errDeleteCount < 30) {
LOGGER.error("[ControllerS3Operator]: Delete objects for key [{}] error code [{}] message [{}]",
error.key(), error.code(), error.message());
}

if (!StringUtils.isEmpty(error.key())) {
successDeleteKeys.remove(error.key());
} else {
hasUnExpectedResponse = true;
}

errDeleteCount++;
}

if (hasUnExpectedResponse) {
successDeleteKeys = Collections.emptySet();
}
}

LOGGER.info("[ControllerS3Operator]: Delete objects finished, count: {}, errCount: {}, cost: {}",
successDeleteKeys.size(), errDeleteCount, timerUtil.elapsedAs(TimeUnit.NANOSECONDS));

if (!hasUnExpectedResponse) {
S3OperationStats.getInstance().deleteObjectsStats(true).record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
LOGGER.info("[ControllerS3Operator]: Delete objects finished, count: {}, cost: {}", resp.deleted().size(), timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
return resp.deleted().stream().map(DeletedObject::key).collect(Collectors.toList());
}).exceptionally(ex -> {
} else {
S3OperationStats.getInstance().deleteObjectsStats(false).record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
LOGGER.info("[ControllerS3Operator]: Delete objects failed, count: {}, cost: {}, ex: {}", objectKeys.size(), timerUtil.elapsedAs(TimeUnit.NANOSECONDS), ex.getMessage());
return Collections.emptyList();
});
}

return new ArrayList<>(successDeleteKeys);
}

@Override
Expand Down Expand Up @@ -640,6 +713,55 @@ private void checkConfig() {
}
}

private CompletableFuture<Boolean> asyncCheckDeleteObjectsReturnSuccessDeleteKeys(S3Utils.S3Context s3Context) {
byte[] content = new Date().toString().getBytes(StandardCharsets.UTF_8);
String path1 = String.format("check_available/deleteObjectsMode/%d", System.nanoTime());
String path2 = String.format("check_available/deleteObjectsMode/%d", System.nanoTime() + 1);

List<String> path = List.of(path1, path2);

return CompletableFuture.allOf(
this.write(path1, Unpooled.wrappedBuffer(content)),
this.write(path2, Unpooled.wrappedBuffer(content))
)
.thenCompose(__ -> deleteObjects(path)
.thenApply(resp -> checkIfDeleteObjectsWillReturnSuccessDeleteKeys(path, resp)));
}

static boolean checkIfDeleteObjectsWillReturnSuccessDeleteKeys(List<String> path, DeleteObjectsResponse resp) {
// BOS S3 API works as quiet mode
// in this mode success delete objects won't be returned.
// which could cause object not deleted in metadata.
//
// BOS doc: https://cloud.baidu.com/doc/BOS/s/tkc5twspg
// S3 doc: https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html#API_DeleteObjects_RequestBody

boolean hasDeleted = resp.hasDeleted() && !resp.deleted().isEmpty();
boolean hasErrors = resp.hasErrors() && !resp.errors().isEmpty();
boolean hasErrorsWithoutNoSuchKey = resp.errors().stream().filter(s3Error -> !S3_API_NO_SUCH_KEY.equals(s3Error.code())).count() != 0;
boolean allDeleteKeyMatch = resp.deleted().stream().map(DeletedObject::key).sorted().collect(Collectors.toList()).equals(path);

if (hasDeleted && !hasErrors && allDeleteKeyMatch) {
LOGGER.info("call deleteObjects deleteObjectKeys returned.");

return true;

} else if (!hasDeleted && !hasErrorsWithoutNoSuchKey) {
LOGGER.info("call deleteObjects but deleteObjectKeys not returned. set deleteObjectsReturnSuccessKeys = false");

return false;
}

IllegalStateException exception = new IllegalStateException();

LOGGER.error("error when check if delete objects will return success." +
" delete keys {} resp {}, requestId {},httpCode {} httpText {}",
path, resp, resp.responseMetadata().requestId(),
resp.sdkHttpResponse().statusCode(), resp.sdkHttpResponse().statusText(), exception);

throw exception;
}

private void checkAvailable(S3Utils.S3Context s3Context) {
byte[] content = new Date().toString().getBytes(StandardCharsets.UTF_8);
String path = String.format("check_available/%d", System.nanoTime());
Expand All @@ -661,6 +783,9 @@ private void checkAvailable(S3Utils.S3Context s3Context) {
read = this.rangeRead(multipartPath, 0, content.length).get(30, TimeUnit.SECONDS);
read.release();
this.delete(multipartPath).get(30, TimeUnit.SECONDS);

// Check if oss provider deleteObjects will return successDeleteKeys in deleted.
this.deleteObjectsReturnSuccessKeys = asyncCheckDeleteObjectsReturnSuccessDeleteKeys(s3Context).get(30, TimeUnit.SECONDS);
} catch (Throwable e) {
LOGGER.error("Failed to write/read/delete object on S3 ", e);
String exceptionMsg = String.format("Failed to write/read/delete object on S3. You are using s3Context: %s.", s3Context);
Expand Down
Loading

0 comments on commit ceceb8e

Please sign in to comment.