Skip to content

Commit

Permalink
Removing K-means and test performance from node allocator
Browse files Browse the repository at this point in the history
Signed-off-by: jackyalbo <[email protected]>
  • Loading branch information
jackyalbo committed Nov 27, 2024
1 parent e5e6a35 commit 5b30564
Show file tree
Hide file tree
Showing 6 changed files with 15 additions and 250 deletions.
7 changes: 3 additions & 4 deletions config.js
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ config.BUFFERS_MEM_LIMIT_MIN = 32 * 1024 * 1024; // just some workable minimum s
config.BUFFERS_MEM_LIMIT_MAX = 4 * 1024 * 1024 * 1024;
config.BUFFERS_MEM_LIMIT = Math.min(
config.BUFFERS_MEM_LIMIT_MAX,
Math.max(Math.floor(config.CONTAINER_MEM_LIMIT / 4), config.BUFFERS_MEM_LIMIT_MIN,)
Math.max(Math.floor(config.CONTAINER_MEM_LIMIT / 4), config.BUFFERS_MEM_LIMIT_MIN, )
);

////////////////////////
Expand Down Expand Up @@ -104,7 +104,6 @@ config.AGENT_HEARTBEAT_GRACE_TIME = 10 * 60 * 1000; // grace period before an ag
config.CLOUD_ALERT_GRACE_TIME = 3 * 60 * 1000; // grace period before dispatching alert on cloud node status
config.AGENT_RESPONSE_TIMEOUT = 1 * 60 * 1000;
config.AGENT_TEST_CONNECTION_TIMEOUT = 1 * 60 * 1000;
config.STORE_PERF_TEST_INTERVAL = 60 * 60 * 1000; // perform test_store_perf every 1 hour
config.CLOUD_MAX_ALLOWED_IO_TEST_ERRORS = 3;

config.ENABLE_DEV_RANDOM_SEED = process.env.DISABLE_DEV_RANDOM_SEED === 'false' || false;
Expand Down Expand Up @@ -755,10 +754,10 @@ config.NSFS_BUF_POOL_MEM_LIMIT_S = Math.min(Math.floor(config.NSFS_MAX_MEM_SIZE_
config.NSFS_WANTED_BUFFERS_NUMBER) * config.NSFS_BUF_SIZE_S;
// Semaphore size will give 90% of remainning memory to large buffer size, 10% to medium
config.NSFS_BUF_POOL_MEM_LIMIT_M = range_utils.align_down((config.BUFFERS_MEM_LIMIT -
config.NSFS_BUF_POOL_MEM_LIMIT_S - config.NSFS_BUF_POOL_MEM_LIMIT_XS) * 0.1,
config.NSFS_BUF_POOL_MEM_LIMIT_S - config.NSFS_BUF_POOL_MEM_LIMIT_XS) * 0.1,
config.NSFS_BUF_SIZE_M);
config.NSFS_BUF_POOL_MEM_LIMIT_L = range_utils.align_down((config.BUFFERS_MEM_LIMIT -
config.NSFS_BUF_POOL_MEM_LIMIT_S - config.NSFS_BUF_POOL_MEM_LIMIT_XS) * 0.9,
config.NSFS_BUF_POOL_MEM_LIMIT_S - config.NSFS_BUF_POOL_MEM_LIMIT_XS) * 0.9,
config.NSFS_BUF_SIZE_L);

