diff --git a/src/matrix/room/timeline/persistence/GapWriter.js b/src/matrix/room/timeline/persistence/GapWriter.js index d51f4a18..d769cbb5 100644 --- a/src/matrix/room/timeline/persistence/GapWriter.js +++ b/src/matrix/room/timeline/persistence/GapWriter.js @@ -26,63 +26,24 @@ export class GapWriter { this._fragmentIdComparer = fragmentIdComparer; this._relationWriter = relationWriter; } - // events is in reverse-chronological order (last event comes at index 0) if backwards - async _findOverlappingEvents(fragmentEntry, events, txn, log) { - let expectedOverlappingEventId; - if (fragmentEntry.hasLinkedFragment) { - expectedOverlappingEventId = await this._findExpectedOverlappingEventId(fragmentEntry, txn); - } - let remainingEvents = events; - let nonOverlappingEvents = []; + + async _findOverlappingEvents(fragmentEntry, events, txn) { + const eventIds = events.map(e => e.event_id); + const existingEventKeyMap = await txn.timelineEvents.getEventKeysForIds(this._roomId, eventIds); + const nonOverlappingEvents = events.filter(e => !existingEventKeyMap.has(e.event_id)); let neighbourFragmentEntry; - while (remainingEvents && remainingEvents.length) { - const eventIds = remainingEvents.map(e => e.event_id); - const duplicateEventId = await txn.timelineEvents.findFirstOccurringEventId(this._roomId, eventIds); - if (duplicateEventId) { - const duplicateEventIndex = remainingEvents.findIndex(e => e.event_id === duplicateEventId); - // should never happen, just being defensive as this *can't* go wrong - if (duplicateEventIndex === -1) { - throw new Error(`findFirstOccurringEventId returned ${duplicateEventIndex} which wasn't ` + - `in [${eventIds.join(",")}] in ${this._roomId}`); + if (fragmentEntry.hasLinkedFragment) { + for (const eventKey of existingEventKeyMap.values()) { + if (eventKey.fragmentId === fragmentEntry.linkedFragmentId) { + const neighbourFragment = await txn.timelineFragments.get(this._roomId, fragmentEntry.linkedFragmentId); + neighbourFragmentEntry = fragmentEntry.createNeighbourEntry(neighbourFragment); + break; } - nonOverlappingEvents.push(...remainingEvents.slice(0, duplicateEventIndex)); - if (!expectedOverlappingEventId || duplicateEventId === expectedOverlappingEventId) { - // TODO: check here that the neighbourEvent is at the correct edge of it's fragment - // get neighbour fragment to link it up later on - const neighbourEvent = await txn.timelineEvents.getByEventId(this._roomId, duplicateEventId); - if (neighbourEvent.fragmentId === fragmentEntry.fragmentId) { - log.log("hit #160, prevent fragment linking to itself", log.level.Warn); - } else { - const neighbourFragment = await txn.timelineFragments.get(this._roomId, neighbourEvent.fragmentId); - neighbourFragmentEntry = fragmentEntry.createNeighbourEntry(neighbourFragment); - } - // trim overlapping events - remainingEvents = null; - } else { - // we've hit https://github.com/matrix-org/synapse/issues/7164, - // e.g. the event id we found is already in our store but it is not - // the adjacent fragment id. Ignore the event, but keep processing the ones after. - remainingEvents = remainingEvents.slice(duplicateEventIndex + 1); - } - } else { - nonOverlappingEvents.push(...remainingEvents); - remainingEvents = null; } } return {nonOverlappingEvents, neighbourFragmentEntry}; } - async _findExpectedOverlappingEventId(fragmentEntry, txn) { - const eventEntry = await this._findFragmentEdgeEvent( - fragmentEntry.linkedFragmentId, - // reverse because it's the oppose edge of the linked fragment - fragmentEntry.direction.reverse(), - txn); - if (eventEntry) { - return eventEntry.event.event_id; - } - } - async _findFragmentEdgeEventKey(fragmentEntry, txn) { const {fragmentId, direction} = fragmentEntry; const event = await this._findFragmentEdgeEvent(fragmentId, direction, txn); @@ -162,30 +123,15 @@ export class GapWriter { } } - async _updateFragments(fragmentEntry, neighbourFragmentEntry, end, entries, txn) { + async _updateFragments(fragmentEntry, neighbourFragmentEntry, end, entries, txn, log) { const {direction} = fragmentEntry; const changedFragments = []; directionalAppend(entries, fragmentEntry, direction); // set `end` as token, and if we found an event in the step before, link up the fragments in the fragment entry if (neighbourFragmentEntry) { - // the throws here should never happen and are only here to detect client or unhandled server bugs - // and a last measure to prevent corrupting fragment links - if (!fragmentEntry.hasLinkedFragment) { - fragmentEntry.linkedFragmentId = neighbourFragmentEntry.fragmentId; - } else if (fragmentEntry.linkedFragmentId !== neighbourFragmentEntry.fragmentId) { - throw new Error(`Prevented changing fragment ${fragmentEntry.fragmentId} ` + - `${fragmentEntry.direction.asApiString()} link from ${fragmentEntry.linkedFragmentId} ` + - `to ${neighbourFragmentEntry.fragmentId} in ${this._roomId}`); - } - if (!neighbourFragmentEntry.hasLinkedFragment) { - neighbourFragmentEntry.linkedFragmentId = fragmentEntry.fragmentId; - } else if (neighbourFragmentEntry.linkedFragmentId !== fragmentEntry.fragmentId) { - throw new Error(`Prevented changing fragment ${neighbourFragmentEntry.fragmentId} ` + - `${neighbourFragmentEntry.direction.asApiString()} link from ${neighbourFragmentEntry.linkedFragmentId} ` + - `to ${fragmentEntry.fragmentId} in ${this._roomId}`); - } // if neighbourFragmentEntry was found, it means the events were overlapping, // so no pagination should happen anymore. + log.set("closedGapWith", neighbourFragmentEntry.fragmentId); neighbourFragmentEntry.token = null; fragmentEntry.token = null; @@ -241,14 +187,10 @@ export class GapWriter { const { nonOverlappingEvents, neighbourFragmentEntry - } = await this._findOverlappingEvents(fragmentEntry, chunk, txn, log); - if (!neighbourFragmentEntry && nonOverlappingEvents.length === 0 && typeof end === "string") { - log.log("hit #160, clearing token", log.level.Warn); - end = null; - } + } = await this._findOverlappingEvents(fragmentEntry, chunk, txn); // create entries for all events in chunk, add them to entries const {entries, updatedEntries} = await this._storeEvents(nonOverlappingEvents, lastKey, direction, state, txn, log); - const fragments = await this._updateFragments(fragmentEntry, neighbourFragmentEntry, end, entries, txn); + const fragments = await this._updateFragments(fragmentEntry, neighbourFragmentEntry, end, entries, txn, log); return {entries, updatedEntries, fragments}; } diff --git a/src/matrix/storage/idb/QueryTarget.ts b/src/matrix/storage/idb/QueryTarget.ts index 3ab33a6b..f2b6afb5 100644 --- a/src/matrix/storage/idb/QueryTarget.ts +++ b/src/matrix/storage/idb/QueryTarget.ts @@ -164,37 +164,17 @@ export class QueryTarget { * If the callback returns true, the search is halted and callback won't be called again. * `callback` is called with the same instances of the key as given in `keys`, so direct comparison can be used. */ - async findExistingKeys(keys: IDBValidKey[], backwards: boolean, callback: (key: IDBValidKey, found: boolean) => boolean): Promise { + async findExistingKeys(keys: IDBValidKey[], backwards: boolean, callback: (key: IDBValidKey, pk: IDBValidKey) => boolean): Promise { const direction = backwards ? "prev" : "next"; - const compareKeys = (a, b) => backwards ? -this.idbFactory.cmp(a, b) : this.idbFactory.cmp(a, b); - const sortedKeys = keys.slice().sort(compareKeys); + const sortedKeys = keys.slice().sort((a, b) => backwards ? -this.idbFactory.cmp(a, b) : this.idbFactory.cmp(a, b)); const firstKey = backwards ? sortedKeys[sortedKeys.length - 1] : sortedKeys[0]; const lastKey = backwards ? sortedKeys[0] : sortedKeys[sortedKeys.length - 1]; const cursor = this._target.openKeyCursor(this.IDBKeyRange.bound(firstKey, lastKey), direction); - let i = 0; - let consumerDone = false; - await iterateCursor(cursor, (value, key) => { - // while key is larger than next key, advance and report false - while(i < sortedKeys.length && compareKeys(sortedKeys[i], key) < 0 && !consumerDone) { - consumerDone = callback(sortedKeys[i], false); - ++i; - } - if (i < sortedKeys.length && compareKeys(sortedKeys[i], key) === 0 && !consumerDone) { - consumerDone = callback(sortedKeys[i], true); - ++i; - } - const done = consumerDone || i >= sortedKeys.length; - let jumpTo; - if (!done) { - jumpTo = sortedKeys[i]; - } - return {done, jumpTo}; + await iterateCursor(cursor, (value, key, cursor) => { + const pk = cursor.primaryKey; + const done = callback(key, pk); + return done ? DONE : NOT_DONE; }); - // report null for keys we didn't to at the end - while (!consumerDone && i < sortedKeys.length) { - consumerDone = callback(sortedKeys[i], false); - ++i; - } } _reduce(range: IDBQuery, reducer: (reduced: B, value: T) => B, initialValue: B, direction: IDBCursorDirection): Promise { diff --git a/src/matrix/storage/idb/stores/TimelineEventStore.ts b/src/matrix/storage/idb/stores/TimelineEventStore.ts index 3d27e8a7..7338f96a 100644 --- a/src/matrix/storage/idb/stores/TimelineEventStore.ts +++ b/src/matrix/storage/idb/stores/TimelineEventStore.ts @@ -44,6 +44,11 @@ function encodeKey(roomId: string, fragmentId: number, eventIndex: number): stri return `${roomId}|${encodeUint32(fragmentId)}|${encodeUint32(eventIndex)}`; } +function decodeKey(key: string): { roomId: string, eventKey: EventKey } { + const [roomId, fragmentId, eventIndex] = key.split("|"); + return {roomId, eventKey: new EventKey(parseInt(fragmentId, 10), parseInt(eventIndex, 10))}; +} + function encodeEventIdKey(roomId: string, eventId: string): string { return `${roomId}|${eventId}`; } @@ -220,6 +225,19 @@ export class TimelineEventStore { return events; } + async getEventKeysForIds(roomId: string, eventIds: string[]): Promise> { + const byEventId = this._timelineStore.index("byEventId"); + const keys = eventIds.map(eventId => encodeEventIdKey(roomId, eventId)); + const results = new Map(); + await byEventId.findExistingKeys(keys, false, (indexKey, pk) => { + const {eventId} = decodeEventIdKey(indexKey as string); + const {eventKey} = decodeKey(pk as string); + results.set(eventId, eventKey); + return false; + }); + return results; + } + /** Finds the first eventId that occurs in the store, if any. * For optimal performance, `eventIds` should be in chronological order. *