Skip to content

Commit

Permalink
dynamic activate worker factor for pull
Browse files Browse the repository at this point in the history
  • Loading branch information
mightqxc committed Nov 23, 2023
1 parent c67d291 commit 7879266
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 3 deletions.
2 changes: 1 addition & 1 deletion pandaharvester/commit_timestamp.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
timestamp = "22-11-2023 16:55:42 on flin (by mightqxc)"
timestamp = "23-11-2023 12:54:20 on flin (by mightqxc)"
42 changes: 40 additions & 2 deletions pandaharvester/harvesterbody/worker_adjuster.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import copy
import traceback

from future.utils import iteritems
from pandaharvester.harvesterconfig import harvester_config
Expand All @@ -15,6 +16,7 @@
class WorkerAdjuster(object):
# constructor
def __init__(self, queue_config_mapper):
tmp_log = core_utils.make_logger(_logger, method_name="__init__")
self.queue_configMapper = queue_config_mapper
self.pluginFactory = PluginFactory()
self.dbProxy = DBProxy()
Expand All @@ -25,9 +27,43 @@ def __init__(self, queue_config_mapper):
except AttributeError:
self.maxNewWorkers = None
try:
self.activate_worker_factor = float(harvester_config.submitter.activateWorkerFactor)
if harvester_config.submitter.activateWorkerFactor == "auto":
self.activate_worker_factor = "auto"
else:
self.activate_worker_factor = float(harvester_config.submitter.activateWorkerFactor)
except AttributeError:
self.activate_worker_factor = 1.0
except Exception:
err_str = traceback.format_exc()
tmp_log.error(err_str)
tmp_log.warning("default activate_worker_factor = 1")
self.activate_worker_factor = 1.0

# get activate worker factor
def get_activate_worker_factor(self, site_name=None):
tmp_log = core_utils.make_logger(_logger, f"site={site_name}", method_name="get_activate_worker_factor")
ret_val = 1.0
if self.activate_worker_factor == "auto":
# dynamic factor
worker_stats_from_panda = self.dbProxy.get_cache("worker_statistics.json", None)
if not worker_stats_from_panda:
# got empty, return default
pass
else:
worker_stats_from_panda = worker_stats_from_panda.data
try:
# return 1/n_harvester_instances for the site
val_dict = worker_stats_from_panda[site_name]
n_harvester_instances = len(val_dict)
ret_val = 1.0 / min(n_harvester_instances, 1)
except KeyError:
# no data for this site, return default
pass
else:
# static factor
ret_val = self.activate_worker_factor
tmp_log.debug(f"ret_val={ret_val}")
return ret_val

# define number of workers to submit based on various information
def define_num_workers(self, static_num_workers, site_name):
Expand Down Expand Up @@ -149,7 +185,9 @@ def define_num_workers(self, static_num_workers, site_name):
else:
# limit the queue to the number of activated jobs to avoid empty pilots
try:
n_activated = max(job_stats[queue_name]["activated"] * self.activate_worker_factor, 1) # avoid no activity queues
n_activated = max(
int(job_stats[queue_name]["activated"] * self.get_activate_worker_factor(queue_name)), 1
) # avoid no activity queues
except KeyError:
# zero job in the queue
tmp_log.debug("no job in queue")
Expand Down

0 comments on commit 7879266

Please sign in to comment.