Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MatrixRTC: refactor MatrixRTCSession MemberManager API #4610

56 changes: 25 additions & 31 deletions spec/unit/matrixrtc/MatrixRTCSession.spec.ts
Original file line number Diff line number Diff line change
@@ -17,10 +17,10 @@ limitations under the License.
import { encodeBase64, EventType, MatrixClient, MatrixError, MatrixEvent, Room } from "../../../src";
import { KnownMembership } from "../../../src/@types/membership";
import { DEFAULT_EXPIRE_DURATION, SessionMembershipData } from "../../../src/matrixrtc/CallMembership";
import { MembershipManager } from "../../../src/matrixrtc/MembershipManager";
import { MatrixRTCSession, MatrixRTCSessionEvent } from "../../../src/matrixrtc/MatrixRTCSession";
import { EncryptionKeysEventContent } from "../../../src/matrixrtc/types";
import { randomString } from "../../../src/randomstring";
import { flushPromises } from "../../test-utils/flushPromises";
import { makeMockRoom, makeMockRoomState, membershipTemplate } from "./mocks";

const mockFocus = { type: "mock" };
@@ -236,16 +236,15 @@ describe("MatrixRTCSession", () => {
});

async function testSession(membershipData: SessionMembershipData): Promise<void> {
const makeNewMembershipSpy = jest.spyOn(MembershipManager.prototype as any, "makeNewMembership");
sess = MatrixRTCSession.roomSessionForRoom(client, makeMockRoom(membershipData));

sess.joinRoomSession([mockFocus], mockFocus, joinSessionConfig);
await Promise.race([sentStateEvent, new Promise((resolve) => setTimeout(resolve, 500))]);

expect(makeNewMembershipSpy).toHaveBeenCalledTimes(1);
expect(sendStateEventMock).toHaveBeenCalledTimes(1);

await Promise.race([sentDelayedState, new Promise((resolve) => setTimeout(resolve, 500))]);
expect(client._unstable_sendDelayedStateEvent).toHaveBeenCalledTimes(1);
expect(sendDelayedStateMock).toHaveBeenCalledTimes(1);
}

