Merge pull request #252 from vector-im/bwindels/track-unknown-sessions

Track event ids of missing megolm sessions
This commit is contained in:
Bruno Windels 2021-03-03 13:05:15 +00:00 committed by GitHub
commit ae5f1050ac
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 343 additions and 322 deletions

View file

@ -248,6 +248,7 @@ export class Sync {
return this._storage.readTxn([
storeNames.olmSessions,
storeNames.inboundGroupSessions,
storeNames.timelineEvents // to read events that can now be decrypted
]);
}

View file

@ -1,40 +1,41 @@
## Integratation within the sync lifetime cycle
### prepareSync
### session.prepareSync
Decrypt any device messages, and turn them into RoomKey instances.
Any rooms that are not in the sync response but for which we receive keys will be included in the rooms to sync.
Runs before any room.prepareSync, so the new room keys can be passed to each room prepareSync to use in decryption.
### room.prepareSync
The session can start its own read/write transactions here, rooms only read from a shared transaction
- session
- device handler
- txn
- write pending encrypted
- txn
- olm decryption read
- olm async decryption
- dispatch to worker
- txn
- olm decryption write / remove pending encrypted
- rooms (with shared read txn)
- megolm decryption read
- megolm decryption read using any new keys decrypted by the session.
### afterPrepareSync
### room.afterPrepareSync
- rooms
- megolm async decryption
- dispatch to worker
### writeSync
### room.writeSync
- rooms (with shared readwrite txn)
- megolm decryption write, yielding decrypted events
- use decrypted events to write room summary
### afterSync
### session.writeSync
- writes any room keys that were received
### room.afterSync
- rooms
- emit changes
### afterSyncCompleted
### room.afterSyncCompleted
- session
- e2ee account

View file

