Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ai 230 #894

Merged
merged 26 commits into from
Oct 20, 2023
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
7786e8f
changes to add output artifact and validation
lillie-dae Oct 13, 2023
b97b6d5
minor change to using statements
lillie-dae Oct 13, 2023
b7d06ad
remove comment
lillie-dae Oct 13, 2023
7c26b74
Merge branch 'develop' into AI-229-add-output-artifact-and-validation
lillie-dae Oct 13, 2023
1584e56
add header files
lillie-dae Oct 13, 2023
6351295
Merge branch 'AI-229-add-output-artifact-and-validation' of https://g…
lillie-dae Oct 13, 2023
7a2e104
import missing references
lillie-dae Oct 13, 2023
ca2fa88
updated packages
lillie-dae Oct 16, 2023
84d7db0
bump package
lillie-dae Oct 16, 2023
ab793e1
bump package to release version
lillie-dae Oct 16, 2023
d5d1abc
adding lisener for artifact received
neildsouth Oct 16, 2023
ffe1282
updated ProcessArtifactReceived
lillie-dae Oct 16, 2023
7c9554f
small fixups
neildsouth Oct 16, 2023
f7dfdaf
added artifact repo
lillie-dae Oct 16, 2023
541eeef
Merge branch 'AI-230' of https://github.com/Project-MONAI/monai-deplo…
lillie-dae Oct 16, 2023
8f8781d
fix up tests
neildsouth Oct 17, 2023
9c35e22
fix tests
neildsouth Oct 17, 2023
4206b93
merge in dev
lillie-dae Oct 17, 2023
9b5a5a9
added EventPayloadValidatorTests and WorkflowExecuterServiceTests and…
lillie-dae Oct 17, 2023
d0b61b2
reduced code duplication
lillie-dae Oct 17, 2023
4d32c70
adding indexs
neildsouth Oct 18, 2023
a9898e4
minor fixes
lillie-dae Oct 19, 2023
69d7137
minor fixes
lillie-dae Oct 19, 2023
ee61cf9
minor fixes
lillie-dae Oct 19, 2023
5388837
final adjustments
neildsouth Oct 20, 2023
df25fa2
merge in
neildsouth Oct 20, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/Common/Configuration/MessageBrokerConfigurationKeys.cs
Original file line number Diff line number Diff line change
Expand Up @@ -88,5 +88,8 @@ public class MessageBrokerConfigurationKeys

[ConfigurationKeyName("notificationEmailCancelation")]
public string NotificationEmailCancelation { get; set; } = "aide.notification_email.cancellation";

[ConfigurationKeyName("artifactrecieved")]
public string ArtifactRecieved { get; set; } = "md.workflow.artifactrecieved";
}
}
1 change: 1 addition & 0 deletions src/Monai.Deploy.WorkflowManager.sln.DotSettings
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
<wpf:ResourceDictionary xml:space="preserve" xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" xmlns:s="clr-namespace:System;assembly=mscorlib" xmlns:ss="urn:shemas-jetbrains-com:settings-storage-xaml" xmlns:wpf="http://schemas.microsoft.com/winfx/2006/xaml/presentation">
<s:Boolean x:Key="/Default/CodeInspection/Highlighting/ConvertIfStatementToSwitchStatement/AssumeOpenTypeHierarchy/@EntryValue">True</s:Boolean>
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=AR/@EntryIndexedValue">AR</s:String>
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=AS/@EntryIndexedValue">AS</s:String>
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=ASMT/@EntryIndexedValue">ASMT</s:String>
Expand Down
52 changes: 52 additions & 0 deletions src/WorkflowManager/Database/Interfaces/IArtifactsRepository.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2022 MONAI Consortium
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

using System.Collections.Generic;
using System.Threading.Tasks;
using Artifact = Monai.Deploy.Messaging.Common.Artifact;

