Extract some gap filling functionality into a helper method

This commit is contained in:
Danila Fedorin 2021-09-07 13:17:44 -07:00
parent ae6e211150
commit 299abe3e7e

View file

@ -266,6 +266,53 @@ export class BaseRoom extends EventEmitter {
} }
} }
async _fetchEvents(callback, log) {
const txn = await this._storage.readWriteTxn([
this._storage.storeNames.pendingEvents,
this._storage.storeNames.timelineEvents,
this._storage.storeNames.timelineRelations,
this._storage.storeNames.timelineFragments,
]);
let extraGapFillChanges;
let gapResult;
try {
const relationWriter = new RelationWriter({
roomId: this._roomId,
fragmentIdComparer: this._fragmentIdComparer,
ownUserId: this._user.id,
});
const gapWriter = new GapWriter({
roomId: this._roomId,
storage: this._storage,
fragmentIdComparer: this._fragmentIdComparer,
relationWriter
});
const callbackResult = await callback(txn, gapWriter);
extraGapFillChanges = callbackResult.extraGapFillChanges;
gapResult = callbackResult.gapResult;
} catch (err) {
txn.abort();
throw err;
}
await txn.complete();
if (this._roomEncryption) {
const decryptRequest = this._decryptEntries(DecryptionSource.Timeline, gapResult.entries, null, log);
await decryptRequest.complete();
}
// once txn is committed, update in-memory state & emit events
for (const fragment of gapResult.fragments) {
this._fragmentIdComparer.add(fragment);
}
if (extraGapFillChanges) {
this._applyGapFill(extraGapFillChanges);
}
if (this._timeline) {
// these should not be added if not already there
this._timeline.replaceEntries(gapResult.updatedEntries);
this._timeline.addEntries(gapResult.entries);
}
}
/** @public */ /** @public */
fillGap(fragmentEntry, amount, log = null) { fillGap(fragmentEntry, amount, log = null) {
// TODO move some/all of this out of BaseRoom // TODO move some/all of this out of BaseRoom
@ -287,51 +334,12 @@ export class BaseRoom extends EventEmitter {
} }
}, {log}).response(); }, {log}).response();
const txn = await this._storage.readWriteTxn([ await this._fetchEvents(async (txn, gapWriter) => {
this._storage.storeNames.pendingEvents,
this._storage.storeNames.timelineEvents,
this._storage.storeNames.timelineRelations,
this._storage.storeNames.timelineFragments,
]);
let extraGapFillChanges;
let gapResult;
try {
// detect remote echos of pending messages in the gap // detect remote echos of pending messages in the gap
extraGapFillChanges = await this._writeGapFill(response.chunk, txn, log); const extraGapFillChanges = await this._writeGapFill(response.chunk, txn, log);
// write new events into gap const gapResult = await gapWriter.writeFragmentFill(fragmentEntry, response, txn, log);
const relationWriter = new RelationWriter({ return { extraGapFillChanges, gapResult };
roomId: this._roomId, }, log);
fragmentIdComparer: this._fragmentIdComparer,
ownUserId: this._user.id,
});
const gapWriter = new GapWriter({
roomId: this._roomId,
storage: this._storage,
fragmentIdComparer: this._fragmentIdComparer,
relationWriter
});
gapResult = await gapWriter.writeFragmentFill(fragmentEntry, response, txn, log);
} catch (err) {
txn.abort();
throw err;
}
await txn.complete();
if (this._roomEncryption) {
const decryptRequest = this._decryptEntries(DecryptionSource.Timeline, gapResult.entries, null, log);
await decryptRequest.complete();
}
// once txn is committed, update in-memory state & emit events
for (const fragment of gapResult.fragments) {
this._fragmentIdComparer.add(fragment);
}
if (extraGapFillChanges) {
this._applyGapFill(extraGapFillChanges);
}
if (this._timeline) {
// these should not be added if not already there
this._timeline.replaceEntries(gapResult.updatedEntries);
this._timeline.addEntries(gapResult.entries);
}
}); });
} }