Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Factor out ReceiptAccumulator #3319

Merged
merged 2 commits into from
Apr 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 71 additions & 0 deletions spec/unit/sync-accumulator.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,77 @@ describe("SyncAccumulator", function () {
});
});

it("can handle large numbers of identical receipts", () => {
const testSize = 1000; // Make this big to check performance (e.g. 10 million ~= 10s)

const newReceipt = (ts: number) => {
return {
type: "m.receipt",
room_id: "!foo:bar",
content: {
"$event1:localhost": {
[ReceiptType.Read]: {
"@alice:localhost": { ts },
},
},
},
};
};

const receipts = [];
for (let i = 0; i < testSize; i++) {
receipts.push(newReceipt(testSize - i));
}

sa.accumulate(
syncSkeleton({
ephemeral: {
events: receipts,
},
}),
);

const events = sa.getJSON().roomsData.join["!foo:bar"].ephemeral.events;
expect(events.length).toEqual(1);
expect(events[0]).toEqual(newReceipt(1));
});

it("can handle large numbers of receipts for different users and events", () => {
const testSize = 100; // Make this big to check performance (e.g. 1 million ~= 10s)

const newReceipt = (ts: number) => {
return {
type: "m.receipt",
room_id: "!foo:bar",
content: {
[`$event${ts}:localhost`]: {
[ReceiptType.Read]: {
[`@alice${ts}:localhost`]: { ts },
},
},
},
};
};

const receipts = [];
for (let i = 0; i < testSize; i++) {
receipts.push(newReceipt(testSize - i));
}

sa.accumulate(
syncSkeleton({
ephemeral: {
events: receipts,
},
}),
);

const events = sa.getJSON().roomsData.join["!foo:bar"].ephemeral.events;
expect(events.length).toEqual(1);
expect(events[0]["content"]["$event1:localhost"]).toEqual({ "m.read": { "@alice1:localhost": { ts: 1 } } });
expect(Object.keys(events[0]["content"]).length).toEqual(testSize);
});

it("should accumulate threaded read receipts", () => {
const receipt1 = {
type: "m.receipt",
Expand Down
53 changes: 53 additions & 0 deletions src/receipt-accumulator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
Copyright 2023 The Matrix.org Foundation C.I.C.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

import { AccumulatedReceipt } from "./sync-accumulator";
import { MapWithDefault } from "./utils";

export class ReceiptAccumulator {
private readReceipts: Map<string, AccumulatedReceipt> = new Map();
andybalaam marked this conversation as resolved.
Show resolved Hide resolved
private threadedReadReceipts: MapWithDefault<string, Map<string, AccumulatedReceipt>> = new MapWithDefault(
andybalaam marked this conversation as resolved.
Show resolved Hide resolved
() => new Map(),
);

public setUnthreaded(userId: string, receipt: AccumulatedReceipt): void {
this.readReceipts.set(userId, receipt);
}

public setThreaded(threadId: string, userId: string, receipt: AccumulatedReceipt): void {
this.threadedReadReceipts.getOrCreate(threadId).set(userId, receipt);
}

/**
* @returns an iterator of pairs of [userId, AccumulatedReceipt] - all the
* unthreaded receipts for each user.
*/
public allUnthreaded(): IterableIterator<[string, AccumulatedReceipt]> {
return this.readReceipts.entries();
}

/**
* @returns an iterator of pairs of [userId, AccumulatedReceipt] - all the
* threaded receipts for each user, in all threads.
*/
public *allThreaded(): IterableIterator<[string, AccumulatedReceipt]> {
for (const receiptsForThread of this.threadedReadReceipts.values()) {
for (const e of receiptsForThread.entries()) {
yield e;
}
}
}
}
55 changes: 19 additions & 36 deletions src/sync-accumulator.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright 2017 - 2021 The Matrix.org Foundation C.I.C.
Copyright 2017 - 2023 The Matrix.org Foundation C.I.C.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand All @@ -25,6 +25,7 @@ import { IRoomSummary } from "./models/room-summary";
import { EventType } from "./@types/event";
import { MAIN_ROOM_TIMELINE, ReceiptContent, ReceiptType } from "./@types/read_receipts";
import { UNREAD_THREAD_NOTIFICATIONS } from "./@types/sync";
import { ReceiptAccumulator } from "./receipt-accumulator";

interface IOpts {
/**
Expand All @@ -43,6 +44,12 @@ export interface IMinimalEvent {
unsigned?: IUnsigned;
}

export interface AccumulatedReceipt {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like this belongs to the ReceiptAccumulator and should eventually be moved to the receipt-accumulator module?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in next PR: #3320

data: IMinimalEvent;
type: ReceiptType;
eventId: string;
}

export interface IEphemeral {
events: IMinimalEvent[];
}
Expand Down Expand Up @@ -167,22 +174,7 @@ interface IRoom {
_accountData: { [eventType: string]: IMinimalEvent };
_unreadNotifications: Partial<UnreadNotificationCounts>;
_unreadThreadNotifications?: Record<string, Partial<UnreadNotificationCounts>>;
_readReceipts: {
[userId: string]: {
data: IMinimalEvent;
type: ReceiptType;
eventId: string;
};
};
_threadReadReceipts: {
[threadId: string]: {
[userId: string]: {
data: IMinimalEvent;
type: ReceiptType;
eventId: string;
};
};
};
_receipts: ReceiptAccumulator;
}

export interface ISyncData {
Expand Down Expand Up @@ -387,8 +379,7 @@ export class SyncAccumulator {
_unreadNotifications: {},
_unreadThreadNotifications: {},
_summary: {},
_readReceipts: {},
_threadReadReceipts: {},
_receipts: new ReceiptAccumulator(),
};
}
const currentData = this.joinRooms[roomId];
Expand Down Expand Up @@ -453,19 +444,13 @@ export class SyncAccumulator {
const receipt = {
data: e.content[eventId][key][userId],
type: key as ReceiptType,
eventId: eventId,
eventId,
};

if (!data.thread_id || data.thread_id === MAIN_ROOM_TIMELINE) {
currentData._readReceipts[userId] = receipt;
currentData._receipts.setUnthreaded(userId, receipt);
} else {
currentData._threadReadReceipts = {
...currentData._threadReadReceipts,
[data.thread_id]: {
...(currentData._threadReadReceipts[data.thread_id] ?? {}),
[userId]: receipt,
},
};
currentData._receipts.setThreaded(data.thread_id, userId, receipt);
}
}
});
Expand Down Expand Up @@ -590,20 +575,18 @@ export class SyncAccumulator {
MapWithDefault<ReceiptType, Map<string, object>>
> = new MapWithDefault(() => new MapWithDefault(() => new Map()));

for (const [userId, receiptData] of Object.entries(roomData._readReceipts)) {
for (const [userId, receiptData] of roomData._receipts.allUnthreaded()) {
receiptEventContent
.getOrCreate(receiptData.eventId)
.getOrCreate(receiptData.type)
.set(userId, receiptData.data);
}

for (const threadReceipts of Object.values(roomData._threadReadReceipts)) {
for (const [userId, receiptData] of Object.entries(threadReceipts)) {
receiptEventContent
.getOrCreate(receiptData.eventId)
.getOrCreate(receiptData.type)
.set(userId, receiptData.data);
}
for (const [userId, receiptData] of roomData._receipts.allThreaded()) {
receiptEventContent
.getOrCreate(receiptData.eventId)
.getOrCreate(receiptData.type)
.set(userId, receiptData.data);
}

receiptEvent.content = recursiveMapToObject(receiptEventContent);
Expand Down