From 7c1f9dbed02c34a64a03cdedb76449925e5ab9b6 Mon Sep 17 00:00:00 2001 From: Bruno Windels Date: Wed, 9 Sep 2020 16:28:43 +0200 Subject: [PATCH 1/9] split up megolm decryption so it can happen in multiple steps,see README --- src/matrix/e2ee/megolm/Decryption.js | 187 +++++++----------- .../megolm/decryption/DecryptionChanges.js | 78 ++++++++ .../decryption/DecryptionPreparation.js | 52 +++++ src/matrix/e2ee/megolm/decryption/README.md | 6 + .../megolm/decryption/ReplayDetectionEntry.js | 24 +++ .../e2ee/megolm/decryption/SessionCache.js | 68 +++++++ .../megolm/decryption/SessionDecryption.js | 75 +++++++ .../e2ee/megolm/decryption/SessionInfo.js | 44 +++++ src/utils/mergeMap.js | 41 ++++ 9 files changed, 455 insertions(+), 120 deletions(-) create mode 100644 src/matrix/e2ee/megolm/decryption/DecryptionChanges.js create mode 100644 src/matrix/e2ee/megolm/decryption/DecryptionPreparation.js create mode 100644 src/matrix/e2ee/megolm/decryption/README.md create mode 100644 src/matrix/e2ee/megolm/decryption/ReplayDetectionEntry.js create mode 100644 src/matrix/e2ee/megolm/decryption/SessionCache.js create mode 100644 src/matrix/e2ee/megolm/decryption/SessionDecryption.js create mode 100644 src/matrix/e2ee/megolm/decryption/SessionInfo.js create mode 100644 src/utils/mergeMap.js diff --git a/src/matrix/e2ee/megolm/Decryption.js b/src/matrix/e2ee/megolm/Decryption.js index bd3665b3..d4352926 100644 --- a/src/matrix/e2ee/megolm/Decryption.js +++ b/src/matrix/e2ee/megolm/Decryption.js @@ -15,102 +15,101 @@ limitations under the License. */ import {DecryptionError} from "../common.js"; -import {DecryptionResult} from "../DecryptionResult.js"; +import {groupBy} from "../../../utils/groupBy.js"; -const CACHE_MAX_SIZE = 10; +import {SessionInfo} from "./decryption/SessionInfo.js"; +import {DecryptionPreparation} from "./decryption/DecryptionPreparation.js"; +import {SessionDecryption} from "./decryption/SessionDecryption.js"; +import {SessionCache} from "./decryption/SessionCache.js"; + +function getSenderKey(event) { + return event.content?.["sender_key"]; +} + +function getSessionId(event) { + return event.content?.["session_id"]; +} + +function getCiphertext(event) { + return event.content?.ciphertext; +} export class Decryption { constructor({pickleKey, olm}) { this._pickleKey = pickleKey; this._olm = olm; + // this._worker = new MessageHandler(new Worker("worker-2580578233.js")); } - createSessionCache() { - return new SessionCache(); + createSessionCache(fallback) { + return new SessionCache(fallback); } /** - * [decrypt description] + * Reads all the state from storage to be able to decrypt the given events. + * Decryption can then happen outside of a storage transaction. * @param {[type]} roomId [description] - * @param {[type]} event [description] + * @param {[type]} events [description] * @param {[type]} sessionCache [description] * @param {[type]} txn [description] - * @return {DecryptionResult?} the decrypted event result, or undefined if the session id is not known. + * @return {DecryptionPreparation} */ - async decrypt(roomId, event, sessionCache, txn) { - const senderKey = event.content?.["sender_key"]; - const sessionId = event.content?.["session_id"]; - const ciphertext = event.content?.ciphertext; + async prepareDecryptAll(roomId, events, sessionCache, txn) { + const errors = new Map(); + const validEvents = []; - if ( - typeof senderKey !== "string" || - typeof sessionId !== "string" || - typeof ciphertext !== "string" - ) { - throw new DecryptionError("MEGOLM_INVALID_EVENT", event); + for (const event of events) { + const isValid = typeof getSenderKey(event) === "string" && + typeof getSessionId(event) === "string" && + typeof getCiphertext(event) === "string"; + if (isValid) { + validEvents.push(event); + } else { + errors.set(event.event_id, new DecryptionError("MEGOLM_INVALID_EVENT", event)) + } } - let session; - let claimedKeys; - const cacheEntry = sessionCache.get(roomId, senderKey, sessionId); - if (cacheEntry) { - session = cacheEntry.session; - claimedKeys = cacheEntry.claimedKeys; - } else { + const eventsBySession = groupBy(validEvents, event => { + return `${getSenderKey(event)}|${getSessionId(event)}`; + }); + + const sessionDecryptions = []; + + await Promise.all(Array.from(eventsBySession.values()).map(async eventsForSession => { + const first = eventsForSession[0]; + const senderKey = getSenderKey(first); + const sessionId = getSessionId(first); + const sessionInfo = await this._getSessionInfo(roomId, senderKey, sessionId, sessionCache, txn); + if (!sessionInfo) { + for (const event of eventsForSession) { + errors.set(event.event_id, new DecryptionError("MEGOLM_NO_SESSION", event)); + } + } else { + sessionDecryptions.push(new SessionDecryption(sessionInfo, eventsForSession)); + } + })); + + return new DecryptionPreparation(roomId, sessionDecryptions, errors); + } + + async _getSessionInfo(roomId, senderKey, sessionId, sessionCache, txn) { + let sessionInfo; + sessionInfo = sessionCache.get(roomId, senderKey, sessionId); + if (!sessionInfo) { const sessionEntry = await txn.inboundGroupSessions.get(roomId, senderKey, sessionId); if (sessionEntry) { - session = new this._olm.InboundGroupSession(); + let session = new this._olm.InboundGroupSession(); try { session.unpickle(this._pickleKey, sessionEntry.session); + sessionInfo = new SessionInfo(roomId, senderKey, session, sessionEntry.claimedKeys); } catch (err) { session.free(); throw err; } - claimedKeys = sessionEntry.claimedKeys; - sessionCache.add(roomId, senderKey, session, claimedKeys); + sessionCache.add(sessionInfo); } } - if (!session) { - return; - } - const {plaintext, message_index: messageIndex} = session.decrypt(ciphertext); - let payload; - try { - payload = JSON.parse(plaintext); - } catch (err) { - throw new DecryptionError("PLAINTEXT_NOT_JSON", event, {plaintext, err}); - } - if (payload.room_id !== roomId) { - throw new DecryptionError("MEGOLM_WRONG_ROOM", event, - {encryptedRoomId: payload.room_id, eventRoomId: roomId}); - } - await this._handleReplayAttack(roomId, sessionId, messageIndex, event, txn); - return new DecryptionResult(payload, senderKey, claimedKeys); - } - - async _handleReplayAttack(roomId, sessionId, messageIndex, event, txn) { - const eventId = event.event_id; - const timestamp = event.origin_server_ts; - const decryption = await txn.groupSessionDecryptions.get(roomId, sessionId, messageIndex); - if (decryption && decryption.eventId !== eventId) { - // the one with the newest timestamp should be the attack - const decryptedEventIsBad = decryption.timestamp < timestamp; - const badEventId = decryptedEventIsBad ? eventId : decryption.eventId; - throw new DecryptionError("MEGOLM_REPLAYED_INDEX", event, { - messageIndex, - badEventId, - otherEventId: decryption.eventId - }); - } - if (!decryption) { - txn.groupSessionDecryptions.set({ - roomId, - sessionId, - messageIndex, - eventId, - timestamp - }); - } + return sessionInfo; } /** @@ -165,55 +164,3 @@ export class Decryption { } } -class SessionCache { - constructor() { - this._sessions = []; - } - - /** - * @type {CacheEntry} - * @property {InboundGroupSession} session the unpickled session - * @property {Object} claimedKeys an object with the claimed ed25519 key - * - * - * @param {string} roomId - * @param {string} senderKey - * @param {string} sessionId - * @return {CacheEntry?} - */ - get(roomId, senderKey, sessionId) { - const idx = this._sessions.findIndex(s => { - return s.roomId === roomId && - s.senderKey === senderKey && - sessionId === s.session.session_id(); - }); - if (idx !== -1) { - const entry = this._sessions[idx]; - // move to top - if (idx > 0) { - this._sessions.splice(idx, 1); - this._sessions.unshift(entry); - } - return entry; - } - } - - add(roomId, senderKey, session, claimedKeys) { - // add new at top - this._sessions.unshift({roomId, senderKey, session, claimedKeys}); - if (this._sessions.length > CACHE_MAX_SIZE) { - // free sessions we're about to remove - for (let i = CACHE_MAX_SIZE; i < this._sessions.length; i += 1) { - this._sessions[i].session.free(); - } - this._sessions = this._sessions.slice(0, CACHE_MAX_SIZE); - } - } - - dispose() { - for (const entry of this._sessions) { - entry.session.free(); - } - - } -} diff --git a/src/matrix/e2ee/megolm/decryption/DecryptionChanges.js b/src/matrix/e2ee/megolm/decryption/DecryptionChanges.js new file mode 100644 index 00000000..5597aaf7 --- /dev/null +++ b/src/matrix/e2ee/megolm/decryption/DecryptionChanges.js @@ -0,0 +1,78 @@ +/* +Copyright 2020 The Matrix.org Foundation C.I.C. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +import {DecryptionError} from "../../common.js"; + +export class DecryptionChanges { + constructor(roomId, results, errors, replayEntries) { + this._roomId = roomId; + this._results = results; + this._errors = errors; + this._replayEntries = replayEntries; + } + + /** + * @type MegolmBatchDecryptionResult + * @property {Map} results a map of event id to decryption result + * @property {Map} errors event id -> errors + * + * Handle replay attack detection, and return result + * @param {[type]} txn [description] + * @return {MegolmBatchDecryptionResult} + */ + async write(txn) { + await Promise.all(this._replayEntries.map(async replayEntry => { + try { + this._handleReplayAttack(this._roomId, replayEntry, txn); + } catch (err) { + this._errors.set(replayEntry.eventId, err); + } + })); + return { + results: this._results, + errors: this._errors + }; + } + + async _handleReplayAttack(roomId, replayEntry, txn) { + const {messageIndex, sessionId, eventId, timestamp} = replayEntry; + const decryption = await txn.groupSessionDecryptions.get(roomId, sessionId, messageIndex); + + if (decryption && decryption.eventId !== eventId) { + // the one with the newest timestamp should be the attack + const decryptedEventIsBad = decryption.timestamp < timestamp; + const badEventId = decryptedEventIsBad ? eventId : decryption.eventId; + // discard result + this._results.delete(eventId); + + throw new DecryptionError("MEGOLM_REPLAYED_INDEX", event, { + messageIndex, + badEventId, + otherEventId: decryption.eventId + }); + } + + if (!decryption) { + txn.groupSessionDecryptions.set({ + roomId, + sessionId, + messageIndex, + eventId, + timestamp + }); + } + } +} diff --git a/src/matrix/e2ee/megolm/decryption/DecryptionPreparation.js b/src/matrix/e2ee/megolm/decryption/DecryptionPreparation.js new file mode 100644 index 00000000..02ee32df --- /dev/null +++ b/src/matrix/e2ee/megolm/decryption/DecryptionPreparation.js @@ -0,0 +1,52 @@ +/* +Copyright 2020 The Matrix.org Foundation C.I.C. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +import {DecryptionChanges} from "./DecryptionChanges.js"; +import {mergeMap} from "../../../../utils/mergeMap.js"; + +/** + * Class that contains all the state loaded from storage to decrypt the given events + */ +export class DecryptionPreparation { + constructor(roomId, sessionDecryptions, errors) { + this._roomId = roomId; + this._sessionDecryptions = sessionDecryptions; + this._initialErrors = errors; + } + + async decrypt() { + try { + const errors = this._initialErrors; + const results = new Map(); + const replayEntries = []; + await Promise.all(this._sessionDecryptions.map(async sessionDecryption => { + const sessionResult = await sessionDecryption.decryptAll(); + mergeMap(sessionResult.errors, errors); + mergeMap(sessionResult.results, results); + replayEntries.push(...sessionResult.replayEntries); + })); + return new DecryptionChanges(this._roomId, results, errors, replayEntries); + } finally { + this.dispose(); + } + } + + dispose() { + for (const sd of this._sessionDecryptions) { + sd.dispose(); + } + } +} diff --git a/src/matrix/e2ee/megolm/decryption/README.md b/src/matrix/e2ee/megolm/decryption/README.md new file mode 100644 index 00000000..b9bb3568 --- /dev/null +++ b/src/matrix/e2ee/megolm/decryption/README.md @@ -0,0 +1,6 @@ +Lots of classes here. The complexity comes from needing to offload decryption to a webworker, mainly for IE11. We can't keep a idb transaction open while waiting for the response from the worker, so need to batch decryption of multiple events and do decryption in multiple steps: + + 1. Read all used inbound sessions for the batch of events, requires a read txn. This happens in `Decryption`. Sessions are loaded into `SessionInfo` objects, which are also kept in a `SessionCache` to prevent having to read and unpickle them all the time. + 2. Actually decrypt. No txn can stay open during this step, as it can be offloaded to a worker and is thus async. This happens in `DecryptionPreparation`, which delegates to `SessionDecryption` per session. + 3. Read and write for the replay detection, requires a read/write txn. This happens in `DecryptionChanges` + 4. Return the decrypted entries, and errors if any diff --git a/src/matrix/e2ee/megolm/decryption/ReplayDetectionEntry.js b/src/matrix/e2ee/megolm/decryption/ReplayDetectionEntry.js new file mode 100644 index 00000000..e5ce2845 --- /dev/null +++ b/src/matrix/e2ee/megolm/decryption/ReplayDetectionEntry.js @@ -0,0 +1,24 @@ +/* +Copyright 2020 The Matrix.org Foundation C.I.C. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +export class ReplayDetectionEntry { + constructor(sessionId, messageIndex, event) { + this.sessionId = sessionId; + this.messageIndex = messageIndex; + this.eventId = event.event_id; + this.timestamp = event.origin_server_ts; + } +} diff --git a/src/matrix/e2ee/megolm/decryption/SessionCache.js b/src/matrix/e2ee/megolm/decryption/SessionCache.js new file mode 100644 index 00000000..efb7ef54 --- /dev/null +++ b/src/matrix/e2ee/megolm/decryption/SessionCache.js @@ -0,0 +1,68 @@ +/* +Copyright 2020 The Matrix.org Foundation C.I.C. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +const CACHE_MAX_SIZE = 10; + +/** + * Cache of unpickled inbound megolm session. + */ +export class SessionCache { + constructor() { + this._sessions = []; + } + + /** + * @param {string} roomId + * @param {string} senderKey + * @param {string} sessionId + * @return {SessionInfo?} + */ + get(roomId, senderKey, sessionId) { + const idx = this._sessions.findIndex(s => { + return s.roomId === roomId && + s.senderKey === senderKey && + sessionId === s.session.session_id(); + }); + if (idx !== -1) { + const sessionInfo = this._sessions[idx]; + // move to top + if (idx > 0) { + this._sessions.splice(idx, 1); + this._sessions.unshift(sessionInfo); + } + return sessionInfo; + } + } + + add(sessionInfo) { + sessionInfo.retain(); + // add new at top + this._sessions.unshift(sessionInfo); + if (this._sessions.length > CACHE_MAX_SIZE) { + // free sessions we're about to remove + for (let i = CACHE_MAX_SIZE; i < this._sessions.length; i += 1) { + this._sessions[i].release(); + } + this._sessions = this._sessions.slice(0, CACHE_MAX_SIZE); + } + } + + dispose() { + for (const sessionInfo of this._sessions) { + sessionInfo.release(); + } + } +} diff --git a/src/matrix/e2ee/megolm/decryption/SessionDecryption.js b/src/matrix/e2ee/megolm/decryption/SessionDecryption.js new file mode 100644 index 00000000..6abda029 --- /dev/null +++ b/src/matrix/e2ee/megolm/decryption/SessionDecryption.js @@ -0,0 +1,75 @@ +/* +Copyright 2020 The Matrix.org Foundation C.I.C. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +import {DecryptionResult} from "../../DecryptionResult.js"; +import {DecryptionError} from "../../common.js"; +import {ReplayDetectionEntry} from "./ReplayDetectionEntry.js"; + +/** + * Does the actual decryption of all events for a given megolm session in a batch + */ +export class SessionDecryption { + constructor(sessionInfo, events) { + sessionInfo.retain(); + this._sessionInfo = sessionInfo; + this._events = events; + } + + async decryptAll() { + const replayEntries = []; + const results = new Map(); + let errors; + const roomId = this._sessionInfo.roomId; + + await Promise.all(this._events.map(async event => { + try { + const {session} = this._sessionInfo; + const ciphertext = event.content.ciphertext; + const {plaintext, message_index: messageIndex} = await this._decrypt(session, ciphertext); + let payload; + try { + payload = JSON.parse(plaintext); + } catch (err) { + throw new DecryptionError("PLAINTEXT_NOT_JSON", event, {plaintext, err}); + } + if (payload.room_id !== roomId) { + throw new DecryptionError("MEGOLM_WRONG_ROOM", event, + {encryptedRoomId: payload.room_id, eventRoomId: roomId}); + } + replayEntries.push(new ReplayDetectionEntry(session.session_id(), messageIndex, event)); + const result = new DecryptionResult(payload, this._sessionInfo.senderKey, this._sessionInfo.claimedKeys); + results.set(event.event_id, result); + } catch (err) { + if (!errors) { + errors = new Map(); + } + errors.set(event.event_id, err); + } + })); + + return {results, errors, replayEntries}; + } + + async _decrypt(session, ciphertext) { + // const sessionKey = session.export_session(session.first_known_index()); + // return this._worker.decrypt(sessionKey, ciphertext); + return session.decrypt(ciphertext); + } + + dispose() { + this._sessionInfo.release(); + } +} diff --git a/src/matrix/e2ee/megolm/decryption/SessionInfo.js b/src/matrix/e2ee/megolm/decryption/SessionInfo.js new file mode 100644 index 00000000..dedc3222 --- /dev/null +++ b/src/matrix/e2ee/megolm/decryption/SessionInfo.js @@ -0,0 +1,44 @@ +/* +Copyright 2020 The Matrix.org Foundation C.I.C. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +/** + * session loaded in memory with everything needed to create DecryptionResults + * and to store/retrieve it in the SessionCache + */ +export class SessionInfo { + constructor(roomId, senderKey, session, claimedKeys) { + this.roomId = roomId; + this.senderKey = senderKey; + this.session = session; + this.claimedKeys = claimedKeys; + this._refCounter = 0; + } + + retain() { + this._refCounter += 1; + } + + release() { + this._refCounter -= 1; + if (this._refCounter <= 0) { + this.dispose(); + } + } + + dispose() { + this.session.free(); + } +} diff --git a/src/utils/mergeMap.js b/src/utils/mergeMap.js new file mode 100644 index 00000000..a0aed207 --- /dev/null +++ b/src/utils/mergeMap.js @@ -0,0 +1,41 @@ +/* +Copyright 2020 Bruno Windels + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +export function mergeMap(src, dst) { + if (src) { + for (const [key, value] of src.entries()) { + dst.set(key, value); + } + } +} + +export function tests() { + return { + "mergeMap with src": assert => { + const src = new Map(); + src.set(1, "a"); + const dst = new Map(); + dst.set(2, "b"); + mergeMap(src, dst); + assert.equal(dst.get(1), "a"); + assert.equal(dst.get(2), "b"); + assert.equal(src.get(2), null); + }, + "mergeMap without src doesn't fail": () => { + mergeMap(undefined, new Map()); + } + } +} From 1c77c3b8763a389bff220b4b344135848449d28c Mon Sep 17 00:00:00 2001 From: Bruno Windels Date: Thu, 10 Sep 2020 12:09:17 +0200 Subject: [PATCH 2/9] expose multi-step decryption from RoomEncryption, adjust room timeline sync code hasn't been adjusted yet --- src/matrix/e2ee/RoomEncryption.js | 136 +++++++++++++++--- src/matrix/e2ee/common.js | 3 + src/matrix/room/Room.js | 116 +++++++-------- src/matrix/room/timeline/Timeline.js | 15 -- .../timeline/persistence/TimelineReader.js | 62 +++----- 5 files changed, 199 insertions(+), 133 deletions(-) diff --git a/src/matrix/e2ee/RoomEncryption.js b/src/matrix/e2ee/RoomEncryption.js index c8f993e5..1341389b 100644 --- a/src/matrix/e2ee/RoomEncryption.js +++ b/src/matrix/e2ee/RoomEncryption.js @@ -14,8 +14,9 @@ See the License for the specific language governing permissions and limitations under the License. */ -import {MEGOLM_ALGORITHM} from "./common.js"; +import {MEGOLM_ALGORITHM, DecryptionSource} from "./common.js"; import {groupBy} from "../../utils/groupBy.js"; +import {mergeMap} from "../../utils/mergeMap.js"; import {makeTxnId} from "../common.js"; const ENCRYPTED_TYPE = "m.room.encrypted"; @@ -55,23 +56,54 @@ export class RoomEncryption { return await this._deviceTracker.writeMemberChanges(this._room, memberChanges, txn); } - async decrypt(event, isSync, isTimelineOpen, retryData, txn) { - if (event.redacted_because || event.unsigned?.redacted_because) { - return; + // this happens before entries exists, as they are created by the syncwriter + // but we want to be able to map it back to something in the timeline easily + // when retrying decryption. + async prepareDecryptAll(events, source, isTimelineOpen, txn) { + const errors = []; + const validEvents = []; + for (const event of events) { + if (event.redacted_because || event.unsigned?.redacted_because) { + continue; + } + if (event.content?.algorithm !== MEGOLM_ALGORITHM) { + errors.set(event.event_id, new Error("Unsupported algorithm: " + event.content?.algorithm)); + } + validEvents.push(event); } - if (event.content?.algorithm !== MEGOLM_ALGORITHM) { - throw new Error("Unsupported algorithm: " + event.content?.algorithm); + let customCache; + let sessionCache; + if (source === DecryptionSource.Sync) { + sessionCache = this._megolmSyncCache; + } else if (source === DecryptionSource.Timeline) { + sessionCache = this._megolmBackfillCache; + } else if (source === DecryptionSource.Retry) { + // when retrying, we could have mixed events from at the bottom of the timeline (sync) + // and somewhere else, so create a custom cache we use just for this operation. + customCache = this._megolmEncryption.createSessionCache(); + sessionCache = customCache; + } else { + throw new Error("Unknown source: " + source); } - let sessionCache = isSync ? this._megolmSyncCache : this._megolmBackfillCache; - const result = await this._megolmDecryption.decrypt( - this._room.id, event, sessionCache, txn); - if (!result) { - this._addMissingSessionEvent(event, isSync, retryData); + const preparation = await this._megolmDecryption.prepareDecryptAll( + this._room.id, validEvents, sessionCache, txn); + if (customCache) { + customCache.dispose(); } - if (result && isTimelineOpen) { - await this._verifyDecryptionResult(result, txn); + return new DecryptionPreparation(preparation, errors, {isTimelineOpen}, this); + } + + async _processDecryptionResults(results, errors, flags, txn) { + for (const error of errors.values()) { + if (error.code === "MEGOLM_NO_SESSION") { + this._addMissingSessionEvent(error.event); + } + } + if (flags.isTimelineOpen) { + for (const result of results.values()) { + await this._verifyDecryptionResult(result, txn); + } } - return result; } async _verifyDecryptionResult(result, txn) { @@ -87,30 +119,30 @@ export class RoomEncryption { } } - _addMissingSessionEvent(event, isSync, data) { + _addMissingSessionEvent(event) { const senderKey = event.content?.["sender_key"]; const sessionId = event.content?.["session_id"]; const key = `${senderKey}|${sessionId}`; let eventIds = this._eventIdsByMissingSession.get(key); if (!eventIds) { - eventIds = new Map(); + eventIds = new Set(); this._eventIdsByMissingSession.set(key, eventIds); } - eventIds.set(event.event_id, {data, isSync}); + eventIds.add(event.event_id); } applyRoomKeys(roomKeys) { // retry decryption with the new sessions - const retryEntries = []; + const retryEventIds = []; for (const roomKey of roomKeys) { const key = `${roomKey.senderKey}|${roomKey.sessionId}`; const entriesForSession = this._eventIdsByMissingSession.get(key); if (entriesForSession) { this._eventIdsByMissingSession.delete(key); - retryEntries.push(...entriesForSession.values()); + retryEventIds.push(...entriesForSession); } } - return retryEntries; + return retryEventIds; } async encrypt(type, content, hsApi) { @@ -214,3 +246,67 @@ export class RoomEncryption { await hsApi.sendToDevice(type, payload, txnId).response(); } } + +/** + * wrappers around megolm decryption classes to be able to post-process + * the decryption results before turning them + */ +class DecryptionPreparation { + constructor(megolmDecryptionPreparation, extraErrors, flags, roomEncryption) { + this._megolmDecryptionPreparation = megolmDecryptionPreparation; + this._extraErrors = extraErrors; + this._flags = flags; + this._roomEncryption = roomEncryption; + } + + async decrypt() { + return new DecryptionChanges( + await this._megolmDecryptionPreparation.decrypt(), + this._extraErrors, + this._flags, + this._roomEncryption); + } + + dispose() { + this._megolmDecryptionChanges.dispose(); + } +} + +class DecryptionChanges { + constructor(megolmDecryptionChanges, extraErrors, flags, roomEncryption) { + this._megolmDecryptionChanges = megolmDecryptionChanges; + this._extraErrors = extraErrors; + this._flags = flags; + this._roomEncryption = roomEncryption; + } + + async write(txn) { + const {results, errors} = await this._megolmDecryptionChanges.write(txn); + mergeMap(this._extraErrors, errors); + await this._roomEncryption._processDecryptionResults(results, errors, this._flags, txn); + return new BatchDecryptionResult(results, errors); + } +} + +class BatchDecryptionResult { + constructor(results, errors) { + this.results = results; + this.errors = errors; + console.log("BatchDecryptionResult", this); + } + + applyToEntries(entries) { + console.log("BatchDecryptionResult.applyToEntries", this); + for (const entry of entries) { + const result = this.results.get(entry.id); + if (result) { + entry.setDecryptionResult(result); + } else { + const error = this.errors.get(entry.id); + if (error) { + entry.setDecryptionError(error); + } + } + } + } +} diff --git a/src/matrix/e2ee/common.js b/src/matrix/e2ee/common.js index 3312032b..190f2fa2 100644 --- a/src/matrix/e2ee/common.js +++ b/src/matrix/e2ee/common.js @@ -15,6 +15,9 @@ limitations under the License. */ import anotherjson from "../../../lib/another-json/index.js"; +import {createEnum} from "../../utils/enum.js"; + +export const DecryptionSource = createEnum(["Sync", "Timeline", "Retry"]); // use common prefix so it's easy to clear properties that are not e2ee related during session clear export const SESSION_KEY_PREFIX = "e2ee:"; diff --git a/src/matrix/room/Room.js b/src/matrix/room/Room.js index a2b84717..6a18f34f 100644 --- a/src/matrix/room/Room.js +++ b/src/matrix/room/Room.js @@ -26,6 +26,9 @@ import {fetchOrLoadMembers} from "./members/load.js"; import {MemberList} from "./members/MemberList.js"; import {Heroes} from "./members/Heroes.js"; import {EventEntry} from "./timeline/entries/EventEntry.js"; +import {DecryptionSource} from "../e2ee/common.js"; + +const EVENT_ENCRYPTED_TYPE = "m.room.encrypted"; export class Room extends EventEmitter { constructor({roomId, storage, hsApi, emitCollectionChange, sendScheduler, pendingEvents, user, createRoomEncryption, getSyncToken}) { @@ -49,43 +52,26 @@ export class Room extends EventEmitter { async notifyRoomKeys(roomKeys) { if (this._roomEncryption) { - // array of {data, isSync} - let retryEntries = this._roomEncryption.applyRoomKeys(roomKeys); - let decryptedEntries = []; - if (retryEntries.length) { - // groupSessionDecryptions can be written, the other stores not - const txn = await this._storage.readWriteTxn([ + let retryEventIds = this._roomEncryption.applyRoomKeys(roomKeys); + if (retryEventIds.length) { + const retryEntries = []; + const txn = await this._storage.readTxn([ this._storage.storeNames.timelineEvents, this._storage.storeNames.inboundGroupSessions, - this._storage.storeNames.groupSessionDecryptions, - this._storage.storeNames.deviceIdentities, ]); - try { - for (const retryEntry of retryEntries) { - const {data: eventKey} = retryEntry; - let entry = this._timeline?.findEntry(eventKey); - if (!entry) { - const storageEntry = await txn.timelineEvents.get(this._roomId, eventKey); - if (storageEntry) { - entry = new EventEntry(storageEntry, this._fragmentIdComparer); - } - } - if (entry) { - entry = await this._decryptEntry(entry, txn, retryEntry.isSync); - decryptedEntries.push(entry); - } + for (const eventId of retryEventIds) { + const storageEntry = await txn.timelineEvents.getByEventId(this._roomId, eventId); + if (storageEntry) { + retryEntries.push(new EventEntry(storageEntry, this._fragmentIdComparer)); } - } catch (err) { - txn.abort(); - throw err; } - await txn.complete(); + await this._decryptEntries(DecryptionSource.Retry, retryEntries, txn); + if (this._timeline) { + // only adds if already present + this._timeline.replaceEntries(retryEntries); + } + // pass decryptedEntries to roomSummary } - if (this._timeline) { - // only adds if already present - this._timeline.replaceEntries(decryptedEntries); - } - // pass decryptedEntries to roomSummary } } @@ -94,22 +80,42 @@ export class Room extends EventEmitter { if (this._roomEncryption) { this._sendQueue.enableEncryption(this._roomEncryption); if (this._timeline) { - this._timeline.enableEncryption(this._decryptEntries.bind(this)); + this._timeline.enableEncryption(this._decryptEntries.bind(this, DecryptionSource.Timeline)); } } } - async _decryptEntry(entry, txn, isSync) { - if (entry.eventType === "m.room.encrypted") { - try { - const decryptionResult = await this._roomEncryption.decrypt( - entry.event, isSync, !!this._timeline, entry.asEventKey(), txn); - if (decryptionResult) { - entry.setDecryptionResult(decryptionResult); - } - } catch (err) { - console.warn("event decryption error", err, entry.event); - entry.setDecryptionError(err); + /** + * Used for decrypting when loading/filling the timeline, and retrying decryption, + * not during sync, where it is split up during the multiple phases. + */ + async _decryptEntries(source, entries, inboundSessionTxn = null) { + if (!inboundSessionTxn) { + inboundSessionTxn = await this._storage.readTxn([this._storage.storeNames.inboundGroupSessions]); + } + const events = entries.filter(entry => { + return entry.eventType === EVENT_ENCRYPTED_TYPE; + }).map(entry => entry.event); + const isTimelineOpen = this._isTimelineOpen; + const preparation = await this._roomEncryption.prepareDecryptAll(events, source, isTimelineOpen, inboundSessionTxn); + const changes = await preparation.decrypt(); + const stores = [this._storage.storeNames.groupSessionDecryptions]; + if (isTimelineOpen) { + // read to fetch devices if timeline is open + stores.push(this._storage.storeNames.deviceIdentities); + } + const writeTxn = await this._storage.readWriteTxn(stores); + let decryption; + try { + decryption = await changes.write(writeTxn); + } catch (err) { + writeTxn.abort(); + throw err; + } + await writeTxn.complete(); + decryption.applyToEntries(entries); + } + } } return entry; @@ -299,19 +305,11 @@ export class Room extends EventEmitter { } }).response(); - let stores = [ + const txn = await this._storage.readWriteTxn([ this._storage.storeNames.pendingEvents, this._storage.storeNames.timelineEvents, this._storage.storeNames.timelineFragments, - ]; - if (this._roomEncryption) { - stores = stores.concat([ - this._storage.storeNames.inboundGroupSessions, - this._storage.storeNames.groupSessionDecryptions, - this._storage.storeNames.deviceIdentities, - ]); - } - const txn = await this._storage.readWriteTxn(stores); + ]); let removedPendingEvents; let gapResult; try { @@ -324,14 +322,14 @@ export class Room extends EventEmitter { fragmentIdComparer: this._fragmentIdComparer, }); gapResult = await gapWriter.writeFragmentFill(fragmentEntry, response, txn); - if (this._roomEncryption) { - gapResult.entries = await this._decryptEntries(gapResult.entries, txn, false); - } } catch (err) { txn.abort(); throw err; } await txn.complete(); + if (this._roomEncryption) { + await this._decryptEntries(DecryptionSource.Timeline, gapResult.entries); + } // once txn is committed, update in-memory state & emit events for (const fragment of gapResult.fragments) { this._fragmentIdComparer.add(fragment); @@ -406,6 +404,10 @@ export class Room extends EventEmitter { } } + get _isTimelineOpen() { + return !!this._timeline; + } + async clearUnread() { if (this.isUnread || this.notificationCount) { const txn = await this._storage.readWriteTxn([ @@ -458,7 +460,7 @@ export class Room extends EventEmitter { user: this._user, }); if (this._roomEncryption) { - this._timeline.enableEncryption(this._decryptEntries.bind(this)); + this._timeline.enableEncryption(this._decryptEntries.bind(this, DecryptionSource.Timeline)); } await this._timeline.load(); return this._timeline; diff --git a/src/matrix/room/timeline/Timeline.js b/src/matrix/room/timeline/Timeline.js index c2e9d0ce..7245568d 100644 --- a/src/matrix/room/timeline/Timeline.js +++ b/src/matrix/room/timeline/Timeline.js @@ -46,21 +46,6 @@ export class Timeline { this._remoteEntries.setManySorted(entries); } - findEntry(eventKey) { - // a storage event entry has a fragmentId and eventIndex property, used for sorting, - // just like an EventKey, so this will work, but perhaps a bit brittle. - const entry = new EventEntry(eventKey, this._fragmentIdComparer); - try { - const idx = this._remoteEntries.indexOf(entry); - if (idx !== -1) { - return this._remoteEntries.get(idx); - } - } catch (err) { - // fragmentIdComparer threw, ignore - return; - } - } - replaceEntries(entries) { for (const entry of entries) { this._remoteEntries.replace(entry); diff --git a/src/matrix/room/timeline/persistence/TimelineReader.js b/src/matrix/room/timeline/persistence/TimelineReader.js index 4446eaf1..d451d76e 100644 --- a/src/matrix/room/timeline/persistence/TimelineReader.js +++ b/src/matrix/room/timeline/persistence/TimelineReader.js @@ -32,34 +32,19 @@ export class TimelineReader { } _openTxn() { + const stores = [ + this._storage.storeNames.timelineEvents, + this._storage.storeNames.timelineFragments, + ]; if (this._decryptEntries) { - return this._storage.readWriteTxn([ - this._storage.storeNames.timelineEvents, - this._storage.storeNames.timelineFragments, - this._storage.storeNames.inboundGroupSessions, - this._storage.storeNames.groupSessionDecryptions, - this._storage.storeNames.deviceIdentities, - ]); - - } else { - return this._storage.readTxn([ - this._storage.storeNames.timelineEvents, - this._storage.storeNames.timelineFragments, - ]); + stores.push(this._storage.storeNames.inboundGroupSessions); } + return this._storage.readTxn(stores); } async readFrom(eventKey, direction, amount) { const txn = await this._openTxn(); - let entries; - try { - entries = await this._readFrom(eventKey, direction, amount, txn); - } catch (err) { - txn.abort(); - throw err; - } - await txn.complete(); - return entries; + return await this._readFrom(eventKey, direction, amount, txn); } async _readFrom(eventKey, direction, amount, txn) { @@ -75,9 +60,6 @@ export class TimelineReader { eventsWithinFragment = await timelineStore.eventsBefore(this._roomId, eventKey, amount); } let eventEntries = eventsWithinFragment.map(e => new EventEntry(e, this._fragmentIdComparer)); - if (this._decryptEntries) { - eventEntries = await this._decryptEntries(eventEntries, txn); - } entries = directionalConcat(entries, eventEntries, direction); // prepend or append eventsWithinFragment to entries, and wrap them in EventEntry @@ -100,29 +82,27 @@ export class TimelineReader { } } + if (this._decryptEntries) { + await this._decryptEntries(entries, txn); + } + return entries; } async readFromEnd(amount) { const txn = await this._openTxn(); + const liveFragment = await txn.timelineFragments.liveFragment(this._roomId); let entries; - try { - const liveFragment = await txn.timelineFragments.liveFragment(this._roomId); - // room hasn't been synced yet - if (!liveFragment) { - entries = []; - } else { - this._fragmentIdComparer.add(liveFragment); - const liveFragmentEntry = FragmentBoundaryEntry.end(liveFragment, this._fragmentIdComparer); - const eventKey = liveFragmentEntry.asEventKey(); - entries = await this._readFrom(eventKey, Direction.Backward, amount, txn); - entries.unshift(liveFragmentEntry); - } - } catch (err) { - txn.abort(); - throw err; + // room hasn't been synced yet + if (!liveFragment) { + entries = []; + } else { + this._fragmentIdComparer.add(liveFragment); + const liveFragmentEntry = FragmentBoundaryEntry.end(liveFragment, this._fragmentIdComparer); + const eventKey = liveFragmentEntry.asEventKey(); + entries = await this._readFrom(eventKey, Direction.Backward, amount, txn); + entries.unshift(liveFragmentEntry); } - await txn.complete(); return entries; } } From 94b0cfbd72c46416e42a0c54007b635b28091eab Mon Sep 17 00:00:00 2001 From: Bruno Windels Date: Thu, 10 Sep 2020 12:11:43 +0200 Subject: [PATCH 3/9] add prepareSync and afterPrepareSync steps to sync, run decryption in it --- src/matrix/Session.js | 4 +- src/matrix/Sync.js | 165 ++++++++++++++++++++++++-------------- src/matrix/e2ee/README.md | 44 ++++++++++ src/matrix/room/Room.js | 61 +++++++++----- 4 files changed, 190 insertions(+), 84 deletions(-) create mode 100644 src/matrix/e2ee/README.md diff --git a/src/matrix/Session.js b/src/matrix/Session.js index 19725b58..c5c0c94f 100644 --- a/src/matrix/Session.js +++ b/src/matrix/Session.js @@ -255,7 +255,7 @@ export class Session { return room; } - async writeSync(syncResponse, syncFilterId, roomChanges, txn) { + async writeSync(syncResponse, syncFilterId, txn) { const changes = {}; const syncToken = syncResponse.next_batch; const deviceOneTimeKeysCount = syncResponse.device_one_time_keys_count; @@ -362,7 +362,7 @@ export function tests() { } } }; - const newSessionData = await session.writeSync({next_batch: "b"}, 6, {}, syncTxn); + const newSessionData = await session.writeSync({next_batch: "b"}, 6, syncTxn); assert(syncSet); assert.equal(session.syncToken, "a"); assert.equal(session.syncFilterId, 5); diff --git a/src/matrix/Sync.js b/src/matrix/Sync.js index 598b9169..c81acee0 100644 --- a/src/matrix/Sync.js +++ b/src/matrix/Sync.js @@ -29,21 +29,6 @@ export const SyncStatus = createEnum( "Stopped" ); -function parseRooms(roomsSection, roomCallback) { - if (roomsSection) { - const allMemberships = ["join", "invite", "leave"]; - for(const membership of allMemberships) { - const membershipSection = roomsSection[membership]; - if (membershipSection) { - return Object.entries(membershipSection).map(([roomId, roomResponse]) => { - return roomCallback(roomId, roomResponse, membership); - }); - } - } - } - return []; -} - function timelineIsEmpty(roomResponse) { try { const events = roomResponse?.timeline?.events; @@ -53,6 +38,26 @@ function timelineIsEmpty(roomResponse) { } } +/** + * Sync steps in js-pseudocode: + * ```js + * let preparation; + * if (room.needsPrepareSync) { + * // can only read some stores + * preparation = await room.prepareSync(roomResponse, prepareTxn); + * // can do async work that is not related to storage (such as decryption) + * preparation = await room.afterPrepareSync(preparation); + * } + * // writes and calculates changes + * const changes = await room.writeSync(roomResponse, membership, isInitialSync, preparation, syncTxn); + * // applies and emits changes once syncTxn is committed + * room.afterSync(changes); + * if (room.needsAfterSyncCompleted(changes)) { + * // can do network requests + * await room.afterSyncCompleted(changes); + * } + * ``` + */ export class Sync { constructor({hsApi, session, storage}) { this._hsApi = hsApi; @@ -90,13 +95,13 @@ export class Sync { let afterSyncCompletedPromise = Promise.resolve(); // if syncToken is falsy, it will first do an initial sync ... while(this._status.get() !== SyncStatus.Stopped) { - let roomChanges; + let roomStates; try { console.log(`starting sync request with since ${syncToken} ...`); const timeout = syncToken ? INCREMENTAL_TIMEOUT : undefined; const syncResult = await this._syncRequest(syncToken, timeout, afterSyncCompletedPromise); syncToken = syncResult.syncToken; - roomChanges = syncResult.roomChanges; + roomStates = syncResult.roomStates; this._status.set(SyncStatus.Syncing); } catch (err) { if (!(err instanceof AbortError)) { @@ -105,12 +110,12 @@ export class Sync { } } if (!this._error) { - afterSyncCompletedPromise = this._runAfterSyncCompleted(roomChanges); + afterSyncCompletedPromise = this._runAfterSyncCompleted(roomStates); } } } - async _runAfterSyncCompleted(roomChanges) { + async _runAfterSyncCompleted(roomStates) { const sessionPromise = (async () => { try { await this._session.afterSyncCompleted(); @@ -118,23 +123,22 @@ export class Sync { console.error("error during session afterSyncCompleted, continuing", err.stack); } })(); - let allPromises = [sessionPromise]; - const roomsNeedingAfterSyncCompleted = roomChanges.filter(rc => { - return rc.changes.needsAfterSyncCompleted; + const roomsNeedingAfterSyncCompleted = roomStates.filter(rs => { + return rs.room.needsAfterSyncCompleted(rs.changes); + }); + const roomsPromises = roomsNeedingAfterSyncCompleted.map(async rs => { + try { + await rs.room.afterSyncCompleted(rs.changes); + } catch (err) { + console.error(`error during room ${rs.room.id} afterSyncCompleted, continuing`, err.stack); + } }); - if (roomsNeedingAfterSyncCompleted.length) { - allPromises = allPromises.concat(roomsNeedingAfterSyncCompleted.map(async ({room, changes}) => { - try { - await room.afterSyncCompleted(changes); - } catch (err) { - console.error(`error during room ${room.id} afterSyncCompleted, continuing`, err.stack); - } - })); - } // run everything in parallel, // we don't want to delay the next sync too much - await Promise.all(allPromises); + // Also, since all promises won't reject (as they have a try/catch) + // it's fine to use Promise.all + await Promise.all(roomsPromises.concat(sessionPromise)); } async _syncRequest(syncToken, timeout, prevAfterSyncCompletedPromise) { @@ -152,16 +156,17 @@ export class Sync { const isInitialSync = !syncToken; syncToken = response.next_batch; - const syncTxn = await this._openSyncTxn(); - let roomChanges = []; + const roomStates = this._parseRoomsResponse(response.rooms, isInitialSync); + await this._prepareRooms(roomStates); let sessionChanges; + const syncTxn = await this._openSyncTxn(); try { - // to_device - // presence - if (response.rooms) { - roomChanges = await this._writeRoomResponses(response.rooms, isInitialSync, syncTxn); - } - sessionChanges = await this._session.writeSync(response, syncFilterId, roomChanges, syncTxn); + await Promise.all(roomStates.map(async rs => { + console.log(` * applying sync response to room ${rs.room.id} ...`); + rs.changes = await rs.room.writeSync( + rs.roomResponse, rs.membership, isInitialSync, rs.preparation, syncTxn); + })); + sessionChanges = await this._session.writeSync(response, syncFilterId, syncTxn); } catch(err) { console.warn("aborting syncTxn because of error"); console.error(err); @@ -180,31 +185,31 @@ export class Sync { } this._session.afterSync(sessionChanges); // emit room related events after txn has been closed - for(let {room, changes} of roomChanges) { - room.afterSync(changes); + for(let rs of roomStates) { + rs.room.afterSync(rs.changes); } - return {syncToken, roomChanges}; + return {syncToken, roomStates}; } - async _writeRoomResponses(roomResponses, isInitialSync, syncTxn) { - const roomChanges = []; - const promises = parseRooms(roomResponses, async (roomId, roomResponse, membership) => { - // ignore rooms with empty timelines during initial sync, - // see https://github.com/vector-im/hydrogen-web/issues/15 - if (isInitialSync && timelineIsEmpty(roomResponse)) { - return; - } - let room = this._session.rooms.get(roomId); - if (!room) { - room = this._session.createRoom(roomId); - } - console.log(` * applying sync response to room ${roomId} ...`); - const changes = await room.writeSync(roomResponse, membership, isInitialSync, syncTxn); - roomChanges.push({room, changes}); - }); - await Promise.all(promises); - return roomChanges; + async _openPrepareSyncTxn() { + const storeNames = this._storage.storeNames; + return await this._storage.readTxn([ + storeNames.inboundGroupSessions, + ]); + } + + async _prepareRooms(roomStates) { + const prepareRoomStates = roomStates.filter(rs => rs.room.needsPrepareSync); + if (prepareRoomStates.length) { + const prepareTxn = await this._openPrepareSyncTxn(); + await Promise.all(prepareRoomStates.map(async rs => { + rs.preparation = await rs.room.prepareSync(rs.roomResponse, prepareTxn); + })); + await Promise.all(prepareRoomStates.map(async rs => { + rs.preparation = await rs.room.afterPrepareSync(rs.preparation); + })); + } } async _openSyncTxn() { @@ -218,13 +223,39 @@ export class Sync { storeNames.timelineFragments, storeNames.pendingEvents, storeNames.userIdentities, - storeNames.inboundGroupSessions, storeNames.groupSessionDecryptions, storeNames.deviceIdentities, // to discard outbound session when somebody leaves a room storeNames.outboundGroupSessions ]); } + + _parseRoomsResponse(roomsSection, isInitialSync) { + const roomStates = []; + if (roomsSection) { + // don't do "invite", "leave" for now + const allMemberships = ["join"]; + for(const membership of allMemberships) { + const membershipSection = roomsSection[membership]; + if (membershipSection) { + for (const [roomId, roomResponse] of Object.entries(membershipSection)) { + // ignore rooms with empty timelines during initial sync, + // see https://github.com/vector-im/hydrogen-web/issues/15 + if (isInitialSync && timelineIsEmpty(roomResponse)) { + return; + } + let room = this._session.rooms.get(roomId); + if (!room) { + room = this._session.createRoom(roomId); + } + roomStates.push(new RoomSyncProcessState(room, roomResponse, membership)); + } + } + } + } + return roomStates; + } + stop() { if (this._status.get() === SyncStatus.Stopped) { @@ -237,3 +268,13 @@ export class Sync { } } } + +class RoomSyncProcessState { + constructor(room, roomResponse, membership) { + this.room = room; + this.roomResponse = roomResponse; + this.membership = membership; + this.preparation = null; + this.changes = null; + } +} diff --git a/src/matrix/e2ee/README.md b/src/matrix/e2ee/README.md new file mode 100644 index 00000000..46f4e95f --- /dev/null +++ b/src/matrix/e2ee/README.md @@ -0,0 +1,44 @@ +## Integratation within the sync lifetime cycle + +### prepareSync + + The session can start its own read/write transactions here, rooms only read from a shared transaction + + - session + - device handler + - txn + - write pending encrypted + - txn + - olm decryption read + - olm async decryption + - dispatch to worker + - txn + - olm decryption write / remove pending encrypted + - rooms (with shared read txn) + - megolm decryption read + +### afterPrepareSync + + - rooms + - megolm async decryption + - dispatch to worker + +### writeSync + + - rooms (with shared readwrite txn) + - megolm decryption write, yielding decrypted events + - use decrypted events to write room summary + +### afterSync + + - rooms + - emit changes + +### afterSyncCompleted + + - session + - e2ee account + - generate more otks if needed + - upload new otks if needed or device keys if not uploaded before + - rooms + - share new room keys if needed diff --git a/src/matrix/room/Room.js b/src/matrix/room/Room.js index 6a18f34f..f281f166 100644 --- a/src/matrix/room/Room.js +++ b/src/matrix/room/Room.js @@ -116,30 +116,52 @@ export class Room extends EventEmitter { decryption.applyToEntries(entries); } - } - } - return entry; + get needsPrepareSync() { + // only encrypted rooms need the prepare sync steps + return !!this._roomEncryption; } - async _decryptEntries(entries, txn, isSync = false) { - return await Promise.all(entries.map(async e => this._decryptEntry(e, txn, isSync))); + async prepareSync(roomResponse, txn) { + if (this._roomEncryption) { + const events = roomResponse?.timeline?.events; + if (Array.isArray(events)) { + const eventsToDecrypt = events.filter(event => { + return event?.type === EVENT_ENCRYPTED_TYPE; + }); + const preparation = await this._roomEncryption.prepareDecryptAll( + eventsToDecrypt, DecryptionSource.Sync, this._isTimelineOpen, txn); + return preparation; + } + } + } + + async afterPrepareSync(preparation) { + if (preparation) { + const decryptChanges = await preparation.decrypt(); + return decryptChanges; + } } /** @package */ - async writeSync(roomResponse, membership, isInitialSync, txn) { - const isTimelineOpen = !!this._timeline; + async writeSync(roomResponse, membership, isInitialSync, decryptChanges, txn) { + let decryption; + if (this._roomEncryption && decryptChanges) { + decryption = await decryptChanges.write(txn); + } + const {entries, newLiveKey, memberChanges} = + await this._syncWriter.writeSync(roomResponse, this.isTrackingMembers, txn); + if (decryption) { + decryption.applyToEntries(entries); + } + // pass member changes to device tracker + if (this._roomEncryption && this.isTrackingMembers && memberChanges?.size) { + await this._roomEncryption.writeMemberChanges(memberChanges, txn); + } const summaryChanges = this._summary.writeSync( roomResponse, membership, - isInitialSync, isTimelineOpen, + isInitialSync, this._isTimelineOpen, txn); - const {entries: encryptedEntries, newLiveKey, memberChanges} = - await this._syncWriter.writeSync(roomResponse, this.isTrackingMembers, txn); - // decrypt if applicable - let entries = encryptedEntries; - if (this._roomEncryption) { - entries = await this._decryptEntries(encryptedEntries, txn, true); - } // fetch new members while we have txn open, // but don't make any in-memory changes yet let heroChanges; @@ -150,10 +172,6 @@ export class Room extends EventEmitter { } heroChanges = await this._heroes.calculateChanges(summaryChanges.heroes, memberChanges, txn); } - // pass member changes to device tracker - if (this._roomEncryption && this.isTrackingMembers && memberChanges?.size) { - await this._roomEncryption.writeMemberChanges(memberChanges, txn); - } let removedPendingEvents; if (roomResponse.timeline && roomResponse.timeline.events) { removedPendingEvents = this._sendQueue.removeRemoteEchos(roomResponse.timeline.events, txn); @@ -165,7 +183,6 @@ export class Room extends EventEmitter { removedPendingEvents, memberChanges, heroChanges, - needsAfterSyncCompleted: this._roomEncryption?.needsToShareKeys(memberChanges) }; } @@ -216,6 +233,10 @@ export class Room extends EventEmitter { } } + needsAfterSyncCompleted({memberChanges}) { + return this._roomEncryption?.needsToShareKeys(memberChanges); + } + /** * Only called if the result of writeSync had `needsAfterSyncCompleted` set. * Can be used to do longer running operations that resulted from the last sync, From 17412bbb2f18e520b88e023006acbab1e5ea9e45 Mon Sep 17 00:00:00 2001 From: Bruno Windels Date: Thu, 10 Sep 2020 12:12:39 +0200 Subject: [PATCH 4/9] more validation --- src/matrix/room/timeline/persistence/SyncWriter.js | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/matrix/room/timeline/persistence/SyncWriter.js b/src/matrix/room/timeline/persistence/SyncWriter.js index 130b22d1..9f42163d 100644 --- a/src/matrix/room/timeline/persistence/SyncWriter.js +++ b/src/matrix/room/timeline/persistence/SyncWriter.js @@ -140,7 +140,7 @@ export class SyncWriter { async _writeTimeline(entries, timeline, currentKey, trackNewlyJoined, txn) { const memberChanges = new Map(); - if (timeline.events) { + if (Array.isArray(timeline.events)) { const events = deduplicateEvents(timeline.events); for(const event of events) { // store event in timeline @@ -220,6 +220,7 @@ export class SyncWriter { // important this happens before _writeTimeline so // members are available in the transaction const memberChanges = await this._writeStateEvents(roomResponse, trackNewlyJoined, txn); + // TODO: remove trackNewlyJoined and pass in memberChanges const timelineResult = await this._writeTimeline(entries, timeline, currentKey, trackNewlyJoined, txn); currentKey = timelineResult.currentKey; // merge member changes from state and timeline, giving precedence to the latter From fdbc5f3c1da40fad6cc1ca4df9e9324353ee02ad Mon Sep 17 00:00:00 2001 From: Bruno Windels Date: Thu, 10 Sep 2020 13:00:11 +0200 Subject: [PATCH 5/9] WIP worker work --- scripts/build.mjs | 55 +++++++-- .../room/timeline/TimelineViewModel.js | 2 +- src/legacy-polyfill.js | 2 +- src/matrix/e2ee/megolm/Decryption.js | 8 +- .../megolm/decryption/DecryptionWorker.js | 64 +++++++++++ .../megolm/decryption/SessionDecryption.js | 11 +- src/matrix/room/timeline/Timeline.js | 2 +- src/worker-polyfill.js | 19 +++ src/worker.js | 108 ++++++++++++++++++ 9 files changed, 248 insertions(+), 23 deletions(-) create mode 100644 src/matrix/e2ee/megolm/decryption/DecryptionWorker.js create mode 100644 src/worker-polyfill.js create mode 100644 src/worker.js diff --git a/scripts/build.mjs b/scripts/build.mjs index 15d090be..d4393d50 100644 --- a/scripts/build.mjs +++ b/scripts/build.mjs @@ -84,10 +84,11 @@ async function build() { // also creates the directories where the theme css bundles are placed in, // so do it first const themeAssets = await copyThemeAssets(themes, legacy); - const jsBundlePath = await buildJs(); - const jsLegacyBundlePath = await buildJsLegacy(); + const jsBundlePath = await buildJs("src/main.js", `${PROJECT_ID}.js`); + const jsLegacyBundlePath = await buildJsLegacy("src/main.js", `${PROJECT_ID}-legacy.js`); + const jsWorkerPath = await buildWorkerJsLegacy("src/worker.js", `worker.js`); const cssBundlePaths = await buildCssBundles(legacy ? buildCssLegacy : buildCss, themes, themeAssets); - const assetPaths = createAssetPaths(jsBundlePath, jsLegacyBundlePath, cssBundlePaths, themeAssets); + const assetPaths = createAssetPaths(jsBundlePath, jsLegacyBundlePath, jsWorkerPath, cssBundlePaths, themeAssets); let manifestPath; if (offline) { @@ -98,7 +99,7 @@ async function build() { console.log(`built ${PROJECT_ID} ${version} successfully`); } -function createAssetPaths(jsBundlePath, jsLegacyBundlePath, cssBundlePaths, themeAssets) { +function createAssetPaths(jsBundlePath, jsLegacyBundlePath, jsWorkerPath, cssBundlePaths, themeAssets) { function trim(path) { if (!path.startsWith(targetDir)) { throw new Error("invalid target path: " + targetDir); @@ -108,6 +109,7 @@ function createAssetPaths(jsBundlePath, jsLegacyBundlePath, cssBundlePaths, them return { jsBundle: () => trim(jsBundlePath), jsLegacyBundle: () => trim(jsLegacyBundlePath), + jsWorker: () => trim(jsWorkerPath), cssMainBundle: () => trim(cssBundlePaths.main), cssThemeBundle: themeName => trim(cssBundlePaths.themes[themeName]), cssThemeBundles: () => Object.values(cssBundlePaths.themes).map(a => trim(a)), @@ -180,23 +182,24 @@ async function buildHtml(doc, version, assetPaths, manifestPath) { await fs.writeFile(path.join(targetDir, "index.html"), doc.html(), "utf8"); } -async function buildJs() { +async function buildJs(inputFile, outputName) { // create js bundle const bundle = await rollup({ - input: 'src/main.js', + input: inputFile, plugins: [removeJsComments({comments: "none"})] }); const {output} = await bundle.generate({ format: 'es', + // TODO: can remove this? name: `${PROJECT_ID}Bundle` }); const code = output[0].code; - const bundlePath = resource(`${PROJECT_ID}.js`, code); + const bundlePath = resource(outputName, code); await fs.writeFile(bundlePath, code, "utf8"); return bundlePath; } -async function buildJsLegacy() { +async function buildJsLegacy(inputFile, outputName) { // compile down to whatever IE 11 needs const babelPlugin = babel.babel({ babelHelpers: 'bundled', @@ -214,7 +217,7 @@ async function buildJsLegacy() { }); // create js bundle const rollupConfig = { - input: ['src/legacy-polyfill.js', 'src/main.js'], + input: ['src/legacy-polyfill.js', inputFile], plugins: [multi(), commonjs(), nodeResolve(), babelPlugin, removeJsComments({comments: "none"})] }; const bundle = await rollup(rollupConfig); @@ -223,7 +226,39 @@ async function buildJsLegacy() { name: `${PROJECT_ID}Bundle` }); const code = output[0].code; - const bundlePath = resource(`${PROJECT_ID}-legacy.js`, code); + const bundlePath = resource(outputName, code); + await fs.writeFile(bundlePath, code, "utf8"); + return bundlePath; +} + +async function buildWorkerJsLegacy(inputFile, outputName) { + // compile down to whatever IE 11 needs + const babelPlugin = babel.babel({ + babelHelpers: 'bundled', + exclude: 'node_modules/**', + presets: [ + [ + "@babel/preset-env", + { + useBuiltIns: "entry", + corejs: "3", + targets: "IE 11" + } + ] + ] + }); + // create js bundle + const rollupConfig = { + input: ['src/worker-polyfill.js', inputFile], + plugins: [multi(), commonjs(), nodeResolve(), babelPlugin, removeJsComments({comments: "none"})] + }; + const bundle = await rollup(rollupConfig); + const {output} = await bundle.generate({ + format: 'iife', + name: `${PROJECT_ID}Bundle` + }); + const code = output[0].code; + const bundlePath = resource(outputName, code); await fs.writeFile(bundlePath, code, "utf8"); return bundlePath; } diff --git a/src/domain/session/room/timeline/TimelineViewModel.js b/src/domain/session/room/timeline/TimelineViewModel.js index 1527d3ae..16d529cb 100644 --- a/src/domain/session/room/timeline/TimelineViewModel.js +++ b/src/domain/session/room/timeline/TimelineViewModel.js @@ -54,7 +54,7 @@ export class TimelineViewModel extends ViewModel { if (firstTile.shape === "gap") { return firstTile.fill(); } else { - await this._timeline.loadAtTop(50); + await this._timeline.loadAtTop(10); return false; } } diff --git a/src/legacy-polyfill.js b/src/legacy-polyfill.js index 5665158c..a48416c7 100644 --- a/src/legacy-polyfill.js +++ b/src/legacy-polyfill.js @@ -23,4 +23,4 @@ if (!Element.prototype.remove) { Element.prototype.remove = function remove() { this.parentNode.removeChild(this); }; -} \ No newline at end of file +} diff --git a/src/matrix/e2ee/megolm/Decryption.js b/src/matrix/e2ee/megolm/Decryption.js index d4352926..077e9df4 100644 --- a/src/matrix/e2ee/megolm/Decryption.js +++ b/src/matrix/e2ee/megolm/Decryption.js @@ -21,6 +21,7 @@ import {SessionInfo} from "./decryption/SessionInfo.js"; import {DecryptionPreparation} from "./decryption/DecryptionPreparation.js"; import {SessionDecryption} from "./decryption/SessionDecryption.js"; import {SessionCache} from "./decryption/SessionCache.js"; +import {DecryptionWorker} from "./decryption/DecryptionWorker.js"; function getSenderKey(event) { return event.content?.["sender_key"]; @@ -38,7 +39,9 @@ export class Decryption { constructor({pickleKey, olm}) { this._pickleKey = pickleKey; this._olm = olm; - // this._worker = new MessageHandler(new Worker("worker-2580578233.js")); + // this._decryptor = new DecryptionWorker(new Worker("./src/worker.js")); + this._decryptor = new DecryptionWorker(new Worker("worker-3074010154.js")); + this._initPromise = this._decryptor.init(); } createSessionCache(fallback) { @@ -55,6 +58,7 @@ export class Decryption { * @return {DecryptionPreparation} */ async prepareDecryptAll(roomId, events, sessionCache, txn) { + await this._initPromise; const errors = new Map(); const validEvents = []; @@ -85,7 +89,7 @@ export class Decryption { errors.set(event.event_id, new DecryptionError("MEGOLM_NO_SESSION", event)); } } else { - sessionDecryptions.push(new SessionDecryption(sessionInfo, eventsForSession)); + sessionDecryptions.push(new SessionDecryption(sessionInfo, eventsForSession, this._decryptor)); } })); diff --git a/src/matrix/e2ee/megolm/decryption/DecryptionWorker.js b/src/matrix/e2ee/megolm/decryption/DecryptionWorker.js new file mode 100644 index 00000000..df7bb748 --- /dev/null +++ b/src/matrix/e2ee/megolm/decryption/DecryptionWorker.js @@ -0,0 +1,64 @@ +/* +Copyright 2020 The Matrix.org Foundation C.I.C. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +export class DecryptionWorker { + constructor(worker) { + this._worker = worker; + this._requests = new Map(); + this._counter = 0; + this._worker.addEventListener("message", this); + } + + handleEvent(e) { + if (e.type === "message") { + const message = e.data; + console.log("worker reply", message); + const request = this._requests.get(message.replyToId); + if (request) { + if (message.type === "success") { + request.resolve(message.payload); + } else if (message.type === "error") { + request.reject(new Error(message.stack)); + } + this._requests.delete(message.ref_id); + } + } + } + + _send(message) { + this._counter += 1; + message.id = this._counter; + let resolve; + let reject; + const promise = new Promise((_resolve, _reject) => { + resolve = _resolve; + reject = _reject; + }); + this._requests.set(message.id, {reject, resolve}); + this._worker.postMessage(message); + return promise; + } + + decrypt(session, ciphertext) { + const sessionKey = session.export_session(session.first_known_index()); + return this._send({type: "megolm_decrypt", ciphertext, sessionKey}); + } + + init() { + return this._send({type: "load_olm", path: "olm_legacy-3232457086.js"}); + // return this._send({type: "load_olm", path: "../lib/olm/olm_legacy.js"}); + } +} diff --git a/src/matrix/e2ee/megolm/decryption/SessionDecryption.js b/src/matrix/e2ee/megolm/decryption/SessionDecryption.js index 6abda029..5fc32f58 100644 --- a/src/matrix/e2ee/megolm/decryption/SessionDecryption.js +++ b/src/matrix/e2ee/megolm/decryption/SessionDecryption.js @@ -22,10 +22,11 @@ import {ReplayDetectionEntry} from "./ReplayDetectionEntry.js"; * Does the actual decryption of all events for a given megolm session in a batch */ export class SessionDecryption { - constructor(sessionInfo, events) { + constructor(sessionInfo, events, decryptor) { sessionInfo.retain(); this._sessionInfo = sessionInfo; this._events = events; + this._decryptor = decryptor; } async decryptAll() { @@ -38,7 +39,7 @@ export class SessionDecryption { try { const {session} = this._sessionInfo; const ciphertext = event.content.ciphertext; - const {plaintext, message_index: messageIndex} = await this._decrypt(session, ciphertext); + const {plaintext, message_index: messageIndex} = await this._decryptor.decrypt(session, ciphertext); let payload; try { payload = JSON.parse(plaintext); @@ -63,12 +64,6 @@ export class SessionDecryption { return {results, errors, replayEntries}; } - async _decrypt(session, ciphertext) { - // const sessionKey = session.export_session(session.first_known_index()); - // return this._worker.decrypt(sessionKey, ciphertext); - return session.decrypt(ciphertext); - } - dispose() { this._sessionInfo.release(); } diff --git a/src/matrix/room/timeline/Timeline.js b/src/matrix/room/timeline/Timeline.js index 7245568d..53c26d82 100644 --- a/src/matrix/room/timeline/Timeline.js +++ b/src/matrix/room/timeline/Timeline.js @@ -42,7 +42,7 @@ export class Timeline { /** @package */ async load() { - const entries = await this._timelineReader.readFromEnd(50); + const entries = await this._timelineReader.readFromEnd(25); this._remoteEntries.setManySorted(entries); } diff --git a/src/worker-polyfill.js b/src/worker-polyfill.js new file mode 100644 index 00000000..08bbf652 --- /dev/null +++ b/src/worker-polyfill.js @@ -0,0 +1,19 @@ +/* +Copyright 2020 The Matrix.org Foundation C.I.C. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// polyfills needed for IE11 +import "regenerator-runtime/runtime"; +import "core-js/modules/es.promise"; diff --git a/src/worker.js b/src/worker.js new file mode 100644 index 00000000..e470eaa7 --- /dev/null +++ b/src/worker.js @@ -0,0 +1,108 @@ +/* +Copyright 2020 The Matrix.org Foundation C.I.C. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +function asErrorMessage(err) { + return { + type: "error", + message: err.message, + stack: err.stack + }; +} + +function asSuccessMessage(payload) { + return { + type: "success", + payload + }; +} + +class MessageHandler { + constructor() { + this._olm = null; + } + + handleEvent(e) { + if (e.type === "message") { + this._handleMessage(e.data); + } + } + + _sendReply(refMessage, reply) { + reply.replyToId = refMessage.id; + self.postMessage(reply); + } + + _toMessage(fn) { + try { + let payload = fn(); + if (payload instanceof Promise) { + return payload.then( + payload => asSuccessMessage(payload), + err => asErrorMessage(err) + ); + } else { + return asSuccessMessage(payload); + } + } catch (err) { + return asErrorMessage(err); + } + } + + _loadOlm(path) { + return this._toMessage(async () => { + // might have some problems here with window vs self as global object? + if (self.msCrypto && !self.crypto) { + self.crypto = self.msCrypto; + } + self.importScripts(path); + const olm = self.olm_exports; + // mangle the globals enough to make olm load believe it is running in a browser + self.window = self; + self.document = {}; + await olm.init(); + delete self.document; + delete self.window; + this._olm = olm; + }); + } + + _megolmDecrypt(sessionKey, ciphertext) { + return this._toMessage(() => { + let session; + try { + session = new this._olm.InboundGroupSession(); + session.import_session(sessionKey); + // returns object with plaintext and message_index + return session.decrypt(ciphertext); + } finally { + session?.free(); + } + }); + } + + async _handleMessage(message) { + switch (message.type) { + case "load_olm": + this._sendReply(message, await this._loadOlm(message.path)); + break; + case "megolm_decrypt": + this._sendReply(message, this._megolmDecrypt(message.sessionKey, message.ciphertext)); + break; + } + } +} + +self.addEventListener("message", new MessageHandler()); From 0bf1723d9963df586860b7b87de10478e5f6587b Mon Sep 17 00:00:00 2001 From: Bruno Windels Date: Thu, 10 Sep 2020 15:40:30 +0100 Subject: [PATCH 6/9] Worker WIP --- src/domain/ViewModel.js | 3 +- src/domain/session/room/RoomViewModel.js | 3 +- src/matrix/e2ee/RoomEncryption.js | 2 +- src/matrix/e2ee/megolm/Decryption.js | 4 +- .../megolm/decryption/DecryptionWorker.js | 191 ++++++++++++++++-- .../megolm/decryption/SessionDecryption.js | 22 +- src/matrix/room/Room.js | 89 +++++--- src/matrix/room/timeline/Timeline.js | 26 ++- .../timeline/persistence/TimelineReader.js | 72 ++++--- src/utils/Disposables.js | 1 + src/worker-polyfill.js | 4 + src/worker.js | 14 +- 12 files changed, 339 insertions(+), 92 deletions(-) diff --git a/src/domain/ViewModel.js b/src/domain/ViewModel.js index bc35fabd..15812f8c 100644 --- a/src/domain/ViewModel.js +++ b/src/domain/ViewModel.js @@ -36,8 +36,7 @@ export class ViewModel extends EventEmitter { if (!this.disposables) { this.disposables = new Disposables(); } - this.disposables.track(disposable); - return disposable; + return this.disposables.track(disposable); } dispose() { diff --git a/src/domain/session/room/RoomViewModel.js b/src/domain/session/room/RoomViewModel.js index 32e09fbe..59bcd52f 100644 --- a/src/domain/session/room/RoomViewModel.js +++ b/src/domain/session/room/RoomViewModel.js @@ -38,7 +38,8 @@ export class RoomViewModel extends ViewModel { async load() { this._room.on("change", this._onRoomChange); try { - this._timeline = await this._room.openTimeline(); + this._timeline = this._room.openTimeline(); + await this._timeline.load(); this._timelineVM = new TimelineViewModel(this.childOptions({ room: this._room, timeline: this._timeline, diff --git a/src/matrix/e2ee/RoomEncryption.js b/src/matrix/e2ee/RoomEncryption.js index 1341389b..544081b3 100644 --- a/src/matrix/e2ee/RoomEncryption.js +++ b/src/matrix/e2ee/RoomEncryption.js @@ -268,7 +268,7 @@ class DecryptionPreparation { } dispose() { - this._megolmDecryptionChanges.dispose(); + this._megolmDecryptionPreparation.dispose(); } } diff --git a/src/matrix/e2ee/megolm/Decryption.js b/src/matrix/e2ee/megolm/Decryption.js index 077e9df4..6121192d 100644 --- a/src/matrix/e2ee/megolm/Decryption.js +++ b/src/matrix/e2ee/megolm/Decryption.js @@ -21,7 +21,7 @@ import {SessionInfo} from "./decryption/SessionInfo.js"; import {DecryptionPreparation} from "./decryption/DecryptionPreparation.js"; import {SessionDecryption} from "./decryption/SessionDecryption.js"; import {SessionCache} from "./decryption/SessionCache.js"; -import {DecryptionWorker} from "./decryption/DecryptionWorker.js"; +import {DecryptionWorker, WorkerPool} from "./decryption/DecryptionWorker.js"; function getSenderKey(event) { return event.content?.["sender_key"]; @@ -40,7 +40,7 @@ export class Decryption { this._pickleKey = pickleKey; this._olm = olm; // this._decryptor = new DecryptionWorker(new Worker("./src/worker.js")); - this._decryptor = new DecryptionWorker(new Worker("worker-3074010154.js")); + this._decryptor = new DecryptionWorker(new WorkerPool("worker-1039452087.js", 4)); this._initPromise = this._decryptor.init(); } diff --git a/src/matrix/e2ee/megolm/decryption/DecryptionWorker.js b/src/matrix/e2ee/megolm/decryption/DecryptionWorker.js index df7bb748..9cc64d74 100644 --- a/src/matrix/e2ee/megolm/decryption/DecryptionWorker.js +++ b/src/matrix/e2ee/megolm/decryption/DecryptionWorker.js @@ -14,51 +14,200 @@ See the License for the specific language governing permissions and limitations under the License. */ -export class DecryptionWorker { +import {AbortError} from "../../../../utils/error.js"; + +class WorkerState { constructor(worker) { - this._worker = worker; + this.worker = worker; + this.busy = false; + } + + attach(pool) { + this.worker.addEventListener("message", pool); + this.worker.addEventListener("error", pool); + } + + detach(pool) { + this.worker.removeEventListener("message", pool); + this.worker.removeEventListener("error", pool); + } +} + +class Request { + constructor(message, pool) { + this._promise = new Promise((_resolve, _reject) => { + this._resolve = _resolve; + this._reject = _reject; + }); + this._message = message; + this._pool = pool; + this._worker = null; + } + + abort() { + if (this._isNotDisposed) { + this._pool._abortRequest(this); + this._dispose(); + } + } + + response() { + return this._promise; + } + + _dispose() { + this._reject = null; + this._resolve = null; + } + + get _isNotDisposed() { + return this._resolve && this._reject; + } +} + +export class WorkerPool { + constructor(path, amount) { + this._workers = []; + for (let i = 0; i < amount ; ++i) { + const worker = new WorkerState(new Worker(path)); + worker.attach(this); + this._workers[i] = worker; + } this._requests = new Map(); this._counter = 0; - this._worker.addEventListener("message", this); + this._pendingFlag = false; } handleEvent(e) { if (e.type === "message") { const message = e.data; - console.log("worker reply", message); const request = this._requests.get(message.replyToId); if (request) { - if (message.type === "success") { - request.resolve(message.payload); - } else if (message.type === "error") { - request.reject(new Error(message.stack)); + request._worker.busy = false; + if (request._isNotDisposed) { + if (message.type === "success") { + request._resolve(message.payload); + } else if (message.type === "error") { + request._reject(new Error(message.stack)); + } + request._dispose(); } - this._requests.delete(message.ref_id); + this._requests.delete(message.replyToId); + } + console.log("got worker reply", message, this._requests.size); + this._sendPending(); + } else if (e.type === "error") { + console.error("worker error", e); + } + } + + _getPendingRequest() { + for (const r of this._requests.values()) { + if (!r._worker) { + return r; } } } - _send(message) { + _getFreeWorker() { + for (const w of this._workers) { + if (!w.busy) { + return w; + } + } + } + + _sendPending() { + this._pendingFlag = false; + console.log("seeing if there is anything to send", this._requests.size); + let success; + do { + success = false; + const request = this._getPendingRequest(); + if (request) { + console.log("sending pending request", request); + const worker = this._getFreeWorker(); + if (worker) { + this._sendWith(request, worker); + success = true; + } + } + } while (success); + } + + _sendWith(request, worker) { + request._worker = worker; + worker.busy = true; + console.log("sending message to worker", request._message); + worker.worker.postMessage(request._message); + } + + _enqueueRequest(message) { this._counter += 1; message.id = this._counter; - let resolve; - let reject; - const promise = new Promise((_resolve, _reject) => { - resolve = _resolve; - reject = _reject; + const request = new Request(message, this); + this._requests.set(message.id, request); + return request; + } + + send(message) { + const request = this._enqueueRequest(message); + const worker = this._getFreeWorker(); + if (worker) { + this._sendWith(request, worker); + } + return request; + } + + // assumes all workers are free atm + sendAll(message) { + const promises = this._workers.map(worker => { + const request = this._enqueueRequest(message); + this._sendWith(request, worker); + return request.response(); }); - this._requests.set(message.id, {reject, resolve}); - this._worker.postMessage(message); - return promise; + return Promise.all(promises); + } + + dispose() { + for (const w of this._workers) { + w.worker.terminate(); + w.detach(this); + } + } + + _trySendPendingInNextTick() { + if (!this._pendingFlag) { + this._pendingFlag = true; + Promise.resolve().then(() => { + this._sendPending(); + }); + } + } + + _abortRequest(request) { + request._reject(new AbortError()); + if (request._worker) { + request._worker.busy = false; + } + this._requests.delete(request._message.id); + // allow more requests to be aborted before trying to send other pending + this._trySendPendingInNextTick(); + } +} + +export class DecryptionWorker { + constructor(workerPool) { + this._workerPool = workerPool; } decrypt(session, ciphertext) { const sessionKey = session.export_session(session.first_known_index()); - return this._send({type: "megolm_decrypt", ciphertext, sessionKey}); + return this._workerPool.send({type: "megolm_decrypt", ciphertext, sessionKey}); } - init() { - return this._send({type: "load_olm", path: "olm_legacy-3232457086.js"}); + async init() { + await this._workerPool.sendAll({type: "load_olm", path: "olm_legacy-3232457086.js"}); // return this._send({type: "load_olm", path: "../lib/olm/olm_legacy.js"}); } } diff --git a/src/matrix/e2ee/megolm/decryption/SessionDecryption.js b/src/matrix/e2ee/megolm/decryption/SessionDecryption.js index 5fc32f58..30ca432e 100644 --- a/src/matrix/e2ee/megolm/decryption/SessionDecryption.js +++ b/src/matrix/e2ee/megolm/decryption/SessionDecryption.js @@ -27,6 +27,7 @@ export class SessionDecryption { this._sessionInfo = sessionInfo; this._events = events; this._decryptor = decryptor; + this._decryptionRequests = decryptor ? [] : null; } async decryptAll() { @@ -39,7 +40,16 @@ export class SessionDecryption { try { const {session} = this._sessionInfo; const ciphertext = event.content.ciphertext; - const {plaintext, message_index: messageIndex} = await this._decryptor.decrypt(session, ciphertext); + let decryptionResult; + if (this._decryptor) { + const request = this._decryptor.decrypt(session, ciphertext); + this._decryptionRequests.push(request); + decryptionResult = await request.response(); + } else { + decryptionResult = session.decrypt(ciphertext); + } + const plaintext = decryptionResult.plaintext; + const messageIndex = decryptionResult.message_index; let payload; try { payload = JSON.parse(plaintext); @@ -54,6 +64,10 @@ export class SessionDecryption { const result = new DecryptionResult(payload, this._sessionInfo.senderKey, this._sessionInfo.claimedKeys); results.set(event.event_id, result); } catch (err) { + // ignore AbortError from cancelling decryption requests in dispose method + if (err.name === "AbortError") { + return; + } if (!errors) { errors = new Map(); } @@ -65,6 +79,12 @@ export class SessionDecryption { } dispose() { + if (this._decryptionRequests) { + for (const r of this._decryptionRequests) { + r.abort(); + } + } + // TODO: cancel decryptions here this._sessionInfo.release(); } } diff --git a/src/matrix/room/Room.js b/src/matrix/room/Room.js index f281f166..233990a3 100644 --- a/src/matrix/room/Room.js +++ b/src/matrix/room/Room.js @@ -65,7 +65,8 @@ export class Room extends EventEmitter { retryEntries.push(new EventEntry(storageEntry, this._fragmentIdComparer)); } } - await this._decryptEntries(DecryptionSource.Retry, retryEntries, txn); + const decryptRequest = this._decryptEntries(DecryptionSource.Retry, retryEntries, txn); + await decryptRequest.complete(); if (this._timeline) { // only adds if already present this._timeline.replaceEntries(retryEntries); @@ -89,31 +90,39 @@ export class Room extends EventEmitter { * Used for decrypting when loading/filling the timeline, and retrying decryption, * not during sync, where it is split up during the multiple phases. */ - async _decryptEntries(source, entries, inboundSessionTxn = null) { - if (!inboundSessionTxn) { - inboundSessionTxn = await this._storage.readTxn([this._storage.storeNames.inboundGroupSessions]); - } - const events = entries.filter(entry => { - return entry.eventType === EVENT_ENCRYPTED_TYPE; - }).map(entry => entry.event); - const isTimelineOpen = this._isTimelineOpen; - const preparation = await this._roomEncryption.prepareDecryptAll(events, source, isTimelineOpen, inboundSessionTxn); - const changes = await preparation.decrypt(); - const stores = [this._storage.storeNames.groupSessionDecryptions]; - if (isTimelineOpen) { - // read to fetch devices if timeline is open - stores.push(this._storage.storeNames.deviceIdentities); - } - const writeTxn = await this._storage.readWriteTxn(stores); - let decryption; - try { - decryption = await changes.write(writeTxn); - } catch (err) { - writeTxn.abort(); - throw err; - } - await writeTxn.complete(); - decryption.applyToEntries(entries); + _decryptEntries(source, entries, inboundSessionTxn = null) { + const request = new DecryptionRequest(async r => { + if (!inboundSessionTxn) { + inboundSessionTxn = await this._storage.readTxn([this._storage.storeNames.inboundGroupSessions]); + } + if (r.cancelled) return; + const events = entries.filter(entry => { + return entry.eventType === EVENT_ENCRYPTED_TYPE; + }).map(entry => entry.event); + const isTimelineOpen = this._isTimelineOpen; + r.preparation = await this._roomEncryption.prepareDecryptAll(events, source, isTimelineOpen, inboundSessionTxn); + if (r.cancelled) return; + // TODO: should this throw an AbortError? + const changes = await r.preparation.decrypt(); + r.preparation = null; + if (r.cancelled) return; + const stores = [this._storage.storeNames.groupSessionDecryptions]; + if (isTimelineOpen) { + // read to fetch devices if timeline is open + stores.push(this._storage.storeNames.deviceIdentities); + } + const writeTxn = await this._storage.readWriteTxn(stores); + let decryption; + try { + decryption = await changes.write(writeTxn); + } catch (err) { + writeTxn.abort(); + throw err; + } + await writeTxn.complete(); + decryption.applyToEntries(entries); + }); + return request; } get needsPrepareSync() { @@ -349,7 +358,8 @@ export class Room extends EventEmitter { } await txn.complete(); if (this._roomEncryption) { - await this._decryptEntries(DecryptionSource.Timeline, gapResult.entries); + const decryptRequest = this._decryptEntries(DecryptionSource.Timeline, gapResult.entries); + await decryptRequest.complete(); } // once txn is committed, update in-memory state & emit events for (const fragment of gapResult.fragments) { @@ -461,7 +471,7 @@ export class Room extends EventEmitter { } /** @public */ - async openTimeline() { + openTimeline() { if (this._timeline) { throw new Error("not dealing with load race here for now"); } @@ -483,7 +493,6 @@ export class Room extends EventEmitter { if (this._roomEncryption) { this._timeline.enableEncryption(this._decryptEntries.bind(this, DecryptionSource.Timeline)); } - await this._timeline.load(); return this._timeline; } @@ -502,3 +511,25 @@ export class Room extends EventEmitter { } } +class DecryptionRequest { + constructor(decryptFn) { + this._cancelled = false; + this.preparation = null; + this._promise = decryptFn(this); + } + + complete() { + return this._promise; + } + + get cancelled() { + return this._cancelled; + } + + dispose() { + this._cancelled = true; + if (this.preparation) { + this.preparation.dispose(); + } + } +} \ No newline at end of file diff --git a/src/matrix/room/timeline/Timeline.js b/src/matrix/room/timeline/Timeline.js index 53c26d82..38dc1c6a 100644 --- a/src/matrix/room/timeline/Timeline.js +++ b/src/matrix/room/timeline/Timeline.js @@ -15,6 +15,7 @@ limitations under the License. */ import {SortedArray, MappedList, ConcatList} from "../../../observable/index.js"; +import {Disposables} from "../../../utils/Disposables.js"; import {Direction} from "./Direction.js"; import {TimelineReader} from "./persistence/TimelineReader.js"; import {PendingEventEntry} from "./entries/PendingEventEntry.js"; @@ -26,12 +27,14 @@ export class Timeline { this._storage = storage; this._closeCallback = closeCallback; this._fragmentIdComparer = fragmentIdComparer; + this._disposables = new Disposables(); this._remoteEntries = new SortedArray((a, b) => a.compare(b)); this._timelineReader = new TimelineReader({ roomId: this._roomId, storage: this._storage, fragmentIdComparer: this._fragmentIdComparer }); + this._readerRequest = null; const localEntries = new MappedList(pendingEvents, pe => { return new PendingEventEntry({pendingEvent: pe, user}); }, (pee, params) => { @@ -42,8 +45,14 @@ export class Timeline { /** @package */ async load() { - const entries = await this._timelineReader.readFromEnd(25); - this._remoteEntries.setManySorted(entries); + // 30 seems to be a good amount to fill the entire screen + const readerRequest = this._disposables.track(this._timelineReader.readFromEnd(30)); + try { + const entries = await readerRequest.complete(); + this._remoteEntries.setManySorted(entries); + } finally { + this._disposables.disposeTracked(readerRequest); + } } replaceEntries(entries) { @@ -71,12 +80,17 @@ export class Timeline { if (!firstEventEntry) { return; } - const entries = await this._timelineReader.readFrom( + const readerRequest = this._disposables.track(this._timelineReader.readFrom( firstEventEntry.asEventKey(), Direction.Backward, amount - ); - this._remoteEntries.setManySorted(entries); + )); + try { + const entries = await readerRequest.complete(); + this._remoteEntries.setManySorted(entries); + } finally { + this._disposables.disposeTracked(readerRequest); + } } /** @public */ @@ -87,6 +101,8 @@ export class Timeline { /** @public */ close() { if (this._closeCallback) { + this._readerRequest?.dispose(); + this._readerRequest = null; this._closeCallback(); this._closeCallback = null; } diff --git a/src/matrix/room/timeline/persistence/TimelineReader.js b/src/matrix/room/timeline/persistence/TimelineReader.js index d451d76e..f5983a19 100644 --- a/src/matrix/room/timeline/persistence/TimelineReader.js +++ b/src/matrix/room/timeline/persistence/TimelineReader.js @@ -19,6 +19,24 @@ import {Direction} from "../Direction.js"; import {EventEntry} from "../entries/EventEntry.js"; import {FragmentBoundaryEntry} from "../entries/FragmentBoundaryEntry.js"; +class ReaderRequest { + constructor(fn) { + this.decryptRequest = null; + this._promise = fn(this); + } + + complete() { + return this._promise; + } + + dispose() { + if (this.decryptRequest) { + this.decryptRequest.dispose(); + this.decryptRequest = null; + } + } +} + export class TimelineReader { constructor({roomId, storage, fragmentIdComparer}) { this._roomId = roomId; @@ -42,12 +60,33 @@ export class TimelineReader { return this._storage.readTxn(stores); } - async readFrom(eventKey, direction, amount) { - const txn = await this._openTxn(); - return await this._readFrom(eventKey, direction, amount, txn); + readFrom(eventKey, direction, amount) { + return new ReaderRequest(async r => { + const txn = await this._openTxn(); + return await this._readFrom(eventKey, direction, amount, r, txn); + }); } - async _readFrom(eventKey, direction, amount, txn) { + readFromEnd(amount) { + return new ReaderRequest(async r => { + const txn = await this._openTxn(); + const liveFragment = await txn.timelineFragments.liveFragment(this._roomId); + let entries; + // room hasn't been synced yet + if (!liveFragment) { + entries = []; + } else { + this._fragmentIdComparer.add(liveFragment); + const liveFragmentEntry = FragmentBoundaryEntry.end(liveFragment, this._fragmentIdComparer); + const eventKey = liveFragmentEntry.asEventKey(); + entries = await this._readFrom(eventKey, Direction.Backward, amount, r, txn); + entries.unshift(liveFragmentEntry); + } + return entries; + }); + } + + async _readFrom(eventKey, direction, amount, r, txn) { let entries = []; const timelineStore = txn.timelineEvents; const fragmentStore = txn.timelineFragments; @@ -83,25 +122,12 @@ export class TimelineReader { } if (this._decryptEntries) { - await this._decryptEntries(entries, txn); - } - - return entries; - } - - async readFromEnd(amount) { - const txn = await this._openTxn(); - const liveFragment = await txn.timelineFragments.liveFragment(this._roomId); - let entries; - // room hasn't been synced yet - if (!liveFragment) { - entries = []; - } else { - this._fragmentIdComparer.add(liveFragment); - const liveFragmentEntry = FragmentBoundaryEntry.end(liveFragment, this._fragmentIdComparer); - const eventKey = liveFragmentEntry.asEventKey(); - entries = await this._readFrom(eventKey, Direction.Backward, amount, txn); - entries.unshift(liveFragmentEntry); + r.decryptRequest = this._decryptEntries(entries, txn); + try { + await r.decryptRequest.complete(); + } finally { + r.decryptRequest = null; + } } return entries; } diff --git a/src/utils/Disposables.js b/src/utils/Disposables.js index e5690319..e020ef83 100644 --- a/src/utils/Disposables.js +++ b/src/utils/Disposables.js @@ -29,6 +29,7 @@ export class Disposables { track(disposable) { this._disposables.push(disposable); + return disposable; } dispose() { diff --git a/src/worker-polyfill.js b/src/worker-polyfill.js index 08bbf652..15b955d5 100644 --- a/src/worker-polyfill.js +++ b/src/worker-polyfill.js @@ -14,6 +14,10 @@ See the License for the specific language governing permissions and limitations under the License. */ + // polyfills needed for IE11 +// just enough to run olm, have promises and async/await import "regenerator-runtime/runtime"; import "core-js/modules/es.promise"; +import "core-js/modules/es.math.imul"; +import "core-js/modules/es.math.clz32"; diff --git a/src/worker.js b/src/worker.js index e470eaa7..4b2a1e43 100644 --- a/src/worker.js +++ b/src/worker.js @@ -94,13 +94,13 @@ class MessageHandler { } async _handleMessage(message) { - switch (message.type) { - case "load_olm": - this._sendReply(message, await this._loadOlm(message.path)); - break; - case "megolm_decrypt": - this._sendReply(message, this._megolmDecrypt(message.sessionKey, message.ciphertext)); - break; + const {type} = message; + if (type === "ping") { + this._sendReply(message, {type: "pong"}); + } else if (type === "load_olm") { + this._sendReply(message, await this._loadOlm(message.path)); + } else if (type === "megolm_decrypt") { + this._sendReply(message, this._megolmDecrypt(message.sessionKey, message.ciphertext)); } } } From de1cc0d7395f4831032be3b061ba6c0925cd63d7 Mon Sep 17 00:00:00 2001 From: Bruno Windels Date: Thu, 10 Sep 2020 17:43:01 +0200 Subject: [PATCH 7/9] abort decrypt requests when changing room --- src/domain/session/room/RoomViewModel.js | 10 ++++------ src/matrix/e2ee/RoomEncryption.js | 2 -- src/matrix/e2ee/megolm/Decryption.js | 2 +- .../e2ee/megolm/decryption/DecryptionPreparation.js | 3 +++ src/matrix/e2ee/megolm/decryption/DecryptionWorker.js | 8 ++------ src/matrix/room/Room.js | 2 +- src/matrix/room/timeline/Timeline.js | 6 ++---- src/utils/Disposables.js | 9 ++++++++- 8 files changed, 21 insertions(+), 21 deletions(-) diff --git a/src/domain/session/room/RoomViewModel.js b/src/domain/session/room/RoomViewModel.js index 59bcd52f..a2ea5f66 100644 --- a/src/domain/session/room/RoomViewModel.js +++ b/src/domain/session/room/RoomViewModel.js @@ -38,7 +38,7 @@ export class RoomViewModel extends ViewModel { async load() { this._room.on("change", this._onRoomChange); try { - this._timeline = this._room.openTimeline(); + this._timeline = this.track(this._room.openTimeline()); await this._timeline.load(); this._timelineVM = new TimelineViewModel(this.childOptions({ room: this._room, @@ -63,17 +63,15 @@ export class RoomViewModel extends ViewModel { } dispose() { - // this races with enable, on the await openTimeline() - if (this._timeline) { - // will stop the timeline from delivering updates on entries - this._timeline.close(); - } + super.dispose(); if (this._clearUnreadTimout) { this._clearUnreadTimout.abort(); this._clearUnreadTimout = null; } } + // called from view to close room + // parent vm will dispose this vm close() { this._closeCallback(); } diff --git a/src/matrix/e2ee/RoomEncryption.js b/src/matrix/e2ee/RoomEncryption.js index 544081b3..44229b97 100644 --- a/src/matrix/e2ee/RoomEncryption.js +++ b/src/matrix/e2ee/RoomEncryption.js @@ -292,11 +292,9 @@ class BatchDecryptionResult { constructor(results, errors) { this.results = results; this.errors = errors; - console.log("BatchDecryptionResult", this); } applyToEntries(entries) { - console.log("BatchDecryptionResult.applyToEntries", this); for (const entry of entries) { const result = this.results.get(entry.id); if (result) { diff --git a/src/matrix/e2ee/megolm/Decryption.js b/src/matrix/e2ee/megolm/Decryption.js index 6121192d..9726e6d8 100644 --- a/src/matrix/e2ee/megolm/Decryption.js +++ b/src/matrix/e2ee/megolm/Decryption.js @@ -39,8 +39,8 @@ export class Decryption { constructor({pickleKey, olm}) { this._pickleKey = pickleKey; this._olm = olm; - // this._decryptor = new DecryptionWorker(new Worker("./src/worker.js")); this._decryptor = new DecryptionWorker(new WorkerPool("worker-1039452087.js", 4)); + //this._decryptor = new DecryptionWorker(new WorkerPool("./src/worker.js", 4)); this._initPromise = this._decryptor.init(); } diff --git a/src/matrix/e2ee/megolm/decryption/DecryptionPreparation.js b/src/matrix/e2ee/megolm/decryption/DecryptionPreparation.js index 02ee32df..e24d70af 100644 --- a/src/matrix/e2ee/megolm/decryption/DecryptionPreparation.js +++ b/src/matrix/e2ee/megolm/decryption/DecryptionPreparation.js @@ -28,6 +28,9 @@ export class DecryptionPreparation { } async decrypt() { + // console.log("start sleeping"); + // await new Promise(resolve => setTimeout(resolve, 5000)); + // console.log("done sleeping"); try { const errors = this._initialErrors; const results = new Map(); diff --git a/src/matrix/e2ee/megolm/decryption/DecryptionWorker.js b/src/matrix/e2ee/megolm/decryption/DecryptionWorker.js index 9cc64d74..38a474ed 100644 --- a/src/matrix/e2ee/megolm/decryption/DecryptionWorker.js +++ b/src/matrix/e2ee/megolm/decryption/DecryptionWorker.js @@ -94,7 +94,6 @@ export class WorkerPool { } this._requests.delete(message.replyToId); } - console.log("got worker reply", message, this._requests.size); this._sendPending(); } else if (e.type === "error") { console.error("worker error", e); @@ -119,13 +118,11 @@ export class WorkerPool { _sendPending() { this._pendingFlag = false; - console.log("seeing if there is anything to send", this._requests.size); let success; do { success = false; const request = this._getPendingRequest(); if (request) { - console.log("sending pending request", request); const worker = this._getFreeWorker(); if (worker) { this._sendWith(request, worker); @@ -138,7 +135,6 @@ export class WorkerPool { _sendWith(request, worker) { request._worker = worker; worker.busy = true; - console.log("sending message to worker", request._message); worker.worker.postMessage(request._message); } @@ -162,7 +158,7 @@ export class WorkerPool { // assumes all workers are free atm sendAll(message) { const promises = this._workers.map(worker => { - const request = this._enqueueRequest(message); + const request = this._enqueueRequest(Object.assign({}, message)); this._sendWith(request, worker); return request.response(); }); @@ -208,6 +204,6 @@ export class DecryptionWorker { async init() { await this._workerPool.sendAll({type: "load_olm", path: "olm_legacy-3232457086.js"}); - // return this._send({type: "load_olm", path: "../lib/olm/olm_legacy.js"}); + //await this._workerPool.sendAll({type: "load_olm", path: "../lib/olm/olm_legacy.js"}); } } diff --git a/src/matrix/room/Room.js b/src/matrix/room/Room.js index 233990a3..98f114a5 100644 --- a/src/matrix/room/Room.js +++ b/src/matrix/room/Room.js @@ -532,4 +532,4 @@ class DecryptionRequest { this.preparation.dispose(); } } -} \ No newline at end of file +} diff --git a/src/matrix/room/timeline/Timeline.js b/src/matrix/room/timeline/Timeline.js index 38dc1c6a..74362a13 100644 --- a/src/matrix/room/timeline/Timeline.js +++ b/src/matrix/room/timeline/Timeline.js @@ -19,7 +19,6 @@ import {Disposables} from "../../../utils/Disposables.js"; import {Direction} from "./Direction.js"; import {TimelineReader} from "./persistence/TimelineReader.js"; import {PendingEventEntry} from "./entries/PendingEventEntry.js"; -import {EventEntry} from "./entries/EventEntry.js"; export class Timeline { constructor({roomId, storage, closeCallback, fragmentIdComparer, pendingEvents, user}) { @@ -99,10 +98,9 @@ export class Timeline { } /** @public */ - close() { + dispose() { if (this._closeCallback) { - this._readerRequest?.dispose(); - this._readerRequest = null; + this._disposables.dispose(); this._closeCallback(); this._closeCallback = null; } diff --git a/src/utils/Disposables.js b/src/utils/Disposables.js index e020ef83..efc49897 100644 --- a/src/utils/Disposables.js +++ b/src/utils/Disposables.js @@ -28,6 +28,9 @@ export class Disposables { } track(disposable) { + if (this.isDisposed) { + throw new Error("Already disposed, check isDisposed after await if needed"); + } this._disposables.push(disposable); return disposable; } @@ -41,8 +44,12 @@ export class Disposables { } } + get isDisposed() { + return this._disposables === null; + } + disposeTracked(value) { - if (value === undefined || value === null) { + if (value === undefined || value === null || this.isDisposed) { return null; } const idx = this._disposables.indexOf(value); From af36c71a5974de8241f7fafa78e48b998a21f3ee Mon Sep 17 00:00:00 2001 From: Bruno Windels Date: Thu, 10 Sep 2020 18:41:23 +0200 Subject: [PATCH 8/9] load worker in main and pass paths so it works both on compiled and non-compiled --- index.html | 9 +- scripts/build.mjs | 8 +- src/main.js | 30 ++- src/matrix/Session.js | 5 +- src/matrix/SessionContainer.js | 10 +- src/matrix/e2ee/megolm/Decryption.js | 9 +- .../megolm/decryption/DecryptionWorker.js | 183 --------------- src/utils/WorkerPool.js | 212 ++++++++++++++++++ src/worker.js | 2 +- 9 files changed, 268 insertions(+), 200 deletions(-) create mode 100644 src/utils/WorkerPool.js diff --git a/index.html b/index.html index b09286a0..74e44c99 100644 --- a/index.html +++ b/index.html @@ -19,9 +19,12 @@ ` + + `` + `` + - ``); + ``); removeOrEnableScript(doc("script#service-worker"), offline); const versionScript = doc("script#version"); diff --git a/src/main.js b/src/main.js index 79f5698d..6f279910 100644 --- a/src/main.js +++ b/src/main.js @@ -25,6 +25,7 @@ import {BrawlViewModel} from "./domain/BrawlViewModel.js"; import {BrawlView} from "./ui/web/BrawlView.js"; import {Clock} from "./ui/web/dom/Clock.js"; import {OnlineStatus} from "./ui/web/dom/OnlineStatus.js"; +import {WorkerPool} from "./utils/WorkerPool.js"; function addScript(src) { return new Promise(function (resolve, reject) { @@ -55,10 +56,27 @@ async function loadOlm(olmPaths) { return null; } +// make path relative to basePath, +// assuming it and basePath are relative to document +function relPath(path, basePath) { + const idx = basePath.lastIndexOf("/"); + const dir = idx === -1 ? "" : basePath.slice(0, idx); + const dirCount = dir.length ? dir.split("/").length : 0; + return "../".repeat(dirCount) + path; +} + +async function loadWorker(paths) { + const workerPool = new WorkerPool(paths.worker, 4); + await workerPool.init(); + const path = relPath(paths.olm.legacyBundle, paths.worker); + await workerPool.sendAll({type: "load_olm", path}); + return workerPool; +} + // Don't use a default export here, as we use multiple entries during legacy build, // which does not support default exports, // see https://github.com/rollup/plugins/tree/master/packages/multi-entry -export async function main(container, olmPaths) { +export async function main(container, paths) { try { // to replay: // const fetchLog = await (await fetch("/fetchlogs/constrainterror.json")).json(); @@ -79,6 +97,13 @@ export async function main(container, olmPaths) { const sessionInfoStorage = new SessionInfoStorage("brawl_sessions_v1"); const storageFactory = new StorageFactory(); + // if wasm is not supported, we'll want + // to run some olm operations in a worker (mainly for IE11) + let workerPromise; + if (!window.WebAssembly) { + workerPromise = loadWorker(paths); + } + const vm = new BrawlViewModel({ createSessionContainer: () => { return new SessionContainer({ @@ -88,7 +113,8 @@ export async function main(container, olmPaths) { sessionInfoStorage, request, clock, - olmPromise: loadOlm(olmPaths), + olmPromise: loadOlm(paths.olm), + workerPromise, }); }, sessionInfoStorage, diff --git a/src/matrix/Session.js b/src/matrix/Session.js index c5c0c94f..be3f6a06 100644 --- a/src/matrix/Session.js +++ b/src/matrix/Session.js @@ -33,7 +33,7 @@ const PICKLE_KEY = "DEFAULT_KEY"; export class Session { // sessionInfo contains deviceId, userId and homeServer - constructor({clock, storage, hsApi, sessionInfo, olm}) { + constructor({clock, storage, hsApi, sessionInfo, olm, workerPool}) { this._clock = clock; this._storage = storage; this._hsApi = hsApi; @@ -52,6 +52,7 @@ export class Session { this._megolmEncryption = null; this._megolmDecryption = null; this._getSyncToken = () => this.syncToken; + this._workerPool = workerPool; if (olm) { this._olmUtil = new olm.Utility(); @@ -100,6 +101,7 @@ export class Session { this._megolmDecryption = new MegOlmDecryption({ pickleKey: PICKLE_KEY, olm: this._olm, + workerPool: this._workerPool, }); this._deviceMessageHandler.enableEncryption({olmDecryption, megolmDecryption: this._megolmDecryption}); } @@ -202,6 +204,7 @@ export class Session { } stop() { + this._workerPool?.dispose(); this._sendScheduler.stop(); } diff --git a/src/matrix/SessionContainer.js b/src/matrix/SessionContainer.js index c07190eb..1e868eba 100644 --- a/src/matrix/SessionContainer.js +++ b/src/matrix/SessionContainer.js @@ -42,7 +42,7 @@ export const LoginFailure = createEnum( ); export class SessionContainer { - constructor({clock, random, onlineStatus, request, storageFactory, sessionInfoStorage, olmPromise}) { + constructor({clock, random, onlineStatus, request, storageFactory, sessionInfoStorage, olmPromise, workerPromise}) { this._random = random; this._clock = clock; this._onlineStatus = onlineStatus; @@ -59,6 +59,7 @@ export class SessionContainer { this._sessionId = null; this._storage = null; this._olmPromise = olmPromise; + this._workerPromise = workerPromise; } createNewSessionId() { @@ -152,8 +153,13 @@ export class SessionContainer { homeServer: sessionInfo.homeServer, }; const olm = await this._olmPromise; + let workerPool = null; + if (this._workerPromise) { + workerPool = await this._workerPromise; + } this._session = new Session({storage: this._storage, - sessionInfo: filteredSessionInfo, hsApi, olm, clock: this._clock}); + sessionInfo: filteredSessionInfo, hsApi, olm, + clock: this._clock, workerPool}); await this._session.load(); this._status.set(LoadStatus.SessionSetup); await this._session.beforeFirstSync(isNewLogin); diff --git a/src/matrix/e2ee/megolm/Decryption.js b/src/matrix/e2ee/megolm/Decryption.js index 9726e6d8..544fa0a3 100644 --- a/src/matrix/e2ee/megolm/Decryption.js +++ b/src/matrix/e2ee/megolm/Decryption.js @@ -21,7 +21,7 @@ import {SessionInfo} from "./decryption/SessionInfo.js"; import {DecryptionPreparation} from "./decryption/DecryptionPreparation.js"; import {SessionDecryption} from "./decryption/SessionDecryption.js"; import {SessionCache} from "./decryption/SessionCache.js"; -import {DecryptionWorker, WorkerPool} from "./decryption/DecryptionWorker.js"; +import {DecryptionWorker} from "./decryption/DecryptionWorker.js"; function getSenderKey(event) { return event.content?.["sender_key"]; @@ -36,12 +36,10 @@ function getCiphertext(event) { } export class Decryption { - constructor({pickleKey, olm}) { + constructor({pickleKey, olm, workerPool}) { this._pickleKey = pickleKey; this._olm = olm; - this._decryptor = new DecryptionWorker(new WorkerPool("worker-1039452087.js", 4)); - //this._decryptor = new DecryptionWorker(new WorkerPool("./src/worker.js", 4)); - this._initPromise = this._decryptor.init(); + this._decryptor = workerPool ? new DecryptionWorker(workerPool) : null; } createSessionCache(fallback) { @@ -58,7 +56,6 @@ export class Decryption { * @return {DecryptionPreparation} */ async prepareDecryptAll(roomId, events, sessionCache, txn) { - await this._initPromise; const errors = new Map(); const validEvents = []; diff --git a/src/matrix/e2ee/megolm/decryption/DecryptionWorker.js b/src/matrix/e2ee/megolm/decryption/DecryptionWorker.js index 38a474ed..b44694a0 100644 --- a/src/matrix/e2ee/megolm/decryption/DecryptionWorker.js +++ b/src/matrix/e2ee/megolm/decryption/DecryptionWorker.js @@ -14,184 +14,6 @@ See the License for the specific language governing permissions and limitations under the License. */ -import {AbortError} from "../../../../utils/error.js"; - -class WorkerState { - constructor(worker) { - this.worker = worker; - this.busy = false; - } - - attach(pool) { - this.worker.addEventListener("message", pool); - this.worker.addEventListener("error", pool); - } - - detach(pool) { - this.worker.removeEventListener("message", pool); - this.worker.removeEventListener("error", pool); - } -} - -class Request { - constructor(message, pool) { - this._promise = new Promise((_resolve, _reject) => { - this._resolve = _resolve; - this._reject = _reject; - }); - this._message = message; - this._pool = pool; - this._worker = null; - } - - abort() { - if (this._isNotDisposed) { - this._pool._abortRequest(this); - this._dispose(); - } - } - - response() { - return this._promise; - } - - _dispose() { - this._reject = null; - this._resolve = null; - } - - get _isNotDisposed() { - return this._resolve && this._reject; - } -} - -export class WorkerPool { - constructor(path, amount) { - this._workers = []; - for (let i = 0; i < amount ; ++i) { - const worker = new WorkerState(new Worker(path)); - worker.attach(this); - this._workers[i] = worker; - } - this._requests = new Map(); - this._counter = 0; - this._pendingFlag = false; - } - - handleEvent(e) { - if (e.type === "message") { - const message = e.data; - const request = this._requests.get(message.replyToId); - if (request) { - request._worker.busy = false; - if (request._isNotDisposed) { - if (message.type === "success") { - request._resolve(message.payload); - } else if (message.type === "error") { - request._reject(new Error(message.stack)); - } - request._dispose(); - } - this._requests.delete(message.replyToId); - } - this._sendPending(); - } else if (e.type === "error") { - console.error("worker error", e); - } - } - - _getPendingRequest() { - for (const r of this._requests.values()) { - if (!r._worker) { - return r; - } - } - } - - _getFreeWorker() { - for (const w of this._workers) { - if (!w.busy) { - return w; - } - } - } - - _sendPending() { - this._pendingFlag = false; - let success; - do { - success = false; - const request = this._getPendingRequest(); - if (request) { - const worker = this._getFreeWorker(); - if (worker) { - this._sendWith(request, worker); - success = true; - } - } - } while (success); - } - - _sendWith(request, worker) { - request._worker = worker; - worker.busy = true; - worker.worker.postMessage(request._message); - } - - _enqueueRequest(message) { - this._counter += 1; - message.id = this._counter; - const request = new Request(message, this); - this._requests.set(message.id, request); - return request; - } - - send(message) { - const request = this._enqueueRequest(message); - const worker = this._getFreeWorker(); - if (worker) { - this._sendWith(request, worker); - } - return request; - } - - // assumes all workers are free atm - sendAll(message) { - const promises = this._workers.map(worker => { - const request = this._enqueueRequest(Object.assign({}, message)); - this._sendWith(request, worker); - return request.response(); - }); - return Promise.all(promises); - } - - dispose() { - for (const w of this._workers) { - w.worker.terminate(); - w.detach(this); - } - } - - _trySendPendingInNextTick() { - if (!this._pendingFlag) { - this._pendingFlag = true; - Promise.resolve().then(() => { - this._sendPending(); - }); - } - } - - _abortRequest(request) { - request._reject(new AbortError()); - if (request._worker) { - request._worker.busy = false; - } - this._requests.delete(request._message.id); - // allow more requests to be aborted before trying to send other pending - this._trySendPendingInNextTick(); - } -} - export class DecryptionWorker { constructor(workerPool) { this._workerPool = workerPool; @@ -201,9 +23,4 @@ export class DecryptionWorker { const sessionKey = session.export_session(session.first_known_index()); return this._workerPool.send({type: "megolm_decrypt", ciphertext, sessionKey}); } - - async init() { - await this._workerPool.sendAll({type: "load_olm", path: "olm_legacy-3232457086.js"}); - //await this._workerPool.sendAll({type: "load_olm", path: "../lib/olm/olm_legacy.js"}); - } } diff --git a/src/utils/WorkerPool.js b/src/utils/WorkerPool.js new file mode 100644 index 00000000..554067fe --- /dev/null +++ b/src/utils/WorkerPool.js @@ -0,0 +1,212 @@ +/* +Copyright 2020 The Matrix.org Foundation C.I.C. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +import {AbortError} from "./error.js"; + +class WorkerState { + constructor(worker) { + this.worker = worker; + this.busy = false; + } + + attach(pool) { + this.worker.addEventListener("message", pool); + this.worker.addEventListener("error", pool); + } + + detach(pool) { + this.worker.removeEventListener("message", pool); + this.worker.removeEventListener("error", pool); + } +} + +class Request { + constructor(message, pool) { + this._promise = new Promise((_resolve, _reject) => { + this._resolve = _resolve; + this._reject = _reject; + }); + this._message = message; + this._pool = pool; + this._worker = null; + } + + abort() { + if (this._isNotDisposed) { + this._pool._abortRequest(this); + this._dispose(); + } + } + + response() { + return this._promise; + } + + _dispose() { + this._reject = null; + this._resolve = null; + } + + get _isNotDisposed() { + return this._resolve && this._reject; + } +} + +export class WorkerPool { + // TODO: extract DOM specific bits and write unit tests + constructor(path, amount) { + this._workers = []; + for (let i = 0; i < amount ; ++i) { + const worker = new WorkerState(new Worker(path)); + worker.attach(this); + this._workers[i] = worker; + } + this._requests = new Map(); + this._counter = 0; + this._pendingFlag = false; + this._init = null; + + } + + init() { + const promise = new Promise((resolve, reject) => { + this._init = {resolve, reject}; + }); + this.sendAll({type: "ping"}) + .then(this._init.resolve, this._init.reject) + .finally(() => { + this._init = null; + }); + return promise; + } + + handleEvent(e) { + console.log("WorkerPool event", e); + if (e.type === "message") { + const message = e.data; + const request = this._requests.get(message.replyToId); + if (request) { + request._worker.busy = false; + if (request._isNotDisposed) { + if (message.type === "success") { + request._resolve(message.payload); + } else if (message.type === "error") { + request._reject(new Error(message.stack)); + } + request._dispose(); + } + this._requests.delete(message.replyToId); + } + this._sendPending(); + } else if (e.type === "error") { + if (this._init) { + this._init.reject(new Error("worker error during init")); + } + console.error("worker error", e); + } + } + + _getPendingRequest() { + for (const r of this._requests.values()) { + if (!r._worker) { + return r; + } + } + } + + _getFreeWorker() { + for (const w of this._workers) { + if (!w.busy) { + return w; + } + } + } + + _sendPending() { + this._pendingFlag = false; + let success; + do { + success = false; + const request = this._getPendingRequest(); + if (request) { + const worker = this._getFreeWorker(); + if (worker) { + this._sendWith(request, worker); + success = true; + } + } + } while (success); + } + + _sendWith(request, worker) { + request._worker = worker; + worker.busy = true; + worker.worker.postMessage(request._message); + } + + _enqueueRequest(message) { + this._counter += 1; + message.id = this._counter; + const request = new Request(message, this); + this._requests.set(message.id, request); + return request; + } + + send(message) { + const request = this._enqueueRequest(message); + const worker = this._getFreeWorker(); + if (worker) { + this._sendWith(request, worker); + } + return request; + } + + // assumes all workers are free atm + sendAll(message) { + const promises = this._workers.map(worker => { + const request = this._enqueueRequest(Object.assign({}, message)); + this._sendWith(request, worker); + return request.response(); + }); + return Promise.all(promises); + } + + dispose() { + for (const w of this._workers) { + w.detach(this); + w.worker.terminate(); + } + } + + _trySendPendingInNextTick() { + if (!this._pendingFlag) { + this._pendingFlag = true; + Promise.resolve().then(() => { + this._sendPending(); + }); + } + } + + _abortRequest(request) { + request._reject(new AbortError()); + if (request._worker) { + request._worker.busy = false; + } + this._requests.delete(request._message.id); + // allow more requests to be aborted before trying to send other pending + this._trySendPendingInNextTick(); + } +} diff --git a/src/worker.js b/src/worker.js index 4b2a1e43..7c6642fb 100644 --- a/src/worker.js +++ b/src/worker.js @@ -96,7 +96,7 @@ class MessageHandler { async _handleMessage(message) { const {type} = message; if (type === "ping") { - this._sendReply(message, {type: "pong"}); + this._sendReply(message, {type: "success"}); } else if (type === "load_olm") { this._sendReply(message, await this._loadOlm(message.path)); } else if (type === "megolm_decrypt") { From 78fecd003a847f25da1f9b7d19c27d3d763d4dea Mon Sep 17 00:00:00 2001 From: Bruno Windels Date: Thu, 10 Sep 2020 18:57:29 +0200 Subject: [PATCH 9/9] cleanup --- src/matrix/e2ee/megolm/decryption/DecryptionPreparation.js | 3 --- src/matrix/room/Room.js | 1 - src/utils/WorkerPool.js | 1 - 3 files changed, 5 deletions(-) diff --git a/src/matrix/e2ee/megolm/decryption/DecryptionPreparation.js b/src/matrix/e2ee/megolm/decryption/DecryptionPreparation.js index e24d70af..02ee32df 100644 --- a/src/matrix/e2ee/megolm/decryption/DecryptionPreparation.js +++ b/src/matrix/e2ee/megolm/decryption/DecryptionPreparation.js @@ -28,9 +28,6 @@ export class DecryptionPreparation { } async decrypt() { - // console.log("start sleeping"); - // await new Promise(resolve => setTimeout(resolve, 5000)); - // console.log("done sleeping"); try { const errors = this._initialErrors; const results = new Map(); diff --git a/src/matrix/room/Room.js b/src/matrix/room/Room.js index 98f114a5..3704223e 100644 --- a/src/matrix/room/Room.js +++ b/src/matrix/room/Room.js @@ -102,7 +102,6 @@ export class Room extends EventEmitter { const isTimelineOpen = this._isTimelineOpen; r.preparation = await this._roomEncryption.prepareDecryptAll(events, source, isTimelineOpen, inboundSessionTxn); if (r.cancelled) return; - // TODO: should this throw an AbortError? const changes = await r.preparation.decrypt(); r.preparation = null; if (r.cancelled) return; diff --git a/src/utils/WorkerPool.js b/src/utils/WorkerPool.js index 554067fe..56feaf8c 100644 --- a/src/utils/WorkerPool.js +++ b/src/utils/WorkerPool.js @@ -94,7 +94,6 @@ export class WorkerPool { } handleEvent(e) { - console.log("WorkerPool event", e); if (e.type === "message") { const message = e.data; const request = this._requests.get(message.replyToId);