From 191095ac71616d9d1d1aeb42b20456c00ced798b Mon Sep 17 00:00:00 2001 From: Andy Balaam Date: Wed, 26 Apr 2023 14:30:37 +0100 Subject: [PATCH 1/2] Performance tests for receipt accumulation --- spec/unit/sync-accumulator.spec.ts | 71 ++++++++++++++++++++++++++++++ 1 file changed, 71 insertions(+) diff --git a/spec/unit/sync-accumulator.spec.ts b/spec/unit/sync-accumulator.spec.ts index 9255d7ec89c..d434e0145ac 100644 --- a/spec/unit/sync-accumulator.spec.ts +++ b/spec/unit/sync-accumulator.spec.ts @@ -356,6 +356,77 @@ describe("SyncAccumulator", function () { }); }); + it("can handle large numbers of identical receipts", () => { + const testSize = 1000; // Make this big to check performance (e.g. 10 million ~= 10s) + + const newReceipt = (ts: number) => { + return { + type: "m.receipt", + room_id: "!foo:bar", + content: { + "$event1:localhost": { + [ReceiptType.Read]: { + "@alice:localhost": { ts }, + }, + }, + }, + }; + }; + + const receipts = []; + for (let i = 0; i < testSize; i++) { + receipts.push(newReceipt(testSize - i)); + } + + sa.accumulate( + syncSkeleton({ + ephemeral: { + events: receipts, + }, + }), + ); + + const events = sa.getJSON().roomsData.join["!foo:bar"].ephemeral.events; + expect(events.length).toEqual(1); + expect(events[0]).toEqual(newReceipt(1)); + }); + + it("can handle large numbers of receipts for different users and events", () => { + const testSize = 100; // Make this big to check performance (e.g. 1 million ~= 10s) + + const newReceipt = (ts: number) => { + return { + type: "m.receipt", + room_id: "!foo:bar", + content: { + [`$event${ts}:localhost`]: { + [ReceiptType.Read]: { + [`@alice${ts}:localhost`]: { ts }, + }, + }, + }, + }; + }; + + const receipts = []; + for (let i = 0; i < testSize; i++) { + receipts.push(newReceipt(testSize - i)); + } + + sa.accumulate( + syncSkeleton({ + ephemeral: { + events: receipts, + }, + }), + ); + + const events = sa.getJSON().roomsData.join["!foo:bar"].ephemeral.events; + expect(events.length).toEqual(1); + expect(events[0]["content"]["$event1:localhost"]).toEqual({ "m.read": { "@alice1:localhost": { ts: 1 } } }); + expect(Object.keys(events[0]["content"]).length).toEqual(testSize); + }); + it("should accumulate threaded read receipts", () => { const receipt1 = { type: "m.receipt", From ef22653a74b81de274f3bb019ba5f7551ec16614 Mon Sep 17 00:00:00 2001 From: Andy Balaam Date: Wed, 26 Apr 2023 14:55:56 +0100 Subject: [PATCH 2/2] Split ReceiptAccumulator into its own module --- src/receipt-accumulator.ts | 53 ++++++++++++++++++++++++++++++++++++ src/sync-accumulator.ts | 55 +++++++++++++------------------------- 2 files changed, 72 insertions(+), 36 deletions(-) create mode 100644 src/receipt-accumulator.ts diff --git a/src/receipt-accumulator.ts b/src/receipt-accumulator.ts new file mode 100644 index 00000000000..e622ea2049b --- /dev/null +++ b/src/receipt-accumulator.ts @@ -0,0 +1,53 @@ +/* +Copyright 2023 The Matrix.org Foundation C.I.C. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +import { AccumulatedReceipt } from "./sync-accumulator"; +import { MapWithDefault } from "./utils"; + +export class ReceiptAccumulator { + private readReceipts: Map = new Map(); + private threadedReadReceipts: MapWithDefault> = new MapWithDefault( + () => new Map(), + ); + + public setUnthreaded(userId: string, receipt: AccumulatedReceipt): void { + this.readReceipts.set(userId, receipt); + } + + public setThreaded(threadId: string, userId: string, receipt: AccumulatedReceipt): void { + this.threadedReadReceipts.getOrCreate(threadId).set(userId, receipt); + } + + /** + * @returns an iterator of pairs of [userId, AccumulatedReceipt] - all the + * unthreaded receipts for each user. + */ + public allUnthreaded(): IterableIterator<[string, AccumulatedReceipt]> { + return this.readReceipts.entries(); + } + + /** + * @returns an iterator of pairs of [userId, AccumulatedReceipt] - all the + * threaded receipts for each user, in all threads. + */ + public *allThreaded(): IterableIterator<[string, AccumulatedReceipt]> { + for (const receiptsForThread of this.threadedReadReceipts.values()) { + for (const e of receiptsForThread.entries()) { + yield e; + } + } + } +} diff --git a/src/sync-accumulator.ts b/src/sync-accumulator.ts index 570bc8b1a9a..198b4d413fd 100644 --- a/src/sync-accumulator.ts +++ b/src/sync-accumulator.ts @@ -1,5 +1,5 @@ /* -Copyright 2017 - 2021 The Matrix.org Foundation C.I.C. +Copyright 2017 - 2023 The Matrix.org Foundation C.I.C. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -25,6 +25,7 @@ import { IRoomSummary } from "./models/room-summary"; import { EventType } from "./@types/event"; import { MAIN_ROOM_TIMELINE, ReceiptContent, ReceiptType } from "./@types/read_receipts"; import { UNREAD_THREAD_NOTIFICATIONS } from "./@types/sync"; +import { ReceiptAccumulator } from "./receipt-accumulator"; interface IOpts { /** @@ -43,6 +44,12 @@ export interface IMinimalEvent { unsigned?: IUnsigned; } +export interface AccumulatedReceipt { + data: IMinimalEvent; + type: ReceiptType; + eventId: string; +} + export interface IEphemeral { events: IMinimalEvent[]; } @@ -167,22 +174,7 @@ interface IRoom { _accountData: { [eventType: string]: IMinimalEvent }; _unreadNotifications: Partial; _unreadThreadNotifications?: Record>; - _readReceipts: { - [userId: string]: { - data: IMinimalEvent; - type: ReceiptType; - eventId: string; - }; - }; - _threadReadReceipts: { - [threadId: string]: { - [userId: string]: { - data: IMinimalEvent; - type: ReceiptType; - eventId: string; - }; - }; - }; + _receipts: ReceiptAccumulator; } export interface ISyncData { @@ -387,8 +379,7 @@ export class SyncAccumulator { _unreadNotifications: {}, _unreadThreadNotifications: {}, _summary: {}, - _readReceipts: {}, - _threadReadReceipts: {}, + _receipts: new ReceiptAccumulator(), }; } const currentData = this.joinRooms[roomId]; @@ -453,19 +444,13 @@ export class SyncAccumulator { const receipt = { data: e.content[eventId][key][userId], type: key as ReceiptType, - eventId: eventId, + eventId, }; if (!data.thread_id || data.thread_id === MAIN_ROOM_TIMELINE) { - currentData._readReceipts[userId] = receipt; + currentData._receipts.setUnthreaded(userId, receipt); } else { - currentData._threadReadReceipts = { - ...currentData._threadReadReceipts, - [data.thread_id]: { - ...(currentData._threadReadReceipts[data.thread_id] ?? {}), - [userId]: receipt, - }, - }; + currentData._receipts.setThreaded(data.thread_id, userId, receipt); } } }); @@ -590,20 +575,18 @@ export class SyncAccumulator { MapWithDefault> > = new MapWithDefault(() => new MapWithDefault(() => new Map())); - for (const [userId, receiptData] of Object.entries(roomData._readReceipts)) { + for (const [userId, receiptData] of roomData._receipts.allUnthreaded()) { receiptEventContent .getOrCreate(receiptData.eventId) .getOrCreate(receiptData.type) .set(userId, receiptData.data); } - for (const threadReceipts of Object.values(roomData._threadReadReceipts)) { - for (const [userId, receiptData] of Object.entries(threadReceipts)) { - receiptEventContent - .getOrCreate(receiptData.eventId) - .getOrCreate(receiptData.type) - .set(userId, receiptData.data); - } + for (const [userId, receiptData] of roomData._receipts.allThreaded()) { + receiptEventContent + .getOrCreate(receiptData.eventId) + .getOrCreate(receiptData.type) + .set(userId, receiptData.data); } receiptEvent.content = recursiveMapToObject(receiptEventContent);