diff --git a/src/WorkflowManager/Database/Repositories/ArtifactsRepository.cs b/src/WorkflowManager/Database/Repositories/ArtifactsRepository.cs index 956e8b656..b3ecb4378 100644 --- a/src/WorkflowManager/Database/Repositories/ArtifactsRepository.cs +++ b/src/WorkflowManager/Database/Repositories/ArtifactsRepository.cs @@ -183,7 +183,7 @@ public async Task AddOrUpdateItemAsync(string workflowInstanceId, string taskId, } else { - item.Artifacts = item.Artifacts.Concat(existing.Artifacts).ToList(); + item.Artifacts = item.Artifacts.Union(existing.Artifacts).ToList(); var update = Builders.Update.Set(a => a.Artifacts, item.Artifacts); await _artifactReceivedItemsCollection .UpdateOneAsync(a => a.WorkflowInstanceId == workflowInstanceId && a.TaskId == taskId, update) diff --git a/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs b/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs index f7fc03b5c..99d73aea5 100644 --- a/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs +++ b/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs @@ -251,7 +251,7 @@ await _artifactsRepository return true; } - private async Task ProcessArtifactReceivedOutputs(ArtifactsReceivedEvent message, WorkflowInstance workflowInstance, TaskObject task, string taskId) + private async Task ProcessArtifactReceivedOutputs(ArtifactsReceivedEvent message, WorkflowInstance workflowInstance, TaskObject taskTemplate, string taskId) { var artifactList = message.Artifacts.Select(a => $"{a.Path}").ToList(); var artifactsInStorage = (await _storageService.VerifyObjectsExistAsync(workflowInstance.BucketId, artifactList, default)) ?? new Dictionary(); @@ -263,22 +263,40 @@ private async Task ProcessArtifactReceivedOutputs(ArtifactsReceivedEvent message var messageArtifactsInStorage = message.Artifacts.Where(m => artifactsInStorage.First(a => a.Value && a.Key == $"{m.Path}").Value).ToList(); + var addedNew = false; var validArtifacts = new Dictionary(); foreach (var artifact in messageArtifactsInStorage) { - var match = task.Artifacts.Output.FirstOrDefault(t => t.Type == artifact.Type); + var match = taskTemplate.Artifacts.Output.FirstOrDefault(t => t.Type == artifact.Type); if (match is not null && validArtifacts.ContainsKey(match!.Name) is false) { validArtifacts.Add(match.Name, $"{artifact.Path}"); + } } var currentTask = workflowInstance.Tasks?.Find(t => t.TaskId == taskId); - currentTask!.OutputArtifacts = validArtifacts; // adding the actual paths here, the parent function does the saving of the changes - _logger.AddingFilesToWorkflowInstance(workflowInstance.Id, taskId, JsonConvert.SerializeObject(validArtifacts)); - await _workflowInstanceRepository.UpdateTaskOutputArtifactsAsync(workflowInstance.Id, taskId, validArtifacts); + foreach (var artifact in validArtifacts) + { + if (currentTask?.OutputArtifacts.ContainsKey(artifact.Key) is false) + { + // adding the actual paths here, the parent function does the saving of the changes + currentTask?.OutputArtifacts.Add(artifact.Key, artifact.Value); + addedNew = true; + } + } + + //if (addedNew) + //{ + // _logger.AddingFilesToWorkflowInstance(workflowInstance.Id, taskId, JsonConvert.SerializeObject(validArtifacts)); + // await _workflowInstanceRepository.UpdateTaskOutputArtifactsAsync(workflowInstance.Id, taskId, validArtifacts); + //} + if (currentTask is not null && addedNew) + { + await _workflowInstanceRepository.UpdateTaskAsync(workflowInstance.Id, taskId, currentTask); + } } private async Task AllRequiredArtifactsReceivedAsync(ArtifactsReceivedEvent message, WorkflowInstance workflowInstance, @@ -612,7 +630,12 @@ private async Task ExternalAppRequest(ExternalAppRequestEvent externalAppR return true; } - private async Task HandleDicomExportAsync(WorkflowRevision workflow, WorkflowInstance workflowInstance, TaskExecution task, string correlationId, List? plugins = null) + private async Task HandleDicomExportAsync( + WorkflowRevision workflow, + WorkflowInstance workflowInstance, + TaskExecution task, + string correlationId, + List? plugins = null) { plugins ??= new List(); var (exportList, artifactValues) = await GetExportsAndArtifcts(workflow, workflowInstance, task, correlationId); @@ -629,7 +652,12 @@ private async Task HandleDicomExportAsync(WorkflowRevision workflow, WorkflowIns await _workflowInstanceRepository.UpdateTaskStatusAsync(workflowInstance.Id, task.TaskId, TaskExecutionStatus.Dispatched); } - private async Task<(string[]? exportList, string[]? artifactValues)> GetExportsAndArtifcts(WorkflowRevision workflow, WorkflowInstance workflowInstance, TaskExecution task, string correlationId) + private async Task<(string[]? exportList, string[]? artifactValues)> GetExportsAndArtifcts( + WorkflowRevision workflow, + WorkflowInstance workflowInstance, + TaskExecution task, + string correlationId, + bool enforceDcmOnly = true) { var exportList = workflow.Workflow?.Tasks?.FirstOrDefault(t => t.Id == task.TaskId)?.ExportDestinations.Select(e => e.Name).ToArray(); if (exportList is null || !exportList.Any()) @@ -637,7 +665,7 @@ private async Task HandleDicomExportAsync(WorkflowRevision workflow, WorkflowIns exportList = null; } - var artifactValues = await GetArtifactValues(workflow, workflowInstance, task, exportList, correlationId); + var artifactValues = await GetArtifactValues(workflow, workflowInstance, task, exportList, correlationId, enforceDcmOnly); if (artifactValues.IsNullOrEmpty()) { @@ -646,7 +674,12 @@ private async Task HandleDicomExportAsync(WorkflowRevision workflow, WorkflowIns return (exportList, artifactValues); } - private async Task GetArtifactValues(WorkflowRevision workflow, WorkflowInstance workflowInstance, TaskExecution task, string[]? exportList, string correlationId) + private async Task GetArtifactValues( + WorkflowRevision workflow, WorkflowInstance workflowInstance, + TaskExecution task, + string[]? exportList, + string correlationId, + bool enforceDcmOnly = true) { var artifactValues = GetDicomExports(workflow, task, exportList); @@ -660,7 +693,7 @@ private async Task GetArtifactValues(WorkflowRevision workflow, Workfl artifact, true); - var dcmFiles = objects?.Where(o => o.IsValidDicomFile())?.ToList(); + var dcmFiles = objects?.Where(o => o.IsValidDicomFile() || enforceDcmOnly is false)?.ToList(); if (dcmFiles?.IsNullOrEmpty() is false) { @@ -681,7 +714,7 @@ private async Task GetArtifactValues(WorkflowRevision workflow, Workfl private async Task HandleHl7ExportAsync(WorkflowRevision workflow, WorkflowInstance workflowInstance, TaskExecution task, string correlationId) { - var (exportList, artifactValues) = await GetExportsAndArtifcts(workflow, workflowInstance, task, correlationId); + var (exportList, artifactValues) = await GetExportsAndArtifcts(workflow, workflowInstance, task, correlationId, false); if (exportList is null || artifactValues is null) { diff --git a/tests/UnitTests/WorkflowExecuter.Tests/Services/WorkflowExecuterServiceTests.cs b/tests/UnitTests/WorkflowExecuter.Tests/Services/WorkflowExecuterServiceTests.cs index c849fb622..58fdc5479 100755 --- a/tests/UnitTests/WorkflowExecuter.Tests/Services/WorkflowExecuterServiceTests.cs +++ b/tests/UnitTests/WorkflowExecuter.Tests/Services/WorkflowExecuterServiceTests.cs @@ -3184,44 +3184,7 @@ public async Task ArtifactReceveid_Valid_ReturnesTrue() Assert.True(result); } - [Fact] - public async Task ProcessArtifactReceived_Calls_WorkflowInstanceRepository_UpdateTaskOutputArtifactsAsync() - { - var artifactPath = "some path here"; - //incoming artifacts - var message = new ArtifactsReceivedEvent - { - WorkflowInstanceId = "123", TaskId = "456", - Artifacts = new List() { new Messaging.Common.Artifact() { Type = ArtifactType.CT, Path = $"{new Guid()}/{artifactPath}" } } - }; - var workflowInstance = new WorkflowInstance - { - WorkflowId = "789", Tasks = new List() - { new TaskExecution() { TaskId = "456" } } - }; - _workflowInstanceRepository.Setup(w => w.GetByWorkflowInstanceIdAsync(message.WorkflowInstanceId))! - .ReturnsAsync(workflowInstance); - //expected artifacts - var templateArtifacts = new OutputArtifact[] - { - new OutputArtifact() { Type = ArtifactType.CT , Name = "CT scan"}, - }; - var taskTemplate = new TaskObject() { Id = "456", Artifacts = new ArtifactMap { Output = templateArtifacts } }; - var workflowTemplate = new WorkflowRevision { Workflow = new Workflow { Tasks = new[] { taskTemplate } } }; - _workflowRepository.Setup(w => w.GetByWorkflowIdAsync("789"))! - .ReturnsAsync(workflowTemplate); - - _storageService.Setup(s => s.VerifyObjectsExistAsync(It.IsAny(), It.IsAny>(), It.IsAny())) - .ReturnsAsync(new Dictionary { { $"{message.PayloadId}/{artifactPath}", true } }); - //previously received artifacts - _artifactReceivedRepository.Setup(r => r.GetAllAsync(workflowInstance.WorkflowId, taskTemplate.Id)) - .ReturnsAsync((List?)null); - - var result = await WorkflowExecuterService.ProcessArtifactReceivedAsync(message); - - _workflowInstanceRepository.Verify(w => w.UpdateTaskOutputArtifactsAsync(It.IsAny(), It.IsAny(), It.IsAny>()), Times.Once()); - } [Fact] public async Task ProcessPayload_WithExportTask_NoExportsFails() {