forked from mystiq/hydrogen-web
Extract BaseRoom from Room with summary and timeline, not sync or send
Which we can then reuse to create a dedicated ArchivedRoom class which will: - have only relevant methods and properties (e.g. no sendEvent) - turns out that you can still receive a leave room in the sync (e.g. when banned after kick) so we'll make the sync for an archived room separate from room to not overcomplicate the sync there, much like we did for Invite already.
This commit is contained in:
parent
06868abdb2
commit
1216378783
3 changed files with 518 additions and 431 deletions
481
src/matrix/room/BaseRoom.js
Normal file
481
src/matrix/room/BaseRoom.js
Normal file
|
@ -0,0 +1,481 @@
|
|||
/*
|
||||
Copyright 2020 Bruno Windels <bruno@windels.cloud>
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
import {EventEmitter} from "../../utils/EventEmitter.js";
|
||||
import {RoomSummary} from "./RoomSummary.js";
|
||||
import {GapWriter} from "./timeline/persistence/GapWriter.js";
|
||||
import {Timeline} from "./timeline/Timeline.js";
|
||||
import {FragmentIdComparer} from "./timeline/FragmentIdComparer.js";
|
||||
import {WrappedError} from "../error.js"
|
||||
import {fetchOrLoadMembers} from "./members/load.js";
|
||||
import {MemberList} from "./members/MemberList.js";
|
||||
import {Heroes} from "./members/Heroes.js";
|
||||
import {EventEntry} from "./timeline/entries/EventEntry.js";
|
||||
import {ObservedEventMap} from "./ObservedEventMap.js";
|
||||
import {DecryptionSource} from "../e2ee/common.js";
|
||||
import {ensureLogItem} from "../../logging/utils.js";
|
||||
|
||||
const EVENT_ENCRYPTED_TYPE = "m.room.encrypted";
|
||||
|
||||
export class BaseRoom extends EventEmitter {
|
||||
constructor({roomId, storage, hsApi, mediaRepository, emitCollectionChange, user, createRoomEncryption, getSyncToken, platform}) {
|
||||
super();
|
||||
this._roomId = roomId;
|
||||
this._storage = storage;
|
||||
this._hsApi = hsApi;
|
||||
this._mediaRepository = mediaRepository;
|
||||
this._summary = new RoomSummary(roomId);
|
||||
this._fragmentIdComparer = new FragmentIdComparer([]);
|
||||
this._emitCollectionChange = emitCollectionChange;
|
||||
this._timeline = null;
|
||||
this._user = user;
|
||||
this._changedMembersDuringSync = null;
|
||||
this._memberList = null;
|
||||
this._createRoomEncryption = createRoomEncryption;
|
||||
this._roomEncryption = null;
|
||||
this._getSyncToken = getSyncToken;
|
||||
this._platform = platform;
|
||||
this._observedEvents = null;
|
||||
}
|
||||
|
||||
async _eventIdsToEntries(eventIds, txn) {
|
||||
const retryEntries = [];
|
||||
await Promise.all(eventIds.map(async eventId => {
|
||||
const storageEntry = await txn.timelineEvents.getByEventId(this._roomId, eventId);
|
||||
if (storageEntry) {
|
||||
retryEntries.push(new EventEntry(storageEntry, this._fragmentIdComparer));
|
||||
}
|
||||
}));
|
||||
return retryEntries;
|
||||
}
|
||||
|
||||
_getAdditionalTimelineRetryEntries(otherRetryEntries, roomKeys) {
|
||||
let retryTimelineEntries = this._roomEncryption.filterUndecryptedEventEntriesForKeys(this._timeline.remoteEntries, roomKeys);
|
||||
// filter out any entries already in retryEntries so we don't decrypt them twice
|
||||
const existingIds = otherRetryEntries.reduce((ids, e) => {ids.add(e.id); return ids;}, new Set());
|
||||
retryTimelineEntries = retryTimelineEntries.filter(e => !existingIds.has(e.id));
|
||||
return retryTimelineEntries;
|
||||
}
|
||||
|
||||
/**
|
||||
* Used for retrying decryption from other sources than sync, like key backup.
|
||||
* @internal
|
||||
* @param {RoomKey} roomKey
|
||||
* @param {Array<string>} eventIds any event ids that should be retried. There might be more in the timeline though for this key.
|
||||
* @return {Promise}
|
||||
*/
|
||||
async notifyRoomKey(roomKey, eventIds, log) {
|
||||
if (!this._roomEncryption) {
|
||||
return;
|
||||
}
|
||||
const txn = await this._storage.readTxn([
|
||||
this._storage.storeNames.timelineEvents,
|
||||
this._storage.storeNames.inboundGroupSessions,
|
||||
]);
|
||||
let retryEntries = await this._eventIdsToEntries(eventIds, txn);
|
||||
if (this._timeline) {
|
||||
const retryTimelineEntries = this._getAdditionalTimelineRetryEntries(retryEntries, [roomKey]);
|
||||
retryEntries = retryEntries.concat(retryTimelineEntries);
|
||||
}
|
||||
if (retryEntries.length) {
|
||||
const decryptRequest = this._decryptEntries(DecryptionSource.Retry, retryEntries, txn, log);
|
||||
// this will close txn while awaiting decryption
|
||||
await decryptRequest.complete();
|
||||
|
||||
this._timeline?.replaceEntries(retryEntries);
|
||||
// we would ideally write the room summary in the same txn as the groupSessionDecryptions in the
|
||||
// _decryptEntries entries and could even know which events have been decrypted for the first
|
||||
// time from DecryptionChanges.write and only pass those to the summary. As timeline changes
|
||||
// are not essential to the room summary, it's fine to write this in a separate txn for now.
|
||||
const changes = this._summary.data.applyTimelineEntries(retryEntries, false, false);
|
||||
if (await this._summary.writeAndApplyData(changes, this._storage)) {
|
||||
this._emitUpdate();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
_setEncryption(roomEncryption) {
|
||||
if (roomEncryption && !this._roomEncryption) {
|
||||
this._roomEncryption = roomEncryption;
|
||||
if (this._timeline) {
|
||||
this._timeline.enableEncryption(this._decryptEntries.bind(this, DecryptionSource.Timeline));
|
||||
}
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Used for decrypting when loading/filling the timeline, and retrying decryption,
|
||||
* not during sync, where it is split up during the multiple phases.
|
||||
*/
|
||||
_decryptEntries(source, entries, inboundSessionTxn, log = null) {
|
||||
const request = new DecryptionRequest(async (r, log) => {
|
||||
if (!inboundSessionTxn) {
|
||||
inboundSessionTxn = await this._storage.readTxn([this._storage.storeNames.inboundGroupSessions]);
|
||||
}
|
||||
if (r.cancelled) return;
|
||||
const events = entries.filter(entry => {
|
||||
return entry.eventType === EVENT_ENCRYPTED_TYPE;
|
||||
}).map(entry => entry.event);
|
||||
r.preparation = await this._roomEncryption.prepareDecryptAll(events, null, source, inboundSessionTxn);
|
||||
if (r.cancelled) return;
|
||||
const changes = await r.preparation.decrypt();
|
||||
r.preparation = null;
|
||||
if (r.cancelled) return;
|
||||
const stores = [this._storage.storeNames.groupSessionDecryptions];
|
||||
const isTimelineOpen = this._isTimelineOpen;
|
||||
if (isTimelineOpen) {
|
||||
// read to fetch devices if timeline is open
|
||||
stores.push(this._storage.storeNames.deviceIdentities);
|
||||
}
|
||||
const writeTxn = await this._storage.readWriteTxn(stores);
|
||||
let decryption;
|
||||
try {
|
||||
decryption = await changes.write(writeTxn, log);
|
||||
if (isTimelineOpen) {
|
||||
await decryption.verifySenders(writeTxn);
|
||||
}
|
||||
} catch (err) {
|
||||
writeTxn.abort();
|
||||
throw err;
|
||||
}
|
||||
await writeTxn.complete();
|
||||
// TODO: log decryption errors here
|
||||
decryption.applyToEntries(entries);
|
||||
if (this._observedEvents) {
|
||||
this._observedEvents.updateEvents(entries);
|
||||
}
|
||||
}, ensureLogItem(log));
|
||||
return request;
|
||||
}
|
||||
|
||||
async _getSyncRetryDecryptEntries(newKeys, roomEncryption, txn) {
|
||||
const entriesPerKey = await Promise.all(newKeys.map(async key => {
|
||||
const retryEventIds = await roomEncryption.getEventIdsForMissingKey(key, txn);
|
||||
if (retryEventIds) {
|
||||
return this._eventIdsToEntries(retryEventIds, txn);
|
||||
}
|
||||
}));
|
||||
let retryEntries = entriesPerKey.reduce((allEntries, entries) => entries ? allEntries.concat(entries) : allEntries, []);
|
||||
// If we have the timeline open, see if there are more entries for the new keys
|
||||
// as we only store missing session information for synced events, not backfilled.
|
||||
// We want to decrypt all events we can though if the user is looking
|
||||
// at them when the timeline is open
|
||||
if (this._timeline) {
|
||||
const retryTimelineEntries = this._getAdditionalTimelineRetryEntries(retryEntries, newKeys);
|
||||
// make copies so we don't modify the original entry in writeSync, before the afterSync stage
|
||||
const retryTimelineEntriesCopies = retryTimelineEntries.map(e => e.clone());
|
||||
// add to other retry entries
|
||||
retryEntries = retryEntries.concat(retryTimelineEntriesCopies);
|
||||
}
|
||||
return retryEntries;
|
||||
}
|
||||
|
||||
/** @package */
|
||||
async load(summary, txn, log) {
|
||||
log.set("id", this.id);
|
||||
try {
|
||||
// if called from sync, there is no summary yet
|
||||
if (summary) {
|
||||
this._summary.load(summary);
|
||||
}
|
||||
if (this._summary.data.encryption) {
|
||||
const roomEncryption = this._createRoomEncryption(this, this._summary.data.encryption);
|
||||
this._setEncryption(roomEncryption);
|
||||
}
|
||||
// need to load members for name?
|
||||
if (this._summary.data.needsHeroes) {
|
||||
this._heroes = new Heroes(this._roomId);
|
||||
const changes = await this._heroes.calculateChanges(this._summary.data.heroes, [], txn);
|
||||
this._heroes.applyChanges(changes, this._summary.data);
|
||||
}
|
||||
} catch (err) {
|
||||
throw new WrappedError(`Could not load room ${this._roomId}`, err);
|
||||
}
|
||||
}
|
||||
|
||||
/** @public */
|
||||
async loadMemberList(log = null) {
|
||||
if (this._memberList) {
|
||||
// TODO: also await fetchOrLoadMembers promise here
|
||||
this._memberList.retain();
|
||||
return this._memberList;
|
||||
} else {
|
||||
const members = await fetchOrLoadMembers({
|
||||
summary: this._summary,
|
||||
roomId: this._roomId,
|
||||
hsApi: this._hsApi,
|
||||
storage: this._storage,
|
||||
syncToken: this._getSyncToken(),
|
||||
// to handle race between /members and /sync
|
||||
setChangedMembersMap: map => this._changedMembersDuringSync = map,
|
||||
log,
|
||||
}, this._platform.logger);
|
||||
this._memberList = new MemberList({
|
||||
members,
|
||||
closeCallback: () => { this._memberList = null; }
|
||||
});
|
||||
return this._memberList;
|
||||
}
|
||||
}
|
||||
|
||||
/** @public */
|
||||
fillGap(fragmentEntry, amount, log = null) {
|
||||
// TODO move some/all of this out of BaseRoom
|
||||
return this._platform.logger.wrapOrRun(log, "fillGap", async log => {
|
||||
log.set("id", this.id);
|
||||
log.set("fragment", fragmentEntry.fragmentId);
|
||||
log.set("dir", fragmentEntry.direction.asApiString());
|
||||
if (fragmentEntry.edgeReached) {
|
||||
log.set("edgeReached", true);
|
||||
return;
|
||||
}
|
||||
const response = await this._hsApi.messages(this._roomId, {
|
||||
from: fragmentEntry.token,
|
||||
dir: fragmentEntry.direction.asApiString(),
|
||||
limit: amount,
|
||||
filter: {
|
||||
lazy_load_members: true,
|
||||
include_redundant_members: true,
|
||||
}
|
||||
}, {log}).response();
|
||||
|
||||
const txn = await this._storage.readWriteTxn([
|
||||
this._storage.storeNames.pendingEvents,
|
||||
this._storage.storeNames.timelineEvents,
|
||||
this._storage.storeNames.timelineFragments,
|
||||
]);
|
||||
let extraGapFillChanges;
|
||||
let gapResult;
|
||||
try {
|
||||
// detect remote echos of pending messages in the gap
|
||||
extraGapFillChanges = this._writeGapFill(response.chunk, txn, log);
|
||||
// write new events into gap
|
||||
const gapWriter = new GapWriter({
|
||||
roomId: this._roomId,
|
||||
storage: this._storage,
|
||||
fragmentIdComparer: this._fragmentIdComparer,
|
||||
});
|
||||
gapResult = await gapWriter.writeFragmentFill(fragmentEntry, response, txn, log);
|
||||
} catch (err) {
|
||||
txn.abort();
|
||||
throw err;
|
||||
}
|
||||
await txn.complete();
|
||||
if (this._roomEncryption) {
|
||||
const decryptRequest = this._decryptEntries(DecryptionSource.Timeline, gapResult.entries, null, log);
|
||||
await decryptRequest.complete();
|
||||
}
|
||||
// once txn is committed, update in-memory state & emit events
|
||||
for (const fragment of gapResult.fragments) {
|
||||
this._fragmentIdComparer.add(fragment);
|
||||
}
|
||||
if (extraGapFillChanges) {
|
||||
this._applyGapFill(extraGapFillChanges);
|
||||
}
|
||||
if (this._timeline) {
|
||||
this._timeline.addOrReplaceEntries(gapResult.entries);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
allow sub classes to integrate in the gap fill lifecycle.
|
||||
JoinedRoom uses this update remote echos.
|
||||
*/
|
||||
_writeGapFill(chunk, txn, log) {}
|
||||
_applyGapFill() {}
|
||||
|
||||
/** @public */
|
||||
get name() {
|
||||
if (this._heroes) {
|
||||
return this._heroes.roomName;
|
||||
}
|
||||
const summaryData = this._summary.data;
|
||||
if (summaryData.name) {
|
||||
return summaryData.name;
|
||||
}
|
||||
if (summaryData.canonicalAlias) {
|
||||
return summaryData.canonicalAlias;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/** @public */
|
||||
get id() {
|
||||
return this._roomId;
|
||||
}
|
||||
|
||||
get avatarUrl() {
|
||||
if (this._summary.data.avatarUrl) {
|
||||
return this._summary.data.avatarUrl;
|
||||
} else if (this._heroes) {
|
||||
return this._heroes.roomAvatarUrl;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
get lastMessageTimestamp() {
|
||||
return this._summary.data.lastMessageTimestamp;
|
||||
}
|
||||
|
||||
get isLowPriority() {
|
||||
const tags = this._summary.data.tags;
|
||||
return !!(tags && tags['m.lowpriority']);
|
||||
}
|
||||
|
||||
get isEncrypted() {
|
||||
return !!this._summary.data.encryption;
|
||||
}
|
||||
|
||||
get isJoined() {
|
||||
return this.membership === "join";
|
||||
}
|
||||
|
||||
get isLeft() {
|
||||
return this.membership === "leave";
|
||||
}
|
||||
|
||||
get mediaRepository() {
|
||||
return this._mediaRepository;
|
||||
}
|
||||
|
||||
get membership() {
|
||||
return this._summary.data.membership;
|
||||
}
|
||||
|
||||
enableSessionBackup(sessionBackup) {
|
||||
this._roomEncryption?.enableSessionBackup(sessionBackup);
|
||||
// TODO: do we really want to do this every time you open the app?
|
||||
if (this._timeline) {
|
||||
this._platform.logger.run("enableSessionBackup", log => {
|
||||
return this._roomEncryption.restoreMissingSessionsFromBackup(this._timeline.remoteEntries, log);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
get _isTimelineOpen() {
|
||||
return !!this._timeline;
|
||||
}
|
||||
|
||||
_emitUpdate() {
|
||||
// once for event emitter listeners
|
||||
this.emit("change");
|
||||
// and once for collection listeners
|
||||
this._emitCollectionChange(this);
|
||||
}
|
||||
|
||||
/** @public */
|
||||
openTimeline(log = null) {
|
||||
return this._platform.logger.wrapOrRun(log, "open timeline", async log => {
|
||||
log.set("id", this.id);
|
||||
if (this._timeline) {
|
||||
throw new Error("not dealing with load race here for now");
|
||||
}
|
||||
this._timeline = new Timeline({
|
||||
roomId: this.id,
|
||||
storage: this._storage,
|
||||
fragmentIdComparer: this._fragmentIdComparer,
|
||||
pendingEvents: this._getPendingEvents(),
|
||||
closeCallback: () => {
|
||||
this._timeline = null;
|
||||
if (this._roomEncryption) {
|
||||
this._roomEncryption.notifyTimelineClosed();
|
||||
}
|
||||
},
|
||||
clock: this._platform.clock,
|
||||
logger: this._platform.logger,
|
||||
});
|
||||
if (this._roomEncryption) {
|
||||
this._timeline.enableEncryption(this._decryptEntries.bind(this, DecryptionSource.Timeline));
|
||||
}
|
||||
await this._timeline.load(this._user, this.membership, log);
|
||||
return this._timeline;
|
||||
});
|
||||
}
|
||||
|
||||
/* allow subclasses to provide an observable list with pending events when opening the timeline */
|
||||
_getPendingEvents() { return null; }
|
||||
|
||||
observeEvent(eventId) {
|
||||
if (!this._observedEvents) {
|
||||
this._observedEvents = new ObservedEventMap(() => {
|
||||
this._observedEvents = null;
|
||||
});
|
||||
}
|
||||
let entry = null;
|
||||
if (this._timeline) {
|
||||
entry = this._timeline.getByEventId(eventId);
|
||||
}
|
||||
const observable = this._observedEvents.observe(eventId, entry);
|
||||
if (!entry) {
|
||||
// update in the background
|
||||
this._readEventById(eventId).then(entry => {
|
||||
observable.update(entry);
|
||||
}).catch(err => {
|
||||
console.warn(`could not load event ${eventId} from storage`, err);
|
||||
});
|
||||
}
|
||||
return observable;
|
||||
}
|
||||
|
||||
async _readEventById(eventId) {
|
||||
let stores = [this._storage.storeNames.timelineEvents];
|
||||
if (this.isEncrypted) {
|
||||
stores.push(this._storage.storeNames.inboundGroupSessions);
|
||||
}
|
||||
const txn = await this._storage.readTxn(stores);
|
||||
const storageEntry = await txn.timelineEvents.getByEventId(this._roomId, eventId);
|
||||
if (storageEntry) {
|
||||
const entry = new EventEntry(storageEntry, this._fragmentIdComparer);
|
||||
if (entry.eventType === EVENT_ENCRYPTED_TYPE) {
|
||||
const request = this._decryptEntries(DecryptionSource.Timeline, [entry], txn);
|
||||
await request.complete();
|
||||
}
|
||||
return entry;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
dispose() {
|
||||
this._roomEncryption?.dispose();
|
||||
this._timeline?.dispose();
|
||||
}
|
||||
}
|
||||
|
||||
class DecryptionRequest {
|
||||
constructor(decryptFn, log) {
|
||||
this._cancelled = false;
|
||||
this.preparation = null;
|
||||
this._promise = log.wrap("decryptEntries", log => decryptFn(this, log));
|
||||
}
|
||||
|
||||
complete() {
|
||||
return this._promise;
|
||||
}
|
||||
|
||||
get cancelled() {
|
||||
return this._cancelled;
|
||||
}
|
||||
|
||||
dispose() {
|
||||
this._cancelled = true;
|
||||
if (this.preparation) {
|
||||
this.preparation.dispose();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -14,179 +14,30 @@ See the License for the specific language governing permissions and
|
|||
limitations under the License.
|
||||
*/
|
||||
|
||||
import {EventEmitter} from "../../utils/EventEmitter.js";
|
||||
import {RoomSummary} from "./RoomSummary.js";
|
||||
import {BaseRoom} from "./BaseRoom.js";
|
||||
import {SyncWriter} from "./timeline/persistence/SyncWriter.js";
|
||||
import {GapWriter} from "./timeline/persistence/GapWriter.js";
|
||||
import {Timeline} from "./timeline/Timeline.js";
|
||||
import {FragmentIdComparer} from "./timeline/FragmentIdComparer.js";
|
||||
import {SendQueue} from "./sending/SendQueue.js";
|
||||
import {WrappedError} from "../error.js"
|
||||
import {fetchOrLoadMembers} from "./members/load.js";
|
||||
import {MemberList} from "./members/MemberList.js";
|
||||
import {Heroes} from "./members/Heroes.js";
|
||||
import {EventEntry} from "./timeline/entries/EventEntry.js";
|
||||
import {ObservedEventMap} from "./ObservedEventMap.js";
|
||||
import {AttachmentUpload} from "./AttachmentUpload.js";
|
||||
import {DecryptionSource} from "../e2ee/common.js";
|
||||
import {ensureLogItem} from "../../logging/utils.js";
|
||||
|
||||
const EVENT_ENCRYPTED_TYPE = "m.room.encrypted";
|
||||
|
||||
export class Room extends EventEmitter {
|
||||
constructor({roomId, storage, hsApi, mediaRepository, emitCollectionChange, pendingEvents, user, createRoomEncryption, getSyncToken, platform}) {
|
||||
super();
|
||||
this._roomId = roomId;
|
||||
this._storage = storage;
|
||||
this._hsApi = hsApi;
|
||||
this._mediaRepository = mediaRepository;
|
||||
this._summary = new RoomSummary(roomId);
|
||||
this._fragmentIdComparer = new FragmentIdComparer([]);
|
||||
this._syncWriter = new SyncWriter({roomId, fragmentIdComparer: this._fragmentIdComparer});
|
||||
this._emitCollectionChange = emitCollectionChange;
|
||||
this._sendQueue = new SendQueue({roomId, storage, hsApi, pendingEvents});
|
||||
this._timeline = null;
|
||||
this._user = user;
|
||||
this._changedMembersDuringSync = null;
|
||||
this._memberList = null;
|
||||
this._createRoomEncryption = createRoomEncryption;
|
||||
this._roomEncryption = null;
|
||||
this._getSyncToken = getSyncToken;
|
||||
this._platform = platform;
|
||||
this._observedEvents = null;
|
||||
}
|
||||
|
||||
async _eventIdsToEntries(eventIds, txn) {
|
||||
const retryEntries = [];
|
||||
await Promise.all(eventIds.map(async eventId => {
|
||||
const storageEntry = await txn.timelineEvents.getByEventId(this._roomId, eventId);
|
||||
if (storageEntry) {
|
||||
retryEntries.push(new EventEntry(storageEntry, this._fragmentIdComparer));
|
||||
}
|
||||
}));
|
||||
return retryEntries;
|
||||
}
|
||||
|
||||
_getAdditionalTimelineRetryEntries(otherRetryEntries, roomKeys) {
|
||||
let retryTimelineEntries = this._roomEncryption.filterUndecryptedEventEntriesForKeys(this._timeline.remoteEntries, roomKeys);
|
||||
// filter out any entries already in retryEntries so we don't decrypt them twice
|
||||
const existingIds = otherRetryEntries.reduce((ids, e) => {ids.add(e.id); return ids;}, new Set());
|
||||
retryTimelineEntries = retryTimelineEntries.filter(e => !existingIds.has(e.id));
|
||||
return retryTimelineEntries;
|
||||
}
|
||||
|
||||
/**
|
||||
* Used for retrying decryption from other sources than sync, like key backup.
|
||||
* @internal
|
||||
* @param {RoomKey} roomKey
|
||||
* @param {Array<string>} eventIds any event ids that should be retried. There might be more in the timeline though for this key.
|
||||
* @return {Promise}
|
||||
*/
|
||||
async notifyRoomKey(roomKey, eventIds, log) {
|
||||
if (!this._roomEncryption) {
|
||||
return;
|
||||
}
|
||||
const txn = await this._storage.readTxn([
|
||||
this._storage.storeNames.timelineEvents,
|
||||
this._storage.storeNames.inboundGroupSessions,
|
||||
]);
|
||||
let retryEntries = await this._eventIdsToEntries(eventIds, txn);
|
||||
if (this._timeline) {
|
||||
const retryTimelineEntries = this._getAdditionalTimelineRetryEntries(retryEntries, [roomKey]);
|
||||
retryEntries = retryEntries.concat(retryTimelineEntries);
|
||||
}
|
||||
if (retryEntries.length) {
|
||||
const decryptRequest = this._decryptEntries(DecryptionSource.Retry, retryEntries, txn, log);
|
||||
// this will close txn while awaiting decryption
|
||||
await decryptRequest.complete();
|
||||
|
||||
this._timeline?.replaceEntries(retryEntries);
|
||||
// we would ideally write the room summary in the same txn as the groupSessionDecryptions in the
|
||||
// _decryptEntries entries and could even know which events have been decrypted for the first
|
||||
// time from DecryptionChanges.write and only pass those to the summary. As timeline changes
|
||||
// are not essential to the room summary, it's fine to write this in a separate txn for now.
|
||||
const changes = this._summary.data.applyTimelineEntries(retryEntries, false, false);
|
||||
if (await this._summary.writeAndApplyData(changes, this._storage)) {
|
||||
this._emitUpdate();
|
||||
}
|
||||
}
|
||||
export class Room extends BaseRoom {
|
||||
constructor(options) {
|
||||
super(options);
|
||||
const {pendingEvents} = options;
|
||||
this._syncWriter = new SyncWriter({roomId: this.id, fragmentIdComparer: this._fragmentIdComparer});
|
||||
this._sendQueue = new SendQueue({roomId: this.id, storage: this._storage, hsApi: this._hsApi, pendingEvents});
|
||||
}
|
||||
|
||||
_setEncryption(roomEncryption) {
|
||||
if (roomEncryption && !this._roomEncryption) {
|
||||
this._roomEncryption = roomEncryption;
|
||||
if (super._setEncryption(roomEncryption)) {
|
||||
this._sendQueue.enableEncryption(this._roomEncryption);
|
||||
if (this._timeline) {
|
||||
this._timeline.enableEncryption(this._decryptEntries.bind(this, DecryptionSource.Timeline));
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Used for decrypting when loading/filling the timeline, and retrying decryption,
|
||||
* not during sync, where it is split up during the multiple phases.
|
||||
*/
|
||||
_decryptEntries(source, entries, inboundSessionTxn, log = null) {
|
||||
const request = new DecryptionRequest(async (r, log) => {
|
||||
if (!inboundSessionTxn) {
|
||||
inboundSessionTxn = await this._storage.readTxn([this._storage.storeNames.inboundGroupSessions]);
|
||||
}
|
||||
if (r.cancelled) return;
|
||||
const events = entries.filter(entry => {
|
||||
return entry.eventType === EVENT_ENCRYPTED_TYPE;
|
||||
}).map(entry => entry.event);
|
||||
r.preparation = await this._roomEncryption.prepareDecryptAll(events, null, source, inboundSessionTxn);
|
||||
if (r.cancelled) return;
|
||||
const changes = await r.preparation.decrypt();
|
||||
r.preparation = null;
|
||||
if (r.cancelled) return;
|
||||
const stores = [this._storage.storeNames.groupSessionDecryptions];
|
||||
const isTimelineOpen = this._isTimelineOpen;
|
||||
if (isTimelineOpen) {
|
||||
// read to fetch devices if timeline is open
|
||||
stores.push(this._storage.storeNames.deviceIdentities);
|
||||
}
|
||||
const writeTxn = await this._storage.readWriteTxn(stores);
|
||||
let decryption;
|
||||
try {
|
||||
decryption = await changes.write(writeTxn, log);
|
||||
if (isTimelineOpen) {
|
||||
await decryption.verifySenders(writeTxn);
|
||||
}
|
||||
} catch (err) {
|
||||
writeTxn.abort();
|
||||
throw err;
|
||||
}
|
||||
await writeTxn.complete();
|
||||
// TODO: log decryption errors here
|
||||
decryption.applyToEntries(entries);
|
||||
if (this._observedEvents) {
|
||||
this._observedEvents.updateEvents(entries);
|
||||
}
|
||||
}, ensureLogItem(log));
|
||||
return request;
|
||||
}
|
||||
|
||||
async _getSyncRetryDecryptEntries(newKeys, roomEncryption, txn) {
|
||||
const entriesPerKey = await Promise.all(newKeys.map(async key => {
|
||||
const retryEventIds = await roomEncryption.getEventIdsForMissingKey(key, txn);
|
||||
if (retryEventIds) {
|
||||
return this._eventIdsToEntries(retryEventIds, txn);
|
||||
}
|
||||
}));
|
||||
let retryEntries = entriesPerKey.reduce((allEntries, entries) => entries ? allEntries.concat(entries) : allEntries, []);
|
||||
// If we have the timeline open, see if there are more entries for the new keys
|
||||
// as we only store missing session information for synced events, not backfilled.
|
||||
// We want to decrypt all events we can though if the user is looking
|
||||
// at them when the timeline is open
|
||||
if (this._timeline) {
|
||||
const retryTimelineEntries = this._getAdditionalTimelineRetryEntries(retryEntries, newKeys);
|
||||
// make copies so we don't modify the original entry in writeSync, before the afterSync stage
|
||||
const retryTimelineEntriesCopies = retryTimelineEntries.map(e => e.clone());
|
||||
// add to other retry entries
|
||||
retryEntries = retryEntries.concat(retryTimelineEntriesCopies);
|
||||
}
|
||||
return retryEntries;
|
||||
return false;
|
||||
}
|
||||
|
||||
async prepareSync(roomResponse, membership, invite, newKeys, txn, log) {
|
||||
|
@ -254,6 +105,7 @@ export class Room extends EventEmitter {
|
|||
// so no old state sticks around
|
||||
txn.roomState.removeAllForRoom(this.id);
|
||||
txn.roomMembers.removeAllForRoom(this.id);
|
||||
// TODO: this should be done in ArchivedRoom
|
||||
txn.archivedRoomSummary.remove(this.id);
|
||||
}
|
||||
const {entries: newEntries, newLiveKey, memberChanges} =
|
||||
|
@ -422,31 +274,23 @@ export class Room extends EventEmitter {
|
|||
|
||||
/** @package */
|
||||
async load(summary, txn, log) {
|
||||
log.set("id", this.id);
|
||||
try {
|
||||
// if called from sync, there is no summary yet
|
||||
if (summary) {
|
||||
this._summary.load(summary);
|
||||
}
|
||||
if (this._summary.data.encryption) {
|
||||
const roomEncryption = this._createRoomEncryption(this, this._summary.data.encryption);
|
||||
this._setEncryption(roomEncryption);
|
||||
}
|
||||
// need to load members for name?
|
||||
if (this._summary.data.needsHeroes) {
|
||||
this._heroes = new Heroes(this._roomId);
|
||||
const changes = await this._heroes.calculateChanges(this._summary.data.heroes, [], txn);
|
||||
this._heroes.applyChanges(changes, this._summary.data);
|
||||
}
|
||||
// don't load sync writer for archived room
|
||||
if (this.membership !== "leave") {
|
||||
return this._syncWriter.load(txn, log);
|
||||
}
|
||||
super.load(summary, txn, log);
|
||||
this._syncWriter.load(txn, log);
|
||||
} catch (err) {
|
||||
throw new WrappedError(`Could not load room ${this._roomId}`, err);
|
||||
}
|
||||
}
|
||||
|
||||
_writeGapFill(gapChunk, txn, log) {
|
||||
const removedPendingEvents = this._sendQueue.removeRemoteEchos(gapChunk, txn, log);
|
||||
return removedPendingEvents;
|
||||
}
|
||||
|
||||
_applyGapFill(removedPendingEvents) {
|
||||
this._sendQueue.emitRemovals(removedPendingEvents);
|
||||
}
|
||||
|
||||
/** @public */
|
||||
sendEvent(eventType, content, attachments, log = null) {
|
||||
this._platform.logger.wrapOrRun(log, "send", log => {
|
||||
|
@ -466,124 +310,6 @@ export class Room extends EventEmitter {
|
|||
});
|
||||
}
|
||||
|
||||
/** @public */
|
||||
async loadMemberList(log = null) {
|
||||
if (this._memberList) {
|
||||
// TODO: also await fetchOrLoadMembers promise here
|
||||
this._memberList.retain();
|
||||
return this._memberList;
|
||||
} else {
|
||||
const members = await fetchOrLoadMembers({
|
||||
summary: this._summary,
|
||||
roomId: this._roomId,
|
||||
hsApi: this._hsApi,
|
||||
storage: this._storage,
|
||||
syncToken: this._getSyncToken(),
|
||||
// to handle race between /members and /sync
|
||||
setChangedMembersMap: map => this._changedMembersDuringSync = map,
|
||||
log,
|
||||
}, this._platform.logger);
|
||||
this._memberList = new MemberList({
|
||||
members,
|
||||
closeCallback: () => { this._memberList = null; }
|
||||
});
|
||||
return this._memberList;
|
||||
}
|
||||
}
|
||||
|
||||
/** @public */
|
||||
fillGap(fragmentEntry, amount, log = null) {
|
||||
// TODO move some/all of this out of Room
|
||||
return this._platform.logger.wrapOrRun(log, "fillGap", async log => {
|
||||
log.set("id", this.id);
|
||||
log.set("fragment", fragmentEntry.fragmentId);
|
||||
log.set("dir", fragmentEntry.direction.asApiString());
|
||||
if (fragmentEntry.edgeReached) {
|
||||
log.set("edgeReached", true);
|
||||
return;
|
||||
}
|
||||
const response = await this._hsApi.messages(this._roomId, {
|
||||
from: fragmentEntry.token,
|
||||
dir: fragmentEntry.direction.asApiString(),
|
||||
limit: amount,
|
||||
filter: {
|
||||
lazy_load_members: true,
|
||||
include_redundant_members: true,
|
||||
}
|
||||
}, {log}).response();
|
||||
|
||||
const txn = await this._storage.readWriteTxn([
|
||||
this._storage.storeNames.pendingEvents,
|
||||
this._storage.storeNames.timelineEvents,
|
||||
this._storage.storeNames.timelineFragments,
|
||||
]);
|
||||
let removedPendingEvents;
|
||||
let gapResult;
|
||||
try {
|
||||
// detect remote echos of pending messages in the gap
|
||||
removedPendingEvents = this._sendQueue.removeRemoteEchos(response.chunk, txn, log);
|
||||
// write new events into gap
|
||||
const gapWriter = new GapWriter({
|
||||
roomId: this._roomId,
|
||||
storage: this._storage,
|
||||
fragmentIdComparer: this._fragmentIdComparer,
|
||||
});
|
||||
gapResult = await gapWriter.writeFragmentFill(fragmentEntry, response, txn, log);
|
||||
} catch (err) {
|
||||
txn.abort();
|
||||
throw err;
|
||||
}
|
||||
await txn.complete();
|
||||
if (this._roomEncryption) {
|
||||
const decryptRequest = this._decryptEntries(DecryptionSource.Timeline, gapResult.entries, null, log);
|
||||
await decryptRequest.complete();
|
||||
}
|
||||
// once txn is committed, update in-memory state & emit events
|
||||
for (const fragment of gapResult.fragments) {
|
||||
this._fragmentIdComparer.add(fragment);
|
||||
}
|
||||
if (removedPendingEvents) {
|
||||
this._sendQueue.emitRemovals(removedPendingEvents);
|
||||
}
|
||||
if (this._timeline) {
|
||||
this._timeline.addOrReplaceEntries(gapResult.entries);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/** @public */
|
||||
get name() {
|
||||
if (this._heroes) {
|
||||
return this._heroes.roomName;
|
||||
}
|
||||
const summaryData = this._summary.data;
|
||||
if (summaryData.name) {
|
||||
return summaryData.name;
|
||||
}
|
||||
if (summaryData.canonicalAlias) {
|
||||
return summaryData.canonicalAlias;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/** @public */
|
||||
get id() {
|
||||
return this._roomId;
|
||||
}
|
||||
|
||||
get avatarUrl() {
|
||||
if (this._summary.data.avatarUrl) {
|
||||
return this._summary.data.avatarUrl;
|
||||
} else if (this._heroes) {
|
||||
return this._heroes.roomAvatarUrl;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
get lastMessageTimestamp() {
|
||||
return this._summary.data.lastMessageTimestamp;
|
||||
}
|
||||
|
||||
get isUnread() {
|
||||
return this._summary.data.isUnread;
|
||||
}
|
||||
|
@ -596,29 +322,6 @@ export class Room extends EventEmitter {
|
|||
return this._summary.data.highlightCount;
|
||||
}
|
||||
|
||||
get isLowPriority() {
|
||||
const tags = this._summary.data.tags;
|
||||
return !!(tags && tags['m.lowpriority']);
|
||||
}
|
||||
|
||||
get isEncrypted() {
|
||||
return !!this._summary.data.encryption;
|
||||
}
|
||||
|
||||
get membership() {
|
||||
return this._summary.data.membership;
|
||||
}
|
||||
|
||||
enableSessionBackup(sessionBackup) {
|
||||
this._roomEncryption?.enableSessionBackup(sessionBackup);
|
||||
// TODO: do we really want to do this every time you open the app?
|
||||
if (this._timeline) {
|
||||
this._platform.logger.run("enableSessionBackup", log => {
|
||||
return this._roomEncryption.restoreMissingSessionsFromBackup(this._timeline.remoteEntries, log);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
get isTrackingMembers() {
|
||||
return this._summary.data.isTrackingMembers;
|
||||
}
|
||||
|
@ -634,17 +337,6 @@ export class Room extends EventEmitter {
|
|||
}
|
||||
}
|
||||
|
||||
get _isTimelineOpen() {
|
||||
return !!this._timeline;
|
||||
}
|
||||
|
||||
_emitUpdate() {
|
||||
// once for event emitter listeners
|
||||
this.emit("change");
|
||||
// and once for collection listeners
|
||||
this._emitCollectionChange(this);
|
||||
}
|
||||
|
||||
async clearUnread(log = null) {
|
||||
if (this.isUnread || this.notificationCount) {
|
||||
return await this._platform.logger.wrapOrRun(log, "clearUnread", async log => {
|
||||
|
@ -678,37 +370,9 @@ export class Room extends EventEmitter {
|
|||
}
|
||||
}
|
||||
|
||||
/** @public */
|
||||
openTimeline(log = null) {
|
||||
return this._platform.logger.wrapOrRun(log, "open timeline", async log => {
|
||||
log.set("id", this.id);
|
||||
if (this._timeline) {
|
||||
throw new Error("not dealing with load race here for now");
|
||||
}
|
||||
this._timeline = new Timeline({
|
||||
roomId: this.id,
|
||||
storage: this._storage,
|
||||
fragmentIdComparer: this._fragmentIdComparer,
|
||||
pendingEvents: this._sendQueue.pendingEvents,
|
||||
closeCallback: () => {
|
||||
this._timeline = null;
|
||||
if (this._roomEncryption) {
|
||||
this._roomEncryption.notifyTimelineClosed();
|
||||
}
|
||||
},
|
||||
clock: this._platform.clock,
|
||||
logger: this._platform.logger,
|
||||
});
|
||||
if (this._roomEncryption) {
|
||||
this._timeline.enableEncryption(this._decryptEntries.bind(this, DecryptionSource.Timeline));
|
||||
}
|
||||
await this._timeline.load(this._user, this.membership, log);
|
||||
return this._timeline;
|
||||
});
|
||||
}
|
||||
|
||||
get mediaRepository() {
|
||||
return this._mediaRepository;
|
||||
/* called by BaseRoom to pass pendingEvents when opening the timeline */
|
||||
_getPendingEvents() {
|
||||
return this._sendQueue.pendingEvents;
|
||||
}
|
||||
|
||||
/** @package */
|
||||
|
@ -721,75 +385,12 @@ export class Room extends EventEmitter {
|
|||
this._summary.applyChanges(changes);
|
||||
}
|
||||
|
||||
observeEvent(eventId) {
|
||||
if (!this._observedEvents) {
|
||||
this._observedEvents = new ObservedEventMap(() => {
|
||||
this._observedEvents = null;
|
||||
});
|
||||
}
|
||||
let entry = null;
|
||||
if (this._timeline) {
|
||||
entry = this._timeline.getByEventId(eventId);
|
||||
}
|
||||
const observable = this._observedEvents.observe(eventId, entry);
|
||||
if (!entry) {
|
||||
// update in the background
|
||||
this._readEventById(eventId).then(entry => {
|
||||
observable.update(entry);
|
||||
}).catch(err => {
|
||||
console.warn(`could not load event ${eventId} from storage`, err);
|
||||
});
|
||||
}
|
||||
return observable;
|
||||
}
|
||||
|
||||
async _readEventById(eventId) {
|
||||
let stores = [this._storage.storeNames.timelineEvents];
|
||||
if (this.isEncrypted) {
|
||||
stores.push(this._storage.storeNames.inboundGroupSessions);
|
||||
}
|
||||
const txn = await this._storage.readTxn(stores);
|
||||
const storageEntry = await txn.timelineEvents.getByEventId(this._roomId, eventId);
|
||||
if (storageEntry) {
|
||||
const entry = new EventEntry(storageEntry, this._fragmentIdComparer);
|
||||
if (entry.eventType === EVENT_ENCRYPTED_TYPE) {
|
||||
const request = this._decryptEntries(DecryptionSource.Timeline, [entry], txn);
|
||||
await request.complete();
|
||||
}
|
||||
return entry;
|
||||
}
|
||||
}
|
||||
|
||||
createAttachment(blob, filename) {
|
||||
return new AttachmentUpload({blob, filename, platform: this._platform});
|
||||
}
|
||||
|
||||
dispose() {
|
||||
this._roomEncryption?.dispose();
|
||||
this._timeline?.dispose();
|
||||
super.dispose();
|
||||
this._sendQueue.dispose();
|
||||
}
|
||||
}
|
||||
|
||||
class DecryptionRequest {
|
||||
constructor(decryptFn, log) {
|
||||
this._cancelled = false;
|
||||
this.preparation = null;
|
||||
this._promise = log.wrap("decryptEntries", log => decryptFn(this, log));
|
||||
}
|
||||
|
||||
complete() {
|
||||
return this._promise;
|
||||
}
|
||||
|
||||
get cancelled() {
|
||||
return this._cancelled;
|
||||
}
|
||||
|
||||
dispose() {
|
||||
this._cancelled = true;
|
||||
if (this.preparation) {
|
||||
this.preparation.dispose();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
|
|||
limitations under the License.
|
||||
*/
|
||||
|
||||
import {SortedArray, MappedList, ConcatList} from "../../../observable/index.js";
|
||||
import {SortedArray, MappedList, ConcatList, ObservableArray} from "../../../observable/index.js";
|
||||
import {Disposables} from "../../../utils/Disposables.js";
|
||||
import {Direction} from "./Direction.js";
|
||||
import {TimelineReader} from "./persistence/TimelineReader.js";
|
||||
|
@ -36,11 +36,16 @@ export class Timeline {
|
|||
fragmentIdComparer: this._fragmentIdComparer
|
||||
});
|
||||
this._readerRequest = null;
|
||||
const localEntries = new MappedList(pendingEvents, pe => {
|
||||
let localEntries;
|
||||
if (pendingEvents) {
|
||||
localEntries = new MappedList(pendingEvents, pe => {
|
||||
return new PendingEventEntry({pendingEvent: pe, member: this._ownMember, clock});
|
||||
}, (pee, params) => {
|
||||
pee.notifyUpdate(params);
|
||||
});
|
||||
} else {
|
||||
localEntries = new ObservableArray();
|
||||
}
|
||||
this._allEntries = new ConcatList(this._remoteEntries, localEntries);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue