diff --git a/.github/workflows/dotnet.yml b/.github/workflows/dotnet.yml
index 9c857b8..e0113a2 100644
--- a/.github/workflows/dotnet.yml
+++ b/.github/workflows/dotnet.yml
@@ -43,8 +43,17 @@ jobs:
run: dotnet build --no-restore
working-directory: ./src
- name: Test
- run: dotnet test --collect:"XPlat Code Coverage" --no-restore --no-build
+ run: dotnet test --no-restore --no-build
working-directory: ./src/Fluss.UnitTest
+ - name: Start PostgreSQL
+ run: docker compose up -d
+ working-directory: ./src/Fluss.PostgreSQL.IntegrationTest
+ - name: Test PostgreSQL
+ run: dotnet test --collect:"XPlat Code Coverage" --no-restore --no-build
+ working-directory: ./src/Fluss.PostgreSQL.IntegrationTest
+ - name: Stop PostgreSQL
+ run: docker compose down
+ working-directory: ./src/Fluss.PostgreSQL.IntegrationTest
- name: Upload coverage reports to Codecov
uses: codecov/codecov-action@v4
env:
diff --git a/src/Fluss.PostgreSQL.IntegrationTest/Fluss.PostgreSQL.IntegrationTest.csproj b/src/Fluss.PostgreSQL.IntegrationTest/Fluss.PostgreSQL.IntegrationTest.csproj
new file mode 100644
index 0000000..b801c77
--- /dev/null
+++ b/src/Fluss.PostgreSQL.IntegrationTest/Fluss.PostgreSQL.IntegrationTest.csproj
@@ -0,0 +1,34 @@
+
+
+
+ net9.0;net8.0
+ enable
+ enable
+ false
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ PreserveNewest
+
+
+
diff --git a/src/Fluss.PostgreSQL.IntegrationTest/PostgreSQLTest.cs b/src/Fluss.PostgreSQL.IntegrationTest/PostgreSQLTest.cs
new file mode 100644
index 0000000..bfcdb13
--- /dev/null
+++ b/src/Fluss.PostgreSQL.IntegrationTest/PostgreSQLTest.cs
@@ -0,0 +1,465 @@
+using Fluss.Events;
+using Fluss.Upcasting;
+using Microsoft.Extensions.Configuration;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Hosting;
+using Newtonsoft.Json;
+using Newtonsoft.Json.Linq;
+using Npgsql;
+using Xunit.Abstractions;
+
+namespace Fluss.PostgreSQL.IntegrationTest;
+
+public class PostgreSQLTest : IAsyncLifetime
+{
+ private readonly ITestOutputHelper _testOutputHelper;
+ private string _dbName = null!;
+ private string _managementConnectionString = null!;
+ private string _connectionString = null!;
+
+ public PostgreSQLTest(ITestOutputHelper testOutputHelper)
+ {
+ _testOutputHelper = testOutputHelper;
+ }
+
+ public async Task InitializeAsync()
+ {
+ var config = new ConfigurationBuilder()
+ .AddJsonFile("appsettings.json", false, false)
+ .Build();
+
+ _managementConnectionString = config.GetConnectionString("DefaultConnection")!;
+ _dbName = "test" + Guid.NewGuid().ToString().Replace("-", "");
+
+ await using var npgsqlConnection = new NpgsqlConnection(_managementConnectionString);
+ await npgsqlConnection.OpenAsync();
+
+ await using var command = new NpgsqlCommand("CREATE DATABASE " + _dbName, npgsqlConnection);
+ await command.ExecuteNonQueryAsync();
+
+ _connectionString = new NpgsqlConnectionStringBuilder(_managementConnectionString)
+ {
+ Database = _dbName
+ }.ConnectionString;
+ }
+
+/* Unmerged change from project 'Fluss.PostgreSQL.IntegrationTest(net8.0)'
+Before:
+ Assert.Equal(1, events.Count);
+After:
+ Assert.Single(events);
+*/
+
+ [Fact]
+ public async Task SimpleTest()
+ {
+ var sc = new ServiceCollection();
+ sc.AddEventSourcing();
+ sc.AddPostgresEventSourcingRepository(_connectionString);
+
+ var sp = sc.BuildServiceProvider();
+
+ await sp.GetRequiredService().StartAsync(default);
+ await sp.GetRequiredService().StartAsync(default);
+ await sp.GetRequiredService().WaitForFinish();
+
+ var eventRepository = sp.GetRequiredService();
+ await eventRepository.Publish([
+ new EventEnvelope
+ {
+ Event = new TestEvent(42),
+ Version = 0,
+ At = DateTimeOffset.UtcNow,
+ By = null
+ }
+ ]);
+
+ var version = await eventRepository.GetLatestVersion();
+ Assert.Equal(0, version);
+
+ var events = await eventRepository.GetEvents(-1, 0);
+ Assert.Single(events);
+
+ Assert.Equal(new TestEvent(42), events[0].Span[0].Event);
+ }
+
+ [Fact]
+ public async Task TestGetRawEvents()
+ {
+ var sc = new ServiceCollection();
+ sc.AddEventSourcing();
+ sc.AddPostgresEventSourcingRepository(_connectionString);
+
+ var sp = sc.BuildServiceProvider();
+
+ await sp.GetRequiredService().StartAsync(default);
+ await sp.GetRequiredService().StartAsync(default);
+ await sp.GetRequiredService().WaitForFinish();
+
+ var eventRepository = sp.GetRequiredService();
+ var baseEventRepository = sp.GetRequiredService();
+
+ // Publish some events
+ await eventRepository.Publish([
+ new EventEnvelope
+ {
+ Event = new TestEvent(1),
+ Version = 0,
+ At = DateTimeOffset.UtcNow,
+ By = null
+ },
+ new EventEnvelope
+ {
+ Event = new TestEvent(2),
+ Version = 1,
+ At = DateTimeOffset.UtcNow,
+ By = null
+ }
+ ]);
+
+ // Get raw events
+ var rawEvents = await baseEventRepository.GetRawEvents();
+ var eventList = rawEvents.ToList();
+
+ Assert.Equal(2, eventList.Count);
+ Assert.Equal(0, eventList[0].Version);
+ Assert.Equal(1, eventList[1].Version);
+ }
+
+ [Fact]
+ public async Task TestReplaceEvent()
+ {
+ var sc = new ServiceCollection();
+ sc.AddEventSourcing();
+ sc.AddPostgresEventSourcingRepository(_connectionString);
+
+ var sp = sc.BuildServiceProvider();
+
+ await sp.GetRequiredService().StartAsync(default);
+ await sp.GetRequiredService().StartAsync(default);
+ await sp.GetRequiredService().WaitForFinish();
+
+ var baseEventRepository = (PostgreSQLEventRepository)sp.GetRequiredService();
+
+ // Publish an event
+ await baseEventRepository.Publish([
+ new EventEnvelope
+ {
+ Event = new TestEvent(1),
+ Version = 0,
+ At = DateTimeOffset.UtcNow,
+ By = null
+ }
+ ]);
+
+ // Replace the event
+ var newEvent = new RawEventEnvelope
+ {
+ Version = 0,
+ At = DateTimeOffset.UtcNow,
+ By = null,
+ RawEvent = JObject.FromObject(new TestEvent(2), JsonSerializer.Create(PostgreSQLEventRepository.JsonSerializerSettings))
+ };
+
+ await baseEventRepository.ReplaceEvent(0, [newEvent]);
+
+ // Verify the event was replaced
+ var events = await baseEventRepository.GetEvents(-1, 0);
+ Assert.Single(events);
+ Assert.Equal(new TestEvent(2), events[0].Span[0].Event);
+ }
+
+ [Fact]
+ public async Task TestReplaceEventWithMultiple()
+ {
+ var sc = new ServiceCollection();
+ sc.AddEventSourcing();
+ sc.AddPostgresEventSourcingRepository(_connectionString);
+
+ var sp = sc.BuildServiceProvider();
+
+ await sp.GetRequiredService().StartAsync(default);
+ await sp.GetRequiredService().StartAsync(default);
+
+ await sp.GetRequiredService().WaitForFinish();
+
+ var baseEventRepository = sp.GetRequiredService();
+
+ // Publish an initial event
+ await baseEventRepository.Publish([
+ new EventEnvelope
+ {
+ Event = new TestEvent(1),
+ Version = 0,
+ At = DateTimeOffset.UtcNow,
+ By = null
+ }
+ ]);
+
+ // Replace the event with multiple events
+ var newEvents = new List
+ {
+ new RawEventEnvelope
+ {
+ Version = 0,
+ At = DateTimeOffset.UtcNow,
+ By = null,
+ RawEvent = JObject.FromObject(new TestEvent(2), JsonSerializer.Create(PostgreSQLEventRepository.JsonSerializerSettings))
+ },
+ new RawEventEnvelope
+ {
+ Version = 1,
+ At = DateTimeOffset.UtcNow,
+ By = null,
+ RawEvent = JObject.FromObject(new TestEvent(3), JsonSerializer.Create(PostgreSQLEventRepository.JsonSerializerSettings))
+ }
+ };
+
+ await baseEventRepository.ReplaceEvent(0, newEvents);
+
+ // Verify the events were replaced
+ var events = await baseEventRepository.GetEvents(-1, 1);
+ Assert.Equal(2, events[0].Length);
+ Assert.Equal(new TestEvent(2), events[0].Span[0].Event);
+ Assert.Equal(new TestEvent(3), events[0].Span[1].Event);
+ }
+
+ [Fact]
+ public async Task TestUpcaster()
+ {
+ var sc = new ServiceCollection();
+ sc.AddEventSourcing();
+ sc.AddPostgresEventSourcingRepository(_connectionString);
+
+ await using (var sp = sc.BuildServiceProvider())
+ {
+ var migrator = sp.GetRequiredService();
+
+ await migrator.StartAsync(default);
+ await migrator.WaitForFinish();
+
+ var repository = sp.GetRequiredService();
+ await repository.Publish([
+ new EventEnvelope
+ {
+ Event = new TestEvent(1),
+ Version = 0,
+ At = DateTimeOffset.UtcNow,
+ By = null
+ }
+ ]);
+ }
+
+ sc.AddUpcaster();
+
+ await using (var sp = sc.BuildServiceProvider())
+ {
+ var migrator = sp.GetRequiredService();
+ var upcaster = sp.GetRequiredService();
+
+ await migrator.StartAsync(default);
+ await migrator.WaitForFinish();
+ await upcaster.StartAsync(default);
+ await upcaster.ExecuteTask!;
+
+ var repository = sp.GetRequiredService();
+ var events = await repository.GetEvents(-1, 0);
+ Assert.Single(events);
+ Assert.Equal(new TestEvent2(1), events[0].Span[0].Event);
+ }
+ }
+
+ [Fact]
+ public async Task TestNewEventsSubscription()
+ {
+ var sc = new ServiceCollection();
+ sc.AddEventSourcing();
+ sc.AddPostgresEventSourcingRepository(_connectionString);
+
+ await using var sp = sc.BuildServiceProvider();
+
+ var migrator = sp.GetRequiredService();
+ await migrator.StartAsync(default);
+ await migrator.WaitForFinish();
+
+ var repository = sp.GetRequiredService();
+
+ var eventRaised = new TaskCompletionSource();
+
+ void Handler(object? sender, EventArgs args)
+ {
+ try
+ {
+ eventRaised.SetResult(true);
+ }
+ catch (Exception)
+ {
+ // ignored
+ }
+ }
+
+ repository.NewEvents += Handler;
+
+ try
+ {
+ await repository.Publish([
+ new EventEnvelope
+ {
+ Event = new TestEvent(1),
+ Version = 0,
+ At = DateTimeOffset.UtcNow,
+ By = null
+ }
+ ]);
+
+ // Wait for the event to be raised or timeout after 5 seconds
+ var eventRaisedTask = eventRaised.Task;
+ var timeoutTask = Task.Delay(TimeSpan.FromSeconds(5));
+
+ var completedTask = await Task.WhenAny(eventRaisedTask, timeoutTask);
+
+ Assert.Equal(eventRaisedTask, completedTask);
+ Assert.True(await eventRaisedTask, "NewEvents event was not raised");
+ }
+ finally
+ {
+ repository.NewEvents -= Handler;
+ }
+
+ // Test removing the event handler
+ var secondEventRaised = new TaskCompletionSource();
+ repository.NewEvents += (_, _) =>
+ {
+ try
+ {
+ secondEventRaised.SetResult(true);
+ }
+ catch (Exception)
+ {
+ // ignored
+ }
+ };
+
+ await repository.Publish([
+ new EventEnvelope
+ {
+ Event = new TestEvent(2),
+ Version = 1,
+ At = DateTimeOffset.UtcNow,
+ By = null
+ }
+ ]);
+
+ var secondEventRaisedTask = secondEventRaised.Task;
+ var secondTimeoutTask = Task.Delay(TimeSpan.FromSeconds(5));
+
+ var secondCompletedTask = await Task.WhenAny(secondEventRaisedTask, secondTimeoutTask);
+
+ Assert.Equal(secondEventRaisedTask, secondCompletedTask);
+ Assert.True(await secondEventRaisedTask, "NewEvents event was not raised after removing a handler");
+ }
+
+ [Fact]
+ public async Task TestDatabaseNotificationForwarding()
+ {
+ var sc1 = new ServiceCollection();
+ sc1.AddEventSourcing();
+ sc1.AddPostgresEventSourcingRepository(_connectionString);
+
+ var sc2 = new ServiceCollection();
+ sc2.AddEventSourcing();
+ sc2.AddPostgresEventSourcingRepository(_connectionString);
+
+ await using var sp1 = sc1.BuildServiceProvider();
+ await using var sp2 = sc2.BuildServiceProvider();
+
+ var migrator = sp1.GetRequiredService();
+ await migrator.StartAsync(default);
+ await migrator.WaitForFinish();
+
+ var repository1 = sp1.GetRequiredService();
+ var repository2 = sp2.GetRequiredService();
+
+ var eventRaised1 = new TaskCompletionSource();
+ var eventRaised2 = new TaskCompletionSource();
+
+ repository1.NewEvents += (_, _) =>
+ {
+ try
+ {
+ eventRaised1.TrySetResult(true);
+ }
+ catch (Exception)
+ {
+ // ignored
+ }
+ };
+ repository2.NewEvents += (_, _) =>
+ {
+ try
+ {
+ eventRaised2.TrySetResult(true);
+ }
+ catch (Exception)
+ {
+ // ignored
+ }
+ };
+
+ // Publish an event using the first repository
+ await repository1.Publish([
+ new EventEnvelope
+ {
+ Event = new TestEvent(1),
+ Version = 0,
+ At = DateTimeOffset.UtcNow,
+ By = null
+ }
+ ]);
+
+ // Wait for both event handlers to be triggered or timeout after 5 seconds
+ var timeoutTask = Task.Delay(TimeSpan.FromSeconds(5));
+ var allTasks = await Task.WhenAny(
+ Task.WhenAll(eventRaised1.Task, eventRaised2.Task),
+ timeoutTask
+ );
+
+ Assert.NotEqual(timeoutTask, allTasks);
+ Assert.True(await eventRaised1.Task, "NewEvents event was not raised on the first repository");
+ Assert.True(await eventRaised2.Task, "NewEvents event was not raised on the second repository");
+ }
+
+ public async Task DisposeAsync()
+ {
+ await using var npgsqlConnection = new NpgsqlConnection(_managementConnectionString);
+ await npgsqlConnection.OpenAsync();
+
+ await using var command = new NpgsqlCommand($"DROP DATABASE {_dbName} WITH (FORCE)", npgsqlConnection);
+ await command.ExecuteNonQueryAsync();
+ }
+
+ public record TestEvent(int Test) : Event;
+ public record TestEvent2(int Test) : Event;
+
+ public record TestEventUpcaster : IUpcaster
+ {
+ public IEnumerable? Upcast(JObject eventJson)
+ {
+ var eventType = eventJson["$type"]?.Value();
+ if (eventType == typeof(TestEvent).AssemblyQualifiedName)
+ {
+ var eventJson2 = new JObject(eventJson)
+ {
+ ["$type"] = typeof(TestEvent2).AssemblyQualifiedName
+ };
+
+ return
+ [
+ eventJson2
+ ];
+ }
+
+ return null;
+ }
+ }
+}
diff --git a/src/Fluss.PostgreSQL.IntegrationTest/appsettings.json b/src/Fluss.PostgreSQL.IntegrationTest/appsettings.json
new file mode 100644
index 0000000..c644e43
--- /dev/null
+++ b/src/Fluss.PostgreSQL.IntegrationTest/appsettings.json
@@ -0,0 +1,5 @@
+{
+ "ConnectionStrings": {
+ "DefaultConnection": "Host=localhost;Port=5432;Database=postgres;Username=postgres;Password=postgres"
+ }
+}
\ No newline at end of file
diff --git a/src/Fluss.PostgreSQL.IntegrationTest/docker-compose.yaml b/src/Fluss.PostgreSQL.IntegrationTest/docker-compose.yaml
new file mode 100644
index 0000000..2113418
--- /dev/null
+++ b/src/Fluss.PostgreSQL.IntegrationTest/docker-compose.yaml
@@ -0,0 +1,9 @@
+services:
+ database:
+ image: postgres:15
+ environment:
+ POSTGRES_USER: postgres
+ POSTGRES_PASSWORD: postgres
+ POSTGRES_DB: postgres
+ ports:
+ - "5432:5432"
\ No newline at end of file
diff --git a/src/Fluss.PostgreSQL/Fluss.PostgreSQL.csproj b/src/Fluss.PostgreSQL/Fluss.PostgreSQL.csproj
index eee5d38..66ec37d 100644
--- a/src/Fluss.PostgreSQL/Fluss.PostgreSQL.csproj
+++ b/src/Fluss.PostgreSQL/Fluss.PostgreSQL.csproj
@@ -11,7 +11,6 @@
https://github.com/atmina/fluss
git
MIT
- true
diff --git a/src/Fluss.PostgreSQL/PostgreSQLEventRepository.cs b/src/Fluss.PostgreSQL/PostgreSQLEventRepository.cs
index 2a0fe74..c764a8a 100644
--- a/src/Fluss.PostgreSQL/PostgreSQLEventRepository.cs
+++ b/src/Fluss.PostgreSQL/PostgreSQLEventRepository.cs
@@ -1,6 +1,7 @@
using System.Collections.ObjectModel;
using System.Data;
using System.Diagnostics;
+using System.Text.Json;
using Fluss.Events;
using Fluss.Exceptions;
using Newtonsoft.Json;
@@ -13,17 +14,17 @@ namespace Fluss.PostgreSQL;
public partial class PostgreSQLEventRepository : IBaseEventRepository
{
private readonly NpgsqlDataSource dataSource;
+ internal static readonly JsonSerializerSettings JsonSerializerSettings = new()
+ {
+ TypeNameHandling = TypeNameHandling.All,
+ TypeNameAssemblyFormatHandling = TypeNameAssemblyFormatHandling.Full,
+ MetadataPropertyHandling = MetadataPropertyHandling.ReadAhead
+ };
public PostgreSQLEventRepository(PostgreSQLConfig config)
{
var dataSourceBuilder = new NpgsqlDataSourceBuilder(config.ConnectionString);
- dataSourceBuilder.UseJsonNet(settings: new JsonSerializerSettings
- {
- TypeNameHandling = TypeNameHandling.All,
- TypeNameAssemblyFormatHandling = TypeNameAssemblyFormatHandling.Full,
- MetadataPropertyHandling =
- MetadataPropertyHandling.ReadAhead // While this is marked as a performance hit, profiling approves
- });
+ dataSourceBuilder.UseJsonNet(settings: JsonSerializerSettings);
dataSource = dataSourceBuilder.Build();
}
diff --git a/src/Fluss.PostgreSQL/ServiceCollectionExtensions.cs b/src/Fluss.PostgreSQL/ServiceCollectionExtensions.cs
index f7ff5d4..629e695 100644
--- a/src/Fluss.PostgreSQL/ServiceCollectionExtensions.cs
+++ b/src/Fluss.PostgreSQL/ServiceCollectionExtensions.cs
@@ -7,6 +7,7 @@
[assembly: InternalsVisibleTo("Fluss.UnitTest")]
+[assembly: InternalsVisibleTo("Fluss.PostgreSQL.IntegrationTest")]
namespace Fluss.PostgreSQL;
@@ -28,7 +29,8 @@ public static IServiceCollection AddPostgresEventSourcingRepository(this IServic
.AddSingleton(new PostgreSQLConfig(connectionString))
.AddSingleton()
.AddHostedService(sp => sp.GetRequiredService())
- .AddHostedService();
+ .AddSingleton()
+ .AddHostedService(sp => sp.GetRequiredService());
}
}
diff --git a/src/Fluss.sln b/src/Fluss.sln
index 0ae3d82..12cb03d 100644
--- a/src/Fluss.sln
+++ b/src/Fluss.sln
@@ -14,6 +14,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Fluss.Regen", "Fluss.Regen\
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Benchmark", "Benchmark\Benchmark.csproj", "{970AB461-4D44-4B99-AACA-7DAA569D4C34}"
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Fluss.PostgreSQL.IntegrationTest", "Fluss.PostgreSQL.IntegrationTest\Fluss.PostgreSQL.IntegrationTest.csproj", "{3210592D-7B76-4D90-BAAC-7B9CBF7E580D}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -48,5 +50,9 @@ Global
{970AB461-4D44-4B99-AACA-7DAA569D4C34}.Debug|Any CPU.Build.0 = Debug|Any CPU
{970AB461-4D44-4B99-AACA-7DAA569D4C34}.Release|Any CPU.ActiveCfg = Release|Any CPU
{970AB461-4D44-4B99-AACA-7DAA569D4C34}.Release|Any CPU.Build.0 = Release|Any CPU
+ {3210592D-7B76-4D90-BAAC-7B9CBF7E580D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {3210592D-7B76-4D90-BAAC-7B9CBF7E580D}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {3210592D-7B76-4D90-BAAC-7B9CBF7E580D}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {3210592D-7B76-4D90-BAAC-7B9CBF7E580D}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
EndGlobal
diff --git a/src/Fluss/Fluss.csproj b/src/Fluss/Fluss.csproj
index 0c42f56..fc920a9 100644
--- a/src/Fluss/Fluss.csproj
+++ b/src/Fluss/Fluss.csproj
@@ -11,7 +11,6 @@
https://github.com/atmina/fluss
git
MIT
- true