From 0bf1723d9963df586860b7b87de10478e5f6587b Mon Sep 17 00:00:00 2001 From: Bruno Windels Date: Thu, 10 Sep 2020 15:40:30 +0100 Subject: [PATCH] Worker WIP --- src/domain/ViewModel.js | 3 +- src/domain/session/room/RoomViewModel.js | 3 +- src/matrix/e2ee/RoomEncryption.js | 2 +- src/matrix/e2ee/megolm/Decryption.js | 4 +- .../megolm/decryption/DecryptionWorker.js | 191 ++++++++++++++++-- .../megolm/decryption/SessionDecryption.js | 22 +- src/matrix/room/Room.js | 89 +++++--- src/matrix/room/timeline/Timeline.js | 26 ++- .../timeline/persistence/TimelineReader.js | 72 ++++--- src/utils/Disposables.js | 1 + src/worker-polyfill.js | 4 + src/worker.js | 14 +- 12 files changed, 339 insertions(+), 92 deletions(-) diff --git a/src/domain/ViewModel.js b/src/domain/ViewModel.js index bc35fabd..15812f8c 100644 --- a/src/domain/ViewModel.js +++ b/src/domain/ViewModel.js @@ -36,8 +36,7 @@ export class ViewModel extends EventEmitter { if (!this.disposables) { this.disposables = new Disposables(); } - this.disposables.track(disposable); - return disposable; + return this.disposables.track(disposable); } dispose() { diff --git a/src/domain/session/room/RoomViewModel.js b/src/domain/session/room/RoomViewModel.js index 32e09fbe..59bcd52f 100644 --- a/src/domain/session/room/RoomViewModel.js +++ b/src/domain/session/room/RoomViewModel.js @@ -38,7 +38,8 @@ export class RoomViewModel extends ViewModel { async load() { this._room.on("change", this._onRoomChange); try { - this._timeline = await this._room.openTimeline(); + this._timeline = this._room.openTimeline(); + await this._timeline.load(); this._timelineVM = new TimelineViewModel(this.childOptions({ room: this._room, timeline: this._timeline, diff --git a/src/matrix/e2ee/RoomEncryption.js b/src/matrix/e2ee/RoomEncryption.js index 1341389b..544081b3 100644 --- a/src/matrix/e2ee/RoomEncryption.js +++ b/src/matrix/e2ee/RoomEncryption.js @@ -268,7 +268,7 @@ class DecryptionPreparation { } dispose() { - this._megolmDecryptionChanges.dispose(); + this._megolmDecryptionPreparation.dispose(); } } diff --git a/src/matrix/e2ee/megolm/Decryption.js b/src/matrix/e2ee/megolm/Decryption.js index 077e9df4..6121192d 100644 --- a/src/matrix/e2ee/megolm/Decryption.js +++ b/src/matrix/e2ee/megolm/Decryption.js @@ -21,7 +21,7 @@ import {SessionInfo} from "./decryption/SessionInfo.js"; import {DecryptionPreparation} from "./decryption/DecryptionPreparation.js"; import {SessionDecryption} from "./decryption/SessionDecryption.js"; import {SessionCache} from "./decryption/SessionCache.js"; -import {DecryptionWorker} from "./decryption/DecryptionWorker.js"; +import {DecryptionWorker, WorkerPool} from "./decryption/DecryptionWorker.js"; function getSenderKey(event) { return event.content?.["sender_key"]; @@ -40,7 +40,7 @@ export class Decryption { this._pickleKey = pickleKey; this._olm = olm; // this._decryptor = new DecryptionWorker(new Worker("./src/worker.js")); - this._decryptor = new DecryptionWorker(new Worker("worker-3074010154.js")); + this._decryptor = new DecryptionWorker(new WorkerPool("worker-1039452087.js", 4)); this._initPromise = this._decryptor.init(); } diff --git a/src/matrix/e2ee/megolm/decryption/DecryptionWorker.js b/src/matrix/e2ee/megolm/decryption/DecryptionWorker.js index df7bb748..9cc64d74 100644 --- a/src/matrix/e2ee/megolm/decryption/DecryptionWorker.js +++ b/src/matrix/e2ee/megolm/decryption/DecryptionWorker.js @@ -14,51 +14,200 @@ See the License for the specific language governing permissions and limitations under the License. */ -export class DecryptionWorker { +import {AbortError} from "../../../../utils/error.js"; + +class WorkerState { constructor(worker) { - this._worker = worker; + this.worker = worker; + this.busy = false; + } + + attach(pool) { + this.worker.addEventListener("message", pool); + this.worker.addEventListener("error", pool); + } + + detach(pool) { + this.worker.removeEventListener("message", pool); + this.worker.removeEventListener("error", pool); + } +} + +class Request { + constructor(message, pool) { + this._promise = new Promise((_resolve, _reject) => { + this._resolve = _resolve; + this._reject = _reject; + }); + this._message = message; + this._pool = pool; + this._worker = null; + } + + abort() { + if (this._isNotDisposed) { + this._pool._abortRequest(this); + this._dispose(); + } + } + + response() { + return this._promise; + } + + _dispose() { + this._reject = null; + this._resolve = null; + } + + get _isNotDisposed() { + return this._resolve && this._reject; + } +} + +export class WorkerPool { + constructor(path, amount) { + this._workers = []; + for (let i = 0; i < amount ; ++i) { + const worker = new WorkerState(new Worker(path)); + worker.attach(this); + this._workers[i] = worker; + } this._requests = new Map(); this._counter = 0; - this._worker.addEventListener("message", this); + this._pendingFlag = false; } handleEvent(e) { if (e.type === "message") { const message = e.data; - console.log("worker reply", message); const request = this._requests.get(message.replyToId); if (request) { - if (message.type === "success") { - request.resolve(message.payload); - } else if (message.type === "error") { - request.reject(new Error(message.stack)); + request._worker.busy = false; + if (request._isNotDisposed) { + if (message.type === "success") { + request._resolve(message.payload); + } else if (message.type === "error") { + request._reject(new Error(message.stack)); + } + request._dispose(); } - this._requests.delete(message.ref_id); + this._requests.delete(message.replyToId); + } + console.log("got worker reply", message, this._requests.size); + this._sendPending(); + } else if (e.type === "error") { + console.error("worker error", e); + } + } + + _getPendingRequest() { + for (const r of this._requests.values()) { + if (!r._worker) { + return r; } } } - _send(message) { + _getFreeWorker() { + for (const w of this._workers) { + if (!w.busy) { + return w; + } + } + } + + _sendPending() { + this._pendingFlag = false; + console.log("seeing if there is anything to send", this._requests.size); + let success; + do { + success = false; + const request = this._getPendingRequest(); + if (request) { + console.log("sending pending request", request); + const worker = this._getFreeWorker(); + if (worker) { + this._sendWith(request, worker); + success = true; + } + } + } while (success); + } + + _sendWith(request, worker) { + request._worker = worker; + worker.busy = true; + console.log("sending message to worker", request._message); + worker.worker.postMessage(request._message); + } + + _enqueueRequest(message) { this._counter += 1; message.id = this._counter; - let resolve; - let reject; - const promise = new Promise((_resolve, _reject) => { - resolve = _resolve; - reject = _reject; + const request = new Request(message, this); + this._requests.set(message.id, request); + return request; + } + + send(message) { + const request = this._enqueueRequest(message); + const worker = this._getFreeWorker(); + if (worker) { + this._sendWith(request, worker); + } + return request; + } + + // assumes all workers are free atm + sendAll(message) { + const promises = this._workers.map(worker => { + const request = this._enqueueRequest(message); + this._sendWith(request, worker); + return request.response(); }); - this._requests.set(message.id, {reject, resolve}); - this._worker.postMessage(message); - return promise; + return Promise.all(promises); + } + + dispose() { + for (const w of this._workers) { + w.worker.terminate(); + w.detach(this); + } + } + + _trySendPendingInNextTick() { + if (!this._pendingFlag) { + this._pendingFlag = true; + Promise.resolve().then(() => { + this._sendPending(); + }); + } + } + + _abortRequest(request) { + request._reject(new AbortError()); + if (request._worker) { + request._worker.busy = false; + } + this._requests.delete(request._message.id); + // allow more requests to be aborted before trying to send other pending + this._trySendPendingInNextTick(); + } +} + +export class DecryptionWorker { + constructor(workerPool) { + this._workerPool = workerPool; } decrypt(session, ciphertext) { const sessionKey = session.export_session(session.first_known_index()); - return this._send({type: "megolm_decrypt", ciphertext, sessionKey}); + return this._workerPool.send({type: "megolm_decrypt", ciphertext, sessionKey}); } - init() { - return this._send({type: "load_olm", path: "olm_legacy-3232457086.js"}); + async init() { + await this._workerPool.sendAll({type: "load_olm", path: "olm_legacy-3232457086.js"}); // return this._send({type: "load_olm", path: "../lib/olm/olm_legacy.js"}); } } diff --git a/src/matrix/e2ee/megolm/decryption/SessionDecryption.js b/src/matrix/e2ee/megolm/decryption/SessionDecryption.js index 5fc32f58..30ca432e 100644 --- a/src/matrix/e2ee/megolm/decryption/SessionDecryption.js +++ b/src/matrix/e2ee/megolm/decryption/SessionDecryption.js @@ -27,6 +27,7 @@ export class SessionDecryption { this._sessionInfo = sessionInfo; this._events = events; this._decryptor = decryptor; + this._decryptionRequests = decryptor ? [] : null; } async decryptAll() { @@ -39,7 +40,16 @@ export class SessionDecryption { try { const {session} = this._sessionInfo; const ciphertext = event.content.ciphertext; - const {plaintext, message_index: messageIndex} = await this._decryptor.decrypt(session, ciphertext); + let decryptionResult; + if (this._decryptor) { + const request = this._decryptor.decrypt(session, ciphertext); + this._decryptionRequests.push(request); + decryptionResult = await request.response(); + } else { + decryptionResult = session.decrypt(ciphertext); + } + const plaintext = decryptionResult.plaintext; + const messageIndex = decryptionResult.message_index; let payload; try { payload = JSON.parse(plaintext); @@ -54,6 +64,10 @@ export class SessionDecryption { const result = new DecryptionResult(payload, this._sessionInfo.senderKey, this._sessionInfo.claimedKeys); results.set(event.event_id, result); } catch (err) { + // ignore AbortError from cancelling decryption requests in dispose method + if (err.name === "AbortError") { + return; + } if (!errors) { errors = new Map(); } @@ -65,6 +79,12 @@ export class SessionDecryption { } dispose() { + if (this._decryptionRequests) { + for (const r of this._decryptionRequests) { + r.abort(); + } + } + // TODO: cancel decryptions here this._sessionInfo.release(); } } diff --git a/src/matrix/room/Room.js b/src/matrix/room/Room.js index f281f166..233990a3 100644 --- a/src/matrix/room/Room.js +++ b/src/matrix/room/Room.js @@ -65,7 +65,8 @@ export class Room extends EventEmitter { retryEntries.push(new EventEntry(storageEntry, this._fragmentIdComparer)); } } - await this._decryptEntries(DecryptionSource.Retry, retryEntries, txn); + const decryptRequest = this._decryptEntries(DecryptionSource.Retry, retryEntries, txn); + await decryptRequest.complete(); if (this._timeline) { // only adds if already present this._timeline.replaceEntries(retryEntries); @@ -89,31 +90,39 @@ export class Room extends EventEmitter { * Used for decrypting when loading/filling the timeline, and retrying decryption, * not during sync, where it is split up during the multiple phases. */ - async _decryptEntries(source, entries, inboundSessionTxn = null) { - if (!inboundSessionTxn) { - inboundSessionTxn = await this._storage.readTxn([this._storage.storeNames.inboundGroupSessions]); - } - const events = entries.filter(entry => { - return entry.eventType === EVENT_ENCRYPTED_TYPE; - }).map(entry => entry.event); - const isTimelineOpen = this._isTimelineOpen; - const preparation = await this._roomEncryption.prepareDecryptAll(events, source, isTimelineOpen, inboundSessionTxn); - const changes = await preparation.decrypt(); - const stores = [this._storage.storeNames.groupSessionDecryptions]; - if (isTimelineOpen) { - // read to fetch devices if timeline is open - stores.push(this._storage.storeNames.deviceIdentities); - } - const writeTxn = await this._storage.readWriteTxn(stores); - let decryption; - try { - decryption = await changes.write(writeTxn); - } catch (err) { - writeTxn.abort(); - throw err; - } - await writeTxn.complete(); - decryption.applyToEntries(entries); + _decryptEntries(source, entries, inboundSessionTxn = null) { + const request = new DecryptionRequest(async r => { + if (!inboundSessionTxn) { + inboundSessionTxn = await this._storage.readTxn([this._storage.storeNames.inboundGroupSessions]); + } + if (r.cancelled) return; + const events = entries.filter(entry => { + return entry.eventType === EVENT_ENCRYPTED_TYPE; + }).map(entry => entry.event); + const isTimelineOpen = this._isTimelineOpen; + r.preparation = await this._roomEncryption.prepareDecryptAll(events, source, isTimelineOpen, inboundSessionTxn); + if (r.cancelled) return; + // TODO: should this throw an AbortError? + const changes = await r.preparation.decrypt(); + r.preparation = null; + if (r.cancelled) return; + const stores = [this._storage.storeNames.groupSessionDecryptions]; + if (isTimelineOpen) { + // read to fetch devices if timeline is open + stores.push(this._storage.storeNames.deviceIdentities); + } + const writeTxn = await this._storage.readWriteTxn(stores); + let decryption; + try { + decryption = await changes.write(writeTxn); + } catch (err) { + writeTxn.abort(); + throw err; + } + await writeTxn.complete(); + decryption.applyToEntries(entries); + }); + return request; } get needsPrepareSync() { @@ -349,7 +358,8 @@ export class Room extends EventEmitter { } await txn.complete(); if (this._roomEncryption) { - await this._decryptEntries(DecryptionSource.Timeline, gapResult.entries); + const decryptRequest = this._decryptEntries(DecryptionSource.Timeline, gapResult.entries); + await decryptRequest.complete(); } // once txn is committed, update in-memory state & emit events for (const fragment of gapResult.fragments) { @@ -461,7 +471,7 @@ export class Room extends EventEmitter { } /** @public */ - async openTimeline() { + openTimeline() { if (this._timeline) { throw new Error("not dealing with load race here for now"); } @@ -483,7 +493,6 @@ export class Room extends EventEmitter { if (this._roomEncryption) { this._timeline.enableEncryption(this._decryptEntries.bind(this, DecryptionSource.Timeline)); } - await this._timeline.load(); return this._timeline; } @@ -502,3 +511,25 @@ export class Room extends EventEmitter { } } +class DecryptionRequest { + constructor(decryptFn) { + this._cancelled = false; + this.preparation = null; + this._promise = decryptFn(this); + } + + complete() { + return this._promise; + } + + get cancelled() { + return this._cancelled; + } + + dispose() { + this._cancelled = true; + if (this.preparation) { + this.preparation.dispose(); + } + } +} \ No newline at end of file diff --git a/src/matrix/room/timeline/Timeline.js b/src/matrix/room/timeline/Timeline.js index 53c26d82..38dc1c6a 100644 --- a/src/matrix/room/timeline/Timeline.js +++ b/src/matrix/room/timeline/Timeline.js @@ -15,6 +15,7 @@ limitations under the License. */ import {SortedArray, MappedList, ConcatList} from "../../../observable/index.js"; +import {Disposables} from "../../../utils/Disposables.js"; import {Direction} from "./Direction.js"; import {TimelineReader} from "./persistence/TimelineReader.js"; import {PendingEventEntry} from "./entries/PendingEventEntry.js"; @@ -26,12 +27,14 @@ export class Timeline { this._storage = storage; this._closeCallback = closeCallback; this._fragmentIdComparer = fragmentIdComparer; + this._disposables = new Disposables(); this._remoteEntries = new SortedArray((a, b) => a.compare(b)); this._timelineReader = new TimelineReader({ roomId: this._roomId, storage: this._storage, fragmentIdComparer: this._fragmentIdComparer }); + this._readerRequest = null; const localEntries = new MappedList(pendingEvents, pe => { return new PendingEventEntry({pendingEvent: pe, user}); }, (pee, params) => { @@ -42,8 +45,14 @@ export class Timeline { /** @package */ async load() { - const entries = await this._timelineReader.readFromEnd(25); - this._remoteEntries.setManySorted(entries); + // 30 seems to be a good amount to fill the entire screen + const readerRequest = this._disposables.track(this._timelineReader.readFromEnd(30)); + try { + const entries = await readerRequest.complete(); + this._remoteEntries.setManySorted(entries); + } finally { + this._disposables.disposeTracked(readerRequest); + } } replaceEntries(entries) { @@ -71,12 +80,17 @@ export class Timeline { if (!firstEventEntry) { return; } - const entries = await this._timelineReader.readFrom( + const readerRequest = this._disposables.track(this._timelineReader.readFrom( firstEventEntry.asEventKey(), Direction.Backward, amount - ); - this._remoteEntries.setManySorted(entries); + )); + try { + const entries = await readerRequest.complete(); + this._remoteEntries.setManySorted(entries); + } finally { + this._disposables.disposeTracked(readerRequest); + } } /** @public */ @@ -87,6 +101,8 @@ export class Timeline { /** @public */ close() { if (this._closeCallback) { + this._readerRequest?.dispose(); + this._readerRequest = null; this._closeCallback(); this._closeCallback = null; } diff --git a/src/matrix/room/timeline/persistence/TimelineReader.js b/src/matrix/room/timeline/persistence/TimelineReader.js index d451d76e..f5983a19 100644 --- a/src/matrix/room/timeline/persistence/TimelineReader.js +++ b/src/matrix/room/timeline/persistence/TimelineReader.js @@ -19,6 +19,24 @@ import {Direction} from "../Direction.js"; import {EventEntry} from "../entries/EventEntry.js"; import {FragmentBoundaryEntry} from "../entries/FragmentBoundaryEntry.js"; +class ReaderRequest { + constructor(fn) { + this.decryptRequest = null; + this._promise = fn(this); + } + + complete() { + return this._promise; + } + + dispose() { + if (this.decryptRequest) { + this.decryptRequest.dispose(); + this.decryptRequest = null; + } + } +} + export class TimelineReader { constructor({roomId, storage, fragmentIdComparer}) { this._roomId = roomId; @@ -42,12 +60,33 @@ export class TimelineReader { return this._storage.readTxn(stores); } - async readFrom(eventKey, direction, amount) { - const txn = await this._openTxn(); - return await this._readFrom(eventKey, direction, amount, txn); + readFrom(eventKey, direction, amount) { + return new ReaderRequest(async r => { + const txn = await this._openTxn(); + return await this._readFrom(eventKey, direction, amount, r, txn); + }); } - async _readFrom(eventKey, direction, amount, txn) { + readFromEnd(amount) { + return new ReaderRequest(async r => { + const txn = await this._openTxn(); + const liveFragment = await txn.timelineFragments.liveFragment(this._roomId); + let entries; + // room hasn't been synced yet + if (!liveFragment) { + entries = []; + } else { + this._fragmentIdComparer.add(liveFragment); + const liveFragmentEntry = FragmentBoundaryEntry.end(liveFragment, this._fragmentIdComparer); + const eventKey = liveFragmentEntry.asEventKey(); + entries = await this._readFrom(eventKey, Direction.Backward, amount, r, txn); + entries.unshift(liveFragmentEntry); + } + return entries; + }); + } + + async _readFrom(eventKey, direction, amount, r, txn) { let entries = []; const timelineStore = txn.timelineEvents; const fragmentStore = txn.timelineFragments; @@ -83,25 +122,12 @@ export class TimelineReader { } if (this._decryptEntries) { - await this._decryptEntries(entries, txn); - } - - return entries; - } - - async readFromEnd(amount) { - const txn = await this._openTxn(); - const liveFragment = await txn.timelineFragments.liveFragment(this._roomId); - let entries; - // room hasn't been synced yet - if (!liveFragment) { - entries = []; - } else { - this._fragmentIdComparer.add(liveFragment); - const liveFragmentEntry = FragmentBoundaryEntry.end(liveFragment, this._fragmentIdComparer); - const eventKey = liveFragmentEntry.asEventKey(); - entries = await this._readFrom(eventKey, Direction.Backward, amount, txn); - entries.unshift(liveFragmentEntry); + r.decryptRequest = this._decryptEntries(entries, txn); + try { + await r.decryptRequest.complete(); + } finally { + r.decryptRequest = null; + } } return entries; } diff --git a/src/utils/Disposables.js b/src/utils/Disposables.js index e5690319..e020ef83 100644 --- a/src/utils/Disposables.js +++ b/src/utils/Disposables.js @@ -29,6 +29,7 @@ export class Disposables { track(disposable) { this._disposables.push(disposable); + return disposable; } dispose() { diff --git a/src/worker-polyfill.js b/src/worker-polyfill.js index 08bbf652..15b955d5 100644 --- a/src/worker-polyfill.js +++ b/src/worker-polyfill.js @@ -14,6 +14,10 @@ See the License for the specific language governing permissions and limitations under the License. */ + // polyfills needed for IE11 +// just enough to run olm, have promises and async/await import "regenerator-runtime/runtime"; import "core-js/modules/es.promise"; +import "core-js/modules/es.math.imul"; +import "core-js/modules/es.math.clz32"; diff --git a/src/worker.js b/src/worker.js index e470eaa7..4b2a1e43 100644 --- a/src/worker.js +++ b/src/worker.js @@ -94,13 +94,13 @@ class MessageHandler { } async _handleMessage(message) { - switch (message.type) { - case "load_olm": - this._sendReply(message, await this._loadOlm(message.path)); - break; - case "megolm_decrypt": - this._sendReply(message, this._megolmDecrypt(message.sessionKey, message.ciphertext)); - break; + const {type} = message; + if (type === "ping") { + this._sendReply(message, {type: "pong"}); + } else if (type === "load_olm") { + this._sendReply(message, await this._loadOlm(message.path)); + } else if (type === "megolm_decrypt") { + this._sendReply(message, this._megolmDecrypt(message.sessionKey, message.ciphertext)); } } }