Skip to content

Commit

Permalink
Leverage first_task in get_ready_engine_steps (#1858)
Browse files Browse the repository at this point in the history
* WIP

* Getting pyl to pass

* WIP
  • Loading branch information
jbirddog authored Jul 2, 2024
1 parent 3f52908 commit 09e2c3b
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1569,6 +1569,7 @@ def _do_engine_steps(
exit_at,
save,
should_schedule_waiting_timer_events=should_schedule_waiting_timer_events,
# profile=True,
)
self.task_model_mapping, self.bpmn_subprocess_mapping = task_model_delegate.get_guid_to_db_object_mappings()
self.check_all_tasks()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
from SpiffWorkflow.bpmn.workflow import BpmnWorkflow # type: ignore
from SpiffWorkflow.exceptions import SpiffWorkflowException # type: ignore
from SpiffWorkflow.task import Task as SpiffTask # type: ignore
from SpiffWorkflow.util.task import TaskState # type: ignore
from SpiffWorkflow.util.task import TaskFilter # type: ignore
from SpiffWorkflow.util.task import TaskState

from spiffworkflow_backend.background_processing.celery_tasks.process_instance_task_producer import (
queue_future_task_if_appropriate,
Expand Down Expand Up @@ -95,6 +96,10 @@ def after_engine_steps(self, bpmn_process_instance: BpmnWorkflow) -> None:
def on_exception(self, bpmn_process_instance: BpmnWorkflow) -> None:
pass

@abstractmethod
def last_completed_spiff_task(self) -> SpiffTask | None:
pass


class ExecutionStrategy:
"""Interface of sorts for a concrete execution strategy."""
Expand Down Expand Up @@ -195,7 +200,23 @@ def add_object_to_db_session(self, bpmn_process_instance: BpmnWorkflow) -> None:
self.delegate.add_object_to_db_session(bpmn_process_instance)

def get_ready_engine_steps(self, bpmn_process_instance: BpmnWorkflow) -> list[SpiffTask]:
return [t for t in bpmn_process_instance.get_tasks(state=TaskState.READY) if not t.task_spec.manual]
task_filter = TaskFilter(state=TaskState.READY, manual=False)

steps = list(
bpmn_process_instance.get_tasks(
first_task=self.delegate.last_completed_spiff_task(),
task_filter=task_filter,
)
)

if not steps:
steps = list(
bpmn_process_instance.get_tasks(
task_filter=task_filter,
)
)

return steps

def _run_engine_steps_with_threads(
self, engine_steps: list[SpiffTask], process_instance: ProcessInstanceModel, user: UserModel | None
Expand Down Expand Up @@ -261,7 +282,7 @@ def __init__(

self.current_task_start_in_seconds: float | None = None

self.last_completed_spiff_task: SpiffTask | None = None
self._last_completed_spiff_task: SpiffTask | None = None
self.spiff_tasks_to_process: set[UUID] = set()
self.spiff_task_timestamps: dict[UUID, StartAndEndTimes] = {}

Expand Down Expand Up @@ -293,7 +314,6 @@ def did_complete_task(self, spiff_task: SpiffTask) -> None:
task_model.start_in_seconds = self.current_task_start_in_seconds
task_model.end_in_seconds = time.time()

self.last_completed_spiff_task = spiff_task
if (
spiff_task.task_spec.__class__.__name__ in ["StartEvent", "EndEvent", "IntermediateThrowEvent"]
and spiff_task.task_spec.bpmn_name is not None
Expand All @@ -306,6 +326,7 @@ def did_complete_task(self, spiff_task: SpiffTask) -> None:
elif spiff_task.task_spec.__class__.__name__ == "StartEvent":
self.process_instance.last_milestone_bpmn_name = "Started"
self.process_instance.task_updated_at_in_seconds = round(time.time())
self._last_completed_spiff_task = spiff_task
if self.secondary_engine_step_delegate:
self.secondary_engine_step_delegate.did_complete_task(spiff_task)

Expand Down Expand Up @@ -347,6 +368,9 @@ def _should_update_task_model(self) -> bool:
"""No reason to save task model stuff if the process instance isn't persistent."""
return self.process_instance.persistence_level != "none"

def last_completed_spiff_task(self) -> SpiffTask | None:
return self._last_completed_spiff_task


class GreedyExecutionStrategy(ExecutionStrategy):
"""
Expand Down Expand Up @@ -471,6 +495,25 @@ def run_and_save(
exit_at: None = None,
save: bool = False,
should_schedule_waiting_timer_events: bool = True,
profile: bool = False,
) -> TaskRunnability:
if profile:
import cProfile
from pstats import SortKey

task_runnability = TaskRunnability.unknown_if_ready_tasks
with cProfile.Profile() as pr:
task_runnability = self._run_and_save(exit_at, save, should_schedule_waiting_timer_events)
pr.print_stats(sort=SortKey.CUMULATIVE)
return task_runnability

return self._run_and_save(exit_at, save, should_schedule_waiting_timer_events)

def _run_and_save(
self,
exit_at: None = None,
save: bool = False,
should_schedule_waiting_timer_events: bool = True,
) -> TaskRunnability:
if self.process_instance_model.persistence_level != "none":
with safe_assertion(ProcessInstanceLockService.has_lock(self.process_instance_model.id)) as tripped:
Expand Down Expand Up @@ -610,24 +653,3 @@ def queue_waiting_receive_messages(self) -> None:
bpmn_process_correlations = self.bpmn_process_instance.correlations
bpmn_process.properties_json["correlations"] = bpmn_process_correlations
db.session.add(bpmn_process)


class ProfiledWorkflowExecutionService(WorkflowExecutionService):
"""A profiled version of the workflow execution service."""

def run_and_save(
self,
exit_at: None = None,
save: bool = False,
should_schedule_waiting_timer_events: bool = True,
) -> TaskRunnability:
import cProfile
from pstats import SortKey

task_runnability = TaskRunnability.unknown_if_ready_tasks
with cProfile.Profile() as pr:
task_runnability = super().run_and_save(
exit_at=exit_at, save=save, should_schedule_waiting_timer_events=should_schedule_waiting_timer_events
)
pr.print_stats(sort=SortKey.CUMULATIVE)
return task_runnability

0 comments on commit 09e2c3b

Please sign in to comment.