Merge pull request #108 from vector-im/bwindels/update-summary-on-retry-decrypt
Update summary on retry decrypt and initial sync
This commit is contained in:
commit
82cff84f92
10 changed files with 386 additions and 302 deletions
|
@ -59,12 +59,12 @@ export class DeviceMessageHandler {
|
||||||
return {roomKeys};
|
return {roomKeys};
|
||||||
}
|
}
|
||||||
|
|
||||||
_applyDecryptChanges(rooms, {roomKeys}) {
|
async _applyDecryptChanges(rooms, {roomKeys}) {
|
||||||
if (roomKeys && roomKeys.length) {
|
if (Array.isArray(roomKeys)) {
|
||||||
const roomKeysByRoom = groupBy(roomKeys, s => s.roomId);
|
for (const roomKey of roomKeys) {
|
||||||
for (const [roomId, roomKeys] of roomKeysByRoom) {
|
const room = rooms.get(roomKey.roomId);
|
||||||
const room = rooms.get(roomId);
|
// TODO: this is less parallized than it could be (like sync)
|
||||||
room?.notifyRoomKeys(roomKeys);
|
await room?.notifyRoomKey(roomKey);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -101,7 +101,7 @@ export class DeviceMessageHandler {
|
||||||
throw err;
|
throw err;
|
||||||
}
|
}
|
||||||
await txn.complete();
|
await txn.complete();
|
||||||
this._applyDecryptChanges(rooms, changes);
|
await this._applyDecryptChanges(rooms, changes);
|
||||||
}
|
}
|
||||||
|
|
||||||
async _getPendingEvents(txn) {
|
async _getPendingEvents(txn) {
|
||||||
|
|
|
@ -41,15 +41,12 @@ function timelineIsEmpty(roomResponse) {
|
||||||
/**
|
/**
|
||||||
* Sync steps in js-pseudocode:
|
* Sync steps in js-pseudocode:
|
||||||
* ```js
|
* ```js
|
||||||
* let preparation;
|
* // can only read some stores
|
||||||
* if (room.needsPrepareSync) {
|
* const preparation = await room.prepareSync(roomResponse, membership, prepareTxn);
|
||||||
* // can only read some stores
|
* // can do async work that is not related to storage (such as decryption)
|
||||||
* preparation = await room.prepareSync(roomResponse, prepareTxn);
|
* await room.afterPrepareSync(preparation);
|
||||||
* // can do async work that is not related to storage (such as decryption)
|
|
||||||
* preparation = await room.afterPrepareSync(preparation);
|
|
||||||
* }
|
|
||||||
* // writes and calculates changes
|
* // writes and calculates changes
|
||||||
* const changes = await room.writeSync(roomResponse, membership, isInitialSync, preparation, syncTxn);
|
* const changes = await room.writeSync(roomResponse, isInitialSync, preparation, syncTxn);
|
||||||
* // applies and emits changes once syncTxn is committed
|
* // applies and emits changes once syncTxn is committed
|
||||||
* room.afterSync(changes);
|
* room.afterSync(changes);
|
||||||
* if (room.needsAfterSyncCompleted(changes)) {
|
* if (room.needsAfterSyncCompleted(changes)) {
|
||||||
|
@ -180,7 +177,7 @@ export class Sync {
|
||||||
await Promise.all(roomStates.map(async rs => {
|
await Promise.all(roomStates.map(async rs => {
|
||||||
console.log(` * applying sync response to room ${rs.room.id} ...`);
|
console.log(` * applying sync response to room ${rs.room.id} ...`);
|
||||||
rs.changes = await rs.room.writeSync(
|
rs.changes = await rs.room.writeSync(
|
||||||
rs.roomResponse, rs.membership, isInitialSync, rs.preparation, syncTxn);
|
rs.roomResponse, isInitialSync, rs.preparation, syncTxn);
|
||||||
}));
|
}));
|
||||||
sessionChanges = await this._session.writeSync(response, syncFilterId, syncTxn);
|
sessionChanges = await this._session.writeSync(response, syncFilterId, syncTxn);
|
||||||
} catch(err) {
|
} catch(err) {
|
||||||
|
@ -219,16 +216,11 @@ export class Sync {
|
||||||
}
|
}
|
||||||
|
|
||||||
async _prepareRooms(roomStates) {
|
async _prepareRooms(roomStates) {
|
||||||
const prepareRoomStates = roomStates.filter(rs => rs.room.needsPrepareSync);
|
const prepareTxn = await this._openPrepareSyncTxn();
|
||||||
if (prepareRoomStates.length) {
|
await Promise.all(roomStates.map(async rs => {
|
||||||
const prepareTxn = await this._openPrepareSyncTxn();
|
rs.preparation = await rs.room.prepareSync(rs.roomResponse, rs.membership, prepareTxn);
|
||||||
await Promise.all(prepareRoomStates.map(async rs => {
|
}));
|
||||||
rs.preparation = await rs.room.prepareSync(rs.roomResponse, prepareTxn);
|
await Promise.all(roomStates.map(rs => rs.room.afterPrepareSync(rs.preparation)));
|
||||||
}));
|
|
||||||
await Promise.all(prepareRoomStates.map(async rs => {
|
|
||||||
rs.preparation = await rs.room.afterPrepareSync(rs.preparation);
|
|
||||||
}));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async _openSyncTxn() {
|
async _openSyncTxn() {
|
||||||
|
|
|
@ -42,8 +42,12 @@ export class RoomEncryption {
|
||||||
|
|
||||||
this._megolmBackfillCache = this._megolmDecryption.createSessionCache();
|
this._megolmBackfillCache = this._megolmDecryption.createSessionCache();
|
||||||
this._megolmSyncCache = this._megolmDecryption.createSessionCache();
|
this._megolmSyncCache = this._megolmDecryption.createSessionCache();
|
||||||
// not `event_id`, but an internal event id passed in to the decrypt methods
|
// session => event ids of messages we tried to decrypt and the session was missing
|
||||||
this._eventIdsByMissingSession = new Map();
|
this._missingSessions = new SessionToEventIdsMap();
|
||||||
|
// sessions that may or may not be missing, but that while
|
||||||
|
// looking for a particular session came up as a candidate and were
|
||||||
|
// added to the cache to prevent further lookups from storage
|
||||||
|
this._missingSessionCandidates = new SessionToEventIdsMap();
|
||||||
this._senderDeviceCache = new Map();
|
this._senderDeviceCache = new Map();
|
||||||
this._storage = storage;
|
this._storage = storage;
|
||||||
this._sessionBackup = sessionBackup;
|
this._sessionBackup = sessionBackup;
|
||||||
|
@ -57,8 +61,7 @@ export class RoomEncryption {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
this._sessionBackup = sessionBackup;
|
this._sessionBackup = sessionBackup;
|
||||||
for(const key of this._eventIdsByMissingSession.keys()) {
|
for(const {senderKey, sessionId} of this._missingSessions.getSessions()) {
|
||||||
const {senderKey, sessionId} = decodeMissingSessionKey(key);
|
|
||||||
await this._requestMissingSessionFromBackup(senderKey, sessionId, null);
|
await this._requestMissingSessionFromBackup(senderKey, sessionId, null);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -115,13 +118,17 @@ export class RoomEncryption {
|
||||||
if (customCache) {
|
if (customCache) {
|
||||||
customCache.dispose();
|
customCache.dispose();
|
||||||
}
|
}
|
||||||
return new DecryptionPreparation(preparation, errors, {isTimelineOpen, source}, this);
|
return new DecryptionPreparation(preparation, errors, {isTimelineOpen, source}, this, events);
|
||||||
}
|
}
|
||||||
|
|
||||||
async _processDecryptionResults(results, errors, flags, txn) {
|
async _processDecryptionResults(events, results, errors, flags, txn) {
|
||||||
for (const error of errors.values()) {
|
for (const event of events) {
|
||||||
if (error.code === "MEGOLM_NO_SESSION") {
|
const error = errors.get(event.event_id);
|
||||||
this._addMissingSessionEvent(error.event, flags.source);
|
if (error?.code === "MEGOLM_NO_SESSION") {
|
||||||
|
this._addMissingSessionEvent(event, flags.source);
|
||||||
|
} else {
|
||||||
|
this._missingSessions.removeEvent(event);
|
||||||
|
this._missingSessionCandidates.removeEvent(event);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (flags.isTimelineOpen) {
|
if (flags.isTimelineOpen) {
|
||||||
|
@ -145,17 +152,12 @@ export class RoomEncryption {
|
||||||
}
|
}
|
||||||
|
|
||||||
_addMissingSessionEvent(event, source) {
|
_addMissingSessionEvent(event, source) {
|
||||||
const senderKey = event.content?.["sender_key"];
|
const isNewSession = this._missingSessions.addEvent(event);
|
||||||
const sessionId = event.content?.["session_id"];
|
if (isNewSession) {
|
||||||
const key = encodeMissingSessionKey(senderKey, sessionId);
|
const senderKey = event.content?.["sender_key"];
|
||||||
let eventIds = this._eventIdsByMissingSession.get(key);
|
const sessionId = event.content?.["session_id"];
|
||||||
// new missing session
|
|
||||||
if (!eventIds) {
|
|
||||||
this._requestMissingSessionFromBackup(senderKey, sessionId, source);
|
this._requestMissingSessionFromBackup(senderKey, sessionId, source);
|
||||||
eventIds = new Set();
|
|
||||||
this._eventIdsByMissingSession.set(key, eventIds);
|
|
||||||
}
|
}
|
||||||
eventIds.add(event.event_id);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async _requestMissingSessionFromBackup(senderKey, sessionId, source) {
|
async _requestMissingSessionFromBackup(senderKey, sessionId, source) {
|
||||||
|
@ -163,7 +165,7 @@ export class RoomEncryption {
|
||||||
// and only after that proceed to request from backup
|
// and only after that proceed to request from backup
|
||||||
if (source === DecryptionSource.Sync) {
|
if (source === DecryptionSource.Sync) {
|
||||||
await this._clock.createTimeout(10000).elapsed();
|
await this._clock.createTimeout(10000).elapsed();
|
||||||
if (this._disposed || !this._eventIdsByMissingSession.has(encodeMissingSessionKey(senderKey, sessionId))) {
|
if (this._disposed || !this._missingSessions.hasSession(senderKey, sessionId)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -192,8 +194,8 @@ export class RoomEncryption {
|
||||||
await txn.complete();
|
await txn.complete();
|
||||||
|
|
||||||
if (roomKey) {
|
if (roomKey) {
|
||||||
// this will call into applyRoomKeys below
|
// this will reattempt decryption
|
||||||
await this._room.notifyRoomKeys([roomKey]);
|
await this._room.notifyRoomKey(roomKey);
|
||||||
}
|
}
|
||||||
} else if (session?.algorithm) {
|
} else if (session?.algorithm) {
|
||||||
console.info(`Backed-up session of unknown algorithm: ${session.algorithm}`);
|
console.info(`Backed-up session of unknown algorithm: ${session.algorithm}`);
|
||||||
|
@ -212,18 +214,36 @@ export class RoomEncryption {
|
||||||
* @param {Array<RoomKeyDescription>} roomKeys
|
* @param {Array<RoomKeyDescription>} roomKeys
|
||||||
* @return {Array<string>} the event ids that should be retried to decrypt
|
* @return {Array<string>} the event ids that should be retried to decrypt
|
||||||
*/
|
*/
|
||||||
applyRoomKeys(roomKeys) {
|
getEventIdsForRoomKey(roomKey) {
|
||||||
// retry decryption with the new sessions
|
// TODO: we could concat both results here, and only put stuff in
|
||||||
const retryEventIds = [];
|
// candidates if it is not in missing sessions to use a bit less memory
|
||||||
for (const roomKey of roomKeys) {
|
let eventIds = this._missingSessions.getEventIds(roomKey.senderKey, roomKey.sessionId);
|
||||||
const key = encodeMissingSessionKey(roomKey.senderKey, roomKey.sessionId);
|
if (!eventIds) {
|
||||||
const entriesForSession = this._eventIdsByMissingSession.get(key);
|
eventIds = this._missingSessionCandidates.getEventIds(roomKey.senderKey, roomKey.sessionId);
|
||||||
if (entriesForSession) {
|
}
|
||||||
this._eventIdsByMissingSession.delete(key);
|
return eventIds;
|
||||||
retryEventIds.push(...entriesForSession);
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* caches mapping of session to event id of all encrypted candidates
|
||||||
|
* and filters to return only the candidates for the given room key
|
||||||
|
*/
|
||||||
|
findAndCacheEntriesForRoomKey(roomKey, candidateEntries) {
|
||||||
|
const matches = [];
|
||||||
|
|
||||||
|
for (const entry of candidateEntries) {
|
||||||
|
if (entry.eventType === ENCRYPTED_TYPE) {
|
||||||
|
this._missingSessionCandidates.addEvent(entry.event);
|
||||||
|
const senderKey = entry.event?.content?.["sender_key"];
|
||||||
|
const sessionId = entry.event?.content?.["session_id"];
|
||||||
|
if (senderKey === roomKey.senderKey && sessionId === roomKey.sessionId) {
|
||||||
|
matches.push(entry);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return retryEventIds;
|
|
||||||
|
return matches;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async encrypt(type, content, hsApi) {
|
async encrypt(type, content, hsApi) {
|
||||||
|
@ -354,11 +374,12 @@ export class RoomEncryption {
|
||||||
* the decryption results before turning them
|
* the decryption results before turning them
|
||||||
*/
|
*/
|
||||||
class DecryptionPreparation {
|
class DecryptionPreparation {
|
||||||
constructor(megolmDecryptionPreparation, extraErrors, flags, roomEncryption) {
|
constructor(megolmDecryptionPreparation, extraErrors, flags, roomEncryption, events) {
|
||||||
this._megolmDecryptionPreparation = megolmDecryptionPreparation;
|
this._megolmDecryptionPreparation = megolmDecryptionPreparation;
|
||||||
this._extraErrors = extraErrors;
|
this._extraErrors = extraErrors;
|
||||||
this._flags = flags;
|
this._flags = flags;
|
||||||
this._roomEncryption = roomEncryption;
|
this._roomEncryption = roomEncryption;
|
||||||
|
this._events = events;
|
||||||
}
|
}
|
||||||
|
|
||||||
async decrypt() {
|
async decrypt() {
|
||||||
|
@ -366,7 +387,8 @@ class DecryptionPreparation {
|
||||||
await this._megolmDecryptionPreparation.decrypt(),
|
await this._megolmDecryptionPreparation.decrypt(),
|
||||||
this._extraErrors,
|
this._extraErrors,
|
||||||
this._flags,
|
this._flags,
|
||||||
this._roomEncryption);
|
this._roomEncryption,
|
||||||
|
this._events);
|
||||||
}
|
}
|
||||||
|
|
||||||
dispose() {
|
dispose() {
|
||||||
|
@ -375,17 +397,18 @@ class DecryptionPreparation {
|
||||||
}
|
}
|
||||||
|
|
||||||
class DecryptionChanges {
|
class DecryptionChanges {
|
||||||
constructor(megolmDecryptionChanges, extraErrors, flags, roomEncryption) {
|
constructor(megolmDecryptionChanges, extraErrors, flags, roomEncryption, events) {
|
||||||
this._megolmDecryptionChanges = megolmDecryptionChanges;
|
this._megolmDecryptionChanges = megolmDecryptionChanges;
|
||||||
this._extraErrors = extraErrors;
|
this._extraErrors = extraErrors;
|
||||||
this._flags = flags;
|
this._flags = flags;
|
||||||
this._roomEncryption = roomEncryption;
|
this._roomEncryption = roomEncryption;
|
||||||
|
this._events = events;
|
||||||
}
|
}
|
||||||
|
|
||||||
async write(txn) {
|
async write(txn) {
|
||||||
const {results, errors} = await this._megolmDecryptionChanges.write(txn);
|
const {results, errors} = await this._megolmDecryptionChanges.write(txn);
|
||||||
mergeMap(this._extraErrors, errors);
|
mergeMap(this._extraErrors, errors);
|
||||||
await this._roomEncryption._processDecryptionResults(results, errors, this._flags, txn);
|
await this._roomEncryption._processDecryptionResults(this._events, results, errors, this._flags, txn);
|
||||||
return new BatchDecryptionResult(results, errors);
|
return new BatchDecryptionResult(results, errors);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -410,3 +433,58 @@ class BatchDecryptionResult {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
class SessionToEventIdsMap {
|
||||||
|
constructor() {
|
||||||
|
this._eventIdsBySession = new Map();
|
||||||
|
}
|
||||||
|
|
||||||
|
addEvent(event) {
|
||||||
|
let isNewSession = false;
|
||||||
|
const senderKey = event.content?.["sender_key"];
|
||||||
|
const sessionId = event.content?.["session_id"];
|
||||||
|
const key = encodeMissingSessionKey(senderKey, sessionId);
|
||||||
|
let eventIds = this._eventIdsBySession.get(key);
|
||||||
|
// new missing session
|
||||||
|
if (!eventIds) {
|
||||||
|
eventIds = new Set();
|
||||||
|
this._eventIdsBySession.set(key, eventIds);
|
||||||
|
isNewSession = true;
|
||||||
|
}
|
||||||
|
eventIds.add(event.event_id);
|
||||||
|
return isNewSession;
|
||||||
|
}
|
||||||
|
|
||||||
|
getEventIds(senderKey, sessionId) {
|
||||||
|
const key = encodeMissingSessionKey(senderKey, sessionId);
|
||||||
|
const entriesForSession = this._eventIdsBySession.get(key);
|
||||||
|
if (entriesForSession) {
|
||||||
|
return [...entriesForSession];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
getSessions() {
|
||||||
|
return Array.from(this._eventIdsBySession.keys()).map(decodeMissingSessionKey);
|
||||||
|
}
|
||||||
|
|
||||||
|
hasSession(senderKey, sessionId) {
|
||||||
|
return this._eventIdsBySession.has(encodeMissingSessionKey(senderKey, sessionId));
|
||||||
|
}
|
||||||
|
|
||||||
|
removeEvent(event) {
|
||||||
|
let hasRemovedSession = false;
|
||||||
|
const senderKey = event.content?.["sender_key"];
|
||||||
|
const sessionId = event.content?.["session_id"];
|
||||||
|
const key = encodeMissingSessionKey(senderKey, sessionId);
|
||||||
|
let eventIds = this._eventIdsBySession.get(key);
|
||||||
|
if (eventIds) {
|
||||||
|
if (eventIds.delete(event.event_id)) {
|
||||||
|
if (!eventIds.length) {
|
||||||
|
this._eventIdsBySession.delete(key);
|
||||||
|
hasRemovedSession = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return hasRemovedSession;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -15,9 +15,10 @@ limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
import {EventEmitter} from "../../utils/EventEmitter.js";
|
import {EventEmitter} from "../../utils/EventEmitter.js";
|
||||||
import {RoomSummary, needsHeroes} from "./RoomSummary.js";
|
import {RoomSummary} from "./RoomSummary.js";
|
||||||
import {SyncWriter} from "./timeline/persistence/SyncWriter.js";
|
import {SyncWriter} from "./timeline/persistence/SyncWriter.js";
|
||||||
import {GapWriter} from "./timeline/persistence/GapWriter.js";
|
import {GapWriter} from "./timeline/persistence/GapWriter.js";
|
||||||
|
import {readRawTimelineEntriesWithTxn} from "./timeline/persistence/TimelineReader.js";
|
||||||
import {Timeline} from "./timeline/Timeline.js";
|
import {Timeline} from "./timeline/Timeline.js";
|
||||||
import {FragmentIdComparer} from "./timeline/FragmentIdComparer.js";
|
import {FragmentIdComparer} from "./timeline/FragmentIdComparer.js";
|
||||||
import {SendQueue} from "./sending/SendQueue.js";
|
import {SendQueue} from "./sending/SendQueue.js";
|
||||||
|
@ -26,20 +27,22 @@ import {fetchOrLoadMembers} from "./members/load.js";
|
||||||
import {MemberList} from "./members/MemberList.js";
|
import {MemberList} from "./members/MemberList.js";
|
||||||
import {Heroes} from "./members/Heroes.js";
|
import {Heroes} from "./members/Heroes.js";
|
||||||
import {EventEntry} from "./timeline/entries/EventEntry.js";
|
import {EventEntry} from "./timeline/entries/EventEntry.js";
|
||||||
|
import {EventKey} from "./timeline/EventKey.js";
|
||||||
|
import {Direction} from "./timeline/Direction.js";
|
||||||
import {DecryptionSource} from "../e2ee/common.js";
|
import {DecryptionSource} from "../e2ee/common.js";
|
||||||
|
|
||||||
const EVENT_ENCRYPTED_TYPE = "m.room.encrypted";
|
const EVENT_ENCRYPTED_TYPE = "m.room.encrypted";
|
||||||
|
|
||||||
export class Room extends EventEmitter {
|
export class Room extends EventEmitter {
|
||||||
constructor({roomId, storage, hsApi, mediaRepository, emitCollectionChange, pendingEvents, user, createRoomEncryption, getSyncToken, clock}) {
|
constructor({roomId, storage, hsApi, mediaRepository, emitCollectionChange, pendingEvents, user, createRoomEncryption, getSyncToken, clock}) {
|
||||||
super();
|
super();
|
||||||
this._roomId = roomId;
|
this._roomId = roomId;
|
||||||
this._storage = storage;
|
this._storage = storage;
|
||||||
this._hsApi = hsApi;
|
this._hsApi = hsApi;
|
||||||
this._mediaRepository = mediaRepository;
|
this._mediaRepository = mediaRepository;
|
||||||
this._summary = new RoomSummary(roomId, user.id);
|
this._summary = new RoomSummary(roomId);
|
||||||
this._fragmentIdComparer = new FragmentIdComparer([]);
|
this._fragmentIdComparer = new FragmentIdComparer([]);
|
||||||
this._syncWriter = new SyncWriter({roomId, fragmentIdComparer: this._fragmentIdComparer});
|
this._syncWriter = new SyncWriter({roomId, fragmentIdComparer: this._fragmentIdComparer});
|
||||||
this._emitCollectionChange = emitCollectionChange;
|
this._emitCollectionChange = emitCollectionChange;
|
||||||
this._sendQueue = new SendQueue({roomId, storage, hsApi, pendingEvents});
|
this._sendQueue = new SendQueue({roomId, storage, hsApi, pendingEvents});
|
||||||
this._timeline = null;
|
this._timeline = null;
|
||||||
|
@ -50,43 +53,77 @@ export class Room extends EventEmitter {
|
||||||
this._roomEncryption = null;
|
this._roomEncryption = null;
|
||||||
this._getSyncToken = getSyncToken;
|
this._getSyncToken = getSyncToken;
|
||||||
this._clock = clock;
|
this._clock = clock;
|
||||||
}
|
}
|
||||||
|
|
||||||
async notifyRoomKeys(roomKeys) {
|
_readRetryDecryptCandidateEntries(sinceEventKey, txn) {
|
||||||
if (this._roomEncryption) {
|
if (sinceEventKey) {
|
||||||
let retryEventIds = this._roomEncryption.applyRoomKeys(roomKeys);
|
return readRawTimelineEntriesWithTxn(this._roomId, sinceEventKey,
|
||||||
if (retryEventIds.length) {
|
Direction.Forward, Number.MAX_SAFE_INTEGER, this._fragmentIdComparer, txn);
|
||||||
const retryEntries = [];
|
} else {
|
||||||
const txn = await this._storage.readTxn([
|
// all messages for room ...
|
||||||
this._storage.storeNames.timelineEvents,
|
// if you haven't decrypted any message in a room yet,
|
||||||
this._storage.storeNames.inboundGroupSessions,
|
// it's unlikely you will have tons of them.
|
||||||
]);
|
// so this should be fine as a last resort
|
||||||
for (const eventId of retryEventIds) {
|
return readRawTimelineEntriesWithTxn(this._roomId, this._syncWriter.lastMessageKey,
|
||||||
const storageEntry = await txn.timelineEvents.getByEventId(this._roomId, eventId);
|
Direction.Backward, Number.MAX_SAFE_INTEGER, this._fragmentIdComparer, txn);
|
||||||
if (storageEntry) {
|
}
|
||||||
retryEntries.push(new EventEntry(storageEntry, this._fragmentIdComparer));
|
}
|
||||||
}
|
|
||||||
}
|
|
||||||
const decryptRequest = this._decryptEntries(DecryptionSource.Retry, retryEntries, txn);
|
|
||||||
await decryptRequest.complete();
|
|
||||||
|
|
||||||
this._timeline?.replaceEntries(retryEntries);
|
async notifyRoomKey(roomKey) {
|
||||||
// we would ideally write the room summary in the same txn as the groupSessionDecryptions in the
|
if (!this._roomEncryption) {
|
||||||
// _decryptEntries entries and could even know which events have been decrypted for the first
|
return;
|
||||||
// time from DecryptionChanges.write and only pass those to the summary. As timeline changes
|
}
|
||||||
// are not essential to the room summary, it's fine to write this in a separate txn for now.
|
const retryEventIds = this._roomEncryption.getEventIdsForRoomKey(roomKey);
|
||||||
const changes = this._summary.processTimelineEntries(retryEntries, false, this._isTimelineOpen);
|
const stores = [
|
||||||
if (changes) {
|
this._storage.storeNames.timelineEvents,
|
||||||
this._summary.writeAndApplyChanges(changes, this._storage);
|
this._storage.storeNames.inboundGroupSessions,
|
||||||
this._emitUpdate();
|
];
|
||||||
|
let txn;
|
||||||
|
let retryEntries;
|
||||||
|
if (retryEventIds) {
|
||||||
|
retryEntries = [];
|
||||||
|
txn = await this._storage.readTxn(stores);
|
||||||
|
for (const eventId of retryEventIds) {
|
||||||
|
const storageEntry = await txn.timelineEvents.getByEventId(this._roomId, eventId);
|
||||||
|
if (storageEntry) {
|
||||||
|
retryEntries.push(new EventEntry(storageEntry, this._fragmentIdComparer));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
// we only look for messages since lastDecryptedEventKey because
|
||||||
|
// the timeline must be closed (otherwise getEventIdsForRoomKey would have found the event ids)
|
||||||
|
// and to update the summary we only care about events since lastDecryptedEventKey
|
||||||
|
const key = this._summary.data.lastDecryptedEventKey;
|
||||||
|
// key might be missing if we haven't decrypted any events in this room
|
||||||
|
const sinceEventKey = key && new EventKey(key.fragmentId, key.entryIndex);
|
||||||
|
// check we have not already decrypted the most recent event in the room
|
||||||
|
// otherwise we know that the messages for this room key will not update the room summary
|
||||||
|
if (!sinceEventKey || !sinceEventKey.equals(this._syncWriter.lastMessageKey)) {
|
||||||
|
txn = await this._storage.readTxn(stores.concat(this._storage.storeNames.timelineFragments));
|
||||||
|
const candidateEntries = await this._readRetryDecryptCandidateEntries(sinceEventKey, txn);
|
||||||
|
retryEntries = this._roomEncryption.findAndCacheEntriesForRoomKey(roomKey, candidateEntries);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (retryEntries?.length) {
|
||||||
|
const decryptRequest = this._decryptEntries(DecryptionSource.Retry, retryEntries, txn);
|
||||||
|
// this will close txn while awaiting decryption
|
||||||
|
await decryptRequest.complete();
|
||||||
|
|
||||||
|
this._timeline?.replaceEntries(retryEntries);
|
||||||
|
// we would ideally write the room summary in the same txn as the groupSessionDecryptions in the
|
||||||
|
// _decryptEntries entries and could even know which events have been decrypted for the first
|
||||||
|
// time from DecryptionChanges.write and only pass those to the summary. As timeline changes
|
||||||
|
// are not essential to the room summary, it's fine to write this in a separate txn for now.
|
||||||
|
const changes = this._summary.data.applyTimelineEntries(retryEntries, false, false);
|
||||||
|
if (await this._summary.writeAndApplyData(changes, this._storage)) {
|
||||||
|
this._emitUpdate();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
_enableEncryption(encryptionParams) {
|
_setEncryption(roomEncryption) {
|
||||||
this._roomEncryption = this._createRoomEncryption(this, encryptionParams);
|
if (roomEncryption && !this._roomEncryption) {
|
||||||
if (this._roomEncryption) {
|
this._roomEncryption = roomEncryption;
|
||||||
this._sendQueue.enableEncryption(this._roomEncryption);
|
this._sendQueue.enableEncryption(this._roomEncryption);
|
||||||
if (this._timeline) {
|
if (this._timeline) {
|
||||||
this._timeline.enableEncryption(this._decryptEntries.bind(this, DecryptionSource.Timeline));
|
this._timeline.enableEncryption(this._decryptEntries.bind(this, DecryptionSource.Timeline));
|
||||||
|
@ -132,57 +169,62 @@ export class Room extends EventEmitter {
|
||||||
return request;
|
return request;
|
||||||
}
|
}
|
||||||
|
|
||||||
get needsPrepareSync() {
|
async prepareSync(roomResponse, membership, txn) {
|
||||||
// only encrypted rooms need the prepare sync steps
|
const summaryChanges = this._summary.data.applySyncResponse(roomResponse, membership)
|
||||||
return !!this._roomEncryption;
|
let roomEncryption = this._roomEncryption;
|
||||||
}
|
// encryption is enabled in this sync
|
||||||
|
if (!roomEncryption && summaryChanges.encryption) {
|
||||||
|
roomEncryption = this._createRoomEncryption(this, summaryChanges.encryption);
|
||||||
|
}
|
||||||
|
|
||||||
async prepareSync(roomResponse, txn) {
|
let decryptPreparation;
|
||||||
if (this._roomEncryption) {
|
if (roomEncryption) {
|
||||||
const events = roomResponse?.timeline?.events;
|
const events = roomResponse?.timeline?.events;
|
||||||
if (Array.isArray(events)) {
|
if (Array.isArray(events)) {
|
||||||
const eventsToDecrypt = events.filter(event => {
|
const eventsToDecrypt = events.filter(event => {
|
||||||
return event?.type === EVENT_ENCRYPTED_TYPE;
|
return event?.type === EVENT_ENCRYPTED_TYPE;
|
||||||
});
|
});
|
||||||
const preparation = await this._roomEncryption.prepareDecryptAll(
|
decryptPreparation = await roomEncryption.prepareDecryptAll(
|
||||||
eventsToDecrypt, DecryptionSource.Sync, this._isTimelineOpen, txn);
|
eventsToDecrypt, DecryptionSource.Sync, this._isTimelineOpen, txn);
|
||||||
return preparation;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return {
|
||||||
|
roomEncryption,
|
||||||
|
summaryChanges,
|
||||||
|
decryptPreparation,
|
||||||
|
decryptChanges: null,
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
async afterPrepareSync(preparation) {
|
async afterPrepareSync(preparation) {
|
||||||
if (preparation) {
|
if (preparation.decryptPreparation) {
|
||||||
const decryptChanges = await preparation.decrypt();
|
preparation.decryptChanges = await preparation.decryptPreparation.decrypt();
|
||||||
return decryptChanges;
|
preparation.decryptPreparation = null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/** @package */
|
/** @package */
|
||||||
async writeSync(roomResponse, membership, isInitialSync, decryptChanges, txn) {
|
async writeSync(roomResponse, isInitialSync, {summaryChanges, decryptChanges, roomEncryption}, txn) {
|
||||||
let decryption;
|
const {entries, newLiveKey, memberChanges} =
|
||||||
if (this._roomEncryption && decryptChanges) {
|
|
||||||
decryption = await decryptChanges.write(txn);
|
|
||||||
}
|
|
||||||
const {entries, newLiveKey, memberChanges} =
|
|
||||||
await this._syncWriter.writeSync(roomResponse, txn);
|
await this._syncWriter.writeSync(roomResponse, txn);
|
||||||
if (decryption) {
|
if (decryptChanges) {
|
||||||
|
const decryption = await decryptChanges.write(txn);
|
||||||
decryption.applyToEntries(entries);
|
decryption.applyToEntries(entries);
|
||||||
}
|
}
|
||||||
// pass member changes to device tracker
|
// pass member changes to device tracker
|
||||||
if (this._roomEncryption && this.isTrackingMembers && memberChanges?.size) {
|
if (roomEncryption && this.isTrackingMembers && memberChanges?.size) {
|
||||||
await this._roomEncryption.writeMemberChanges(memberChanges, txn);
|
await roomEncryption.writeMemberChanges(memberChanges, txn);
|
||||||
}
|
}
|
||||||
const summaryChanges = this._summary.writeSync(
|
// also apply (decrypted) timeline entries to the summary changes
|
||||||
roomResponse,
|
summaryChanges = summaryChanges.applyTimelineEntries(
|
||||||
entries,
|
entries, isInitialSync, !this._isTimelineOpen, this._user.id);
|
||||||
membership,
|
// write summary changes, and unset if nothing was actually changed
|
||||||
isInitialSync, this._isTimelineOpen,
|
summaryChanges = this._summary.writeData(summaryChanges, txn);
|
||||||
txn);
|
|
||||||
// fetch new members while we have txn open,
|
// fetch new members while we have txn open,
|
||||||
// but don't make any in-memory changes yet
|
// but don't make any in-memory changes yet
|
||||||
let heroChanges;
|
let heroChanges;
|
||||||
if (summaryChanges && needsHeroes(summaryChanges)) {
|
if (summaryChanges?.needsHeroes) {
|
||||||
// room name disappeared, open heroes
|
// room name disappeared, open heroes
|
||||||
if (!this._heroes) {
|
if (!this._heroes) {
|
||||||
this._heroes = new Heroes(this._roomId);
|
this._heroes = new Heroes(this._roomId);
|
||||||
|
@ -190,11 +232,12 @@ export class Room extends EventEmitter {
|
||||||
heroChanges = await this._heroes.calculateChanges(summaryChanges.heroes, memberChanges, txn);
|
heroChanges = await this._heroes.calculateChanges(summaryChanges.heroes, memberChanges, txn);
|
||||||
}
|
}
|
||||||
let removedPendingEvents;
|
let removedPendingEvents;
|
||||||
if (roomResponse.timeline && roomResponse.timeline.events) {
|
if (Array.isArray(roomResponse.timeline?.events)) {
|
||||||
removedPendingEvents = this._sendQueue.removeRemoteEchos(roomResponse.timeline.events, txn);
|
removedPendingEvents = this._sendQueue.removeRemoteEchos(roomResponse.timeline.events, txn);
|
||||||
}
|
}
|
||||||
return {
|
return {
|
||||||
summaryChanges,
|
summaryChanges,
|
||||||
|
roomEncryption,
|
||||||
newTimelineEntries: entries,
|
newTimelineEntries: entries,
|
||||||
newLiveKey,
|
newLiveKey,
|
||||||
removedPendingEvents,
|
removedPendingEvents,
|
||||||
|
@ -208,11 +251,9 @@ export class Room extends EventEmitter {
|
||||||
* Called with the changes returned from `writeSync` to apply them and emit changes.
|
* Called with the changes returned from `writeSync` to apply them and emit changes.
|
||||||
* No storage or network operations should be done here.
|
* No storage or network operations should be done here.
|
||||||
*/
|
*/
|
||||||
afterSync({summaryChanges, newTimelineEntries, newLiveKey, removedPendingEvents, memberChanges, heroChanges}) {
|
afterSync({summaryChanges, newTimelineEntries, newLiveKey, removedPendingEvents, memberChanges, heroChanges, roomEncryption}) {
|
||||||
this._syncWriter.afterSync(newLiveKey);
|
this._syncWriter.afterSync(newLiveKey);
|
||||||
if (!this._summary.encryption && summaryChanges.encryption && !this._roomEncryption) {
|
this._setEncryption(roomEncryption);
|
||||||
this._enableEncryption(summaryChanges.encryption);
|
|
||||||
}
|
|
||||||
if (memberChanges.size) {
|
if (memberChanges.size) {
|
||||||
if (this._changedMembersDuringSync) {
|
if (this._changedMembersDuringSync) {
|
||||||
for (const [userId, memberChange] of memberChanges.entries()) {
|
for (const [userId, memberChange] of memberChanges.entries()) {
|
||||||
|
@ -226,14 +267,14 @@ export class Room extends EventEmitter {
|
||||||
let emitChange = false;
|
let emitChange = false;
|
||||||
if (summaryChanges) {
|
if (summaryChanges) {
|
||||||
this._summary.applyChanges(summaryChanges);
|
this._summary.applyChanges(summaryChanges);
|
||||||
if (!this._summary.needsHeroes) {
|
if (!this._summary.data.needsHeroes) {
|
||||||
this._heroes = null;
|
this._heroes = null;
|
||||||
}
|
}
|
||||||
emitChange = true;
|
emitChange = true;
|
||||||
}
|
}
|
||||||
if (this._heroes && heroChanges) {
|
if (this._heroes && heroChanges) {
|
||||||
const oldName = this.name;
|
const oldName = this.name;
|
||||||
this._heroes.applyChanges(heroChanges, this._summary);
|
this._heroes.applyChanges(heroChanges, this._summary.data);
|
||||||
if (oldName !== this.name) {
|
if (oldName !== this.name) {
|
||||||
emitChange = true;
|
emitChange = true;
|
||||||
}
|
}
|
||||||
|
@ -247,7 +288,7 @@ export class Room extends EventEmitter {
|
||||||
if (removedPendingEvents) {
|
if (removedPendingEvents) {
|
||||||
this._sendQueue.emitRemovals(removedPendingEvents);
|
this._sendQueue.emitRemovals(removedPendingEvents);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
needsAfterSyncCompleted({memberChanges}) {
|
needsAfterSyncCompleted({memberChanges}) {
|
||||||
return this._roomEncryption?.needsToShareKeys(memberChanges);
|
return this._roomEncryption?.needsToShareKeys(memberChanges);
|
||||||
|
@ -282,23 +323,24 @@ export class Room extends EventEmitter {
|
||||||
}
|
}
|
||||||
|
|
||||||
/** @package */
|
/** @package */
|
||||||
async load(summary, txn) {
|
async load(summary, txn) {
|
||||||
try {
|
try {
|
||||||
this._summary.load(summary);
|
this._summary.load(summary);
|
||||||
if (this._summary.encryption) {
|
if (this._summary.data.encryption) {
|
||||||
this._enableEncryption(this._summary.encryption);
|
const roomEncryption = this._createRoomEncryption(this, this._summary.data.encryption);
|
||||||
|
this._setEncryption(roomEncryption);
|
||||||
}
|
}
|
||||||
// need to load members for name?
|
// need to load members for name?
|
||||||
if (this._summary.needsHeroes) {
|
if (this._summary.data.needsHeroes) {
|
||||||
this._heroes = new Heroes(this._roomId);
|
this._heroes = new Heroes(this._roomId);
|
||||||
const changes = await this._heroes.calculateChanges(this._summary.heroes, [], txn);
|
const changes = await this._heroes.calculateChanges(this._summary.data.heroes, [], txn);
|
||||||
this._heroes.applyChanges(changes, this._summary);
|
this._heroes.applyChanges(changes, this._summary.data);
|
||||||
}
|
}
|
||||||
return this._syncWriter.load(txn);
|
return this._syncWriter.load(txn);
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
throw new WrappedError(`Could not load room ${this._roomId}`, err);
|
throw new WrappedError(`Could not load room ${this._roomId}`, err);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/** @public */
|
/** @public */
|
||||||
sendEvent(eventType, content) {
|
sendEvent(eventType, content) {
|
||||||
|
@ -388,7 +430,14 @@ export class Room extends EventEmitter {
|
||||||
if (this._heroes) {
|
if (this._heroes) {
|
||||||
return this._heroes.roomName;
|
return this._heroes.roomName;
|
||||||
}
|
}
|
||||||
return this._summary.name;
|
const summaryData = this._summary.data;
|
||||||
|
if (summaryData.name) {
|
||||||
|
return summaryData.name;
|
||||||
|
}
|
||||||
|
if (summaryData.canonicalAlias) {
|
||||||
|
return summaryData.canonicalAlias;
|
||||||
|
}
|
||||||
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** @public */
|
/** @public */
|
||||||
|
@ -397,8 +446,8 @@ export class Room extends EventEmitter {
|
||||||
}
|
}
|
||||||
|
|
||||||
get avatarUrl() {
|
get avatarUrl() {
|
||||||
if (this._summary.avatarUrl) {
|
if (this._summary.data.avatarUrl) {
|
||||||
return this._summary.avatarUrl;
|
return this._summary.data.avatarUrl;
|
||||||
} else if (this._heroes) {
|
} else if (this._heroes) {
|
||||||
return this._heroes.roomAvatarUrl;
|
return this._heroes.roomAvatarUrl;
|
||||||
}
|
}
|
||||||
|
@ -406,28 +455,28 @@ export class Room extends EventEmitter {
|
||||||
}
|
}
|
||||||
|
|
||||||
get lastMessageTimestamp() {
|
get lastMessageTimestamp() {
|
||||||
return this._summary.lastMessageTimestamp;
|
return this._summary.data.lastMessageTimestamp;
|
||||||
}
|
}
|
||||||
|
|
||||||
get isUnread() {
|
get isUnread() {
|
||||||
return this._summary.isUnread;
|
return this._summary.data.isUnread;
|
||||||
}
|
}
|
||||||
|
|
||||||
get notificationCount() {
|
get notificationCount() {
|
||||||
return this._summary.notificationCount;
|
return this._summary.data.notificationCount;
|
||||||
}
|
}
|
||||||
|
|
||||||
get highlightCount() {
|
get highlightCount() {
|
||||||
return this._summary.highlightCount;
|
return this._summary.data.highlightCount;
|
||||||
}
|
}
|
||||||
|
|
||||||
get isLowPriority() {
|
get isLowPriority() {
|
||||||
const tags = this._summary.tags;
|
const tags = this._summary.data.tags;
|
||||||
return !!(tags && tags['m.lowpriority']);
|
return !!(tags && tags['m.lowpriority']);
|
||||||
}
|
}
|
||||||
|
|
||||||
get isEncrypted() {
|
get isEncrypted() {
|
||||||
return !!this._summary.encryption;
|
return !!this._summary.data.encryption;
|
||||||
}
|
}
|
||||||
|
|
||||||
enableSessionBackup(sessionBackup) {
|
enableSessionBackup(sessionBackup) {
|
||||||
|
@ -435,7 +484,7 @@ export class Room extends EventEmitter {
|
||||||
}
|
}
|
||||||
|
|
||||||
get isTrackingMembers() {
|
get isTrackingMembers() {
|
||||||
return this._summary.isTrackingMembers;
|
return this._summary.data.isTrackingMembers;
|
||||||
}
|
}
|
||||||
|
|
||||||
async _getLastEventId() {
|
async _getLastEventId() {
|
||||||
|
|
|
@ -17,11 +17,11 @@ limitations under the License.
|
||||||
import {MEGOLM_ALGORITHM} from "../e2ee/common.js";
|
import {MEGOLM_ALGORITHM} from "../e2ee/common.js";
|
||||||
|
|
||||||
|
|
||||||
function applyTimelineEntries(data, timelineEntries, isInitialSync, isTimelineOpen, ownUserId) {
|
function applyTimelineEntries(data, timelineEntries, isInitialSync, canMarkUnread, ownUserId) {
|
||||||
if (timelineEntries.length) {
|
if (timelineEntries.length) {
|
||||||
data = timelineEntries.reduce((data, entry) => {
|
data = timelineEntries.reduce((data, entry) => {
|
||||||
return processTimelineEvent(data, entry,
|
return processTimelineEvent(data, entry,
|
||||||
isInitialSync, isTimelineOpen, ownUserId);
|
isInitialSync, canMarkUnread, ownUserId);
|
||||||
}, data);
|
}, data);
|
||||||
}
|
}
|
||||||
return data;
|
return data;
|
||||||
|
@ -39,16 +39,17 @@ function applySyncResponse(data, roomResponse, membership) {
|
||||||
if (roomResponse.account_data) {
|
if (roomResponse.account_data) {
|
||||||
data = roomResponse.account_data.events.reduce(processRoomAccountData, data);
|
data = roomResponse.account_data.events.reduce(processRoomAccountData, data);
|
||||||
}
|
}
|
||||||
|
const stateEvents = roomResponse?.state?.events;
|
||||||
// state comes before timeline
|
// state comes before timeline
|
||||||
if (roomResponse.state) {
|
if (Array.isArray(stateEvents)) {
|
||||||
data = roomResponse.state.events.reduce(processStateEvent, data);
|
data = stateEvents.reduce(processStateEvent, data);
|
||||||
}
|
}
|
||||||
const {timeline} = roomResponse;
|
const timelineEvents = roomResponse?.timeline?.events;
|
||||||
// process state events in timeline
|
// process state events in timeline
|
||||||
// non-state events are handled by applyTimelineEntries
|
// non-state events are handled by applyTimelineEntries
|
||||||
// so decryption is handled properly
|
// so decryption is handled properly
|
||||||
if (timeline && Array.isArray(timeline.events)) {
|
if (Array.isArray(timelineEvents)) {
|
||||||
data = timeline.events.reduce((data, event) => {
|
data = timelineEvents.reduce((data, event) => {
|
||||||
if (typeof event.state_key === "string") {
|
if (typeof event.state_key === "string") {
|
||||||
return processStateEvent(data, event);
|
return processStateEvent(data, event);
|
||||||
}
|
}
|
||||||
|
@ -104,13 +105,13 @@ function processStateEvent(data, event) {
|
||||||
return data;
|
return data;
|
||||||
}
|
}
|
||||||
|
|
||||||
function processTimelineEvent(data, eventEntry, isInitialSync, isTimelineOpen, ownUserId) {
|
function processTimelineEvent(data, eventEntry, isInitialSync, canMarkUnread, ownUserId) {
|
||||||
if (eventEntry.eventType === "m.room.message") {
|
if (eventEntry.eventType === "m.room.message") {
|
||||||
if (!data.lastMessageTimestamp || eventEntry.timestamp > data.lastMessageTimestamp) {
|
if (!data.lastMessageTimestamp || eventEntry.timestamp > data.lastMessageTimestamp) {
|
||||||
data = data.cloneIfNeeded();
|
data = data.cloneIfNeeded();
|
||||||
data.lastMessageTimestamp = eventEntry.timestamp;
|
data.lastMessageTimestamp = eventEntry.timestamp;
|
||||||
}
|
}
|
||||||
if (!isInitialSync && eventEntry.sender !== ownUserId && !isTimelineOpen) {
|
if (!isInitialSync && eventEntry.sender !== ownUserId && canMarkUnread) {
|
||||||
data = data.cloneIfNeeded();
|
data = data.cloneIfNeeded();
|
||||||
data.isUnread = true;
|
data.isUnread = true;
|
||||||
}
|
}
|
||||||
|
@ -122,6 +123,29 @@ function processTimelineEvent(data, eventEntry, isInitialSync, isTimelineOpen, o
|
||||||
data.lastMessageBody = body;
|
data.lastMessageBody = body;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// store the event key of the last decrypted event so when decryption does succeed,
|
||||||
|
// we can attempt to re-decrypt from this point to update the room summary
|
||||||
|
if (!!data.encryption && eventEntry.isEncrypted && eventEntry.isDecrypted) {
|
||||||
|
let hasLargerEventKey = true;
|
||||||
|
if (data.lastDecryptedEventKey) {
|
||||||
|
try {
|
||||||
|
hasLargerEventKey = eventEntry.compare(data.lastDecryptedEventKey) > 0;
|
||||||
|
} catch (err) {
|
||||||
|
// TODO: load the fragments in between here?
|
||||||
|
// this could happen if an earlier event gets decrypted that
|
||||||
|
// is in a fragment different from the live one and the timeline is not open.
|
||||||
|
// In this case, we will just read too many events once per app load
|
||||||
|
// and then keep the mapping in memory. When eventually an event is decrypted in
|
||||||
|
// the live fragment, this should stop failing and the event key will be written.
|
||||||
|
hasLargerEventKey = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (hasLargerEventKey) {
|
||||||
|
data = data.cloneIfNeeded();
|
||||||
|
const {fragmentId, entryIndex} = eventEntry;
|
||||||
|
data.lastDecryptedEventKey = {fragmentId, entryIndex};
|
||||||
|
}
|
||||||
|
}
|
||||||
return data;
|
return data;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -155,6 +179,7 @@ class SummaryData {
|
||||||
this.lastMessageTimestamp = copy ? copy.lastMessageTimestamp : null;
|
this.lastMessageTimestamp = copy ? copy.lastMessageTimestamp : null;
|
||||||
this.isUnread = copy ? copy.isUnread : false;
|
this.isUnread = copy ? copy.isUnread : false;
|
||||||
this.encryption = copy ? copy.encryption : null;
|
this.encryption = copy ? copy.encryption : null;
|
||||||
|
this.lastDecryptedEventKey = copy ? copy.lastDecryptedEventKey : null;
|
||||||
this.isDirectMessage = copy ? copy.isDirectMessage : false;
|
this.isDirectMessage = copy ? copy.isDirectMessage : false;
|
||||||
this.membership = copy ? copy.membership : null;
|
this.membership = copy ? copy.membership : null;
|
||||||
this.inviteCount = copy ? copy.inviteCount : 0;
|
this.inviteCount = copy ? copy.inviteCount : 0;
|
||||||
|
@ -182,83 +207,27 @@ class SummaryData {
|
||||||
const {cloned, ...serializedProps} = this;
|
const {cloned, ...serializedProps} = this;
|
||||||
return serializedProps;
|
return serializedProps;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
export function needsHeroes(data) {
|
applyTimelineEntries(timelineEntries, isInitialSync, canMarkUnread, ownUserId) {
|
||||||
return !data.name && !data.canonicalAlias && data.heroes && data.heroes.length > 0;
|
return applyTimelineEntries(this, timelineEntries, isInitialSync, canMarkUnread, ownUserId);
|
||||||
|
}
|
||||||
|
|
||||||
|
applySyncResponse(roomResponse, membership) {
|
||||||
|
return applySyncResponse(this, roomResponse, membership);
|
||||||
|
}
|
||||||
|
|
||||||
|
get needsHeroes() {
|
||||||
|
return !this.name && !this.canonicalAlias && this.heroes && this.heroes.length > 0;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
export class RoomSummary {
|
export class RoomSummary {
|
||||||
constructor(roomId, ownUserId) {
|
constructor(roomId) {
|
||||||
this._ownUserId = ownUserId;
|
|
||||||
this._data = new SummaryData(null, roomId);
|
this._data = new SummaryData(null, roomId);
|
||||||
}
|
}
|
||||||
|
|
||||||
get name() {
|
get data() {
|
||||||
if (this._data.name) {
|
return this._data;
|
||||||
return this._data.name;
|
|
||||||
}
|
|
||||||
if (this._data.canonicalAlias) {
|
|
||||||
return this._data.canonicalAlias;
|
|
||||||
}
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
get heroes() {
|
|
||||||
return this._data.heroes;
|
|
||||||
}
|
|
||||||
|
|
||||||
get encryption() {
|
|
||||||
return this._data.encryption;
|
|
||||||
}
|
|
||||||
|
|
||||||
// whether the room name should be determined with Heroes
|
|
||||||
get needsHeroes() {
|
|
||||||
return needsHeroes(this._data);
|
|
||||||
}
|
|
||||||
|
|
||||||
get isUnread() {
|
|
||||||
return this._data.isUnread;
|
|
||||||
}
|
|
||||||
|
|
||||||
get notificationCount() {
|
|
||||||
return this._data.notificationCount;
|
|
||||||
}
|
|
||||||
|
|
||||||
get highlightCount() {
|
|
||||||
return this._data.highlightCount;
|
|
||||||
}
|
|
||||||
|
|
||||||
get lastMessage() {
|
|
||||||
return this._data.lastMessageBody;
|
|
||||||
}
|
|
||||||
|
|
||||||
get lastMessageTimestamp() {
|
|
||||||
return this._data.lastMessageTimestamp;
|
|
||||||
}
|
|
||||||
|
|
||||||
get inviteCount() {
|
|
||||||
return this._data.inviteCount;
|
|
||||||
}
|
|
||||||
|
|
||||||
get joinCount() {
|
|
||||||
return this._data.joinCount;
|
|
||||||
}
|
|
||||||
|
|
||||||
get avatarUrl() {
|
|
||||||
return this._data.avatarUrl;
|
|
||||||
}
|
|
||||||
|
|
||||||
get hasFetchedMembers() {
|
|
||||||
return this._data.hasFetchedMembers;
|
|
||||||
}
|
|
||||||
|
|
||||||
get isTrackingMembers() {
|
|
||||||
return this._data.isTrackingMembers;
|
|
||||||
}
|
|
||||||
|
|
||||||
get tags() {
|
|
||||||
return this._data.tags;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
writeClearUnread(txn) {
|
writeClearUnread(txn) {
|
||||||
|
@ -284,48 +253,17 @@ export class RoomSummary {
|
||||||
return data;
|
return data;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
writeData(data, txn) {
|
||||||
* after retrying decryption
|
|
||||||
*/
|
|
||||||
processTimelineEntries(timelineEntries, isInitialSync, isTimelineOpen) {
|
|
||||||
// clear cloned flag, so cloneIfNeeded makes a copy and
|
|
||||||
// this._data is not modified if any field is changed.
|
|
||||||
|
|
||||||
processTimelineEvent
|
|
||||||
|
|
||||||
this._data.cloned = false;
|
|
||||||
const data = applyTimelineEntries(
|
|
||||||
this._data,
|
|
||||||
timelineEntries,
|
|
||||||
isInitialSync, isTimelineOpen,
|
|
||||||
this._ownUserId);
|
|
||||||
if (data !== this._data) {
|
|
||||||
return data;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
writeSync(roomResponse, timelineEntries, membership, isInitialSync, isTimelineOpen, txn) {
|
|
||||||
// clear cloned flag, so cloneIfNeeded makes a copy and
|
|
||||||
// this._data is not modified if any field is changed.
|
|
||||||
this._data.cloned = false;
|
|
||||||
let data = applySyncResponse(this._data, roomResponse, membership);
|
|
||||||
data = applyTimelineEntries(
|
|
||||||
data,
|
|
||||||
timelineEntries,
|
|
||||||
isInitialSync, isTimelineOpen,
|
|
||||||
this._ownUserId);
|
|
||||||
if (data !== this._data) {
|
if (data !== this._data) {
|
||||||
txn.roomSummary.set(data.serialize());
|
txn.roomSummary.set(data.serialize());
|
||||||
return data;
|
return data;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
async writeAndApplyData(data, storage) {
|
||||||
* Only to be used with processTimelineEntries,
|
if (data === this._data) {
|
||||||
* other methods like writeSync, writeHasFetchedMembers,
|
return false;
|
||||||
* writeIsTrackingMembers, ... take a txn directly.
|
}
|
||||||
*/
|
|
||||||
async writeAndApplyChanges(data, storage) {
|
|
||||||
const txn = await storage.readWriteTxn([
|
const txn = await storage.readWriteTxn([
|
||||||
storage.storeNames.roomSummary,
|
storage.storeNames.roomSummary,
|
||||||
]);
|
]);
|
||||||
|
@ -337,10 +275,14 @@ export class RoomSummary {
|
||||||
}
|
}
|
||||||
await txn.complete();
|
await txn.complete();
|
||||||
this.applyChanges(data);
|
this.applyChanges(data);
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
applyChanges(data) {
|
applyChanges(data) {
|
||||||
this._data = data;
|
this._data = data;
|
||||||
|
// clear cloned flag, so cloneIfNeeded makes a copy and
|
||||||
|
// this._data is not modified if any field is changed.
|
||||||
|
this._data.cloned = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
async load(summary) {
|
async load(summary) {
|
||||||
|
@ -353,7 +295,9 @@ export function tests() {
|
||||||
"membership trigger change": function(assert) {
|
"membership trigger change": function(assert) {
|
||||||
const summary = new RoomSummary("id");
|
const summary = new RoomSummary("id");
|
||||||
let written = false;
|
let written = false;
|
||||||
const changes = summary.writeSync({}, "join", false, false, {roomSummary: {set: () => { written = true; }}});
|
let changes = summary.data.applySyncResponse({}, "join");
|
||||||
|
const txn = {roomSummary: {set: () => { written = true; }}};
|
||||||
|
changes = summary.writeData(changes, txn);
|
||||||
assert(changes);
|
assert(changes);
|
||||||
assert(written);
|
assert(written);
|
||||||
assert.equal(changes.membership, "join");
|
assert.equal(changes.membership, "join");
|
||||||
|
|
|
@ -16,8 +16,8 @@ limitations under the License.
|
||||||
|
|
||||||
import {RoomMember} from "./RoomMember.js";
|
import {RoomMember} from "./RoomMember.js";
|
||||||
|
|
||||||
function calculateRoomName(sortedMembers, summary) {
|
function calculateRoomName(sortedMembers, summaryData) {
|
||||||
const countWithoutMe = summary.joinCount + summary.inviteCount - 1;
|
const countWithoutMe = summaryData.joinCount + summaryData.inviteCount - 1;
|
||||||
if (sortedMembers.length >= countWithoutMe) {
|
if (sortedMembers.length >= countWithoutMe) {
|
||||||
if (sortedMembers.length > 1) {
|
if (sortedMembers.length > 1) {
|
||||||
const lastMember = sortedMembers[sortedMembers.length - 1];
|
const lastMember = sortedMembers[sortedMembers.length - 1];
|
||||||
|
@ -74,7 +74,7 @@ export class Heroes {
|
||||||
return {updatedHeroMembers: updatedHeroMembers.values(), removedUserIds};
|
return {updatedHeroMembers: updatedHeroMembers.values(), removedUserIds};
|
||||||
}
|
}
|
||||||
|
|
||||||
applyChanges({updatedHeroMembers, removedUserIds}, summary) {
|
applyChanges({updatedHeroMembers, removedUserIds}, summaryData) {
|
||||||
for (const userId of removedUserIds) {
|
for (const userId of removedUserIds) {
|
||||||
this._members.delete(userId);
|
this._members.delete(userId);
|
||||||
}
|
}
|
||||||
|
@ -82,7 +82,7 @@ export class Heroes {
|
||||||
this._members.set(member.userId, member);
|
this._members.set(member.userId, member);
|
||||||
}
|
}
|
||||||
const sortedMembers = Array.from(this._members.values()).sort((a, b) => a.name.localeCompare(b.name));
|
const sortedMembers = Array.from(this._members.values()).sort((a, b) => a.name.localeCompare(b.name));
|
||||||
this._roomName = calculateRoomName(sortedMembers, summary);
|
this._roomName = calculateRoomName(sortedMembers, summaryData);
|
||||||
}
|
}
|
||||||
|
|
||||||
get roomName() {
|
get roomName() {
|
||||||
|
|
|
@ -82,7 +82,7 @@ async function fetchMembers({summary, syncToken, roomId, hsApi, storage, setChan
|
||||||
|
|
||||||
export async function fetchOrLoadMembers(options) {
|
export async function fetchOrLoadMembers(options) {
|
||||||
const {summary} = options;
|
const {summary} = options;
|
||||||
if (!summary.hasFetchedMembers) {
|
if (!summary.data.hasFetchedMembers) {
|
||||||
return fetchMembers(options);
|
return fetchMembers(options);
|
||||||
} else {
|
} else {
|
||||||
return loadMembers(options);
|
return loadMembers(options);
|
||||||
|
|
|
@ -63,6 +63,10 @@ export class EventKey {
|
||||||
toString() {
|
toString() {
|
||||||
return `[${this.fragmentId}/${this.eventIndex}]`;
|
return `[${this.fragmentId}/${this.eventIndex}]`;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
equals(other) {
|
||||||
|
return this.fragmentId === other?.fragmentId && this.eventIndex === other?.eventIndex;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
export function xtests() {
|
export function xtests() {
|
||||||
|
|
|
@ -82,6 +82,10 @@ export class EventEntry extends BaseEntry {
|
||||||
return this._eventEntry.event.type === "m.room.encrypted";
|
return this._eventEntry.event.type === "m.room.encrypted";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
get isDecrypted() {
|
||||||
|
return !!this._decryptionResult?.event;
|
||||||
|
}
|
||||||
|
|
||||||
get isVerified() {
|
get isVerified() {
|
||||||
return this.isEncrypted && this._decryptionResult?.isVerified;
|
return this.isEncrypted && this._decryptionResult?.isVerified;
|
||||||
}
|
}
|
||||||
|
|
|
@ -37,6 +37,52 @@ class ReaderRequest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Raw because it doesn't do decryption and in the future it should not read relations either.
|
||||||
|
* It is just about reading entries and following fragment links
|
||||||
|
*/
|
||||||
|
export async function readRawTimelineEntriesWithTxn(roomId, eventKey, direction, amount, fragmentIdComparer, txn) {
|
||||||
|
let entries = [];
|
||||||
|
const timelineStore = txn.timelineEvents;
|
||||||
|
const fragmentStore = txn.timelineFragments;
|
||||||
|
|
||||||
|
while (entries.length < amount && eventKey) {
|
||||||
|
let eventsWithinFragment;
|
||||||
|
if (direction.isForward) {
|
||||||
|
// TODO: should we pass amount - entries.length here?
|
||||||
|
eventsWithinFragment = await timelineStore.eventsAfter(roomId, eventKey, amount);
|
||||||
|
} else {
|
||||||
|
eventsWithinFragment = await timelineStore.eventsBefore(roomId, eventKey, amount);
|
||||||
|
}
|
||||||
|
let eventEntries = eventsWithinFragment.map(e => new EventEntry(e, fragmentIdComparer));
|
||||||
|
entries = directionalConcat(entries, eventEntries, direction);
|
||||||
|
// prepend or append eventsWithinFragment to entries, and wrap them in EventEntry
|
||||||
|
|
||||||
|
if (entries.length < amount) {
|
||||||
|
const fragment = await fragmentStore.get(roomId, eventKey.fragmentId);
|
||||||
|
// TODO: why does the first fragment not need to be added? (the next *is* added below)
|
||||||
|
// it looks like this would be fine when loading in the sync island
|
||||||
|
// (as the live fragment should be added already) but not for permalinks when we support them
|
||||||
|
//
|
||||||
|
// fragmentIdComparer.addFragment(fragment);
|
||||||
|
let fragmentEntry = new FragmentBoundaryEntry(fragment, direction.isBackward, fragmentIdComparer);
|
||||||
|
// append or prepend fragmentEntry, reuse func from GapWriter?
|
||||||
|
directionalAppend(entries, fragmentEntry, direction);
|
||||||
|
// only continue loading if the fragment boundary can't be backfilled
|
||||||
|
if (!fragmentEntry.token && fragmentEntry.hasLinkedFragment) {
|
||||||
|
const nextFragment = await fragmentStore.get(roomId, fragmentEntry.linkedFragmentId);
|
||||||
|
fragmentIdComparer.add(nextFragment);
|
||||||
|
const nextFragmentEntry = new FragmentBoundaryEntry(nextFragment, direction.isForward, fragmentIdComparer);
|
||||||
|
directionalAppend(entries, nextFragmentEntry, direction);
|
||||||
|
eventKey = nextFragmentEntry.asEventKey();
|
||||||
|
} else {
|
||||||
|
eventKey = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return entries;
|
||||||
|
}
|
||||||
|
|
||||||
export class TimelineReader {
|
export class TimelineReader {
|
||||||
constructor({roomId, storage, fragmentIdComparer}) {
|
constructor({roomId, storage, fragmentIdComparer}) {
|
||||||
this._roomId = roomId;
|
this._roomId = roomId;
|
||||||
|
@ -87,40 +133,7 @@ export class TimelineReader {
|
||||||
}
|
}
|
||||||
|
|
||||||
async _readFrom(eventKey, direction, amount, r, txn) {
|
async _readFrom(eventKey, direction, amount, r, txn) {
|
||||||
let entries = [];
|
const entries = await readRawTimelineEntriesWithTxn(this._roomId, eventKey, direction, amount, this._fragmentIdComparer, txn);
|
||||||
const timelineStore = txn.timelineEvents;
|
|
||||||
const fragmentStore = txn.timelineFragments;
|
|
||||||
|
|
||||||
while (entries.length < amount && eventKey) {
|
|
||||||
let eventsWithinFragment;
|
|
||||||
if (direction.isForward) {
|
|
||||||
eventsWithinFragment = await timelineStore.eventsAfter(this._roomId, eventKey, amount);
|
|
||||||
} else {
|
|
||||||
eventsWithinFragment = await timelineStore.eventsBefore(this._roomId, eventKey, amount);
|
|
||||||
}
|
|
||||||
let eventEntries = eventsWithinFragment.map(e => new EventEntry(e, this._fragmentIdComparer));
|
|
||||||
entries = directionalConcat(entries, eventEntries, direction);
|
|
||||||
// prepend or append eventsWithinFragment to entries, and wrap them in EventEntry
|
|
||||||
|
|
||||||
if (entries.length < amount) {
|
|
||||||
const fragment = await fragmentStore.get(this._roomId, eventKey.fragmentId);
|
|
||||||
// this._fragmentIdComparer.addFragment(fragment);
|
|
||||||
let fragmentEntry = new FragmentBoundaryEntry(fragment, direction.isBackward, this._fragmentIdComparer);
|
|
||||||
// append or prepend fragmentEntry, reuse func from GapWriter?
|
|
||||||
directionalAppend(entries, fragmentEntry, direction);
|
|
||||||
// only continue loading if the fragment boundary can't be backfilled
|
|
||||||
if (!fragmentEntry.token && fragmentEntry.hasLinkedFragment) {
|
|
||||||
const nextFragment = await fragmentStore.get(this._roomId, fragmentEntry.linkedFragmentId);
|
|
||||||
this._fragmentIdComparer.add(nextFragment);
|
|
||||||
const nextFragmentEntry = new FragmentBoundaryEntry(nextFragment, direction.isForward, this._fragmentIdComparer);
|
|
||||||
directionalAppend(entries, nextFragmentEntry, direction);
|
|
||||||
eventKey = nextFragmentEntry.asEventKey();
|
|
||||||
} else {
|
|
||||||
eventKey = null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (this._decryptEntries) {
|
if (this._decryptEntries) {
|
||||||
r.decryptRequest = this._decryptEntries(entries, txn);
|
r.decryptRequest = this._decryptEntries(entries, txn);
|
||||||
try {
|
try {
|
||||||
|
|
Reference in a new issue