From b0e904fdbcd3ea0b7b8d160914ab233f8259b21e Mon Sep 17 00:00:00 2001
From: Romy <35330373+romayalon@users.noreply.github.com>
Date: Thu, 31 Oct 2024 12:34:18 +0200
Subject: [PATCH 1/4] Fix pre-signed url issues

Signed-off-by: Romy <35330373+romayalon@users.noreply.github.com>
(cherry picked from commit 6bb95dc192a7d4659377eaebb02734545d8f0ecf)
---
 src/endpoint/s3/s3_errors.js | 15 +++++++++++++++
 src/util/http_utils.js       | 11 +++++++++--
 src/util/signature_utils.js  | 15 ++++++++++++---
 3 files changed, 36 insertions(+), 5 deletions(-)

diff --git a/src/endpoint/s3/s3_errors.js b/src/endpoint/s3/s3_errors.js
index fbed4e796a..2800a8d538 100644
--- a/src/endpoint/s3/s3_errors.js
+++ b/src/endpoint/s3/s3_errors.js
@@ -534,6 +534,21 @@ S3Error.InvalidEncodingType = Object.freeze({
     message: 'Invalid Encoding Method specified in Request',
     http_code: 400,
 });
+S3Error.AuthorizationQueryParametersError = Object.freeze({
+    code: 'AuthorizationQueryParametersError',
+    message: 'X-Amz-Expires must be less than a week (in seconds); that is, the given X-Amz-Expires must be less than 604800 seconds',
+    http_code: 400,
+});
+S3Error.RequestExpired = Object.freeze({
+    code: 'AccessDenied',
+    message: 'Request has expired',
+    http_code: 403,
+});
+S3Error.RequestNotValidYet = Object.freeze({
+    code: 'AccessDenied',
+    message: 'request is not valid yet',
+    http_code: 403,
+});
 
 ////////////////////////////////////////////////////////////////
 // S3 Select                                                  //
diff --git a/src/util/http_utils.js b/src/util/http_utils.js
index 8498b774b1..279ef780db 100644
--- a/src/util/http_utils.js
+++ b/src/util/http_utils.js
@@ -597,8 +597,15 @@ function check_headers(req, options) {
     if (isNaN(req_time) && !req.query.Expires && is_not_anonymous_req) {
         throw new options.ErrorClass(options.error_access_denied);
     }
-
-    if (Math.abs(Date.now() - req_time) > config.AMZ_DATE_MAX_TIME_SKEW_MILLIS) {
+    // a presigned url request with a future date should throw AccessDenied with a "request is not valid yet" message
+    // we allow a small grace period (2 seconds)
+    const is_presigned_url = req.query.Expires || (req.query['X-Amz-Date'] && req.query['X-Amz-Expires']);
+    if (is_presigned_url && (req_time > (Date.now() + 2000))) {
+        throw new S3Error(S3Error.RequestNotValidYet);
+    }
+    // on regular requests the skew limit is 15 minutes
+    // on presigned url requests we don't need to check skew
+    if (!is_presigned_url && (Math.abs(Date.now() - req_time) > config.AMZ_DATE_MAX_TIME_SKEW_MILLIS)) {
         throw new options.ErrorClass(options.error_request_time_too_skewed);
     }
 }
diff --git a/src/util/signature_utils.js b/src/util/signature_utils.js
index 785075d9b4..5ff6314d80 100644
--- a/src/util/signature_utils.js
+++ b/src/util/signature_utils.js
@@ -9,6 +9,7 @@ const path = require('path');
 const crypto = require('crypto');
 const S3Error = require('../endpoint/s3/s3_errors').S3Error;
 const http_utils = require('./http_utils');
+const time_utils = require('./time_utils');
 const { RpcError } = require('../rpc');
 
 
@@ -126,9 +127,9 @@ function _string_to_sign_v4(req, signed_headers, xamzdate, region, service) {
 
 function _check_expiry_query_v4(request_date, expires_seconds) {
     const now = Date.now();
-    const expires = (new Date(request_date).getTime()) + (Number(expires_seconds) * 1000);
+    const expires = (new Date(time_utils.parse_amz_date(request_date)).getTime()) + (Number(expires_seconds) * 1000);
     if (now > expires) {
-        throw new Error('Authentication Expired (V4)');
+        throw new S3Error(S3Error.RequestExpired);
     }
 }
 
@@ -180,7 +181,7 @@ function _check_expiry_query_s3(expires_epoch) {
     const now = Date.now();
     const expires = Number(expires_epoch) * 1000;
     if (now > expires) {
-        throw new Error('Authentication Expired (S3)');
+        throw new S3Error(S3Error.RequestExpired);
     }
 }
 
@@ -285,12 +286,20 @@ function make_auth_token_from_request(req) {
  */
 function check_request_expiry(req) {
     if (req.query['X-Amz-Date'] && req.query['X-Amz-Expires']) {
+        _check_expiry_limit(req.query['X-Amz-Expires']);
         _check_expiry_query_v4(req.query['X-Amz-Date'], req.query['X-Amz-Expires']);
     } else if (req.query.Expires) {
+        _check_expiry_limit(req.query.Expires);
         _check_expiry_query_s3(req.query.Expires);
     }
 }
 
+// expiry_seconds limit is 7 days
+function _check_expiry_limit(expiry_seconds) {
+    if (Number(expiry_seconds) > 7 * 24 * 60 * 60 * 1000) {
+        throw new S3Error(S3Error.AuthorizationQueryParametersError);
+    }
+}
 
 /**
  *

From 8c88f1f85a6b344abef4073b25973d10b0fb638d Mon Sep 17 00:00:00 2001
From: Romy <35330373+romayalon@users.noreply.github.com>
Date: Mon, 4 Nov 2024 12:17:49 +0200
Subject: [PATCH 2/4] Presigned URL - fix expiry limit from milliseconds to seconds

Signed-off-by: Romy <35330373+romayalon@users.noreply.github.com>
(cherry picked from commit 24ee56f0050d6f386efd2f99e7009dc72ff61248)
---
 src/test/unit_tests/nc_coretest.js      |   2 +
 src/test/unit_tests/test_bucketspace.js | 134 +++++++++++++++++++++++-
 src/util/cloud_utils.js                 |   4 +-
 src/util/signature_utils.js             |   7 +-
 4 files changed, 141 insertions(+), 6 deletions(-)

diff --git a/src/test/unit_tests/nc_coretest.js b/src/test/unit_tests/nc_coretest.js
index 22a2156b4b..d085000b7b 100644
--- a/src/test/unit_tests/nc_coretest.js
+++ b/src/test/unit_tests/nc_coretest.js
@@ -96,6 +96,8 @@ function setup(options = {}) {
         });
 
         // TODO - run health
+        // wait 2 seconds before announcing nc coretest is ready
+        await P.delay(2000);
         await announce(`nc coretest ready... (took ${((Date.now() - start) / 1000).toFixed(1)} sec)`);
     });
 
diff --git a/src/test/unit_tests/test_bucketspace.js b/src/test/unit_tests/test_bucketspace.js
index 95d49f645b..fc14d2b0d7 100644
--- a/src/test/unit_tests/test_bucketspace.js
+++ b/src/test/unit_tests/test_bucketspace.js
@@ -1,5 +1,5 @@
 /* Copyright (C) 2020 NooBaa */
-/*eslint max-lines: ["error", 2200]*/
+/*eslint max-lines: ["error", 2500]*/
 /*eslint max-lines-per-function: ["error", 1300]*/
 /*eslint max-statements: ["error", 80, { "ignoreTopLevelFunctions": true }]*/
 'use strict';
@@ -12,9 +12,15 @@ const util = require('util');
 const http = require('http');
 const mocha = require('mocha');
 const assert = require('assert');
+const http_utils = require('../../util/http_utils');
 const config = require('../../../config');
 const fs_utils = require('../../util/fs_utils');
 const { JSON_SUFFIX } = require('../../sdk/config_fs');
+const fetch = require('node-fetch');
+const P = require('../../util/promise');
+const cloud_utils = require('../../util/cloud_utils');
+const SensitiveString = require('../../util/sensitive_string');
+const S3Error = require('../../../src/endpoint/s3/s3_errors').S3Error;
 const test_utils = require('../system_tests/test_utils');
 const { stat, open } = require('../../util/nb_native')().fs;
 const { get_process_fs_context } = require('../../util/native_fs_utils');
@@ -2116,3 +2122,129 @@ async function delete_anonymous_account(accounts_dir_path, account_config_path)
     console.log('Anonymous account Deleted');
 }
 
+mocha.describe('Presigned URL tests', function() {
+    this.timeout(50000); // eslint-disable-line no-invalid-this
+    const nsr = 'presigned_url_nsr';
+    const account_name = 'presigned_url_account';
+    const fs_path = path.join(TMP_PATH, 'presigned_url_tests/');
+    const presigned_url_bucket = 'presigned-url-bucket';
+    const presigned_url_object = 'presigned-url-object.txt';
+    const presigned_body = 'presigned_body';
+    let s3_client;
+    let access_key;
+    let secret_key;
+    CORETEST_ENDPOINT = coretest.get_http_address();
+    let valid_default_presigned_url;
+    let presigned_url_params;
+
+    mocha.before(async function() {
+        await fs_utils.create_fresh_path(fs_path);
+        await rpc_client.pool.create_namespace_resource({ name: nsr, nsfs_config: { fs_root_path: fs_path } });
+        const new_buckets_path = is_nc_coretest ? fs_path : '/';
+        const nsfs_account_config = {
+            uid: process.getuid(), gid: process.getgid(), new_buckets_path, nsfs_only: true
+        };
+        const account_params = { ...new_account_params, email: `${account_name}@noobaa.io`, name: account_name, default_resource: nsr, nsfs_account_config };
+        const res = await rpc_client.account.create_account(account_params);
+        access_key = res.access_keys[0].access_key;
+        secret_key = res.access_keys[0].secret_key;
+        s3_client = generate_s3_client(access_key.unwrap(), secret_key.unwrap(), CORETEST_ENDPOINT);
+        await s3_client.createBucket({ Bucket: presigned_url_bucket });
+        await s3_client.putObject({ Bucket: presigned_url_bucket, Key: presigned_url_object, Body: presigned_body });
+
+        presigned_url_params = {
+            bucket: new SensitiveString(presigned_url_bucket),
+            key: presigned_url_object,
+            endpoint: CORETEST_ENDPOINT,
+            access_key: access_key,
+            secret_key: secret_key
+        };
+        valid_default_presigned_url = cloud_utils.get_signed_url(presigned_url_params);
+    });
+
+    mocha.after(async function() {
+        if (!is_nc_coretest) return;
+        await s3_client.deleteObject({ Bucket: presigned_url_bucket, Key: presigned_url_object });
+        await s3_client.deleteBucket({ Bucket: presigned_url_bucket });
+        await rpc_client.account.delete_account({ email: `${account_name}@noobaa.io` });
+        await fs_utils.folder_delete(fs_path);
+    });
+
+    it('fetch valid presigned URL - 604800 seconds - epoch expiry - should return object data', async () => {
+        const data = await fetchData(valid_default_presigned_url);
+        assert.equal(data, presigned_body);
+    });
+
+    it('fetch valid presigned URL - 604800 seconds - should return object data - with valid date + expiry in seconds', async () => {
+        const now = new Date();
+        const valid_url_with_date = valid_default_presigned_url + '&X-Amz-Date=' + now.toISOString() + '&X-Amz-Expires=' + 604800;
+        const data = await fetchData(valid_url_with_date);
+        assert.equal(data, presigned_body);
+    });
+
+    it('fetch invalid presigned URL - 604800 seconds - epoch expiry + with future date', async () => {
+        const now = new Date();
+        // Add one hour (3600000 milliseconds)
+        const one_hour_in_ms = 60 * 60 * 1000;
+        const one_hour_from_now = new Date(now.getTime() + one_hour_in_ms);
+        const future_presigned_url = valid_default_presigned_url + '&X-Amz-Date=' + one_hour_from_now.toISOString();
+        const expected_err = new S3Error(S3Error.RequestNotValidYet);
+        await assert_throws_async(fetchData(future_presigned_url), expected_err.message);
+    });
+
+    it('fetch invalid presigned URL - 604800 expiry seconds + with future date', async () => {
+        const now = new Date();
+        // Add one hour (3600000 milliseconds)
+        const one_hour_in_ms = 60 * 60 * 1000;
+        const one_hour_from_now = new Date(now.getTime() + one_hour_in_ms);
+        const future_presigned_url = valid_default_presigned_url + '&X-Amz-Date=' + one_hour_from_now.toISOString() + '&X-Amz-Expires=' + 604800;
+        const expected_err = new S3Error(S3Error.RequestNotValidYet);
+        await assert_throws_async(fetchData(future_presigned_url), expected_err.message);
+    });
+
+    it('fetch invalid presigned URL - 604800 seconds - epoch expiry - URL expired', async () => {
+        const expired_presigned_url = cloud_utils.get_signed_url(presigned_url_params, 1);
+        // wait for 2 seconds before fetching the url
+        await P.delay(2000);
+        const expected_err = new S3Error(S3Error.RequestExpired);
+        await assert_throws_async(fetchData(expired_presigned_url), expected_err.message);
+    });
+
+    it('fetch invalid presigned URL - 604800 expiry seconds - URL expired', async () => {
+        const now = new Date();
+        const expired_presigned_url = cloud_utils.get_signed_url(presigned_url_params, 1) + '&X-Amz-Date=' + now.toISOString() + '&X-Amz-Expires=' + 1;
+        // wait for 2 seconds before fetching the url
+        await P.delay(2000);
+        const expected_err = new S3Error(S3Error.RequestExpired);
+        await assert_throws_async(fetchData(expired_presigned_url), expected_err.message);
+    });
+
+    it('fetch invalid presigned URL - expiry epoch - expiry bigger than limit', async () => {
+        const invalid_expiry = 604800 + 10;
+        const invalid_expiry_presigned_url = cloud_utils.get_signed_url(presigned_url_params, invalid_expiry);
+        const expected_err = new S3Error(S3Error.AuthorizationQueryParametersError);
+        await assert_throws_async(fetchData(invalid_expiry_presigned_url), expected_err.message);
+    });
+
+    it('fetch invalid presigned URL - expiry bigger than limit', async () => {
+        const now = new Date();
+        const invalid_expiry = 604800 + 10;
+        const invalid_expiry_presigned_url = cloud_utils.get_signed_url(presigned_url_params, invalid_expiry) + '&X-Amz-Date=' + now.toISOString() + '&X-Amz-Expires=' + invalid_expiry;
+        const expected_err = new S3Error(S3Error.AuthorizationQueryParametersError);
+        await assert_throws_async(fetchData(invalid_expiry_presigned_url), expected_err.message);
+    });
+});
+
+async function fetchData(presigned_url) {
+    const response = await fetch(presigned_url, { agent: new http.Agent({ keepAlive: false }) });
+    let data;
+    if (!response.ok) {
+        data = (await response.text()).trim();
+        const err_json = (await http_utils.parse_xml_to_js(data)).Error;
+        const err = new Error(err_json.Message);
+        err.code = err_json.Code;
+        throw err;
+    }
+    data = await response.text();
+    return data.trim();
+}
diff --git a/src/util/cloud_utils.js b/src/util/cloud_utils.js
index 33b5a005f2..c963d05ceb 100644
--- a/src/util/cloud_utils.js
+++ b/src/util/cloud_utils.js
@@ -58,7 +58,7 @@ async function generate_aws_sts_creds(params, roleSessionName) {
     );
 }
 
-function get_signed_url(params) {
+function get_signed_url(params, expiry = 604800) {
     const s3 = new AWS.S3({
         endpoint: params.endpoint,
         credentials: {
@@ -81,7 +81,7 @@ function get_signed_url(params) {
             Bucket: params.bucket.unwrap(),
             Key: params.key,
             VersionId: params.version_id,
-            Expires: 604800
+            Expires: expiry
         }
     );
 }
diff --git a/src/util/signature_utils.js b/src/util/signature_utils.js
index 5ff6314d80..76cecb4eae 100644
--- a/src/util/signature_utils.js
+++ b/src/util/signature_utils.js
@@ -289,14 +289,15 @@ function check_request_expiry(req) {
         _check_expiry_limit(req.query['X-Amz-Expires']);
         _check_expiry_query_v4(req.query['X-Amz-Date'], req.query['X-Amz-Expires']);
     } else if (req.query.Expires) {
-        _check_expiry_limit(req.query.Expires);
+        const expiry_seconds = req.query.Expires - Math.ceil(Date.now() / 1000);
+        _check_expiry_limit(expiry_seconds);
         _check_expiry_query_s3(req.query.Expires);
     }
 }
 
-// expiry_seconds limit is 7 days
+// expiry_seconds limit is 7 days = 604800 seconds
 function _check_expiry_limit(expiry_seconds) {
-    if (Number(expiry_seconds) > 7 * 24 * 60 * 60 * 1000) {
+    if (Number(expiry_seconds) > 604800) {
         throw new S3Error(S3Error.AuthorizationQueryParametersError);
     }
 }

From fb926c4cb8c757838d2b8062b5055d765cbc73fc Mon Sep 17 00:00:00 2001
From: shirady <57721533+shirady@users.noreply.github.com>
Date: Wed, 27 Nov 2024 13:33:41 +0200
Subject: [PATCH 3/4] NC | NSFS | Disable Prometheus reporting

1. The function clusterMetrics might throw an error (with the error message "Operation timed out."). If it fails there is no try-catch block, and the process will end with an uncaughtException. To avoid that we wrap it with a try-catch block, log an error message, and return from the function (we can't proceed with an undefined metrics variable). Also added a json response with the details of the error in case it happens.
2. I noticed that the call prom_reporting.start_server(metrics_port, true); in fork_utils.js did not have await and added it - had to change the function signature of start_workers to be async and update its JSDoc @returns part, and also separate the call and the condition in endpoint.js.
3. Add a comment in the endpoint main for developers regarding implementation (running on the main process, running with multiple forks).
4. Change the handling under the function gather_metrics so we will print the json error object that we returned from the server.
5. Change the printing of "_create_nsfs_report: nsfs_report" so we can see the object (we used to see: "core.server.analytic_services.prometheus_reporting:: _create_nsfs_report: nsfs_report [object Object]").
6. Rename a function (we had a minor typo) from nsfs_io_state_handler to nsfs_io_stats_handler.
7. Change the order inside the function start_server in prometheus_reporting.js and handle the if (req.url === '/metrics/nsfs_stats') { branch first, because it doesn't use metrics.

Signed-off-by: shirady <57721533+shirady@users.noreply.github.com>
(cherry picked from commit 09c4c456cd0069606cf373b623662af3ea3e8fb5)
---
 src/endpoint/endpoint.js                      | 12 ++++++++-
 src/manage_nsfs/diagnose.js                   | 12 +++++++--
 .../analytic_services/prometheus_reporting.js | 27 ++++++++++++++-----
 src/util/fork_utils.js                        | 10 +++----
 4 files changed, 46 insertions(+), 15 deletions(-)

diff --git a/src/endpoint/endpoint.js b/src/endpoint/endpoint.js
index c5d08c567b..09afd64fb9 100755
--- a/src/endpoint/endpoint.js
+++ b/src/endpoint/endpoint.js
@@ -109,7 +109,17 @@ async function main(options = {}) {
         // the primary just forks and returns, workers will continue to serve
         fork_count = options.forks ?? config.ENDPOINT_FORKS;
         const metrics_port = options.metrics_port || config.EP_METRICS_SERVER_PORT;
-        if (fork_utils.start_workers(metrics_port, fork_count)) return;
+        /**
+         * Please notice that we can run the main in 2 states:
+         * 1. Only the primary process runs the main (fork is 0 or undefined) - everything that
+         *    is implemented here would be run by this process.
+         * 2. A primary process with multiple forks (IMPORTANT) - if there is implementation that
+         *    is only relevant to the primary process it should be implemented in
+         *    fork_utils.start_workers, because the primary process returns after start_workers
+         *    and the forks will continue executing the code lines in this function
+         * */
+        const is_workers_started_from_primary = await fork_utils.start_workers(metrics_port, fork_count);
+        if (is_workers_started_from_primary) return;
 
         const http_port = options.http_port || config.ENDPOINT_PORT;
         const https_port = options.https_port || config.ENDPOINT_SSL_PORT;
diff --git a/src/manage_nsfs/diagnose.js b/src/manage_nsfs/diagnose.js
index 56d65266d2..2e87015b31 100644
--- a/src/manage_nsfs/diagnose.js
+++ b/src/manage_nsfs/diagnose.js
@@ -59,9 +59,17 @@ async function gather_metrics() {
             const buffer = await buffer_utils.read_stream_join(res);
             const body = buffer.toString('utf8');
             metrics_output = JSON.parse(body);
+            if (!metrics_output) throw new Error('received empty metrics response', { cause: res.statusCode });
+            write_stdout_response(ManageCLIResponse.MetricsStatus, metrics_output);
+        } else if (res.statusCode >= 500 && res.rawHeaders.includes('application/json')) {
+            const buffer = await buffer_utils.read_stream_join(res);
+            const body = buffer.toString('utf8');
+            const error_output = JSON.parse(body);
+            if (!error_output) throw new Error('received empty metrics response', { cause: res.statusCode });
+            throw_cli_error({ ...ManageCLIError.MetricsStatusFailed, ...error_output });
+        } else {
+            throw new Error('received empty metrics response', { cause: res.statusCode });
         }
-        if (!metrics_output) throw new Error('recieved empty metrics response', { cause: res.statusCode });
-        write_stdout_response(ManageCLIResponse.MetricsStatus, metrics_output);
     } catch (err) {
         dbg.warn('could not receive metrics response', err);
         throw_cli_error({ ...ManageCLIError.MetricsStatusFailed, cause: err?.errors?.[0] || err });
diff --git a/src/server/analytic_services/prometheus_reporting.js b/src/server/analytic_services/prometheus_reporting.js
index 164e4d502b..caee6bf86b 100644
--- a/src/server/analytic_services/prometheus_reporting.js
+++ b/src/server/analytic_services/prometheus_reporting.js
@@ -61,12 +61,7 @@ async function start_server(
     const server = http.createServer(async (req, res) => {
         // Serve all metrics on the root path for system that do have one or more fork running.
         if (fork_enabled) {
-            const metrics = await aggregatorRegistry.clusterMetrics();
-            if (req.url === '' || req.url === '/') {
-                res.writeHead(200, { 'Content-Type': aggregatorRegistry.contentType });
-                res.end(metrics);
-                return;
-            }
+            // we would like this part to be first as clusterMetrics might fail.
             if (req.url === '/metrics/nsfs_stats') {
                 res.writeHead(200, { 'Content-Type': 'text/plain' });
                 const nsfs_report = {
@@ -77,6 +72,24 @@ async function start_server(
                 res.end(JSON.stringify(nsfs_report));
                 return;
             }
+            let metrics;
+            try {
+                metrics = await aggregatorRegistry.clusterMetrics();
+            } catch (err) {
+                dbg.error('start_server: Could not get the metrics, got an error', err);
+                res.writeHead(504, { 'Content-Type': 'application/json' });
+                const reply = JSON.stringify({
+                    error: 'Internal server error - timeout',
+                    message: 'Looks like the server is taking a long time to respond (Could not get the metrics)',
+                });
+                res.end(reply);
+                return;
+            }
+            if (req.url === '' || req.url === '/') {
+                res.writeHead(200, { 'Content-Type': aggregatorRegistry.contentType });
+                res.end(metrics);
+                return;
+            }
             // Serve report's metrics on the report name path
             const report_name = req.url.substr(1);
             const single_metrics = export_single_metrics(metrics, report_name);
@@ -165,7 +178,7 @@ async function metrics_nsfs_stats_handler() {
         op_stats_counters: op_stats_counters,
         fs_worker_stats_counters: fs_worker_stats_counters
     };
-    dbg.log1(`_create_nsfs_report: nsfs_report ${nsfs_report}`);
+    dbg.log1('_create_nsfs_report: nsfs_report', nsfs_report);
     return JSON.stringify(nsfs_report);
 }
 
diff --git a/src/util/fork_utils.js b/src/util/fork_utils.js
index 5b6e6378e8..52c00d7b60 100644
--- a/src/util/fork_utils.js
+++ b/src/util/fork_utils.js
@@ -30,9 +30,9 @@ const fs_workers_stats = {};
  *
  * @param {number?} count number of workers to start.
 * @param {number?} metrics_port prometheus metris port.
- * @returns {boolean} true if workers were started.
+ * @returns {Promise} true if workers were started.
  */
-function start_workers(metrics_port, count = 0) {
+async function start_workers(metrics_port, count = 0) {
     const exit_events = [];
     if (cluster.isPrimary && count > 0) {
         for (let i = 0; i < count; ++i) {
@@ -68,12 +68,12 @@ function start_workers(metrics_port, count = 0) {
         });
         for (const id in cluster.workers) {
             if (id) {
-                cluster.workers[id].on('message', nsfs_io_state_handler);
+                cluster.workers[id].on('message', nsfs_io_stats_handler);
            }
         }
         if (metrics_port > 0) {
             dbg.log0('Starting metrics server', metrics_port);
-            prom_reporting.start_server(metrics_port, true);
+            await prom_reporting.start_server(metrics_port, true);
             dbg.log0('Started metrics server successfully');
         }
         return true;
@@ -82,7 +82,7 @@
     return false;
 }
 
-function nsfs_io_state_handler(msg) {
+function nsfs_io_stats_handler(msg) {
     if (msg.io_stats) {
         for (const [key, value] of Object.entries(msg.io_stats)) {
             io_stats[key] += value;

From 279d55a1a3b74b0a61c1b406cf368b7a1b657986 Mon Sep 17 00:00:00 2001
From: Romy <35330373+romayalon@users.noreply.github.com>
Date: Wed, 4 Dec 2024 14:28:06 +0200
Subject: [PATCH 4/4] NSFS | Replace ChunkFS with FileWriter

Signed-off-by: Romy <35330373+romayalon@users.noreply.github.com>
(cherry picked from commit a451243179968472dca31fc3322b1e311cf3d64c)
---
 config.js                                     |   2 +
 src/sdk/namespace_fs.js                       |  29 ++--
 src/test/unit_tests/index.js                  |   2 +-
 src/test/unit_tests/nc_index.js               |   2 +-
 src/test/unit_tests/test_chunk_fs.js          |  34 -----
 src/test/unit_tests/test_file_writer.js       |  69 +++++++++
 ...k_fs_hashing.js => file_writer_hashing.js} |  51 +++----
 src/util/chunk_fs.js                          | 111 --------------
 src/util/file_writer.js                       | 140 ++++++++++++++++++
 9 files changed, 252 insertions(+), 188 deletions(-)
 delete mode 100644 src/test/unit_tests/test_chunk_fs.js
 create mode 100644 src/test/unit_tests/test_file_writer.js
 rename src/tools/{chunk_fs_hashing.js => file_writer_hashing.js} (76%)
 delete mode 100644 src/util/chunk_fs.js
 create mode 100644 src/util/file_writer.js

diff --git a/config.js b/config.js
index 8dcb737d71..11a88b0e0d 100644
--- a/config.js
+++ b/config.js
@@ -872,6 +872,8 @@ config.NSFS_LOW_FREE_SPACE_PERCENT_UNLEASH = 0.10;
 // anonymous account name
 config.ANONYMOUS_ACCOUNT_NAME = 'anonymous';
 
+config.NFSF_UPLOAD_STREAM_MEM_THRESHOLD = 8 * 1024 * 1024;
+
 ////////////////////////////
 // NSFS NON CONTAINERIZED //
 ////////////////////////////
diff --git a/src/sdk/namespace_fs.js b/src/sdk/namespace_fs.js
index c9b1488a36..a32c9692f3 100644
--- a/src/sdk/namespace_fs.js
+++ b/src/sdk/namespace_fs.js
@@ -19,7 +19,7 @@ const stream_utils = require('../util/stream_utils');
 const buffer_utils = require('../util/buffer_utils');
 const size_utils = require('../util/size_utils');
 const native_fs_utils = require('../util/native_fs_utils');
-const ChunkFS = require('../util/chunk_fs');
+const FileWriter = require('../util/file_writer');
 const LRUCache = require('../util/lru_cache');
 const nb_native = require('../util/nb_native');
 const RpcError = require('../rpc/rpc_error');
@@ -1563,30 +1563,33 @@ class NamespaceFS {
     // Can be finetuned further on if needed and inserting the Semaphore logic inside
     // Instead of wrapping the whole _upload_stream function (q_buffers lives outside of the data scope of the stream)
     async _upload_stream({ fs_context, params, target_file, object_sdk, offset }) {
-        const { source_stream, copy_source } = params;
+        const { copy_source } = params;
         try {
             // Not using async iterators with ReadableStreams due to unsettled promises issues on abort/destroy
             const md5_enabled = this._is_force_md5_enabled(object_sdk);
-            const chunk_fs = new ChunkFS({
+            const file_writer = new FileWriter({
                 target_file,
                 fs_context,
-                stats: this.stats,
-                namespace_resource_id: this.namespace_resource_id,
-                md5_enabled,
                 offset,
+                md5_enabled,
+                stats: this.stats,
                 bucket: params.bucket,
-                large_buf_size: multi_buffer_pool.get_buffers_pool(undefined).buf_size
+                large_buf_size: multi_buffer_pool.get_buffers_pool(undefined).buf_size,
+                namespace_resource_id: this.namespace_resource_id,
             });
-            chunk_fs.on('error', err1 => dbg.error('namespace_fs._upload_stream: error occured on stream ChunkFS: ', err1));
+            file_writer.on('error', err => dbg.error('namespace_fs._upload_stream: error occurred on FileWriter: ', err));
+            file_writer.on('finish', arg => dbg.log1('namespace_fs._upload_stream: finish occurred on stream FileWriter: ', arg));
+            file_writer.on('close', arg => dbg.log1('namespace_fs._upload_stream: close occurred on stream FileWriter: ', arg));
+
             if (copy_source) {
-                await this.read_object_stream(copy_source, object_sdk, chunk_fs);
+                await this.read_object_stream(copy_source, object_sdk, file_writer);
             } else if (params.source_params) {
-                await params.source_ns.read_object_stream(params.source_params, object_sdk, chunk_fs);
+                await params.source_ns.read_object_stream(params.source_params, object_sdk, file_writer);
             } else {
-                await stream_utils.pipeline([source_stream, chunk_fs]);
-                await stream_utils.wait_finished(chunk_fs);
+                await stream_utils.pipeline([params.source_stream, file_writer]);
+                await stream_utils.wait_finished(file_writer);
             }
-            return { digest: chunk_fs.digest, total_bytes: chunk_fs.total_bytes };
+            return { digest: file_writer.digest, total_bytes: file_writer.total_bytes };
         } catch (error) {
             dbg.error('_upload_stream had error: ', error);
             throw error;
diff --git a/src/test/unit_tests/index.js b/src/test/unit_tests/index.js
index ecec0b3775..aa6a8d8f3f 100644
--- a/src/test/unit_tests/index.js
+++ b/src/test/unit_tests/index.js
@@ -57,7 +57,7 @@ require('./test_bucket_chunks_builder');
 require('./test_mirror_writer');
 require('./test_namespace_fs');
 require('./test_ns_list_objects');
-require('./test_chunk_fs');
+require('./test_file_writer');
 require('./test_namespace_fs_mpu');
 require('./test_nb_native_fs');
 require('./test_s3select');
diff --git a/src/test/unit_tests/nc_index.js b/src/test/unit_tests/nc_index.js
index 145120e3b8..4473ce53bb 100644
--- a/src/test/unit_tests/nc_index.js
+++ b/src/test/unit_tests/nc_index.js
@@ -7,7 +7,7 @@ coretest.setup();
 
 require('./test_namespace_fs');
 require('./test_ns_list_objects');
-require('./test_chunk_fs');
+require('./test_file_writer');
 require('./test_namespace_fs_mpu');
 require('./test_nb_native_fs');
 require('./test_nc_nsfs_cli');
diff --git a/src/test/unit_tests/test_chunk_fs.js b/src/test/unit_tests/test_chunk_fs.js
deleted file mode 100644
index 3885e21ed7..0000000000
--- a/src/test/unit_tests/test_chunk_fs.js
+++ /dev/null
@@ -1,34 +0,0 @@
-/* Copyright (C) 2020 NooBaa */
-/* eslint-disable no-invalid-this */
-'use strict';
-
-const mocha = require('mocha');
-const chunk_fs_hashing = require('../../tools/chunk_fs_hashing');
-
-mocha.describe('ChunkFS', function() {
-    const RUN_TIMEOUT = 10 * 60 * 1000;
-
-    mocha.it('Concurrent ChunkFS with hash target', async function() {
-        const self = this;
-        self.timeout(RUN_TIMEOUT);
-        await chunk_fs_hashing.hash_target();
-    });
-
-    mocha.it('Concurrent ChunkFS with file target', async function() {
-        const self = this;
-        self.timeout(RUN_TIMEOUT);
-        await chunk_fs_hashing.file_target();
-    });
-
-    mocha.it('Concurrent ChunkFS with file target - produce num_chunks > 1024 && total_chunks_size < config.NSFS_BUF_SIZE_L', async function() {
-        const self = this;
-        self.timeout(RUN_TIMEOUT);
-        // The goal of this test is to produce num_chunks > 1024 && total_chunks_size < config.NSFS_BUF_SIZE_L
-        // so we will flush buffers because of reaching max num of buffers and not because we reached the max NSFS buf size
-        // chunk size = 100, num_chunks = (10 * 1024 * 1024)/100 < 104587, 104587 = num_chunks > 1024
-        // chunk size = 100, total_chunks_size after having 1024 chunks is = 100 * 1024 < config.NSFS_BUF_SIZE_L
-        const chunk_size = 100;
-        const parts_s = 50;
-        await chunk_fs_hashing.file_target(chunk_size, parts_s);
-    });
-});
diff --git a/src/test/unit_tests/test_file_writer.js b/src/test/unit_tests/test_file_writer.js
new file mode 100644
index 0000000000..c3773de62b
--- /dev/null
+++ b/src/test/unit_tests/test_file_writer.js
@@ -0,0 +1,69 @@
+/* Copyright (C) 2020 NooBaa */
+/* eslint-disable no-invalid-this */
+'use strict';
+
+const mocha = require('mocha');
+const config = require('../../../config');
+const file_writer_hashing = require('../../tools/file_writer_hashing');
+const orig_iov_max = config.NSFS_DEFAULT_IOV_MAX;
+
+// on small iov_max tests we need to use a smaller amount of parts and chunks to ensure that the test will finish
+// in a reasonable period of time, because we will flush max 1/2 buffers at a time.
+const small_iov_num_parts = 20;
+
+
+mocha.describe('FileWriter', function() {
+    const RUN_TIMEOUT = 10 * 60 * 1000;
+
+    mocha.afterEach(function() {
+        config.NSFS_DEFAULT_IOV_MAX = orig_iov_max;
+    });
+
+    mocha.it('Concurrent FileWriter with hash target', async function() {
+        const self = this;
+        self.timeout(RUN_TIMEOUT);
+        await file_writer_hashing.hash_target();
+    });
+
+    mocha.it('Concurrent FileWriter with file target', async function() {
+        const self = this;
+        self.timeout(RUN_TIMEOUT);
+        await file_writer_hashing.file_target();
+    });
+
+    mocha.it('Concurrent FileWriter with hash target - iov_max=1', async function() {
+        const self = this;
+        self.timeout(RUN_TIMEOUT);
+        await file_writer_hashing.hash_target(undefined, small_iov_num_parts, 1);
+    });
+
+    mocha.it('Concurrent FileWriter with file target - iov_max=1', async function() {
+        const self = this;
+        self.timeout(RUN_TIMEOUT);
+        await file_writer_hashing.file_target(undefined, small_iov_num_parts, 1);
+    });
+
+    mocha.it('Concurrent FileWriter with hash target - iov_max=2', async function() {
+        const self = this;
+        self.timeout(RUN_TIMEOUT);
+        await file_writer_hashing.hash_target(undefined, small_iov_num_parts, 2);
+    });
+
+    mocha.it('Concurrent FileWriter with file target - iov_max=2', async function() {
+        const self = this;
+        self.timeout(RUN_TIMEOUT);
+        await file_writer_hashing.file_target(undefined, small_iov_num_parts, 2);
+    });
+
+    mocha.it('Concurrent FileWriter with file target - produce num_chunks > 1024 && total_chunks_size < config.NSFS_BUF_SIZE_L', async function() {
+        const self = this;
+        self.timeout(RUN_TIMEOUT);
+        // The goal of this test is to produce num_chunks > 1024 && total_chunks_size < config.NSFS_BUF_SIZE_L
+        // so we will flush buffers because of reaching max num of buffers and not because we reached the max NSFS buf size
+        // chunk size = 100, num_chunks = (10 * 1024 * 1024)/100 < 104587, 104587 = num_chunks > 1024
+        // chunk size = 100, total_chunks_size after having 1024 chunks is = 100 * 1024 < config.NSFS_BUF_SIZE_L
+        const chunk_size = 100;
+        const parts_s = 50;
+        await file_writer_hashing.file_target(chunk_size, parts_s);
+    });
+});
diff --git a/src/tools/chunk_fs_hashing.js b/src/tools/file_writer_hashing.js
similarity index 76%
rename from src/tools/chunk_fs_hashing.js
rename to src/tools/file_writer_hashing.js
index 09a3465720..e3f2c980dc 100644
--- a/src/tools/chunk_fs_hashing.js
+++ b/src/tools/file_writer_hashing.js
@@ -3,7 +3,7 @@
 
 const crypto = require('crypto');
 const assert = require('assert');
-const ChunkFS = require('../util/chunk_fs');
+const FileWriter = require('../util/file_writer');
 const config = require('../../config');
 const nb_native = require('../util/nb_native');
 const stream_utils = require('../util/stream_utils');
@@ -19,7 +19,8 @@ const PARTS = Number(argv.parts) || 1000;
 const CONCURRENCY = Number(argv.concurrency) || 20;
 const CHUNK = Number(argv.chunk) || 16 * 1024;
 const PART_SIZE = Number(argv.part_size) || 20 * 1024 * 1024;
-const F_PREFIX = argv.dst_folder || '/tmp/chunk_fs_hashing/';
+const F_PREFIX = argv.dst_folder || '/tmp/file_writer_hashing/';
+const IOV_MAX = argv.iov_max || config.NSFS_DEFAULT_IOV_MAX;
 
 const DEFAULT_FS_CONFIG = {
     uid: Number(argv.uid) || process.getuid(),
@@ -28,12 +29,6 @@ const DEFAULT_FS_CONFIG = {
     warn_threshold_ms: 100,
 };
 
-const DUMMY_RPC = {
-    object: {
-        update_endpoint_stats: (...params) => null
-    }
-};
-
 const XATTR_USER_PREFIX = 'user.';
 // TODO: In order to verify validity add content_md5_mtime as well
 const XATTR_MD5_KEY = XATTR_USER_PREFIX + 'content_md5';
@@ -64,41 +59,42 @@ function assign_md5_to_fs_xattr(md5_digest, fs_xattr) {
     return fs_xattr;
 }
 
-async function hash_target() {
-    await P.map_with_concurrency(CONCURRENCY, Array(PARTS).fill(), async () => {
+async function hash_target(chunk_size = CHUNK, parts = PARTS, iov_max = IOV_MAX) {
+    config.NSFS_DEFAULT_IOV_MAX = iov_max;
+    await P.map_with_concurrency(CONCURRENCY, Array(parts).fill(), async () => {
         const data = crypto.randomBytes(PART_SIZE);
         const content_md5 = crypto.createHash('md5').update(data).digest('hex');
         // Using async generator function in order to push data in small chunks
         const source_stream = stream.Readable.from(async function*() {
-            for (let i = 0; i < data.length; i += CHUNK) {
-                yield data.slice(i, i + CHUNK);
+            for (let i = 0; i < data.length; i += chunk_size) {
+                yield data.slice(i, i + chunk_size);
             }
         }());
         const target = new TargetHash();
-        const chunk_fs = new ChunkFS({
+        const file_writer = new FileWriter({
             target_file: target,
             fs_context: DEFAULT_FS_CONFIG,
-            rpc_client: DUMMY_RPC,
             namespace_resource_id: 'MajesticSloth'
         });
-        await stream_utils.pipeline([source_stream, chunk_fs]);
-        await stream_utils.wait_finished(chunk_fs);
+        await stream_utils.pipeline([source_stream, file_writer]);
+        await stream_utils.wait_finished(file_writer);
         const write_hash = target.digest();
         console.log(
             'Hash target',
-            `NativeMD5=${chunk_fs.digest}`,
+            `NativeMD5=${file_writer.digest}`,
             `DataWriteCryptoMD5=${write_hash}`,
             `DataOriginMD5=${content_md5}`,
         );
         assert.strictEqual(content_md5, write_hash);
         if (config.NSFS_CALCULATE_MD5) {
-            assert.strictEqual(chunk_fs.digest, content_md5);
-            assert.strictEqual(chunk_fs.digest, write_hash);
+            assert.strictEqual(file_writer.digest, content_md5);
+            assert.strictEqual(file_writer.digest, write_hash);
         }
     });
 }
 
-async function file_target(chunk_size = CHUNK, parts = PARTS) {
+async function file_target(chunk_size = CHUNK, parts = PARTS, iov_max = IOV_MAX) {
+    config.NSFS_DEFAULT_IOV_MAX = iov_max;
     fs.mkdirSync(F_PREFIX);
     await P.map_with_concurrency(CONCURRENCY, Array(parts).fill(), async () => {
         let target_file;
@@ -113,32 +109,31 @@ async function file_target(chunk_size = CHUNK, parts = PARTS) {
                 yield data.slice(i, i + chunk_size);
             }
         }());
-        const chunk_fs = new ChunkFS({
+        const file_writer = new FileWriter({
             target_file,
             fs_context: DEFAULT_FS_CONFIG,
-            rpc_client: DUMMY_RPC,
             namespace_resource_id: 'MajesticSloth'
         });
-        await stream_utils.pipeline([source_stream, chunk_fs]);
-        await stream_utils.wait_finished(chunk_fs);
+        await stream_utils.pipeline([source_stream, file_writer]);
+        await stream_utils.wait_finished(file_writer);
         if (XATTR) {
             await target_file.replacexattr(
                 DEFAULT_FS_CONFIG,
-                assign_md5_to_fs_xattr(chunk_fs.digest, {})
+                assign_md5_to_fs_xattr(file_writer.digest, {})
             );
         }
         if (FSYNC) await target_file.fsync(DEFAULT_FS_CONFIG);
         const write_hash = crypto.createHash('md5').update(fs.readFileSync(F_TARGET)).digest('hex');
         console.log(
             'File target',
-            `NativeMD5=${chunk_fs.digest}`,
+            `NativeMD5=${file_writer.digest}`,
             `DataWriteMD5=${write_hash}`,
             `DataOriginMD5=${content_md5}`,
         );
         assert.strictEqual(content_md5, write_hash);
         if (config.NSFS_CALCULATE_MD5) {
-            assert.strictEqual(chunk_fs.digest, content_md5);
-            assert.strictEqual(chunk_fs.digest, write_hash);
+            assert.strictEqual(file_writer.digest, content_md5);
+            assert.strictEqual(file_writer.digest, write_hash);
         }
         // Leave parts on error
         fs.rmSync(F_TARGET);
diff --git a/src/util/chunk_fs.js b/src/util/chunk_fs.js
deleted file mode 100644
index e60c9aa9e0..0000000000
--- a/src/util/chunk_fs.js
+++ /dev/null
@@ -1,111 +0,0 @@
-/* Copyright (C) 2016 NooBaa */
-'use strict';
-
-const stream = require('stream');
-const config = require('../../config');
-const nb_native = require('./nb_native');
-const dbg = require('../util/debug_module')(__filename);
-
-/**
- *
- * ChunkFS
- *
- * Calculates etag and writes stream data to the filesystem batching data buffers
- *
- */
-class ChunkFS extends stream.Transform {
-
-    /**
-     * @param {{
-     *      target_file: object,
-     *      fs_context: object,
-     *      namespace_resource_id: string,
-     *      md5_enabled: boolean,
-     *      stats: import('../sdk/endpoint_stats_collector').EndpointStatsCollector,
-     *      offset?: number,
-     *      bucket?: string,
-     *      large_buf_size?: number,
-     *  }} params
-     */
-    constructor({ target_file, fs_context, namespace_resource_id, md5_enabled, stats, offset, bucket, large_buf_size }) {
-        super();
-        this.q_buffers = [];
-        this.q_size = 0;
-        this.MD5Async = md5_enabled ? new (nb_native().crypto.MD5Async)() : undefined;
-        this.target_file = target_file;
-        this.fs_context = fs_context;
-        this.count = 1;
-        this.total_bytes = 0;
-        this.offset = offset;
-        this.namespace_resource_id = namespace_resource_id;
-        this.stats = stats;
-        this._total_num_buffers = 0;
-        const platform_iov_max = nb_native().fs.PLATFORM_IOV_MAX;
-        this.iov_max = platform_iov_max ? Math.min(platform_iov_max, config.NSFS_DEFAULT_IOV_MAX) : config.NSFS_DEFAULT_IOV_MAX;
-        this.bucket = bucket;
-        this.large_buf_size = large_buf_size || config.NSFS_BUF_SIZE_L;
-    }
-
-    async _transform(chunk, encoding, callback) {
-        try {
-            if (this.MD5Async) await this.MD5Async.update(chunk);
-            this.stats?.update_nsfs_write_stats({
-                namespace_resource_id: this.namespace_resource_id,
-                size: chunk.length,
-                count: this.count,
-                bucket_name: this.bucket,
-            });
-            this.count = 0;
-            while (chunk && chunk.length) {
-                const available_size = this.large_buf_size - this.q_size;
-                const buf = (available_size < chunk.length) ? chunk.slice(0, available_size) : chunk;
-                this.q_buffers.push(buf);
-                this.q_size += buf.length;
-                // Should flush when num of chunks equals to max iov which is the limit according to https://linux.die.net/man/2/writev
-                // or when q_size equals to config.NSFS_BUF_SIZE_L, but added greater than just in case
-                if (this.q_buffers.length === this.iov_max || this.q_size >= config.NSFS_BUF_SIZE_L) await this._flush_buffers();
-                chunk = (available_size < chunk.length) ? chunk.slice(available_size) : null;
-            }
-            return callback();
-        } catch (error) {
-            console.error('ChunkFS _transform failed', this.q_size, this._total_num_buffers, error);
-            return callback(error);
-        }
-    }
-
-    async _flush(callback) {
-        // wait before the last writev to finish
-        await this._flush_buffers(callback);
-    }
-
-    // callback will be passed only at the end of the stream by _flush()
-    // while this function is called without callback during _transform() and returns a promise.
-    async _flush_buffers(callback) {
-        try {
-            if (this.q_buffers.length) {
-                const buffers_to_write = this.q_buffers;
-                const size_to_write = this.q_size;
-                this.q_buffers = [];
-                this.q_size = 0;
-                dbg.log1(`Chunk_fs._flush_buffers: writing ${buffers_to_write.length} buffers, total size is ${size_to_write}`);
-                await this.target_file.writev(this.fs_context, buffers_to_write, this.offset);
-                // Hold the ref on the buffers from the JS side
-                this._total_num_buffers += buffers_to_write.length;
-                this.total_bytes += size_to_write;
-                if (this.offset >= 0) this.offset += size_to_write;
-            }
-            if (callback) {
-                if (this.MD5Async) this.digest = (await this.MD5Async.digest()).toString('hex');
-                return callback();
-            }
-        } catch (error) {
-            console.error('ChunkFS _flush_buffers failed', this.q_size, this._total_num_buffers, error);
-            if (callback) {
-                return callback(error);
-            }
-            throw error;
-        }
-    }
-}
-
-module.exports = ChunkFS;
diff --git a/src/util/file_writer.js b/src/util/file_writer.js
new file mode 100644
index 0000000000..c8df126719
--- /dev/null
+++ b/src/util/file_writer.js
@@ -0,0 +1,140 @@
+/* Copyright (C) 2016 NooBaa */
+'use strict';
+
+const stream = require('stream');
+const config = require('../../config');
+const nb_native = require('./nb_native');
+const dbg = require('../util/debug_module')(__filename);
+
+/**
+ * FileWriter is a Writable stream that writes data to a filesystem file,
+ * with optional calculation of md5 for etag.
+ */
+class FileWriter extends stream.Writable {
+
+    /**
+     * @param {{
+     *      target_file: object,
+     *      fs_context: object,
+     *      namespace_resource_id: string,
+     *      md5_enabled: boolean,
+     *      stats: import('../sdk/endpoint_stats_collector').EndpointStatsCollector,
+     *      offset?: number,
+     *      bucket?: string,
+     *      large_buf_size?: number,
+     *  }} params
+     */
+    constructor({ target_file, fs_context, namespace_resource_id, md5_enabled, stats, offset, bucket, large_buf_size }) {
+        super({ highWaterMark: config.NFSF_UPLOAD_STREAM_MEM_THRESHOLD });
+        this.target_file = target_file;
+        this.fs_context = fs_context;
+        this.offset = offset;
+        this.total_bytes = 0;
+        this.count_once = 1;
+        this.stats = stats;
+        this.bucket = bucket;
+        this.namespace_resource_id = namespace_resource_id;
+        this.large_buf_size = large_buf_size || config.NSFS_BUF_SIZE_L;
+        this.MD5Async = md5_enabled ? new (nb_native().crypto.MD5Async)() : undefined;
+        const platform_iov_max = nb_native().fs.PLATFORM_IOV_MAX;
+        this.iov_max = platform_iov_max ? Math.min(platform_iov_max, config.NSFS_DEFAULT_IOV_MAX) : config.NSFS_DEFAULT_IOV_MAX;
+    }
+
+    /**
+     * @param {number} size
+     */
+    _update_stats(size) {
+        const count = this.count_once;
+        this.count_once = 0; // counting the entire operation just once
+        this.stats?.update_nsfs_write_stats({
+            namespace_resource_id: this.namespace_resource_id,
+            size,
+            count,
+            bucket_name: this.bucket,
+        });
+    }
+
+    /**
+     * @param {Buffer[]} buffers
+     * @param {number} size
+     */
+    async _update_md5(buffers, size) {
+        // TODO optimize by calling once with all buffers
+        for (const buf of buffers) {
+            await this.MD5Async.update(buf);
+        }
+    }
+
+    /**
+     * @param {Buffer[]} buffers
+     * @param {number} size
+     */
+    async _write_all_buffers(buffers, size) {
+        if (buffers.length <= this.iov_max) {
+            await this._write_to_file(buffers, size);
+        } else {
+            let iov_start = 0;
+            while (iov_start < buffers.length) {
+                const iov_end = Math.min(buffers.length, iov_start + this.iov_max);
+                const buffers_to_write = buffers.slice(iov_start, iov_end);
+                const size_to_write = buffers_to_write.reduce((s, b) => s + b.length, 0);
+                await this._write_to_file(buffers_to_write, size_to_write);
+                iov_start = iov_end;
+            }
+        }
+    }
+
+    /**
+     * @param {Buffer[]} buffers
+     * @param {number} size
+     */
+    async _write_to_file(buffers, size) {
+        dbg.log1(`FileWriter._write_to_file: buffers ${buffers.length} size ${size} offset ${this.offset}`);
+        await this.target_file.writev(this.fs_context, buffers, this.offset);
+        if (this.offset >= 0) this.offset += size; // when offset<0 we just append
+        this.total_bytes += size;
+    }
+
+    /**
+     * @param {Array<{ chunk: Buffer; encoding: BufferEncoding; }>} chunks
+     * @param {(error?: Error | null) => void} callback
+     */
+    async _writev(chunks, callback) {
+        try {
+            let size = 0;
+            const buffers = chunks.map(it => {
+                size += it.chunk.length;
+                return it.chunk;
+            });
+            await Promise.all([
+                this.MD5Async && this._update_md5(buffers, size),
+                this._write_all_buffers(buffers, size),
+            ]);
+            this._update_stats(size);
+            return callback();
+        } catch (err) {
+            console.error('FileWriter._writev: failed', err);
+            return callback(err);
+        }
+    }
+
+    /**
+     * @param {(error?: Error | null) => void} callback
+     */
+    async _final(callback) {
+        try {
+            if (this.MD5Async) {
+                const digest = await this.MD5Async.digest();
+                this.digest = digest.toString('hex');
+            }
+
+            return callback();
+        } catch (err) {
+            console.error('FileWriter._final: failed', err);
+            return callback(err);
+        }
+    }
+
+}
+
+module.exports = FileWriter;
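
A minimal way to exercise the new 7-day expiry limit from patches 1 and 2 outside the unit tests, assuming a locally running NooBaa S3 endpoint; the endpoint URL, credentials, bucket and key below are placeholders, and 604810 is simply any value above the 604800-second limit, so the endpoint is expected to answer with AuthorizationQueryParametersError (HTTP 400):

'use strict';

// Illustrative sketch only (not part of the patch series): sign a V4 GET URL with an
// expiry above the 7-day limit and confirm the endpoint rejects it, as the new
// _check_expiry_limit in signature_utils.js does.
const AWS = require('aws-sdk');
const fetch = require('node-fetch');

const s3 = new AWS.S3({
    endpoint: 'http://localhost:6001',      // assumed local NooBaa S3 endpoint
    accessKeyId: 'ACCESS_KEY_PLACEHOLDER',
    secretAccessKey: 'SECRET_KEY_PLACEHOLDER',
    s3ForcePathStyle: true,
    signatureVersion: 'v4',
});

async function main() {
    // 604810 > 604800 (7 days in seconds), so the signed URL should be rejected
    // with AuthorizationQueryParametersError before any expiry/skew checks run.
    const url = s3.getSignedUrl('getObject', {
        Bucket: 'presigned-url-bucket',
        Key: 'presigned-url-object.txt',
        Expires: 604810,
    });
    const res = await fetch(url);
    console.log(res.status, await res.text());
}

main().catch(err => {
    console.error(err);
    process.exit(1);
});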