|
|
|
@ -26,47 +26,28 @@ export class GapWriter {
|
|
|
|
|
this._fragmentIdComparer = fragmentIdComparer;
|
|
|
|
|
this._relationWriter = relationWriter;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async _pickNeighbourFragment(txn, duplcateEventIds) {
|
|
|
|
|
const duplicateEventId = duplcateEventIds[0];
|
|
|
|
|
const duplicateEvent = await txn.timelineEvents.getByEventId(this._roomId, duplicateEventId);
|
|
|
|
|
return duplicateEvent.fragmentId;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// events is in reverse-chronological order (last event comes at index 0) if backwards
|
|
|
|
|
async _findOverlappingEvents(fragmentEntry, events, txn, log) {
|
|
|
|
|
let expectedOverlappingEventId;
|
|
|
|
|
if (fragmentEntry.hasLinkedFragment) {
|
|
|
|
|
expectedOverlappingEventId = await this._findExpectedOverlappingEventId(fragmentEntry, txn);
|
|
|
|
|
}
|
|
|
|
|
let remainingEvents = events;
|
|
|
|
|
let nonOverlappingEvents = [];
|
|
|
|
|
const eventIds = events.map(e => e.event_id);
|
|
|
|
|
const duplicateEventIds = await txn.timelineEvents.findOccurringEventIds(this._roomId, eventIds);
|
|
|
|
|
const nonOverlappingEvents = events.filter(e => !duplicateEventIds.includes(e.event_id));
|
|
|
|
|
let neighbourFragmentEntry;
|
|
|
|
|
while (remainingEvents && remainingEvents.length) {
|
|
|
|
|
const eventIds = remainingEvents.map(e => e.event_id);
|
|
|
|
|
const duplicateEventId = await txn.timelineEvents.findFirstOccurringEventId(this._roomId, eventIds);
|
|
|
|
|
if (duplicateEventId) {
|
|
|
|
|
const duplicateEventIndex = remainingEvents.findIndex(e => e.event_id === duplicateEventId);
|
|
|
|
|
// should never happen, just being defensive as this *can't* go wrong
|
|
|
|
|
if (duplicateEventIndex === -1) {
|
|
|
|
|
throw new Error(`findFirstOccurringEventId returned ${duplicateEventIndex} which wasn't ` +
|
|
|
|
|
`in [${eventIds.join(",")}] in ${this._roomId}`);
|
|
|
|
|
}
|
|
|
|
|
nonOverlappingEvents.push(...remainingEvents.slice(0, duplicateEventIndex));
|
|
|
|
|
if (!expectedOverlappingEventId || duplicateEventId === expectedOverlappingEventId) {
|
|
|
|
|
// TODO: check here that the neighbourEvent is at the correct edge of it's fragment
|
|
|
|
|
// get neighbour fragment to link it up later on
|
|
|
|
|
const neighbourEvent = await txn.timelineEvents.getByEventId(this._roomId, duplicateEventId);
|
|
|
|
|
if (neighbourEvent.fragmentId === fragmentEntry.fragmentId) {
|
|
|
|
|
log.log("hit #160, prevent fragment linking to itself", log.level.Warn);
|
|
|
|
|
} else {
|
|
|
|
|
const neighbourFragment = await txn.timelineFragments.get(this._roomId, neighbourEvent.fragmentId);
|
|
|
|
|
neighbourFragmentEntry = fragmentEntry.createNeighbourEntry(neighbourFragment);
|
|
|
|
|
}
|
|
|
|
|
// trim overlapping events
|
|
|
|
|
remainingEvents = null;
|
|
|
|
|
} else {
|
|
|
|
|
// we've hit https://github.com/matrix-org/synapse/issues/7164,
|
|
|
|
|
// e.g. the event id we found is already in our store but it is not
|
|
|
|
|
// the adjacent fragment id. Ignore the event, but keep processing the ones after.
|
|
|
|
|
remainingEvents = remainingEvents.slice(duplicateEventIndex + 1);
|
|
|
|
|
}
|
|
|
|
|
if (eventIds.length && eventIds.length === duplicateEventIds.length) {
|
|
|
|
|
// TODO: check here that the neighbourEvent is at the correct edge of it's fragment
|
|
|
|
|
// get neighbour fragment to link it up later on
|
|
|
|
|
const neighbourFragmentId = await this._pickNeighbourFragment(txn, duplicateEventIds);
|
|
|
|
|
if (neighbourFragmentId === fragmentEntry.fragmentId) {
|
|
|
|
|
log.log("hit #160, prevent fragment linking to itself", log.level.Warn);
|
|
|
|
|
} else {
|
|
|
|
|
nonOverlappingEvents.push(...remainingEvents);
|
|
|
|
|
remainingEvents = null;
|
|
|
|
|
const neighbourFragment = await txn.timelineFragments.get(this._roomId, neighbourFragmentId);
|
|
|
|
|
neighbourFragmentEntry = fragmentEntry.createNeighbourEntry(neighbourFragment);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return {nonOverlappingEvents, neighbourFragmentEntry};
|
|
|
|
@ -330,6 +311,10 @@ export function tests() {
|
|
|
|
|
return txn.timelineFragments.get(roomId, fragmentId);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async function updatedFragmentEntry(mock, fragmentEntry) {
|
|
|
|
|
return fragmentEntry.withUpdatedFragment(await fetchFragment(mock, fragmentEntry.fragmentId));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
function assertFilledLink(assert, fragment1, fragment2) {
|
|
|
|
|
assert.equal(fragment1.nextId, fragment2.id);
|
|
|
|
|
assert.equal(fragment2.previousId, fragment1.id);
|
|
|
|
@ -361,7 +346,9 @@ export function tests() {
|
|
|
|
|
const {syncResponse, fragmentEntry: firstFragmentEntry} = await syncAndWrite(mocks, { limit: 10 });
|
|
|
|
|
timelineMock.append(15);
|
|
|
|
|
const {fragmentEntry: secondFragmentEntry} = await syncAndWrite(mocks, { previous: syncResponse, limit: 10 });
|
|
|
|
|
// Only the second backfill (in which all events overlap) fills the gap.
|
|
|
|
|
await backfillAndWrite(mocks, secondFragmentEntry, 10);
|
|
|
|
|
await backfillAndWrite(mocks, await updatedFragmentEntry(mocks, secondFragmentEntry), 10);
|
|
|
|
|
|
|
|
|
|
const firstFragment = await fetchFragment(mocks, firstFragmentEntry.fragmentId);
|
|
|
|
|
const secondFragment = await fetchFragment(mocks, secondFragmentEntry.fragmentId);
|
|
|
|
@ -413,7 +400,9 @@ export function tests() {
|
|
|
|
|
timelineMock.append(11);
|
|
|
|
|
const {fragmentEntry: secondFragmentEntry} = await syncAndWrite(mocks, { previous: syncResponse, limit: 10 });
|
|
|
|
|
timelineMock.insertAfter(eventId(9), 5);
|
|
|
|
|
// Only the second backfill (in which all events overlap) fills the gap.
|
|
|
|
|
await backfillAndWrite(mocks, secondFragmentEntry, 10);
|
|
|
|
|
await backfillAndWrite(mocks, await updatedFragmentEntry(mocks, secondFragmentEntry), 10);
|
|
|
|
|
|
|
|
|
|
const firstEvents = await allFragmentEvents(mocks, firstFragmentEntry.fragmentId);
|
|
|
|
|
assert.deepEqual(firstEvents.map(e => e.event_id), eventIds(0, 10));
|
|
|
|
|