Skip to content

Commit

Permalink
fix artifact recieved for HL7
Browse files Browse the repository at this point in the history
Signed-off-by: Neil South <[email protected]>
  • Loading branch information
neildsouth committed Dec 21, 2023
1 parent 460e86a commit 1341116
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ public async Task AddOrUpdateItemAsync(string workflowInstanceId, string taskId,
}
else
{
item.Artifacts = item.Artifacts.Concat(existing.Artifacts).ToList();
item.Artifacts = item.Artifacts.Union(existing.Artifacts).ToList();
var update = Builders<ArtifactReceivedItems>.Update.Set(a => a.Artifacts, item.Artifacts);
await _artifactReceivedItemsCollection
.UpdateOneAsync(a => a.WorkflowInstanceId == workflowInstanceId && a.TaskId == taskId, update)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ await _artifactsRepository
return true;
}

private async Task ProcessArtifactReceivedOutputs(ArtifactsReceivedEvent message, WorkflowInstance workflowInstance, TaskObject task, string taskId)
private async Task ProcessArtifactReceivedOutputs(ArtifactsReceivedEvent message, WorkflowInstance workflowInstance, TaskObject taskTemplate, string taskId)
{
var artifactList = message.Artifacts.Select(a => $"{a.Path}").ToList();
var artifactsInStorage = (await _storageService.VerifyObjectsExistAsync(workflowInstance.BucketId, artifactList, default)) ?? new Dictionary<string, bool>();
Expand All @@ -263,22 +263,40 @@ private async Task ProcessArtifactReceivedOutputs(ArtifactsReceivedEvent message

var messageArtifactsInStorage = message.Artifacts.Where(m => artifactsInStorage.First(a => a.Value && a.Key == $"{m.Path}").Value).ToList();

var addedNew = false;
var validArtifacts = new Dictionary<string, string>();
foreach (var artifact in messageArtifactsInStorage)
{
var match = task.Artifacts.Output.FirstOrDefault(t => t.Type == artifact.Type);
var match = taskTemplate.Artifacts.Output.FirstOrDefault(t => t.Type == artifact.Type);
if (match is not null && validArtifacts.ContainsKey(match!.Name) is false)
{
validArtifacts.Add(match.Name, $"{artifact.Path}");

}
}

var currentTask = workflowInstance.Tasks?.Find(t => t.TaskId == taskId);

currentTask!.OutputArtifacts = validArtifacts; // adding the actual paths here, the parent function does the saving of the changes

_logger.AddingFilesToWorkflowInstance(workflowInstance.Id, taskId, JsonConvert.SerializeObject(validArtifacts));
await _workflowInstanceRepository.UpdateTaskOutputArtifactsAsync(workflowInstance.Id, taskId, validArtifacts);
foreach (var artifact in validArtifacts)
{
if (currentTask?.OutputArtifacts.ContainsKey(artifact.Key) is false)
{
// adding the actual paths here, the parent function does the saving of the changes
currentTask?.OutputArtifacts.Add(artifact.Key, artifact.Value);
addedNew = true;
}
}

//if (addedNew)
//{
// _logger.AddingFilesToWorkflowInstance(workflowInstance.Id, taskId, JsonConvert.SerializeObject(validArtifacts));
// await _workflowInstanceRepository.UpdateTaskOutputArtifactsAsync(workflowInstance.Id, taskId, validArtifacts);
//}
if (currentTask is not null && addedNew)
{
await _workflowInstanceRepository.UpdateTaskAsync(workflowInstance.Id, taskId, currentTask);
}
}

private async Task<bool> AllRequiredArtifactsReceivedAsync(ArtifactsReceivedEvent message, WorkflowInstance workflowInstance,
Expand Down Expand Up @@ -612,7 +630,12 @@ private async Task<bool> ExternalAppRequest(ExternalAppRequestEvent externalAppR
return true;
}

private async Task HandleDicomExportAsync(WorkflowRevision workflow, WorkflowInstance workflowInstance, TaskExecution task, string correlationId, List<string>? plugins = null)
private async Task HandleDicomExportAsync(
WorkflowRevision workflow,
WorkflowInstance workflowInstance,
TaskExecution task,
string correlationId,
List<string>? plugins = null)
{
plugins ??= new List<string>();
var (exportList, artifactValues) = await GetExportsAndArtifcts(workflow, workflowInstance, task, correlationId);
Expand All @@ -629,15 +652,20 @@ private async Task HandleDicomExportAsync(WorkflowRevision workflow, WorkflowIns
await _workflowInstanceRepository.UpdateTaskStatusAsync(workflowInstance.Id, task.TaskId, TaskExecutionStatus.Dispatched);
}

private async Task<(string[]? exportList, string[]? artifactValues)> GetExportsAndArtifcts(WorkflowRevision workflow, WorkflowInstance workflowInstance, TaskExecution task, string correlationId)
private async Task<(string[]? exportList, string[]? artifactValues)> GetExportsAndArtifcts(
WorkflowRevision workflow,
WorkflowInstance workflowInstance,
TaskExecution task,
string correlationId,
bool enforceDcmOnly = true)
{
var exportList = workflow.Workflow?.Tasks?.FirstOrDefault(t => t.Id == task.TaskId)?.ExportDestinations.Select(e => e.Name).ToArray();
if (exportList is null || !exportList.Any())
{
exportList = null;
}

var artifactValues = await GetArtifactValues(workflow, workflowInstance, task, exportList, correlationId);
var artifactValues = await GetArtifactValues(workflow, workflowInstance, task, exportList, correlationId, enforceDcmOnly);

if (artifactValues.IsNullOrEmpty())
{
Expand All @@ -646,7 +674,12 @@ private async Task HandleDicomExportAsync(WorkflowRevision workflow, WorkflowIns
return (exportList, artifactValues);
}

private async Task<string[]> GetArtifactValues(WorkflowRevision workflow, WorkflowInstance workflowInstance, TaskExecution task, string[]? exportList, string correlationId)
private async Task<string[]> GetArtifactValues(
WorkflowRevision workflow, WorkflowInstance workflowInstance,
TaskExecution task,
string[]? exportList,
string correlationId,
bool enforceDcmOnly = true)
{
var artifactValues = GetDicomExports(workflow, task, exportList);

Expand All @@ -660,7 +693,7 @@ private async Task<string[]> GetArtifactValues(WorkflowRevision workflow, Workfl
artifact,
true);

var dcmFiles = objects?.Where(o => o.IsValidDicomFile())?.ToList();
var dcmFiles = objects?.Where(o => o.IsValidDicomFile() || enforceDcmOnly is false)?.ToList();

if (dcmFiles?.IsNullOrEmpty() is false)
{
Expand All @@ -681,7 +714,7 @@ private async Task<string[]> GetArtifactValues(WorkflowRevision workflow, Workfl

private async Task HandleHl7ExportAsync(WorkflowRevision workflow, WorkflowInstance workflowInstance, TaskExecution task, string correlationId)
{
var (exportList, artifactValues) = await GetExportsAndArtifcts(workflow, workflowInstance, task, correlationId);
var (exportList, artifactValues) = await GetExportsAndArtifcts(workflow, workflowInstance, task, correlationId, false);

if (exportList is null || artifactValues is null)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3184,44 +3184,7 @@ public async Task ArtifactReceveid_Valid_ReturnesTrue()

Assert.True(result);
}
[Fact]
public async Task ProcessArtifactReceived_Calls_WorkflowInstanceRepository_UpdateTaskOutputArtifactsAsync()
{
var artifactPath = "some path here";
//incoming artifacts
var message = new ArtifactsReceivedEvent
{
WorkflowInstanceId = "123", TaskId = "456",
Artifacts = new List<Messaging.Common.Artifact>() { new Messaging.Common.Artifact() { Type = ArtifactType.CT, Path = $"{new Guid()}/{artifactPath}" } }
};
var workflowInstance = new WorkflowInstance
{
WorkflowId = "789", Tasks = new List<TaskExecution>()
{ new TaskExecution() { TaskId = "456" } }
};
_workflowInstanceRepository.Setup(w => w.GetByWorkflowInstanceIdAsync(message.WorkflowInstanceId))!
.ReturnsAsync(workflowInstance);
//expected artifacts
var templateArtifacts = new OutputArtifact[]
{
new OutputArtifact() { Type = ArtifactType.CT , Name = "CT scan"},
};
var taskTemplate = new TaskObject() { Id = "456", Artifacts = new ArtifactMap { Output = templateArtifacts } };
var workflowTemplate = new WorkflowRevision { Workflow = new Workflow { Tasks = new[] { taskTemplate } } };
_workflowRepository.Setup(w => w.GetByWorkflowIdAsync("789"))!
.ReturnsAsync(workflowTemplate);

_storageService.Setup(s => s.VerifyObjectsExistAsync(It.IsAny<string>(), It.IsAny<List<string>>(), It.IsAny<CancellationToken>()))
.ReturnsAsync(new Dictionary<string, bool> { { $"{message.PayloadId}/{artifactPath}", true } });

//previously received artifacts
_artifactReceivedRepository.Setup(r => r.GetAllAsync(workflowInstance.WorkflowId, taskTemplate.Id))
.ReturnsAsync((List<ArtifactReceivedItems>?)null);

var result = await WorkflowExecuterService.ProcessArtifactReceivedAsync(message);

_workflowInstanceRepository.Verify(w => w.UpdateTaskOutputArtifactsAsync(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<Dictionary<string, string>>()), Times.Once());
}
[Fact]
public async Task ProcessPayload_WithExportTask_NoExportsFails()
{
Expand Down

0 comments on commit 1341116

Please sign in to comment.