Skip to content

Commit

Permalink
report recent finished workers to panda
Browse files Browse the repository at this point in the history
  • Loading branch information
mightqxc committed Nov 23, 2023
1 parent 7879266 commit ced4c8e
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 8 deletions.
2 changes: 1 addition & 1 deletion pandaharvester/commit_timestamp.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
timestamp = "23-11-2023 12:54:20 on flin (by mightqxc)"
timestamp = "23-11-2023 14:47:01 on flin (by mightqxc)"
16 changes: 9 additions & 7 deletions pandaharvester/harvestercore/db_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -3435,24 +3435,25 @@ def get_worker_stats(self, site_name):
for computingSite, jobType, resourceType, nNewWorkers in resQ:
retMap.setdefault(jobType, {})
if resourceType not in retMap[jobType]:
retMap[jobType][resourceType] = {"running": 0, "submitted": 0, "to_submit": nNewWorkers}
retMap[jobType][resourceType] = {"running": 0, "submitted": 0, "finished": 0, "to_submit": nNewWorkers}

# get worker stats
sqlW = "SELECT wt.status, wt.computingSite, pq.jobType, pq.resourceType, COUNT(*) cnt "
sqlW += f"FROM {workTableName} wt, {pandaQueueTableName} pq "
sqlW += "WHERE pq.siteName=:siteName AND wt.computingSite=pq.queueName AND wt.status IN (:st1,:st2) "
sqlW += "WHERE pq.siteName=:siteName AND wt.computingSite=pq.queueName AND wt.status IN (:st1,:st2,:st3) "
sqlW += "GROUP BY wt.status, wt.computingSite, pq.jobType, pq.resourceType "
# get worker stats
varMap = dict()
varMap[":siteName"] = site_name
varMap[":st1"] = "running"
varMap[":st2"] = "submitted"
varMap[":st3"] = "finished"
self.execute(sqlW, varMap)
resW = self.cur.fetchall()
for workerStatus, computingSite, jobType, resourceType, cnt in resW:
retMap.setdefault(jobType, {})
if resourceType not in retMap:
retMap[jobType][resourceType] = {"running": 0, "submitted": 0, "to_submit": 0}
retMap[jobType][resourceType] = {"running": 0, "submitted": 0, "finished": 0, "to_submit": 0}
retMap[jobType][resourceType][workerStatus] = cnt
# commit
self.commit()
Expand Down Expand Up @@ -3483,32 +3484,33 @@ def get_worker_stats_bulk(self, active_ups_queues):
retMap.setdefault(computingSite, {})
retMap[computingSite].setdefault(jobType, {})
if resourceType and resourceType != "ANY" and resourceType not in retMap[computingSite][jobType]:
retMap[computingSite][jobType][resourceType] = {"running": 0, "submitted": 0, "to_submit": nNewWorkers}
retMap[computingSite][jobType][resourceType] = {"running": 0, "submitted": 0, "finished": 0, "to_submit": nNewWorkers}

# get worker stats
sqlW = "SELECT wt.status, wt.computingSite, wt.jobType, wt.resourceType, COUNT(*) cnt "
sqlW += f"FROM {workTableName} wt "
sqlW += "WHERE wt.status IN (:st1,:st2) "
sqlW += "WHERE wt.status IN (:st1,:st2,:st3) "
sqlW += "GROUP BY wt.status,wt.computingSite, wt.jobType, wt.resourceType "
# get worker stats
varMap = dict()
varMap[":st1"] = "running"
varMap[":st2"] = "submitted"
varMap[":st3"] = "finished"
self.execute(sqlW, varMap)
resW = self.cur.fetchall()
for workerStatus, computingSite, jobType, resourceType, cnt in resW:
if resourceType and resourceType != "ANY":
retMap.setdefault(computingSite, {})
retMap[computingSite].setdefault(jobType, {})
retMap[computingSite][jobType].setdefault(resourceType, {"running": 0, "submitted": 0, "to_submit": 0})
retMap[computingSite][jobType].setdefault(resourceType, {"running": 0, "submitted": 0, "finished": 0, "to_submit": 0})
retMap[computingSite][jobType][resourceType][workerStatus] = cnt

# if there are no jobs for an active UPS queue, it needs to be initialized so that the pilot streaming
# on panda server starts processing the queue
if active_ups_queues:
for ups_queue in active_ups_queues:
if ups_queue not in retMap or not retMap[ups_queue] or retMap[ups_queue] == {"ANY": {}}:
retMap[ups_queue] = {"managed": {"SCORE": {"running": 0, "submitted": 0, "to_submit": 0}}}
retMap[ups_queue] = {"managed": {"SCORE": {"running": 0, "submitted": 0, "finished": 0, "to_submit": 0}}}

# commit
self.commit()
Expand Down

0 comments on commit ced4c8e

Please sign in to comment.