only run decryptPending if needed
This commit is contained in:
parent
82cff84f92
commit
4a0173e90f
3 changed files with 34 additions and 10 deletions
|
@ -15,7 +15,6 @@ limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
import {OLM_ALGORITHM, MEGOLM_ALGORITHM} from "./e2ee/common.js";
|
import {OLM_ALGORITHM, MEGOLM_ALGORITHM} from "./e2ee/common.js";
|
||||||
import {groupBy} from "../utils/groupBy.js";
|
|
||||||
|
|
||||||
// key to store in session store
|
// key to store in session store
|
||||||
const PENDING_ENCRYPTED_EVENTS = "pendingEncryptedDeviceEvents";
|
const PENDING_ENCRYPTED_EVENTS = "pendingEncryptedDeviceEvents";
|
||||||
|
@ -32,13 +31,20 @@ export class DeviceMessageHandler {
|
||||||
this._megolmDecryption = megolmDecryption;
|
this._megolmDecryption = megolmDecryption;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return {bool} whether messages are waiting to be decrypted and `decryptPending` should be called.
|
||||||
|
*/
|
||||||
async writeSync(toDeviceEvents, txn) {
|
async writeSync(toDeviceEvents, txn) {
|
||||||
const encryptedEvents = toDeviceEvents.filter(e => e.type === "m.room.encrypted");
|
const encryptedEvents = toDeviceEvents.filter(e => e.type === "m.room.encrypted");
|
||||||
|
if (!encryptedEvents.length) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
// store encryptedEvents
|
// store encryptedEvents
|
||||||
let pendingEvents = await this._getPendingEvents(txn);
|
let pendingEvents = await this._getPendingEvents(txn);
|
||||||
pendingEvents = pendingEvents.concat(encryptedEvents);
|
pendingEvents = pendingEvents.concat(encryptedEvents);
|
||||||
txn.session.set(PENDING_ENCRYPTED_EVENTS, pendingEvents);
|
txn.session.set(PENDING_ENCRYPTED_EVENTS, pendingEvents);
|
||||||
// we don't handle anything other for now
|
// we don't handle anything other for now
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -335,7 +335,11 @@ export class Session {
|
||||||
}
|
}
|
||||||
|
|
||||||
async writeSync(syncResponse, syncFilterId, txn) {
|
async writeSync(syncResponse, syncFilterId, txn) {
|
||||||
const changes = {};
|
const changes = {
|
||||||
|
syncInfo: null,
|
||||||
|
e2eeAccountChanges: null,
|
||||||
|
deviceMessageDecryptionPending: false
|
||||||
|
};
|
||||||
const syncToken = syncResponse.next_batch;
|
const syncToken = syncResponse.next_batch;
|
||||||
const deviceOneTimeKeysCount = syncResponse.device_one_time_keys_count;
|
const deviceOneTimeKeysCount = syncResponse.device_one_time_keys_count;
|
||||||
|
|
||||||
|
@ -357,7 +361,8 @@ export class Session {
|
||||||
|
|
||||||
const toDeviceEvents = syncResponse.to_device?.events;
|
const toDeviceEvents = syncResponse.to_device?.events;
|
||||||
if (Array.isArray(toDeviceEvents)) {
|
if (Array.isArray(toDeviceEvents)) {
|
||||||
this._deviceMessageHandler.writeSync(toDeviceEvents, txn);
|
changes.deviceMessageDecryptionPending =
|
||||||
|
await this._deviceMessageHandler.writeSync(toDeviceEvents, txn);
|
||||||
}
|
}
|
||||||
|
|
||||||
// store account data
|
// store account data
|
||||||
|
@ -382,8 +387,11 @@ export class Session {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async afterSyncCompleted(isCatchupSync) {
|
async afterSyncCompleted(changes, isCatchupSync) {
|
||||||
const promises = [this._deviceMessageHandler.decryptPending(this.rooms)];
|
const promises = [];
|
||||||
|
if (changes.deviceMessageDecryptionPending) {
|
||||||
|
promises.push(this._deviceMessageHandler.decryptPending(this.rooms));
|
||||||
|
}
|
||||||
// we don't start uploading one-time keys until we've caught up with
|
// we don't start uploading one-time keys until we've caught up with
|
||||||
// to-device messages, to help us avoid throwing away one-time-keys that we
|
// to-device messages, to help us avoid throwing away one-time-keys that we
|
||||||
// are about to receive messages for
|
// are about to receive messages for
|
||||||
|
@ -394,9 +402,11 @@ export class Session {
|
||||||
promises.push(this._e2eeAccount.uploadKeys(this._storage));
|
promises.push(this._e2eeAccount.uploadKeys(this._storage));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (promises.length) {
|
||||||
// run key upload and decryption in parallel
|
// run key upload and decryption in parallel
|
||||||
await Promise.all(promises);
|
await Promise.all(promises);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
get syncToken() {
|
get syncToken() {
|
||||||
return this._syncInfo?.token;
|
return this._syncInfo?.token;
|
||||||
|
|
|
@ -93,6 +93,7 @@ export class Sync {
|
||||||
// if syncToken is falsy, it will first do an initial sync ...
|
// if syncToken is falsy, it will first do an initial sync ...
|
||||||
while(this._status.get() !== SyncStatus.Stopped) {
|
while(this._status.get() !== SyncStatus.Stopped) {
|
||||||
let roomStates;
|
let roomStates;
|
||||||
|
let sessionChanges;
|
||||||
try {
|
try {
|
||||||
console.log(`starting sync request with since ${syncToken} ...`);
|
console.log(`starting sync request with since ${syncToken} ...`);
|
||||||
// unless we are happily syncing already, we want the server to return
|
// unless we are happily syncing already, we want the server to return
|
||||||
|
@ -110,6 +111,7 @@ export class Sync {
|
||||||
const syncResult = await this._syncRequest(syncToken, timeout);
|
const syncResult = await this._syncRequest(syncToken, timeout);
|
||||||
syncToken = syncResult.syncToken;
|
syncToken = syncResult.syncToken;
|
||||||
roomStates = syncResult.roomStates;
|
roomStates = syncResult.roomStates;
|
||||||
|
sessionChanges = syncResult.sessionChanges;
|
||||||
// initial sync or catchup sync
|
// initial sync or catchup sync
|
||||||
if (this._status.get() !== SyncStatus.Syncing && syncResult.hadToDeviceMessages) {
|
if (this._status.get() !== SyncStatus.Syncing && syncResult.hadToDeviceMessages) {
|
||||||
this._status.set(SyncStatus.CatchupSync);
|
this._status.set(SyncStatus.CatchupSync);
|
||||||
|
@ -125,16 +127,21 @@ export class Sync {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (this._status.get() !== SyncStatus.Stopped) {
|
if (this._status.get() !== SyncStatus.Stopped) {
|
||||||
await this._runAfterSyncCompleted(roomStates);
|
// TODO: if we're not going to run this phase in parallel with the next
|
||||||
|
// sync request (because this causes OTKs to be uploaded twice)
|
||||||
|
// should we move this inside _syncRequest?
|
||||||
|
// Alternatively, we can try to fix the OTK upload issue while still
|
||||||
|
// running in parallel.
|
||||||
|
await this._runAfterSyncCompleted(sessionChanges, roomStates);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async _runAfterSyncCompleted(roomStates) {
|
async _runAfterSyncCompleted(sessionChanges, roomStates) {
|
||||||
const isCatchupSync = this._status.get() === SyncStatus.CatchupSync;
|
const isCatchupSync = this._status.get() === SyncStatus.CatchupSync;
|
||||||
const sessionPromise = (async () => {
|
const sessionPromise = (async () => {
|
||||||
try {
|
try {
|
||||||
await this._session.afterSyncCompleted(isCatchupSync);
|
await this._session.afterSyncCompleted(sessionChanges, isCatchupSync);
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
console.error("error during session afterSyncCompleted, continuing", err.stack);
|
console.error("error during session afterSyncCompleted, continuing", err.stack);
|
||||||
}
|
}
|
||||||
|
@ -204,6 +211,7 @@ export class Sync {
|
||||||
return {
|
return {
|
||||||
syncToken,
|
syncToken,
|
||||||
roomStates,
|
roomStates,
|
||||||
|
sessionChanges,
|
||||||
hadToDeviceMessages: Array.isArray(toDeviceEvents) && toDeviceEvents.length > 0,
|
hadToDeviceMessages: Array.isArray(toDeviceEvents) && toDeviceEvents.length > 0,
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
Reference in a new issue