add structured logging to call code

This commit is contained in:
Bruno Windels 2022-03-25 14:43:02 +01:00
parent a0a07355d4
commit eaf92b382b
10 changed files with 478 additions and 351 deletions

View file

@ -36,6 +36,15 @@ export abstract class BaseLogger implements ILogger {
this._persistItem(item, undefined, false);
}
/** Prefer `run()` or `log()` above this method; only use it if you have a long-running operation
* *without* a single call stack that should be logged into one sub-tree.
* You need to call `finish()` on the returned item or it will stay open until the app unloads. */
child(labelOrValues: LabelOrValues, logLevel: LogLevel = LogLevel.Info, filterCreator?: FilterCreator): ILogItem {
const item = new DeferredPersistRootLogItem(labelOrValues, logLevel, this, filterCreator);
this._openItems.add(item);
return item;
}
/** if item is a log item, wrap the callback in a child of it, otherwise start a new root log item. */
wrapOrRun<T>(item: ILogItem | undefined, labelOrValues: LabelOrValues, callback: LogCallback<T>, logLevel?: LogLevel, filterCreator?: FilterCreator): T {
if (item) {
@ -127,7 +136,7 @@ export abstract class BaseLogger implements ILogger {
_finishOpenItems() {
for (const openItem of this._openItems) {
openItem.finish();
openItem.forceFinish();
try {
// for now, serialize with an all-permitting filter
// as the createFilter function would get a distorted image anyway
@ -158,3 +167,15 @@ export abstract class BaseLogger implements ILogger {
return Math.round(this._platform.random() * Number.MAX_SAFE_INTEGER);
}
}
class DeferredPersistRootLogItem extends LogItem {
finish() {
super.finish();
(this._logger as BaseLogger)._persistItem(this, undefined, false);
}
forceFinish() {
super.finish();
/// no need to persist when force-finishing as _finishOpenItems above will do it
}
}

View file

@ -25,7 +25,7 @@ export class LogItem implements ILogItem {
public error?: Error;
public end?: number;
private _values: LogItemValues;
private _logger: BaseLogger;
protected _logger: BaseLogger;
private _filterCreator?: FilterCreator;
private _children?: Array<LogItem>;
@ -221,6 +221,11 @@ export class LogItem implements ILogItem {
}
}
/** @internal */
forceFinish(): void {
this.finish();
}
// expose log level without needing import everywhere
get level(): typeof LogLevel {
return LogLevel;
@ -235,7 +240,7 @@ export class LogItem implements ILogItem {
child(labelOrValues: LabelOrValues, logLevel?: LogLevel, filterCreator?: FilterCreator): LogItem {
if (this.end) {
console.trace("log item is finished, additional logs will likely not be recorded");
console.trace(`log item ${this.values.l} finished, additional log ${JSON.stringify(labelOrValues)} will likely not be recorded`);
}
if (!logLevel) {
logLevel = this.logLevel || LogLevel.Info;

View file

@ -23,6 +23,10 @@ export class NullLogger implements ILogger {
log(): void {}
child(): ILogItem {
return this.item;
}
run<T>(_, callback: LogCallback<T>): T {
return callback(this.item);
}
@ -50,13 +54,13 @@ export class NullLogger implements ILogger {
}
export class NullLogItem implements ILogItem {
public readonly logger: NullLogger;
public readonly logger: ILogger;
public readonly logLevel: LogLevel;
public children?: Array<ILogItem>;
public values: LogItemValues;
public error?: Error;
constructor(logger: NullLogger) {
constructor(logger: ILogger) {
this.logger = logger;
}
@ -99,6 +103,7 @@ export class NullLogItem implements ILogItem {
}
finish(): void {}
forceFinish(): void {}
serialize(): undefined {
return undefined;

View file

@ -51,11 +51,24 @@ export interface ILogItem {
catch(err: Error): Error;
serialize(filter: LogFilter, parentStartTime: number | undefined, forced: boolean): ISerializedItem | undefined;
finish(): void;
forceFinish(): void;
child(labelOrValues: LabelOrValues, logLevel?: LogLevel, filterCreator?: FilterCreator): ILogItem;
}
/*
extend both ILogger and ILogItem from this interface, but need to rename ILogger.run => wrap then. Or both to `span`?
export interface ILogItemCreator {
child(labelOrValues: LabelOrValues, logLevel?: LogLevel, filterCreator?: FilterCreator): ILogItem;
refDetached(logItem: ILogItem, logLevel?: LogLevel): void;
log(labelOrValues: LabelOrValues, logLevel?: LogLevel): ILogItem;
wrap<T>(labelOrValues: LabelOrValues, callback: LogCallback<T>, logLevel?: LogLevel, filterCreator?: FilterCreator): T;
get level(): typeof LogLevel;
}
*/
export interface ILogger {
log(labelOrValues: LabelOrValues, logLevel?: LogLevel): void;
child(labelOrValues: LabelOrValues, logLevel?: LogLevel, filterCreator?: FilterCreator): ILogItem;
wrapOrRun<T>(item: ILogItem | undefined, labelOrValues: LabelOrValues, callback: LogCallback<T>, logLevel?: LogLevel, filterCreator?: FilterCreator): T;
runDetached<T>(labelOrValues: LabelOrValues, callback: LogCallback<T>, logLevel?: LogLevel, filterCreator?: FilterCreator): ILogItem;
run<T>(labelOrValues: LabelOrValues, callback: LogCallback<T>, logLevel?: LogLevel, filterCreator?: FilterCreator): T;

View file

@ -86,7 +86,6 @@ export class DeviceMessageHandler {
this._senderDeviceCache.set(device);
}
}
console.log("incoming device message", senderKey, device, this._senderDeviceCache);
return device;
}
}

View file

@ -84,8 +84,10 @@ export class Session {
}
// TODO: just get the devices we're sending the message to, not all the room devices
// although we probably already fetched all devices to send messages in the likely e2ee room
const devices = await log.wrap("get device keys", async log => {
await this._deviceTracker.trackRoom(this.rooms.get(roomId), log);
const devices = await this._deviceTracker.devicesForRoomMembers(roomId, [userId], this._hsApi, log);
return this._deviceTracker.devicesForRoomMembers(roomId, [userId], this._hsApi, log);
});
const encryptedMessage = await this._olmEncryption.encrypt(message.type, message.content, devices, this._hsApi, log);
return encryptedMessage;
},
@ -93,6 +95,7 @@ export class Session {
webRTC: this._platform.webRTC,
ownDeviceId: sessionInfo.deviceId,
ownUserId: sessionInfo.userId,
logger: this._platform.logger,
});
this._deviceMessageHandler = new DeviceMessageHandler({storage, callHandler: this._callHandler});
this._olm = olm;

View file

@ -25,7 +25,7 @@ import type {LocalMedia} from "./LocalMedia";
import type {Room} from "../room/Room";
import type {MemberChange} from "../room/members/RoomMember";
import type {StateEvent} from "../storage/types";
import type {ILogItem} from "../../logging/types";
import type {ILogItem, ILogger} from "../../logging/types";
import type {Platform} from "../../platform/web/Platform";
import type {BaseObservableMap} from "../../observable/map/BaseObservableMap";
import type {SignallingMessage, MGroupCallBase} from "./callEventTypes";
@ -35,7 +35,9 @@ const GROUP_CALL_TYPE = "m.call";
const GROUP_CALL_MEMBER_TYPE = "m.call.member";
const CALL_TERMINATED = "m.terminated";
export type Options = Omit<GroupCallOptions, "emitUpdate">;
export type Options = Omit<GroupCallOptions, "emitUpdate"> & {
logger: ILogger
};
export class CallHandler {
// group calls by call id
@ -51,7 +53,8 @@ export class CallHandler {
}
async createCall(roomId: string, localMedia: LocalMedia, name: string): Promise<GroupCall> {
const call = new GroupCall(undefined, undefined, roomId, this.groupCallOptions);
const logItem = this.options.logger.child({l: "call", incoming: false});
const call = new GroupCall(undefined, undefined, roomId, this.groupCallOptions, logItem);
console.log("created call with id", call.id);
this._calls.set(call.id, call);
try {
@ -59,6 +62,7 @@ export class CallHandler {
} catch (err) {
if (err.name === "ConnectionError") {
// if we're offline, give up and remove the call again
call.dispose();
this._calls.remove(call.id);
}
throw err;
@ -79,13 +83,13 @@ export class CallHandler {
// first update call events
for (const event of events) {
if (event.type === EventType.GroupCall) {
this.handleCallEvent(event, room.id);
this.handleCallEvent(event, room.id, log);
}
}
// then update members
for (const event of events) {
if (event.type === EventType.GroupCallMember) {
this.handleCallMemberEvent(event);
this.handleCallMemberEvent(event, log);
}
}
}
@ -108,28 +112,30 @@ export class CallHandler {
call?.handleDeviceMessage(message, userId, deviceId, log);
}
private handleCallEvent(event: StateEvent, roomId: string) {
private handleCallEvent(event: StateEvent, roomId: string, log: ILogItem) {
const callId = event.state_key;
let call = this._calls.get(callId);
if (call) {
call.updateCallEvent(event.content);
call.updateCallEvent(event.content, log);
if (call.isTerminated) {
call.dispose();
this._calls.remove(call.id);
}
} else {
call = new GroupCall(event.state_key, event.content, roomId, this.groupCallOptions);
const logItem = this.options.logger.child({l: "call", incoming: true});
call = new GroupCall(event.state_key, event.content, roomId, this.groupCallOptions, logItem);
this._calls.set(call.id, call);
}
}
private handleCallMemberEvent(event: StateEvent) {
private handleCallMemberEvent(event: StateEvent, log: ILogItem) {
const userId = event.state_key;
const calls = event.content["m.calls"] ?? [];
for (const call of calls) {
const callId = call["m.call_id"];
const groupCall = this._calls.get(callId);
// TODO: also check the member when receiving the m.call event
groupCall?.addMember(userId, call);
groupCall?.addMember(userId, call, log);
};
const newCallIdsMemberOf = new Set<string>(calls.map(call => call["m.call_id"]));
let previousCallIdsMemberOf = this.memberToCallIds.get(userId);
@ -138,7 +144,7 @@ export class CallHandler {
for (const previousCallId of previousCallIdsMemberOf) {
if (!newCallIdsMemberOf.has(previousCallId)) {
const groupCall = this._calls.get(previousCallId);
groupCall?.removeMember(userId);
groupCall?.removeMember(userId, log);
}
}
}

View file

@ -21,7 +21,6 @@ import {Disposables, IDisposable} from "../../utils/Disposables";
import type {Room} from "../room/Room";
import type {StateEvent} from "../storage/types";
import type {ILogItem} from "../../logging/types";
import {Instance as logger} from "../../logging/NullLogger";
import type {TimeoutCreator, Timeout} from "../../platform/types/types";
import {WebRTC, PeerConnection, PeerConnectionHandler, DataChannel} from "../../platform/types/WebRTC";
@ -69,9 +68,8 @@ export class PeerCall implements IDisposable {
// If candidates arrive before we've picked an opponent (which, in particular,
// will happen if the opponent sends candidates eagerly before the user answers
// the call) we buffer them up here so we can then add the ones from the party we pick
private remoteCandidateBuffer? = new Map<string, RTCIceCandidate[]>();
private remoteCandidateBuffer? = new Map<PartyId, RTCIceCandidate[]>();
private logger: any;
private remoteSDPStreamMetadata?: SDPStreamMetadata;
private responsePromiseChain?: Promise<void>;
private opponentPartyId?: PartyId;
@ -88,38 +86,44 @@ export class PeerCall implements IDisposable {
constructor(
private callId: string,
private readonly options: Options
private readonly options: Options,
private readonly logItem: ILogItem,
) {
const outer = this;
this.peerConnection = options.webRTC.createPeerConnection({
onIceConnectionStateChange(state: RTCIceConnectionState) {
outer.onIceConnectionStateChange(state);
outer.logItem.wrap({l: "onIceConnectionStateChange", status: state}, log => {
outer.onIceConnectionStateChange(state, log);
});
},
onLocalIceCandidate(candidate: RTCIceCandidate) {
outer.handleLocalIceCandidate(candidate);
outer.logItem.wrap("onLocalIceCandidate", log => {
outer.handleLocalIceCandidate(candidate, log);
});
},
onIceGatheringStateChange(state: RTCIceGatheringState) {
outer.handleIceGatheringState(state);
outer.logItem.wrap({l: "onIceGatheringStateChange", status: state}, log => {
outer.handleIceGatheringState(state, log);
});
},
onRemoteTracksChanged(tracks: Track[]) {
outer.logItem.wrap("onRemoteTracksChanged", log => {
outer.options.emitUpdate(outer, undefined);
});
},
onDataChannelChanged(dataChannel: DataChannel | undefined) {},
onNegotiationNeeded() {
const promiseCreator = () => outer.handleNegotiation();
const log = outer.logItem.child("onNegotiationNeeded");
const promiseCreator = async () => {
await outer.handleNegotiation(log);
log.finish();
};
outer.responsePromiseChain = outer.responsePromiseChain?.then(promiseCreator) ?? promiseCreator();
},
getPurposeForStreamId(streamId: string): SDPStreamMetadataPurpose {
return outer.remoteSDPStreamMetadata?.[streamId]?.purpose ?? SDPStreamMetadataPurpose.Usermedia;
}
});
this.logger = {
info(...args) { console.info.apply(console, ["WebRTC debug:", ...args])},
debug(...args) { console.log.apply(console, ["WebRTC debug:", ...args])},
log(...args) { console.log.apply(console, ["WebRTC log:", ...args])},
warn(...args) { console.log.apply(console, ["WebRTC warn:", ...args])},
error(...args) { console.error.apply(console, ["WebRTC error:", ...args])},
};
}
get state(): CallState { return this._state; }
@ -128,7 +132,8 @@ export class PeerCall implements IDisposable {
return this.peerConnection.remoteTracks;
}
async call(localMedia: LocalMedia): Promise<void> {
call(localMedia: LocalMedia): Promise<void> {
return this.logItem.wrap("call", async log => {
if (this._state !== CallState.Fledgling) {
return;
}
@ -141,9 +146,11 @@ export class PeerCall implements IDisposable {
// after adding the local tracks, and wait for handleNegotiation to be called,
// or invite glare where we give up our invite and answer instead
await this.waitForState([CallState.InviteSent, CallState.CreateAnswer]);
});
}
async answer(localMedia: LocalMedia): Promise<void> {
answer(localMedia: LocalMedia): Promise<void> {
return this.logItem.wrap("answer", async log => {
if (this._state !== CallState.Ringing) {
return;
}
@ -157,8 +164,10 @@ export class PeerCall implements IDisposable {
try {
myAnswer = await this.peerConnection.createAnswer();
} catch (err) {
this.logger.debug(`Call ${this.callId} Failed to create answer: `, err);
this.terminate(CallParty.Local, CallErrorCode.CreateAnswer, true);
await log.wrap(`Failed to create answer`, log => {
log.catch(err);
this.terminate(CallParty.Local, CallErrorCode.CreateAnswer, true, log);
});
return;
}
@ -166,16 +175,21 @@ export class PeerCall implements IDisposable {
await this.peerConnection.setLocalDescription(myAnswer);
this.setState(CallState.Connecting);
} catch (err) {
this.logger.debug(`Call ${this.callId} Error setting local description!`, err);
this.terminate(CallParty.Local, CallErrorCode.SetLocalDescription, true);
await log.wrap(`Error setting local description!`, log => {
log.catch(err);
this.terminate(CallParty.Local, CallErrorCode.SetLocalDescription, true, log);
});
return;
}
// Allow a short time for initial candidates to be gathered
await this.delay(200);
await this.sendAnswer();
try { await this.delay(200); }
catch (err) { return; }
await this.sendAnswer(log);
});
}
async setMedia(localMediaPromise: Promise<LocalMedia>) {
setMedia(localMediaPromise: Promise<LocalMedia>): Promise<void> {
return this.logItem.wrap("setMedia", async log => {
const oldMedia = this.localMedia;
this.localMedia = await localMediaPromise;
@ -195,41 +209,50 @@ export class PeerCall implements IDisposable {
applyTrack(m => m?.microphoneTrack);
applyTrack(m => m?.cameraTrack);
applyTrack(m => m?.screenShareTrack);
});
}
async reject() {
}
async hangup(errorCode: CallErrorCode): Promise<void> {
hangup(errorCode: CallErrorCode): Promise<void> {
return this.logItem.wrap("hangup", log => {
return this._hangup(errorCode, log);
});
}
private async _hangup(errorCode: CallErrorCode, log: ILogItem): Promise<void> {
if (this._state !== CallState.Ended) {
this._state = CallState.Ended;
await this.sendHangupWithCallId(this.callId, errorCode);
await this.sendHangupWithCallId(this.callId, errorCode, log);
}
}
async handleIncomingSignallingMessage<B extends MCallBase>(message: SignallingMessage<B>, partyId: PartyId, log: ILogItem): Promise<void> {
handleIncomingSignallingMessage<B extends MCallBase>(message: SignallingMessage<B>, partyId: PartyId): Promise<void> {
return this.logItem.wrap({l: "receive", id: message.type, partyId}, async log => {
switch (message.type) {
case EventType.Invite:
if (this.callId !== message.content.call_id) {
await this.handleInviteGlare(message.content, partyId);
await this.handleInviteGlare(message.content, partyId, log);
} else {
await this.handleFirstInvite(message.content, partyId);
await this.handleFirstInvite(message.content, partyId, log);
}
break;
case EventType.Answer:
await this.handleAnswer(message.content, partyId);
await this.handleAnswer(message.content, partyId, log);
break;
case EventType.Candidates:
await this.handleRemoteIceCandidates(message.content, partyId);
await this.handleRemoteIceCandidates(message.content, partyId, log);
break;
case EventType.Hangup:
default:
throw new Error(`Unknown event type for call: ${message.type}`);
}
});
}
private sendHangupWithCallId(callId: string, reason?: CallErrorCode): Promise<void> {
private sendHangupWithCallId(callId: string, reason: CallErrorCode | undefined, log: ILogItem): Promise<void> {
const content = {
call_id: callId,
version: 1,
@ -237,27 +260,28 @@ export class PeerCall implements IDisposable {
if (reason) {
content["reason"] = reason;
}
return this.options.sendSignallingMessage({
return this.sendSignallingMessage({
type: EventType.Hangup,
content
}, logger.item);
}, log);
}
// calls are serialized and deduplicated by responsePromiseChain
private handleNegotiation = async (): Promise<void> => {
private handleNegotiation = async (log: ILogItem): Promise<void> => {
this.makingOffer = true;
try {
try {
await this.peerConnection.setLocalDescription();
} catch (err) {
this.logger.debug(`Call ${this.callId} Error setting local description!`, err);
this.terminate(CallParty.Local, CallErrorCode.SetLocalDescription, true);
log.log(`Error setting local description!`).catch(err);
this.terminate(CallParty.Local, CallErrorCode.SetLocalDescription, true, log);
return;
}
if (this.peerConnection.iceGatheringState === 'gathering') {
// Allow a short time for initial candidates to be gathered
await this.delay(200);
try { await this.delay(200); }
catch (err) { return; }
}
if (this._state === CallState.Ended) {
@ -267,7 +291,7 @@ export class PeerCall implements IDisposable {
const offer = this.peerConnection.localDescription!;
// Get rid of any candidates waiting to be sent: they'll be included in the local
// description we just got and will send in the offer.
this.logger.info(`Call ${this.callId} Discarding ${
log.log(`Discarding ${
this.candidateSendQueue.length} candidates that will be sent in offer`);
this.candidateSendQueue = [];
@ -280,63 +304,64 @@ export class PeerCall implements IDisposable {
lifetime: CALL_TIMEOUT_MS
};
if (this._state === CallState.CreateOffer) {
await this.options.sendSignallingMessage({type: EventType.Invite, content}, logger.item);
await this.sendSignallingMessage({type: EventType.Invite, content}, log);
this.setState(CallState.InviteSent);
} else if (this._state === CallState.Connected || this._state === CallState.Connecting) {
// send Negotiate message
//await this.options.sendSignallingMessage({type: EventType.Invite, content});
//await this.sendSignallingMessage({type: EventType.Invite, content});
//this.setState(CallState.InviteSent);
}
} finally {
this.makingOffer = false;
}
this.sendCandidateQueue();
this.sendCandidateQueue(log);
if (this._state === CallState.InviteSent) {
await this.delay(CALL_TIMEOUT_MS);
try { await this.delay(CALL_TIMEOUT_MS); }
catch (err) { return; }
// @ts-ignore TS doesn't take the await above into account to know that the state could have changed in between
if (this._state === CallState.InviteSent) {
this.hangup(CallErrorCode.InviteTimeout);
this._hangup(CallErrorCode.InviteTimeout, log);
}
}
};
private async handleInviteGlare(content: MCallInvite<MCallBase>, partyId: PartyId): Promise<void> {
private async handleInviteGlare(content: MCallInvite<MCallBase>, partyId: PartyId, log: ILogItem): Promise<void> {
// this is only called when the ids are different
const newCallId = content.call_id;
if (this.callId! > newCallId) {
this.logger.log(
log.log(
"Glare detected: answering incoming call " + newCallId +
" and canceling outgoing call " + this.callId,
" and canceling outgoing call ",
);
// How do we interrupt `call()`? well, perhaps we need to not just await InviteSent but also CreateAnswer?
if (this._state === CallState.Fledgling || this._state === CallState.CreateOffer) {
// TODO: don't send invite!
} else {
await this.sendHangupWithCallId(this.callId, CallErrorCode.Replaced);
await this.sendHangupWithCallId(this.callId, CallErrorCode.Replaced, log);
}
await this.handleInvite(content, partyId);
await this.handleInvite(content, partyId, log);
// TODO: need to skip state check
await this.answer(this.localMedia!);
} else {
this.logger.log(
log.log(
"Glare detected: rejecting incoming call " + newCallId +
" and keeping outgoing call " + this.callId,
" and keeping outgoing call ",
);
await this.sendHangupWithCallId(newCallId, CallErrorCode.Replaced);
await this.sendHangupWithCallId(newCallId, CallErrorCode.Replaced, log);
}
}
private async handleFirstInvite(content: MCallInvite<MCallBase>, partyId: PartyId): Promise<void> {
private async handleFirstInvite(content: MCallInvite<MCallBase>, partyId: PartyId, log: ILogItem): Promise<void> {
if (this._state !== CallState.Fledgling || this.opponentPartyId !== undefined) {
// TODO: hangup or ignore?
return;
}
await this.handleInvite(content, partyId);
await this.handleInvite(content, partyId, log);
}
private async handleInvite(content: MCallInvite<MCallBase>, partyId: PartyId): Promise<void> {
private async handleInvite(content: MCallInvite<MCallBase>, partyId: PartyId, log: ILogItem): Promise<void> {
// we must set the party ID before await-ing on anything: the call event
// handler will start giving us more call events (eg. candidates) so if
@ -348,17 +373,18 @@ export class PeerCall implements IDisposable {
if (sdpStreamMetadata) {
this.updateRemoteSDPStreamMetadata(sdpStreamMetadata);
} else {
this.logger.debug(`Call ${
this.callId} did not get any SDPStreamMetadata! Can not send/receive multiple streams`);
log.log(`Call did not get any SDPStreamMetadata! Can not send/receive multiple streams`);
}
try {
// Q: Why do we set the remote description before accepting the call? To start creating ICE candidates?
await this.peerConnection.setRemoteDescription(content.offer);
await this.addBufferedIceCandidates();
await this.addBufferedIceCandidates(log);
} catch (e) {
this.logger.debug(`Call ${this.callId} failed to set remote description`, e);
this.terminate(CallParty.Local, CallErrorCode.SetRemoteDescription, false);
await log.wrap(`Call failed to set remote description`, async log => {
log.catch(e);
return this.terminate(CallParty.Local, CallErrorCode.SetRemoteDescription, false, log);
});
return;
}
@ -366,17 +392,19 @@ export class PeerCall implements IDisposable {
// add streams until media started arriving on them. Testing latest firefox
// (81 at time of writing), this is no longer a problem, so let's do it the correct way.
if (this.peerConnection.remoteTracks.length === 0) {
this.logger.error(`Call ${this.callId} no remote stream or no tracks after setting remote description!`);
this.terminate(CallParty.Local, CallErrorCode.SetRemoteDescription, false);
await log.wrap(`Call no remote stream or no tracks after setting remote description!`, async log => {
return this.terminate(CallParty.Local, CallErrorCode.SetRemoteDescription, false, log);
});
return;
}
this.setState(CallState.Ringing);
await this.delay(content.lifetime ?? CALL_TIMEOUT_MS);
try { await this.delay(content.lifetime ?? CALL_TIMEOUT_MS); }
catch (err) { return; }
// @ts-ignore TS doesn't take the await above into account to know that the state could have changed in between
if (this._state === CallState.Ringing) {
this.logger.debug(`Call ${this.callId} invite has expired. Hanging up.`);
log.log(`Invite has expired. Hanging up.`);
this.hangupParty = CallParty.Remote; // effectively
this.setState(CallState.Ended);
this.stopAllMedia();
@ -386,25 +414,19 @@ export class PeerCall implements IDisposable {
}
}
private async handleAnswer(content: MCallAnswer<MCallBase>, partyId: PartyId): Promise<void> {
this.logger.debug(`Got answer for call ID ${this.callId} from party ID ${partyId}`);
private async handleAnswer(content: MCallAnswer<MCallBase>, partyId: PartyId, log: ILogItem): Promise<void> {
if (this._state === CallState.Ended) {
this.logger.debug(`Ignoring answer because call ID ${this.callId} has ended`);
log.log(`Ignoring answer because call has ended`);
return;
}
if (this.opponentPartyId !== undefined) {
this.logger.info(
`Call ${this.callId} ` +
`Ignoring answer from party ID ${partyId}: ` +
`we already have an answer/reject from ${this.opponentPartyId}`,
);
log.log(`Ignoring answer: we already have an answer/reject from ${this.opponentPartyId}`);
return;
}
this.opponentPartyId = partyId;
await this.addBufferedIceCandidates();
await this.addBufferedIceCandidates(log);
this.setState(CallState.Connecting);
@ -412,20 +434,22 @@ export class PeerCall implements IDisposable {
if (sdpStreamMetadata) {
this.updateRemoteSDPStreamMetadata(sdpStreamMetadata);
} else {
this.logger.warn(`Call ${this.callId} Did not get any SDPStreamMetadata! Can not send/receive multiple streams`);
log.log(`Did not get any SDPStreamMetadata! Can not send/receive multiple streams`);
}
try {
await this.peerConnection.setRemoteDescription(content.answer);
} catch (e) {
this.logger.debug(`Call ${this.callId} Failed to set remote description`, e);
this.terminate(CallParty.Local, CallErrorCode.SetRemoteDescription, false);
await log.wrap(`Failed to set remote description`, log => {
log.catch(e);
this.terminate(CallParty.Local, CallErrorCode.SetRemoteDescription, false, log);
});
return;
}
}
private handleIceGatheringState(state: RTCIceGatheringState) {
this.logger.debug(`Call ${this.callId} ice gathering state changed to ${state}`);
private handleIceGatheringState(state: RTCIceGatheringState, log: ILogItem) {
log.set("state", state);
if (state === 'complete' && !this.sentEndOfCandidates) {
// If we didn't get an empty-string candidate to signal the end of candidates,
// create one ourselves now gathering has finished.
@ -437,37 +461,37 @@ export class PeerCall implements IDisposable {
const c = {
candidate: '',
} as RTCIceCandidate;
this.queueCandidate(c);
this.queueCandidate(c, log);
this.sentEndOfCandidates = true;
}
}
private handleLocalIceCandidate(candidate: RTCIceCandidate) {
this.logger.debug(
"Call " + this.callId + " got local ICE " + candidate.sdpMid + " candidate: " +
candidate.candidate,
);
if (this._state === CallState.Ended) return;
private handleLocalIceCandidate(candidate: RTCIceCandidate, log: ILogItem) {
log.set("sdpMid", candidate.sdpMid);
log.set("candidate", candidate.candidate);
if (this._state === CallState.Ended) {
return;
}
// As with the offer, note we need to make a copy of this object, not
// pass the original: that broke in Chrome ~m43.
if (candidate.candidate !== '' || !this.sentEndOfCandidates) {
this.queueCandidate(candidate);
if (candidate.candidate === '') this.sentEndOfCandidates = true;
this.queueCandidate(candidate, log);
if (candidate.candidate === '') {
this.sentEndOfCandidates = true;
}
}
}
private async handleRemoteIceCandidates(content: MCallCandidates<MCallBase>, partyId) {
private async handleRemoteIceCandidates(content: MCallCandidates<MCallBase>, partyId: PartyId, log: ILogItem) {
if (this.state === CallState.Ended) {
//debuglog("Ignoring remote ICE candidate because call has ended");
log.log("Ignoring remote ICE candidate because call has ended");
return;
}
const candidates = content.candidates;
if (!candidates) {
this.logger.info(`Call ${this.callId} Ignoring candidates event with no candidates!`);
log.log(`Ignoring candidates event with no candidates!`);
return;
}
@ -475,7 +499,7 @@ export class PeerCall implements IDisposable {
if (this.opponentPartyId === undefined) {
// we haven't picked an opponent yet so save the candidates
this.logger.info(`Call ${this.callId} Buffering ${candidates.length} candidates until we pick an opponent`);
log.log(`Buffering ${candidates.length} candidates until we pick an opponent`);
const bufferedCandidates = this.remoteCandidateBuffer!.get(fromPartyId) || [];
bufferedCandidates.push(...candidates);
this.remoteCandidateBuffer!.set(fromPartyId, bufferedCandidates);
@ -483,8 +507,7 @@ export class PeerCall implements IDisposable {
}
if (this.opponentPartyId !== partyId) {
this.logger.info(
`Call ${this.callId} `+
log.log(
`Ignoring candidates from party ID ${partyId}: ` +
`we have chosen party ID ${this.opponentPartyId}`,
);
@ -492,14 +515,14 @@ export class PeerCall implements IDisposable {
return;
}
await this.addIceCandidates(candidates);
await this.addIceCandidates(candidates, log);
}
// private async onNegotiateReceived(event: MatrixEvent): Promise<void> {
// const content = event.getContent<MCallNegotiate>();
// const description = content.description;
// if (!description || !description.sdp || !description.type) {
// this.logger.info(`Call ${this.callId} Ignoring invalid m.call.negotiate event`);
// this.logger.info(`Ignoring invalid m.call.negotiate event`);
// return;
// }
// // Politeness always follows the direction of the call: in a glare situation,
@ -516,7 +539,7 @@ export class PeerCall implements IDisposable {
// this.ignoreOffer = !polite && offerCollision;
// if (this.ignoreOffer) {
// this.logger.info(`Call ${this.callId} Ignoring colliding negotiate event because we're impolite`);
// this.logger.info(`Ignoring colliding negotiate event because we're impolite`);
// return;
// }
@ -524,7 +547,7 @@ export class PeerCall implements IDisposable {
// if (sdpStreamMetadata) {
// this.updateRemoteSDPStreamMetadata(sdpStreamMetadata);
// } else {
// this.logger.warn(`Call ${this.callId} Received negotiation event without SDPStreamMetadata!`);
// this.logger.warn(`Received negotiation event without SDPStreamMetadata!`);
// }
// try {
@ -532,7 +555,7 @@ export class PeerCall implements IDisposable {
// if (description.type === 'offer') {
// await this.peerConnection.setLocalDescription();
// await this.options.sendSignallingMessage({
// await this.sendSignallingMessage({
// type: EventType.CallNegotiate,
// content: {
// description: this.peerConnection.localDescription!,
@ -541,11 +564,11 @@ export class PeerCall implements IDisposable {
// });
// }
// } catch (err) {
// this.logger.warn(`Call ${this.callId} Failed to complete negotiation`, err);
// this.logger.warn(`Failed to complete negotiation`, err);
// }
// }
private async sendAnswer(): Promise<void> {
private async sendAnswer(log: ILogItem): Promise<void> {
const localDescription = this.peerConnection.localDescription!;
const answerContent: MCallAnswer<MCallBase> = {
call_id: this.callId,
@ -560,23 +583,23 @@ export class PeerCall implements IDisposable {
// We have just taken the local description from the peerConn which will
// contain all the local candidates added so far, so we can discard any candidates
// we had queued up because they'll be in the answer.
this.logger.info(`Call ${this.callId} Discarding ${
log.log(`Discarding ${
this.candidateSendQueue.length} candidates that will be sent in answer`);
this.candidateSendQueue = [];
try {
await this.options.sendSignallingMessage({type: EventType.Answer, content: answerContent}, logger.item);
await this.sendSignallingMessage({type: EventType.Answer, content: answerContent}, log);
} catch (error) {
this.terminate(CallParty.Local, CallErrorCode.SendAnswer, false);
this.terminate(CallParty.Local, CallErrorCode.SendAnswer, false, log);
throw error;
}
// error handler re-throws so this won't happen on error, but
// we don't want the same error handling on the candidate queue
this.sendCandidateQueue();
this.sendCandidateQueue(log);
}
private queueCandidate(content: RTCIceCandidate): void {
private queueCandidate(content: RTCIceCandidate, log: ILogItem): void {
// We partially de-trickle candidates by waiting for `delay` before sending them
// amalgamated, in order to avoid sending too many m.call.candidates events and hitting
// rate limits in Matrix.
@ -593,36 +616,48 @@ export class PeerCall implements IDisposable {
// MSC2746 recommends these values (can be quite long when calling because the
// callee will need a while to answer the call)
this.delay(this.direction === CallDirection.Inbound ? 500 : 2000).then(() => {
this.sendCandidateQueue();
const sendLogItem = this.logItem.child("wait to send candidates");
log.refDetached(sendLogItem);
this.delay(this.direction === CallDirection.Inbound ? 500 : 2000)
.then(() => {
return this.sendCandidateQueue(sendLogItem);
}, err => {}) // swallow delay AbortError
.finally(() => {
sendLogItem.finish();
});
}
private async sendCandidateQueue(): Promise<void> {
private async sendCandidateQueue(log: ILogItem): Promise<void> {
return log.wrap("send candidates queue", async log => {
log.set("queueLength", this.candidateSendQueue.length);
if (this.candidateSendQueue.length === 0 || this._state === CallState.Ended) {
return;
}
const candidates = this.candidateSendQueue;
this.candidateSendQueue = [];
this.logger.debug(`Call ${this.callId} attempting to send ${candidates.length} candidates`);
try {
await this.options.sendSignallingMessage({
await this.sendSignallingMessage({
type: EventType.Candidates,
content: {
call_id: this.callId,
version: 1,
candidates
},
}, logger.item);
}, log);
// Try to send candidates again just in case we received more candidates while sending.
this.sendCandidateQueue();
this.sendCandidateQueue(log);
} catch (error) {
log.catch(error);
// don't retry this event: we'll send another one later as we might
// have more candidates by then.
// put all the candidates we failed to send back in the queue
this.terminate(CallParty.Local, CallErrorCode.SignallingFailed, false);
// TODO: terminate doesn't seem to vibe with the comment above?
this.terminate(CallParty.Local, CallErrorCode.SignallingFailed, false, log);
}
});
}
private updateRemoteSDPStreamMetadata(metadata: SDPStreamMetadata): void {
@ -641,44 +676,44 @@ export class PeerCall implements IDisposable {
}
}
private async addBufferedIceCandidates(): Promise<void> {
private async addBufferedIceCandidates(log: ILogItem): Promise<void> {
if (this.remoteCandidateBuffer && this.opponentPartyId) {
const bufferedCandidates = this.remoteCandidateBuffer.get(this.opponentPartyId);
if (bufferedCandidates) {
this.logger.info(`Call ${this.callId} Adding ${
log.log(`Adding ${
bufferedCandidates.length} buffered candidates for opponent ${this.opponentPartyId}`);
await this.addIceCandidates(bufferedCandidates);
await this.addIceCandidates(bufferedCandidates, log);
}
this.remoteCandidateBuffer = undefined;
}
}
private async addIceCandidates(candidates: RTCIceCandidate[]): Promise<void> {
private async addIceCandidates(candidates: RTCIceCandidate[], log: ILogItem): Promise<void> {
for (const candidate of candidates) {
if (
(candidate.sdpMid === null || candidate.sdpMid === undefined) &&
(candidate.sdpMLineIndex === null || candidate.sdpMLineIndex === undefined)
) {
this.logger.debug(`Call ${this.callId} ignoring remote ICE candidate with no sdpMid or sdpMLineIndex`);
log.log(`Ignoring remote ICE candidate with no sdpMid or sdpMLineIndex`);
continue;
}
this.logger.debug(`Call ${this.callId} got remote ICE ${candidate.sdpMid} candidate: ${candidate.candidate}`);
log.log(`Got remote ICE ${candidate.sdpMid} candidate: ${candidate.candidate}`);
try {
await this.peerConnection.addIceCandidate(candidate);
} catch (err) {
if (!this.ignoreOffer) {
this.logger.info(`Call ${this.callId} failed to add remote ICE candidate`, err);
log.log(`Failed to add remote ICE candidate`, err);
}
}
}
}
private onIceConnectionStateChange = (state: RTCIceConnectionState): void => {
private onIceConnectionStateChange = (state: RTCIceConnectionState, log: ILogItem): void => {
if (this._state === CallState.Ended) {
return; // because ICE can still complete as we're ending the call
}
this.logger.debug(
"Call ID " + this.callId + ": ICE connection state changed to: " + state,
log.log(
"ICE connection state changed to: " + state,
);
// ideally we'd consider the call to be connected when we get media but
// chrome doesn't implement any of the 'onstarted' events yet
@ -689,11 +724,11 @@ export class PeerCall implements IDisposable {
} else if (state == 'failed') {
this.iceDisconnectedTimeout?.abort();
this.iceDisconnectedTimeout = undefined;
this.hangup(CallErrorCode.IceFailed);
this._hangup(CallErrorCode.IceFailed, log);
} else if (state == 'disconnected') {
this.iceDisconnectedTimeout = this.options.createTimeout(30 * 1000);
this.iceDisconnectedTimeout.elapsed().then(() => {
this.hangup(CallErrorCode.IceFailed);
this._hangup(CallErrorCode.IceFailed, log);
}, () => { /* ignore AbortError */ });
}
};
@ -725,7 +760,7 @@ export class PeerCall implements IDisposable {
}));
}
private async terminate(hangupParty: CallParty, hangupReason: CallErrorCode, shouldEmit: boolean): Promise<void> {
private async terminate(hangupParty: CallParty, hangupReason: CallErrorCode, shouldEmit: boolean, log: ILogItem): Promise<void> {
}
@ -744,6 +779,12 @@ export class PeerCall implements IDisposable {
this.disposables.untrack(timeout);
}
private sendSignallingMessage(message: SignallingMessage<MCallBase>, log: ILogItem) {
return log.wrap({l: "send", id: message.type}, async log => {
return this.options.sendSignallingMessage(message, log);
});
}
public dispose(): void {
this.disposables.dispose();
this.peerConnection.dispose();

View file

@ -61,10 +61,12 @@ export class GroupCall extends EventEmitter<{change: never}> {
id: string | undefined,
private callContent: Record<string, any> | undefined,
public readonly roomId: string,
private readonly options: Options
private readonly options: Options,
private readonly logItem: ILogItem,
) {
super();
this.id = id ?? makeId("conf-");
logItem.set("id", this.id);
this._state = id ? GroupCallState.Created : GroupCallState.Fledgling;
this._memberOptions = Object.assign({}, options, {
confId: this.id,
@ -86,7 +88,8 @@ export class GroupCall extends EventEmitter<{change: never}> {
return this.callContent?.["m.name"];
}
async join(localMedia: LocalMedia) {
join(localMedia: LocalMedia): Promise<void> {
return this.logItem.wrap("join", async log => {
if (this._state !== GroupCallState.Created) {
return;
}
@ -95,44 +98,50 @@ export class GroupCall extends EventEmitter<{change: never}> {
this.emitChange();
const memberContent = await this._createJoinPayload();
// send m.call.member state event
const request = this.options.hsApi.sendState(this.roomId, CALL_MEMBER_TYPE, this.options.ownUserId, memberContent);
const request = this.options.hsApi.sendState(this.roomId, CALL_MEMBER_TYPE, this.options.ownUserId, memberContent, {log});
await request.response();
this.emitChange();
// send invite to all members that are < my userId
for (const [,member] of this._members) {
member.connect(this._localMedia);
}
});
}
get hasJoined() {
return this._state === GroupCallState.Joining || this._state === GroupCallState.Joined;
}
async leave() {
leave(): Promise<void> {
return this.logItem.wrap("leave", async log => {
const memberContent = await this._leaveCallMemberContent();
// send m.call.member state event
if (memberContent) {
const request = this.options.hsApi.sendState(this.roomId, CALL_MEMBER_TYPE, this.options.ownUserId, memberContent);
const request = this.options.hsApi.sendState(this.roomId, CALL_MEMBER_TYPE, this.options.ownUserId, memberContent, {log});
await request.response();
// our own user isn't included in members, so not in the count
if (this._members.size === 0) {
this.terminate();
await this.terminate();
}
}
});
}
async terminate() {
terminate(): Promise<void> {
return this.logItem.wrap("terminate", async log => {
if (this._state === GroupCallState.Fledgling) {
return;
}
const request = this.options.hsApi.sendState(this.roomId, CALL_TYPE, this.id, Object.assign({}, this.callContent, {
"m.terminated": true
}));
}), {log});
await request.response();
});
}
/** @internal */
async create(localMedia: LocalMedia, name: string) {
create(localMedia: LocalMedia, name: string): Promise<void> {
return this.logItem.wrap("create", async log => {
if (this._state !== GroupCallState.Fledgling) {
return;
}
@ -143,23 +152,31 @@ export class GroupCall extends EventEmitter<{change: never}> {
"m.name": name,
"m.intent": "m.ring"
};
const request = this.options.hsApi.sendState(this.roomId, CALL_TYPE, this.id, this.callContent);
const request = this.options.hsApi.sendState(this.roomId, CALL_TYPE, this.id, this.callContent, {log});
await request.response();
this._state = GroupCallState.Created;
this.emitChange();
});
}
/** @internal */
updateCallEvent(callContent: Record<string, any>) {
updateCallEvent(callContent: Record<string, any>, syncLog: ILogItem) {
this.logItem.wrap("updateCallEvent", log => {
syncLog.refDetached(log);
this.callContent = callContent;
if (this._state === GroupCallState.Creating) {
this._state = GroupCallState.Created;
}
log.set("status", this._state);
this.emitChange();
});
}
/** @internal */
addMember(userId, memberCallInfo) {
addMember(userId: string, memberCallInfo, syncLog: ILogItem) {
this.logItem.wrap({l: "addMember", id: userId}, log => {
syncLog.refDetached(log);
if (userId === this.options.ownUserId) {
if (this._state === GroupCallState.Joining) {
this._state = GroupCallState.Joined;
@ -171,16 +188,20 @@ export class GroupCall extends EventEmitter<{change: never}> {
if (member) {
member.updateCallInfo(memberCallInfo);
} else {
member = new Member(RoomMember.fromUserId(this.roomId, userId, "join"), memberCallInfo, this._memberOptions);
const logItem = this.logItem.child("member");
member = new Member(RoomMember.fromUserId(this.roomId, userId, "join"), memberCallInfo, this._memberOptions, logItem);
this._members.add(userId, member);
if (this._state === GroupCallState.Joining || this._state === GroupCallState.Joined) {
member.connect(this._localMedia!);
}
}
});
}
/** @internal */
removeMember(userId) {
removeMember(userId: string, syncLog: ILogItem) {
this.logItem.wrap({l: "removeMember", id: userId}, log => {
syncLog.refDetached(log);
if (userId === this.options.ownUserId) {
if (this._state === GroupCallState.Joined) {
this._localMedia?.dispose();
@ -198,20 +219,27 @@ export class GroupCall extends EventEmitter<{change: never}> {
}
}
this.emitChange();
});
}
/** @internal */
handleDeviceMessage(message: SignallingMessage<MGroupCallBase>, userId: string, deviceId: string, log: ILogItem) {
console.log("incoming to_device call signalling message from", userId, deviceId, message);
handleDeviceMessage(message: SignallingMessage<MGroupCallBase>, userId: string, deviceId: string, syncLog: ILogItem) {
// TODO: return if we are not membering to the call
let member = this._members.get(userId);
if (member) {
member.handleDeviceMessage(message, deviceId, log);
member.handleDeviceMessage(message, deviceId, syncLog);
} else {
const item = this.logItem.log({l: "could not find member for signalling message", userId, deviceId});
syncLog.refDetached(item);
// we haven't received the m.call.member yet for this caller. buffer the device messages or create the member/call anyway?
}
}
/** @internal */
dispose() {
this.logItem.finish();
}
private async _createJoinPayload() {
const {storage} = this.options;
const txn = await storage.readTxn([storage.storeNames.roomState]);

View file

@ -44,8 +44,11 @@ export class Member {
constructor(
public readonly member: RoomMember,
private memberCallInfo: Record<string, any>,
private readonly options: Options
) {}
private readonly options: Options,
private readonly logItem: ILogItem,
) {
logItem.set("id", member.userId);
}
get remoteTracks(): Track[] {
return this.peerCall?.remoteTracks ?? [];
@ -57,6 +60,7 @@ export class Member {
/** @internal */
connect(localMedia: LocalMedia) {
this.logItem.log("connect");
this.localMedia = localMedia;
// otherwise wait for it to connect
if (this.member.userId < this.options.ownUserId) {
@ -71,6 +75,7 @@ export class Member {
this.peerCall?.dispose();
this.peerCall = undefined;
this.localMedia = undefined;
this.logItem.log("disconnect");
}
/** @internal */
@ -87,7 +92,7 @@ export class Member {
}
/** @internal */
sendSignallingMessage = async (message: SignallingMessage<MCallBase>, log: ILogItem) => {
sendSignallingMessage = async (message: SignallingMessage<MCallBase>, log: ILogItem): Promise<void> => {
const groupMessage = message as SignallingMessage<MGroupCallBase>;
groupMessage.content.conf_id = this.options.confId;
const encryptedMessages = await this.options.encryptDeviceMessage(this.member.userId, groupMessage, log);
@ -102,12 +107,13 @@ export class Member {
}
/** @internal */
handleDeviceMessage(message: SignallingMessage<MGroupCallBase>, deviceId: string, log: ILogItem) {
handleDeviceMessage(message: SignallingMessage<MGroupCallBase>, deviceId: string, syncLog: ILogItem) {
syncLog.refDetached(this.logItem);
if (message.type === EventType.Invite && !this.peerCall) {
this.peerCall = this._createPeerCall(message.content.call_id);
}
if (this.peerCall) {
this.peerCall.handleIncomingSignallingMessage(message, deviceId, log);
this.peerCall.handleIncomingSignallingMessage(message, deviceId);
} else {
// TODO: need to buffer events until invite comes?
}
@ -117,6 +123,6 @@ export class Member {
return new PeerCall(callId, Object.assign({}, this.options, {
emitUpdate: this.emitUpdate,
sendSignallingMessage: this.sendSignallingMessage
}));
}), this.logItem);
}
}