@ -15,8 +15,9 @@ limitations under the License.
*/
import {MEGOLM_ALGORITHM, DecryptionSource} from "./common.js";
import {groupBy} from "../../utils/groupBy.js";
import {groupEventsBySession} from "./megolm/decryption/utils.js";
import {mergeMap} from "../../utils/mergeMap.js";
import {groupBy} from "../../utils/groupBy.js";
import {makeTxnId} from "../common.js";
const ENCRYPTED_TYPE = "m.room.encrypted";
@ -25,15 +26,6 @@ const ENCRYPTED_TYPE = "m.room.encrypted";
// note that encrypt could still create a new session
const MIN_PRESHARE_INTERVAL = 60 * 1000; // 1min
function encodeMissingSessionKey(senderKey, sessionId) {
return `${senderKey}|${sessionId}`;
}
function decodeMissingSessionKey(key) {
const [senderKey, sessionId] = key.split("|");
return {senderKey, sessionId};
}
export class RoomEncryption {
constructor({room, deviceTracker, olmEncryption, megolmEncryption, megolmDecryption, encryptionParams, storage, sessionBackup, notifyMissingMegolmSession, clock}) {
this._room = room;
@ -43,32 +35,41 @@ export class RoomEncryption {
this._megolmDecryption = megolmDecryption;
// content of the m.room.encryption event
this._encryptionParams = encryptionParams;
this._megolmBackfillCache = this._megolmDecryption.createSessionCache();
this._megolmSyncCache = this._megolmDecryption.createSessionCache(1);
// session => event ids of messages we tried to decrypt and the session was missing
this._missingSessions = new SessionToEventIdsMap();
// sessions that may or may not be missing, but that while
// looking for a particular session came up as a candidate and were
// added to the cache to prevent further lookups from storage
this._missingSessionCandidates = new SessionToEventIdsMap();
// caches devices to verify events
this._senderDeviceCache = new Map();
this._storage = storage;
this._sessionBackup = sessionBackup;
this._notifyMissingMegolmSession = notifyMissingMegolmSession;
this._clock = clock;
this._disposed = false;
this._isFlushingRoomKeyShares = false;
this._lastKeyPreShareTime = null;
this._disposed = false;
}
async enableSessionBackup(sessionBackup) {
enableSessionBackup(sessionBackup) {
if (this._sessionBackup) {
return;
}
this._sessionBackup = sessionBackup;
for(const {senderKey, sessionId} of this._missingSessions.getSessions()) {
await this._requestMissingSessionFromBackup(senderKey, sessionId, null);
}
async restoreMissingSessionsFromBackup(entries) {
const events = entries.filter(e => e.isEncrypted && !e.isDecrypted && e.event).map(e => e.event);
const eventsBySession = groupEventsBySession(events);
const groups = Array.from(eventsBySession.values());
const txn = this._storage.readTxn([this._storage.storeNames.inboundGroupSessions]);
const hasSessions = await Promise.all(groups.map(async group => {
return this._megolmDecryption.hasSession(this._room.id, group.senderKey, group.sessionId, txn);
}));
const missingSessions = groups.filter((_, i) => !hasSessions[i]);
if (missingSessions.length) {
// start with last sessions which should be for the last items in the timeline
for (var i = missingSessions.length - 1; i >= 0; i--) {
const session = missingSessions[i];
await this._requestMissingSessionFromBackup(session.senderKey, session.sessionId);
}
}
}
@ -79,21 +80,27 @@ export class RoomEncryption {
this._senderDeviceCache = new Map(); // purge the sender device cache
}
async writeMemberChanges(memberChanges, txn) {
async writeMemberChanges(memberChanges, txn, log) {
let shouldFlush;
const memberChangesArray = Array.from(memberChanges.values());
if (memberChangesArray.some(m => m.hasLeft)) {
log.log({
l: "discardOutboundSession",
leftUsers: memberChangesArray.filter(m => m.hasLeft).map(m => m.userId),
});
this._megolmEncryption.discardOutboundSession(this._room.id, txn);
}
if (memberChangesArray.some(m => m.hasJoined)) {
await this._addShareRoomKeyOperationForNewMembers(memberChangesArray, txn);
shouldFlush = await this._addShareRoomKeyOperationForNewMembers(memberChangesArray, txn, log);
}
await this._deviceTracker.writeMemberChanges(this._room, memberChanges, txn);
return shouldFlush;
}
// this happens before entries exists, as they are created by the syncwriter
// but we want to be able to map it back to something in the timeline easily
// when retrying decryption.
async prepareDecryptAll(events, newKeys, source, isTimelineOpen, txn) {
async prepareDecryptAll(events, newKeys, source, txn) {
const errors = new Map();
const validEvents = [];
for (const event of events) {
@ -126,24 +133,51 @@ export class RoomEncryption {
if (customCache) {
customCache.dispose();
}
return new DecryptionPreparation(preparation, errors, {isTimelineOpen, source}, this, events);
return new DecryptionPreparation(preparation, errors, source, this, events);
}
async _processDecryptionResults(events, results, errors, flags, txn) {
for (const event of events) {
async _processDecryptionResults(events, results, errors, source, txn) {
const missingSessionEvents = events.filter(event => {
const error = errors.get(event.event_id);
if (error?.code === "MEGOLM_NO_SESSION") {
this._addMissingSessionEvent(event, flags.source);
} else {
this._missingSessions.removeEvent(event);
this._missingSessionCandidates.removeEvent(event);
return error?.code === "MEGOLM_NO_SESSION";
});
if (!missingSessionEvents.length) {
return;
}
const eventsBySession = groupEventsBySession(events);
if (source === DecryptionSource.Sync) {
await Promise.all(Array.from(eventsBySession.values()).map(async group => {
const eventIds = group.events.map(e => e.event_id);
return this._megolmDecryption.addMissingKeyEventIds(
this._room.id, group.senderKey, group.sessionId, eventIds, txn);
}));
}
if (flags.isTimelineOpen) {
for (const result of results.values()) {
await this._verifyDecryptionResult(result, txn);
// TODO: do proper logging here
// run detached
Promise.resolve().then(async () => {
// if the message came from sync, wait 10s to see if the room key arrives late,
// and only after that proceed to request from backup
if (source === DecryptionSource.Sync) {
await this._clock.createTimeout(10000).elapsed();
if (this._disposed) {
return;
}
// now check which sessions have been received already
const txn = this._storage.readTxn([this._storage.storeNames.inboundGroupSessions]);
await Promise.all(Array.from(eventsBySession).map(async ([key, group]) => {
if (await this._megolmDecryption.hasSession(this._room.id, group.senderKey, group.sessionId, txn)) {
eventsBySession.delete(key);
}
}));
}
await Promise.all(Array.from(eventsBySession.values()).map(group => {
return this._requestMissingSessionFromBackup(group.senderKey, group.sessionId);
}));
}).catch(err => {
console.log("failed to fetch missing session from key backup");
console.error(err);
});
}
async _verifyDecryptionResult(result, txn) {
@ -159,24 +193,7 @@ export class RoomEncryption {
}
}
_addMissingSessionEvent(event, source) {
const isNewSession = this._missingSessions.addEvent(event);
if (isNewSession) {
const senderKey = event.content?.["sender_key"];
const sessionId = event.content?.["session_id"];
this._requestMissingSessionFromBackup(senderKey, sessionId, source);
}
}
async _requestMissingSessionFromBackup(senderKey, sessionId, source) {
// if the message came from sync, wait 10s to see if the room key arrives,
// and only after that proceed to request from backup
if (source === DecryptionSource.Sync) {
await this._clock.createTimeout(10000).elapsed();
if (this._disposed || !this._missingSessions.hasSession(senderKey, sessionId)) {
return;
}
}
async _requestMissingSessionFromBackup(senderKey, sessionId) {
// show prompt to enable secret storage
if (!this._sessionBackup) {
this._notifyMissingMegolmSession();
@ -216,43 +233,19 @@ export class RoomEncryption {
console.info(`Backed-up session of unknown algorithm: ${session.algorithm}`);
}
} catch (err) {
if (!(err.name === "HomeServerError" && err.errcode === "M_NOT_FOUND")) {
console.error(`Could not get session ${sessionId} from backup`, err);
}
}
}
/**
* @param {RoomKey} roomKeys
* @return {Array<string>} the event ids that should be retried to decrypt
* @param {Transaction} txn
* @return {Promise<Array<string>>} the event ids that should be retried to decrypt
*/
getEventIdsForRoomKey(roomKey) {
// TODO: we could concat both results here, and only put stuff in
// candidates if it is not in missing sessions to use a bit less memory
let eventIds = this._missingSessions.getEventIds(roomKey.senderKey, roomKey.sessionId);
if (!eventIds) {
eventIds = this._missingSessionCandidates.getEventIds(roomKey.senderKey, roomKey.sessionId);
}
return eventIds;
}
/**
* caches mapping of session to event id of all encrypted candidates
* and filters to return only the candidates for the given room key
*/
findAndCacheEntriesForRoomKey(roomKey, candidateEntries) {
const matches = [];
for (const entry of candidateEntries) {
if (entry.eventType === ENCRYPTED_TYPE) {
this._missingSessionCandidates.addEvent(entry.event);
const senderKey = entry.event?.content?.["sender_key"];
const sessionId = entry.event?.content?.["session_id"];
if (senderKey === roomKey.senderKey && sessionId === roomKey.sessionId) {
matches.push(entry);
}
}
}
return matches;
getEventIdsForMissingKey(roomKey, txn) {
return this._megolmDecryption.getEventIdsForMissingKey(this._room.id, roomKey.senderKey, roomKey.sessionId, txn);
}
/** shares the encryption key for the next message if needed */
@ -322,13 +315,20 @@ export class RoomEncryption {
await removeOpTxn.complete();
}
async _addShareRoomKeyOperationForNewMembers(memberChangesArray, txn) {
async _addShareRoomKeyOperationForNewMembers(memberChangesArray, txn, log) {
const userIds = memberChangesArray.filter(m => m.hasJoined).map(m => m.userId);
const roomKeyMessage = await this._megolmEncryption.createRoomKeyMessage(
this._room.id, txn);
if (roomKeyMessage) {
log.log({
l: "share key for new members", userIds,
id: roomKeyMessage.session_id,
chain_index: roomKeyMessage.chain_index
});
this._writeRoomKeyShareOperation(roomKeyMessage, userIds, txn);
return true;
}
return false;
}
_writeRoomKeyShareOperation(roomKeyMessage, userIds, txn) {
@ -400,14 +400,16 @@ export class RoomEncryption {
await hsApi.sendToDevice(type, payload, txnId, {log}).response();
}
filterEventEntriesForKeys(entries, keys) {
filterUndecryptedEventEntriesForKeys(entries, keys) {
return entries.filter(entry => {
if (entry.isEncrypted && !entry.isDecrypted) {
const {event} = entry;
if (event) {
const senderKey = event.content?.["sender_key"];
const sessionId = event.content?.["session_id"];
return keys.some(key => senderKey === key.senderKey && sessionId === key.sessionId);
}
}
return false;
});
}
@ -424,10 +426,10 @@ export class RoomEncryption {
* the decryption results before turning them
*/
class DecryptionPreparation {
constructor(megolmDecryptionPreparation, extraErrors, flags, roomEncryption, events) {
constructor(megolmDecryptionPreparation, extraErrors, source, roomEncryption, events) {
this._megolmDecryptionPreparation = megolmDecryptionPreparation;
this._extraErrors = extraErrors;
this._flags = flags;
this._source = source;
this._roomEncryption = roomEncryption;
this._events = events;
}
@ -436,7 +438,7 @@ class DecryptionPreparation {
return new DecryptionChanges(
await this._megolmDecryptionPreparation.decrypt(),
this._extraErrors,
this._flags,
this._source,
this._roomEncryption,
this._events);
}
@ -447,10 +449,10 @@ class DecryptionPreparation {
}
class DecryptionChanges {
constructor(megolmDecryptionChanges, extraErrors, flags, roomEncryption, events) {
constructor(megolmDecryptionChanges, extraErrors, source, roomEncryption, events) {
this._megolmDecryptionChanges = megolmDecryptionChanges;
this._extraErrors = extraErrors;
this._flags = flags;
this._source = source;
this._roomEncryption = roomEncryption;
this._events = events;
}
@ -458,15 +460,16 @@ class DecryptionChanges {
async write(txn) {
const {results, errors} = await this._megolmDecryptionChanges.write(txn);
mergeMap(this._extraErrors, errors);
await this._roomEncryption._processDecryptionResults(this._events, results, errors, this._flags, txn);
return new BatchDecryptionResult(results, errors);
await this._roomEncryption._processDecryptionResults(this._events, results, errors, this._source, txn);
return new BatchDecryptionResult(results, errors, this._roomEncryption);
}
}
class BatchDecryptionResult {
constructor(results, errors) {
constructor(results, errors, roomEncryption) {
this.results = results;
this.errors = errors;
this._roomEncryption = roomEncryption;
}
applyToEntries(entries) {
@ -482,59 +485,10 @@ class BatchDecryptionResult {
}
}
}
}
class SessionToEventIdsMap {
constructor() {
this._eventIdsBySession = new Map();
}
addEvent(event) {
let isNewSession = false;
const senderKey = event.content?.["sender_key"];
const sessionId = event.content?.["session_id"];
const key = encodeMissingSessionKey(senderKey, sessionId);
let eventIds = this._eventIdsBySession.get(key);
// new missing session
if (!eventIds) {
eventIds = new Set();
this._eventIdsBySession.set(key, eventIds);
isNewSession = true;
}
eventIds.add(event.event_id);
return isNewSession;
}
getEventIds(senderKey, sessionId) {
const key = encodeMissingSessionKey(senderKey, sessionId);
const entriesForSession = this._eventIdsBySession.get(key);
if (entriesForSession) {
return [...entriesForSession];
}
}
getSessions() {
return Array.from(this._eventIdsBySession.keys()).map(decodeMissingSessionKey);
}
hasSession(senderKey, sessionId) {
return this._eventIdsBySession.has(encodeMissingSessionKey(senderKey, sessionId));
}
removeEvent(event) {
let hasRemovedSession = false;
const senderKey = event.content?.["sender_key"];
const sessionId = event.content?.["session_id"];
const key = encodeMissingSessionKey(senderKey, sessionId);
let eventIds = this._eventIdsBySession.get(key);
if (eventIds) {
if (eventIds.delete(event.event_id)) {
if (!eventIds.length) {
this._eventIdsBySession.delete(key);
hasRemovedSession = true;
}
}
}
return hasRemovedSession;
verifySenders(txn) {
return Promise.all(Array.from(this.results.values()).map(result => {
return this._roomEncryption._verifyDecryptionResult(result, txn);
}));
}
}

View file

@ -15,25 +15,13 @@ limitations under the License.
*/
import {DecryptionError} from "../common.js";
import {groupBy} from "../../../utils/groupBy.js";
import * as RoomKey from "./decryption/RoomKey.js";
import {SessionInfo} from "./decryption/SessionInfo.js";
import {DecryptionPreparation} from "./decryption/DecryptionPreparation.js";
import {SessionDecryption} from "./decryption/SessionDecryption.js";
import {SessionCache} from "./decryption/SessionCache.js";
import {MEGOLM_ALGORITHM} from "../common.js";
function getSenderKey(event) {
return event.content?.["sender_key"];
}
function getSessionId(event) {
return event.content?.["session_id"];
}
function getCiphertext(event) {
return event.content?.ciphertext;
}
import {validateEvent, groupEventsBySession} from "./decryption/utils.js";
export class Decryption {
constructor({pickleKey, olm, olmWorker}) {
@ -46,6 +34,37 @@ export class Decryption {
return new SessionCache(size);
}
async addMissingKeyEventIds(roomId, senderKey, sessionId, eventIds, txn) {
let sessionEntry = await txn.inboundGroupSessions.get(roomId, senderKey, sessionId);
// we never want to overwrite an existing key
if (sessionEntry?.session) {
return;
}
if (sessionEntry) {
const uniqueEventIds = new Set(sessionEntry.eventIds);
for (const id of eventIds) {
uniqueEventIds.add(id);
}
sessionEntry.eventIds = Array.from(uniqueEventIds);
} else {
sessionEntry = {roomId, senderKey, sessionId, eventIds};
}
txn.inboundGroupSessions.set(sessionEntry);
}
async getEventIdsForMissingKey(roomId, senderKey, sessionId, txn) {
const sessionEntry = await txn.inboundGroupSessions.get(roomId, senderKey, sessionId);
if (sessionEntry && !sessionEntry.session) {
return sessionEntry.eventIds;
}
}
async hasSession(roomId, senderKey, sessionId, txn) {
const sessionEntry = await txn.inboundGroupSessions.get(roomId, senderKey, sessionId);
const isValidSession = typeof sessionEntry?.session === "string";
return isValidSession;
}
/**
* Reads all the state from storage to be able to decrypt the given events.
* Decryption can then happen outside of a storage transaction.
@ -61,28 +80,22 @@ export class Decryption {
const validEvents = [];
for (const event of events) {
const isValid = typeof getSenderKey(event) === "string" &&
typeof getSessionId(event) === "string" &&
typeof getCiphertext(event) === "string";
if (isValid) {
if (validateEvent(event)) {
validEvents.push(event);
} else {
errors.set(event.event_id, new DecryptionError("MEGOLM_INVALID_EVENT", event))
}
}
const eventsBySession = groupBy(validEvents, event => {
return `${getSenderKey(event)}|${getSessionId(event)}`;
});
const eventsBySession = groupEventsBySession(validEvents);
const sessionDecryptions = [];
await Promise.all(Array.from(eventsBySession.values()).map(async eventsForSession => {
const firstEvent = eventsForSession[0];
const sessionInfo = await this._getSessionInfoForEvent(roomId, firstEvent, newKeys, sessionCache, txn);
await Promise.all(Array.from(eventsBySession.values()).map(async group => {
const sessionInfo = await this._getSessionInfo(roomId, group.senderKey, group.sessionId, newKeys, sessionCache, txn);
if (sessionInfo) {
sessionDecryptions.push(new SessionDecryption(sessionInfo, eventsForSession, this._olmWorker));
sessionDecryptions.push(new SessionDecryption(sessionInfo, group.events, this._olmWorker));
} else {
for (const event of eventsForSession) {
for (const event of group.events) {
errors.set(event.event_id, new DecryptionError("MEGOLM_NO_SESSION", event));
}
}
@ -91,9 +104,7 @@ export class Decryption {
return new DecryptionPreparation(roomId, sessionDecryptions, errors);
}
async _getSessionInfoForEvent(roomId, event, newKeys, sessionCache, txn) {
const senderKey = getSenderKey(event);
const sessionId = getSessionId(event);
async _getSessionInfo(roomId, senderKey, sessionId, newKeys, sessionCache, txn) {
let sessionInfo;
if (newKeys) {
const key = newKeys.find(k => k.roomId === roomId && k.senderKey === senderKey && k.sessionId === sessionId);
@ -110,7 +121,7 @@ export class Decryption {
}
if (!sessionInfo) {
const sessionEntry = await txn.inboundGroupSessions.get(roomId, senderKey, sessionId);
if (sessionEntry) {
if (sessionEntry && sessionEntry.session) {
let session = new this._olm.InboundGroupSession();
try {
session.unpickle(this._pickleKey, sessionEntry.session);

View file

@ -33,7 +33,7 @@ export class BaseRoomKey {
async _isBetterThanKnown(session, olm, pickleKey, txn) {
let isBetter = true;
const existingSessionEntry = await txn.inboundGroupSessions.get(this.roomId, this.senderKey, this.sessionId);
if (existingSessionEntry) {
if (existingSessionEntry?.session) {
const existingSession = new olm.InboundGroupSession();
try {
existingSession.unpickle(pickleKey, existingSessionEntry.session);

View file

@ -0,0 +1,57 @@
/*
Copyright 2020 The Matrix.org Foundation C.I.C.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
import {groupByWithCreator} from "../../../../utils/groupBy.js";
function getSenderKey(event) {
return event.content?.["sender_key"];
}
function getSessionId(event) {
return event.content?.["session_id"];
}
function getCiphertext(event) {
return event.content?.ciphertext;
}
export function validateEvent(event) {
return typeof getSenderKey(event) === "string" &&
typeof getSessionId(event) === "string" &&
typeof getCiphertext(event) === "string";
}
class SessionKeyGroup {
constructor() {
this.events = [];
}
get senderKey() {
return getSenderKey(this.events[0]);
}
get sessionId() {
return getSessionId(this.events[0]);
}
}
export function groupEventsBySession(events) {
return groupByWithCreator(events,
event => `${getSenderKey(event)}|${getSessionId(event)}`,
() => new SessionKeyGroup(),
(group, event) => group.events.push(event)
);
}

View file

@ -18,7 +18,6 @@ import {EventEmitter} from "../../utils/EventEmitter.js";
import {RoomSummary} from "./RoomSummary.js";
import {SyncWriter} from "./timeline/persistence/SyncWriter.js";
import {GapWriter} from "./timeline/persistence/GapWriter.js";
import {readRawTimelineEntriesWithTxn} from "./timeline/persistence/TimelineReader.js";
import {Timeline} from "./timeline/Timeline.js";
import {FragmentIdComparer} from "./timeline/FragmentIdComparer.js";
import {SendQueue} from "./sending/SendQueue.js";
@ -27,8 +26,6 @@ import {fetchOrLoadMembers} from "./members/load.js";
import {MemberList} from "./members/MemberList.js";
import {Heroes} from "./members/Heroes.js";
import {EventEntry} from "./timeline/entries/EventEntry.js";
import {EventKey} from "./timeline/EventKey.js";
import {Direction} from "./timeline/Direction.js";
import {ObservedEventMap} from "./ObservedEventMap.js";
import {AttachmentUpload} from "./AttachmentUpload.js";
import {DecryptionSource} from "../e2ee/common.js";
@ -58,56 +55,36 @@ export class Room extends EventEmitter {
this._observedEvents = null;
}
_readRetryDecryptCandidateEntries(sinceEventKey, txn) {
if (sinceEventKey) {
return readRawTimelineEntriesWithTxn(this._roomId, sinceEventKey,
Direction.Forward, Number.MAX_SAFE_INTEGER, this._fragmentIdComparer, txn);
} else {
// all messages for room ...
// if you haven't decrypted any message in a room yet,
// it's unlikely you will have tons of them.
// so this should be fine as a last resort
return readRawTimelineEntriesWithTxn(this._roomId, this._syncWriter.lastMessageKey,
Direction.Backward, Number.MAX_SAFE_INTEGER, this._fragmentIdComparer, txn);
}
}
async notifyRoomKey(roomKey) {
if (!this._roomEncryption) {
return;
}
const retryEventIds = this._roomEncryption.getEventIdsForRoomKey(roomKey);
const stores = [
this._storage.storeNames.timelineEvents,
this._storage.storeNames.inboundGroupSessions,
];
let txn;
let retryEntries;
async _getRetryDecryptEntriesForKey(roomKey, txn) {
const retryEventIds = await this._roomEncryption.getEventIdsForMissingKey(roomKey, txn);
const retryEntries = [];
if (retryEventIds) {
retryEntries = [];
txn = this._storage.readTxn(stores);
for (const eventId of retryEventIds) {
const storageEntry = await txn.timelineEvents.getByEventId(this._roomId, eventId);
if (storageEntry) {
retryEntries.push(new EventEntry(storageEntry, this._fragmentIdComparer));
}
}
} else {
// we only look for messages since lastDecryptedEventKey because
// the timeline must be closed (otherwise getEventIdsForRoomKey would have found the event ids)
// and to update the summary we only care about events since lastDecryptedEventKey
const key = this._summary.data.lastDecryptedEventKey;
// key might be missing if we haven't decrypted any events in this room
const sinceEventKey = key && new EventKey(key.fragmentId, key.entryIndex);
// check we have not already decrypted the most recent event in the room
// otherwise we know that the messages for this room key will not update the room summary
if (!sinceEventKey || !sinceEventKey.equals(this._syncWriter.lastMessageKey)) {
txn = this._storage.readTxn(stores.concat(this._storage.storeNames.timelineFragments));
const candidateEntries = await this._readRetryDecryptCandidateEntries(sinceEventKey, txn);
retryEntries = this._roomEncryption.findAndCacheEntriesForRoomKey(roomKey, candidateEntries);
}
return retryEntries;
}
if (retryEntries?.length) {
/**
* Used for keys received from other sources than sync, like key backup.
* @internal
* @param {RoomKey} roomKey
* @return {Promise}
*/
async notifyRoomKey(roomKey) {
if (!this._roomEncryption) {
return;
}
const txn = this._storage.readTxn([
this._storage.storeNames.timelineEvents,
this._storage.storeNames.inboundGroupSessions,
]);
const retryEntries = this._getRetryDecryptEntriesForKey(roomKey, txn);
if (retryEntries.length) {
const decryptRequest = this._decryptEntries(DecryptionSource.Retry, retryEntries, txn);
// this will close txn while awaiting decryption
await decryptRequest.complete();
@ -147,13 +124,13 @@ export class Room extends EventEmitter {
const events = entries.filter(entry => {
return entry.eventType === EVENT_ENCRYPTED_TYPE;
}).map(entry => entry.event);
const isTimelineOpen = this._isTimelineOpen;
r.preparation = await this._roomEncryption.prepareDecryptAll(events, null, source, isTimelineOpen, inboundSessionTxn);
r.preparation = await this._roomEncryption.prepareDecryptAll(events, null, source, inboundSessionTxn);
if (r.cancelled) return;
const changes = await r.preparation.decrypt();
r.preparation = null;
if (r.cancelled) return;
const stores = [this._storage.storeNames.groupSessionDecryptions];
const isTimelineOpen = this._isTimelineOpen;
if (isTimelineOpen) {
// read to fetch devices if timeline is open
stores.push(this._storage.storeNames.deviceIdentities);
@ -162,6 +139,9 @@ export class Room extends EventEmitter {
let decryption;
try {
decryption = await changes.write(writeTxn);
if (isTimelineOpen) {
await decryption.verifySenders(writeTxn);
}
} catch (err) {
writeTxn.abort();
throw err;
@ -176,6 +156,26 @@ export class Room extends EventEmitter {
return request;
}
async _getSyncRetryDecryptEntries(newKeys, txn) {
const entriesPerKey = await Promise.all(newKeys.map(key => this._getRetryDecryptEntriesForKey(key, txn)));
let retryEntries = entriesPerKey.reduce((allEntries, entries) => allEntries.concat(entries), []);
// If we have the timeline open, see if there are more entries for the new keys
// as we only store missing session information for synced events, not backfilled.
// We want to decrypt all events we can though if the user is looking
// at them when the timeline is open
if (this._timeline) {
let retryTimelineEntries = this._roomEncryption.filterUndecryptedEventEntriesForKeys(this._timeline.remoteEntries, newKeys);
// filter out any entries already in retryEntries so we don't decrypt them twice
const existingIds = retryEntries.reduce((ids, e) => {ids.add(e.id); return ids;}, new Set());
retryTimelineEntries = retryTimelineEntries.filter(e => !existingIds.has(e.id));
// make copies so we don't modify the original entry in writeSync, before the afterSync stage
const retryTimelineEntriesCopies = retryTimelineEntries.map(e => e.clone());
// add to other retry entries
retryEntries = retryEntries.concat(retryTimelineEntriesCopies);
}
return retryEntries;
}
async prepareSync(roomResponse, membership, newKeys, txn, log) {
log.set("id", this.id);
if (newKeys) {
@ -192,25 +192,21 @@ export class Room extends EventEmitter {
let retryEntries;
let decryptPreparation;
if (roomEncryption) {
// also look for events in timeline here
let events = roomResponse?.timeline?.events || [];
// when new keys arrive, also see if any events currently loaded in the timeline
// can now be retried to decrypt
if (this._timeline && newKeys) {
retryEntries = roomEncryption.filterEventEntriesForKeys(
this._timeline.remoteEntries, newKeys);
let eventsToDecrypt = roomResponse?.timeline?.events || [];
// when new keys arrive, also see if any older events can now be retried to decrypt
if (newKeys) {
retryEntries = await this._getSyncRetryDecryptEntries(newKeys, txn);
if (retryEntries.length) {
log.set("retry", retryEntries.length);
events = events.concat(retryEntries.map(entry => entry.event));
eventsToDecrypt = eventsToDecrypt.concat(retryEntries.map(entry => entry.event));
}
}
if (events.length) {
const eventsToDecrypt = events.filter(event => {
eventsToDecrypt = eventsToDecrypt.filter(event => {
return event?.type === EVENT_ENCRYPTED_TYPE;
});
if (eventsToDecrypt.length) {
decryptPreparation = await roomEncryption.prepareDecryptAll(
eventsToDecrypt, newKeys, DecryptionSource.Sync, this._isTimelineOpen, txn);
eventsToDecrypt, newKeys, DecryptionSource.Sync, txn);
}
}
@ -225,7 +221,7 @@ export class Room extends EventEmitter {
async afterPrepareSync(preparation, parentLog) {
if (preparation.decryptPreparation) {
await parentLog.wrap("afterPrepareSync decrypt", async log => {
await parentLog.wrap("decrypt", async log => {
log.set("id", this.id);
preparation.decryptChanges = await preparation.decryptPreparation.decrypt();
preparation.decryptPreparation = null;
@ -236,28 +232,37 @@ export class Room extends EventEmitter {
/** @package */
async writeSync(roomResponse, isInitialSync, {summaryChanges, decryptChanges, roomEncryption, retryEntries}, txn, log) {
log.set("id", this.id);
const {entries, newLiveKey, memberChanges} =
const {entries: newEntries, newLiveKey, memberChanges} =
await log.wrap("syncWriter", log => this._syncWriter.writeSync(roomResponse, txn, log), log.level.Detail);
let allEntries = newEntries;
if (decryptChanges) {
const decryption = await decryptChanges.write(txn);
log.set("decryptionResults", decryption.results.size);
log.set("decryptionErrors", decryption.errors.size);
if (this._isTimelineOpen) {
await decryption.verifySenders(txn);
}
decryption.applyToEntries(newEntries);
if (retryEntries?.length) {
// TODO: this will modify existing timeline entries (which we should not do in writeSync),
// but it is a temporary way of reattempting decryption while timeline is open
// won't need copies when tracking missing sessions properly
// prepend the retried entries, as we know they are older (not that it should matter much for the summary)
entries.unshift(...retryEntries);
decryption.applyToEntries(retryEntries);
allEntries = retryEntries.concat(allEntries);
}
decryption.applyToEntries(entries);
}
log.set("allEntries", allEntries.length);
let shouldFlushKeyShares = false;
// pass member changes to device tracker
if (roomEncryption && this.isTrackingMembers && memberChanges?.size) {
await roomEncryption.writeMemberChanges(memberChanges, txn);
shouldFlushKeyShares = await roomEncryption.writeMemberChanges(memberChanges, txn, log);
log.set("shouldFlushKeyShares", shouldFlushKeyShares);
}
// also apply (decrypted) timeline entries to the summary changes
summaryChanges = summaryChanges.applyTimelineEntries(
entries, isInitialSync, !this._isTimelineOpen, this._user.id);
allEntries, isInitialSync, !this._isTimelineOpen, this._user.id);
// write summary changes, and unset if nothing was actually changed
summaryChanges = this._summary.writeData(summaryChanges, txn);
if (summaryChanges) {
log.set("summaryChanges", summaryChanges.diff(this._summary.data));
}
// fetch new members while we have txn open,
// but don't make any in-memory changes yet
let heroChanges;
@ -275,11 +280,13 @@ export class Room extends EventEmitter {
return {
summaryChanges,
roomEncryption,
newAndUpdatedEntries: entries,
newEntries,
updatedEntries: retryEntries || [],
newLiveKey,
removedPendingEvents,
memberChanges,
heroChanges,
shouldFlushKeyShares,
};
}
@ -288,7 +295,12 @@ export class Room extends EventEmitter {
* Called with the changes returned from `writeSync` to apply them and emit changes.
* No storage or network operations should be done here.
*/
afterSync({summaryChanges, newAndUpdatedEntries, newLiveKey, removedPendingEvents, memberChanges, heroChanges, roomEncryption}, log) {
afterSync(changes, log) {
const {
summaryChanges, newEntries, updatedEntries, newLiveKey,
removedPendingEvents, memberChanges,
heroChanges, roomEncryption
} = changes;
log.set("id", this.id);
this._syncWriter.afterSync(newLiveKey);
this._setEncryption(roomEncryption);
@ -321,18 +333,21 @@ export class Room extends EventEmitter {
this._emitUpdate();
}
if (this._timeline) {
this._timeline.appendLiveEntries(newAndUpdatedEntries);
// these should not be added if not already there
this._timeline.replaceEntries(updatedEntries);
this._timeline.addOrReplaceEntries(newEntries);
}
if (this._observedEvents) {
this._observedEvents.updateEvents(newAndUpdatedEntries);
this._observedEvents.updateEvents(updatedEntries);
this._observedEvents.updateEvents(newEntries);
}
if (removedPendingEvents) {
this._sendQueue.emitRemovals(removedPendingEvents);
}
}
needsAfterSyncCompleted({memberChanges}) {
return this._roomEncryption?.needsToShareKeys(memberChanges);
needsAfterSyncCompleted({shouldFlushKeyShares}) {
return shouldFlushKeyShares;
}
/**
@ -483,7 +498,7 @@ export class Room extends EventEmitter {
this._sendQueue.emitRemovals(removedPendingEvents);
}
if (this._timeline) {
this._timeline.addGapEntries(gapResult.entries);
this._timeline.addOrReplaceEntries(gapResult.entries);
}
});
}
@ -548,6 +563,10 @@ export class Room extends EventEmitter {
enableSessionBackup(sessionBackup) {
this._roomEncryption?.enableSessionBackup(sessionBackup);
// TODO: do we really want to do this every time you open the app?
if (this._timeline) {
this._roomEncryption.restoreMissingSessionsFromBackup(this._timeline.remoteEntries);
}
}
get isTrackingMembers() {

View file

@ -122,36 +122,6 @@ function processTimelineEvent(data, eventEntry, isInitialSync, canMarkUnread, ow
data = data.cloneIfNeeded();
data.isUnread = true;
}
const {content} = eventEntry;
const body = content?.body;
const msgtype = content?.msgtype;
if (msgtype === "m.text" && !eventEntry.isEncrypted) {
data = data.cloneIfNeeded();
data.lastMessageBody = body;
}
}
// store the event key of the last decrypted event so when decryption does succeed,
// we can attempt to re-decrypt from this point to update the room summary
if (!!data.encryption && eventEntry.isEncrypted && eventEntry.isDecrypted) {
let hasLargerEventKey = true;
if (data.lastDecryptedEventKey) {
try {
hasLargerEventKey = eventEntry.compare(data.lastDecryptedEventKey) > 0;
} catch (err) {
// TODO: load the fragments in between here?
// this could happen if an earlier event gets decrypted that
// is in a fragment different from the live one and the timeline is not open.
// In this case, we will just read too many events once per app load
// and then keep the mapping in memory. When eventually an event is decrypted in
// the live fragment, this should stop failing and the event key will be written.
hasLargerEventKey = false;
}
}
if (hasLargerEventKey) {
data = data.cloneIfNeeded();
const {fragmentId, entryIndex} = eventEntry;
data.lastDecryptedEventKey = {fragmentId, entryIndex};
}
}
return data;
}
@ -182,12 +152,9 @@ class SummaryData {
constructor(copy, roomId) {
this.roomId = copy ? copy.roomId : roomId;
this.name = copy ? copy.name : null;
this.lastMessageBody = copy ? copy.lastMessageBody : null;
this.lastMessageTimestamp = copy ? copy.lastMessageTimestamp : null;
this.isUnread = copy ? copy.isUnread : false;
this.encryption = copy ? copy.encryption : null;
this.lastDecryptedEventKey = copy ? copy.lastDecryptedEventKey : null;
this.isDirectMessage = copy ? copy.isDirectMessage : false;
this.membership = copy ? copy.membership : null;
this.inviteCount = copy ? copy.inviteCount : 0;
this.joinCount = copy ? copy.joinCount : 0;
@ -202,6 +169,18 @@ class SummaryData {
this.cloned = copy ? true : false;
}
diff(other) {
const props = Object.getOwnPropertyNames(this);
return props.reduce((diff, prop) => {
if (prop !== "cloned") {
if (this[prop] !== other[prop]) {
diff[prop] = this[prop];
}
}
return diff;
}, {});
}
cloneIfNeeded() {
if (this.cloned) {
return this;

View file

@ -60,16 +60,8 @@ export class Timeline {
}
}
// TODO: should we rather have generic methods for
// - adding new entries
// - updating existing entries (redaction, relations)
/** @package */
appendLiveEntries(newEntries) {
this._remoteEntries.setManySorted(newEntries);
}
/** @package */
addGapEntries(newEntries) {
addOrReplaceEntries(newEntries) {
this._remoteEntries.setManySorted(newEntries);
}

View file

@ -25,6 +25,13 @@ export class EventEntry extends BaseEntry {
this._decryptionResult = null;
}
clone() {
const clone = new EventEntry(this._eventEntry, this._fragmentIdComparer);
clone._decryptionResult = this._decryptionResult;
clone._decryptionError = this._decryptionError;
return clone;
}
get event() {
return this._eventEntry.event;
}

View file

@ -41,7 +41,7 @@ class ReaderRequest {
* Raw because it doesn't do decryption and in the future it should not read relations either.
* It is just about reading entries and following fragment links
*/
export async function readRawTimelineEntriesWithTxn(roomId, eventKey, direction, amount, fragmentIdComparer, txn) {
async function readRawTimelineEntriesWithTxn(roomId, eventKey, direction, amount, fragmentIdComparer, txn) {
let entries = [];
const timelineStore = txn.timelineEvents;
const fragmentStore = txn.timelineFragments;