diff --git a/src/matrix/room/BaseRoom.js b/src/matrix/room/BaseRoom.js index 4a8f50b4..2d401a65 100644 --- a/src/matrix/room/BaseRoom.js +++ b/src/matrix/room/BaseRoom.js @@ -266,6 +266,53 @@ export class BaseRoom extends EventEmitter { } } + async _fetchEvents(callback, log) { + const txn = await this._storage.readWriteTxn([ + this._storage.storeNames.pendingEvents, + this._storage.storeNames.timelineEvents, + this._storage.storeNames.timelineRelations, + this._storage.storeNames.timelineFragments, + ]); + let extraGapFillChanges; + let gapResult; + try { + const relationWriter = new RelationWriter({ + roomId: this._roomId, + fragmentIdComparer: this._fragmentIdComparer, + ownUserId: this._user.id, + }); + const gapWriter = new GapWriter({ + roomId: this._roomId, + storage: this._storage, + fragmentIdComparer: this._fragmentIdComparer, + relationWriter + }); + const callbackResult = await callback(txn, gapWriter); + extraGapFillChanges = callbackResult.extraGapFillChanges; + gapResult = callbackResult.gapResult; + } catch (err) { + txn.abort(); + throw err; + } + await txn.complete(); + if (this._roomEncryption) { + const decryptRequest = this._decryptEntries(DecryptionSource.Timeline, gapResult.entries, null, log); + await decryptRequest.complete(); + } + // once txn is committed, update in-memory state & emit events + for (const fragment of gapResult.fragments) { + this._fragmentIdComparer.add(fragment); + } + if (extraGapFillChanges) { + this._applyGapFill(extraGapFillChanges); + } + if (this._timeline) { + // these should not be added if not already there + this._timeline.replaceEntries(gapResult.updatedEntries); + this._timeline.addEntries(gapResult.entries); + } + } + /** @public */ fillGap(fragmentEntry, amount, log = null) { // TODO move some/all of this out of BaseRoom @@ -287,51 +334,12 @@ export class BaseRoom extends EventEmitter { } }, {log}).response(); - const txn = await this._storage.readWriteTxn([ - this._storage.storeNames.pendingEvents, - this._storage.storeNames.timelineEvents, - this._storage.storeNames.timelineRelations, - this._storage.storeNames.timelineFragments, - ]); - let extraGapFillChanges; - let gapResult; - try { + await this._fetchEvents(async (txn, gapWriter) => { // detect remote echos of pending messages in the gap - extraGapFillChanges = await this._writeGapFill(response.chunk, txn, log); - // write new events into gap - const relationWriter = new RelationWriter({ - roomId: this._roomId, - fragmentIdComparer: this._fragmentIdComparer, - ownUserId: this._user.id, - }); - const gapWriter = new GapWriter({ - roomId: this._roomId, - storage: this._storage, - fragmentIdComparer: this._fragmentIdComparer, - relationWriter - }); - gapResult = await gapWriter.writeFragmentFill(fragmentEntry, response, txn, log); - } catch (err) { - txn.abort(); - throw err; - } - await txn.complete(); - if (this._roomEncryption) { - const decryptRequest = this._decryptEntries(DecryptionSource.Timeline, gapResult.entries, null, log); - await decryptRequest.complete(); - } - // once txn is committed, update in-memory state & emit events - for (const fragment of gapResult.fragments) { - this._fragmentIdComparer.add(fragment); - } - if (extraGapFillChanges) { - this._applyGapFill(extraGapFillChanges); - } - if (this._timeline) { - // these should not be added if not already there - this._timeline.replaceEntries(gapResult.updatedEntries); - this._timeline.addEntries(gapResult.entries); - } + const extraGapFillChanges = await this._writeGapFill(response.chunk, txn, log); + const gapResult = await gapWriter.writeFragmentFill(fragmentEntry, response, txn, log); + return { extraGapFillChanges, gapResult }; + }, log); }); }