Merge pull request #38 from bwindels/bwindels/remote-echo-for-gaps
Look for remote echos in gap responses as well as sync responses
This commit is contained in:
commit
dc65274b8b
7 changed files with 81 additions and 63 deletions
|
@ -18,7 +18,7 @@ export default class RoomViewModel extends EventEmitter {
|
||||||
this._room.on("change", this._onRoomChange);
|
this._room.on("change", this._onRoomChange);
|
||||||
try {
|
try {
|
||||||
this._timeline = await this._room.openTimeline();
|
this._timeline = await this._room.openTimeline();
|
||||||
this._timelineVM = new TimelineViewModel(this._timeline, this._ownUserId);
|
this._timelineVM = new TimelineViewModel(this._room, this._timeline, this._ownUserId);
|
||||||
this.emit("change", "timelineViewModel");
|
this.emit("change", "timelineViewModel");
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
console.error(`room.openTimeline(): ${err.message}:\n${err.stack}`);
|
console.error(`room.openTimeline(): ${err.message}:\n${err.stack}`);
|
||||||
|
|
|
@ -18,12 +18,12 @@ import TilesCollection from "./TilesCollection.js";
|
||||||
import tilesCreator from "./tilesCreator.js";
|
import tilesCreator from "./tilesCreator.js";
|
||||||
|
|
||||||
export default class TimelineViewModel {
|
export default class TimelineViewModel {
|
||||||
constructor(timeline, ownUserId) {
|
constructor(room, timeline, ownUserId) {
|
||||||
this._timeline = timeline;
|
this._timeline = timeline;
|
||||||
// once we support sending messages we could do
|
// once we support sending messages we could do
|
||||||
// timeline.entries.concat(timeline.pendingEvents)
|
// timeline.entries.concat(timeline.pendingEvents)
|
||||||
// for an ObservableList that also contains local echos
|
// for an ObservableList that also contains local echos
|
||||||
this._tiles = new TilesCollection(timeline.entries, tilesCreator({timeline, ownUserId}));
|
this._tiles = new TilesCollection(timeline.entries, tilesCreator({room, ownUserId}));
|
||||||
}
|
}
|
||||||
|
|
||||||
// doesn't fill gaps, only loads stored entries/tiles
|
// doesn't fill gaps, only loads stored entries/tiles
|
||||||
|
|
|
@ -4,11 +4,11 @@ import LocationTile from "./tiles/LocationTile.js";
|
||||||
import RoomNameTile from "./tiles/RoomNameTile.js";
|
import RoomNameTile from "./tiles/RoomNameTile.js";
|
||||||
import RoomMemberTile from "./tiles/RoomMemberTile.js";
|
import RoomMemberTile from "./tiles/RoomMemberTile.js";
|
||||||
|
|
||||||
export default function ({timeline, ownUserId}) {
|
export default function ({room, ownUserId}) {
|
||||||
return function tilesCreator(entry, emitUpdate) {
|
return function tilesCreator(entry, emitUpdate) {
|
||||||
const options = {entry, emitUpdate, ownUserId};
|
const options = {entry, emitUpdate, ownUserId};
|
||||||
if (entry.isGap) {
|
if (entry.isGap) {
|
||||||
return new GapTile(options, timeline);
|
return new GapTile(options, room);
|
||||||
} else if (entry.eventType) {
|
} else if (entry.eventType) {
|
||||||
switch (entry.eventType) {
|
switch (entry.eventType) {
|
||||||
case "m.room.message": {
|
case "m.room.message": {
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
import EventEmitter from "../../EventEmitter.js";
|
import EventEmitter from "../../EventEmitter.js";
|
||||||
import RoomSummary from "./summary.js";
|
import RoomSummary from "./summary.js";
|
||||||
import SyncWriter from "./timeline/persistence/SyncWriter.js";
|
import SyncWriter from "./timeline/persistence/SyncWriter.js";
|
||||||
|
import GapWriter from "./timeline/persistence/GapWriter.js";
|
||||||
import Timeline from "./timeline/Timeline.js";
|
import Timeline from "./timeline/Timeline.js";
|
||||||
import FragmentIdComparer from "./timeline/FragmentIdComparer.js";
|
import FragmentIdComparer from "./timeline/FragmentIdComparer.js";
|
||||||
import SendQueue from "./sending/SendQueue.js";
|
import SendQueue from "./sending/SendQueue.js";
|
||||||
|
@ -58,6 +59,47 @@ export default class Room extends EventEmitter {
|
||||||
this._sendQueue.enqueueEvent(eventType, content);
|
this._sendQueue.enqueueEvent(eventType, content);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/** @public */
|
||||||
|
async fillGap(fragmentEntry, amount) {
|
||||||
|
const response = await this._hsApi.messages(this._roomId, {
|
||||||
|
from: fragmentEntry.token,
|
||||||
|
dir: fragmentEntry.direction.asApiString(),
|
||||||
|
limit: amount,
|
||||||
|
filter: {lazy_load_members: true}
|
||||||
|
}).response();
|
||||||
|
|
||||||
|
const txn = await this._storage.readWriteTxn([
|
||||||
|
this._storage.storeNames.pendingEvents,
|
||||||
|
this._storage.storeNames.timelineEvents,
|
||||||
|
this._storage.storeNames.timelineFragments,
|
||||||
|
]);
|
||||||
|
let removedPendingEvents;
|
||||||
|
let newEntries;
|
||||||
|
try {
|
||||||
|
// detect remote echos of pending messages in the gap
|
||||||
|
removedPendingEvents = this._sendQueue.removeRemoteEchos(response.chunk, txn);
|
||||||
|
// write new events into gap
|
||||||
|
const gapWriter = new GapWriter({
|
||||||
|
roomId: this._roomId,
|
||||||
|
storage: this._storage,
|
||||||
|
fragmentIdComparer: this._fragmentIdComparer
|
||||||
|
});
|
||||||
|
newEntries = await gapWriter.writeFragmentFill(fragmentEntry, response, txn);
|
||||||
|
} catch (err) {
|
||||||
|
txn.abort();
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
await txn.complete();
|
||||||
|
// once txn is committed, emit events
|
||||||
|
if (removedPendingEvents) {
|
||||||
|
this._sendQueue.emitRemovals(removedPendingEvents);
|
||||||
|
}
|
||||||
|
if (this._timeline) {
|
||||||
|
this._timeline.addGapEntries(newEntries);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
get name() {
|
get name() {
|
||||||
return this._summary.name;
|
return this._summary.name;
|
||||||
}
|
}
|
||||||
|
@ -73,7 +115,6 @@ export default class Room extends EventEmitter {
|
||||||
this._timeline = new Timeline({
|
this._timeline = new Timeline({
|
||||||
roomId: this.id,
|
roomId: this.id,
|
||||||
storage: this._storage,
|
storage: this._storage,
|
||||||
hsApi: this._hsApi,
|
|
||||||
fragmentIdComparer: this._fragmentIdComparer,
|
fragmentIdComparer: this._fragmentIdComparer,
|
||||||
pendingEvents: this._sendQueue.pendingEvents,
|
pendingEvents: this._sendQueue.pendingEvents,
|
||||||
closeCallback: () => this._timeline = null,
|
closeCallback: () => this._timeline = null,
|
||||||
|
|
|
@ -64,13 +64,16 @@ export default class SendQueue {
|
||||||
const removed = [];
|
const removed = [];
|
||||||
for (const event of events) {
|
for (const event of events) {
|
||||||
const txnId = event.unsigned && event.unsigned.transaction_id;
|
const txnId = event.unsigned && event.unsigned.transaction_id;
|
||||||
|
let idx;
|
||||||
if (txnId) {
|
if (txnId) {
|
||||||
const idx = this._pendingEvents.array.findIndex(pe => pe.txnId === txnId);
|
idx = this._pendingEvents.array.findIndex(pe => pe.txnId === txnId);
|
||||||
if (idx !== -1) {
|
} else {
|
||||||
const pendingEvent = this._pendingEvents.get(idx);
|
idx = this._pendingEvents.array.findIndex(pe => pe.remoteId === event.event_id);
|
||||||
txn.pendingEvents.remove(pendingEvent.roomId, pendingEvent.queueIndex);
|
}
|
||||||
removed.push(pendingEvent);
|
if (idx !== -1) {
|
||||||
}
|
const pendingEvent = this._pendingEvents.get(idx);
|
||||||
|
txn.pendingEvents.remove(pendingEvent.roomId, pendingEvent.queueIndex);
|
||||||
|
removed.push(pendingEvent);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return removed;
|
return removed;
|
||||||
|
|
|
@ -1,16 +1,14 @@
|
||||||
import { SortedArray, MappedList, ConcatList } from "../../../observable/index.js";
|
import { SortedArray, MappedList, ConcatList } from "../../../observable/index.js";
|
||||||
import Direction from "./Direction.js";
|
import Direction from "./Direction.js";
|
||||||
import GapWriter from "./persistence/GapWriter.js";
|
|
||||||
import TimelineReader from "./persistence/TimelineReader.js";
|
import TimelineReader from "./persistence/TimelineReader.js";
|
||||||
import PendingEventEntry from "./entries/PendingEventEntry.js";
|
import PendingEventEntry from "./entries/PendingEventEntry.js";
|
||||||
|
|
||||||
export default class Timeline {
|
export default class Timeline {
|
||||||
constructor({roomId, storage, closeCallback, fragmentIdComparer, pendingEvents, user, hsApi}) {
|
constructor({roomId, storage, closeCallback, fragmentIdComparer, pendingEvents, user}) {
|
||||||
this._roomId = roomId;
|
this._roomId = roomId;
|
||||||
this._storage = storage;
|
this._storage = storage;
|
||||||
this._closeCallback = closeCallback;
|
this._closeCallback = closeCallback;
|
||||||
this._fragmentIdComparer = fragmentIdComparer;
|
this._fragmentIdComparer = fragmentIdComparer;
|
||||||
this._hsApi = hsApi;
|
|
||||||
this._remoteEntries = new SortedArray((a, b) => a.compare(b));
|
this._remoteEntries = new SortedArray((a, b) => a.compare(b));
|
||||||
this._timelineReader = new TimelineReader({
|
this._timelineReader = new TimelineReader({
|
||||||
roomId: this._roomId,
|
roomId: this._roomId,
|
||||||
|
@ -36,20 +34,8 @@ export default class Timeline {
|
||||||
this._remoteEntries.setManySorted(newEntries);
|
this._remoteEntries.setManySorted(newEntries);
|
||||||
}
|
}
|
||||||
|
|
||||||
/** @public */
|
/** @package */
|
||||||
async fillGap(fragmentEntry, amount) {
|
addGapEntries(newEntries) {
|
||||||
const response = await this._hsApi.messages(this._roomId, {
|
|
||||||
from: fragmentEntry.token,
|
|
||||||
dir: fragmentEntry.direction.asApiString(),
|
|
||||||
limit: amount,
|
|
||||||
filter: {lazy_load_members: true}
|
|
||||||
}).response();
|
|
||||||
const gapWriter = new GapWriter({
|
|
||||||
roomId: this._roomId,
|
|
||||||
storage: this._storage,
|
|
||||||
fragmentIdComparer: this._fragmentIdComparer
|
|
||||||
});
|
|
||||||
const newEntries = await gapWriter.writeFragmentFill(fragmentEntry, response);
|
|
||||||
this._remoteEntries.setManySorted(newEntries);
|
this._remoteEntries.setManySorted(newEntries);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -76,7 +76,7 @@ export default class GapWriter {
|
||||||
txn.timelineFragments.update(fragmentEntry.fragment);
|
txn.timelineFragments.update(fragmentEntry.fragment);
|
||||||
}
|
}
|
||||||
|
|
||||||
async writeFragmentFill(fragmentEntry, response) {
|
async writeFragmentFill(fragmentEntry, response, txn) {
|
||||||
const {fragmentId, direction} = fragmentEntry;
|
const {fragmentId, direction} = fragmentEntry;
|
||||||
// chunk is in reverse-chronological order when backwards
|
// chunk is in reverse-chronological order when backwards
|
||||||
const {chunk, start, end} = response;
|
const {chunk, start, end} = response;
|
||||||
|
@ -89,39 +89,27 @@ export default class GapWriter {
|
||||||
throw new Error("Invalid end token in response");
|
throw new Error("Invalid end token in response");
|
||||||
}
|
}
|
||||||
|
|
||||||
const txn = await this._storage.readWriteTxn([
|
// make sure we have the latest fragment from the store
|
||||||
this._storage.storeNames.timelineEvents,
|
const fragment = await txn.timelineFragments.get(this._roomId, fragmentId);
|
||||||
this._storage.storeNames.timelineFragments,
|
if (!fragment) {
|
||||||
]);
|
throw new Error(`Unknown fragment: ${fragmentId}`);
|
||||||
|
|
||||||
try {
|
|
||||||
// make sure we have the latest fragment from the store
|
|
||||||
const fragment = await txn.timelineFragments.get(this._roomId, fragmentId);
|
|
||||||
if (!fragment) {
|
|
||||||
throw new Error(`Unknown fragment: ${fragmentId}`);
|
|
||||||
}
|
|
||||||
fragmentEntry = fragmentEntry.withUpdatedFragment(fragment);
|
|
||||||
// check that the request was done with the token we are aware of (extra care to avoid timeline corruption)
|
|
||||||
if (fragmentEntry.token !== start) {
|
|
||||||
throw new Error("start is not equal to prev_batch or next_batch");
|
|
||||||
}
|
|
||||||
// find last event in fragment so we get the eventIndex to begin creating keys at
|
|
||||||
let lastKey = await this._findLastFragmentEventKey(fragmentEntry, txn);
|
|
||||||
// find out if any event in chunk is already present using findFirstOrLastOccurringEventId
|
|
||||||
const {
|
|
||||||
nonOverlappingEvents,
|
|
||||||
neighbourFragmentEntry
|
|
||||||
} = await this._findOverlappingEvents(fragmentEntry, chunk, txn);
|
|
||||||
|
|
||||||
// create entries for all events in chunk, add them to entries
|
|
||||||
entries = this._storeEvents(nonOverlappingEvents, lastKey, direction, txn);
|
|
||||||
await this._updateFragments(fragmentEntry, neighbourFragmentEntry, end, entries, txn);
|
|
||||||
} catch (err) {
|
|
||||||
txn.abort();
|
|
||||||
throw err;
|
|
||||||
}
|
}
|
||||||
|
fragmentEntry = fragmentEntry.withUpdatedFragment(fragment);
|
||||||
|
// check that the request was done with the token we are aware of (extra care to avoid timeline corruption)
|
||||||
|
if (fragmentEntry.token !== start) {
|
||||||
|
throw new Error("start is not equal to prev_batch or next_batch");
|
||||||
|
}
|
||||||
|
// find last event in fragment so we get the eventIndex to begin creating keys at
|
||||||
|
let lastKey = await this._findLastFragmentEventKey(fragmentEntry, txn);
|
||||||
|
// find out if any event in chunk is already present using findFirstOrLastOccurringEventId
|
||||||
|
const {
|
||||||
|
nonOverlappingEvents,
|
||||||
|
neighbourFragmentEntry
|
||||||
|
} = await this._findOverlappingEvents(fragmentEntry, chunk, txn);
|
||||||
|
|
||||||
await txn.complete();
|
// create entries for all events in chunk, add them to entries
|
||||||
|
entries = this._storeEvents(nonOverlappingEvents, lastKey, direction, txn);
|
||||||
|
await this._updateFragments(fragmentEntry, neighbourFragmentEntry, end, entries, txn);
|
||||||
|
|
||||||
return entries;
|
return entries;
|
||||||
}
|
}
|
||||||
|
|
Reference in a new issue