Merge pull request #40 from bwindels/bwindels/fixgaplinkslost
Prevent fragment links corrupting when filling a gap and server returns duplicate events
This commit is contained in:
commit
53f2a5801e
5 changed files with 102 additions and 27 deletions
1
doc/impl-thoughts/RECONNECTING.md
Normal file
1
doc/impl-thoughts/RECONNECTING.md
Normal file
|
@ -0,0 +1 @@
|
||||||
|
# Reconnecting
|
|
@ -75,7 +75,7 @@ export default class Room extends EventEmitter {
|
||||||
this._storage.storeNames.timelineFragments,
|
this._storage.storeNames.timelineFragments,
|
||||||
]);
|
]);
|
||||||
let removedPendingEvents;
|
let removedPendingEvents;
|
||||||
let newEntries;
|
let gapResult;
|
||||||
try {
|
try {
|
||||||
// detect remote echos of pending messages in the gap
|
// detect remote echos of pending messages in the gap
|
||||||
removedPendingEvents = this._sendQueue.removeRemoteEchos(response.chunk, txn);
|
removedPendingEvents = this._sendQueue.removeRemoteEchos(response.chunk, txn);
|
||||||
|
@ -85,18 +85,21 @@ export default class Room extends EventEmitter {
|
||||||
storage: this._storage,
|
storage: this._storage,
|
||||||
fragmentIdComparer: this._fragmentIdComparer
|
fragmentIdComparer: this._fragmentIdComparer
|
||||||
});
|
});
|
||||||
newEntries = await gapWriter.writeFragmentFill(fragmentEntry, response, txn);
|
gapResult = await gapWriter.writeFragmentFill(fragmentEntry, response, txn);
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
txn.abort();
|
txn.abort();
|
||||||
throw err;
|
throw err;
|
||||||
}
|
}
|
||||||
await txn.complete();
|
await txn.complete();
|
||||||
// once txn is committed, emit events
|
// once txn is committed, update in-memory state & emit events
|
||||||
|
for (const fragment of gapResult.fragments) {
|
||||||
|
this._fragmentIdComparer.add(fragment);
|
||||||
|
}
|
||||||
if (removedPendingEvents) {
|
if (removedPendingEvents) {
|
||||||
this._sendQueue.emitRemovals(removedPendingEvents);
|
this._sendQueue.emitRemovals(removedPendingEvents);
|
||||||
}
|
}
|
||||||
if (this._timeline) {
|
if (this._timeline) {
|
||||||
this._timeline.addGapEntries(newEntries);
|
this._timeline.addGapEntries(gapResult.entries);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -17,6 +17,10 @@ export default class Direction {
|
||||||
return this.isForward ? "f" : "b";
|
return this.isForward ? "f" : "b";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
reverse() {
|
||||||
|
return this.isForward ? Direction.Backward : Direction.Forward
|
||||||
|
}
|
||||||
|
|
||||||
static get Forward() {
|
static get Forward() {
|
||||||
return _forward;
|
return _forward;
|
||||||
}
|
}
|
||||||
|
|
|
@ -37,7 +37,11 @@ export default class EventKey {
|
||||||
}
|
}
|
||||||
|
|
||||||
static get defaultLiveKey() {
|
static get defaultLiveKey() {
|
||||||
return new EventKey(Platform.minStorageKey, Platform.middleStorageKey);
|
return EventKey.defaultFragmentKey(Platform.minStorageKey);
|
||||||
|
}
|
||||||
|
|
||||||
|
static defaultFragmentKey(fragmentId) {
|
||||||
|
return new EventKey(fragmentId, Platform.middleStorageKey);
|
||||||
}
|
}
|
||||||
|
|
||||||
toString() {
|
toString() {
|
||||||
|
|
|
@ -10,30 +10,75 @@ export default class GapWriter {
|
||||||
}
|
}
|
||||||
// events is in reverse-chronological order (last event comes at index 0) if backwards
|
// events is in reverse-chronological order (last event comes at index 0) if backwards
|
||||||
async _findOverlappingEvents(fragmentEntry, events, txn) {
|
async _findOverlappingEvents(fragmentEntry, events, txn) {
|
||||||
const eventIds = events.map(e => e.event_id);
|
let expectedOverlappingEventId;
|
||||||
let nonOverlappingEvents = events;
|
if (fragmentEntry.hasLinkedFragment) {
|
||||||
|
expectedOverlappingEventId = await this._findExpectedOverlappingEventId(fragmentEntry, txn);
|
||||||
|
}
|
||||||
|
let remainingEvents = events;
|
||||||
|
let nonOverlappingEvents = [];
|
||||||
let neighbourFragmentEntry;
|
let neighbourFragmentEntry;
|
||||||
const neighbourEventId = await txn.timelineEvents.findFirstOccurringEventId(this._roomId, eventIds);
|
while (remainingEvents && remainingEvents.length) {
|
||||||
if (neighbourEventId) {
|
const eventIds = remainingEvents.map(e => e.event_id);
|
||||||
// trim overlapping events
|
const duplicateEventId = await txn.timelineEvents.findFirstOccurringEventId(this._roomId, eventIds);
|
||||||
const neighbourEventIndex = events.findIndex(e => e.event_id === neighbourEventId);
|
if (duplicateEventId) {
|
||||||
nonOverlappingEvents = events.slice(0, neighbourEventIndex);
|
const duplicateEventIndex = remainingEvents.findIndex(e => e.event_id === duplicateEventId);
|
||||||
// get neighbour fragment to link it up later on
|
// should never happen, just being defensive as this *can't* go wrong
|
||||||
const neighbourEvent = await txn.timelineEvents.getByEventId(this._roomId, neighbourEventId);
|
if (duplicateEventIndex === -1) {
|
||||||
const neighbourFragment = await txn.timelineFragments.get(this._roomId, neighbourEvent.fragmentId);
|
throw new Error(`findFirstOccurringEventId returned ${duplicateEventIndex} which wasn't ` +
|
||||||
neighbourFragmentEntry = fragmentEntry.createNeighbourEntry(neighbourFragment);
|
`in [${eventIds.join(",")}] in ${this._roomId}`);
|
||||||
|
}
|
||||||
|
nonOverlappingEvents.push(...remainingEvents.slice(0, duplicateEventIndex));
|
||||||
|
if (!expectedOverlappingEventId || duplicateEventId === expectedOverlappingEventId) {
|
||||||
|
// TODO: check here that the neighbourEvent is at the correct edge of it's fragment
|
||||||
|
// get neighbour fragment to link it up later on
|
||||||
|
const neighbourEvent = await txn.timelineEvents.getByEventId(this._roomId, duplicateEventId);
|
||||||
|
const neighbourFragment = await txn.timelineFragments.get(this._roomId, neighbourEvent.fragmentId);
|
||||||
|
neighbourFragmentEntry = fragmentEntry.createNeighbourEntry(neighbourFragment);
|
||||||
|
// trim overlapping events
|
||||||
|
remainingEvents = null;
|
||||||
|
} else {
|
||||||
|
// we've hit https://github.com/matrix-org/synapse/issues/7164,
|
||||||
|
// e.g. the event id we found is already in our store but it is not
|
||||||
|
// the adjacent fragment id. Ignore the event, but keep processing the ones after.
|
||||||
|
remainingEvents = remainingEvents.slice(duplicateEventIndex + 1);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
nonOverlappingEvents.push(...remainingEvents);
|
||||||
|
remainingEvents = null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return {nonOverlappingEvents, neighbourFragmentEntry};
|
return {nonOverlappingEvents, neighbourFragmentEntry};
|
||||||
}
|
}
|
||||||
|
|
||||||
async _findLastFragmentEventKey(fragmentEntry, txn) {
|
async _findExpectedOverlappingEventId(fragmentEntry, txn) {
|
||||||
|
const eventEntry = await this._findFragmentEdgeEvent(
|
||||||
|
fragmentEntry.linkedFragmentId,
|
||||||
|
// reverse because it's the oppose edge of the linked fragment
|
||||||
|
fragmentEntry.direction.reverse(),
|
||||||
|
txn);
|
||||||
|
if (eventEntry) {
|
||||||
|
return eventEntry.event.event_id;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async _findFragmentEdgeEventKey(fragmentEntry, txn) {
|
||||||
const {fragmentId, direction} = fragmentEntry;
|
const {fragmentId, direction} = fragmentEntry;
|
||||||
|
const event = await this._findFragmentEdgeEvent(fragmentId, direction, txn);
|
||||||
|
if (event) {
|
||||||
|
return new EventKey(event.fragmentId, event.eventIndex);
|
||||||
|
} else {
|
||||||
|
// no events yet in the fragment ... odd, but let's not fail and take the default key
|
||||||
|
return EventKey.defaultFragmentKey(fragmentEntry.fragmentId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async _findFragmentEdgeEvent(fragmentId, direction, txn) {
|
||||||
if (direction.isBackward) {
|
if (direction.isBackward) {
|
||||||
const [firstEvent] = await txn.timelineEvents.firstEvents(this._roomId, fragmentId, 1);
|
const [firstEvent] = await txn.timelineEvents.firstEvents(this._roomId, fragmentId, 1);
|
||||||
return new EventKey(firstEvent.fragmentId, firstEvent.eventIndex);
|
return firstEvent;
|
||||||
} else {
|
} else {
|
||||||
const [lastEvent] = await txn.timelineEvents.lastEvents(this._roomId, fragmentId, 1);
|
const [lastEvent] = await txn.timelineEvents.lastEvents(this._roomId, fragmentId, 1);
|
||||||
return new EventKey(lastEvent.fragmentId, lastEvent.eventIndex);
|
return lastEvent;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -54,11 +99,26 @@ export default class GapWriter {
|
||||||
|
|
||||||
async _updateFragments(fragmentEntry, neighbourFragmentEntry, end, entries, txn) {
|
async _updateFragments(fragmentEntry, neighbourFragmentEntry, end, entries, txn) {
|
||||||
const {direction} = fragmentEntry;
|
const {direction} = fragmentEntry;
|
||||||
|
const changedFragments = [];
|
||||||
directionalAppend(entries, fragmentEntry, direction);
|
directionalAppend(entries, fragmentEntry, direction);
|
||||||
// set `end` as token, and if we found an event in the step before, link up the fragments in the fragment entry
|
// set `end` as token, and if we found an event in the step before, link up the fragments in the fragment entry
|
||||||
if (neighbourFragmentEntry) {
|
if (neighbourFragmentEntry) {
|
||||||
fragmentEntry.linkedFragmentId = neighbourFragmentEntry.fragmentId;
|
// the throws here should never happen and are only here to detect client or unhandled server bugs
|
||||||
neighbourFragmentEntry.linkedFragmentId = fragmentEntry.fragmentId;
|
// and a last measure to prevent corrupting fragment links
|
||||||
|
if (!fragmentEntry.hasLinkedFragment) {
|
||||||
|
fragmentEntry.linkedFragmentId = neighbourFragmentEntry.fragmentId;
|
||||||
|
} else if (fragmentEntry.linkedFragmentId !== neighbourFragmentEntry.fragmentId) {
|
||||||
|
throw new Error(`Prevented changing fragment ${fragmentEntry.fragmentId} ` +
|
||||||
|
`${fragmentEntry.direction.asApiString()} link from ${fragmentEntry.linkedFragmentId} ` +
|
||||||
|
`to ${neighbourFragmentEntry.fragmentId} in ${this._roomId}`);
|
||||||
|
}
|
||||||
|
if (!neighbourFragmentEntry.hasLinkedFragment) {
|
||||||
|
neighbourFragmentEntry.linkedFragmentId = fragmentEntry.fragmentId;
|
||||||
|
} else if (neighbourFragmentEntry.linkedFragmentId !== fragmentEntry.fragmentId) {
|
||||||
|
throw new Error(`Prevented changing fragment ${neighbourFragmentEntry.fragmentId} ` +
|
||||||
|
`${neighbourFragmentEntry.direction.asApiString()} link from ${neighbourFragmentEntry.linkedFragmentId} ` +
|
||||||
|
`to ${fragmentEntry.fragmentId} in ${this._roomId}`);
|
||||||
|
}
|
||||||
// if neighbourFragmentEntry was found, it means the events were overlapping,
|
// if neighbourFragmentEntry was found, it means the events were overlapping,
|
||||||
// so no pagination should happen anymore.
|
// so no pagination should happen anymore.
|
||||||
neighbourFragmentEntry.token = null;
|
neighbourFragmentEntry.token = null;
|
||||||
|
@ -67,13 +127,16 @@ export default class GapWriter {
|
||||||
txn.timelineFragments.update(neighbourFragmentEntry.fragment);
|
txn.timelineFragments.update(neighbourFragmentEntry.fragment);
|
||||||
directionalAppend(entries, neighbourFragmentEntry, direction);
|
directionalAppend(entries, neighbourFragmentEntry, direction);
|
||||||
|
|
||||||
// update fragmentIdComparer here after linking up fragments
|
// fragments that need to be changed in the fragmentIdComparer here
|
||||||
this._fragmentIdComparer.add(fragmentEntry.fragment);
|
// after txn succeeds
|
||||||
this._fragmentIdComparer.add(neighbourFragmentEntry.fragment);
|
changedFragments.push(fragmentEntry.fragment);
|
||||||
|
changedFragments.push(neighbourFragmentEntry.fragment);
|
||||||
} else {
|
} else {
|
||||||
fragmentEntry.token = end;
|
fragmentEntry.token = end;
|
||||||
}
|
}
|
||||||
txn.timelineFragments.update(fragmentEntry.fragment);
|
txn.timelineFragments.update(fragmentEntry.fragment);
|
||||||
|
|
||||||
|
return changedFragments;
|
||||||
}
|
}
|
||||||
|
|
||||||
async writeFragmentFill(fragmentEntry, response, txn) {
|
async writeFragmentFill(fragmentEntry, response, txn) {
|
||||||
|
@ -100,7 +163,7 @@ export default class GapWriter {
|
||||||
throw new Error("start is not equal to prev_batch or next_batch");
|
throw new Error("start is not equal to prev_batch or next_batch");
|
||||||
}
|
}
|
||||||
// find last event in fragment so we get the eventIndex to begin creating keys at
|
// find last event in fragment so we get the eventIndex to begin creating keys at
|
||||||
let lastKey = await this._findLastFragmentEventKey(fragmentEntry, txn);
|
let lastKey = await this._findFragmentEdgeEventKey(fragmentEntry, txn);
|
||||||
// find out if any event in chunk is already present using findFirstOrLastOccurringEventId
|
// find out if any event in chunk is already present using findFirstOrLastOccurringEventId
|
||||||
const {
|
const {
|
||||||
nonOverlappingEvents,
|
nonOverlappingEvents,
|
||||||
|
@ -109,9 +172,9 @@ export default class GapWriter {
|
||||||
|
|
||||||
// create entries for all events in chunk, add them to entries
|
// create entries for all events in chunk, add them to entries
|
||||||
entries = this._storeEvents(nonOverlappingEvents, lastKey, direction, txn);
|
entries = this._storeEvents(nonOverlappingEvents, lastKey, direction, txn);
|
||||||
await this._updateFragments(fragmentEntry, neighbourFragmentEntry, end, entries, txn);
|
const fragments = await this._updateFragments(fragmentEntry, neighbourFragmentEntry, end, entries, txn);
|
||||||
|
|
||||||
return entries;
|
return {entries, fragments};
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Reference in a new issue