diff --git a/src/matrix/room/BaseRoom.js b/src/matrix/room/BaseRoom.js new file mode 100644 index 00000000..d01cfaf1 --- /dev/null +++ b/src/matrix/room/BaseRoom.js @@ -0,0 +1,481 @@ +/* +Copyright 2020 Bruno Windels + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +import {EventEmitter} from "../../utils/EventEmitter.js"; +import {RoomSummary} from "./RoomSummary.js"; +import {GapWriter} from "./timeline/persistence/GapWriter.js"; +import {Timeline} from "./timeline/Timeline.js"; +import {FragmentIdComparer} from "./timeline/FragmentIdComparer.js"; +import {WrappedError} from "../error.js" +import {fetchOrLoadMembers} from "./members/load.js"; +import {MemberList} from "./members/MemberList.js"; +import {Heroes} from "./members/Heroes.js"; +import {EventEntry} from "./timeline/entries/EventEntry.js"; +import {ObservedEventMap} from "./ObservedEventMap.js"; +import {DecryptionSource} from "../e2ee/common.js"; +import {ensureLogItem} from "../../logging/utils.js"; + +const EVENT_ENCRYPTED_TYPE = "m.room.encrypted"; + +export class BaseRoom extends EventEmitter { + constructor({roomId, storage, hsApi, mediaRepository, emitCollectionChange, user, createRoomEncryption, getSyncToken, platform}) { + super(); + this._roomId = roomId; + this._storage = storage; + this._hsApi = hsApi; + this._mediaRepository = mediaRepository; + this._summary = new RoomSummary(roomId); + this._fragmentIdComparer = new FragmentIdComparer([]); + this._emitCollectionChange = emitCollectionChange; + this._timeline = null; + this._user = user; + this._changedMembersDuringSync = null; + this._memberList = null; + this._createRoomEncryption = createRoomEncryption; + this._roomEncryption = null; + this._getSyncToken = getSyncToken; + this._platform = platform; + this._observedEvents = null; + } + + async _eventIdsToEntries(eventIds, txn) { + const retryEntries = []; + await Promise.all(eventIds.map(async eventId => { + const storageEntry = await txn.timelineEvents.getByEventId(this._roomId, eventId); + if (storageEntry) { + retryEntries.push(new EventEntry(storageEntry, this._fragmentIdComparer)); + } + })); + return retryEntries; + } + + _getAdditionalTimelineRetryEntries(otherRetryEntries, roomKeys) { + let retryTimelineEntries = this._roomEncryption.filterUndecryptedEventEntriesForKeys(this._timeline.remoteEntries, roomKeys); + // filter out any entries already in retryEntries so we don't decrypt them twice + const existingIds = otherRetryEntries.reduce((ids, e) => {ids.add(e.id); return ids;}, new Set()); + retryTimelineEntries = retryTimelineEntries.filter(e => !existingIds.has(e.id)); + return retryTimelineEntries; + } + + /** + * Used for retrying decryption from other sources than sync, like key backup. + * @internal + * @param {RoomKey} roomKey + * @param {Array} eventIds any event ids that should be retried. There might be more in the timeline though for this key. + * @return {Promise} + */ + async notifyRoomKey(roomKey, eventIds, log) { + if (!this._roomEncryption) { + return; + } + const txn = await this._storage.readTxn([ + this._storage.storeNames.timelineEvents, + this._storage.storeNames.inboundGroupSessions, + ]); + let retryEntries = await this._eventIdsToEntries(eventIds, txn); + if (this._timeline) { + const retryTimelineEntries = this._getAdditionalTimelineRetryEntries(retryEntries, [roomKey]); + retryEntries = retryEntries.concat(retryTimelineEntries); + } + if (retryEntries.length) { + const decryptRequest = this._decryptEntries(DecryptionSource.Retry, retryEntries, txn, log); + // this will close txn while awaiting decryption + await decryptRequest.complete(); + + this._timeline?.replaceEntries(retryEntries); + // we would ideally write the room summary in the same txn as the groupSessionDecryptions in the + // _decryptEntries entries and could even know which events have been decrypted for the first + // time from DecryptionChanges.write and only pass those to the summary. As timeline changes + // are not essential to the room summary, it's fine to write this in a separate txn for now. + const changes = this._summary.data.applyTimelineEntries(retryEntries, false, false); + if (await this._summary.writeAndApplyData(changes, this._storage)) { + this._emitUpdate(); + } + } + } + + _setEncryption(roomEncryption) { + if (roomEncryption && !this._roomEncryption) { + this._roomEncryption = roomEncryption; + if (this._timeline) { + this._timeline.enableEncryption(this._decryptEntries.bind(this, DecryptionSource.Timeline)); + } + return true; + } + return false; + } + + /** + * Used for decrypting when loading/filling the timeline, and retrying decryption, + * not during sync, where it is split up during the multiple phases. + */ + _decryptEntries(source, entries, inboundSessionTxn, log = null) { + const request = new DecryptionRequest(async (r, log) => { + if (!inboundSessionTxn) { + inboundSessionTxn = await this._storage.readTxn([this._storage.storeNames.inboundGroupSessions]); + } + if (r.cancelled) return; + const events = entries.filter(entry => { + return entry.eventType === EVENT_ENCRYPTED_TYPE; + }).map(entry => entry.event); + r.preparation = await this._roomEncryption.prepareDecryptAll(events, null, source, inboundSessionTxn); + if (r.cancelled) return; + const changes = await r.preparation.decrypt(); + r.preparation = null; + if (r.cancelled) return; + const stores = [this._storage.storeNames.groupSessionDecryptions]; + const isTimelineOpen = this._isTimelineOpen; + if (isTimelineOpen) { + // read to fetch devices if timeline is open + stores.push(this._storage.storeNames.deviceIdentities); + } + const writeTxn = await this._storage.readWriteTxn(stores); + let decryption; + try { + decryption = await changes.write(writeTxn, log); + if (isTimelineOpen) { + await decryption.verifySenders(writeTxn); + } + } catch (err) { + writeTxn.abort(); + throw err; + } + await writeTxn.complete(); + // TODO: log decryption errors here + decryption.applyToEntries(entries); + if (this._observedEvents) { + this._observedEvents.updateEvents(entries); + } + }, ensureLogItem(log)); + return request; + } + + async _getSyncRetryDecryptEntries(newKeys, roomEncryption, txn) { + const entriesPerKey = await Promise.all(newKeys.map(async key => { + const retryEventIds = await roomEncryption.getEventIdsForMissingKey(key, txn); + if (retryEventIds) { + return this._eventIdsToEntries(retryEventIds, txn); + } + })); + let retryEntries = entriesPerKey.reduce((allEntries, entries) => entries ? allEntries.concat(entries) : allEntries, []); + // If we have the timeline open, see if there are more entries for the new keys + // as we only store missing session information for synced events, not backfilled. + // We want to decrypt all events we can though if the user is looking + // at them when the timeline is open + if (this._timeline) { + const retryTimelineEntries = this._getAdditionalTimelineRetryEntries(retryEntries, newKeys); + // make copies so we don't modify the original entry in writeSync, before the afterSync stage + const retryTimelineEntriesCopies = retryTimelineEntries.map(e => e.clone()); + // add to other retry entries + retryEntries = retryEntries.concat(retryTimelineEntriesCopies); + } + return retryEntries; + } + + /** @package */ + async load(summary, txn, log) { + log.set("id", this.id); + try { + // if called from sync, there is no summary yet + if (summary) { + this._summary.load(summary); + } + if (this._summary.data.encryption) { + const roomEncryption = this._createRoomEncryption(this, this._summary.data.encryption); + this._setEncryption(roomEncryption); + } + // need to load members for name? + if (this._summary.data.needsHeroes) { + this._heroes = new Heroes(this._roomId); + const changes = await this._heroes.calculateChanges(this._summary.data.heroes, [], txn); + this._heroes.applyChanges(changes, this._summary.data); + } + } catch (err) { + throw new WrappedError(`Could not load room ${this._roomId}`, err); + } + } + + /** @public */ + async loadMemberList(log = null) { + if (this._memberList) { + // TODO: also await fetchOrLoadMembers promise here + this._memberList.retain(); + return this._memberList; + } else { + const members = await fetchOrLoadMembers({ + summary: this._summary, + roomId: this._roomId, + hsApi: this._hsApi, + storage: this._storage, + syncToken: this._getSyncToken(), + // to handle race between /members and /sync + setChangedMembersMap: map => this._changedMembersDuringSync = map, + log, + }, this._platform.logger); + this._memberList = new MemberList({ + members, + closeCallback: () => { this._memberList = null; } + }); + return this._memberList; + } + } + + /** @public */ + fillGap(fragmentEntry, amount, log = null) { + // TODO move some/all of this out of BaseRoom + return this._platform.logger.wrapOrRun(log, "fillGap", async log => { + log.set("id", this.id); + log.set("fragment", fragmentEntry.fragmentId); + log.set("dir", fragmentEntry.direction.asApiString()); + if (fragmentEntry.edgeReached) { + log.set("edgeReached", true); + return; + } + const response = await this._hsApi.messages(this._roomId, { + from: fragmentEntry.token, + dir: fragmentEntry.direction.asApiString(), + limit: amount, + filter: { + lazy_load_members: true, + include_redundant_members: true, + } + }, {log}).response(); + + const txn = await this._storage.readWriteTxn([ + this._storage.storeNames.pendingEvents, + this._storage.storeNames.timelineEvents, + this._storage.storeNames.timelineFragments, + ]); + let extraGapFillChanges; + let gapResult; + try { + // detect remote echos of pending messages in the gap + extraGapFillChanges = this._writeGapFill(response.chunk, txn, log); + // write new events into gap + const gapWriter = new GapWriter({ + roomId: this._roomId, + storage: this._storage, + fragmentIdComparer: this._fragmentIdComparer, + }); + gapResult = await gapWriter.writeFragmentFill(fragmentEntry, response, txn, log); + } catch (err) { + txn.abort(); + throw err; + } + await txn.complete(); + if (this._roomEncryption) { + const decryptRequest = this._decryptEntries(DecryptionSource.Timeline, gapResult.entries, null, log); + await decryptRequest.complete(); + } + // once txn is committed, update in-memory state & emit events + for (const fragment of gapResult.fragments) { + this._fragmentIdComparer.add(fragment); + } + if (extraGapFillChanges) { + this._applyGapFill(extraGapFillChanges); + } + if (this._timeline) { + this._timeline.addOrReplaceEntries(gapResult.entries); + } + }); + } + + /** + allow sub classes to integrate in the gap fill lifecycle. + JoinedRoom uses this update remote echos. + */ + _writeGapFill(chunk, txn, log) {} + _applyGapFill() {} + + /** @public */ + get name() { + if (this._heroes) { + return this._heroes.roomName; + } + const summaryData = this._summary.data; + if (summaryData.name) { + return summaryData.name; + } + if (summaryData.canonicalAlias) { + return summaryData.canonicalAlias; + } + return null; + } + + /** @public */ + get id() { + return this._roomId; + } + + get avatarUrl() { + if (this._summary.data.avatarUrl) { + return this._summary.data.avatarUrl; + } else if (this._heroes) { + return this._heroes.roomAvatarUrl; + } + return null; + } + + get lastMessageTimestamp() { + return this._summary.data.lastMessageTimestamp; + } + + get isLowPriority() { + const tags = this._summary.data.tags; + return !!(tags && tags['m.lowpriority']); + } + + get isEncrypted() { + return !!this._summary.data.encryption; + } + + get isJoined() { + return this.membership === "join"; + } + + get isLeft() { + return this.membership === "leave"; + } + + get mediaRepository() { + return this._mediaRepository; + } + + get membership() { + return this._summary.data.membership; + } + + enableSessionBackup(sessionBackup) { + this._roomEncryption?.enableSessionBackup(sessionBackup); + // TODO: do we really want to do this every time you open the app? + if (this._timeline) { + this._platform.logger.run("enableSessionBackup", log => { + return this._roomEncryption.restoreMissingSessionsFromBackup(this._timeline.remoteEntries, log); + }); + } + } + + get _isTimelineOpen() { + return !!this._timeline; + } + + _emitUpdate() { + // once for event emitter listeners + this.emit("change"); + // and once for collection listeners + this._emitCollectionChange(this); + } + + /** @public */ + openTimeline(log = null) { + return this._platform.logger.wrapOrRun(log, "open timeline", async log => { + log.set("id", this.id); + if (this._timeline) { + throw new Error("not dealing with load race here for now"); + } + this._timeline = new Timeline({ + roomId: this.id, + storage: this._storage, + fragmentIdComparer: this._fragmentIdComparer, + pendingEvents: this._getPendingEvents(), + closeCallback: () => { + this._timeline = null; + if (this._roomEncryption) { + this._roomEncryption.notifyTimelineClosed(); + } + }, + clock: this._platform.clock, + logger: this._platform.logger, + }); + if (this._roomEncryption) { + this._timeline.enableEncryption(this._decryptEntries.bind(this, DecryptionSource.Timeline)); + } + await this._timeline.load(this._user, this.membership, log); + return this._timeline; + }); + } + + /* allow subclasses to provide an observable list with pending events when opening the timeline */ + _getPendingEvents() { return null; } + + observeEvent(eventId) { + if (!this._observedEvents) { + this._observedEvents = new ObservedEventMap(() => { + this._observedEvents = null; + }); + } + let entry = null; + if (this._timeline) { + entry = this._timeline.getByEventId(eventId); + } + const observable = this._observedEvents.observe(eventId, entry); + if (!entry) { + // update in the background + this._readEventById(eventId).then(entry => { + observable.update(entry); + }).catch(err => { + console.warn(`could not load event ${eventId} from storage`, err); + }); + } + return observable; + } + + async _readEventById(eventId) { + let stores = [this._storage.storeNames.timelineEvents]; + if (this.isEncrypted) { + stores.push(this._storage.storeNames.inboundGroupSessions); + } + const txn = await this._storage.readTxn(stores); + const storageEntry = await txn.timelineEvents.getByEventId(this._roomId, eventId); + if (storageEntry) { + const entry = new EventEntry(storageEntry, this._fragmentIdComparer); + if (entry.eventType === EVENT_ENCRYPTED_TYPE) { + const request = this._decryptEntries(DecryptionSource.Timeline, [entry], txn); + await request.complete(); + } + return entry; + } + } + + + dispose() { + this._roomEncryption?.dispose(); + this._timeline?.dispose(); + } +} + +class DecryptionRequest { + constructor(decryptFn, log) { + this._cancelled = false; + this.preparation = null; + this._promise = log.wrap("decryptEntries", log => decryptFn(this, log)); + } + + complete() { + return this._promise; + } + + get cancelled() { + return this._cancelled; + } + + dispose() { + this._cancelled = true; + if (this.preparation) { + this.preparation.dispose(); + } + } +} diff --git a/src/matrix/room/Room.js b/src/matrix/room/Room.js index e234e0e4..7e45a8da 100644 --- a/src/matrix/room/Room.js +++ b/src/matrix/room/Room.js @@ -14,179 +14,30 @@ See the License for the specific language governing permissions and limitations under the License. */ -import {EventEmitter} from "../../utils/EventEmitter.js"; -import {RoomSummary} from "./RoomSummary.js"; +import {BaseRoom} from "./BaseRoom.js"; import {SyncWriter} from "./timeline/persistence/SyncWriter.js"; -import {GapWriter} from "./timeline/persistence/GapWriter.js"; -import {Timeline} from "./timeline/Timeline.js"; -import {FragmentIdComparer} from "./timeline/FragmentIdComparer.js"; import {SendQueue} from "./sending/SendQueue.js"; import {WrappedError} from "../error.js" -import {fetchOrLoadMembers} from "./members/load.js"; -import {MemberList} from "./members/MemberList.js"; import {Heroes} from "./members/Heroes.js"; -import {EventEntry} from "./timeline/entries/EventEntry.js"; -import {ObservedEventMap} from "./ObservedEventMap.js"; import {AttachmentUpload} from "./AttachmentUpload.js"; import {DecryptionSource} from "../e2ee/common.js"; -import {ensureLogItem} from "../../logging/utils.js"; const EVENT_ENCRYPTED_TYPE = "m.room.encrypted"; -export class Room extends EventEmitter { - constructor({roomId, storage, hsApi, mediaRepository, emitCollectionChange, pendingEvents, user, createRoomEncryption, getSyncToken, platform}) { - super(); - this._roomId = roomId; - this._storage = storage; - this._hsApi = hsApi; - this._mediaRepository = mediaRepository; - this._summary = new RoomSummary(roomId); - this._fragmentIdComparer = new FragmentIdComparer([]); - this._syncWriter = new SyncWriter({roomId, fragmentIdComparer: this._fragmentIdComparer}); - this._emitCollectionChange = emitCollectionChange; - this._sendQueue = new SendQueue({roomId, storage, hsApi, pendingEvents}); - this._timeline = null; - this._user = user; - this._changedMembersDuringSync = null; - this._memberList = null; - this._createRoomEncryption = createRoomEncryption; - this._roomEncryption = null; - this._getSyncToken = getSyncToken; - this._platform = platform; - this._observedEvents = null; - } - - async _eventIdsToEntries(eventIds, txn) { - const retryEntries = []; - await Promise.all(eventIds.map(async eventId => { - const storageEntry = await txn.timelineEvents.getByEventId(this._roomId, eventId); - if (storageEntry) { - retryEntries.push(new EventEntry(storageEntry, this._fragmentIdComparer)); - } - })); - return retryEntries; - } - - _getAdditionalTimelineRetryEntries(otherRetryEntries, roomKeys) { - let retryTimelineEntries = this._roomEncryption.filterUndecryptedEventEntriesForKeys(this._timeline.remoteEntries, roomKeys); - // filter out any entries already in retryEntries so we don't decrypt them twice - const existingIds = otherRetryEntries.reduce((ids, e) => {ids.add(e.id); return ids;}, new Set()); - retryTimelineEntries = retryTimelineEntries.filter(e => !existingIds.has(e.id)); - return retryTimelineEntries; - } - - /** - * Used for retrying decryption from other sources than sync, like key backup. - * @internal - * @param {RoomKey} roomKey - * @param {Array} eventIds any event ids that should be retried. There might be more in the timeline though for this key. - * @return {Promise} - */ - async notifyRoomKey(roomKey, eventIds, log) { - if (!this._roomEncryption) { - return; - } - const txn = await this._storage.readTxn([ - this._storage.storeNames.timelineEvents, - this._storage.storeNames.inboundGroupSessions, - ]); - let retryEntries = await this._eventIdsToEntries(eventIds, txn); - if (this._timeline) { - const retryTimelineEntries = this._getAdditionalTimelineRetryEntries(retryEntries, [roomKey]); - retryEntries = retryEntries.concat(retryTimelineEntries); - } - if (retryEntries.length) { - const decryptRequest = this._decryptEntries(DecryptionSource.Retry, retryEntries, txn, log); - // this will close txn while awaiting decryption - await decryptRequest.complete(); - - this._timeline?.replaceEntries(retryEntries); - // we would ideally write the room summary in the same txn as the groupSessionDecryptions in the - // _decryptEntries entries and could even know which events have been decrypted for the first - // time from DecryptionChanges.write and only pass those to the summary. As timeline changes - // are not essential to the room summary, it's fine to write this in a separate txn for now. - const changes = this._summary.data.applyTimelineEntries(retryEntries, false, false); - if (await this._summary.writeAndApplyData(changes, this._storage)) { - this._emitUpdate(); - } - } +export class Room extends BaseRoom { + constructor(options) { + super(options); + const {pendingEvents} = options; + this._syncWriter = new SyncWriter({roomId: this.id, fragmentIdComparer: this._fragmentIdComparer}); + this._sendQueue = new SendQueue({roomId: this.id, storage: this._storage, hsApi: this._hsApi, pendingEvents}); } _setEncryption(roomEncryption) { - if (roomEncryption && !this._roomEncryption) { - this._roomEncryption = roomEncryption; + if (super._setEncryption(roomEncryption)) { this._sendQueue.enableEncryption(this._roomEncryption); - if (this._timeline) { - this._timeline.enableEncryption(this._decryptEntries.bind(this, DecryptionSource.Timeline)); - } + return true; } - } - - /** - * Used for decrypting when loading/filling the timeline, and retrying decryption, - * not during sync, where it is split up during the multiple phases. - */ - _decryptEntries(source, entries, inboundSessionTxn, log = null) { - const request = new DecryptionRequest(async (r, log) => { - if (!inboundSessionTxn) { - inboundSessionTxn = await this._storage.readTxn([this._storage.storeNames.inboundGroupSessions]); - } - if (r.cancelled) return; - const events = entries.filter(entry => { - return entry.eventType === EVENT_ENCRYPTED_TYPE; - }).map(entry => entry.event); - r.preparation = await this._roomEncryption.prepareDecryptAll(events, null, source, inboundSessionTxn); - if (r.cancelled) return; - const changes = await r.preparation.decrypt(); - r.preparation = null; - if (r.cancelled) return; - const stores = [this._storage.storeNames.groupSessionDecryptions]; - const isTimelineOpen = this._isTimelineOpen; - if (isTimelineOpen) { - // read to fetch devices if timeline is open - stores.push(this._storage.storeNames.deviceIdentities); - } - const writeTxn = await this._storage.readWriteTxn(stores); - let decryption; - try { - decryption = await changes.write(writeTxn, log); - if (isTimelineOpen) { - await decryption.verifySenders(writeTxn); - } - } catch (err) { - writeTxn.abort(); - throw err; - } - await writeTxn.complete(); - // TODO: log decryption errors here - decryption.applyToEntries(entries); - if (this._observedEvents) { - this._observedEvents.updateEvents(entries); - } - }, ensureLogItem(log)); - return request; - } - - async _getSyncRetryDecryptEntries(newKeys, roomEncryption, txn) { - const entriesPerKey = await Promise.all(newKeys.map(async key => { - const retryEventIds = await roomEncryption.getEventIdsForMissingKey(key, txn); - if (retryEventIds) { - return this._eventIdsToEntries(retryEventIds, txn); - } - })); - let retryEntries = entriesPerKey.reduce((allEntries, entries) => entries ? allEntries.concat(entries) : allEntries, []); - // If we have the timeline open, see if there are more entries for the new keys - // as we only store missing session information for synced events, not backfilled. - // We want to decrypt all events we can though if the user is looking - // at them when the timeline is open - if (this._timeline) { - const retryTimelineEntries = this._getAdditionalTimelineRetryEntries(retryEntries, newKeys); - // make copies so we don't modify the original entry in writeSync, before the afterSync stage - const retryTimelineEntriesCopies = retryTimelineEntries.map(e => e.clone()); - // add to other retry entries - retryEntries = retryEntries.concat(retryTimelineEntriesCopies); - } - return retryEntries; + return false; } async prepareSync(roomResponse, membership, invite, newKeys, txn, log) { @@ -254,6 +105,7 @@ export class Room extends EventEmitter { // so no old state sticks around txn.roomState.removeAllForRoom(this.id); txn.roomMembers.removeAllForRoom(this.id); + // TODO: this should be done in ArchivedRoom txn.archivedRoomSummary.remove(this.id); } const {entries: newEntries, newLiveKey, memberChanges} = @@ -422,31 +274,23 @@ export class Room extends EventEmitter { /** @package */ async load(summary, txn, log) { - log.set("id", this.id); try { - // if called from sync, there is no summary yet - if (summary) { - this._summary.load(summary); - } - if (this._summary.data.encryption) { - const roomEncryption = this._createRoomEncryption(this, this._summary.data.encryption); - this._setEncryption(roomEncryption); - } - // need to load members for name? - if (this._summary.data.needsHeroes) { - this._heroes = new Heroes(this._roomId); - const changes = await this._heroes.calculateChanges(this._summary.data.heroes, [], txn); - this._heroes.applyChanges(changes, this._summary.data); - } - // don't load sync writer for archived room - if (this.membership !== "leave") { - return this._syncWriter.load(txn, log); - } + super.load(summary, txn, log); + this._syncWriter.load(txn, log); } catch (err) { throw new WrappedError(`Could not load room ${this._roomId}`, err); } } + _writeGapFill(gapChunk, txn, log) { + const removedPendingEvents = this._sendQueue.removeRemoteEchos(gapChunk, txn, log); + return removedPendingEvents; + } + + _applyGapFill(removedPendingEvents) { + this._sendQueue.emitRemovals(removedPendingEvents); + } + /** @public */ sendEvent(eventType, content, attachments, log = null) { this._platform.logger.wrapOrRun(log, "send", log => { @@ -466,124 +310,6 @@ export class Room extends EventEmitter { }); } - /** @public */ - async loadMemberList(log = null) { - if (this._memberList) { - // TODO: also await fetchOrLoadMembers promise here - this._memberList.retain(); - return this._memberList; - } else { - const members = await fetchOrLoadMembers({ - summary: this._summary, - roomId: this._roomId, - hsApi: this._hsApi, - storage: this._storage, - syncToken: this._getSyncToken(), - // to handle race between /members and /sync - setChangedMembersMap: map => this._changedMembersDuringSync = map, - log, - }, this._platform.logger); - this._memberList = new MemberList({ - members, - closeCallback: () => { this._memberList = null; } - }); - return this._memberList; - } - } - - /** @public */ - fillGap(fragmentEntry, amount, log = null) { - // TODO move some/all of this out of Room - return this._platform.logger.wrapOrRun(log, "fillGap", async log => { - log.set("id", this.id); - log.set("fragment", fragmentEntry.fragmentId); - log.set("dir", fragmentEntry.direction.asApiString()); - if (fragmentEntry.edgeReached) { - log.set("edgeReached", true); - return; - } - const response = await this._hsApi.messages(this._roomId, { - from: fragmentEntry.token, - dir: fragmentEntry.direction.asApiString(), - limit: amount, - filter: { - lazy_load_members: true, - include_redundant_members: true, - } - }, {log}).response(); - - const txn = await this._storage.readWriteTxn([ - this._storage.storeNames.pendingEvents, - this._storage.storeNames.timelineEvents, - this._storage.storeNames.timelineFragments, - ]); - let removedPendingEvents; - let gapResult; - try { - // detect remote echos of pending messages in the gap - removedPendingEvents = this._sendQueue.removeRemoteEchos(response.chunk, txn, log); - // write new events into gap - const gapWriter = new GapWriter({ - roomId: this._roomId, - storage: this._storage, - fragmentIdComparer: this._fragmentIdComparer, - }); - gapResult = await gapWriter.writeFragmentFill(fragmentEntry, response, txn, log); - } catch (err) { - txn.abort(); - throw err; - } - await txn.complete(); - if (this._roomEncryption) { - const decryptRequest = this._decryptEntries(DecryptionSource.Timeline, gapResult.entries, null, log); - await decryptRequest.complete(); - } - // once txn is committed, update in-memory state & emit events - for (const fragment of gapResult.fragments) { - this._fragmentIdComparer.add(fragment); - } - if (removedPendingEvents) { - this._sendQueue.emitRemovals(removedPendingEvents); - } - if (this._timeline) { - this._timeline.addOrReplaceEntries(gapResult.entries); - } - }); - } - - /** @public */ - get name() { - if (this._heroes) { - return this._heroes.roomName; - } - const summaryData = this._summary.data; - if (summaryData.name) { - return summaryData.name; - } - if (summaryData.canonicalAlias) { - return summaryData.canonicalAlias; - } - return null; - } - - /** @public */ - get id() { - return this._roomId; - } - - get avatarUrl() { - if (this._summary.data.avatarUrl) { - return this._summary.data.avatarUrl; - } else if (this._heroes) { - return this._heroes.roomAvatarUrl; - } - return null; - } - - get lastMessageTimestamp() { - return this._summary.data.lastMessageTimestamp; - } - get isUnread() { return this._summary.data.isUnread; } @@ -596,29 +322,6 @@ export class Room extends EventEmitter { return this._summary.data.highlightCount; } - get isLowPriority() { - const tags = this._summary.data.tags; - return !!(tags && tags['m.lowpriority']); - } - - get isEncrypted() { - return !!this._summary.data.encryption; - } - - get membership() { - return this._summary.data.membership; - } - - enableSessionBackup(sessionBackup) { - this._roomEncryption?.enableSessionBackup(sessionBackup); - // TODO: do we really want to do this every time you open the app? - if (this._timeline) { - this._platform.logger.run("enableSessionBackup", log => { - return this._roomEncryption.restoreMissingSessionsFromBackup(this._timeline.remoteEntries, log); - }); - } - } - get isTrackingMembers() { return this._summary.data.isTrackingMembers; } @@ -634,17 +337,6 @@ export class Room extends EventEmitter { } } - get _isTimelineOpen() { - return !!this._timeline; - } - - _emitUpdate() { - // once for event emitter listeners - this.emit("change"); - // and once for collection listeners - this._emitCollectionChange(this); - } - async clearUnread(log = null) { if (this.isUnread || this.notificationCount) { return await this._platform.logger.wrapOrRun(log, "clearUnread", async log => { @@ -678,37 +370,9 @@ export class Room extends EventEmitter { } } - /** @public */ - openTimeline(log = null) { - return this._platform.logger.wrapOrRun(log, "open timeline", async log => { - log.set("id", this.id); - if (this._timeline) { - throw new Error("not dealing with load race here for now"); - } - this._timeline = new Timeline({ - roomId: this.id, - storage: this._storage, - fragmentIdComparer: this._fragmentIdComparer, - pendingEvents: this._sendQueue.pendingEvents, - closeCallback: () => { - this._timeline = null; - if (this._roomEncryption) { - this._roomEncryption.notifyTimelineClosed(); - } - }, - clock: this._platform.clock, - logger: this._platform.logger, - }); - if (this._roomEncryption) { - this._timeline.enableEncryption(this._decryptEntries.bind(this, DecryptionSource.Timeline)); - } - await this._timeline.load(this._user, this.membership, log); - return this._timeline; - }); - } - - get mediaRepository() { - return this._mediaRepository; + /* called by BaseRoom to pass pendingEvents when opening the timeline */ + _getPendingEvents() { + return this._sendQueue.pendingEvents; } /** @package */ @@ -721,75 +385,12 @@ export class Room extends EventEmitter { this._summary.applyChanges(changes); } - observeEvent(eventId) { - if (!this._observedEvents) { - this._observedEvents = new ObservedEventMap(() => { - this._observedEvents = null; - }); - } - let entry = null; - if (this._timeline) { - entry = this._timeline.getByEventId(eventId); - } - const observable = this._observedEvents.observe(eventId, entry); - if (!entry) { - // update in the background - this._readEventById(eventId).then(entry => { - observable.update(entry); - }).catch(err => { - console.warn(`could not load event ${eventId} from storage`, err); - }); - } - return observable; - } - - async _readEventById(eventId) { - let stores = [this._storage.storeNames.timelineEvents]; - if (this.isEncrypted) { - stores.push(this._storage.storeNames.inboundGroupSessions); - } - const txn = await this._storage.readTxn(stores); - const storageEntry = await txn.timelineEvents.getByEventId(this._roomId, eventId); - if (storageEntry) { - const entry = new EventEntry(storageEntry, this._fragmentIdComparer); - if (entry.eventType === EVENT_ENCRYPTED_TYPE) { - const request = this._decryptEntries(DecryptionSource.Timeline, [entry], txn); - await request.complete(); - } - return entry; - } - } - createAttachment(blob, filename) { return new AttachmentUpload({blob, filename, platform: this._platform}); } dispose() { - this._roomEncryption?.dispose(); - this._timeline?.dispose(); + super.dispose(); this._sendQueue.dispose(); } } - -class DecryptionRequest { - constructor(decryptFn, log) { - this._cancelled = false; - this.preparation = null; - this._promise = log.wrap("decryptEntries", log => decryptFn(this, log)); - } - - complete() { - return this._promise; - } - - get cancelled() { - return this._cancelled; - } - - dispose() { - this._cancelled = true; - if (this.preparation) { - this.preparation.dispose(); - } - } -} diff --git a/src/matrix/room/timeline/Timeline.js b/src/matrix/room/timeline/Timeline.js index 5c9091df..74040586 100644 --- a/src/matrix/room/timeline/Timeline.js +++ b/src/matrix/room/timeline/Timeline.js @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -import {SortedArray, MappedList, ConcatList} from "../../../observable/index.js"; +import {SortedArray, MappedList, ConcatList, ObservableArray} from "../../../observable/index.js"; import {Disposables} from "../../../utils/Disposables.js"; import {Direction} from "./Direction.js"; import {TimelineReader} from "./persistence/TimelineReader.js"; @@ -36,11 +36,16 @@ export class Timeline { fragmentIdComparer: this._fragmentIdComparer }); this._readerRequest = null; - const localEntries = new MappedList(pendingEvents, pe => { - return new PendingEventEntry({pendingEvent: pe, member: this._ownMember, clock}); - }, (pee, params) => { - pee.notifyUpdate(params); - }); + let localEntries; + if (pendingEvents) { + localEntries = new MappedList(pendingEvents, pe => { + return new PendingEventEntry({pendingEvent: pe, member: this._ownMember, clock}); + }, (pee, params) => { + pee.notifyUpdate(params); + }); + } else { + localEntries = new ObservableArray(); + } this._allEntries = new ConcatList(this._remoteEntries, localEntries); }