From 4560e0e4916c136a3f314feeb7b3f1efe5f85b3c Mon Sep 17 00:00:00 2001 From: Bruno Windels Date: Tue, 20 Apr 2021 17:57:17 +0200 Subject: [PATCH] split sync lifecycle steps out in different methods to keep it smaller --- src/matrix/Sync.js | 97 ++++++++++++++++++++++++---------------------- 1 file changed, 51 insertions(+), 46 deletions(-) diff --git a/src/matrix/Sync.js b/src/matrix/Sync.js index 7e64d2f8..8b81d18e 100644 --- a/src/matrix/Sync.js +++ b/src/matrix/Sync.js @@ -197,57 +197,17 @@ export class Sync { try { // take a lock on olm sessions used in this sync so sending a message doesn't change them while syncing sessionState.lock = await log.wrap("obtainSyncLock", () => this._session.obtainSyncLock(response)); - await log.wrap("prepare", log => this._prepareSessionAndRooms(sessionState, roomStates, response, log)); + await log.wrap("prepare", log => this._prepareSync(sessionState, roomStates, response, log)); await log.wrap("afterPrepareSync", log => Promise.all(roomStates.map(rs => { return rs.room.afterPrepareSync(rs.preparation, log); }))); - await log.wrap("write", async log => { - const syncTxn = await this._openSyncTxn(); - try { - sessionState.changes = await log.wrap("session", log => this._session.writeSync( - response, syncFilterId, sessionState.preparation, syncTxn, log)); - await Promise.all(inviteStates.map(async is => { - is.changes = await log.wrap("invite", log => is.invite.writeSync( - is.membership, is.roomResponse, syncTxn, log)); - })); - await Promise.all(roomStates.map(async rs => { - rs.changes = await log.wrap("room", log => rs.room.writeSync( - rs.roomResponse, isInitialSync, rs.preparation, syncTxn, log)); - })); - } catch(err) { - // avoid corrupting state by only - // storing the sync up till the point - // the exception occurred - try { - syncTxn.abort(); - } catch (abortErr) { - log.set("couldNotAbortTxn", true); - } - throw err; - } - await syncTxn.complete(); - }); + await log.wrap("write", async log => this._writeSync( + sessionState, inviteStates, roomStates, response, syncFilterId, isInitialSync, log)); } finally { sessionState.dispose(); } - - log.wrap("after", log => { - log.wrap("session", log => this._session.afterSync(sessionState.changes, log), log.level.Detail); - // emit invite related events after txn has been closed - for(let is of inviteStates) { - log.wrap("invite", () => is.invite.afterSync(is.changes), log.level.Detail); - if (is.isNewInvite) { - this._session.addInviteAfterSync(is.invite); - } - } - // emit room related events after txn has been closed - for(let rs of roomStates) { - log.wrap("room", log => rs.room.afterSync(rs.changes, log), log.level.Detail); - if (rs.isNewRoom) { - this._session.addRoomAfterSync(rs.room); - } - } - }); + // sync txn comitted, emit updates and apply changes to in-memory state + log.wrap("after", log => this._afterSync(sessionState, inviteStates, roomStates, log)); const toDeviceEvents = response.to_device?.events; return { @@ -267,7 +227,7 @@ export class Sync { ]); } - async _prepareSessionAndRooms(sessionState, roomStates, response, log) { + async _prepareSync(sessionState, roomStates, response, log) { const prepareTxn = await this._openPrepareSyncTxn(); sessionState.preparation = await log.wrap("session", log => this._session.prepareSync( response, sessionState.lock, prepareTxn, log)); @@ -298,6 +258,51 @@ export class Sync { await prepareTxn.complete(); } + async _writeSync(sessionState, inviteStates, roomStates, response, syncFilterId, isInitialSync, log) { + const syncTxn = await this._openSyncTxn(); + try { + sessionState.changes = await log.wrap("session", log => this._session.writeSync( + response, syncFilterId, sessionState.preparation, syncTxn, log)); + await Promise.all(inviteStates.map(async is => { + is.changes = await log.wrap("invite", log => is.invite.writeSync( + is.membership, is.roomResponse, syncTxn, log)); + })); + await Promise.all(roomStates.map(async rs => { + rs.changes = await log.wrap("room", log => rs.room.writeSync( + rs.roomResponse, isInitialSync, rs.preparation, syncTxn, log)); + })); + } catch(err) { + // avoid corrupting state by only + // storing the sync up till the point + // the exception occurred + try { + syncTxn.abort(); + } catch (abortErr) { + log.set("couldNotAbortTxn", true); + } + throw err; + } + await syncTxn.complete(); + } + + _afterSync(sessionState, inviteStates, roomStates, log) { + log.wrap("session", log => this._session.afterSync(sessionState.changes, log), log.level.Detail); + // emit invite related events after txn has been closed + for(let is of inviteStates) { + log.wrap("invite", () => is.invite.afterSync(is.changes), log.level.Detail); + if (is.isNewInvite) { + this._session.addInviteAfterSync(is.invite); + } + } + // emit room related events after txn has been closed + for(let rs of roomStates) { + log.wrap("room", log => rs.room.afterSync(rs.changes, log), log.level.Detail); + if (rs.isNewRoom) { + this._session.addRoomAfterSync(rs.room); + } + } + } + _openSyncTxn() { const storeNames = this._storage.storeNames; return this._storage.readWriteTxn([