Skip to content

Commit

Permalink
feat: Addition of event store method to retrieve events of a specific…
Browse files Browse the repository at this point in the history
… aggregate root class based on multiple entity ids.
  • Loading branch information
NickTsitlakidis committed Jul 3, 2024
1 parent 3ca8950 commit ead4e8f
Show file tree
Hide file tree
Showing 8 changed files with 368 additions and 1 deletion.
2 changes: 2 additions & 0 deletions libs/core/src/lib/aggregate-root/aggregate-root-name.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import "reflect-metadata";

import { AGGREGATE_ROOT_NAME_KEY } from "../metadata-keys";

/**
Expand Down
8 changes: 8 additions & 0 deletions libs/core/src/lib/storage/abstract-event-store.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { MissingAggregateRootNameException } from "../exceptions/missing-aggrega
import { UnknownEventVersionException } from "../exceptions/unknown-event-version-exception";
import { PublishedDomainEvent } from "../published-domain-event";
import { AbstractEventStore } from "./abstract-event-store";
import { AggregateRootClass } from "./event-store";
import { StoredAggregateRoot } from "./stored-aggregate-root";
import { StoredEvent } from "./stored-event";

Expand All @@ -28,6 +29,13 @@ class TestStore extends AbstractEventStore {
return Promise.resolve([]);
}

async findByAggregateRootIds<T extends AggregateRoot>(
aggregateRootClass: AggregateRootClass<T>,
ids: string[]
): Promise<Record<string, Array<StoredEvent>>> {
return {};
}

generateEntityId(): Promise<string> {
return Promise.resolve("generated-id");
}
Expand Down
5 changes: 5 additions & 0 deletions libs/core/src/lib/storage/abstract-event-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ export abstract class AbstractEventStore implements EventStore {
id: string
): Promise<Array<StoredEvent>>;

abstract findByAggregateRootIds<T extends AggregateRoot>(
aggregateRootClass: AggregateRootClass<T>,
ids: string[]
): Promise<Record<string, Array<StoredEvent>>>;

abstract generateEntityId(): Promise<string>;

abstract save(events: Array<StoredEvent>, aggregate: StoredAggregateRoot): Promise<Array<StoredEvent>>;
Expand Down
13 changes: 13 additions & 0 deletions libs/core/src/lib/storage/event-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,25 @@ export interface EventStore {
* is resolved from the aggregate root class. These events can later be used to recreate an aggregate root object.
* @param aggregateRootClass The class of the aggregate root for which the store will search for events
* @param id The unique id of the aggregate root object
* @returns An array of events that are associated with the provided id
*/
findByAggregateRootId<T extends AggregateRoot>(
aggregateRootClass: AggregateRootClass<T>,
id: string
): Promise<Array<StoredEvent>>;

/**
* Finds all events that are associated with the provided aggregate root ids and match the aggregate root name which
* is resolved from the aggregate root class. These events can later be used to recreate an aggregate root object.
* @param aggregateRootClass The class of the aggregate root for which the store will search for events
* @param ids The unique ids of the aggregate root objects
* @returns A map where the key is the aggregate root id and the value is an array of events that are associated with that id
*/
findByAggregateRootIds<T extends AggregateRoot>(
aggregateRootClass: AggregateRootClass<T>,
ids: string[]
): Promise<Record<string, Array<StoredEvent>>>;

