From 73ea09f6683f9ed9b14dc8d819267d633a3e12be Mon Sep 17 00:00:00 2001 From: Bruno Windels Date: Sat, 28 Mar 2020 16:14:48 +0100 Subject: [PATCH 1/2] Handle sync ordering vs back-fill ordering (see synapse #7164) resulting in dupe events Also extra robustness while filling a gap ignore duplicate events from synapse instead considering them an overlapping event with the adjacent fragment --- src/matrix/room/timeline/Direction.js | 4 + src/matrix/room/timeline/EventKey.js | 6 +- .../room/timeline/persistence/GapWriter.js | 93 +++++++++++++++---- 3 files changed, 85 insertions(+), 18 deletions(-) diff --git a/src/matrix/room/timeline/Direction.js b/src/matrix/room/timeline/Direction.js index 6e527e88..9c6fa8cd 100644 --- a/src/matrix/room/timeline/Direction.js +++ b/src/matrix/room/timeline/Direction.js @@ -17,6 +17,10 @@ export default class Direction { return this.isForward ? "f" : "b"; } + reverse() { + return this.isForward ? Direction.Backward : Direction.Forward + } + static get Forward() { return _forward; } diff --git a/src/matrix/room/timeline/EventKey.js b/src/matrix/room/timeline/EventKey.js index 7b8460d1..e7837cbd 100644 --- a/src/matrix/room/timeline/EventKey.js +++ b/src/matrix/room/timeline/EventKey.js @@ -37,7 +37,11 @@ export default class EventKey { } static get defaultLiveKey() { - return new EventKey(Platform.minStorageKey, Platform.middleStorageKey); + return EventKey.defaultFragmentKey(Platform.minStorageKey); + } + + static defaultFragmentKey(fragmentId) { + return new EventKey(fragmentId, Platform.middleStorageKey); } toString() { diff --git a/src/matrix/room/timeline/persistence/GapWriter.js b/src/matrix/room/timeline/persistence/GapWriter.js index 5dba0664..66b1fc3d 100644 --- a/src/matrix/room/timeline/persistence/GapWriter.js +++ b/src/matrix/room/timeline/persistence/GapWriter.js @@ -10,30 +10,75 @@ export default class GapWriter { } // events is in reverse-chronological order (last event comes at index 0) if backwards async _findOverlappingEvents(fragmentEntry, events, txn) { - const eventIds = events.map(e => e.event_id); - let nonOverlappingEvents = events; + let expectedOverlappingEventId; + if (fragmentEntry.hasLinkedFragment) { + expectedOverlappingEventId = await this._findExpectedOverlappingEventId(fragmentEntry, txn); + } + let remainingEvents = events; + let nonOverlappingEvents = []; let neighbourFragmentEntry; - const neighbourEventId = await txn.timelineEvents.findFirstOccurringEventId(this._roomId, eventIds); - if (neighbourEventId) { - // trim overlapping events - const neighbourEventIndex = events.findIndex(e => e.event_id === neighbourEventId); - nonOverlappingEvents = events.slice(0, neighbourEventIndex); - // get neighbour fragment to link it up later on - const neighbourEvent = await txn.timelineEvents.getByEventId(this._roomId, neighbourEventId); - const neighbourFragment = await txn.timelineFragments.get(this._roomId, neighbourEvent.fragmentId); - neighbourFragmentEntry = fragmentEntry.createNeighbourEntry(neighbourFragment); + while (remainingEvents && remainingEvents.length) { + const eventIds = remainingEvents.map(e => e.event_id); + const duplicateEventId = await txn.timelineEvents.findFirstOccurringEventId(this._roomId, eventIds); + if (duplicateEventId) { + const duplicateEventIndex = remainingEvents.findIndex(e => e.event_id === duplicateEventId); + // should never happen, just being defensive as this *can't* go wrong + if (duplicateEventIndex === -1) { + throw new Error(`findFirstOccurringEventId returned ${duplicateEventIndex} which wasn't ` + + `in [${eventIds.join(",")}] in ${this._roomId}`); + } + nonOverlappingEvents.push(...remainingEvents.slice(0, duplicateEventIndex)); + if (!expectedOverlappingEventId || duplicateEventId === expectedOverlappingEventId) { + // TODO: check here that the neighbourEvent is at the correct edge of it's fragment + // get neighbour fragment to link it up later on + const neighbourEvent = await txn.timelineEvents.getByEventId(this._roomId, duplicateEventId); + const neighbourFragment = await txn.timelineFragments.get(this._roomId, neighbourEvent.fragmentId); + neighbourFragmentEntry = fragmentEntry.createNeighbourEntry(neighbourFragment); + // trim overlapping events + remainingEvents = null; + } else { + // we've hit https://github.com/matrix-org/synapse/issues/7164, + // e.g. the event id we found is already in our store but it is not + // the adjacent fragment id. Ignore the event, but keep processing the ones after. + remainingEvents = remainingEvents.slice(duplicateEventIndex + 1); + } + } else { + nonOverlappingEvents.push(...remainingEvents); + remainingEvents = null; + } } return {nonOverlappingEvents, neighbourFragmentEntry}; } - async _findLastFragmentEventKey(fragmentEntry, txn) { + async _findExpectedOverlappingEventId(fragmentEntry, txn) { + const eventEntry = await this._findFragmentEdgeEvent( + fragmentEntry.linkedFragmentId, + // reverse because it's the oppose edge of the linked fragment + fragmentEntry.direction.reverse(), + txn); + if (eventEntry) { + return eventEntry.event.event_id; + } + } + + async _findFragmentEdgeEventKey(fragmentEntry, txn) { const {fragmentId, direction} = fragmentEntry; + const event = await this._findFragmentEdgeEvent(fragmentId, direction, txn); + if (event) { + return new EventKey(event.fragmentId, event.eventIndex); + } else { + // no events yet in the fragment ... odd, but let's not fail and take the default key + return EventKey.defaultFragmentKey(fragmentEntry.fragmentId); + } + } + + async _findFragmentEdgeEvent(fragmentId, direction, txn) { if (direction.isBackward) { const [firstEvent] = await txn.timelineEvents.firstEvents(this._roomId, fragmentId, 1); - return new EventKey(firstEvent.fragmentId, firstEvent.eventIndex); + return firstEvent; } else { const [lastEvent] = await txn.timelineEvents.lastEvents(this._roomId, fragmentId, 1); - return new EventKey(lastEvent.fragmentId, lastEvent.eventIndex); + return lastEvent; } } @@ -57,8 +102,22 @@ export default class GapWriter { directionalAppend(entries, fragmentEntry, direction); // set `end` as token, and if we found an event in the step before, link up the fragments in the fragment entry if (neighbourFragmentEntry) { - fragmentEntry.linkedFragmentId = neighbourFragmentEntry.fragmentId; - neighbourFragmentEntry.linkedFragmentId = fragmentEntry.fragmentId; + // the throws here should never happen and are only here to detect client or unhandled server bugs + // and a last measure to prevent corrupting fragment links + if (!fragmentEntry.hasLinkedFragment) { + fragmentEntry.linkedFragmentId = neighbourFragmentEntry.fragmentId; + } else if (fragmentEntry.linkedFragmentId !== neighbourFragmentEntry.fragmentId) { + throw new Error(`Prevented changing fragment ${fragmentEntry.fragmentId} ` + + `${fragmentEntry.direction.asApiString()} link from ${fragmentEntry.linkedFragmentId} ` + + `to ${neighbourFragmentEntry.fragmentId} in ${this._roomId}`); + } + if (!neighbourFragmentEntry.hasLinkedFragment) { + neighbourFragmentEntry.linkedFragmentId = fragmentEntry.fragmentId; + } else if (neighbourFragmentEntry.linkedFragmentId !== fragmentEntry.fragmentId) { + throw new Error(`Prevented changing fragment ${neighbourFragmentEntry.fragmentId} ` + + `${neighbourFragmentEntry.direction.asApiString()} link from ${neighbourFragmentEntry.linkedFragmentId} ` + + `to ${fragmentEntry.fragmentId} in ${this._roomId}`); + } // if neighbourFragmentEntry was found, it means the events were overlapping, // so no pagination should happen anymore. neighbourFragmentEntry.token = null; @@ -100,7 +159,7 @@ export default class GapWriter { throw new Error("start is not equal to prev_batch or next_batch"); } // find last event in fragment so we get the eventIndex to begin creating keys at - let lastKey = await this._findLastFragmentEventKey(fragmentEntry, txn); + let lastKey = await this._findFragmentEdgeEventKey(fragmentEntry, txn); // find out if any event in chunk is already present using findFirstOrLastOccurringEventId const { nonOverlappingEvents, From 234c26033968a319fbd086d54a79cc72b82e675b Mon Sep 17 00:00:00 2001 From: Bruno Windels Date: Mon, 30 Mar 2020 20:46:52 +0200 Subject: [PATCH 2/2] dont modify fragments in comparer until txn succeeds --- doc/impl-thoughts/RECONNECTING.md | 1 + src/matrix/room/room.js | 11 +++++++---- src/matrix/room/timeline/persistence/GapWriter.js | 14 +++++++++----- 3 files changed, 17 insertions(+), 9 deletions(-) create mode 100644 doc/impl-thoughts/RECONNECTING.md diff --git a/doc/impl-thoughts/RECONNECTING.md b/doc/impl-thoughts/RECONNECTING.md new file mode 100644 index 00000000..fb9ff506 --- /dev/null +++ b/doc/impl-thoughts/RECONNECTING.md @@ -0,0 +1 @@ +# Reconnecting \ No newline at end of file diff --git a/src/matrix/room/room.js b/src/matrix/room/room.js index 7de0d92b..605535a6 100644 --- a/src/matrix/room/room.js +++ b/src/matrix/room/room.js @@ -75,7 +75,7 @@ export default class Room extends EventEmitter { this._storage.storeNames.timelineFragments, ]); let removedPendingEvents; - let newEntries; + let gapResult; try { // detect remote echos of pending messages in the gap removedPendingEvents = this._sendQueue.removeRemoteEchos(response.chunk, txn); @@ -85,18 +85,21 @@ export default class Room extends EventEmitter { storage: this._storage, fragmentIdComparer: this._fragmentIdComparer }); - newEntries = await gapWriter.writeFragmentFill(fragmentEntry, response, txn); + gapResult = await gapWriter.writeFragmentFill(fragmentEntry, response, txn); } catch (err) { txn.abort(); throw err; } await txn.complete(); - // once txn is committed, emit events + // once txn is committed, update in-memory state & emit events + for (const fragment of gapResult.fragments) { + this._fragmentIdComparer.add(fragment); + } if (removedPendingEvents) { this._sendQueue.emitRemovals(removedPendingEvents); } if (this._timeline) { - this._timeline.addGapEntries(newEntries); + this._timeline.addGapEntries(gapResult.entries); } } diff --git a/src/matrix/room/timeline/persistence/GapWriter.js b/src/matrix/room/timeline/persistence/GapWriter.js index 66b1fc3d..36080270 100644 --- a/src/matrix/room/timeline/persistence/GapWriter.js +++ b/src/matrix/room/timeline/persistence/GapWriter.js @@ -99,6 +99,7 @@ export default class GapWriter { async _updateFragments(fragmentEntry, neighbourFragmentEntry, end, entries, txn) { const {direction} = fragmentEntry; + const changedFragments = []; directionalAppend(entries, fragmentEntry, direction); // set `end` as token, and if we found an event in the step before, link up the fragments in the fragment entry if (neighbourFragmentEntry) { @@ -126,13 +127,16 @@ export default class GapWriter { txn.timelineFragments.update(neighbourFragmentEntry.fragment); directionalAppend(entries, neighbourFragmentEntry, direction); - // update fragmentIdComparer here after linking up fragments - this._fragmentIdComparer.add(fragmentEntry.fragment); - this._fragmentIdComparer.add(neighbourFragmentEntry.fragment); + // fragments that need to be changed in the fragmentIdComparer here + // after txn succeeds + changedFragments.push(fragmentEntry.fragment); + changedFragments.push(neighbourFragmentEntry.fragment); } else { fragmentEntry.token = end; } txn.timelineFragments.update(fragmentEntry.fragment); + + return changedFragments; } async writeFragmentFill(fragmentEntry, response, txn) { @@ -168,9 +172,9 @@ export default class GapWriter { // create entries for all events in chunk, add them to entries entries = this._storeEvents(nonOverlappingEvents, lastKey, direction, txn); - await this._updateFragments(fragmentEntry, neighbourFragmentEntry, end, entries, txn); + const fragments = await this._updateFragments(fragmentEntry, neighbourFragmentEntry, end, entries, txn); - return entries; + return {entries, fragments}; } }