Skip to content

Commit

Permalink
In-progress implementation for workflow manager - see #286
Browse files Browse the repository at this point in the history
  • Loading branch information
timlinux committed Sep 20, 2024
1 parent 0cd30eb commit 8e1b8f7
Show file tree
Hide file tree
Showing 10 changed files with 479 additions and 1 deletion.
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ nix-result/
.direnv/

data
core
app.py
/geest.zip
.~lock.*
Expand Down
76 changes: 76 additions & 0 deletions geest/core/generate_schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
#!/usr/bin/env python

import json
import os


def infer_schema(data):
    """Infer a JSON schema fragment describing the given JSON data.

    Dictionaries become ``object`` schemas in which every key present is
    listed as required. Lists become ``array`` schemas whose ``items`` schema
    is inferred from the first element only (an empty list gets an
    unconstrained ``{}`` items schema). Scalars map to their JSON type name.

    :param data: A value as produced by ``json.load`` (dict, list, str, int,
        float, bool or None).
    :return: A dictionary holding the inferred schema fragment. Values of any
        other Python type fall back to ``{"type": "string"}``.
    """
    if isinstance(data, dict):
        return {
            "type": "object",
            "properties": {key: infer_schema(value) for key, value in data.items()},
            "required": list(data),  # Mark all keys as required
        }
    if isinstance(data, list):
        # Assume the schema of the first element for all list items
        return {"type": "array", "items": infer_schema(data[0]) if data else {}}
    if isinstance(data, str):
        return {"type": "string"}
    # bool is a subclass of int, so it has to be tested before int; otherwise
    # True/False would be reported as integers.
    if isinstance(data, bool):
        return {"type": "boolean"}
    if isinstance(data, int):
        return {"type": "integer"}
    if isinstance(data, float):
        return {"type": "number"}
    if data is None:
        return {"type": "null"}
    return {"type": "string"}


def generate_schema_from_json(json_file, schema_file):
    """Generate a draft-07 JSON schema from a JSON file and write it to disk.

    Only the top-level ``dimensions`` entry of the input is described; its
    schema is inferred with ``infer_schema`` and marked as required.

    :param json_file: Path of the JSON document to read.
    :param schema_file: Path the generated schema is written to (overwritten
        if it already exists).
    :raises KeyError: If the loaded JSON object has no ``dimensions`` key.
    """
    # JSON text is UTF-8; be explicit so that non-ASCII content is not decoded
    # with whatever the platform's locale encoding happens to be.
    with open(json_file, "r", encoding="utf-8") as f:
        data = json.load(f)

    # Infer the schema
    schema = {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "type": "object",
        "properties": {"dimensions": infer_schema(data["dimensions"])},
        "required": ["dimensions"],
    }

    # Save the schema to the schema file
    with open(schema_file, "w", encoding="utf-8") as f:
        json.dump(schema, f, indent=4)

    print(f"Schema has been generated and saved to {schema_file}")


def main():
    """Generate ``schema.json`` next to ``model.json`` in ``geest/resources``.

    Both paths are resolved relative to the current working directory. When
    the model file is missing an error is printed and nothing is generated.
    """
    resources_dir = os.path.join(os.getcwd(), "geest", "resources")
    model_json_path = os.path.join(resources_dir, "model.json")
    schema_json_path = os.path.join(resources_dir, "schema.json")

    if os.path.exists(model_json_path):
        # The model is available: derive the schema from it
        generate_schema_from_json(model_json_path, schema_json_path)
    else:
        print(f"Error: {model_json_path} not found.")


if __name__ == "__main__":
    main()
25 changes: 25 additions & 0 deletions geest/core/workflow_factory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from .workflows import RasterLayerWorkflow, DontUseWorkflow
from qgis.core import QgsFeedback


