Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ai 376 #943

Merged
merged 6 commits into from
Jan 10, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/Common/Configuration/WorkflowManagerOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ public class WorkflowManagerOptions : PagedOptions
[ConfigurationKeyName("migExternalAppPlugins")]
public string[] MigExternalAppPlugins { get; set; }

[ConfigurationKeyName("dataRetentionDays")]
public int DataRetentionDays { get; set; }

public WorkflowManagerOptions()
{
Messaging = new MessageBrokerConfiguration();
Expand Down
8 changes: 8 additions & 0 deletions src/WorkflowManager/Common/Interfaces/IPayloadService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,5 +54,13 @@ Task<IList<PayloadDto>> GetAllAsync(int? skip = null,
/// <param name="payload"></param>
/// <returns></returns>
Task<bool> UpdateWorkflowInstanceIdsAsync(string payloadId, IEnumerable<string> workflowInstances);

/// <summary>
/// Gets the expiry date for a payload.
/// </summary>
/// <param name="now"></param>
/// <param name="workflowInstanceId"></param>
/// <returns>date of expiry or null</returns>
Task<DateTime?> GetExpiry(DateTime now, string? workflowInstanceId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\Common\Configuration\Monai.Deploy.WorkflowManager.Common.Configuration.csproj" />
<ProjectReference Include="..\Contracts\Monai.Deploy.WorkflowManager.Contracts.csproj" />
<ProjectReference Include="..\Database\Monai.Deploy.WorkflowManager.Database.csproj" />
<ProjectReference Include="..\Storage\Monai.Deploy.WorkflowManager.Storage.csproj" />
Expand Down
36 changes: 35 additions & 1 deletion src/WorkflowManager/Common/Services/PayloadService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
using Monai.Deploy.WorkflowManager.Common.Database.Interfaces;
using Monai.Deploy.WorkflowManager.Common.Logging;
using Monai.Deploy.WorkflowManager.Common.Storage.Services;
using Microsoft.Extensions.Options;
using Monai.Deploy.WorkflowManager.Common.Configuration;

namespace Monai.Deploy.WorkflowManager.Common.Miscellaneous.Services
{
Expand All @@ -34,23 +36,31 @@ public class PayloadService : IPayloadService

private readonly IWorkflowInstanceRepository _workflowInstanceRepository;

private readonly IWorkflowRepository _workflowRepository;

private readonly IDicomService _dicomService;

private readonly IStorageService _storageService;

private readonly WorkflowManagerOptions _options;

private readonly ILogger<PayloadService> _logger;

public PayloadService(
IPayloadRepository payloadRepository,
IDicomService dicomService,
IWorkflowInstanceRepository workflowInstanceRepository,
IWorkflowRepository workflowRepository,
IServiceScopeFactory serviceScopeFactory,
IOptions<WorkflowManagerOptions> options,
ILogger<PayloadService> logger)
{
_payloadRepository = payloadRepository ?? throw new ArgumentNullException(nameof(payloadRepository));
_workflowInstanceRepository = workflowInstanceRepository ?? throw new ArgumentNullException(nameof(workflowInstanceRepository));
_dicomService = dicomService ?? throw new ArgumentNullException(nameof(dicomService));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_workflowRepository = workflowRepository ?? throw new ArgumentNullException(nameof(workflowRepository));
_options = options?.Value ?? throw new ArgumentNullException(nameof(options));

var scopeFactory = serviceScopeFactory ?? throw new ArgumentNullException(nameof(serviceScopeFactory));
var scope = scopeFactory.CreateScope();
Expand Down Expand Up @@ -85,7 +95,8 @@ public PayloadService(
DataTrigger = eventPayload.DataTrigger,
Timestamp = eventPayload.Timestamp,
PatientDetails = patientDetails,
PayloadDeleted = PayloadDeleted.No
PayloadDeleted = PayloadDeleted.No,
Expires = await GetExpiry(DateTime.UtcNow, eventPayload.WorkflowInstanceId)
};

if (await _payloadRepository.CreateAsync(payload))
Expand All @@ -106,6 +117,29 @@ public PayloadService(
return null;
}

public async Task<DateTime?> GetExpiry(DateTime now, string? workflowInstanceId)
{
var daysToKeep = await GetWorkflowDataExpiry(workflowInstanceId);
daysToKeep ??= _options.DataRetentionDays;

if (daysToKeep == -1) { return null; }

return now.AddDays(daysToKeep.Value);
}

private async Task<int?> GetWorkflowDataExpiry(string? workflowInstanceId)
{
if (string.IsNullOrWhiteSpace(workflowInstanceId)) { return null; }

var workflowInstance = await _workflowInstanceRepository.GetByWorkflowInstanceIdAsync(workflowInstanceId);

if (workflowInstance is null) { return null; }

var t = await _workflowRepository.GetByWorkflowIdAsync(workflowInstance.WorkflowId);

return (await _workflowRepository.GetByWorkflowIdAsync(workflowInstance.WorkflowId))?.Workflow?.DataRetentionDays ?? null;
}

public async Task<Payload> GetByIdAsync(string payloadId)
{
Guard.Against.NullOrWhiteSpace(payloadId, nameof(payloadId));
Expand Down
42 changes: 42 additions & 0 deletions src/WorkflowManager/Contracts/Migrations/M004_Payload_expires.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
//
// Copyright 2023 Guy’s and St Thomas’ NHS Foundation Trust
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using Monai.Deploy.WorkflowManager.Common.Contracts.Models;
using Mongo.Migration.Migrations.Document;
using MongoDB.Bson;

namespace Monai.Deploy.WorkflowManager.Common.Contracts.Migrations
{
public class M004_Payload_expires : DocumentMigration<Payload>

Check warning on line 22 in src/WorkflowManager/Contracts/Migrations/M004_Payload_expires.cs

View workflow job for this annotation

GitHub Actions / sonarscanner

Rename class 'M004_Payload_expires' to match pascal case naming rules, consider using 'M004Payloadexpires'. (https://rules.sonarsource.com/csharp/RSPEC-101)
{
public M004_Payload_expires() : base("1.0.4") { }

public override void Up(BsonDocument document)
{
document.Add("Expires", BsonNull.Create(null).ToJson(), true); //null = never expires
}

public override void Down(BsonDocument document)
{
try
{
document.Remove("DataTrigger");
}
catch
{ // can ignore we dont want failures stopping startup !
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
//
// Copyright 2023 Guy’s and St Thomas’ NHS Foundation Trust
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using Monai.Deploy.WorkflowManager.Common.Contracts.Models;
using Mongo.Migration.Migrations.Document;
using MongoDB.Bson;


namespace Monai.Deploy.WorkflowManager.Common.Contracts.Migrations
{
public class M004_WorkflowRevision_addDataRetension : DocumentMigration<WorkflowRevision>

Check warning on line 23 in src/WorkflowManager/Contracts/Migrations/M004_WorkflowRevision_addDataRetension.cs

View workflow job for this annotation

GitHub Actions / sonarscanner

Rename class 'M004_WorkflowRevision_addDataRetension' to match pascal case naming rules, consider using 'M004WorkflowRevisionaddDataRetension'. (https://rules.sonarsource.com/csharp/RSPEC-101)
{
public M004_WorkflowRevision_addDataRetension() : base("1.0.1") { }

public override void Up(BsonDocument document)
{
// document.Add("Workflow.DataRetentionDays", BsonNull.Create(null).ToJson(), true);

Check warning on line 29 in src/WorkflowManager/Contracts/Migrations/M004_WorkflowRevision_addDataRetension.cs

View workflow job for this annotation

GitHub Actions / sonarscanner

Remove this commented out code. (https://rules.sonarsource.com/csharp/RSPEC-125)
var workflow = document["Workflow"].AsBsonDocument;
workflow.Add("DataRetentionDays", BsonNull.Create(null).ToJson(), true);
}

public override void Down(BsonDocument document)
{
try
{
var workflow = document["Workflow"].AsBsonDocument;
workflow.Remove("DataRetentionDays");
}
catch
{ // can ignore we dont want failures stopping startup !
}
}
}
}
8 changes: 6 additions & 2 deletions src/WorkflowManager/Contracts/Models/Payload.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@

namespace Monai.Deploy.WorkflowManager.Common.Contracts.Models
{
[CollectionLocation("Payloads"), RuntimeVersion("1.0.3")]
[CollectionLocation("Payloads"), RuntimeVersion("1.0.4")]
public class Payload : IDocument
{
[JsonConverter(typeof(DocumentVersionConvert)), BsonSerializer(typeof(DocumentVersionConverBson))]
public DocumentVersion Version { get; set; } = new DocumentVersion(1, 0, 3);
public DocumentVersion Version { get; set; } = new DocumentVersion(1, 0, 4);

[JsonProperty(PropertyName = "id")]
public string Id { get; set; } = string.Empty;
Expand Down Expand Up @@ -67,6 +67,10 @@ public class Payload : IDocument
public PatientDetails PatientDetails { get; set; } = new PatientDetails();

public DataOrigin DataTrigger { get; set; } = new DataOrigin { DataService = DataService.DIMSE };

[JsonProperty(PropertyName = "expires")]
public DateTime? Expires { get; set; }

}

public enum PayloadDeleted
Expand Down
3 changes: 3 additions & 0 deletions src/WorkflowManager/Contracts/Models/Workflow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,8 @@ public class Workflow
[JsonProperty(PropertyName = "tasks")]
public TaskObject[] Tasks { get; set; } = System.Array.Empty<TaskObject>();

[JsonProperty(PropertyName = "dataRetentionDays")]
public int? DataRetentionDays { get; set; } = 3;// note. -1 = never delete

}
}
4 changes: 2 additions & 2 deletions src/WorkflowManager/Contracts/Models/WorkflowRevision.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@

namespace Monai.Deploy.WorkflowManager.Common.Contracts.Models
{
[CollectionLocation("Workflows"), RuntimeVersion("1.0.0")]
[CollectionLocation("Workflows"), RuntimeVersion("1.0.1")]
public class WorkflowRevision : ISoftDeleteable, IDocument
{
[BsonId]
[JsonProperty(PropertyName = "id")]
public string? Id { get; set; }

[JsonConverter(typeof(DocumentVersionConvert)), BsonSerializer(typeof(DocumentVersionConverBson))]
public DocumentVersion Version { get; set; } = new DocumentVersion(1, 0, 0);
public DocumentVersion Version { get; set; } = new DocumentVersion(1, 0, 1);

[JsonProperty(PropertyName = "workflow_id")]
public string WorkflowId { get; set; } = string.Empty;
Expand Down
17 changes: 17 additions & 0 deletions src/WorkflowManager/Database/Interfaces/IPayloadRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* limitations under the License.
*/

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Monai.Deploy.WorkflowManager.Common.Contracts.Models;
Expand Down Expand Up @@ -52,11 +53,27 @@ public interface IPayloadRepository
/// <returns>The updated payload.</returns>
Task<bool> UpdateAsync(Payload payload);

/// <summary>
/// Updates a payload in the database.
/// </summary>
/// <param name="payloadId"></param>
/// <param name="workflowInstances"></param>
/// <returns></returns>
Task<bool> UpdateAssociatedWorkflowInstancesAsync(string payloadId, IEnumerable<string> workflowInstances);

/// <summary>
/// Gets all the payloads that might need deleted
/// </summary>
/// <param name="now">the current datetime</param>
/// <returns></returns>
Task<IList<Payload>> GetPayloadsToDelete(DateTime now);

/// <summary>
/// Marks a bunch of payloads as a new deleted state
/// </summary>
/// <param name="Ids">a list of payloadIds to mark in new status</param>
/// <param name="status">the status to mark as</param>
/// <returns></returns>
Task MarkDeletedState(IList<string> Ids, PayloadDeleted status);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Monai.Deploy.Messaging.Events;
using Monai.Deploy.WorkflowManager.Common.Contracts.Models;

namespace Monai.Deploy.WorkflowManager.Common.Database.Interfaces
Expand Down
55 changes: 55 additions & 0 deletions src/WorkflowManager/Database/Repositories/PayloadRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Ardalis.GuardClauses;
using Microsoft.Extensions.Logging;
Expand Down Expand Up @@ -47,13 +48,35 @@
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
var mongoDatabase = client.GetDatabase(databaseSettings.Value.DatabaseName);
_payloadCollection = mongoDatabase.GetCollection<Payload>("Payloads");
EnsureIndex().GetAwaiter().GetResult();
}

private async Task EnsureIndex()
{
var indexName = "PayloadDeletedIndex";

var model = new CreateIndexModel<Payload>(
Builders<Payload>.IndexKeys.Ascending(s => s.PayloadDeleted),
new CreateIndexOptions { Name = indexName }
);


var asyncCursor = (await _payloadCollection.Indexes.ListAsync());
var bsonDocuments = (await asyncCursor.ToListAsync());
var indexes = bsonDocuments.Select(_ => _.GetElement("name").Value.ToString()).ToList();

// If index not present create it else skip.
if (!indexes.Any(i => i is not null && i.Equals(indexName)))

Check warning on line 69 in src/WorkflowManager/Database/Repositories/PayloadRepository.cs

View workflow job for this annotation

GitHub Actions / sonarscanner

Collection-specific "Exists" method should be used instead of the "Any" extension. (https://rules.sonarsource.com/csharp/RSPEC-6605)
{
await _payloadCollection.Indexes.CreateOneAsync(model);
}
}

public Task<long> CountAsync() => CountAsync(_payloadCollection, null);

public async Task<bool> CreateAsync(Payload payload)
{
Guard.Against.Null(payload, nameof(payload));

Check warning on line 79 in src/WorkflowManager/Database/Repositories/PayloadRepository.cs

View workflow job for this annotation

GitHub Actions / sonarscanner

Remove this argument from the method call; it hides the caller information. (https://rules.sonarsource.com/csharp/RSPEC-3236)

try
{
Expand Down Expand Up @@ -91,7 +114,7 @@

public async Task<Payload> GetByIdAsync(string payloadId)
{
Guard.Against.NullOrWhiteSpace(payloadId, nameof(payloadId));

Check warning on line 117 in src/WorkflowManager/Database/Repositories/PayloadRepository.cs

View workflow job for this annotation

GitHub Actions / sonarscanner

Remove this argument from the method call; it hides the caller information. (https://rules.sonarsource.com/csharp/RSPEC-3236)

var payload = await _payloadCollection
.Find(x => x.PayloadId == payloadId)
Expand All @@ -102,7 +125,7 @@

public async Task<bool> UpdateAsync(Payload payload)
{
Guard.Against.Null(payload, nameof(payload));

Check warning on line 128 in src/WorkflowManager/Database/Repositories/PayloadRepository.cs

View workflow job for this annotation

GitHub Actions / sonarscanner

Remove this argument from the method call; it hides the caller information. (https://rules.sonarsource.com/csharp/RSPEC-3236)

try
{
Expand Down Expand Up @@ -137,5 +160,37 @@
return false;
}
}

public async Task<IList<Payload>> GetPayloadsToDelete(DateTime now)
{
try
{
var filter = (Builders<Payload>.Filter.Eq(p => p.PayloadDeleted, PayloadDeleted.No) |
Builders<Payload>.Filter.Eq(p => p.PayloadDeleted, PayloadDeleted.Failed)) &
Builders<Payload>.Filter.Lt(p => p.Expires, now);

return await (await _payloadCollection.FindAsync(filter)).ToListAsync();

}
catch (Exception ex)
{
_logger.DbGetPayloadsToDeleteError(ex);
return new List<Payload>();
}
}

public async Task MarkDeletedState(IList<string> Ids, PayloadDeleted status)
{
try
{
var filter = Builders<Payload>.Filter.In(p => p.PayloadId, Ids);
var update = Builders<Payload>.Update.Set(p => p.PayloadDeleted, status);
await _payloadCollection.UpdateManyAsync(filter, update);
}
catch (Exception ex)
{
_logger.DbGetPayloadsToDeleteError(ex);
}
}
}
}
Loading
Loading