From af36c71a5974de8241f7fafa78e48b998a21f3ee Mon Sep 17 00:00:00 2001 From: Bruno Windels Date: Thu, 10 Sep 2020 18:41:23 +0200 Subject: [PATCH] load worker in main and pass paths so it works both on compiled and non-compiled --- index.html | 9 +- scripts/build.mjs | 8 +- src/main.js | 30 ++- src/matrix/Session.js | 5 +- src/matrix/SessionContainer.js | 10 +- src/matrix/e2ee/megolm/Decryption.js | 9 +- .../megolm/decryption/DecryptionWorker.js | 183 --------------- src/utils/WorkerPool.js | 212 ++++++++++++++++++ src/worker.js | 2 +- 9 files changed, 268 insertions(+), 200 deletions(-) create mode 100644 src/utils/WorkerPool.js diff --git a/index.html b/index.html index b09286a0..74e44c99 100644 --- a/index.html +++ b/index.html @@ -19,9 +19,12 @@ ` + + `` + `` + - ``); + ``); removeOrEnableScript(doc("script#service-worker"), offline); const versionScript = doc("script#version"); diff --git a/src/main.js b/src/main.js index 79f5698d..6f279910 100644 --- a/src/main.js +++ b/src/main.js @@ -25,6 +25,7 @@ import {BrawlViewModel} from "./domain/BrawlViewModel.js"; import {BrawlView} from "./ui/web/BrawlView.js"; import {Clock} from "./ui/web/dom/Clock.js"; import {OnlineStatus} from "./ui/web/dom/OnlineStatus.js"; +import {WorkerPool} from "./utils/WorkerPool.js"; function addScript(src) { return new Promise(function (resolve, reject) { @@ -55,10 +56,27 @@ async function loadOlm(olmPaths) { return null; } +// make path relative to basePath, +// assuming it and basePath are relative to document +function relPath(path, basePath) { + const idx = basePath.lastIndexOf("/"); + const dir = idx === -1 ? "" : basePath.slice(0, idx); + const dirCount = dir.length ? dir.split("/").length : 0; + return "../".repeat(dirCount) + path; +} + +async function loadWorker(paths) { + const workerPool = new WorkerPool(paths.worker, 4); + await workerPool.init(); + const path = relPath(paths.olm.legacyBundle, paths.worker); + await workerPool.sendAll({type: "load_olm", path}); + return workerPool; +} + // Don't use a default export here, as we use multiple entries during legacy build, // which does not support default exports, // see https://github.com/rollup/plugins/tree/master/packages/multi-entry -export async function main(container, olmPaths) { +export async function main(container, paths) { try { // to replay: // const fetchLog = await (await fetch("/fetchlogs/constrainterror.json")).json(); @@ -79,6 +97,13 @@ export async function main(container, olmPaths) { const sessionInfoStorage = new SessionInfoStorage("brawl_sessions_v1"); const storageFactory = new StorageFactory(); + // if wasm is not supported, we'll want + // to run some olm operations in a worker (mainly for IE11) + let workerPromise; + if (!window.WebAssembly) { + workerPromise = loadWorker(paths); + } + const vm = new BrawlViewModel({ createSessionContainer: () => { return new SessionContainer({ @@ -88,7 +113,8 @@ export async function main(container, olmPaths) { sessionInfoStorage, request, clock, - olmPromise: loadOlm(olmPaths), + olmPromise: loadOlm(paths.olm), + workerPromise, }); }, sessionInfoStorage, diff --git a/src/matrix/Session.js b/src/matrix/Session.js index c5c0c94f..be3f6a06 100644 --- a/src/matrix/Session.js +++ b/src/matrix/Session.js @@ -33,7 +33,7 @@ const PICKLE_KEY = "DEFAULT_KEY"; export class Session { // sessionInfo contains deviceId, userId and homeServer - constructor({clock, storage, hsApi, sessionInfo, olm}) { + constructor({clock, storage, hsApi, sessionInfo, olm, workerPool}) { this._clock = clock; this._storage = storage; this._hsApi = hsApi; @@ -52,6 +52,7 @@ export class Session { this._megolmEncryption = null; this._megolmDecryption = null; this._getSyncToken = () => this.syncToken; + this._workerPool = workerPool; if (olm) { this._olmUtil = new olm.Utility(); @@ -100,6 +101,7 @@ export class Session { this._megolmDecryption = new MegOlmDecryption({ pickleKey: PICKLE_KEY, olm: this._olm, + workerPool: this._workerPool, }); this._deviceMessageHandler.enableEncryption({olmDecryption, megolmDecryption: this._megolmDecryption}); } @@ -202,6 +204,7 @@ export class Session { } stop() { + this._workerPool?.dispose(); this._sendScheduler.stop(); } diff --git a/src/matrix/SessionContainer.js b/src/matrix/SessionContainer.js index c07190eb..1e868eba 100644 --- a/src/matrix/SessionContainer.js +++ b/src/matrix/SessionContainer.js @@ -42,7 +42,7 @@ export const LoginFailure = createEnum( ); export class SessionContainer { - constructor({clock, random, onlineStatus, request, storageFactory, sessionInfoStorage, olmPromise}) { + constructor({clock, random, onlineStatus, request, storageFactory, sessionInfoStorage, olmPromise, workerPromise}) { this._random = random; this._clock = clock; this._onlineStatus = onlineStatus; @@ -59,6 +59,7 @@ export class SessionContainer { this._sessionId = null; this._storage = null; this._olmPromise = olmPromise; + this._workerPromise = workerPromise; } createNewSessionId() { @@ -152,8 +153,13 @@ export class SessionContainer { homeServer: sessionInfo.homeServer, }; const olm = await this._olmPromise; + let workerPool = null; + if (this._workerPromise) { + workerPool = await this._workerPromise; + } this._session = new Session({storage: this._storage, - sessionInfo: filteredSessionInfo, hsApi, olm, clock: this._clock}); + sessionInfo: filteredSessionInfo, hsApi, olm, + clock: this._clock, workerPool}); await this._session.load(); this._status.set(LoadStatus.SessionSetup); await this._session.beforeFirstSync(isNewLogin); diff --git a/src/matrix/e2ee/megolm/Decryption.js b/src/matrix/e2ee/megolm/Decryption.js index 9726e6d8..544fa0a3 100644 --- a/src/matrix/e2ee/megolm/Decryption.js +++ b/src/matrix/e2ee/megolm/Decryption.js @@ -21,7 +21,7 @@ import {SessionInfo} from "./decryption/SessionInfo.js"; import {DecryptionPreparation} from "./decryption/DecryptionPreparation.js"; import {SessionDecryption} from "./decryption/SessionDecryption.js"; import {SessionCache} from "./decryption/SessionCache.js"; -import {DecryptionWorker, WorkerPool} from "./decryption/DecryptionWorker.js"; +import {DecryptionWorker} from "./decryption/DecryptionWorker.js"; function getSenderKey(event) { return event.content?.["sender_key"]; @@ -36,12 +36,10 @@ function getCiphertext(event) { } export class Decryption { - constructor({pickleKey, olm}) { + constructor({pickleKey, olm, workerPool}) { this._pickleKey = pickleKey; this._olm = olm; - this._decryptor = new DecryptionWorker(new WorkerPool("worker-1039452087.js", 4)); - //this._decryptor = new DecryptionWorker(new WorkerPool("./src/worker.js", 4)); - this._initPromise = this._decryptor.init(); + this._decryptor = workerPool ? new DecryptionWorker(workerPool) : null; } createSessionCache(fallback) { @@ -58,7 +56,6 @@ export class Decryption { * @return {DecryptionPreparation} */ async prepareDecryptAll(roomId, events, sessionCache, txn) { - await this._initPromise; const errors = new Map(); const validEvents = []; diff --git a/src/matrix/e2ee/megolm/decryption/DecryptionWorker.js b/src/matrix/e2ee/megolm/decryption/DecryptionWorker.js index 38a474ed..b44694a0 100644 --- a/src/matrix/e2ee/megolm/decryption/DecryptionWorker.js +++ b/src/matrix/e2ee/megolm/decryption/DecryptionWorker.js @@ -14,184 +14,6 @@ See the License for the specific language governing permissions and limitations under the License. */ -import {AbortError} from "../../../../utils/error.js"; - -class WorkerState { - constructor(worker) { - this.worker = worker; - this.busy = false; - } - - attach(pool) { - this.worker.addEventListener("message", pool); - this.worker.addEventListener("error", pool); - } - - detach(pool) { - this.worker.removeEventListener("message", pool); - this.worker.removeEventListener("error", pool); - } -} - -class Request { - constructor(message, pool) { - this._promise = new Promise((_resolve, _reject) => { - this._resolve = _resolve; - this._reject = _reject; - }); - this._message = message; - this._pool = pool; - this._worker = null; - } - - abort() { - if (this._isNotDisposed) { - this._pool._abortRequest(this); - this._dispose(); - } - } - - response() { - return this._promise; - } - - _dispose() { - this._reject = null; - this._resolve = null; - } - - get _isNotDisposed() { - return this._resolve && this._reject; - } -} - -export class WorkerPool { - constructor(path, amount) { - this._workers = []; - for (let i = 0; i < amount ; ++i) { - const worker = new WorkerState(new Worker(path)); - worker.attach(this); - this._workers[i] = worker; - } - this._requests = new Map(); - this._counter = 0; - this._pendingFlag = false; - } - - handleEvent(e) { - if (e.type === "message") { - const message = e.data; - const request = this._requests.get(message.replyToId); - if (request) { - request._worker.busy = false; - if (request._isNotDisposed) { - if (message.type === "success") { - request._resolve(message.payload); - } else if (message.type === "error") { - request._reject(new Error(message.stack)); - } - request._dispose(); - } - this._requests.delete(message.replyToId); - } - this._sendPending(); - } else if (e.type === "error") { - console.error("worker error", e); - } - } - - _getPendingRequest() { - for (const r of this._requests.values()) { - if (!r._worker) { - return r; - } - } - } - - _getFreeWorker() { - for (const w of this._workers) { - if (!w.busy) { - return w; - } - } - } - - _sendPending() { - this._pendingFlag = false; - let success; - do { - success = false; - const request = this._getPendingRequest(); - if (request) { - const worker = this._getFreeWorker(); - if (worker) { - this._sendWith(request, worker); - success = true; - } - } - } while (success); - } - - _sendWith(request, worker) { - request._worker = worker; - worker.busy = true; - worker.worker.postMessage(request._message); - } - - _enqueueRequest(message) { - this._counter += 1; - message.id = this._counter; - const request = new Request(message, this); - this._requests.set(message.id, request); - return request; - } - - send(message) { - const request = this._enqueueRequest(message); - const worker = this._getFreeWorker(); - if (worker) { - this._sendWith(request, worker); - } - return request; - } - - // assumes all workers are free atm - sendAll(message) { - const promises = this._workers.map(worker => { - const request = this._enqueueRequest(Object.assign({}, message)); - this._sendWith(request, worker); - return request.response(); - }); - return Promise.all(promises); - } - - dispose() { - for (const w of this._workers) { - w.worker.terminate(); - w.detach(this); - } - } - - _trySendPendingInNextTick() { - if (!this._pendingFlag) { - this._pendingFlag = true; - Promise.resolve().then(() => { - this._sendPending(); - }); - } - } - - _abortRequest(request) { - request._reject(new AbortError()); - if (request._worker) { - request._worker.busy = false; - } - this._requests.delete(request._message.id); - // allow more requests to be aborted before trying to send other pending - this._trySendPendingInNextTick(); - } -} - export class DecryptionWorker { constructor(workerPool) { this._workerPool = workerPool; @@ -201,9 +23,4 @@ export class DecryptionWorker { const sessionKey = session.export_session(session.first_known_index()); return this._workerPool.send({type: "megolm_decrypt", ciphertext, sessionKey}); } - - async init() { - await this._workerPool.sendAll({type: "load_olm", path: "olm_legacy-3232457086.js"}); - //await this._workerPool.sendAll({type: "load_olm", path: "../lib/olm/olm_legacy.js"}); - } } diff --git a/src/utils/WorkerPool.js b/src/utils/WorkerPool.js new file mode 100644 index 00000000..554067fe --- /dev/null +++ b/src/utils/WorkerPool.js @@ -0,0 +1,212 @@ +/* +Copyright 2020 The Matrix.org Foundation C.I.C. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +import {AbortError} from "./error.js"; + +class WorkerState { + constructor(worker) { + this.worker = worker; + this.busy = false; + } + + attach(pool) { + this.worker.addEventListener("message", pool); + this.worker.addEventListener("error", pool); + } + + detach(pool) { + this.worker.removeEventListener("message", pool); + this.worker.removeEventListener("error", pool); + } +} + +class Request { + constructor(message, pool) { + this._promise = new Promise((_resolve, _reject) => { + this._resolve = _resolve; + this._reject = _reject; + }); + this._message = message; + this._pool = pool; + this._worker = null; + } + + abort() { + if (this._isNotDisposed) { + this._pool._abortRequest(this); + this._dispose(); + } + } + + response() { + return this._promise; + } + + _dispose() { + this._reject = null; + this._resolve = null; + } + + get _isNotDisposed() { + return this._resolve && this._reject; + } +} + +export class WorkerPool { + // TODO: extract DOM specific bits and write unit tests + constructor(path, amount) { + this._workers = []; + for (let i = 0; i < amount ; ++i) { + const worker = new WorkerState(new Worker(path)); + worker.attach(this); + this._workers[i] = worker; + } + this._requests = new Map(); + this._counter = 0; + this._pendingFlag = false; + this._init = null; + + } + + init() { + const promise = new Promise((resolve, reject) => { + this._init = {resolve, reject}; + }); + this.sendAll({type: "ping"}) + .then(this._init.resolve, this._init.reject) + .finally(() => { + this._init = null; + }); + return promise; + } + + handleEvent(e) { + console.log("WorkerPool event", e); + if (e.type === "message") { + const message = e.data; + const request = this._requests.get(message.replyToId); + if (request) { + request._worker.busy = false; + if (request._isNotDisposed) { + if (message.type === "success") { + request._resolve(message.payload); + } else if (message.type === "error") { + request._reject(new Error(message.stack)); + } + request._dispose(); + } + this._requests.delete(message.replyToId); + } + this._sendPending(); + } else if (e.type === "error") { + if (this._init) { + this._init.reject(new Error("worker error during init")); + } + console.error("worker error", e); + } + } + + _getPendingRequest() { + for (const r of this._requests.values()) { + if (!r._worker) { + return r; + } + } + } + + _getFreeWorker() { + for (const w of this._workers) { + if (!w.busy) { + return w; + } + } + } + + _sendPending() { + this._pendingFlag = false; + let success; + do { + success = false; + const request = this._getPendingRequest(); + if (request) { + const worker = this._getFreeWorker(); + if (worker) { + this._sendWith(request, worker); + success = true; + } + } + } while (success); + } + + _sendWith(request, worker) { + request._worker = worker; + worker.busy = true; + worker.worker.postMessage(request._message); + } + + _enqueueRequest(message) { + this._counter += 1; + message.id = this._counter; + const request = new Request(message, this); + this._requests.set(message.id, request); + return request; + } + + send(message) { + const request = this._enqueueRequest(message); + const worker = this._getFreeWorker(); + if (worker) { + this._sendWith(request, worker); + } + return request; + } + + // assumes all workers are free atm + sendAll(message) { + const promises = this._workers.map(worker => { + const request = this._enqueueRequest(Object.assign({}, message)); + this._sendWith(request, worker); + return request.response(); + }); + return Promise.all(promises); + } + + dispose() { + for (const w of this._workers) { + w.detach(this); + w.worker.terminate(); + } + } + + _trySendPendingInNextTick() { + if (!this._pendingFlag) { + this._pendingFlag = true; + Promise.resolve().then(() => { + this._sendPending(); + }); + } + } + + _abortRequest(request) { + request._reject(new AbortError()); + if (request._worker) { + request._worker.busy = false; + } + this._requests.delete(request._message.id); + // allow more requests to be aborted before trying to send other pending + this._trySendPendingInNextTick(); + } +} diff --git a/src/worker.js b/src/worker.js index 4b2a1e43..7c6642fb 100644 --- a/src/worker.js +++ b/src/worker.js @@ -96,7 +96,7 @@ class MessageHandler { async _handleMessage(message) { const {type} = message; if (type === "ping") { - this._sendReply(message, {type: "pong"}); + this._sendReply(message, {type: "success"}); } else if (type === "load_olm") { this._sendReply(message, await this._loadOlm(message.path)); } else if (type === "megolm_decrypt") {