draft of falling back to reading entries since last decrypted event key
this changes notifyRoomKey(s) to take only one room key at a time, to simplify things
This commit is contained in: parent a8392dc684, commit 9d41e122a0
5 changed files with 165 additions and 76 deletions
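For orientation before the hunks: a condensed paraphrase of the per-key flow this draft introduces. All names come from the changed code below; loadEntriesByEventId and readEntriesSinceLastDecryptedKey are stand-ins for storage reads that notifyRoomKey performs inline, not functions that exist in the codebase.

// paraphrase only; the real logic is in the Room.notifyRoomKey hunk further down
async function onRoomKey(room, roomEncryption, roomKey) {
    // events recorded as missing this exact session while the timeline was open
    const retryEventIds = roomEncryption.getEventIdsForRoomKey(roomKey);
    let retryEntries;
    if (retryEventIds) {
        retryEntries = await loadEntriesByEventId(retryEventIds);               // stand-in
    } else {
        // fallback: read timeline entries written since the last decrypted event key
        // and let RoomEncryption pick out (and cache) the ones for this session
        const candidateEntries = await readEntriesSinceLastDecryptedKey(room);  // stand-in
        retryEntries = roomEncryption.findAndCacheEntriesForRoomKey(roomKey, candidateEntries);
    }
    // then: retry decryption for retryEntries and update the room summary
}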
@@ -59,12 +59,12 @@ export class DeviceMessageHandler {
        return {roomKeys};
    }

-    _applyDecryptChanges(rooms, {roomKeys}) {
-        if (roomKeys && roomKeys.length) {
-            const roomKeysByRoom = groupBy(roomKeys, s => s.roomId);
-            for (const [roomId, roomKeys] of roomKeysByRoom) {
-                const room = rooms.get(roomId);
-                room?.notifyRoomKeys(roomKeys);
+    async _applyDecryptChanges(rooms, {roomKeys}) {
+        if (Array.isArray(roomKeys)) {
+            for (const roomKey of roomKeys) {
+                const room = rooms.get(roomKey.roomId);
+                // TODO: this is less parallized than it could be (like sync)
+                await room?.notifyRoomKey(roomKey);
            }
        }
    }

@@ -101,7 +101,7 @@ export class DeviceMessageHandler {
            throw err;
        }
        await txn.complete();
-        this._applyDecryptChanges(rooms, changes);
+        await this._applyDecryptChanges(rooms, changes);
    }

    async _getPendingEvents(txn) {
@@ -42,8 +42,12 @@ export class RoomEncryption {

        this._megolmBackfillCache = this._megolmDecryption.createSessionCache();
        this._megolmSyncCache = this._megolmDecryption.createSessionCache();
-        // not `event_id`, but an internal event id passed in to the decrypt methods
-        this._eventIdsByMissingSession = new Map();
+        // session => event ids of messages we tried to decrypt and the session was missing
+        this._missingSessions = new SessionToEventIdsMap();
+        // sessions that may or may not be missing, but that while
+        // looking for a particular session came up as a candidate and were
+        // added to the cache to prevent further lookups from storage
+        this._missingSessionCandidates = new SessionToEventIdsMap();
        this._senderDeviceCache = new Map();
        this._storage = storage;
        this._sessionBackup = sessionBackup;
@@ -57,8 +61,7 @@ export class RoomEncryption {
            return;
        }
        this._sessionBackup = sessionBackup;
-        for(const key of this._eventIdsByMissingSession.keys()) {
-            const {senderKey, sessionId} = decodeMissingSessionKey(key);
+        for(const {senderKey, sessionId} of this._missingSessions.getSessions()) {
            await this._requestMissingSessionFromBackup(senderKey, sessionId, null);
        }
    }
@@ -115,13 +118,17 @@ export class RoomEncryption {
        if (customCache) {
            customCache.dispose();
        }
-        return new DecryptionPreparation(preparation, errors, {isTimelineOpen, source}, this);
+        return new DecryptionPreparation(preparation, errors, {isTimelineOpen, source}, this, events);
    }

-    async _processDecryptionResults(results, errors, flags, txn) {
-        for (const error of errors.values()) {
-            if (error.code === "MEGOLM_NO_SESSION") {
-                this._addMissingSessionEvent(error.event, flags.source);
+    async _processDecryptionResults(events, results, errors, flags, txn) {
+        for (const event of events) {
+            const error = errors.get(event.event_id);
+            if (error?.code === "MEGOLM_NO_SESSION") {
+                this._addMissingSessionEvent(event, flags.source);
+            } else {
+                this._missingSessions.removeEvent(event);
+                this._missingSessionCandidates.removeEvent(event);
            }
        }
        if (flags.isTimelineOpen) {
@@ -145,17 +152,12 @@ export class RoomEncryption {
        }
    }

    _addMissingSessionEvent(event, source) {
-        const senderKey = event.content?.["sender_key"];
-        const sessionId = event.content?.["session_id"];
-        const key = encodeMissingSessionKey(senderKey, sessionId);
-        let eventIds = this._eventIdsByMissingSession.get(key);
-        // new missing session
-        if (!eventIds) {
+        const isNewSession = this._missingSessions.addEvent(event);
+        if (isNewSession) {
+            const senderKey = event.content?.["sender_key"];
+            const sessionId = event.content?.["session_id"];
            this._requestMissingSessionFromBackup(senderKey, sessionId, source);
-            eventIds = new Set();
-            this._eventIdsByMissingSession.set(key, eventIds);
        }
-        eventIds.add(event.event_id);
    }

    async _requestMissingSessionFromBackup(senderKey, sessionId, source) {
@@ -163,7 +165,7 @@ export class RoomEncryption {
        // and only after that proceed to request from backup
        if (source === DecryptionSource.Sync) {
            await this._clock.createTimeout(10000).elapsed();
-            if (this._disposed || !this._eventIdsByMissingSession.has(encodeMissingSessionKey(senderKey, sessionId))) {
+            if (this._disposed || !this._missingSessions.hasSession(senderKey, sessionId)) {
                return;
            }
        }
@@ -192,8 +194,8 @@ export class RoomEncryption {
            await txn.complete();

            if (roomKey) {
-                // this will call into applyRoomKeys below
-                await this._room.notifyRoomKeys([roomKey]);
+                // this will reattempt decryption
+                await this._room.notifyRoomKey(roomKey);
            }
        } else if (session?.algorithm) {
            console.info(`Backed-up session of unknown algorithm: ${session.algorithm}`);
@@ -212,18 +214,17 @@ export class RoomEncryption {
     * @param {Array<RoomKeyDescription>} roomKeys
     * @return {Array<string>} the event ids that should be retried to decrypt
     */
-    applyRoomKeys(roomKeys) {
-        // retry decryption with the new sessions
-        const retryEventIds = [];
-        for (const roomKey of roomKeys) {
-            const key = encodeMissingSessionKey(roomKey.senderKey, roomKey.sessionId);
-            const entriesForSession = this._eventIdsByMissingSession.get(key);
-            if (entriesForSession) {
-                this._eventIdsByMissingSession.delete(key);
-                retryEventIds.push(...entriesForSession);
-            }
+    getEventIdsForRoomKey(roomKey) {
+        let eventIds = this._missingSessions.getEventIds(roomKey.senderKey, roomKey.sessionId);
+        if (!eventIds) {
+            eventIds = this._missingSessionCandidates.getEventIds(roomKey.senderKey, roomKey.sessionId);
        }
-        return retryEventIds;
+        return eventIds;
+    }
+
+    findAndCacheEntriesForRoomKey(roomKey, candidateEntries) {
+        // add all to _missingSessionCandidates
+        // filter messages to roomKey
    }

    async encrypt(type, content, hsApi) {
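findAndCacheEntriesForRoomKey is still a stub in this draft; the two comments above are all it contains. A minimal sketch of what those comments describe could look like the following. It is not code from this commit, and it assumes each timeline entry exposes the raw encrypted event as entry.event with content["sender_key"] / content["session_id"]:

// sketch only, not part of this commit
findAndCacheEntriesForRoomKey(roomKey, candidateEntries) {
    const matchingEntries = [];
    for (const entry of candidateEntries) {
        if (!entry.event) {
            continue; // e.g. fragment boundary entries carry no event
        }
        const content = entry.event.content;
        const senderKey = content?.["sender_key"];
        const sessionId = content?.["session_id"];
        if (senderKey === roomKey.senderKey && sessionId === roomKey.sessionId) {
            matchingEntries.push(entry);
        }
        // "add all to _missingSessionCandidates": remember every encrypted candidate so a
        // later room key for its session can be answered without reading storage again
        if (senderKey && sessionId) {
            this._missingSessionCandidates.addEvent(entry.event);
        }
    }
    return matchingEntries;
}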
@@ -354,11 +355,12 @@ export class RoomEncryption {
 * the decryption results before turning them
 */
class DecryptionPreparation {
-    constructor(megolmDecryptionPreparation, extraErrors, flags, roomEncryption) {
+    constructor(megolmDecryptionPreparation, extraErrors, flags, roomEncryption, events) {
        this._megolmDecryptionPreparation = megolmDecryptionPreparation;
        this._extraErrors = extraErrors;
        this._flags = flags;
        this._roomEncryption = roomEncryption;
+        this._events = events;
    }

    async decrypt() {
@@ -366,7 +368,8 @@ class DecryptionPreparation {
            await this._megolmDecryptionPreparation.decrypt(),
            this._extraErrors,
            this._flags,
-            this._roomEncryption);
+            this._roomEncryption,
+            this._events);
    }

    dispose() {
@@ -375,17 +378,18 @@ class DecryptionPreparation {
}

class DecryptionChanges {
-    constructor(megolmDecryptionChanges, extraErrors, flags, roomEncryption) {
+    constructor(megolmDecryptionChanges, extraErrors, flags, roomEncryption, events) {
        this._megolmDecryptionChanges = megolmDecryptionChanges;
        this._extraErrors = extraErrors;
        this._flags = flags;
        this._roomEncryption = roomEncryption;
+        this._events = events;
    }

    async write(txn) {
        const {results, errors} = await this._megolmDecryptionChanges.write(txn);
        mergeMap(this._extraErrors, errors);
-        await this._roomEncryption._processDecryptionResults(results, errors, this._flags, txn);
+        await this._roomEncryption._processDecryptionResults(this._events, results, errors, this._flags, txn);
        return new BatchDecryptionResult(results, errors);
    }
}
@@ -410,3 +414,58 @@ class BatchDecryptionResult {
        }
    }
}
+
+class SessionToEventIdsMap {
+    constructor() {
+        this._eventIdsBySession = new Map();
+    }
+
+    addEvent(event) {
+        let isNewSession = false;
+        const senderKey = event.content?.["sender_key"];
+        const sessionId = event.content?.["session_id"];
+        const key = encodeMissingSessionKey(senderKey, sessionId);
+        let eventIds = this._eventIdsBySession.get(key);
+        // new missing session
+        if (!eventIds) {
+            eventIds = new Set();
+            this._eventIdsBySession.set(key, eventIds);
+            isNewSession = true;
+        }
+        eventIds.add(event.event_id);
+        return isNewSession;
+    }
+
+    getEventIds(senderKey, sessionId) {
+        const key = encodeMissingSessionKey(senderKey, sessionId);
+        const entriesForSession = this._eventIdsBySession.get(key);
+        if (entriesForSession) {
+            return [...entriesForSession];
+        }
+    }
+
+    getSessions() {
+        return Array.from(this._eventIdsBySession.keys()).map(decodeMissingSessionKey);
+    }
+
+    hasSession(senderKey, sessionId) {
+        return this._eventIdsBySession.has(encodeMissingSessionKey(senderKey, sessionId));
+    }
+
+    removeEvent(event) {
+        let hasRemovedSession = false;
+        const senderKey = event.content?.["sender_key"];
+        const sessionId = event.content?.["session_id"];
+        const key = encodeMissingSessionKey(senderKey, sessionId);
+        let eventIds = this._eventIdsBySession.get(key);
+        if (eventIds) {
+            if (eventIds.delete(event.event_id)) {
+                if (!eventIds.size) {
+                    this._eventIdsBySession.delete(key);
+                    hasRemovedSession = true;
+                }
+            }
+        }
+        return hasRemovedSession;
+    }
+}
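SessionToEventIdsMap keys its internal map with encodeMissingSessionKey and reverses that with decodeMissingSessionKey. Those helpers are defined elsewhere in this file (both the old and the new code call them) and are untouched by this commit; their exact shape is not shown here, but they presumably just join and split the senderKey/sessionId pair, roughly:

// sketch of the helpers this class depends on; the real definitions are outside this diff
function encodeMissingSessionKey(senderKey, sessionId) {
    return `${senderKey}|${sessionId}`;
}

function decodeMissingSessionKey(key) {
    const [senderKey, sessionId] = key.split("|");
    return {senderKey, sessionId};
}

Illustrative usage of the new map (values made up):

const missingSessions = new SessionToEventIdsMap();
const event = {event_id: "$abc", content: {sender_key: "akey", session_id: "asession"}};
missingSessions.addEvent(event);                  // true: first event for this session
missingSessions.hasSession("akey", "asession");   // true
missingSessions.getEventIds("akey", "asession");  // ["$abc"]
missingSessions.removeEvent(event);               // true: the session has no events left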
@@ -52,42 +52,63 @@ export class Room extends EventEmitter {
        this._clock = clock;
    }

-    _readEntriesToRetryDecryption(retryEventIds) {
-        const readFromEventKey = retryEventIds.length !== 0;
-        const stores =
-        const txn = await this._storage.readTxn([
-            this._storage.storeNames.timelineEvents,
-            this._storage.storeNames.inboundGroupSessions,
-        ]);
+    _readRetryDecryptCandidateEntries(sinceEventKey, txn) {
+        if (sinceEventKey) {
+            return readFromWithTxn(sinceEventKey, Direction.Forward, Number.MAX_SAFE_INTEGER, txn);
+        } else {
+            // all messages for room
+            return readFromWithTxn(this._syncWriter.lastMessageKey, Direction.Backward, Number.MAX_SAFE_INTEGER, txn);
+        }
    }

-    async notifyRoomKeys(roomKeys) {
-        if (this._roomEncryption) {
-            let retryEventIds = this._roomEncryption.applyRoomKeys(roomKeys);
-            if (retryEventIds.length) {
-                const retryEntries = [];
-                const txn = await this._storage.readTxn([
+    async notifyRoomKey(roomKey) {
+        if (!this._roomEncryption) {
+            return;
+        }
+        const retryEventIds = this._roomEncryption.getEventIdsForRoomKey(roomKey);
+        const stores = [
            this._storage.storeNames.timelineEvents,
            this._storage.storeNames.inboundGroupSessions,
-                ]);
-                for (const eventId of retryEventIds) {
-                    const storageEntry = await txn.timelineEvents.getByEventId(this._roomId, eventId);
-                    if (storageEntry) {
-                        retryEntries.push(new EventEntry(storageEntry, this._fragmentIdComparer));
-                    }
+        ];
+        let txn;
+        let retryEntries;
+        if (retryEventIds) {
+            retryEntries = [];
+            txn = await this._storage.readTxn(stores);
+            for (const eventId of retryEventIds) {
+                const storageEntry = await txn.timelineEvents.getByEventId(this._roomId, eventId);
+                if (storageEntry) {
+                    retryEntries.push(new EventEntry(storageEntry, this._fragmentIdComparer));
                }
-                const decryptRequest = this._decryptEntries(DecryptionSource.Retry, retryEntries, txn);
-                await decryptRequest.complete();
+            }
+        } else {
+            // we only look for messages since lastDecryptedEventKey because
+            // the timeline must be closed (otherwise getEventIdsForRoomKey would have found the event ids)
+            // and to update the summary we only care about events since lastDecryptedEventKey
+            const key = this._summary.data.lastDecryptedEventKey;
+            // key might be missing if we haven't decrypted any events in this room
+            const sinceEventKey = key && new EventKey(key.fragmentId, key.entryIndex);
+            // check we have not already decrypted the most recent event in the room
+            // otherwise we know that the messages for this room key will not update the room summary
+            if (!sinceEventKey || !sinceEventKey.equals(this._syncWriter.lastMessageKey)) {
+                txn = await this._storage.readTxn(stores.concat(this._storage.storeNames.timelineFragments));
+                const candidateEntries = await this._readRetryDecryptCandidateEntries(sinceEventKey, txn);
+                retryEntries = this._roomEncryption.findAndCacheEntriesForRoomKey(roomKey, candidateEntries);
+            }
+        }
+        if (retryEntries?.length) {
+            const decryptRequest = this._decryptEntries(DecryptionSource.Retry, retryEntries, txn);
+            // this will close txn while awaiting decryption
+            await decryptRequest.complete();

            this._timeline?.replaceEntries(retryEntries);
            // we would ideally write the room summary in the same txn as the groupSessionDecryptions in the
            // _decryptEntries entries and could even know which events have been decrypted for the first
            // time from DecryptionChanges.write and only pass those to the summary. As timeline changes
            // are not essential to the room summary, it's fine to write this in a separate txn for now.
            const changes = this._summary.data.applyTimelineEntries(retryEntries, false, this._isTimelineOpen);
            if (await this._summary.writeAndApplyData(changes, this._storage)) {
                this._emitUpdate();
-                }
            }
        }
    }
@@ -203,7 +224,7 @@ export class Room extends EventEmitter {
            heroChanges = await this._heroes.calculateChanges(summaryChanges.heroes, memberChanges, txn);
        }
        let removedPendingEvents;
-        if (roomResponse.timeline && roomResponse.timeline.events) {
+        if (Array.isArray(roomResponse.timeline?.events)) {
            removedPendingEvents = this._sendQueue.removeRemoteEchos(roomResponse.timeline.events, txn);
        }
        return {
@@ -63,6 +63,10 @@ export class EventKey {
    toString() {
        return `[${this.fragmentId}/${this.eventIndex}]`;
    }
+
+    equals(other) {
+        return this.fragmentId === other?.fragmentId && this.eventIndex === other?.eventIndex;
+    }
}

export function xtests() {
@@ -46,6 +46,7 @@ export async function readFromWithTxn(eventKey, direction, amount, r, txn) {
    while (entries.length < amount && eventKey) {
        let eventsWithinFragment;
        if (direction.isForward) {
+            // TODO: should we pass amount - entries.length here?
            eventsWithinFragment = await timelineStore.eventsAfter(this._roomId, eventKey, amount);
        } else {
            eventsWithinFragment = await timelineStore.eventsBefore(this._roomId, eventKey, amount);

@@ -56,6 +57,10 @@ export async function readFromWithTxn(eventKey, direction, amount, r, txn) {

        if (entries.length < amount) {
            const fragment = await fragmentStore.get(this._roomId, eventKey.fragmentId);
+            // TODO: why does the first fragment not need to be added? (the next *is* added below)
+            // it looks like this would be fine when loading in the sync island
+            // (as the live fragment should be added already) but not for permalinks when we support them
+            //
            // this._fragmentIdComparer.addFragment(fragment);
            let fragmentEntry = new FragmentBoundaryEntry(fragment, direction.isBackward, this._fragmentIdComparer);
            // append or prepend fragmentEntry, reuse func from GapWriter?