From 9f6822f362e3e0d8ca7eff124fd3434160464c35 Mon Sep 17 00:00:00 2001 From: Bruno Windels Date: Fri, 11 Sep 2020 14:38:36 +0200 Subject: [PATCH 1/5] remove needsRoomKey flag on member --- src/matrix/room/Room.js | 2 +- src/matrix/room/members/RoomMember.js | 8 --- .../room/timeline/persistence/SyncWriter.js | 58 +++++++------------ .../storage/idb/stores/RoomMemberStore.js | 15 ----- 4 files changed, 22 insertions(+), 61 deletions(-) diff --git a/src/matrix/room/Room.js b/src/matrix/room/Room.js index 8d7754c1..3d65e49a 100644 --- a/src/matrix/room/Room.js +++ b/src/matrix/room/Room.js @@ -158,7 +158,7 @@ export class Room extends EventEmitter { decryption = await decryptChanges.write(txn); } const {entries, newLiveKey, memberChanges} = - await this._syncWriter.writeSync(roomResponse, this.isTrackingMembers, txn); + await this._syncWriter.writeSync(roomResponse, txn); if (decryption) { decryption.applyToEntries(entries); } diff --git a/src/matrix/room/members/RoomMember.js b/src/matrix/room/members/RoomMember.js index 084e6168..fe55b5aa 100644 --- a/src/matrix/room/members/RoomMember.js +++ b/src/matrix/room/members/RoomMember.js @@ -67,14 +67,6 @@ export class RoomMember { }); } - get needsRoomKey() { - return this._data.needsRoomKey; - } - - set needsRoomKey(value) { - this._data.needsRoomKey = !!value; - } - get membership() { return this._data.membership; } diff --git a/src/matrix/room/timeline/persistence/SyncWriter.js b/src/matrix/room/timeline/persistence/SyncWriter.js index 9f42163d..7f2275b1 100644 --- a/src/matrix/room/timeline/persistence/SyncWriter.js +++ b/src/matrix/room/timeline/persistence/SyncWriter.js @@ -98,48 +98,40 @@ export class SyncWriter { return {oldFragment, newFragment}; } - async _writeMember(event, trackNewlyJoined, txn) { + _writeMember(event, txn) { const userId = event.state_key; if (userId) { const memberChange = new MemberChange(this._roomId, event); const {member} = memberChange; if (member) { - if (trackNewlyJoined) { - const existingMemberData = await txn.roomMembers.get(this._roomId, userId); - // mark new members so we know who needs our the room key for our outbound megolm session - member.needsRoomKey = existingMemberData?.needsRoomKey || memberChange.hasJoined; - } txn.roomMembers.set(member.serialize()); return memberChange; } } } - async _writeStateEvent(event, trackNewlyJoined, txn) { + _writeStateEvent(event, txn) { if (event.type === MEMBER_EVENT_TYPE) { - return await this._writeMember(event, trackNewlyJoined, txn); + return this._writeMember(event, txn); } else { txn.roomState.set(this._roomId, event); } } - async _writeStateEvents(roomResponse, trackNewlyJoined, txn) { - const memberChanges = new Map(); + _writeStateEvents(roomResponse, memberChanges, txn) { // persist state const {state} = roomResponse; if (Array.isArray(state?.events)) { - await Promise.all(state.events.map(async event => { - const memberChange = await this._writeStateEvent(event, trackNewlyJoined, txn); + for (const event of state.events) { + const memberChange = this._writeStateEvent(event, txn); if (memberChange) { memberChanges.set(memberChange.userId, memberChange); } - })); + } } - return memberChanges; } - async _writeTimeline(entries, timeline, currentKey, trackNewlyJoined, txn) { - const memberChanges = new Map(); + async _writeTimeline(entries, timeline, currentKey, memberChanges, txn) { if (Array.isArray(timeline.events)) { const events = deduplicateEvents(timeline.events); for(const event of events) { @@ -153,19 +145,17 @@ export class SyncWriter { } txn.timelineEvents.insert(entry); entries.push(new EventEntry(entry, this._fragmentIdComparer)); - } - // process live state events first, so new member info is available - // also run async state event writing in parallel - await Promise.all(events.filter(event => { - return typeof event.state_key === "string"; - }).map(async stateEvent => { - const memberChange = await this._writeStateEvent(stateEvent, trackNewlyJoined, txn); - if (memberChange) { - memberChanges.set(memberChange.userId, memberChange); + + // process live state events first, so new member info is available + if (typeof event.state_key === "string") { + const memberChange = this._writeStateEvent(event, txn); + if (memberChange) { + memberChanges.set(memberChange.userId, memberChange); + } } - })); + } } - return {currentKey, memberChanges}; + return currentKey; } async _findMemberData(userId, events, txn) { @@ -193,11 +183,10 @@ export class SyncWriter { * @property {Map} memberChanges member changes in the processed sync ny user id * * @param {Object} roomResponse [description] - * @param {Boolean} trackNewlyJoined needed to know if we need to keep track whether a user needs keys when they join an encrypted room * @param {Transaction} txn * @return {SyncWriterResult} */ - async writeSync(roomResponse, trackNewlyJoined, txn) { + async writeSync(roomResponse, txn) { const entries = []; const {timeline} = roomResponse; let currentKey = this._lastLiveKey; @@ -217,16 +206,11 @@ export class SyncWriter { entries.push(FragmentBoundaryEntry.end(oldFragment, this._fragmentIdComparer)); entries.push(FragmentBoundaryEntry.start(newFragment, this._fragmentIdComparer)); } + const memberChanges = new Map(); // important this happens before _writeTimeline so // members are available in the transaction - const memberChanges = await this._writeStateEvents(roomResponse, trackNewlyJoined, txn); - // TODO: remove trackNewlyJoined and pass in memberChanges - const timelineResult = await this._writeTimeline(entries, timeline, currentKey, trackNewlyJoined, txn); - currentKey = timelineResult.currentKey; - // merge member changes from state and timeline, giving precedence to the latter - for (const [userId, memberChange] of timelineResult.memberChanges.entries()) { - memberChanges.set(userId, memberChange); - } + this._writeStateEvents(roomResponse, memberChanges, txn); + currentKey = await this._writeTimeline(entries, timeline, currentKey, memberChanges, txn); return {entries, newLiveKey: currentKey, memberChanges}; } diff --git a/src/matrix/storage/idb/stores/RoomMemberStore.js b/src/matrix/storage/idb/stores/RoomMemberStore.js index 6d354dd9..be2b16ec 100644 --- a/src/matrix/storage/idb/stores/RoomMemberStore.js +++ b/src/matrix/storage/idb/stores/RoomMemberStore.js @@ -60,19 +60,4 @@ export class RoomMemberStore { }); return userIds; } - - async getUserIdsNeedingRoomKey(roomId) { - const userIds = []; - const range = IDBKeyRange.lowerBound(encodeKey(roomId, "")); - await this._roomMembersStore.iterateWhile(range, member => { - if (member.roomId !== roomId) { - return false; - } - if (member.needsRoomKey) { - userIds.push(member.userId); - } - return true; - }); - return userIds; - } } From b00865510faed1fd6c2e30142b01dcac8aa9de4b Mon Sep 17 00:00:00 2001 From: Bruno Windels Date: Fri, 11 Sep 2020 14:40:05 +0200 Subject: [PATCH 2/5] add operation store --- src/matrix/storage/common.js | 1 + src/matrix/storage/idb/Transaction.js | 5 ++ src/matrix/storage/idb/schema.js | 2 + .../storage/idb/stores/OperationStore.js | 55 +++++++++++++++++++ 4 files changed, 63 insertions(+) create mode 100644 src/matrix/storage/idb/stores/OperationStore.js diff --git a/src/matrix/storage/common.js b/src/matrix/storage/common.js index 4a4060b2..f74dafdc 100644 --- a/src/matrix/storage/common.js +++ b/src/matrix/storage/common.js @@ -28,6 +28,7 @@ export const STORE_NAMES = Object.freeze([ "inboundGroupSessions", "outboundGroupSessions", "groupSessionDecryptions", + "operations" ]); export const STORE_MAP = Object.freeze(STORE_NAMES.reduce((nameMap, name) => { diff --git a/src/matrix/storage/idb/Transaction.js b/src/matrix/storage/idb/Transaction.js index e0982c54..af6d49ca 100644 --- a/src/matrix/storage/idb/Transaction.js +++ b/src/matrix/storage/idb/Transaction.js @@ -30,6 +30,7 @@ import {OlmSessionStore} from "./stores/OlmSessionStore.js"; import {InboundGroupSessionStore} from "./stores/InboundGroupSessionStore.js"; import {OutboundGroupSessionStore} from "./stores/OutboundGroupSessionStore.js"; import {GroupSessionDecryptionStore} from "./stores/GroupSessionDecryptionStore.js"; +import {OperationStore} from "./stores/OperationStore.js"; export class Transaction { constructor(txn, allowedStoreNames) { @@ -111,6 +112,10 @@ export class Transaction { return this._store("groupSessionDecryptions", idbStore => new GroupSessionDecryptionStore(idbStore)); } + get operations() { + return this._store("operations", idbStore => new OperationStore(idbStore)); + } + complete() { return txnAsPromise(this._txn); } diff --git a/src/matrix/storage/idb/schema.js b/src/matrix/storage/idb/schema.js index 1ed9cadb..809f6729 100644 --- a/src/matrix/storage/idb/schema.js +++ b/src/matrix/storage/idb/schema.js @@ -74,4 +74,6 @@ function createE2EEStores(db) { db.createObjectStore("inboundGroupSessions", {keyPath: "key"}); db.createObjectStore("outboundGroupSessions", {keyPath: "roomId"}); db.createObjectStore("groupSessionDecryptions", {keyPath: "key"}); + const operations = db.createObjectStore("operations", {keyPath: "id"}); + operations.createIndex("byTypeAndScope", "typeScopeKey", {unique: false}); } diff --git a/src/matrix/storage/idb/stores/OperationStore.js b/src/matrix/storage/idb/stores/OperationStore.js new file mode 100644 index 00000000..598f80e7 --- /dev/null +++ b/src/matrix/storage/idb/stores/OperationStore.js @@ -0,0 +1,55 @@ +/* +Copyright 2020 The Matrix.org Foundation C.I.C. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +function encodeTypeScopeKey(type, scope) { + return `${type}|${scope}`; +} + +export class OperationStore { + constructor(store) { + this._store = store; + } + + getAll() { + return this._store.selectAll(); + } + + async getAllByTypeAndScope(type, scope) { + const key = encodeTypeScopeKey(type, scope); + const results = []; + await this._store.index("byTypeAndScope").iterateWhile(key, value => { + if (value.typeScopeKey !== key) { + return false; + } + results.push(value); + return true; + }); + return results; + } + + add(operation) { + operation.typeScopeKey = encodeTypeScopeKey(operation.type, operation.scope); + this._store.add(operation); + } + + update(operation) { + this._store.set(operation); + } + + remove(id) { + this._store.delete(id); + } +} From ab1fe711ad3ac83b57487560e5fe560d272b2d6d Mon Sep 17 00:00:00 2001 From: Bruno Windels Date: Fri, 11 Sep 2020 14:41:12 +0200 Subject: [PATCH 3/5] implement room key sharing with operations store --- src/matrix/Sync.js | 4 +- src/matrix/e2ee/RoomEncryption.js | 140 +++++++++++++++++------------- src/matrix/room/Room.js | 7 +- 3 files changed, 86 insertions(+), 65 deletions(-) diff --git a/src/matrix/Sync.js b/src/matrix/Sync.js index c81acee0..ca76e57a 100644 --- a/src/matrix/Sync.js +++ b/src/matrix/Sync.js @@ -226,7 +226,9 @@ export class Sync { storeNames.groupSessionDecryptions, storeNames.deviceIdentities, // to discard outbound session when somebody leaves a room - storeNames.outboundGroupSessions + // and to create room key messages when somebody leaves + storeNames.outboundGroupSessions, + storeNames.operations ]); } diff --git a/src/matrix/e2ee/RoomEncryption.js b/src/matrix/e2ee/RoomEncryption.js index 44229b97..b5b56ec2 100644 --- a/src/matrix/e2ee/RoomEncryption.js +++ b/src/matrix/e2ee/RoomEncryption.js @@ -47,13 +47,14 @@ export class RoomEncryption { } async writeMemberChanges(memberChanges, txn) { - for (const m of memberChanges.values()) { - if (m.hasLeft) { - this._megolmEncryption.discardOutboundSession(this._room.id, txn); - break; - } + const memberChangesArray = Array.from(memberChanges.values()); + if (memberChangesArray.some(m => m.hasLeft)) { + this._megolmEncryption.discardOutboundSession(this._room.id, txn); } - return await this._deviceTracker.writeMemberChanges(this._room, memberChanges, txn); + if (memberChangesArray.some(m => m.hasJoined)) { + await this._addShareRoomKeyOperationForNewMembers(memberChangesArray, txn); + } + await this._deviceTracker.writeMemberChanges(this._room, memberChanges, txn); } // this happens before entries exists, as they are created by the syncwriter @@ -146,16 +147,10 @@ export class RoomEncryption { } async encrypt(type, content, hsApi) { + await this._deviceTracker.trackRoom(this._room); const megolmResult = await this._megolmEncryption.encrypt(this._room.id, type, content, this._encryptionParams); - // share the new megolm session if needed if (megolmResult.roomKeyMessage) { - await this._deviceTracker.trackRoom(this._room); - const devices = await this._deviceTracker.devicesForTrackedRoom(this._room.id, hsApi); - await this._sendRoomKey(megolmResult.roomKeyMessage, devices, hsApi); - // if we happen to rotate the session before we have sent newly joined members the room key - // then mark those members as not needing the key anymore - const userIds = Array.from(devices.reduce((set, device) => set.add(device.userId), new Set())); - await this._clearNeedsRoomKeyFlag(userIds); + this._shareNewRoomKey(megolmResult.roomKeyMessage, hsApi); } return { type: ENCRYPTED_TYPE, @@ -165,64 +160,87 @@ export class RoomEncryption { needsToShareKeys(memberChanges) { for (const m of memberChanges.values()) { - if (m.member.needsRoomKey) { + if (m.hasJoined) { return true; } } return false; } - async shareRoomKeyToPendingMembers(hsApi) { - // sucks to call this for all encrypted rooms on startup? - const txn = await this._storage.readTxn([this._storage.storeNames.roomMembers]); - const pendingUserIds = await txn.roomMembers.getUserIdsNeedingRoomKey(this._room.id); - return await this._shareRoomKey(pendingUserIds, hsApi); - } + async _shareNewRoomKey(roomKeyMessage, hsApi) { + const devices = await this._deviceTracker.devicesForTrackedRoom(this._room.id, hsApi); + const userIds = Array.from(devices.reduce((set, device) => set.add(device.userId), new Set())); - async shareRoomKeyForMemberChanges(memberChanges, hsApi) { - const pendingUserIds = []; - for (const m of memberChanges.values()) { - if (m.member.needsRoomKey) { - pendingUserIds.push(m.userId); - } - } - return await this._shareRoomKey(pendingUserIds, hsApi); - } - - async _shareRoomKey(userIds, hsApi) { - if (userIds.length === 0) { - return; - } - const readRoomKeyTxn = await this._storage.readTxn([this._storage.storeNames.outboundGroupSessions]); - const roomKeyMessage = await this._megolmEncryption.createRoomKeyMessage(this._room.id, readRoomKeyTxn); - // no room key if we haven't created a session yet - // (or we removed it and will create a new one on the next send) - if (roomKeyMessage) { - const devices = await this._deviceTracker.devicesForRoomMembers(this._room.id, userIds, hsApi); - await this._sendRoomKey(roomKeyMessage, devices, hsApi); - const actuallySentUserIds = Array.from(devices.reduce((set, device) => set.add(device.userId), new Set())); - await this._clearNeedsRoomKeyFlag(actuallySentUserIds); - } else { - // we don't have a session yet, clear them all - await this._clearNeedsRoomKeyFlag(userIds); - } - } - - async _clearNeedsRoomKeyFlag(userIds) { - const txn = await this._storage.readWriteTxn([this._storage.storeNames.roomMembers]); + // store operation for room key share, in case we don't finish here + const writeOpTxn = await this._storage.readWriteTxn([this._storage.storeNames.operations]); + let operationId; try { - await Promise.all(userIds.map(async userId => { - const memberData = await txn.roomMembers.get(this._room.id, userId); - if (memberData.needsRoomKey) { - memberData.needsRoomKey = false; - txn.roomMembers.set(memberData); - } - })); + operationId = this._writeRoomKeyShareOperation(roomKeyMessage, userIds, writeOpTxn); } catch (err) { - txn.abort(); + writeOpTxn.abort(); throw err; } - await txn.complete(); + await writeOpTxn.complete(); + // TODO: at this point we have the room key stored, and the rest is sort of optional + // it would be nice if we could signal SendQueue that any error from here on is non-fatal and + // return the encrypted payload. + + // send the room key + await this._sendRoomKey(roomKeyMessage, devices, hsApi); + + // remove the operation + const removeOpTxn = await this._storage.readWriteTxn([this._storage.storeNames.operations]); + try { + removeOpTxn.operations.remove(operationId); + } catch (err) { + removeOpTxn.abort(); + throw err; + } + await removeOpTxn.complete(); + } + + async _addShareRoomKeyOperationForNewMembers(memberChangesArray, txn) { + const userIds = memberChangesArray.filter(m => m.hasJoined).map(m => m.userId); + const roomKeyMessage = await this._megolmEncryption.createRoomKeyMessage( + this._room.id, txn); + if (roomKeyMessage) { + this._writeRoomKeyShareOperation(roomKeyMessage, userIds, txn); + } + } + + _writeRoomKeyShareOperation(roomKeyMessage, userIds, txn) { + const id = Math.floor(Math.random() * Number.MAX_SAFE_INTEGER).toString(); + txn.operations.add({ + id, + type: "share_room_key", + scope: this._room.id, + userIds, + roomKeyMessage, + }); + return id; + } + + async flushPendingRoomKeyShares(hsApi, operations = null) { + if (!operations) { + const txn = await this._storage.readTxn([this._storage.storeNames.operations]); + operations = await txn.operations.getAllByTypeAndScope("share_room_key", this._room.id); + } + for (const operation of operations) { + // just to be sure + if (operation.type !== "share_room_key") { + continue; + } + const devices = await this._deviceTracker.devicesForRoomMembers(this._room.id, operation.userIds, hsApi); + await this._sendRoomKey(operation.roomKeyMessage, devices, hsApi); + const removeTxn = await this._storage.readWriteTxn([this._storage.storeNames.operations]); + try { + removeTxn.operations.remove(operation.id); + } catch (err) { + removeTxn.abort(); + throw err; + } + await removeTxn.complete(); + } } async _sendRoomKey(roomKeyMessage, devices, hsApi) { diff --git a/src/matrix/room/Room.js b/src/matrix/room/Room.js index 3d65e49a..dea9545d 100644 --- a/src/matrix/room/Room.js +++ b/src/matrix/room/Room.js @@ -243,7 +243,8 @@ export class Room extends EventEmitter { } needsAfterSyncCompleted({memberChanges}) { - return this._roomEncryption?.needsToShareKeys(memberChanges); + const result = this._roomEncryption?.needsToShareKeys(memberChanges); + return result; } /** @@ -251,9 +252,9 @@ export class Room extends EventEmitter { * Can be used to do longer running operations that resulted from the last sync, * like network operations. */ - async afterSyncCompleted({memberChanges}) { + async afterSyncCompleted() { if (this._roomEncryption) { - await this._roomEncryption.shareRoomKeyForMemberChanges(memberChanges, this._hsApi); + await this._roomEncryption.flushPendingRoomKeyShares(this._hsApi); } } From 96119b4e583da025a780e78984798429c793e0f6 Mon Sep 17 00:00:00 2001 From: Bruno Windels Date: Fri, 11 Sep 2020 14:41:40 +0200 Subject: [PATCH 4/5] load all pending operations when starting the session, pass to room --- src/matrix/Session.js | 14 +++++++++++++- src/matrix/room/Room.js | 11 +++++++---- 2 files changed, 20 insertions(+), 5 deletions(-) diff --git a/src/matrix/Session.js b/src/matrix/Session.js index 7d821b14..f8ac2f40 100644 --- a/src/matrix/Session.js +++ b/src/matrix/Session.js @@ -28,6 +28,7 @@ import {MEGOLM_ALGORITHM} from "./e2ee/common.js"; import {RoomEncryption} from "./e2ee/RoomEncryption.js"; import {DeviceTracker} from "./e2ee/DeviceTracker.js"; import {LockMap} from "../utils/LockMap.js"; +import {groupBy} from "../utils/groupBy.js"; const PICKLE_KEY = "DEFAULT_KEY"; @@ -212,9 +213,20 @@ export class Session { await txn.complete(); } + const opsTxn = await this._storage.readWriteTxn([ + this._storage.storeNames.operations + ]); + const operations = await opsTxn.operations.getAll(); + const operationsByScope = groupBy(operations, o => o.scope); + this._sendScheduler.start(); for (const [, room] of this._rooms) { - room.start(); + let roomOperationsByType; + const roomOperations = operationsByScope.get(room.id); + if (roomOperations) { + roomOperationsByType = groupBy(roomOperations, r => r.type); + } + room.start(roomOperationsByType); } } diff --git a/src/matrix/room/Room.js b/src/matrix/room/Room.js index dea9545d..f4b69229 100644 --- a/src/matrix/room/Room.js +++ b/src/matrix/room/Room.js @@ -259,14 +259,17 @@ export class Room extends EventEmitter { } /** @package */ - async start() { + async start(pendingOperations) { if (this._roomEncryption) { try { - // if we got interrupted last time sending keys to newly joined members - await this._roomEncryption.shareRoomKeyToPendingMembers(this._hsApi); + const roomKeyShares = pendingOperations?.get("share_room_key"); + if (roomKeyShares) { + // if we got interrupted last time sending keys to newly joined members + await this._roomEncryption.flushPendingRoomKeyShares(this._hsApi, roomKeyShares); + } } catch (err) { // we should not throw here - console.error(`could not send out pending room keys for room ${this.id}`, err.stack); + console.error(`could not send out (all) pending room keys for room ${this.id}`, err.stack); } } this._sendQueue.resumeSending(); From e763771cc2ba39df66faf8baa0638e298edab27e Mon Sep 17 00:00:00 2001 From: Bruno Windels Date: Fri, 11 Sep 2020 14:45:38 +0200 Subject: [PATCH 5/5] cleanup --- src/matrix/room/Room.js | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/matrix/room/Room.js b/src/matrix/room/Room.js index f4b69229..1ea18b4e 100644 --- a/src/matrix/room/Room.js +++ b/src/matrix/room/Room.js @@ -243,8 +243,7 @@ export class Room extends EventEmitter { } needsAfterSyncCompleted({memberChanges}) { - const result = this._roomEncryption?.needsToShareKeys(memberChanges); - return result; + return this._roomEncryption?.needsToShareKeys(memberChanges); } /**