forked from mystiq/hydrogen-web
dont modify fragments in comparer until txn succeeds
This commit is contained in:
parent
73ea09f668
commit
234c260339
3 changed files with 17 additions and 9 deletions
1
doc/impl-thoughts/RECONNECTING.md
Normal file
1
doc/impl-thoughts/RECONNECTING.md
Normal file
|
@ -0,0 +1 @@
|
||||||
|
# Reconnecting
|
|
@ -75,7 +75,7 @@ export default class Room extends EventEmitter {
|
||||||
this._storage.storeNames.timelineFragments,
|
this._storage.storeNames.timelineFragments,
|
||||||
]);
|
]);
|
||||||
let removedPendingEvents;
|
let removedPendingEvents;
|
||||||
let newEntries;
|
let gapResult;
|
||||||
try {
|
try {
|
||||||
// detect remote echos of pending messages in the gap
|
// detect remote echos of pending messages in the gap
|
||||||
removedPendingEvents = this._sendQueue.removeRemoteEchos(response.chunk, txn);
|
removedPendingEvents = this._sendQueue.removeRemoteEchos(response.chunk, txn);
|
||||||
|
@ -85,18 +85,21 @@ export default class Room extends EventEmitter {
|
||||||
storage: this._storage,
|
storage: this._storage,
|
||||||
fragmentIdComparer: this._fragmentIdComparer
|
fragmentIdComparer: this._fragmentIdComparer
|
||||||
});
|
});
|
||||||
newEntries = await gapWriter.writeFragmentFill(fragmentEntry, response, txn);
|
gapResult = await gapWriter.writeFragmentFill(fragmentEntry, response, txn);
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
txn.abort();
|
txn.abort();
|
||||||
throw err;
|
throw err;
|
||||||
}
|
}
|
||||||
await txn.complete();
|
await txn.complete();
|
||||||
// once txn is committed, emit events
|
// once txn is committed, update in-memory state & emit events
|
||||||
|
for (const fragment of gapResult.fragments) {
|
||||||
|
this._fragmentIdComparer.add(fragment);
|
||||||
|
}
|
||||||
if (removedPendingEvents) {
|
if (removedPendingEvents) {
|
||||||
this._sendQueue.emitRemovals(removedPendingEvents);
|
this._sendQueue.emitRemovals(removedPendingEvents);
|
||||||
}
|
}
|
||||||
if (this._timeline) {
|
if (this._timeline) {
|
||||||
this._timeline.addGapEntries(newEntries);
|
this._timeline.addGapEntries(gapResult.entries);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -99,6 +99,7 @@ export default class GapWriter {
|
||||||
|
|
||||||
async _updateFragments(fragmentEntry, neighbourFragmentEntry, end, entries, txn) {
|
async _updateFragments(fragmentEntry, neighbourFragmentEntry, end, entries, txn) {
|
||||||
const {direction} = fragmentEntry;
|
const {direction} = fragmentEntry;
|
||||||
|
const changedFragments = [];
|
||||||
directionalAppend(entries, fragmentEntry, direction);
|
directionalAppend(entries, fragmentEntry, direction);
|
||||||
// set `end` as token, and if we found an event in the step before, link up the fragments in the fragment entry
|
// set `end` as token, and if we found an event in the step before, link up the fragments in the fragment entry
|
||||||
if (neighbourFragmentEntry) {
|
if (neighbourFragmentEntry) {
|
||||||
|
@ -126,13 +127,16 @@ export default class GapWriter {
|
||||||
txn.timelineFragments.update(neighbourFragmentEntry.fragment);
|
txn.timelineFragments.update(neighbourFragmentEntry.fragment);
|
||||||
directionalAppend(entries, neighbourFragmentEntry, direction);
|
directionalAppend(entries, neighbourFragmentEntry, direction);
|
||||||
|
|
||||||
// update fragmentIdComparer here after linking up fragments
|
// fragments that need to be changed in the fragmentIdComparer here
|
||||||
this._fragmentIdComparer.add(fragmentEntry.fragment);
|
// after txn succeeds
|
||||||
this._fragmentIdComparer.add(neighbourFragmentEntry.fragment);
|
changedFragments.push(fragmentEntry.fragment);
|
||||||
|
changedFragments.push(neighbourFragmentEntry.fragment);
|
||||||
} else {
|
} else {
|
||||||
fragmentEntry.token = end;
|
fragmentEntry.token = end;
|
||||||
}
|
}
|
||||||
txn.timelineFragments.update(fragmentEntry.fragment);
|
txn.timelineFragments.update(fragmentEntry.fragment);
|
||||||
|
|
||||||
|
return changedFragments;
|
||||||
}
|
}
|
||||||
|
|
||||||
async writeFragmentFill(fragmentEntry, response, txn) {
|
async writeFragmentFill(fragmentEntry, response, txn) {
|
||||||
|
@ -168,9 +172,9 @@ export default class GapWriter {
|
||||||
|
|
||||||
// create entries for all events in chunk, add them to entries
|
// create entries for all events in chunk, add them to entries
|
||||||
entries = this._storeEvents(nonOverlappingEvents, lastKey, direction, txn);
|
entries = this._storeEvents(nonOverlappingEvents, lastKey, direction, txn);
|
||||||
await this._updateFragments(fragmentEntry, neighbourFragmentEntry, end, entries, txn);
|
const fragments = await this._updateFragments(fragmentEntry, neighbourFragmentEntry, end, entries, txn);
|
||||||
|
|
||||||
return entries;
|
return {entries, fragments};
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue