Skip to content

Commit

Permalink
Merge pull request #699 from kartoza/timlinux/issue670
Browse files Browse the repository at this point in the history
Optimise workflow execution
  • Loading branch information
timlinux authored Dec 11, 2024
2 parents 7ae7883 + e302fc8 commit 89269a5
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 53 deletions.
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,9 @@ geest.tar.gz
gitlog.txt
geest/resources/models/prepare_mask_area.model3
test/output

# Ignore files named 'core'
core

# Don't ignore directories named 'core'
!core/
2 changes: 1 addition & 1 deletion config.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
"author": "Kartoza",
"email": "[email protected]",
"description": "Gender Enabling Environments Spatial Tool",
"version": "0.4.0",
"version": "0.4.1",
"changelog": "",
"server": false
}
Expand Down
27 changes: 24 additions & 3 deletions geest/core/json_tree_item.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,24 @@ def appendChild(self, item):
def child(self, row):
return self.childItems[row]

def childCount(self):
return len(self.childItems)
def childCount(self, recursive=False):
"""Count the number of children of this item.
If the recursive flag is set to True, count all descendants.
Args:
recursive (bool, optional): _description_. Defaults to False.
Returns:
_type_: _description_
"""
if not recursive:
return len(self.childItems)
else:
count = len(self.childItems)
for child in self.childItems:
count += child.childCount(recursive=True)
return count

def columnCount(self):
return len(self.itemData)
Expand Down Expand Up @@ -126,10 +142,15 @@ def isDimension(self):
def isAnalysis(self):
return self.role == "analysis"

