Skip to content

Commit

Permalink
- Upgrade S3 SDK from v2 to v3
Browse files Browse the repository at this point in the history
- Fix invalid call to stats collector

Signed-off-by: Ben <[email protected]>
  • Loading branch information
Neon-White committed Mar 28, 2024
1 parent 874ce40 commit 7eeb771
Showing 1 changed file with 49 additions and 24 deletions.
73 changes: 49 additions & 24 deletions src/sdk/namespace_gcp.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,15 @@ const s3_utils = require('../endpoint/s3/s3_utils');
const S3Error = require('../endpoint/s3/s3_errors').S3Error;
// we use this wrapper to set a custom user agent
const GoogleCloudStorage = require('../util/google_storage_wrap');
const AWS = require('aws-sdk');
const {
AbortMultipartUploadCommand,
CompleteMultipartUploadCommand,
CreateMultipartUploadCommand,
ListPartsCommand,
S3Client,
UploadPartCommand,
UploadPartCopyCommand,
} = require('@aws-sdk/client-s3');

/**
* @implements {nb.Namespace}
Expand Down Expand Up @@ -54,10 +62,13 @@ class NamespaceGCP {
private_key: this.private_key,
}
});
this.s3_client = new AWS.S3({
this.s3_client = new S3Client({
endpoint: 'https://storage.googleapis.com',
accessKeyId: hmac_key.access_id,
secretAccessKey: hmac_key.secret_key
region: 'auto', //https://cloud.google.com/storage/docs/aws-simple-migration#storage-list-buckets-s3-python
credentials: {
accessKeyId: hmac_key.access_id,
secretAccessKey: hmac_key.secret_key,
},
});

this.bucket = target_bucket;
Expand Down Expand Up @@ -196,7 +207,7 @@ class NamespaceGCP {
read_stream.on('response', () => {
let count = 1;
const count_stream = stream_utils.get_tap_stream(data => {
this.stats_collector.update_namespace_write_stats({
this.stats.update_namespace_write_stats({
namespace_resource_id: this.namespace_resource_id,
bucket_name: params.bucket,
size: data.length,
Expand Down Expand Up @@ -303,28 +314,30 @@ class NamespaceGCP {
async create_object_upload(params, object_sdk) {
dbg.log0('NamespaceGCP.create_object_upload:', this.bucket, inspect(params));
const Tagging = params.tagging && params.tagging.map(tag => tag.key + '=' + tag.value).join('&');
/** @type {AWS.S3.CreateMultipartUploadRequest} */
const request = {

/** @type {import('@aws-sdk/client-s3').CreateMultipartUploadRequest} */
const mp_upload_input = {
Bucket: this.bucket,
Key: params.key,
ContentType: params.content_type,
StorageClass: params.storage_class,
Metadata: params.xattr,
Tagging
};
const res = await this.s3_client.createMultipartUpload(request).promise();
const mp_upload_cmd = new CreateMultipartUploadCommand(mp_upload_input);
const res = await this.s3_client.send(mp_upload_cmd);

dbg.log0('NamespaceGCP.create_object_upload:', this.bucket, inspect(params), 'res', inspect(res));
return { obj_id: res.UploadId };
}

