forked from mystiq/hydrogen-web
WIP8 - implement PeerCall.handleAnswer and other things
This commit is contained in:
parent
25b0148073
commit
ecf7eab3ee
4 changed files with 99 additions and 28 deletions
|
@ -17,6 +17,7 @@ limitations under the License.
|
||||||
import {ObservableMap} from "../../observable/map/ObservableMap";
|
import {ObservableMap} from "../../observable/map/ObservableMap";
|
||||||
import {recursivelyAssign} from "../../utils/recursivelyAssign";
|
import {recursivelyAssign} from "../../utils/recursivelyAssign";
|
||||||
import {AsyncQueue} from "../../utils/AsyncQueue";
|
import {AsyncQueue} from "../../utils/AsyncQueue";
|
||||||
|
import {Disposables, Disposable} from "../../utils/Disposables";
|
||||||
import type {Room} from "../room/Room";
|
import type {Room} from "../room/Room";
|
||||||
import type {StateEvent} from "../storage/types";
|
import type {StateEvent} from "../storage/types";
|
||||||
import type {ILogItem} from "../../logging/types";
|
import type {ILogItem} from "../../logging/types";
|
||||||
|
@ -50,7 +51,8 @@ class PeerCall {
|
||||||
private responsePromiseChain?: Promise<void>;
|
private responsePromiseChain?: Promise<void>;
|
||||||
private opponentPartyId?: PartyId;
|
private opponentPartyId?: PartyId;
|
||||||
private hangupParty: CallParty;
|
private hangupParty: CallParty;
|
||||||
private hangupTimeout?: Timeout;
|
private disposables = new Disposables();
|
||||||
|
private statePromiseMap = new Map<CallState, {resolve: () => void, promise: Promise<void>}>();
|
||||||
|
|
||||||
constructor(
|
constructor(
|
||||||
private readonly handler: PeerCallHandler,
|
private readonly handler: PeerCallHandler,
|
||||||
|
@ -144,14 +146,13 @@ class PeerCall {
|
||||||
try {
|
try {
|
||||||
await this.peerConnection.setLocalDescription(myAnswer);
|
await this.peerConnection.setLocalDescription(myAnswer);
|
||||||
this.setState(CallState.Connecting);
|
this.setState(CallState.Connecting);
|
||||||
|
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
this.logger.debug(`Call ${this.callId} Error setting local description!`, err);
|
this.logger.debug(`Call ${this.callId} Error setting local description!`, err);
|
||||||
this.terminate(CallParty.Local, CallErrorCode.SetLocalDescription, true);
|
this.terminate(CallParty.Local, CallErrorCode.SetLocalDescription, true);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
// Allow a short time for initial candidates to be gathered
|
// Allow a short time for initial candidates to be gathered
|
||||||
await this.createTimeout(200).elapsed();
|
await this.delay(200);
|
||||||
this.sendAnswer();
|
this.sendAnswer();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -193,7 +194,7 @@ class PeerCall {
|
||||||
|
|
||||||
if (this.peerConnection.iceGatheringState === 'gathering') {
|
if (this.peerConnection.iceGatheringState === 'gathering') {
|
||||||
// Allow a short time for initial candidates to be gathered
|
// Allow a short time for initial candidates to be gathered
|
||||||
await this.createTimeout(200).elapsed();
|
await this.delay(200);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (this.state === CallState.Ended) {
|
if (this.state === CallState.Ended) {
|
||||||
|
@ -221,8 +222,7 @@ class PeerCall {
|
||||||
this.sendCandidateQueue();
|
this.sendCandidateQueue();
|
||||||
|
|
||||||
if (this.state === CallState.CreateOffer) {
|
if (this.state === CallState.CreateOffer) {
|
||||||
this.hangupTimeout = this.createTimeout(CALL_TIMEOUT_MS);
|
await this.delay(CALL_TIMEOUT_MS);
|
||||||
await this.hangupTimeout.elapsed();
|
|
||||||
// @ts-ignore TS doesn't take the await above into account to know that the state could have changed in between
|
// @ts-ignore TS doesn't take the await above into account to know that the state could have changed in between
|
||||||
if (this.state === CallState.InviteSent) {
|
if (this.state === CallState.InviteSent) {
|
||||||
this.hangup(CallErrorCode.InviteTimeout);
|
this.hangup(CallErrorCode.InviteTimeout);
|
||||||
|
@ -271,18 +271,55 @@ class PeerCall {
|
||||||
|
|
||||||
this.setState(CallState.Ringing);
|
this.setState(CallState.Ringing);
|
||||||
|
|
||||||
setTimeout(() => {
|
await this.delay(content.lifetime ?? CALL_TIMEOUT_MS);
|
||||||
if (this.state == CallState.Ringing) {
|
// @ts-ignore TS doesn't take the await above into account to know that the state could have changed in between
|
||||||
this.logger.debug(`Call ${this.callId} invite has expired. Hanging up.`);
|
if (this.state === CallState.Ringing) {
|
||||||
this.hangupParty = CallParty.Remote; // effectively
|
this.logger.debug(`Call ${this.callId} invite has expired. Hanging up.`);
|
||||||
this.setState(CallState.Ended);
|
this.hangupParty = CallParty.Remote; // effectively
|
||||||
this.stopAllMedia();
|
this.setState(CallState.Ended);
|
||||||
if (this.peerConnection.signalingState != 'closed') {
|
this.stopAllMedia();
|
||||||
this.peerConnection.close();
|
if (this.peerConnection.signalingState != 'closed') {
|
||||||
}
|
this.peerConnection.close();
|
||||||
this.emit(CallEvent.Hangup);
|
|
||||||
}
|
}
|
||||||
}, content.lifetime ?? CALL_TIMEOUT_MS /* - event.getLocalAge() */ );
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private async handleAnswer(content: AnwserContent, partyId: PartyId): Promise<void> {
|
||||||
|
this.logger.debug(`Got answer for call ID ${this.callId} from party ID ${content.party_id}`);
|
||||||
|
|
||||||
|
if (this.state === CallState.Ended) {
|
||||||
|
this.logger.debug(`Ignoring answer because call ID ${this.callId} has ended`);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (this.opponentPartyId !== undefined) {
|
||||||
|
this.logger.info(
|
||||||
|
`Call ${this.callId} ` +
|
||||||
|
`Ignoring answer from party ID ${content.party_id}: ` +
|
||||||
|
`we already have an answer/reject from ${this.opponentPartyId}`,
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
this.opponentPartyId = partyId;
|
||||||
|
await this.addBufferedIceCandidates();
|
||||||
|
|
||||||
|
this.setState(CallState.Connecting);
|
||||||
|
|
||||||
|
const sdpStreamMetadata = content[SDPStreamMetadataKey];
|
||||||
|
if (sdpStreamMetadata) {
|
||||||
|
this.updateRemoteSDPStreamMetadata(sdpStreamMetadata);
|
||||||
|
} else {
|
||||||
|
this.logger.warn(`Call ${this.callId} Did not get any SDPStreamMetadata! Can not send/receive multiple streams`);
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
await this.peerConnection.setRemoteDescription(content.answer);
|
||||||
|
} catch (e) {
|
||||||
|
this.logger.debug(`Call ${this.callId} Failed to set remote description`, e);
|
||||||
|
this.terminate(CallParty.Local, CallErrorCode.SetRemoteDescription, false);
|
||||||
|
return;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private async sendAnswer(): Promise<void> {
|
private async sendAnswer(): Promise<void> {
|
||||||
|
@ -333,8 +370,7 @@ class PeerCall {
|
||||||
|
|
||||||
// MSC2746 recommends these values (can be quite long when calling because the
|
// MSC2746 recommends these values (can be quite long when calling because the
|
||||||
// callee will need a while to answer the call)
|
// callee will need a while to answer the call)
|
||||||
const delay = this.direction === CallDirection.Inbound ? 500 : 2000;
|
this.delay(this.direction === CallDirection.Inbound ? 500 : 2000).then(() => {
|
||||||
this.createTimeout(delay).elapsed().then(() => {
|
|
||||||
this.sendCandidateQueue();
|
this.sendCandidateQueue();
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
@ -384,13 +420,15 @@ class PeerCall {
|
||||||
|
|
||||||
|
|
||||||
private async addBufferedIceCandidates(): Promise<void> {
|
private async addBufferedIceCandidates(): Promise<void> {
|
||||||
const bufferedCandidates = this.remoteCandidateBuffer!.get(this.opponentPartyId!);
|
if (this.remoteCandidateBuffer && this.opponentPartyId) {
|
||||||
if (bufferedCandidates) {
|
const bufferedCandidates = this.remoteCandidateBuffer.get(this.opponentPartyId);
|
||||||
this.logger.info(`Call ${this.callId} Adding ${
|
if (bufferedCandidates) {
|
||||||
bufferedCandidates.length} buffered candidates for opponent ${this.opponentPartyId}`);
|
this.logger.info(`Call ${this.callId} Adding ${
|
||||||
await this.addIceCandidates(bufferedCandidates);
|
bufferedCandidates.length} buffered candidates for opponent ${this.opponentPartyId}`);
|
||||||
|
await this.addIceCandidates(bufferedCandidates);
|
||||||
|
}
|
||||||
|
this.remoteCandidateBuffer = undefined;
|
||||||
}
|
}
|
||||||
this.remoteCandidateBuffer = undefined;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private async addIceCandidates(candidates: RTCIceCandidate[]): Promise<void> {
|
private async addIceCandidates(candidates: RTCIceCandidate[]): Promise<void> {
|
||||||
|
@ -417,11 +455,25 @@ class PeerCall {
|
||||||
private setState(state: CallState): void {
|
private setState(state: CallState): void {
|
||||||
const oldState = this.state;
|
const oldState = this.state;
|
||||||
this.state = state;
|
this.state = state;
|
||||||
this.handler.emitUpdate();
|
let deferred = this.statePromiseMap.get(state);
|
||||||
|
if (deferred) {
|
||||||
|
deferred.resolve();
|
||||||
|
this.statePromiseMap.delete(state);
|
||||||
|
}
|
||||||
|
this.handler.emitUpdate(this, undefined);
|
||||||
}
|
}
|
||||||
|
|
||||||
private waitForState(state: CallState): Promise<void> {
|
private waitForState(state: CallState): Promise<void> {
|
||||||
|
let deferred = this.statePromiseMap.get(state);
|
||||||
|
if (!deferred) {
|
||||||
|
let resolve;
|
||||||
|
const promise = new Promise<void>(r => {
|
||||||
|
resolve = r;
|
||||||
|
});
|
||||||
|
deferred = {resolve, promise};
|
||||||
|
this.statePromiseMap.set(state, deferred);
|
||||||
|
}
|
||||||
|
return deferred.promise;
|
||||||
}
|
}
|
||||||
|
|
||||||
private async terminate(hangupParty: CallParty, hangupReason: CallErrorCode, shouldEmit: boolean): Promise<void> {
|
private async terminate(hangupParty: CallParty, hangupReason: CallErrorCode, shouldEmit: boolean): Promise<void> {
|
||||||
|
@ -433,6 +485,18 @@ class PeerCall {
|
||||||
track.stop();
|
track.stop();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private async delay(timeoutMs: number): Promise<void> {
|
||||||
|
// Allow a short time for initial candidates to be gathered
|
||||||
|
const timeout = this.disposables.track(this.createTimeout(timeoutMs));
|
||||||
|
await timeout.elapsed();
|
||||||
|
this.disposables.untrack(timeout);
|
||||||
|
}
|
||||||
|
|
||||||
|
public dispose(): void {
|
||||||
|
this.disposables.dispose();
|
||||||
|
// TODO: dispose peerConnection?
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -105,7 +105,7 @@ Expose call objects
|
||||||
Write view model
|
Write view model
|
||||||
write view
|
write view
|
||||||
|
|
||||||
## Calls questions\
|
## Calls questions
|
||||||
- how do we handle glare between group calls (e.g. different state events with different call ids?)
|
- how do we handle glare between group calls (e.g. different state events with different call ids?)
|
||||||
- Split up DOM part into platform code? What abstractions to choose?
|
- Split up DOM part into platform code? What abstractions to choose?
|
||||||
Does it make sense to come up with our own API very similar to DOM api?
|
Does it make sense to come up with our own API very similar to DOM api?
|
||||||
|
|
|
@ -42,6 +42,7 @@ export interface PeerConnection {
|
||||||
get remoteTracks(): Track[];
|
get remoteTracks(): Track[];
|
||||||
get dataChannel(): DataChannel | undefined;
|
get dataChannel(): DataChannel | undefined;
|
||||||
get iceGatheringState(): RTCIceGatheringState;
|
get iceGatheringState(): RTCIceGatheringState;
|
||||||
|
get signalingState(): RTCSignalingState;
|
||||||
get localDescription(): RTCSessionDescription | undefined;
|
get localDescription(): RTCSessionDescription | undefined;
|
||||||
createOffer(): Promise<RTCSessionDescriptionInit>;
|
createOffer(): Promise<RTCSessionDescriptionInit>;
|
||||||
createAnswer(): Promise<RTCSessionDescriptionInit>;
|
createAnswer(): Promise<RTCSessionDescriptionInit>;
|
||||||
|
@ -53,4 +54,5 @@ export interface PeerConnection {
|
||||||
replaceTrack(oldTrack: Track, newTrack: Track): Promise<boolean>;
|
replaceTrack(oldTrack: Track, newTrack: Track): Promise<boolean>;
|
||||||
createDataChannel(): DataChannel;
|
createDataChannel(): DataChannel;
|
||||||
dispose(): void;
|
dispose(): void;
|
||||||
|
close(): void;
|
||||||
}
|
}
|
||||||
|
|
|
@ -43,6 +43,7 @@ class DOMPeerConnection implements PeerConnection {
|
||||||
get dataChannel(): DataChannel | undefined { return undefined; }
|
get dataChannel(): DataChannel | undefined { return undefined; }
|
||||||
get iceGatheringState(): RTCIceGatheringState { return this.peerConnection.iceGatheringState; }
|
get iceGatheringState(): RTCIceGatheringState { return this.peerConnection.iceGatheringState; }
|
||||||
get localDescription(): RTCSessionDescription | undefined { return this.peerConnection.localDescription ?? undefined; }
|
get localDescription(): RTCSessionDescription | undefined { return this.peerConnection.localDescription ?? undefined; }
|
||||||
|
get signalingState(): RTCSignalingState { return this.peerConnection.signalingState; }
|
||||||
|
|
||||||
createOffer(): Promise<RTCSessionDescriptionInit> {
|
createOffer(): Promise<RTCSessionDescriptionInit> {
|
||||||
return this.peerConnection.createOffer();
|
return this.peerConnection.createOffer();
|
||||||
|
@ -64,6 +65,10 @@ class DOMPeerConnection implements PeerConnection {
|
||||||
return this.peerConnection.addIceCandidate(candidate);
|
return this.peerConnection.addIceCandidate(candidate);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
close(): void {
|
||||||
|
return this.peerConnection.close();
|
||||||
|
}
|
||||||
|
|
||||||
addTrack(track: Track): void {
|
addTrack(track: Track): void {
|
||||||
if (!(track instanceof TrackWrapper)) {
|
if (!(track instanceof TrackWrapper)) {
|
||||||
throw new Error("Not a TrackWrapper");
|
throw new Error("Not a TrackWrapper");
|
||||||
|
|
Loading…
Reference in a new issue