From 4a0173e90f7d30b087daa70720c846a42ba0b0ae Mon Sep 17 00:00:00 2001 From: Bruno Windels Date: Thu, 24 Sep 2020 10:52:56 +0200 Subject: [PATCH] only run decryptPending if needed --- src/matrix/DeviceMessageHandler.js | 8 +++++++- src/matrix/Session.js | 22 ++++++++++++++++------ src/matrix/Sync.js | 14 +++++++++++--- 3 files changed, 34 insertions(+), 10 deletions(-) diff --git a/src/matrix/DeviceMessageHandler.js b/src/matrix/DeviceMessageHandler.js index 8a50c66c..854c901b 100644 --- a/src/matrix/DeviceMessageHandler.js +++ b/src/matrix/DeviceMessageHandler.js @@ -15,7 +15,6 @@ limitations under the License. */ import {OLM_ALGORITHM, MEGOLM_ALGORITHM} from "./e2ee/common.js"; -import {groupBy} from "../utils/groupBy.js"; // key to store in session store const PENDING_ENCRYPTED_EVENTS = "pendingEncryptedDeviceEvents"; @@ -32,13 +31,20 @@ export class DeviceMessageHandler { this._megolmDecryption = megolmDecryption; } + /** + * @return {bool} whether messages are waiting to be decrypted and `decryptPending` should be called. + */ async writeSync(toDeviceEvents, txn) { const encryptedEvents = toDeviceEvents.filter(e => e.type === "m.room.encrypted"); + if (!encryptedEvents.length) { + return false; + } // store encryptedEvents let pendingEvents = await this._getPendingEvents(txn); pendingEvents = pendingEvents.concat(encryptedEvents); txn.session.set(PENDING_ENCRYPTED_EVENTS, pendingEvents); // we don't handle anything other for now + return true; } /** diff --git a/src/matrix/Session.js b/src/matrix/Session.js index 2e470b58..ef535fc7 100644 --- a/src/matrix/Session.js +++ b/src/matrix/Session.js @@ -335,7 +335,11 @@ export class Session { } async writeSync(syncResponse, syncFilterId, txn) { - const changes = {}; + const changes = { + syncInfo: null, + e2eeAccountChanges: null, + deviceMessageDecryptionPending: false + }; const syncToken = syncResponse.next_batch; const deviceOneTimeKeysCount = syncResponse.device_one_time_keys_count; @@ -357,7 +361,8 @@ export class Session { const toDeviceEvents = syncResponse.to_device?.events; if (Array.isArray(toDeviceEvents)) { - this._deviceMessageHandler.writeSync(toDeviceEvents, txn); + changes.deviceMessageDecryptionPending = + await this._deviceMessageHandler.writeSync(toDeviceEvents, txn); } // store account data @@ -382,8 +387,11 @@ export class Session { } } - async afterSyncCompleted(isCatchupSync) { - const promises = [this._deviceMessageHandler.decryptPending(this.rooms)]; + async afterSyncCompleted(changes, isCatchupSync) { + const promises = []; + if (changes.deviceMessageDecryptionPending) { + promises.push(this._deviceMessageHandler.decryptPending(this.rooms)); + } // we don't start uploading one-time keys until we've caught up with // to-device messages, to help us avoid throwing away one-time-keys that we // are about to receive messages for @@ -394,8 +402,10 @@ export class Session { promises.push(this._e2eeAccount.uploadKeys(this._storage)); } } - // run key upload and decryption in parallel - await Promise.all(promises); + if (promises.length) { + // run key upload and decryption in parallel + await Promise.all(promises); + } } get syncToken() { diff --git a/src/matrix/Sync.js b/src/matrix/Sync.js index f78e437c..02d4f8b2 100644 --- a/src/matrix/Sync.js +++ b/src/matrix/Sync.js @@ -93,6 +93,7 @@ export class Sync { // if syncToken is falsy, it will first do an initial sync ... while(this._status.get() !== SyncStatus.Stopped) { let roomStates; + let sessionChanges; try { console.log(`starting sync request with since ${syncToken} ...`); // unless we are happily syncing already, we want the server to return @@ -110,6 +111,7 @@ export class Sync { const syncResult = await this._syncRequest(syncToken, timeout); syncToken = syncResult.syncToken; roomStates = syncResult.roomStates; + sessionChanges = syncResult.sessionChanges; // initial sync or catchup sync if (this._status.get() !== SyncStatus.Syncing && syncResult.hadToDeviceMessages) { this._status.set(SyncStatus.CatchupSync); @@ -125,16 +127,21 @@ export class Sync { } } if (this._status.get() !== SyncStatus.Stopped) { - await this._runAfterSyncCompleted(roomStates); + // TODO: if we're not going to run this phase in parallel with the next + // sync request (because this causes OTKs to be uploaded twice) + // should we move this inside _syncRequest? + // Alternatively, we can try to fix the OTK upload issue while still + // running in parallel. + await this._runAfterSyncCompleted(sessionChanges, roomStates); } } } - async _runAfterSyncCompleted(roomStates) { + async _runAfterSyncCompleted(sessionChanges, roomStates) { const isCatchupSync = this._status.get() === SyncStatus.CatchupSync; const sessionPromise = (async () => { try { - await this._session.afterSyncCompleted(isCatchupSync); + await this._session.afterSyncCompleted(sessionChanges, isCatchupSync); } catch (err) { console.error("error during session afterSyncCompleted, continuing", err.stack); } @@ -204,6 +211,7 @@ export class Sync { return { syncToken, roomStates, + sessionChanges, hadToDeviceMessages: Array.isArray(toDeviceEvents) && toDeviceEvents.length > 0, }; }