forked from mystiq/hydrogen-web
Merge pull request #508 from vector-im/bwindels/fix-393
don't (re)link fragments in fill, close gap if overlap w linked fragment
This commit is contained in:
commit
a7b6fe4b22
3 changed files with 39 additions and 99 deletions
|
@ -26,63 +26,24 @@ export class GapWriter {
|
||||||
this._fragmentIdComparer = fragmentIdComparer;
|
this._fragmentIdComparer = fragmentIdComparer;
|
||||||
this._relationWriter = relationWriter;
|
this._relationWriter = relationWriter;
|
||||||
}
|
}
|
||||||
// events is in reverse-chronological order (last event comes at index 0) if backwards
|
|
||||||
async _findOverlappingEvents(fragmentEntry, events, txn, log) {
|
async _findOverlappingEvents(fragmentEntry, events, txn) {
|
||||||
let expectedOverlappingEventId;
|
const eventIds = events.map(e => e.event_id);
|
||||||
if (fragmentEntry.hasLinkedFragment) {
|
const existingEventKeyMap = await txn.timelineEvents.getEventKeysForIds(this._roomId, eventIds);
|
||||||
expectedOverlappingEventId = await this._findExpectedOverlappingEventId(fragmentEntry, txn);
|
const nonOverlappingEvents = events.filter(e => !existingEventKeyMap.has(e.event_id));
|
||||||
}
|
|
||||||
let remainingEvents = events;
|
|
||||||
let nonOverlappingEvents = [];
|
|
||||||
let neighbourFragmentEntry;
|
let neighbourFragmentEntry;
|
||||||
while (remainingEvents && remainingEvents.length) {
|
if (fragmentEntry.hasLinkedFragment) {
|
||||||
const eventIds = remainingEvents.map(e => e.event_id);
|
for (const eventKey of existingEventKeyMap.values()) {
|
||||||
const duplicateEventId = await txn.timelineEvents.findFirstOccurringEventId(this._roomId, eventIds);
|
if (eventKey.fragmentId === fragmentEntry.linkedFragmentId) {
|
||||||
if (duplicateEventId) {
|
const neighbourFragment = await txn.timelineFragments.get(this._roomId, fragmentEntry.linkedFragmentId);
|
||||||
const duplicateEventIndex = remainingEvents.findIndex(e => e.event_id === duplicateEventId);
|
neighbourFragmentEntry = fragmentEntry.createNeighbourEntry(neighbourFragment);
|
||||||
// should never happen, just being defensive as this *can't* go wrong
|
break;
|
||||||
if (duplicateEventIndex === -1) {
|
|
||||||
throw new Error(`findFirstOccurringEventId returned ${duplicateEventIndex} which wasn't ` +
|
|
||||||
`in [${eventIds.join(",")}] in ${this._roomId}`);
|
|
||||||
}
|
}
|
||||||
nonOverlappingEvents.push(...remainingEvents.slice(0, duplicateEventIndex));
|
|
||||||
if (!expectedOverlappingEventId || duplicateEventId === expectedOverlappingEventId) {
|
|
||||||
// TODO: check here that the neighbourEvent is at the correct edge of it's fragment
|
|
||||||
// get neighbour fragment to link it up later on
|
|
||||||
const neighbourEvent = await txn.timelineEvents.getByEventId(this._roomId, duplicateEventId);
|
|
||||||
if (neighbourEvent.fragmentId === fragmentEntry.fragmentId) {
|
|
||||||
log.log("hit #160, prevent fragment linking to itself", log.level.Warn);
|
|
||||||
} else {
|
|
||||||
const neighbourFragment = await txn.timelineFragments.get(this._roomId, neighbourEvent.fragmentId);
|
|
||||||
neighbourFragmentEntry = fragmentEntry.createNeighbourEntry(neighbourFragment);
|
|
||||||
}
|
|
||||||
// trim overlapping events
|
|
||||||
remainingEvents = null;
|
|
||||||
} else {
|
|
||||||
// we've hit https://github.com/matrix-org/synapse/issues/7164,
|
|
||||||
// e.g. the event id we found is already in our store but it is not
|
|
||||||
// the adjacent fragment id. Ignore the event, but keep processing the ones after.
|
|
||||||
remainingEvents = remainingEvents.slice(duplicateEventIndex + 1);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
nonOverlappingEvents.push(...remainingEvents);
|
|
||||||
remainingEvents = null;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return {nonOverlappingEvents, neighbourFragmentEntry};
|
return {nonOverlappingEvents, neighbourFragmentEntry};
|
||||||
}
|
}
|
||||||
|
|
||||||
async _findExpectedOverlappingEventId(fragmentEntry, txn) {
|
|
||||||
const eventEntry = await this._findFragmentEdgeEvent(
|
|
||||||
fragmentEntry.linkedFragmentId,
|
|
||||||
// reverse because it's the oppose edge of the linked fragment
|
|
||||||
fragmentEntry.direction.reverse(),
|
|
||||||
txn);
|
|
||||||
if (eventEntry) {
|
|
||||||
return eventEntry.event.event_id;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async _findFragmentEdgeEventKey(fragmentEntry, txn) {
|
async _findFragmentEdgeEventKey(fragmentEntry, txn) {
|
||||||
const {fragmentId, direction} = fragmentEntry;
|
const {fragmentId, direction} = fragmentEntry;
|
||||||
const event = await this._findFragmentEdgeEvent(fragmentId, direction, txn);
|
const event = await this._findFragmentEdgeEvent(fragmentId, direction, txn);
|
||||||
|
@ -162,30 +123,15 @@ export class GapWriter {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async _updateFragments(fragmentEntry, neighbourFragmentEntry, end, entries, txn) {
|
async _updateFragments(fragmentEntry, neighbourFragmentEntry, end, entries, txn, log) {
|
||||||
const {direction} = fragmentEntry;
|
const {direction} = fragmentEntry;
|
||||||
const changedFragments = [];
|
const changedFragments = [];
|
||||||
directionalAppend(entries, fragmentEntry, direction);
|
directionalAppend(entries, fragmentEntry, direction);
|
||||||
// set `end` as token, and if we found an event in the step before, link up the fragments in the fragment entry
|
// set `end` as token, and if we found an event in the step before, link up the fragments in the fragment entry
|
||||||
if (neighbourFragmentEntry) {
|
if (neighbourFragmentEntry) {
|
||||||
// the throws here should never happen and are only here to detect client or unhandled server bugs
|
|
||||||
// and a last measure to prevent corrupting fragment links
|
|
||||||
if (!fragmentEntry.hasLinkedFragment) {
|
|
||||||
fragmentEntry.linkedFragmentId = neighbourFragmentEntry.fragmentId;
|
|
||||||
} else if (fragmentEntry.linkedFragmentId !== neighbourFragmentEntry.fragmentId) {
|
|
||||||
throw new Error(`Prevented changing fragment ${fragmentEntry.fragmentId} ` +
|
|
||||||
`${fragmentEntry.direction.asApiString()} link from ${fragmentEntry.linkedFragmentId} ` +
|
|
||||||
`to ${neighbourFragmentEntry.fragmentId} in ${this._roomId}`);
|
|
||||||
}
|
|
||||||
if (!neighbourFragmentEntry.hasLinkedFragment) {
|
|
||||||
neighbourFragmentEntry.linkedFragmentId = fragmentEntry.fragmentId;
|
|
||||||
} else if (neighbourFragmentEntry.linkedFragmentId !== fragmentEntry.fragmentId) {
|
|
||||||
throw new Error(`Prevented changing fragment ${neighbourFragmentEntry.fragmentId} ` +
|
|
||||||
`${neighbourFragmentEntry.direction.asApiString()} link from ${neighbourFragmentEntry.linkedFragmentId} ` +
|
|
||||||
`to ${fragmentEntry.fragmentId} in ${this._roomId}`);
|
|
||||||
}
|
|
||||||
// if neighbourFragmentEntry was found, it means the events were overlapping,
|
// if neighbourFragmentEntry was found, it means the events were overlapping,
|
||||||
// so no pagination should happen anymore.
|
// so no pagination should happen anymore.
|
||||||
|
log.set("closedGapWith", neighbourFragmentEntry.fragmentId);
|
||||||
neighbourFragmentEntry.token = null;
|
neighbourFragmentEntry.token = null;
|
||||||
fragmentEntry.token = null;
|
fragmentEntry.token = null;
|
||||||
|
|
||||||
|
@ -241,14 +187,10 @@ export class GapWriter {
|
||||||
const {
|
const {
|
||||||
nonOverlappingEvents,
|
nonOverlappingEvents,
|
||||||
neighbourFragmentEntry
|
neighbourFragmentEntry
|
||||||
} = await this._findOverlappingEvents(fragmentEntry, chunk, txn, log);
|
} = await this._findOverlappingEvents(fragmentEntry, chunk, txn);
|
||||||
if (!neighbourFragmentEntry && nonOverlappingEvents.length === 0 && typeof end === "string") {
|
|
||||||
log.log("hit #160, clearing token", log.level.Warn);
|
|
||||||
end = null;
|
|
||||||
}
|
|
||||||
// create entries for all events in chunk, add them to entries
|
// create entries for all events in chunk, add them to entries
|
||||||
const {entries, updatedEntries} = await this._storeEvents(nonOverlappingEvents, lastKey, direction, state, txn, log);
|
const {entries, updatedEntries} = await this._storeEvents(nonOverlappingEvents, lastKey, direction, state, txn, log);
|
||||||
const fragments = await this._updateFragments(fragmentEntry, neighbourFragmentEntry, end, entries, txn);
|
const fragments = await this._updateFragments(fragmentEntry, neighbourFragmentEntry, end, entries, txn, log);
|
||||||
|
|
||||||
return {entries, updatedEntries, fragments};
|
return {entries, updatedEntries, fragments};
|
||||||
}
|
}
|
||||||
|
|
|
@ -164,37 +164,17 @@ export class QueryTarget<T> {
|
||||||
* If the callback returns true, the search is halted and callback won't be called again.
|
* If the callback returns true, the search is halted and callback won't be called again.
|
||||||
* `callback` is called with the same instances of the key as given in `keys`, so direct comparison can be used.
|
* `callback` is called with the same instances of the key as given in `keys`, so direct comparison can be used.
|
||||||
*/
|
*/
|
||||||
async findExistingKeys(keys: IDBValidKey[], backwards: boolean, callback: (key: IDBValidKey, found: boolean) => boolean): Promise<void> {
|
async findExistingKeys(keys: IDBValidKey[], backwards: boolean, callback: (key: IDBValidKey, pk: IDBValidKey) => boolean): Promise<void> {
|
||||||
const direction = backwards ? "prev" : "next";
|
const direction = backwards ? "prev" : "next";
|
||||||
const compareKeys = (a, b) => backwards ? -this.idbFactory.cmp(a, b) : this.idbFactory.cmp(a, b);
|
const sortedKeys = keys.slice().sort((a, b) => backwards ? -this.idbFactory.cmp(a, b) : this.idbFactory.cmp(a, b));
|
||||||
const sortedKeys = keys.slice().sort(compareKeys);
|
|
||||||
const firstKey = backwards ? sortedKeys[sortedKeys.length - 1] : sortedKeys[0];
|
const firstKey = backwards ? sortedKeys[sortedKeys.length - 1] : sortedKeys[0];
|
||||||
const lastKey = backwards ? sortedKeys[0] : sortedKeys[sortedKeys.length - 1];
|
const lastKey = backwards ? sortedKeys[0] : sortedKeys[sortedKeys.length - 1];
|
||||||
const cursor = this._target.openKeyCursor(this.IDBKeyRange.bound(firstKey, lastKey), direction);
|
const cursor = this._target.openKeyCursor(this.IDBKeyRange.bound(firstKey, lastKey), direction);
|
||||||
let i = 0;
|
await iterateCursor(cursor, (value, key, cursor) => {
|
||||||
let consumerDone = false;
|
const pk = cursor.primaryKey;
|
||||||
await iterateCursor(cursor, (value, key) => {
|
const done = callback(key, pk);
|
||||||
// while key is larger than next key, advance and report false
|
return done ? DONE : NOT_DONE;
|
||||||
while(i < sortedKeys.length && compareKeys(sortedKeys[i], key) < 0 && !consumerDone) {
|
|
||||||
consumerDone = callback(sortedKeys[i], false);
|
|
||||||
++i;
|
|
||||||
}
|
|
||||||
if (i < sortedKeys.length && compareKeys(sortedKeys[i], key) === 0 && !consumerDone) {
|
|
||||||
consumerDone = callback(sortedKeys[i], true);
|
|
||||||
++i;
|
|
||||||
}
|
|
||||||
const done = consumerDone || i >= sortedKeys.length;
|
|
||||||
let jumpTo;
|
|
||||||
if (!done) {
|
|
||||||
jumpTo = sortedKeys[i];
|
|
||||||
}
|
|
||||||
return {done, jumpTo};
|
|
||||||
});
|
});
|
||||||
// report null for keys we didn't to at the end
|
|
||||||
while (!consumerDone && i < sortedKeys.length) {
|
|
||||||
consumerDone = callback(sortedKeys[i], false);
|
|
||||||
++i;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
_reduce<B>(range: IDBQuery, reducer: (reduced: B, value: T) => B, initialValue: B, direction: IDBCursorDirection): Promise<boolean> {
|
_reduce<B>(range: IDBQuery, reducer: (reduced: B, value: T) => B, initialValue: B, direction: IDBCursorDirection): Promise<boolean> {
|
||||||
|
|
|
@ -44,6 +44,11 @@ function encodeKey(roomId: string, fragmentId: number, eventIndex: number): stri
|
||||||
return `${roomId}|${encodeUint32(fragmentId)}|${encodeUint32(eventIndex)}`;
|
return `${roomId}|${encodeUint32(fragmentId)}|${encodeUint32(eventIndex)}`;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function decodeKey(key: string): { roomId: string, eventKey: EventKey } {
|
||||||
|
const [roomId, fragmentId, eventIndex] = key.split("|");
|
||||||
|
return {roomId, eventKey: new EventKey(parseInt(fragmentId, 10), parseInt(eventIndex, 10))};
|
||||||
|
}
|
||||||
|
|
||||||
function encodeEventIdKey(roomId: string, eventId: string): string {
|
function encodeEventIdKey(roomId: string, eventId: string): string {
|
||||||
return `${roomId}|${eventId}`;
|
return `${roomId}|${eventId}`;
|
||||||
}
|
}
|
||||||
|
@ -220,6 +225,19 @@ export class TimelineEventStore {
|
||||||
return events;
|
return events;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async getEventKeysForIds(roomId: string, eventIds: string[]): Promise<Map<string, EventKey>> {
|
||||||
|
const byEventId = this._timelineStore.index("byEventId");
|
||||||
|
const keys = eventIds.map(eventId => encodeEventIdKey(roomId, eventId));
|
||||||
|
const results = new Map();
|
||||||
|
await byEventId.findExistingKeys(keys, false, (indexKey, pk) => {
|
||||||
|
const {eventId} = decodeEventIdKey(indexKey as string);
|
||||||
|
const {eventKey} = decodeKey(pk as string);
|
||||||
|
results.set(eventId, eventKey);
|
||||||
|
return false;
|
||||||
|
});
|
||||||
|
return results;
|
||||||
|
}
|
||||||
|
|
||||||
/** Finds the first eventId that occurs in the store, if any.
|
/** Finds the first eventId that occurs in the store, if any.
|
||||||
* For optimal performance, `eventIds` should be in chronological order.
|
* For optimal performance, `eventIds` should be in chronological order.
|
||||||
*
|
*
|
||||||
|
|
Loading…
Reference in a new issue