From 6d68ec1bac10bb184b47516a910cf1c363d1b021 Mon Sep 17 00:00:00 2001 From: Bruno Windels Date: Sat, 21 Mar 2020 23:40:40 +0100 Subject: [PATCH 1/3] move fillGap to room --- src/domain/session/room/RoomViewModel.js | 2 +- .../room/timeline/TimelineViewModel.js | 4 ++-- .../session/room/timeline/tilesCreator.js | 4 ++-- src/matrix/room/room.js | 22 ++++++++++++++++++- src/matrix/room/timeline/Timeline.js | 22 ++++--------------- 5 files changed, 30 insertions(+), 24 deletions(-) diff --git a/src/domain/session/room/RoomViewModel.js b/src/domain/session/room/RoomViewModel.js index 09de2b8b..cb6e6119 100644 --- a/src/domain/session/room/RoomViewModel.js +++ b/src/domain/session/room/RoomViewModel.js @@ -18,7 +18,7 @@ export default class RoomViewModel extends EventEmitter { this._room.on("change", this._onRoomChange); try { this._timeline = await this._room.openTimeline(); - this._timelineVM = new TimelineViewModel(this._timeline, this._ownUserId); + this._timelineVM = new TimelineViewModel(this._room, this._timeline, this._ownUserId); this.emit("change", "timelineViewModel"); } catch (err) { console.error(`room.openTimeline(): ${err.message}:\n${err.stack}`); diff --git a/src/domain/session/room/timeline/TimelineViewModel.js b/src/domain/session/room/timeline/TimelineViewModel.js index 622ab1cb..2e952a76 100644 --- a/src/domain/session/room/timeline/TimelineViewModel.js +++ b/src/domain/session/room/timeline/TimelineViewModel.js @@ -18,12 +18,12 @@ import TilesCollection from "./TilesCollection.js"; import tilesCreator from "./tilesCreator.js"; export default class TimelineViewModel { - constructor(timeline, ownUserId) { + constructor(room, timeline, ownUserId) { this._timeline = timeline; // once we support sending messages we could do // timeline.entries.concat(timeline.pendingEvents) // for an ObservableList that also contains local echos - this._tiles = new TilesCollection(timeline.entries, tilesCreator({timeline, ownUserId})); + this._tiles = new TilesCollection(timeline.entries, tilesCreator({room, ownUserId})); } // doesn't fill gaps, only loads stored entries/tiles diff --git a/src/domain/session/room/timeline/tilesCreator.js b/src/domain/session/room/timeline/tilesCreator.js index 9f583650..87d70238 100644 --- a/src/domain/session/room/timeline/tilesCreator.js +++ b/src/domain/session/room/timeline/tilesCreator.js @@ -4,11 +4,11 @@ import LocationTile from "./tiles/LocationTile.js"; import RoomNameTile from "./tiles/RoomNameTile.js"; import RoomMemberTile from "./tiles/RoomMemberTile.js"; -export default function ({timeline, ownUserId}) { +export default function ({room, ownUserId}) { return function tilesCreator(entry, emitUpdate) { const options = {entry, emitUpdate, ownUserId}; if (entry.isGap) { - return new GapTile(options, timeline); + return new GapTile(options, room); } else if (entry.eventType) { switch (entry.eventType) { case "m.room.message": { diff --git a/src/matrix/room/room.js b/src/matrix/room/room.js index d493bc00..530e42fd 100644 --- a/src/matrix/room/room.js +++ b/src/matrix/room/room.js @@ -1,6 +1,7 @@ import EventEmitter from "../../EventEmitter.js"; import RoomSummary from "./summary.js"; import SyncWriter from "./timeline/persistence/SyncWriter.js"; +import GapWriter from "./timeline/persistence/GapWriter.js"; import Timeline from "./timeline/Timeline.js"; import FragmentIdComparer from "./timeline/FragmentIdComparer.js"; import SendQueue from "./sending/SendQueue.js"; @@ -58,6 +59,26 @@ export default class Room extends EventEmitter { this._sendQueue.enqueueEvent(eventType, content); } + + /** @public */ + async fillGap(fragmentEntry, amount) { + const response = await this._hsApi.messages(this._roomId, { + from: fragmentEntry.token, + dir: fragmentEntry.direction.asApiString(), + limit: amount, + filter: {lazy_load_members: true} + }).response(); + const gapWriter = new GapWriter({ + roomId: this._roomId, + storage: this._storage, + fragmentIdComparer: this._fragmentIdComparer + }); + const newEntries = await gapWriter.writeFragmentFill(fragmentEntry, response); + if (this._timeline) { + this._timeline.addGapEntries(newEntries) + } + } + get name() { return this._summary.name; } @@ -73,7 +94,6 @@ export default class Room extends EventEmitter { this._timeline = new Timeline({ roomId: this.id, storage: this._storage, - hsApi: this._hsApi, fragmentIdComparer: this._fragmentIdComparer, pendingEvents: this._sendQueue.pendingEvents, closeCallback: () => this._timeline = null, diff --git a/src/matrix/room/timeline/Timeline.js b/src/matrix/room/timeline/Timeline.js index 99828479..a13fa304 100644 --- a/src/matrix/room/timeline/Timeline.js +++ b/src/matrix/room/timeline/Timeline.js @@ -1,16 +1,14 @@ import { SortedArray, MappedList, ConcatList } from "../../../observable/index.js"; import Direction from "./Direction.js"; -import GapWriter from "./persistence/GapWriter.js"; import TimelineReader from "./persistence/TimelineReader.js"; import PendingEventEntry from "./entries/PendingEventEntry.js"; export default class Timeline { - constructor({roomId, storage, closeCallback, fragmentIdComparer, pendingEvents, user, hsApi}) { + constructor({roomId, storage, closeCallback, fragmentIdComparer, pendingEvents, user}) { this._roomId = roomId; this._storage = storage; this._closeCallback = closeCallback; this._fragmentIdComparer = fragmentIdComparer; - this._hsApi = hsApi; this._remoteEntries = new SortedArray((a, b) => a.compare(b)); this._timelineReader = new TimelineReader({ roomId: this._roomId, @@ -36,23 +34,11 @@ export default class Timeline { this._remoteEntries.setManySorted(newEntries); } - /** @public */ - async fillGap(fragmentEntry, amount) { - const response = await this._hsApi.messages(this._roomId, { - from: fragmentEntry.token, - dir: fragmentEntry.direction.asApiString(), - limit: amount, - filter: {lazy_load_members: true} - }).response(); - const gapWriter = new GapWriter({ - roomId: this._roomId, - storage: this._storage, - fragmentIdComparer: this._fragmentIdComparer - }); - const newEntries = await gapWriter.writeFragmentFill(fragmentEntry, response); + /** @package */ + addGapEntries(newEntries) { this._remoteEntries.setManySorted(newEntries); } - + // tries to prepend `amount` entries to the `entries` list. async loadAtTop(amount) { const firstEventEntry = this._remoteEntries.array.find(e => !!e.eventType); From f02641c808ac08752c021a80aae21c70f2f5ee0f Mon Sep 17 00:00:00 2001 From: Bruno Windels Date: Sun, 22 Mar 2020 00:07:37 +0100 Subject: [PATCH 2/3] look for transaction_id in /messages response to delete pending events --- src/matrix/room/room.js | 35 +++++++++--- .../room/timeline/persistence/GapWriter.js | 54 ++++++++----------- 2 files changed, 49 insertions(+), 40 deletions(-) diff --git a/src/matrix/room/room.js b/src/matrix/room/room.js index 530e42fd..7de0d92b 100644 --- a/src/matrix/room/room.js +++ b/src/matrix/room/room.js @@ -68,14 +68,35 @@ export default class Room extends EventEmitter { limit: amount, filter: {lazy_load_members: true} }).response(); - const gapWriter = new GapWriter({ - roomId: this._roomId, - storage: this._storage, - fragmentIdComparer: this._fragmentIdComparer - }); - const newEntries = await gapWriter.writeFragmentFill(fragmentEntry, response); + + const txn = await this._storage.readWriteTxn([ + this._storage.storeNames.pendingEvents, + this._storage.storeNames.timelineEvents, + this._storage.storeNames.timelineFragments, + ]); + let removedPendingEvents; + let newEntries; + try { + // detect remote echos of pending messages in the gap + removedPendingEvents = this._sendQueue.removeRemoteEchos(response.chunk, txn); + // write new events into gap + const gapWriter = new GapWriter({ + roomId: this._roomId, + storage: this._storage, + fragmentIdComparer: this._fragmentIdComparer + }); + newEntries = await gapWriter.writeFragmentFill(fragmentEntry, response, txn); + } catch (err) { + txn.abort(); + throw err; + } + await txn.complete(); + // once txn is committed, emit events + if (removedPendingEvents) { + this._sendQueue.emitRemovals(removedPendingEvents); + } if (this._timeline) { - this._timeline.addGapEntries(newEntries) + this._timeline.addGapEntries(newEntries); } } diff --git a/src/matrix/room/timeline/persistence/GapWriter.js b/src/matrix/room/timeline/persistence/GapWriter.js index 9941c712..5dba0664 100644 --- a/src/matrix/room/timeline/persistence/GapWriter.js +++ b/src/matrix/room/timeline/persistence/GapWriter.js @@ -76,7 +76,7 @@ export default class GapWriter { txn.timelineFragments.update(fragmentEntry.fragment); } - async writeFragmentFill(fragmentEntry, response) { + async writeFragmentFill(fragmentEntry, response, txn) { const {fragmentId, direction} = fragmentEntry; // chunk is in reverse-chronological order when backwards const {chunk, start, end} = response; @@ -89,40 +89,28 @@ export default class GapWriter { throw new Error("Invalid end token in response"); } - const txn = await this._storage.readWriteTxn([ - this._storage.storeNames.timelineEvents, - this._storage.storeNames.timelineFragments, - ]); - - try { - // make sure we have the latest fragment from the store - const fragment = await txn.timelineFragments.get(this._roomId, fragmentId); - if (!fragment) { - throw new Error(`Unknown fragment: ${fragmentId}`); - } - fragmentEntry = fragmentEntry.withUpdatedFragment(fragment); - // check that the request was done with the token we are aware of (extra care to avoid timeline corruption) - if (fragmentEntry.token !== start) { - throw new Error("start is not equal to prev_batch or next_batch"); - } - // find last event in fragment so we get the eventIndex to begin creating keys at - let lastKey = await this._findLastFragmentEventKey(fragmentEntry, txn); - // find out if any event in chunk is already present using findFirstOrLastOccurringEventId - const { - nonOverlappingEvents, - neighbourFragmentEntry - } = await this._findOverlappingEvents(fragmentEntry, chunk, txn); - - // create entries for all events in chunk, add them to entries - entries = this._storeEvents(nonOverlappingEvents, lastKey, direction, txn); - await this._updateFragments(fragmentEntry, neighbourFragmentEntry, end, entries, txn); - } catch (err) { - txn.abort(); - throw err; + // make sure we have the latest fragment from the store + const fragment = await txn.timelineFragments.get(this._roomId, fragmentId); + if (!fragment) { + throw new Error(`Unknown fragment: ${fragmentId}`); } + fragmentEntry = fragmentEntry.withUpdatedFragment(fragment); + // check that the request was done with the token we are aware of (extra care to avoid timeline corruption) + if (fragmentEntry.token !== start) { + throw new Error("start is not equal to prev_batch or next_batch"); + } + // find last event in fragment so we get the eventIndex to begin creating keys at + let lastKey = await this._findLastFragmentEventKey(fragmentEntry, txn); + // find out if any event in chunk is already present using findFirstOrLastOccurringEventId + const { + nonOverlappingEvents, + neighbourFragmentEntry + } = await this._findOverlappingEvents(fragmentEntry, chunk, txn); - await txn.complete(); - + // create entries for all events in chunk, add them to entries + entries = this._storeEvents(nonOverlappingEvents, lastKey, direction, txn); + await this._updateFragments(fragmentEntry, neighbourFragmentEntry, end, entries, txn); + return entries; } } From 8354c58c0780777dd700bfa947c1858148c26961 Mon Sep 17 00:00:00 2001 From: Bruno Windels Date: Mon, 23 Mar 2020 23:00:33 +0100 Subject: [PATCH 3/3] also look for remote echos based on event_id --- src/matrix/room/sending/SendQueue.js | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/src/matrix/room/sending/SendQueue.js b/src/matrix/room/sending/SendQueue.js index e89b3bae..3c9f90c3 100644 --- a/src/matrix/room/sending/SendQueue.js +++ b/src/matrix/room/sending/SendQueue.js @@ -64,13 +64,16 @@ export default class SendQueue { const removed = []; for (const event of events) { const txnId = event.unsigned && event.unsigned.transaction_id; + let idx; if (txnId) { - const idx = this._pendingEvents.array.findIndex(pe => pe.txnId === txnId); - if (idx !== -1) { - const pendingEvent = this._pendingEvents.get(idx); - txn.pendingEvents.remove(pendingEvent.roomId, pendingEvent.queueIndex); - removed.push(pendingEvent); - } + idx = this._pendingEvents.array.findIndex(pe => pe.txnId === txnId); + } else { + idx = this._pendingEvents.array.findIndex(pe => pe.remoteId === event.event_id); + } + if (idx !== -1) { + const pendingEvent = this._pendingEvents.get(idx); + txn.pendingEvents.remove(pendingEvent.roomId, pendingEvent.queueIndex); + removed.push(pendingEvent); } } return removed;