Skip to content

Commit

Permalink
Merge pull request #8605 from romayalon/romy-backports-to-5.17
Browse files Browse the repository at this point in the history
Backports to 5.17.2
  • Loading branch information
nimrod-becker authored Dec 17, 2024
2 parents c28cbea + 279d55a commit 8de3616
Show file tree
Hide file tree
Showing 19 changed files with 472 additions and 211 deletions.
2 changes: 2 additions & 0 deletions config.js
Original file line number Diff line number Diff line change
Expand Up @@ -872,6 +872,8 @@ config.NSFS_LOW_FREE_SPACE_PERCENT_UNLEASH = 0.10;
// anonymous account name
config.ANONYMOUS_ACCOUNT_NAME = 'anonymous';

config.NFSF_UPLOAD_STREAM_MEM_THRESHOLD = 8 * 1024 * 1024;

////////////////////////////
// NSFS NON CONTAINERIZED //
////////////////////////////
Expand Down
12 changes: 11 additions & 1 deletion src/endpoint/endpoint.js
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,17 @@ async function main(options = {}) {
// the primary just forks and returns, workers will continue to serve
fork_count = options.forks ?? config.ENDPOINT_FORKS;
const metrics_port = options.metrics_port || config.EP_METRICS_SERVER_PORT;
if (fork_utils.start_workers(metrics_port, fork_count)) return;
/**
* Please notice that we can run the main in 2 states:
* 1. Only the primary process runs the main (fork is 0 or undefined) - everything that
* is implemented here would be run by this process.
* 2. A primary process with multiple forks (IMPORTANT) - if there is implementation that
* in only relevant to the primary process it should be implemented in
* fork_utils.start_workers because the primary process returns after start_workers
* and the forks will continue executing the code lines in this function
* */
const is_workers_started_from_primary = await fork_utils.start_workers(metrics_port, fork_count);
if (is_workers_started_from_primary) return;

const http_port = options.http_port || config.ENDPOINT_PORT;
const https_port = options.https_port || config.ENDPOINT_SSL_PORT;
Expand Down
15 changes: 15 additions & 0 deletions src/endpoint/s3/s3_errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,21 @@ S3Error.InvalidEncodingType = Object.freeze({
message: 'Invalid Encoding Method specified in Request',
http_code: 400,
});
S3Error.AuthorizationQueryParametersError = Object.freeze({
code: 'AuthorizationQueryParametersError',
message: 'X-Amz-Expires must be less than a week (in seconds); that is, the given X-Amz-Expires must be less than 604800 seconds',
http_code: 400,
});
S3Error.RequestExpired = Object.freeze({
code: 'AccessDenied',
message: 'Request has expired',
http_code: 403,
});
S3Error.RequestNotValidYet = Object.freeze({
code: 'AccessDenied',
message: 'request is not valid yet',
http_code: 403,
});

////////////////////////////////////////////////////////////////
// S3 Select //
Expand Down
12 changes: 10 additions & 2 deletions src/manage_nsfs/diagnose.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,17 @@ async function gather_metrics() {
const buffer = await buffer_utils.read_stream_join(res);
const body = buffer.toString('utf8');
metrics_output = JSON.parse(body);
if (!metrics_output) throw new Error('received empty metrics response', { cause: res.statusCode });
write_stdout_response(ManageCLIResponse.MetricsStatus, metrics_output);
} else if (res.statusCode >= 500 && res.rawHeaders.includes('application/json')) {
const buffer = await buffer_utils.read_stream_join(res);
const body = buffer.toString('utf8');
const error_output = JSON.parse(body);
if (!error_output) throw new Error('received empty metrics response', { cause: res.statusCode });
throw_cli_error({ ...ManageCLIError.MetricsStatusFailed, ...error_output });
} else {
throw new Error('received empty metrics response', { cause: res.statusCode });
}
if (!metrics_output) throw new Error('recieved empty metrics response', { cause: res.statusCode });
write_stdout_response(ManageCLIResponse.MetricsStatus, metrics_output);
} catch (err) {
dbg.warn('could not receive metrics response', err);
throw_cli_error({ ...ManageCLIError.MetricsStatusFailed, cause: err?.errors?.[0] || err });
Expand Down
29 changes: 16 additions & 13 deletions src/sdk/namespace_fs.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ const stream_utils = require('../util/stream_utils');
const buffer_utils = require('../util/buffer_utils');
const size_utils = require('../util/size_utils');
const native_fs_utils = require('../util/native_fs_utils');
const ChunkFS = require('../util/chunk_fs');
const FileWriter = require('../util/file_writer');
const LRUCache = require('../util/lru_cache');
const nb_native = require('../util/nb_native');
const RpcError = require('../rpc/rpc_error');
Expand Down Expand Up @@ -1563,30 +1563,33 @@ class NamespaceFS {
// Can be finetuned further on if needed and inserting the Semaphore logic inside
// Instead of wrapping the whole _upload_stream function (q_buffers lives outside of the data scope of the stream)
async _upload_stream({ fs_context, params, target_file, object_sdk, offset }) {
const { source_stream, copy_source } = params;
const { copy_source } = params;
try {
// Not using async iterators with ReadableStreams due to unsettled promises issues on abort/destroy
const md5_enabled = this._is_force_md5_enabled(object_sdk);
const chunk_fs = new ChunkFS({
const file_writer = new FileWriter({
target_file,
fs_context,
stats: this.stats,
namespace_resource_id: this.namespace_resource_id,
md5_enabled,
offset,
md5_enabled,
stats: this.stats,
bucket: params.bucket,
large_buf_size: multi_buffer_pool.get_buffers_pool(undefined).buf_size
large_buf_size: multi_buffer_pool.get_buffers_pool(undefined).buf_size,
namespace_resource_id: this.namespace_resource_id,
});
chunk_fs.on('error', err1 => dbg.error('namespace_fs._upload_stream: error occured on stream ChunkFS: ', err1));
file_writer.on('error', err => dbg.error('namespace_fs._upload_stream: error occured on FileWriter: ', err));
file_writer.on('finish', arg => dbg.log1('namespace_fs._upload_stream: finish occured on stream FileWriter: ', arg));
file_writer.on('close', arg => dbg.log1('namespace_fs._upload_stream: close occured on stream FileWriter: ', arg));

if (copy_source) {
await this.read_object_stream(copy_source, object_sdk, chunk_fs);
await this.read_object_stream(copy_source, object_sdk, file_writer);
} else if (params.source_params) {
await params.source_ns.read_object_stream(params.source_params, object_sdk, chunk_fs);
await params.source_ns.read_object_stream(params.source_params, object_sdk, file_writer);
} else {
await stream_utils.pipeline([source_stream, chunk_fs]);
await stream_utils.wait_finished(chunk_fs);
await stream_utils.pipeline([params.source_stream, file_writer]);
await stream_utils.wait_finished(file_writer);
}
return { digest: chunk_fs.digest, total_bytes: chunk_fs.total_bytes };
return { digest: file_writer.digest, total_bytes: file_writer.total_bytes };
} catch (error) {
dbg.error('_upload_stream had error: ', error);
throw error;
Expand Down
27 changes: 20 additions & 7 deletions src/server/analytic_services/prometheus_reporting.js
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,7 @@ async function start_server(
const server = http.createServer(async (req, res) => {
// Serve all metrics on the root path for system that do have one or more fork running.
if (fork_enabled) {
const metrics = await aggregatorRegistry.clusterMetrics();
if (req.url === '' || req.url === '/') {
res.writeHead(200, { 'Content-Type': aggregatorRegistry.contentType });
res.end(metrics);
return;
}
// we would like this part to be first as clusterMetrics might fail.
if (req.url === '/metrics/nsfs_stats') {
res.writeHead(200, { 'Content-Type': 'text/plain' });
const nsfs_report = {
Expand All @@ -77,6 +72,24 @@ async function start_server(
res.end(JSON.stringify(nsfs_report));
return;
}
let metrics;
try {
metrics = await aggregatorRegistry.clusterMetrics();
} catch (err) {
dbg.error('start_server: Could not get the metrics, got an error', err);
res.writeHead(504, { 'Content-Type': 'application/json' });
const reply = JSON.stringify({
error: 'Internal server error - timeout',
message: 'Looks like the server is taking a long time to respond (Could not get the metrics)',
});
res.end(reply);
return;
}
if (req.url === '' || req.url === '/') {
res.writeHead(200, { 'Content-Type': aggregatorRegistry.contentType });
res.end(metrics);
return;
}
// Serve report's metrics on the report name path
const report_name = req.url.substr(1);
const single_metrics = export_single_metrics(metrics, report_name);
Expand Down Expand Up @@ -165,7 +178,7 @@ async function metrics_nsfs_stats_handler() {
op_stats_counters: op_stats_counters,
fs_worker_stats_counters: fs_worker_stats_counters
};
dbg.log1(`_create_nsfs_report: nsfs_report ${nsfs_report}`);
dbg.log1('_create_nsfs_report: nsfs_report', nsfs_report);
return JSON.stringify(nsfs_report);
}

Expand Down
2 changes: 1 addition & 1 deletion src/test/unit_tests/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ require('./test_bucket_chunks_builder');
require('./test_mirror_writer');
require('./test_namespace_fs');
require('./test_ns_list_objects');
require('./test_chunk_fs');
require('./test_file_writer');
require('./test_namespace_fs_mpu');
require('./test_nb_native_fs');
require('./test_s3select');
Expand Down
2 changes: 2 additions & 0 deletions src/test/unit_tests/nc_coretest.js
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ function setup(options = {}) {
});

// TODO - run health
// wait 2 seconds before announcing nc coretes is ready
await P.delay(2000);
await announce(`nc coretest ready... (took ${((Date.now() - start) / 1000).toFixed(1)} sec)`);
});

Expand Down
2 changes: 1 addition & 1 deletion src/test/unit_tests/nc_index.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ coretest.setup();

require('./test_namespace_fs');
require('./test_ns_list_objects');
require('./test_chunk_fs');
require('./test_file_writer');
require('./test_namespace_fs_mpu');
require('./test_nb_native_fs');
require('./test_nc_nsfs_cli');
Expand Down
134 changes: 133 additions & 1 deletion src/test/unit_tests/test_bucketspace.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/* Copyright (C) 2020 NooBaa */
/*eslint max-lines: ["error", 2200]*/
/*eslint max-lines: ["error", 2500]*/
/*eslint max-lines-per-function: ["error", 1300]*/
/*eslint max-statements: ["error", 80, { "ignoreTopLevelFunctions": true }]*/
'use strict';
Expand All @@ -12,9 +12,15 @@ const util = require('util');
const http = require('http');
const mocha = require('mocha');
const assert = require('assert');
const http_utils = require('../../util/http_utils');
const config = require('../../../config');
const fs_utils = require('../../util/fs_utils');
const { JSON_SUFFIX } = require('../../sdk/config_fs');
const fetch = require('node-fetch');
const P = require('../../util/promise');
const cloud_utils = require('../../util/cloud_utils');
const SensitiveString = require('../../util/sensitive_string');
const S3Error = require('../../../src/endpoint/s3/s3_errors').S3Error;
const test_utils = require('../system_tests/test_utils');
const { stat, open } = require('../../util/nb_native')().fs;
const { get_process_fs_context } = require('../../util/native_fs_utils');
Expand Down Expand Up @@ -2116,3 +2122,129 @@ async function delete_anonymous_account(accounts_dir_path, account_config_path)
console.log('Anonymous account Deleted');
}

mocha.describe('Presigned URL tests', function() {
this.timeout(50000); // eslint-disable-line no-invalid-this
const nsr = 'presigned_url_nsr';
const account_name = 'presigned_url_account';
const fs_path = path.join(TMP_PATH, 'presigned_url_tests/');
const presigned_url_bucket = 'presigned-url-bucket';
const presigned_url_object = 'presigned-url-object.txt';
const presigned_body = 'presigned_body';
let s3_client;
let access_key;
let secret_key;
CORETEST_ENDPOINT = coretest.get_http_address();
let valid_default_presigned_url;
let presigned_url_params;

mocha.before(async function() {
await fs_utils.create_fresh_path(fs_path);
await rpc_client.pool.create_namespace_resource({ name: nsr, nsfs_config: { fs_root_path: fs_path } });
const new_buckets_path = is_nc_coretest ? fs_path : '/';
const nsfs_account_config = {
uid: process.getuid(), gid: process.getgid(), new_buckets_path, nsfs_only: true
};
const account_params = { ...new_account_params, email: `${account_name}@noobaa.io`, name: account_name, default_resource: nsr, nsfs_account_config };
const res = await rpc_client.account.create_account(account_params);
access_key = res.access_keys[0].access_key;
secret_key = res.access_keys[0].secret_key;
s3_client = generate_s3_client(access_key.unwrap(), secret_key.unwrap(), CORETEST_ENDPOINT);
await s3_client.createBucket({ Bucket: presigned_url_bucket });
await s3_client.putObject({ Bucket: presigned_url_bucket, Key: presigned_url_object, Body: presigned_body });

presigned_url_params = {
bucket: new SensitiveString(presigned_url_bucket),
key: presigned_url_object,
endpoint: CORETEST_ENDPOINT,
access_key: access_key,
secret_key: secret_key
};
valid_default_presigned_url = cloud_utils.get_signed_url(presigned_url_params);
});

mocha.after(async function() {
if (!is_nc_coretest) return;
await s3_client.deleteObject({ Bucket: presigned_url_bucket, Key: presigned_url_object });
await s3_client.deleteBucket({ Bucket: presigned_url_bucket });
await rpc_client.account.delete_account({ email: `${account_name}@noobaa.io` });
await fs_utils.folder_delete(fs_path);
});

it('fetch valid presigned URL - 604800 seconds - epoch expiry - should return object data', async () => {
const data = await fetchData(valid_default_presigned_url);
assert.equal(data, presigned_body);
});

it('fetch valid presigned URL - 604800 seconds - should return object data - with valid date + expiry in seconds', async () => {
const now = new Date();
const valid_url_with_date = valid_default_presigned_url + '&X-Amz-Date=' + now.toISOString() + '&X-Amz-Expires=' + 604800;
const data = await fetchData(valid_url_with_date);
assert.equal(data, presigned_body);
});

it('fetch invalid presigned URL - 604800 seconds - epoch expiry + with future date', async () => {
const now = new Date();
// Add one hour (3600000 milliseconds)
const one_hour_in_ms = 60 * 60 * 1000;
const one_hour_from_now = new Date(now.getTime() + one_hour_in_ms);
const future_presigned_url = valid_default_presigned_url + '&X-Amz-Date=' + one_hour_from_now.toISOString();
const expected_err = new S3Error(S3Error.RequestNotValidYet);
await assert_throws_async(fetchData(future_presigned_url), expected_err.message);
});

it('fetch invalid presigned URL - 604800 expiry seconds + with future date', async () => {
const now = new Date();
// Add one hour (3600000 milliseconds)
const one_hour_in_ms = 60 * 60 * 1000;
const one_hour_from_now = new Date(now.getTime() + one_hour_in_ms);
const future_presigned_url = valid_default_presigned_url + '&X-Amz-Date=' + one_hour_from_now.toISOString() + '&X-Amz-Expires=' + 604800;
const expected_err = new S3Error(S3Error.RequestNotValidYet);
await assert_throws_async(fetchData(future_presigned_url), expected_err.message);
});

it('fetch invalid presigned URL - 604800 seconds - epoch expiry - URL expired', async () => {
const expired_presigned_url = cloud_utils.get_signed_url(presigned_url_params, 1);
// wait for 2 seconds before fetching the url
await P.delay(2000);
const expected_err = new S3Error(S3Error.RequestExpired);
await assert_throws_async(fetchData(expired_presigned_url), expected_err.message);
});

it('fetch invalid presigned URL - 604800 expiry seconds - URL expired', async () => {
const now = new Date();
const expired_presigned_url = cloud_utils.get_signed_url(presigned_url_params, 1) + '&X-Amz-Date=' + now.toISOString() + '&X-Amz-Expires=' + 1;
// wait for 2 seconds before fetching the url
await P.delay(2000);
const expected_err = new S3Error(S3Error.RequestExpired);
await assert_throws_async(fetchData(expired_presigned_url), expected_err.message);
});

it('fetch invalid presigned URL - expiry expoch - expire in bigger than limit', async () => {
const invalid_expiry = 604800 + 10;
const invalid_expiry_presigned_url = cloud_utils.get_signed_url(presigned_url_params, invalid_expiry);
const expected_err = new S3Error(S3Error.AuthorizationQueryParametersError);
await assert_throws_async(fetchData(invalid_expiry_presigned_url), expected_err.message);
});

it('fetch invalid presigned URL - expire in bigger than limit', async () => {
const now = new Date();
const invalid_expiry = 604800 + 10;
const invalid_expiry_presigned_url = cloud_utils.get_signed_url(presigned_url_params, invalid_expiry) + '&X-Amz-Date=' + now.toISOString() + '&X-Amz-Expires=' + invalid_expiry;
const expected_err = new S3Error(S3Error.AuthorizationQueryParametersError);
await assert_throws_async(fetchData(invalid_expiry_presigned_url), expected_err.message);
});
});

async function fetchData(presigned_url) {
const response = await fetch(presigned_url, { agent: new http.Agent({ keepAlive: false }) });
let data;
if (!response.ok) {
data = (await response.text()).trim();
const err_json = (await http_utils.parse_xml_to_js(data)).Error;
const err = new Error(err_json.Message);
err.code = err_json.Code;
throw err;
}
data = await response.text();
return data.trim();
}
34 changes: 0 additions & 34 deletions src/test/unit_tests/test_chunk_fs.js

This file was deleted.

Loading

0 comments on commit 8de3616

Please sign in to comment.