Feature/improved event logging (#2214)
* log events generated by backend to event stream with enhanced info

* lint and mypy

* simplify log format and ensure dicts are not converted to strings

* hard coding the event source doesn't make a lot of sense

* fix lint error

* not sure how these tests passed before

---------

Co-authored-by: jasquat <[email protected]>
Co-authored-by: Kevin Burnett <[email protected]>
3 people authored Jan 9, 2025
1 parent 005de8a commit 092fe39
Showing 8 changed files with 145 additions and 98 deletions.

@@ -168,6 +168,7 @@ def config_from_env(variable_name: str, *, default: str | bool | int | None = None
config_from_env("SPIFFWORKFLOW_BACKEND_LOG_TO_FILE", default=False)
config_from_env("SPIFFWORKFLOW_BACKEND_EVENT_STREAM_HOST", default=None)
config_from_env("SPIFFWORKFLOW_BACKEND_EVENT_STREAM_PORT", default=None)
config_from_env("SPIFFWORKFLOW_BACKEND_EVENT_STREAM_SOURCE", default="spiffworkflow.org")

### permissions
config_from_env("SPIFFWORKFLOW_BACKEND_PERMISSIONS_FILE_ABSOLUTE_PATH")
@@ -70,11 +70,12 @@ def process_instance_create(

     process_instance = _process_instance_create(process_model_identifier)

-    LoggingService.log_event(
-        ProcessInstanceEventType.process_instance_created.value,
-        process_model_identifier=process_model_identifier,
-        process_instance_id=process_instance.id,
-    )
+    log_extras = {
+        "milestone": "Started",
+        "process_model_identifier": process_model_identifier,
+        "process_instance_id": process_instance.id,
+    }
+    LoggingService.log_event(ProcessInstanceEventType.process_instance_created.value, log_extras)

     return Response(
         json.dumps(ProcessInstanceModelSchema().dump(process_instance)),

@@ -39,15 +39,11 @@ def format(self, record: Any) -> str:
         return json.dumps(
             {
                 "version": "1.0",
-                "action": "add_event",
-                "event": {
-                    "specversion": "1.0",
-                    "type": record.name,
-                    "id": str(uuid4()),
-                    "source": "spiffworkflow.org",
-                    "timestamp": datetime.utcnow().timestamp(),
-                    "data": record._spiff_data,
-                },
+                "type": record.name,
+                "id": str(uuid4()),
+                "source": self.app.config["SPIFFWORKFLOW_BACKEND_EVENT_STREAM_SOURCE"],
+                "timestamp": datetime.utcnow().timestamp(),
+                "data": record._spiff_data,
             }
         )
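
The formatter change drops the outer action/event envelope (and the hard-coded source), so each record now serializes to a single flat object. A sketch of what one emitted event might look like, with invented values:

import json

# All field values are made up; "type" is the logger name and "source" now
# comes from SPIFFWORKFLOW_BACKEND_EVENT_STREAM_SOURCE instead of being hard coded.
example_event = {
    "version": "1.0",
    "type": "spiff.event",
    "id": "0b8fa9b2-1e63-4c47-9d6e-4a2f4c9d8e01",
    "source": "spiffworkflow.org",
    "timestamp": 1736380800.0,
    "data": {"message": "process_instance_created", "milestone": "Started"},
}
print(json.dumps(example_event))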

@@ -65,40 +61,49 @@ def get_default_process_info(self) -> tuple[int | None, str | None]:
         return None, None

     def filter(self, record: Any) -> bool:
-        if record.name.startswith("spiff") and getattr(record, "event_type", "") not in ["task_completed", "task_cancelled"]:
+        if record.name.startswith("spiff"):
             user_id, user_name = self.get_user_info()

             data = {
                 "message": record.msg,
                 "userid": user_id,
                 "username": user_name,
                 "process_instance_id": getattr(record, "process_instance_id", None),
                 "process_model_identifier": getattr(record, "process_model_identifier", None),
             }

             process_instance_id, process_model_identifier = self.get_default_process_info()

-            if not hasattr(record, "process_instance_id"):
+            if data["process_instance_id"] is None:
                 data["process_instance_id"] = process_instance_id
-            if not hasattr(record, "process_model_identifier"):
+            if data["process_model_identifier"] is None:
                 data["process_model_identifier"] = process_model_identifier

-            task_properties_from_spiff = [
-                "worflow_spec",
-                "task_spec",
-                "task_id",
-                "task_type",
-                "state",
-                "last_state_change",
-                "elapsed",
-                "parent",
-            ]
-            workflow_properties_from_spiff = ["completed", "success"]
-            properties_from_spiff = task_properties_from_spiff + workflow_properties_from_spiff
-            for attr in properties_from_spiff:
+            if record.name in "spiff.task":
+                properties = [
+                    "workflow_spec",
+                    "task_spec",
+                    "task_id",
+                    "task_type",
+                    "state",
+                    "last_state_change",
+                    "elapsed",
+                    "parent",
+                ]
+            elif record.name == "spiff.workflow":
+                properties = ["workflow_spec", "completed", "success"]
+            elif record.name == "spiff.data":
+                properties = ["workflow_spec", "task_spec", "task_id", "task_type"]
+            elif record.name == "spiff.event":
+                properties = ["bpmn_name", "milestone", "task_id", "task_spec", "metadata", "error_info"]
+            else:
+                properties = []

+            for attr in properties:
                 if hasattr(record, attr):
-                    data[attr] = str(getattr(record, attr))
+                    data[attr] = getattr(record, attr)
+                    if not (data[attr] is None or isinstance(data[attr], dict)):
+                        data[attr] = str(data[attr])
                 else:
                     data[attr] = None
-                record._spiff_data = data
+            record._spiff_data = data

             return True
         else:
             return False
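
This loop is what the commit message means by keeping dicts as dicts: values are stringified for the stream unless they are None or a dict, so nested structures such as metadata and error_info stay structured JSON. A standalone sketch of the rule (the helper name is ours, not the codebase's):

def coerce_for_event_stream(value):
    # None and dicts pass through unchanged; everything else is stringified.
    if value is None or isinstance(value, dict):
        return value
    return str(value)

assert coerce_for_event_stream({"total": 250}) == {"total": 250}
assert coerce_for_event_stream(250) == "250"
assert coerce_for_event_stream(None) is None
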
@@ -296,25 +301,8 @@ def get_log_formatter(app: Flask) -> logging.Formatter:


 class LoggingService:
-    _spiff_logger = logging.getLogger("spiff")
+    _spiff_logger = logging.getLogger("spiff.event")

     @classmethod
-    def log_event(
-        cls,
-        event_type: str,
-        task_guid: str | None = None,
-        process_model_identifier: str | None = None,
-        process_instance_id: int | None = None,
-    ) -> None:
-        extra: dict[str, Any] = {"event_type": event_type}
-
-        if task_guid is not None:
-            extra["task_guid"] = task_guid
-
-        if process_model_identifier is not None:
-            extra["process_model_Identifier"] = process_model_identifier
-
-        if process_instance_id is not None:
-            extra["process_instance_id"] = process_instance_id
-
-        cls._spiff_logger.info(event_type, extra=extra)
+    def log_event(cls, message: str, log_extras: dict | None = None) -> None:
+        cls._spiff_logger.info(message, extra=log_extras)
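
The slimmed-down log_event leans on the standard library's extra mechanism: every key in log_extras becomes an attribute on the LogRecord, which is what the filter above reads back with hasattr/getattr. A minimal, self-contained demonstration:

import logging

logger = logging.getLogger("spiff.event")
logger.setLevel(logging.INFO)

class ShowExtras(logging.Filter):
    def filter(self, record: logging.LogRecord) -> bool:
        # Keys passed via extra= surface as plain attributes on the record.
        print(getattr(record, "milestone", None))  # prints: Started
        return True

logger.addFilter(ShowExtras())
logger.info("process_instance_created", extra={"milestone": "Started"})
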
@@ -993,43 +993,14 @@ def get_potential_owners_from_task(self, task: SpiffTask) -> PotentialOwnerIdList:
"lane_assignment_id": lane_assignment_id,
}

def extract_metadata(self) -> None:
# we are currently not getting the metadata extraction paths based on the version in git from the process instance.
# it would make sense to do that if the shell-out-to-git performance cost was not too high.
# we also discussed caching this information in new database tables. something like:
# process_model_version
# id
# process_model_identifier
# git_hash
# display_name
# notification_type
# metadata_extraction
# id
# extraction_key
# extraction_path
# metadata_extraction_process_model_version
# process_model_version_id
# metadata_extraction_id
process_model_info = ProcessModelService.get_process_model(self.process_instance_model.process_model_identifier)
metadata_extraction_paths = process_model_info.metadata_extraction_paths
if metadata_extraction_paths is None:
return
if len(metadata_extraction_paths) <= 0:
return

current_data = self.get_current_data()
for metadata_extraction_path in metadata_extraction_paths:
key = metadata_extraction_path["key"]
path = metadata_extraction_path["path"]
path_segments = path.split(".")
data_for_key = current_data
for path_segment in path_segments:
if path_segment in data_for_key:
data_for_key = data_for_key[path_segment]
else:
data_for_key = None # type: ignore
break
def extract_metadata(self) -> dict:
return ProcessModelService.extract_metadata(
self.process_instance_model.process_model_identifier,
self.get_current_data(),
)

def store_metadata(self, metadata: dict) -> None:
for key, data_for_key in metadata.items():
if data_for_key is not None:
pim = ProcessInstanceMetadataModel.query.filter_by(
process_instance_id=self.process_instance_model.id,
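
The refactor splits what used to be one method: extract_metadata is now side-effect-free and returns the dict (delegating to ProcessModelService, shown later in this diff), while the new store_metadata persists it. A toy sketch of the resulting calling convention, with stand-in logic:

# Toy stand-in for the processor, showing the new convention: extraction is
# a pure computation, persistence is a separate explicit step.
class ToyProcessor:
    def __init__(self, data: dict):
        self.data = data
        self.saved: dict = {}

    def extract_metadata(self) -> dict:
        # pure computation (the real method delegates to ProcessModelService)
        return {"invoice_total": self.data.get("invoice", {}).get("total")}

    def store_metadata(self, metadata: dict) -> None:
        # explicit persistence (the real method writes ProcessInstanceMetadataModel rows)
        self.saved.update({k: v for k, v in metadata.items() if v is not None})

processor = ToyProcessor({"invoice": {"total": 250}})
metadata = processor.extract_metadata()   # compute once...
processor.store_metadata(metadata)        # ...store it, and reuse the same dict in log_extras
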
@@ -1190,21 +1161,26 @@ def save(self) -> None:
         if self.process_instance_model.start_in_seconds is None:
             self.process_instance_model.start_in_seconds = round(time.time())

+        metadata = self.extract_metadata()
         if self.process_instance_model.end_in_seconds is None:
             if self.bpmn_process_instance.is_completed():
                 self.process_instance_model.end_in_seconds = round(time.time())
                 if self._workflow_completed_handler is not None:
                     self._workflow_completed_handler(self.process_instance_model)
-                LoggingService.log_event(
-                    ProcessInstanceEventType.process_instance_completed.value,
-                )
+                log_extras = {
+                    "milestone": "Completed",
+                    "process_model_identifier": self.process_instance_model.process_model_identifier,
+                    "process_instance_id": self.process_instance_model.id,
+                    "metadata": metadata,
+                }
+                LoggingService.log_event(ProcessInstanceEventType.process_instance_completed.value, log_extras)

         db.session.add(self.process_instance_model)

         human_tasks = HumanTaskModel.query.filter_by(process_instance_id=self.process_instance_model.id, completed=False).all()
         ready_or_waiting_tasks = self.get_all_ready_or_waiting_tasks()

-        self.extract_metadata()
+        self.store_metadata(metadata)
         self.update_summary()

         for ready_or_waiting_task in ready_or_waiting_tasks:

@@ -1839,8 +1815,18 @@ def complete_task(self, spiff_task: SpiffTask, human_task: HumanTaskModel, user:
             task_guid=task_model.guid,
             user_id=user.id,
             exception=task_exception,
+            log_event=False,
         )

+        log_extras = {
+            "task_id": str(spiff_task.id),
+            "task_spec": spiff_task.task_spec.name,
+            "bpmn_name": spiff_task.task_spec.bpmn_name,
+            "process_model_identifier": self.process_instance_model.process_model_identifier,
+            "process_instance_id": self.process_instance_model.id,
+            "metadata": self.extract_metadata(),
+        }
+        LoggingService.log_event(task_event, log_extras)
         # children of a multi-instance task has the attribute "triggered" set to True
         # so use that to determine if a spiff_task is apart of a multi-instance task
         # and therefore we need to process its parent since the current task will not
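
Putting the formatter and filter together, a task event built from extras like these might carry a data block along the following lines; all values are invented, and the comments call out which fields the filter stringifies and which it leaves structured:

example_task_completed_data = {
    "message": "task_completed",
    "userid": 7,
    "username": "admin",
    "process_instance_id": 42,
    "process_model_identifier": "example/invoice-approval",
    "bpmn_name": "Approve Invoice",
    "milestone": None,                    # filled with None when the record lacks the attribute
    "task_id": "5f0c6f3e-9a1f-4d3e-8b7a-2c1d0e9f8a7b",
    "task_spec": "approve_invoice_task",  # scalars are stringified
    "metadata": {"invoice_total": 250},   # dicts pass through unstringified
    "error_info": None,
}
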
@@ -1,5 +1,6 @@
 import time
 import traceback
+from typing import Any

 from flask import g
 from SpiffWorkflow.bpmn.exceptions import WorkflowTaskException  # type: ignore

@@ -32,6 +33,7 @@ def add_event_to_process_instance(
         timestamp: float | None = None,
         add_to_db_session: bool | None = True,
         migration_details: ProcessInstanceMigrationDetailDict | None = None,
+        log_event: bool = True,
     ) -> tuple[ProcessInstanceEventModel, ProcessInstanceErrorDetailModel | None]:
         if user_id is None and hasattr(g, "user") and g.user:
             user_id = g.user.id

@@ -47,6 +49,8 @@
         if add_to_db_session:
             db.session.add(process_instance_event)

+        log_extras: dict[str, Any] = {"task_id": task_guid}
+
         process_instance_error_detail = None
         if exception is not None:
             # NOTE: I tried to move this to its own method but

@@ -82,10 +86,19 @@
                 task_offset=task_offset,
             )

+            log_extras["error_info"] = {
+                "trace": stacktrace,
+                "line_number": task_line_number,
+                "line_offset": task_offset,
+                "line_content": task_line_contents,
+            }
+
             if add_to_db_session:
                 db.session.add(process_instance_error_detail)

-        LoggingService.log_event(event_type, task_guid)
+        if log_event:
+            # Some events need to be logged elsewhere so that all required info can be included
+            LoggingService.log_event(event_type, log_extras)

         if migration_details is not None:
             pi_detail = cls.add_process_instance_migration_detail(process_instance_event, migration_details)
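
When an exception is attached, the event now carries a structured error_info block rather than just a task guid. A hypothetical example of the extras for a failed script task (all values invented; "trace" holds whatever stacktrace contains in practice):

example_error_extras = {
    "task_id": "5f0c6f3e-9a1f-4d3e-8b7a-2c1d0e9f8a7b",
    "error_info": {
        "trace": "KeyError: 'amount' (full stacktrace in practice)",
        "line_number": 3,
        "line_offset": 0,
        "line_content": "total = data['amount'] * 1.2",
    },
}
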
@@ -126,6 +126,46 @@ def add_json_data_to_json_file(cls, process_model: ProcessModelInfo, file_name:
         full_json_data = {**existing_json, **json_data}
         cls.write_json_file(json_path, full_json_data)

+    @classmethod
+    def extract_metadata(cls, process_model_identifier: str, current_data: dict) -> dict[str, Any]:
+        # we are currently not getting the metadata extraction paths based on the version in git from the process instance.
+        # it would make sense to do that if the shell-out-to-git performance cost was not too high.
+        # we also discussed caching this information in new database tables. something like:
+        # process_model_version
+        #   id
+        #   process_model_identifier
+        #   git_hash
+        #   display_name
+        #   notification_type
+        # metadata_extraction
+        #   id
+        #   extraction_key
+        #   extraction_path
+        # metadata_extraction_process_model_version
+        #   process_model_version_id
+        #   metadata_extraction_id
+        process_model_info = cls.get_process_model(process_model_identifier)
+        metadata_extraction_paths = process_model_info.metadata_extraction_paths
+        if metadata_extraction_paths is None:
+            return {}
+        if len(metadata_extraction_paths) <= 0:
+            return {}
+
+        current_metadata = {}
+        for metadata_extraction_path in metadata_extraction_paths:
+            key = metadata_extraction_path["key"]
+            path = metadata_extraction_path["path"]
+            path_segments = path.split(".")
+            data_for_key = current_data
+            for path_segment in path_segments:
+                if path_segment in data_for_key:
+                    data_for_key = data_for_key[path_segment]
+                else:
+                    data_for_key = None  # type: ignore
+                    break
+            current_metadata[key] = data_for_key
+        return current_metadata
+
     @classmethod
     def save_process_model(cls, process_model: ProcessModelInfo) -> None:
         process_model_path = os.path.abspath(os.path.join(FileSystemService.root_path(), process_model.id_for_file_path()))
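
Each extraction path is a dot-delimited walk into the task data, and a missing segment yields None for that key rather than raising. A self-contained rerun of the loop above on invented data:

metadata_extraction_paths = [
    {"key": "invoice_total", "path": "invoice.total"},
    {"key": "approver", "path": "approval.user"},  # "approval" is absent below
]
current_data = {"invoice": {"total": 250, "currency": "EUR"}}

current_metadata = {}
for extraction in metadata_extraction_paths:
    data_for_key = current_data
    for segment in extraction["path"].split("."):
        if segment in data_for_key:
            data_for_key = data_for_key[segment]
        else:
            data_for_key = None
            break
    current_metadata[extraction["key"]] = data_for_key

print(current_metadata)  # {'invoice_total': 250, 'approver': None}
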
@@ -246,6 +246,7 @@ def update_task_model_with_spiff_task(
             task_guid=task_model.guid,
             timestamp=timestamp,
             add_to_db_session=False,
+            log_event=False,  # Log this in the execution service instead
         )
         self.process_instance_events[task_model.guid] = process_instance_event


@@ -41,8 +41,10 @@
 from spiffworkflow_backend.models.user import UserModel
 from spiffworkflow_backend.services.assertion_service import safe_assertion
 from spiffworkflow_backend.services.jinja_service import JinjaService
+from spiffworkflow_backend.services.logging_service import LoggingService
 from spiffworkflow_backend.services.process_instance_lock_service import ProcessInstanceLockService
 from spiffworkflow_backend.services.process_instance_tmp_service import ProcessInstanceTmpService
+from spiffworkflow_backend.services.process_model_service import ProcessModelService
 from spiffworkflow_backend.services.task_service import StartAndEndTimes
 from spiffworkflow_backend.services.task_service import TaskService

@@ -323,21 +325,36 @@ def did_complete_task(self, spiff_task: SpiffTask) -> None:
         # NOTE: used with process-all-tasks and process-children-of-last-task
         task_model = self.task_service.update_task_model_with_spiff_task(spiff_task)
         if self.current_task_start_in_seconds is None:
-            raise Exception("Could not find cached current_task_start_in_seconds. This should never have happend")
+            raise Exception("Could not find cached current_task_start_in_seconds. This should never have happened")
         task_model.start_in_seconds = self.current_task_start_in_seconds
         task_model.end_in_seconds = time.time()

+        metadata = ProcessModelService.extract_metadata(
+            self.process_instance.process_model_identifier,
+            spiff_task.data,
+        )
+        log_extras = {
+            "task_id": str(spiff_task.id),
+            "task_spec": spiff_task.task_spec.name,
+            "bpmn_name": spiff_task.task_spec.bpmn_name,
+            "process_model_identifier": self.process_instance.process_model_identifier,
+            "process_instance_id": self.process_instance.id,
+            "metadata": metadata,
+        }
         if (
             spiff_task.task_spec.__class__.__name__ in ["StartEvent", "EndEvent", "IntermediateThrowEvent"]
             and spiff_task.task_spec.bpmn_name is not None
         ):
             self.process_instance.last_milestone_bpmn_name = spiff_task.task_spec.bpmn_name
+            log_extras["milestone"] = spiff_task.task_spec.bpmn_name
         elif spiff_task.workflow.parent_task_id is None:
             # if parent_task_id is None then this should be the top level process
             if spiff_task.task_spec.__class__.__name__ == "EndEvent":
                 self.process_instance.last_milestone_bpmn_name = "Completed"
             elif spiff_task.task_spec.__class__.__name__ == "StartEvent":
                 self.process_instance.last_milestone_bpmn_name = "Started"

+        LoggingService.log_event(ProcessInstanceEventType.task_completed.value, log_extras)
         self.process_instance.task_updated_at_in_seconds = round(time.time())
         self._last_completed_spiff_task = spiff_task
         if self.secondary_engine_step_delegate:
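
The milestone branching reads as a small decision table: named StartEvent/EndEvent/IntermediateThrowEvent tasks report their bpmn_name, and unnamed start/end events still count as "Started"/"Completed" for the top-level process. A condensed sketch of that rule (the helper is ours, not the codebase's):

def milestone_for(task_spec_class: str, bpmn_name: str | None, is_top_level: bool) -> str | None:
    # Named milestone events win outright.
    if task_spec_class in ["StartEvent", "EndEvent", "IntermediateThrowEvent"] and bpmn_name is not None:
        return bpmn_name
    # Unnamed events only count at the top-level process.
    if is_top_level:
        if task_spec_class == "EndEvent":
            return "Completed"
        if task_spec_class == "StartEvent":
            return "Started"
    return None

assert milestone_for("EndEvent", "Invoice Sent", is_top_level=False) == "Invoice Sent"
assert milestone_for("EndEvent", None, is_top_level=True) == "Completed"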
