reset member when seeing a new session id
also buffer to_device messages for members we don't have a member event for already.
This commit is contained in:
parent
da654a8c59
commit
beeb191588
2 changed files with 60 additions and 13 deletions
|
@ -65,6 +65,7 @@ export class GroupCall extends EventEmitter<{change: never}> {
|
||||||
private _memberOptions: MemberOptions;
|
private _memberOptions: MemberOptions;
|
||||||
private _state: GroupCallState;
|
private _state: GroupCallState;
|
||||||
private localMuteSettings: MuteSettings = new MuteSettings(false, false);
|
private localMuteSettings: MuteSettings = new MuteSettings(false, false);
|
||||||
|
private bufferedDeviceMessages = new Map<string, Set<SignallingMessage<MGroupCallBase>>>();
|
||||||
|
|
||||||
constructor(
|
constructor(
|
||||||
public readonly id: string,
|
public readonly id: string,
|
||||||
|
@ -234,7 +235,7 @@ export class GroupCall extends EventEmitter<{change: never}> {
|
||||||
let member = this._members.get(memberKey);
|
let member = this._members.get(memberKey);
|
||||||
if (member) {
|
if (member) {
|
||||||
log.set("update", true);
|
log.set("update", true);
|
||||||
member!.updateCallInfo(device);
|
member!.updateCallInfo(device, log);
|
||||||
} else {
|
} else {
|
||||||
const logItem = this.logItem.child({l: "member", id: memberKey});
|
const logItem = this.logItem.child({l: "member", id: memberKey});
|
||||||
log.set("add", true);
|
log.set("add", true);
|
||||||
|
@ -249,6 +250,9 @@ export class GroupCall extends EventEmitter<{change: never}> {
|
||||||
member.connect(this._localMedia!.clone(), this.localMuteSettings);
|
member.connect(this._localMedia!.clone(), this.localMuteSettings);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// flush pending messages, either after having created the member,
|
||||||
|
// or updated the session id with updateCallInfo
|
||||||
|
this.flushPendingDeviceMessages(member, log);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
@ -282,6 +286,23 @@ export class GroupCall extends EventEmitter<{change: never}> {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private flushPendingDeviceMessages(member: Member, log: ILogItem) {
|
||||||
|
const memberKey = getMemberKey(member.userId, member.deviceId);
|
||||||
|
const bufferedMessages = this.bufferedDeviceMessages.get(memberKey);
|
||||||
|
// check if we have any pending message for the member with (userid, deviceid, sessionid)
|
||||||
|
if (bufferedMessages) {
|
||||||
|
for (const message of bufferedMessages) {
|
||||||
|
if (message.content.sender_session_id === member.sessionId) {
|
||||||
|
member.handleDeviceMessage(message, log);
|
||||||
|
bufferedMessages.delete(message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (bufferedMessages.size === 0) {
|
||||||
|
this.bufferedDeviceMessages.delete(memberKey);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private getDeviceIdsForUserId(userId: string): string[] {
|
private getDeviceIdsForUserId(userId: string): string[] {
|
||||||
return Array.from(this._members.keys())
|
return Array.from(this._members.keys())
|
||||||
.filter(key => memberKeyIsForUser(key, userId))
|
.filter(key => memberKeyIsForUser(key, userId))
|
||||||
|
@ -322,13 +343,21 @@ export class GroupCall extends EventEmitter<{change: never}> {
|
||||||
/** @internal */
|
/** @internal */
|
||||||
handleDeviceMessage(message: SignallingMessage<MGroupCallBase>, userId: string, deviceId: string, syncLog: ILogItem) {
|
handleDeviceMessage(message: SignallingMessage<MGroupCallBase>, userId: string, deviceId: string, syncLog: ILogItem) {
|
||||||
// TODO: return if we are not membering to the call
|
// TODO: return if we are not membering to the call
|
||||||
let member = this._members.get(getMemberKey(userId, deviceId));
|
const key = getMemberKey(userId, deviceId);
|
||||||
if (member) {
|
let member = this._members.get(key);
|
||||||
member.handleDeviceMessage(message, deviceId, syncLog);
|
if (member && message.content.sender_session_id === member.sessionId) {
|
||||||
|
member.handleDeviceMessage(message, syncLog);
|
||||||
} else {
|
} else {
|
||||||
const item = this.logItem.log({l: "could not find member for signalling message", userId, deviceId});
|
const item = this.logItem.log({l: "member not found, buffering", userId, deviceId, sessionId: message.content.sender_session_id});
|
||||||
syncLog.refDetached(item);
|
syncLog.refDetached(item);
|
||||||
// we haven't received the m.call.member yet for this caller. buffer the device messages or create the member/call anyway?
|
// we haven't received the m.call.member yet for this caller (or with this session id).
|
||||||
|
// buffer the device messages or create the member/call as it should arrive in a moment
|
||||||
|
let messages = this.bufferedDeviceMessages.get(key);
|
||||||
|
if (!messages) {
|
||||||
|
messages = new Set();
|
||||||
|
this.bufferedDeviceMessages.set(key, messages);
|
||||||
|
}
|
||||||
|
messages.add(message);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -33,6 +33,7 @@ export type Options = Omit<PeerCallOptions, "emitUpdate" | "sendSignallingMessag
|
||||||
confId: string,
|
confId: string,
|
||||||
ownUserId: string,
|
ownUserId: string,
|
||||||
ownDeviceId: string,
|
ownDeviceId: string,
|
||||||
|
// local session id of our client
|
||||||
sessionId: string,
|
sessionId: string,
|
||||||
hsApi: HomeServerApi,
|
hsApi: HomeServerApi,
|
||||||
encryptDeviceMessage: (userId: string, message: SignallingMessage<MGroupCallBase>, log: ILogItem) => Promise<EncryptedMessage>,
|
encryptDeviceMessage: (userId: string, message: SignallingMessage<MGroupCallBase>, log: ILogItem) => Promise<EncryptedMessage>,
|
||||||
|
@ -81,6 +82,11 @@ export class Member {
|
||||||
return this.callDeviceMembership.device_id;
|
return this.callDeviceMembership.device_id;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** session id of the member */
|
||||||
|
get sessionId(): string {
|
||||||
|
return this.callDeviceMembership.session_id;
|
||||||
|
}
|
||||||
|
|
||||||
get dataChannel(): any | undefined {
|
get dataChannel(): any | undefined {
|
||||||
return this.peerCall?.dataChannel;
|
return this.peerCall?.dataChannel;
|
||||||
}
|
}
|
||||||
|
@ -106,8 +112,8 @@ export class Member {
|
||||||
}
|
}
|
||||||
|
|
||||||
/** @internal */
|
/** @internal */
|
||||||
disconnect(hangup: boolean) {
|
disconnect(hangup: boolean, log?: ILogItem) {
|
||||||
this.logItem.wrap("disconnect", log => {
|
(log ?? this.logItem).wrap("disconnect", log => {
|
||||||
if (hangup) {
|
if (hangup) {
|
||||||
this.peerCall?.hangup(CallErrorCode.UserHangup);
|
this.peerCall?.hangup(CallErrorCode.UserHangup);
|
||||||
} else {
|
} else {
|
||||||
|
@ -121,8 +127,20 @@ export class Member {
|
||||||
}
|
}
|
||||||
|
|
||||||
/** @internal */
|
/** @internal */
|
||||||
updateCallInfo(callDeviceMembership: CallDeviceMembership) {
|
updateCallInfo(callDeviceMembership: CallDeviceMembership, log: ILogItem) {
|
||||||
|
log.wrap({l: "updateing device membership", deviceId: this.deviceId}, log => {
|
||||||
|
// session id is changing, disconnect so we start with a new slate for the new session
|
||||||
|
if (callDeviceMembership.session_id !== this.sessionId) {
|
||||||
|
log.wrap({
|
||||||
|
l: "member event changes session id",
|
||||||
|
oldSessionId: this.sessionId,
|
||||||
|
newSessionId: callDeviceMembership.session_id
|
||||||
|
}, log => {
|
||||||
|
this.disconnect(false, log);
|
||||||
|
});
|
||||||
|
}
|
||||||
this.callDeviceMembership = callDeviceMembership;
|
this.callDeviceMembership = callDeviceMembership;
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/** @internal */
|
/** @internal */
|
||||||
|
@ -151,7 +169,7 @@ export class Member {
|
||||||
groupMessage.content.device_id = this.options.ownDeviceId;
|
groupMessage.content.device_id = this.options.ownDeviceId;
|
||||||
groupMessage.content.party_id = this.options.ownDeviceId;
|
groupMessage.content.party_id = this.options.ownDeviceId;
|
||||||
groupMessage.content.sender_session_id = this.options.sessionId;
|
groupMessage.content.sender_session_id = this.options.sessionId;
|
||||||
groupMessage.content.dest_session_id = this.callDeviceMembership.session_id;
|
groupMessage.content.dest_session_id = this.sessionId;
|
||||||
// const encryptedMessages = await this.options.encryptDeviceMessage(this.member.userId, groupMessage, log);
|
// const encryptedMessages = await this.options.encryptDeviceMessage(this.member.userId, groupMessage, log);
|
||||||
// const payload = formatToDeviceMessagesPayload(encryptedMessages);
|
// const payload = formatToDeviceMessagesPayload(encryptedMessages);
|
||||||
const payload = {
|
const payload = {
|
||||||
|
@ -174,7 +192,7 @@ export class Member {
|
||||||
}
|
}
|
||||||
|
|
||||||
/** @internal */
|
/** @internal */
|
||||||
handleDeviceMessage(message: SignallingMessage<MGroupCallBase>, deviceId: string, syncLog: ILogItem) {
|
handleDeviceMessage(message: SignallingMessage<MGroupCallBase>, syncLog: ILogItem): void {
|
||||||
syncLog.refDetached(this.logItem);
|
syncLog.refDetached(this.logItem);
|
||||||
const destSessionId = message.content.dest_session_id;
|
const destSessionId = message.content.dest_session_id;
|
||||||
if (destSessionId !== this.options.sessionId) {
|
if (destSessionId !== this.options.sessionId) {
|
||||||
|
@ -185,7 +203,7 @@ export class Member {
|
||||||
this.peerCall = this._createPeerCall(message.content.call_id);
|
this.peerCall = this._createPeerCall(message.content.call_id);
|
||||||
}
|
}
|
||||||
if (this.peerCall) {
|
if (this.peerCall) {
|
||||||
this.peerCall.handleIncomingSignallingMessage(message, deviceId);
|
this.peerCall.handleIncomingSignallingMessage(message, this.deviceId);
|
||||||
} else {
|
} else {
|
||||||
// TODO: need to buffer events until invite comes?
|
// TODO: need to buffer events until invite comes?
|
||||||
}
|
}
|
||||||
|
|
Reference in a new issue