def clear(self):
def clear(self, recursive=False):
"""
Mark the item as not run, keeping any configurations made
:param recursive: If True, clear all children as well
"""
if recursive:
for child in self.childItems:
child.clear(recursive)
data = self.attributes()
data["result"] = "Not Run"
data["result_file"] = ""
Expand Down
143 changes: 94 additions & 49 deletions geest/gui/panels/tree_panel.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import json
import os
import shutil
from logging import getLogger
from typing import Union, Dict, List
from qgis.PyQt.QtWidgets import (
QAction,
Expand Down Expand Up @@ -60,7 +61,8 @@ def __init__(self, parent=None, json_file=None):

# Initialize the QueueManager
self.working_directory = None
self.queue_manager = WorkflowQueueManager(pool_size=1)
pool_size = int(setting(key="render_thread_pool_size", default=1))
self.queue_manager = WorkflowQueueManager(pool_size=pool_size)
self.json_file = json_file
self.tree_view_visible = True
self.run_only_incomplete = (
Expand Down Expand Up @@ -221,6 +223,28 @@ def clear_workflows(self):
:param parent_item: The parent item to process. If none, start from the root.
"""
msg_box = QMessageBox(self)
msg_box.setIcon(QMessageBox.Warning)
msg_box.setWindowTitle("Clear Workflows")
msg_box.setText(
f"This action will DELETE all files and folders in the working directory ({self.working_directory}). Do you want to continue?"
)
msg_box.setStandardButtons(QMessageBox.Yes | QMessageBox.No)
open_folder_button = msg_box.addButton("Open Folder", QMessageBox.ActionRole)
msg_box.setDefaultButton(QMessageBox.No)

reply = msg_box.exec_()

if msg_box.clickedButton() == open_folder_button:
if self.working_directory:
if os.name == "nt":
os.startfile(self.working_directory)
elif os.name == "posix":
os.system(f'xdg-open "{self.working_directory}"')
return

if reply == QMessageBox.No:
return
self.run_only_incomplete = False
# Remove every file in self.working_directory except
# mode.json and the study_area folder
Expand All @@ -243,17 +267,10 @@ def clear_workflows(self):
if child.name() == "Geest":
root.removeChildNode(child)
# Mark all items in the data model as not run
self.clear_all_items()
self.save_json_to_working_directory()
item = self.model.rootItem
item.clear(recursive=True) # sets status to not run and blanks file path

def clear_all_items(self, parent_item=None):
if parent_item is None:
parent_item = self.model.rootItem
for i in range(parent_item.childCount()):
child_item = parent_item.child(i)
self.clear_all_items(child_item)
child_item.clear() # sets status to not run and blanks file path
parent_item.clear() # sets status to not run and blanks file path
self.save_json_to_working_directory()

@pyqtSlot(str)
def working_directory_changed(self, new_directory):
Expand Down Expand Up @@ -413,6 +430,7 @@ def open_context_menu(self, position: QPoint):

run_item_action = QAction("Run Item Workflow", self)

# If shift is pressed, change the text to "Rerun Item Workflow"
def update_action_text():
text = (
"Rerun Item Workflow"
Expand Down Expand Up @@ -453,6 +471,16 @@ def update_action_text():
add_study_area_layers_action.triggered.connect(self.add_study_area_to_map)
menu.addAction(add_study_area_layers_action)

open_log_file_action = QAction("Open Log File", self)
open_log_file_action.triggered.connect(self.open_log_file)
menu.addAction(open_log_file_action)
open_log_file_action.triggered.connect(self.open_log_file)
menu.addAction(open_log_file_action)

open_working_directory_action = QAction("Open Working Directory", self)
open_working_directory_action.triggered.connect(self.open_working_directory)
menu.addAction(open_working_directory_action)

# Check the role of the item directly from the stored role
if item.role == "dimension":
# Context menu for dimensions
Expand Down Expand Up @@ -528,6 +556,33 @@ def update_action_text():
# Show the menu at the cursor's position
menu.exec_(self.treeView.viewport().mapToGlobal(position))

def open_working_directory(self):
"""Open the working directory in the file explorer."""
if self.working_directory:
if os.name == "nt":
os.startfile(self.working_directory)
elif os.name == "posix":
os.system(f'xdg-open "{self.working_directory}"')
else:
QMessageBox.warning(
self, "No Working Directory", "The working directory is not set."
)

def open_log_file(self):
"""Open the log file in the default text editor."""
logger = getLogger()
log_file_path = logger.handlers[0].baseFilename

if os.path.exists(log_file_path):
if os.name == "nt":
os.startfile(log_file_path)
elif os.name == "posix":
os.system(f'xdg-open "{log_file_path}"')
else:
QMessageBox.warning(
self, "Log File Not Found", "The log file does not exist."
)

def disable_item(self, item):
"""Disable the item and its children."""
item.disable()
Expand Down Expand Up @@ -924,35 +979,36 @@ def edit_factor_aggregation(self, factor_item):
dialog.save_weightings_to_model()
self.save_json_to_working_directory() # Save changes to the JSON if necessary

def start_workflows(self, type=None):
def start_workflows(self, workflow_type=None):
"""Start a workflow for each 'layer' node in the tree.
We process in the order of layers, factors, and dimensions since there
is a dependency between them. For example, a factor depends on its layers.
"""
if type == "indicators":
self._start_workflows(self.treeView.model().rootItem, role="indicator")
elif type == "factors":
self._start_workflows(self.treeView.model().rootItem, role="factor")
elif type == "dimensions":
self._start_workflows(self.treeView.model().rootItem, role="dimension")
elif type == "analysis":
self._start_workflows(self.treeView.model().rootItem, role="analysis")

def _start_workflows(self, parent_item, role=None):
"""
Recursively start workflows for each node in the tree.
Connect workflow signals to the corresponding slots for updates.
:param parent_item: The parent item to process.
:param role: The role of the item to process (i.e., 'dimension', 'factor', 'layer').
"""
for i in range(parent_item.childCount()):
child_item = parent_item.child(i)
if child_item.getStatus() == "Excluded from analysis":
continue
self.queue_workflow_task(child_item, role)
# Recursively process children (dimensions, factors)
self._start_workflows(child_item, role)
log_message("############################################")
log_message(f"Starting {workflow_type} workflows")
log_message("############################################")
if workflow_type == "indicators":
for item in self.model.rootItem.getDescendantIndicators(
ignore_completed=self.run_only_incomplete, ignore_disabled=True
):
self.queue_workflow_task(item, item.role)
elif workflow_type == "factors":
for item in self.model.rootItem.getDescendantFactors(
ignore_completed=self.run_only_incomplete, ignore_disabled=True
):
self.queue_workflow_task(item, item.role)
elif workflow_type == "dimensions":
for item in self.model.rootItem.getDescendantDimensions(
ignore_completed=self.run_only_incomplete, ignore_disabled=True
):
self.queue_workflow_task(item, item.role)
elif workflow_type == "analysis":
item = self.model.get_analysis_item()
log_message("############################################")
log_message(f"Starting analysis workflow for {item.data(0)}")
log_message("############################################")
self.queue_workflow_task(item, item.role)

def _count_workflows_to_run(self, parent_item=None):
"""
Expand All @@ -963,17 +1019,8 @@ def _count_workflows_to_run(self, parent_item=None):
if parent_item is None:
parent_item = self.model.rootItem

for i in range(parent_item.childCount()):
child_item = parent_item.child(i)
self._count_workflows_to_run(child_item)
is_complete = child_item.getStatus() == "Workflow Completed"
is_disabled = child_item.getStatus() == "Excluded from analysis"
if is_disabled:
continue
elif not is_complete:
self.items_to_run += 1
elif is_complete and not self.run_only_incomplete:
self.items_to_run += 1
count = parent_item.childCount(recursive=True)
self.items_to_run = count

def queue_workflow_task(self, item, role):
"""Queue a workflow task based on the role of the item.
Expand Down Expand Up @@ -1195,7 +1242,6 @@ def run_all(self):
"""Run all workflows in the tree, regardless of their status."""
self.run_only_incomplete = False
self.clear_workflows()
self.items_to_run = 0
self._count_workflows_to_run()
log_message(f"Total items to process: {self.items_to_run}")
self._queue_workflows()
Expand All @@ -1207,7 +1253,6 @@ def run_incomplete(self):
processes each one whilst showing an animated icon.
"""
self.run_only_incomplete = True
self.items_to_run = 0
self._count_workflows_to_run()
self._queue_workflows()

Expand Down Expand Up @@ -1239,7 +1284,7 @@ def run_next_workflow_queue(self):
return
# pop the first item from the queue
next_workflow = self.workflow_queue.pop(0)
self.start_workflows(type=next_workflow)
self.start_workflows(workflow_type=next_workflow)
debug_env = int(os.getenv("GEEST_DEBUG", 0))
if debug_env:
self.queue_manager.start_processing_in_foreground()
Expand Down

0 comments on commit 89269a5

Please sign in to comment.