From 5440e68bd81a75ccd8b8c67ddd9690c396be1150 Mon Sep 17 00:00:00 2001 From: Sebastian Stehle Date: Mon, 9 Oct 2023 23:44:34 +0200 Subject: [PATCH] Event query improvements (#1033) * Schema name. * Fix build. * Query improvements. * Simplifications. * Fix regex. * Build fix. --- backend/src/Migrations/RebuilderExtensions.cs | 25 +++++-- .../Schemas/ReferencesFieldProperties.cs | 2 +- .../Subscriptions/SubscriptionPublisher.cs | 20 ++---- .../Schemas/MongoSchemasHash.cs | 20 +----- .../Apps/AppEventDeleter.cs | 4 +- .../Apps/AppPermanentDeleter.cs | 10 +-- .../Assets/AssetPermanentDeleter.cs | 10 +-- .../Assets/AssetUsageTracker_EventHandling.cs | 20 +----- .../Assets/AssetsFluidExtension.cs | 20 +++--- .../Assets/RebuildFiles.cs | 4 +- .../Assets/RecursiveDeleter.cs | 10 +-- .../Backup/BackupProcessor.cs | 9 +-- .../Comments/DomainObject/CommentsStream.cs | 2 +- .../Contents/ReferencesFluidExtension.cs | 20 +++--- .../Contents/Text/TextIndexingProcess.cs | 20 ++---- .../Invitation/InvitationEventConsumer.cs | 10 +-- .../Rules/Runner/DefaultRuleRunnerService.cs | 5 +- .../Rules/Runner/RuleRunnerProcessor.cs | 4 +- .../EventStoreProjectionClient.cs | 12 ++-- .../EventSourcing/GetEventStore.cs | 32 ++++----- .../GetEventStoreSubscription.cs | 4 +- .../EventSourcing/Utils.cs | 18 +++++ .../EventSourcing/FilterExtensions.cs | 45 ++++++------ .../MongoEventStoreSubscription.cs | 10 +-- .../EventSourcing/MongoEventStore_Reader.cs | 72 ++++++------------- .../EventSourcing/MongoEventStore_Writer.cs | 14 +--- .../Commands/Rebuilder.cs | 4 +- .../EventSourcing/IEventConsumer.cs | 4 +- .../EventSourcing/IEventStore.cs | 28 ++------ .../EventSourcing/PollingSubscription.cs | 2 +- .../EventSourcing/StreamFilter.cs | 35 +++++++-- .../EventSourcing/StreamFilterKind.cs | 14 ++++ .../States/BatchContext.cs | 15 ++-- .../States/Persistence.cs | 4 +- .../SubscriptionPublisherTests.cs | 2 +- .../Apps/AppEventDeleterTests.cs | 4 +- .../Apps/AppPermanentDeleterTests.cs | 4 +- .../Assets/AssetPermanentDeleterTests.cs | 4 +- .../Assets/AssetUsageTrackerTests.cs | 4 +- .../Assets/RecursiveDeleterTests.cs | 4 +- .../Assets/RepairFilesTests.cs | 4 +- .../Contents/Text/TextIndexerTestsBase.cs | 6 ++ .../InvitationEventConsumerTests.cs | 19 +++++ .../Rules/RuleEnqueuerTests.cs | 2 +- .../Consume/EventConsumerProcessorTests.cs | 22 +++--- .../EventSourcing/EventStoreTests.cs | 62 +++++++++------- .../EventSourcing/MongoEventStoreTests.cs | 5 +- .../EventSourcing/MongoParallelInsertTests.cs | 2 +- .../EventSourcing/PollingSubscriptionTests.cs | 2 +- .../EventSourcing/RetrySubscriptionTests.cs | 8 +-- .../EventSourcing/StreamFilterTests.cs | 27 +++++++ .../States/PersistenceBatchTests.cs | 15 ++-- .../States/PersistenceEventSourcingTests.cs | 12 ++-- .../States/PersistenceSnapshotTests.cs | 2 +- 54 files changed, 369 insertions(+), 369 deletions(-) create mode 100644 backend/src/Squidex.Infrastructure/EventSourcing/StreamFilterKind.cs create mode 100644 backend/tests/Squidex.Infrastructure.Tests/EventSourcing/StreamFilterTests.cs diff --git a/backend/src/Migrations/RebuilderExtensions.cs b/backend/src/Migrations/RebuilderExtensions.cs index 1dc8462bd5..73aea32279 100644 --- a/backend/src/Migrations/RebuilderExtensions.cs +++ b/backend/src/Migrations/RebuilderExtensions.cs @@ -11,6 +11,7 @@ using Squidex.Domain.Apps.Entities.Rules.DomainObject; using Squidex.Domain.Apps.Entities.Schemas.DomainObject; using Squidex.Infrastructure.Commands; +using Squidex.Infrastructure.EventSourcing; namespace Migrations; @@ -21,36 +22,48 @@ public static class RebuilderExtensions public static Task RebuildAppsAsync(this Rebuilder rebuilder, int batchSize, CancellationToken ct = default) { - return rebuilder.RebuildAsync("^app\\-", batchSize, AllowedErrorRate, ct); + var streamFilter = StreamFilter.Prefix("app-"); + + return rebuilder.RebuildAsync(streamFilter, batchSize, AllowedErrorRate, ct); } public static Task RebuildSchemasAsync(this Rebuilder rebuilder, int batchSize, CancellationToken ct = default) { - return rebuilder.RebuildAsync("^schema\\-", batchSize, AllowedErrorRate, ct); + var streamFilter = StreamFilter.Prefix("schema-"); + + return rebuilder.RebuildAsync(streamFilter, batchSize, AllowedErrorRate, ct); } public static Task RebuildRulesAsync(this Rebuilder rebuilder, int batchSize, CancellationToken ct = default) { - return rebuilder.RebuildAsync("^rule\\-", batchSize, AllowedErrorRate, ct); + var streamFilter = StreamFilter.Prefix("rule-"); + + return rebuilder.RebuildAsync(streamFilter, batchSize, AllowedErrorRate, ct); } public static Task RebuildAssetsAsync(this Rebuilder rebuilder, int batchSize, CancellationToken ct = default) { - return rebuilder.RebuildAsync("^asset\\-", batchSize, AllowedErrorRate, ct); + var streamFilter = StreamFilter.Prefix("asset-"); + + return rebuilder.RebuildAsync(streamFilter, batchSize, AllowedErrorRate, ct); } public static Task RebuildAssetFoldersAsync(this Rebuilder rebuilder, int batchSize, CancellationToken ct = default) { - return rebuilder.RebuildAsync("^assetFolder\\-", batchSize, AllowedErrorRate, ct); + var streamFilter = StreamFilter.Prefix("assetFolder-"); + + return rebuilder.RebuildAsync(streamFilter, batchSize, AllowedErrorRate, ct); } public static Task RebuildContentAsync(this Rebuilder rebuilder, int batchSize, CancellationToken ct = default) { - return rebuilder.RebuildAsync("^content\\-", batchSize, AllowedErrorRate, ct); + var streamFilter = StreamFilter.Prefix("content-"); + + return rebuilder.RebuildAsync(streamFilter, batchSize, AllowedErrorRate, ct); } } diff --git a/backend/src/Squidex.Domain.Apps.Core.Model/Schemas/ReferencesFieldProperties.cs b/backend/src/Squidex.Domain.Apps.Core.Model/Schemas/ReferencesFieldProperties.cs index c38fbf9dbb..a35b58cd55 100644 --- a/backend/src/Squidex.Domain.Apps.Core.Model/Schemas/ReferencesFieldProperties.cs +++ b/backend/src/Squidex.Domain.Apps.Core.Model/Schemas/ReferencesFieldProperties.cs @@ -26,7 +26,7 @@ public sealed record ReferencesFieldProperties : FieldProperties public bool MustBePublished { get; init; } - public string? Query { get; init;} + public string? Query { get; init; } public ReferencesFieldEditor Editor { get; init; } diff --git a/backend/src/Squidex.Domain.Apps.Core.Operations/Subscriptions/SubscriptionPublisher.cs b/backend/src/Squidex.Domain.Apps.Core.Operations/Subscriptions/SubscriptionPublisher.cs index fa2db26d59..4bd054ed7e 100644 --- a/backend/src/Squidex.Domain.Apps.Core.Operations/Subscriptions/SubscriptionPublisher.cs +++ b/backend/src/Squidex.Domain.Apps.Core.Operations/Subscriptions/SubscriptionPublisher.cs @@ -16,25 +16,13 @@ public sealed class SubscriptionPublisher : IEventConsumer private readonly ISubscriptionService subscriptionService; private readonly IEnumerable subscriptionEventCreators; - public string Name - { - get => "Subscriptions"; - } + public string Name => "Subscriptions"; - public string EventsFilter - { - get => "^(content-|asset-)"; - } + public StreamFilter EventsFilter { get; } = StreamFilter.Prefix("content-", "asset-"); - public bool StartLatest - { - get => true; - } + public bool StartLatest => true; - public bool CanClear - { - get => false; - } + public bool CanClear => false; public SubscriptionPublisher(ISubscriptionService subscriptionService, IEnumerable subscriptionEventCreators) { diff --git a/backend/src/Squidex.Domain.Apps.Entities.MongoDb/Schemas/MongoSchemasHash.cs b/backend/src/Squidex.Domain.Apps.Entities.MongoDb/Schemas/MongoSchemasHash.cs index 85fe766d89..e1bc24fbb0 100644 --- a/backend/src/Squidex.Domain.Apps.Entities.MongoDb/Schemas/MongoSchemasHash.cs +++ b/backend/src/Squidex.Domain.Apps.Entities.MongoDb/Schemas/MongoSchemasHash.cs @@ -19,25 +19,11 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Schemas; public sealed class MongoSchemasHash : MongoRepositoryBase, ISchemasHash, IEventConsumer, IDeleter { - public int BatchSize - { - get => 1000; - } + public int BatchSize => 1000; - public int BatchDelay - { - get => 500; - } + public int BatchDelay => 500; - public string Name - { - get => GetType().Name; - } - - public string EventsFilter - { - get => "^schema-"; - } + public StreamFilter EventsFilter { get; } = StreamFilter.Prefix("schema-"); public MongoSchemasHash(IMongoDatabase database) : base(database) diff --git a/backend/src/Squidex.Domain.Apps.Entities/Apps/AppEventDeleter.cs b/backend/src/Squidex.Domain.Apps.Entities/Apps/AppEventDeleter.cs index 4574191b5e..08ac26e254 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Apps/AppEventDeleter.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Apps/AppEventDeleter.cs @@ -23,6 +23,8 @@ public AppEventDeleter(IEventStore eventStore) public Task DeleteAppAsync(IAppEntity app, CancellationToken ct) { - return eventStore.DeleteAsync($"^([a-zA-Z0-9]+)\\-{app.Id}", ct); + var streamFilter = StreamFilter.Prefix($"([a-zA-Z0-9]+)-{app.Id}"); + + return eventStore.DeleteAsync(streamFilter, ct); } } diff --git a/backend/src/Squidex.Domain.Apps.Entities/Apps/AppPermanentDeleter.cs b/backend/src/Squidex.Domain.Apps.Entities/Apps/AppPermanentDeleter.cs index 8c1857763a..d339b83fbb 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Apps/AppPermanentDeleter.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Apps/AppPermanentDeleter.cs @@ -20,15 +20,7 @@ public sealed class AppPermanentDeleter : IEventConsumer private readonly IDomainObjectFactory factory; private readonly HashSet consumingTypes; - public string Name - { - get => GetType().Name; - } - - public string EventsFilter - { - get => "^app-"; - } + public StreamFilter EventsFilter { get; } = StreamFilter.Prefix("app-"); public AppPermanentDeleter(IEnumerable deleters, IDomainObjectFactory factory, TypeRegistry typeRegistry) { diff --git a/backend/src/Squidex.Domain.Apps.Entities/Assets/AssetPermanentDeleter.cs b/backend/src/Squidex.Domain.Apps.Entities/Assets/AssetPermanentDeleter.cs index 0ca13dc5b6..85e299c826 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Assets/AssetPermanentDeleter.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Assets/AssetPermanentDeleter.cs @@ -17,15 +17,7 @@ public sealed class AssetPermanentDeleter : IEventConsumer private readonly IAssetFileStore assetFileStore; private readonly HashSet consumingTypes; - public string Name - { - get => GetType().Name; - } - - public string EventsFilter - { - get => "^asset-"; - } + public StreamFilter EventsFilter { get; } = StreamFilter.Prefix("asset-"); public AssetPermanentDeleter(IAssetFileStore assetFileStore, TypeRegistry typeRegistry) { diff --git a/backend/src/Squidex.Domain.Apps.Entities/Assets/AssetUsageTracker_EventHandling.cs b/backend/src/Squidex.Domain.Apps.Entities/Assets/AssetUsageTracker_EventHandling.cs index b7251970d6..dd9af405b8 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Assets/AssetUsageTracker_EventHandling.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Assets/AssetUsageTracker_EventHandling.cs @@ -21,25 +21,11 @@ public partial class AssetUsageTracker : IEventConsumer { private IMemoryCache memoryCache; - public int BatchSize - { - get => 1000; - } + public int BatchSize => 1000; - public int BatchDelay - { - get => 1000; - } + public int BatchDelay => 1000; - public string Name - { - get => GetType().Name; - } - - public string EventsFilter - { - get => "^asset-"; - } + public StreamFilter EventsFilter { get; } = StreamFilter.Prefix("asset-"); private void ClearCache() { diff --git a/backend/src/Squidex.Domain.Apps.Entities/Assets/AssetsFluidExtension.cs b/backend/src/Squidex.Domain.Apps.Entities/Assets/AssetsFluidExtension.cs index 4bdbb74f53..07f964a1a9 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Assets/AssetsFluidExtension.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Assets/AssetsFluidExtension.cs @@ -39,19 +39,21 @@ public void RegisterLanguageExtensions(CustomFluidParser parser, TemplateOptions private async ValueTask ResolveAsset(ValueTuple arguments, TextWriter writer, TextEncoder encoder, TemplateContext context) { - if (context.GetValue("event")?.ToObjectValue() is EnrichedEvent enrichedEvent) + if (context.GetValue("event")?.ToObjectValue() is not EnrichedEvent enrichedEvent) { - var (nameArg, idArg) = arguments; + return Completion.Normal; + } - var assetId = await idArg.EvaluateAsync(context); - var asset = await ResolveAssetAsync(serviceProvider, enrichedEvent.AppId.Id, assetId); + var (nameArg, idArg) = arguments; - if (asset != null) - { - var name = (await nameArg.EvaluateAsync(context)).ToStringValue(); + var assetId = await idArg.EvaluateAsync(context); + var asset = await ResolveAssetAsync(serviceProvider, enrichedEvent.AppId.Id, assetId); - context.SetValue(name, asset); - } + if (asset != null) + { + var name = (await nameArg.EvaluateAsync(context)).ToStringValue(); + + context.SetValue(name, asset); } return Completion.Normal; diff --git a/backend/src/Squidex.Domain.Apps.Entities/Assets/RebuildFiles.cs b/backend/src/Squidex.Domain.Apps.Entities/Assets/RebuildFiles.cs index 91a8c128cd..b9c248abcf 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Assets/RebuildFiles.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Assets/RebuildFiles.cs @@ -33,7 +33,9 @@ public RebuildFiles( public async Task RepairAsync( CancellationToken ct = default) { - await foreach (var storedEvent in eventStore.QueryAllAsync("^asset\\-", ct: ct)) + var streamFilter = StreamFilter.Prefix("asset-"); + + await foreach (var storedEvent in eventStore.QueryAllAsync(streamFilter, ct: ct)) { var @event = eventFormatter.ParseIfKnown(storedEvent); diff --git a/backend/src/Squidex.Domain.Apps.Entities/Assets/RecursiveDeleter.cs b/backend/src/Squidex.Domain.Apps.Entities/Assets/RecursiveDeleter.cs index 1ec6fa1555..9d198968f6 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Assets/RecursiveDeleter.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Assets/RecursiveDeleter.cs @@ -23,15 +23,7 @@ public sealed class RecursiveDeleter : IEventConsumer private readonly ILogger log; private readonly HashSet consumingTypes; - public string Name - { - get => GetType().Name; - } - - public string EventsFilter - { - get => "^assetFolder-"; - } + public StreamFilter EventsFilter { get; } = StreamFilter.Prefix("assetFolder-"); public RecursiveDeleter( ICommandBus commandBus, diff --git a/backend/src/Squidex.Domain.Apps.Entities/Backup/BackupProcessor.cs b/backend/src/Squidex.Domain.Apps.Entities/Backup/BackupProcessor.cs index 64b9e66a37..ef1c359fa7 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Backup/BackupProcessor.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Backup/BackupProcessor.cs @@ -147,7 +147,9 @@ private async Task ProcessAsync(Run run, var backupUsers = new UserMapping(run.Actor); var backupContext = new BackupContext(appId, backupUsers, writer); - await foreach (var storedEvent in eventStore.QueryAllAsync(GetFilter(), ct: ct)) + var streamFilter = StreamFilter.Prefix($"[^\\-]*-{appId}"); + + await foreach (var storedEvent in eventStore.QueryAllAsync(streamFilter, ct: ct)) { var @event = eventFormatter.Parse(storedEvent); @@ -200,11 +202,6 @@ private async Task ProcessAsync(Run run, } } - private string GetFilter() - { - return $"^[^\\-]*-{Regex.Escape(appId.ToString())}"; - } - public Task DeleteAsync(DomainId id) { return scheduler.ScheduleAsync(async _ => diff --git a/backend/src/Squidex.Domain.Apps.Entities/Comments/DomainObject/CommentsStream.cs b/backend/src/Squidex.Domain.Apps.Entities/Comments/DomainObject/CommentsStream.cs index a40721ac27..1b087e4704 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Comments/DomainObject/CommentsStream.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Comments/DomainObject/CommentsStream.cs @@ -42,7 +42,7 @@ public CommentsStream( public virtual async Task LoadAsync( CancellationToken ct) { - var storedEvents = await eventStore.QueryReverseAsync(streamName, 100, ct); + var storedEvents = await eventStore.QueryStreamReverseAsync(streamName, 100, ct); foreach (var @event in storedEvents) { diff --git a/backend/src/Squidex.Domain.Apps.Entities/Contents/ReferencesFluidExtension.cs b/backend/src/Squidex.Domain.Apps.Entities/Contents/ReferencesFluidExtension.cs index fcd5d3b6d3..8695544abe 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Contents/ReferencesFluidExtension.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Contents/ReferencesFluidExtension.cs @@ -39,19 +39,21 @@ public void RegisterLanguageExtensions(CustomFluidParser parser, TemplateOptions private async ValueTask ResolveReference(ValueTuple arguments, TextWriter writer, TextEncoder encoder, TemplateContext context) { - if (context.GetValue("event")?.ToObjectValue() is EnrichedEvent enrichedEvent) + if (context.GetValue("event")?.ToObjectValue() is not EnrichedEvent enrichedEvent) { - var (nameArg, idArg) = arguments; + return Completion.Normal; + } - var contentId = await idArg.EvaluateAsync(context); - var content = await ResolveContentAsync(serviceProvider, enrichedEvent.AppId.Id, contentId); + var (nameArg, idArg) = arguments; - if (content != null) - { - var name = (await nameArg.EvaluateAsync(context)).ToStringValue(); + var contentId = await idArg.EvaluateAsync(context); + var content = await ResolveContentAsync(serviceProvider, enrichedEvent.AppId.Id, contentId); - context.SetValue(name, content); - } + if (content != null) + { + var name = (await nameArg.EvaluateAsync(context)).ToStringValue(); + + context.SetValue(name, content); } return Completion.Normal; diff --git a/backend/src/Squidex.Domain.Apps.Entities/Contents/Text/TextIndexingProcess.cs b/backend/src/Squidex.Domain.Apps.Entities/Contents/Text/TextIndexingProcess.cs index 754f68d348..adf5aaf195 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Contents/Text/TextIndexingProcess.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Contents/Text/TextIndexingProcess.cs @@ -21,25 +21,13 @@ public sealed class TextIndexingProcess : IEventConsumer private readonly ITextIndex textIndex; private readonly ITextIndexerState textIndexerState; - public int BatchSize - { - get => 1000; - } + public int BatchSize => 1000; - public int BatchDelay - { - get => 1000; - } + public int BatchDelay => 1000; - public string Name - { - get => "TextIndexer5"; - } + public string Name => "TextIndexer5"; - public string EventsFilter - { - get => "^content-"; - } + public StreamFilter EventsFilter { get; } = StreamFilter.Prefix("content-"); public ITextIndex TextIndex { diff --git a/backend/src/Squidex.Domain.Apps.Entities/Invitation/InvitationEventConsumer.cs b/backend/src/Squidex.Domain.Apps.Entities/Invitation/InvitationEventConsumer.cs index faaa7b4064..811cc6f07a 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Invitation/InvitationEventConsumer.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Invitation/InvitationEventConsumer.cs @@ -24,15 +24,9 @@ public sealed class InvitationEventConsumer : IEventConsumer private readonly IAppProvider appProvider; private readonly ILogger log; - public string Name - { - get => "NotificationEmailSender"; - } + public string Name => "NotificationEmailSender"; - public string EventsFilter - { - get { return "^app-|^app-"; } - } + public StreamFilter EventsFilter { get; } = StreamFilter.Prefix("app-"); public InvitationEventConsumer( IAppProvider appProvider, diff --git a/backend/src/Squidex.Domain.Apps.Entities/Rules/Runner/DefaultRuleRunnerService.cs b/backend/src/Squidex.Domain.Apps.Entities/Rules/Runner/DefaultRuleRunnerService.cs index efd5edefd9..4c8e8dbb47 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Rules/Runner/DefaultRuleRunnerService.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Rules/Runner/DefaultRuleRunnerService.cs @@ -65,9 +65,10 @@ public async Task> SimulateAsync(NamedId appI var simulatedEvents = new List(MaxSimulatedEvents); - var fromNow = SystemClock.Instance.GetCurrentInstant().Minus(Duration.FromDays(7)); + var streamStart = SystemClock.Instance.GetCurrentInstant().Minus(Duration.FromDays(7)); + var streamFilter = StreamFilter.Prefix($"([a-zA-Z0-9]+)-{appId.Id}"); - await foreach (var storedEvent in eventStore.QueryAllReverseAsync($"^([a-zA-Z0-9]+)\\-{appId.Id}", fromNow, MaxSimulatedEvents, ct)) + await foreach (var storedEvent in eventStore.QueryAllReverseAsync(streamFilter, streamStart, MaxSimulatedEvents, ct)) { var @event = eventFormatter.ParseIfKnown(storedEvent); diff --git a/backend/src/Squidex.Domain.Apps.Entities/Rules/Runner/RuleRunnerProcessor.cs b/backend/src/Squidex.Domain.Apps.Entities/Rules/Runner/RuleRunnerProcessor.cs index 7947b4d1ea..a9d2e5745c 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Rules/Runner/RuleRunnerProcessor.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Rules/Runner/RuleRunnerProcessor.cs @@ -254,9 +254,9 @@ private async Task EnqueueFromEventsAsync(Run run, await using var batch = new RuleQueueWriter(ruleEventRepository, ruleUsageTracker, null); // Use a prefix query so that the storage can use an index for the query. - var filter = $"^([a-z]+)\\-{appId}"; + var streamFilter = StreamFilter.Prefix($"([a-zA-Z0-9]+)\\-{appId}"); - await foreach (var storedEvent in eventStore.QueryAllAsync(filter, run.Job.Position, ct: ct)) + await foreach (var storedEvent in eventStore.QueryAllAsync(streamFilter, run.Job.Position, ct: ct)) { var @event = eventFormatter.ParseIfKnown(storedEvent); diff --git a/backend/src/Squidex.Infrastructure.GetEventStore/EventSourcing/EventStoreProjectionClient.cs b/backend/src/Squidex.Infrastructure.GetEventStore/EventSourcing/EventStoreProjectionClient.cs index f40798b0ea..7dd7239496 100644 --- a/backend/src/Squidex.Infrastructure.GetEventStore/EventSourcing/EventStoreProjectionClient.cs +++ b/backend/src/Squidex.Infrastructure.GetEventStore/EventSourcing/EventStoreProjectionClient.cs @@ -29,22 +29,22 @@ private string CreateFilterProjectionName(string filter) return $"by-{projectionPrefix.Slugify()}-{filter.Slugify()}"; } - public async Task CreateProjectionAsync(string? streamFilter = null) + public async Task CreateProjectionAsync(StreamFilter filter) { - if (!string.IsNullOrWhiteSpace(streamFilter) && streamFilter[0] != '^') + if (filter.Kind == StreamFilterKind.MatchFull && filter.Prefixes?.Count == 1) { - return $"{projectionPrefix}-{streamFilter}"; + return $"{projectionPrefix}-{filter.Prefixes[0]}"; } - streamFilter ??= ".*"; + var regex = filter.ToRegex(); - var name = CreateFilterProjectionName(streamFilter); + var name = CreateFilterProjectionName(regex); var query = $@"fromAll() .when({{ $any: function (s, e) {{ - if (e.streamId.indexOf('{projectionPrefix}') === 0 && /{streamFilter}/.test(e.streamId.substring({projectionPrefix.Length + 1}))) {{ + if (e.streamId.indexOf('{projectionPrefix}') === 0 && /{regex}/.test(e.streamId.substring({projectionPrefix.Length + 1}))) {{ linkTo('{name}', e); }} }} diff --git a/backend/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStore.cs b/backend/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStore.cs index 5e90f56a32..0ae89afa57 100644 --- a/backend/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStore.cs +++ b/backend/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStore.cs @@ -46,14 +46,14 @@ public async Task InitializeAsync( } } - public IEventSubscription CreateSubscription(IEventSubscriber subscriber, string? streamFilter = null, string? position = null) + public IEventSubscription CreateSubscription(IEventSubscriber subscriber, StreamFilter filter, string? position = null) { - Guard.NotNull(streamFilter); + Guard.NotNull(filter); - return new GetEventStoreSubscription(subscriber, client, projectionClient, serializer, position, StreamPrefix, streamFilter); + return new GetEventStoreSubscription(subscriber, client, projectionClient, serializer, position, StreamPrefix, filter); } - public async IAsyncEnumerable QueryAllAsync(string? streamFilter = null, string? position = null, int take = int.MaxValue, + public async IAsyncEnumerable QueryAllAsync(StreamFilter filter, string? position = null, int take = int.MaxValue, [EnumeratorCancellation] CancellationToken ct = default) { if (take <= 0) @@ -61,7 +61,7 @@ public async IAsyncEnumerable QueryAllAsync(string? streamFilter = yield break; } - var streamName = await projectionClient.CreateProjectionAsync(streamFilter); + var streamName = await projectionClient.CreateProjectionAsync(filter); var stream = QueryAsync(streamName, position.ToPosition(false), take, ct); @@ -71,7 +71,7 @@ public async IAsyncEnumerable QueryAllAsync(string? streamFilter = } } - public async IAsyncEnumerable QueryAllReverseAsync(string? streamFilter = null, Instant timestamp = default, int take = int.MaxValue, + public async IAsyncEnumerable QueryAllReverseAsync(StreamFilter filter, Instant timestamp = default, int take = int.MaxValue, [EnumeratorCancellation] CancellationToken ct = default) { if (take <= 0) @@ -79,7 +79,7 @@ public async IAsyncEnumerable QueryAllReverseAsync(string? streamFi yield break; } - var streamName = await projectionClient.CreateProjectionAsync(streamFilter); + var streamName = await projectionClient.CreateProjectionAsync(filter); var stream = QueryReverseAsync(streamName, StreamPosition.End, take, ct); @@ -89,11 +89,9 @@ public async IAsyncEnumerable QueryAllReverseAsync(string? streamFi } } - public async Task> QueryReverseAsync(string streamName, int count = int.MaxValue, + public async Task> QueryStreamReverseAsync(string streamName, int count = int.MaxValue, CancellationToken ct = default) { - Guard.NotNullOrEmpty(streamName); - if (count <= 0) { return EmptyEvents; @@ -103,7 +101,7 @@ public async Task> QueryReverseAsync(string streamNam { var result = new List(); - var stream = QueryReverseAsync(GetStreamName(streamName), StreamPosition.End, count, ct); + var stream = QueryReverseAsync(streamName, StreamPosition.End, count, ct); await foreach (var storedEvent in stream.IgnoreNotFound(ct)) { @@ -114,16 +112,14 @@ public async Task> QueryReverseAsync(string streamNam } } - public async Task> QueryAsync(string streamName, long afterStreamPosition = EtagVersion.Empty, + public async Task> QueryStreamAsync(string streamName, long afterStreamPosition = EtagVersion.Empty, CancellationToken ct = default) { - Guard.NotNullOrEmpty(streamName); - using (Telemetry.Activities.StartActivity("GetEventStore/QueryAsync")) { var result = new List(); - var stream = QueryAsync(GetStreamName(streamName), afterStreamPosition.ToPositionBefore(), int.MaxValue, ct); + var stream = QueryAsync(streamName, afterStreamPosition.ToPositionBefore(), int.MaxValue, ct); await foreach (var storedEvent in stream.IgnoreNotFound(ct)) { @@ -156,7 +152,7 @@ private IAsyncEnumerable QueryReverseAsync(string streamName, Strea streamName, start, count, - resolveLinkTos: true, + true, cancellationToken: ct); return result.Select(x => Formatter.Read(x, StreamPrefix, serializer)); @@ -210,10 +206,10 @@ public async Task AppendAsync(Guid commitId, string streamName, long expectedVer } } - public async Task DeleteAsync(string streamFilter, + public async Task DeleteAsync(StreamFilter filter, CancellationToken ct = default) { - var streamName = await projectionClient.CreateProjectionAsync(streamFilter); + var streamName = await projectionClient.CreateProjectionAsync(filter); var events = client.ReadStreamAsync(Direction.Forwards, streamName, StreamPosition.Start, resolveLinkTos: true, cancellationToken: ct); diff --git a/backend/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStoreSubscription.cs b/backend/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStoreSubscription.cs index e5d511f581..432abadff1 100644 --- a/backend/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStoreSubscription.cs +++ b/backend/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStoreSubscription.cs @@ -23,14 +23,14 @@ public GetEventStoreSubscription( IJsonSerializer serializer, string? position, string? prefix, - string? streamFilter) + StreamFilter filter) { #pragma warning disable MA0134 // Observe result of async calls Task.Run(async () => { var ct = cts.Token; - var streamName = await projectionClient.CreateProjectionAsync(streamFilter); + var streamName = await projectionClient.CreateProjectionAsync(filter); async Task OnEvent(StreamSubscription subscription, ResolvedEvent @event, CancellationToken ct) diff --git a/backend/src/Squidex.Infrastructure.GetEventStore/EventSourcing/Utils.cs b/backend/src/Squidex.Infrastructure.GetEventStore/EventSourcing/Utils.cs index f2e233b28e..d9ee49ad6b 100644 --- a/backend/src/Squidex.Infrastructure.GetEventStore/EventSourcing/Utils.cs +++ b/backend/src/Squidex.Infrastructure.GetEventStore/EventSourcing/Utils.cs @@ -7,6 +7,7 @@ using System.Globalization; using System.Runtime.CompilerServices; +using System.Text.RegularExpressions; using EventStore.Client; namespace Squidex.Infrastructure.EventSourcing; @@ -77,4 +78,21 @@ public static async IAsyncEnumerable IgnoreNotFound(this IAsyncEnum yield return enumerator.Current; } } + + public static string ToRegex(this StreamFilter filter) + { + if (filter.Prefixes == null) + { + return ".*"; + } + + if (filter.Kind == StreamFilterKind.MatchStart) + { + return $"^{string.Join('|', filter.Prefixes.Select(p => $"({p})"))}"; + } + else + { + return $"^{string.Join('|', filter.Prefixes.Select(p => $"({p})"))}$"; + } + } } diff --git a/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/FilterExtensions.cs b/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/FilterExtensions.cs index 3e9ec4f836..fe75f1ff77 100644 --- a/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/FilterExtensions.cs +++ b/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/FilterExtensions.cs @@ -5,6 +5,7 @@ // All rights reserved. Licensed under the MIT license. // ========================================================================== +using System.Text.RegularExpressions; using MongoDB.Driver; namespace Squidex.Infrastructure.EventSourcing; @@ -13,53 +14,57 @@ internal static class FilterExtensions { public static FilterDefinition ByOffset(long streamPosition) { - return Builders.Filter.Gte(x => x.EventStreamOffset, streamPosition); + var builder = Builders.Filter; + + return builder.Gte(x => x.EventStreamOffset, streamPosition); } public static FilterDefinition ByPosition(StreamPosition streamPosition) { + var builder = Builders.Filter; + if (streamPosition.IsEndOfCommit) { - return Builders.Filter.Gt(x => x.Timestamp, streamPosition.Timestamp); + return builder.Gt(x => x.Timestamp, streamPosition.Timestamp); } else { - return Builders.Filter.Gte(x => x.Timestamp, streamPosition.Timestamp); + return builder.Gte(x => x.Timestamp, streamPosition.Timestamp); } } - public static FilterDefinition? ByStream(string? streamFilter) + public static FilterDefinition ByStream(StreamFilter filter) { - if (StreamFilter.IsAll(streamFilter)) - { - return Builders.Filter.Exists(x => x.EventStream, true); - } + var builder = Builders.Filter; - if (streamFilter.Contains('^', StringComparison.Ordinal)) + if (filter.Prefixes == null) { - return Builders.Filter.Regex(x => x.EventStream, streamFilter); + return builder.Exists(x => x.EventStream, true); } - else + + if (filter.Kind == StreamFilterKind.MatchStart) { - return Builders.Filter.Eq(x => x.EventStream, streamFilter); + return builder.Or(filter.Prefixes.Select(p => builder.Regex(x => x.EventStream, $"^{p}"))); } + + return builder.In(x => x.EventStream, filter.Prefixes); } - public static FilterDefinition>? ByChangeInStream(string? streamFilter) + public static FilterDefinition>? ByChangeInStream(StreamFilter filter) { - if (StreamFilter.IsAll(streamFilter)) + var builder = Builders>.Filter; + + if (filter.Prefixes == null) { return null; } - if (streamFilter.Contains('^', StringComparison.Ordinal)) - { - return Builders>.Filter.Regex(x => x.FullDocument.EventStream, streamFilter); - } - else + if (filter.Kind == StreamFilterKind.MatchStart) { - return Builders>.Filter.Eq(x => x.FullDocument.EventStream, streamFilter); + return builder.Or(filter.Prefixes.Select(p => builder.Regex(x => x.FullDocument.EventStream, $"^{Regex.Escape(p)}"))); } + + return builder.In(x => x.FullDocument.EventStream, filter.Prefixes); } public static IEnumerable Filtered(this MongoEventCommit commit, StreamPosition position) diff --git a/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStoreSubscription.cs b/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStoreSubscription.cs index 7239513c72..2e6177045e 100644 --- a/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStoreSubscription.cs +++ b/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStoreSubscription.cs @@ -18,7 +18,7 @@ public sealed class MongoEventStoreSubscription : IEventSubscription private readonly IEventSubscriber eventSubscriber; private readonly CancellationTokenSource stopToken = new CancellationTokenSource(); - public MongoEventStoreSubscription(MongoEventStore eventStore, IEventSubscriber eventSubscriber, string? streamFilter, string? position) + public MongoEventStoreSubscription(MongoEventStore eventStore, IEventSubscriber eventSubscriber, StreamFilter streamFilter, string? position) { this.eventStore = eventStore; this.eventSubscriber = eventSubscriber; @@ -26,7 +26,7 @@ public MongoEventStoreSubscription(MongoEventStore eventStore, IEventSubscriber< QueryAsync(streamFilter, position).Forget(); } - private async Task QueryAsync(string? streamFilter, string? position) + private async Task QueryAsync(StreamFilter streamFilter, string? position) { try { @@ -51,7 +51,7 @@ private async Task QueryAsync(string? streamFilter, string? position) } } - private async Task QueryCurrentAsync(string? streamFilter, StreamPosition lastPosition) + private async Task QueryCurrentAsync(StreamFilter streamFilter, StreamPosition lastPosition) { BsonDocument? resumeToken = null; @@ -103,7 +103,7 @@ await cursor.ForEachAsync(async change => } } - private async Task QueryOldAsync(string? streamFilter, string? position) + private async Task QueryOldAsync(StreamFilter streamFilter, string? position) { string? lastRawPosition = null; @@ -134,7 +134,7 @@ await cursor.ForEachAsync(async change => return lastRawPosition; } - private static PipelineDefinition, ChangeStreamDocument>? Match(string? streamFilter) + private static PipelineDefinition, ChangeStreamDocument>? Match(StreamFilter streamFilter) { var result = new EmptyPipelineDefinition>(); diff --git a/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Reader.cs b/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Reader.cs index adaa68d6ef..7345bc320b 100644 --- a/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Reader.cs +++ b/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Reader.cs @@ -20,25 +20,23 @@ public partial class MongoEventStore : MongoRepositoryBase, IE { private static readonly List EmptyEvents = new List(); - public IEventSubscription CreateSubscription(IEventSubscriber subscriber, string? streamFilter = null, string? position = null) + public IEventSubscription CreateSubscription(IEventSubscriber subscriber, StreamFilter filter, string? position = null) { Guard.NotNull(subscriber); if (CanUseChangeStreams) { - return new MongoEventStoreSubscription(this, subscriber, streamFilter, position); + return new MongoEventStoreSubscription(this, subscriber, filter, position); } else { - return new PollingSubscription(this, subscriber, streamFilter, position); + return new PollingSubscription(this, subscriber, filter, position); } } - public async Task> QueryReverseAsync(string streamName, int count = int.MaxValue, + public async Task> QueryStreamReverseAsync(string streamName, int count = int.MaxValue, CancellationToken ct = default) { - Guard.NotNullOrEmpty(streamName); - if (count <= 0) { return EmptyEvents; @@ -46,7 +44,7 @@ public async Task> QueryReverseAsync(string streamNam using (Telemetry.Activities.StartActivity("MongoEventStore/QueryLatestAsync")) { - var filter = Filter.Eq(x => x.EventStream, streamName); + var filter = FilterExtensions.ByStream(StreamFilter.Name(streamName)); var commits = await Collection.Find(filter).Sort(Sort.Descending(x => x.Timestamp)).Limit(count) @@ -58,33 +56,26 @@ await Collection.Find(filter).Sort(Sort.Descending(x => x.Timestamp)).Limit(coun } } - public async Task> QueryAsync(string streamName, long afterStreamPosition = EtagVersion.Empty, + public async Task> QueryStreamAsync(string streamName, long afterStreamPosition = EtagVersion.Empty, CancellationToken ct = default) { - Guard.NotNullOrEmpty(streamName); - using (Telemetry.Activities.StartActivity("MongoEventStore/QueryAsync")) { - var filter = - Filter.And( - Filter.Eq(x => x.EventStream, streamName), - Filter.Gte(x => x.EventStreamOffset, afterStreamPosition)); - var commits = - await Collection.Find(filter) + await Collection.Find(CreateFilter(StreamFilter.Name(streamName), afterStreamPosition)) .ToListAsync(ct); var result = Convert(commits, afterStreamPosition); if ((commits.Count == 0 || commits[0].EventStreamOffset != afterStreamPosition) && afterStreamPosition > EtagVersion.Empty) { - filter = + var filterBefore = Filter.And( - Filter.Eq(x => x.EventStream, streamName), + FilterExtensions.ByStream(StreamFilter.Name(streamName)), Filter.Lt(x => x.EventStreamOffset, afterStreamPosition)); commits = - await Collection.Find(filter).SortByDescending(x => x.EventStreamOffset).Limit(1) + await Collection.Find(filterBefore).SortByDescending(x => x.EventStreamOffset).Limit(1) .ToListAsync(ct); result = Convert(commits, afterStreamPosition).Concat(result).ToList(); @@ -94,26 +85,7 @@ await Collection.Find(filter).SortByDescending(x => x.EventStreamOffset).Limit(1 } } - public async Task>> QueryManyAsync(IEnumerable streamNames, - CancellationToken ct = default) - { - Guard.NotNull(streamNames); - - using (Telemetry.Activities.StartActivity("MongoEventStore/QueryManyAsync")) - { - var filter = Filter.In(x => x.EventStream, streamNames); - - var commits = - await Collection.Find(filter) - .ToListAsync(ct); - - var result = commits.GroupBy(x => x.EventStream).ToDictionary(x => x.Key, Convert); - - return result; - } - } - - public async IAsyncEnumerable QueryAllReverseAsync(string? streamFilter = null, Instant timestamp = default, int take = int.MaxValue, + public async IAsyncEnumerable QueryAllReverseAsync(StreamFilter filter, Instant timestamp = default, int take = int.MaxValue, [EnumeratorCancellation] CancellationToken ct = default) { if (take <= 0) @@ -123,10 +95,8 @@ public async IAsyncEnumerable QueryAllReverseAsync(string? streamFi StreamPosition lastPosition = timestamp; - var filterDefinition = CreateFilter(streamFilter, lastPosition); - var find = - Collection.Find(filterDefinition, Batching.Options) + Collection.Find(CreateFilter(filter, lastPosition), Batching.Options) .Limit(take).Sort(Sort.Descending(x => x.Timestamp).Ascending(x => x.EventStream)); var taken = 0; @@ -158,12 +128,12 @@ public async IAsyncEnumerable QueryAllReverseAsync(string? streamFi } } - public async IAsyncEnumerable QueryAllAsync(string? streamFilter = null, string? position = null, int take = int.MaxValue, + public async IAsyncEnumerable QueryAllAsync(StreamFilter filter, string? position = null, int take = int.MaxValue, [EnumeratorCancellation] CancellationToken ct = default) { StreamPosition lastPosition = position; - var filterDefinition = CreateFilter(streamFilter, lastPosition); + var filterDefinition = CreateFilter(filter, lastPosition); var find = Collection.Find(filterDefinition).SortBy(x => x.Timestamp).ThenByDescending(x => x.EventStream) @@ -197,15 +167,13 @@ private static IReadOnlyList Convert(IEnumerable return commits.OrderBy(x => x.EventStreamOffset).ThenBy(x => x.Timestamp).SelectMany(x => x.Filtered(streamPosition)).ToList(); } - private static FilterDefinition CreateFilter(string? streamFilter, StreamPosition streamPosition) + private static FilterDefinition CreateFilter(StreamFilter filter, StreamPosition streamPosition) { - var filter = FilterExtensions.ByPosition(streamPosition); - - if (streamFilter != null) - { - return Filter.And(filter, FilterExtensions.ByStream(streamFilter)); - } + return Filter.And(FilterExtensions.ByPosition(streamPosition), FilterExtensions.ByStream(filter)); + } - return filter; + private static FilterDefinition CreateFilter(StreamFilter filter, long streamPosition) + { + return Filter.And(FilterExtensions.ByStream(filter), FilterExtensions.ByOffset(streamPosition)); } } diff --git a/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Writer.cs b/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Writer.cs index f9a4901144..cfc17fe194 100644 --- a/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Writer.cs +++ b/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Writer.cs @@ -17,20 +17,12 @@ public partial class MongoEventStore private const int MaxWriteAttempts = 20; private static readonly BsonTimestamp EmptyTimestamp = new BsonTimestamp(0); - public Task DeleteStreamAsync(string streamName, + public Task DeleteAsync(StreamFilter filter, CancellationToken ct = default) { - Guard.NotNullOrEmpty(streamName); - - return Collection.DeleteManyAsync(x => x.EventStream == streamName, ct); - } - - public Task DeleteAsync(string streamFilter, - CancellationToken ct = default) - { - Guard.NotNullOrEmpty(streamFilter); + Guard.NotDefault(filter); - return Collection.DeleteManyAsync(FilterExtensions.ByStream(streamFilter), ct); + return Collection.DeleteManyAsync(FilterExtensions.ByStream(filter), ct); } public async Task AppendAsync(Guid commitId, string streamName, long expectedVersion, ICollection events, diff --git a/backend/src/Squidex.Infrastructure/Commands/Rebuilder.cs b/backend/src/Squidex.Infrastructure/Commands/Rebuilder.cs index efb001f674..c5fa61ad13 100644 --- a/backend/src/Squidex.Infrastructure/Commands/Rebuilder.cs +++ b/backend/src/Squidex.Infrastructure/Commands/Rebuilder.cs @@ -49,14 +49,14 @@ public virtual async Task RebuildStateAsync(DomainId id, return domainObject; } - public virtual Task RebuildAsync(string filter, int batchSize, + public virtual Task RebuildAsync(StreamFilter filter, int batchSize, CancellationToken ct = default) where T : DomainObject where TState : class, IDomainState, new() { return RebuildAsync(filter, batchSize, 0, ct); } - public virtual async Task RebuildAsync(string filter, int batchSize, double errorThreshold, + public virtual async Task RebuildAsync(StreamFilter filter, int batchSize, double errorThreshold, CancellationToken ct = default) where T : DomainObject where TState : class, IDomainState, new() { diff --git a/backend/src/Squidex.Infrastructure/EventSourcing/IEventConsumer.cs b/backend/src/Squidex.Infrastructure/EventSourcing/IEventConsumer.cs index cc664b1f3d..78331f763f 100644 --- a/backend/src/Squidex.Infrastructure/EventSourcing/IEventConsumer.cs +++ b/backend/src/Squidex.Infrastructure/EventSourcing/IEventConsumer.cs @@ -13,9 +13,9 @@ public interface IEventConsumer int BatchSize => 1; - string Name { get; } + string Name => GetType().Name; - string EventsFilter => ".*"; + StreamFilter EventsFilter => default; bool StartLatest => false; diff --git a/backend/src/Squidex.Infrastructure/EventSourcing/IEventStore.cs b/backend/src/Squidex.Infrastructure/EventSourcing/IEventStore.cs index b957ec6318..314f109bc4 100644 --- a/backend/src/Squidex.Infrastructure/EventSourcing/IEventStore.cs +++ b/backend/src/Squidex.Infrastructure/EventSourcing/IEventStore.cs @@ -11,28 +11,25 @@ namespace Squidex.Infrastructure.EventSourcing; public interface IEventStore { - Task> QueryReverseAsync(string streamName, int take = int.MaxValue, + Task> QueryStreamReverseAsync(string streamName, int take = int.MaxValue, CancellationToken ct = default); - Task> QueryAsync(string streamName, long afterStreamPosition = EtagVersion.Empty, + Task> QueryStreamAsync(string streamName, long afterStreamPosition = EtagVersion.Empty, CancellationToken ct = default); - IAsyncEnumerable QueryAllReverseAsync(string? streamFilter = null, Instant timestamp = default, int take = int.MaxValue, + IAsyncEnumerable QueryAllReverseAsync(StreamFilter filter, Instant timestamp = default, int take = int.MaxValue, CancellationToken ct = default); - IAsyncEnumerable QueryAllAsync(string? streamFilter = null, string? position = null, int take = int.MaxValue, + IAsyncEnumerable QueryAllAsync(StreamFilter filter, string? position = null, int take = int.MaxValue, CancellationToken ct = default); Task AppendAsync(Guid commitId, string streamName, long expectedVersion, ICollection events, CancellationToken ct = default); - Task DeleteAsync(string streamFilter, + Task DeleteAsync(StreamFilter filter, CancellationToken ct = default); - Task DeleteStreamAsync(string streamName, - CancellationToken ct = default); - - IEventSubscription CreateSubscription(IEventSubscriber eventSubscriber, string? streamFilter = null, string? position = null); + IEventSubscription CreateSubscription(IEventSubscriber eventSubscriber, StreamFilter filter, string? position = null); async Task AppendUnsafeAsync(IEnumerable commits, CancellationToken ct = default) @@ -42,17 +39,4 @@ async Task AppendUnsafeAsync(IEnumerable commits, await AppendAsync(commit.Id, commit.StreamName, commit.Offset, commit.Events, ct); } } - - async Task>> QueryManyAsync(IEnumerable streamNames, - CancellationToken ct = default) - { - var result = new Dictionary>(); - - foreach (var streamName in streamNames) - { - result[streamName] = await QueryAsync(streamName, EtagVersion.Empty, ct); - } - - return result; - } } diff --git a/backend/src/Squidex.Infrastructure/EventSourcing/PollingSubscription.cs b/backend/src/Squidex.Infrastructure/EventSourcing/PollingSubscription.cs index acc12d0425..e6129c5f5f 100644 --- a/backend/src/Squidex.Infrastructure/EventSourcing/PollingSubscription.cs +++ b/backend/src/Squidex.Infrastructure/EventSourcing/PollingSubscription.cs @@ -17,7 +17,7 @@ public sealed class PollingSubscription : IEventSubscription public PollingSubscription( IEventStore eventStore, IEventSubscriber eventSubscriber, - string? streamFilter, + StreamFilter streamFilter, string? position) { timer = new CompletionTimer(5000, async ct => diff --git a/backend/src/Squidex.Infrastructure/EventSourcing/StreamFilter.cs b/backend/src/Squidex.Infrastructure/EventSourcing/StreamFilter.cs index 89605f3dc7..4b907bc967 100644 --- a/backend/src/Squidex.Infrastructure/EventSourcing/StreamFilter.cs +++ b/backend/src/Squidex.Infrastructure/EventSourcing/StreamFilter.cs @@ -5,17 +5,38 @@ // All rights reserved. Licensed under the MIT license. // ========================================================================== -using System.Diagnostics.CodeAnalysis; +using Squidex.Infrastructure.Collections; namespace Squidex.Infrastructure.EventSourcing; -public static class StreamFilter +public readonly record struct StreamFilter { - public static bool IsAll([NotNullWhen(false)] string? filter) + public ReadonlyList? Prefixes { get; } + + public StreamFilterKind Kind { get; } + + public StreamFilter(StreamFilterKind kind, params string[] prefixes) + { + Kind = kind; + + if (prefixes.Length > 0) + { + Prefixes = prefixes.ToReadonlyList(); + } + } + + public static StreamFilter Prefix(params string[] prefixes) + { + return new StreamFilter(StreamFilterKind.MatchStart, prefixes); + } + + public static StreamFilter Name(params string[] prefixes) + { + return new StreamFilter(StreamFilterKind.MatchFull, prefixes); + } + + public static StreamFilter All() { - return string.IsNullOrWhiteSpace(filter) - || string.Equals(filter, ".*", StringComparison.OrdinalIgnoreCase) - || string.Equals(filter, "(.*)", StringComparison.OrdinalIgnoreCase) - || string.Equals(filter, "(.*?)", StringComparison.OrdinalIgnoreCase); + return default; } } diff --git a/backend/src/Squidex.Infrastructure/EventSourcing/StreamFilterKind.cs b/backend/src/Squidex.Infrastructure/EventSourcing/StreamFilterKind.cs new file mode 100644 index 0000000000..f52fda22cb --- /dev/null +++ b/backend/src/Squidex.Infrastructure/EventSourcing/StreamFilterKind.cs @@ -0,0 +1,14 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschraenkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +namespace Squidex.Infrastructure.EventSourcing; + +public enum StreamFilterKind +{ + MatchFull, + MatchStart +} diff --git a/backend/src/Squidex.Infrastructure/States/BatchContext.cs b/backend/src/Squidex.Infrastructure/States/BatchContext.cs index 3251233028..a56671dea8 100644 --- a/backend/src/Squidex.Infrastructure/States/BatchContext.cs +++ b/backend/src/Squidex.Infrastructure/States/BatchContext.cs @@ -50,22 +50,27 @@ internal void Add(DomainId key, T snapshot, long version) public async Task LoadAsync(IEnumerable ids) { - var streamNames = ids.ToDictionary(x => x, x => eventStreamNames.GetStreamName(owner, x.ToString())); + var streamNames = ids.ToDictionary( + x => x, + x => eventStreamNames.GetStreamName(owner, x.ToString())); if (streamNames.Count == 0) { return; } - var streams = await eventStore.QueryManyAsync(streamNames.Values); + var eventsResults = await eventStore.QueryAllAsync(StreamFilter.Name(streamNames.Values.ToArray())).ToListAsync(); + var eventsByStream = eventsResults.ToLookup(x => x.StreamName); foreach (var (id, streamName) in streamNames) { - if (streams.TryGetValue(streamName, out var data)) + var byStream = eventsByStream[streamName].ToList(); + + if (byStream.Count > 0) { - var stream = data.Select(eventFormatter.ParseIfKnown).NotNull().ToList(); + var parsed = byStream.Select(eventFormatter.ParseIfKnown).NotNull().ToList(); - events[id] = (data.Count - 1, stream); + events[id] = (byStream.Count - 1, parsed); } else { diff --git a/backend/src/Squidex.Infrastructure/States/Persistence.cs b/backend/src/Squidex.Infrastructure/States/Persistence.cs index 8e2fcc7986..6592c48e04 100644 --- a/backend/src/Squidex.Infrastructure/States/Persistence.cs +++ b/backend/src/Squidex.Infrastructure/States/Persistence.cs @@ -82,7 +82,7 @@ public async Task DeleteAsync( { using (Telemetry.Activities.StartActivity("Persistence/ReadEvents")) { - await eventStore.DeleteStreamAsync(streamName.Value, ct); + await eventStore.DeleteAsync(StreamFilter.Name(streamName.Value), ct); } versionEvents = EtagVersion.Empty; @@ -144,7 +144,7 @@ private async Task ReadSnapshotAsync( private async Task ReadEventsAsync( CancellationToken ct) { - var events = await eventStore.QueryAsync(streamName.Value, versionEvents, ct); + var events = await eventStore.QueryStreamAsync(streamName.Value, versionEvents, ct); var isStopped = false; diff --git a/backend/tests/Squidex.Domain.Apps.Core.Tests/Operations/Subscriptions/SubscriptionPublisherTests.cs b/backend/tests/Squidex.Domain.Apps.Core.Tests/Operations/Subscriptions/SubscriptionPublisherTests.cs index 746f0e23c5..7594fe0ccd 100644 --- a/backend/tests/Squidex.Domain.Apps.Core.Tests/Operations/Subscriptions/SubscriptionPublisherTests.cs +++ b/backend/tests/Squidex.Domain.Apps.Core.Tests/Operations/Subscriptions/SubscriptionPublisherTests.cs @@ -29,7 +29,7 @@ public SubscriptionPublisherTests() [Fact] public void Should_return_content_and_asset_filter_for_events_filter() { - Assert.Equal("^(content-|asset-)", sut.EventsFilter); + Assert.Equal(StreamFilter.Prefix("content-", "asset-"), sut.EventsFilter); } [Fact] diff --git a/backend/tests/Squidex.Domain.Apps.Entities.Tests/Apps/AppEventDeleterTests.cs b/backend/tests/Squidex.Domain.Apps.Entities.Tests/Apps/AppEventDeleterTests.cs index a00c1acc95..850f21228f 100644 --- a/backend/tests/Squidex.Domain.Apps.Entities.Tests/Apps/AppEventDeleterTests.cs +++ b/backend/tests/Squidex.Domain.Apps.Entities.Tests/Apps/AppEventDeleterTests.cs @@ -33,7 +33,9 @@ public async Task Should_remove_events_from_streams() { await sut.DeleteAppAsync(App, CancellationToken); - A.CallTo(() => eventStore.DeleteAsync($"^[a-zA-Z0-9]-{AppId.Id}", A._)) + var streamFilter = StreamFilter.Prefix($"[a-zA-Z0-9]-{AppId.Id}"); + + A.CallTo(() => eventStore.DeleteAsync(streamFilter, A._)) .MustNotHaveHappened(); } } diff --git a/backend/tests/Squidex.Domain.Apps.Entities.Tests/Apps/AppPermanentDeleterTests.cs b/backend/tests/Squidex.Domain.Apps.Entities.Tests/Apps/AppPermanentDeleterTests.cs index 41df184f01..77a096bcc7 100644 --- a/backend/tests/Squidex.Domain.Apps.Entities.Tests/Apps/AppPermanentDeleterTests.cs +++ b/backend/tests/Squidex.Domain.Apps.Entities.Tests/Apps/AppPermanentDeleterTests.cs @@ -29,7 +29,7 @@ public AppPermanentDeleterTests() [Fact] public void Should_return_assets_filter_for_events_filter() { - Assert.Equal("^app-", sut.EventsFilter); + Assert.Equal(StreamFilter.Prefix("app-"), sut.EventsFilter); } [Fact] @@ -41,7 +41,7 @@ public async Task Should_do_nothing_on_clear() [Fact] public void Should_return_type_name_for_name() { - Assert.Equal(nameof(AppPermanentDeleter), sut.Name); + Assert.Equal(nameof(AppPermanentDeleter), ((IEventConsumer)sut).Name); } [Fact] diff --git a/backend/tests/Squidex.Domain.Apps.Entities.Tests/Assets/AssetPermanentDeleterTests.cs b/backend/tests/Squidex.Domain.Apps.Entities.Tests/Assets/AssetPermanentDeleterTests.cs index 9ada40d639..89e5a124cf 100644 --- a/backend/tests/Squidex.Domain.Apps.Entities.Tests/Assets/AssetPermanentDeleterTests.cs +++ b/backend/tests/Squidex.Domain.Apps.Entities.Tests/Assets/AssetPermanentDeleterTests.cs @@ -27,7 +27,7 @@ public AssetPermanentDeleterTests() [Fact] public void Should_return_assets_filter_for_events_filter() { - Assert.Equal("^asset-", sut.EventsFilter); + Assert.Equal(StreamFilter.Prefix("asset-"), sut.EventsFilter); } [Fact] @@ -39,7 +39,7 @@ public async Task Should_do_nothing_on_clear() [Fact] public void Should_return_type_name_for_name() { - Assert.Equal(nameof(AssetPermanentDeleter), sut.Name); + Assert.Equal(nameof(AssetPermanentDeleter), ((IEventConsumer)sut).Name); } [Fact] diff --git a/backend/tests/Squidex.Domain.Apps.Entities.Tests/Assets/AssetUsageTrackerTests.cs b/backend/tests/Squidex.Domain.Apps.Entities.Tests/Assets/AssetUsageTrackerTests.cs index 4d6d5b48f8..3aa8b61468 100644 --- a/backend/tests/Squidex.Domain.Apps.Entities.Tests/Assets/AssetUsageTrackerTests.cs +++ b/backend/tests/Squidex.Domain.Apps.Entities.Tests/Assets/AssetUsageTrackerTests.cs @@ -35,7 +35,7 @@ public AssetUsageTrackerTests() [Fact] public void Should_return_assets_filter_for_events_filter() { - Assert.Equal("^asset-", sut.EventsFilter); + Assert.Equal(StreamFilter.Prefix("asset-"), sut.EventsFilter); } [Fact] @@ -47,7 +47,7 @@ public async Task Should_do_nothing_on_clear() [Fact] public void Should_return_type_name_for_name() { - Assert.Equal(nameof(AssetUsageTracker), sut.Name); + Assert.Equal(nameof(AssetUsageTracker), ((IEventConsumer)sut).Name); } [Fact] diff --git a/backend/tests/Squidex.Domain.Apps.Entities.Tests/Assets/RecursiveDeleterTests.cs b/backend/tests/Squidex.Domain.Apps.Entities.Tests/Assets/RecursiveDeleterTests.cs index ffbd44f1ac..08acb029ee 100644 --- a/backend/tests/Squidex.Domain.Apps.Entities.Tests/Assets/RecursiveDeleterTests.cs +++ b/backend/tests/Squidex.Domain.Apps.Entities.Tests/Assets/RecursiveDeleterTests.cs @@ -33,7 +33,7 @@ public RecursiveDeleterTests() [Fact] public void Should_return_assets_filter_for_events_filter() { - Assert.Equal("^assetFolder-", sut.EventsFilter); + Assert.Equal(StreamFilter.Prefix("assetFolder-"), sut.EventsFilter); } [Fact] @@ -45,7 +45,7 @@ public async Task Should_do_nothing_on_clear() [Fact] public void Should_return_type_name_for_name() { - Assert.Equal(nameof(RecursiveDeleter), sut.Name); + Assert.Equal(nameof(RecursiveDeleter), ((IEventConsumer)sut).Name); } [Fact] diff --git a/backend/tests/Squidex.Domain.Apps.Entities.Tests/Assets/RepairFilesTests.cs b/backend/tests/Squidex.Domain.Apps.Entities.Tests/Assets/RepairFilesTests.cs index ba81142fd8..9903f21779 100644 --- a/backend/tests/Squidex.Domain.Apps.Entities.Tests/Assets/RepairFilesTests.cs +++ b/backend/tests/Squidex.Domain.Apps.Entities.Tests/Assets/RepairFilesTests.cs @@ -122,7 +122,9 @@ private void SetupEvent(IEvent? @event) .Returns(null); } - A.CallTo(() => eventStore.QueryAllAsync("^asset\\-", null, int.MaxValue, CancellationToken)) + var streamFilter = StreamFilter.Prefix("asset-"); + + A.CallTo(() => eventStore.QueryAllAsync(streamFilter, null, int.MaxValue, CancellationToken)) .Returns(storedEvents.ToAsyncEnumerable()); } } diff --git a/backend/tests/Squidex.Domain.Apps.Entities.Tests/Contents/Text/TextIndexerTestsBase.cs b/backend/tests/Squidex.Domain.Apps.Entities.Tests/Contents/Text/TextIndexerTestsBase.cs index 4cbdfefde1..82f0324c5f 100644 --- a/backend/tests/Squidex.Domain.Apps.Entities.Tests/Contents/Text/TextIndexerTestsBase.cs +++ b/backend/tests/Squidex.Domain.Apps.Entities.Tests/Contents/Text/TextIndexerTestsBase.cs @@ -49,6 +49,12 @@ private TextIndexingProcess CreateSut() public abstract ITextIndex CreateIndex(); + [Fact] + public void Should_return_content_filter_for_events_filter() + { + Assert.Equal(StreamFilter.Prefix("content-"), Sut.EventsFilter); + } + [Fact] public async Task Should_search_with_fuzzy() { diff --git a/backend/tests/Squidex.Domain.Apps.Entities.Tests/Invitation/InvitationEventConsumerTests.cs b/backend/tests/Squidex.Domain.Apps.Entities.Tests/Invitation/InvitationEventConsumerTests.cs index 0195372c75..d27aba9c64 100644 --- a/backend/tests/Squidex.Domain.Apps.Entities.Tests/Invitation/InvitationEventConsumerTests.cs +++ b/backend/tests/Squidex.Domain.Apps.Entities.Tests/Invitation/InvitationEventConsumerTests.cs @@ -9,6 +9,7 @@ using NodaTime; using Squidex.Domain.Apps.Core.TestHelpers; using Squidex.Domain.Apps.Entities.Apps; +using Squidex.Domain.Apps.Entities.Assets; using Squidex.Domain.Apps.Entities.Notifications; using Squidex.Domain.Apps.Entities.Teams; using Squidex.Domain.Apps.Entities.TestHelpers; @@ -45,6 +46,24 @@ public InvitationEventConsumerTests() sut = new InvitationEventConsumer(AppProvider, userNotifications, userResolver, log); } + [Fact] + public void Should_return_app_filter_for_events_filter() + { + Assert.Equal(StreamFilter.Prefix("app-"), sut.EventsFilter); + } + + [Fact] + public async Task Should_do_nothing_on_clear() + { + await ((IEventConsumer)sut).ClearAsync(); + } + + [Fact] + public void Should_return_custom_name() + { + Assert.Equal("NotificationEmailSender", sut.Name); + } + [Fact] public async Task Should_not_send_app_email_if_contributors_assigned_by_clients() { diff --git a/backend/tests/Squidex.Domain.Apps.Entities.Tests/Rules/RuleEnqueuerTests.cs b/backend/tests/Squidex.Domain.Apps.Entities.Tests/Rules/RuleEnqueuerTests.cs index 947ae910b2..fa738f5983 100644 --- a/backend/tests/Squidex.Domain.Apps.Entities.Tests/Rules/RuleEnqueuerTests.cs +++ b/backend/tests/Squidex.Domain.Apps.Entities.Tests/Rules/RuleEnqueuerTests.cs @@ -52,7 +52,7 @@ public RuleEnqueuerTests() [Fact] public void Should_return_wildcard_filter_for_events_filter() { - Assert.Equal(".*", ((IEventConsumer)sut).EventsFilter); + Assert.Equal(default, ((IEventConsumer)sut).EventsFilter); } [Fact] diff --git a/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/Consume/EventConsumerProcessorTests.cs b/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/Consume/EventConsumerProcessorTests.cs index a65a67564b..9f41151c5b 100644 --- a/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/Consume/EventConsumerProcessorTests.cs +++ b/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/Consume/EventConsumerProcessorTests.cs @@ -62,7 +62,7 @@ public EventConsumerProcessorTests() } }; - A.CallTo(() => eventStore.CreateSubscription(A>._, A._, A._)) + A.CallTo(() => eventStore.CreateSubscription(A>._, A._, A._)) .Returns(eventSubscription); A.CallTo(() => eventConsumer.Name) @@ -101,15 +101,17 @@ public async Task Should_query_position_if_consumer_should_start_from_latest() { state.Snapshot = new EventConsumerState(); + var filter = StreamFilter.Name("my-filter"); + A.CallTo(() => eventConsumer.StartLatest) .Returns(true); A.CallTo(() => eventConsumer.EventsFilter) - .Returns("my-filter"); + .Returns(filter); var latestPosition = "LATEST"; - A.CallTo(() => eventStore.QueryAllReverseAsync("my-filter", default, 1, A._)) + A.CallTo(() => eventStore.QueryAllReverseAsync(filter, default, 1, A._)) .Returns(Enumerable.Repeat(new StoredEvent("Stream", latestPosition, 1, eventData), 1).ToAsyncEnumerable()); await sut.InitializeAsync(default); @@ -129,7 +131,7 @@ public async Task Should_not_subscribe_to_event_store_if_stopped_in_db() AssertGrainState(isStopped: true, position: initialPosition); - A.CallTo(() => eventStore.CreateSubscription(A>._, A._, A._)) + A.CallTo(() => eventStore.CreateSubscription(A>._, A._, A._)) .MustNotHaveHappened(); } @@ -143,7 +145,7 @@ public async Task Should_subscribe_to_event_store_if_not_found_in_db() AssertGrainState(isStopped: false, position: initialPosition); - A.CallTo(() => eventStore.CreateSubscription(A>._, A._, A._)) + A.CallTo(() => eventStore.CreateSubscription(A>._, A._, A._)) .MustHaveHappenedOnceExactly(); } @@ -159,7 +161,7 @@ public async Task Should_subscribe_to_event_store_if_failed() AssertGrainState(isStopped: false, position: initialPosition); - A.CallTo(() => eventStore.CreateSubscription(A>._, A._, A._)) + A.CallTo(() => eventStore.CreateSubscription(A>._, A._, A._)) .MustHaveHappenedOnceExactly(); } @@ -173,7 +175,7 @@ public async Task Should_subscribe_to_event_store_if_not_stopped_in_db() AssertGrainState(isStopped: false, position: initialPosition); - A.CallTo(() => eventStore.CreateSubscription(A>._, A._, A._)) + A.CallTo(() => eventStore.CreateSubscription(A>._, A._, A._)) .MustHaveHappenedOnceExactly(); } @@ -219,10 +221,10 @@ public async Task Should_reset_consumer_if_resetting() A.CallTo(() => eventSubscription.Dispose()) .MustHaveHappenedOnceExactly(); - A.CallTo(() => eventStore.CreateSubscription(A>._, A._, state.Snapshot.Position)) + A.CallTo(() => eventStore.CreateSubscription(A>._, A._, state.Snapshot.Position)) .MustHaveHappenedOnceExactly(); - A.CallTo(() => eventStore.CreateSubscription(A>._, A._, null)) + A.CallTo(() => eventStore.CreateSubscription(A>._, A._, null)) .MustHaveHappenedOnceExactly(); } @@ -530,7 +532,7 @@ public async Task Should_start_after_stop_if_handling_failed() A.CallTo(() => eventSubscription.Dispose()) .MustHaveHappenedOnceExactly(); - A.CallTo(() => eventStore.CreateSubscription(A>._, A._, A._)) + A.CallTo(() => eventStore.CreateSubscription(A>._, A._, A._)) .MustHaveHappened(2, Times.Exactly); } diff --git a/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/EventStoreTests.cs b/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/EventStoreTests.cs index 0cd4cd5939..c7227f5fb2 100644 --- a/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/EventStoreTests.cs +++ b/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/EventStoreTests.cs @@ -7,6 +7,7 @@ using System.Globalization; using System.Text.RegularExpressions; +using EventStore.Client; namespace Squidex.Infrastructure.EventSourcing; @@ -90,6 +91,7 @@ public async Task Should_throw_exception_for_version_mismatch_and_update() public async Task Should_append_events() { var streamName = $"test-{Guid.NewGuid()}"; + var streamFilter = StreamFilter.Name(streamName); var commit1 = new[] { @@ -107,7 +109,7 @@ public async Task Should_append_events() await Sut.AppendAsync(Guid.NewGuid(), streamName, EtagVersion.Any, commit2); var readEvents1 = await QueryAsync(streamName); - var readEvents2 = await QueryAllAsync(streamName); + var readEvents2 = await QueryAllAsync(streamFilter); var expected = new[] { @@ -125,6 +127,7 @@ public async Task Should_append_events() public async Task Should_append_events_unsafe() { var streamName = $"test-{Guid.NewGuid()}"; + var streamFilter = StreamFilter.Name(streamName); var commit1 = new[] { @@ -138,7 +141,7 @@ await Sut.AppendUnsafeAsync(new List }); var readEvents1 = await QueryAsync(streamName); - var readEvents2 = await QueryAllAsync(streamName); + var readEvents2 = await QueryAllAsync(streamFilter); var expected = new[] { @@ -154,6 +157,7 @@ await Sut.AppendUnsafeAsync(new List public async Task Should_subscribe_to_events() { var streamName = $"test-{Guid.NewGuid()}"; + var streamFilter = StreamFilter.Name(streamName); var commit1 = new[] { @@ -161,7 +165,7 @@ public async Task Should_subscribe_to_events() CreateEventData(2) }; - var readEvents = await QueryWithSubscriptionAsync(streamName, async () => + var readEvents = await QueryWithSubscriptionAsync(streamFilter, async () => { await Sut.AppendAsync(Guid.NewGuid(), streamName, EtagVersion.Any, commit1); }); @@ -179,6 +183,7 @@ public async Task Should_subscribe_to_events() public async Task Should_subscribe_to_next_events() { var streamName = $"test-{Guid.NewGuid()}"; + var streamFilter = StreamFilter.Name(streamName); var commit1 = new[] { @@ -187,7 +192,7 @@ public async Task Should_subscribe_to_next_events() }; // Append and read in parallel. - await QueryWithSubscriptionAsync(streamName, async () => + await QueryWithSubscriptionAsync(streamFilter, async () => { await Sut.AppendAsync(Guid.NewGuid(), streamName, EtagVersion.Any, commit1); }); @@ -199,7 +204,7 @@ await QueryWithSubscriptionAsync(streamName, async () => }; // Append and read in parallel. - var readEventsFromPosition = await QueryWithSubscriptionAsync(streamName, async () => + var readEventsFromPosition = await QueryWithSubscriptionAsync(streamFilter, async () => { await Sut.AppendAsync(Guid.NewGuid(), streamName, EtagVersion.Any, commit2); }); @@ -210,7 +215,7 @@ await QueryWithSubscriptionAsync(streamName, async () => new StoredEvent(streamName, "Position", 3, commit2[1]) }; - var readEventsFromBeginning = await QueryWithSubscriptionAsync(streamName, fromBeginning: true); + var readEventsFromBeginning = await QueryWithSubscriptionAsync(streamFilter, fromBeginning: true); var expectedFromBeginning = new[] { @@ -228,12 +233,13 @@ await QueryWithSubscriptionAsync(streamName, async () => public async Task Should_subscribe_with_parallel_writes() { var streamName = $"test-{Guid.NewGuid()}"; + var streamFilter = StreamFilter.Prefix(streamName); var numTasks = 50; var numEvents = 100; // Append and read in parallel. - var readEvents = await QueryWithSubscriptionAsync($"^{streamName}", async () => + var readEvents = await QueryWithSubscriptionAsync(streamFilter, async () => { await Parallel.ForEachAsync(Enumerable.Range(0, numTasks), async (i, ct) => { @@ -277,7 +283,7 @@ public async Task Should_read_multiple_streams() await Sut.AppendAsync(Guid.NewGuid(), streamName1, EtagVersion.Any, stream1Commit); await Sut.AppendAsync(Guid.NewGuid(), streamName2, EtagVersion.Any, stream2Commit); - var readEvents = await Sut.QueryManyAsync(new[] { streamName1, streamName2 }); + var readEvents = await Sut.QueryAllAsync(StreamFilter.Name(streamName1, streamName2)).ToListAsync(); var expected1 = new[] { @@ -291,8 +297,8 @@ public async Task Should_read_multiple_streams() new StoredEvent(streamName2, "Position", 1, stream2Commit[1]) }; - ShouldBeEquivalentTo(readEvents[streamName1], expected1); - ShouldBeEquivalentTo(readEvents[streamName2], expected2); + ShouldBeEquivalentTo(readEvents.Where(x => x.StreamName == streamName1), expected1); + ShouldBeEquivalentTo(readEvents.Where(x => x.StreamName == streamName2), expected2); } [Theory] @@ -300,7 +306,7 @@ public async Task Should_read_multiple_streams() [InlineData(5, 30)] [InlineData(5, 300)] [InlineData(5, 3000)] - public async Task Should_read_events_from_offset(int commits, int count) + public async Task Should_query_events_from_offset(int commits, int count) { var streamName = $"test-{Guid.NewGuid()}"; @@ -308,7 +314,7 @@ public async Task Should_read_events_from_offset(int commits, int count) var readEvents0 = await QueryAsync(streamName); var readEvents1 = await QueryAsync(streamName, count - 2); - var readEvents2 = await QueryAllAsync(streamName, readEvents0[^2].EventPosition); + var readEvents2 = await QueryAllAsync(default, readEvents0[^2].EventPosition); var expected = new[] { @@ -322,7 +328,7 @@ public async Task Should_read_events_from_offset(int commits, int count) [Theory] [InlineData(5, 30)] [InlineData(5, 300)] - public async Task Should_read_reverse(int commits, int count) + public async Task Should_query_reverse(int commits, int count) { var streamName = $"test-{Guid.NewGuid()}"; @@ -332,7 +338,7 @@ public async Task Should_read_reverse(int commits, int count) for (var take = 0; take < count; take += count / 10) { var eventsExpected = eventsStored.TakeLast(take).ToArray(); - var eventsQueried = await Sut.QueryReverseAsync(streamName, take); + var eventsQueried = await Sut.QueryStreamReverseAsync(streamName, take); ShouldBeEquivalentTo(eventsQueried, eventsExpected); } @@ -342,10 +348,10 @@ public async Task Should_read_reverse(int commits, int count) [InlineData(5, 30)] [InlineData(5, 300)] [InlineData(5, 3000)] - public async Task Should_read_all_reverse_by_name(int commits, int count) + public async Task Should_query_all_reverse_by_names(int commits, int count) { var streamName = $"test-{Guid.NewGuid()}"; - var streamFilter = streamName; + var streamFilter = StreamFilter.Name(streamName, "invalid"); var eventsWritten = await AppendEventsAsync(streamName, count, commits); var eventsStored = eventsWritten.Select((x, i) => new StoredEvent(streamName, "Position", i, x)).ToArray(); @@ -353,7 +359,7 @@ public async Task Should_read_all_reverse_by_name(int commits, int count) for (var take = 0; take < count; take += count / 10) { var eventsExpected = eventsStored.Reverse().Take(take).ToArray(); - var eventsQueried = await Sut.QueryAllReverseAsync(streamName, default, take).ToArrayAsync(); + var eventsQueried = await Sut.QueryAllReverseAsync(streamFilter, default, take).ToArrayAsync(); ShouldBeEquivalentTo(eventsQueried, eventsExpected); } @@ -363,10 +369,10 @@ public async Task Should_read_all_reverse_by_name(int commits, int count) [InlineData(5, 30)] [InlineData(5, 300)] [InlineData(5, 3000)] - public async Task Should_read_all_reverse_by_filter(int commits, int count) + public async Task Should_query_all_reverse_by_filter(int commits, int count) { var streamName = $"test-{Guid.NewGuid()}-suffix"; - var streamFilter = $"^{Regex.Escape(streamName)}"; + var streamFilter = StreamFilter.Prefix(streamName[..^7], "invalid"); var eventsWritten = await AppendEventsAsync(streamName, count, commits); var eventsStored = eventsWritten.Select((x, i) => new StoredEvent(streamName, "Position", i, x)).ToArray(); @@ -387,7 +393,7 @@ public async Task Should_read_all_reverse_by_filter(int commits, int count) public async Task Should_read_all_reverse(int commits, int count) { var streamName = $"test-{Guid.NewGuid()}-suffix"; - var streamFilter = ".*"; + var streamFilter = default(StreamFilter); await AppendEventsAsync(streamName, count, commits); @@ -403,6 +409,7 @@ public async Task Should_read_all_reverse(int commits, int count) public async Task Should_delete_by_filter() { var streamName = $"test-{Guid.NewGuid()}"; + var streamFilter = StreamFilter.Prefix($"{streamName[..10]}"); await AppendEventsAsync(streamName, 2, 1); @@ -410,7 +417,7 @@ public async Task Should_delete_by_filter() for (var i = 0; i < 5; i++) { - await Sut.DeleteAsync($"^{streamName[..10]}"); + await Sut.DeleteAsync(streamFilter); readEvents = await QueryAsync(streamName); @@ -427,9 +434,10 @@ public async Task Should_delete_by_filter() } [Fact] - public async Task Should_delete_stream() + public async Task Should_delete_by_name() { var streamName = $"test-{Guid.NewGuid()}"; + var streamFilter = StreamFilter.Name(streamName); await AppendEventsAsync(streamName, 2, 1); @@ -437,7 +445,7 @@ public async Task Should_delete_stream() for (var i = 0; i < 5; i++) { - await Sut.DeleteStreamAsync(streamName); + await Sut.DeleteAsync(streamFilter); readEvents = await QueryAsync(streamName); @@ -455,12 +463,12 @@ public async Task Should_delete_stream() private async Task> QueryAsync(string streamName, long position = EtagVersion.Any) { - return await Sut.QueryAsync(streamName, position); + return await Sut.QueryStreamAsync(streamName, position); } - private async Task?> QueryAllAsync(string? streamFilter = null, string? position = null) + private async Task?> QueryAllAsync(StreamFilter filter, string? position = null) { - return await Sut.QueryAllAsync(streamFilter, position).ToListAsync(); + return await Sut.QueryAllAsync(filter, position).ToListAsync(); } private static EventData CreateEventData(int i) @@ -473,7 +481,7 @@ private static EventData CreateEventData(int i) return new EventData($"Type{i}", headers, i.ToString(CultureInfo.InvariantCulture)); } - private async Task?> QueryWithSubscriptionAsync(string streamFilter, + private async Task?> QueryWithSubscriptionAsync(StreamFilter streamFilter, Func? subscriptionRunning = null, bool fromBeginning = false) { var subscriber = new EventSubscriber(); diff --git a/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/MongoEventStoreTests.cs b/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/MongoEventStoreTests.cs index 4f18b632d2..ec5b5270e2 100644 --- a/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/MongoEventStoreTests.cs +++ b/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/MongoEventStoreTests.cs @@ -41,9 +41,8 @@ public async Task DisposeAsync() Assert.All(queries, query => { - Assert.Equal(query.NumDocuments, query.DocsExamined); - Assert.True(query.KeysExamined >= query.NumDocuments); - Assert.True(query.KeysExamined <= query.NumDocuments * 2); + Assert.InRange(query.DocsExamined, 0, query.NumDocuments); + Assert.InRange(query.KeysExamined, query.NumDocuments, (Math.Max(1, query.NumDocuments) * 2) + 1); }); } } diff --git a/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/MongoParallelInsertTests.cs b/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/MongoParallelInsertTests.cs index 9993191523..b72f177fa4 100644 --- a/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/MongoParallelInsertTests.cs +++ b/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/MongoParallelInsertTests.cs @@ -38,7 +38,7 @@ public sealed class MyEventConsumer : IEventConsumer public string Name { get; } = RandomHash.Simple(); - public string EventsFilter => $"^{Name}"; + public StreamFilter EventsFilter => StreamFilter.Prefix(Name); public Task Completed => tcs.Task; diff --git a/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/PollingSubscriptionTests.cs b/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/PollingSubscriptionTests.cs index 013ecc5ac7..c5f65e29da 100644 --- a/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/PollingSubscriptionTests.cs +++ b/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/PollingSubscriptionTests.cs @@ -13,8 +13,8 @@ public class PollingSubscriptionTests { private readonly IEventStore eventStore = A.Fake(); private readonly IEventSubscriber eventSubscriber = A.Fake>(); + private readonly StreamFilter filter = StreamFilter.Name("my-stream"); private readonly string position = Guid.NewGuid().ToString(); - private readonly string filter = "^my-stream"; [Fact] public async Task Should_subscribe_on_start() diff --git a/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/RetrySubscriptionTests.cs b/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/RetrySubscriptionTests.cs index 9118f2dbfa..8317e4513c 100644 --- a/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/RetrySubscriptionTests.cs +++ b/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/RetrySubscriptionTests.cs @@ -17,10 +17,10 @@ public class RetrySubscriptionTests public RetrySubscriptionTests() { - A.CallTo(() => eventStore.CreateSubscription(A>._, A._, A._)) + A.CallTo(() => eventStore.CreateSubscription(A>._, A._, A._)) .Returns(eventSubscription); - sut = new RetrySubscription(eventSubscriber, s => eventStore.CreateSubscription(s)) { ReconnectWaitMs = 50 }; + sut = new RetrySubscription(eventSubscriber, s => eventStore.CreateSubscription(s, default)) { ReconnectWaitMs = 50 }; sutSubscriber = sut; } @@ -29,7 +29,7 @@ public void Should_subscribe_after_constructor() { sut.Dispose(); - A.CallTo(() => eventStore.CreateSubscription(sut, A._, A._)) + A.CallTo(() => eventStore.CreateSubscription(sut, A._, A._)) .MustHaveHappened(); } @@ -47,7 +47,7 @@ public async Task Should_reopen_subscription_once_if_exception_is_retrieved() A.CallTo(() => eventSubscription.Dispose()) .MustHaveHappened(2, Times.Exactly); - A.CallTo(() => eventStore.CreateSubscription(A>._, A._, A._)) + A.CallTo(() => eventStore.CreateSubscription(A>._, A._, A._)) .MustHaveHappened(2, Times.Exactly); A.CallTo(() => eventSubscriber.OnErrorAsync(eventSubscription, A._)) diff --git a/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/StreamFilterTests.cs b/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/StreamFilterTests.cs new file mode 100644 index 0000000000..45c310c631 --- /dev/null +++ b/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/StreamFilterTests.cs @@ -0,0 +1,27 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschraenkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +namespace Squidex.Infrastructure.EventSourcing; + +public class StreamFilterTests +{ + [Fact] + public void Should_simplify_input_to_default_filter() + { + var sut = new StreamFilter(StreamFilterKind.MatchFull); + + Assert.Equal(default, sut); + } + + [Fact] + public void Should_simplify_input_to_default_filter_with_factory() + { + var sut = StreamFilter.Name(); + + Assert.Equal(default, sut); + } +} diff --git a/backend/tests/Squidex.Infrastructure.Tests/States/PersistenceBatchTests.cs b/backend/tests/Squidex.Infrastructure.Tests/States/PersistenceBatchTests.cs index bbebf71780..1e2b24b2cc 100644 --- a/backend/tests/Squidex.Infrastructure.Tests/States/PersistenceBatchTests.cs +++ b/backend/tests/Squidex.Infrastructure.Tests/States/PersistenceBatchTests.cs @@ -185,12 +185,10 @@ public async Task Should_write_each_id_only_once_if_same_persistence_written_twi private void SetupEventStore(Dictionary> streams) { - var storedStreams = new Dictionary>(); + var events = new List(); foreach (var (id, stream) in streams) { - var storedStream = new List(); - var i = 0; foreach (var @event in stream) @@ -198,23 +196,20 @@ private void SetupEventStore(Dictionary> streams) var eventData = new EventData("Type", new EnvelopeHeaders(), "Payload"); var eventStored = new StoredEvent(id.ToString(), i.ToString(CultureInfo.InvariantCulture), i, eventData); - storedStream.Add(eventStored); - A.CallTo(() => eventFormatter.Parse(eventStored)) .Returns(new Envelope(@event)); A.CallTo(() => eventFormatter.ParseIfKnown(eventStored)) .Returns(new Envelope(@event)); + events.Add(eventStored); i++; } - - storedStreams[id.ToString()] = storedStream; } - var streamNames = streams.Keys.Select(x => x.ToString()).ToArray(); + var filter = StreamFilter.Name(streams.Keys.Select(x => x.ToString()).ToArray()); - A.CallTo(() => eventStore.QueryManyAsync(A>.That.IsSameSequenceAs(streamNames), A._)) - .Returns(storedStreams); + A.CallTo(() => eventStore.QueryAllAsync(filter, null, int.MaxValue, A._)) + .Returns(events.ToAsyncEnumerable()); } } diff --git a/backend/tests/Squidex.Infrastructure.Tests/States/PersistenceEventSourcingTests.cs b/backend/tests/Squidex.Infrastructure.Tests/States/PersistenceEventSourcingTests.cs index 02761c9e64..f640b03490 100644 --- a/backend/tests/Squidex.Infrastructure.Tests/States/PersistenceEventSourcingTests.cs +++ b/backend/tests/Squidex.Infrastructure.Tests/States/PersistenceEventSourcingTests.cs @@ -65,7 +65,7 @@ public async Task Should_ignore_old_events() { var storedEvent = new StoredEvent("1", "1", 0, new EventData("Type", new EnvelopeHeaders(), "Payload")); - A.CallTo(() => eventStore.QueryAsync(key.ToString(), -1, A._)) + A.CallTo(() => eventStore.QueryStreamAsync(key.ToString(), -1, A._)) .Returns(new List { storedEvent }); A.CallTo(() => eventFormatter.ParseIfKnown(storedEvent)) @@ -96,7 +96,7 @@ public async Task Should_read_read_from_snapshot_store() Assert.False(persistence.IsSnapshotStale); - A.CallTo(() => eventStore.QueryAsync(key.ToString(), 2, A._)) + A.CallTo(() => eventStore.QueryStreamAsync(key.ToString(), 2, A._)) .MustHaveHappened(); } @@ -116,7 +116,7 @@ public async Task Should_mark_as_stale_if_snapshot_is_older_than_events() Assert.True(persistence.IsSnapshotStale); - A.CallTo(() => eventStore.QueryAsync(key.ToString(), 1, A._)) + A.CallTo(() => eventStore.QueryStreamAsync(key.ToString(), 1, A._)) .MustHaveHappened(); } @@ -327,7 +327,7 @@ public async Task Should_delete_events_but_not_snapshot_if_deleted_snapshot_only await persistence.DeleteAsync(); - A.CallTo(() => eventStore.DeleteStreamAsync(key.ToString(), A._)) + A.CallTo(() => eventStore.DeleteAsync(StreamFilter.Name(key.ToString()), A._)) .MustHaveHappened(); A.CallTo(() => snapshotStore.RemoveAsync(key, A._)) @@ -341,7 +341,7 @@ public async Task Should_delete_events_and_snapshot_if_deleted() await persistence.DeleteAsync(); - A.CallTo(() => eventStore.DeleteStreamAsync(key.ToString(), A._)) + A.CallTo(() => eventStore.DeleteAsync(StreamFilter.Name(key.ToString()), A._)) .MustHaveHappened(); A.CallTo(() => snapshotStore.RemoveAsync(key, A._)) @@ -382,7 +382,7 @@ private void SetupEventStore(MyEvent[] events, int eventOffset = 0, long readPos i++; } - A.CallTo(() => eventStore.QueryAsync(key.ToString(), readPosition, A._)) + A.CallTo(() => eventStore.QueryStreamAsync(key.ToString(), readPosition, A._)) .Returns(eventsStored); } } diff --git a/backend/tests/Squidex.Infrastructure.Tests/States/PersistenceSnapshotTests.cs b/backend/tests/Squidex.Infrastructure.Tests/States/PersistenceSnapshotTests.cs index 64e634bfec..f77683b888 100644 --- a/backend/tests/Squidex.Infrastructure.Tests/States/PersistenceSnapshotTests.cs +++ b/backend/tests/Squidex.Infrastructure.Tests/States/PersistenceSnapshotTests.cs @@ -162,7 +162,7 @@ public async Task Should_delete_snapshot_but_not_events_if_deleted() await persistence.DeleteAsync(); - A.CallTo(() => eventStore.DeleteStreamAsync(A._, A._)) + A.CallTo(() => eventStore.DeleteAsync(A._, A._)) .MustNotHaveHappened(); A.CallTo(() => snapshotStore.RemoveAsync(key, A._))