WIP: work on group calling code

This commit is contained in:
Bruno Windels 2022-03-10 14:53:31 +01:00
parent 4bedd4737b
commit 6da4a4209c
11 changed files with 301 additions and 202 deletions

View file

@ -59,7 +59,7 @@ export class DeviceMessageHandler {
})); }));
// TODO: pass this in the prep and run it in afterSync or afterSyncComplete (as callHandler can send events as well)? // TODO: pass this in the prep and run it in afterSync or afterSyncComplete (as callHandler can send events as well)?
for (const dr of callMessages) { for (const dr of callMessages) {
this._callHandler.handleDeviceMessage(dr.device.userId, dr.device.deviceId, dr.event, log); this._callHandler.handleDeviceMessage(dr.event, dr.device.userId, dr.device.deviceId, log);
} }
// TODO: somehow include rooms that received a call to_device message in the sync state? // TODO: somehow include rooms that received a call to_device message in the sync state?
// or have updates flow through event emitter? // or have updates flow through event emitter?

View file

@ -73,6 +73,19 @@ export class Session {
}; };
this._roomsBeingCreated = new ObservableMap(); this._roomsBeingCreated = new ObservableMap();
this._user = new User(sessionInfo.userId); this._user = new User(sessionInfo.userId);
this._callHandler = new CallHandler({
createTimeout: this._platform.clock.createTimeout,
hsApi: this._hsApi,
encryptDeviceMessage: async (roomId, message, log) => {
if (!this._deviceTracker || !this._olmEncryption) {
throw new Error("encryption is not enabled");
}
await this._deviceTracker.trackRoom(roomId, log);
const devices = await this._deviceTracker.devicesForTrackedRoom(roomId, this._hsApi, log);
const encryptedMessage = await this._olmEncryption.encrypt(message.type, message.content, devices, this._hsApi, log);
return encryptedMessage;
}
});
this._deviceMessageHandler = new DeviceMessageHandler({storage, callHandler: this._callHandler}); this._deviceMessageHandler = new DeviceMessageHandler({storage, callHandler: this._callHandler});
this._olm = olm; this._olm = olm;
this._olmUtil = null; this._olmUtil = null;
@ -100,7 +113,6 @@ export class Session {
this._createRoomEncryption = this._createRoomEncryption.bind(this); this._createRoomEncryption = this._createRoomEncryption.bind(this);
this._forgetArchivedRoom = this._forgetArchivedRoom.bind(this); this._forgetArchivedRoom = this._forgetArchivedRoom.bind(this);
this.needsKeyBackup = new ObservableValue(false); this.needsKeyBackup = new ObservableValue(false);
this._callHandler = new CallHandler(this._platform, this._hsApi);
} }
get fingerprintKey() { get fingerprintKey() {

View file

@ -15,48 +15,53 @@ limitations under the License.
*/ */
import {ObservableMap} from "../../observable/map/ObservableMap"; import {ObservableMap} from "../../observable/map/ObservableMap";
import {WebRTC, PeerConnection, PeerConnectionHandler} from "../../platform/types/WebRTC";
import {MediaDevices, Track, AudioTrack, TrackType} from "../../platform/types/MediaDevices";
import {handlesEventType} from "./PeerCall";
import {EventType} from "./callEventTypes";
import {GroupCall} from "./group/GroupCall";
import type {Room} from "../room/Room"; import type {Room} from "../room/Room";
import type {MemberChange} from "../room/members/RoomMember"; import type {MemberChange} from "../room/members/RoomMember";
import type {StateEvent} from "../storage/types"; import type {StateEvent} from "../storage/types";
import type {ILogItem} from "../../logging/types"; import type {ILogItem} from "../../logging/types";
import type {Platform} from "../../platform/web/Platform"; import type {Platform} from "../../platform/web/Platform";
import type {BaseObservableMap} from "../../observable/map/BaseObservableMap";
import {WebRTC, PeerConnection, PeerConnectionHandler, StreamPurpose} from "../../platform/types/WebRTC";
import {MediaDevices, Track, AudioTrack, TrackType} from "../../platform/types/MediaDevices";
import {handlesEventType, PeerCall, PeerCallHandler} from "./PeerCall";
import {EventType} from "./callEventTypes";
import type {SignallingMessage, MGroupCallBase} from "./callEventTypes"; import type {SignallingMessage, MGroupCallBase} from "./callEventTypes";
import type {GroupCall} from "./group/GroupCall"; import type {Options as GroupCallOptions} from "./group/GroupCall";
const GROUP_CALL_TYPE = "m.call"; const GROUP_CALL_TYPE = "m.call";
const GROUP_CALL_MEMBER_TYPE = "m.call.member"; const GROUP_CALL_MEMBER_TYPE = "m.call.member";
const CALL_TERMINATED = "m.terminated"; const CALL_TERMINATED = "m.terminated";
export class GroupCallHandler { export type Options = Omit<GroupCallOptions, "emitUpdate">;
private createPeerCall: (callId: string, handler: PeerCallHandler) => PeerCall; export class GroupCallHandler {
// group calls by call id // group calls by call id
public readonly calls: ObservableMap<string, GroupCall> = new ObservableMap<string, GroupCall>(); private readonly _calls: ObservableMap<string, GroupCall> = new ObservableMap<string, GroupCall>();
// map of userId to set of conf_id's they are in // map of userId to set of conf_id's they are in
private memberToCallIds: Map<string, Set<string>> = new Map(); private memberToCallIds: Map<string, Set<string>> = new Map();
private groupCallOptions: GroupCallOptions;
constructor(hsApi: HomeServerApi, platform: Platform, ownUserId: string, ownDeviceId: string) { constructor(private readonly options: Options) {
this.createPeerCall = (callId: string, handler: PeerCallHandler) => { this.groupCallOptions = Object.assign({}, this.options, {
return new PeerCall(callId, handler, platform.createTimeout, platform.webRTC); emitUpdate: (groupCall, params) => this._calls.update(groupCall.id, params)
} });
} }
get calls(): BaseObservableMap<string, GroupCall> { return this._calls; }
// TODO: check and poll turn server credentials here // TODO: check and poll turn server credentials here
/** @internal */
handleRoomState(room: Room, events: StateEvent[], log: ILogItem) { handleRoomState(room: Room, events: StateEvent[], log: ILogItem) {
// first update call events // first update call events
for (const event of events) { for (const event of events) {
if (event.type === EventType.GroupCall) { if (event.type === EventType.GroupCall) {
this.handleCallEvent(event); this.handleCallEvent(event, room);
} }
} }
// then update participants // then update members
for (const event of events) { for (const event of events) {
if (event.type === EventType.GroupCallMember) { if (event.type === EventType.GroupCallMember) {
this.handleCallMemberEvent(event); this.handleCallMemberEvent(event);
@ -64,59 +69,62 @@ export class GroupCallHandler {
} }
} }
/** @internal */
updateRoomMembers(room: Room, memberChanges: Map<string, MemberChange>) { updateRoomMembers(room: Room, memberChanges: Map<string, MemberChange>) {
} }
private handleCallEvent(event: StateEvent) { /** @internal */
const callId = event.state_key;
let call = this.calls.get(callId);
if (call) {
call.updateCallEvent(event);
if (call.isTerminated) {
this.calls.remove(call.id);
}
} else {
call = new GroupCall(event, room, this.createPeerCall);
this.calls.set(call.id, call);
}
}
private handleCallMemberEvent(event: StateEvent) {
const participant = event.state_key;
const calls = event.content["m.calls"] ?? [];
const newCallIdsMemberOf = new Set<string>(calls.map(call => {
const callId = call["m.call_id"];
const groupCall = this.calls.get(callId);
// TODO: also check the participant when receiving the m.call event
groupCall?.addParticipant(participant, call);
return callId;
}));
let previousCallIdsMemberOf = this.memberToCallIds.get(participant);
// remove user as participant of any calls not present anymore
if (previousCallIdsMemberOf) {
for (const previousCallId of previousCallIdsMemberOf) {
if (!newCallIdsMemberOf.has(previousCallId)) {
const groupCall = this.calls.get(previousCallId);
groupCall?.removeParticipant(participant);
}
}
}
if (newCallIdsMemberOf.size === 0) {
this.memberToCallIds.delete(participant);
} else {
this.memberToCallIds.set(participant, newCallIdsMemberOf);
}
}
handlesDeviceMessageEventType(eventType: string): boolean { handlesDeviceMessageEventType(eventType: string): boolean {
return handlesEventType(eventType); return handlesEventType(eventType);
} }
handleDeviceMessage(senderUserId: string, senderDeviceId: string, event: SignallingMessage<MGroupCallBase>, log: ILogItem) { /** @internal */
handleDeviceMessage(message: SignallingMessage<MGroupCallBase>, userId: string, deviceId: string, log: ILogItem) {
// TODO: buffer messages for calls we haven't received the state event for yet? // TODO: buffer messages for calls we haven't received the state event for yet?
const call = this.calls.get(event.content.conf_id); const call = this._calls.get(message.content.conf_id);
call?.handleDeviceMessage(senderUserId, senderDeviceId, event, log); call?.handleDeviceMessage(message, userId, deviceId, log);
}
private handleCallEvent(event: StateEvent, room: Room) {
const callId = event.state_key;
let call = this._calls.get(callId);
if (call) {
call.updateCallEvent(event);
if (call.isTerminated) {
this._calls.remove(call.id);
}
} else {
call = new GroupCall(event, room, this.groupCallOptions);
this._calls.set(call.id, call);
}
}
private handleCallMemberEvent(event: StateEvent) {
const userId = event.state_key;
const calls = event.content["m.calls"] ?? [];
const newCallIdsMemberOf = new Set<string>(calls.map(call => {
const callId = call["m.call_id"];
const groupCall = this._calls.get(callId);
// TODO: also check the member when receiving the m.call event
groupCall?.addMember(userId, call);
return callId;
}));
let previousCallIdsMemberOf = this.memberToCallIds.get(userId);
// remove user as member of any calls not present anymore
if (previousCallIdsMemberOf) {
for (const previousCallId of previousCallIdsMemberOf) {
if (!newCallIdsMemberOf.has(previousCallId)) {
const groupCall = this._calls.get(previousCallId);
groupCall?.removeMember(userId);
}
}
}
if (newCallIdsMemberOf.size === 0) {
this.memberToCallIds.delete(userId);
} else {
this.memberToCallIds.set(userId, newCallIdsMemberOf);
}
} }
} }

View file

@ -43,6 +43,13 @@ import type {
SignallingMessage SignallingMessage
} from "./callEventTypes"; } from "./callEventTypes";
export type Options = {
webRTC: WebRTC,
createTimeout: TimeoutCreator,
emitUpdate: (peerCall: PeerCall, params: any) => void;
sendSignallingMessage: (message: SignallingMessage<MCallBase>, log: ILogItem) => Promise<void>;
};
// when sending, we need to encrypt message with olm. I think the flow of room => roomEncryption => olmEncryption as we already // when sending, we need to encrypt message with olm. I think the flow of room => roomEncryption => olmEncryption as we already
// do for sharing keys will be best as that already deals with room tracking. // do for sharing keys will be best as that already deals with room tracking.
/** /**
@ -51,7 +58,7 @@ import type {
/** Implements a call between two peers with the signalling state keeping, while still delegating the signalling message sending. Used by GroupCall.*/ /** Implements a call between two peers with the signalling state keeping, while still delegating the signalling message sending. Used by GroupCall.*/
export class PeerCall implements IDisposable { export class PeerCall implements IDisposable {
private readonly peerConnection: PeerConnection; private readonly peerConnection: PeerConnection;
private state = CallState.Fledgling; private _state = CallState.Fledgling;
private direction: CallDirection; private direction: CallDirection;
private localMedia?: LocalMedia; private localMedia?: LocalMedia;
// A queue for candidates waiting to go out. // A queue for candidates waiting to go out.
@ -74,15 +81,12 @@ export class PeerCall implements IDisposable {
// perfect negotiation flags // perfect negotiation flags
private makingOffer: boolean = false; private makingOffer: boolean = false;
private ignoreOffer: boolean = false; private ignoreOffer: boolean = false;
constructor( constructor(
private callId: string, // generated or from invite private callId: string, // generated or from invite
private readonly handler: PeerCallHandler, private readonly options: Options
private readonly createTimeout: TimeoutCreator,
webRTC: WebRTC
) { ) {
const outer = this; const outer = this;
this.peerConnection = webRTC.createPeerConnection({ this.peerConnection = options.webRTC.createPeerConnection({
onIceConnectionStateChange(state: RTCIceConnectionState) {}, onIceConnectionStateChange(state: RTCIceConnectionState) {},
onLocalIceCandidate(candidate: RTCIceCandidate) {}, onLocalIceCandidate(candidate: RTCIceCandidate) {},
onIceGatheringStateChange(state: RTCIceGatheringState) {}, onIceGatheringStateChange(state: RTCIceGatheringState) {},
@ -104,12 +108,14 @@ export class PeerCall implements IDisposable {
} }
} }
get state(): CallState { return this._state; }
get remoteTracks(): Track[] { get remoteTracks(): Track[] {
return this.peerConnection.remoteTracks; return this.peerConnection.remoteTracks;
} }
async call(localMediaPromise: Promise<LocalMedia>): Promise<void> { async call(localMediaPromise: Promise<LocalMedia>): Promise<void> {
if (this.state !== CallState.Fledgling) { if (this._state !== CallState.Fledgling) {
return; return;
} }
this.direction = CallDirection.Outbound; this.direction = CallDirection.Outbound;
@ -131,7 +137,7 @@ export class PeerCall implements IDisposable {
} }
async answer(localMediaPromise: Promise<LocalMedia>): Promise<void> { async answer(localMediaPromise: Promise<LocalMedia>): Promise<void> {
if (this.state !== CallState.Ringing) { if (this._state !== CallState.Ringing) {
return; return;
} }
this.setState(CallState.WaitLocalMedia); this.setState(CallState.WaitLocalMedia);
@ -197,7 +203,7 @@ export class PeerCall implements IDisposable {
async hangup(errorCode: CallErrorCode) { async hangup(errorCode: CallErrorCode) {
} }
async handleIncomingSignallingMessage<B extends MCallBase>(message: SignallingMessage<B>, partyId: PartyId): Promise<void> { async handleIncomingSignallingMessage<B extends MCallBase>(message: SignallingMessage<B>, partyId: PartyId, log: ILogItem): Promise<void> {
switch (message.type) { switch (message.type) {
case EventType.Invite: case EventType.Invite:
if (this.callId !== message.content.call_id) { if (this.callId !== message.content.call_id) {
@ -226,10 +232,10 @@ export class PeerCall implements IDisposable {
if (reason) { if (reason) {
content["reason"] = reason; content["reason"] = reason;
} }
return this.handler.sendSignallingMessage({ return this.options.sendSignallingMessage({
type: EventType.Hangup, type: EventType.Hangup,
content content
}); }, undefined);
} }
// calls are serialized and deduplicated by responsePromiseChain // calls are serialized and deduplicated by responsePromiseChain
@ -249,7 +255,7 @@ export class PeerCall implements IDisposable {
await this.delay(200); await this.delay(200);
} }
if (this.state === CallState.Ended) { if (this._state === CallState.Ended) {
return; return;
} }
@ -268,12 +274,12 @@ export class PeerCall implements IDisposable {
version: 1, version: 1,
lifetime: CALL_TIMEOUT_MS lifetime: CALL_TIMEOUT_MS
}; };
if (this.state === CallState.CreateOffer) { if (this._state === CallState.CreateOffer) {
await this.handler.sendSignallingMessage({type: EventType.Invite, content}); await this.options.sendSignallingMessage({type: EventType.Invite, content});
this.setState(CallState.InviteSent); this.setState(CallState.InviteSent);
} else if (this.state === CallState.Connected || this.state === CallState.Connecting) { } else if (this._state === CallState.Connected || this._state === CallState.Connecting) {
// send Negotiate message // send Negotiate message
//await this.handler.sendSignallingMessage({type: EventType.Invite, content}); //await this.options.sendSignallingMessage({type: EventType.Invite, content});
//this.setState(CallState.InviteSent); //this.setState(CallState.InviteSent);
} }
} finally { } finally {
@ -282,10 +288,10 @@ export class PeerCall implements IDisposable {
this.sendCandidateQueue(); this.sendCandidateQueue();
if (this.state === CallState.InviteSent) { if (this._state === CallState.InviteSent) {
await this.delay(CALL_TIMEOUT_MS); await this.delay(CALL_TIMEOUT_MS);
// @ts-ignore TS doesn't take the await above into account to know that the state could have changed in between // @ts-ignore TS doesn't take the await above into account to know that the state could have changed in between
if (this.state === CallState.InviteSent) { if (this._state === CallState.InviteSent) {
this.hangup(CallErrorCode.InviteTimeout); this.hangup(CallErrorCode.InviteTimeout);
} }
} }
@ -307,7 +313,7 @@ export class PeerCall implements IDisposable {
// TODO: review states to be unambigous, WaitLocalMedia for sending offer or answer? // TODO: review states to be unambigous, WaitLocalMedia for sending offer or answer?
// How do we interrupt `call()`? well, perhaps we need to not just await InviteSent but also CreateAnswer? // How do we interrupt `call()`? well, perhaps we need to not just await InviteSent but also CreateAnswer?
if (this.state === CallState.Fledgling || this.state === CallState.CreateOffer || this.state === CallState.WaitLocalMedia) { if (this._state === CallState.Fledgling || this._state === CallState.CreateOffer || this._state === CallState.WaitLocalMedia) {
} else { } else {
await this.sendHangupWithCallId(this.callId, CallErrorCode.Replaced); await this.sendHangupWithCallId(this.callId, CallErrorCode.Replaced);
@ -324,7 +330,7 @@ export class PeerCall implements IDisposable {
} }
private async handleFirstInvite(content: MCallInvite, partyId: PartyId): Promise<void> { private async handleFirstInvite(content: MCallInvite, partyId: PartyId): Promise<void> {
if (this.state !== CallState.Fledgling || this.opponentPartyId !== undefined) { if (this._state !== CallState.Fledgling || this.opponentPartyId !== undefined) {
// TODO: hangup or ignore? // TODO: hangup or ignore?
return; return;
} }
@ -370,7 +376,7 @@ export class PeerCall implements IDisposable {
await this.delay(content.lifetime ?? CALL_TIMEOUT_MS); await this.delay(content.lifetime ?? CALL_TIMEOUT_MS);
// @ts-ignore TS doesn't take the await above into account to know that the state could have changed in between // @ts-ignore TS doesn't take the await above into account to know that the state could have changed in between
if (this.state === CallState.Ringing) { if (this._state === CallState.Ringing) {
this.logger.debug(`Call ${this.callId} invite has expired. Hanging up.`); this.logger.debug(`Call ${this.callId} invite has expired. Hanging up.`);
this.hangupParty = CallParty.Remote; // effectively this.hangupParty = CallParty.Remote; // effectively
this.setState(CallState.Ended); this.setState(CallState.Ended);
@ -384,7 +390,7 @@ export class PeerCall implements IDisposable {
private async handleAnswer(content: MCallAnswer, partyId: PartyId): Promise<void> { private async handleAnswer(content: MCallAnswer, partyId: PartyId): Promise<void> {
this.logger.debug(`Got answer for call ID ${this.callId} from party ID ${partyId}`); this.logger.debug(`Got answer for call ID ${this.callId} from party ID ${partyId}`);
if (this.state === CallState.Ended) { if (this._state === CallState.Ended) {
this.logger.debug(`Ignoring answer because call ID ${this.callId} has ended`); this.logger.debug(`Ignoring answer because call ID ${this.callId} has ended`);
return; return;
} }
@ -456,7 +462,7 @@ export class PeerCall implements IDisposable {
// if (description.type === 'offer') { // if (description.type === 'offer') {
// await this.peerConnection.setLocalDescription(); // await this.peerConnection.setLocalDescription();
// await this.handler.sendSignallingMessage({ // await this.options.sendSignallingMessage({
// type: EventType.CallNegotiate, // type: EventType.CallNegotiate,
// content: { // content: {
// description: this.peerConnection.localDescription!, // description: this.peerConnection.localDescription!,
@ -471,7 +477,7 @@ export class PeerCall implements IDisposable {
private async sendAnswer(): Promise<void> { private async sendAnswer(): Promise<void> {
const localDescription = this.peerConnection.localDescription!; const localDescription = this.peerConnection.localDescription!;
const answerContent: MCallAnswer = { const answerContent: MCallAnswer<MCallBase> = {
call_id: this.callId, call_id: this.callId,
version: 1, version: 1,
answer: { answer: {
@ -489,7 +495,7 @@ export class PeerCall implements IDisposable {
this.candidateSendQueue = []; this.candidateSendQueue = [];
try { try {
await this.handler.sendSignallingMessage({type: EventType.Answer, content: answerContent}); await this.options.sendSignallingMessage({type: EventType.Answer, content: answerContent}, undefined);
} catch (error) { } catch (error) {
this.terminate(CallParty.Local, CallErrorCode.SendAnswer, false); this.terminate(CallParty.Local, CallErrorCode.SendAnswer, false);
throw error; throw error;
@ -513,7 +519,7 @@ export class PeerCall implements IDisposable {
this.candidateSendQueue.push(content); this.candidateSendQueue.push(content);
// Don't send the ICE candidates yet if the call is in the ringing state // Don't send the ICE candidates yet if the call is in the ringing state
if (this.state === CallState.Ringing) return; if (this._state === CallState.Ringing) return;
// MSC2746 recommends these values (can be quite long when calling because the // MSC2746 recommends these values (can be quite long when calling because the
// callee will need a while to answer the call) // callee will need a while to answer the call)
@ -523,7 +529,7 @@ export class PeerCall implements IDisposable {
} }
private async sendCandidateQueue(): Promise<void> { private async sendCandidateQueue(): Promise<void> {
if (this.candidateSendQueue.length === 0 || this.state === CallState.Ended) { if (this.candidateSendQueue.length === 0 || this._state === CallState.Ended) {
return; return;
} }
@ -531,14 +537,14 @@ export class PeerCall implements IDisposable {
this.candidateSendQueue = []; this.candidateSendQueue = [];
this.logger.debug(`Call ${this.callId} attempting to send ${candidates.length} candidates`); this.logger.debug(`Call ${this.callId} attempting to send ${candidates.length} candidates`);
try { try {
await this.handler.sendSignallingMessage({ await this.options.sendSignallingMessage({
type: EventType.Candidates, type: EventType.Candidates,
content: { content: {
call_id: this.callId, call_id: this.callId,
version: 1, version: 1,
candidates candidates
} },
}); }, undefined);
// Try to send candidates again just in case we received more candidates while sending. // Try to send candidates again just in case we received more candidates while sending.
this.sendCandidateQueue(); this.sendCandidateQueue();
} catch (error) { } catch (error) {
@ -598,14 +604,14 @@ export class PeerCall implements IDisposable {
} }
private setState(state: CallState): void { private setState(state: CallState): void {
const oldState = this.state; const oldState = this._state;
this.state = state; this._state = state;
let deferred = this.statePromiseMap.get(state); let deferred = this.statePromiseMap.get(state);
if (deferred) { if (deferred) {
deferred.resolve(); deferred.resolve();
this.statePromiseMap.delete(state); this.statePromiseMap.delete(state);
} }
this.handler.emitUpdate(this, undefined); this.options.emitUpdate(this, undefined);
} }
private waitForState(states: CallState[]): Promise<void> { private waitForState(states: CallState[]): Promise<void> {
@ -638,7 +644,7 @@ export class PeerCall implements IDisposable {
private async delay(timeoutMs: number): Promise<void> { private async delay(timeoutMs: number): Promise<void> {
// Allow a short time for initial candidates to be gathered // Allow a short time for initial candidates to be gathered
const timeout = this.disposables.track(this.createTimeout(timeoutMs)); const timeout = this.disposables.track(this.options.createTimeout(timeoutMs));
await timeout.elapsed(); await timeout.elapsed();
this.disposables.untrack(timeout); this.disposables.untrack(timeout);
} }
@ -789,11 +795,6 @@ export class CallError extends Error {
} }
} }
export interface PeerCallHandler {
emitUpdate(peerCall: PeerCall, params: any);
sendSignallingMessage(message: SignallingMessage<MCallBase>);
}
export function handlesEventType(eventType: string): boolean { export function handlesEventType(eventType: string): boolean {
return eventType === EventType.Invite || return eventType === EventType.Invite ||
eventType === EventType.Candidates || eventType === EventType.Candidates ||

View file

@ -15,71 +15,95 @@ limitations under the License.
*/ */
import {ObservableMap} from "../../../observable/map/ObservableMap"; import {ObservableMap} from "../../../observable/map/ObservableMap";
import {Participant} from "./Participant"; import {Member} from "./Member";
import {LocalMedia} from "../LocalMedia"; import {LocalMedia} from "../LocalMedia";
import {RoomMember} from "../../room/members/RoomMember";
import type {Options as MemberOptions} from "./Member";
import type {BaseObservableMap} from "../../../observable/map/BaseObservableMap";
import type {Track} from "../../../platform/types/MediaDevices"; import type {Track} from "../../../platform/types/MediaDevices";
import type {SignallingMessage, MGroupCallBase} from "../callEventTypes"; import type {SignallingMessage, MGroupCallBase} from "../callEventTypes";
import type {Room} from "../../room/Room"; import type {Room} from "../../room/Room";
import type {StateEvent} from "../../storage/types"; import type {StateEvent} from "../../storage/types";
import type {Platform} from "../../../platform/web/Platform"; import type {Platform} from "../../../platform/web/Platform";
import type {EncryptedMessage} from "../../e2ee/olm/Encryption";
import type {ILogItem} from "../../../logging/types";
export type Options = Omit<MemberOptions, "emitUpdate" | "confId" | "encryptDeviceMessage"> & {
emitUpdate: (call: GroupCall, params?: any) => void;
encryptDeviceMessage: (roomId: string, message: SignallingMessage<MGroupCallBase>, log: ILogItem) => Promise<EncryptedMessage>,
};
export class GroupCall { export class GroupCall {
private readonly participants: ObservableMap<string, Participant> = new ObservableMap(); private readonly _members: ObservableMap<string, Member> = new ObservableMap();
private localMedia?: Promise<LocalMedia>; private localMedia?: Promise<LocalMedia>;
private _memberOptions: MemberOptions;
constructor( constructor(
private readonly ownUserId: string,
private callEvent: StateEvent, private callEvent: StateEvent,
private readonly room: Room, private readonly room: Room,
private readonly platform: Platform private readonly options: Options
) { ) {
this._memberOptions = Object.assign({
confId: this.id,
emitUpdate: member => this._members.update(member.member.userId, member),
encryptDeviceMessage: (message: SignallingMessage<MGroupCallBase>, log) => {
return this.options.encryptDeviceMessage(this.room.id, message, log);
}
}, options);
} }
get members(): BaseObservableMap<string, Member> { return this._members; }
get id(): string { return this.callEvent.state_key; } get id(): string { return this.callEvent.state_key; }
async participate(tracks: Promise<Track[]>) { get isTerminated(): boolean {
this.localMedia = tracks.then(tracks => LocalMedia.fromTracks(tracks)); return this.callEvent.content["m.terminated"] === true;
for (const [,participant] of this.participants) { }
participant.setLocalMedia(this.localMedia.then(localMedia => localMedia.clone()));
}
// send m.call.member state event
// send invite to all participants that are < my userId async join(tracks: Promise<Track[]>) {
for (const [,participant] of this.participants) { this.localMedia = tracks.then(tracks => LocalMedia.fromTracks(tracks));
if (participant.userId < this.ownUserId) { // send m.call.member state event
participant.call(); const request = this.options.hsApi.sendState(this.room.id, "m.call.member", this.options.ownUserId, {
}
});
await request.response();
// send invite to all members that are < my userId
for (const [,member] of this._members) {
member.connect(this.localMedia);
} }
} }
/** @internal */
updateCallEvent(callEvent: StateEvent) { updateCallEvent(callEvent: StateEvent) {
this.callEvent = callEvent; this.callEvent = callEvent;
// TODO: emit update
} }
addParticipant(userId, memberCallInfo) { /** @internal */
let participant = this.participants.get(userId); addMember(userId, memberCallInfo) {
if (participant) { let member = this._members.get(userId);
participant.updateCallInfo(memberCallInfo); if (member) {
member.updateCallInfo(memberCallInfo);
} else { } else {
participant = new Participant(userId, source.device_id, this.localMedia?.clone(), this.webRTC); member = new Member(RoomMember.fromUserId(this.room.id, userId, "join"), this._memberOptions);
participant.updateCallInfo(memberCallInfo); member.updateCallInfo(memberCallInfo);
this.participants.add(userId, participant); this._members.add(userId, member);
} }
} }
removeParticipant(userId) { /** @internal */
removeMember(userId) {
this._members.remove(userId);
} }
handleDeviceMessage(userId: string, senderDeviceId: string, message: SignallingMessage<MGroupCallBase>, log: ILogItem) { /** @internal */
let participant = this.participants.get(userId); handleDeviceMessage(message: SignallingMessage<MGroupCallBase>, userId: string, deviceId: string, log: ILogItem) {
if (participant) { // TODO: return if we are not membering to the call
participant.handleIncomingSignallingMessage(message, senderDeviceId); let member = this._members.get(userId);
if (member) {
member.handleDeviceMessage(message, deviceId, log);
} else {
// we haven't received the m.call.member yet for this caller. buffer the device messages or create the member/call anyway?
} }
} }
get isTerminated(): boolean {
return !!this.callEvent.content[CALL_TERMINATED];
}
} }

View file

@ -0,0 +1,112 @@
/*
Copyright 2022 The Matrix.org Foundation C.I.C.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
import {PeerCall, CallState} from "../PeerCall";
import {makeTxnId, makeId} from "../../common";
import {EventType} from "../callEventTypes";
import type {Options as PeerCallOptions} from "../PeerCall";
import type {LocalMedia} from "../LocalMedia";
import type {HomeServerApi} from "../../net/HomeServerApi";
import type {Track} from "../../../platform/types/MediaDevices";
import type {MCallBase, MGroupCallBase, SignallingMessage} from "../callEventTypes";
import type {GroupCall} from "./GroupCall";
import type {RoomMember} from "../../room/members/RoomMember";
import type {EncryptedMessage} from "../../e2ee/olm/Encryption";
import type {ILogItem} from "../../../logging/types";
export type Options = Omit<PeerCallOptions, "emitUpdate" | "sendSignallingMessage"> & {
confId: string,
ownUserId: string,
hsApi: HomeServerApi,
encryptDeviceMessage: (message: SignallingMessage<MGroupCallBase>, log: ILogItem) => Promise<EncryptedMessage>,
emitUpdate: (participant: Member, params?: any) => void,
}
export class Member {
private peerCall?: PeerCall;
private localMedia?: Promise<LocalMedia>;
constructor(
public readonly member: RoomMember,
private readonly options: Options
) {}
get remoteTracks(): Track[] {
return this.peerCall?.remoteTracks ?? [];
}
get isConnected(): boolean {
return this.peerCall?.state === CallState.Connected;
}
/* @internal */
connect(localMedia: Promise<LocalMedia>) {
this.localMedia = localMedia;
// otherwise wait for it to connect
if (this.member.userId < this.options.ownUserId) {
this.peerCall = this._createPeerCall(makeId("c"));
this.peerCall.call(localMedia);
}
}
/** @internal */
updateCallInfo(memberCallInfo) {
}
/** @internal */
emitUpdate = (peerCall: PeerCall, params: any) => {
if (peerCall.state === CallState.Ringing) {
peerCall.answer(this.localMedia!);
}
this.options.emitUpdate(this, params);
}
/** From PeerCallHandler
* @internal */
sendSignallingMessage = async (message: SignallingMessage<MCallBase>, log: ILogItem) => {
const groupMessage = message as SignallingMessage<MGroupCallBase>;
groupMessage.content.conf_id = this.options.confId;
const encryptedMessage = await this.options.encryptDeviceMessage(groupMessage, log);
const request = this.options.hsApi.sendToDevice(
"m.room.encrypted",
{[this.member.userId]: {
["*"]: encryptedMessage.content
}
}, makeTxnId(), {log});
await request.response();
}
/** @internal */
handleDeviceMessage(message: SignallingMessage<MGroupCallBase>, deviceId: string, log: ILogItem) {
if (message.type === EventType.Invite && !this.peerCall) {
this.peerCall = this._createPeerCall(message.content.call_id);
}
if (this.peerCall) {
this.peerCall.handleIncomingSignallingMessage(message, deviceId, log);
} else {
// TODO: need to buffer events until invite comes?
}
}
private _createPeerCall(callId: string): PeerCall {
return new PeerCall(callId, Object.assign({}, this.options, {
emitUpdate: this.emitUpdate,
sendSignallingMessage: this.sendSignallingMessage
}));
}
}

View file

@ -1,67 +0,0 @@
/*
Copyright 2022 The Matrix.org Foundation C.I.C.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
import {EventType, PeerCall, SignallingMessage} from "../PeerCall";
import {makeTxnId} from "../../common";
import type {PeerCallHandler} from "../PeerCall";
import type {LocalMedia} from "../LocalMedia";
import type {HomeServerApi} from "../../net/HomeServerApi";
import type {Track} from "../../../platform/types/MediaDevices";
import type {MCallBase, MGroupCallBase} from "../callEventTypes";
import type {GroupCall} from "./GroupCall";
import type {RoomMember} from "../../room/members/RoomMember";
export class Participant implements PeerCallHandler {
constructor(
public readonly member: RoomMember,
private readonly deviceId: string | undefined,
private readonly peerCall: PeerCall,
private readonly hsApi: HomeServerApi,
private readonly groupCall: GroupCall
) {}
/* @internal */
call(localMedia: Promise<LocalMedia>) {
this.peerCall.call(localMedia);
}
get remoteTracks(): Track[] {
return this.peerCall.remoteTracks;
}
/** From PeerCallHandler
* @internal */
emitUpdate(params: any) {
this.groupCall.emitParticipantUpdate(this, params);
}
/** From PeerCallHandler
* @internal */
async sendSignallingMessage(message: SignallingMessage<MCallBase>) {
const groupMessage = message as SignallingMessage<MGroupCallBase>;
groupMessage.content.conf_id = this.groupCall.id;
// TODO: this needs to be encrypted with olm first
const request = this.hsApi.sendToDevice(
groupMessage.type,
{[this.member.userId]: {
[this.deviceId ?? "*"]: groupMessage.content
}
}, makeTxnId());
await request.response();
}
}

View file

@ -16,9 +16,13 @@ limitations under the License.
*/ */
export function makeTxnId() { export function makeTxnId() {
return makeId("t");
}
export function makeId(prefix) {
const n = Math.floor(Math.random() * Number.MAX_SAFE_INTEGER); const n = Math.floor(Math.random() * Number.MAX_SAFE_INTEGER);
const str = n.toString(16); const str = n.toString(16);
return "t" + "0".repeat(14 - str.length) + str; return prefix + "0".repeat(14 - str.length) + str;
} }
export function isTxnId(txnId) { export function isTxnId(txnId) {

View file

@ -311,7 +311,7 @@ class EncryptionTarget {
} }
} }
class EncryptedMessage { export class EncryptedMessage {
constructor( constructor(
public readonly content: OlmEncryptedMessageContent, public readonly content: OlmEncryptedMessageContent,
public readonly device: DeviceIdentity public readonly device: DeviceIdentity

View file

@ -159,6 +159,10 @@ export class HomeServerApi {
state(roomId: string, eventType: string, stateKey: string, options?: BaseRequestOptions): IHomeServerRequest { state(roomId: string, eventType: string, stateKey: string, options?: BaseRequestOptions): IHomeServerRequest {
return this._get(`/rooms/${encodeURIComponent(roomId)}/state/${encodeURIComponent(eventType)}/${encodeURIComponent(stateKey)}`, {}, undefined, options); return this._get(`/rooms/${encodeURIComponent(roomId)}/state/${encodeURIComponent(eventType)}/${encodeURIComponent(stateKey)}`, {}, undefined, options);
} }
sendState(roomId: string, eventType: string, stateKey: string, content: Record<string, any>, options?: BaseRequestOptions): IHomeServerRequest {
return this._put(`/rooms/${encodeURIComponent(roomId)}/state/${encodeURIComponent(eventType)}/${encodeURIComponent(stateKey)}`, {}, content, options);
}
getLoginFlows(): IHomeServerRequest { getLoginFlows(): IHomeServerRequest {
return this._unauthedRequest("GET", this._url("/login")); return this._unauthedRequest("GET", this._url("/login"));

View file

@ -17,6 +17,7 @@ limitations under the License.
export interface MediaDevices { export interface MediaDevices {
// filter out audiooutput // filter out audiooutput
enumerate(): Promise<MediaDeviceInfo[]>; enumerate(): Promise<MediaDeviceInfo[]>;
// to assign to a video element, we downcast to WrappedTrack and use the stream property.
getMediaTracks(audio: true | MediaDeviceInfo, video: boolean | MediaDeviceInfo): Promise<Track[]>; getMediaTracks(audio: true | MediaDeviceInfo, video: boolean | MediaDeviceInfo): Promise<Track[]>;
getScreenShareTrack(): Promise<Track | undefined>; getScreenShareTrack(): Promise<Track | undefined>;
} }