it("sends events", async () => {
@@ -323,9 +322,11 @@ describe("MatrixRTCSession", () => {
let sendStateEventMock: jest.Mock;
let sendDelayedStateMock: jest.Mock;
let sendEventMock: jest.Mock;
let updateDelayedEventMock: jest.Mock;

let sentStateEvent: Promise<void>;
let sentDelayedState: Promise<void>;
let updatedDelayedEvent: Promise<void>;

beforeEach(() => {
sentStateEvent = new Promise((resolve) => {
@@ -339,12 +340,15 @@ describe("MatrixRTCSession", () => {
};
});
});
updatedDelayedEvent = new Promise((r) => {
updateDelayedEventMock = jest.fn(r);
});
sendEventMock = jest.fn();
client.sendStateEvent = sendStateEventMock;
client._unstable_sendDelayedStateEvent = sendDelayedStateMock;
client.sendEvent = sendEventMock;

client._unstable_updateDelayedEvent = jest.fn();
client._unstable_updateDelayedEvent = updateDelayedEventMock;

mockRoom = makeMockRoom([]);
sess = MatrixRTCSession.roomSessionForRoom(client, mockRoom);
@@ -482,19 +486,7 @@ describe("MatrixRTCSession", () => {
membershipServerSideExpiryTimeout: 9000,
});

// needed to advance the mock timers properly
// depends on myMembershipManager being created
const scheduledDelayDisconnection = new Promise<void>((resolve) => {
const membershipManager = (sess as any).membershipManager;
const originalFn: () => void = membershipManager.scheduleDelayDisconnection;
membershipManager.scheduleDelayDisconnection = jest.fn(() => {
originalFn.call(membershipManager);
resolve();
});
});

await sendDelayedStateExceedAttempt.then(); // needed to resolve after the send attempt catches

await sendDelayedStateAttempt;
const callProps = (d: number) => {
return [mockRoom!.roomId, { delay: d }, "org.matrix.msc3401.call.member", {}, userStateKey];
@@ -525,11 +517,13 @@ describe("MatrixRTCSession", () => {
await sentDelayedState;

// should have prepared the heartbeat to keep delaying the leave event while still connected
await scheduledDelayDisconnection;
// should have tried updating the delayed leave to test that it wasn't replaced by own state
await updatedDelayedEvent;
expect(client._unstable_updateDelayedEvent).toHaveBeenCalledTimes(1);
// should update delayed disconnect

// ensures that we reach the code that schedules the timeout for the next delay update before we advance the timers.
await flushPromises();
jest.advanceTimersByTime(5000);
// should update delayed disconnect
expect(client._unstable_updateDelayedEvent).toHaveBeenCalledTimes(2);

jest.useRealTimers();
@@ -561,7 +555,7 @@ describe("MatrixRTCSession", () => {

const onMembershipsChanged = jest.fn();
sess.on(MatrixRTCSessionEvent.MembershipsChanged, onMembershipsChanged);
sess.onMembershipUpdate();
sess.onRTCSessionMemberUpdate();

expect(onMembershipsChanged).not.toHaveBeenCalled();
});
@@ -574,7 +568,7 @@ describe("MatrixRTCSession", () => {
sess.on(MatrixRTCSessionEvent.MembershipsChanged, onMembershipsChanged);

mockRoom.getLiveTimeline().getState = jest.fn().mockReturnValue(makeMockRoomState([], mockRoom.roomId));
sess.onMembershipUpdate();
sess.onRTCSessionMemberUpdate();

expect(onMembershipsChanged).toHaveBeenCalled();
});
@@ -763,7 +757,7 @@ describe("MatrixRTCSession", () => {
mockRoom.getLiveTimeline().getState = jest
.fn()
.mockReturnValue(makeMockRoomState([membershipTemplate], mockRoom.roomId));
sess.onMembershipUpdate();
sess.onRTCSessionMemberUpdate();

// member2 re-joins which should trigger an immediate re-send
const keysSentPromise2 = new Promise<EncryptionKeysEventContent>((resolve) => {
@@ -772,7 +766,7 @@ describe("MatrixRTCSession", () => {
mockRoom.getLiveTimeline().getState = jest
.fn()
.mockReturnValue(makeMockRoomState([membershipTemplate, member2], mockRoom.roomId));
sess.onMembershipUpdate();
sess.onRTCSessionMemberUpdate();
// but, that immediate resend is throttled so we need to wait a bit
jest.advanceTimersByTime(1000);
const { keys } = await keysSentPromise2;
@@ -825,7 +819,7 @@ describe("MatrixRTCSession", () => {
mockRoom.getLiveTimeline().getState = jest
.fn()
.mockReturnValue(makeMockRoomState([membershipTemplate, member2], mockRoom.roomId));
sess.onMembershipUpdate();
sess.onRTCSessionMemberUpdate();

await keysSentPromise2;

@@ -879,7 +873,7 @@ describe("MatrixRTCSession", () => {
sendEventMock.mockClear();

// these should be a no-op:
sess.onMembershipUpdate();
sess.onRTCSessionMemberUpdate();
expect(sendEventMock).toHaveBeenCalledTimes(0);
expect(sess!.statistics.counters.roomEventEncryptionKeysSent).toEqual(1);
} finally {
@@ -933,7 +927,7 @@ describe("MatrixRTCSession", () => {
sendEventMock.mockClear();

// this should be a no-op:
sess.onMembershipUpdate();
sess.onRTCSessionMemberUpdate();
expect(sendEventMock).toHaveBeenCalledTimes(0);

// advance time to avoid key throttling
@@ -947,7 +941,7 @@ describe("MatrixRTCSession", () => {
});

// this should re-send the key
sess.onMembershipUpdate();
sess.onRTCSessionMemberUpdate();

await keysSentPromise2;

@@ -1010,7 +1004,7 @@ describe("MatrixRTCSession", () => {
mockRoom.getLiveTimeline().getState = jest
.fn()
.mockReturnValue(makeMockRoomState([membershipTemplate], mockRoom.roomId));
sess.onMembershipUpdate();
sess.onRTCSessionMemberUpdate();

jest.advanceTimersByTime(10000);

@@ -1055,7 +1049,7 @@ describe("MatrixRTCSession", () => {
);
}

sess!.onMembershipUpdate();
sess!.onRTCSessionMemberUpdate();

// advance time to avoid key throttling
jest.advanceTimersByTime(10000);
@@ -1096,7 +1090,7 @@ describe("MatrixRTCSession", () => {
mockRoom.getLiveTimeline().getState = jest
.fn()
.mockReturnValue(makeMockRoomState([membershipTemplate, member2], mockRoom.roomId));
sess.onMembershipUpdate();
sess.onRTCSessionMemberUpdate();

await new Promise((resolve) => {
realSetTimeout(resolve);
71 changes: 50 additions & 21 deletions src/matrixrtc/MatrixRTCSession.ts
Original file line number Diff line number Diff line change
@@ -29,7 +29,7 @@ import { decodeBase64, encodeUnpaddedBase64 } from "../base64.ts";
import { KnownMembership } from "../@types/membership.ts";
import { MatrixError, safeGetRetryAfterMs } from "../http-api/errors.ts";
import { MatrixEvent } from "../models/event.ts";
import { MembershipManager } from "./MembershipManager.ts";
import { LegacyMembershipManager, IMembershipManager } from "./MembershipManager.ts";

const logger = rootLogger.getChild("MatrixRTCSession");

@@ -132,7 +132,7 @@ export type JoinSessionConfig = MembershipConfig & EncryptionConfig;
* This class doesn't deal with media at all, just membership & properties of a session.
*/
export class MatrixRTCSession extends TypedEventEmitter<MatrixRTCSessionEvent, MatrixRTCSessionEventHandlerMap> {
private membershipManager?: MembershipManager;
private membershipManager?: IMembershipManager;

// The session Id of the call, this is the call_id of the call Member event.
private _callId: string | undefined;
@@ -283,7 +283,8 @@ export class MatrixRTCSession extends TypedEventEmitter<MatrixRTCSessionEvent, M
super();
this._callId = memberships[0]?.callId;
const roomState = this.room.getLiveTimeline().getState(EventTimeline.FORWARDS);
roomState?.on(RoomStateEvent.Members, this.onMembershipUpdate);
// TODO: double check if this is actually needed. Should be covered by refreshRoom in MatrixRTCSessionManager
roomState?.on(RoomStateEvent.Members, this.onRoomMemberUpdate);
this.setExpiryTimer();
}

@@ -299,14 +300,13 @@ export class MatrixRTCSession extends TypedEventEmitter<MatrixRTCSessionEvent, M
* Performs cleanup & removes timers for client shutdown
*/
public async stop(): Promise<void> {
await this.membershipManager?.leaveRoomSession(1000);
await this.membershipManager?.leave(1000);
if (this.expiryTimeout) {
clearTimeout(this.expiryTimeout);
this.expiryTimeout = undefined;
}
this.membershipManager?.stop();
const roomState = this.room.getLiveTimeline().getState(EventTimeline.FORWARDS);
roomState?.off(RoomStateEvent.Members, this.onMembershipUpdate);
roomState?.off(RoomStateEvent.Members, this.onRoomMemberUpdate);
}

/**
@@ -324,24 +324,21 @@ export class MatrixRTCSession extends TypedEventEmitter<MatrixRTCSessionEvent, M
* @param joinConfig - Additional configuration for the joined session.
*/
public joinRoomSession(fociPreferred: Focus[], fociActive?: Focus, joinConfig?: JoinSessionConfig): void {
this.joinConfig = joinConfig;
if (this.isJoined()) {
logger.info(`Already joined to session in room ${this.room.roomId}: ignoring join call`);
return;
} else {
this.membershipManager = new MembershipManager(joinConfig, this.room, this.client, () =>
this.membershipManager = new LegacyMembershipManager(joinConfig, this.room, this.client, () =>
this.getOldestMembership(),
);
}
this.joinConfig = joinConfig;
this.membershipManager!.join(fociPreferred, fociActive);
this.manageMediaKeys = joinConfig?.manageMediaKeys ?? this.manageMediaKeys;
// TODO: it feels wrong to be doing `setJoined()` and then `joinRoomSession()` non-atomically
// A new api between MembershipManager and the session will need to be defined.
this.membershipManager.setJoined(fociPreferred, fociActive);
if (joinConfig?.manageMediaKeys) {
this.makeNewSenderKey();
this.requestSendCurrentKey();
}
this.membershipManager.joinRoomSession();
this.emit(MatrixRTCSessionEvent.JoinStateChanged, true);
}

@@ -383,12 +380,17 @@ export class MatrixRTCSession extends TypedEventEmitter<MatrixRTCSessionEvent, M

logger.info(`Leaving call session in room ${this.room.roomId}`);
this.joinConfig = undefined;
this.membershipManager!.setLeft();
this.manageMediaKeys = false;
const leavePromise = this.membershipManager!.leave(timeout);
this.emit(MatrixRTCSessionEvent.JoinStateChanged, false);
return await this.membershipManager!.leaveRoomSession(timeout);
return await leavePromise;
}

/**
* Get the active focus from the current CallMemberState event
* @returns The focus that is currently in use to connect to this session. This is undefined
* if the client is not connected to this session.
*/
public getActiveFocus(): Focus | undefined {
return this.membershipManager?.getActiveFocus();
}
@@ -650,14 +652,21 @@ export class MatrixRTCSession extends TypedEventEmitter<MatrixRTCSessionEvent, M
}

if (soonestExpiry != undefined) {
this.expiryTimeout = setTimeout(this.onMembershipUpdate, soonestExpiry);
this.expiryTimeout = setTimeout(this.onRTCSessionMemberUpdate, soonestExpiry);
}
}

public getOldestMembership(): CallMembership | undefined {
return this.memberships[0];
}

/**
* This method is used when the user is not yet connected to the Session but wants to know what focus
* the users in the session are using to make a decision how it wants/should connect.
*
* See also `getActiveFocus`
* @returns The focus which should be used when joining this session.
*/
public getFocusInUse(): Focus | undefined {
const oldestMembership = this.getOldestMembership();
if (oldestMembership?.getFocusSelection() === "oldest_membership") {
@@ -746,11 +755,35 @@ export class MatrixRTCSession extends TypedEventEmitter<MatrixRTCSessionEvent, M
m.sender === this.client.getUserId() && m.deviceId === this.client.getDeviceId();

/**
* @deprecated use onRoomMemberUpdate or onRTCSessionMemberUpdate instead. this should be called when any membership in the call is updated
* the old name might have implied to only need to call this when your own membership changes.
*/
public onMembershipUpdate = (): void => {
this.recalculateSessionMembers();
};

/**
* Call this when the room members have changed.
toger5 marked this conversation as resolved.
Show resolved Hide resolved
*/
public onRoomMemberUpdate = (): void => {
this.recalculateSessionMembers();
};

/**
* Call this when sth changed that impacts the current rtc members in this session.
toger5 marked this conversation as resolved.
Show resolved Hide resolved
*/
public onRTCSessionMemberUpdate = (): void => {
this.recalculateSessionMembers();
};

/**
* Call this when anything that could impact rtc memberships has changed: Room Members or RTC members.
*
* Examines the latest call memberships and handles any encryption key sending or rotation that is needed.
*
* This function should be called when the room members or call memberships might have changed.
*/
public onMembershipUpdate = (): void => {
private recalculateSessionMembers = (): void => {
const oldMemberships = this.memberships;
this.memberships = MatrixRTCSession.callMembershipsForRoom(this.room);

@@ -764,11 +797,7 @@ export class MatrixRTCSession extends TypedEventEmitter<MatrixRTCSessionEvent, M
logger.info(`Memberships for call in room ${this.room.roomId} have changed: emitting`);
this.emit(MatrixRTCSessionEvent.MembershipsChanged, oldMemberships, this.memberships);

if (this.isJoined() && !this.memberships.some(this.isMyMembership)) {
logger.warn("Missing own membership: force re-join");
// TODO: Should this be awaited? And is there anything to tell the focus?
this.membershipManager?.triggerCallMembershipEventUpdate();
}
this.membershipManager?.onRTCSessionMemberUpdate(this.memberships);
}

if (this.manageMediaKeys && this.isJoined()) {
2 changes: 1 addition & 1 deletion src/matrixrtc/MatrixRTCSessionManager.ts
Original file line number Diff line number Diff line change
@@ -153,7 +153,7 @@ export class MatrixRTCSessionManager extends TypedEventEmitter<MatrixRTCSessionM

const wasActiveAndKnown = sess.memberships.length > 0 && !isNewSession;

sess.onMembershipUpdate();
sess.onRTCSessionMemberUpdate();

const nowActive = sess.memberships.length > 0;

Loading