class WorkflowFactory:
    """
    Builds the workflow object that matches a set of task attributes.
    Every workflow is constructed with the attributes and the QgsFeedback
    object it should use to report progress and handle cancellation.
    """

    def create_workflow(self, attributes, feedback: QgsFeedback):
        """
        Return the workflow selected by the 'Analysis Mode' entry of the attributes.
        :param attributes: Mapping of task attributes; must provide ``get``
        :param feedback: Feedback object handed to the created workflow
        :return: A RasterLayerWorkflow or DontUseWorkflow instance
        :raises ValueError: If the analysis mode is missing or not recognised
        """
        mode = attributes.get("Analysis Mode")

        # Spatial and temporal analysis are both served by the raster workflow
        if mode in ("Spatial Analysis", "Temporal Analysis"):
            return RasterLayerWorkflow(attributes, feedback)

        if mode == "Don’t Use":
            return DontUseWorkflow(attributes, feedback)

        raise ValueError(f"Unknown Analysis Mode: {mode}")
64 changes: 64 additions & 0 deletions geest/core/workflow_job.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
from qgis.core import QgsTask, QgsMessageLog, QgsFeedback, Qgis
from qgis.PyQt.QtCore import pyqtSignal

from .workflow_factory import WorkflowFactory


class WorkflowJob(QgsTask):
    """
    Represents an individual workflow task. Uses QgsFeedback for progress reporting
    and cancellation, and the WorkflowFactory to create the appropriate workflow.
    """

    # Custom signal emitted from finished() with the success flag and the job attributes
    job_finished = pyqtSignal(bool, dict)

    def __init__(self, description: str, attributes: dict):
        """
        Initialize the workflow job.
        :param description: Task description
        :param attributes: A dictionary of task attributes
        :raises ValueError: Propagated from WorkflowFactory.create_workflow when
            the attributes do not select a known workflow
        """
        super().__init__(description)
        self._attributes = attributes
        self._feedback = QgsFeedback()  # Feedback object for progress and cancellation
        # The workflow only ever sees self._feedback, never the task itself
        self._workflow = WorkflowFactory().create_workflow(attributes, self._feedback)

    def run(self) -> bool:
        """
        Executes the workflow created by the WorkflowFactory. Uses the QgsFeedback
        object for progress reporting and cancellation.
        Exceptions raised by the workflow are logged and reported as failure
        rather than propagated.
        :return: True if the task was successful, False otherwise
        """
        if not self._workflow:
            QgsMessageLog.logMessage(
                f"Error: No workflow assigned to {self.description()}",
                "Custom Workflows",
                Qgis.Critical,
            )
            return False

        try:
            QgsMessageLog.logMessage(
                f"Running workflow: {self.description()}",
                "Custom Workflows",
                Qgis.Info,
            )

            result = self._workflow.execute()

            if result:
                QgsMessageLog.logMessage(
                    f"Workflow {self.description()} completed.",
                    "Custom Workflows",
                    Qgis.Info,
                )
                return True

            QgsMessageLog.logMessage(
                f"Workflow {self.description()} did not complete successfully.",
                "Custom Workflows",
                Qgis.Warning,
            )
            return False

        except Exception as e:
            # Task boundary: report any workflow error as a failed task
            QgsMessageLog.logMessage(
                f"Error during task execution: {e}",
                "Custom Workflows",
                Qgis.Critical,
            )
            return False

    def cancel(self) -> None:
        """
        Cancel the task and forward the request to the workflow's feedback object.
        The workflow is only handed self._feedback, so without forwarding the
        cancellation here it would have no way of noticing that the task was
        cancelled.
        """
        self._feedback.cancel()
        super().cancel()

    def feedback(self) -> QgsFeedback:
        """
        Returns the feedback object, allowing external systems to monitor progress and cancellation.
        :return: QgsFeedback object
        """
        return self._feedback

    def finished(self, success: bool) -> None:
        """
        Override the finished method to emit a custom signal when the task is finished.
        The signal is emitted for successful and unsuccessful tasks alike.
        :param success: True if the task was completed successfully, False otherwise
        """
        # Emit the custom signal job_finished with the success state and the job attributes
        self.job_finished.emit(success, self._attributes)
