persist calls so they can be quickly loaded after a restart

also use event prefixes compatible with Element Call/MSC
This commit is contained in:
Bruno Windels 2022-04-07 10:32:23 +02:00
parent 1ad5db73a9
commit 2852834ce3
13 changed files with 229 additions and 48 deletions

View file

@ -99,6 +99,8 @@ export class SessionViewModel extends ViewModel {
start() {
this._sessionStatusViewModel.start();
//this._client.session.callHandler.loadCalls("m.prompt");
this._client.session.callHandler.loadCalls("m.ring");
}
get activeMiddleViewModel() {

View file

@ -72,7 +72,7 @@ export function tilesCreator(baseOptions) {
return new EncryptedEventTile(options);
case "m.room.encryption":
return new EncryptionEnabledTile(options);
case "m.call":
case "org.matrix.msc3401.call":
// if prevContent is present, it's an update to a call event, which we don't render
// as the original event is updated through the call object which receive state event updates
return entry.stateKey && !entry.prevContent ? new CallTile(options) : null;

View file

@ -76,7 +76,7 @@ export class Session {
this._roomsBeingCreated = new ObservableMap();
this._user = new User(sessionInfo.userId);
this._callHandler = new CallHandler({
createTimeout: this._platform.clock.createTimeout,
clock: this._platform.clock,
hsApi: this._hsApi,
encryptDeviceMessage: async (roomId, userId, message, log) => {
if (!this._deviceTracker || !this._olmEncryption) {

View file

@ -344,6 +344,7 @@ export class Sync {
// to decrypt and store new room keys
storeNames.olmSessions,
storeNames.inboundGroupSessions,
storeNames.calls,
]);
}

View file

@ -18,8 +18,9 @@ import {ObservableMap} from "../../observable/map/ObservableMap";
import {WebRTC, PeerConnection, PeerConnectionHandler} from "../../platform/types/WebRTC";
import {MediaDevices, Track, AudioTrack, TrackType} from "../../platform/types/MediaDevices";
import {handlesEventType} from "./PeerCall";
import {EventType} from "./callEventTypes";
import {EventType, CallIntent} from "./callEventTypes";
import {GroupCall} from "./group/GroupCall";
import {makeId} from "../common";
import type {LocalMedia} from "./LocalMedia";
import type {Room} from "../room/Room";
@ -30,13 +31,17 @@ import type {Platform} from "../../platform/web/Platform";
import type {BaseObservableMap} from "../../observable/map/BaseObservableMap";
import type {SignallingMessage, MGroupCallBase} from "./callEventTypes";
import type {Options as GroupCallOptions} from "./group/GroupCall";
import type {Transaction} from "../storage/idb/Transaction";
import type {CallEntry} from "../storage/idb/stores/CallStore";
import type {Clock} from "../../platform/web/dom/Clock";
const GROUP_CALL_TYPE = "m.call";
const GROUP_CALL_MEMBER_TYPE = "m.call.member";
const CALL_TERMINATED = "m.terminated";
export type Options = Omit<GroupCallOptions, "emitUpdate"> & {
logger: ILogger
export type Options = Omit<GroupCallOptions, "emitUpdate" | "createTimeout"> & {
logger: ILogger,
clock: Clock
};
export class CallHandler {
@ -48,25 +53,86 @@ export class CallHandler {
constructor(private readonly options: Options) {
this.groupCallOptions = Object.assign({}, this.options, {
emitUpdate: (groupCall, params) => this._calls.update(groupCall.id, params)
emitUpdate: (groupCall, params) => this._calls.update(groupCall.id, params),
createTimeout: this.options.clock.createTimeout,
});
}
async loadCalls(intent: CallIntent = CallIntent.Ring) {
const txn = await this._getLoadTxn();
const callEntries = await txn.calls.getByIntent(intent);
this._loadCallEntries(callEntries, txn);
}
async loadCallsForRoom(intent: CallIntent, roomId: string) {
const txn = await this._getLoadTxn();
const callEntries = await txn.calls.getByIntentAndRoom(intent, roomId);
this._loadCallEntries(callEntries, txn);
}
private async _getLoadTxn(): Promise<Transaction> {
const names = this.options.storage.storeNames;
const txn = await this.options.storage.readTxn([
names.calls,
names.roomState
]);
return txn;
}
private async _loadCallEntries(callEntries: CallEntry[], txn: Transaction): Promise<void> {
return this.options.logger.run("loading calls", async log => {
log.set("entries", callEntries.length);
await Promise.all(callEntries.map(async callEntry => {
if (this._calls.get(callEntry.callId)) {
return;
}
const event = await txn.roomState.get(callEntry.roomId, EventType.GroupCall, callEntry.callId);
if (event) {
const logItem = this.options.logger.child({l: "call", loaded: true});
const call = new GroupCall(event.event.state_key, false, event.event.content, event.roomId, this.groupCallOptions, logItem);
this._calls.set(call.id, call);
}
}));
const roomIds = Array.from(new Set(callEntries.map(e => e.roomId)));
await Promise.all(roomIds.map(async roomId => {
const ownCallsMemberEvent = await txn.roomState.get(roomId, EventType.GroupCallMember, this.options.ownUserId);
if (ownCallsMemberEvent) {
this.handleCallMemberEvent(ownCallsMemberEvent.event, log);
}
// TODO: we should be loading the other members as well at some point
}));
log.set("newSize", this._calls.size);
});
}
async createCall(roomId: string, localMedia: LocalMedia, name: string): Promise<GroupCall> {
const logItem = this.options.logger.child({l: "call", incoming: false});
const call = new GroupCall(undefined, undefined, roomId, this.groupCallOptions, logItem);
const call = new GroupCall(makeId("conf-"), true, {
"m.name": name,
"m.intent": CallIntent.Ring
}, roomId, this.groupCallOptions, logItem);
this._calls.set(call.id, call);
try {
await call.create(localMedia, name);
await call.create(localMedia);
await call.join(localMedia);
// store call info so it will ring again when reopening the app
const txn = await this.options.storage.readWriteTxn([this.options.storage.storeNames.calls]);
txn.calls.add({
intent: call.intent,
callId: call.id,
timestamp: this.options.clock.now(),
roomId: roomId
});
await txn.complete();
} catch (err) {
if (err.name === "ConnectionError") {
//if (err.name === "ConnectionError") {
// if we're offline, give up and remove the call again
call.dispose();
this._calls.remove(call.id);
}
//}
throw err;
}
await call.join(localMedia);
return call;
}
@ -75,11 +141,11 @@ export class CallHandler {
// TODO: check and poll turn server credentials here
/** @internal */
handleRoomState(room: Room, events: StateEvent[], log: ILogItem) {
handleRoomState(room: Room, events: StateEvent[], txn: Transaction, log: ILogItem) {
// first update call events
for (const event of events) {
if (event.type === EventType.GroupCall) {
this.handleCallEvent(event, room.id, log);
this.handleCallEvent(event, room.id, txn, log);
}
}
// then update members
@ -108,7 +174,7 @@ export class CallHandler {
call?.handleDeviceMessage(message, userId, deviceId, log);
}
private handleCallEvent(event: StateEvent, roomId: string, log: ILogItem) {
private handleCallEvent(event: StateEvent, roomId: string, txn: Transaction, log: ILogItem) {
const callId = event.state_key;
let call = this._calls.get(callId);
if (call) {
@ -116,11 +182,18 @@ export class CallHandler {
if (call.isTerminated) {
call.dispose();
this._calls.remove(call.id);
txn.calls.remove(call.intent, roomId, call.id);
}
} else {
const logItem = this.options.logger.child({l: "call", incoming: true});
call = new GroupCall(event.state_key, event.content, roomId, this.groupCallOptions, logItem);
call = new GroupCall(event.state_key, false, event.content, roomId, this.groupCallOptions, logItem);
this._calls.set(call.id, call);
txn.calls.add({
intent: call.intent,
callId: call.id,
timestamp: event.origin_server_ts,
roomId: roomId
});
}
}

View file

@ -1,10 +1,10 @@
// allow non-camelcase as these are events type that go onto the wire
/* eslint-disable camelcase */
import type {StateEvent} from "../storage/types";
export enum EventType {
GroupCall = "m.call",
GroupCallMember = "m.call.member",
GroupCall = "org.matrix.msc3401.call",
GroupCallMember = "org.matrix.msc3401.call.member",
Invite = "m.call.invite",
Candidates = "m.call.candidates",
Answer = "m.call.answer",
@ -211,3 +211,9 @@ export type SignallingMessage<Base extends MCallBase> =
{type: EventType.SDPStreamMetadataChanged | EventType.SDPStreamMetadataChangedPrefix, content: MCallSDPStreamMetadataChanged<Base>} |
{type: EventType.Candidates, content: MCallCandidates<Base>} |
{type: EventType.Hangup | EventType.Reject, content: MCallHangupReject<Base>};
export enum CallIntent {
Ring = "m.ring",
Prompt = "m.prompt",
Room = "m.room",
};

View file

@ -18,8 +18,8 @@ import {ObservableMap} from "../../../observable/map/ObservableMap";
import {Member} from "./Member";
import {LocalMedia} from "../LocalMedia";
import {RoomMember} from "../../room/members/RoomMember";
import {makeId} from "../../common";
import {EventEmitter} from "../../../utils/EventEmitter";
import {EventType, CallIntent} from "../callEventTypes";
import type {Options as MemberOptions} from "./Member";
import type {BaseObservableMap} from "../../../observable/map/BaseObservableMap";
@ -32,9 +32,6 @@ import type {EncryptedMessage} from "../../e2ee/olm/Encryption";
import type {ILogItem} from "../../../logging/types";
import type {Storage} from "../../storage/idb/Storage";
const CALL_TYPE = "m.call";
const CALL_MEMBER_TYPE = "m.call.member";
export enum GroupCallState {
Fledgling = "fledgling",
Creating = "creating",
@ -62,23 +59,22 @@ export type Options = Omit<MemberOptions, "emitUpdate" | "confId" | "encryptDevi
};
export class GroupCall extends EventEmitter<{change: never}> {
public readonly id: string;
private readonly _members: ObservableMap<string, Member> = new ObservableMap();
private _localMedia?: LocalMedia = undefined;
private _memberOptions: MemberOptions;
private _state: GroupCallState;
constructor(
id: string | undefined,
private callContent: Record<string, any> | undefined,
public readonly id: string,
newCall: boolean,
private callContent: Record<string, any>,
public readonly roomId: string,
private readonly options: Options,
private readonly logItem: ILogItem,
) {
super();
this.id = id ?? makeId("conf-");
logItem.set("id", this.id);
this._state = id ? GroupCallState.Created : GroupCallState.Fledgling;
this._state = newCall ? GroupCallState.Fledgling : GroupCallState.Created;
this._memberOptions = Object.assign({}, options, {
confId: this.id,
emitUpdate: member => this._members.update(getMemberKey(member.userId, member.deviceId), member),
@ -103,7 +99,7 @@ export class GroupCall extends EventEmitter<{change: never}> {
return this.callContent?.["m.name"];
}
get intent(): string {
get intent(): CallIntent {
return this.callContent?.["m.intent"];
}
@ -117,7 +113,7 @@ export class GroupCall extends EventEmitter<{change: never}> {
this.emitChange();
const memberContent = await this._createJoinPayload();
// send m.call.member state event
const request = this.options.hsApi.sendState(this.roomId, CALL_MEMBER_TYPE, this.options.ownUserId, memberContent, {log});
const request = this.options.hsApi.sendState(this.roomId, EventType.GroupCallMember, this.options.ownUserId, memberContent, {log});
await request.response();
this.emitChange();
// send invite to all members that are < my userId
@ -136,10 +132,10 @@ export class GroupCall extends EventEmitter<{change: never}> {
const memberContent = await this._leaveCallMemberContent();
// send m.call.member state event
if (memberContent) {
const request = this.options.hsApi.sendState(this.roomId, CALL_MEMBER_TYPE, this.options.ownUserId, memberContent, {log});
const request = this.options.hsApi.sendState(this.roomId, EventType.GroupCallMember, this.options.ownUserId, memberContent, {log});
await request.response();
// our own user isn't included in members, so not in the count
if (this._members.size === 0) {
if (this.intent === CallIntent.Ring && this._members.size === 0) {
await this.terminate();
}
} else {
@ -153,7 +149,7 @@ export class GroupCall extends EventEmitter<{change: never}> {
if (this._state === GroupCallState.Fledgling) {
return;
}
const request = this.options.hsApi.sendState(this.roomId, CALL_TYPE, this.id, Object.assign({}, this.callContent, {
const request = this.options.hsApi.sendState(this.roomId, EventType.GroupCall, this.id, Object.assign({}, this.callContent, {
"m.terminated": true
}), {log});
await request.response();
@ -161,19 +157,17 @@ export class GroupCall extends EventEmitter<{change: never}> {
}
/** @internal */
create(localMedia: LocalMedia, name: string): Promise<void> {
create(localMedia: LocalMedia): Promise<void> {
return this.logItem.wrap("create", async log => {
if (this._state !== GroupCallState.Fledgling) {
return;
}
this._state = GroupCallState.Creating;
this.emitChange();
this.callContent = {
this.callContent = Object.assign({
"m.type": localMedia.cameraTrack ? "m.video" : "m.voice",
"m.name": name,
"m.intent": "m.ring"
};
const request = this.options.hsApi.sendState(this.roomId, CALL_TYPE, this.id, this.callContent, {log});
}, this.callContent);
const request = this.options.hsApi.sendState(this.roomId, EventType.GroupCall, this.id, this.callContent!, {log});
await request.response();
this._state = GroupCallState.Created;
this.emitChange();
@ -318,7 +312,7 @@ export class GroupCall extends EventEmitter<{change: never}> {
private async _createJoinPayload() {
const {storage} = this.options;
const txn = await storage.readTxn([storage.storeNames.roomState]);
const stateEvent = await txn.roomState.get(this.roomId, CALL_MEMBER_TYPE, this.options.ownUserId);
const stateEvent = await txn.roomState.get(this.roomId, EventType.GroupCallMember, this.options.ownUserId);
const stateContent = stateEvent?.event?.content ?? {
["m.calls"]: []
};
@ -335,7 +329,8 @@ export class GroupCall extends EventEmitter<{change: never}> {
let deviceInfo = devicesInfo.find(d => d["device_id"] === this.options.ownDeviceId);
if (!deviceInfo) {
deviceInfo = {
["device_id"]: this.options.ownDeviceId
["device_id"]: this.options.ownDeviceId,
feeds: [{purpose: "m.usermedia"}]
};
devicesInfo.push(deviceInfo);
}
@ -345,7 +340,7 @@ export class GroupCall extends EventEmitter<{change: never}> {
private async _leaveCallMemberContent(): Promise<Record<string, any> | undefined> {
const {storage} = this.options;
const txn = await storage.readTxn([storage.storeNames.roomState]);
const stateEvent = await txn.roomState.get(this.roomId, CALL_MEMBER_TYPE, this.options.ownUserId);
const stateEvent = await txn.roomState.get(this.roomId, EventType.GroupCallMember, this.options.ownUserId);
if (stateEvent) {
const content = stateEvent.event.content;
const callInfo = content["m.calls"]?.find(c => c["m.call_id"] === this.id);

View file

@ -93,8 +93,6 @@ export class Room extends BaseRoom {
}
}
this._updateCallHandler(roomResponse, log);
return {
roomEncryption,
summaryChanges,
@ -181,6 +179,7 @@ export class Room extends BaseRoom {
removedPendingEvents = await this._sendQueue.removeRemoteEchos(roomResponse.timeline.events, txn, log);
}
const powerLevelsEvent = this._getPowerLevelsEvent(roomResponse);
this._updateCallHandler(roomResponse, txn, log);
return {
summaryChanges,
roomEncryption,
@ -448,17 +447,17 @@ export class Room extends BaseRoom {
return this._sendQueue.pendingEvents;
}
_updateCallHandler(roomResponse, log) {
_updateCallHandler(roomResponse, txn, log) {
if (this._callHandler) {
const stateEvents = roomResponse.state?.events;
if (stateEvents?.length) {
this._callHandler.handleRoomState(this, stateEvents, log);
this._callHandler.handleRoomState(this, stateEvents, txn, log);
}
let timelineEvents = roomResponse.timeline?.events;
if (timelineEvents) {
const timelineStateEvents = timelineEvents.filter(e => typeof e.state_key === "string");
if (timelineEvents.length !== 0) {
this._callHandler.handleRoomState(this, timelineStateEvents, log);
this._callHandler.handleRoomState(this, timelineStateEvents, txn, log);
}
}
}

View file

@ -33,6 +33,7 @@ export enum StoreNames {
groupSessionDecryptions = "groupSessionDecryptions",
operations = "operations",
accountData = "accountData",
calls = "calls"
}
export const STORE_NAMES: Readonly<StoreNames[]> = Object.values(StoreNames);

View file

@ -36,6 +36,7 @@ import {OutboundGroupSessionStore} from "./stores/OutboundGroupSessionStore";
import {GroupSessionDecryptionStore} from "./stores/GroupSessionDecryptionStore";
import {OperationStore} from "./stores/OperationStore";
import {AccountDataStore} from "./stores/AccountDataStore";
import {CallStore} from "./stores/CallStore";
import type {ILogger, ILogItem} from "../../../logging/types";
export type IDBKey = IDBValidKey | IDBKeyRange;
@ -167,6 +168,10 @@ export class Transaction {
get accountData(): AccountDataStore {
return this._store(StoreNames.accountData, idbStore => new AccountDataStore(idbStore));
}
get calls(): CallStore {
return this._store(StoreNames.calls, idbStore => new CallStore(idbStore));
}
async complete(log?: ILogItem): Promise<void> {
try {

View file

@ -34,7 +34,8 @@ export const schema: MigrationFunc[] = [
backupAndRestoreE2EEAccountToLocalStorage,
clearAllStores,
addInboundSessionBackupIndex,
migrateBackupStatus
migrateBackupStatus,
createCallStore
];
// TODO: how to deal with git merge conflicts of this array?
@ -309,3 +310,8 @@ async function migrateBackupStatus(db: IDBDatabase, txn: IDBTransaction, localSt
log.set("countWithoutSession", countWithoutSession);
log.set("countWithSession", countWithSession);
}
//v17 create calls store
function createCallStore(db: IDBDatabase) : void {
db.createObjectStore("calls", {keyPath: "key"});
}

View file

@ -0,0 +1,83 @@
/*
Copyright 2020 Bruno Windels <bruno@windels.cloud>
Copyright 2021 The Matrix.org Foundation C.I.C.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
import {Store} from "../Store";
import {StateEvent} from "../../types";
import {MIN_UNICODE, MAX_UNICODE} from "./common";
function encodeKey(intent: string, roomId: string, callId: string) {
return `${intent}|${roomId}|${callId}`;
}
function decodeStorageEntry(storageEntry: CallStorageEntry): CallEntry {
const [intent, roomId, callId] = storageEntry.key.split("|");
return {intent, roomId, callId, timestamp: storageEntry.timestamp};
}
export interface CallEntry {
intent: string;
roomId: string;
callId: string;
timestamp: number;
}
type CallStorageEntry = {
key: string;
timestamp: number;
}
export class CallStore {
private _callStore: Store<CallStorageEntry>;
constructor(idbStore: Store<CallStorageEntry>) {
this._callStore = idbStore;
}
async getByIntent(intent: string): Promise<CallEntry[]> {
const range = this._callStore.IDBKeyRange.bound(
encodeKey(intent, MIN_UNICODE, MIN_UNICODE),
encodeKey(intent, MAX_UNICODE, MAX_UNICODE),
true,
true
);
const storageEntries = await this._callStore.selectAll(range);
return storageEntries.map(e => decodeStorageEntry(e));
}
async getByIntentAndRoom(intent: string, roomId: string): Promise<CallEntry[]> {
const range = this._callStore.IDBKeyRange.bound(
encodeKey(intent, roomId, MIN_UNICODE),
encodeKey(intent, roomId, MAX_UNICODE),
true,
true
);
const storageEntries = await this._callStore.selectAll(range);
return storageEntries.map(e => decodeStorageEntry(e));
}
add(entry: CallEntry) {
const storageEntry: CallStorageEntry = {
key: encodeKey(entry.intent, entry.roomId, entry.callId),
timestamp: entry.timestamp
};
this._callStore.add(storageEntry);
}
remove(intent: string, roomId: string, callId: string): void {
this._callStore.delete(encodeKey(intent, roomId, callId));
}
}

View file

@ -15,7 +15,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
import {MAX_UNICODE} from "./common";
import {MIN_UNICODE, MAX_UNICODE} from "./common";
import {Store} from "../Store";
import {StateEvent} from "../../types";
@ -41,6 +41,16 @@ export class RoomStateStore {
return this._roomStateStore.get(key);
}
getAllForType(roomId: string, type: string): Promise<RoomStateEntry[]> {
const range = this._roomStateStore.IDBKeyRange.bound(
encodeKey(roomId, type, MIN_UNICODE),
encodeKey(roomId, type, MAX_UNICODE),
true,
true
);
return this._roomStateStore.selectAll(range);
}
set(roomId: string, event: StateEvent): void {
const key = encodeKey(roomId, event.type, event.state_key);
const entry = {roomId, event, key};