forked from mystiq/hydrogen-web
also decrypt messages in the sync response that enabled encryption
like initial sync
This commit is contained in:
parent
241176d6fb
commit
a8392dc684
5 changed files with 111 additions and 192 deletions
|
@ -41,17 +41,14 @@ function timelineIsEmpty(roomResponse) {
|
|||
/**
|
||||
* Sync steps in js-pseudocode:
|
||||
* ```js
|
||||
* let preparation;
|
||||
* if (room.needsPrepareSync) {
|
||||
* // can only read some stores
|
||||
* preparation = await room.prepareSync(roomResponse, prepareTxn);
|
||||
* // can do async work that is not related to storage (such as decryption)
|
||||
* preparation = await room.afterPrepareSync(preparation);
|
||||
* }
|
||||
* // can only read some stores
|
||||
* const preparation = await room.prepareSync(roomResponse, membership, prepareTxn);
|
||||
* // can do async work that is not related to storage (such as decryption)
|
||||
* await room.afterPrepareSync(preparation);
|
||||
* // writes and calculates changes
|
||||
* const changes = await room.writeSync(roomResponse, membership, isInitialSync, preparation, syncTxn);
|
||||
* const changes = await room.writeSync(roomResponse, isInitialSync, preparation, syncTxn);
|
||||
* // applies and emits changes once syncTxn is committed
|
||||
* room.afterSync(changes);
|
||||
* room.afterSync(changes, preparation);
|
||||
* if (room.needsAfterSyncCompleted(changes)) {
|
||||
* // can do network requests
|
||||
* await room.afterSyncCompleted(changes);
|
||||
|
@ -173,14 +170,14 @@ export class Sync {
|
|||
const isInitialSync = !syncToken;
|
||||
syncToken = response.next_batch;
|
||||
const roomStates = this._parseRoomsResponse(response.rooms, isInitialSync);
|
||||
await this._prepareRooms(roomStates);
|
||||
await this._prepareRooms(roomStates, isInitialSync);
|
||||
let sessionChanges;
|
||||
const syncTxn = await this._openSyncTxn();
|
||||
try {
|
||||
await Promise.all(roomStates.map(async rs => {
|
||||
console.log(` * applying sync response to room ${rs.room.id} ...`);
|
||||
rs.changes = await rs.room.writeSync(
|
||||
rs.roomResponse, rs.membership, isInitialSync, rs.preparation, syncTxn);
|
||||
rs.roomResponse, isInitialSync, rs.preparation, syncTxn);
|
||||
}));
|
||||
sessionChanges = await this._session.writeSync(response, syncFilterId, syncTxn);
|
||||
} catch(err) {
|
||||
|
@ -219,16 +216,11 @@ export class Sync {
|
|||
}
|
||||
|
||||
async _prepareRooms(roomStates) {
|
||||
const prepareRoomStates = roomStates.filter(rs => rs.room.needsPrepareSync);
|
||||
if (prepareRoomStates.length) {
|
||||
const prepareTxn = await this._openPrepareSyncTxn();
|
||||
await Promise.all(prepareRoomStates.map(async rs => {
|
||||
rs.preparation = await rs.room.prepareSync(rs.roomResponse, prepareTxn);
|
||||
}));
|
||||
await Promise.all(prepareRoomStates.map(async rs => {
|
||||
rs.preparation = await rs.room.afterPrepareSync(rs.preparation);
|
||||
}));
|
||||
}
|
||||
const prepareTxn = await this._openPrepareSyncTxn();
|
||||
await Promise.all(roomStates.map(async rs => {
|
||||
rs.preparation = await rs.room.prepareSync(rs.roomResponse, rs.membership, prepareTxn);
|
||||
}));
|
||||
await Promise.all(roomStates.map(rs => rs.room.afterPrepareSync(rs.preparation)));
|
||||
}
|
||||
|
||||
async _openSyncTxn() {
|
||||
|
|
|
@ -37,7 +37,7 @@ export class Room extends EventEmitter {
|
|||
this._storage = storage;
|
||||
this._hsApi = hsApi;
|
||||
this._mediaRepository = mediaRepository;
|
||||
this._summary = new RoomSummary(roomId, user.id);
|
||||
this._summary = new RoomSummary(roomId);
|
||||
this._fragmentIdComparer = new FragmentIdComparer([]);
|
||||
this._syncWriter = new SyncWriter({roomId, fragmentIdComparer: this._fragmentIdComparer});
|
||||
this._emitCollectionChange = emitCollectionChange;
|
||||
|
@ -84,18 +84,17 @@ export class Room extends EventEmitter {
|
|||
// _decryptEntries entries and could even know which events have been decrypted for the first
|
||||
// time from DecryptionChanges.write and only pass those to the summary. As timeline changes
|
||||
// are not essential to the room summary, it's fine to write this in a separate txn for now.
|
||||
const changes = this._summary.processTimelineEntries(retryEntries, false, this._isTimelineOpen);
|
||||
if (changes) {
|
||||
this._summary.writeAndApplyChanges(changes, this._storage);
|
||||
const changes = this._summary.data.applyTimelineEntries(retryEntries, false, this._isTimelineOpen);
|
||||
if (await this._summary.writeAndApplyData(changes, this._storage)) {
|
||||
this._emitUpdate();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
_enableEncryption(encryptionParams) {
|
||||
this._roomEncryption = this._createRoomEncryption(this, encryptionParams);
|
||||
if (this._roomEncryption) {
|
||||
_setEncryption(roomEncryption) {
|
||||
if (roomEncryption && !this._roomEncryption) {
|
||||
this._roomEncryption = roomEncryption;
|
||||
this._sendQueue.enableEncryption(this._roomEncryption);
|
||||
if (this._timeline) {
|
||||
this._timeline.enableEncryption(this._decryptEntries.bind(this, DecryptionSource.Timeline));
|
||||
|
@ -141,57 +140,62 @@ export class Room extends EventEmitter {
|
|||
return request;
|
||||
}
|
||||
|
||||
get needsPrepareSync() {
|
||||
// only encrypted rooms need the prepare sync steps
|
||||
return !!this._roomEncryption;
|
||||
}
|
||||
async prepareSync(roomResponse, membership, txn) {
|
||||
const summaryChanges = this._summary.data.applySyncResponse(roomResponse, membership)
|
||||
let roomEncryption = this._roomEncryption;
|
||||
// encryption is enabled in this sync
|
||||
if (!roomEncryption && summaryChanges.encryption) {
|
||||
roomEncryption = this._createRoomEncryption(this, summaryChanges.encryption);
|
||||
}
|
||||
|
||||
async prepareSync(roomResponse, txn) {
|
||||
if (this._roomEncryption) {
|
||||
let decryptPreparation;
|
||||
if (roomEncryption) {
|
||||
const events = roomResponse?.timeline?.events;
|
||||
if (Array.isArray(events)) {
|
||||
const eventsToDecrypt = events.filter(event => {
|
||||
return event?.type === EVENT_ENCRYPTED_TYPE;
|
||||
});
|
||||
const preparation = await this._roomEncryption.prepareDecryptAll(
|
||||
decryptPreparation = await roomEncryption.prepareDecryptAll(
|
||||
eventsToDecrypt, DecryptionSource.Sync, this._isTimelineOpen, txn);
|
||||
return preparation;
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
roomEncryption,
|
||||
summaryChanges,
|
||||
decryptPreparation,
|
||||
decryptChanges: null,
|
||||
};
|
||||
}
|
||||
|
||||
async afterPrepareSync(preparation) {
|
||||
if (preparation) {
|
||||
const decryptChanges = await preparation.decrypt();
|
||||
return decryptChanges;
|
||||
if (preparation.decryptPreparation) {
|
||||
preparation.decryptChanges = await preparation.decryptPreparation.decrypt();
|
||||
preparation.decryptPreparation = null;
|
||||
}
|
||||
}
|
||||
|
||||
/** @package */
|
||||
async writeSync(roomResponse, membership, isInitialSync, decryptChanges, txn) {
|
||||
let decryption;
|
||||
if (this._roomEncryption && decryptChanges) {
|
||||
decryption = await decryptChanges.write(txn);
|
||||
}
|
||||
async writeSync(roomResponse, isInitialSync, {summaryChanges, decryptChanges, roomEncryption}, txn) {
|
||||
const {entries, newLiveKey, memberChanges} =
|
||||
await this._syncWriter.writeSync(roomResponse, txn);
|
||||
if (decryption) {
|
||||
if (decryptChanges) {
|
||||
const decryption = await decryptChanges.write(txn);
|
||||
decryption.applyToEntries(entries);
|
||||
}
|
||||
// pass member changes to device tracker
|
||||
if (this._roomEncryption && this.isTrackingMembers && memberChanges?.size) {
|
||||
await this._roomEncryption.writeMemberChanges(memberChanges, txn);
|
||||
if (roomEncryption && this.isTrackingMembers && memberChanges?.size) {
|
||||
await roomEncryption.writeMemberChanges(memberChanges, txn);
|
||||
}
|
||||
const summaryChanges = this._summary.writeSync(
|
||||
roomResponse,
|
||||
entries,
|
||||
membership,
|
||||
isInitialSync, this._isTimelineOpen,
|
||||
txn);
|
||||
// also apply (decrypted) timeline entries to the summary changes
|
||||
summaryChanges = summaryChanges.applyTimelineEntries(
|
||||
entries, isInitialSync, this._isTimelineOpen, this._user.id);
|
||||
// write summary changes, and unset if nothing was actually changed
|
||||
summaryChanges = this._summary.writeData(summaryChanges, txn);
|
||||
// fetch new members while we have txn open,
|
||||
// but don't make any in-memory changes yet
|
||||
let heroChanges;
|
||||
if (summaryChanges && needsHeroes(summaryChanges)) {
|
||||
if (summaryChanges?.needsHeroes) {
|
||||
// room name disappeared, open heroes
|
||||
if (!this._heroes) {
|
||||
this._heroes = new Heroes(this._roomId);
|
||||
|
@ -204,6 +208,7 @@ export class Room extends EventEmitter {
|
|||
}
|
||||
return {
|
||||
summaryChanges,
|
||||
roomEncryption,
|
||||
newTimelineEntries: entries,
|
||||
newLiveKey,
|
||||
removedPendingEvents,
|
||||
|
@ -217,11 +222,9 @@ export class Room extends EventEmitter {
|
|||
* Called with the changes returned from `writeSync` to apply them and emit changes.
|
||||
* No storage or network operations should be done here.
|
||||
*/
|
||||
afterSync({summaryChanges, newTimelineEntries, newLiveKey, removedPendingEvents, memberChanges, heroChanges}) {
|
||||
afterSync({summaryChanges, newTimelineEntries, newLiveKey, removedPendingEvents, memberChanges, heroChanges, roomEncryption}) {
|
||||
this._syncWriter.afterSync(newLiveKey);
|
||||
if (!this._summary.encryption && summaryChanges.encryption && !this._roomEncryption) {
|
||||
this._enableEncryption(summaryChanges.encryption);
|
||||
}
|
||||
this._setEncryption(roomEncryption);
|
||||
if (memberChanges.size) {
|
||||
if (this._changedMembersDuringSync) {
|
||||
for (const [userId, memberChange] of memberChanges.entries()) {
|
||||
|
@ -235,14 +238,14 @@ export class Room extends EventEmitter {
|
|||
let emitChange = false;
|
||||
if (summaryChanges) {
|
||||
this._summary.applyChanges(summaryChanges);
|
||||
if (!this._summary.needsHeroes) {
|
||||
if (!this._summary.data.needsHeroes) {
|
||||
this._heroes = null;
|
||||
}
|
||||
emitChange = true;
|
||||
}
|
||||
if (this._heroes && heroChanges) {
|
||||
const oldName = this.name;
|
||||
this._heroes.applyChanges(heroChanges, this._summary);
|
||||
this._heroes.applyChanges(heroChanges, this._summary.data);
|
||||
if (oldName !== this.name) {
|
||||
emitChange = true;
|
||||
}
|
||||
|
@ -294,14 +297,15 @@ export class Room extends EventEmitter {
|
|||
async load(summary, txn) {
|
||||
try {
|
||||
this._summary.load(summary);
|
||||
if (this._summary.encryption) {
|
||||
this._enableEncryption(this._summary.encryption);
|
||||
if (this._summary.data.encryption) {
|
||||
const roomEncryption = this._createRoomEncryption(this, this._summary.data.encryption);
|
||||
this._setEncryption(roomEncryption);
|
||||
}
|
||||
// need to load members for name?
|
||||
if (this._summary.needsHeroes) {
|
||||
if (this._summary.data.needsHeroes) {
|
||||
this._heroes = new Heroes(this._roomId);
|
||||
const changes = await this._heroes.calculateChanges(this._summary.heroes, [], txn);
|
||||
this._heroes.applyChanges(changes, this._summary);
|
||||
const changes = await this._heroes.calculateChanges(this._summary.data.heroes, [], txn);
|
||||
this._heroes.applyChanges(changes, this._summary.data);
|
||||
}
|
||||
return this._syncWriter.load(txn);
|
||||
} catch (err) {
|
||||
|
@ -397,7 +401,14 @@ export class Room extends EventEmitter {
|
|||
if (this._heroes) {
|
||||
return this._heroes.roomName;
|
||||
}
|
||||
return this._summary.name;
|
||||
const summaryData = this._summary.data;
|
||||
if (summaryData.name) {
|
||||
return summaryData.name;
|
||||
}
|
||||
if (summaryData.canonicalAlias) {
|
||||
return summaryData.canonicalAlias;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/** @public */
|
||||
|
@ -406,8 +417,8 @@ export class Room extends EventEmitter {
|
|||
}
|
||||
|
||||
get avatarUrl() {
|
||||
if (this._summary.avatarUrl) {
|
||||
return this._summary.avatarUrl;
|
||||
if (this._summary.data.avatarUrl) {
|
||||
return this._summary.data.avatarUrl;
|
||||
} else if (this._heroes) {
|
||||
return this._heroes.roomAvatarUrl;
|
||||
}
|
||||
|
@ -415,28 +426,28 @@ export class Room extends EventEmitter {
|
|||
}
|
||||
|
||||
get lastMessageTimestamp() {
|
||||
return this._summary.lastMessageTimestamp;
|
||||
return this._summary.data.lastMessageTimestamp;
|
||||
}
|
||||
|
||||
get isUnread() {
|
||||
return this._summary.isUnread;
|
||||
return this._summary.data.isUnread;
|
||||
}
|
||||
|
||||
get notificationCount() {
|
||||
return this._summary.notificationCount;
|
||||
return this._summary.data.notificationCount;
|
||||
}
|
||||
|
||||
get highlightCount() {
|
||||
return this._summary.highlightCount;
|
||||
return this._summary.data.highlightCount;
|
||||
}
|
||||
|
||||
get isLowPriority() {
|
||||
const tags = this._summary.tags;
|
||||
const tags = this._summary.data.tags;
|
||||
return !!(tags && tags['m.lowpriority']);
|
||||
}
|
||||
|
||||
get isEncrypted() {
|
||||
return !!this._summary.encryption;
|
||||
return !!this._summary.data.encryption;
|
||||
}
|
||||
|
||||
enableSessionBackup(sessionBackup) {
|
||||
|
@ -444,7 +455,7 @@ export class Room extends EventEmitter {
|
|||
}
|
||||
|
||||
get isTrackingMembers() {
|
||||
return this._summary.isTrackingMembers;
|
||||
return this._summary.data.isTrackingMembers;
|
||||
}
|
||||
|
||||
async _getLastEventId() {
|
||||
|
|
|
@ -39,16 +39,17 @@ function applySyncResponse(data, roomResponse, membership) {
|
|||
if (roomResponse.account_data) {
|
||||
data = roomResponse.account_data.events.reduce(processRoomAccountData, data);
|
||||
}
|
||||
const stateEvents = roomResponse?.state?.events;
|
||||
// state comes before timeline
|
||||
if (roomResponse.state) {
|
||||
data = roomResponse.state.events.reduce(processStateEvent, data);
|
||||
if (Array.isArray(stateEvents)) {
|
||||
data = stateEvents.reduce(processStateEvent, data);
|
||||
}
|
||||
const {timeline} = roomResponse;
|
||||
const timelineEvents = roomResponse?.timeline?.events;
|
||||
// process state events in timeline
|
||||
// non-state events are handled by applyTimelineEntries
|
||||
// so decryption is handled properly
|
||||
if (timeline && Array.isArray(timeline.events)) {
|
||||
data = timeline.events.reduce((data, event) => {
|
||||
if (Array.isArray(timelineEvents)) {
|
||||
data = timelineEvents.reduce((data, event) => {
|
||||
if (typeof event.state_key === "string") {
|
||||
return processStateEvent(data, event);
|
||||
}
|
||||
|
@ -200,87 +201,27 @@ class SummaryData {
|
|||
const {cloned, ...serializedProps} = this;
|
||||
return serializedProps;
|
||||
}
|
||||
}
|
||||
|
||||
export function needsHeroes(data) {
|
||||
return !data.name && !data.canonicalAlias && data.heroes && data.heroes.length > 0;
|
||||
applyTimelineEntries(timelineEntries, isInitialSync, isTimelineOpen, ownUserId) {
|
||||
return applyTimelineEntries(this, timelineEntries, isInitialSync, isTimelineOpen, ownUserId);
|
||||
}
|
||||
|
||||
applySyncResponse(roomResponse, membership) {
|
||||
return applySyncResponse(this, roomResponse, membership);
|
||||
}
|
||||
|
||||
get needsHeroes() {
|
||||
return !this.name && !this.canonicalAlias && this.heroes && this.heroes.length > 0;
|
||||
}
|
||||
}
|
||||
|
||||
export class RoomSummary {
|
||||
constructor(roomId, ownUserId) {
|
||||
this._ownUserId = ownUserId;
|
||||
constructor(roomId) {
|
||||
this._data = new SummaryData(null, roomId);
|
||||
}
|
||||
|
||||
get name() {
|
||||
if (this._data.name) {
|
||||
return this._data.name;
|
||||
}
|
||||
if (this._data.canonicalAlias) {
|
||||
return this._data.canonicalAlias;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
get heroes() {
|
||||
return this._data.heroes;
|
||||
}
|
||||
|
||||
get encryption() {
|
||||
return this._data.encryption;
|
||||
}
|
||||
|
||||
// whether the room name should be determined with Heroes
|
||||
get needsHeroes() {
|
||||
return needsHeroes(this._data);
|
||||
}
|
||||
|
||||
get isUnread() {
|
||||
return this._data.isUnread;
|
||||
}
|
||||
|
||||
get notificationCount() {
|
||||
return this._data.notificationCount;
|
||||
}
|
||||
|
||||
get highlightCount() {
|
||||
return this._data.highlightCount;
|
||||
}
|
||||
|
||||
get lastMessage() {
|
||||
return this._data.lastMessageBody;
|
||||
}
|
||||
|
||||
get lastMessageTimestamp() {
|
||||
return this._data.lastMessageTimestamp;
|
||||
}
|
||||
|
||||
get inviteCount() {
|
||||
return this._data.inviteCount;
|
||||
}
|
||||
|
||||
get joinCount() {
|
||||
return this._data.joinCount;
|
||||
}
|
||||
|
||||
get avatarUrl() {
|
||||
return this._data.avatarUrl;
|
||||
}
|
||||
|
||||
get hasFetchedMembers() {
|
||||
return this._data.hasFetchedMembers;
|
||||
}
|
||||
|
||||
get isTrackingMembers() {
|
||||
return this._data.isTrackingMembers;
|
||||
}
|
||||
|
||||
get tags() {
|
||||
return this._data.tags;
|
||||
}
|
||||
|
||||
get lastDecryptedEventKey() {
|
||||
return this._data.lastDecryptedEventKey;
|
||||
get data() {
|
||||
return this._data;
|
||||
}
|
||||
|
||||
writeClearUnread(txn) {
|
||||
|
@ -306,45 +247,17 @@ export class RoomSummary {
|
|||
return data;
|
||||
}
|
||||
|
||||
/**
|
||||
* after retrying decryption
|
||||
*/
|
||||
processTimelineEntries(timelineEntries, isInitialSync, isTimelineOpen) {
|
||||
// clear cloned flag, so cloneIfNeeded makes a copy and
|
||||
// this._data is not modified if any field is changed.
|
||||
this._data.cloned = false;
|
||||
const data = applyTimelineEntries(
|
||||
this._data,
|
||||
timelineEntries,
|
||||
isInitialSync, isTimelineOpen,
|
||||
this._ownUserId);
|
||||
if (data !== this._data) {
|
||||
return data;
|
||||
}
|
||||
}
|
||||
|
||||
writeSync(roomResponse, timelineEntries, membership, isInitialSync, isTimelineOpen, txn) {
|
||||
// clear cloned flag, so cloneIfNeeded makes a copy and
|
||||
// this._data is not modified if any field is changed.
|
||||
this._data.cloned = false;
|
||||
let data = applySyncResponse(this._data, roomResponse, membership);
|
||||
data = applyTimelineEntries(
|
||||
data,
|
||||
timelineEntries,
|
||||
isInitialSync, isTimelineOpen,
|
||||
this._ownUserId);
|
||||
writeData(data, txn) {
|
||||
if (data !== this._data) {
|
||||
txn.roomSummary.set(data.serialize());
|
||||
return data;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Only to be used with processTimelineEntries,
|
||||
* other methods like writeSync, writeHasFetchedMembers,
|
||||
* writeIsTrackingMembers, ... take a txn directly.
|
||||
*/
|
||||
async writeAndApplyChanges(data, storage) {
|
||||
async writeAndApplyData(data, storage) {
|
||||
if (data === this._data) {
|
||||
return;
|
||||
}
|
||||
const txn = await storage.readWriteTxn([
|
||||
storage.storeNames.roomSummary,
|
||||
]);
|
||||
|
@ -360,6 +273,9 @@ export class RoomSummary {
|
|||
|
||||
applyChanges(data) {
|
||||
this._data = data;
|
||||
// clear cloned flag, so cloneIfNeeded makes a copy and
|
||||
// this._data is not modified if any field is changed.
|
||||
this._data.cloned = false;
|
||||
}
|
||||
|
||||
async load(summary) {
|
||||
|
|
|
@ -16,8 +16,8 @@ limitations under the License.
|
|||
|
||||
import {RoomMember} from "./RoomMember.js";
|
||||
|
||||
function calculateRoomName(sortedMembers, summary) {
|
||||
const countWithoutMe = summary.joinCount + summary.inviteCount - 1;
|
||||
function calculateRoomName(sortedMembers, summaryData) {
|
||||
const countWithoutMe = summaryData.joinCount + summaryData.inviteCount - 1;
|
||||
if (sortedMembers.length >= countWithoutMe) {
|
||||
if (sortedMembers.length > 1) {
|
||||
const lastMember = sortedMembers[sortedMembers.length - 1];
|
||||
|
@ -74,7 +74,7 @@ export class Heroes {
|
|||
return {updatedHeroMembers: updatedHeroMembers.values(), removedUserIds};
|
||||
}
|
||||
|
||||
applyChanges({updatedHeroMembers, removedUserIds}, summary) {
|
||||
applyChanges({updatedHeroMembers, removedUserIds}, summaryData) {
|
||||
for (const userId of removedUserIds) {
|
||||
this._members.delete(userId);
|
||||
}
|
||||
|
@ -82,7 +82,7 @@ export class Heroes {
|
|||
this._members.set(member.userId, member);
|
||||
}
|
||||
const sortedMembers = Array.from(this._members.values()).sort((a, b) => a.name.localeCompare(b.name));
|
||||
this._roomName = calculateRoomName(sortedMembers, summary);
|
||||
this._roomName = calculateRoomName(sortedMembers, summaryData);
|
||||
}
|
||||
|
||||
get roomName() {
|
||||
|
|
|
@ -82,7 +82,7 @@ async function fetchMembers({summary, syncToken, roomId, hsApi, storage, setChan
|
|||
|
||||
export async function fetchOrLoadMembers(options) {
|
||||
const {summary} = options;
|
||||
if (!summary.hasFetchedMembers) {
|
||||
if (!summary.data.hasFetchedMembers) {
|
||||
return fetchMembers(options);
|
||||
} else {
|
||||
return loadMembers(options);
|
||||
|
|
Loading…
Reference in a new issue