Skip to content

Commit

Permalink
Merge branch 'dev'
Browse files Browse the repository at this point in the history
  • Loading branch information
aritchie committed Jan 23, 2025
2 parents 2d9c9df + 51cf992 commit 79176ce
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 20 deletions.
12 changes: 5 additions & 7 deletions src/Shiny.Mediator/Middleware/ScheduledCommandMiddleware.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,28 @@

namespace Shiny.Mediator.Middleware;


public class ScheduledCommandMiddleware<TCommand>(
ILogger<ScheduledCommandMiddleware<TCommand>> logger,
ICommandScheduler scheduler
) : ICommandMiddleware<TCommand> where TCommand : IScheduledCommand
{
const string CB_HEADER = "RunNow";


public async Task Process(
CommandContext<TCommand> context,
CommandHandlerDelegate next,
CancellationToken cancellationToken
)
{
if (context.Command.DueAt < DateTimeOffset.UtcNow || context.Values.ContainsKey(CB_HEADER))
if (context.Command.DueAt < DateTimeOffset.UtcNow)
{
logger.LogWarning($"Executing Scheduled Command that was due at {context.Command.DueAt}");
logger.LogWarning($"Executing Scheduled Command '{context.Command}' that was due at {context.Command.DueAt}");
await next().ConfigureAwait(false);
}
else
{
logger.LogInformation($"Command scheduled for {context.Command.DueAt}");
logger.LogInformation($"Command '{context.Command}' scheduled for {context.Command.DueAt}");
await scheduler
.Schedule(CB_HEADER, context.Command, cancellationToken)
.Schedule(context, cancellationToken)
.ConfigureAwait(false);
}
}
Expand Down
6 changes: 2 additions & 4 deletions src/Shiny.Mediator/Services/ICommandScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,11 @@ public interface ICommandScheduler
/// <summary>
/// Schedules and executes command
/// </summary>
/// <param name="sendCallbackHeader">Send this in headers back to mediator for middleware to execute and not reschedule</param>
/// <param name="command">The command to store</param>
/// <param name="context">The context containing the headers and contract</param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
Task<bool> Schedule(
string sendCallbackHeader,
IScheduledCommand command,
CommandContext context,
CancellationToken cancellationToken
);
}
25 changes: 16 additions & 9 deletions src/Shiny.Mediator/Services/Impl/InMemoryCommandScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,9 @@
namespace Shiny.Mediator.Services.Impl;


record RunStore(IScheduledCommand Command, string RunCallbackHeader);

public class InMemoryCommandScheduler : ICommandScheduler
{
readonly List<RunStore> commands = new();
readonly List<CommandContext> commands = new();
readonly ILogger logger;
readonly IMediator mediator;
readonly Timer timer = new();
Expand All @@ -24,13 +22,16 @@ public InMemoryCommandScheduler(IMediator mediator, ILogger<ICommandScheduler> l
}


public Task<bool> Schedule(string sendCallbackHeader, IScheduledCommand command, CancellationToken cancellationToken)
public Task<bool> Schedule(CommandContext command, CancellationToken cancellationToken)
{
var scheduled = false;
if (command.DueAt != null && command.DueAt < DateTimeOffset.UtcNow)
if (command.Command is not IScheduledCommand scheduledCommand)
throw new InvalidCastException($"Command {command.Command} is not of IScheduledCommand");

if (scheduledCommand.DueAt < DateTimeOffset.UtcNow)
{
lock (this.commands)
this.commands.Add(new(command, sendCallbackHeader));
this.commands.Add(command);

if (!this.timer.Enabled)
this.timer.Start();
Expand All @@ -44,18 +45,24 @@ public Task<bool> Schedule(string sendCallbackHeader, IScheduledCommand command,
protected virtual async void OnTimerElapsed(object? sender, ElapsedEventArgs e)
{
this.timer.Stop();
List<RunStore> items = null!;
List<CommandContext> items = null!;
lock (this.commands)
items = this.commands.ToList();

foreach (var item in items)
{
if (item.Command.DueAt >= DateTimeOffset.UtcNow)
var command = (IScheduledCommand)item.Command;
if (command.DueAt >= DateTimeOffset.UtcNow)
{
var headers = item
.Values
.Select(x => (Key: x.Key, Value: x.Value))
.ToList();

try
{
await this.mediator
.Send(item.Command, CancellationToken.None, (item.RunCallbackHeader, true))
.Send(command, CancellationToken.None, headers)
.ConfigureAwait(false);

}
Expand Down

0 comments on commit 79176ce

Please sign in to comment.