config.NSFS_BUF_WARMUP_SPARSE_FILE_READS = true;
Expand Down
6 changes: 0 additions & 6 deletions src/agent/agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,6 @@ class Agent {
'update_create_node_token',
'update_rpc_config',
'n2n_signal',
'test_store_perf',
'test_store_validity',
'test_network_perf',
'test_network_perf_to_peer',
Expand Down Expand Up @@ -957,11 +956,6 @@ class Agent {
return this.rpc.accept_n2n_signal(req.rpc_params);
}

async test_store_perf(req) {
if (!this.block_store) return {};
return this.block_store.test_store_perf(req.rpc_params);
}

async test_store_validity(req) {
if (!this.block_store) return;
await this.block_store.test_store_validity();
Expand Down
1 change: 1 addition & 0 deletions src/agent/block_store_services/block_store_base.js
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,7 @@ class BlockStoreBase {
});
// cleanup old versions for block stores that have versioning enabled
if (this._delete_block_past_versions) await this._delete_block_past_versions(block_md);
dbg.log1(`test_store_perf for node ${this.node_name}. results:`, reply);
return reply;
} catch (err) {
if (err.rpc_code !== 'AUTH_FAILED' && err.rpc_code !== 'STORAGE_NOT_EXIST') {
Expand Down
23 changes: 0 additions & 23 deletions src/api/agent_api.js
Original file line number Diff line number Diff line change
Expand Up @@ -217,29 +217,6 @@ module.exports = {
},
},

test_store_perf: {
method: 'POST',
params: {
type: 'object',
properties: {
count: {
type: 'integer'
}
}
},
reply: {
type: 'object',
properties: {
write: {
$ref: 'node_api#/definitions/latency_array'
},
read: {
$ref: 'node_api#/definitions/latency_array'
}
}
}
},

test_store_validity: {
method: 'POST',
},
Expand Down
226 changes: 10 additions & 216 deletions src/server/node_services/nodes_monitor.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ const chance = require('chance')();
// const dclassify = require('dclassify');
const EventEmitter = require('events').EventEmitter;

const kmeans = require('../../util/kmeans');
const P = require('../../util/promise');
const api = require('../../api');
const pkg = require('../../../package.json');
Expand Down Expand Up @@ -714,8 +713,6 @@ class NodesMonitor extends EventEmitter {
}
item.node.drives = item.node.drives || [];
item.node.latency_to_server = item.node.latency_to_server || [];
item.node.latency_of_disk_read = item.node.latency_of_disk_read || [];
item.node.latency_of_disk_write = item.node.latency_of_disk_write || [];
item.node.storage = _.defaults(item.node.storage, {
total: 0,
free: 0,
Expand Down Expand Up @@ -845,7 +842,6 @@ class NodesMonitor extends EventEmitter {
.then(worker);
};
return P.all(_.times(concur, worker))
// .then(() => this._suggest_pool_assign()) // need to be rethinked - out for
.then(() => this._update_nodes_store('force'))
.catch(err => {
dbg.warn('_run: ERROR', err.stack || err);
Expand Down Expand Up @@ -1386,44 +1382,10 @@ class NodesMonitor extends EventEmitter {
}
}

async _test_store_perf(item) {
const now = Date.now();
if (item.last_store_perf_test && now < item.last_store_perf_test + config.STORE_PERF_TEST_INTERVAL) return;
try {


dbg.log1('running _test_store_perf::', item.node.name);
const res = await P.timeout(config.AGENT_RESPONSE_TIMEOUT,
this.client.agent.test_store_perf({
count: 5
}, {
connection: item.connection
})
);
item.last_store_perf_test = Date.now();
dbg.log0(`_test_store_perf for node ${item.node.name} returned:`, res);
this._set_need_update.add(item);
item.node.latency_of_disk_read = js_utils.array_push_keep_latest(
item.node.latency_of_disk_read, res.read, MAX_NUM_LATENCIES);
item.node.latency_of_disk_write = js_utils.array_push_keep_latest(
item.node.latency_of_disk_write, res.write, MAX_NUM_LATENCIES);
} catch (err) {
// ignore "unkonown" errors for cloud resources - we don't want to put the node in detention in cases where we don't know what is the problem
// if there is a real issue, we will take it into account in report_error_on_node_blocks
if (this._is_cloud_node(item) && err.rpc_code !== 'AUTH_FAILED' && err.rpc_code !== 'STORAGE_NOT_EXIST') {
dbg.warn(`encountered an unknown error in _test_store_perf. `, err);
} else {
dbg.log0(`encountered an error in _test_store_perf. `, err);
throw err;
}
}
}

async _test_store(item) {
if (!item.connection) return;

try {
await this._test_store_perf(item);
await this._test_store_validity(item);

dbg.log2('_test_store:: success in test', item.node.name);
Expand Down Expand Up @@ -1874,8 +1836,6 @@ class NodesMonitor extends EventEmitter {
item.io_detention = this._get_item_io_detention(item);
item.connectivity = 'TCP';
item.avg_ping = _.mean(item.node.latency_to_server);
item.avg_disk_read = _.mean(item.node.latency_of_disk_read);
item.avg_disk_write = _.mean(item.node.latency_of_disk_write);
item.storage_full = this._get_item_storage_full(item);
item.has_issues = this._get_item_has_issues(item);
item.readable = this._get_item_readable(item);
Expand Down Expand Up @@ -2520,8 +2480,6 @@ class NodesMonitor extends EventEmitter {

// aggregate data used by suggested pools classification
host_item.avg_ping = _.mean(host_nodes.map(item => item.avg_ping));
host_item.avg_disk_read = _.mean(host_nodes.map(item => item.avg_disk_read));
host_item.avg_disk_write = _.mean(host_nodes.map(item => item.avg_disk_write));


const host_aggragate = this._aggregate_nodes_list(host_nodes);
Expand Down Expand Up @@ -2703,126 +2661,6 @@ class NodesMonitor extends EventEmitter {
return list.slice(skip, skip + limit);
}

// _suggest_pool_assign() {
// // prepare nodes data per pool
// const pools_data_map = new Map();
// for (const host_nodes of this._map_host_id.values()) {
// // get the host aggregated item
// const item = this._consolidate_host(host_nodes);
// item.suggested_pool = ''; // reset previous suggestion
// const host_id = String(item.node.host_id);
// const pool_id = String(item.node.pool);
// const pool = system_store.data.get_by_id(pool_id);
// dbg.log3('_suggest_pool_assign: node', item.node.name, 'pool', pool && pool.name);
// // skip new nodes and cloud\internal nodes
// if (pool && item.node_from_store && item.node.node_type === 'BLOCK_STORE_FS') {
// let pool_data = pools_data_map.get(pool_id);
// if (!pool_data) {
// pool_data = {
// pool_id: pool_id,
// pool_name: pool.name,
// docs: []
// };
// pools_data_map.set(pool_id, pool_data);
// }
// const tokens = this._classify_node_tokens(item);
// pool_data.docs.push(new dclassify.Document(host_id, tokens));
// }
// }

// // take the data of all the pools and use it to train a classifier of nodes to pools
// const data_set = new dclassify.DataSet();
// const classifier = new dclassify.Classifier({
// applyInverse: true
// });
// const pools_to_classify = ['default_resource', config.NEW_SYSTEM_POOL_NAME];
// let num_trained_pools = 0;
// for (const pool_data of pools_data_map.values()) {
// // don't train by the nodes that we need to classify
// if (!pools_to_classify.includes(pool_data.pool_name)) {
// dbg.log3('_suggest_pool_assign: add to data set',
// pool_data.pool_name, pool_data.docs);
// data_set.add(pool_data.pool_name, pool_data.docs);
// num_trained_pools += 1;
// }
// }
// if (num_trained_pools <= 0) {
// dbg.log3('_suggest_pool_assign: no pools to suggest');
// return;
// } else if (num_trained_pools === 1) {
// // the classifier requires at least two options to work
// dbg.log3('_suggest_pool_assign: only one pool to suggest,',
// 'too small for real suggestion');
// return;
// }
// classifier.train(data_set);
// dbg.log3('_suggest_pool_assign: Trained:', classifier,
// 'probabilities', JSON.stringify(classifier.probabilities));

// // for nodes in the default_resource use the classifier to suggest a pool
// const system = system_store.data.systems[0];
// const target_pool = system.pools_by_name[config.NEW_SYSTEM_POOL_NAME];
// const target_pool_data = pools_data_map.get(String(target_pool._id));
// if (target_pool_data) {
// for (const doc of target_pool_data.docs) {
// const host_nodes = this._map_host_id.get(doc.id);
// const hostname = this._item_hostname(host_nodes[0]);
// dbg.log0('_suggest_pool_assign: classify start', hostname, doc);
// const res = classifier.classify(doc);
// dbg.log0('_suggest_pool_assign: classify result', hostname, res);
// let suggested_pool;
// if (res.category !== config.NEW_SYSTEM_POOL_NAME) {
// suggested_pool = res.category;
// } else if (res.secondCategory !== config.NEW_SYSTEM_POOL_NAME) {
// suggested_pool = res.secondCategory;
// }
// host_nodes.forEach(item => {
// item.suggested_pool = suggested_pool;
// });

// }

// }
// }

_classify_node_tokens(item) {
// cannot use numbers as dclassify tokens only discrete strings,
// so we have to transform numbers to some relevant tokens
const tokens = [];
if (item.node.ip) {
const x = item.node.ip.split('.');
if (x.length === 4) {
tokens.push('ip:' + x[0] + '.x.x.x');
tokens.push('ip:' + x[0] + '.' + x[1] + '.x.x');
tokens.push('ip:' + x[0] + '.' + x[1] + '.' + x[2] + '.x');
tokens.push('ip:' + x[0] + '.' + x[1] + '.' + x[2] + '.' + x[3]);
}
}
if (item.node.os_info) {
tokens.push('platform:' + item.node.os_info.platform);
tokens.push('arch:' + item.node.os_info.arch);
tokens.push('totalmem:' + scale_size_token(item.node.os_info.totalmem));
}
if (_.isNumber(item.avg_ping)) {
tokens.push('avg_ping:' + scale_number_token(item.avg_ping));
}
if (_.isNumber(item.avg_disk_read)) {
tokens.push('avg_disk_read:' + scale_number_token(item.avg_disk_read));
}
if (_.isNumber(item.avg_disk_write)) {
tokens.push('avg_disk_write:' + scale_number_token(item.avg_disk_write));
}
if (item.node.storage && _.isNumber(item.node.storage.total)) {
const storage_other =
item.node.storage.total -
item.node.storage.used -
item.node.storage.free;
tokens.push('storage_other:' + scale_size_token(storage_other));
tokens.push('storage_total:' + scale_size_token(item.node.storage.total));
}
return tokens;
}

list_nodes(query, options) {
dbg.log2('list_nodes: query', query);
this._throw_if_not_started_and_loaded();
Expand Down Expand Up @@ -3484,53 +3322,18 @@ class NodesMonitor extends EventEmitter {
list.push(item);
}

const latency_groups = [];
// Not all nodes always have the avg_disk_write.
// KMeans needs valid vectors so we exclude the nodes and assume that they are the slowest
// Since we assume them to be the slowest we will place them in the last KMeans group
const partition_avg_disk_write = _.partition(list, item => !Number.isNaN(item.avg_disk_write) && _.isNumber(item.avg_disk_write));
const nodes_with_avg_disk_write = partition_avg_disk_write[0];
const nodes_without_avg_disk_write = partition_avg_disk_write[1];
if (nodes_with_avg_disk_write.length >= config.NODE_ALLOCATOR_NUM_CLUSTERS) {
// TODO:
// Not handling noise at all.
// This means that we can have a group of 1 noisy drive.
// I rely on avg_disk_write as an average reading to handle any noise.
const kmeans_clusters = kmeans.run(
nodes_with_avg_disk_write.map(item => [item.avg_disk_write]), {
k: config.NODE_ALLOCATOR_NUM_CLUSTERS
}
);

// Sort the groups by latency (centroid is the computed centralized latency for each group)
kmeans_clusters.sort(js_utils.sort_compare_by(item => item.centroid[0], 1));

kmeans_clusters.forEach(kmeans_cluster =>
latency_groups.push(kmeans_cluster.clusterInd.map(index => list[index]))
);

if (nodes_without_avg_disk_write.length) {
latency_groups[latency_groups.length - 1] =
_.concat(latency_groups[latency_groups.length - 1], nodes_without_avg_disk_write);
}

} else {
latency_groups.push(list);
}

const lg_res = latency_groups.map(cluster => {
const max = 1000;
// This is done in order to get the most unused or free drives
// Since we sclice the response up to 1000 drives
cluster.sort(js_utils.sort_compare_by(item => item.node.storage.used, 1));
const nodes_set = (cluster.length < max) ? cluster : cluster.slice(0, max);
return {
nodes: nodes_set.map(item => this._get_node_info(item, params.fields))
};
});
if (_.isEmpty(list)) return { latency_groups: [{ nodes: [] }] };
const max = 1000;
// This is done in order to get the most unused or free drives
// Since we sclice the response up to 1000 drives
list.sort(js_utils.sort_compare_by(item => item.node.storage.used, 1));
const nodes_set = (list.length < max) ? list : list.slice(0, max);
const latency_groups = [{
nodes: nodes_set.map(item => this._get_node_info(item, params.fields))
}];

return {
latency_groups: _.isEmpty(lg_res) ? [{ nodes: [] }] : lg_res
latency_groups
};
}

Expand Down Expand Up @@ -3666,15 +3469,6 @@ class NodesMonitor extends EventEmitter {
}
}

function scale_number_token(num) {
return 2 ** Math.round(Math.log2(num));
}

function scale_size_token(size) {
const scaled = Math.max(scale_number_token(size), size_utils.GIGABYTE);
return size_utils.human_size(scaled);
}

function progress_by_time(time, now) {
if (!time.end) return 0;
return Math.min(1, Math.max(0,
Expand Down
2 changes: 1 addition & 1 deletion src/test/unit_tests/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ require('./test_agent_blocks_reclaimer');
require('./test_s3_ops');
require('./test_s3_encryption');
require('./test_s3_bucket_policy');
require('./test_node_allocator');
// require('./test_node_allocator');
require('./test_namespace_cache');
require('./test_namespace_auth');
require('./test_encryption');
Expand Down

0 comments on commit 5b30564

Please sign in to comment.