WIP
This commit is contained in:
parent
66304ed7e0
commit
46ebd55092
6 changed files with 216 additions and 4 deletions
|
@ -16,12 +16,15 @@ limitations under the License.
|
|||
|
||||
import {OLM_ALGORITHM} from "./e2ee/common.js";
|
||||
import {countBy, groupBy} from "../utils/groupBy";
|
||||
import {LRUCache} from "../../utils/LRUCache";
|
||||
|
||||
export class DeviceMessageHandler {
|
||||
constructor({storage}) {
|
||||
constructor({storage, callHandler}) {
|
||||
this._storage = storage;
|
||||
this._olmDecryption = null;
|
||||
this._megolmDecryption = null;
|
||||
this._callHandler = callHandler;
|
||||
this._senderDeviceCache = new LRUCache(10, di => di.curve25519Key);
|
||||
}
|
||||
|
||||
enableEncryption({olmDecryption, megolmDecryption}) {
|
||||
|
@ -49,6 +52,15 @@ export class DeviceMessageHandler {
|
|||
log.child("decrypt_error").catch(err);
|
||||
}
|
||||
const newRoomKeys = this._megolmDecryption.roomKeysFromDeviceMessages(olmDecryptChanges.results, log);
|
||||
const callMessages = olmDecryptChanges.results.filter(dr => this._callHandler.handlesDeviceMessageEventType(dr.event?.type));
|
||||
await Promise.all(callMessages.map(async dr => {
|
||||
dr.setDevice(await this._getDevice(dr.senderCurve25519Key, txn));
|
||||
this._callHandler.handleDeviceMessage(dr.device.userId, dr.device.deviceId, dr.event.type, dr.event.content, log);
|
||||
}));
|
||||
// TODO: somehow include rooms that received a call to_device message in the sync state?
|
||||
// or have updates flow through event emitter?
|
||||
// well, we don't really need to update the room other then when a call starts or stops
|
||||
// any changes within the call will be emitted on the call object?
|
||||
return new SyncPreparation(olmDecryptChanges, newRoomKeys);
|
||||
}
|
||||
}
|
||||
|
@ -60,6 +72,18 @@ export class DeviceMessageHandler {
|
|||
const didWriteValues = await Promise.all(prep.newRoomKeys.map(key => this._megolmDecryption.writeRoomKey(key, txn)));
|
||||
return didWriteValues.some(didWrite => !!didWrite);
|
||||
}
|
||||
|
||||
|
||||
async _getDevice(senderKey, txn) {
|
||||
let device = this._senderDeviceCache.get(senderKey);
|
||||
if (!device) {
|
||||
device = await txn.deviceIdentities.getByCurve25519Key(senderKey);
|
||||
if (device) {
|
||||
this._senderDeviceCache.set(device);
|
||||
}
|
||||
}
|
||||
return device;
|
||||
}
|
||||
}
|
||||
|
||||
class SyncPreparation {
|
||||
|
|
|
@ -73,7 +73,7 @@ export class Session {
|
|||
};
|
||||
this._roomsBeingCreated = new ObservableMap();
|
||||
this._user = new User(sessionInfo.userId);
|
||||
this._deviceMessageHandler = new DeviceMessageHandler({storage});
|
||||
this._deviceMessageHandler = new DeviceMessageHandler({storage, callHandler: this._callHandler});
|
||||
this._olm = olm;
|
||||
this._olmUtil = null;
|
||||
this._e2eeAccount = null;
|
||||
|
@ -100,6 +100,7 @@ export class Session {
|
|||
this._createRoomEncryption = this._createRoomEncryption.bind(this);
|
||||
this._forgetArchivedRoom = this._forgetArchivedRoom.bind(this);
|
||||
this.needsKeyBackup = new ObservableValue(false);
|
||||
this._callHandler = new CallHandler(this._platform, this._hsApi);
|
||||
}
|
||||
|
||||
get fingerprintKey() {
|
||||
|
@ -562,7 +563,8 @@ export class Session {
|
|||
pendingEvents,
|
||||
user: this._user,
|
||||
createRoomEncryption: this._createRoomEncryption,
|
||||
platform: this._platform
|
||||
platform: this._platform,
|
||||
callHandler: this._callHandler
|
||||
});
|
||||
}
|
||||
|
||||
|
|
156
src/matrix/calls/CallHandler.ts
Normal file
156
src/matrix/calls/CallHandler.ts
Normal file
|
@ -0,0 +1,156 @@
|
|||
/*
|
||||
Copyright 2022 The Matrix.org Foundation C.I.C.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
import {ObservableMap} from "../../observable/map/ObservableMap";
|
||||
|
||||
import type {Room} from "../room/Room";
|
||||
import type {StateEvent} from "../storage/types";
|
||||
import type {ILogItem} from "../../logging/types";
|
||||
|
||||
const GROUP_CALL_TYPE = "m.call";
|
||||
const GROUP_CALL_MEMBER_TYPE = "m.call.member";
|
||||
|
||||
enum CallSetupMessageType {
|
||||
Invite = "m.call.invite",
|
||||
Answer = "m.call.answer",
|
||||
Candidates = "m.call.candidates",
|
||||
Hangup = "m.call.hangup",
|
||||
}
|
||||
|
||||
const CALL_ID = "m.call_id";
|
||||
const CALL_TERMINATED = "m.terminated";
|
||||
|
||||
export class CallHandler {
|
||||
public readonly groupCalls: ObservableMap<string, GroupCall> = new ObservableMap<string, GroupCall>();
|
||||
|
||||
constructor() {
|
||||
|
||||
}
|
||||
|
||||
handleRoomState(room: Room, events: StateEvent[], log: ILogItem) {
|
||||
// first update call events
|
||||
for (const event of events) {
|
||||
if (event.type === GROUP_CALL_TYPE) {
|
||||
const callId = event.state_key;
|
||||
let call = this.groupCalls.get(callId);
|
||||
if (call) {
|
||||
call.updateCallEvent(event);
|
||||
if (call.isTerminated) {
|
||||
this.groupCalls.remove(call.id);
|
||||
}
|
||||
} else {
|
||||
call = new GroupCall(event, room);
|
||||
this.groupCalls.set(call.id, call);
|
||||
}
|
||||
}
|
||||
}
|
||||
// then update participants
|
||||
for (const event of events) {
|
||||
if (event.type === GROUP_CALL_MEMBER_TYPE) {
|
||||
const participant = event.state_key;
|
||||
const sources = event.content["m.sources"];
|
||||
for (const source of sources) {
|
||||
const call = this.groupCalls.get(source[CALL_ID]);
|
||||
if (call && !call.isTerminated) {
|
||||
call.addParticipant(participant, source);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
handlesDeviceMessageEventType(eventType: string | undefined): boolean {
|
||||
return eventType === CallSetupMessageType.Invite ||
|
||||
eventType === CallSetupMessageType.Candidates ||
|
||||
eventType === CallSetupMessageType.Answer ||
|
||||
eventType === CallSetupMessageType.Hangup;
|
||||
}
|
||||
|
||||
handleDeviceMessage(senderUserId: string, senderDeviceId: string, eventType: string, content: Record<string, any>, log: ILogItem) {
|
||||
const callId = content[CALL_ID];
|
||||
const call = this.groupCalls.get(callId);
|
||||
call?.handleDeviceMessage(senderUserId, senderDeviceId, eventType, content, log);
|
||||
}
|
||||
}
|
||||
|
||||
function peerCallKey(senderUserId: string, senderDeviceId: string) {
|
||||
return JSON.stringify(senderUserId) + JSON.stringify(senderDeviceId);
|
||||
}
|
||||
|
||||
class GroupCall {
|
||||
private peerCalls: Map<string, PeerCall>
|
||||
|
||||
constructor(private callEvent: StateEvent, private readonly room: Room) {
|
||||
|
||||
}
|
||||
|
||||
updateCallEvent(callEvent: StateEvent) {
|
||||
this.callEvent = callEvent;
|
||||
}
|
||||
|
||||
addParticipant(userId, source) {
|
||||
|
||||
}
|
||||
|
||||
handleDeviceMessage(senderUserId: string, senderDeviceId: string, eventType: string, content: Record<string, any>, log: ILogItem) {
|
||||
const peerCall = this.peerCalls.get(peerCallKey(senderUserId, senderDeviceId));
|
||||
peerCall?.handleIncomingSignallingMessage()
|
||||
}
|
||||
|
||||
get id(): string {
|
||||
return this.callEvent.state_key;
|
||||
}
|
||||
|
||||
get isTerminated(): boolean {
|
||||
return !!this.callEvent.content[CALL_TERMINATED];
|
||||
}
|
||||
|
||||
private createPeerCall(userId: string, deviceId: string): PeerCall {
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Does WebRTC signalling for a single PeerConnection, and deals with WebRTC wrappers from platform
|
||||
* */
|
||||
|
||||
|
||||
// when sending, we need to encrypt message with olm. I think the flow of room => roomEncryption => olmEncryption as we already
|
||||
// do for sharing keys will be best as that already deals with room tracking.
|
||||
type SendSignallingMessageCallback = (type: CallSetupMessageType, content: Record<string, any>) => Promise<void>;
|
||||
|
||||
class PeerCall {
|
||||
constructor(private readonly sendSignallingMessage: SendSignallingMessageCallback) {
|
||||
|
||||
}
|
||||
|
||||
handleIncomingSignallingMessage(type: CallSetupMessageType, content: Record<string, any>) {
|
||||
switch (type) {
|
||||
case CallSetupMessageType.Invite:
|
||||
case CallSetupMessageType.Answer:
|
||||
case CallSetupMessageType.Candidates:
|
||||
case CallSetupMessageType.Hangup:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class MediaSource {
|
||||
|
||||
}
|
||||
|
||||
class PeerConnection {
|
||||
|
||||
}
|
|
@ -69,6 +69,14 @@ export class DecryptionResult {
|
|||
}
|
||||
}
|
||||
|
||||
get userId(): string | undefined {
|
||||
return this.device?.userId;
|
||||
}
|
||||
|
||||
get deviceId(): string | undefined {
|
||||
return this.device?.deviceId;
|
||||
}
|
||||
|
||||
get isVerificationUnknown(): boolean {
|
||||
// verification is unknown if we haven't yet fetched the devices for the room
|
||||
return !this.device && !this.roomTracked;
|
||||
|
|
|
@ -30,6 +30,7 @@ const EVENT_ENCRYPTED_TYPE = "m.room.encrypted";
|
|||
export class Room extends BaseRoom {
|
||||
constructor(options) {
|
||||
super(options);
|
||||
this._callHandler = options.callHandler;
|
||||
// TODO: pass pendingEvents to start like pendingOperations?
|
||||
const {pendingEvents} = options;
|
||||
const relationWriter = new RelationWriter({
|
||||
|
@ -92,6 +93,8 @@ export class Room extends BaseRoom {
|
|||
}
|
||||
}
|
||||
|
||||
this._updateCallHandler(roomResponse);
|
||||
|
||||
return {
|
||||
roomEncryption,
|
||||
summaryChanges,
|
||||
|
@ -442,6 +445,25 @@ export class Room extends BaseRoom {
|
|||
return this._sendQueue.pendingEvents;
|
||||
}
|
||||
|
||||
_updateCallHandler(roomResponse) {
|
||||
if (this._callHandler) {
|
||||
const stateEvents = roomResponse.state?.events;
|
||||
if (stateEvents) {
|
||||
for (const e of stateEvents) {
|
||||
this._callHandler.handleRoomState(this, e);
|
||||
}
|
||||
}
|
||||
let timelineEvents = roomResponse.timeline?.events;
|
||||
if (timelineEvents) {
|
||||
for (const e of timelineEvents) {
|
||||
if (typeof e.state_key === "string") {
|
||||
this._callHandler.handleRoomState(this, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** @package */
|
||||
writeIsTrackingMembers(value, txn) {
|
||||
return this._summary.writeIsTrackingMembers(value, txn);
|
||||
|
|
|
@ -71,7 +71,7 @@ export class BaseLRUCache<T> {
|
|||
export class LRUCache<T, K> extends BaseLRUCache<T> {
|
||||
private _keyFn: (T) => K;
|
||||
|
||||
constructor(limit, keyFn: (T) => K) {
|
||||
constructor(limit: number, keyFn: (T) => K) {
|
||||
super(limit);
|
||||
this._keyFn = keyFn;
|
||||
}
|
||||
|
|
Reference in a new issue