WIP to store missing session event ids
This commit is contained in:
parent
3fa2d22015
commit
c6db23bcfb
8 changed files with 217 additions and 264 deletions
|
@ -248,6 +248,7 @@ export class Sync {
|
|||
return this._storage.readTxn([
|
||||
storeNames.olmSessions,
|
||||
storeNames.inboundGroupSessions,
|
||||
storeNames.timelineEvents // to read events that can now be decrypted
|
||||
]);
|
||||
}
|
||||
|
||||
|
|
|
@ -1,40 +1,41 @@
|
|||
## Integratation within the sync lifetime cycle
|
||||
|
||||
### prepareSync
|
||||
### session.prepareSync
|
||||
|
||||
Decrypt any device messages, and turn them into RoomKey instances.
|
||||
Any rooms that are not in the sync response but for which we receive keys will be included in the rooms to sync.
|
||||
|
||||
Runs before any room.prepareSync, so the new room keys can be passed to each room prepareSync to use in decryption.
|
||||
|
||||
### room.prepareSync
|
||||
|
||||
The session can start its own read/write transactions here, rooms only read from a shared transaction
|
||||
|
||||
- session
|
||||
- device handler
|
||||
- txn
|
||||
- write pending encrypted
|
||||
- txn
|
||||
- olm decryption read
|
||||
- olm async decryption
|
||||
- dispatch to worker
|
||||
- txn
|
||||
- olm decryption write / remove pending encrypted
|
||||
- rooms (with shared read txn)
|
||||
- megolm decryption read
|
||||
- megolm decryption read using any new keys decrypted by the session.
|
||||
|
||||
### afterPrepareSync
|
||||
### room.afterPrepareSync
|
||||
|
||||
- rooms
|
||||
- megolm async decryption
|
||||
- dispatch to worker
|
||||
|
||||
### writeSync
|
||||
### room.writeSync
|
||||
|
||||
- rooms (with shared readwrite txn)
|
||||
- megolm decryption write, yielding decrypted events
|
||||
- use decrypted events to write room summary
|
||||
|
||||
### afterSync
|
||||
### session.writeSync
|
||||
|
||||
- writes any room keys that were received
|
||||
|
||||
### room.afterSync
|
||||
|
||||
- rooms
|
||||
- emit changes
|
||||
|
||||
### afterSyncCompleted
|
||||
### room.afterSyncCompleted
|
||||
|
||||
- session
|
||||
- e2ee account
|
||||
|
|
|
@ -15,7 +15,7 @@ limitations under the License.
|
|||
*/
|
||||
|
||||
import {MEGOLM_ALGORITHM, DecryptionSource} from "./common.js";
|
||||
import {groupBy} from "../../utils/groupBy.js";
|
||||
import {groupEventsBySession} from "./megolm/decryption/utils.js";
|
||||
import {mergeMap} from "../../utils/mergeMap.js";
|
||||
import {makeTxnId} from "../common.js";
|
||||
|
||||
|
@ -25,15 +25,6 @@ const ENCRYPTED_TYPE = "m.room.encrypted";
|
|||
// note that encrypt could still create a new session
|
||||
const MIN_PRESHARE_INTERVAL = 60 * 1000; // 1min
|
||||
|
||||
function encodeMissingSessionKey(senderKey, sessionId) {
|
||||
return `${senderKey}|${sessionId}`;
|
||||
}
|
||||
|
||||
function decodeMissingSessionKey(key) {
|
||||
const [senderKey, sessionId] = key.split("|");
|
||||
return {senderKey, sessionId};
|
||||
}
|
||||
|
||||
export class RoomEncryption {
|
||||
constructor({room, deviceTracker, olmEncryption, megolmEncryption, megolmDecryption, encryptionParams, storage, sessionBackup, notifyMissingMegolmSession, clock}) {
|
||||
this._room = room;
|
||||
|
@ -43,32 +34,40 @@ export class RoomEncryption {
|
|||
this._megolmDecryption = megolmDecryption;
|
||||
// content of the m.room.encryption event
|
||||
this._encryptionParams = encryptionParams;
|
||||
|
||||
this._megolmBackfillCache = this._megolmDecryption.createSessionCache();
|
||||
this._megolmSyncCache = this._megolmDecryption.createSessionCache(1);
|
||||
// session => event ids of messages we tried to decrypt and the session was missing
|
||||
this._missingSessions = new SessionToEventIdsMap();
|
||||
// sessions that may or may not be missing, but that while
|
||||
// looking for a particular session came up as a candidate and were
|
||||
// added to the cache to prevent further lookups from storage
|
||||
this._missingSessionCandidates = new SessionToEventIdsMap();
|
||||
// caches devices to verify events
|
||||
this._senderDeviceCache = new Map();
|
||||
this._storage = storage;
|
||||
this._sessionBackup = sessionBackup;
|
||||
this._notifyMissingMegolmSession = notifyMissingMegolmSession;
|
||||
this._clock = clock;
|
||||
this._disposed = false;
|
||||
this._isFlushingRoomKeyShares = false;
|
||||
this._lastKeyPreShareTime = null;
|
||||
this._disposed = false;
|
||||
}
|
||||
|
||||
async enableSessionBackup(sessionBackup) {
|
||||
enableSessionBackup(sessionBackup) {
|
||||
if (this._sessionBackup) {
|
||||
return;
|
||||
}
|
||||
this._sessionBackup = sessionBackup;
|
||||
for(const {senderKey, sessionId} of this._missingSessions.getSessions()) {
|
||||
await this._requestMissingSessionFromBackup(senderKey, sessionId, null);
|
||||
}
|
||||
|
||||
async restoreMissingSessionsFromBackup(events) {
|
||||
const eventsBySession = groupEventsBySession(events);
|
||||
const groups = Array.from(eventsBySession.values());
|
||||
const txn = this._storage.readTxn([this._storage.storeNames.inboundGroupSessions]);
|
||||
const hasSessions = await Promise.all(groups.map(async group => {
|
||||
return this._megolmDecryption.hasSession(this._room.id, group.senderKey, group.sessionId, txn);
|
||||
}));
|
||||
const missingSessions = groups.filter((_, i) => !hasSessions[i]);
|
||||
if (missingSessions.length) {
|
||||
// start with last sessions which should be for the last items in the timeline
|
||||
for (var i = missingSessions.length - 1; i >= 0; i--) {
|
||||
const session = missingSessions[i];
|
||||
await this._requestMissingSessionFromBackup(session.senderKey, session.sessionId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -129,16 +128,48 @@ export class RoomEncryption {
|
|||
return new DecryptionPreparation(preparation, errors, source, this, events);
|
||||
}
|
||||
|
||||
async _processDecryptionResults(events, results, errors, flags, txn) {
|
||||
for (const event of events) {
|
||||
async _processDecryptionResults(events, results, errors, source, txn) {
|
||||
const missingSessionEvents = events.filter(event => {
|
||||
const error = errors.get(event.event_id);
|
||||
if (error?.code === "MEGOLM_NO_SESSION") {
|
||||
this._addMissingSessionEvent(event, flags.source);
|
||||
} else {
|
||||
this._missingSessions.removeEvent(event);
|
||||
this._missingSessionCandidates.removeEvent(event);
|
||||
}
|
||||
return error?.code === "MEGOLM_NO_SESSION";
|
||||
});
|
||||
if (!missingSessionEvents.length) {
|
||||
return;
|
||||
}
|
||||
const eventsBySession = groupEventsBySession(events);
|
||||
if (source === DecryptionSource.Sync) {
|
||||
await Promise.all(Array.from(eventsBySession.values()).map(async group => {
|
||||
const eventIds = group.events.map(e => e.event_id);
|
||||
return this._megolmDecryption.addMissingKeyEventIds(
|
||||
this._room.id, group.senderKey, group.sessionId, eventIds, txn);
|
||||
}));
|
||||
}
|
||||
|
||||
// TODO: do proper logging here
|
||||
// run detached
|
||||
Promise.resolve().then(async () => {
|
||||
// if the message came from sync, wait 10s to see if the room key arrives late,
|
||||
// and only after that proceed to request from backup
|
||||
if (source === DecryptionSource.Sync) {
|
||||
await this._clock.createTimeout(10000).elapsed();
|
||||
if (this._disposed) {
|
||||
return;
|
||||
}
|
||||
// now check which sessions have been received already
|
||||
const txn = this._storage.readTxn([this._storage.storeNames.inboundGroupSessions]);
|
||||
await Promise.all(Array.from(eventsBySession).map(async ([key, group]) => {
|
||||
if (await this._megolmDecryption.hasSession(this._room.id, group.senderKey, group.sessionId, txn)) {
|
||||
eventsBySession.delete(key);
|
||||
}
|
||||
}));
|
||||
}
|
||||
await Promise.all(Array.from(eventsBySession.values()).map(group => {
|
||||
return this._requestMissingSessionFromBackup(group.senderKey, group.sessionId);
|
||||
}));
|
||||
}).catch(err => {
|
||||
console.log("failed to fetch missing session from key backup");
|
||||
console.error(err);
|
||||
});
|
||||
}
|
||||
|
||||
async _verifyDecryptionResult(result, txn) {
|
||||
|
@ -154,24 +185,7 @@ export class RoomEncryption {
|
|||
}
|
||||
}
|
||||
|
||||
_addMissingSessionEvent(event, source) {
|
||||
const isNewSession = this._missingSessions.addEvent(event);
|
||||
if (isNewSession) {
|
||||
const senderKey = event.content?.["sender_key"];
|
||||
const sessionId = event.content?.["session_id"];
|
||||
this._requestMissingSessionFromBackup(senderKey, sessionId, source);
|
||||
}
|
||||
}
|
||||
|
||||
async _requestMissingSessionFromBackup(senderKey, sessionId, source) {
|
||||
// if the message came from sync, wait 10s to see if the room key arrives,
|
||||
// and only after that proceed to request from backup
|
||||
if (source === DecryptionSource.Sync) {
|
||||
await this._clock.createTimeout(10000).elapsed();
|
||||
if (this._disposed || !this._missingSessions.hasSession(senderKey, sessionId)) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
async _requestMissingSessionFromBackup(senderKey, sessionId) {
|
||||
// show prompt to enable secret storage
|
||||
if (!this._sessionBackup) {
|
||||
this._notifyMissingMegolmSession();
|
||||
|
@ -211,43 +225,19 @@ export class RoomEncryption {
|
|||
console.info(`Backed-up session of unknown algorithm: ${session.algorithm}`);
|
||||
}
|
||||
} catch (err) {
|
||||
console.error(`Could not get session ${sessionId} from backup`, err);
|
||||
if (!(err.name === "HomeServerError" && err.errcode === "M_NOT_FOUND")) {
|
||||
console.error(`Could not get session ${sessionId} from backup`, err);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {RoomKey} roomKeys
|
||||
* @return {Array<string>} the event ids that should be retried to decrypt
|
||||
* @param {Transaction} txn
|
||||
* @return {Promise<Array<string>>} the event ids that should be retried to decrypt
|
||||
*/
|
||||
getEventIdsForRoomKey(roomKey) {
|
||||
// TODO: we could concat both results here, and only put stuff in
|
||||
// candidates if it is not in missing sessions to use a bit less memory
|
||||
let eventIds = this._missingSessions.getEventIds(roomKey.senderKey, roomKey.sessionId);
|
||||
if (!eventIds) {
|
||||
eventIds = this._missingSessionCandidates.getEventIds(roomKey.senderKey, roomKey.sessionId);
|
||||
}
|
||||
return eventIds;
|
||||
}
|
||||
|
||||
/**
|
||||
* caches mapping of session to event id of all encrypted candidates
|
||||
* and filters to return only the candidates for the given room key
|
||||
*/
|
||||
findAndCacheEntriesForRoomKey(roomKey, candidateEntries) {
|
||||
const matches = [];
|
||||
|
||||
for (const entry of candidateEntries) {
|
||||
if (entry.eventType === ENCRYPTED_TYPE) {
|
||||
this._missingSessionCandidates.addEvent(entry.event);
|
||||
const senderKey = entry.event?.content?.["sender_key"];
|
||||
const sessionId = entry.event?.content?.["session_id"];
|
||||
if (senderKey === roomKey.senderKey && sessionId === roomKey.sessionId) {
|
||||
matches.push(entry);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return matches;
|
||||
getEventIdsForMissingKey(roomKey, txn) {
|
||||
return this._megolmDecryption.getEventIdsForMissingKey(this._room.id, roomKey.senderKey, roomKey.sessionId, txn);
|
||||
}
|
||||
|
||||
/** shares the encryption key for the next message if needed */
|
||||
|
@ -485,58 +475,3 @@ class BatchDecryptionResult {
|
|||
}));
|
||||
}
|
||||
}
|
||||
|
||||
class SessionToEventIdsMap {
|
||||
constructor() {
|
||||
this._eventIdsBySession = new Map();
|
||||
}
|
||||
|
||||
addEvent(event) {
|
||||
let isNewSession = false;
|
||||
const senderKey = event.content?.["sender_key"];
|
||||
const sessionId = event.content?.["session_id"];
|
||||
const key = encodeMissingSessionKey(senderKey, sessionId);
|
||||
let eventIds = this._eventIdsBySession.get(key);
|
||||
// new missing session
|
||||
if (!eventIds) {
|
||||
eventIds = new Set();
|
||||
this._eventIdsBySession.set(key, eventIds);
|
||||
isNewSession = true;
|
||||
}
|
||||
eventIds.add(event.event_id);
|
||||
return isNewSession;
|
||||
}
|
||||
|
||||
getEventIds(senderKey, sessionId) {
|
||||
const key = encodeMissingSessionKey(senderKey, sessionId);
|
||||
const entriesForSession = this._eventIdsBySession.get(key);
|
||||
if (entriesForSession) {
|
||||
return [...entriesForSession];
|
||||
}
|
||||
}
|
||||
|
||||
getSessions() {
|
||||
return Array.from(this._eventIdsBySession.keys()).map(decodeMissingSessionKey);
|
||||
}
|
||||
|
||||
hasSession(senderKey, sessionId) {
|
||||
return this._eventIdsBySession.has(encodeMissingSessionKey(senderKey, sessionId));
|
||||
}
|
||||
|
||||
removeEvent(event) {
|
||||
let hasRemovedSession = false;
|
||||
const senderKey = event.content?.["sender_key"];
|
||||
const sessionId = event.content?.["session_id"];
|
||||
const key = encodeMissingSessionKey(senderKey, sessionId);
|
||||
let eventIds = this._eventIdsBySession.get(key);
|
||||
if (eventIds) {
|
||||
if (eventIds.delete(event.event_id)) {
|
||||
if (!eventIds.length) {
|
||||
this._eventIdsBySession.delete(key);
|
||||
hasRemovedSession = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
return hasRemovedSession;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -15,25 +15,13 @@ limitations under the License.
|
|||
*/
|
||||
|
||||
import {DecryptionError} from "../common.js";
|
||||
import {groupBy} from "../../../utils/groupBy.js";
|
||||
import * as RoomKey from "./decryption/RoomKey.js";
|
||||
import {SessionInfo} from "./decryption/SessionInfo.js";
|
||||
import {DecryptionPreparation} from "./decryption/DecryptionPreparation.js";
|
||||
import {SessionDecryption} from "./decryption/SessionDecryption.js";
|
||||
import {SessionCache} from "./decryption/SessionCache.js";
|
||||
import {MEGOLM_ALGORITHM} from "../common.js";
|
||||
|
||||
function getSenderKey(event) {
|
||||
return event.content?.["sender_key"];
|
||||
}
|
||||
|
||||
function getSessionId(event) {
|
||||
return event.content?.["session_id"];
|
||||
}
|
||||
|
||||
function getCiphertext(event) {
|
||||
return event.content?.ciphertext;
|
||||
}
|
||||
import {validateEvent, groupEventsBySession} from "./decryption/utils.js";
|
||||
|
||||
export class Decryption {
|
||||
constructor({pickleKey, olm, olmWorker}) {
|
||||
|
@ -46,6 +34,37 @@ export class Decryption {
|
|||
return new SessionCache(size);
|
||||
}
|
||||
|
||||
async addMissingKeyEventIds(roomId, senderKey, sessionId, eventIds, txn) {
|
||||
let sessionEntry = await txn.inboundGroupSessions.get(roomId, senderKey, sessionId);
|
||||
// we never want to overwrite an existing key
|
||||
if (sessionEntry?.session) {
|
||||
return;
|
||||
}
|
||||
if (sessionEntry) {
|
||||
const uniqueEventIds = new Set(sessionEntry.eventIds);
|
||||
for (const id of eventIds) {
|
||||
uniqueEventIds.add(id);
|
||||
}
|
||||
sessionEntry.eventIds = Array.from(uniqueEventIds);
|
||||
} else {
|
||||
sessionEntry = {roomId, senderKey, sessionId, eventIds};
|
||||
}
|
||||
txn.inboundGroupSessions.set(sessionEntry);
|
||||
}
|
||||
|
||||
async getEventIdsForMissingKey(roomId, senderKey, sessionId, txn) {
|
||||
const sessionEntry = await txn.inboundGroupSessions.get(roomId, senderKey, sessionId);
|
||||
if (sessionEntry && !sessionEntry.session) {
|
||||
return sessionEntry.eventIds;
|
||||
}
|
||||
}
|
||||
|
||||
async hasSession(roomId, senderKey, sessionId, txn) {
|
||||
const sessionEntry = await txn.inboundGroupSessions.get(roomId, senderKey, sessionId);
|
||||
const isValidSession = typeof sessionEntry?.session === "string";
|
||||
return isValidSession;
|
||||
}
|
||||
|
||||
/**
|
||||
* Reads all the state from storage to be able to decrypt the given events.
|
||||
* Decryption can then happen outside of a storage transaction.
|
||||
|
@ -61,28 +80,22 @@ export class Decryption {
|
|||
const validEvents = [];
|
||||
|
||||
for (const event of events) {
|
||||
const isValid = typeof getSenderKey(event) === "string" &&
|
||||
typeof getSessionId(event) === "string" &&
|
||||
typeof getCiphertext(event) === "string";
|
||||
if (isValid) {
|
||||
if (validateEvent(event)) {
|
||||
validEvents.push(event);
|
||||
} else {
|
||||
errors.set(event.event_id, new DecryptionError("MEGOLM_INVALID_EVENT", event))
|
||||
}
|
||||
}
|
||||
|
||||
const eventsBySession = groupBy(validEvents, event => {
|
||||
return `${getSenderKey(event)}|${getSessionId(event)}`;
|
||||
});
|
||||
const eventsBySession = groupEventsBySession(validEvents);
|
||||
|
||||
const sessionDecryptions = [];
|
||||
await Promise.all(Array.from(eventsBySession.values()).map(async eventsForSession => {
|
||||
const firstEvent = eventsForSession[0];
|
||||
const sessionInfo = await this._getSessionInfoForEvent(roomId, firstEvent, newKeys, sessionCache, txn);
|
||||
await Promise.all(Array.from(eventsBySession.values()).map(async group => {
|
||||
const sessionInfo = await this._getSessionInfo(roomId, group.senderKey, group.sessionId, newKeys, sessionCache, txn);
|
||||
if (sessionInfo) {
|
||||
sessionDecryptions.push(new SessionDecryption(sessionInfo, eventsForSession, this._olmWorker));
|
||||
sessionDecryptions.push(new SessionDecryption(sessionInfo, group.events, this._olmWorker));
|
||||
} else {
|
||||
for (const event of eventsForSession) {
|
||||
for (const event of group.events) {
|
||||
errors.set(event.event_id, new DecryptionError("MEGOLM_NO_SESSION", event));
|
||||
}
|
||||
}
|
||||
|
@ -91,9 +104,7 @@ export class Decryption {
|
|||
return new DecryptionPreparation(roomId, sessionDecryptions, errors);
|
||||
}
|
||||
|
||||
async _getSessionInfoForEvent(roomId, event, newKeys, sessionCache, txn) {
|
||||
const senderKey = getSenderKey(event);
|
||||
const sessionId = getSessionId(event);
|
||||
async _getSessionInfo(roomId, senderKey, sessionId, newKeys, sessionCache, txn) {
|
||||
let sessionInfo;
|
||||
if (newKeys) {
|
||||
const key = newKeys.find(k => k.roomId === roomId && k.senderKey === senderKey && k.sessionId === sessionId);
|
||||
|
@ -110,7 +121,7 @@ export class Decryption {
|
|||
}
|
||||
if (!sessionInfo) {
|
||||
const sessionEntry = await txn.inboundGroupSessions.get(roomId, senderKey, sessionId);
|
||||
if (sessionEntry) {
|
||||
if (sessionEntry && sessionEntry.session) {
|
||||
let session = new this._olm.InboundGroupSession();
|
||||
try {
|
||||
session.unpickle(this._pickleKey, sessionEntry.session);
|
||||
|
|
|
@ -33,7 +33,7 @@ export class BaseRoomKey {
|
|||
async _isBetterThanKnown(session, olm, pickleKey, txn) {
|
||||
let isBetter = true;
|
||||
const existingSessionEntry = await txn.inboundGroupSessions.get(this.roomId, this.senderKey, this.sessionId);
|
||||
if (existingSessionEntry) {
|
||||
if (existingSessionEntry?.session) {
|
||||
const existingSession = new olm.InboundGroupSession();
|
||||
try {
|
||||
existingSession.unpickle(pickleKey, existingSessionEntry.session);
|
||||
|
|
57
src/matrix/e2ee/megolm/decryption/utils.js
Normal file
57
src/matrix/e2ee/megolm/decryption/utils.js
Normal file
|
@ -0,0 +1,57 @@
|
|||
/*
|
||||
Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
import {groupByWithCreator} from "../../../../utils/groupBy.js";
|
||||
|
||||
function getSenderKey(event) {
|
||||
return event.content?.["sender_key"];
|
||||
}
|
||||
|
||||
function getSessionId(event) {
|
||||
return event.content?.["session_id"];
|
||||
}
|
||||
|
||||
function getCiphertext(event) {
|
||||
return event.content?.ciphertext;
|
||||
}
|
||||
|
||||
export function validateEvent(event) {
|
||||
return typeof getSenderKey(event) === "string" &&
|
||||
typeof getSessionId(event) === "string" &&
|
||||
typeof getCiphertext(event) === "string";
|
||||
}
|
||||
|
||||
class SessionKeyGroup {
|
||||
constructor() {
|
||||
this.events = [];
|
||||
}
|
||||
|
||||
get senderKey() {
|
||||
return getSenderKey(this.events[0]);
|
||||
}
|
||||
|
||||
get sessionId() {
|
||||
return getSessionId(this.events[0]);
|
||||
}
|
||||
}
|
||||
|
||||
export function groupEventsBySession(events) {
|
||||
return groupByWithCreator(events,
|
||||
event => `${getSenderKey(event)}|${getSessionId(event)}`,
|
||||
() => new SessionKeyGroup(),
|
||||
(group, event) => group.events.push(event)
|
||||
);
|
||||
}
|
|
@ -58,56 +58,36 @@ export class Room extends EventEmitter {
|
|||
this._observedEvents = null;
|
||||
}
|
||||
|
||||
_readRetryDecryptCandidateEntries(sinceEventKey, txn) {
|
||||
if (sinceEventKey) {
|
||||
return readRawTimelineEntriesWithTxn(this._roomId, sinceEventKey,
|
||||
Direction.Forward, Number.MAX_SAFE_INTEGER, this._fragmentIdComparer, txn);
|
||||
} else {
|
||||
// all messages for room ...
|
||||
// if you haven't decrypted any message in a room yet,
|
||||
// it's unlikely you will have tons of them.
|
||||
// so this should be fine as a last resort
|
||||
return readRawTimelineEntriesWithTxn(this._roomId, this._syncWriter.lastMessageKey,
|
||||
Direction.Backward, Number.MAX_SAFE_INTEGER, this._fragmentIdComparer, txn);
|
||||
}
|
||||
}
|
||||
|
||||
async notifyRoomKey(roomKey) {
|
||||
if (!this._roomEncryption) {
|
||||
return;
|
||||
}
|
||||
const retryEventIds = this._roomEncryption.getEventIdsForRoomKey(roomKey);
|
||||
const stores = [
|
||||
this._storage.storeNames.timelineEvents,
|
||||
this._storage.storeNames.inboundGroupSessions,
|
||||
];
|
||||
let txn;
|
||||
let retryEntries;
|
||||
async _getRetryDecryptEntriesForKey(roomKey, txn) {
|
||||
const retryEventIds = await this._roomEncryption.getEventIdsForMissingKey(roomKey, txn);
|
||||
const retryEntries = [];
|
||||
if (retryEventIds) {
|
||||
retryEntries = [];
|
||||
txn = this._storage.readTxn(stores);
|
||||
for (const eventId of retryEventIds) {
|
||||
const storageEntry = await txn.timelineEvents.getByEventId(this._roomId, eventId);
|
||||
if (storageEntry) {
|
||||
retryEntries.push(new EventEntry(storageEntry, this._fragmentIdComparer));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// we only look for messages since lastDecryptedEventKey because
|
||||
// the timeline must be closed (otherwise getEventIdsForRoomKey would have found the event ids)
|
||||
// and to update the summary we only care about events since lastDecryptedEventKey
|
||||
const key = this._summary.data.lastDecryptedEventKey;
|
||||
// key might be missing if we haven't decrypted any events in this room
|
||||
const sinceEventKey = key && new EventKey(key.fragmentId, key.entryIndex);
|
||||
// check we have not already decrypted the most recent event in the room
|
||||
// otherwise we know that the messages for this room key will not update the room summary
|
||||
if (!sinceEventKey || !sinceEventKey.equals(this._syncWriter.lastMessageKey)) {
|
||||
txn = this._storage.readTxn(stores.concat(this._storage.storeNames.timelineFragments));
|
||||
const candidateEntries = await this._readRetryDecryptCandidateEntries(sinceEventKey, txn);
|
||||
retryEntries = this._roomEncryption.findAndCacheEntriesForRoomKey(roomKey, candidateEntries);
|
||||
}
|
||||
}
|
||||
if (retryEntries?.length) {
|
||||
return retryEntries;
|
||||
}
|
||||
|
||||
/**
|
||||
* Used for keys received from other sources than sync, like key backup.
|
||||
* @internal
|
||||
* @param {RoomKey} roomKey
|
||||
* @return {Promise}
|
||||
*/
|
||||
async notifyRoomKey(roomKey) {
|
||||
if (!this._roomEncryption) {
|
||||
return;
|
||||
}
|
||||
const txn = this._storage.readTxn([
|
||||
this._storage.storeNames.timelineEvents,
|
||||
this._storage.storeNames.inboundGroupSessions,
|
||||
]);
|
||||
const retryEntries = this._getRetryDecryptEntriesForKey(roomKey, txn);
|
||||
if (retryEntries.length) {
|
||||
const decryptRequest = this._decryptEntries(DecryptionSource.Retry, retryEntries, txn);
|
||||
// this will close txn while awaiting decryption
|
||||
await decryptRequest.complete();
|
||||
|
@ -197,11 +177,10 @@ export class Room extends EventEmitter {
|
|||
if (roomEncryption) {
|
||||
// also look for events in timeline here
|
||||
let events = roomResponse?.timeline?.events || [];
|
||||
// when new keys arrive, also see if any events currently loaded in the timeline
|
||||
// can now be retried to decrypt
|
||||
if (this._timeline && newKeys) {
|
||||
retryEntries = roomEncryption.filterEventEntriesForKeys(
|
||||
this._timeline.remoteEntries, newKeys);
|
||||
// when new keys arrive, also see if any events that can now be retried to decrypt
|
||||
if (newKeys) {
|
||||
const nestedEntries = await Promise.all(newKeys.map(key => this._getRetryDecryptEntriesForKey(key, txn)));
|
||||
const retryEntries = nestedEntries.reduce((allEntries, entries) => allEntries.concat(entries), []);
|
||||
if (retryEntries.length) {
|
||||
log.set("retry", retryEntries.length);
|
||||
events = events.concat(retryEntries.map(entry => entry.event));
|
||||
|
@ -228,7 +207,7 @@ export class Room extends EventEmitter {
|
|||
|
||||
async afterPrepareSync(preparation, parentLog) {
|
||||
if (preparation.decryptPreparation) {
|
||||
await parentLog.wrap("afterPrepareSync decrypt", async log => {
|
||||
await parentLog.wrap("decrypt", async log => {
|
||||
log.set("id", this.id);
|
||||
preparation.decryptChanges = await preparation.decryptPreparation.decrypt();
|
||||
preparation.decryptPreparation = null;
|
||||
|
@ -247,10 +226,8 @@ export class Room extends EventEmitter {
|
|||
await decryption.verifySenders(txn);
|
||||
}
|
||||
if (retryEntries?.length) {
|
||||
// TODO: this will modify existing timeline entries (which we should not do in writeSync),
|
||||
// but it is a temporary way of reattempting decryption while timeline is open
|
||||
// won't need copies when tracking missing sessions properly
|
||||
// prepend the retried entries, as we know they are older (not that it should matter much for the summary)
|
||||
// prepend the retried entries, as we know they are older
|
||||
// (not that it should matter much for the summary)
|
||||
entries.unshift(...retryEntries);
|
||||
}
|
||||
decryption.applyToEntries(entries);
|
||||
|
@ -554,6 +531,10 @@ export class Room extends EventEmitter {
|
|||
|
||||
enableSessionBackup(sessionBackup) {
|
||||
this._roomEncryption?.enableSessionBackup(sessionBackup);
|
||||
if (this._timeline) {
|
||||
const timelineEvents = this._timeline.remoteEntries.filter(e => e.event).map(e => e.event);
|
||||
this._roomEncryption.restoreMissingSessionsFromBackup(timelineEvents);
|
||||
}
|
||||
}
|
||||
|
||||
get isTrackingMembers() {
|
||||
|
|
|
@ -122,36 +122,6 @@ function processTimelineEvent(data, eventEntry, isInitialSync, canMarkUnread, ow
|
|||
data = data.cloneIfNeeded();
|
||||
data.isUnread = true;
|
||||
}
|
||||
const {content} = eventEntry;
|
||||
const body = content?.body;
|
||||
const msgtype = content?.msgtype;
|
||||
if (msgtype === "m.text" && !eventEntry.isEncrypted) {
|
||||
data = data.cloneIfNeeded();
|
||||
data.lastMessageBody = body;
|
||||
}
|
||||
}
|
||||
// store the event key of the last decrypted event so when decryption does succeed,
|
||||
// we can attempt to re-decrypt from this point to update the room summary
|
||||
if (!!data.encryption && eventEntry.isEncrypted && eventEntry.isDecrypted) {
|
||||
let hasLargerEventKey = true;
|
||||
if (data.lastDecryptedEventKey) {
|
||||
try {
|
||||
hasLargerEventKey = eventEntry.compare(data.lastDecryptedEventKey) > 0;
|
||||
} catch (err) {
|
||||
// TODO: load the fragments in between here?
|
||||
// this could happen if an earlier event gets decrypted that
|
||||
// is in a fragment different from the live one and the timeline is not open.
|
||||
// In this case, we will just read too many events once per app load
|
||||
// and then keep the mapping in memory. When eventually an event is decrypted in
|
||||
// the live fragment, this should stop failing and the event key will be written.
|
||||
hasLargerEventKey = false;
|
||||
}
|
||||
}
|
||||
if (hasLargerEventKey) {
|
||||
data = data.cloneIfNeeded();
|
||||
const {fragmentId, entryIndex} = eventEntry;
|
||||
data.lastDecryptedEventKey = {fragmentId, entryIndex};
|
||||
}
|
||||
}
|
||||
return data;
|
||||
}
|
||||
|
@ -182,12 +152,9 @@ class SummaryData {
|
|||
constructor(copy, roomId) {
|
||||
this.roomId = copy ? copy.roomId : roomId;
|
||||
this.name = copy ? copy.name : null;
|
||||
this.lastMessageBody = copy ? copy.lastMessageBody : null;
|
||||
this.lastMessageTimestamp = copy ? copy.lastMessageTimestamp : null;
|
||||
this.isUnread = copy ? copy.isUnread : false;
|
||||
this.encryption = copy ? copy.encryption : null;
|
||||
this.lastDecryptedEventKey = copy ? copy.lastDecryptedEventKey : null;
|
||||
this.isDirectMessage = copy ? copy.isDirectMessage : false;
|
||||
this.membership = copy ? copy.membership : null;
|
||||
this.inviteCount = copy ? copy.inviteCount : 0;
|
||||
this.joinCount = copy ? copy.joinCount : 0;
|
||||
|
|
Reference in a new issue