Skip to content

Commit

Permalink
Event query improvements (#1033)
Browse files Browse the repository at this point in the history
* Schema name.

* Fix build.

* Query improvements.

* Simplifications.

* Fix regex.

* Build fix.
  • Loading branch information
SebastianStehle authored Oct 9, 2023
1 parent e663f51 commit 5440e68
Show file tree
Hide file tree
Showing 54 changed files with 369 additions and 369 deletions.
25 changes: 19 additions & 6 deletions backend/src/Migrations/RebuilderExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
using Squidex.Domain.Apps.Entities.Rules.DomainObject;
using Squidex.Domain.Apps.Entities.Schemas.DomainObject;
using Squidex.Infrastructure.Commands;
using Squidex.Infrastructure.EventSourcing;

namespace Migrations;

Expand All @@ -21,36 +22,48 @@ public static class RebuilderExtensions
public static Task RebuildAppsAsync(this Rebuilder rebuilder, int batchSize,
CancellationToken ct = default)
{
return rebuilder.RebuildAsync<AppDomainObject, AppDomainObject.State>("^app\\-", batchSize, AllowedErrorRate, ct);
var streamFilter = StreamFilter.Prefix("app-");

return rebuilder.RebuildAsync<AppDomainObject, AppDomainObject.State>(streamFilter, batchSize, AllowedErrorRate, ct);
}

public static Task RebuildSchemasAsync(this Rebuilder rebuilder, int batchSize,
CancellationToken ct = default)
{
return rebuilder.RebuildAsync<SchemaDomainObject, SchemaDomainObject.State>("^schema\\-", batchSize, AllowedErrorRate, ct);
var streamFilter = StreamFilter.Prefix("schema-");

return rebuilder.RebuildAsync<SchemaDomainObject, SchemaDomainObject.State>(streamFilter, batchSize, AllowedErrorRate, ct);
}

public static Task RebuildRulesAsync(this Rebuilder rebuilder, int batchSize,
CancellationToken ct = default)
{
return rebuilder.RebuildAsync<RuleDomainObject, RuleDomainObject.State>("^rule\\-", batchSize, AllowedErrorRate, ct);
var streamFilter = StreamFilter.Prefix("rule-");

return rebuilder.RebuildAsync<RuleDomainObject, RuleDomainObject.State>(streamFilter, batchSize, AllowedErrorRate, ct);
}

public static Task RebuildAssetsAsync(this Rebuilder rebuilder, int batchSize,
CancellationToken ct = default)
{
return rebuilder.RebuildAsync<AssetDomainObject, AssetDomainObject.State>("^asset\\-", batchSize, AllowedErrorRate, ct);
var streamFilter = StreamFilter.Prefix("asset-");

return rebuilder.RebuildAsync<AssetDomainObject, AssetDomainObject.State>(streamFilter, batchSize, AllowedErrorRate, ct);
}

public static Task RebuildAssetFoldersAsync(this Rebuilder rebuilder, int batchSize,
CancellationToken ct = default)
{
return rebuilder.RebuildAsync<AssetFolderDomainObject, AssetFolderDomainObject.State>("^assetFolder\\-", batchSize, AllowedErrorRate, ct);
var streamFilter = StreamFilter.Prefix("assetFolder-");

return rebuilder.RebuildAsync<AssetFolderDomainObject, AssetFolderDomainObject.State>(streamFilter, batchSize, AllowedErrorRate, ct);
}

public static Task RebuildContentAsync(this Rebuilder rebuilder, int batchSize,
CancellationToken ct = default)
{
return rebuilder.RebuildAsync<ContentDomainObject, ContentDomainObject.State>("^content\\-", batchSize, AllowedErrorRate, ct);
var streamFilter = StreamFilter.Prefix("content-");

return rebuilder.RebuildAsync<ContentDomainObject, ContentDomainObject.State>(streamFilter, batchSize, AllowedErrorRate, ct);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public sealed record ReferencesFieldProperties : FieldProperties

public bool MustBePublished { get; init; }

public string? Query { get; init;}
public string? Query { get; init; }

public ReferencesFieldEditor Editor { get; init; }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,13 @@ public sealed class SubscriptionPublisher : IEventConsumer
private readonly ISubscriptionService subscriptionService;
private readonly IEnumerable<ISubscriptionEventCreator> subscriptionEventCreators;

public string Name
{
get => "Subscriptions";
}
public string Name => "Subscriptions";

public string EventsFilter
{
get => "^(content-|asset-)";
}
public StreamFilter EventsFilter { get; } = StreamFilter.Prefix("content-", "asset-");

public bool StartLatest
{
get => true;
}
public bool StartLatest => true;

public bool CanClear
{
get => false;
}
public bool CanClear => false;

public SubscriptionPublisher(ISubscriptionService subscriptionService, IEnumerable<ISubscriptionEventCreator> subscriptionEventCreators)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,11 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Schemas;

public sealed class MongoSchemasHash : MongoRepositoryBase<MongoSchemasHashEntity>, ISchemasHash, IEventConsumer, IDeleter
{
public int BatchSize
{
get => 1000;
}
public int BatchSize => 1000;

public int BatchDelay
{
get => 500;
}
public int BatchDelay => 500;

public string Name
{
get => GetType().Name;
}

public string EventsFilter
{
get => "^schema-";
}
public StreamFilter EventsFilter { get; } = StreamFilter.Prefix("schema-");

public MongoSchemasHash(IMongoDatabase database)
: base(database)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ public AppEventDeleter(IEventStore eventStore)
public Task DeleteAppAsync(IAppEntity app,
CancellationToken ct)
{
return eventStore.DeleteAsync($"^([a-zA-Z0-9]+)\\-{app.Id}", ct);
var streamFilter = StreamFilter.Prefix($"([a-zA-Z0-9]+)-{app.Id}");

return eventStore.DeleteAsync(streamFilter, ct);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,7 @@ public sealed class AppPermanentDeleter : IEventConsumer
private readonly IDomainObjectFactory factory;
private readonly HashSet<string> consumingTypes;

public string Name
{
get => GetType().Name;
}

public string EventsFilter
{
get => "^app-";
}
public StreamFilter EventsFilter { get; } = StreamFilter.Prefix("app-");

public AppPermanentDeleter(IEnumerable<IDeleter> deleters, IDomainObjectFactory factory, TypeRegistry typeRegistry)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,7 @@ public sealed class AssetPermanentDeleter : IEventConsumer
private readonly IAssetFileStore assetFileStore;
private readonly HashSet<string> consumingTypes;

public string Name
{
get => GetType().Name;
}

public string EventsFilter
{
get => "^asset-";
}
public StreamFilter EventsFilter { get; } = StreamFilter.Prefix("asset-");

public AssetPermanentDeleter(IAssetFileStore assetFileStore, TypeRegistry typeRegistry)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,11 @@ public partial class AssetUsageTracker : IEventConsumer
{
private IMemoryCache memoryCache;

public int BatchSize
{
get => 1000;
}
public int BatchSize => 1000;

public int BatchDelay
{
get => 1000;
}
public int BatchDelay => 1000;

public string Name
{
get => GetType().Name;
}

public string EventsFilter
{
get => "^asset-";
}
public StreamFilter EventsFilter { get; } = StreamFilter.Prefix("asset-");

private void ClearCache()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,19 +39,21 @@ public void RegisterLanguageExtensions(CustomFluidParser parser, TemplateOptions

private async ValueTask<Completion> ResolveAsset(ValueTuple<Expression, Expression> arguments, TextWriter writer, TextEncoder encoder, TemplateContext context)
{
if (context.GetValue("event")?.ToObjectValue() is EnrichedEvent enrichedEvent)
if (context.GetValue("event")?.ToObjectValue() is not EnrichedEvent enrichedEvent)
{
var (nameArg, idArg) = arguments;
return Completion.Normal;
}

var assetId = await idArg.EvaluateAsync(context);
var asset = await ResolveAssetAsync(serviceProvider, enrichedEvent.AppId.Id, assetId);
var (nameArg, idArg) = arguments;

if (asset != null)
{
var name = (await nameArg.EvaluateAsync(context)).ToStringValue();
var assetId = await idArg.EvaluateAsync(context);
var asset = await ResolveAssetAsync(serviceProvider, enrichedEvent.AppId.Id, assetId);

context.SetValue(name, asset);
}
if (asset != null)
{
var name = (await nameArg.EvaluateAsync(context)).ToStringValue();

context.SetValue(name, asset);
}

return Completion.Normal;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ public RebuildFiles(
public async Task RepairAsync(
CancellationToken ct = default)
{
await foreach (var storedEvent in eventStore.QueryAllAsync("^asset\\-", ct: ct))
var streamFilter = StreamFilter.Prefix("asset-");

await foreach (var storedEvent in eventStore.QueryAllAsync(streamFilter, ct: ct))
{
var @event = eventFormatter.ParseIfKnown(storedEvent);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,7 @@ public sealed class RecursiveDeleter : IEventConsumer
private readonly ILogger<RecursiveDeleter> log;
private readonly HashSet<string> consumingTypes;

public string Name
{
get => GetType().Name;
}

public string EventsFilter
{
get => "^assetFolder-";
}
public StreamFilter EventsFilter { get; } = StreamFilter.Prefix("assetFolder-");

public RecursiveDeleter(
ICommandBus commandBus,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,9 @@ private async Task ProcessAsync(Run run,
var backupUsers = new UserMapping(run.Actor);
var backupContext = new BackupContext(appId, backupUsers, writer);

await foreach (var storedEvent in eventStore.QueryAllAsync(GetFilter(), ct: ct))
var streamFilter = StreamFilter.Prefix($"[^\\-]*-{appId}");

await foreach (var storedEvent in eventStore.QueryAllAsync(streamFilter, ct: ct))
{
var @event = eventFormatter.Parse(storedEvent);

Expand Down Expand Up @@ -200,11 +202,6 @@ private async Task ProcessAsync(Run run,
}
}

private string GetFilter()
{
return $"^[^\\-]*-{Regex.Escape(appId.ToString())}";
}

public Task DeleteAsync(DomainId id)
{
return scheduler.ScheduleAsync(async _ =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public CommentsStream(
public virtual async Task LoadAsync(
CancellationToken ct)
{
var storedEvents = await eventStore.QueryReverseAsync(streamName, 100, ct);
var storedEvents = await eventStore.QueryStreamReverseAsync(streamName, 100, ct);

foreach (var @event in storedEvents)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,19 +39,21 @@ public void RegisterLanguageExtensions(CustomFluidParser parser, TemplateOptions

private async ValueTask<Completion> ResolveReference(ValueTuple<Expression, Expression> arguments, TextWriter writer, TextEncoder encoder, TemplateContext context)
{
if (context.GetValue("event")?.ToObjectValue() is EnrichedEvent enrichedEvent)
if (context.GetValue("event")?.ToObjectValue() is not EnrichedEvent enrichedEvent)
{
var (nameArg, idArg) = arguments;
return Completion.Normal;
}

var contentId = await idArg.EvaluateAsync(context);
var content = await ResolveContentAsync(serviceProvider, enrichedEvent.AppId.Id, contentId);
var (nameArg, idArg) = arguments;

if (content != null)
{
var name = (await nameArg.EvaluateAsync(context)).ToStringValue();
var contentId = await idArg.EvaluateAsync(context);
var content = await ResolveContentAsync(serviceProvider, enrichedEvent.AppId.Id, contentId);

context.SetValue(name, content);
}
if (content != null)
{
var name = (await nameArg.EvaluateAsync(context)).ToStringValue();

context.SetValue(name, content);
}

return Completion.Normal;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,13 @@ public sealed class TextIndexingProcess : IEventConsumer
private readonly ITextIndex textIndex;
private readonly ITextIndexerState textIndexerState;

public int BatchSize
{
get => 1000;
}
public int BatchSize => 1000;

public int BatchDelay
{
get => 1000;
}
public int BatchDelay => 1000;

public string Name
{
get => "TextIndexer5";
}
public string Name => "TextIndexer5";

public string EventsFilter
{
get => "^content-";
}
public StreamFilter EventsFilter { get; } = StreamFilter.Prefix("content-");

public ITextIndex TextIndex
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,9 @@ public sealed class InvitationEventConsumer : IEventConsumer
private readonly IAppProvider appProvider;
private readonly ILogger<InvitationEventConsumer> log;

public string Name
{
get => "NotificationEmailSender";
}
public string Name => "NotificationEmailSender";

public string EventsFilter
{
get { return "^app-|^app-"; }
}
public StreamFilter EventsFilter { get; } = StreamFilter.Prefix("app-");

public InvitationEventConsumer(
IAppProvider appProvider,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,10 @@ public async Task<List<SimulatedRuleEvent>> SimulateAsync(NamedId<DomainId> appI

var simulatedEvents = new List<SimulatedRuleEvent>(MaxSimulatedEvents);

var fromNow = SystemClock.Instance.GetCurrentInstant().Minus(Duration.FromDays(7));
var streamStart = SystemClock.Instance.GetCurrentInstant().Minus(Duration.FromDays(7));
var streamFilter = StreamFilter.Prefix($"([a-zA-Z0-9]+)-{appId.Id}");

await foreach (var storedEvent in eventStore.QueryAllReverseAsync($"^([a-zA-Z0-9]+)\\-{appId.Id}", fromNow, MaxSimulatedEvents, ct))
await foreach (var storedEvent in eventStore.QueryAllReverseAsync(streamFilter, streamStart, MaxSimulatedEvents, ct))
{
var @event = eventFormatter.ParseIfKnown(storedEvent);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,9 +254,9 @@ private async Task EnqueueFromEventsAsync(Run run,
await using var batch = new RuleQueueWriter(ruleEventRepository, ruleUsageTracker, null);

// Use a prefix query so that the storage can use an index for the query.
var filter = $"^([a-z]+)\\-{appId}";
var streamFilter = StreamFilter.Prefix($"([a-zA-Z0-9]+)\\-{appId}");

await foreach (var storedEvent in eventStore.QueryAllAsync(filter, run.Job.Position, ct: ct))
await foreach (var storedEvent in eventStore.QueryAllAsync(streamFilter, run.Job.Position, ct: ct))
{
var @event = eventFormatter.ParseIfKnown(storedEvent);

Expand Down
Loading

0 comments on commit 5440e68

Please sign in to comment.