This commit is contained in:
Bruno Windels 2022-02-17 16:58:44 +01:00 committed by Bruno Windels
parent 468841ecea
commit e5f44aecfb
9 changed files with 3026 additions and 381 deletions

View File

@ -36,9 +36,9 @@ enum CallSetupMessageType {
const CALL_ID = "m.call_id";
const CALL_TERMINATED = "m.terminated";
export class CallHandler {
export class GroupCallHandler {
// group calls by call id
public readonly groupCalls: ObservableMap<string, GroupCall> = new ObservableMap<string, GroupCall>();
public readonly calls: ObservableMap<string, GroupCall> = new ObservableMap<string, GroupCall>();
constructor() {
@ -49,15 +49,15 @@ export class CallHandler {
for (const event of events) {
if (event.type === GROUP_CALL_TYPE) {
const callId = event.state_key;
let call = this.groupCalls.get(callId);
let call = this.calls.get(callId);
if (call) {
call.updateCallEvent(event);
if (call.isTerminated) {
this.groupCalls.remove(call.id);
this.calls.remove(call.id);
}
} else {
call = new GroupCall(event, room);
this.groupCalls.set(call.id, call);
this.calls.set(call.id, call);
}
}
}
@ -67,7 +67,7 @@ export class CallHandler {
const participant = event.state_key;
const sources = event.content["m.sources"];
for (const source of sources) {
const call = this.groupCalls.get(source[CALL_ID]);
const call = this.calls.get(source[CALL_ID]);
if (call && !call.isTerminated) {
call.addParticipant(participant, source);
}
@ -85,33 +85,102 @@ export class CallHandler {
handleDeviceMessage(senderUserId: string, senderDeviceId: string, eventType: string, content: Record<string, any>, log: ILogItem) {
const callId = content[CALL_ID];
const call = this.groupCalls.get(callId);
const call = this.calls.get(callId);
call?.handleDeviceMessage(senderUserId, senderDeviceId, eventType, content, log);
}
}
function peerCallKey(senderUserId: string, senderDeviceId: string) {
/**
 * Builds a stable map key for a (user, device) pair.
 * Both parts are JSON-encoded so ids containing quotes or separators
 * cannot collide; a null device id encodes as the literal `null`.
 */
function participantId(senderUserId: string, senderDeviceId: string | null) {
    return `${JSON.stringify(senderUserId)}${JSON.stringify(senderDeviceId)}`;
}
class GroupParticipant implements PeerCallHandler {
private peerCall?: PeerCall;
constructor(
private readonly userId: string,
private readonly deviceId: string,
private localMedia: LocalMedia | undefined,
private readonly webRTC: WebRTC,
private readonly hsApi: HomeServerApi
) {}
sendInvite() {
this.peerCall = new PeerCall(this, this.webRTC);
this.peerCall.setLocalMedia(this.localMedia);
this.peerCall.sendOffer();
}
/** From PeerCallHandler
* @internal */
override emitUpdate() {
}
/** From PeerCallHandler
* @internal */
override onSendSignallingMessage() {
// TODO: this needs to be encrypted with olm first
this.hsApi.sendToDevice(type, {[this.userId]: {[this.deviceId ?? "*"]: content}});
}
}
class GroupCall {
private peerCalls: Map<string, PeerCall>
private readonly participants: ObservableMap<string, Participant> = new ObservableMap();
private localMedia?: LocalMedia;
constructor(private callEvent: StateEvent, private readonly room: Room) {
constructor(private readonly ownUserId: string, private callEvent: StateEvent, private readonly room: Room, private readonly webRTC: WebRTC) {
}
get id(): string { return this.callEvent.state_key; }
async participate(tracks: Track[]) {
this.localMedia = LocalMedia.fromTracks(tracks);
for (const [,participant] of this.participants) {
participant.setLocalMedia(this.localMedia.clone());
}
// send m.call.member state event
// send invite to all participants that are < my userId
for (const [,participant] of this.participants) {
if (participant.userId < this.ownUserId) {
participant.sendInvite();
}
}
}
updateCallEvent(callEvent: StateEvent) {
this.callEvent = callEvent;
}
addParticipant(userId, source) {
const participantId = getParticipantId(userId, source.device_id);
const participant = this.participants.get(participantId);
if (participant) {
participant.updateSource(source);
} else {
participant.add(participantId, new GroupParticipant(userId, source.device_id, this.localMedia?.clone(), this.webRTC));
}
}
handleDeviceMessage(senderUserId: string, senderDeviceId: string, eventType: string, content: Record<string, any>, log: ILogItem) {
const peerCall = this.peerCalls.get(peerCallKey(senderUserId, senderDeviceId));
peerCall?.handleIncomingSignallingMessage()
const participantId = getParticipantId(senderUserId, senderDeviceId);
let peerCall = this.participants.get(participantId);
let hasDeviceInKey = true;
if (!peerCall) {
hasDeviceInKey = false;
peerCall = this.participants.get(getParticipantId(senderUserId, null))
}
if (peerCall) {
peerCall.handleIncomingSignallingMessage(eventType, content, senderDeviceId);
if (!hasDeviceInKey && peerCall.opponentPartyId) {
this.participants.delete(getParticipantId(senderUserId, null));
this.participants.add(getParticipantId(senderUserId, peerCall.opponentPartyId));
}
} else {
// create peerCall
}
}
get id(): string {
@ -121,116 +190,4 @@ class GroupCall {
get isTerminated(): boolean {
return !!this.callEvent.content[CALL_TERMINATED];
}
private createPeerCall(userId: string, deviceId: string): PeerCall {
}
}
/**
* Does WebRTC signalling for a single PeerConnection, and deals with WebRTC wrappers from platform
* */
/**
 * Holds the local tracks (camera, microphone, screen-share) that this device
 * contributes to a call, keyed by track type, and derives the SDP stream
 * metadata describing them.
 */
class LocalMedia {
    private tracks = new Map<TrackType, Track>();

    /**
     * Factory used by GroupCall.participate().
     * fix: callers invoke LocalMedia.fromTracks(tracks) but no such member existed.
     */
    static fromTracks(tracks: Track[]): LocalMedia {
        const media = new LocalMedia();
        media.setTracks(tracks);
        return media;
    }

    /** Adds/replaces each given track by its type. */
    setTracks(tracks: Track[]) {
        for (const track of tracks) {
            this.setTrack(track);
        }
    }

    /**
     * Adds/replaces a single track.
     * @throws if the camera and microphone tracks would belong to different
     *         streams — user media must share one stream id for SDP metadata.
     */
    setTrack(track: Track) {
        let cameraAndMicStreamDontMatch = false;
        if (track.type === TrackType.Microphone) {
            const {cameraTrack} = this;
            if (cameraTrack && track.streamId !== cameraTrack.streamId) {
                cameraAndMicStreamDontMatch = true;
            }
        }
        if (track.type === TrackType.Camera) {
            const {microphoneTrack} = this;
            if (microphoneTrack && track.streamId !== microphoneTrack.streamId) {
                cameraAndMicStreamDontMatch = true;
            }
        }
        if (cameraAndMicStreamDontMatch) {
            throw new Error("The camera and audio track should have the same stream id");
        }
        this.tracks.set(track.type, track);
    }

    /**
     * Shallow copy: the new LocalMedia has its own map but shares the Track objects.
     * fix: callers invoke localMedia.clone() but no such member existed.
     */
    clone(): LocalMedia {
        const media = new LocalMedia();
        media.tracks = new Map(this.tracks);
        return media;
    }

    public get cameraTrack(): Track | undefined { return this.tracks.get(TrackType.Camera); }
    public get screenShareTrack(): Track | undefined { return this.tracks.get(TrackType.ScreenShare); }
    public get microphoneTrack(): AudioTrack | undefined { return this.tracks.get(TrackType.Microphone) as (AudioTrack | undefined); }

    /** Maps each local stream id to its purpose (user media vs screen share) for m.call SDP metadata. */
    getSDPMetadata(): any {
        const metadata = {};
        // mic and camera share a stream id (enforced in setTrack), so one entry covers both
        const userMediaTrack = this.microphoneTrack ?? this.cameraTrack;
        if (userMediaTrack) {
            metadata[userMediaTrack.streamId] = {
                purpose: StreamPurpose.UserMedia
            };
        }
        if (this.screenShareTrack) {
            metadata[this.screenShareTrack.streamId] = {
                purpose: StreamPurpose.ScreenShare
            };
        }
        return metadata;
    }
}
// when sending, we need to encrypt message with olm. I think the flow of room => roomEncryption => olmEncryption as we already
// do for sharing keys will be best as that already deals with room tracking.
type SendSignallingMessageCallback = (type: CallSetupMessageType, content: Record<string, any>) => Promise<void>;
/** Implements a call between two peers with the signalling state keeping, while still delegating the signalling message sending. Used by GroupCall.*/
class PeerCall implements PeerConnectionHandler {
    // the platform WebRTC connection this call signals for
    private readonly peerConnection: PeerConnection;
    constructor(
        // delegate that actually delivers signalling messages (e.g. via to-device events)
        private readonly sendSignallingMessage: SendSignallingMessageCallback,
        private localMedia: LocalMedia,
        webRTC: WebRTC
    ) {
        this.peerConnection = webRTC.createPeerConnection(this);
    }
    // PeerConnectionHandler callbacks — not implemented yet in this stub
    onIceConnectionStateChange(state: RTCIceConnectionState) {}
    onLocalIceCandidate(candidate: RTCIceCandidate) {}
    onIceGatheringStateChange(state: RTCIceGatheringState) {}
    onRemoteTracksChanged(tracks: Track[]) {}
    onDataChannelChanged(dataChannel: DataChannel | undefined) {}
    // Builds and sends the m.call invite when the connection needs (re)negotiation.
    onNegotiationNeeded() {
        const message = {
            // NOTE(review): createOffer() is declared to return a Promise, so this
            // assigns a Promise into the message — likely needs an await; confirm.
            offer: this.peerConnection.createOffer(),
            sdp_stream_metadata: this.localMedia.getSDPMetadata(),
            version: 1
        }
        this.sendSignallingMessage(CallSetupMessageType.Invite, message);
    }
    setLocalMedia(localMedia: LocalMedia) {
        this.localMedia = localMedia;
        // TODO: send new metadata
    }
    // request the type of incoming track
    getPurposeForStreamId(streamId: string): StreamPurpose {
        // look up stream purpose
        // NOTE(review): placeholder — always reports UserMedia regardless of streamId.
        return StreamPurpose.UserMedia;
    }
    // Entry point for m.call.* signalling received over to-device messages; cases are TODO.
    handleIncomingSignallingMessage(type: CallSetupMessageType, content: Record<string, any>) {
        switch (type) {
            case CallSetupMessageType.Invite:
            case CallSetupMessageType.Answer:
            case CallSetupMessageType.Candidates:
            case CallSetupMessageType.Hangup:
        }
    }
}

2757
src/matrix/calls/PeerCall.ts Normal file

File diff suppressed because it is too large Load Diff

43
src/matrix/calls/TODO.md Normal file
View File

@ -0,0 +1,43 @@
## Store ongoing calls
Add store with all ongoing calls so when we quit and start again, we don't have to go through all the past calls to know which ones might still be ongoing.
## Notes
we send m.call as state event in room
we add m.call.participant for our own device
we wait for other participants to add their user and device (in the sources)
for each (userid, deviceid)
- if userId < ourUserId
- we setup a peer connection
- we wait for negotiation event to get sdp
- we send an m.call.invite
- else
- wait for invite from other side
in some cases, we will actually send the invite to all devices (e.g. SFU), so
we probably still need to handle multiple answers?
so we would send an invite to multiple devices and pick the one for which we
received the answer first. between invite and answer, we could already receive
ice candidates that we need to buffer.
should a PeerCall only exist after we've received an answer?
Before that, we could have a PeerCallInvite
updating the metadata:
if we're renegotiating: use m.call.negotiate
if just muting: use m.call.sdp_stream_metadata_changed
party identification
- for 1:1 calls, we identify with a party_id
- for group calls, we identify with a device_id

View File

@ -0,0 +1,88 @@
/*
Copyright 2022 The Matrix.org Foundation C.I.C.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
import {ObservableMap} from "../../../observable/map/ObservableMap";
/**
 * Derives the map key identifying a (user, device) pair.
 * JSON-encoding each half keeps ids with embedded quotes unambiguous;
 * a null device id serialises to the literal `null`.
 */
function participantId(senderUserId: string, senderDeviceId: string | null) {
    const userPart = JSON.stringify(senderUserId);
    const devicePart = JSON.stringify(senderDeviceId);
    return userPart + devicePart;
}
class Call {
private readonly participants: ObservableMap<string, Participant> = new ObservableMap();
private localMedia?: LocalMedia;
constructor(private readonly ownUserId: string, private callEvent: StateEvent, private readonly room: Room, private readonly webRTC: WebRTC) {
}
get id(): string { return this.callEvent.state_key; }
async participate(tracks: Track[]) {
this.localMedia = LocalMedia.fromTracks(tracks);
for (const [,participant] of this.participants) {
participant.setLocalMedia(this.localMedia.clone());
}
// send m.call.member state event
// send invite to all participants that are < my userId
for (const [,participant] of this.participants) {
if (participant.userId < this.ownUserId) {
participant.sendInvite();
}
}
}
updateCallEvent(callEvent: StateEvent) {
this.callEvent = callEvent;
}
addParticipant(userId, source) {
const participantId = getParticipantId(userId, source.device_id);
const participant = this.participants.get(participantId);
if (participant) {
participant.updateSource(source);
} else {
participant.add(participantId, new Participant(userId, source.device_id, this.localMedia?.clone(), this.webRTC));
}
}
handleDeviceMessage(senderUserId: string, senderDeviceId: string, eventType: string, content: Record<string, any>, log: ILogItem) {
const participantId = getParticipantId(senderUserId, senderDeviceId);
let peerCall = this.participants.get(participantId);
let hasDeviceInKey = true;
if (!peerCall) {
hasDeviceInKey = false;
peerCall = this.participants.get(getParticipantId(senderUserId, null))
}
if (peerCall) {
peerCall.handleIncomingSignallingMessage(eventType, content, senderDeviceId);
if (!hasDeviceInKey && peerCall.opponentPartyId) {
this.participants.delete(getParticipantId(senderUserId, null));
this.participants.add(getParticipantId(senderUserId, peerCall.opponentPartyId));
}
} else {
// create peerCall
}
}
get id(): string {
return this.callEvent.state_key;
}
get isTerminated(): boolean {
return !!this.callEvent.content[CALL_TERMINATED];
}
}

View File

@ -0,0 +1,46 @@
/*
Copyright 2022 The Matrix.org Foundation C.I.C.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
class Participant implements PeerCallHandler {
private peerCall?: PeerCall;
constructor(
private readonly userId: string,
private readonly deviceId: string,
private localMedia: LocalMedia | undefined,
private readonly webRTC: WebRTC,
private readonly hsApi: HomeServerApi
) {}
sendInvite() {
this.peerCall = new PeerCall(this, this.webRTC);
this.peerCall.setLocalMedia(this.localMedia);
this.peerCall.sendOffer();
}
/** From PeerCallHandler
* @internal */
override emitUpdate() {
}
/** From PeerCallHandler
* @internal */
override onSendSignallingMessage() {
// TODO: this needs to be encrypted with olm first
this.hsApi.sendToDevice(type, {[this.userId]: {[this.deviceId ?? "*"]: content}});
}
}

View File

@ -32,6 +32,7 @@ export interface Track {
get label(): string;
get id(): string;
get streamId(): string;
get settings(): MediaTrackSettings;
get muted(): boolean;
setMuted(muted: boolean): void;
}

View File

@ -47,7 +47,8 @@ export interface PeerConnection {
createOffer(): Promise<RTCSessionDescriptionInit>;
createAnswer(): Promise<RTCSessionDescriptionInit>;
setLocalDescription(description: RTCSessionDescriptionInit);
setRemoteDescription(description: RTCSessionDescriptionInit);
setRemoteDescription(description: RTCSessionDescriptionInit): Promise<void>;
addIceCandidate(candidate: RTCIceCandidate): Promise<void>;
addTrack(track: Track): void;
removeTrack(track: Track): boolean;
replaceTrack(oldTrack: Track, newTrack: Track): Promise<boolean>;

View File

@ -97,6 +97,7 @@ export class TrackWrapper implements Track {
get id(): string { return this.track.id; }
get streamId(): string { return this.stream.id; }
get muted(): boolean { return this.track.muted; }
get settings(): MediaTrackSettings { return this.track.getSettings(); }
setMuted(muted: boolean): void {
this.track.enabled = !muted;

View File

@ -15,7 +15,7 @@ limitations under the License.
*/
import {TrackWrapper, wrapTrack} from "./MediaDevices";
import {Track} from "../../types/MediaDevices";
import {Track, TrackType} from "../../types/MediaDevices";
import {WebRTC, PeerConnectionHandler, DataChannel, PeerConnection, StreamPurpose} from "../../types/WebRTC";
const POLLING_INTERVAL = 200; // ms
@ -57,6 +57,10 @@ class DOMPeerConnection implements PeerConnection {
this.peerConnection.setRemoteDescription(description);
}
// Forwards a remote ICE candidate to the wrapped RTCPeerConnection;
// resolves when the candidate has been applied.
addIceCandidate(candidate: RTCIceCandidate): Promise<void> {
    return this.peerConnection.addIceCandidate(candidate);
}
addTrack(track: Track): void {
if (!(track instanceof TrackWrapper)) {
throw new Error("Not a TrackWrapper");
@ -152,263 +156,10 @@ class DOMPeerConnection implements PeerConnection {
let type: TrackType;
if (track.kind === "video") {
const purpose = this.handler.getPurposeForStreamId(stream.id);
type = purpose === StreamPurpose.Usermedia ? TrackType.Camera : TrackType.ScreenShare;
type = purpose === StreamPurpose.UserMedia ? TrackType.Camera : TrackType.ScreenShare;
} else {
type = TrackType.Microphone;
}
return wrapTrack(track, stream, type);
}
}
/** Options to construct a CallFeed, describing one media stream in a call. */
export interface ICallFeedOpts {
    client: MatrixClient;
    // room the call takes place in
    roomId: string;
    // the user this feed belongs to
    userId: string;
    stream: MediaStream;
    // whether the stream carries user media (camera/mic) or a screen share
    purpose: SDPStreamMetadataPurpose;
    // initial mute states as advertised in the SDP stream metadata
    audioMuted: boolean;
    videoMuted: boolean;
}
/** Events emitted by CallFeed. */
export enum CallFeedEvent {
    // the underlying MediaStream was replaced or gained a track
    NewStream = "new_stream",
    // audio and/or video mute state changed; payload is (audioMuted, videoMuted)
    MuteStateChanged = "mute_state_changed",
    // payload is the current max volume in dB (or -Infinity when not measuring)
    VolumeChanged = "volume_changed",
    // payload is a boolean: whether the feed is currently considered speaking
    Speaking = "speaking",
}
/**
 * Wraps one MediaStream of a call (local or remote), tracking its mute state
 * and optionally measuring audio volume via the Web Audio API to detect
 * speaking. Emits CallFeedEvent events.
 */
export class CallFeed extends EventEmitter {
    public stream: MediaStream;
    public sdpMetadataStreamId: string;
    public userId: string;
    public purpose: SDPStreamMetadataPurpose;
    // rolling window of the last SPEAKING_SAMPLE_COUNT max volumes, in dB (newest last)
    public speakingVolumeSamples: number[];

    private client: MatrixClient;
    private roomId: string;
    private audioMuted: boolean;
    private videoMuted: boolean;
    private measuringVolumeActivity = false;
    private audioContext: AudioContext;
    private analyser: AnalyserNode;
    private frequencyBinCount: Float32Array;
    private speakingThreshold = SPEAKING_THRESHOLD;
    private speaking = false;
    private volumeLooperTimeout: number;

    constructor(opts: ICallFeedOpts) {
        super();
        this.client = opts.client;
        this.roomId = opts.roomId;
        this.userId = opts.userId;
        this.purpose = opts.purpose;
        this.audioMuted = opts.audioMuted;
        this.videoMuted = opts.videoMuted;
        this.speakingVolumeSamples = new Array(SPEAKING_SAMPLE_COUNT).fill(-Infinity);
        this.sdpMetadataStreamId = opts.stream.id;

        // no previous stream on construction, hence null
        this.updateStream(null, opts.stream);

        if (this.hasAudioTrack) {
            this.initVolumeMeasuring();
        }
    }

    private get hasAudioTrack(): boolean {
        return this.stream.getAudioTracks().length > 0;
    }

    // Swaps the backing stream, rewiring the addtrack listener and volume
    // measuring, then emits NewStream.
    // fix: this is called with null for oldStream from the constructor, so the
    // parameter type must admit null under strictNullChecks.
    private updateStream(oldStream: MediaStream | null, newStream: MediaStream): void {
        if (newStream === oldStream) return;

        if (oldStream) {
            oldStream.removeEventListener("addtrack", this.onAddTrack);
            this.measureVolumeActivity(false);
        }
        if (newStream) {
            this.stream = newStream;
            newStream.addEventListener("addtrack", this.onAddTrack);

            if (this.hasAudioTrack) {
                this.initVolumeMeasuring();
            } else {
                this.measureVolumeActivity(false);
            }
        }

        this.emit(CallFeedEvent.NewStream, this.stream);
    }

    // Sets up an AnalyserNode over the stream's audio so volumeLooper can
    // sample frequency data. No-op when there is no audio track or no
    // AudioContext support.
    private initVolumeMeasuring(): void {
        // webkitAudioContext covers older Safari
        const AudioContext = window.AudioContext || window.webkitAudioContext;
        if (!this.hasAudioTrack || !AudioContext) return;

        this.audioContext = new AudioContext();

        this.analyser = this.audioContext.createAnalyser();
        this.analyser.fftSize = 512;
        // low smoothing so volume changes register quickly
        this.analyser.smoothingTimeConstant = 0.1;

        const mediaStreamAudioSourceNode = this.audioContext.createMediaStreamSource(this.stream);
        mediaStreamAudioSourceNode.connect(this.analyser);

        this.frequencyBinCount = new Float32Array(this.analyser.frequencyBinCount);
    }

    private onAddTrack = (): void => {
        this.emit(CallFeedEvent.NewStream, this.stream);
    };

    /**
     * Returns callRoom member
     * @returns member of the callRoom
     */
    public getMember(): RoomMember {
        const callRoom = this.client.getRoom(this.roomId);
        return callRoom.getMember(this.userId);
    }

    /**
     * Returns true if CallFeed is local, otherwise returns false
     * @returns {boolean} is local?
     */
    public isLocal(): boolean {
        return this.userId === this.client.getUserId();
    }

    /**
     * Returns true if audio is muted or if there are no audio
     * tracks, otherwise returns false
     * @returns {boolean} is audio muted?
     */
    public isAudioMuted(): boolean {
        return this.stream.getAudioTracks().length === 0 || this.audioMuted;
    }

    /**
     * Returns true video is muted or if there are no video
     * tracks, otherwise returns false
     * @returns {boolean} is video muted?
     */
    public isVideoMuted(): boolean {
        // We assume only one video track
        return this.stream.getVideoTracks().length === 0 || this.videoMuted;
    }

    public isSpeaking(): boolean {
        return this.speaking;
    }

    /**
     * Replaces the current MediaStream with a new one.
     * This method should be only used by MatrixCall.
     * @param newStream new stream with which to replace the current one
     */
    public setNewStream(newStream: MediaStream): void {
        this.updateStream(this.stream, newStream);
    }

    /**
     * Set feed's internal audio mute state
     * @param muted is the feed's audio muted?
     */
    public setAudioMuted(muted: boolean): void {
        this.audioMuted = muted;
        // reset the speaking window so stale volumes don't flag speaking
        this.speakingVolumeSamples.fill(-Infinity);
        this.emit(CallFeedEvent.MuteStateChanged, this.audioMuted, this.videoMuted);
    }

    /**
     * Set feed's internal video mute state
     * @param muted is the feed's video muted?
     */
    public setVideoMuted(muted: boolean): void {
        this.videoMuted = muted;
        this.emit(CallFeedEvent.MuteStateChanged, this.audioMuted, this.videoMuted);
    }

    /**
     * Starts emitting volume_changed events where the emitter value is in decibels
     * @param enabled emit volume changes
     */
    public measureVolumeActivity(enabled: boolean): void {
        if (enabled) {
            // can't measure without a prepared analyser pipeline
            if (!this.audioContext || !this.analyser || !this.frequencyBinCount || !this.hasAudioTrack) return;

            this.measuringVolumeActivity = true;
            this.volumeLooper();
        } else {
            this.measuringVolumeActivity = false;
            this.speakingVolumeSamples.fill(-Infinity);
            this.emit(CallFeedEvent.VolumeChanged, -Infinity);
        }
    }

    public setSpeakingThreshold(threshold: number) {
        this.speakingThreshold = threshold;
    }

    // Periodically samples the analyser, pushes the max volume into the rolling
    // window, emits VolumeChanged, and flips the Speaking state when any recent
    // sample crosses the threshold. Reschedules itself every POLLING_INTERVAL ms.
    private volumeLooper = () => {
        if (!this.analyser) return;

        if (!this.measuringVolumeActivity) return;

        this.analyser.getFloatFrequencyData(this.frequencyBinCount);

        let maxVolume = -Infinity;
        for (let i = 0; i < this.frequencyBinCount.length; i++) {
            if (this.frequencyBinCount[i] > maxVolume) {
                maxVolume = this.frequencyBinCount[i];
            }
        }

        this.speakingVolumeSamples.shift();
        this.speakingVolumeSamples.push(maxVolume);

        this.emit(CallFeedEvent.VolumeChanged, maxVolume);

        let newSpeaking = false;

        for (let i = 0; i < this.speakingVolumeSamples.length; i++) {
            const volume = this.speakingVolumeSamples[i];

            if (volume > this.speakingThreshold) {
                newSpeaking = true;
                break;
            }
        }

        if (this.speaking !== newSpeaking) {
            this.speaking = newSpeaking;
            this.emit(CallFeedEvent.Speaking, this.speaking);
        }

        this.volumeLooperTimeout = setTimeout(this.volumeLooper, POLLING_INTERVAL);
    };

    /**
     * Clones the feed with a cloned stream, registering the clone with the
     * media handler under the matching purpose.
     */
    public clone(): CallFeed {
        const mediaHandler = this.client.getMediaHandler();
        const stream = this.stream.clone();

        if (this.purpose === SDPStreamMetadataPurpose.Usermedia) {
            mediaHandler.userMediaStreams.push(stream);
        } else {
            mediaHandler.screensharingStreams.push(stream);
        }

        return new CallFeed({
            client: this.client,
            roomId: this.roomId,
            userId: this.userId,
            stream,
            purpose: this.purpose,
            audioMuted: this.audioMuted,
            videoMuted: this.videoMuted,
        });
    }

    public dispose(): void {
        clearTimeout(this.volumeLooperTimeout);
        // NOTE(review): the AudioContext created in initVolumeMeasuring is never
        // closed here — consider this.audioContext?.close(); confirm ownership.
    }
}