From beeb191588caf0d7f876a9fe0fed85b0d5a01b83 Mon Sep 17 00:00:00 2001 From: Bruno Windels <274386+bwindels@users.noreply.github.com> Date: Tue, 26 Apr 2022 21:11:41 +0200 Subject: [PATCH] reset member when seeing a new session id also buffer to_device messages for members we don't have a member event for already. --- src/matrix/calls/group/GroupCall.ts | 41 ++++++++++++++++++++++++----- src/matrix/calls/group/Member.ts | 32 +++++++++++++++++----- 2 files changed, 60 insertions(+), 13 deletions(-) diff --git a/src/matrix/calls/group/GroupCall.ts b/src/matrix/calls/group/GroupCall.ts index 9f8eb88c..29be893d 100644 --- a/src/matrix/calls/group/GroupCall.ts +++ b/src/matrix/calls/group/GroupCall.ts @@ -65,6 +65,7 @@ export class GroupCall extends EventEmitter<{change: never}> { private _memberOptions: MemberOptions; private _state: GroupCallState; private localMuteSettings: MuteSettings = new MuteSettings(false, false); + private bufferedDeviceMessages = new Map>>(); constructor( public readonly id: string, @@ -234,7 +235,7 @@ export class GroupCall extends EventEmitter<{change: never}> { let member = this._members.get(memberKey); if (member) { log.set("update", true); - member!.updateCallInfo(device); + member!.updateCallInfo(device, log); } else { const logItem = this.logItem.child({l: "member", id: memberKey}); log.set("add", true); @@ -249,6 +250,9 @@ export class GroupCall extends EventEmitter<{change: never}> { member.connect(this._localMedia!.clone(), this.localMuteSettings); } } + // flush pending messages, either after having created the member, + // or updated the session id with updateCallInfo + this.flushPendingDeviceMessages(member, log); } }); } @@ -282,6 +286,23 @@ export class GroupCall extends EventEmitter<{change: never}> { }); } + private flushPendingDeviceMessages(member: Member, log: ILogItem) { + const memberKey = getMemberKey(member.userId, member.deviceId); + const bufferedMessages = this.bufferedDeviceMessages.get(memberKey); + // check if we have any pending message for the member with (userid, deviceid, sessionid) + if (bufferedMessages) { + for (const message of bufferedMessages) { + if (message.content.sender_session_id === member.sessionId) { + member.handleDeviceMessage(message, log); + bufferedMessages.delete(message); + } + } + if (bufferedMessages.size === 0) { + this.bufferedDeviceMessages.delete(memberKey); + } + } + } + private getDeviceIdsForUserId(userId: string): string[] { return Array.from(this._members.keys()) .filter(key => memberKeyIsForUser(key, userId)) @@ -322,13 +343,21 @@ export class GroupCall extends EventEmitter<{change: never}> { /** @internal */ handleDeviceMessage(message: SignallingMessage, userId: string, deviceId: string, syncLog: ILogItem) { // TODO: return if we are not membering to the call - let member = this._members.get(getMemberKey(userId, deviceId)); - if (member) { - member.handleDeviceMessage(message, deviceId, syncLog); + const key = getMemberKey(userId, deviceId); + let member = this._members.get(key); + if (member && message.content.sender_session_id === member.sessionId) { + member.handleDeviceMessage(message, syncLog); } else { - const item = this.logItem.log({l: "could not find member for signalling message", userId, deviceId}); + const item = this.logItem.log({l: "member not found, buffering", userId, deviceId, sessionId: message.content.sender_session_id}); syncLog.refDetached(item); - // we haven't received the m.call.member yet for this caller. buffer the device messages or create the member/call anyway? + // we haven't received the m.call.member yet for this caller (or with this session id). + // buffer the device messages or create the member/call as it should arrive in a moment + let messages = this.bufferedDeviceMessages.get(key); + if (!messages) { + messages = new Set(); + this.bufferedDeviceMessages.set(key, messages); + } + messages.add(message); } } diff --git a/src/matrix/calls/group/Member.ts b/src/matrix/calls/group/Member.ts index e3f50c06..bef9bdb2 100644 --- a/src/matrix/calls/group/Member.ts +++ b/src/matrix/calls/group/Member.ts @@ -33,6 +33,7 @@ export type Options = Omit, log: ILogItem) => Promise, @@ -81,6 +82,11 @@ export class Member { return this.callDeviceMembership.device_id; } + /** session id of the member */ + get sessionId(): string { + return this.callDeviceMembership.session_id; + } + get dataChannel(): any | undefined { return this.peerCall?.dataChannel; } @@ -106,8 +112,8 @@ export class Member { } /** @internal */ - disconnect(hangup: boolean) { - this.logItem.wrap("disconnect", log => { + disconnect(hangup: boolean, log?: ILogItem) { + (log ?? this.logItem).wrap("disconnect", log => { if (hangup) { this.peerCall?.hangup(CallErrorCode.UserHangup); } else { @@ -121,8 +127,20 @@ export class Member { } /** @internal */ - updateCallInfo(callDeviceMembership: CallDeviceMembership) { - this.callDeviceMembership = callDeviceMembership; + updateCallInfo(callDeviceMembership: CallDeviceMembership, log: ILogItem) { + log.wrap({l: "updateing device membership", deviceId: this.deviceId}, log => { + // session id is changing, disconnect so we start with a new slate for the new session + if (callDeviceMembership.session_id !== this.sessionId) { + log.wrap({ + l: "member event changes session id", + oldSessionId: this.sessionId, + newSessionId: callDeviceMembership.session_id + }, log => { + this.disconnect(false, log); + }); + } + this.callDeviceMembership = callDeviceMembership; + }); } /** @internal */ @@ -151,7 +169,7 @@ export class Member { groupMessage.content.device_id = this.options.ownDeviceId; groupMessage.content.party_id = this.options.ownDeviceId; groupMessage.content.sender_session_id = this.options.sessionId; - groupMessage.content.dest_session_id = this.callDeviceMembership.session_id; + groupMessage.content.dest_session_id = this.sessionId; // const encryptedMessages = await this.options.encryptDeviceMessage(this.member.userId, groupMessage, log); // const payload = formatToDeviceMessagesPayload(encryptedMessages); const payload = { @@ -174,7 +192,7 @@ export class Member { } /** @internal */ - handleDeviceMessage(message: SignallingMessage, deviceId: string, syncLog: ILogItem) { + handleDeviceMessage(message: SignallingMessage, syncLog: ILogItem): void { syncLog.refDetached(this.logItem); const destSessionId = message.content.dest_session_id; if (destSessionId !== this.options.sessionId) { @@ -185,7 +203,7 @@ export class Member { this.peerCall = this._createPeerCall(message.content.call_id); } if (this.peerCall) { - this.peerCall.handleIncomingSignallingMessage(message, deviceId); + this.peerCall.handleIncomingSignallingMessage(message, this.deviceId); } else { // TODO: need to buffer events until invite comes? }