diff --git a/src/matrix/Sync.js b/src/matrix/Sync.js index 05c1d138..4154f959 100644 --- a/src/matrix/Sync.js +++ b/src/matrix/Sync.js @@ -248,6 +248,7 @@ export class Sync { return this._storage.readTxn([ storeNames.olmSessions, storeNames.inboundGroupSessions, + storeNames.timelineEvents // to read events that can now be decrypted ]); } diff --git a/src/matrix/e2ee/README.md b/src/matrix/e2ee/README.md index 46f4e95f..fab53880 100644 --- a/src/matrix/e2ee/README.md +++ b/src/matrix/e2ee/README.md @@ -1,40 +1,41 @@ ## Integratation within the sync lifetime cycle -### prepareSync +### session.prepareSync + +Decrypt any device messages, and turn them into RoomKey instances. +Any rooms that are not in the sync response but for which we receive keys will be included in the rooms to sync. + +Runs before any room.prepareSync, so the new room keys can be passed to each room prepareSync to use in decryption. + +### room.prepareSync The session can start its own read/write transactions here, rooms only read from a shared transaction - - session - - device handler - - txn - - write pending encrypted - - txn - - olm decryption read - - olm async decryption - - dispatch to worker - - txn - - olm decryption write / remove pending encrypted - rooms (with shared read txn) - - megolm decryption read + - megolm decryption read using any new keys decrypted by the session. -### afterPrepareSync +### room.afterPrepareSync - rooms - megolm async decryption - dispatch to worker -### writeSync +### room.writeSync - rooms (with shared readwrite txn) - megolm decryption write, yielding decrypted events - use decrypted events to write room summary -### afterSync +### session.writeSync + + - writes any room keys that were received + +### room.afterSync - rooms - emit changes -### afterSyncCompleted +### room.afterSyncCompleted - session - e2ee account diff --git a/src/matrix/e2ee/RoomEncryption.js b/src/matrix/e2ee/RoomEncryption.js index 13b00f6f..f7a1920e 100644 --- a/src/matrix/e2ee/RoomEncryption.js +++ b/src/matrix/e2ee/RoomEncryption.js @@ -15,8 +15,9 @@ limitations under the License. */ import {MEGOLM_ALGORITHM, DecryptionSource} from "./common.js"; -import {groupBy} from "../../utils/groupBy.js"; +import {groupEventsBySession} from "./megolm/decryption/utils.js"; import {mergeMap} from "../../utils/mergeMap.js"; +import {groupBy} from "../../utils/groupBy.js"; import {makeTxnId} from "../common.js"; const ENCRYPTED_TYPE = "m.room.encrypted"; @@ -25,15 +26,6 @@ const ENCRYPTED_TYPE = "m.room.encrypted"; // note that encrypt could still create a new session const MIN_PRESHARE_INTERVAL = 60 * 1000; // 1min -function encodeMissingSessionKey(senderKey, sessionId) { - return `${senderKey}|${sessionId}`; -} - -function decodeMissingSessionKey(key) { - const [senderKey, sessionId] = key.split("|"); - return {senderKey, sessionId}; -} - export class RoomEncryption { constructor({room, deviceTracker, olmEncryption, megolmEncryption, megolmDecryption, encryptionParams, storage, sessionBackup, notifyMissingMegolmSession, clock}) { this._room = room; @@ -43,32 +35,41 @@ export class RoomEncryption { this._megolmDecryption = megolmDecryption; // content of the m.room.encryption event this._encryptionParams = encryptionParams; - this._megolmBackfillCache = this._megolmDecryption.createSessionCache(); this._megolmSyncCache = this._megolmDecryption.createSessionCache(1); - // session => event ids of messages we tried to decrypt and the session was missing - this._missingSessions = new SessionToEventIdsMap(); - // sessions that may or may not be missing, but that while - // looking for a particular session came up as a candidate and were - // added to the cache to prevent further lookups from storage - this._missingSessionCandidates = new SessionToEventIdsMap(); + // caches devices to verify events this._senderDeviceCache = new Map(); this._storage = storage; this._sessionBackup = sessionBackup; this._notifyMissingMegolmSession = notifyMissingMegolmSession; this._clock = clock; - this._disposed = false; this._isFlushingRoomKeyShares = false; this._lastKeyPreShareTime = null; + this._disposed = false; } - async enableSessionBackup(sessionBackup) { + enableSessionBackup(sessionBackup) { if (this._sessionBackup) { return; } this._sessionBackup = sessionBackup; - for(const {senderKey, sessionId} of this._missingSessions.getSessions()) { - await this._requestMissingSessionFromBackup(senderKey, sessionId, null); + } + + async restoreMissingSessionsFromBackup(entries) { + const events = entries.filter(e => e.isEncrypted && !e.isDecrypted && e.event).map(e => e.event); + const eventsBySession = groupEventsBySession(events); + const groups = Array.from(eventsBySession.values()); + const txn = this._storage.readTxn([this._storage.storeNames.inboundGroupSessions]); + const hasSessions = await Promise.all(groups.map(async group => { + return this._megolmDecryption.hasSession(this._room.id, group.senderKey, group.sessionId, txn); + })); + const missingSessions = groups.filter((_, i) => !hasSessions[i]); + if (missingSessions.length) { + // start with last sessions which should be for the last items in the timeline + for (var i = missingSessions.length - 1; i >= 0; i--) { + const session = missingSessions[i]; + await this._requestMissingSessionFromBackup(session.senderKey, session.sessionId); + } } } @@ -79,21 +80,27 @@ export class RoomEncryption { this._senderDeviceCache = new Map(); // purge the sender device cache } - async writeMemberChanges(memberChanges, txn) { + async writeMemberChanges(memberChanges, txn, log) { + let shouldFlush; const memberChangesArray = Array.from(memberChanges.values()); if (memberChangesArray.some(m => m.hasLeft)) { + log.log({ + l: "discardOutboundSession", + leftUsers: memberChangesArray.filter(m => m.hasLeft).map(m => m.userId), + }); this._megolmEncryption.discardOutboundSession(this._room.id, txn); } if (memberChangesArray.some(m => m.hasJoined)) { - await this._addShareRoomKeyOperationForNewMembers(memberChangesArray, txn); + shouldFlush = await this._addShareRoomKeyOperationForNewMembers(memberChangesArray, txn, log); } await this._deviceTracker.writeMemberChanges(this._room, memberChanges, txn); + return shouldFlush; } // this happens before entries exists, as they are created by the syncwriter // but we want to be able to map it back to something in the timeline easily // when retrying decryption. - async prepareDecryptAll(events, newKeys, source, isTimelineOpen, txn) { + async prepareDecryptAll(events, newKeys, source, txn) { const errors = new Map(); const validEvents = []; for (const event of events) { @@ -126,24 +133,51 @@ export class RoomEncryption { if (customCache) { customCache.dispose(); } - return new DecryptionPreparation(preparation, errors, {isTimelineOpen, source}, this, events); + return new DecryptionPreparation(preparation, errors, source, this, events); } - async _processDecryptionResults(events, results, errors, flags, txn) { - for (const event of events) { + async _processDecryptionResults(events, results, errors, source, txn) { + const missingSessionEvents = events.filter(event => { const error = errors.get(event.event_id); - if (error?.code === "MEGOLM_NO_SESSION") { - this._addMissingSessionEvent(event, flags.source); - } else { - this._missingSessions.removeEvent(event); - this._missingSessionCandidates.removeEvent(event); - } + return error?.code === "MEGOLM_NO_SESSION"; + }); + if (!missingSessionEvents.length) { + return; } - if (flags.isTimelineOpen) { - for (const result of results.values()) { - await this._verifyDecryptionResult(result, txn); - } + const eventsBySession = groupEventsBySession(events); + if (source === DecryptionSource.Sync) { + await Promise.all(Array.from(eventsBySession.values()).map(async group => { + const eventIds = group.events.map(e => e.event_id); + return this._megolmDecryption.addMissingKeyEventIds( + this._room.id, group.senderKey, group.sessionId, eventIds, txn); + })); } + + // TODO: do proper logging here + // run detached + Promise.resolve().then(async () => { + // if the message came from sync, wait 10s to see if the room key arrives late, + // and only after that proceed to request from backup + if (source === DecryptionSource.Sync) { + await this._clock.createTimeout(10000).elapsed(); + if (this._disposed) { + return; + } + // now check which sessions have been received already + const txn = this._storage.readTxn([this._storage.storeNames.inboundGroupSessions]); + await Promise.all(Array.from(eventsBySession).map(async ([key, group]) => { + if (await this._megolmDecryption.hasSession(this._room.id, group.senderKey, group.sessionId, txn)) { + eventsBySession.delete(key); + } + })); + } + await Promise.all(Array.from(eventsBySession.values()).map(group => { + return this._requestMissingSessionFromBackup(group.senderKey, group.sessionId); + })); + }).catch(err => { + console.log("failed to fetch missing session from key backup"); + console.error(err); + }); } async _verifyDecryptionResult(result, txn) { @@ -159,24 +193,7 @@ export class RoomEncryption { } } - _addMissingSessionEvent(event, source) { - const isNewSession = this._missingSessions.addEvent(event); - if (isNewSession) { - const senderKey = event.content?.["sender_key"]; - const sessionId = event.content?.["session_id"]; - this._requestMissingSessionFromBackup(senderKey, sessionId, source); - } - } - - async _requestMissingSessionFromBackup(senderKey, sessionId, source) { - // if the message came from sync, wait 10s to see if the room key arrives, - // and only after that proceed to request from backup - if (source === DecryptionSource.Sync) { - await this._clock.createTimeout(10000).elapsed(); - if (this._disposed || !this._missingSessions.hasSession(senderKey, sessionId)) { - return; - } - } + async _requestMissingSessionFromBackup(senderKey, sessionId) { // show prompt to enable secret storage if (!this._sessionBackup) { this._notifyMissingMegolmSession(); @@ -216,43 +233,19 @@ export class RoomEncryption { console.info(`Backed-up session of unknown algorithm: ${session.algorithm}`); } } catch (err) { - console.error(`Could not get session ${sessionId} from backup`, err); + if (!(err.name === "HomeServerError" && err.errcode === "M_NOT_FOUND")) { + console.error(`Could not get session ${sessionId} from backup`, err); + } } } /** * @param {RoomKey} roomKeys - * @return {Array} the event ids that should be retried to decrypt + * @param {Transaction} txn + * @return {Promise>} the event ids that should be retried to decrypt */ - getEventIdsForRoomKey(roomKey) { - // TODO: we could concat both results here, and only put stuff in - // candidates if it is not in missing sessions to use a bit less memory - let eventIds = this._missingSessions.getEventIds(roomKey.senderKey, roomKey.sessionId); - if (!eventIds) { - eventIds = this._missingSessionCandidates.getEventIds(roomKey.senderKey, roomKey.sessionId); - } - return eventIds; - } - - /** - * caches mapping of session to event id of all encrypted candidates - * and filters to return only the candidates for the given room key - */ - findAndCacheEntriesForRoomKey(roomKey, candidateEntries) { - const matches = []; - - for (const entry of candidateEntries) { - if (entry.eventType === ENCRYPTED_TYPE) { - this._missingSessionCandidates.addEvent(entry.event); - const senderKey = entry.event?.content?.["sender_key"]; - const sessionId = entry.event?.content?.["session_id"]; - if (senderKey === roomKey.senderKey && sessionId === roomKey.sessionId) { - matches.push(entry); - } - } - } - - return matches; + getEventIdsForMissingKey(roomKey, txn) { + return this._megolmDecryption.getEventIdsForMissingKey(this._room.id, roomKey.senderKey, roomKey.sessionId, txn); } /** shares the encryption key for the next message if needed */ @@ -322,13 +315,20 @@ export class RoomEncryption { await removeOpTxn.complete(); } - async _addShareRoomKeyOperationForNewMembers(memberChangesArray, txn) { + async _addShareRoomKeyOperationForNewMembers(memberChangesArray, txn, log) { const userIds = memberChangesArray.filter(m => m.hasJoined).map(m => m.userId); const roomKeyMessage = await this._megolmEncryption.createRoomKeyMessage( this._room.id, txn); if (roomKeyMessage) { + log.log({ + l: "share key for new members", userIds, + id: roomKeyMessage.session_id, + chain_index: roomKeyMessage.chain_index + }); this._writeRoomKeyShareOperation(roomKeyMessage, userIds, txn); + return true; } + return false; } _writeRoomKeyShareOperation(roomKeyMessage, userIds, txn) { @@ -400,13 +400,15 @@ export class RoomEncryption { await hsApi.sendToDevice(type, payload, txnId, {log}).response(); } - filterEventEntriesForKeys(entries, keys) { + filterUndecryptedEventEntriesForKeys(entries, keys) { return entries.filter(entry => { - const {event} = entry; - if (event) { - const senderKey = event.content?.["sender_key"]; - const sessionId = event.content?.["session_id"]; - return keys.some(key => senderKey === key.senderKey && sessionId === key.sessionId); + if (entry.isEncrypted && !entry.isDecrypted) { + const {event} = entry; + if (event) { + const senderKey = event.content?.["sender_key"]; + const sessionId = event.content?.["session_id"]; + return keys.some(key => senderKey === key.senderKey && sessionId === key.sessionId); + } } return false; }); @@ -424,10 +426,10 @@ export class RoomEncryption { * the decryption results before turning them */ class DecryptionPreparation { - constructor(megolmDecryptionPreparation, extraErrors, flags, roomEncryption, events) { + constructor(megolmDecryptionPreparation, extraErrors, source, roomEncryption, events) { this._megolmDecryptionPreparation = megolmDecryptionPreparation; this._extraErrors = extraErrors; - this._flags = flags; + this._source = source; this._roomEncryption = roomEncryption; this._events = events; } @@ -436,7 +438,7 @@ class DecryptionPreparation { return new DecryptionChanges( await this._megolmDecryptionPreparation.decrypt(), this._extraErrors, - this._flags, + this._source, this._roomEncryption, this._events); } @@ -447,10 +449,10 @@ class DecryptionPreparation { } class DecryptionChanges { - constructor(megolmDecryptionChanges, extraErrors, flags, roomEncryption, events) { + constructor(megolmDecryptionChanges, extraErrors, source, roomEncryption, events) { this._megolmDecryptionChanges = megolmDecryptionChanges; this._extraErrors = extraErrors; - this._flags = flags; + this._source = source; this._roomEncryption = roomEncryption; this._events = events; } @@ -458,15 +460,16 @@ class DecryptionChanges { async write(txn) { const {results, errors} = await this._megolmDecryptionChanges.write(txn); mergeMap(this._extraErrors, errors); - await this._roomEncryption._processDecryptionResults(this._events, results, errors, this._flags, txn); - return new BatchDecryptionResult(results, errors); + await this._roomEncryption._processDecryptionResults(this._events, results, errors, this._source, txn); + return new BatchDecryptionResult(results, errors, this._roomEncryption); } } class BatchDecryptionResult { - constructor(results, errors) { + constructor(results, errors, roomEncryption) { this.results = results; this.errors = errors; + this._roomEncryption = roomEncryption; } applyToEntries(entries) { @@ -482,59 +485,10 @@ class BatchDecryptionResult { } } } -} -class SessionToEventIdsMap { - constructor() { - this._eventIdsBySession = new Map(); - } - - addEvent(event) { - let isNewSession = false; - const senderKey = event.content?.["sender_key"]; - const sessionId = event.content?.["session_id"]; - const key = encodeMissingSessionKey(senderKey, sessionId); - let eventIds = this._eventIdsBySession.get(key); - // new missing session - if (!eventIds) { - eventIds = new Set(); - this._eventIdsBySession.set(key, eventIds); - isNewSession = true; - } - eventIds.add(event.event_id); - return isNewSession; - } - - getEventIds(senderKey, sessionId) { - const key = encodeMissingSessionKey(senderKey, sessionId); - const entriesForSession = this._eventIdsBySession.get(key); - if (entriesForSession) { - return [...entriesForSession]; - } - } - - getSessions() { - return Array.from(this._eventIdsBySession.keys()).map(decodeMissingSessionKey); - } - - hasSession(senderKey, sessionId) { - return this._eventIdsBySession.has(encodeMissingSessionKey(senderKey, sessionId)); - } - - removeEvent(event) { - let hasRemovedSession = false; - const senderKey = event.content?.["sender_key"]; - const sessionId = event.content?.["session_id"]; - const key = encodeMissingSessionKey(senderKey, sessionId); - let eventIds = this._eventIdsBySession.get(key); - if (eventIds) { - if (eventIds.delete(event.event_id)) { - if (!eventIds.length) { - this._eventIdsBySession.delete(key); - hasRemovedSession = true; - } - } - } - return hasRemovedSession; + verifySenders(txn) { + return Promise.all(Array.from(this.results.values()).map(result => { + return this._roomEncryption._verifyDecryptionResult(result, txn); + })); } } diff --git a/src/matrix/e2ee/megolm/Decryption.js b/src/matrix/e2ee/megolm/Decryption.js index 80a55961..8f4714ea 100644 --- a/src/matrix/e2ee/megolm/Decryption.js +++ b/src/matrix/e2ee/megolm/Decryption.js @@ -15,25 +15,13 @@ limitations under the License. */ import {DecryptionError} from "../common.js"; -import {groupBy} from "../../../utils/groupBy.js"; import * as RoomKey from "./decryption/RoomKey.js"; import {SessionInfo} from "./decryption/SessionInfo.js"; import {DecryptionPreparation} from "./decryption/DecryptionPreparation.js"; import {SessionDecryption} from "./decryption/SessionDecryption.js"; import {SessionCache} from "./decryption/SessionCache.js"; import {MEGOLM_ALGORITHM} from "../common.js"; - -function getSenderKey(event) { - return event.content?.["sender_key"]; -} - -function getSessionId(event) { - return event.content?.["session_id"]; -} - -function getCiphertext(event) { - return event.content?.ciphertext; -} +import {validateEvent, groupEventsBySession} from "./decryption/utils.js"; export class Decryption { constructor({pickleKey, olm, olmWorker}) { @@ -46,6 +34,37 @@ export class Decryption { return new SessionCache(size); } + async addMissingKeyEventIds(roomId, senderKey, sessionId, eventIds, txn) { + let sessionEntry = await txn.inboundGroupSessions.get(roomId, senderKey, sessionId); + // we never want to overwrite an existing key + if (sessionEntry?.session) { + return; + } + if (sessionEntry) { + const uniqueEventIds = new Set(sessionEntry.eventIds); + for (const id of eventIds) { + uniqueEventIds.add(id); + } + sessionEntry.eventIds = Array.from(uniqueEventIds); + } else { + sessionEntry = {roomId, senderKey, sessionId, eventIds}; + } + txn.inboundGroupSessions.set(sessionEntry); + } + + async getEventIdsForMissingKey(roomId, senderKey, sessionId, txn) { + const sessionEntry = await txn.inboundGroupSessions.get(roomId, senderKey, sessionId); + if (sessionEntry && !sessionEntry.session) { + return sessionEntry.eventIds; + } + } + + async hasSession(roomId, senderKey, sessionId, txn) { + const sessionEntry = await txn.inboundGroupSessions.get(roomId, senderKey, sessionId); + const isValidSession = typeof sessionEntry?.session === "string"; + return isValidSession; + } + /** * Reads all the state from storage to be able to decrypt the given events. * Decryption can then happen outside of a storage transaction. @@ -61,28 +80,22 @@ export class Decryption { const validEvents = []; for (const event of events) { - const isValid = typeof getSenderKey(event) === "string" && - typeof getSessionId(event) === "string" && - typeof getCiphertext(event) === "string"; - if (isValid) { + if (validateEvent(event)) { validEvents.push(event); } else { errors.set(event.event_id, new DecryptionError("MEGOLM_INVALID_EVENT", event)) } } - const eventsBySession = groupBy(validEvents, event => { - return `${getSenderKey(event)}|${getSessionId(event)}`; - }); + const eventsBySession = groupEventsBySession(validEvents); const sessionDecryptions = []; - await Promise.all(Array.from(eventsBySession.values()).map(async eventsForSession => { - const firstEvent = eventsForSession[0]; - const sessionInfo = await this._getSessionInfoForEvent(roomId, firstEvent, newKeys, sessionCache, txn); + await Promise.all(Array.from(eventsBySession.values()).map(async group => { + const sessionInfo = await this._getSessionInfo(roomId, group.senderKey, group.sessionId, newKeys, sessionCache, txn); if (sessionInfo) { - sessionDecryptions.push(new SessionDecryption(sessionInfo, eventsForSession, this._olmWorker)); + sessionDecryptions.push(new SessionDecryption(sessionInfo, group.events, this._olmWorker)); } else { - for (const event of eventsForSession) { + for (const event of group.events) { errors.set(event.event_id, new DecryptionError("MEGOLM_NO_SESSION", event)); } } @@ -91,9 +104,7 @@ export class Decryption { return new DecryptionPreparation(roomId, sessionDecryptions, errors); } - async _getSessionInfoForEvent(roomId, event, newKeys, sessionCache, txn) { - const senderKey = getSenderKey(event); - const sessionId = getSessionId(event); + async _getSessionInfo(roomId, senderKey, sessionId, newKeys, sessionCache, txn) { let sessionInfo; if (newKeys) { const key = newKeys.find(k => k.roomId === roomId && k.senderKey === senderKey && k.sessionId === sessionId); @@ -110,7 +121,7 @@ export class Decryption { } if (!sessionInfo) { const sessionEntry = await txn.inboundGroupSessions.get(roomId, senderKey, sessionId); - if (sessionEntry) { + if (sessionEntry && sessionEntry.session) { let session = new this._olm.InboundGroupSession(); try { session.unpickle(this._pickleKey, sessionEntry.session); diff --git a/src/matrix/e2ee/megolm/decryption/RoomKey.js b/src/matrix/e2ee/megolm/decryption/RoomKey.js index e206003a..1880961b 100644 --- a/src/matrix/e2ee/megolm/decryption/RoomKey.js +++ b/src/matrix/e2ee/megolm/decryption/RoomKey.js @@ -33,7 +33,7 @@ export class BaseRoomKey { async _isBetterThanKnown(session, olm, pickleKey, txn) { let isBetter = true; const existingSessionEntry = await txn.inboundGroupSessions.get(this.roomId, this.senderKey, this.sessionId); - if (existingSessionEntry) { + if (existingSessionEntry?.session) { const existingSession = new olm.InboundGroupSession(); try { existingSession.unpickle(pickleKey, existingSessionEntry.session); diff --git a/src/matrix/e2ee/megolm/decryption/utils.js b/src/matrix/e2ee/megolm/decryption/utils.js new file mode 100644 index 00000000..c38b1416 --- /dev/null +++ b/src/matrix/e2ee/megolm/decryption/utils.js @@ -0,0 +1,57 @@ +/* +Copyright 2020 The Matrix.org Foundation C.I.C. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +import {groupByWithCreator} from "../../../../utils/groupBy.js"; + +function getSenderKey(event) { + return event.content?.["sender_key"]; +} + +function getSessionId(event) { + return event.content?.["session_id"]; +} + +function getCiphertext(event) { + return event.content?.ciphertext; +} + +export function validateEvent(event) { + return typeof getSenderKey(event) === "string" && + typeof getSessionId(event) === "string" && + typeof getCiphertext(event) === "string"; +} + +class SessionKeyGroup { + constructor() { + this.events = []; + } + + get senderKey() { + return getSenderKey(this.events[0]); + } + + get sessionId() { + return getSessionId(this.events[0]); + } +} + +export function groupEventsBySession(events) { + return groupByWithCreator(events, + event => `${getSenderKey(event)}|${getSessionId(event)}`, + () => new SessionKeyGroup(), + (group, event) => group.events.push(event) + ); +} diff --git a/src/matrix/room/Room.js b/src/matrix/room/Room.js index 5883da1a..89653b62 100644 --- a/src/matrix/room/Room.js +++ b/src/matrix/room/Room.js @@ -18,7 +18,6 @@ import {EventEmitter} from "../../utils/EventEmitter.js"; import {RoomSummary} from "./RoomSummary.js"; import {SyncWriter} from "./timeline/persistence/SyncWriter.js"; import {GapWriter} from "./timeline/persistence/GapWriter.js"; -import {readRawTimelineEntriesWithTxn} from "./timeline/persistence/TimelineReader.js"; import {Timeline} from "./timeline/Timeline.js"; import {FragmentIdComparer} from "./timeline/FragmentIdComparer.js"; import {SendQueue} from "./sending/SendQueue.js"; @@ -27,8 +26,6 @@ import {fetchOrLoadMembers} from "./members/load.js"; import {MemberList} from "./members/MemberList.js"; import {Heroes} from "./members/Heroes.js"; import {EventEntry} from "./timeline/entries/EventEntry.js"; -import {EventKey} from "./timeline/EventKey.js"; -import {Direction} from "./timeline/Direction.js"; import {ObservedEventMap} from "./ObservedEventMap.js"; import {AttachmentUpload} from "./AttachmentUpload.js"; import {DecryptionSource} from "../e2ee/common.js"; @@ -58,56 +55,36 @@ export class Room extends EventEmitter { this._observedEvents = null; } - _readRetryDecryptCandidateEntries(sinceEventKey, txn) { - if (sinceEventKey) { - return readRawTimelineEntriesWithTxn(this._roomId, sinceEventKey, - Direction.Forward, Number.MAX_SAFE_INTEGER, this._fragmentIdComparer, txn); - } else { - // all messages for room ... - // if you haven't decrypted any message in a room yet, - // it's unlikely you will have tons of them. - // so this should be fine as a last resort - return readRawTimelineEntriesWithTxn(this._roomId, this._syncWriter.lastMessageKey, - Direction.Backward, Number.MAX_SAFE_INTEGER, this._fragmentIdComparer, txn); - } - } - - async notifyRoomKey(roomKey) { - if (!this._roomEncryption) { - return; - } - const retryEventIds = this._roomEncryption.getEventIdsForRoomKey(roomKey); - const stores = [ - this._storage.storeNames.timelineEvents, - this._storage.storeNames.inboundGroupSessions, - ]; - let txn; - let retryEntries; + async _getRetryDecryptEntriesForKey(roomKey, txn) { + const retryEventIds = await this._roomEncryption.getEventIdsForMissingKey(roomKey, txn); + const retryEntries = []; if (retryEventIds) { - retryEntries = []; - txn = this._storage.readTxn(stores); for (const eventId of retryEventIds) { const storageEntry = await txn.timelineEvents.getByEventId(this._roomId, eventId); if (storageEntry) { retryEntries.push(new EventEntry(storageEntry, this._fragmentIdComparer)); } } - } else { - // we only look for messages since lastDecryptedEventKey because - // the timeline must be closed (otherwise getEventIdsForRoomKey would have found the event ids) - // and to update the summary we only care about events since lastDecryptedEventKey - const key = this._summary.data.lastDecryptedEventKey; - // key might be missing if we haven't decrypted any events in this room - const sinceEventKey = key && new EventKey(key.fragmentId, key.entryIndex); - // check we have not already decrypted the most recent event in the room - // otherwise we know that the messages for this room key will not update the room summary - if (!sinceEventKey || !sinceEventKey.equals(this._syncWriter.lastMessageKey)) { - txn = this._storage.readTxn(stores.concat(this._storage.storeNames.timelineFragments)); - const candidateEntries = await this._readRetryDecryptCandidateEntries(sinceEventKey, txn); - retryEntries = this._roomEncryption.findAndCacheEntriesForRoomKey(roomKey, candidateEntries); - } } - if (retryEntries?.length) { + return retryEntries; + } + + /** + * Used for keys received from other sources than sync, like key backup. + * @internal + * @param {RoomKey} roomKey + * @return {Promise} + */ + async notifyRoomKey(roomKey) { + if (!this._roomEncryption) { + return; + } + const txn = this._storage.readTxn([ + this._storage.storeNames.timelineEvents, + this._storage.storeNames.inboundGroupSessions, + ]); + const retryEntries = this._getRetryDecryptEntriesForKey(roomKey, txn); + if (retryEntries.length) { const decryptRequest = this._decryptEntries(DecryptionSource.Retry, retryEntries, txn); // this will close txn while awaiting decryption await decryptRequest.complete(); @@ -147,13 +124,13 @@ export class Room extends EventEmitter { const events = entries.filter(entry => { return entry.eventType === EVENT_ENCRYPTED_TYPE; }).map(entry => entry.event); - const isTimelineOpen = this._isTimelineOpen; - r.preparation = await this._roomEncryption.prepareDecryptAll(events, null, source, isTimelineOpen, inboundSessionTxn); + r.preparation = await this._roomEncryption.prepareDecryptAll(events, null, source, inboundSessionTxn); if (r.cancelled) return; const changes = await r.preparation.decrypt(); r.preparation = null; if (r.cancelled) return; const stores = [this._storage.storeNames.groupSessionDecryptions]; + const isTimelineOpen = this._isTimelineOpen; if (isTimelineOpen) { // read to fetch devices if timeline is open stores.push(this._storage.storeNames.deviceIdentities); @@ -162,6 +139,9 @@ export class Room extends EventEmitter { let decryption; try { decryption = await changes.write(writeTxn); + if (isTimelineOpen) { + await decryption.verifySenders(writeTxn); + } } catch (err) { writeTxn.abort(); throw err; @@ -176,6 +156,26 @@ export class Room extends EventEmitter { return request; } + async _getSyncRetryDecryptEntries(newKeys, txn) { + const entriesPerKey = await Promise.all(newKeys.map(key => this._getRetryDecryptEntriesForKey(key, txn))); + let retryEntries = entriesPerKey.reduce((allEntries, entries) => allEntries.concat(entries), []); + // If we have the timeline open, see if there are more entries for the new keys + // as we only store missing session information for synced events, not backfilled. + // We want to decrypt all events we can though if the user is looking + // at them when the timeline is open + if (this._timeline) { + let retryTimelineEntries = this._roomEncryption.filterUndecryptedEventEntriesForKeys(this._timeline.remoteEntries, newKeys); + // filter out any entries already in retryEntries so we don't decrypt them twice + const existingIds = retryEntries.reduce((ids, e) => {ids.add(e.id); return ids;}, new Set()); + retryTimelineEntries = retryTimelineEntries.filter(e => !existingIds.has(e.id)); + // make copies so we don't modify the original entry in writeSync, before the afterSync stage + const retryTimelineEntriesCopies = retryTimelineEntries.map(e => e.clone()); + // add to other retry entries + retryEntries = retryEntries.concat(retryTimelineEntriesCopies); + } + return retryEntries; + } + async prepareSync(roomResponse, membership, newKeys, txn, log) { log.set("id", this.id); if (newKeys) { @@ -192,25 +192,21 @@ export class Room extends EventEmitter { let retryEntries; let decryptPreparation; if (roomEncryption) { - // also look for events in timeline here - let events = roomResponse?.timeline?.events || []; - // when new keys arrive, also see if any events currently loaded in the timeline - // can now be retried to decrypt - if (this._timeline && newKeys) { - retryEntries = roomEncryption.filterEventEntriesForKeys( - this._timeline.remoteEntries, newKeys); + let eventsToDecrypt = roomResponse?.timeline?.events || []; + // when new keys arrive, also see if any older events can now be retried to decrypt + if (newKeys) { + retryEntries = await this._getSyncRetryDecryptEntries(newKeys, txn); if (retryEntries.length) { log.set("retry", retryEntries.length); - events = events.concat(retryEntries.map(entry => entry.event)); + eventsToDecrypt = eventsToDecrypt.concat(retryEntries.map(entry => entry.event)); } } - - if (events.length) { - const eventsToDecrypt = events.filter(event => { - return event?.type === EVENT_ENCRYPTED_TYPE; - }); + eventsToDecrypt = eventsToDecrypt.filter(event => { + return event?.type === EVENT_ENCRYPTED_TYPE; + }); + if (eventsToDecrypt.length) { decryptPreparation = await roomEncryption.prepareDecryptAll( - eventsToDecrypt, newKeys, DecryptionSource.Sync, this._isTimelineOpen, txn); + eventsToDecrypt, newKeys, DecryptionSource.Sync, txn); } } @@ -225,7 +221,7 @@ export class Room extends EventEmitter { async afterPrepareSync(preparation, parentLog) { if (preparation.decryptPreparation) { - await parentLog.wrap("afterPrepareSync decrypt", async log => { + await parentLog.wrap("decrypt", async log => { log.set("id", this.id); preparation.decryptChanges = await preparation.decryptPreparation.decrypt(); preparation.decryptPreparation = null; @@ -236,28 +232,37 @@ export class Room extends EventEmitter { /** @package */ async writeSync(roomResponse, isInitialSync, {summaryChanges, decryptChanges, roomEncryption, retryEntries}, txn, log) { log.set("id", this.id); - const {entries, newLiveKey, memberChanges} = + const {entries: newEntries, newLiveKey, memberChanges} = await log.wrap("syncWriter", log => this._syncWriter.writeSync(roomResponse, txn, log), log.level.Detail); + let allEntries = newEntries; if (decryptChanges) { const decryption = await decryptChanges.write(txn); - if (retryEntries?.length) { - // TODO: this will modify existing timeline entries (which we should not do in writeSync), - // but it is a temporary way of reattempting decryption while timeline is open - // won't need copies when tracking missing sessions properly - // prepend the retried entries, as we know they are older (not that it should matter much for the summary) - entries.unshift(...retryEntries); + log.set("decryptionResults", decryption.results.size); + log.set("decryptionErrors", decryption.errors.size); + if (this._isTimelineOpen) { + await decryption.verifySenders(txn); + } + decryption.applyToEntries(newEntries); + if (retryEntries?.length) { + decryption.applyToEntries(retryEntries); + allEntries = retryEntries.concat(allEntries); } - decryption.applyToEntries(entries); } + log.set("allEntries", allEntries.length); + let shouldFlushKeyShares = false; // pass member changes to device tracker if (roomEncryption && this.isTrackingMembers && memberChanges?.size) { - await roomEncryption.writeMemberChanges(memberChanges, txn); + shouldFlushKeyShares = await roomEncryption.writeMemberChanges(memberChanges, txn, log); + log.set("shouldFlushKeyShares", shouldFlushKeyShares); } // also apply (decrypted) timeline entries to the summary changes summaryChanges = summaryChanges.applyTimelineEntries( - entries, isInitialSync, !this._isTimelineOpen, this._user.id); + allEntries, isInitialSync, !this._isTimelineOpen, this._user.id); // write summary changes, and unset if nothing was actually changed summaryChanges = this._summary.writeData(summaryChanges, txn); + if (summaryChanges) { + log.set("summaryChanges", summaryChanges.diff(this._summary.data)); + } // fetch new members while we have txn open, // but don't make any in-memory changes yet let heroChanges; @@ -275,11 +280,13 @@ export class Room extends EventEmitter { return { summaryChanges, roomEncryption, - newAndUpdatedEntries: entries, + newEntries, + updatedEntries: retryEntries || [], newLiveKey, removedPendingEvents, memberChanges, heroChanges, + shouldFlushKeyShares, }; } @@ -288,7 +295,12 @@ export class Room extends EventEmitter { * Called with the changes returned from `writeSync` to apply them and emit changes. * No storage or network operations should be done here. */ - afterSync({summaryChanges, newAndUpdatedEntries, newLiveKey, removedPendingEvents, memberChanges, heroChanges, roomEncryption}, log) { + afterSync(changes, log) { + const { + summaryChanges, newEntries, updatedEntries, newLiveKey, + removedPendingEvents, memberChanges, + heroChanges, roomEncryption + } = changes; log.set("id", this.id); this._syncWriter.afterSync(newLiveKey); this._setEncryption(roomEncryption); @@ -321,18 +333,21 @@ export class Room extends EventEmitter { this._emitUpdate(); } if (this._timeline) { - this._timeline.appendLiveEntries(newAndUpdatedEntries); + // these should not be added if not already there + this._timeline.replaceEntries(updatedEntries); + this._timeline.addOrReplaceEntries(newEntries); } if (this._observedEvents) { - this._observedEvents.updateEvents(newAndUpdatedEntries); + this._observedEvents.updateEvents(updatedEntries); + this._observedEvents.updateEvents(newEntries); } if (removedPendingEvents) { this._sendQueue.emitRemovals(removedPendingEvents); } } - needsAfterSyncCompleted({memberChanges}) { - return this._roomEncryption?.needsToShareKeys(memberChanges); + needsAfterSyncCompleted({shouldFlushKeyShares}) { + return shouldFlushKeyShares; } /** @@ -483,7 +498,7 @@ export class Room extends EventEmitter { this._sendQueue.emitRemovals(removedPendingEvents); } if (this._timeline) { - this._timeline.addGapEntries(gapResult.entries); + this._timeline.addOrReplaceEntries(gapResult.entries); } }); } @@ -548,6 +563,10 @@ export class Room extends EventEmitter { enableSessionBackup(sessionBackup) { this._roomEncryption?.enableSessionBackup(sessionBackup); + // TODO: do we really want to do this every time you open the app? + if (this._timeline) { + this._roomEncryption.restoreMissingSessionsFromBackup(this._timeline.remoteEntries); + } } get isTrackingMembers() { diff --git a/src/matrix/room/RoomSummary.js b/src/matrix/room/RoomSummary.js index 39b1d1b3..8c7c8576 100644 --- a/src/matrix/room/RoomSummary.js +++ b/src/matrix/room/RoomSummary.js @@ -122,36 +122,6 @@ function processTimelineEvent(data, eventEntry, isInitialSync, canMarkUnread, ow data = data.cloneIfNeeded(); data.isUnread = true; } - const {content} = eventEntry; - const body = content?.body; - const msgtype = content?.msgtype; - if (msgtype === "m.text" && !eventEntry.isEncrypted) { - data = data.cloneIfNeeded(); - data.lastMessageBody = body; - } - } - // store the event key of the last decrypted event so when decryption does succeed, - // we can attempt to re-decrypt from this point to update the room summary - if (!!data.encryption && eventEntry.isEncrypted && eventEntry.isDecrypted) { - let hasLargerEventKey = true; - if (data.lastDecryptedEventKey) { - try { - hasLargerEventKey = eventEntry.compare(data.lastDecryptedEventKey) > 0; - } catch (err) { - // TODO: load the fragments in between here? - // this could happen if an earlier event gets decrypted that - // is in a fragment different from the live one and the timeline is not open. - // In this case, we will just read too many events once per app load - // and then keep the mapping in memory. When eventually an event is decrypted in - // the live fragment, this should stop failing and the event key will be written. - hasLargerEventKey = false; - } - } - if (hasLargerEventKey) { - data = data.cloneIfNeeded(); - const {fragmentId, entryIndex} = eventEntry; - data.lastDecryptedEventKey = {fragmentId, entryIndex}; - } } return data; } @@ -182,12 +152,9 @@ class SummaryData { constructor(copy, roomId) { this.roomId = copy ? copy.roomId : roomId; this.name = copy ? copy.name : null; - this.lastMessageBody = copy ? copy.lastMessageBody : null; this.lastMessageTimestamp = copy ? copy.lastMessageTimestamp : null; this.isUnread = copy ? copy.isUnread : false; this.encryption = copy ? copy.encryption : null; - this.lastDecryptedEventKey = copy ? copy.lastDecryptedEventKey : null; - this.isDirectMessage = copy ? copy.isDirectMessage : false; this.membership = copy ? copy.membership : null; this.inviteCount = copy ? copy.inviteCount : 0; this.joinCount = copy ? copy.joinCount : 0; @@ -202,6 +169,18 @@ class SummaryData { this.cloned = copy ? true : false; } + diff(other) { + const props = Object.getOwnPropertyNames(this); + return props.reduce((diff, prop) => { + if (prop !== "cloned") { + if (this[prop] !== other[prop]) { + diff[prop] = this[prop]; + } + } + return diff; + }, {}); + } + cloneIfNeeded() { if (this.cloned) { return this; diff --git a/src/matrix/room/timeline/Timeline.js b/src/matrix/room/timeline/Timeline.js index e5592526..7e2d2110 100644 --- a/src/matrix/room/timeline/Timeline.js +++ b/src/matrix/room/timeline/Timeline.js @@ -60,16 +60,8 @@ export class Timeline { } } - // TODO: should we rather have generic methods for - // - adding new entries - // - updating existing entries (redaction, relations) /** @package */ - appendLiveEntries(newEntries) { - this._remoteEntries.setManySorted(newEntries); - } - - /** @package */ - addGapEntries(newEntries) { + addOrReplaceEntries(newEntries) { this._remoteEntries.setManySorted(newEntries); } diff --git a/src/matrix/room/timeline/entries/EventEntry.js b/src/matrix/room/timeline/entries/EventEntry.js index 410c3d63..88a0aa5e 100644 --- a/src/matrix/room/timeline/entries/EventEntry.js +++ b/src/matrix/room/timeline/entries/EventEntry.js @@ -25,6 +25,13 @@ export class EventEntry extends BaseEntry { this._decryptionResult = null; } + clone() { + const clone = new EventEntry(this._eventEntry, this._fragmentIdComparer); + clone._decryptionResult = this._decryptionResult; + clone._decryptionError = this._decryptionError; + return clone; + } + get event() { return this._eventEntry.event; } diff --git a/src/matrix/room/timeline/persistence/TimelineReader.js b/src/matrix/room/timeline/persistence/TimelineReader.js index 24ad4127..7493727c 100644 --- a/src/matrix/room/timeline/persistence/TimelineReader.js +++ b/src/matrix/room/timeline/persistence/TimelineReader.js @@ -41,7 +41,7 @@ class ReaderRequest { * Raw because it doesn't do decryption and in the future it should not read relations either. * It is just about reading entries and following fragment links */ -export async function readRawTimelineEntriesWithTxn(roomId, eventKey, direction, amount, fragmentIdComparer, txn) { +async function readRawTimelineEntriesWithTxn(roomId, eventKey, direction, amount, fragmentIdComparer, txn) { let entries = []; const timelineStore = txn.timelineEvents; const fragmentStore = txn.timelineFragments;