131 changes: 131 additions & 0 deletions geest/core/workflow_queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
from qgis.core import QgsMessageLog, Qgis, QgsApplication
from PyQt5.QtCore import QObject, pyqtSignal
from typing import List, Optional


class WorkflowQueue(QObject):
    """
    A queue of workflow jobs. Handles submission of the jobs as background tasks
    using a pool of available threads.
    """

    # Signals
    status_changed = pyqtSignal()
    processing_completed = pyqtSignal(bool)
    status_message = pyqtSignal(str)

    def __init__(self, pool_size: int, parent=None):
        """
        Create an empty queue.
        :param pool_size: Maximum number of jobs handed to the task manager at once
        :param parent: Optional parent QObject
        """
        super().__init__(parent=parent)
        # The maximum number of concurrent threads to allow
        self.thread_pool_size = pool_size
        # A list of tasks that need to be executed but
        # cannot be because the job queue is full.
        self.job_queue: List["WorkflowJob"] = []
        # Jobs currently handed to the task manager, keyed by a unique job name
        self.active_tasks = {}

        # Overall queue statistics
        self.total_queue_size = 0
        self.total_completed = 0

    def active_queue_size(self) -> int:
        """
        Returns the number of currently active tasks
        """
        return len(self.active_tasks)

    def reset(self):
        """
        Resets the queue: forgets pending and active jobs and zeroes the statistics.
        Active jobs are only forgotten here, they are not cancelled.
        """
        self.job_queue.clear()
        self.active_tasks.clear()
        self.total_queue_size = 0
        self.total_completed = 0
        self.update_status()

    def cancel_processing(self):
        """
        Cancels any in-progress operation
        """
        self.job_queue.clear()
        self.total_queue_size = 0
        self.total_completed = 0

        # Iterate over a snapshot: cancelling a job may run finalize_task()
        # right away, which removes entries from active_tasks.
        for task in list(self.active_tasks.values()):
            task.cancel()

        self.status_message.emit("Cancelling...")
        self.update_status()

    def update_status(self):
        """
        Called whenever the status of the queue has changed and listeners should be notified accordingly
        """
        self.status_changed.emit()

    def start_processing(self):
        """
        Starts processing the queue
        """
        self.process_queue()

    def process_queue(self):
        """
        Feed the QgsTaskManager with the next task(s) in the queue.
        Emits processing_completed(True) once nothing is pending or active.
        """
        if not self.job_queue and not self.active_tasks:
            # All tasks are done
            self.update_status()
            self.processing_completed.emit(True)
            return

        if not self.job_queue:
            # No more jobs to add, but some jobs are still running
            self.update_status()
            return

        # Determine how many threads are free to take new jobs
        free_threads = self.thread_pool_size - self.active_queue_size()
        for _ in range(free_threads):
            if not self.job_queue:
                break
            job = self.job_queue.pop(0)

            self.status_message.emit(f"Starting workflow task: {job.description()}")

            job_name = self._unique_job_name(job)
            self.active_tasks[job_name] = job

            # The lambdas swallow whatever payload the signal carries
            # (job_finished sends a bool and a dict) and bind the job name as
            # a keyword-only default so each connection keeps its own name.
            job.job_finished.connect(
                lambda *_args, name=job_name: self.task_completed(name)
            )
            job.taskTerminated.connect(
                lambda *_args, name=job_name: self.finalize_task(name)
            )

            QgsApplication.taskManager().addTask(job)

        self.update_status()

    def _unique_job_name(self, job) -> str:
        """
        Return a key for the job that is not yet used in active_tasks.
        The job description is used as-is when it is free; otherwise a counter
        suffix is appended so jobs sharing a description do not overwrite each
        other in the bookkeeping.
        """
        base_name = job.description()
        job_name = base_name
        counter = 1
        while job_name in self.active_tasks:
            counter += 1
            job_name = f"{base_name} ({counter})"
        return job_name

    def task_completed(self, job_name: str):
        """
        Called whenever an active job emits job_finished
        :param job_name: Key of the job in active_tasks
        """
        self.finalize_task(job_name)

    def finalize_task(self, job_name: str):
        """
        Finalizes a task -- called for both successful and non-successful tasks.
        Calls for a name that is not (or no longer) active are ignored, so a job
        that reports through more than one signal is only counted once.
        :param job_name: Key of the job in active_tasks
        """
        if job_name not in self.active_tasks:
            return

        del self.active_tasks[job_name]
        self.total_completed += 1

        self.status_changed.emit()
        self.process_queue()

    def add_job(self, job):
        """
        Adds a job to the queue
        :param job: The workflow job to run once a thread is free
        """
        self.job_queue.append(job)
        self.total_queue_size += 1