async upload_multipart(params, object_sdk) {
dbg.log0('NamespaceGCP.upload_multipart:', this.bucket, inspect(params));
let etag;
let res;
if (params.copy_source) {
const { copy_source, copy_source_range } = s3_utils.format_copy_source(params.copy_source);

/** @type {AWS.S3.UploadPartCopyRequest} */
/** @type {import('@aws-sdk/client-s3').UploadPartCopyRequest} */
const request = {
Bucket: this.bucket,
Key: params.key,
Expand All @@ -334,7 +347,9 @@ class NamespaceGCP {
CopySourceRange: copy_source_range,
};

res = await this.s3_client.uploadPartCopy(request).promise();
const command = new UploadPartCopyCommand(request);
res = await this.s3_client.send(command);
etag = s3_utils.parse_etag(res.CopyPartResult.ETag);
} else {
let count = 1;
const count_stream = stream_utils.get_tap_stream(data => {
Expand All @@ -346,7 +361,7 @@ class NamespaceGCP {
// clear count for next updates
count = 0;
});
/** @type {AWS.S3.UploadPartRequest} */
/** @type {import('@aws-sdk/client-s3').UploadPartRequest} */
const request = {
Bucket: this.bucket,
Key: params.key,
Expand All @@ -357,7 +372,8 @@ class NamespaceGCP {
ContentLength: params.size,
};
try {
res = await this.s3_client.uploadPart(request).promise();
const command = new UploadPartCommand(request);
res = await this.s3_client.send(command);
} catch (err) {
object_sdk.rpc_client.pool.update_issues_report({
namespace_resource_id: this.namespace_resource_id,
Expand All @@ -366,22 +382,25 @@ class NamespaceGCP {
});
throw err;
}
etag = s3_utils.parse_etag(res.ETag);
}
dbg.log0('NamespaceGCP.upload_multipart:', this.bucket, inspect(params), 'res', inspect(res));
const etag = s3_utils.parse_etag(res.ETag);
return { etag };
}

async list_multiparts(params, object_sdk) {
dbg.log0('NamespaceGCP.list_multiparts:', this.bucket, inspect(params));

const res = await this.s3_client.listParts({
/** @type {import('@aws-sdk/client-s3').ListPartsRequest} */
const request = {
Bucket: this.bucket,
Key: params.key,
UploadId: params.obj_id,
MaxParts: params.max,
PartNumberMarker: params.num_marker,
}).promise();
};
const command = new ListPartsCommand(request);
const res = await this.s3_client.send(command);

dbg.log0('NamespaceGCP.list_multiparts:', this.bucket, inspect(params), 'res', inspect(res));
return {
Expand All @@ -399,7 +418,8 @@ class NamespaceGCP {
async complete_object_upload(params, object_sdk) {
dbg.log0('NamespaceGCP.complete_object_upload:', this.bucket, inspect(params));

const res = await this.s3_client.completeMultipartUpload({
/** @type {import('@aws-sdk/client-s3').CompleteMultipartUploadRequest} */
const request = {
Bucket: this.bucket,
Key: params.key,
UploadId: params.obj_id,
Expand All @@ -409,7 +429,9 @@ class NamespaceGCP {
ETag: `"${p.etag}"`,
}))
}
}).promise();
};
const command = new CompleteMultipartUploadCommand(request);
const res = await this.s3_client.send(command);

dbg.log0('NamespaceGCP.complete_object_upload:', this.bucket, inspect(params), 'res', inspect(res));
const etag = s3_utils.parse_etag(res.ETag);
Expand All @@ -418,11 +440,14 @@ class NamespaceGCP {

async abort_object_upload(params, object_sdk) {
dbg.log0('NamespaceGCP.abort_object_upload:', this.bucket, inspect(params));
const res = await this.s3_client.abortMultipartUpload({
/** @type {import('@aws-sdk/client-s3').AbortMultipartUploadRequest} */
const request = {
Bucket: this.bucket,
Key: params.key,
UploadId: params.obj_id,
}).promise();
};
const command = new AbortMultipartUploadCommand(request);
const res = await this.s3_client.send(command);

dbg.log0('NamespaceGCP.abort_object_upload:', this.bucket, inspect(params), 'res', inspect(res));
}
Expand Down Expand Up @@ -471,8 +496,8 @@ class NamespaceGCP {

const res = await P.map_with_concurrency(10, params.objects, obj =>
this.gcs.bucket(this.bucket).file(obj.key).delete()
.then(() => ({}))
.catch(err => ({ err_code: err.code, err_message: err.errors[0].reason || 'InternalError' })));
.then(() => ({}))
.catch(err => ({ err_code: err.code, err_message: err.errors[0].reason || 'InternalError' })));

dbg.log1('NamespaceGCP.delete_multiple_objects:', this.bucket, inspect(params), 'res', inspect(res));

Expand Down Expand Up @@ -500,9 +525,9 @@ class NamespaceGCP {
// Set an empty metadata object to remove all tags
const res = await this.gcs.bucket(this.bucket).file(params.key).setMetadata({});
dbg.log0('NamespaceGCP.delete_object_tagging:', this.bucket, inspect(params), 'res', inspect(res));
} catch (err) {
} catch (err) {
dbg.error('NamespaceGCP.delete_object_tagging error:', err);
}
}
}
async put_object_tagging(params, object_sdk) {
dbg.log0('NamespaceGCP.put_object_tagging:', this.bucket, inspect(params));
Expand Down

0 comments on commit 7eeb771

Please sign in to comment.