work on filling gaps
This commit is contained in:
parent
c8749a1a06
commit
8e8e22fe16
6 changed files with 164 additions and 10 deletions
|
@ -93,6 +93,11 @@ export default class HomeServerApi {
|
|||
return this._get("/sync", {since, timeout, filter});
|
||||
}
|
||||
|
||||
// params is from, dir and optionally to, limit, filter.
|
||||
messages(roomId, params) {
|
||||
return this._get(`/rooms/${roomId}/messages`, params);
|
||||
}
|
||||
|
||||
passwordLogin(username, password) {
|
||||
return this._post("/login", undefined, {
|
||||
"type": "m.login.password",
|
||||
|
|
|
@ -1,8 +1,17 @@
|
|||
import SortKey from "../storage/sortkey.js";
|
||||
|
||||
function gapEntriesAreEqual(a, b) {
|
||||
if (!a || !b || !a.gap || !b.gap) {
|
||||
return false;
|
||||
}
|
||||
const gapA = a.gap, gapB = b.gap;
|
||||
return gapA.prev_batch === gapB.prev_batch && gapA.next_batch === gapB.next_batch;
|
||||
}
|
||||
|
||||
export default class RoomPersister {
|
||||
constructor(roomId) {
|
||||
constructor({roomId, storage}) {
|
||||
this._roomId = roomId;
|
||||
this._storage = storage;
|
||||
this._lastSortKey = new SortKey();
|
||||
}
|
||||
|
||||
|
@ -18,7 +27,85 @@ export default class RoomPersister {
|
|||
}
|
||||
|
||||
async persistGapFill(gapEntry, response) {
|
||||
throw new Error("not yet implemented");
|
||||
const {chunk, start, end} = response;
|
||||
if (!Array.isArray(chunk)) {
|
||||
throw new Error("Invalid chunk in response");
|
||||
}
|
||||
if (typeof start !== "string" || typeof end !== "string") {
|
||||
throw new Error("Invalid start or end token in response");
|
||||
}
|
||||
const gapKey = gapEntry.sortKey;
|
||||
const txn = await this._storage.readWriteTxn([this._storage.storeNames.roomTimeline]);
|
||||
try {
|
||||
const roomTimeline = txn.roomTimeline;
|
||||
// make sure what we've been given is actually persisted
|
||||
// in the timeline, otherwise we're replacing something
|
||||
// that doesn't exist (maybe it has been replaced already, or ...)
|
||||
const persistedEntry = await roomTimeline.findEntry(this._roomId, gapKey);
|
||||
if (!gapEntriesAreEqual(gapEntry, persistedEntry)) {
|
||||
throw new Error("Gap is not present in the timeline");
|
||||
}
|
||||
// find the previous event before the gap we could blend with
|
||||
const backwards = !!gapEntry.prev_batch;
|
||||
let neighbourEventEntry;
|
||||
if (backwards) {
|
||||
neighbourEventEntry = await roomTimeline.previousEventFromGap(this._roomId, gapKey);
|
||||
} else {
|
||||
neighbourEventEntry = await roomTimeline.nextEventFromGap(this._roomId, gapKey);
|
||||
}
|
||||
const neighbourEvent = neighbourEventEntry && neighbourEventEntry.event;
|
||||
|
||||
const newEntries = [];
|
||||
let sortKey = gapKey;
|
||||
let eventFound = false;
|
||||
if (backwards) {
|
||||
for (let i = chunk.length - 1; i >= 0; i--) {
|
||||
const event = chunk[i];
|
||||
if (event.id === neighbourEvent.id) {
|
||||
eventFound = true;
|
||||
break;
|
||||
}
|
||||
newEntries.splice(0, 0, this._createEventEntry(sortKey, event));
|
||||
sortKey = sortKey.previousKey();
|
||||
}
|
||||
if (!eventFound) {
|
||||
newEntries.splice(0, 0, this._createBackwardGapEntry(sortKey, end));
|
||||
}
|
||||
} else {
|
||||
for (let i = 0; i < chunk.length; i++) {
|
||||
const event = chunk[i];
|
||||
if (event.id === neighbourEvent.id) {
|
||||
eventFound = true;
|
||||
break;
|
||||
}
|
||||
newEntries.push(this._createEventEntry(sortKey, event));
|
||||
sortKey = sortKey.nextKey();
|
||||
}
|
||||
if (!eventFound) {
|
||||
// need to check start is correct here
|
||||
newEntries.push(this._createForwardGapEntry(sortKey, start));
|
||||
}
|
||||
}
|
||||
|
||||
if (eventFound) {
|
||||
// remove gap on the other side as well,
|
||||
// or while we're at it, remove all gaps between gapKey and neighbourEventEntry.sortKey
|
||||
} else {
|
||||
roomTimeline.deleteEntry(this._roomId, gapKey);
|
||||
}
|
||||
|
||||
for (let entry of newEntries) {
|
||||
roomTimeline.add(entry);
|
||||
}
|
||||
} catch (err) {
|
||||
txn.abort();
|
||||
throw err;
|
||||
}
|
||||
|
||||
await txn.complete();
|
||||
|
||||
// somehow also return all the gaps we removed so the timeline can do the same
|
||||
return {newEntries};
|
||||
}
|
||||
|
||||
persistSync(roomResponse, txn) {
|
||||
|
@ -29,7 +116,7 @@ export default class RoomPersister {
|
|||
// I suppose it will, yes
|
||||
if (timeline.limited) {
|
||||
nextKey = nextKey.nextKeyWithGap();
|
||||
entries.push(this._createGapEntry(nextKey, timeline.prev_batch));
|
||||
entries.push(this._createBackwardGapEntry(nextKey, timeline.prev_batch));
|
||||
}
|
||||
// const startOfChunkSortKey = nextKey;
|
||||
if (timeline.events) {
|
||||
|
@ -40,7 +127,7 @@ export default class RoomPersister {
|
|||
}
|
||||
// write to store
|
||||
for(const entry of entries) {
|
||||
txn.roomTimeline.append(entry);
|
||||
txn.roomTimeline.add(entry);
|
||||
}
|
||||
// right thing to do? if the txn fails, not sure we'll continue anyways ...
|
||||
// only advance the key once the transaction has
|
||||
|
@ -68,7 +155,7 @@ export default class RoomPersister {
|
|||
return entries;
|
||||
}
|
||||
|
||||
_createGapEntry(sortKey, prevBatch) {
|
||||
_createBackwardGapEntry(sortKey, prevBatch) {
|
||||
return {
|
||||
roomId: this._roomId,
|
||||
sortKey: sortKey.buffer,
|
||||
|
@ -77,6 +164,15 @@ export default class RoomPersister {
|
|||
};
|
||||
}
|
||||
|
||||
_createForwardGapEntry(sortKey, nextBatch) {
|
||||
return {
|
||||
roomId: this._roomId,
|
||||
sortKey: sortKey.buffer,
|
||||
event: null,
|
||||
gap: {next_batch: nextBatch}
|
||||
};
|
||||
}
|
||||
|
||||
_createEventEntry(sortKey, event) {
|
||||
return {
|
||||
roomId: this._roomId,
|
||||
|
|
|
@ -10,7 +10,7 @@ export default class Room extends EventEmitter {
|
|||
this._storage = storage;
|
||||
this._hsApi = hsApi;
|
||||
this._summary = new RoomSummary(roomId);
|
||||
this._persister = new RoomPersister(roomId);
|
||||
this._persister = new RoomPersister({roomId, storage});
|
||||
this._emitCollectionChange = emitCollectionChange;
|
||||
this._timeline = null;
|
||||
}
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
import { ObservableArray } from "../../observable/index.js";
|
||||
import sortedIndex from "../../utils/sortedIndex.js";
|
||||
|
||||
export default class Timeline {
|
||||
constructor({roomId, storage, closeCallback}) {
|
||||
|
@ -37,15 +38,32 @@ export default class Timeline {
|
|||
}
|
||||
const token = gap.prev_batch || gap.next_batch;
|
||||
|
||||
const response = await this._hsApi.messages({
|
||||
roomId: this._roomId,
|
||||
const response = await this._hsApi.messages(this._roomId, {
|
||||
from: token,
|
||||
dir: direction,
|
||||
limit: amount
|
||||
});
|
||||
|
||||
const newEntries = await this._persister.persistGapFill(gapEntry, response);
|
||||
// find where to replace existing gap with newEntries by doing binary search
|
||||
const gapIdx = sortedIndex(this._entriesList.array, gapEntry.sortKey, (key, entry) => {
|
||||
return key.compare(entry.sortKey);
|
||||
});
|
||||
// only replace the gap if it's currently in the timeline
|
||||
if (this._entriesList.at(gapIdx) === gapEntry) {
|
||||
this._entriesList.removeAt(gapIdx);
|
||||
this._entriesList.insertMany(gapIdx, newEntries);
|
||||
}
|
||||
}
|
||||
|
||||
async loadAtTop(amount) {
|
||||
const firstEntry = this._entriesList.at(0);
|
||||
if (firstEntry) {
|
||||
const txn = await this._storage.readTxn([this._storage.storeNames.roomTimeline]);
|
||||
const topEntries = await txn.roomTimeline.eventsBefore(this._roomId, firstEntry.sortKey, amount);
|
||||
this._entriesList.insertMany(0, topEntries);
|
||||
return topEntries.length;
|
||||
}
|
||||
}
|
||||
|
||||
/** @public */
|
||||
|
|
|
@ -19,14 +19,27 @@ export default class RoomTimelineStore {
|
|||
}
|
||||
|
||||
async eventsBefore(roomId, sortKey, amount) {
|
||||
const range = IDBKeyRange.bound([roomId, SortKey.minKey.buffer], [roomId, sortKey.buffer], false, true);
|
||||
const range = IDBKeyRange.only([roomId, sortKey.buffer]);
|
||||
const events = await this._timelineStore.selectLimitReverse(range, amount);
|
||||
events.reverse(); // because we fetched them backwards
|
||||
return events;
|
||||
}
|
||||
|
||||
nextEventFromGap(roomId, sortKey) {
|
||||
|
||||
}
|
||||
|
||||
previousEventFromGap(roomId, sortKey) {
|
||||
|
||||
}
|
||||
|
||||
findEntry(roomId, sortKey) {
|
||||
const range = IDBKeyRange.bound([roomId, SortKey.minKey.buffer], [roomId, sortKey.buffer], false, true);
|
||||
return this._timelineStore.selectFirst(range);
|
||||
}
|
||||
|
||||
// entry should have roomId, sortKey, event & gap keys
|
||||
append(entry) {
|
||||
add(entry) {
|
||||
this._timelineStore.add(entry);
|
||||
}
|
||||
// should this happen as part of a transaction that stores all synced in changes?
|
||||
|
|
|
@ -11,6 +11,28 @@ export default class ObservableArray extends BaseObservableList {
|
|||
this.emitAdd(this._items.length - 1, item);
|
||||
}
|
||||
|
||||
insertMany(idx, items) {
|
||||
for(let item of items) {
|
||||
this.insert(idx, item);
|
||||
idx += 1;
|
||||
}
|
||||
}
|
||||
|
||||
insert(idx, item) {
|
||||
this._items.splice(idx, 0, item);
|
||||
this.emitAdd(idx, item);
|
||||
}
|
||||
|
||||
get array() {
|
||||
return this._items;
|
||||
}
|
||||
|
||||
at(idx) {
|
||||
if (this._items && idx >= 0 && idx < this._items.length) {
|
||||
return this._items[idx];
|
||||
}
|
||||
}
|
||||
|
||||
get length() {
|
||||
return this._items.length;
|
||||
}
|
||||
|
|
Reference in a new issue