split up persistFragmentFill method into smaller ones
This commit is contained in:
parent
10457611f9
commit
3324fd3afd
1 changed files with 70 additions and 46 deletions
|
@ -16,6 +16,69 @@ export default class GapPersister {
|
||||||
this._storage = storage;
|
this._storage = storage;
|
||||||
this._fragmentIdComparer = fragmentIdComparer;
|
this._fragmentIdComparer = fragmentIdComparer;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async _findOverlappingEvents(fragmentEntry, events, txn) {
|
||||||
|
const eventIds = events.map(e => e.event_id);
|
||||||
|
const {direction} = fragmentEntry;
|
||||||
|
const findLast = direction.isBackward;
|
||||||
|
let nonOverlappingEvents = events;
|
||||||
|
let neighbourFragmentEntry;
|
||||||
|
const neighbourEventId = await txn.timelineEvents.findFirstOrLastOccurringEventId(this._roomId, eventIds, findLast);
|
||||||
|
if (neighbourEventId) {
|
||||||
|
// trim overlapping events
|
||||||
|
const neighbourEventIndex = events.findIndex(e => e.event_id === neighbourEventId);
|
||||||
|
const start = direction.isBackward ? neighbourEventIndex + 1 : 0;
|
||||||
|
const end = direction.isBackward ? events.length : neighbourEventIndex;
|
||||||
|
nonOverlappingEvents = events.slice(start, end);
|
||||||
|
// get neighbour fragment to link it up later on
|
||||||
|
const neighbourEvent = await txn.timelineEvents.getByEventId(this._roomId, neighbourEventId);
|
||||||
|
const neighbourFragment = await txn.timelineFragments.get(neighbourEvent.fragmentId);
|
||||||
|
neighbourFragmentEntry = fragmentEntry.createNeighbourEntry(neighbourFragment);
|
||||||
|
}
|
||||||
|
|
||||||
|
return {nonOverlappingEvents, neighbourFragmentEntry};
|
||||||
|
}
|
||||||
|
|
||||||
|
async _findLastFragmentEventKey(fragmentEntry, txn) {
|
||||||
|
const {fragmentId, direction} = fragmentEntry;
|
||||||
|
if (direction.isBackward) {
|
||||||
|
const [firstEvent] = await txn.timelineEvents.firstEvents(this._roomId, fragmentId, 1);
|
||||||
|
return new EventKey(firstEvent.fragmentId, firstEvent.eventIndex);
|
||||||
|
} else {
|
||||||
|
const [lastEvent] = await txn.timelineEvents.lastEvents(this._roomId, fragmentId, 1);
|
||||||
|
return new EventKey(lastEvent.fragmentId, lastEvent.eventIndex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
_storeEvents(events, startKey, direction, txn) {
|
||||||
|
const entries = new Array(events.length);
|
||||||
|
const reducer = direction.isBackward ? Array.prototype.reduceRight : Array.prototype.reduce;
|
||||||
|
reducer.call(events, (key, event, i) => {
|
||||||
|
key = key.nextKeyForDirection(direction);
|
||||||
|
const eventEntry = createEventEntry(key, event);
|
||||||
|
txn.timelineEvents.insert(eventEntry);
|
||||||
|
entries[i] = new EventEntry(eventEntry, this._fragmentIdComparer);
|
||||||
|
}, startKey);
|
||||||
|
return entries;
|
||||||
|
}
|
||||||
|
|
||||||
|
async _updateFragments(fragmentEntry, neighbourFragmentEntry, end, entries, txn) {
|
||||||
|
const {direction} = fragmentEntry;
|
||||||
|
directionalAppend(entries, fragmentEntry, direction);
|
||||||
|
// set `end` as token, and if we found an event in the step before, link up the fragments in the fragment entry
|
||||||
|
if (neighbourFragmentEntry) {
|
||||||
|
fragmentEntry.linkedFragmentId = neighbourFragmentEntry.fragmentId;
|
||||||
|
neighbourFragmentEntry.linkedFragmentId = fragmentEntry.fragmentId;
|
||||||
|
txn.timelineFragments.set(neighbourFragmentEntry.fragment);
|
||||||
|
directionalAppend(entries, neighbourFragmentEntry, direction);
|
||||||
|
|
||||||
|
// update fragmentIdComparer here after linking up fragments?
|
||||||
|
this._fragmentIdComparer.rebuild(await txn.timelineFragments.all());
|
||||||
|
}
|
||||||
|
fragmentEntry.token = end;
|
||||||
|
txn.timelineFragments.set(fragmentEntry.fragment);
|
||||||
|
}
|
||||||
|
|
||||||
async persistFragmentFill(fragmentEntry, response) {
|
async persistFragmentFill(fragmentEntry, response) {
|
||||||
const {fragmentId, direction} = fragmentEntry;
|
const {fragmentId, direction} = fragmentEntry;
|
||||||
// assuming that chunk is in chronological order when backwards too?
|
// assuming that chunk is in chronological order when backwards too?
|
||||||
|
@ -46,55 +109,16 @@ export default class GapPersister {
|
||||||
throw new Error("start is not equal to prev_batch or next_batch");
|
throw new Error("start is not equal to prev_batch or next_batch");
|
||||||
}
|
}
|
||||||
// find last event in fragment so we get the eventIndex to begin creating keys at
|
// find last event in fragment so we get the eventIndex to begin creating keys at
|
||||||
let currentKey;
|
let lastKey = this._findLastFragmentEventKey(fragmentEntry, txn);
|
||||||
if (direction.isBackward) {
|
|
||||||
const [firstEvent] = await txn.timelineEvents.firstEvents(this._roomId, fragmentId, 1);
|
|
||||||
currentKey = new EventKey(firstEvent.fragmentId, firstEvent.eventIndex);
|
|
||||||
} else {
|
|
||||||
const [lastEvent] = await txn.timelineEvents.lastEvents(this._roomId, fragmentId, 1);
|
|
||||||
currentKey = new EventKey(lastEvent.fragmentId, lastEvent.eventIndex);
|
|
||||||
}
|
|
||||||
// find out if any event in chunk is already present using findFirstOrLastOccurringEventId
|
// find out if any event in chunk is already present using findFirstOrLastOccurringEventId
|
||||||
const eventIds = chunk.map(e => e.event_id);
|
const {
|
||||||
const findLast = direction.isBackward;
|
nonOverlappingEvents,
|
||||||
let nonOverlappingEvents = chunk;
|
neighbourFragmentEntry
|
||||||
let neighbourFragmentEntry;
|
} = this._findOverlappingEvents(fragmentEntry, chunk, txn);
|
||||||
const neighbourEventId = await txn.timelineEvents.findFirstOrLastOccurringEventId(this._roomId, eventIds, findLast);
|
|
||||||
if (neighbourEventId) {
|
|
||||||
// trim overlapping events
|
|
||||||
const neighbourEventIndex = chunk.findIndex(e => e.event_id === neighbourEventId);
|
|
||||||
const start = direction.isBackward ? neighbourEventIndex + 1 : 0;
|
|
||||||
const end = direction.isBackward ? chunk.length : neighbourEventIndex;
|
|
||||||
nonOverlappingEvents = chunk.slice(start, end);
|
|
||||||
// get neighbour fragment to link it up later on
|
|
||||||
const neighbourEvent = await txn.timelineEvents.getByEventId(this._roomId, neighbourEventId);
|
|
||||||
const neighbourFragment = await txn.timelineFragments.get(neighbourEvent.fragmentId);
|
|
||||||
neighbourFragmentEntry = fragmentEntry.createNeighbourEntry(neighbourFragment);
|
|
||||||
}
|
|
||||||
|
|
||||||
// create entries for all events in chunk, add them to entries
|
// create entries for all events in chunk, add them to entries
|
||||||
entries = new Array(nonOverlappingEvents.length);
|
entries = this._storeEvents(nonOverlappingEvents, lastKey, direction, txn);
|
||||||
const reducer = direction.isBackward ? Array.prototype.reduceRight : Array.prototype.reduce;
|
await this._updateFragments(fragmentEntry, neighbourFragmentEntry, end, entries, txn);
|
||||||
currentKey = reducer.call(nonOverlappingEvents, (key, event, i) => {
|
|
||||||
key = key.nextKeyForDirection(direction);
|
|
||||||
const eventEntry = createEventEntry(currentKey, event);
|
|
||||||
txn.timelineEvents.insert(eventEntry);
|
|
||||||
entries[i] = new EventEntry(eventEntry, this._fragmentIdComparer);
|
|
||||||
}, currentKey);
|
|
||||||
|
|
||||||
directionalAppend(entries, fragmentEntry, direction);
|
|
||||||
// set `end` as token, and if we found an event in the step before, link up the fragments in the fragment entry
|
|
||||||
if (neighbourFragmentEntry) {
|
|
||||||
fragmentEntry.linkedFragmentId = neighbourFragmentEntry.fragmentId;
|
|
||||||
neighbourFragmentEntry.linkedFragmentId = fragmentEntry.fragmentId;
|
|
||||||
txn.timelineFragments.set(neighbourFragmentEntry.fragment);
|
|
||||||
directionalAppend(entries, neighbourFragmentEntry, direction);
|
|
||||||
|
|
||||||
// update fragmentIdComparer here after linking up fragments?
|
|
||||||
this._fragmentIdComparer.rebuild(await txn.timelineFragments.all());
|
|
||||||
}
|
|
||||||
fragmentEntry.token = end;
|
|
||||||
txn.timelineFragments.set(fragmentEntry.fragment);
|
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
txn.abort();
|
txn.abort();
|
||||||
throw err;
|
throw err;
|
||||||
|
|
Reference in a new issue