remove thick abstraction layer

instead just copy the DOM typing and make it part of the platform layer
This commit is contained in:
Bruno Windels 2022-04-21 17:39:11 +02:00
parent baa884e9d0
commit ac60d1b61d
13 changed files with 424 additions and 899 deletions

View file

@ -57,9 +57,7 @@ export class CallViewModel extends ViewModel<Options> {
} }
async toggleVideo() { async toggleVideo() {
const localMedia = this.call.localMedia!; //this.call.setMuted();
const toggledMedia = localMedia.withMuted(localMedia.microphoneMuted, !localMedia.cameraMuted);
await this.call.setMedia(toggledMedia);
} }
} }

View file

@ -367,9 +367,8 @@ export class RoomViewModel extends ViewModel {
const session = this.getOption("session"); const session = this.getOption("session");
const stream = await this.platform.mediaDevices.getMediaTracks(false, true); const stream = await this.platform.mediaDevices.getMediaTracks(false, true);
const localMedia = new LocalMedia().withUserMedia(stream); const localMedia = new LocalMedia().withUserMedia(stream);
await this._call.join(localMedia);
// this will set the callViewModel above as a call will be added to callHandler.calls // this will set the callViewModel above as a call will be added to callHandler.calls
const call = await session.callHandler.createCall(this._room.id, localMedia, "A call " + Math.round(this.platform.random() * 100)); const call = await session.callHandler.createCall(this._room.id, "m.video", "A call " + Math.round(this.platform.random() * 100));
await call.join(localMedia); await call.join(localMedia);
} catch (err) { } catch (err) {
console.error(err.stack); console.error(err.stack);

View file

@ -15,8 +15,8 @@ limitations under the License.
*/ */
import {ObservableMap} from "../../observable/map/ObservableMap"; import {ObservableMap} from "../../observable/map/ObservableMap";
import {WebRTC, PeerConnection, PeerConnectionHandler} from "../../platform/types/WebRTC"; import {WebRTC, PeerConnection} from "../../platform/types/WebRTC";
import {MediaDevices, Track, AudioTrack} from "../../platform/types/MediaDevices"; import {MediaDevices, Track} from "../../platform/types/MediaDevices";
import {handlesEventType} from "./PeerCall"; import {handlesEventType} from "./PeerCall";
import {EventType, CallIntent} from "./callEventTypes"; import {EventType, CallIntent} from "./callEventTypes";
import {GroupCall} from "./group/GroupCall"; import {GroupCall} from "./group/GroupCall";
@ -107,7 +107,7 @@ export class CallHandler {
}); });
} }
async createCall(roomId: string, localMedia: LocalMedia, name: string, intent: CallIntent = CallIntent.Ring): Promise<GroupCall> { async createCall(roomId: string, type: "m.video" | "m.voice", name: string, intent: CallIntent = CallIntent.Ring): Promise<GroupCall> {
const logItem = this.options.logger.child({l: "call", incoming: false}); const logItem = this.options.logger.child({l: "call", incoming: false});
const call = new GroupCall(makeId("conf-"), true, { const call = new GroupCall(makeId("conf-"), true, {
"m.name": name, "m.name": name,
@ -116,7 +116,7 @@ export class CallHandler {
this._calls.set(call.id, call); this._calls.set(call.id, call);
try { try {
await call.create(localMedia); await call.create(type);
// store call info so it will ring again when reopening the app // store call info so it will ring again when reopening the app
const txn = await this.options.storage.readWriteTxn([this.options.storage.storeNames.calls]); const txn = await this.options.storage.readWriteTxn([this.options.storage.storeNames.calls]);
txn.calls.add({ txn.calls.add({

View file

@ -17,32 +17,28 @@ limitations under the License.
import {SDPStreamMetadataPurpose} from "./callEventTypes"; import {SDPStreamMetadataPurpose} from "./callEventTypes";
import {Stream} from "../../platform/types/MediaDevices"; import {Stream} from "../../platform/types/MediaDevices";
import {SDPStreamMetadata} from "./callEventTypes"; import {SDPStreamMetadata} from "./callEventTypes";
import {getStreamVideoTrack, getStreamAudioTrack} from "./common";
export class LocalMedia { export class LocalMedia {
constructor( constructor(
public readonly userMedia?: Stream, public readonly userMedia?: Stream,
public readonly microphoneMuted: boolean = false,
public readonly cameraMuted: boolean = false,
public readonly screenShare?: Stream, public readonly screenShare?: Stream,
public readonly dataChannelOptions?: RTCDataChannelInit, public readonly dataChannelOptions?: RTCDataChannelInit,
) {} ) {}
withMuted(microphone: boolean, camera: boolean) {
return new LocalMedia(this.userMedia, microphone, camera, this.screenShare, this.dataChannelOptions);
}
withUserMedia(stream: Stream) { withUserMedia(stream: Stream) {
return new LocalMedia(stream, this.microphoneMuted, this.cameraMuted, this.screenShare, this.dataChannelOptions); return new LocalMedia(stream, this.screenShare, this.dataChannelOptions);
} }
withScreenShare(stream: Stream) { withScreenShare(stream: Stream) {
return new LocalMedia(this.userMedia, this.microphoneMuted, this.cameraMuted, stream, this.dataChannelOptions); return new LocalMedia(this.userMedia, stream, this.dataChannelOptions);
} }
withDataChannel(options: RTCDataChannelInit): LocalMedia { withDataChannel(options: RTCDataChannelInit): LocalMedia {
return new LocalMedia(this.userMedia, this.microphoneMuted, this.cameraMuted, this.screenShare, options); return new LocalMedia(this.userMedia, this.screenShare, options);
} }
/** @internal */
replaceClone(oldClone: LocalMedia | undefined, oldOriginal: LocalMedia | undefined): LocalMedia { replaceClone(oldClone: LocalMedia | undefined, oldOriginal: LocalMedia | undefined): LocalMedia {
let userMedia; let userMedia;
let screenShare; let screenShare;
@ -52,21 +48,21 @@ export class LocalMedia {
stream = oldCloneStream; stream = oldCloneStream;
} else { } else {
stream = newStream?.clone(); stream = newStream?.clone();
oldCloneStream?.audioTrack?.stop(); getStreamAudioTrack(oldCloneStream)?.stop();
oldCloneStream?.videoTrack?.stop(); getStreamVideoTrack(oldCloneStream)?.stop();
} }
return stream; return stream;
} }
return new LocalMedia( return new LocalMedia(
cloneOrAdoptStream(oldOriginal?.userMedia, oldClone?.userMedia, this.userMedia), cloneOrAdoptStream(oldOriginal?.userMedia, oldClone?.userMedia, this.userMedia),
this.microphoneMuted, this.cameraMuted,
cloneOrAdoptStream(oldOriginal?.screenShare, oldClone?.screenShare, this.screenShare), cloneOrAdoptStream(oldOriginal?.screenShare, oldClone?.screenShare, this.screenShare),
this.dataChannelOptions this.dataChannelOptions
); );
} }
/** @internal */
clone(): LocalMedia { clone(): LocalMedia {
return new LocalMedia(this.userMedia?.clone(), this.microphoneMuted, this.cameraMuted, this.screenShare?.clone(), this.dataChannelOptions); return new LocalMedia(this.userMedia?.clone(),this.screenShare?.clone(), this.dataChannelOptions);
} }
dispose() { dispose() {
@ -75,11 +71,11 @@ export class LocalMedia {
stopExcept(newMedia: LocalMedia | undefined) { stopExcept(newMedia: LocalMedia | undefined) {
if(newMedia?.userMedia?.id !== this.userMedia?.id) { if(newMedia?.userMedia?.id !== this.userMedia?.id) {
this.userMedia?.audioTrack?.stop(); getStreamAudioTrack(this.userMedia)?.stop();
this.userMedia?.videoTrack?.stop(); getStreamVideoTrack(this.userMedia)?.stop();
} }
if(newMedia?.screenShare?.id !== this.screenShare?.id) { if(newMedia?.screenShare?.id !== this.screenShare?.id) {
this.screenShare?.videoTrack?.stop(); getStreamVideoTrack(this.screenShare)?.stop();
} }
} }
} }

View file

@ -16,10 +16,10 @@ limitations under the License.
import {ObservableMap} from "../../observable/map/ObservableMap"; import {ObservableMap} from "../../observable/map/ObservableMap";
import {recursivelyAssign} from "../../utils/recursivelyAssign"; import {recursivelyAssign} from "../../utils/recursivelyAssign";
import {AsyncQueue} from "../../utils/AsyncQueue"; import {Disposables, Disposable, IDisposable} from "../../utils/Disposables";
import {Disposables, IDisposable} from "../../utils/Disposables"; import {WebRTC, PeerConnection, Receiver, PeerConnectionEventMap} from "../../platform/types/WebRTC";
import {WebRTC, PeerConnection, PeerConnectionHandler, TrackSender, TrackReceiver} from "../../platform/types/WebRTC"; import {MediaDevices, Track, TrackKind, Stream, StreamTrackEvent} from "../../platform/types/MediaDevices";
import {MediaDevices, Track, AudioTrack, Stream} from "../../platform/types/MediaDevices"; import {getStreamVideoTrack, getStreamAudioTrack, MuteSettings} from "./common";
import { import {
SDPStreamMetadataKey, SDPStreamMetadataKey,
SDPStreamMetadataPurpose, SDPStreamMetadataPurpose,
@ -69,6 +69,7 @@ export class PeerCall implements IDisposable {
private direction: CallDirection; private direction: CallDirection;
// we don't own localMedia and should hence not call dispose on it from here // we don't own localMedia and should hence not call dispose on it from here
private localMedia?: LocalMedia; private localMedia?: LocalMedia;
private localMuteSettings?: MuteSettings;
private seq: number = 0; private seq: number = 0;
// A queue for candidates waiting to go out. // A queue for candidates waiting to go out.
// We try to amalgamate candidates into a single candidate message where // We try to amalgamate candidates into a single candidate message where
@ -85,7 +86,8 @@ export class PeerCall implements IDisposable {
private hangupParty: CallParty; private hangupParty: CallParty;
private disposables = new Disposables(); private disposables = new Disposables();
private statePromiseMap = new Map<CallState, {resolve: () => void, promise: Promise<void>}>(); private statePromiseMap = new Map<CallState, {resolve: () => void, promise: Promise<void>}>();
private _remoteTrackToStreamId = new Map<string, string>();
private _remoteStreams = new Map<string, {stream: Stream, disposeListener: Disposable}>();
// perfect negotiation flags // perfect negotiation flags
private makingOffer: boolean = false; private makingOffer: boolean = false;
private ignoreOffer: boolean = false; private ignoreOffer: boolean = false;
@ -96,55 +98,62 @@ export class PeerCall implements IDisposable {
private _dataChannel?: any; private _dataChannel?: any;
private _hangupReason?: CallErrorCode; private _hangupReason?: CallErrorCode;
private _remoteMedia: RemoteMedia; private _remoteMedia: RemoteMedia;
private remoteMuteSettings?: MuteSettings;
constructor( constructor(
private callId: string, private callId: string,
private readonly options: Options, private readonly options: Options,
private readonly logItem: ILogItem, private readonly logItem: ILogItem,
) { ) {
const outer = this;
this._remoteMedia = new RemoteMedia(); this._remoteMedia = new RemoteMedia();
this.peerConnection = options.webRTC.createPeerConnection({ this.peerConnection = options.webRTC.createPeerConnection(this.options.forceTURN, this.options.turnServers, 0);
onIceConnectionStateChange(state: RTCIceConnectionState) {
outer.logItem.wrap({l: "onIceConnectionStateChange", status: state}, log => { const listen = <K extends keyof PeerConnectionEventMap>(type: K, listener: (this: PeerConnection, ev: PeerConnectionEventMap[K]) => any, options?: boolean | EventListenerOptions): void => {
outer.onIceConnectionStateChange(state, log); this.peerConnection.addEventListener(type, listener);
const dispose = () => {
this.peerConnection.removeEventListener(type, listener);
};
this.disposables.track(dispose);
};
listen("iceconnectionstatechange", () => {
const state = this.peerConnection.iceConnectionState;
this.logItem.wrap({l: "onIceConnectionStateChange", status: state}, log => {
this.onIceConnectionStateChange(state, log);
}); });
},
onLocalIceCandidate(candidate: RTCIceCandidate) {
outer.logItem.wrap("onLocalIceCandidate", log => {
outer.handleLocalIceCandidate(candidate, log);
}); });
}, listen("icecandidate", event => {
onIceGatheringStateChange(state: RTCIceGatheringState) { this.logItem.wrap("onLocalIceCandidate", log => {
outer.logItem.wrap({l: "onIceGatheringStateChange", status: state}, log => { if (event.candidate) {
outer.handleIceGatheringState(state, log); this.handleLocalIceCandidate(event.candidate, log);
}
}); });
},
onRemoteStreamRemoved(stream: Stream) {
outer.logItem.wrap("onRemoteStreamRemoved", log => {
outer.updateRemoteMedia(log);
}); });
}, listen("icegatheringstatechange", () => {
onRemoteTracksAdded(trackReceiver: TrackReceiver) { const state = this.peerConnection.iceGatheringState;
outer.logItem.wrap("onRemoteTracksAdded", log => { this.logItem.wrap({l: "onIceGatheringStateChange", status: state}, log => {
outer.updateRemoteMedia(log); this.handleIceGatheringState(state, log);
}); });
},
onRemoteDataChannel(dataChannel: any | undefined) {
outer.logItem.wrap("onRemoteDataChannel", log => {
outer._dataChannel = dataChannel;
outer.options.emitUpdate(outer, undefined);
}); });
}, listen("track", event => {
onNegotiationNeeded() { this.logItem.wrap("onRemoteTrack", log => {
this.onRemoteTrack(event.track, event.streams, log);
});
});
listen("datachannel", event => {
this.logItem.wrap("onRemoteDataChannel", log => {
this._dataChannel = event.channel;
this.options.emitUpdate(this, undefined);
});
});
listen("negotiationneeded", () => {
const promiseCreator = () => { const promiseCreator = () => {
return outer.logItem.wrap("onNegotiationNeeded", log => { return this.logItem.wrap("onNegotiationNeeded", log => {
return outer.handleNegotiation(log); return this.handleNegotiation(log);
}); });
}; };
outer.responsePromiseChain = outer.responsePromiseChain?.then(promiseCreator) ?? promiseCreator(); this.responsePromiseChain = this.responsePromiseChain?.then(promiseCreator) ?? promiseCreator();
} });
}, this.options.forceTURN, this.options.turnServers, 0);
} }
get dataChannel(): any | undefined { return this._dataChannel; } get dataChannel(): any | undefined { return this._dataChannel; }
@ -166,7 +175,7 @@ export class PeerCall implements IDisposable {
this.setState(CallState.CreateOffer, log); this.setState(CallState.CreateOffer, log);
await this.updateLocalMedia(localMedia, log); await this.updateLocalMedia(localMedia, log);
if (this.localMedia?.dataChannelOptions) { if (this.localMedia?.dataChannelOptions) {
this._dataChannel = this.peerConnection.createDataChannel(this.localMedia.dataChannelOptions); this._dataChannel = this.peerConnection.createDataChannel("channel", this.localMedia.dataChannelOptions);
} }
// after adding the local tracks, and wait for handleNegotiation to be called, // after adding the local tracks, and wait for handleNegotiation to be called,
// or invite glare where we give up our invite and answer instead // or invite glare where we give up our invite and answer instead
@ -211,11 +220,9 @@ export class PeerCall implements IDisposable {
setMedia(localMedia: LocalMedia): Promise<void> { setMedia(localMedia: LocalMedia): Promise<void> {
return this.logItem.wrap("setMedia", async log => { return this.logItem.wrap("setMedia", async log => {
log.set("userMedia_audio", !!localMedia.userMedia?.audioTrack); log.set("userMedia_audio", !!getStreamAudioTrack(localMedia.userMedia));
log.set("userMedia_audio_muted", localMedia.microphoneMuted); log.set("userMedia_video", !!getStreamVideoTrack(localMedia.userMedia));
log.set("userMedia_video", !!localMedia.userMedia?.videoTrack); log.set("screenShare_video", !!getStreamVideoTrack(localMedia.screenShare));
log.set("userMedia_video_muted", localMedia.cameraMuted);
log.set("screenShare_video", !!localMedia.screenShare?.videoTrack);
log.set("datachannel", !!localMedia.dataChannelOptions); log.set("datachannel", !!localMedia.dataChannelOptions);
await this.updateLocalMedia(localMedia, log); await this.updateLocalMedia(localMedia, log);
const content: MCallSDPStreamMetadataChanged<MCallBase> = { const content: MCallSDPStreamMetadataChanged<MCallBase> = {
@ -322,6 +329,7 @@ export class PeerCall implements IDisposable {
this.candidateSendQueue = []; this.candidateSendQueue = [];
// need to queue this // need to queue this
if (this._state === CallState.CreateOffer) {
const content = { const content = {
call_id: this.callId, call_id: this.callId,
offer, offer,
@ -330,13 +338,17 @@ export class PeerCall implements IDisposable {
seq: this.seq++, seq: this.seq++,
lifetime: CALL_TIMEOUT_MS lifetime: CALL_TIMEOUT_MS
}; };
if (this._state === CallState.CreateOffer) {
await this.sendSignallingMessage({type: EventType.Invite, content}, log); await this.sendSignallingMessage({type: EventType.Invite, content}, log);
this.setState(CallState.InviteSent, log); this.setState(CallState.InviteSent, log);
} else if (this._state === CallState.Connected || this._state === CallState.Connecting) { } else if (this._state === CallState.Connected || this._state === CallState.Connecting) {
// send Negotiate message const content = {
content.description = content.offer; call_id: this.callId,
delete content.offer; description: offer,
[SDPStreamMetadataKey]: this.getSDPMetadata(),
version: 1,
seq: this.seq++,
lifetime: CALL_TIMEOUT_MS
};
await this.sendSignallingMessage({type: EventType.Negotiate, content}, log); await this.sendSignallingMessage({type: EventType.Negotiate, content}, log);
} }
} finally { } finally {
@ -432,7 +444,7 @@ export class PeerCall implements IDisposable {
// According to previous comments in this file, firefox at some point did not // According to previous comments in this file, firefox at some point did not
// add streams until media started arriving on them. Testing latest firefox // add streams until media started arriving on them. Testing latest firefox
// (81 at time of writing), this is no longer a problem, so let's do it the correct way. // (81 at time of writing), this is no longer a problem, so let's do it the correct way.
if (this.peerConnection.remoteStreams.size === 0) { if (this.peerConnection.getReceivers().length === 0) {
await log.wrap(`Call no remote stream or no tracks after setting remote description!`, async log => { await log.wrap(`Call no remote stream or no tracks after setting remote description!`, async log => {
return this.terminate(CallParty.Local, CallErrorCode.SetRemoteDescription, log); return this.terminate(CallParty.Local, CallErrorCode.SetRemoteDescription, log);
}); });
@ -843,11 +855,10 @@ export class PeerCall implements IDisposable {
const metadata = {}; const metadata = {};
if (this.localMedia?.userMedia) { if (this.localMedia?.userMedia) {
const streamId = this.localMedia.userMedia.id; const streamId = this.localMedia.userMedia.id;
const streamSender = this.peerConnection.localStreams.get(streamId);
metadata[streamId] = { metadata[streamId] = {
purpose: SDPStreamMetadataPurpose.Usermedia, purpose: SDPStreamMetadataPurpose.Usermedia,
audio_muted: this.localMedia.microphoneMuted || !this.localMedia.userMedia.audioTrack, audio_muted: this.localMuteSettings?.microphone || !getStreamAudioTrack(this.localMedia.userMedia),
video_muted: this.localMedia.cameraMuted || !this.localMedia.userMedia.videoTrack, video_muted: this.localMuteSettings?.camera || !getStreamVideoTrack(this.localMedia.userMedia),
}; };
} }
if (this.localMedia?.screenShare) { if (this.localMedia?.screenShare) {
@ -859,19 +870,67 @@ export class PeerCall implements IDisposable {
return metadata; return metadata;
} }
private updateRemoteMedia(log: ILogItem) { private findReceiverForStream(kind: TrackKind, streamId: string): Receiver | undefined {
return this.peerConnection.getReceivers().find(r => {
return r.track.kind === "audio" && this._remoteTrackToStreamId.get(r.track.id) === streamId;
});
}
private onRemoteTrack(track: Track, streams: ReadonlyArray<Stream>, log: ILogItem) {
if (streams.length === 0) {
log.log({l: `ignoring ${track.kind} streamless track`, id: track.id});
return;
}
const stream = streams[0];
this._remoteTrackToStreamId.set(track.id, stream.id);
if (!this._remoteStreams.has(stream.id)) {
const listener = (event: StreamTrackEvent): void => {
this.logItem.wrap({l: "removetrack", id: event.track.id}, log => {
const streamId = this._remoteTrackToStreamId.get(event.track.id);
if (streamId) {
this._remoteTrackToStreamId.delete(event.track.id);
const streamDetails = this._remoteStreams.get(streamId);
if (streamDetails && streamDetails.stream.getTracks().length === 0) {
this.disposables.disposeTracked(disposeListener);
this._remoteStreams.delete(stream.id);
this.updateRemoteMedia(log);
}
}
})
};
stream.addEventListener("removetrack", listener);
const disposeListener = () => {
stream.removeEventListener("removetrack", listener);
};
this.disposables.track(disposeListener);
this._remoteStreams.set(stream.id, {
disposeListener,
stream
});
this.updateRemoteMedia(log);
}
}
private updateRemoteMedia(log: ILogItem): void {
this._remoteMedia.userMedia = undefined; this._remoteMedia.userMedia = undefined;
this._remoteMedia.screenShare = undefined; this._remoteMedia.screenShare = undefined;
if (this.remoteSDPStreamMetadata) { if (this.remoteSDPStreamMetadata) {
for (const [streamId, streamReceiver] of this.peerConnection.remoteStreams.entries()) { for (const streamDetails of this._remoteStreams.values()) {
const metaData = this.remoteSDPStreamMetadata[streamId]; const {stream} = streamDetails;
const metaData = this.remoteSDPStreamMetadata[stream.id];
if (metaData) { if (metaData) {
if (metaData.purpose === SDPStreamMetadataPurpose.Usermedia) { if (metaData.purpose === SDPStreamMetadataPurpose.Usermedia) {
this._remoteMedia.userMedia = streamReceiver.stream; this._remoteMedia.userMedia = stream;
streamReceiver.audioReceiver?.enable(!metaData.audio_muted); const audioReceiver = this.findReceiverForStream(TrackKind.Audio, stream.id);
streamReceiver.videoReceiver?.enable(!metaData.video_muted); if (audioReceiver) {
audioReceiver.track.enabled = !metaData.audio_muted;
}
const videoReceiver = this.findReceiverForStream(TrackKind.Video, stream.id);
if (videoReceiver) {
videoReceiver.track.enabled = !metaData.audio_muted;
}
} else if (metaData.purpose === SDPStreamMetadataPurpose.Screenshare) { } else if (metaData.purpose === SDPStreamMetadataPurpose.Screenshare) {
this._remoteMedia.screenShare = streamReceiver.stream; this._remoteMedia.screenShare = stream;
} }
} }
} }
@ -883,71 +942,46 @@ export class PeerCall implements IDisposable {
return logItem.wrap("updateLocalMedia", async log => { return logItem.wrap("updateLocalMedia", async log => {
const oldMedia = this.localMedia; const oldMedia = this.localMedia;
this.localMedia = localMedia; this.localMedia = localMedia;
const applyStream = async (oldStream: Stream | undefined, stream: Stream | undefined, oldMuteSettings: LocalMedia | undefined, mutedSettings: LocalMedia | undefined, logLabel: string) => { const applyStream = async (oldStream: Stream | undefined, stream: Stream | undefined, streamPurpose: SDPStreamMetadataPurpose) => {
let streamSender; const applyTrack = async (oldTrack: Track | undefined, newTrack: Track | undefined) => {
if (oldStream) { if (!oldTrack && newTrack) {
streamSender = this.peerConnection.localStreams.get(oldStream.id); log.wrap(`adding ${streamPurpose} ${newTrack.kind} track`, log => {
if (stream && stream.id !== oldStream.id) { const sender = this.peerConnection.addTrack(newTrack, stream!);
this.peerConnection.localStreams.set(stream.id, streamSender); this.options.webRTC.prepareSenderForPurpose(this.peerConnection, sender, streamPurpose);
this.peerConnection.localStreams.delete(oldStream.id); });
} } else if (oldTrack) {
} const sender = this.peerConnection.getSenders().find(s => s.track && s.track.id === oldTrack.id);
if (sender) {
const applyTrack = async (oldTrack: Track | undefined, sender: TrackSender | undefined, track: Track | undefined, wasMuted: boolean | undefined, muted: boolean | undefined) => { if (newTrack && oldTrack.id !== newTrack.id) {
const changed = (!track && oldTrack) ||
(track && !oldTrack) ||
(track && oldTrack && !track.equals(oldTrack));
if (changed) {
if (track) {
if (oldTrack && sender && !track.equals(oldTrack)) {
try { try {
await log.wrap(`replacing ${logLabel} ${track.kind} track`, log => { await log.wrap(`replacing ${streamPurpose} ${newTrack.kind} track`, log => {
return sender.replaceTrack(track); return sender.replaceTrack(newTrack);
}); });
} catch (err) { } catch (err) {
// can't replace the track without renegotiating // can't replace the track without renegotiating{
log.wrap(`adding and removing ${logLabel} ${track.kind} track`, log => { log.wrap(`adding and removing ${streamPurpose} ${newTrack.kind} track`, log => {
this.peerConnection.removeTrack(sender); this.peerConnection.removeTrack(sender);
this.peerConnection.addTrack(track); this.peerConnection.addTrack(newTrack);
}); });
} }
} else { } else if (!newTrack) {
log.wrap(`adding ${logLabel} ${track.kind} track`, log => { log.wrap(`removing ${streamPurpose} ${sender.track!.kind} track`, log => {
this.peerConnection.addTrack(track);
});
}
} else {
if (sender) {
// this will be used for muting, do we really want to trigger renegotiation here?
// we want to disable the sender, but also remove the track as we don't want to keep
// using the webcam if we don't need to
log.wrap(`removing ${logLabel} ${sender.track.kind} track`, log => {
sender.track.enabled = false;
this.peerConnection.removeTrack(sender); this.peerConnection.removeTrack(sender);
}); });
}
}
} else if (track) {
log.log({l: "checking mute status", muted, wasMuted, wasCameraMuted: oldMedia?.cameraMuted, sender: !!sender, streamSender: !!streamSender, oldStream: oldStream?.id, stream: stream?.id});
if (sender && muted !== wasMuted) {
log.wrap(`${logLabel} ${track.kind} ${muted ? "muting" : "unmuting"}`, log => {
// sender.track.enabled = !muted;
// This doesn't always seem to trigger renegotiation??
// We should probably always send the new metadata first ...
sender.enable(!muted);
});
} else { } else {
log.log(`${logLabel} ${track.kind} track hasn't changed`); log.log(`${streamPurpose} ${oldTrack.kind} track hasn't changed`);
} }
} }
// TODO: should we do something if we didn't find the sender? e.g. some other code already removed the sender but didn't update localMedia
}
} }
await applyTrack(oldStream?.audioTrack, streamSender?.audioSender, stream?.audioTrack, oldMuteSettings?.microphoneMuted, mutedSettings?.microphoneMuted); await applyTrack(getStreamAudioTrack(oldStream), getStreamAudioTrack(stream));
await applyTrack(oldStream?.videoTrack, streamSender?.videoSender, stream?.videoTrack, oldMuteSettings?.cameraMuted, mutedSettings?.cameraMuted); await applyTrack(getStreamVideoTrack(oldStream), getStreamVideoTrack(stream));
} };
await applyStream(oldMedia?.userMedia, localMedia?.userMedia, oldMedia, localMedia, "userMedia"); await applyStream(oldMedia?.userMedia, localMedia?.userMedia, SDPStreamMetadataPurpose.Usermedia);
await applyStream(oldMedia?.screenShare, localMedia?.screenShare, undefined, undefined, "screenShare"); await applyStream(oldMedia?.screenShare, localMedia?.screenShare, SDPStreamMetadataPurpose.Screenshare);
// TODO: datachannel, but don't do it here as we don't want to do it from answer, rather in different method // TODO: datachannel, but don't do it here as we don't want to do it from answer, rather in different method
}); });
} }
@ -967,7 +1001,7 @@ export class PeerCall implements IDisposable {
public dispose(): void { public dispose(): void {
this.disposables.dispose(); this.disposables.dispose();
this.peerConnection.dispose(); this.peerConnection.close();
} }
public close(reason: CallErrorCode | undefined, log: ILogItem): void { public close(reason: CallErrorCode | undefined, log: ILogItem): void {
@ -1038,6 +1072,7 @@ export function handlesEventType(eventType: string): boolean {
eventType === EventType.Negotiate; eventType === EventType.Negotiate;
} }
export function tests() { export function tests() {
} }

View file

@ -0,0 +1,29 @@
/*
Copyright 2022 The Matrix.org Foundation C.I.C.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
import type {Track, Stream} from "../../platform/types/MediaDevices";
export function getStreamAudioTrack(stream: Stream | undefined): Track | undefined {
return stream?.getAudioTracks()[0];
}
export function getStreamVideoTrack(stream: Stream | undefined): Track | undefined {
return stream?.getVideoTracks()[0];
}
export class MuteSettings {
constructor (public readonly microphone: boolean, public readonly camera: boolean) {}
}

View file

@ -168,7 +168,7 @@ export class GroupCall extends EventEmitter<{change: never}> {
} }
/** @internal */ /** @internal */
create(localMedia: LocalMedia): Promise<void> { create(type: "m.video" | "m.voice"): Promise<void> {
return this.logItem.wrap("create", async log => { return this.logItem.wrap("create", async log => {
if (this._state !== GroupCallState.Fledgling) { if (this._state !== GroupCallState.Fledgling) {
return; return;
@ -176,7 +176,7 @@ export class GroupCall extends EventEmitter<{change: never}> {
this._state = GroupCallState.Creating; this._state = GroupCallState.Creating;
this.emitChange(); this.emitChange();
this.callContent = Object.assign({ this.callContent = Object.assign({
"m.type": localMedia.userMedia?.videoTrack ? "m.video" : "m.voice", "m.type": type,
}, this.callContent); }, this.callContent);
const request = this.options.hsApi.sendState(this.roomId, EventType.GroupCall, this.id, this.callContent!, {log}); const request = this.options.hsApi.sendState(this.roomId, EventType.GroupCall, this.id, this.callContent!, {log});
await request.response(); await request.response();

View file

@ -14,15 +14,39 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
export interface Event {}
export interface MediaDevices { export interface MediaDevices {
// filter out audiooutput // filter out audiooutput
enumerate(): Promise<MediaDeviceInfo[]>; enumerate(): Promise<MediaDeviceInfo[]>;
// to assign to a video element, we downcast to WrappedTrack and use the stream property. // to assign to a video element, we downcast to WrappedTrack and use the stream property.
getMediaTracks(audio: true | MediaDeviceInfo, video: boolean | MediaDeviceInfo): Promise<Stream>; getMediaTracks(audio: true | MediaDeviceInfo, video: boolean | MediaDeviceInfo): Promise<Stream>;
getScreenShareTrack(): Promise<Stream | undefined>; getScreenShareTrack(): Promise<Stream | undefined>;
createVolumeMeasurer(stream: Stream): VolumeMeasurer; createVolumeMeasurer(stream: Stream, callback: () => void): VolumeMeasurer;
} }
// Typescript definitions derived from https://github.com/microsoft/TypeScript/blob/main/lib/lib.dom.d.ts
/*! *****************************************************************************
Copyright (c) Microsoft Corporation. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License"); you may not use
this file except in compliance with the License. You may obtain a copy of the
License at http://www.apache.org/licenses/LICENSE-2.0
THIS CODE IS PROVIDED ON AN *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, EITHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION ANY IMPLIED
WARRANTIES OR CONDITIONS OF TITLE, FITNESS FOR A PARTICULAR PURPOSE,
MERCHANTABLITY OR NON-INFRINGEMENT.
See the Apache Version 2.0 License for specific language governing permissions
and limitations under the License.
***************************************************************************** */
export interface StreamTrackEvent extends Event {
readonly track: Track;
}
export interface StreamEventMap {
"addtrack": StreamTrackEvent;
"removetrack": StreamTrackEvent;
}
export interface Stream { export interface Stream {
getTracks(): ReadonlyArray<Track>; getTracks(): ReadonlyArray<Track>;
@ -30,6 +54,8 @@ export interface Stream {
getVideoTracks(): ReadonlyArray<Track>; getVideoTracks(): ReadonlyArray<Track>;
readonly id: string; readonly id: string;
clone(): Stream; clone(): Stream;
addEventListener<K extends keyof StreamEventMap>(type: K, listener: (this: Stream, ev: StreamEventMap[K]) => any, options?: boolean | AddEventListenerOptions): void;
removeEventListener<K extends keyof StreamEventMap>(type: K, listener: (this: Stream, ev: StreamEventMap[K]) => any, options?: boolean | EventListenerOptions): void;
} }
export enum TrackKind { export enum TrackKind {
@ -47,5 +73,7 @@ export interface Track {
} }
export interface VolumeMeasurer { export interface VolumeMeasurer {
get isSpeaking(): boolean;
setSpeakingThreshold(threshold: number): void;
stop();
} }

View file

@ -14,63 +14,156 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
import {Track, Stream} from "./MediaDevices"; import {Track, Stream, Event} from "./MediaDevices";
import {SDPStreamMetadataPurpose} from "../../matrix/calls/callEventTypes"; import {SDPStreamMetadataPurpose} from "../../matrix/calls/callEventTypes";
export interface WebRTC { export interface WebRTC {
createPeerConnection(handler: PeerConnectionHandler, forceTURN: boolean, turnServers: RTCIceServer[], iceCandidatePoolSize: number): PeerConnection; createPeerConnection(forceTURN: boolean, turnServers: RTCIceServer[], iceCandidatePoolSize: number): PeerConnection;
prepareSenderForPurpose(peerConnection: PeerConnection, sender: Sender, purpose: SDPStreamMetadataPurpose): void;
} }
export interface StreamSender { // Typescript definitions derived from https://github.com/microsoft/TypeScript/blob/main/lib/lib.dom.d.ts
get stream(): Stream; /*! *****************************************************************************
get audioSender(): TrackSender | undefined; Copyright (c) Microsoft Corporation. All rights reserved.
get videoSender(): TrackSender | undefined; Licensed under the Apache License, Version 2.0 (the "License"); you may not use
this file except in compliance with the License. You may obtain a copy of the
License at http://www.apache.org/licenses/LICENSE-2.0
THIS CODE IS PROVIDED ON AN *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, EITHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION ANY IMPLIED
WARRANTIES OR CONDITIONS OF TITLE, FITNESS FOR A PARTICULAR PURPOSE,
MERCHANTABLITY OR NON-INFRINGEMENT.
See the Apache Version 2.0 License for specific language governing permissions
and limitations under the License.
***************************************************************************** */
export interface DataChannelEventMap {
"bufferedamountlow": Event;
"close": Event;
"error": Event;
"message": MessageEvent;
"open": Event;
} }
export interface StreamReceiver { export interface DataChannel {
get stream(): Stream; binaryType: BinaryType;
get audioReceiver(): TrackReceiver | undefined; readonly id: number | null;
get videoReceiver(): TrackReceiver | undefined; readonly label: string;
} readonly negotiated: boolean;
readonly readyState: DataChannelState;
export interface TrackReceiver {
get track(): Track;
get enabled(): boolean;
enable(enabled: boolean); // this modifies the transceiver direction
}
export interface TrackSender extends TrackReceiver {
/** replaces the track if possible without renegotiation. Can throw. */
replaceTrack(track: Track | undefined): Promise<void>;
/** make any needed adjustments to the sender or transceiver settings
* depending on the purpose, after adding the track to the connection */
prepareForPurpose(purpose: SDPStreamMetadataPurpose): void;
}
export interface PeerConnectionHandler {
onIceConnectionStateChange(state: RTCIceConnectionState);
onLocalIceCandidate(candidate: RTCIceCandidate);
onIceGatheringStateChange(state: RTCIceGatheringState);
onRemoteStreamRemoved(stream: Stream);
onRemoteTracksAdded(receiver: TrackReceiver);
onRemoteDataChannel(dataChannel: any | undefined);
onNegotiationNeeded();
}
export interface PeerConnection {
get iceGatheringState(): RTCIceGatheringState;
get signalingState(): RTCSignalingState;
get localDescription(): RTCSessionDescription | undefined;
get localStreams(): ReadonlyMap<string, StreamSender>;
get remoteStreams(): ReadonlyMap<string, StreamReceiver>;
createOffer(): Promise<RTCSessionDescriptionInit>;
createAnswer(): Promise<RTCSessionDescriptionInit>;
setLocalDescription(description?: RTCSessionDescriptionInit): Promise<void>;
setRemoteDescription(description: RTCSessionDescriptionInit): Promise<void>;
addIceCandidate(candidate: RTCIceCandidate): Promise<void>;
addTrack(track: Track): TrackSender | undefined;
removeTrack(track: TrackSender): void;
createDataChannel(options: RTCDataChannelInit): any;
dispose(): void;
close(): void; close(): void;
send(data: string): void;
send(data: Blob): void;
send(data: ArrayBuffer): void;
send(data: ArrayBufferView): void;
addEventListener<K extends keyof DataChannelEventMap>(type: K, listener: (this: DataChannel, ev: DataChannelEventMap[K]) => any, options?: boolean | AddEventListenerOptions): void;
removeEventListener<K extends keyof DataChannelEventMap>(type: K, listener: (this: DataChannel, ev: DataChannelEventMap[K]) => any, options?: boolean | EventListenerOptions): void;
}
export interface DataChannelInit {
id?: number;
maxPacketLifeTime?: number;
maxRetransmits?: number;
negotiated?: boolean;
ordered?: boolean;
protocol?: string;
}
export interface DataChannelEvent extends Event {
readonly channel: DataChannel;
}
export interface PeerConnectionIceEvent extends Event {
readonly candidate: RTCIceCandidate | null;
}
export interface TrackEvent extends Event {
readonly receiver: Receiver;
readonly streams: ReadonlyArray<Stream>;
readonly track: Track;
readonly transceiver: Transceiver;
}
export interface PeerConnectionEventMap {
"connectionstatechange": Event;
"datachannel": DataChannelEvent;
"icecandidate": PeerConnectionIceEvent;
"iceconnectionstatechange": Event;
"icegatheringstatechange": Event;
"negotiationneeded": Event;
"signalingstatechange": Event;
"track": TrackEvent;
}
export type DataChannelState = "closed" | "closing" | "connecting" | "open";
export type IceConnectionState = "checking" | "closed" | "completed" | "connected" | "disconnected" | "failed" | "new";
export type PeerConnectionState = "closed" | "connected" | "connecting" | "disconnected" | "failed" | "new";
export type SignalingState = "closed" | "have-local-offer" | "have-local-pranswer" | "have-remote-offer" | "have-remote-pranswer" | "stable";
export type IceGatheringState = "complete" | "gathering" | "new";
export type SdpType = "answer" | "offer" | "pranswer" | "rollback";
export type TransceiverDirection = "inactive" | "recvonly" | "sendonly" | "sendrecv" | "stopped";
export interface SessionDescription {
readonly sdp: string;
readonly type: SdpType;
toJSON(): any;
}
export interface AnswerOptions {}
export interface OfferOptions {
iceRestart?: boolean;
offerToReceiveAudio?: boolean;
offerToReceiveVideo?: boolean;
}
export interface SessionDescriptionInit {
sdp?: string;
type: SdpType;
}
export interface LocalSessionDescriptionInit {
sdp?: string;
type?: SdpType;
}
/** A WebRTC connection between the local computer and a remote peer. It provides methods to connect to a remote peer, maintain and monitor the connection, and close the connection once it's no longer needed. */
export interface PeerConnection {
readonly connectionState: PeerConnectionState;
readonly iceConnectionState: IceConnectionState;
readonly iceGatheringState: IceGatheringState;
readonly localDescription: SessionDescription | null;
readonly remoteDescription: SessionDescription | null;
readonly signalingState: SignalingState;
addIceCandidate(candidate?: RTCIceCandidateInit): Promise<void>;
addTrack(track: Track, ...streams: Stream[]): Sender;
close(): void;
createAnswer(options?: AnswerOptions): Promise<SessionDescriptionInit>;
createDataChannel(label: string, dataChannelDict?: DataChannelInit): DataChannel;
createOffer(options?: OfferOptions): Promise<SessionDescriptionInit>;
getReceivers(): Receiver[];
getSenders(): Sender[];
getTransceivers(): Transceiver[];
removeTrack(sender: Sender): void;
restartIce(): void;
setLocalDescription(description?: LocalSessionDescriptionInit): Promise<void>;
setRemoteDescription(description: SessionDescriptionInit): Promise<void>;
addEventListener<K extends keyof PeerConnectionEventMap>(type: K, listener: (this: PeerConnection, ev: PeerConnectionEventMap[K]) => any, options?: boolean | AddEventListenerOptions): void;
removeEventListener<K extends keyof PeerConnectionEventMap>(type: K, listener: (this: PeerConnection, ev: PeerConnectionEventMap[K]) => any, options?: boolean | EventListenerOptions): void;
}
export interface Receiver {
readonly track: Track;
}
export interface Sender {
readonly track: Track | null;
replaceTrack(withTrack: Track | null): Promise<void>;
}
export interface Transceiver {
readonly currentDirection: TransceiverDirection | null;
direction: TransceiverDirection;
readonly mid: string | null;
readonly receiver: Receiver;
readonly sender: Sender;
stop(): void;
} }

View file

@ -15,7 +15,7 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
import {MediaDevices as IMediaDevices, Stream, Track, TrackKind, AudioTrack} from "../../types/MediaDevices"; import {MediaDevices as IMediaDevices, Stream, Track, TrackKind, VolumeMeasurer} from "../../types/MediaDevices";
const POLLING_INTERVAL = 200; // ms const POLLING_INTERVAL = 200; // ms
export const SPEAKING_THRESHOLD = -60; // dB export const SPEAKING_THRESHOLD = -60; // dB
@ -30,12 +30,12 @@ export class MediaDevicesWrapper implements IMediaDevices {
async getMediaTracks(audio: true | MediaDeviceInfo, video: boolean | MediaDeviceInfo): Promise<Stream> { async getMediaTracks(audio: true | MediaDeviceInfo, video: boolean | MediaDeviceInfo): Promise<Stream> {
const stream = await this.mediaDevices.getUserMedia(this.getUserMediaContraints(audio, video)); const stream = await this.mediaDevices.getUserMedia(this.getUserMediaContraints(audio, video));
return new StreamWrapper(stream); return stream as Stream;
} }
async getScreenShareTrack(): Promise<Stream | undefined> { async getScreenShareTrack(): Promise<Stream | undefined> {
const stream = await this.mediaDevices.getDisplayMedia(this.getScreenshareContraints()); const stream = await this.mediaDevices.getDisplayMedia(this.getScreenshareContraints());
return new StreamWrapper(stream); return stream as Stream;
} }
private getUserMediaContraints(audio: boolean | MediaDeviceInfo, video: boolean | MediaDeviceInfo): MediaStreamConstraints { private getUserMediaContraints(audio: boolean | MediaDeviceInfo, video: boolean | MediaDeviceInfo): MediaStreamConstraints {
@ -68,70 +68,13 @@ export class MediaDevicesWrapper implements IMediaDevices {
video: true, video: true,
}; };
} }
}
export class StreamWrapper implements Stream { createVolumeMeasurer(stream: Stream, callback: () => void): VolumeMeasurer {
return new WebAudioVolumeMeasurer(stream as MediaStream, callback);
public audioTrack: AudioTrackWrapper | undefined = undefined;
public videoTrack: TrackWrapper | undefined = undefined;
constructor(public readonly stream: MediaStream, clonedTracks?: {audioTrack?: AudioTrackWrapper, videoTrack?: TrackWrapper}) {
if (clonedTracks) {
this.audioTrack = clonedTracks.audioTrack;
this.videoTrack = clonedTracks.videoTrack;
} else {
for (const track of stream.getTracks()) {
this.update(track);
}
} }
} }
get id(): string { return this.stream.id; } export class WebAudioVolumeMeasurer implements VolumeMeasurer {
clone(): Stream {
const clonedStream = this.stream.clone();
const clonedTracks = {
audioTrack: this.audioTrack ? new AudioTrackWrapper(clonedStream.getAudioTracks()[0], clonedStream, this.audioTrack.id): undefined,
videoTrack: this.videoTrack ? new TrackWrapper(clonedStream.getVideoTracks()[0], clonedStream, this.videoTrack.id): undefined,
};
return new StreamWrapper(clonedStream, clonedTracks);
}
update(track: MediaStreamTrack): TrackWrapper | undefined {
//console.trace("Stream.update " + JSON.stringify({id: track.id, vid: this.videoTrack?.id, aid: this.audioTrack?.id}));
if (track.kind === "video") {
if (!this.videoTrack || track.id !== this.videoTrack.track.id) {
this.videoTrack = new TrackWrapper(track, this.stream, track.id);
}
return this.videoTrack;
} else if (track.kind === "audio") {
if (!this.audioTrack || track.id !== this.audioTrack.track.id) {
this.audioTrack = new AudioTrackWrapper(track, this.stream, track.id);
}
return this.audioTrack;
}
}
}
export class TrackWrapper implements Track {
constructor(
public readonly track: MediaStreamTrack,
public readonly stream: MediaStream,
public readonly originalId: string,
) {}
get kind(): TrackKind { return this.track.kind as TrackKind; }
get label(): string { return this.track.label; }
get id(): string { return this.track.id; }
get settings(): MediaTrackSettings { return this.track.getSettings(); }
get enabled(): boolean { return this.track.enabled; }
set enabled(enabled: boolean) { this.track.enabled = enabled; }
// test equality across clones
equals(track: Track): boolean { return (track as TrackWrapper).originalId === this.originalId; }
stop() { this.track.stop(); }
}
export class AudioTrackWrapper extends TrackWrapper {
private measuringVolumeActivity = false; private measuringVolumeActivity = false;
private audioContext?: AudioContext; private audioContext?: AudioContext;
private analyser: AnalyserNode; private analyser: AnalyserNode;
@ -140,9 +83,12 @@ export class AudioTrackWrapper extends TrackWrapper {
private speaking = false; private speaking = false;
private volumeLooperTimeout: number; private volumeLooperTimeout: number;
private speakingVolumeSamples: number[]; private speakingVolumeSamples: number[];
private callback: () => void;
private stream: MediaStream;
constructor(track: MediaStreamTrack, stream: MediaStream, originalId: string) { constructor(stream: MediaStream, callback: () => void) {
super(track, stream, originalId); this.stream = stream;
this.callback = callback;
this.speakingVolumeSamples = new Array(SPEAKING_SAMPLE_COUNT).fill(-Infinity); this.speakingVolumeSamples = new Array(SPEAKING_SAMPLE_COUNT).fill(-Infinity);
this.initVolumeMeasuring(); this.initVolumeMeasuring();
this.measureVolumeActivity(true); this.measureVolumeActivity(true);
@ -162,6 +108,7 @@ export class AudioTrackWrapper extends TrackWrapper {
} else { } else {
this.measuringVolumeActivity = false; this.measuringVolumeActivity = false;
this.speakingVolumeSamples.fill(-Infinity); this.speakingVolumeSamples.fill(-Infinity);
this.callback();
// this.emit(CallFeedEvent.VolumeChanged, -Infinity); // this.emit(CallFeedEvent.VolumeChanged, -Infinity);
} }
} }
@ -182,7 +129,6 @@ export class AudioTrackWrapper extends TrackWrapper {
this.frequencyBinCount = new Float32Array(this.analyser.frequencyBinCount); this.frequencyBinCount = new Float32Array(this.analyser.frequencyBinCount);
} }
public setSpeakingThreshold(threshold: number) { public setSpeakingThreshold(threshold: number) {
this.speakingThreshold = threshold; this.speakingThreshold = threshold;
} }
@ -204,6 +150,7 @@ export class AudioTrackWrapper extends TrackWrapper {
this.speakingVolumeSamples.shift(); this.speakingVolumeSamples.shift();
this.speakingVolumeSamples.push(maxVolume); this.speakingVolumeSamples.push(maxVolume);
this.callback();
// this.emit(CallFeedEvent.VolumeChanged, maxVolume); // this.emit(CallFeedEvent.VolumeChanged, maxVolume);
let newSpeaking = false; let newSpeaking = false;
@ -219,267 +166,16 @@ export class AudioTrackWrapper extends TrackWrapper {
if (this.speaking !== newSpeaking) { if (this.speaking !== newSpeaking) {
this.speaking = newSpeaking; this.speaking = newSpeaking;
this.callback();
// this.emit(CallFeedEvent.Speaking, this.speaking); // this.emit(CallFeedEvent.Speaking, this.speaking);
} }
this.volumeLooperTimeout = setTimeout(this.volumeLooper, POLLING_INTERVAL) as unknown as number; this.volumeLooperTimeout = setTimeout(this.volumeLooper, POLLING_INTERVAL) as unknown as number;
}; };
public dispose(): void { public stop(): void {
clearTimeout(this.volumeLooperTimeout); clearTimeout(this.volumeLooperTimeout);
this.analyser.disconnect();
this.audioContext?.close();
} }
} }
// export interface ICallFeedOpts {
// client: MatrixClient;
// roomId: string;
// userId: string;
// stream: MediaStream;
// purpose: SDPStreamMetadataPurpose;
// audioMuted: boolean;
// videoMuted: boolean;
// }
// export enum CallFeedEvent {
// NewStream = "new_stream",
// MuteStateChanged = "mute_state_changed",
// VolumeChanged = "volume_changed",
// Speaking = "speaking",
// }
// export class CallFeed extends EventEmitter {
// public stream: MediaStream;
// public sdpMetadataStreamId: string;
// public userId: string;
// public purpose: SDPStreamMetadataPurpose;
// public speakingVolumeSamples: number[];
// private client: MatrixClient;
// private roomId: string;
// private audioMuted: boolean;
// private videoMuted: boolean;
// private measuringVolumeActivity = false;
// private audioContext: AudioContext;
// private analyser: AnalyserNode;
// private frequencyBinCount: Float32Array;
// private speakingThreshold = SPEAKING_THRESHOLD;
// private speaking = false;
// private volumeLooperTimeout: number;
// constructor(opts: ICallFeedOpts) {
// super();
// this.client = opts.client;
// this.roomId = opts.roomId;
// this.userId = opts.userId;
// this.purpose = opts.purpose;
// this.audioMuted = opts.audioMuted;
// this.videoMuted = opts.videoMuted;
// this.speakingVolumeSamples = new Array(SPEAKING_SAMPLE_COUNT).fill(-Infinity);
// this.sdpMetadataStreamId = opts.stream.id;
// this.updateStream(null, opts.stream);
// if (this.hasAudioTrack) {
// this.initVolumeMeasuring();
// }
// }
// private get hasAudioTrack(): boolean {
// return this.stream.getAudioTracks().length > 0;
// }
// private updateStream(oldStream: MediaStream, newStream: MediaStream): void {
// if (newStream === oldStream) return;
// if (oldStream) {
// oldStream.removeEventListener("addtrack", this.onAddTrack);
// this.measureVolumeActivity(false);
// }
// if (newStream) {
// this.stream = newStream;
// newStream.addEventListener("addtrack", this.onAddTrack);
// if (this.hasAudioTrack) {
// this.initVolumeMeasuring();
// } else {
// this.measureVolumeActivity(false);
// }
// }
// this.emit(CallFeedEvent.NewStream, this.stream);
// }
// private initVolumeMeasuring(): void {
// const AudioContext = window.AudioContext || window.webkitAudioContext;
// if (!this.hasAudioTrack || !AudioContext) return;
// this.audioContext = new AudioContext();
// this.analyser = this.audioContext.createAnalyser();
// this.analyser.fftSize = 512;
// this.analyser.smoothingTimeConstant = 0.1;
// const mediaStreamAudioSourceNode = this.audioContext.createMediaStreamSource(this.stream);
// mediaStreamAudioSourceNode.connect(this.analyser);
// this.frequencyBinCount = new Float32Array(this.analyser.frequencyBinCount);
// }
// private onAddTrack = (): void => {
// this.emit(CallFeedEvent.NewStream, this.stream);
// };
// /**
// * Returns callRoom member
// * @returns member of the callRoom
// */
// public getMember(): RoomMember {
// const callRoom = this.client.getRoom(this.roomId);
// return callRoom.getMember(this.userId);
// }
// /**
// * Returns true if CallFeed is local, otherwise returns false
// * @returns {boolean} is local?
// */
// public isLocal(): boolean {
// return this.userId === this.client.getUserId();
// }
// /**
// * Returns true if audio is muted or if there are no audio
// * tracks, otherwise returns false
// * @returns {boolean} is audio muted?
// */
// public isAudioMuted(): boolean {
// return this.stream.getAudioTracks().length === 0 || this.audioMuted;
// }
// *
// * Returns true video is muted or if there are no video
// * tracks, otherwise returns false
// * @returns {boolean} is video muted?
// public isVideoMuted(): boolean {
// // We assume only one video track
// return this.stream.getVideoTracks().length === 0 || this.videoMuted;
// }
// public isSpeaking(): boolean {
// return this.speaking;
// }
// /**
// * Replaces the current MediaStream with a new one.
// * This method should be only used by MatrixCall.
// * @param newStream new stream with which to replace the current one
// */
// public setNewStream(newStream: MediaStream): void {
// this.updateStream(this.stream, newStream);
// }
// /**
// * Set feed's internal audio mute state
// * @param muted is the feed's audio muted?
// */
// public setAudioMuted(muted: boolean): void {
// this.audioMuted = muted;
// this.speakingVolumeSamples.fill(-Infinity);
// this.emit(CallFeedEvent.MuteStateChanged, this.audioMuted, this.videoMuted);
// }
// /**
// * Set feed's internal video mute state
// * @param muted is the feed's video muted?
// */
// public setVideoMuted(muted: boolean): void {
// this.videoMuted = muted;
// this.emit(CallFeedEvent.MuteStateChanged, this.audioMuted, this.videoMuted);
// }
// /**
// * Starts emitting volume_changed events where the emitter value is in decibels
// * @param enabled emit volume changes
// */
// public measureVolumeActivity(enabled: boolean): void {
// if (enabled) {
// if (!this.audioContext || !this.analyser || !this.frequencyBinCount || !this.hasAudioTrack) return;
// this.measuringVolumeActivity = true;
// this.volumeLooper();
// } else {
// this.measuringVolumeActivity = false;
// this.speakingVolumeSamples.fill(-Infinity);
// this.emit(CallFeedEvent.VolumeChanged, -Infinity);
// }
// }
// public setSpeakingThreshold(threshold: number) {
// this.speakingThreshold = threshold;
// }
// private volumeLooper = () => {
// if (!this.analyser) return;
// if (!this.measuringVolumeActivity) return;
// this.analyser.getFloatFrequencyData(this.frequencyBinCount);
// let maxVolume = -Infinity;
// for (let i = 0; i < this.frequencyBinCount.length; i++) {
// if (this.frequencyBinCount[i] > maxVolume) {
// maxVolume = this.frequencyBinCount[i];
// }
// }
// this.speakingVolumeSamples.shift();
// this.speakingVolumeSamples.push(maxVolume);
// this.emit(CallFeedEvent.VolumeChanged, maxVolume);
// let newSpeaking = false;
// for (let i = 0; i < this.speakingVolumeSamples.length; i++) {
// const volume = this.speakingVolumeSamples[i];
// if (volume > this.speakingThreshold) {
// newSpeaking = true;
// break;
// }
// }
// if (this.speaking !== newSpeaking) {
// this.speaking = newSpeaking;
// this.emit(CallFeedEvent.Speaking, this.speaking);
// }
// this.volumeLooperTimeout = setTimeout(this.volumeLooper, POLLING_INTERVAL);
// };
// public clone(): CallFeed {
// const mediaHandler = this.client.getMediaHandler();
// const stream = this.stream.clone();
// if (this.purpose === SDPStreamMetadataPurpose.Usermedia) {
// mediaHandler.userMediaStreams.push(stream);
// } else {
// mediaHandler.screensharingStreams.push(stream);
// }
// return new CallFeed({
// client: this.client,
// roomId: this.roomId,
// userId: this.userId,
// stream,
// purpose: this.purpose,
// audioMuted: this.audioMuted,
// videoMuted: this.videoMuted,
// });
// }
// public dispose(): void {
// clearTimeout(this.volumeLooperTimeout);
// this.measureVolumeActivity(false);
// }
// }

View file

@ -14,9 +14,8 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
import {StreamWrapper, TrackWrapper, AudioTrackWrapper} from "./MediaDevices"; import {Stream, Track, TrackKind} from "../../types/MediaDevices";
import {Stream, Track, AudioTrack, TrackKind} from "../../types/MediaDevices"; import {WebRTC, Sender, PeerConnection} from "../../types/WebRTC";
import {WebRTC, PeerConnectionHandler, StreamSender, TrackSender, StreamReceiver, TrackReceiver, PeerConnection} from "../../types/WebRTC";
import {SDPStreamMetadataPurpose} from "../../../matrix/calls/callEventTypes"; import {SDPStreamMetadataPurpose} from "../../../matrix/calls/callEventTypes";
const POLLING_INTERVAL = 200; // ms const POLLING_INTERVAL = 200; // ms
@ -24,159 +23,21 @@ export const SPEAKING_THRESHOLD = -60; // dB
const SPEAKING_SAMPLE_COUNT = 8; // samples const SPEAKING_SAMPLE_COUNT = 8; // samples
export class DOMWebRTC implements WebRTC { export class DOMWebRTC implements WebRTC {
createPeerConnection(handler: PeerConnectionHandler, forceTURN: boolean, turnServers: RTCIceServer[], iceCandidatePoolSize): PeerConnection { createPeerConnection(forceTURN: boolean, turnServers: RTCIceServer[], iceCandidatePoolSize): PeerConnection {
return new DOMPeerConnection(handler, forceTURN, turnServers, iceCandidatePoolSize); return new RTCPeerConnection({
} iceTransportPolicy: forceTURN ? 'relay' : undefined,
iceServers: turnServers,
iceCandidatePoolSize: iceCandidatePoolSize,
}) as PeerConnection;
} }
export class RemoteStreamWrapper extends StreamWrapper { prepareSenderForPurpose(peerConnection: PeerConnection, sender: Sender, purpose: SDPStreamMetadataPurpose): void {
constructor(stream: MediaStream, private readonly emptyCallback: (stream: RemoteStreamWrapper) => void) {
super(stream);
this.stream.addEventListener("removetrack", this.onTrackRemoved);
}
onTrackRemoved = (evt: MediaStreamTrackEvent) => {
if (evt.track.id === this.audioTrack?.track.id) {
this.audioTrack = undefined;
} else if (evt.track.id === this.videoTrack?.track.id) {
this.videoTrack = undefined;
}
if (!this.audioTrack && !this.videoTrack) {
this.emptyCallback(this);
}
};
dispose() {
this.stream.removeEventListener("removetrack", this.onTrackRemoved);
}
}
export class DOMStreamSender implements StreamSender {
public audioSender: DOMTrackSender | undefined;
public videoSender: DOMTrackSender | undefined;
constructor(public readonly stream: StreamWrapper) {}
update(transceivers: ReadonlyArray<RTCRtpTransceiver>, sender: RTCRtpSender): DOMTrackSender | undefined {
const transceiver = transceivers.find(t => t.sender === sender);
if (transceiver && sender.track) {
const trackWrapper = this.stream.update(sender.track);
if (trackWrapper) {
if (trackWrapper.kind === TrackKind.Video && (!this.videoSender || this.videoSender.track.id !== trackWrapper.id)) {
this.videoSender = new DOMTrackSender(trackWrapper, transceiver);
return this.videoSender;
} else if (trackWrapper.kind === TrackKind.Audio && (!this.audioSender || this.audioSender.track.id !== trackWrapper.id)) {
this.audioSender = new DOMTrackSender(trackWrapper, transceiver);
return this.audioSender;
}
}
}
}
}
export class DOMStreamReceiver implements StreamReceiver {
public audioReceiver: DOMTrackReceiver | undefined;
public videoReceiver: DOMTrackReceiver | undefined;
constructor(public readonly stream: RemoteStreamWrapper) {}
update(event: RTCTrackEvent): DOMTrackReceiver | undefined {
const {receiver} = event;
const {track} = receiver;
const trackWrapper = this.stream.update(track);
if (trackWrapper) {
if (trackWrapper.kind === TrackKind.Video) {
this.videoReceiver = new DOMTrackReceiver(trackWrapper, event.transceiver);
return this.videoReceiver;
} else {
this.audioReceiver = new DOMTrackReceiver(trackWrapper, event.transceiver);
return this.audioReceiver;
}
}
}
}
export class DOMTrackSenderOrReceiver implements TrackReceiver {
constructor(
public readonly track: TrackWrapper,
public readonly transceiver: RTCRtpTransceiver,
private readonly exclusiveValue: RTCRtpTransceiverDirection,
private readonly excludedValue: RTCRtpTransceiverDirection
) {}
get enabled(): boolean {
return this.transceiver.direction === "sendrecv" ||
this.transceiver.direction === this.exclusiveValue;
}
enableWithoutRenegotiation(enabled: boolean) {
this.track.track.enabled = enabled;
}
enable(enabled: boolean) {
if (enabled !== this.enabled) {
// do this first, so we stop sending track data immediately.
// this will still consume bandwidth though, so also disable the transceiver,
// which will trigger a renegotiation though.
this.enableWithoutRenegotiation(enabled);
if (enabled) {
if (this.transceiver.direction === "inactive") {
this.transceiver.direction = this.exclusiveValue;
} else {
this.transceiver.direction = "sendrecv";
}
} else {
if (this.transceiver.direction === "sendrecv") {
this.transceiver.direction = this.excludedValue;
} else {
this.transceiver.direction = "inactive";
}
}
}
}
}
export class DOMTrackReceiver extends DOMTrackSenderOrReceiver {
constructor(
track: TrackWrapper,
transceiver: RTCRtpTransceiver,
) {
super(track, transceiver, "recvonly", "sendonly");
}
}
export class DOMTrackSender extends DOMTrackSenderOrReceiver {
constructor(
track: TrackWrapper,
transceiver: RTCRtpTransceiver,
) {
super(track, transceiver, "sendonly", "recvonly");
}
/** replaces the track if possible without renegotiation. Can throw. */
replaceTrack(track: Track | undefined): Promise<void> {
return this.transceiver.sender.replaceTrack(track ? (track as TrackWrapper).track : null);
}
prepareForPurpose(purpose: SDPStreamMetadataPurpose): void {
if (purpose === SDPStreamMetadataPurpose.Screenshare) { if (purpose === SDPStreamMetadataPurpose.Screenshare) {
this.getRidOfRTXCodecs(); this.getRidOfRTXCodecs(peerConnection as RTCPeerConnection, sender as RTCRtpSender);
} }
} }
/** private getRidOfRTXCodecs(peerConnection: RTCPeerConnection, sender: RTCRtpSender): void {
* This method removes all video/rtx codecs from screensharing video
* transceivers. This is necessary since they can cause problems. Without
* this the following steps should produce an error:
* Chromium calls Firefox
* Firefox answers
* Firefox starts screen-sharing
* Chromium starts screen-sharing
* Call crashes for Chromium with:
* [96685:23:0518/162603.933321:ERROR:webrtc_video_engine.cc(3296)] RTX codec (PT=97) mapped to PT=96 which is not in the codec list.
* [96685:23:0518/162603.933377:ERROR:webrtc_video_engine.cc(1171)] GetChangedRecvParameters called without any video codecs.
* [96685:23:0518/162603.933430:ERROR:sdp_offer_answer.cc(4302)] Failed to set local video description recv parameters for m-section with mid='2'. (INVALID_PARAMETER)
*/
private getRidOfRTXCodecs(): void {
// RTCRtpReceiver.getCapabilities and RTCRtpSender.getCapabilities don't seem to be supported on FF // RTCRtpReceiver.getCapabilities and RTCRtpSender.getCapabilities don't seem to be supported on FF
if (!RTCRtpReceiver.getCapabilities || !RTCRtpSender.getCapabilities) return; if (!RTCRtpReceiver.getCapabilities || !RTCRtpSender.getCapabilities) return;
@ -190,172 +51,14 @@ export class DOMTrackSender extends DOMTrackSenderOrReceiver {
codecs.splice(rtxCodecIndex, 1); codecs.splice(rtxCodecIndex, 1);
} }
} }
if (this.transceiver.sender.track?.kind === "video" ||
this.transceiver.receiver.track?.kind === "video") {
this.transceiver.setCodecPreferences(codecs);
}
}
}
class DOMPeerConnection implements PeerConnection { const transceiver = peerConnection.getTransceivers().find(t => t.sender === sender);
private readonly peerConnection: RTCPeerConnection; if (transceiver && (
private readonly handler: PeerConnectionHandler; transceiver.sender.track?.kind === "video" ||
public readonly localStreams: Map<string, DOMStreamSender> = new Map(); transceiver.receiver.track?.kind === "video"
public readonly remoteStreams: Map<string, DOMStreamReceiver> = new Map(); )
) {
constructor(handler: PeerConnectionHandler, forceTURN: boolean, turnServers: RTCIceServer[], iceCandidatePoolSize) { transceiver.setCodecPreferences(codecs);
this.handler = handler;
this.peerConnection = new RTCPeerConnection({
iceTransportPolicy: forceTURN ? 'relay' : undefined,
iceServers: turnServers,
iceCandidatePoolSize: iceCandidatePoolSize,
});
this.registerHandler();
}
get iceGatheringState(): RTCIceGatheringState { return this.peerConnection.iceGatheringState; }
get localDescription(): RTCSessionDescription | undefined { return this.peerConnection.localDescription ?? undefined; }
get signalingState(): RTCSignalingState { return this.peerConnection.signalingState; }
createOffer(): Promise<RTCSessionDescriptionInit> {
return this.peerConnection.createOffer();
}
createAnswer(): Promise<RTCSessionDescriptionInit> {
return this.peerConnection.createAnswer();
}
setLocalDescription(description?: RTCSessionDescriptionInit): Promise<void> {
return this.peerConnection.setLocalDescription(description);
}
setRemoteDescription(description: RTCSessionDescriptionInit): Promise<void> {
return this.peerConnection.setRemoteDescription(description);
}
addIceCandidate(candidate: RTCIceCandidate): Promise<void> {
return this.peerConnection.addIceCandidate(candidate);
}
close(): void {
return this.peerConnection.close();
}
addTrack(track: Track): DOMTrackSender | undefined {
if (!(track instanceof TrackWrapper)) {
throw new Error("Not a TrackWrapper");
}
const sender = this.peerConnection.addTrack(track.track, track.stream);
let streamSender = this.localStreams.get(track.stream.id);
if (!streamSender) {
// TODO: reuse existing stream wrapper here?
streamSender = new DOMStreamSender(new StreamWrapper(track.stream));
this.localStreams.set(track.stream.id, streamSender);
}
const trackSender = streamSender.update(this.peerConnection.getTransceivers(), sender);
return trackSender;
}
removeTrack(sender: TrackSender): void {
if (!(sender instanceof DOMTrackSender)) {
throw new Error("Not a DOMTrackSender");
}
this.peerConnection.removeTrack((sender as DOMTrackSender).transceiver.sender);
// TODO: update localStreams
}
createDataChannel(options: RTCDataChannelInit): any {
return this.peerConnection.createDataChannel("channel", options);
}
private registerHandler() {
const pc = this.peerConnection;
pc.addEventListener('negotiationneeded', this);
pc.addEventListener('icecandidate', this);
pc.addEventListener('iceconnectionstatechange', this);
pc.addEventListener('icegatheringstatechange', this);
pc.addEventListener('signalingstatechange', this);
pc.addEventListener('track', this);
pc.addEventListener('datachannel', this);
}
private deregisterHandler() {
const pc = this.peerConnection;
pc.removeEventListener('negotiationneeded', this);
pc.removeEventListener('icecandidate', this);
pc.removeEventListener('iceconnectionstatechange', this);
pc.removeEventListener('icegatheringstatechange', this);
pc.removeEventListener('signalingstatechange', this);
pc.removeEventListener('track', this);
pc.removeEventListener('datachannel', this);
}
/** @internal */
handleEvent(evt: Event) {
switch (evt.type) {
case "iceconnectionstatechange":
this.handleIceConnectionStateChange();
break;
case "icecandidate":
this.handleLocalIceCandidate(evt as RTCPeerConnectionIceEvent);
break;
case "icegatheringstatechange":
this.handler.onIceGatheringStateChange(this.peerConnection.iceGatheringState);
break;
case "track":
this.handleRemoteTrack(evt as RTCTrackEvent);
break;
case "negotiationneeded":
this.handler.onNegotiationNeeded();
break;
case "datachannel":
this.handler.onRemoteDataChannel((evt as RTCDataChannelEvent).channel);
break;
}
}
dispose(): void {
this.deregisterHandler();
for (const r of this.remoteStreams.values()) {
r.stream.dispose();
}
}
private handleLocalIceCandidate(event: RTCPeerConnectionIceEvent) {
if (event.candidate) {
this.handler.onLocalIceCandidate(event.candidate);
}
};
private handleIceConnectionStateChange() {
const {iceConnectionState} = this.peerConnection;
if (iceConnectionState === "failed" && this.peerConnection.restartIce) {
this.peerConnection.restartIce();
} else {
this.handler.onIceConnectionStateChange(iceConnectionState);
}
}
onRemoteStreamEmpty = (stream: RemoteStreamWrapper): void => {
if (this.remoteStreams.delete(stream.id)) {
this.handler.onRemoteStreamRemoved(stream);
}
}
private handleRemoteTrack(evt: RTCTrackEvent) {
if (evt.streams.length !== 1) {
throw new Error("track in multiple streams is not supported");
}
const stream = evt.streams[0];
const transceivers = this.peerConnection.getTransceivers();
let streamReceiver: DOMStreamReceiver | undefined = this.remoteStreams.get(stream.id);
if (!streamReceiver) {
streamReceiver = new DOMStreamReceiver(new RemoteStreamWrapper(stream, this.onRemoteStreamEmpty));
this.remoteStreams.set(stream.id, streamReceiver);
}
const trackReceiver = streamReceiver.update(evt);
if (trackReceiver) {
this.handler.onRemoteTracksAdded(trackReceiver);
} }
} }
} }

View file

@ -17,15 +17,15 @@ limitations under the License.
import {TemplateView, TemplateBuilder} from "../../general/TemplateView"; import {TemplateView, TemplateBuilder} from "../../general/TemplateView";
import {ListView} from "../../general/ListView"; import {ListView} from "../../general/ListView";
import {Stream} from "../../../../types/MediaDevices"; import {Stream} from "../../../../types/MediaDevices";
import type {StreamWrapper} from "../../../dom/MediaDevices"; import {getStreamVideoTrack, getStreamAudioTrack} from "../../../../../matrix/calls/common";
import type {CallViewModel, CallMemberViewModel} from "../../../../../domain/session/room/CallViewModel"; import type {CallViewModel, CallMemberViewModel} from "../../../../../domain/session/room/CallViewModel";
function bindStream<T>(t: TemplateBuilder<T>, video: HTMLVideoElement, propSelector: (vm: T) => Stream | undefined) { function bindStream<T>(t: TemplateBuilder<T>, video: HTMLVideoElement, propSelector: (vm: T) => Stream | undefined) {
t.mapSideEffect(vm => propSelector(vm)?.videoTrack?.enabled, (_,__, vm) => { t.mapSideEffect(vm => getStreamVideoTrack(propSelector(vm))?.enabled, (_,__, vm) => {
const stream = propSelector(vm); const stream = propSelector(vm);
if (stream) { if (stream) {
video.srcObject = (stream as StreamWrapper).stream; video.srcObject = stream as MediaStream;
if (stream.videoTrack?.enabled) { if (getStreamVideoTrack(stream)?.enabled) {
video.classList.remove("hidden"); video.classList.remove("hidden");
} else { } else {
video.classList.add("hidden"); video.classList.add("hidden");

View file

@ -1,52 +0,0 @@
/*
Copyright 2022 The Matrix.org Foundation C.I.C.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
export class AsyncQueue<T, V> {
private isRunning = false;
private queue: T[] = [];
private error?: Error;
constructor(
private readonly reducer: (v: V, t: T) => Promise<V>,
private value: V,
private readonly contains: (t: T, queue: T[]) => boolean = (t, queue) => queue.includes(t)
) {}
push(t: T) {
if (this.contains(t, this.queue)) {
return;
}
this.queue.push(t);
this.runLoopIfNeeded();
}
private async runLoopIfNeeded() {
if (this.isRunning || this.error) {
return;
}
this.isRunning = true;
try {
let item: T | undefined;
while (item = this.queue.shift()) {
this.value = await this.reducer(this.value, item);
}
} catch (err) {
this.error = err;
} finally {
this.isRunning = false;
}
}
}