diff --git a/spec/unit/matrixrtc/MatrixRTCSession.spec.ts b/spec/unit/matrixrtc/MatrixRTCSession.spec.ts index fd843a0e85..7e50d95223 100644 --- a/spec/unit/matrixrtc/MatrixRTCSession.spec.ts +++ b/spec/unit/matrixrtc/MatrixRTCSession.spec.ts @@ -17,10 +17,10 @@ limitations under the License. import { encodeBase64, EventType, MatrixClient, MatrixError, MatrixEvent, Room } from "../../../src"; import { KnownMembership } from "../../../src/@types/membership"; import { DEFAULT_EXPIRE_DURATION, SessionMembershipData } from "../../../src/matrixrtc/CallMembership"; -import { MembershipManager } from "../../../src/matrixrtc/MembershipManager"; import { MatrixRTCSession, MatrixRTCSessionEvent } from "../../../src/matrixrtc/MatrixRTCSession"; import { EncryptionKeysEventContent } from "../../../src/matrixrtc/types"; import { randomString } from "../../../src/randomstring"; +import { flushPromises } from "../../test-utils/flushPromises"; import { makeMockRoom, makeMockRoomState, membershipTemplate } from "./mocks"; const mockFocus = { type: "mock" }; @@ -236,16 +236,15 @@ describe("MatrixRTCSession", () => { }); async function testSession(membershipData: SessionMembershipData): Promise { - const makeNewMembershipSpy = jest.spyOn(MembershipManager.prototype as any, "makeNewMembership"); sess = MatrixRTCSession.roomSessionForRoom(client, makeMockRoom(membershipData)); sess.joinRoomSession([mockFocus], mockFocus, joinSessionConfig); await Promise.race([sentStateEvent, new Promise((resolve) => setTimeout(resolve, 500))]); - expect(makeNewMembershipSpy).toHaveBeenCalledTimes(1); + expect(sendStateEventMock).toHaveBeenCalledTimes(1); await Promise.race([sentDelayedState, new Promise((resolve) => setTimeout(resolve, 500))]); - expect(client._unstable_sendDelayedStateEvent).toHaveBeenCalledTimes(1); + expect(sendDelayedStateMock).toHaveBeenCalledTimes(1); } it("sends events", async () => { @@ -323,9 +322,11 @@ describe("MatrixRTCSession", () => { let sendStateEventMock: jest.Mock; let sendDelayedStateMock: jest.Mock; let sendEventMock: jest.Mock; + let updateDelayedEventMock: jest.Mock; let sentStateEvent: Promise; let sentDelayedState: Promise; + let updatedDelayedEvent: Promise; beforeEach(() => { sentStateEvent = new Promise((resolve) => { @@ -339,12 +340,15 @@ describe("MatrixRTCSession", () => { }; }); }); + updatedDelayedEvent = new Promise((r) => { + updateDelayedEventMock = jest.fn(r); + }); sendEventMock = jest.fn(); client.sendStateEvent = sendStateEventMock; client._unstable_sendDelayedStateEvent = sendDelayedStateMock; client.sendEvent = sendEventMock; - client._unstable_updateDelayedEvent = jest.fn(); + client._unstable_updateDelayedEvent = updateDelayedEventMock; mockRoom = makeMockRoom([]); sess = MatrixRTCSession.roomSessionForRoom(client, mockRoom); @@ -482,19 +486,7 @@ describe("MatrixRTCSession", () => { membershipServerSideExpiryTimeout: 9000, }); - // needed to advance the mock timers properly - // depends on myMembershipManager being created - const scheduledDelayDisconnection = new Promise((resolve) => { - const membershipManager = (sess as any).membershipManager; - const originalFn: () => void = membershipManager.scheduleDelayDisconnection; - membershipManager.scheduleDelayDisconnection = jest.fn(() => { - originalFn.call(membershipManager); - resolve(); - }); - }); - await sendDelayedStateExceedAttempt.then(); // needed to resolve after the send attempt catches - await sendDelayedStateAttempt; const callProps = (d: number) => { return [mockRoom!.roomId, { delay: d }, "org.matrix.msc3401.call.member", {}, userStateKey]; @@ -525,11 +517,13 @@ describe("MatrixRTCSession", () => { await sentDelayedState; // should have prepared the heartbeat to keep delaying the leave event while still connected - await scheduledDelayDisconnection; - // should have tried updating the delayed leave to test that it wasn't replaced by own state + await updatedDelayedEvent; expect(client._unstable_updateDelayedEvent).toHaveBeenCalledTimes(1); - // should update delayed disconnect + + // ensures that we reach the code that schedules the timeout for the next delay update before we advance the timers. + await flushPromises(); jest.advanceTimersByTime(5000); + // should update delayed disconnect expect(client._unstable_updateDelayedEvent).toHaveBeenCalledTimes(2); jest.useRealTimers(); @@ -561,7 +555,7 @@ describe("MatrixRTCSession", () => { const onMembershipsChanged = jest.fn(); sess.on(MatrixRTCSessionEvent.MembershipsChanged, onMembershipsChanged); - sess.onMembershipUpdate(); + sess.onRTCSessionMemberUpdate(); expect(onMembershipsChanged).not.toHaveBeenCalled(); }); @@ -574,7 +568,7 @@ describe("MatrixRTCSession", () => { sess.on(MatrixRTCSessionEvent.MembershipsChanged, onMembershipsChanged); mockRoom.getLiveTimeline().getState = jest.fn().mockReturnValue(makeMockRoomState([], mockRoom.roomId)); - sess.onMembershipUpdate(); + sess.onRTCSessionMemberUpdate(); expect(onMembershipsChanged).toHaveBeenCalled(); }); @@ -763,7 +757,7 @@ describe("MatrixRTCSession", () => { mockRoom.getLiveTimeline().getState = jest .fn() .mockReturnValue(makeMockRoomState([membershipTemplate], mockRoom.roomId)); - sess.onMembershipUpdate(); + sess.onRTCSessionMemberUpdate(); // member2 re-joins which should trigger an immediate re-send const keysSentPromise2 = new Promise((resolve) => { @@ -772,7 +766,7 @@ describe("MatrixRTCSession", () => { mockRoom.getLiveTimeline().getState = jest .fn() .mockReturnValue(makeMockRoomState([membershipTemplate, member2], mockRoom.roomId)); - sess.onMembershipUpdate(); + sess.onRTCSessionMemberUpdate(); // but, that immediate resend is throttled so we need to wait a bit jest.advanceTimersByTime(1000); const { keys } = await keysSentPromise2; @@ -825,7 +819,7 @@ describe("MatrixRTCSession", () => { mockRoom.getLiveTimeline().getState = jest .fn() .mockReturnValue(makeMockRoomState([membershipTemplate, member2], mockRoom.roomId)); - sess.onMembershipUpdate(); + sess.onRTCSessionMemberUpdate(); await keysSentPromise2; @@ -879,7 +873,7 @@ describe("MatrixRTCSession", () => { sendEventMock.mockClear(); // these should be a no-op: - sess.onMembershipUpdate(); + sess.onRTCSessionMemberUpdate(); expect(sendEventMock).toHaveBeenCalledTimes(0); expect(sess!.statistics.counters.roomEventEncryptionKeysSent).toEqual(1); } finally { @@ -933,7 +927,7 @@ describe("MatrixRTCSession", () => { sendEventMock.mockClear(); // this should be a no-op: - sess.onMembershipUpdate(); + sess.onRTCSessionMemberUpdate(); expect(sendEventMock).toHaveBeenCalledTimes(0); // advance time to avoid key throttling @@ -947,7 +941,7 @@ describe("MatrixRTCSession", () => { }); // this should re-send the key - sess.onMembershipUpdate(); + sess.onRTCSessionMemberUpdate(); await keysSentPromise2; @@ -1010,7 +1004,7 @@ describe("MatrixRTCSession", () => { mockRoom.getLiveTimeline().getState = jest .fn() .mockReturnValue(makeMockRoomState([membershipTemplate], mockRoom.roomId)); - sess.onMembershipUpdate(); + sess.onRTCSessionMemberUpdate(); jest.advanceTimersByTime(10000); @@ -1055,7 +1049,7 @@ describe("MatrixRTCSession", () => { ); } - sess!.onMembershipUpdate(); + sess!.onRTCSessionMemberUpdate(); // advance time to avoid key throttling jest.advanceTimersByTime(10000); @@ -1096,7 +1090,7 @@ describe("MatrixRTCSession", () => { mockRoom.getLiveTimeline().getState = jest .fn() .mockReturnValue(makeMockRoomState([membershipTemplate, member2], mockRoom.roomId)); - sess.onMembershipUpdate(); + sess.onRTCSessionMemberUpdate(); await new Promise((resolve) => { realSetTimeout(resolve); diff --git a/src/matrixrtc/MatrixRTCSession.ts b/src/matrixrtc/MatrixRTCSession.ts index e8d6e1f430..7f3665f3eb 100644 --- a/src/matrixrtc/MatrixRTCSession.ts +++ b/src/matrixrtc/MatrixRTCSession.ts @@ -29,7 +29,7 @@ import { decodeBase64, encodeUnpaddedBase64 } from "../base64.ts"; import { KnownMembership } from "../@types/membership.ts"; import { MatrixError, safeGetRetryAfterMs } from "../http-api/errors.ts"; import { MatrixEvent } from "../models/event.ts"; -import { MembershipManager } from "./MembershipManager.ts"; +import { LegacyMembershipManager, IMembershipManager } from "./MembershipManager.ts"; const logger = rootLogger.getChild("MatrixRTCSession"); @@ -132,7 +132,7 @@ export type JoinSessionConfig = MembershipConfig & EncryptionConfig; * This class doesn't deal with media at all, just membership & properties of a session. */ export class MatrixRTCSession extends TypedEventEmitter { - private membershipManager?: MembershipManager; + private membershipManager?: IMembershipManager; // The session Id of the call, this is the call_id of the call Member event. private _callId: string | undefined; @@ -283,7 +283,8 @@ export class MatrixRTCSession extends TypedEventEmitter { - await this.membershipManager?.leaveRoomSession(1000); + await this.membershipManager?.leave(1000); if (this.expiryTimeout) { clearTimeout(this.expiryTimeout); this.expiryTimeout = undefined; } - this.membershipManager?.stop(); const roomState = this.room.getLiveTimeline().getState(EventTimeline.FORWARDS); - roomState?.off(RoomStateEvent.Members, this.onMembershipUpdate); + roomState?.off(RoomStateEvent.Members, this.onRoomMemberUpdate); } /** @@ -324,24 +324,21 @@ export class MatrixRTCSession extends TypedEventEmitter + this.membershipManager = new LegacyMembershipManager(joinConfig, this.room, this.client, () => this.getOldestMembership(), ); } - this.joinConfig = joinConfig; + this.membershipManager!.join(fociPreferred, fociActive); this.manageMediaKeys = joinConfig?.manageMediaKeys ?? this.manageMediaKeys; - // TODO: it feels wrong to be doing `setJoined()` and then `joinRoomSession()` non-atomically - // A new api between MembershipManager and the session will need to be defined. - this.membershipManager.setJoined(fociPreferred, fociActive); if (joinConfig?.manageMediaKeys) { this.makeNewSenderKey(); this.requestSendCurrentKey(); } - this.membershipManager.joinRoomSession(); this.emit(MatrixRTCSessionEvent.JoinStateChanged, true); } @@ -383,12 +380,17 @@ export class MatrixRTCSession extends TypedEventEmitter { + this.recalculateSessionMembers(); + }; + + /** + * Call this when the Matrix room members have changed. + */ + public onRoomMemberUpdate = (): void => { + this.recalculateSessionMembers(); + }; + + /** + * Call this when something changed that may impacts the current MatrixRTC members in this session. + */ + public onRTCSessionMemberUpdate = (): void => { + this.recalculateSessionMembers(); + }; + + /** + * Call this when anything that could impact rtc memberships has changed: Room Members or RTC members. + * * Examines the latest call memberships and handles any encryption key sending or rotation that is needed. * * This function should be called when the room members or call memberships might have changed. */ - public onMembershipUpdate = (): void => { + private recalculateSessionMembers = (): void => { const oldMemberships = this.memberships; this.memberships = MatrixRTCSession.callMembershipsForRoom(this.room); @@ -764,11 +797,7 @@ export class MatrixRTCSession extends TypedEventEmitter 0 && !isNewSession; - sess.onMembershipUpdate(); + sess.onRTCSessionMemberUpdate(); const nowActive = sess.memberships.length > 0; diff --git a/src/matrixrtc/MembershipManager.ts b/src/matrixrtc/MembershipManager.ts index 04d46b3178..e195917156 100644 --- a/src/matrixrtc/MembershipManager.ts +++ b/src/matrixrtc/MembershipManager.ts @@ -10,20 +10,43 @@ import { CallMembership, DEFAULT_EXPIRE_DURATION, SessionMembershipData } from " import { Focus } from "./focus.ts"; import { isLivekitFocusActive } from "./LivekitFocus.ts"; import { MembershipConfig } from "./MatrixRTCSession.ts"; +/** + * This interface defines what a MembershipManager uses and exposes. + * This interface is what we use to write tests and allows to change the actual implementation + * Without breaking tests because of some internal method renaming. + * + * @internal + */ +export interface IMembershipManager { + isJoined(): boolean; + join(fociPreferred: Focus[], fociActive?: Focus): void; + leave(timeout: number | undefined): Promise; + /** + * call this if the MatrixRTC session members have changed + */ + onRTCSessionMemberUpdate(memberships: CallMembership[]): Promise; + getActiveFocus(): Focus | undefined; +} /** - * This internal class is used by the MatrixRTCSession to manage the local user's own membership of the session. + * This internal class is used by the MatrixRTCSession to manage the local user's own membership of the session. + * + * Its responsibitiy is to manage the locals user membership: + * - send that sate event + * - send the delayed leave event + * - update the delayed leave event while connected + * - update the state event when it times out (for calls longer than membershipExpiryTimeout ~ 4h) + * + * It is possible to test this class on its own. The api surface (to use for tests) is + * defined in `MembershipManagerInterface`. + * + * It is recommended to only use this interface for testing to allow replacing this class. + * * @internal */ -export class MembershipManager { +export class LegacyMembershipManager implements IMembershipManager { private relativeExpiry: number | undefined; - public constructor( - private joinConfig: MembershipConfig | undefined, - private room: Room, - private client: MatrixClient, - private getOldestMembership: () => CallMembership | undefined, - ) {} private memberEventTimeout?: ReturnType; /** @@ -57,29 +80,53 @@ export class MembershipManager { 8_000 ); } - private get membershipKeepAlivePeriod(): number { return this.joinConfig?.membershipKeepAlivePeriod ?? 5_000; } - private get callMemberEventRetryJitter(): number { return this.joinConfig?.callMemberEventRetryJitter ?? 2_000; } - public joinRoomSession(): void { - // We don't wait for this, mostly because it may fail and schedule a retry, so this - // function returning doesn't really mean anything at all. - this.triggerCallMembershipEventUpdate(); + + public constructor( + private joinConfig: MembershipConfig | undefined, + private room: Pick, + private client: Pick< + MatrixClient, + | "getUserId" + | "getDeviceId" + | "sendStateEvent" + | "_unstable_sendDelayedEvent" + | "_unstable_sendDelayedStateEvent" + | "_unstable_updateDelayedEvent" + >, + private getOldestMembership: () => CallMembership | undefined, + ) {} + + /* + * Returns true if we intend to be participating in the MatrixRTC session. + * This is determined by checking if the relativeExpiry has been set. + */ + public isJoined(): boolean { + return this.relativeExpiry !== undefined; } - public setJoined(fociPreferred: Focus[], fociActive?: Focus): void { + + public join(fociPreferred: Focus[], fociActive?: Focus): void { this.ownFocusActive = fociActive; this.ownFociPreferred = fociPreferred; this.relativeExpiry = this.membershipExpiryTimeout; + // We don't wait for this, mostly because it may fail and schedule a retry, so this + // function returning doesn't really mean anything at all. + this.triggerCallMembershipEventUpdate(); } - public setLeft(): void { + + public async leave(timeout: number | undefined = undefined): Promise { this.relativeExpiry = undefined; this.ownFocusActive = undefined; - } - public async leaveRoomSession(timeout: number | undefined = undefined): Promise { + + if (this.memberEventTimeout) { + clearTimeout(this.memberEventTimeout); + this.memberEventTimeout = undefined; + } if (timeout) { // The sleep promise returns the string 'timeout' and the membership update void // A success implies that the membership update was quicker then the timeout. @@ -90,13 +137,38 @@ export class MembershipManager { return true; } } - public stop(): void { - if (this.memberEventTimeout) { - clearTimeout(this.memberEventTimeout); - this.memberEventTimeout = undefined; + + public async onRTCSessionMemberUpdate(memberships: CallMembership[]): Promise { + const isMyMembership = (m: CallMembership): boolean => + m.sender === this.client.getUserId() && m.deviceId === this.client.getDeviceId(); + + if (this.isJoined() && !memberships.some(isMyMembership)) { + logger.warn("Missing own membership: force re-join"); + // TODO: Should this be awaited? And is there anything to tell the focus? + return this.triggerCallMembershipEventUpdate(); } } - public triggerCallMembershipEventUpdate = async (): Promise => { + + public getActiveFocus(): Focus | undefined { + if (this.ownFocusActive) { + // A livekit active focus + if (isLivekitFocusActive(this.ownFocusActive)) { + if (this.ownFocusActive.focus_selection === "oldest_membership") { + const oldestMembership = this.getOldestMembership(); + return oldestMembership?.getPreferredFoci()[0]; + } + } else { + logger.warn("Unknown own ActiveFocus type. This makes it impossible to connect to an SFU."); + } + } else { + // We do not understand the membership format (could be legacy). We default to oldestMembership + // Once there are other methods this is a hard error! + const oldestMembership = this.getOldestMembership(); + return oldestMembership?.getPreferredFoci()[0]; + } + } + + private triggerCallMembershipEventUpdate = async (): Promise => { // TODO: Should this await on a shared promise? if (this.updateCallMembershipRunning) { this.needCallMembershipUpdate = true; @@ -121,13 +193,7 @@ export class MembershipManager { } return {}; } - /* - * Returns true if we intend to be participating in the MatrixRTC session. - * This is determined by checking if the relativeExpiry has been set. - */ - public isJoined(): boolean { - return this.relativeExpiry !== undefined; - } + /** * Constructs our own membership */ @@ -143,21 +209,7 @@ export class MembershipManager { }; } - public getActiveFocus(): Focus | undefined { - if (this.ownFocusActive && isLivekitFocusActive(this.ownFocusActive)) { - // A livekit active focus - if (this.ownFocusActive.focus_selection === "oldest_membership") { - const oldestMembership = this.getOldestMembership(); - return oldestMembership?.getPreferredFoci()[0]; - } - } else { - // We do not understand the membership format (could be legacy). We default to oldestMembership - // Once there are other methods this is a hard error! - const oldestMembership = this.getOldestMembership(); - return oldestMembership?.getPreferredFoci()[0]; - } - } - public async updateCallMembershipEvent(): Promise { + private async updateCallMembershipEvent(): Promise { if (this.memberEventTimeout) { clearTimeout(this.memberEventTimeout); this.memberEventTimeout = undefined; @@ -192,9 +244,7 @@ export class MembershipManager { stateKey, ), ); - logger.log("BEFOER:", this.disconnectDelayId); this.disconnectDelayId = res.delay_id; - logger.log("AFTER:", this.disconnectDelayId); } catch (e) { if ( e instanceof MatrixError && @@ -213,6 +263,7 @@ export class MembershipManager { logger.error("Failed to prepare delayed disconnection event:", e); } }; + await prepareDelayedDisconnection(); // Send join event _after_ preparing the delayed disconnection event await resendIfRateLimited(() =>