diff --git a/src/Common/Configuration/MessageBrokerConfigurationKeys.cs b/src/Common/Configuration/MessageBrokerConfigurationKeys.cs index 96e7e0c2e..886497f27 100644 --- a/src/Common/Configuration/MessageBrokerConfigurationKeys.cs +++ b/src/Common/Configuration/MessageBrokerConfigurationKeys.cs @@ -88,5 +88,8 @@ public class MessageBrokerConfigurationKeys [ConfigurationKeyName("notificationEmailCancelation")] public string NotificationEmailCancelation { get; set; } = "aide.notification_email.cancellation"; + + [ConfigurationKeyName("artifactrecieved")] + public string ArtifactRecieved { get; set; } = "md.workflow.artifactrecieved"; } } diff --git a/src/Monai.Deploy.WorkflowManager.sln.DotSettings b/src/Monai.Deploy.WorkflowManager.sln.DotSettings index 850fb5e55..7878bfca9 100644 --- a/src/Monai.Deploy.WorkflowManager.sln.DotSettings +++ b/src/Monai.Deploy.WorkflowManager.sln.DotSettings @@ -1,4 +1,5 @@  + True AR AS ASMT diff --git a/src/TaskManager/TaskManager/TaskManager.cs b/src/TaskManager/TaskManager/TaskManager.cs index a51fee4c7..80369aa62 100644 --- a/src/TaskManager/TaskManager/TaskManager.cs +++ b/src/TaskManager/TaskManager/TaskManager.cs @@ -241,15 +241,17 @@ private async Task HandleCancellationTask(JsonMessage mes } var pluginAssembly = string.Empty; - ITaskPlugin? taskRunner = null; + ITaskPlugin? taskRunner; try { var taskExecution = await _taskDispatchEventService.GetByTaskExecutionIdAsync(message.Body.ExecutionId).ConfigureAwait(false); - pluginAssembly = _options.Value.TaskManager.PluginAssemblyMappings[taskExecution?.Event.TaskPluginType] ?? string.Empty; - var taskExecEvent = taskExecution?.Event; - if (taskExecEvent == null) + + var taskExecEvent = taskExecution?.Event ?? throw new InvalidOperationException("Task Event data not found."); + + pluginAssembly = string.Empty; + if (_options.Value.TaskManager.PluginAssemblyMappings.ContainsKey(taskExecution?.Event.TaskPluginType)) { - throw new InvalidOperationException("Task Event data not found."); + pluginAssembly = _options.Value.TaskManager.PluginAssemblyMappings[taskExecution?.Event.TaskPluginType]; } taskRunner = typeof(ITaskPlugin).CreateInstance(serviceProvider: _scope.ServiceProvider, typeString: pluginAssembly, _serviceScopeFactory, taskExecEvent); diff --git a/src/WorkflowManager/Database/Interfaces/IArtifactsRepository.cs b/src/WorkflowManager/Database/Interfaces/IArtifactsRepository.cs new file mode 100644 index 000000000..3bf7ee7e9 --- /dev/null +++ b/src/WorkflowManager/Database/Interfaces/IArtifactsRepository.cs @@ -0,0 +1,52 @@ +/* + * Copyright 2022 MONAI Consortium + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System.Collections.Generic; +using System.Threading.Tasks; +using Artifact = Monai.Deploy.Messaging.Common.Artifact; + +namespace Monai.Deploy.WorkflowManager.Common.Database.Repositories +{ + public interface IArtifactsRepository + { + /// + /// Gets All ArtifactsReceivedItems by workflowInstance and taskId. + /// + /// + /// + /// + Task?> GetAllAsync(string workflowInstance, string taskId); + + /// + /// Adds an item to the ArtifactsReceivedItems collection. + /// + /// + /// + Task AddItemAsync(ArtifactReceivedItems item); + + /// + /// Adds an item to the ArtifactsReceivedItems collection. + /// + /// + /// + /// + /// + Task AddItemAsync(string workflowInstanceId, string taskId, List artifactsOutputs); + + Task AddOrUpdateItemAsync(string workflowInstanceId, string taskId, + IEnumerable artifactsOutputs); + } +} diff --git a/src/WorkflowManager/Database/Repositories/ArtifactsRepository.cs b/src/WorkflowManager/Database/Repositories/ArtifactsRepository.cs new file mode 100644 index 000000000..ea2bd57ea --- /dev/null +++ b/src/WorkflowManager/Database/Repositories/ArtifactsRepository.cs @@ -0,0 +1,203 @@ +/* + * Copyright 2022 MONAI Consortium + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Ardalis.GuardClauses; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Monai.Deploy.WorkflowManager.Common.Database.Options; +using MongoDB.Bson; +using MongoDB.Driver; +using Artifact = Monai.Deploy.Messaging.Common.Artifact; + +namespace Monai.Deploy.WorkflowManager.Common.Database.Repositories +{ + public class ArtifactReceivedDetails : Artifact + { + /// + /// Gets or Sets LastReceived. + /// + public DateTime? Received { get; set; } = null; + + public static ArtifactReceivedDetails FromArtifact(Artifact artifact) => + new() + { + Received = DateTime.UtcNow, + Type = artifact.Type, + Path = artifact.Path + }; + } + + public class ArtifactReceivedItems + { + public string Id { get; set; } + + /// + /// Gets or Sets WorkflowInstanceId. + /// + public string WorkflowInstanceId { get; set; } = string.Empty; + + /// + /// Gets or Sets TaskId. + /// + public string TaskId { get; set; } = string.Empty; + + /// + /// Gets or Sets Artifacts. + /// + public List Artifacts { get; set; } = new(); + + /// + /// The date Time this was received + /// + public DateTime Received { get; set; } = DateTime.UtcNow; + } + + public class ArtifactsRepository : IArtifactsRepository + { + private readonly ILogger _logger; + private readonly IMongoCollection _artifactReceivedItemsCollection; + + public ArtifactsRepository( + IMongoClient client, + IOptions bookStoreDatabaseSettings, + ILogger logger) + { + if (client == null) + { + throw new ArgumentNullException(nameof(client)); + } + + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + var mongoDatabase = client.GetDatabase(bookStoreDatabaseSettings.Value.DatabaseName); + _artifactReceivedItemsCollection = mongoDatabase.GetCollection("ArtifactReceivedItems"); + EnsureIndex().GetAwaiter().GetResult(); + } + private async Task EnsureIndex() + { + var indexName = "ArtifactReceivedWorkflowInstanceIdTaskIdIndex"; + + var model = new CreateIndexModel( + Builders.IndexKeys.Ascending(s => s.WorkflowInstanceId).Ascending(s => s.TaskId), + new CreateIndexOptions { Name = indexName } + ); + + await MakeIndex(_artifactReceivedItemsCollection, indexName, model); + + indexName = "ReceivedTime"; + + model = new CreateIndexModel( + Builders.IndexKeys.Ascending(s => s.Received), + new CreateIndexOptions { ExpireAfter = TimeSpan.FromDays(7), Name = "ReceivedTime" } + ); + + await MakeIndex(_artifactReceivedItemsCollection, indexName, model); + } + private static async Task MakeIndex(IMongoCollection collection, string indexName, CreateIndexModel model) + { + Guard.Against.Null(collection, nameof(collection)); + + var asyncCursor = (await collection.Indexes.ListAsync()); + var bsonDocuments = (await asyncCursor.ToListAsync()); + var indexes = bsonDocuments.Select(_ => _.GetElement("name").Value.ToString()).ToList(); + + // If index not present create it else skip. + if (!indexes.Any(i => i is not null && i.Equals(indexName))) + { + await collection.Indexes.CreateOneAsync(model); + } + } + + public async Task?> GetAllAsync(string workflowInstance, string taskId) + { + var result = await _artifactReceivedItemsCollection.FindAsync(a => a.WorkflowInstanceId == workflowInstance && a.TaskId == taskId).ConfigureAwait(false); + return await result.ToListAsync().ConfigureAwait(false); + } + + public Task AddItemAsync(ArtifactReceivedItems item) + { + return _artifactReceivedItemsCollection.InsertOneAsync(item); + } + + public Task AddItemAsync(string workflowInstanceId, string taskId, List artifactsOutputs) + { + var artifacts = artifactsOutputs.Select(a => new ArtifactReceivedDetails() + { + Path = a.Path, + Type = a.Type, + Received = DateTime.UtcNow + }); + + var item = new ArtifactReceivedItems() + { + WorkflowInstanceId = workflowInstanceId, + TaskId = taskId, + Artifacts = artifacts.ToList() + }; + + return _artifactReceivedItemsCollection.InsertOneAsync(item); + } + + public async Task AddOrUpdateItemAsync(string workflowInstanceId, string taskId, + IEnumerable artifactsOutputs) + { + var artifacts = artifactsOutputs.Select(a => new ArtifactReceivedDetails() + { + Path = a.Path, + Type = a.Type, + Received = DateTime.UtcNow + }); + + var item = new ArtifactReceivedItems() + { + Id = workflowInstanceId + taskId, + WorkflowInstanceId = workflowInstanceId, + TaskId = taskId, + Artifacts = artifacts.ToList() + }; + + var result = await _artifactReceivedItemsCollection + .FindAsync(a => a.WorkflowInstanceId == workflowInstanceId && a.TaskId == taskId).ConfigureAwait(false); + var existing = await result.FirstOrDefaultAsync().ConfigureAwait(false); + + try + { + if (existing == null) + { + await _artifactReceivedItemsCollection.InsertOneAsync(item).ConfigureAwait(false); + } + else + { + item.Artifacts = item.Artifacts.Concat(existing.Artifacts).ToList(); + var update = Builders.Update.Set(a => a.Artifacts, item.Artifacts); + await _artifactReceivedItemsCollection + .UpdateOneAsync(a => a.WorkflowInstanceId == workflowInstanceId && a.TaskId == taskId, update) + .ConfigureAwait(false); + } + } + catch (Exception ex) + { + + throw; + } + + + } + } +} diff --git a/src/WorkflowManager/Database/Repositories/RepositoryBase.cs b/src/WorkflowManager/Database/Repositories/RepositoryBase.cs index e4f120d55..e36f126fb 100644 --- a/src/WorkflowManager/Database/Repositories/RepositoryBase.cs +++ b/src/WorkflowManager/Database/Repositories/RepositoryBase.cs @@ -24,8 +24,8 @@ namespace Monai.Deploy.WorkflowManager.Common.Database.Repositories { public abstract class RepositoryBase { - public static async Task CountAsync(IMongoCollection collection, FilterDefinition? filter) - => await collection.CountDocumentsAsync(filter ?? Builders.Filter.Empty); + public static Task CountAsync(IMongoCollection collection, FilterDefinition? filter) + => collection.CountDocumentsAsync(filter ?? Builders.Filter.Empty); /// /// Get All T that match filters provided. @@ -44,7 +44,7 @@ public static async Task> GetAllAsync(IMongoCollection collection .Skip(skip) .Limit(limit) .Sort(sortFunction) - .ToListAsync(); + .ToListAsync().ConfigureAwait(false); } public static async Task> GetAllAsync(IMongoCollection collection, FilterDefinition filterFunction, SortDefinition sortFunction, int? skip = null, int? limit = null) @@ -54,7 +54,7 @@ public static async Task> GetAllAsync(IMongoCollection collection .Skip(skip) .Limit(limit) .Sort(sortFunction) - .ToListAsync(); + .ToListAsync().ConfigureAwait(false); } } } diff --git a/src/WorkflowManager/Logging/Log.200000.Workflow.cs b/src/WorkflowManager/Logging/Log.200000.Workflow.cs index b93da4732..98e85cbee 100644 --- a/src/WorkflowManager/Logging/Log.200000.Workflow.cs +++ b/src/WorkflowManager/Logging/Log.200000.Workflow.cs @@ -81,6 +81,9 @@ public static partial class Log [LoggerMessage(EventId = 200019, Level = LogLevel.Debug, Message = "Task destination condition for task {taskId} with resolved condition: {resolvedConditional} resolved to false. initial conditional: {conditions}")] public static partial void TaskDestinationConditionFalse(this ILogger logger, string resolvedConditional, string conditions, string taskId); + [LoggerMessage(EventId = 200020, Level = LogLevel.Warning, Message = "Use new ArtifactReceived Queue for continuation messages.")] + public static partial void DontUseWorkflowReceivedForPayload(this ILogger logger); + // Conditions Resolver [LoggerMessage(EventId = 210000, Level = LogLevel.Warning, Message = "Failed to parse condition: {condition}. resolvedConditional: {resolvedConditional}")] public static partial void FailedToParseCondition(this ILogger logger, string resolvedConditional, string condition, Exception ex); diff --git a/src/WorkflowManager/Logging/Log.500000.Messaging.cs b/src/WorkflowManager/Logging/Log.500000.Messaging.cs index c2b086bc2..2926fa903 100644 --- a/src/WorkflowManager/Logging/Log.500000.Messaging.cs +++ b/src/WorkflowManager/Logging/Log.500000.Messaging.cs @@ -71,7 +71,20 @@ public static partial class Log [LoggerMessage(EventId = 500016, Level = LogLevel.Debug, Message = "Export complete message received.")] public static partial void ExportCompleteReceived(this ILogger logger); - [LoggerMessage(EventId = 200017, Level = LogLevel.Debug, Message = "Workflow continuation event so not creating payload.")] + [LoggerMessage(EventId = 500017, Level = LogLevel.Debug, Message = "ArtifactReceived message so not creating payload.")] public static partial void WorkflowContinuation(this ILogger logger); + + [LoggerMessage(EventId = 500018, Level = LogLevel.Debug, Message = "ArtifactReceived message received.")] + public static partial void ArtifactReceivedReceived(this ILogger logger); + + [LoggerMessage(EventId = 500019, Level = LogLevel.Error, Message = "ArtifactReceived message {messageId} failed unexpectedly (no workflowId or TaskId ?) and has been requeued.")] + public static partial void ArtifactReceivedRequeuePayloadCreateError(this ILogger logger, string messageId); + + [LoggerMessage(EventId = 500020, Level = LogLevel.Error, Message = "ArtifactReveived message {messageId} failed unexpectedly and has been requeued.")] + public static partial void ArtifactReceivedRequeueUnknownError(this ILogger logger, string messageId, Exception ex); + + [LoggerMessage(EventId = 500021, Level = LogLevel.Error, Message = "ArtifactReveived message {messageId} is invalid and has been rejected without being requeued.")] + public static partial void ArtifactReceivedRejectValidationError(this ILogger logger, string messageId); + } } diff --git a/src/WorkflowManager/Logging/Log.700000.Artifact.cs b/src/WorkflowManager/Logging/Log.700000.Artifact.cs index 0eceb1e9e..215a043be 100644 --- a/src/WorkflowManager/Logging/Log.700000.Artifact.cs +++ b/src/WorkflowManager/Logging/Log.700000.Artifact.cs @@ -44,5 +44,12 @@ public static partial class Log [LoggerMessage(EventId = 700007, Level = LogLevel.Information, Message = "Task Dispatch resolved successfully output artifacts: PayloadId: {payloadId}, TaskId: {taskId}, WorkflowInstanceId: {workflowInstanceId}, WorkflowRevisionId: {workflowRevisionId}, output artifact object: {pathOutputArtifacts}")] public static partial void LogGeneralTaskDispatchInformation(this ILogger logger, string payloadId, string taskId, string workflowInstanceId, string workflowRevisionId, string pathOutputArtifacts); + + [LoggerMessage(EventId = 700008, Level = LogLevel.Warning, Message = "Unexpected Artifacts output artifacts: TaskId: {taskId}, WorkflowInstanceId: {workflowInstanceId}, output artifact object: {unexpectedArtifacts}")] + public static partial void UnexpectedArtifactsReceived(this ILogger logger, string taskId, string workflowInstanceId, string unexpectedArtifacts); + + [LoggerMessage(EventId = 700009, Level = LogLevel.Debug, Message = "Mandatory output artifacts for task {taskId} are missing. waiting for remaining artifacts... {missingArtifacts}")] + public static partial void MandatoryOutputArtifactsMissingForTask(this ILogger logger, string taskId, string missingArtifacts); + } } diff --git a/src/WorkflowManager/PayloadListener/Extensions/ValidationExtensions.cs b/src/WorkflowManager/PayloadListener/Extensions/ValidationExtensions.cs index 2745d712c..11a46ae29 100644 --- a/src/WorkflowManager/PayloadListener/Extensions/ValidationExtensions.cs +++ b/src/WorkflowManager/PayloadListener/Extensions/ValidationExtensions.cs @@ -15,6 +15,7 @@ */ using Ardalis.GuardClauses; +using Monai.Deploy.Messaging.Common; using Monai.Deploy.Messaging.Events; using Monai.Deploy.WorkflowManager.Common.Contracts.Models; @@ -39,6 +40,39 @@ public static bool IsValid(this WorkflowRequestEvent workflowRequestMessage, out return valid; } + public static bool IsValid(this ArtifactsReceivedEvent artifactReceivedMessage, out IList validationErrors) + { + Guard.Against.Null(artifactReceivedMessage, nameof(artifactReceivedMessage)); + + validationErrors = new List(); + + var valid = true; + + valid &= IsAeTitleValid(artifactReceivedMessage.GetType().Name, artifactReceivedMessage.DataTrigger.Source, validationErrors); + valid &= IsAeTitleValid(artifactReceivedMessage.GetType().Name, artifactReceivedMessage.DataTrigger.Destination, validationErrors); + valid &= IsBucketValid(artifactReceivedMessage.GetType().Name, artifactReceivedMessage.Bucket, validationErrors); + valid &= IsCorrelationIdValid(artifactReceivedMessage.GetType().Name, artifactReceivedMessage.CorrelationId, validationErrors); + valid &= IsPayloadIdValid(artifactReceivedMessage.GetType().Name, artifactReceivedMessage.PayloadId.ToString(), validationErrors); + valid &= string.IsNullOrEmpty(artifactReceivedMessage.WorkflowInstanceId) is false && string.IsNullOrEmpty(artifactReceivedMessage.TaskId) is false; + valid &= AllArtifactsAreValid(artifactReceivedMessage, validationErrors); + return valid; + } + + private static bool AllArtifactsAreValid(this ArtifactsReceivedEvent artifactReceivedMessage, IList validationErrors) + { + Guard.Against.Null(artifactReceivedMessage, nameof(artifactReceivedMessage)); + + var valid = artifactReceivedMessage.Artifacts.All(a => a.Type != ArtifactType.Unset); + + if (valid is false) + { + var unsetArtifacts = string.Join(',', artifactReceivedMessage.Artifacts.Where(a => a.Type == ArtifactType.Unset).Select(a => a.Path)); + validationErrors.Add($"The following artifacts are have unset artifact types: {unsetArtifacts}"); + } + + return valid; + } + public static bool IsInformaticsGatewayNotNull(string source, InformaticsGateway informaticsGateway, IList validationErrors) { Guard.Against.NullOrWhiteSpace(source, nameof(source)); diff --git a/src/WorkflowManager/PayloadListener/Services/EventPayloadRecieverService.cs b/src/WorkflowManager/PayloadListener/Services/EventPayloadRecieverService.cs index 94f69366d..2e2c49375 100644 --- a/src/WorkflowManager/PayloadListener/Services/EventPayloadRecieverService.cs +++ b/src/WorkflowManager/PayloadListener/Services/EventPayloadRecieverService.cs @@ -178,5 +178,47 @@ public async Task ExportCompletePayload(MessageReceivedEventArgs message) await _messageSubscriber.RequeueWithDelay(message.Message); } } + + public async Task ArtifactReceivePayload(MessageReceivedEventArgs message) + { + try + { + var requestEvent = message.Message.ConvertTo(); + + using var loggingScope = Logger.BeginScope(new LoggingDataDictionary + { + ["correlationId"] = requestEvent.CorrelationId, + ["workflowId"] = requestEvent.Workflows.FirstOrDefault(), + ["workflowInstanceId"] = requestEvent.WorkflowInstanceId, + ["taskId"] = requestEvent.TaskId + }); + + Logger.WorkflowContinuation(); + + var validation = PayloadValidator.ValidateArtifactReceived(requestEvent); + + if (!validation) + { + Logger.ArtifactReceivedRejectValidationError(message.Message.MessageId); + _messageSubscriber.Reject(message.Message, false); + + return; + } + + if (!await WorkflowExecuterService.ProcessArtifactReceivedAsync(requestEvent)) + { + Logger.ArtifactReceivedRequeuePayloadCreateError(message.Message.MessageId); + await _messageSubscriber.RequeueWithDelay(message.Message); + + return; + } + _messageSubscriber.Acknowledge(message.Message); + } + catch (Exception e) + { + Logger.ArtifactReceivedRequeueUnknownError(message.Message.MessageId, e); + await _messageSubscriber.RequeueWithDelay(message.Message); + } + } } } diff --git a/src/WorkflowManager/PayloadListener/Services/IEventPayloadRecieverService.cs b/src/WorkflowManager/PayloadListener/Services/IEventPayloadRecieverService.cs index cd30ec036..afbed8878 100644 --- a/src/WorkflowManager/PayloadListener/Services/IEventPayloadRecieverService.cs +++ b/src/WorkflowManager/PayloadListener/Services/IEventPayloadRecieverService.cs @@ -40,5 +40,12 @@ public interface IEventPayloadReceiverService /// /// The export complete event. Task ExportCompletePayload(MessageReceivedEventArgs message); + + /// + /// Receives a artifactReceived payload and validates it, + /// either passing it on to the workflow executor or handling the message accordingly. + /// + /// The artifactReceived event. + Task ArtifactReceivePayload(MessageReceivedEventArgs message); } } diff --git a/src/WorkflowManager/PayloadListener/Services/PayloadListenerService.cs b/src/WorkflowManager/PayloadListener/Services/PayloadListenerService.cs index 119f0b386..02b4497c8 100644 --- a/src/WorkflowManager/PayloadListener/Services/PayloadListenerService.cs +++ b/src/WorkflowManager/PayloadListener/Services/PayloadListenerService.cs @@ -42,6 +42,7 @@ public class PayloadListenerService : IHostedService, IMonaiService, IDisposable public string WorkflowRequestRoutingKey { get; set; } public string TaskStatusUpdateRoutingKey { get; set; } public string ExportCompleteRoutingKey { get; set; } + public string ArtifactRecievedRoutingKey { get; set; } protected int Concurrency { get; set; } public ServiceStatus Status { get; set; } = ServiceStatus.Unknown; public string ServiceName => "Payload Listener Service"; @@ -65,6 +66,7 @@ public PayloadListenerService( TaskStatusUpdateRoutingKey = configuration.Value.Messaging.Topics.TaskUpdateRequest; WorkflowRequestRoutingKey = configuration.Value.Messaging.Topics.WorkflowRequest; ExportCompleteRoutingKey = configuration.Value.Messaging.Topics.ExportComplete; + ArtifactRecievedRoutingKey = configuration.Value.Messaging.Topics.ArtifactRecieved; Concurrency = 2; @@ -105,7 +107,11 @@ private void SetupPolling() _messageSubscriber.SubscribeAsync(ExportCompleteRoutingKey, ExportCompleteRoutingKey, OnExportCompleteReceivedCallback); _logger.EventSubscription(ServiceName, ExportCompleteRoutingKey); + + _messageSubscriber.SubscribeAsync(ArtifactRecievedRoutingKey, ArtifactRecievedRoutingKey, OnArtifactReceivedtReceivedCallbackAsync); + _logger.EventSubscription(ServiceName, ArtifactRecievedRoutingKey); } + private async Task OnWorkflowRequestReceivedCallbackAsync(MessageReceivedEventArgs eventArgs) { @@ -150,6 +156,20 @@ private async Task OnExportCompleteReceivedCallback(MessageReceivedEventArgs eve } + private async Task OnArtifactReceivedtReceivedCallbackAsync(MessageReceivedEventArgs eventArgs) + { + + using var loggerScope = _logger.BeginScope(new Common.Miscellaneous.LoggingDataDictionary + { + ["correlationId"] = eventArgs.Message.CorrelationId, + ["source"] = eventArgs.Message.ApplicationId, + ["messageId"] = eventArgs.Message.MessageId, + ["messageDescription"] = eventArgs.Message.MessageDescription, + }); + + _logger.ArtifactReceivedReceived(); + await _eventPayloadListenerService.ArtifactReceivePayload(eventArgs); + } protected virtual void Dispose(bool disposing) { diff --git a/src/WorkflowManager/PayloadListener/Validators/EventPayloadValidator.cs b/src/WorkflowManager/PayloadListener/Validators/EventPayloadValidator.cs index 38a4471c9..d9f4ad924 100644 --- a/src/WorkflowManager/PayloadListener/Validators/EventPayloadValidator.cs +++ b/src/WorkflowManager/PayloadListener/Validators/EventPayloadValidator.cs @@ -32,41 +32,76 @@ public EventPayloadValidator(ILogger logger) Logger = logger ?? throw new ArgumentNullException(nameof(logger)); } - public bool ValidateWorkflowRequest(WorkflowRequestEvent payload) + public bool ValidateArtifactReceivedOrWorkflowRequestEvent(EventBase payload) { Guard.Against.Null(payload, nameof(payload)); - using var loggingScope = Logger.BeginScope(new LoggingDataDictionary + if (payload is WorkflowRequestEvent or ArtifactsReceivedEvent) { - ["correlationId"] = payload.CorrelationId, - ["payloadId"] = payload.PayloadId, - }); + var correlationId = string.Empty; + Guid? payloadId = null; + IEnumerable workflows = Array.Empty(); - var valid = true; - var payloadValid = payload.IsValid(out var validationErrors); + switch (payload) + { + case WorkflowRequestEvent wre: + correlationId = wre.CorrelationId; + payloadId = wre.PayloadId; + workflows = wre.Workflows; + break; + case ArtifactsReceivedEvent are: + correlationId = are.CorrelationId; + payloadId = are.PayloadId; + workflows = are.Workflows; + break; + } - if (!payloadValid) - { - Log.FailedToValidateWorkflowRequestEvent(Logger, string.Join(Environment.NewLine, validationErrors)); - } + using var loggingScope = Logger.BeginScope(new LoggingDataDictionary + { + ["correlationId"] = correlationId, + ["payloadId"] = payloadId, + }); + + var valid = true; + var payloadValid = false; + IList validationErrors; + payloadValid = payload switch + { + ArtifactsReceivedEvent artifactsReceivedEvent => artifactsReceivedEvent.IsValid(out validationErrors), + WorkflowRequestEvent workflowRequestEvent => workflowRequestEvent.IsValid(out validationErrors), + _ => throw new ArgumentOutOfRangeException(nameof(payload), payload, null) + }; - valid &= payloadValid; + if (!payloadValid) + { + Log.FailedToValidateWorkflowRequestEvent(Logger, string.Join(Environment.NewLine, validationErrors)); + } - foreach (var workflow in payload.Workflows) - { - var workflowValid = !string.IsNullOrEmpty(workflow); + valid &= payloadValid; - if (!workflowValid) + foreach (var workflow in workflows) { - Log.FailedToValidateWorkflowRequestEvent(Logger, "Workflow id is empty string"); - } + var workflowValid = !string.IsNullOrEmpty(workflow); - valid &= workflowValid; - } + if (!workflowValid) + { + Log.FailedToValidateWorkflowRequestEvent(Logger, "Workflow id is empty string"); + } - return valid; + valid &= workflowValid; + } + + return valid; + }; + return false; } + public bool ValidateWorkflowRequest(WorkflowRequestEvent payload) + => ValidateArtifactReceivedOrWorkflowRequestEvent(payload); + + public bool ValidateArtifactReceived(ArtifactsReceivedEvent payload) + => ValidateArtifactReceivedOrWorkflowRequestEvent(payload); + public bool ValidateTaskUpdate(TaskUpdateEvent payload) { Guard.Against.Null(payload, nameof(payload)); diff --git a/src/WorkflowManager/PayloadListener/Validators/IEventPayloadValidator.cs b/src/WorkflowManager/PayloadListener/Validators/IEventPayloadValidator.cs index 77e4f7856..e7e16c9b5 100644 --- a/src/WorkflowManager/PayloadListener/Validators/IEventPayloadValidator.cs +++ b/src/WorkflowManager/PayloadListener/Validators/IEventPayloadValidator.cs @@ -37,5 +37,11 @@ public interface IEventPayloadValidator /// /// The workflow message event. bool ValidateExportComplete(ExportCompleteEvent payload); + + /// + /// Validates the artifactReceived payload from the RabbitMQ queue. + /// + /// The artifactReceived event. + bool ValidateArtifactReceived(ArtifactsReceivedEvent payload); } } diff --git a/src/WorkflowManager/WorkflowExecuter/Services/IWorkflowExecuterService.cs b/src/WorkflowManager/WorkflowExecuter/Services/IWorkflowExecuterService.cs index 465979f1d..186b7994f 100644 --- a/src/WorkflowManager/WorkflowExecuter/Services/IWorkflowExecuterService.cs +++ b/src/WorkflowManager/WorkflowExecuter/Services/IWorkflowExecuterService.cs @@ -49,5 +49,11 @@ public interface IWorkflowExecuterService /// Previous Tasks Id. /// Task CreateTaskExecutionAsync(TaskObject task, WorkflowInstance workflowInstance, string? bucketName = null, string? payloadId = null, string? previousTaskId = null); + + /// + /// Processes the artifactReceived payload and continue workflow instance. + /// + /// The workflow request message event. + Task ProcessArtifactReceivedAsync(ArtifactsReceivedEvent message); } } diff --git a/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs b/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs index 7ef2adcfc..137a62fcf 100644 --- a/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs +++ b/src/WorkflowManager/WorkflowExecuter/Services/WorkflowExecuterService.cs @@ -14,7 +14,6 @@ * limitations under the License. */ -using System.Globalization; using Ardalis.GuardClauses; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; @@ -33,6 +32,7 @@ using Monai.Deploy.WorkflowManager.Common.Contracts.Models; using Monai.Deploy.WorkflowManager.Common.Database; using Monai.Deploy.WorkflowManager.Common.Database.Interfaces; +using Monai.Deploy.WorkflowManager.Common.Database.Repositories; using Monai.Deploy.WorkflowManager.Common.Logging; using Monai.Deploy.WorkflowManager.Common.WorkfowExecuter.Common; using Monai.Deploy.WorkloadManager.WorkfowExecuter.Extensions; @@ -49,6 +49,7 @@ public class WorkflowExecuterService : IWorkflowExecuterService private readonly IMessageBrokerPublisherService _messageBrokerPublisherService; private readonly IConditionalParameterParser _conditionalParameterParser; private readonly ITaskExecutionStatsRepository _taskExecutionStatsRepository; + private readonly IArtifactsRepository _artifactsRepository; private readonly List _migExternalAppPlugins; private readonly IArtifactMapper _artifactMapper; private readonly IStorageService _storageService; @@ -73,7 +74,8 @@ public WorkflowExecuterService( ITaskExecutionStatsRepository taskExecutionStatsRepository, IArtifactMapper artifactMapper, IStorageService storageService, - IPayloadService payloadService) + IPayloadService payloadService, + IArtifactsRepository artifactsRepository) { if (configuration is null) { @@ -98,11 +100,12 @@ public WorkflowExecuterService( _workflowInstanceRepository = workflowInstanceRepository ?? throw new ArgumentNullException(nameof(workflowInstanceRepository)); _workflowInstanceService = workflowInstanceService ?? throw new ArgumentNullException(nameof(workflowInstanceService)); _messageBrokerPublisherService = messageBrokerPublisherService ?? throw new ArgumentNullException(nameof(messageBrokerPublisherService)); - _conditionalParameterParser = conditionalParser ?? throw new ArgumentNullException(nameof(artifactMapper)); + _conditionalParameterParser = conditionalParser ?? throw new ArgumentNullException(nameof(conditionalParser)); _taskExecutionStatsRepository = taskExecutionStatsRepository ?? throw new ArgumentNullException(nameof(taskExecutionStatsRepository)); _artifactMapper = artifactMapper ?? throw new ArgumentNullException(nameof(artifactMapper)); _storageService = storageService ?? throw new ArgumentNullException(nameof(storageService)); _payloadService = payloadService ?? throw new ArgumentNullException(nameof(payloadService)); + _artifactsRepository = artifactsRepository ?? throw new ArgumentNullException(nameof(artifactsRepository)); } public async Task ProcessPayload(WorkflowRequestEvent message, Payload payload) @@ -111,20 +114,11 @@ public async Task ProcessPayload(WorkflowRequestEvent message, Payload pay using var loggerScope = _logger.BeginScope($"correlationId={message.CorrelationId}, payloadId={payload.PayloadId}"); - // for external App executions then workflowInstanceId will be supplied and we can continue the workflow from that task. - if (string.IsNullOrWhiteSpace(message.WorkflowInstanceId) is false) + // for external App executions use the ArtifactReceived queue. + if (string.IsNullOrWhiteSpace(message.WorkflowInstanceId) is false && string.IsNullOrEmpty(message.TaskId) is false) { - var instance = await _workflowInstanceRepository.GetByWorkflowInstanceIdAsync(message.WorkflowInstanceId); - if (instance is not null) - { - var task = instance.Tasks.First(t => t.TaskId == message.TaskId); - if (task is not null) - { - var workflow = await _workflowRepository.GetByWorkflowIdAsync(instance.WorkflowId); - await HandleTaskDestinations(instance, workflow, task, message.CorrelationId); - return true; - } - } + _logger.DontUseWorkflowReceivedForPayload(); + return false; } var processed = true; @@ -181,6 +175,104 @@ public async Task ProcessPayload(WorkflowRequestEvent message, Payload pay return true; } + public async Task ProcessArtifactReceivedAsync(ArtifactsReceivedEvent message) + { + Guard.Against.Null(message, nameof(message)); + + var workflowInstanceId = message.WorkflowInstanceId; + var taskId = message.TaskId; + + if (string.IsNullOrWhiteSpace(workflowInstanceId) || string.IsNullOrWhiteSpace(taskId)) + { + return false; + } + + var workflowInstance = await _workflowInstanceRepository.GetByWorkflowInstanceIdAsync(workflowInstanceId).ConfigureAwait(false); + if (workflowInstance is null) + { + _logger.WorkflowInstanceNotFound(workflowInstanceId); + return false; + } + + var workflowTemplate = await _workflowRepository.GetByWorkflowIdAsync(workflowInstance.WorkflowId) + .ConfigureAwait(false); + + if (workflowTemplate is null) + { + _logger.WorkflowNotFound(workflowInstanceId); + return false; + } + + var taskTemplate = workflowTemplate.Workflow?.Tasks.FirstOrDefault(t => t.Id == taskId); + if (taskTemplate is null) + { + _logger.TaskNotFoundInWorkfow(message.PayloadId.ToString(), taskId, workflowTemplate.Id); + return false; + } + + var previouslyReceivedArtifactsFromRepo = await _artifactsRepository.GetAllAsync(workflowInstanceId, taskId).ConfigureAwait(false); + if (previouslyReceivedArtifactsFromRepo is null || previouslyReceivedArtifactsFromRepo.Count == 0) + { + previouslyReceivedArtifactsFromRepo = new List() { new ArtifactReceivedItems() + { + Id = workflowInstanceId + taskId, + TaskId = taskId, + WorkflowInstanceId = workflowInstanceId, + Artifacts = message.Artifacts.Select(ArtifactReceivedDetails.FromArtifact).ToList() + } }; + } + await _artifactsRepository + .AddOrUpdateItemAsync(workflowInstanceId, taskId, message.Artifacts).ConfigureAwait(false); + + var previouslyReceivedArtifacts = previouslyReceivedArtifactsFromRepo.SelectMany(a => a.Artifacts).Select(a => a.Type).ToList(); + + var requiredArtifacts = taskTemplate.Artifacts.Output.Where(a => a.Mandatory).Select(a => a.Type); + var receivedArtifacts = message.Artifacts.Select(a => a.Type).Concat(previouslyReceivedArtifacts).ToList(); + var missingArtifacts = requiredArtifacts.Except(receivedArtifacts).ToList(); + var allArtifacts = taskTemplate.Artifacts.Output.Select(a => a.Type); + var unexpectedArtifacts = receivedArtifacts.Except(allArtifacts).ToList(); + + if (unexpectedArtifacts.Any()) + { + _logger.UnexpectedArtifactsReceived(taskId, workflowInstanceId, string.Join(',', unexpectedArtifacts)); + } + + if (!missingArtifacts.Any()) + { + return await AllRequiredArtifactsReceivedAsync(message, workflowInstance, taskId, workflowInstanceId, workflowTemplate).ConfigureAwait(false); + } + + _logger.MandatoryOutputArtifactsMissingForTask(taskId, string.Join(',', missingArtifacts)); + return true; + } + + private async Task AllRequiredArtifactsReceivedAsync(ArtifactsReceivedEvent message, WorkflowInstance workflowInstance, + string taskId, string workflowInstanceId, WorkflowRevision workflowTemplate) + { + var taskExecution = workflowInstance.Tasks.FirstOrDefault(t => t.TaskId == taskId); + + if (taskExecution is null) + { + _logger.TaskNotFoundInWorkfowInstance(taskId, workflowInstanceId); + return false; + } + + await _workflowInstanceRepository.UpdateTaskStatusAsync(workflowInstanceId, taskId, + TaskExecutionStatus.Succeeded).ConfigureAwait(false); + + // Dispatch Task + var taskDispatchedResult = + await HandleTaskDestinations(workflowInstance, workflowTemplate, taskExecution, message.CorrelationId).ConfigureAwait(false); + + if (taskDispatchedResult is false) + { + _logger.LogTaskDispatchFailure(message.PayloadId.ToString(), taskId, workflowInstanceId, workflowTemplate.WorkflowId, JsonConvert.SerializeObject(message.Artifacts)); + return false; + } + + return true; + } + public async Task ProcessFirstWorkflowTask(WorkflowInstance workflowInstance, string correlationId, Payload payload) { if (workflowInstance.Status == Status.Failed) @@ -226,10 +318,10 @@ private static Task SwitchTasksAsync(TaskExecution task, Func defaultFunc) => task switch { - { TaskType: TaskTypeConstants.RouterTask } => routerFunc(), - { TaskType: TaskTypeConstants.ExportTask } => exportFunc(), - { TaskType: TaskTypeConstants.ExternalAppTask } => externalFunc(), - { Status: var s } when s != TaskExecutionStatus.Created => notCreatedStatusFunc(), + { TaskType: TaskTypeConstants.RouterTask } => routerFunc(), + { TaskType: TaskTypeConstants.ExportTask } => exportFunc(), + { TaskType: TaskTypeConstants.ExternalAppTask } => externalFunc(), + { Status: var s } when s != TaskExecutionStatus.Created => notCreatedStatusFunc(), _ => defaultFunc() }; @@ -247,7 +339,7 @@ public async Task ProcessTaskUpdate(TaskUpdateEvent message) return false; } - var currentTask = workflowInstance.Tasks.FirstOrDefault(t => t.TaskId == message.TaskId); + var currentTask = workflowInstance.Tasks.Find(t => t.TaskId == message.TaskId); using var loggingScope = _logger.BeginScope(new LoggingDataDictionary { @@ -348,7 +440,7 @@ public async Task UpdateTaskDetails(TaskExecution currentTask, string workflowIn public async Task ProcessExportComplete(ExportCompleteEvent message, string correlationId) { var workflowInstance = await _workflowInstanceRepository.GetByWorkflowInstanceIdAsync(message.WorkflowInstanceId); - var task = workflowInstance.Tasks.FirstOrDefault(t => t.TaskId == message.ExportTaskId); + var task = workflowInstance.Tasks.Find(t => t.TaskId == message.ExportTaskId); if (task is null) { diff --git a/src/WorkflowManager/WorkflowManager/Extentions/WorkflowExecutorExtensions.cs b/src/WorkflowManager/WorkflowManager/Extentions/WorkflowExecutorExtensions.cs index 2d94ec7e5..30e685202 100644 --- a/src/WorkflowManager/WorkflowManager/Extentions/WorkflowExecutorExtensions.cs +++ b/src/WorkflowManager/WorkflowManager/Extentions/WorkflowExecutorExtensions.cs @@ -19,6 +19,7 @@ using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Monai.Deploy.WorkflowManager.Common.ConditionsResolver.Parser; +using Monai.Deploy.WorkflowManager.Common.Database.Repositories; using Monai.Deploy.WorkflowManager.Common.Miscellaneous; using Monai.Deploy.WorkflowManager.Common.Miscellaneous.Interfaces; using Monai.Deploy.WorkflowManager.Common.Miscellaneous.Services; @@ -54,6 +55,7 @@ public static IServiceCollection AddWorkflowExecutor(this IServiceCollection ser services.AddTransient(); services.AddTransient(); + services.AddSingleton(); services.AddSingleton(); services.AddTransient(); services.AddSingleton(); diff --git a/src/WorkflowManager/WorkflowManager/Validators/WorkflowValidator.cs b/src/WorkflowManager/WorkflowManager/Validators/WorkflowValidator.cs index 91db2b324..580bdac9f 100644 --- a/src/WorkflowManager/WorkflowManager/Validators/WorkflowValidator.cs +++ b/src/WorkflowManager/WorkflowManager/Validators/WorkflowValidator.cs @@ -649,7 +649,6 @@ private void ValidateExternalAppTask(Workflow workflow, TaskObject currentTask) Errors.Add($"Task: '{currentTask.Id}' has incorrect artifact output types set on artifacts with following name. {incorrectOutputs}"); } } - } } } diff --git a/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Features/ArtifactReceivedEvent.feature b/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Features/ArtifactReceivedEvent.feature new file mode 100644 index 000000000..a49b38c74 --- /dev/null +++ b/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Features/ArtifactReceivedEvent.feature @@ -0,0 +1,27 @@ +# Copyright 2022 MONAI Consortium +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +@IntergrationTests +Feature: ArtifactReceivedEvent + +Publishing a artifact received event is consumed by the Workflow Manager. + + @ArtifactReceivedEvent + Scenario Outline: Publish a valid Artifact Received Event which creates an entry. + Given I have a clinical workflow I have a Workflow Instance + When I publish a Artifact Received Event + Then I can see 2 Artifact Received Items is created + Examples: + | clinicalWorkflow | workflowInstance | artifactReceivedEvent | + | Workflow_Revision_For_Artifact_ReceivedEvent_1 | Workflow_Instance_For_Artifact_ReceivedEvent_1 | Test1 | diff --git a/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Hooks.cs b/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Hooks.cs index effc431dc..121e5d119 100755 --- a/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Hooks.cs +++ b/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Hooks.cs @@ -43,6 +43,7 @@ public Hooks(IObjectContainer objectContainer) private static HttpClient? HttpClient { get; set; } public static AsyncRetryPolicy? RetryPolicy { get; private set; } private static RabbitPublisher? WorkflowPublisher { get; set; } + public static RabbitPublisher? ArtifactsPublisher { get; set; } private static RabbitConsumer? TaskDispatchConsumer { get; set; } private static RabbitPublisher? TaskUpdatePublisher { get; set; } private static RabbitConsumer? ExportRequestConsumer { get; set; } @@ -76,6 +77,7 @@ public static void Init() TestExecutionConfig.RabbitConfig.TaskDispatchQueue = "md.tasks.dispatch"; TestExecutionConfig.RabbitConfig.TaskCallbackQueue = "md.tasks.callback"; TestExecutionConfig.RabbitConfig.TaskUpdateQueue = "md.tasks.update"; + TestExecutionConfig.RabbitConfig.ArtifactsRequestQueue = "md.workflow.artifactrecieved"; TestExecutionConfig.RabbitConfig.ExportCompleteQueue = config.GetValue("WorkflowManager:messaging:topics:exportComplete"); TestExecutionConfig.RabbitConfig.ExportRequestQueue = $"{config.GetValue("WorkflowManager:messaging:topics:exportRequestPrefix")}.{config.GetValue("WorkflowManager:messaging:dicomAgents:scuAgentName")}"; @@ -84,6 +86,7 @@ public static void Init() TestExecutionConfig.MongoConfig.WorkflowCollection = "Workflows"; TestExecutionConfig.MongoConfig.WorkflowInstanceCollection = "WorkflowInstances"; TestExecutionConfig.MongoConfig.PayloadCollection = "Payloads"; + TestExecutionConfig.MongoConfig.ArtifactsCollection = "ArtifactReceivedItems"; TestExecutionConfig.MongoConfig.ExecutionStatsCollection = "ExecutionStats"; TestExecutionConfig.MinioConfig.Endpoint = config.GetValue("WorkflowManager:storage:settings:endpoint"); @@ -126,6 +129,7 @@ await RetryPolicy.ExecuteAsync(async () => } }); + ArtifactsPublisher = new RabbitPublisher(RabbitConnectionFactory.GetRabbitConnection(), TestExecutionConfig.RabbitConfig.Exchange, TestExecutionConfig.RabbitConfig.ArtifactsRequestQueue); WorkflowPublisher = new RabbitPublisher(RabbitConnectionFactory.GetRabbitConnection(), TestExecutionConfig.RabbitConfig.Exchange, TestExecutionConfig.RabbitConfig.WorkflowRequestQueue); TaskDispatchConsumer = new RabbitConsumer(RabbitConnectionFactory.GetRabbitConnection(), TestExecutionConfig.RabbitConfig.Exchange, TestExecutionConfig.RabbitConfig.TaskDispatchQueue); TaskUpdatePublisher = new RabbitPublisher(RabbitConnectionFactory.GetRabbitConnection(), TestExecutionConfig.RabbitConfig.Exchange, TestExecutionConfig.RabbitConfig.TaskUpdateQueue); @@ -144,6 +148,7 @@ public void SetUp(ScenarioContext scenarioContext, ISpecFlowOutputHelper outputH ObjectContainer.RegisterInstanceAs(TaskUpdatePublisher, "TaskUpdatePublisher"); ObjectContainer.RegisterInstanceAs(ExportCompletePublisher, "ExportCompletePublisher"); ObjectContainer.RegisterInstanceAs(ExportRequestConsumer, "ExportRequestConsumer"); + ObjectContainer.RegisterInstanceAs(ArtifactsPublisher, "ArtifactsPublisher"); ObjectContainer.RegisterInstanceAs(MongoClient); ObjectContainer.RegisterInstanceAs(MinioClient); var dataHelper = new DataHelper(TaskDispatchConsumer, ExportRequestConsumer, MongoClient); @@ -163,6 +168,7 @@ public static void ClearTestData() MongoClient?.DeleteAllWorkflowRevisionDocuments(); MongoClient?.DeleteAllWorkflowInstances(); MongoClient?.DeleteAllPayloadDocuments(); + MongoClient?.DeleteAllArtifactDocuments(); } [BeforeTestRun(Order = 3)] diff --git a/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/POCO/TestExecutionConfig.cs b/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/POCO/TestExecutionConfig.cs index a8a021653..2e70cefa9 100644 --- a/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/POCO/TestExecutionConfig.cs +++ b/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/POCO/TestExecutionConfig.cs @@ -36,6 +36,8 @@ public static class RabbitConfig public static string WorkflowRequestQueue { get; set; } = string.Empty; + public static string ArtifactsRequestQueue { get; set; } = string.Empty; + public static string TaskDispatchQueue { get; set; } = string.Empty; public static string TaskCallbackQueue { get; set; } = string.Empty; @@ -66,6 +68,8 @@ public static class MongoConfig public static string PayloadCollection { get; set; } = string.Empty; + public static string ArtifactsCollection { get; set; } = string.Empty; + public static string ExecutionStatsCollection { get; set; } = string.Empty; } diff --git a/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/StepDefinitions/ArtifactReceivedEventStepDefinitions.cs b/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/StepDefinitions/ArtifactReceivedEventStepDefinitions.cs new file mode 100644 index 000000000..c5c77c0f4 --- /dev/null +++ b/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/StepDefinitions/ArtifactReceivedEventStepDefinitions.cs @@ -0,0 +1,130 @@ +/* + * Copyright 2022 MONAI Consortium + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using BoDi; +using Monai.Deploy.Messaging.Events; +using Monai.Deploy.Messaging.Messages; +using Monai.Deploy.WorkflowManager.Common.IntegrationTests.Models; +using Monai.Deploy.WorkflowManager.Common.IntegrationTests.Support; +using Monai.Deploy.WorkflowManager.Common.WorkflowExecutor.IntegrationTests.Support; +using MongoDB.Driver; +using Polly; +using Polly.Retry; +using TechTalk.SpecFlow.Infrastructure; + +namespace Monai.Deploy.WorkflowManager.Common.IntegrationTests.StepDefinitions +{ + [Binding] + public class ArtifactReceivedEventStepDefinitions + { + private RabbitPublisher WorkflowPublisher { get; set; } + private RabbitPublisher ArtifactsPublisher { get; set; } + private RabbitConsumer TaskDispatchConsumer { get; set; } + private MongoClientUtil MongoClient { get; set; } + private Assertions Assertions { get; set; } + private DataHelper DataHelper { get; set; } + + private readonly ISpecFlowOutputHelper _outputHelper; + private RetryPolicy RetryPolicy { get; set; } + private MinioDataSeeding MinioDataSeeding { get; set; } + + public ArtifactReceivedEventStepDefinitions(ObjectContainer objectContainer, ISpecFlowOutputHelper outputHelper) + { + ArtifactsPublisher = objectContainer.Resolve("ArtifactsPublisher"); + TaskDispatchConsumer = objectContainer.Resolve("TaskDispatchConsumer"); + MongoClient = objectContainer.Resolve(); + Assertions = new Assertions(objectContainer, outputHelper); + DataHelper = objectContainer.Resolve(); + _outputHelper = outputHelper; + MinioDataSeeding = + new MinioDataSeeding(objectContainer.Resolve(), DataHelper, _outputHelper); + RetryPolicy = Policy.Handle() + .WaitAndRetry(retryCount: 20, sleepDurationProvider: _ => TimeSpan.FromMilliseconds(500)); + } + + [When(@"I publish a Artifact Received Event (.*)")] + public async Task WhenIPublishAArtifactReceivedEvent(string name) + { + var message = new JsonMessage( + DataHelper.GetArtifactsReceivedEventTestData(name), + "16988a78-87b5-4168-a5c3-2cfc2bab8e54", + Guid.NewGuid().ToString(), + string.Empty); + + _outputHelper.WriteLine($"Publishing WorkflowRequestEvent with name={name}"); + ArtifactsPublisher.PublishMessage(message.ToMessage()); + _outputHelper.WriteLine($"Event published"); + } + + [Given(@"I have a clinical workflow (.*) I have a Workflow Instance (.*)")] + public async Task GivenIHaveAClinicalWorkflowIHaveAWorkflowInstance(string clinicalWorkflowName, string wfiName) + { + var (artifactReceivedItems, workflowInstance, workflowRevision) = + DataHelper.GetArtifactsEventTestData(clinicalWorkflowName, wfiName); + + _outputHelper.WriteLine("Seeding minio with workflow input artifacts"); + await MinioDataSeeding.SeedWorkflowInputArtifacts(workflowInstance.PayloadId); + + _outputHelper.WriteLine($"Retrieving workflow instance with name={wfiName}"); + await MongoClient.CreateWorkflowInstanceDocumentAsync(workflowInstance); + + _outputHelper.WriteLine($"Retrieving workflow revision with name={clinicalWorkflowName}"); + await MongoClient.CreateWorkflowRevisionDocumentAsync(workflowRevision); + + try + { + await MongoClient.CreateArtifactsEventsDocumentAsync(artifactReceivedItems); + } + catch (Exception e) + { + } + + _outputHelper.WriteLine("Seeding Data Tasks complete"); + } + + [Then(@"I can see a Artifact Received Item is created")] + public void ThenICanSeeAArtifactReceivedItemIsCreated() + { + ThenICanSeeXArtifactReceivedItemIsCreated(1); + } + + [Then(@"I can see ([1-9]*) Artifact Received Items are created")] + [Then(@"I can see ([0-9]*) Artifact Received Items is created")] + public void ThenICanSeeXArtifactReceivedItemIsCreated(int count) + { + _outputHelper.WriteLine($"Retrieving {count} workflow instance/s using the payloadid={DataHelper.WorkflowRequestMessage.PayloadId.ToString()}"); + RetryPolicy.Execute(() => + { + var artifactsReceivedItems = DataHelper.GetArtifactsReceivedItemsFromDB(count, DataHelper.ArtifactsReceivedEvent); + if (artifactsReceivedItems.Any()) + { + foreach (var artifactsReceivedItem in artifactsReceivedItems) + { + var wfiId = artifactsReceivedItems.FirstOrDefault().WorkflowInstanceId; + var wfi = DataHelper.WorkflowInstances.FirstOrDefault(a => a.Id == wfiId); + var workflow = DataHelper.WorkflowRevisions.FirstOrDefault(w => w.WorkflowId == wfi.WorkflowId); + if (workflow is null) + { + throw new Exception("Failing Test"); + } + Assertions.AssertArtifactsReceivedItemMatchesExpectedWorkflow(artifactsReceivedItem, workflow, wfi); + } + } + }); + _outputHelper.WriteLine($"Retrieved {count} workflow instance/s"); + } + } +} diff --git a/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/StepDefinitions/WorkflowRequestStepDefinitions.cs b/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/StepDefinitions/WorkflowRequestStepDefinitions.cs index 64d25e445..685161c8e 100644 --- a/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/StepDefinitions/WorkflowRequestStepDefinitions.cs +++ b/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/StepDefinitions/WorkflowRequestStepDefinitions.cs @@ -50,7 +50,7 @@ public WorkflowRequestStepDefinitions(ObjectContainer objectContainer, ISpecFlow RetryPolicy = Policy.Handle().WaitAndRetry(retryCount: 20, sleepDurationProvider: _ => TimeSpan.FromMilliseconds(500)); } - [Given(@"I have a clinical workflow (.*)")] + [Given(@"I have a clinical workflow (?!.* )(.*)")] public void GivenIHaveClinicalWorkflows(string name) { _outputHelper.WriteLine($"Retrieving workflow revision with name={name}"); diff --git a/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Support/Assertions.cs b/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Support/Assertions.cs index 5e66fa743..92f8baaa5 100644 --- a/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Support/Assertions.cs +++ b/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Support/Assertions.cs @@ -18,6 +18,7 @@ using BoDi; using Monai.Deploy.Messaging.Events; using Monai.Deploy.WorkflowManager.Common.Contracts.Models; +using Monai.Deploy.WorkflowManager.Common.Database.Repositories; using Monai.Deploy.WorkflowManager.Common.IntegrationTests.Models; using Monai.Deploy.WorkflowManager.Common.IntegrationTests.POCO; using TechTalk.SpecFlow.Infrastructure; @@ -550,6 +551,22 @@ public void AssertExecutionStats(ExecutionStats executionStats, TaskDispatchEven } Output.WriteLine("Details ExecutionStats are correct"); } + + public static void AssertArtifactsReceivedItemMatchesExpectedWorkflow( + ArtifactReceivedItems artifactsReceivedItem, WorkflowRevision workflowRevision, + WorkflowInstance? workflowInstance) + { + artifactsReceivedItem.WorkflowInstanceId.Should().Be(workflowInstance?.Id); + var task = workflowRevision.Workflow!.Tasks.FirstOrDefault(t => t.Id == artifactsReceivedItem.TaskId); + task.Should().NotBeNull(); + artifactsReceivedItem.TaskId.Should().Be(task!.Id); + artifactsReceivedItem.Artifacts.Count.Should().Be(task.Artifacts.Output.Length); + artifactsReceivedItem.Received.Should().BeCloseTo(DateTime.UtcNow, TimeSpan.FromSeconds(20)); + foreach (var artifact in task.Artifacts.Output) + { + artifactsReceivedItem.Artifacts.FirstOrDefault(t => t.Type == artifact.Type).Should().NotBeNull(); + } + } } } #pragma warning restore CS8604 // Possible null reference argument. diff --git a/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Support/DataHelper.cs b/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Support/DataHelper.cs index 446847381..14d6ce30f 100644 --- a/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Support/DataHelper.cs +++ b/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Support/DataHelper.cs @@ -14,14 +14,17 @@ * limitations under the License. */ +using Monai.Deploy.Messaging.Common; using Monai.Deploy.Messaging.Events; using Monai.Deploy.WorkflowManager.Common.Contracts.Models; +using Monai.Deploy.WorkflowManager.Common.Database.Repositories; using Monai.Deploy.WorkflowManager.Common.IntegrationTests.Models; using Monai.Deploy.WorkflowManager.Common.Models; using Monai.Deploy.WorkflowManager.Common.WorkflowExecutor.IntegrationTests.TestData; using Newtonsoft.Json; using Polly; using Polly.Retry; +using Artifact = Monai.Deploy.Messaging.Common.Artifact; #pragma warning disable CS8602 // Dereference of a possibly null reference. @@ -29,17 +32,19 @@ namespace Monai.Deploy.WorkflowManager.Common.IntegrationTests.Support { public class DataHelper { - public WorkflowRequestMessage WorkflowRequestMessage = new WorkflowRequestMessage(); - public List WorkflowInstances = new List(); - public PatientDetails PatientDetails { get; set; } = new PatientDetails(); - public TaskUpdateEvent TaskUpdateEvent = new TaskUpdateEvent(); - public ExportCompleteEvent ExportCompleteEvent = new ExportCompleteEvent(); - public List TaskDispatchEvents = new List(); - public List ExportRequestEvents = new List(); - public List WorkflowRevisions = new List(); - public List Workflows = new List(); - public List Payload = new List(); + public WorkflowRequestMessage WorkflowRequestMessage = new(); + public ArtifactsReceivedEvent ArtifactsReceivedEvent = new(); + public List WorkflowInstances = new(); + public PatientDetails PatientDetails { get; set; } = new(); + public TaskUpdateEvent TaskUpdateEvent = new(); + public ExportCompleteEvent ExportCompleteEvent = new(); + public List TaskDispatchEvents = new(); + public List ExportRequestEvents = new(); + public List WorkflowRevisions = new(); + public List Workflows = new(); + public List Payload = new(); private RetryPolicy> RetryWorkflowInstances { get; set; } + private RetryPolicy> RetryArtifactReceivedItems { get; set; } private RetryPolicy> RetryTaskDispatches { get; set; } private RetryPolicy> RetryExportRequests { get; set; } private RetryPolicy> RetryPayloadCollections { get; set; } @@ -51,6 +56,8 @@ public class DataHelper public List SeededWorkflowInstances { get; internal set; } public TaskDispatchEvent TaskDispatchEvent { get; set; } public TaskCallbackEvent TaskCallbackEvent { get; set; } + public List ArtifactsReceivedItems { get; set; } = new() { }; + #pragma warning disable CS8618 // Non-nullable field must contain a non-null value when exiting constructor. Consider declaring as nullable. public DataHelper(RabbitConsumer taskDispatchConsumer, RabbitConsumer exportRequestConsumer, MongoClientUtil mongoClient) @@ -59,6 +66,7 @@ public DataHelper(RabbitConsumer taskDispatchConsumer, RabbitConsumer exportRequ ExportRequestConsumer = exportRequestConsumer; TaskDispatchConsumer = taskDispatchConsumer; MongoClient = mongoClient; + RetryArtifactReceivedItems = Policy>.Handle().WaitAndRetry(retryCount: 20, sleepDurationProvider: _ => TimeSpan.FromMilliseconds(500)); RetryWorkflowInstances = Policy>.Handle().WaitAndRetry(retryCount: 20, sleepDurationProvider: _ => TimeSpan.FromMilliseconds(500)); RetryTaskDispatches = Policy>.Handle().WaitAndRetry(retryCount: 20, sleepDurationProvider: _ => TimeSpan.FromMilliseconds(500)); RetryExportRequests = Policy>.Handle().WaitAndRetry(retryCount: 20, sleepDurationProvider: _ => TimeSpan.FromMilliseconds(500)); @@ -81,22 +89,21 @@ public WorkflowRevision GetWorkflowRevisionTestData(string name) { var workflowRevision = WorkflowRevisionsTestData.TestData.FirstOrDefault(c => c.Name.Equals(name)); - if (workflowRevision != null) + if (workflowRevision == null) { - if (workflowRevision.WorkflowRevision != null) - { - WorkflowRevisions.Add(workflowRevision.WorkflowRevision); - return workflowRevision.WorkflowRevision; - } - else - { - throw new Exception($"Workflow {name} does not have any applicable test data, please check and try again!"); - } + throw new Exception( + $"Workflow {name} does not have any applicable test data, please check and try again!"); } - else + + if (workflowRevision.WorkflowRevision == null) { - throw new Exception($"Workflow {name} does not have any applicable test data, please check and try again!"); + throw new Exception( + $"Workflow {name} does not have any applicable test data, please check and try again!"); } + + WorkflowRevisions.Add(workflowRevision.WorkflowRevision); + return workflowRevision.WorkflowRevision; + } public WorkflowRevision GetWorkflowRevisionTestDataByIndex(int index) @@ -147,23 +154,22 @@ public WorkflowInstance GetWorkflowInstanceTestData(string name) { var workflowInstance = WorkflowInstancesTestData.TestData.FirstOrDefault(c => c.Name.Contains(name)); - if (workflowInstance != null) + if (workflowInstance == null) { - if (workflowInstance.WorkflowInstance != null) - { - WorkflowInstances.Add(workflowInstance.WorkflowInstance); - - return workflowInstance.WorkflowInstance; - } - else - { - throw new Exception($"Workflow Intance {name} does not have any applicable test data, please check and try again!"); - } + throw new Exception( + $"Workflow Intance {name} does not have any applicable test data, please check and try again!"); } - else + + if (workflowInstance.WorkflowInstance == null) { - throw new Exception($"Workflow Intance {name} does not have any applicable test data, please check and try again!"); + throw new Exception( + $"Workflow Intance {name} does not have any applicable test data, please check and try again!"); } + + WorkflowInstances.Add(workflowInstance.WorkflowInstance); + + return workflowInstance.WorkflowInstance; + } public WorkflowInstance GetWorkflowInstanceTestDataByIndex(int index) @@ -210,6 +216,21 @@ public PatientDetails GetPatientDetailsTestData(string name) } } + public ArtifactsReceivedEvent GetArtifactsReceivedEventTestData(string name) + { + var artifactsReceivedEvent = ArtifactsReceivedEventTestData.TestData.FirstOrDefault(c => c != null && c.Value.Name.Equals(name)); + + if (artifactsReceivedEvent?.Event == null) + { + throw new Exception( + $"Workflow request {name} does not have any applicable test data, please check and try again!"); + } + + ArtifactsReceivedEvent = artifactsReceivedEvent.Value.Event; + return artifactsReceivedEvent.Value.Event; + } + + public WorkflowRequestMessage GetWorkflowRequestTestData(string name) { var workflowRequest = WorkflowRequestsTestData.TestData.FirstOrDefault(c => c.Name.Contains(name)); @@ -276,6 +297,23 @@ public ExportCompleteEvent GetExportCompleteTestData(string name) throw new Exception($"Export Complete message not found for {name}"); } + public List GetArtifactsReceivedItemsFromDB(int count, ArtifactsReceivedEvent artifactsReceivedEvent) + { + var res = RetryArtifactReceivedItems.Execute(() => + { + ArtifactsReceivedItems = MongoClient.GetArtifactsReceivedItems(artifactsReceivedEvent); + + if (ArtifactsReceivedItems.FirstOrDefault()?.Artifacts.Count == count) + { + return ArtifactsReceivedItems; + } + + throw new Exception($"{count} RetryArtifactReceivedItems could not be found for Artifact {artifactsReceivedEvent.WorkflowInstanceId}. Actual count is {WorkflowInstances.Count}"); + }); + + return res; + } + public List GetWorkflowInstances(int count, string payloadId) { var res = RetryWorkflowInstances.Execute(() => @@ -505,5 +543,76 @@ public string FormatResponse(string json) var parsedJson = JsonConvert.DeserializeObject(json); return JsonConvert.SerializeObject(parsedJson, Formatting.Indented); } + + public (List ArtifactReceivedItems, WorkflowInstance WorkflowInstance, WorkflowRevision + WorkflowRevision) GetArtifactsEventTestData(string clinicalWorkflowName, string wfiName) + { + + var workflowInstance = GetWorkflowInstanceTestData(wfiName); + var workflow = GetWorkflowRevisionTestData(clinicalWorkflowName); + + var artifacts = ArtifactsEventTestData.TestData.Where(c => c.WorkflowInstanceId == workflowInstance.Id).ToList(); + + if (artifacts == null) + { + throw new Exception( + $"ArtifactsEvent for {wfiName} does not have any applicable test data, please check and try again!"); + } + + return (artifacts, workflowInstance, workflow); + } + } + + public class ArtifactsReceivedEventTestData + { + public static List<(string Name, ArtifactsReceivedEvent Event)?> TestData = new() + { + ( + Name: "Test1", + Event: new ArtifactsReceivedEvent() + { + Workflows = new[] { "C139946F-0FB9-452C-843A-A77F4BAACB8E" }, + Artifacts = new List() + { + new() + { + Type = ArtifactType.AR, + Path = "path", + } + }, + PayloadId = Guid.NewGuid(), + CorrelationId = Guid.NewGuid().ToString(), + WorkflowInstanceId = "d32d5769-4ecf-4639-a048-6ecf2cced04a", + TaskId = "e545de90-c936-40ab-ad11-19ef07f49607", + Bucket = "bucket1", + Timestamp = DateTime.UtcNow, + DataOrigins = { new DataOrigin() { DataService = DataService.DIMSE, ArtifactType = ArtifactType.CT, Destination = "testAe", Source = "testAe" } }, + DataTrigger = new DataOrigin() { DataService = DataService.DIMSE, ArtifactType = ArtifactType.CT, Destination = "testAe", Source = "testAe" }, + FileCount = 1, + } + ), + }; + } + + public class ArtifactsEventTestData + { + public static List TestData = new List() + { + new ArtifactReceivedItems() + { + Id = "e545de90-c936-40ab-ad11-19ef07f49607" + "d32d5769-4ecf-4639-a048-6ecf2cced04a", + WorkflowInstanceId = "d32d5769-4ecf-4639-a048-6ecf2cced04a", + TaskId = "e545de90-c936-40ab-ad11-19ef07f49607", + Received = DateTime.UtcNow, + Artifacts = new List() { + new ArtifactReceivedDetails() + { + Type = ArtifactType.CT, + Received = DateTime.UtcNow, + Path = "path", + } + }, + } + }; } } diff --git a/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Support/MinioDataSeeding.cs b/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Support/MinioDataSeeding.cs index 0cf44901c..8b84ef1e8 100644 --- a/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Support/MinioDataSeeding.cs +++ b/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Support/MinioDataSeeding.cs @@ -36,6 +36,12 @@ public MinioDataSeeding(MinioClientUtil minioClient, DataHelper dataHelper, ISpe OutputHelper = outputHelper; } + + public async Task SeedArtifactRepo(string payloadId, string? folderName = null) + { + + } + public async Task SeedWorkflowInputArtifacts(string payloadId, string? folderName = null) { string localPath; @@ -83,6 +89,6 @@ public async Task SeedTaskOutputArtifacts(string payloadId, string workflowInsta private string? GetDirectory() { return Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location); - } + } } } diff --git a/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Support/MongoClientUtil.cs b/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Support/MongoClientUtil.cs index f19c11a63..3ccded927 100644 --- a/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Support/MongoClientUtil.cs +++ b/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Support/MongoClientUtil.cs @@ -14,7 +14,9 @@ * limitations under the License. */ +using Monai.Deploy.Messaging.Events; using Monai.Deploy.WorkflowManager.Common.Contracts.Models; +using Monai.Deploy.WorkflowManager.Common.Database.Repositories; using Monai.Deploy.WorkflowManager.Common.IntegrationTests.POCO; using MongoDB.Driver; using Polly; @@ -34,21 +36,34 @@ public class MongoClientUtil private RetryPolicy> RetryPayload { get; set; } private RetryPolicy> RetryExecutionStats { get; set; } private IMongoCollection ExecutionStatsCollection { get; set; } + private IMongoCollection ArtifactsCollection { get; set; } public MongoClientUtil() { Client = new MongoClient(TestExecutionConfig.MongoConfig.ConnectionString); Database = Client.GetDatabase($"{TestExecutionConfig.MongoConfig.Database}"); + WorkflowRevisionCollection = Database.GetCollection($"{TestExecutionConfig.MongoConfig.WorkflowCollection}"); WorkflowInstanceCollection = Database.GetCollection($"{TestExecutionConfig.MongoConfig.WorkflowInstanceCollection}"); PayloadCollection = Database.GetCollection($"{TestExecutionConfig.MongoConfig.PayloadCollection}"); + ArtifactsCollection = Database.GetCollection($"{TestExecutionConfig.MongoConfig.ArtifactsCollection}"); + ExecutionStatsCollection = Database.GetCollection($"{TestExecutionConfig.MongoConfig.ExecutionStatsCollection}"); + RetryMongo = Policy.Handle().WaitAndRetry(retryCount: 10, sleepDurationProvider: _ => TimeSpan.FromMilliseconds(1000)); RetryPayload = Policy>.Handle().WaitAndRetry(retryCount: 10, sleepDurationProvider: _ => TimeSpan.FromMilliseconds(1000)); CreateCollection("dummy"); - ExecutionStatsCollection = Database.GetCollection($"{TestExecutionConfig.MongoConfig.ExecutionStatsCollection}"); RetryExecutionStats = Policy>.Handle().WaitAndRetry(retryCount: 10, sleepDurationProvider: _ => TimeSpan.FromMilliseconds(1000)); } + + //CreateArtifactsEvents + public Task CreateArtifactsEventsDocumentAsync(List artifactReceivedItems) + { + return RetryMongo.Execute(() => + Database.GetCollection($"{TestExecutionConfig.MongoConfig.ArtifactsCollection}") + .InsertManyAsync(artifactReceivedItems)); + } + #region WorkflowRevision public void CreateWorkflowRevisionDocument(WorkflowRevision workflowRevision) @@ -59,6 +74,9 @@ public void CreateWorkflowRevisionDocument(WorkflowRevision workflowRevision) }); } + public Task CreateWorkflowRevisionDocumentAsync(WorkflowRevision workflowRevision) => + RetryMongo.Execute(() => WorkflowRevisionCollection.InsertOneAsync(workflowRevision)); + public void DeleteWorkflowRevisionDocument(string id) { RetryMongo.Execute(() => @@ -115,6 +133,9 @@ public void CreateWorkflowInstanceDocument(WorkflowInstance workflowInstance) }); } + public Task CreateWorkflowInstanceDocumentAsync(WorkflowInstance workflowInstance) => + RetryMongo.Execute(() => WorkflowInstanceCollection.InsertOneAsync(workflowInstance)); + public WorkflowInstance GetWorkflowInstance(string payloadId) { return WorkflowInstanceCollection.Find(x => x.PayloadId == payloadId).FirstOrDefault(); @@ -218,6 +239,21 @@ public void DeleteAllPayloadDocuments() }); } + public void DeleteAllArtifactDocuments() + { + RetryMongo.Execute(() => + { + ArtifactsCollection.DeleteMany("{ }"); + + var artifacts = ArtifactsCollection.Find("{ }").ToList(); + + if (artifacts.Count > 0) + { + throw new Exception("All payloads are not deleted!"); + } + }); + } + #endregion Payload #region ExecutionStats @@ -279,5 +315,13 @@ private void CreateCollection(string collectionName) } }); } + + public List GetArtifactsReceivedItems(ArtifactsReceivedEvent? artifactsReceivedEvent = null) + { + return artifactsReceivedEvent is null + ? ArtifactsCollection.Find(FilterDefinition.Empty).ToList() + : ArtifactsCollection.Find(x => x.WorkflowInstanceId == artifactsReceivedEvent.WorkflowInstanceId) + .ToList(); + } } } diff --git a/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Support/RabbitConnectionFactory.cs b/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Support/RabbitConnectionFactory.cs index b1391e7be..cec2021a1 100644 --- a/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Support/RabbitConnectionFactory.cs +++ b/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/Support/RabbitConnectionFactory.cs @@ -67,12 +67,15 @@ public static void DeleteAllQueues() DeleteQueue(TestExecutionConfig.RabbitConfig.TaskUpdateQueue); DeleteQueue(TestExecutionConfig.RabbitConfig.ExportCompleteQueue); DeleteQueue(TestExecutionConfig.RabbitConfig.ExportRequestQueue); + DeleteQueue(TestExecutionConfig.RabbitConfig.ArtifactsRequestQueue); DeleteQueue($"{TestExecutionConfig.RabbitConfig.WorkflowRequestQueue}-dead-letter"); DeleteQueue($"{TestExecutionConfig.RabbitConfig.TaskDispatchQueue}-dead-letter"); DeleteQueue($"{TestExecutionConfig.RabbitConfig.TaskCallbackQueue}-dead-letter"); DeleteQueue($"{TestExecutionConfig.RabbitConfig.TaskUpdateQueue}-dead-letter"); DeleteQueue($"{TestExecutionConfig.RabbitConfig.ExportCompleteQueue}-dead-letter"); DeleteQueue($"{TestExecutionConfig.RabbitConfig.ExportRequestQueue}-dead-letter"); + DeleteQueue($"{TestExecutionConfig.RabbitConfig.ArtifactsRequestQueue}-dead-letter"); + DeleteQueue("-dead-letter"); } public static void PurgeAllQueues() diff --git a/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/TestData/WorkflowInstanceTestData.cs b/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/TestData/WorkflowInstanceTestData.cs index 4836fd9ec..3fbe55c9f 100644 --- a/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/TestData/WorkflowInstanceTestData.cs +++ b/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/TestData/WorkflowInstanceTestData.cs @@ -2212,6 +2212,46 @@ public static WorkflowInstance CreateWorkflowInstance(string workflowName) } } }, + new WorkflowInstanceTestData() + { + Name = "Workflow_Instance_For_Artifact_ReceivedEvent_1", + WorkflowInstance = new WorkflowInstance() + { + Id = "d32d5769-4ecf-4639-a048-6ecf2cced04a", + AeTitle = "Multi_Req", + WorkflowId = Helper.GetWorkflowByName("Workflow_Revision_For_Artifact_ReceivedEvent_1")?.WorkflowRevision?.WorkflowId ?? "", + PayloadId = "c4c3633b-c1dd-c4c9-8a1a-71adec3d47c3", + BucketId = "bucket1", + StartTime = DateTime.UtcNow, + Status = Status.Created, + InputMetaData = new Dictionary() + { + }, + Tasks = new List + { + new TaskExecution() + { + ExecutionId = Guid.NewGuid().ToString(), + TaskId = "root_task", + OutputDirectory = "payloadId/workflows/workflowInstanceId/executionId/", + TaskType = "router_task", + Status = TaskExecutionStatus.Succeeded, + }, + new TaskExecution() + { + ExecutionId = Guid.NewGuid().ToString(), + WorkflowInstanceId = "d32d5769-4ecf-4639-a048-6ecf2cced04a", + TaskId = "e545de90-c936-40ab-ad11-19ef07f49607", + Status = TaskExecutionStatus.Dispatched, + TaskType = "remote_task", + OutputArtifacts = new Dictionary() + { + { "key1", "value1" } + }, + }, + } + } + } }; } } diff --git a/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/TestData/WorkflowRevisionTestData.cs b/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/TestData/WorkflowRevisionTestData.cs index ca7279a09..7b0749d07 100644 --- a/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/TestData/WorkflowRevisionTestData.cs +++ b/tests/IntegrationTests/WorkflowExecutor.IntegrationTests/TestData/WorkflowRevisionTestData.cs @@ -14,7 +14,11 @@ * limitations under the License. */ +using Monai.Deploy.Messaging.Common; using Monai.Deploy.WorkflowManager.Common.Contracts.Models; +using Artifact = Monai.Deploy.WorkflowManager.Common.Contracts.Models.Artifact; +// ReSharper disable ArrangeObjectCreationWhenTypeEvident +// ReSharper disable RedundantEmptyObjectCreationArgumentList namespace Monai.Deploy.WorkflowManager.Common.WorkflowExecutor.IntegrationTests.TestData { @@ -3128,6 +3132,70 @@ public static class WorkflowRevisionsTestData } } }, + new WorkflowRevisionTestData() + { + Name = "Workflow_Revision_For_Artifact_ReceivedEvent_1", + WorkflowRevision = new WorkflowRevision() + { + Id = "293C95D6-91AE-4417-95CA-D54FF9E592D6", + WorkflowId = "C139946F-0FB9-452C-843A-A77F4BAACB8E", + Revision = 1, + Workflow = new Workflow() + { + Name = "Basic workflow 1", + Description = "Basic workflow 1", + Version = "1", + Tasks = new TaskObject[] + { + new TaskObject + { + Id = "e545de90-c936-40ab-ad11-19ef07f4960a", + Type = "root_task", + Description = "Basic Workflow 1 Task 1 - root task", + Artifacts = new ArtifactMap(), + }, + new TaskObject + { + Id = "e545de90-c936-40ab-ad11-19ef07f49607", + Type = "remote_task", + Description = "Basic Workflow 1 Task 2 - remote_task", + Artifacts = new ArtifactMap() + { + Output = new OutputArtifact[] + { + new OutputArtifact() + { + Name = "artifact1", + Type = ArtifactType.CT, + Value = "artifactPath1", + Mandatory = true, + }, + new OutputArtifact() + { + Name = "artifact2", + Type = ArtifactType.AR, + Value = "artifactPath2", + Mandatory = true, + }, + } + } + }, + new TaskObject + { + Id = "e545de90-c936-40ab-ad11-19ef07f4960b", + Type = "clinical_review", + Description = "Basic Workflow 1 Task 3 - clinical_review", + Artifacts = new ArtifactMap(), + } + }, + InformaticsGateway = new InformaticsGateway() + { + AeTitle = "AIDE", + DataOrigins = new string[] { "PACS1", "PACS2" } + } + } + } + }, }; } } diff --git a/tests/UnitTests/PayloadListener.Tests/Services/EventPayloadRecieverServiceTests.cs b/tests/UnitTests/PayloadListener.Tests/Services/EventPayloadRecieverServiceTests.cs index 3ee331405..07a21d24b 100644 --- a/tests/UnitTests/PayloadListener.Tests/Services/EventPayloadRecieverServiceTests.cs +++ b/tests/UnitTests/PayloadListener.Tests/Services/EventPayloadRecieverServiceTests.cs @@ -298,6 +298,63 @@ public void ReceiveWorkflowPayload_With_WorkflowId_And_TaskID() _payloadService.Verify(p => p.CreateAsync(It.IsAny()), Times.Never()); } + + [Test] + public void ArtifactReceivedPayload_ValidateWorkFlowRequest() + { + var message = CreateMessageReceivedEventArgs(new string[] { "destination" }); + _eventPayloadReceiverService.ArtifactReceivePayload(message); + + _mockEventPayloadValidator.Verify(p => p.ValidateArtifactReceived(It.IsAny()), Times.Once()); + } + + + [Test] + public void ArtifactReceivedPayload_WorkFlowRequestIsNotValid_MessageSubscriberRejectsTheMessage() + { + var message = CreateMessageReceivedEventArgs(new string[] { "destination" }); + + _mockEventPayloadValidator.Setup(p => p.ValidateArtifactReceived(It.IsAny())).Returns(false); + + _eventPayloadReceiverService.ArtifactReceivePayload(message); + + _mockMessageBrokerSubscriberService.Verify(p => p.Reject(It.IsAny(), false), Times.Once()); + } + + [Test] + public void ArtifactReceivedPayload_WorkFlowRequestIsValid_MessageSubscriberAcknowledgeTheMessage() + { + var message = CreateMessageReceivedEventArgs(new string[] { "destination" }); + + _mockEventPayloadValidator.Setup(p => p.ValidateArtifactReceived(It.IsAny())).Returns(true); + + _workflowExecuterService.Setup(p => p.ProcessArtifactReceivedAsync(It.IsAny())).ReturnsAsync(true); + + _eventPayloadReceiverService.ArtifactReceivePayload(message); + + _mockMessageBrokerSubscriberService.Verify(p => p.Acknowledge(It.IsAny()), Times.Once()); + } + + [Test] + public void ArtifactReceivedPayload_FailsToProcessArtifactReceived_MessageIsRejectedAndRequeued() + { + // Arrange + var message = CreateMessageReceivedEventArgs(new string[] { "destination" }); + + _mockEventPayloadValidator.Setup(p => p.ValidateArtifactReceived(It.IsAny())).Returns(true); + _workflowExecuterService.Setup(p => p.ProcessArtifactReceivedAsync(It.IsAny())).ReturnsAsync(false); + + // Act + _eventPayloadReceiverService.ArtifactReceivePayload(message); + + // Assert + _mockEventPayloadValidator.Verify(x => x.ValidateArtifactReceived(It.IsAny()), Times.Once); + _mockEventPayloadValidator.VerifyNoOtherCalls(); + + _mockMessageBrokerSubscriberService.Verify(p => p.RequeueWithDelay(It.IsAny()), Times.Once()); + _mockMessageBrokerSubscriberService.VerifyNoOtherCalls(); + } + private static MessageReceivedEventArgs CreateMessageReceivedEventArgs(string[] destinations) { var exportRequestMessage = new ExportRequestEvent diff --git a/tests/UnitTests/PayloadListener.Tests/Validators/EventPayloadValidatorTests.cs b/tests/UnitTests/PayloadListener.Tests/Validators/EventPayloadValidatorTests.cs index 127c36d10..00c159356 100644 --- a/tests/UnitTests/PayloadListener.Tests/Validators/EventPayloadValidatorTests.cs +++ b/tests/UnitTests/PayloadListener.Tests/Validators/EventPayloadValidatorTests.cs @@ -17,6 +17,7 @@ using System; using System.Collections.Generic; using Microsoft.Extensions.Logging; +using Monai.Deploy.Messaging.Common; using Monai.Deploy.Messaging.Events; using Monai.Deploy.WorkflowManager.PayloadListener.Validators; using Moq; @@ -237,6 +238,104 @@ public void ValidateExportComplete_ExportCompleteEventIsNull_ThrowsArgumentNullE }); } + [Test] + public void ValidateArtifactsReceived_ArtifactsReceivedEventIsValid_ReturnsTrue() + { + var artifacts = new List + { + new Artifact() + { + Path = "testpath", + Type = ArtifactType.Folder + }, + new Artifact() + { + Path = "testdicompath", + Type = ArtifactType.CT + } + }; + var artifactsReceivedEvent = CreateTestArtifactsReceivedEvent(artifacts); + + var result = _eventPayloadValidator!.ValidateArtifactReceived(artifactsReceivedEvent); + + Assert.IsTrue(result); + } + + [Test] + public void ValidateArtifactsReceived_ArtifactsReceivedEventIsValidWithWorkflows_ReturnsTrue() + { + var artifacts = new List + { + new Artifact() + { + Path = "testpath", + Type = ArtifactType.Folder + }, + new Artifact() + { + Path = "testdicompath", + Type = ArtifactType.CT + } + }; + var workflows = new List { "123", "234", "345", "456" }; + var artifactsReceivedEvent = CreateTestArtifactsReceivedEvent(artifacts, workflows); + + var result = _eventPayloadValidator!.ValidateArtifactReceived(artifactsReceivedEvent); + + Assert.IsTrue(result); + } + + [Test] + public void ValidateArtifactsReceived_ArtifactsReceivedEventIsInvalid_ReturnsFalse() + { + var artifacts = new List + { + new Artifact() + { + Path = "testpath", + Type = ArtifactType.Folder + }, + new Artifact() + { + Path = "testdicompath", + Type = ArtifactType.Unset + } + }; + var artifactsReceivedEvent = CreateTestArtifactsReceivedEvent(artifacts); + + var result = _eventPayloadValidator!.ValidateArtifactReceived(artifactsReceivedEvent); + + Assert.IsFalse(result); + } + + private static ArtifactsReceivedEvent CreateTestArtifactsReceivedEvent(List artifacts, + IEnumerable? workflows = null) + { + var artifactsReceivedEvent = new ArtifactsReceivedEvent + { + DataTrigger = new DataOrigin() + { + Source = "source", + Destination = "destination", + ArtifactType = ArtifactType.CT + }, + Bucket = "Bucket", + PayloadId = Guid.NewGuid(), + CorrelationId = Guid.NewGuid().ToString(), + Timestamp = DateTime.UtcNow, + + WorkflowInstanceId = Guid.NewGuid().ToString(), + TaskId = Guid.NewGuid().ToString(), + Artifacts = artifacts + }; + + if (workflows is not null) + { + artifactsReceivedEvent.Workflows = workflows; + } + return artifactsReceivedEvent; + } + private static WorkflowRequestEvent CreateWorkflowRequestMessageWithNoWorkFlow() { return new WorkflowRequestEvent diff --git a/tests/UnitTests/WorkflowExecuter.Tests/Services/WorkflowExecuterServiceTests.cs b/tests/UnitTests/WorkflowExecuter.Tests/Services/WorkflowExecuterServiceTests.cs index b1426b85c..ea1df2d5f 100644 --- a/tests/UnitTests/WorkflowExecuter.Tests/Services/WorkflowExecuterServiceTests.cs +++ b/tests/UnitTests/WorkflowExecuter.Tests/Services/WorkflowExecuterServiceTests.cs @@ -25,6 +25,7 @@ using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using Monai.Deploy.Messaging.API; +using Monai.Deploy.Messaging.Common; using Monai.Deploy.Messaging.Events; using Monai.Deploy.Storage.API; using Monai.Deploy.Storage.Configuration; @@ -44,6 +45,8 @@ using Monai.Deploy.WorkflowManager.Common.ConditionsResolver.Parser; using Monai.Deploy.WorkflowManager.Common.Contracts.Models; using Monai.Deploy.WorkloadManager.WorkfowExecuter.Extensions; +using Monai.Deploy.WorkflowManager.Common.Database.Repositories; +using Artifact = Monai.Deploy.WorkflowManager.Common.Contracts.Models.Artifact; namespace Monai.Deploy.WorkflowManager.Common.WorkflowExecuter.Tests.Services { @@ -56,6 +59,7 @@ public class WorkflowExecuterServiceTests private readonly Mock> _logger; private readonly Mock _workflowInstanceRepository; private readonly Mock _workflowInstanceService; + private readonly Mock _artifactReceivedRepository; private readonly Mock _messageBrokerPublisherService; private readonly Mock _storageService; private readonly Mock _payloadService; @@ -69,6 +73,7 @@ public class WorkflowExecuterServiceTests public WorkflowExecuterServiceTests() { _workflowRepository = new Mock(); + _artifactReceivedRepository = new Mock(); _artifactMapper = new Mock(); _logger = new Mock>(); _workflowInstanceRepository = new Mock(); @@ -89,7 +94,7 @@ public WorkflowExecuterServiceTests() DicomAgents = new DicomAgentConfiguration { DicomWebAgentName = "monaidicomweb" } }, MigExternalAppPlugins = new List { { "examplePlugin" } }.ToArray() - }) ; + }); _storageConfiguration = Options.Create(new StorageServiceConfiguration() { Settings = new Dictionary { { "bucket", "testbucket" }, { "endpoint", "localhost" }, { "securedConnection", "False" } } }); @@ -113,7 +118,8 @@ public WorkflowExecuterServiceTests() _taskExecutionStatsRepository.Object, _artifactMapper.Object, _storageService.Object, - _payloadService.Object); + _payloadService.Object, + _artifactReceivedRepository.Object); } [Fact] @@ -140,7 +146,8 @@ public void WorkflowExecuterService_Throw_If_No_Config() _taskExecutionStatsRepository.Object, _artifactMapper.Object, _storageService.Object, - _payloadService.Object)); + _payloadService.Object, + _artifactReceivedRepository.Object)); } @@ -167,7 +174,133 @@ public void WorkflowExecuterService_Throw_If_No_Storage_Config() _taskExecutionStatsRepository.Object, _artifactMapper.Object, _storageService.Object, - _payloadService.Object)); + _payloadService.Object, + _artifactReceivedRepository.Object)); + } + + [Fact] + public async Task ProcessArtifactReceived_WhenMessageIsNull_ThrowsArgumentNullException() + { + await Assert.ThrowsAsync(() => WorkflowExecuterService.ProcessArtifactReceivedAsync(null)); + } + + [Fact] + public async Task ProcessArtifactReceived_WhenWorkflowInstanceIdIsNull_ReturnsFalse() + { + var message = new ArtifactsReceivedEvent { }; + var result = await WorkflowExecuterService.ProcessArtifactReceivedAsync(message); + Assert.False(result); + } + + [Fact] + public async Task ProcessArtifactReceived_WhenTaskIdIsNull_ReturnsFalse() + { + var message = new ArtifactsReceivedEvent { WorkflowInstanceId = "123" }; + var result = await WorkflowExecuterService.ProcessArtifactReceivedAsync(message); + Assert.False(result); + } + + [Fact] + public async Task ProcessArtifactReceived_WhenWorkflowInstanceRepositoryReturnsNull_ReturnsFalse() + { + var message = new ArtifactsReceivedEvent { WorkflowInstanceId = "123", TaskId = "456" }; + _workflowInstanceRepository.Setup(w => w.GetByWorkflowInstanceIdAsync(message.WorkflowInstanceId))! + .ReturnsAsync((WorkflowInstance)null!); + var result = await WorkflowExecuterService.ProcessArtifactReceivedAsync(message); + Assert.False(result); + } + + [Fact] + public async Task ProcessArtifactReceived_WhenWorkflowRepositoryReturnsNull_ReturnsFalse() + { + var message = new ArtifactsReceivedEvent { WorkflowInstanceId = "123", TaskId = "456" }; + _workflowInstanceRepository.Setup(w => w.GetByWorkflowInstanceIdAsync(message.WorkflowInstanceId))! + .ReturnsAsync(new WorkflowInstance { WorkflowId = "789" }); + _workflowRepository.Setup(w => w.GetByWorkflowIdAsync("789"))! + .ReturnsAsync((WorkflowRevision)null!); + var result = await WorkflowExecuterService.ProcessArtifactReceivedAsync(message); + Assert.False(result); + } + + // ProcessArtifactReceived workflowTemplate.Workflow?.Tasks.FirstOrDefault returns null + [Fact] + public async Task ProcessArtifactReceived_WhenWorkflowTemplateReturnsNull_ReturnsFalse() + { + var message = new ArtifactsReceivedEvent { WorkflowInstanceId = "123", TaskId = "456" }; + _workflowInstanceRepository.Setup(w => w.GetByWorkflowInstanceIdAsync(message.WorkflowInstanceId))! + .ReturnsAsync(new WorkflowInstance { WorkflowId = "789" }); + _workflowRepository.Setup(w => w.GetByWorkflowIdAsync("789"))! + .ReturnsAsync(new WorkflowRevision { Workflow = new Workflow { Tasks = new [] + { new TaskObject() { Id = "not456" } }} }); + var result = await WorkflowExecuterService.ProcessArtifactReceivedAsync(message); + Assert.False(result); + } + + [Fact] + public async Task ProcessArtifactReceived_WhenStillHasMissingArtifacts_ReturnsTrue() + { + var message = new ArtifactsReceivedEvent { WorkflowInstanceId = "123", TaskId = "456", + Artifacts = new List() { new Messaging.Common.Artifact() { Type = ArtifactType.CT } } }; + var workflowInstance = new WorkflowInstance { WorkflowId = "789", Tasks = new List() + { new TaskExecution() { TaskId = "456" } } }; + _workflowInstanceRepository.Setup(w => w.GetByWorkflowInstanceIdAsync(message.WorkflowInstanceId))! + .ReturnsAsync(workflowInstance); + var templateArtifacts = new OutputArtifact[] { new OutputArtifact() { Type = ArtifactType.CT }, new OutputArtifact() { Type = ArtifactType.DG } }; + var taskTemplate = new TaskObject() { Id = "456", Artifacts = new ArtifactMap { Output = templateArtifacts } }; + var workflowTemplate = new WorkflowRevision { Workflow = new Workflow { Tasks = new [] { taskTemplate }} }; + _workflowRepository.Setup(w => w.GetByWorkflowIdAsync("789"))! + .ReturnsAsync(workflowTemplate); + _artifactReceivedRepository.Setup(r => r.GetAllAsync(workflowInstance.WorkflowId, taskTemplate.Id)) + .ReturnsAsync((List?)null); + + var result = await WorkflowExecuterService.ProcessArtifactReceivedAsync(message); + Assert.True(result); + } + + [Fact] + public async Task ProcessArtifactReceived_WhenAllArtifactsReceivedArtifactsButTaskExecNotFound_ReturnsFalse() + { + //incoming artifacts + var message = new ArtifactsReceivedEvent { WorkflowInstanceId = "123", TaskId = "456", + Artifacts = new List() { new Messaging.Common.Artifact() { Type = ArtifactType.CT } } }; + var workflowInstance = new WorkflowInstance { WorkflowId = "789", Tasks = new List() + { new TaskExecution() { TaskId = "not456" } } }; + _workflowInstanceRepository.Setup(w => w.GetByWorkflowInstanceIdAsync(message.WorkflowInstanceId))! + .ReturnsAsync(workflowInstance); + //expected artifacts + var templateArtifacts = new OutputArtifact[] + { + new OutputArtifact() { Type = ArtifactType.CT }, + }; + var taskTemplate = new TaskObject() { Id = "456", Artifacts = new ArtifactMap { Output = templateArtifacts } }; + var workflowTemplate = new WorkflowRevision { Workflow = new Workflow { Tasks = new [] { taskTemplate }} }; + _workflowRepository.Setup(w => w.GetByWorkflowIdAsync("789"))! + .ReturnsAsync(workflowTemplate); + + //previously received artifacts + _artifactReceivedRepository.Setup(r => r.GetAllAsync(workflowInstance.WorkflowId, taskTemplate.Id)) + .ReturnsAsync((List?)null); + + var result = await WorkflowExecuterService.ProcessArtifactReceivedAsync(message); + Assert.False(result); + } + + [Fact] + public async Task ProcessPayload_WhenWorkflowInstanceAndTaskIdHaveAValue_ReturnsFalse() + { + var workflowRequest = new WorkflowRequestEvent + { + Bucket = "testbucket", + DataTrigger = new DataOrigin { Source = "aetitle", Destination = "aetitle" }, + CorrelationId = Guid.NewGuid().ToString(), + Timestamp = DateTime.UtcNow, + WorkflowInstanceId = "123", + TaskId = "345" + }; + + var result = await WorkflowExecuterService.ProcessPayload(workflowRequest, new Payload() { Id = Guid.NewGuid().ToString() }); + + Assert.False(result); } [Fact] @@ -2733,7 +2866,7 @@ public async Task ProcessPayload_WithExternalAppComplete_Pauses() } [Fact] - public async Task ProcessPayload_With_WorkflowInstanceId_Continues() + public async Task ArtifactReceived_With_Happy_Path_Continues() { var workflowInstanceId = Guid.NewGuid().ToString(); var correlationId = Guid.NewGuid().ToString(); @@ -2817,18 +2950,17 @@ public async Task ProcessPayload_With_WorkflowInstanceId_Continues() } }; - var payload = new Payload { PatientDetails = new PatientDetails { } }; - _workflowInstanceRepository.Setup(w => w.UpdateTaskStatusAsync(It.IsAny(), It.IsAny(), It.IsAny())).ReturnsAsync(true); _workflowInstanceRepository.Setup(w => w.GetByWorkflowInstanceIdAsync(workflowInstance.Id)).ReturnsAsync(workflowInstance); _workflowInstanceRepository.Setup(w => w.UpdateTasksAsync(workflowInstance.Id, It.IsAny>())).ReturnsAsync(true); _workflowRepository.Setup(w => w.GetByWorkflowIdAsync(workflowInstance.WorkflowId)).ReturnsAsync(workflow); - _payloadService.Setup(p => p.GetByIdAsync(It.IsAny())).ReturnsAsync(payload); - var mess = new WorkflowRequestEvent { WorkflowInstanceId = workflowInstance.Id, TaskId = "coffee" }; + _artifactReceivedRepository.Setup(w => w.GetAllAsync(It.IsAny(), It.IsAny())).ReturnsAsync(new List()); + + var mess = new ArtifactsReceivedEvent { WorkflowInstanceId = workflowInstance.Id, TaskId = "coffee" }; - var response = await WorkflowExecuterService.ProcessPayload(mess, payload); + var response = await WorkflowExecuterService.ProcessArtifactReceivedAsync(mess); _messageBrokerPublisherService.Verify(w => w.Publish(_configuration.Value.Messaging.Topics.TaskDispatchRequest, It.IsAny()), Times.Exactly(0)); _taskExecutionStatsRepository.Verify(w => w.UpdateExecutionStatsAsync(It.IsAny(), workflowId, TaskExecutionStatus.Succeeded)); @@ -2955,6 +3087,80 @@ public async Task ProcessTaskUpdate_Timout_Sends_Sets_Task_Status() var response = await WorkflowExecuterService.ProcessTaskUpdate(updateEvent); _workflowInstanceRepository.Verify(r => r.UpdateTaskStatusAsync(workflowInstance.Id, "pizza", TaskExecutionStatus.Failed), Times.Once); } + + [Fact] + public async Task ArtifactReceveid_Valid_ReturnesTrue() + { + var taskId = Guid.NewGuid().ToString(); + var workflowId = Guid.NewGuid().ToString(); + var workflowInstanceId = Guid.NewGuid().ToString(); + var artifactEvent = new ArtifactsReceivedEvent + { + Bucket = "testbucket", + DataTrigger = new DataOrigin { Source = "aetitle", Destination = "aetitle" }, + CorrelationId = Guid.NewGuid().ToString(), + Timestamp = DateTime.UtcNow, + WorkflowInstanceId = workflowInstanceId, + TaskId = taskId + }; + + var workflows = new List + { + new WorkflowRevision + { + Id = Guid.NewGuid().ToString(), + WorkflowId = workflowId, + Revision = 1, + Workflow = new Workflow + { + Name = "Workflowname", + Description = "Workflowdesc", + Version = "1", + InformaticsGateway = new InformaticsGateway + { + AeTitle = "aetitle" + }, + Tasks = new TaskObject[] + { + new TaskObject { + Id = taskId, + Type = "type", + Description = "outgoing", + TaskDestinations = new TaskDestination[] { new TaskDestination { Name = "task2" } } + }, + new TaskObject { + Id = "task2", + Type = "type", + Description = "returning", + } + } + } + } + }; + var workflowInstance = new WorkflowInstance + { + Id = workflowInstanceId, + BucketId = "BucketId", + PayloadId = "PayloadId", + WorkflowId = workflowId, + Tasks = new List + { + new TaskExecution{ + TaskId = taskId, + } + } + }; + _workflowInstanceRepository.Setup(w => w.UpdateTaskStatusAsync(It.IsAny(), It.IsAny(), It.IsAny())).ReturnsAsync(true); + _workflowRepository.Setup(w => w.GetByWorkflowIdAsync(workflows[0].WorkflowId)).ReturnsAsync(workflows[0]); + _workflowInstanceRepository.Setup(w => w.GetByWorkflowInstanceIdAsync(workflowInstanceId)).ReturnsAsync(workflowInstance); + _workflowInstanceRepository.Setup(w => w.UpdateTasksAsync(It.IsAny(), It.IsAny>())).ReturnsAsync(true); + _artifactReceivedRepository.Setup(w => w.GetAllAsync(It.IsAny(), It.IsAny())).ReturnsAsync(new List()); + var result = await WorkflowExecuterService.ProcessArtifactReceivedAsync(artifactEvent); + + _messageBrokerPublisherService.Verify(w => w.Publish(_configuration.Value.Messaging.Topics.TaskDispatchRequest, It.IsAny()), Times.Once()); + + Assert.True(result); + } } #pragma warning restore CS8625 // Cannot convert null literal to non-nullable reference type. }