/**
* Each storage solution has its own way of dealing with unique ids. This method's implementation should reflect
* the way the storage solution generates unique ids. For example, in a MongoDB database this would usually return
Expand Down
120 changes: 119 additions & 1 deletion libs/mongodb/src/lib/storage/mongo-event-store.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ beforeEach(async () => {
});

afterEach(async () => {
mongoClient.close(true);
await mongoClient.close(true);
});

@DomainEvent("test-event-1")
Expand Down Expand Up @@ -163,6 +163,124 @@ describe("save tests", () => {
});
});

describe("findByAggregateRootIds tests", () => {
test("returns empty object when no events found", async () => {
const events = await eventStore.findByAggregateRootIds(DecoratedAggregateRoot, [
new ObjectId().toHexString(),
new ObjectId().toHexString(),
new ObjectId().toHexString()
]);
expect(events).toEqual({});
});

test("returns mapped events when they are found and matched", async () => {
const aggregateRootId1 = new ObjectId().toHexString();
const aggregateRootId2 = new ObjectId().toHexString();
const aggregateRootId3 = new ObjectId().toHexString();
const ev1Id = new ObjectId().toHexString();
const ev2Id = new ObjectId().toHexString();
const ev3Id = new ObjectId().toHexString();
const ev4Id = new ObjectId().toHexString();

const ev1Date = new Date();
const ev2Date = new Date();
const ev3Date = new Date();
const ev4Date = new Date();

await eventsCollection.insertOne({
_id: new ObjectId(ev1Id),
aggregateRootId: aggregateRootId1,
aggregateRootName: "test-aggregate",
aggregateRootVersion: 1,
createdAt: ev1Date,
eventName: "test-event-1",
payload: {}
});

await eventsCollection.insertOne({
_id: new ObjectId(ev2Id),
aggregateRootId: aggregateRootId2,
aggregateRootName: "test-aggregate",
aggregateRootVersion: 2,
createdAt: ev2Date,
eventName: "test-event-2",
payload: {}
});

await eventsCollection.insertOne({
_id: new ObjectId(ev3Id),
aggregateRootId: aggregateRootId2,
aggregateRootName: "test-aggregate",
aggregateRootVersion: 2,
createdAt: ev3Date,
eventName: "test-event-2-2",
payload: {}
});

await eventsCollection.insertOne({
_id: new ObjectId(ev4Id),
aggregateRootId: aggregateRootId3,
aggregateRootName: "test-aggregate",
aggregateRootVersion: 2,
createdAt: ev4Date,
eventName: "test-event-3",
payload: {}
});

await eventsCollection.insertOne({
_id: new ObjectId(),
aggregateRootId: "other",
aggregateRootName: "other",
aggregateRootVersion: 2,
createdAt: ev4Date,
eventName: "other-event",
payload: {}
});

const events = await eventStore.findByAggregateRootIds(DecoratedAggregateRoot, [
aggregateRootId1,
aggregateRootId2,
aggregateRootId3
]);
expect(Object.keys(events).length).toBe(3);

expect(events[aggregateRootId1].length).toBe(1);
expect(events[aggregateRootId1][0].id).toBe(ev1Id);
expect(events[aggregateRootId1][0].aggregateRootVersion).toBe(1);
expect(events[aggregateRootId1][0].eventName).toBe("test-event-1");
expect(events[aggregateRootId1][0].aggregateRootId).toBe(aggregateRootId1);
expect(events[aggregateRootId1][0].aggregateRootName).toBe("test-aggregate");
expect(events[aggregateRootId1][0].payload).toEqual({});
expect(events[aggregateRootId1][0].createdAt).toEqual(ev1Date);

expect(events[aggregateRootId2].length).toBe(2);
expect(events[aggregateRootId2][0].id).toBe(ev2Id);
expect(events[aggregateRootId2][0].aggregateRootVersion).toBe(2);
expect(events[aggregateRootId2][0].eventName).toBe("test-event-2");
expect(events[aggregateRootId2][0].aggregateRootId).toBe(aggregateRootId2);
expect(events[aggregateRootId2][0].aggregateRootName).toBe("test-aggregate");
expect(events[aggregateRootId2][0].payload).toEqual({});
expect(events[aggregateRootId2][0].createdAt).toEqual(ev2Date);

expect(events[aggregateRootId2][1].id).toBe(ev3Id);
expect(events[aggregateRootId2][1].aggregateRootVersion).toBe(2);
expect(events[aggregateRootId2][1].eventName).toBe("test-event-2-2");
expect(events[aggregateRootId2][1].aggregateRootId).toBe(aggregateRootId2);
expect(events[aggregateRootId2][1].aggregateRootName).toBe("test-aggregate");
expect(events[aggregateRootId2][1].payload).toEqual({});
expect(events[aggregateRootId2][1].createdAt).toEqual(ev3Date);

expect(events[aggregateRootId3].length).toBe(1);
expect(events[aggregateRootId3][0].id).toBe(ev4Id);
expect(events[aggregateRootId3][0].aggregateRootVersion).toBe(2);
expect(events[aggregateRootId3][0].eventName).toBe("test-event-3");
expect(events[aggregateRootId3][0].aggregateRootId).toBe(aggregateRootId3);
expect(events[aggregateRootId3][0].aggregateRootName).toBe("test-aggregate");
expect(events[aggregateRootId3][0].payload).toEqual({});
expect(events[aggregateRootId3][0].createdAt).toEqual(ev4Date);
});
});

describe("findByAggregateRootId tests", () => {
test("returns empty array when no events found", async () => {
const events = await eventStore.findByAggregateRootId(DecoratedAggregateRoot, new ObjectId().toHexString());
Expand Down
40 changes: 40 additions & 0 deletions libs/mongodb/src/lib/storage/mongo-event-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,46 @@ export class MongoEventStore extends AbstractEventStore {
return [];
}

async findByAggregateRootIds<T extends AggregateRoot>(
aggregateRootClass: AggregateRootClass<T>,
ids: string[]
): Promise<Record<string, Array<StoredEvent>>> {
const aggregateRootName = getAggregateRootName(aggregateRootClass);
if (isNil(aggregateRootName)) {
this._logger.error(
`Missing aggregate root name for class: ${aggregateRootClass.name}. Use the @AggregateRootName decorator.`
);
throw new MissingAggregateRootNameException(aggregateRootClass.name);
}

const documents = await this._mongoClient
.db()
.collection(this._eventsCollectionName)
.find({ aggregateRootId: { $in: ids }, aggregateRootName: aggregateRootName })
.toArray();

const grouped: Record<string, Array<StoredEvent>> = {};

documents.forEach((doc) => {
if (isNil(grouped[doc["aggregateRootId"]])) {
grouped[doc["aggregateRootId"]] = [];
}
grouped[doc["aggregateRootId"]].push(
StoredEvent.fromStorage(
doc._id.toHexString(),
doc["aggregateRootId"],
doc["eventName"],
doc["createdAt"],
doc["aggregateRootVersion"],
doc["aggregateRootName"],
doc["payload"]
)
);
});

return grouped;
}

generateEntityId(): Promise<string> {
return Promise.resolve(new ObjectId().toHexString());
}
Expand Down
141 changes: 141 additions & 0 deletions libs/postgresql/src/lib/storage/postgresql-event-store.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,147 @@ describe("findAggregateRootVersion tests", () => {
});
});

describe("findByAggregateRootIds tests", () => {
test("returns empty array when no events found", async () => {
const events = await eventStore.findByAggregateRootIds(DecoratedAggregateRoot, [
randomUUID(),
randomUUID(),
randomUUID()
]);
expect(events).toEqual({});
});

test("returns mapped events when they are found and matched", async () => {
const aggregateRootId1 = randomUUID();
const aggregateRootId2 = randomUUID();
const aggregateRootId3 = randomUUID();
const otherId = randomUUID();

const ev1Id = randomUUID();
const ev2Id = randomUUID();
const ev3Id = randomUUID();
const ev4Id = randomUUID();

const ev1Date = new Date();
const ev2Date = new Date();
const ev3Date = new Date();
const ev4Date = new Date();

await knexConnection(schema + ".es-aggregates").insert({
id: aggregateRootId1,
version: 11
});

await knexConnection(schema + ".es-aggregates").insert({
id: aggregateRootId2,
version: 11
});

await knexConnection(schema + ".es-aggregates").insert({
id: aggregateRootId3,
version: 11
});

await knexConnection(schema + ".es-aggregates").insert({
id: otherId,
version: 110
});

await knexConnection<EventRow>(schema + ".es-events").insert({
aggregate_root_id: aggregateRootId1,
aggregate_root_name: "test-aggregate",
aggregate_root_version: 34,
created_at: ev1Date,
event_name: "sql-event-1",
id: ev1Id,
payload: "{}"
});

await knexConnection<EventRow>(schema + ".es-events").insert({
aggregate_root_id: aggregateRootId2,
aggregate_root_name: "test-aggregate",
aggregate_root_version: 35,
created_at: ev2Date,
event_name: "sql-event-2",
id: ev2Id,
payload: "{}"
});

await knexConnection<EventRow>(schema + ".es-events").insert({
aggregate_root_id: aggregateRootId2,
aggregate_root_name: "test-aggregate",
aggregate_root_version: 12,
created_at: ev3Date,
event_name: "sql-event-2-2",
id: ev3Id,
payload: "{}"
});

await knexConnection<EventRow>(schema + ".es-events").insert({
aggregate_root_id: aggregateRootId3,
aggregate_root_name: "test-aggregate",
aggregate_root_version: 36,
created_at: ev4Date,
event_name: "sql-event-3",
id: ev4Id,
payload: "{}"
});

await knexConnection<EventRow>(schema + ".es-events").insert({
aggregate_root_id: otherId,
aggregate_root_name: "other",
aggregate_root_version: 36,
created_at: ev4Date,
event_name: "other-event",
id: randomUUID(),
payload: "{}"
});

const events = await eventStore.findByAggregateRootIds(DecoratedAggregateRoot, [
aggregateRootId1,
aggregateRootId2,
aggregateRootId3,
randomUUID()
]);

expect(Object.keys(events).length).toBe(3);
expect(events[aggregateRootId1].length).toBe(1);
expect(events[aggregateRootId1][0].id).toBe(ev1Id);
expect(events[aggregateRootId1][0].aggregateRootVersion).toBe(34);
expect(events[aggregateRootId1][0].eventName).toBe("sql-event-1");
expect(events[aggregateRootId1][0].aggregateRootId).toBe(aggregateRootId1);
expect(events[aggregateRootId1][0].aggregateRootName).toBe("test-aggregate");
expect(events[aggregateRootId1][0].payload).toEqual({});
expect(events[aggregateRootId1][0].createdAt).toEqual(ev1Date);

expect(events[aggregateRootId2].length).toBe(2);
expect(events[aggregateRootId2][0].id).toBe(ev2Id);
expect(events[aggregateRootId2][0].aggregateRootVersion).toBe(35);
expect(events[aggregateRootId2][0].eventName).toBe("sql-event-2");
expect(events[aggregateRootId2][0].aggregateRootId).toBe(aggregateRootId2);
expect(events[aggregateRootId2][0].aggregateRootName).toBe("test-aggregate");
expect(events[aggregateRootId2][0].payload).toEqual({});
expect(events[aggregateRootId2][0].createdAt).toEqual(ev2Date);

expect(events[aggregateRootId2][1].id).toBe(ev3Id);
expect(events[aggregateRootId2][1].aggregateRootVersion).toBe(12);
expect(events[aggregateRootId2][1].eventName).toBe("sql-event-2-2");
expect(events[aggregateRootId2][1].aggregateRootId).toBe(aggregateRootId2);
expect(events[aggregateRootId2][1].aggregateRootName).toBe("test-aggregate");
expect(events[aggregateRootId2][1].payload).toEqual({});
expect(events[aggregateRootId2][1].createdAt).toEqual(ev3Date);

expect(events[aggregateRootId3].length).toBe(1);
expect(events[aggregateRootId3][0].id).toBe(ev4Id);
expect(events[aggregateRootId3][0].aggregateRootVersion).toBe(36);
expect(events[aggregateRootId3][0].eventName).toBe("sql-event-3");
expect(events[aggregateRootId3][0].aggregateRootId).toBe(aggregateRootId3);
expect(events[aggregateRootId3][0].aggregateRootName).toBe("test-aggregate");
expect(events[aggregateRootId3][0].payload).toEqual({});
expect(events[aggregateRootId3][0].createdAt).toEqual(ev4Date);
});
});

describe("findByAggregateRootId tests", () => {
test("returns empty array when no events found", async () => {
const events = await eventStore.findByAggregateRootId(DecoratedAggregateRoot, randomUUID());
Expand Down
Loading

0 comments on commit ead4e8f

Please sign in to comment.