split sync lifecycle steps out in different methods to keep it smaller

This commit is contained in:
Bruno Windels 2021-04-20 17:57:17 +02:00
parent 09ac503e22
commit 4560e0e491

View file

@ -197,57 +197,17 @@ export class Sync {
try {
// take a lock on olm sessions used in this sync so sending a message doesn't change them while syncing
sessionState.lock = await log.wrap("obtainSyncLock", () => this._session.obtainSyncLock(response));
await log.wrap("prepare", log => this._prepareSessionAndRooms(sessionState, roomStates, response, log));
await log.wrap("prepare", log => this._prepareSync(sessionState, roomStates, response, log));
await log.wrap("afterPrepareSync", log => Promise.all(roomStates.map(rs => {
return rs.room.afterPrepareSync(rs.preparation, log);
})));
await log.wrap("write", async log => {
const syncTxn = await this._openSyncTxn();
try {
sessionState.changes = await log.wrap("session", log => this._session.writeSync(
response, syncFilterId, sessionState.preparation, syncTxn, log));
await Promise.all(inviteStates.map(async is => {
is.changes = await log.wrap("invite", log => is.invite.writeSync(
is.membership, is.roomResponse, syncTxn, log));
}));
await Promise.all(roomStates.map(async rs => {
rs.changes = await log.wrap("room", log => rs.room.writeSync(
rs.roomResponse, isInitialSync, rs.preparation, syncTxn, log));
}));
} catch(err) {
// avoid corrupting state by only
// storing the sync up till the point
// the exception occurred
try {
syncTxn.abort();
} catch (abortErr) {
log.set("couldNotAbortTxn", true);
}
throw err;
}
await syncTxn.complete();
});
await log.wrap("write", async log => this._writeSync(
sessionState, inviteStates, roomStates, response, syncFilterId, isInitialSync, log));
} finally {
sessionState.dispose();
}
log.wrap("after", log => {
log.wrap("session", log => this._session.afterSync(sessionState.changes, log), log.level.Detail);
// emit invite related events after txn has been closed
for(let is of inviteStates) {
log.wrap("invite", () => is.invite.afterSync(is.changes), log.level.Detail);
if (is.isNewInvite) {
this._session.addInviteAfterSync(is.invite);
}
}
// emit room related events after txn has been closed
for(let rs of roomStates) {
log.wrap("room", log => rs.room.afterSync(rs.changes, log), log.level.Detail);
if (rs.isNewRoom) {
this._session.addRoomAfterSync(rs.room);
}
}
});
// sync txn comitted, emit updates and apply changes to in-memory state
log.wrap("after", log => this._afterSync(sessionState, inviteStates, roomStates, log));
const toDeviceEvents = response.to_device?.events;
return {
@ -267,7 +227,7 @@ export class Sync {
]);
}
async _prepareSessionAndRooms(sessionState, roomStates, response, log) {
async _prepareSync(sessionState, roomStates, response, log) {
const prepareTxn = await this._openPrepareSyncTxn();
sessionState.preparation = await log.wrap("session", log => this._session.prepareSync(
response, sessionState.lock, prepareTxn, log));
@ -298,6 +258,51 @@ export class Sync {
await prepareTxn.complete();
}
async _writeSync(sessionState, inviteStates, roomStates, response, syncFilterId, isInitialSync, log) {
const syncTxn = await this._openSyncTxn();
try {
sessionState.changes = await log.wrap("session", log => this._session.writeSync(
response, syncFilterId, sessionState.preparation, syncTxn, log));
await Promise.all(inviteStates.map(async is => {
is.changes = await log.wrap("invite", log => is.invite.writeSync(
is.membership, is.roomResponse, syncTxn, log));
}));
await Promise.all(roomStates.map(async rs => {
rs.changes = await log.wrap("room", log => rs.room.writeSync(
rs.roomResponse, isInitialSync, rs.preparation, syncTxn, log));
}));
} catch(err) {
// avoid corrupting state by only
// storing the sync up till the point
// the exception occurred
try {
syncTxn.abort();
} catch (abortErr) {
log.set("couldNotAbortTxn", true);
}
throw err;
}
await syncTxn.complete();
}
_afterSync(sessionState, inviteStates, roomStates, log) {
log.wrap("session", log => this._session.afterSync(sessionState.changes, log), log.level.Detail);
// emit invite related events after txn has been closed
for(let is of inviteStates) {
log.wrap("invite", () => is.invite.afterSync(is.changes), log.level.Detail);
if (is.isNewInvite) {
this._session.addInviteAfterSync(is.invite);
}
}
// emit room related events after txn has been closed
for(let rs of roomStates) {
log.wrap("room", log => rs.room.afterSync(rs.changes, log), log.level.Detail);
if (rs.isNewRoom) {
this._session.addRoomAfterSync(rs.room);
}
}
}
_openSyncTxn() {
const storeNames = this._storage.storeNames;
return this._storage.readWriteTxn([