namespace Monai.Deploy.WorkflowManager.Common.Database.Repositories
{
public interface IArtifactsRepository
{
/// <summary>
/// Gets All ArtifactsReceivedItems by workflowInstance and taskId.
/// </summary>
/// <param name="workflowInstance"></param>
/// <param name="taskId"></param>
/// <returns></returns>
Task<List<ArtifactReceivedItems>?> GetAllAsync(string workflowInstance, string taskId);

/// <summary>
/// Adds an item to the ArtifactsReceivedItems collection.
/// </summary>
/// <param name="item"></param>
/// <returns></returns>
Task AddItemAsync(ArtifactReceivedItems item);

/// <summary>
/// Adds an item to the ArtifactsReceivedItems collection.
/// </summary>
/// <param name="workflowInstanceId"></param>
/// <param name="taskId"></param>
/// <param name="artifactsOutputs"></param>
/// <returns></returns>
Task AddItemAsync(string workflowInstanceId, string taskId, List<Artifact> artifactsOutputs);

Task AddOrUpdateItemAsync(string workflowInstanceId, string taskId,
IEnumerable<Artifact> artifactsOutputs);
}
}
152 changes: 152 additions & 0 deletions src/WorkflowManager/Database/Repositories/ArtifactsRepository.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/*
* Copyright 2022 MONAI Consortium
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Monai.Deploy.WorkflowManager.Common.Database.Options;
using MongoDB.Driver;
using Artifact = Monai.Deploy.Messaging.Common.Artifact;

namespace Monai.Deploy.WorkflowManager.Common.Database.Repositories
{
public class ArtifactReceivedDetails : Artifact
{
/// <summary>
/// Gets or Sets LastReceived.
/// </summary>
public DateTime? Received { get; set; } = null;

public static ArtifactReceivedDetails FromArtifact(Artifact artifact) =>
new()
{
Received = DateTime.UtcNow,
Type = artifact.Type,
Path = artifact.Path
};
}

public class ArtifactReceivedItems
{
/// <summary>
/// Gets or Sets the Id.
/// </summary>
public double Id { get; set; }
neildsouth marked this conversation as resolved.
Show resolved Hide resolved

/// <summary>
/// Gets or Sets WorkflowInstanceId.
/// </summary>
public string WorkflowInstanceId { get; set; } = string.Empty;

/// <summary>
/// Gets or Sets TaskId.
/// </summary>
public string TaskId { get; set; } = string.Empty;

/// <summary>
/// Gets or Sets Artifacts.
/// </summary>
public List<ArtifactReceivedDetails> Artifacts { get; set; } = new();
}

public class ArtifactsRepository : IArtifactsRepository
{
private readonly ILogger<ArtifactsRepository> _logger;
private readonly IMongoCollection<ArtifactReceivedItems> _artifactReceivedItemsCollection;

public ArtifactsRepository(
IMongoClient client,
IOptions<WorkloadManagerDatabaseSettings> bookStoreDatabaseSettings,
ILogger<ArtifactsRepository> logger)
{
if (client == null)
{
throw new ArgumentNullException(nameof(client));
}

_logger = logger ?? throw new ArgumentNullException(nameof(logger));
var mongoDatabase = client.GetDatabase(bookStoreDatabaseSettings.Value.DatabaseName);
_artifactReceivedItemsCollection = mongoDatabase.GetCollection<ArtifactReceivedItems>("ArtifactReceivedItems");
}

public async Task<List<ArtifactReceivedItems>> GetAllAsync(string workflowInstance, string taskId)
{
var result = await _artifactReceivedItemsCollection.FindAsync(a => a.WorkflowInstanceId == workflowInstance && a.TaskId == taskId).ConfigureAwait(false);
return await result.ToListAsync().ConfigureAwait(false);
}

public Task AddItemAsync(ArtifactReceivedItems item)
{
return _artifactReceivedItemsCollection.InsertOneAsync(item);
}

public Task AddItemAsync(string workflowInstanceId, string taskId, List<Artifact> artifactsOutputs)
{
var artifacts = artifactsOutputs.Select(a => new ArtifactReceivedDetails()
{
Path = a.Path,
Type = a.Type,
Received = DateTime.UtcNow
});

var item = new ArtifactReceivedItems()
{
WorkflowInstanceId = workflowInstanceId,
TaskId = taskId,
Artifacts = artifacts.ToList()
};

return _artifactReceivedItemsCollection.InsertOneAsync(item);
}

public async Task AddOrUpdateItemAsync(string workflowInstanceId, string taskId,
IEnumerable<Artifact> artifactsOutputs)
{
var artifacts = artifactsOutputs.Select(a => new ArtifactReceivedDetails()
{
Path = a.Path,
Type = a.Type,
Received = DateTime.UtcNow
});

var item = new ArtifactReceivedItems()
{
WorkflowInstanceId = workflowInstanceId,
TaskId = taskId,
Artifacts = artifacts.ToList()
};

var result = await _artifactReceivedItemsCollection
.FindAsync(a => a.WorkflowInstanceId == workflowInstanceId && a.TaskId == taskId).ConfigureAwait(false);
var existing = await result.FirstOrDefaultAsync().ConfigureAwait(false);

if (existing == null)
{
await _artifactReceivedItemsCollection.InsertOneAsync(item).ConfigureAwait(false);
}
else
{
var update = Builders<ArtifactReceivedItems>.Update.Set(a => a.Artifacts, item.Artifacts);
await _artifactReceivedItemsCollection
.UpdateOneAsync(a => a.WorkflowInstanceId == workflowInstanceId && a.TaskId == taskId, update)
.ConfigureAwait(false);
}
}
}
}
8 changes: 4 additions & 4 deletions src/WorkflowManager/Database/Repositories/RepositoryBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ namespace Monai.Deploy.WorkflowManager.Common.Database.Repositories
{
public abstract class RepositoryBase
{
public static async Task<long> CountAsync<T>(IMongoCollection<T> collection, FilterDefinition<T>? filter)
=> await collection.CountDocumentsAsync(filter ?? Builders<T>.Filter.Empty);
public static Task<long> CountAsync<T>(IMongoCollection<T> collection, FilterDefinition<T>? filter)
=> collection.CountDocumentsAsync(filter ?? Builders<T>.Filter.Empty);

/// <summary>
/// Get All T that match filters provided.
Expand All @@ -44,7 +44,7 @@ public static async Task<IList<T>> GetAllAsync<T>(IMongoCollection<T> collection
.Skip(skip)
.Limit(limit)
.Sort(sortFunction)
.ToListAsync();
.ToListAsync().ConfigureAwait(false);
}

public static async Task<IList<T>> GetAllAsync<T>(IMongoCollection<T> collection, FilterDefinition<T> filterFunction, SortDefinition<T> sortFunction, int? skip = null, int? limit = null)
Expand All @@ -54,7 +54,7 @@ public static async Task<IList<T>> GetAllAsync<T>(IMongoCollection<T> collection
.Skip(skip)
.Limit(limit)
.Sort(sortFunction)
.ToListAsync();
.ToListAsync().ConfigureAwait(false);
}
}
}
3 changes: 3 additions & 0 deletions src/WorkflowManager/Logging/Log.200000.Workflow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ public static partial class Log
[LoggerMessage(EventId = 200019, Level = LogLevel.Debug, Message = "Task destination condition for task {taskId} with resolved condition: {resolvedConditional} resolved to false. initial conditional: {conditions}")]
public static partial void TaskDestinationConditionFalse(this ILogger logger, string resolvedConditional, string conditions, string taskId);

[LoggerMessage(EventId = 200020, Level = LogLevel.Warning, Message = "Use new ArtifactReceived Queue for continuation messages.")]
public static partial void DontUseWorkflowReceivedForPayload(this ILogger logger);

// Conditions Resolver
[LoggerMessage(EventId = 210000, Level = LogLevel.Warning, Message = "Failed to parse condition: {condition}. resolvedConditional: {resolvedConditional}")]
public static partial void FailedToParseCondition(this ILogger logger, string resolvedConditional, string condition, Exception ex);
Expand Down
15 changes: 14 additions & 1 deletion src/WorkflowManager/Logging/Log.500000.Messaging.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,20 @@ public static partial class Log
[LoggerMessage(EventId = 500016, Level = LogLevel.Debug, Message = "Export complete message received.")]
public static partial void ExportCompleteReceived(this ILogger logger);

[LoggerMessage(EventId = 200017, Level = LogLevel.Debug, Message = "Workflow continuation event so not creating payload.")]
[LoggerMessage(EventId = 500017, Level = LogLevel.Debug, Message = "ArtifactReceived message so not creating payload.")]
public static partial void WorkflowContinuation(this ILogger logger);

[LoggerMessage(EventId = 500018, Level = LogLevel.Debug, Message = "ArtifactReceived message received.")]
public static partial void ArtifactReceivedReceived(this ILogger logger);

[LoggerMessage(EventId = 500019, Level = LogLevel.Error, Message = "ArtifactReceived message {messageId} failed unexpectedly (no workflowId or TaskId ?) and has been requeued.")]
public static partial void ArtifactReceivedRequeuePayloadCreateError(this ILogger logger, string messageId);

[LoggerMessage(EventId = 500020, Level = LogLevel.Error, Message = "ArtifactReveived message {messageId} failed unexpectedly and has been requeued.")]
public static partial void ArtifactReceivedRequeueUnknownError(this ILogger logger, string messageId, Exception ex);

[LoggerMessage(EventId = 500021, Level = LogLevel.Error, Message = "ArtifactReveived message {messageId} is invalid and has been rejected without being requeued.")]
public static partial void ArtifactReceivedRejectValidationError(this ILogger logger, string messageId);

}
}
7 changes: 7 additions & 0 deletions src/WorkflowManager/Logging/Log.700000.Artifact.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,5 +44,12 @@ public static partial class Log

[LoggerMessage(EventId = 700007, Level = LogLevel.Information, Message = "Task Dispatch resolved successfully output artifacts: PayloadId: {payloadId}, TaskId: {taskId}, WorkflowInstanceId: {workflowInstanceId}, WorkflowRevisionId: {workflowRevisionId}, output artifact object: {pathOutputArtifacts}")]
public static partial void LogGeneralTaskDispatchInformation(this ILogger logger, string payloadId, string taskId, string workflowInstanceId, string workflowRevisionId, string pathOutputArtifacts);

[LoggerMessage(EventId = 700008, Level = LogLevel.Warning, Message = "Unexpected Artifacts output artifacts: TaskId: {taskId}, WorkflowInstanceId: {workflowInstanceId}, output artifact object: {unexpectedArtifacts}")]
public static partial void UnexpectedArtifactsReceived(this ILogger logger, string taskId, string workflowInstanceId, string unexpectedArtifacts);

[LoggerMessage(EventId = 700009, Level = LogLevel.Debug, Message = "Mandatory output artifacts for task {taskId} are missing. waiting for remaining artifacts... {missingArtifacts}")]
public static partial void MandatoryOutputArtifactsMissingForTask(this ILogger logger, string taskId, string missingArtifacts);

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

using Ardalis.GuardClauses;
using Monai.Deploy.Messaging.Common;
using Monai.Deploy.Messaging.Events;
using Monai.Deploy.WorkflowManager.Common.Contracts.Models;

Expand All @@ -39,6 +40,39 @@ public static bool IsValid(this WorkflowRequestEvent workflowRequestMessage, out
return valid;
}

public static bool IsValid(this ArtifactsReceivedEvent artifactReceivedMessage, out IList<string> validationErrors)
{
Guard.Against.Null(artifactReceivedMessage, nameof(artifactReceivedMessage));

validationErrors = new List<string>();

var valid = true;

valid &= IsAeTitleValid(artifactReceivedMessage.GetType().Name, artifactReceivedMessage.DataTrigger.Source, validationErrors);
valid &= IsAeTitleValid(artifactReceivedMessage.GetType().Name, artifactReceivedMessage.DataTrigger.Destination, validationErrors);
valid &= IsBucketValid(artifactReceivedMessage.GetType().Name, artifactReceivedMessage.Bucket, validationErrors);
valid &= IsCorrelationIdValid(artifactReceivedMessage.GetType().Name, artifactReceivedMessage.CorrelationId, validationErrors);
valid &= IsPayloadIdValid(artifactReceivedMessage.GetType().Name, artifactReceivedMessage.PayloadId.ToString(), validationErrors);
valid &= string.IsNullOrEmpty(artifactReceivedMessage.WorkflowInstanceId) is false && string.IsNullOrEmpty(artifactReceivedMessage.TaskId) is false;
valid &= AllArtifactsAreValid(artifactReceivedMessage, validationErrors);
return valid;
}

private static bool AllArtifactsAreValid(this ArtifactsReceivedEvent artifactReceivedMessage, IList<string> validationErrors)
{
Guard.Against.Null(artifactReceivedMessage, nameof(artifactReceivedMessage));

var valid = artifactReceivedMessage.Artifacts.All(a => a.Type != ArtifactType.Unset);

if (valid is false)
{
var unsetArtifacts = string.Join(',', artifactReceivedMessage.Artifacts.Where(a => a.Type == ArtifactType.Unset).Select(a => a.Path));
validationErrors.Add($"The following artifacts are have unset artifact types: {unsetArtifacts}");
}

return valid;
}

public static bool IsInformaticsGatewayNotNull(string source, InformaticsGateway informaticsGateway, IList<string> validationErrors)
{
Guard.Against.NullOrWhiteSpace(source, nameof(source));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,5 +178,47 @@ public async Task ExportCompletePayload(MessageReceivedEventArgs message)
await _messageSubscriber.RequeueWithDelay(message.Message);
}
}

public async Task ArtifactReceivePayload(MessageReceivedEventArgs message)
{
try
{
var requestEvent = message.Message.ConvertTo<ArtifactsReceivedEvent>();

using var loggingScope = Logger.BeginScope(new LoggingDataDictionary<string, object>
{
["correlationId"] = requestEvent.CorrelationId,
["workflowId"] = requestEvent.Workflows.FirstOrDefault(),
["workflowInstanceId"] = requestEvent.WorkflowInstanceId,
["taskId"] = requestEvent.TaskId
});

Logger.WorkflowContinuation();

var validation = PayloadValidator.ValidateArtifactReceived(requestEvent);

if (!validation)
{
Logger.ArtifactReceivedRejectValidationError(message.Message.MessageId);
_messageSubscriber.Reject(message.Message, false);

return;
}

if (!await WorkflowExecuterService.ProcessArtifactReceivedAsync(requestEvent))
{
Logger.ArtifactReceivedRequeuePayloadCreateError(message.Message.MessageId);
await _messageSubscriber.RequeueWithDelay(message.Message);

return;
}
_messageSubscriber.Acknowledge(message.Message);
}
catch (Exception e)
{
Logger.ArtifactReceivedRequeueUnknownError(message.Message.MessageId, e);
await _messageSubscriber.RequeueWithDelay(message.Message);
}
}
}
}
Loading