Merge pull request #205 from HSF/flin
Dynamic activate worker factor for pull
mightqxc authored Nov 27, 2023
2 parents fe516ec + ced4c8e commit 34d470e
Showing 5 changed files with 70 additions and 11 deletions.
2 changes: 1 addition & 1 deletion package/pyproject.toml
@@ -35,7 +35,7 @@ dependencies = [
[project.optional-dependencies]
kubernetes = ['kubernetes', 'pyyaml']
mysql = ['mysqlclient']
atlasgrid = ['uWSGI >= 2.0.20', 'htcondor >= 9.2.0', 'mysqlclient >= 2.0.3']
atlasgrid = ['uWSGI >= 2.0.20', 'htcondor >= 10.3.0', 'mysqlclient >= 2.1.1']

[project.urls]
Homepage = "https://github.com/PanDAWMS/panda-harvester/wiki"
2 changes: 1 addition & 1 deletion pandaharvester/commit_timestamp.py
@@ -1 +1 @@
timestamp = "15-11-2023 11:08:52 on flin (by mightqxc)"
timestamp = "23-11-2023 14:47:01 on flin (by mightqxc)"
42 changes: 40 additions & 2 deletions pandaharvester/harvesterbody/worker_adjuster.py
@@ -1,4 +1,5 @@
import copy
import traceback

from future.utils import iteritems
from pandaharvester.harvesterconfig import harvester_config
@@ -15,6 +16,7 @@
class WorkerAdjuster(object):
# constructor
def __init__(self, queue_config_mapper):
tmp_log = core_utils.make_logger(_logger, method_name="__init__")
self.queue_configMapper = queue_config_mapper
self.pluginFactory = PluginFactory()
self.dbProxy = DBProxy()
@@ -25,9 +27,43 @@ def __init__(self, queue_config_mapper):
except AttributeError:
self.maxNewWorkers = None
try:
self.activate_worker_factor = float(harvester_config.submitter.activateWorkerFactor)
if harvester_config.submitter.activateWorkerFactor == "auto":
self.activate_worker_factor = "auto"
else:
self.activate_worker_factor = float(harvester_config.submitter.activateWorkerFactor)
except AttributeError:
self.activate_worker_factor = 1.0
except Exception:
err_str = traceback.format_exc()
tmp_log.error(err_str)
tmp_log.warning("default activate_worker_factor = 1")
self.activate_worker_factor = 1.0

# get activate worker factor
def get_activate_worker_factor(self, site_name=None):
tmp_log = core_utils.make_logger(_logger, f"site={site_name}", method_name="get_activate_worker_factor")
ret_val = 1.0
if self.activate_worker_factor == "auto":
# dynamic factor
worker_stats_from_panda = self.dbProxy.get_cache("worker_statistics.json", None)
if not worker_stats_from_panda:
# got empty, return default
pass
else:
worker_stats_from_panda = worker_stats_from_panda.data
try:
# return 1/n_harvester_instances for the site
val_dict = worker_stats_from_panda[site_name]
n_harvester_instances = len(val_dict)
ret_val = 1.0 / max(n_harvester_instances, 1)
except KeyError:
# no data for this site, return default
pass
else:
# static factor
ret_val = self.activate_worker_factor
tmp_log.debug(f"ret_val={ret_val}")
return ret_val

# define number of workers to submit based on various information
def define_num_workers(self, static_num_workers, site_name):
@@ -149,7 +185,9 @@ def define_num_workers(self, static_num_workers, site_name):
else:
# limit the queue to the number of activated jobs to avoid empty pilots
try:
n_activated = max(job_stats[queue_name]["activated"] * self.activate_worker_factor, 1) # avoid no activity queues
n_activated = max(
int(job_stats[queue_name]["activated"] * self.get_activate_worker_factor(queue_name)), 1
) # avoid no activity queues
except KeyError:
# zero job in the queue
tmp_log.debug("no job in queue")
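The new get_activate_worker_factor method turns the activateWorkerFactor = "auto" setting (read from harvester_config.submitter) into a per-site factor of 1/N, where N is the number of harvester instances reporting workers for the site in the cached worker_statistics.json; any numeric value keeps the old static behaviour. Below is a minimal sketch of that lookup, under the assumption that the cache payload maps each site name to a dict keyed by harvester instance; the site and instance names are illustrative only.

# Sketch only; the cache layout (site name -> {harvester instance: stats}) is an assumption,
# and the site/instance names are made up.
worker_stats_from_panda = {
    "EXAMPLE_SITE": {
        "harvester_instance_1": {"managed": {"SCORE": {"running": 10, "submitted": 5}}},
        "harvester_instance_2": {"managed": {"SCORE": {"running": 8, "submitted": 7}}},
    }
}


def activate_worker_factor_for(site_name, stats, default=1.0):
    # return 1/N for a site served by N harvester instances, else the default
    try:
        n_harvester_instances = len(stats[site_name])
        return 1.0 / max(n_harvester_instances, 1)
    except KeyError:
        return default


print(activate_worker_factor_for("EXAMPLE_SITE", worker_stats_from_panda))  # 0.5

In define_num_workers the factor then scales the per-queue activated-job count, so each harvester instance only claims its share of the activated jobs.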
19 changes: 19 additions & 0 deletions pandaharvester/harvestercommunicator/panda_communicator.py
@@ -868,3 +868,22 @@ def get_max_worker_id(self):
retVal = retVal.text
tmpLog.debug(f"done with {retStat} {retVal}")
return retStat, retVal

# get worker stats from PanDA
def get_worker_stats_from_panda(self):
tmp_log = self.make_logger(method_name="get_worker_stats_from_panda")
tmp_log.debug("start")
tmp_stat, tmp_res = self.post_ssl("getWorkerStats", {})
stats = {}
if tmp_stat is False:
ret_msg = "FAILED"
core_utils.dump_error_message(tmp_log, tmp_res)
else:
try:
stats = tmp_res.json()
ret_msg = "OK"
except Exception:
ret_msg = "Exception"
core_utils.dump_error_message(tmp_log)
tmp_log.debug(f"done with {ret_msg} {stats}")
return stats, ret_msg
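
get_worker_stats_from_panda wraps the getWorkerStats call to the PanDA server and returns the parsed JSON together with a status string. A hedged usage sketch, assuming the harvester configuration already provides the PanDA endpoint and credentials needed to construct the communicator:

# Illustrative only: fetch the cross-instance worker statistics from PanDA.
from pandaharvester.harvestercommunicator.panda_communicator import PandaCommunicator

communicator = PandaCommunicator()
stats, ret_msg = communicator.get_worker_stats_from_panda()
if ret_msg == "OK":
    for site_name, per_site_stats in stats.items():
        print(site_name, per_site_stats)
else:
    print(f"failed to fetch worker stats: {ret_msg}")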
16 changes: 9 additions & 7 deletions pandaharvester/harvestercore/db_proxy.py
@@ -3435,24 +3435,25 @@ def get_worker_stats(self, site_name):
for computingSite, jobType, resourceType, nNewWorkers in resQ:
retMap.setdefault(jobType, {})
if resourceType not in retMap[jobType]:
retMap[jobType][resourceType] = {"running": 0, "submitted": 0, "to_submit": nNewWorkers}
retMap[jobType][resourceType] = {"running": 0, "submitted": 0, "finished": 0, "to_submit": nNewWorkers}

# get worker stats
sqlW = "SELECT wt.status, wt.computingSite, pq.jobType, pq.resourceType, COUNT(*) cnt "
sqlW += f"FROM {workTableName} wt, {pandaQueueTableName} pq "
sqlW += "WHERE pq.siteName=:siteName AND wt.computingSite=pq.queueName AND wt.status IN (:st1,:st2) "
sqlW += "WHERE pq.siteName=:siteName AND wt.computingSite=pq.queueName AND wt.status IN (:st1,:st2,:st3) "
sqlW += "GROUP BY wt.status, wt.computingSite, pq.jobType, pq.resourceType "
# get worker stats
varMap = dict()
varMap[":siteName"] = site_name
varMap[":st1"] = "running"
varMap[":st2"] = "submitted"
varMap[":st3"] = "finished"
self.execute(sqlW, varMap)
resW = self.cur.fetchall()
for workerStatus, computingSite, jobType, resourceType, cnt in resW:
retMap.setdefault(jobType, {})
if resourceType not in retMap[jobType]:
retMap[jobType][resourceType] = {"running": 0, "submitted": 0, "to_submit": 0}
retMap[jobType][resourceType] = {"running": 0, "submitted": 0, "finished": 0, "to_submit": 0}
retMap[jobType][resourceType][workerStatus] = cnt
# commit
self.commit()
@@ -3483,32 +3484,33 @@ def get_worker_stats_bulk(self, active_ups_queues):
retMap.setdefault(computingSite, {})
retMap[computingSite].setdefault(jobType, {})
if resourceType and resourceType != "ANY" and resourceType not in retMap[computingSite][jobType]:
retMap[computingSite][jobType][resourceType] = {"running": 0, "submitted": 0, "to_submit": nNewWorkers}
retMap[computingSite][jobType][resourceType] = {"running": 0, "submitted": 0, "finished": 0, "to_submit": nNewWorkers}

# get worker stats
sqlW = "SELECT wt.status, wt.computingSite, wt.jobType, wt.resourceType, COUNT(*) cnt "
sqlW += f"FROM {workTableName} wt "
sqlW += "WHERE wt.status IN (:st1,:st2) "
sqlW += "WHERE wt.status IN (:st1,:st2,:st3) "
sqlW += "GROUP BY wt.status,wt.computingSite, wt.jobType, wt.resourceType "
# get worker stats
varMap = dict()
varMap[":st1"] = "running"
varMap[":st2"] = "submitted"
varMap[":st3"] = "finished"
self.execute(sqlW, varMap)
resW = self.cur.fetchall()
for workerStatus, computingSite, jobType, resourceType, cnt in resW:
if resourceType and resourceType != "ANY":
retMap.setdefault(computingSite, {})
retMap[computingSite].setdefault(jobType, {})
retMap[computingSite][jobType].setdefault(resourceType, {"running": 0, "submitted": 0, "to_submit": 0})
retMap[computingSite][jobType].setdefault(resourceType, {"running": 0, "submitted": 0, "finished": 0, "to_submit": 0})
retMap[computingSite][jobType][resourceType][workerStatus] = cnt

# if there are no jobs for an active UPS queue, it needs to be initialized so that the pilot streaming
# on panda server starts processing the queue
if active_ups_queues:
for ups_queue in active_ups_queues:
if ups_queue not in retMap or not retMap[ups_queue] or retMap[ups_queue] == {"ANY": {}}:
retMap[ups_queue] = {"managed": {"SCORE": {"running": 0, "submitted": 0, "to_submit": 0}}}
retMap[ups_queue] = {"managed": {"SCORE": {"running": 0, "submitted": 0, "finished": 0, "to_submit": 0}}}

# commit
self.commit()
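With the finished state added to the status filter, get_worker_stats and get_worker_stats_bulk now report a finished counter next to running, submitted, and to_submit in every per-resource-type entry. An illustrative shape of one queue's entry after this change (the counts are made up):

# Illustrative only: per-queue worker stats now carry a "finished" counter.
example_queue_stats = {
    "managed": {
        "SCORE": {"running": 12, "submitted": 3, "finished": 7, "to_submit": 0},
        "MCORE": {"running": 4, "submitted": 1, "finished": 2, "to_submit": 0},
    }
}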
