only set new live key when creating a fragment after the txn succeeds
when doing a limited sync, and a new fragment is created, this._lastLiveKey is updated immediately. If the transaction would then fail, the fragmentId in this._lastLiveKey was incremented but the fragment wasn't written to the store, so if sync is resumed and would subsequently succeed, fragmentIds would be assigned to events that don't have a corresponding fragment in the timelineFragment store. This would throw errors when trying to load the timeline, breaking the whole app. This changes SyncWriter to only update this._lastLiveKey in the emit phase, when the transactions has been committed already.
This commit is contained in:
parent
bdc2c3d9ad
commit
224d56698a
2 changed files with 15 additions and 16 deletions
|
@ -22,19 +22,20 @@ export default class Room extends EventEmitter {
|
||||||
|
|
||||||
async persistSync(roomResponse, membership, txn) {
|
async persistSync(roomResponse, membership, txn) {
|
||||||
const summaryChanged = this._summary.applySync(roomResponse, membership, txn);
|
const summaryChanged = this._summary.applySync(roomResponse, membership, txn);
|
||||||
const newTimelineEntries = await this._syncWriter.writeSync(roomResponse, txn);
|
const {entries, newLiveKey} = await this._syncWriter.writeSync(roomResponse, txn);
|
||||||
let removedPendingEvents;
|
let removedPendingEvents;
|
||||||
if (roomResponse.timeline && roomResponse.timeline.events) {
|
if (roomResponse.timeline && roomResponse.timeline.events) {
|
||||||
removedPendingEvents = this._sendQueue.removeRemoteEchos(roomResponse.timeline.events, txn);
|
removedPendingEvents = this._sendQueue.removeRemoteEchos(roomResponse.timeline.events, txn);
|
||||||
}
|
}
|
||||||
return {summaryChanged, newTimelineEntries, removedPendingEvents};
|
return {summaryChanged, newTimelineEntries: entries, newLiveKey, removedPendingEvents};
|
||||||
}
|
}
|
||||||
|
|
||||||
emitSync({summaryChanged, newTimelineEntries, removedPendingEvents}) {
|
emitSync({summaryChanged, newTimelineEntries, newLiveKey, removedPendingEvents}) {
|
||||||
if (summaryChanged) {
|
if (summaryChanged) {
|
||||||
this.emit("change");
|
this.emit("change");
|
||||||
this._emitCollectionChange(this);
|
this._emitCollectionChange(this);
|
||||||
}
|
}
|
||||||
|
this._syncWriter.setKeyOnCompleted(newLiveKey);
|
||||||
if (this._timeline) {
|
if (this._timeline) {
|
||||||
this._timeline.appendLiveEntries(newTimelineEntries);
|
this._timeline.appendLiveEntries(newTimelineEntries);
|
||||||
}
|
}
|
||||||
|
|
|
@ -84,23 +84,23 @@ export default class SyncWriter {
|
||||||
async writeSync(roomResponse, txn) {
|
async writeSync(roomResponse, txn) {
|
||||||
const entries = [];
|
const entries = [];
|
||||||
const timeline = roomResponse.timeline;
|
const timeline = roomResponse.timeline;
|
||||||
if (!this._lastLiveKey) {
|
let currentKey = this._lastLiveKey;
|
||||||
|
if (!currentKey) {
|
||||||
// means we haven't synced this room yet (just joined or did initial sync)
|
// means we haven't synced this room yet (just joined or did initial sync)
|
||||||
|
|
||||||
// as this is probably a limited sync, prev_batch should be there
|
// as this is probably a limited sync, prev_batch should be there
|
||||||
// (but don't fail if it isn't, we won't be able to back-paginate though)
|
// (but don't fail if it isn't, we won't be able to back-paginate though)
|
||||||
let liveFragment = await this._createLiveFragment(txn, timeline.prev_batch);
|
let liveFragment = await this._createLiveFragment(txn, timeline.prev_batch);
|
||||||
this._lastLiveKey = new EventKey(liveFragment.id, EventKey.defaultLiveKey.eventIndex);
|
currentKey = new EventKey(liveFragment.id, EventKey.defaultLiveKey.eventIndex);
|
||||||
entries.push(FragmentBoundaryEntry.start(liveFragment, this._fragmentIdComparer));
|
entries.push(FragmentBoundaryEntry.start(liveFragment, this._fragmentIdComparer));
|
||||||
} else if (timeline.limited) {
|
} else if (timeline.limited) {
|
||||||
// replace live fragment for limited sync, *only* if we had a live fragment already
|
// replace live fragment for limited sync, *only* if we had a live fragment already
|
||||||
const oldFragmentId = this._lastLiveKey.fragmentId;
|
const oldFragmentId = currentKey.fragmentId;
|
||||||
this._lastLiveKey = this._lastLiveKey.nextFragmentKey();
|
currentKey = currentKey.nextFragmentKey();
|
||||||
const {oldFragment, newFragment} = await this._replaceLiveFragment(oldFragmentId, this._lastLiveKey.fragmentId, timeline.prev_batch, txn);
|
const {oldFragment, newFragment} = await this._replaceLiveFragment(oldFragmentId, currentKey.fragmentId, timeline.prev_batch, txn);
|
||||||
entries.push(FragmentBoundaryEntry.end(oldFragment, this._fragmentIdComparer));
|
entries.push(FragmentBoundaryEntry.end(oldFragment, this._fragmentIdComparer));
|
||||||
entries.push(FragmentBoundaryEntry.start(newFragment, this._fragmentIdComparer));
|
entries.push(FragmentBoundaryEntry.start(newFragment, this._fragmentIdComparer));
|
||||||
}
|
}
|
||||||
let currentKey = this._lastLiveKey;
|
|
||||||
if (timeline.events) {
|
if (timeline.events) {
|
||||||
const events = deduplicateEvents(timeline.events);
|
const events = deduplicateEvents(timeline.events);
|
||||||
for(const event of events) {
|
for(const event of events) {
|
||||||
|
@ -110,12 +110,6 @@ export default class SyncWriter {
|
||||||
entries.push(new EventEntry(entry, this._fragmentIdComparer));
|
entries.push(new EventEntry(entry, this._fragmentIdComparer));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// right thing to do? if the txn fails, not sure we'll continue anyways ...
|
|
||||||
// only advance the key once the transaction has succeeded
|
|
||||||
txn.complete().then(() => {
|
|
||||||
this._lastLiveKey = currentKey;
|
|
||||||
})
|
|
||||||
|
|
||||||
// persist state
|
// persist state
|
||||||
const state = roomResponse.state;
|
const state = roomResponse.state;
|
||||||
if (state.events) {
|
if (state.events) {
|
||||||
|
@ -132,7 +126,11 @@ export default class SyncWriter {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return entries;
|
return {entries, newLiveKey: currentKey};
|
||||||
|
}
|
||||||
|
|
||||||
|
setKeyOnCompleted(newLiveKey) {
|
||||||
|
this._lastLiveKey = newLiveKey;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Reference in a new issue