diff --git a/src/matrix/DeviceMessageHandler.js b/src/matrix/DeviceMessageHandler.js index 4c7d0e75..8a50c66c 100644 --- a/src/matrix/DeviceMessageHandler.js +++ b/src/matrix/DeviceMessageHandler.js @@ -59,12 +59,12 @@ export class DeviceMessageHandler { return {roomKeys}; } - _applyDecryptChanges(rooms, {roomKeys}) { - if (roomKeys && roomKeys.length) { - const roomKeysByRoom = groupBy(roomKeys, s => s.roomId); - for (const [roomId, roomKeys] of roomKeysByRoom) { - const room = rooms.get(roomId); - room?.notifyRoomKeys(roomKeys); + async _applyDecryptChanges(rooms, {roomKeys}) { + if (Array.isArray(roomKeys)) { + for (const roomKey of roomKeys) { + const room = rooms.get(roomKey.roomId); + // TODO: this is less parallized than it could be (like sync) + await room?.notifyRoomKey(roomKey); } } } @@ -101,7 +101,7 @@ export class DeviceMessageHandler { throw err; } await txn.complete(); - this._applyDecryptChanges(rooms, changes); + await this._applyDecryptChanges(rooms, changes); } async _getPendingEvents(txn) { diff --git a/src/matrix/Sync.js b/src/matrix/Sync.js index 515c096d..f78e437c 100644 --- a/src/matrix/Sync.js +++ b/src/matrix/Sync.js @@ -41,15 +41,12 @@ function timelineIsEmpty(roomResponse) { /** * Sync steps in js-pseudocode: * ```js - * let preparation; - * if (room.needsPrepareSync) { - * // can only read some stores - * preparation = await room.prepareSync(roomResponse, prepareTxn); - * // can do async work that is not related to storage (such as decryption) - * preparation = await room.afterPrepareSync(preparation); - * } + * // can only read some stores + * const preparation = await room.prepareSync(roomResponse, membership, prepareTxn); + * // can do async work that is not related to storage (such as decryption) + * await room.afterPrepareSync(preparation); * // writes and calculates changes - * const changes = await room.writeSync(roomResponse, membership, isInitialSync, preparation, syncTxn); + * const changes = await room.writeSync(roomResponse, isInitialSync, preparation, syncTxn); * // applies and emits changes once syncTxn is committed * room.afterSync(changes); * if (room.needsAfterSyncCompleted(changes)) { @@ -180,7 +177,7 @@ export class Sync { await Promise.all(roomStates.map(async rs => { console.log(` * applying sync response to room ${rs.room.id} ...`); rs.changes = await rs.room.writeSync( - rs.roomResponse, rs.membership, isInitialSync, rs.preparation, syncTxn); + rs.roomResponse, isInitialSync, rs.preparation, syncTxn); })); sessionChanges = await this._session.writeSync(response, syncFilterId, syncTxn); } catch(err) { @@ -219,16 +216,11 @@ export class Sync { } async _prepareRooms(roomStates) { - const prepareRoomStates = roomStates.filter(rs => rs.room.needsPrepareSync); - if (prepareRoomStates.length) { - const prepareTxn = await this._openPrepareSyncTxn(); - await Promise.all(prepareRoomStates.map(async rs => { - rs.preparation = await rs.room.prepareSync(rs.roomResponse, prepareTxn); - })); - await Promise.all(prepareRoomStates.map(async rs => { - rs.preparation = await rs.room.afterPrepareSync(rs.preparation); - })); - } + const prepareTxn = await this._openPrepareSyncTxn(); + await Promise.all(roomStates.map(async rs => { + rs.preparation = await rs.room.prepareSync(rs.roomResponse, rs.membership, prepareTxn); + })); + await Promise.all(roomStates.map(rs => rs.room.afterPrepareSync(rs.preparation))); } async _openSyncTxn() { diff --git a/src/matrix/e2ee/RoomEncryption.js b/src/matrix/e2ee/RoomEncryption.js index f0446f98..2c23933d 100644 --- a/src/matrix/e2ee/RoomEncryption.js +++ b/src/matrix/e2ee/RoomEncryption.js @@ -42,8 +42,12 @@ export class RoomEncryption { this._megolmBackfillCache = this._megolmDecryption.createSessionCache(); this._megolmSyncCache = this._megolmDecryption.createSessionCache(); - // not `event_id`, but an internal event id passed in to the decrypt methods - this._eventIdsByMissingSession = new Map(); + // session => event ids of messages we tried to decrypt and the session was missing + this._missingSessions = new SessionToEventIdsMap(); + // sessions that may or may not be missing, but that while + // looking for a particular session came up as a candidate and were + // added to the cache to prevent further lookups from storage + this._missingSessionCandidates = new SessionToEventIdsMap(); this._senderDeviceCache = new Map(); this._storage = storage; this._sessionBackup = sessionBackup; @@ -57,8 +61,7 @@ export class RoomEncryption { return; } this._sessionBackup = sessionBackup; - for(const key of this._eventIdsByMissingSession.keys()) { - const {senderKey, sessionId} = decodeMissingSessionKey(key); + for(const {senderKey, sessionId} of this._missingSessions.getSessions()) { await this._requestMissingSessionFromBackup(senderKey, sessionId, null); } } @@ -115,13 +118,17 @@ export class RoomEncryption { if (customCache) { customCache.dispose(); } - return new DecryptionPreparation(preparation, errors, {isTimelineOpen, source}, this); + return new DecryptionPreparation(preparation, errors, {isTimelineOpen, source}, this, events); } - async _processDecryptionResults(results, errors, flags, txn) { - for (const error of errors.values()) { - if (error.code === "MEGOLM_NO_SESSION") { - this._addMissingSessionEvent(error.event, flags.source); + async _processDecryptionResults(events, results, errors, flags, txn) { + for (const event of events) { + const error = errors.get(event.event_id); + if (error?.code === "MEGOLM_NO_SESSION") { + this._addMissingSessionEvent(event, flags.source); + } else { + this._missingSessions.removeEvent(event); + this._missingSessionCandidates.removeEvent(event); } } if (flags.isTimelineOpen) { @@ -145,17 +152,12 @@ export class RoomEncryption { } _addMissingSessionEvent(event, source) { - const senderKey = event.content?.["sender_key"]; - const sessionId = event.content?.["session_id"]; - const key = encodeMissingSessionKey(senderKey, sessionId); - let eventIds = this._eventIdsByMissingSession.get(key); - // new missing session - if (!eventIds) { + const isNewSession = this._missingSessions.addEvent(event); + if (isNewSession) { + const senderKey = event.content?.["sender_key"]; + const sessionId = event.content?.["session_id"]; this._requestMissingSessionFromBackup(senderKey, sessionId, source); - eventIds = new Set(); - this._eventIdsByMissingSession.set(key, eventIds); } - eventIds.add(event.event_id); } async _requestMissingSessionFromBackup(senderKey, sessionId, source) { @@ -163,7 +165,7 @@ export class RoomEncryption { // and only after that proceed to request from backup if (source === DecryptionSource.Sync) { await this._clock.createTimeout(10000).elapsed(); - if (this._disposed || !this._eventIdsByMissingSession.has(encodeMissingSessionKey(senderKey, sessionId))) { + if (this._disposed || !this._missingSessions.hasSession(senderKey, sessionId)) { return; } } @@ -192,8 +194,8 @@ export class RoomEncryption { await txn.complete(); if (roomKey) { - // this will call into applyRoomKeys below - await this._room.notifyRoomKeys([roomKey]); + // this will reattempt decryption + await this._room.notifyRoomKey(roomKey); } } else if (session?.algorithm) { console.info(`Backed-up session of unknown algorithm: ${session.algorithm}`); @@ -212,18 +214,36 @@ export class RoomEncryption { * @param {Array} roomKeys * @return {Array} the event ids that should be retried to decrypt */ - applyRoomKeys(roomKeys) { - // retry decryption with the new sessions - const retryEventIds = []; - for (const roomKey of roomKeys) { - const key = encodeMissingSessionKey(roomKey.senderKey, roomKey.sessionId); - const entriesForSession = this._eventIdsByMissingSession.get(key); - if (entriesForSession) { - this._eventIdsByMissingSession.delete(key); - retryEventIds.push(...entriesForSession); + getEventIdsForRoomKey(roomKey) { + // TODO: we could concat both results here, and only put stuff in + // candidates if it is not in missing sessions to use a bit less memory + let eventIds = this._missingSessions.getEventIds(roomKey.senderKey, roomKey.sessionId); + if (!eventIds) { + eventIds = this._missingSessionCandidates.getEventIds(roomKey.senderKey, roomKey.sessionId); + } + return eventIds; + } + + /** + * caches mapping of session to event id of all encrypted candidates + * and filters to return only the candidates for the given room key + */ + findAndCacheEntriesForRoomKey(roomKey, candidateEntries) { + const matches = []; + + for (const entry of candidateEntries) { + if (entry.eventType === ENCRYPTED_TYPE) { + this._missingSessionCandidates.addEvent(entry.event); + const senderKey = entry.event?.content?.["sender_key"]; + const sessionId = entry.event?.content?.["session_id"]; + if (senderKey === roomKey.senderKey && sessionId === roomKey.sessionId) { + matches.push(entry); + } } } - return retryEventIds; + + return matches; + } async encrypt(type, content, hsApi) { @@ -354,11 +374,12 @@ export class RoomEncryption { * the decryption results before turning them */ class DecryptionPreparation { - constructor(megolmDecryptionPreparation, extraErrors, flags, roomEncryption) { + constructor(megolmDecryptionPreparation, extraErrors, flags, roomEncryption, events) { this._megolmDecryptionPreparation = megolmDecryptionPreparation; this._extraErrors = extraErrors; this._flags = flags; this._roomEncryption = roomEncryption; + this._events = events; } async decrypt() { @@ -366,7 +387,8 @@ class DecryptionPreparation { await this._megolmDecryptionPreparation.decrypt(), this._extraErrors, this._flags, - this._roomEncryption); + this._roomEncryption, + this._events); } dispose() { @@ -375,17 +397,18 @@ class DecryptionPreparation { } class DecryptionChanges { - constructor(megolmDecryptionChanges, extraErrors, flags, roomEncryption) { + constructor(megolmDecryptionChanges, extraErrors, flags, roomEncryption, events) { this._megolmDecryptionChanges = megolmDecryptionChanges; this._extraErrors = extraErrors; this._flags = flags; this._roomEncryption = roomEncryption; + this._events = events; } async write(txn) { const {results, errors} = await this._megolmDecryptionChanges.write(txn); mergeMap(this._extraErrors, errors); - await this._roomEncryption._processDecryptionResults(results, errors, this._flags, txn); + await this._roomEncryption._processDecryptionResults(this._events, results, errors, this._flags, txn); return new BatchDecryptionResult(results, errors); } } @@ -410,3 +433,58 @@ class BatchDecryptionResult { } } } + +class SessionToEventIdsMap { + constructor() { + this._eventIdsBySession = new Map(); + } + + addEvent(event) { + let isNewSession = false; + const senderKey = event.content?.["sender_key"]; + const sessionId = event.content?.["session_id"]; + const key = encodeMissingSessionKey(senderKey, sessionId); + let eventIds = this._eventIdsBySession.get(key); + // new missing session + if (!eventIds) { + eventIds = new Set(); + this._eventIdsBySession.set(key, eventIds); + isNewSession = true; + } + eventIds.add(event.event_id); + return isNewSession; + } + + getEventIds(senderKey, sessionId) { + const key = encodeMissingSessionKey(senderKey, sessionId); + const entriesForSession = this._eventIdsBySession.get(key); + if (entriesForSession) { + return [...entriesForSession]; + } + } + + getSessions() { + return Array.from(this._eventIdsBySession.keys()).map(decodeMissingSessionKey); + } + + hasSession(senderKey, sessionId) { + return this._eventIdsBySession.has(encodeMissingSessionKey(senderKey, sessionId)); + } + + removeEvent(event) { + let hasRemovedSession = false; + const senderKey = event.content?.["sender_key"]; + const sessionId = event.content?.["session_id"]; + const key = encodeMissingSessionKey(senderKey, sessionId); + let eventIds = this._eventIdsBySession.get(key); + if (eventIds) { + if (eventIds.delete(event.event_id)) { + if (!eventIds.length) { + this._eventIdsBySession.delete(key); + hasRemovedSession = true; + } + } + } + return hasRemovedSession; + } +} diff --git a/src/matrix/room/Room.js b/src/matrix/room/Room.js index 99c2155e..369836ee 100644 --- a/src/matrix/room/Room.js +++ b/src/matrix/room/Room.js @@ -15,9 +15,10 @@ limitations under the License. */ import {EventEmitter} from "../../utils/EventEmitter.js"; -import {RoomSummary, needsHeroes} from "./RoomSummary.js"; +import {RoomSummary} from "./RoomSummary.js"; import {SyncWriter} from "./timeline/persistence/SyncWriter.js"; import {GapWriter} from "./timeline/persistence/GapWriter.js"; +import {readRawTimelineEntriesWithTxn} from "./timeline/persistence/TimelineReader.js"; import {Timeline} from "./timeline/Timeline.js"; import {FragmentIdComparer} from "./timeline/FragmentIdComparer.js"; import {SendQueue} from "./sending/SendQueue.js"; @@ -26,20 +27,22 @@ import {fetchOrLoadMembers} from "./members/load.js"; import {MemberList} from "./members/MemberList.js"; import {Heroes} from "./members/Heroes.js"; import {EventEntry} from "./timeline/entries/EventEntry.js"; +import {EventKey} from "./timeline/EventKey.js"; +import {Direction} from "./timeline/Direction.js"; import {DecryptionSource} from "../e2ee/common.js"; const EVENT_ENCRYPTED_TYPE = "m.room.encrypted"; export class Room extends EventEmitter { - constructor({roomId, storage, hsApi, mediaRepository, emitCollectionChange, pendingEvents, user, createRoomEncryption, getSyncToken, clock}) { + constructor({roomId, storage, hsApi, mediaRepository, emitCollectionChange, pendingEvents, user, createRoomEncryption, getSyncToken, clock}) { super(); this._roomId = roomId; this._storage = storage; this._hsApi = hsApi; this._mediaRepository = mediaRepository; - this._summary = new RoomSummary(roomId, user.id); + this._summary = new RoomSummary(roomId); this._fragmentIdComparer = new FragmentIdComparer([]); - this._syncWriter = new SyncWriter({roomId, fragmentIdComparer: this._fragmentIdComparer}); + this._syncWriter = new SyncWriter({roomId, fragmentIdComparer: this._fragmentIdComparer}); this._emitCollectionChange = emitCollectionChange; this._sendQueue = new SendQueue({roomId, storage, hsApi, pendingEvents}); this._timeline = null; @@ -50,43 +53,77 @@ export class Room extends EventEmitter { this._roomEncryption = null; this._getSyncToken = getSyncToken; this._clock = clock; - } + } - async notifyRoomKeys(roomKeys) { - if (this._roomEncryption) { - let retryEventIds = this._roomEncryption.applyRoomKeys(roomKeys); - if (retryEventIds.length) { - const retryEntries = []; - const txn = await this._storage.readTxn([ - this._storage.storeNames.timelineEvents, - this._storage.storeNames.inboundGroupSessions, - ]); - for (const eventId of retryEventIds) { - const storageEntry = await txn.timelineEvents.getByEventId(this._roomId, eventId); - if (storageEntry) { - retryEntries.push(new EventEntry(storageEntry, this._fragmentIdComparer)); - } - } - const decryptRequest = this._decryptEntries(DecryptionSource.Retry, retryEntries, txn); - await decryptRequest.complete(); + _readRetryDecryptCandidateEntries(sinceEventKey, txn) { + if (sinceEventKey) { + return readRawTimelineEntriesWithTxn(this._roomId, sinceEventKey, + Direction.Forward, Number.MAX_SAFE_INTEGER, this._fragmentIdComparer, txn); + } else { + // all messages for room ... + // if you haven't decrypted any message in a room yet, + // it's unlikely you will have tons of them. + // so this should be fine as a last resort + return readRawTimelineEntriesWithTxn(this._roomId, this._syncWriter.lastMessageKey, + Direction.Backward, Number.MAX_SAFE_INTEGER, this._fragmentIdComparer, txn); + } + } - this._timeline?.replaceEntries(retryEntries); - // we would ideally write the room summary in the same txn as the groupSessionDecryptions in the - // _decryptEntries entries and could even know which events have been decrypted for the first - // time from DecryptionChanges.write and only pass those to the summary. As timeline changes - // are not essential to the room summary, it's fine to write this in a separate txn for now. - const changes = this._summary.processTimelineEntries(retryEntries, false, this._isTimelineOpen); - if (changes) { - this._summary.writeAndApplyChanges(changes, this._storage); - this._emitUpdate(); + async notifyRoomKey(roomKey) { + if (!this._roomEncryption) { + return; + } + const retryEventIds = this._roomEncryption.getEventIdsForRoomKey(roomKey); + const stores = [ + this._storage.storeNames.timelineEvents, + this._storage.storeNames.inboundGroupSessions, + ]; + let txn; + let retryEntries; + if (retryEventIds) { + retryEntries = []; + txn = await this._storage.readTxn(stores); + for (const eventId of retryEventIds) { + const storageEntry = await txn.timelineEvents.getByEventId(this._roomId, eventId); + if (storageEntry) { + retryEntries.push(new EventEntry(storageEntry, this._fragmentIdComparer)); } } + } else { + // we only look for messages since lastDecryptedEventKey because + // the timeline must be closed (otherwise getEventIdsForRoomKey would have found the event ids) + // and to update the summary we only care about events since lastDecryptedEventKey + const key = this._summary.data.lastDecryptedEventKey; + // key might be missing if we haven't decrypted any events in this room + const sinceEventKey = key && new EventKey(key.fragmentId, key.entryIndex); + // check we have not already decrypted the most recent event in the room + // otherwise we know that the messages for this room key will not update the room summary + if (!sinceEventKey || !sinceEventKey.equals(this._syncWriter.lastMessageKey)) { + txn = await this._storage.readTxn(stores.concat(this._storage.storeNames.timelineFragments)); + const candidateEntries = await this._readRetryDecryptCandidateEntries(sinceEventKey, txn); + retryEntries = this._roomEncryption.findAndCacheEntriesForRoomKey(roomKey, candidateEntries); + } + } + if (retryEntries?.length) { + const decryptRequest = this._decryptEntries(DecryptionSource.Retry, retryEntries, txn); + // this will close txn while awaiting decryption + await decryptRequest.complete(); + + this._timeline?.replaceEntries(retryEntries); + // we would ideally write the room summary in the same txn as the groupSessionDecryptions in the + // _decryptEntries entries and could even know which events have been decrypted for the first + // time from DecryptionChanges.write and only pass those to the summary. As timeline changes + // are not essential to the room summary, it's fine to write this in a separate txn for now. + const changes = this._summary.data.applyTimelineEntries(retryEntries, false, false); + if (await this._summary.writeAndApplyData(changes, this._storage)) { + this._emitUpdate(); + } } } - _enableEncryption(encryptionParams) { - this._roomEncryption = this._createRoomEncryption(this, encryptionParams); - if (this._roomEncryption) { + _setEncryption(roomEncryption) { + if (roomEncryption && !this._roomEncryption) { + this._roomEncryption = roomEncryption; this._sendQueue.enableEncryption(this._roomEncryption); if (this._timeline) { this._timeline.enableEncryption(this._decryptEntries.bind(this, DecryptionSource.Timeline)); @@ -132,57 +169,62 @@ export class Room extends EventEmitter { return request; } - get needsPrepareSync() { - // only encrypted rooms need the prepare sync steps - return !!this._roomEncryption; - } + async prepareSync(roomResponse, membership, txn) { + const summaryChanges = this._summary.data.applySyncResponse(roomResponse, membership) + let roomEncryption = this._roomEncryption; + // encryption is enabled in this sync + if (!roomEncryption && summaryChanges.encryption) { + roomEncryption = this._createRoomEncryption(this, summaryChanges.encryption); + } - async prepareSync(roomResponse, txn) { - if (this._roomEncryption) { + let decryptPreparation; + if (roomEncryption) { const events = roomResponse?.timeline?.events; if (Array.isArray(events)) { const eventsToDecrypt = events.filter(event => { return event?.type === EVENT_ENCRYPTED_TYPE; }); - const preparation = await this._roomEncryption.prepareDecryptAll( + decryptPreparation = await roomEncryption.prepareDecryptAll( eventsToDecrypt, DecryptionSource.Sync, this._isTimelineOpen, txn); - return preparation; } } + + return { + roomEncryption, + summaryChanges, + decryptPreparation, + decryptChanges: null, + }; } async afterPrepareSync(preparation) { - if (preparation) { - const decryptChanges = await preparation.decrypt(); - return decryptChanges; + if (preparation.decryptPreparation) { + preparation.decryptChanges = await preparation.decryptPreparation.decrypt(); + preparation.decryptPreparation = null; } } /** @package */ - async writeSync(roomResponse, membership, isInitialSync, decryptChanges, txn) { - let decryption; - if (this._roomEncryption && decryptChanges) { - decryption = await decryptChanges.write(txn); - } - const {entries, newLiveKey, memberChanges} = + async writeSync(roomResponse, isInitialSync, {summaryChanges, decryptChanges, roomEncryption}, txn) { + const {entries, newLiveKey, memberChanges} = await this._syncWriter.writeSync(roomResponse, txn); - if (decryption) { + if (decryptChanges) { + const decryption = await decryptChanges.write(txn); decryption.applyToEntries(entries); } // pass member changes to device tracker - if (this._roomEncryption && this.isTrackingMembers && memberChanges?.size) { - await this._roomEncryption.writeMemberChanges(memberChanges, txn); + if (roomEncryption && this.isTrackingMembers && memberChanges?.size) { + await roomEncryption.writeMemberChanges(memberChanges, txn); } - const summaryChanges = this._summary.writeSync( - roomResponse, - entries, - membership, - isInitialSync, this._isTimelineOpen, - txn); + // also apply (decrypted) timeline entries to the summary changes + summaryChanges = summaryChanges.applyTimelineEntries( + entries, isInitialSync, !this._isTimelineOpen, this._user.id); + // write summary changes, and unset if nothing was actually changed + summaryChanges = this._summary.writeData(summaryChanges, txn); // fetch new members while we have txn open, // but don't make any in-memory changes yet let heroChanges; - if (summaryChanges && needsHeroes(summaryChanges)) { + if (summaryChanges?.needsHeroes) { // room name disappeared, open heroes if (!this._heroes) { this._heroes = new Heroes(this._roomId); @@ -190,11 +232,12 @@ export class Room extends EventEmitter { heroChanges = await this._heroes.calculateChanges(summaryChanges.heroes, memberChanges, txn); } let removedPendingEvents; - if (roomResponse.timeline && roomResponse.timeline.events) { + if (Array.isArray(roomResponse.timeline?.events)) { removedPendingEvents = this._sendQueue.removeRemoteEchos(roomResponse.timeline.events, txn); } return { summaryChanges, + roomEncryption, newTimelineEntries: entries, newLiveKey, removedPendingEvents, @@ -208,11 +251,9 @@ export class Room extends EventEmitter { * Called with the changes returned from `writeSync` to apply them and emit changes. * No storage or network operations should be done here. */ - afterSync({summaryChanges, newTimelineEntries, newLiveKey, removedPendingEvents, memberChanges, heroChanges}) { + afterSync({summaryChanges, newTimelineEntries, newLiveKey, removedPendingEvents, memberChanges, heroChanges, roomEncryption}) { this._syncWriter.afterSync(newLiveKey); - if (!this._summary.encryption && summaryChanges.encryption && !this._roomEncryption) { - this._enableEncryption(summaryChanges.encryption); - } + this._setEncryption(roomEncryption); if (memberChanges.size) { if (this._changedMembersDuringSync) { for (const [userId, memberChange] of memberChanges.entries()) { @@ -226,14 +267,14 @@ export class Room extends EventEmitter { let emitChange = false; if (summaryChanges) { this._summary.applyChanges(summaryChanges); - if (!this._summary.needsHeroes) { + if (!this._summary.data.needsHeroes) { this._heroes = null; } emitChange = true; } if (this._heroes && heroChanges) { const oldName = this.name; - this._heroes.applyChanges(heroChanges, this._summary); + this._heroes.applyChanges(heroChanges, this._summary.data); if (oldName !== this.name) { emitChange = true; } @@ -247,7 +288,7 @@ export class Room extends EventEmitter { if (removedPendingEvents) { this._sendQueue.emitRemovals(removedPendingEvents); } - } + } needsAfterSyncCompleted({memberChanges}) { return this._roomEncryption?.needsToShareKeys(memberChanges); @@ -282,23 +323,24 @@ export class Room extends EventEmitter { } /** @package */ - async load(summary, txn) { + async load(summary, txn) { try { this._summary.load(summary); - if (this._summary.encryption) { - this._enableEncryption(this._summary.encryption); + if (this._summary.data.encryption) { + const roomEncryption = this._createRoomEncryption(this, this._summary.data.encryption); + this._setEncryption(roomEncryption); } // need to load members for name? - if (this._summary.needsHeroes) { + if (this._summary.data.needsHeroes) { this._heroes = new Heroes(this._roomId); - const changes = await this._heroes.calculateChanges(this._summary.heroes, [], txn); - this._heroes.applyChanges(changes, this._summary); + const changes = await this._heroes.calculateChanges(this._summary.data.heroes, [], txn); + this._heroes.applyChanges(changes, this._summary.data); } return this._syncWriter.load(txn); } catch (err) { throw new WrappedError(`Could not load room ${this._roomId}`, err); } - } + } /** @public */ sendEvent(eventType, content) { @@ -388,7 +430,14 @@ export class Room extends EventEmitter { if (this._heroes) { return this._heroes.roomName; } - return this._summary.name; + const summaryData = this._summary.data; + if (summaryData.name) { + return summaryData.name; + } + if (summaryData.canonicalAlias) { + return summaryData.canonicalAlias; + } + return null; } /** @public */ @@ -397,8 +446,8 @@ export class Room extends EventEmitter { } get avatarUrl() { - if (this._summary.avatarUrl) { - return this._summary.avatarUrl; + if (this._summary.data.avatarUrl) { + return this._summary.data.avatarUrl; } else if (this._heroes) { return this._heroes.roomAvatarUrl; } @@ -406,28 +455,28 @@ export class Room extends EventEmitter { } get lastMessageTimestamp() { - return this._summary.lastMessageTimestamp; + return this._summary.data.lastMessageTimestamp; } get isUnread() { - return this._summary.isUnread; + return this._summary.data.isUnread; } get notificationCount() { - return this._summary.notificationCount; + return this._summary.data.notificationCount; } get highlightCount() { - return this._summary.highlightCount; + return this._summary.data.highlightCount; } get isLowPriority() { - const tags = this._summary.tags; + const tags = this._summary.data.tags; return !!(tags && tags['m.lowpriority']); } get isEncrypted() { - return !!this._summary.encryption; + return !!this._summary.data.encryption; } enableSessionBackup(sessionBackup) { @@ -435,7 +484,7 @@ export class Room extends EventEmitter { } get isTrackingMembers() { - return this._summary.isTrackingMembers; + return this._summary.data.isTrackingMembers; } async _getLastEventId() { diff --git a/src/matrix/room/RoomSummary.js b/src/matrix/room/RoomSummary.js index 21739a1e..c708ee91 100644 --- a/src/matrix/room/RoomSummary.js +++ b/src/matrix/room/RoomSummary.js @@ -17,11 +17,11 @@ limitations under the License. import {MEGOLM_ALGORITHM} from "../e2ee/common.js"; -function applyTimelineEntries(data, timelineEntries, isInitialSync, isTimelineOpen, ownUserId) { +function applyTimelineEntries(data, timelineEntries, isInitialSync, canMarkUnread, ownUserId) { if (timelineEntries.length) { data = timelineEntries.reduce((data, entry) => { return processTimelineEvent(data, entry, - isInitialSync, isTimelineOpen, ownUserId); + isInitialSync, canMarkUnread, ownUserId); }, data); } return data; @@ -39,16 +39,17 @@ function applySyncResponse(data, roomResponse, membership) { if (roomResponse.account_data) { data = roomResponse.account_data.events.reduce(processRoomAccountData, data); } + const stateEvents = roomResponse?.state?.events; // state comes before timeline - if (roomResponse.state) { - data = roomResponse.state.events.reduce(processStateEvent, data); + if (Array.isArray(stateEvents)) { + data = stateEvents.reduce(processStateEvent, data); } - const {timeline} = roomResponse; + const timelineEvents = roomResponse?.timeline?.events; // process state events in timeline // non-state events are handled by applyTimelineEntries // so decryption is handled properly - if (timeline && Array.isArray(timeline.events)) { - data = timeline.events.reduce((data, event) => { + if (Array.isArray(timelineEvents)) { + data = timelineEvents.reduce((data, event) => { if (typeof event.state_key === "string") { return processStateEvent(data, event); } @@ -104,13 +105,13 @@ function processStateEvent(data, event) { return data; } -function processTimelineEvent(data, eventEntry, isInitialSync, isTimelineOpen, ownUserId) { +function processTimelineEvent(data, eventEntry, isInitialSync, canMarkUnread, ownUserId) { if (eventEntry.eventType === "m.room.message") { if (!data.lastMessageTimestamp || eventEntry.timestamp > data.lastMessageTimestamp) { data = data.cloneIfNeeded(); data.lastMessageTimestamp = eventEntry.timestamp; } - if (!isInitialSync && eventEntry.sender !== ownUserId && !isTimelineOpen) { + if (!isInitialSync && eventEntry.sender !== ownUserId && canMarkUnread) { data = data.cloneIfNeeded(); data.isUnread = true; } @@ -122,6 +123,29 @@ function processTimelineEvent(data, eventEntry, isInitialSync, isTimelineOpen, o data.lastMessageBody = body; } } + // store the event key of the last decrypted event so when decryption does succeed, + // we can attempt to re-decrypt from this point to update the room summary + if (!!data.encryption && eventEntry.isEncrypted && eventEntry.isDecrypted) { + let hasLargerEventKey = true; + if (data.lastDecryptedEventKey) { + try { + hasLargerEventKey = eventEntry.compare(data.lastDecryptedEventKey) > 0; + } catch (err) { + // TODO: load the fragments in between here? + // this could happen if an earlier event gets decrypted that + // is in a fragment different from the live one and the timeline is not open. + // In this case, we will just read too many events once per app load + // and then keep the mapping in memory. When eventually an event is decrypted in + // the live fragment, this should stop failing and the event key will be written. + hasLargerEventKey = false; + } + } + if (hasLargerEventKey) { + data = data.cloneIfNeeded(); + const {fragmentId, entryIndex} = eventEntry; + data.lastDecryptedEventKey = {fragmentId, entryIndex}; + } + } return data; } @@ -155,6 +179,7 @@ class SummaryData { this.lastMessageTimestamp = copy ? copy.lastMessageTimestamp : null; this.isUnread = copy ? copy.isUnread : false; this.encryption = copy ? copy.encryption : null; + this.lastDecryptedEventKey = copy ? copy.lastDecryptedEventKey : null; this.isDirectMessage = copy ? copy.isDirectMessage : false; this.membership = copy ? copy.membership : null; this.inviteCount = copy ? copy.inviteCount : 0; @@ -182,83 +207,27 @@ class SummaryData { const {cloned, ...serializedProps} = this; return serializedProps; } -} -export function needsHeroes(data) { - return !data.name && !data.canonicalAlias && data.heroes && data.heroes.length > 0; + applyTimelineEntries(timelineEntries, isInitialSync, canMarkUnread, ownUserId) { + return applyTimelineEntries(this, timelineEntries, isInitialSync, canMarkUnread, ownUserId); + } + + applySyncResponse(roomResponse, membership) { + return applySyncResponse(this, roomResponse, membership); + } + + get needsHeroes() { + return !this.name && !this.canonicalAlias && this.heroes && this.heroes.length > 0; + } } export class RoomSummary { - constructor(roomId, ownUserId) { - this._ownUserId = ownUserId; + constructor(roomId) { this._data = new SummaryData(null, roomId); } - get name() { - if (this._data.name) { - return this._data.name; - } - if (this._data.canonicalAlias) { - return this._data.canonicalAlias; - } - return null; - } - - get heroes() { - return this._data.heroes; - } - - get encryption() { - return this._data.encryption; - } - - // whether the room name should be determined with Heroes - get needsHeroes() { - return needsHeroes(this._data); - } - - get isUnread() { - return this._data.isUnread; - } - - get notificationCount() { - return this._data.notificationCount; - } - - get highlightCount() { - return this._data.highlightCount; - } - - get lastMessage() { - return this._data.lastMessageBody; - } - - get lastMessageTimestamp() { - return this._data.lastMessageTimestamp; - } - - get inviteCount() { - return this._data.inviteCount; - } - - get joinCount() { - return this._data.joinCount; - } - - get avatarUrl() { - return this._data.avatarUrl; - } - - get hasFetchedMembers() { - return this._data.hasFetchedMembers; - } - - get isTrackingMembers() { - return this._data.isTrackingMembers; - } - - get tags() { - return this._data.tags; + get data() { + return this._data; } writeClearUnread(txn) { @@ -284,48 +253,17 @@ export class RoomSummary { return data; } - /** - * after retrying decryption - */ - processTimelineEntries(timelineEntries, isInitialSync, isTimelineOpen) { - // clear cloned flag, so cloneIfNeeded makes a copy and - // this._data is not modified if any field is changed. - - processTimelineEvent - - this._data.cloned = false; - const data = applyTimelineEntries( - this._data, - timelineEntries, - isInitialSync, isTimelineOpen, - this._ownUserId); - if (data !== this._data) { - return data; - } - } - - writeSync(roomResponse, timelineEntries, membership, isInitialSync, isTimelineOpen, txn) { - // clear cloned flag, so cloneIfNeeded makes a copy and - // this._data is not modified if any field is changed. - this._data.cloned = false; - let data = applySyncResponse(this._data, roomResponse, membership); - data = applyTimelineEntries( - data, - timelineEntries, - isInitialSync, isTimelineOpen, - this._ownUserId); + writeData(data, txn) { if (data !== this._data) { txn.roomSummary.set(data.serialize()); return data; } } - /** - * Only to be used with processTimelineEntries, - * other methods like writeSync, writeHasFetchedMembers, - * writeIsTrackingMembers, ... take a txn directly. - */ - async writeAndApplyChanges(data, storage) { + async writeAndApplyData(data, storage) { + if (data === this._data) { + return false; + } const txn = await storage.readWriteTxn([ storage.storeNames.roomSummary, ]); @@ -337,10 +275,14 @@ export class RoomSummary { } await txn.complete(); this.applyChanges(data); + return true; } applyChanges(data) { this._data = data; + // clear cloned flag, so cloneIfNeeded makes a copy and + // this._data is not modified if any field is changed. + this._data.cloned = false; } async load(summary) { @@ -353,7 +295,9 @@ export function tests() { "membership trigger change": function(assert) { const summary = new RoomSummary("id"); let written = false; - const changes = summary.writeSync({}, "join", false, false, {roomSummary: {set: () => { written = true; }}}); + let changes = summary.data.applySyncResponse({}, "join"); + const txn = {roomSummary: {set: () => { written = true; }}}; + changes = summary.writeData(changes, txn); assert(changes); assert(written); assert.equal(changes.membership, "join"); diff --git a/src/matrix/room/members/Heroes.js b/src/matrix/room/members/Heroes.js index 809b61c2..f6ad3085 100644 --- a/src/matrix/room/members/Heroes.js +++ b/src/matrix/room/members/Heroes.js @@ -16,8 +16,8 @@ limitations under the License. import {RoomMember} from "./RoomMember.js"; -function calculateRoomName(sortedMembers, summary) { - const countWithoutMe = summary.joinCount + summary.inviteCount - 1; +function calculateRoomName(sortedMembers, summaryData) { + const countWithoutMe = summaryData.joinCount + summaryData.inviteCount - 1; if (sortedMembers.length >= countWithoutMe) { if (sortedMembers.length > 1) { const lastMember = sortedMembers[sortedMembers.length - 1]; @@ -74,7 +74,7 @@ export class Heroes { return {updatedHeroMembers: updatedHeroMembers.values(), removedUserIds}; } - applyChanges({updatedHeroMembers, removedUserIds}, summary) { + applyChanges({updatedHeroMembers, removedUserIds}, summaryData) { for (const userId of removedUserIds) { this._members.delete(userId); } @@ -82,7 +82,7 @@ export class Heroes { this._members.set(member.userId, member); } const sortedMembers = Array.from(this._members.values()).sort((a, b) => a.name.localeCompare(b.name)); - this._roomName = calculateRoomName(sortedMembers, summary); + this._roomName = calculateRoomName(sortedMembers, summaryData); } get roomName() { diff --git a/src/matrix/room/members/load.js b/src/matrix/room/members/load.js index 18fc4eb4..a8648eac 100644 --- a/src/matrix/room/members/load.js +++ b/src/matrix/room/members/load.js @@ -82,7 +82,7 @@ async function fetchMembers({summary, syncToken, roomId, hsApi, storage, setChan export async function fetchOrLoadMembers(options) { const {summary} = options; - if (!summary.hasFetchedMembers) { + if (!summary.data.hasFetchedMembers) { return fetchMembers(options); } else { return loadMembers(options); diff --git a/src/matrix/room/timeline/EventKey.js b/src/matrix/room/timeline/EventKey.js index 128f8805..e1771258 100644 --- a/src/matrix/room/timeline/EventKey.js +++ b/src/matrix/room/timeline/EventKey.js @@ -63,6 +63,10 @@ export class EventKey { toString() { return `[${this.fragmentId}/${this.eventIndex}]`; } + + equals(other) { + return this.fragmentId === other?.fragmentId && this.eventIndex === other?.eventIndex; + } } export function xtests() { diff --git a/src/matrix/room/timeline/entries/EventEntry.js b/src/matrix/room/timeline/entries/EventEntry.js index 81e31112..410c3d63 100644 --- a/src/matrix/room/timeline/entries/EventEntry.js +++ b/src/matrix/room/timeline/entries/EventEntry.js @@ -82,6 +82,10 @@ export class EventEntry extends BaseEntry { return this._eventEntry.event.type === "m.room.encrypted"; } + get isDecrypted() { + return !!this._decryptionResult?.event; + } + get isVerified() { return this.isEncrypted && this._decryptionResult?.isVerified; } diff --git a/src/matrix/room/timeline/persistence/TimelineReader.js b/src/matrix/room/timeline/persistence/TimelineReader.js index f5983a19..37b15574 100644 --- a/src/matrix/room/timeline/persistence/TimelineReader.js +++ b/src/matrix/room/timeline/persistence/TimelineReader.js @@ -37,6 +37,52 @@ class ReaderRequest { } } +/** + * Raw because it doesn't do decryption and in the future it should not read relations either. + * It is just about reading entries and following fragment links + */ +export async function readRawTimelineEntriesWithTxn(roomId, eventKey, direction, amount, fragmentIdComparer, txn) { + let entries = []; + const timelineStore = txn.timelineEvents; + const fragmentStore = txn.timelineFragments; + + while (entries.length < amount && eventKey) { + let eventsWithinFragment; + if (direction.isForward) { + // TODO: should we pass amount - entries.length here? + eventsWithinFragment = await timelineStore.eventsAfter(roomId, eventKey, amount); + } else { + eventsWithinFragment = await timelineStore.eventsBefore(roomId, eventKey, amount); + } + let eventEntries = eventsWithinFragment.map(e => new EventEntry(e, fragmentIdComparer)); + entries = directionalConcat(entries, eventEntries, direction); + // prepend or append eventsWithinFragment to entries, and wrap them in EventEntry + + if (entries.length < amount) { + const fragment = await fragmentStore.get(roomId, eventKey.fragmentId); + // TODO: why does the first fragment not need to be added? (the next *is* added below) + // it looks like this would be fine when loading in the sync island + // (as the live fragment should be added already) but not for permalinks when we support them + // + // fragmentIdComparer.addFragment(fragment); + let fragmentEntry = new FragmentBoundaryEntry(fragment, direction.isBackward, fragmentIdComparer); + // append or prepend fragmentEntry, reuse func from GapWriter? + directionalAppend(entries, fragmentEntry, direction); + // only continue loading if the fragment boundary can't be backfilled + if (!fragmentEntry.token && fragmentEntry.hasLinkedFragment) { + const nextFragment = await fragmentStore.get(roomId, fragmentEntry.linkedFragmentId); + fragmentIdComparer.add(nextFragment); + const nextFragmentEntry = new FragmentBoundaryEntry(nextFragment, direction.isForward, fragmentIdComparer); + directionalAppend(entries, nextFragmentEntry, direction); + eventKey = nextFragmentEntry.asEventKey(); + } else { + eventKey = null; + } + } + } + return entries; +} + export class TimelineReader { constructor({roomId, storage, fragmentIdComparer}) { this._roomId = roomId; @@ -87,40 +133,7 @@ export class TimelineReader { } async _readFrom(eventKey, direction, amount, r, txn) { - let entries = []; - const timelineStore = txn.timelineEvents; - const fragmentStore = txn.timelineFragments; - - while (entries.length < amount && eventKey) { - let eventsWithinFragment; - if (direction.isForward) { - eventsWithinFragment = await timelineStore.eventsAfter(this._roomId, eventKey, amount); - } else { - eventsWithinFragment = await timelineStore.eventsBefore(this._roomId, eventKey, amount); - } - let eventEntries = eventsWithinFragment.map(e => new EventEntry(e, this._fragmentIdComparer)); - entries = directionalConcat(entries, eventEntries, direction); - // prepend or append eventsWithinFragment to entries, and wrap them in EventEntry - - if (entries.length < amount) { - const fragment = await fragmentStore.get(this._roomId, eventKey.fragmentId); - // this._fragmentIdComparer.addFragment(fragment); - let fragmentEntry = new FragmentBoundaryEntry(fragment, direction.isBackward, this._fragmentIdComparer); - // append or prepend fragmentEntry, reuse func from GapWriter? - directionalAppend(entries, fragmentEntry, direction); - // only continue loading if the fragment boundary can't be backfilled - if (!fragmentEntry.token && fragmentEntry.hasLinkedFragment) { - const nextFragment = await fragmentStore.get(this._roomId, fragmentEntry.linkedFragmentId); - this._fragmentIdComparer.add(nextFragment); - const nextFragmentEntry = new FragmentBoundaryEntry(nextFragment, direction.isForward, this._fragmentIdComparer); - directionalAppend(entries, nextFragmentEntry, direction); - eventKey = nextFragmentEntry.asEventKey(); - } else { - eventKey = null; - } - } - } - + const entries = await readRawTimelineEntriesWithTxn(this._roomId, eventKey, direction, amount, this._fragmentIdComparer, txn); if (this._decryptEntries) { r.decryptRequest = this._decryptEntries(entries, txn); try {