From 7879266fd985a89e52914cff797c3eb047787051 Mon Sep 17 00:00:00 2001 From: mightqxc Date: Thu, 23 Nov 2023 13:54:19 +0100 Subject: [PATCH] dynamic activate worker factor for pull --- pandaharvester/commit_timestamp.py | 2 +- .../harvesterbody/worker_adjuster.py | 42 ++++++++++++++++++- 2 files changed, 41 insertions(+), 3 deletions(-) diff --git a/pandaharvester/commit_timestamp.py b/pandaharvester/commit_timestamp.py index fe67e0fa..c36734b6 100644 --- a/pandaharvester/commit_timestamp.py +++ b/pandaharvester/commit_timestamp.py @@ -1 +1 @@ -timestamp = "22-11-2023 16:55:42 on flin (by mightqxc)" +timestamp = "23-11-2023 12:54:20 on flin (by mightqxc)" diff --git a/pandaharvester/harvesterbody/worker_adjuster.py b/pandaharvester/harvesterbody/worker_adjuster.py index 71f318ad..ff4c7811 100644 --- a/pandaharvester/harvesterbody/worker_adjuster.py +++ b/pandaharvester/harvesterbody/worker_adjuster.py @@ -1,4 +1,5 @@ import copy +import traceback from future.utils import iteritems from pandaharvester.harvesterconfig import harvester_config @@ -15,6 +16,7 @@ class WorkerAdjuster(object): # constructor def __init__(self, queue_config_mapper): + tmp_log = core_utils.make_logger(_logger, method_name="__init__") self.queue_configMapper = queue_config_mapper self.pluginFactory = PluginFactory() self.dbProxy = DBProxy() @@ -25,9 +27,43 @@ def __init__(self, queue_config_mapper): except AttributeError: self.maxNewWorkers = None try: - self.activate_worker_factor = float(harvester_config.submitter.activateWorkerFactor) + if harvester_config.submitter.activateWorkerFactor == "auto": + self.activate_worker_factor = "auto" + else: + self.activate_worker_factor = float(harvester_config.submitter.activateWorkerFactor) except AttributeError: self.activate_worker_factor = 1.0 + except Exception: + err_str = traceback.format_exc() + tmp_log.error(err_str) + tmp_log.warning("default activate_worker_factor = 1") + self.activate_worker_factor = 1.0 + + # get activate worker factor + def get_activate_worker_factor(self, site_name=None): + tmp_log = core_utils.make_logger(_logger, f"site={site_name}", method_name="get_activate_worker_factor") + ret_val = 1.0 + if self.activate_worker_factor == "auto": + # dynamic factor + worker_stats_from_panda = self.dbProxy.get_cache("worker_statistics.json", None) + if not worker_stats_from_panda: + # got empty, return default + pass + else: + worker_stats_from_panda = worker_stats_from_panda.data + try: + # return 1/n_harvester_instances for the site + val_dict = worker_stats_from_panda[site_name] + n_harvester_instances = len(val_dict) + ret_val = 1.0 / min(n_harvester_instances, 1) + except KeyError: + # no data for this site, return default + pass + else: + # static factor + ret_val = self.activate_worker_factor + tmp_log.debug(f"ret_val={ret_val}") + return ret_val # define number of workers to submit based on various information def define_num_workers(self, static_num_workers, site_name): @@ -149,7 +185,9 @@ def define_num_workers(self, static_num_workers, site_name): else: # limit the queue to the number of activated jobs to avoid empty pilots try: - n_activated = max(job_stats[queue_name]["activated"] * self.activate_worker_factor, 1) # avoid no activity queues + n_activated = max( + int(job_stats[queue_name]["activated"] * self.get_activate_worker_factor(queue_name)), 1 + ) # avoid no activity queues except KeyError: # zero job in the queue tmp_log.debug("no job in queue")