self.total_queue_size += 1
83 changes: 83 additions & 0 deletions geest/core/workflow_queue_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
from PyQt5.QtCore import QObject
from qgis.core import QgsMessageLog, Qgis
from .workflow_queue import WorkflowQueue
from .workflow_job import WorkflowJob


class WorkflowQueueManager(QObject):
    """
    Front end of the workflow queue system. Wraps jobs around task attributes,
    hands them to a WorkflowQueue and logs what the queue reports back.
    """

    def __init__(self, pool_size: int, parent=None):
        """
        Create the manager together with the WorkflowQueue it drives.
        :param pool_size: Maximum number of concurrent tasks
        :param parent: Optional parent QObject
        """
        super().__init__(parent=parent)
        queue = WorkflowQueue(pool_size)
        self.workflow_queue = queue

        # Route the queue's notifications to the logging slots below
        queue.status_changed.connect(self.update_status)
        queue.processing_completed.connect(self.on_processing_completed)
        queue.status_message.connect(self.log_status_message)

    def _log(self, message: str, level) -> None:
        """Write a message to the 'Workflow Manager' log category."""
        QgsMessageLog.logMessage(message, "Workflow Manager", level)

    def add_task(self, attributes: dict) -> None:
        """
        Wrap the attributes in a WorkflowJob and append it to the queue.
        :param attributes: A dictionary of task attributes
        """
        job = WorkflowJob(description="Workflow Task", attributes=attributes)
        self.workflow_queue.add_job(job)
        self._log(f"Task added: {job.description()}", Qgis.Info)

    def start_processing(self) -> None:
        """Log the start and ask the queue to begin processing."""
        self._log("Starting workflow queue processing...", Qgis.Info)
        self.workflow_queue.start_processing()

    def cancel_processing(self) -> None:
        """Log the request and ask the queue to cancel its tasks."""
        self._log("Cancelling workflow queue...", Qgis.Warning)
        self.workflow_queue.cancel_processing()

    def update_status(self) -> None:
        """Slot for the queue's status_changed signal; logs the change."""
        self._log("Workflow queue status updated.", Qgis.Info)

    def on_processing_completed(self, success: bool) -> None:
        """
        Slot for the queue's processing_completed signal.
        :param success: Flag delivered by the signal; selects which message is logged
        """
        if not success:
            self._log("Workflow processing was canceled.", Qgis.Warning)
            return

        self._log("All workflow tasks completed successfully.", Qgis.Success)

    def log_status_message(self, message: str) -> None:
        """
        Slot for the queue's status_message signal.
        :param message: Status message to log
        """
        self._log(message, Qgis.Info)
2 changes: 2 additions & 0 deletions geest/core/workflows/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from .raster_layer_workflow import RasterLayerWorkflow
from .dont_use_workflow import DontUseWorkflow
Loading

0 comments on commit 8e1b8f7

Please sign in to comment.