diff --git a/index.html b/index.html
index b09286a0..74e44c99 100644
--- a/index.html
+++ b/index.html
@@ -19,9 +19,12 @@
` +
+ `` +
`` +
- ``);
+ ``);
removeOrEnableScript(doc("script#service-worker"), offline);
const versionScript = doc("script#version");
diff --git a/src/main.js b/src/main.js
index 79f5698d..6f279910 100644
--- a/src/main.js
+++ b/src/main.js
@@ -25,6 +25,7 @@ import {BrawlViewModel} from "./domain/BrawlViewModel.js";
import {BrawlView} from "./ui/web/BrawlView.js";
import {Clock} from "./ui/web/dom/Clock.js";
import {OnlineStatus} from "./ui/web/dom/OnlineStatus.js";
+import {WorkerPool} from "./utils/WorkerPool.js";
function addScript(src) {
return new Promise(function (resolve, reject) {
@@ -55,10 +56,27 @@ async function loadOlm(olmPaths) {
return null;
}
+// make path relative to basePath,
+// assuming it and basePath are relative to document
+function relPath(path, basePath) {
+ const idx = basePath.lastIndexOf("/");
+ const dir = idx === -1 ? "" : basePath.slice(0, idx);
+ const dirCount = dir.length ? dir.split("/").length : 0;
+ return "../".repeat(dirCount) + path;
+}
+
+async function loadWorker(paths) {
+ const workerPool = new WorkerPool(paths.worker, 4);
+ await workerPool.init();
+ const path = relPath(paths.olm.legacyBundle, paths.worker);
+ await workerPool.sendAll({type: "load_olm", path});
+ return workerPool;
+}
+
// Don't use a default export here, as we use multiple entries during legacy build,
// which does not support default exports,
// see https://github.com/rollup/plugins/tree/master/packages/multi-entry
-export async function main(container, olmPaths) {
+export async function main(container, paths) {
try {
// to replay:
// const fetchLog = await (await fetch("/fetchlogs/constrainterror.json")).json();
@@ -79,6 +97,13 @@ export async function main(container, olmPaths) {
const sessionInfoStorage = new SessionInfoStorage("brawl_sessions_v1");
const storageFactory = new StorageFactory();
+ // if wasm is not supported, we'll want
+ // to run some olm operations in a worker (mainly for IE11)
+ let workerPromise;
+ if (!window.WebAssembly) {
+ workerPromise = loadWorker(paths);
+ }
+
const vm = new BrawlViewModel({
createSessionContainer: () => {
return new SessionContainer({
@@ -88,7 +113,8 @@ export async function main(container, olmPaths) {
sessionInfoStorage,
request,
clock,
- olmPromise: loadOlm(olmPaths),
+ olmPromise: loadOlm(paths.olm),
+ workerPromise,
});
},
sessionInfoStorage,
diff --git a/src/matrix/Session.js b/src/matrix/Session.js
index c5c0c94f..be3f6a06 100644
--- a/src/matrix/Session.js
+++ b/src/matrix/Session.js
@@ -33,7 +33,7 @@ const PICKLE_KEY = "DEFAULT_KEY";
export class Session {
// sessionInfo contains deviceId, userId and homeServer
- constructor({clock, storage, hsApi, sessionInfo, olm}) {
+ constructor({clock, storage, hsApi, sessionInfo, olm, workerPool}) {
this._clock = clock;
this._storage = storage;
this._hsApi = hsApi;
@@ -52,6 +52,7 @@ export class Session {
this._megolmEncryption = null;
this._megolmDecryption = null;
this._getSyncToken = () => this.syncToken;
+ this._workerPool = workerPool;
if (olm) {
this._olmUtil = new olm.Utility();
@@ -100,6 +101,7 @@ export class Session {
this._megolmDecryption = new MegOlmDecryption({
pickleKey: PICKLE_KEY,
olm: this._olm,
+ workerPool: this._workerPool,
});
this._deviceMessageHandler.enableEncryption({olmDecryption, megolmDecryption: this._megolmDecryption});
}
@@ -202,6 +204,7 @@ export class Session {
}
stop() {
+ this._workerPool?.dispose();
this._sendScheduler.stop();
}
diff --git a/src/matrix/SessionContainer.js b/src/matrix/SessionContainer.js
index c07190eb..1e868eba 100644
--- a/src/matrix/SessionContainer.js
+++ b/src/matrix/SessionContainer.js
@@ -42,7 +42,7 @@ export const LoginFailure = createEnum(
);
export class SessionContainer {
- constructor({clock, random, onlineStatus, request, storageFactory, sessionInfoStorage, olmPromise}) {
+ constructor({clock, random, onlineStatus, request, storageFactory, sessionInfoStorage, olmPromise, workerPromise}) {
this._random = random;
this._clock = clock;
this._onlineStatus = onlineStatus;
@@ -59,6 +59,7 @@ export class SessionContainer {
this._sessionId = null;
this._storage = null;
this._olmPromise = olmPromise;
+ this._workerPromise = workerPromise;
}
createNewSessionId() {
@@ -152,8 +153,13 @@ export class SessionContainer {
homeServer: sessionInfo.homeServer,
};
const olm = await this._olmPromise;
+ let workerPool = null;
+ if (this._workerPromise) {
+ workerPool = await this._workerPromise;
+ }
this._session = new Session({storage: this._storage,
- sessionInfo: filteredSessionInfo, hsApi, olm, clock: this._clock});
+ sessionInfo: filteredSessionInfo, hsApi, olm,
+ clock: this._clock, workerPool});
await this._session.load();
this._status.set(LoadStatus.SessionSetup);
await this._session.beforeFirstSync(isNewLogin);
diff --git a/src/matrix/e2ee/megolm/Decryption.js b/src/matrix/e2ee/megolm/Decryption.js
index 9726e6d8..544fa0a3 100644
--- a/src/matrix/e2ee/megolm/Decryption.js
+++ b/src/matrix/e2ee/megolm/Decryption.js
@@ -21,7 +21,7 @@ import {SessionInfo} from "./decryption/SessionInfo.js";
import {DecryptionPreparation} from "./decryption/DecryptionPreparation.js";
import {SessionDecryption} from "./decryption/SessionDecryption.js";
import {SessionCache} from "./decryption/SessionCache.js";
-import {DecryptionWorker, WorkerPool} from "./decryption/DecryptionWorker.js";
+import {DecryptionWorker} from "./decryption/DecryptionWorker.js";
function getSenderKey(event) {
return event.content?.["sender_key"];
@@ -36,12 +36,10 @@ function getCiphertext(event) {
}
export class Decryption {
- constructor({pickleKey, olm}) {
+ constructor({pickleKey, olm, workerPool}) {
this._pickleKey = pickleKey;
this._olm = olm;
- this._decryptor = new DecryptionWorker(new WorkerPool("worker-1039452087.js", 4));
- //this._decryptor = new DecryptionWorker(new WorkerPool("./src/worker.js", 4));
- this._initPromise = this._decryptor.init();
+ this._decryptor = workerPool ? new DecryptionWorker(workerPool) : null;
}
createSessionCache(fallback) {
@@ -58,7 +56,6 @@ export class Decryption {
* @return {DecryptionPreparation}
*/
async prepareDecryptAll(roomId, events, sessionCache, txn) {
- await this._initPromise;
const errors = new Map();
const validEvents = [];
diff --git a/src/matrix/e2ee/megolm/decryption/DecryptionWorker.js b/src/matrix/e2ee/megolm/decryption/DecryptionWorker.js
index 38a474ed..b44694a0 100644
--- a/src/matrix/e2ee/megolm/decryption/DecryptionWorker.js
+++ b/src/matrix/e2ee/megolm/decryption/DecryptionWorker.js
@@ -14,184 +14,6 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
-import {AbortError} from "../../../../utils/error.js";
-
-class WorkerState {
- constructor(worker) {
- this.worker = worker;
- this.busy = false;
- }
-
- attach(pool) {
- this.worker.addEventListener("message", pool);
- this.worker.addEventListener("error", pool);
- }
-
- detach(pool) {
- this.worker.removeEventListener("message", pool);
- this.worker.removeEventListener("error", pool);
- }
-}
-
-class Request {
- constructor(message, pool) {
- this._promise = new Promise((_resolve, _reject) => {
- this._resolve = _resolve;
- this._reject = _reject;
- });
- this._message = message;
- this._pool = pool;
- this._worker = null;
- }
-
- abort() {
- if (this._isNotDisposed) {
- this._pool._abortRequest(this);
- this._dispose();
- }
- }
-
- response() {
- return this._promise;
- }
-
- _dispose() {
- this._reject = null;
- this._resolve = null;
- }
-
- get _isNotDisposed() {
- return this._resolve && this._reject;
- }
-}
-
-export class WorkerPool {
- constructor(path, amount) {
- this._workers = [];
- for (let i = 0; i < amount ; ++i) {
- const worker = new WorkerState(new Worker(path));
- worker.attach(this);
- this._workers[i] = worker;
- }
- this._requests = new Map();
- this._counter = 0;
- this._pendingFlag = false;
- }
-
- handleEvent(e) {
- if (e.type === "message") {
- const message = e.data;
- const request = this._requests.get(message.replyToId);
- if (request) {
- request._worker.busy = false;
- if (request._isNotDisposed) {
- if (message.type === "success") {
- request._resolve(message.payload);
- } else if (message.type === "error") {
- request._reject(new Error(message.stack));
- }
- request._dispose();
- }
- this._requests.delete(message.replyToId);
- }
- this._sendPending();
- } else if (e.type === "error") {
- console.error("worker error", e);
- }
- }
-
- _getPendingRequest() {
- for (const r of this._requests.values()) {
- if (!r._worker) {
- return r;
- }
- }
- }
-
- _getFreeWorker() {
- for (const w of this._workers) {
- if (!w.busy) {
- return w;
- }
- }
- }
-
- _sendPending() {
- this._pendingFlag = false;
- let success;
- do {
- success = false;
- const request = this._getPendingRequest();
- if (request) {
- const worker = this._getFreeWorker();
- if (worker) {
- this._sendWith(request, worker);
- success = true;
- }
- }
- } while (success);
- }
-
- _sendWith(request, worker) {
- request._worker = worker;
- worker.busy = true;
- worker.worker.postMessage(request._message);
- }
-
- _enqueueRequest(message) {
- this._counter += 1;
- message.id = this._counter;
- const request = new Request(message, this);
- this._requests.set(message.id, request);
- return request;
- }
-
- send(message) {
- const request = this._enqueueRequest(message);
- const worker = this._getFreeWorker();
- if (worker) {
- this._sendWith(request, worker);
- }
- return request;
- }
-
- // assumes all workers are free atm
- sendAll(message) {
- const promises = this._workers.map(worker => {
- const request = this._enqueueRequest(Object.assign({}, message));
- this._sendWith(request, worker);
- return request.response();
- });
- return Promise.all(promises);
- }
-
- dispose() {
- for (const w of this._workers) {
- w.worker.terminate();
- w.detach(this);
- }
- }
-
- _trySendPendingInNextTick() {
- if (!this._pendingFlag) {
- this._pendingFlag = true;
- Promise.resolve().then(() => {
- this._sendPending();
- });
- }
- }
-
- _abortRequest(request) {
- request._reject(new AbortError());
- if (request._worker) {
- request._worker.busy = false;
- }
- this._requests.delete(request._message.id);
- // allow more requests to be aborted before trying to send other pending
- this._trySendPendingInNextTick();
- }
-}
-
export class DecryptionWorker {
constructor(workerPool) {
this._workerPool = workerPool;
@@ -201,9 +23,4 @@ export class DecryptionWorker {
const sessionKey = session.export_session(session.first_known_index());
return this._workerPool.send({type: "megolm_decrypt", ciphertext, sessionKey});
}
-
- async init() {
- await this._workerPool.sendAll({type: "load_olm", path: "olm_legacy-3232457086.js"});
- //await this._workerPool.sendAll({type: "load_olm", path: "../lib/olm/olm_legacy.js"});
- }
}
diff --git a/src/utils/WorkerPool.js b/src/utils/WorkerPool.js
new file mode 100644
index 00000000..554067fe
--- /dev/null
+++ b/src/utils/WorkerPool.js
@@ -0,0 +1,212 @@
+/*
+Copyright 2020 The Matrix.org Foundation C.I.C.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+import {AbortError} from "./error.js";
+
+class WorkerState {
+ constructor(worker) {
+ this.worker = worker;
+ this.busy = false;
+ }
+
+ attach(pool) {
+ this.worker.addEventListener("message", pool);
+ this.worker.addEventListener("error", pool);
+ }
+
+ detach(pool) {
+ this.worker.removeEventListener("message", pool);
+ this.worker.removeEventListener("error", pool);
+ }
+}
+
+class Request {
+ constructor(message, pool) {
+ this._promise = new Promise((_resolve, _reject) => {
+ this._resolve = _resolve;
+ this._reject = _reject;
+ });
+ this._message = message;
+ this._pool = pool;
+ this._worker = null;
+ }
+
+ abort() {
+ if (this._isNotDisposed) {
+ this._pool._abortRequest(this);
+ this._dispose();
+ }
+ }
+
+ response() {
+ return this._promise;
+ }
+
+ _dispose() {
+ this._reject = null;
+ this._resolve = null;
+ }
+
+ get _isNotDisposed() {
+ return this._resolve && this._reject;
+ }
+}
+
+export class WorkerPool {
+ // TODO: extract DOM specific bits and write unit tests
+ constructor(path, amount) {
+ this._workers = [];
+ for (let i = 0; i < amount ; ++i) {
+ const worker = new WorkerState(new Worker(path));
+ worker.attach(this);
+ this._workers[i] = worker;
+ }
+ this._requests = new Map();
+ this._counter = 0;
+ this._pendingFlag = false;
+ this._init = null;
+
+ }
+
+ init() {
+ const promise = new Promise((resolve, reject) => {
+ this._init = {resolve, reject};
+ });
+ this.sendAll({type: "ping"})
+ .then(this._init.resolve, this._init.reject)
+ .finally(() => {
+ this._init = null;
+ });
+ return promise;
+ }
+
+ handleEvent(e) {
+ console.log("WorkerPool event", e);
+ if (e.type === "message") {
+ const message = e.data;
+ const request = this._requests.get(message.replyToId);
+ if (request) {
+ request._worker.busy = false;
+ if (request._isNotDisposed) {
+ if (message.type === "success") {
+ request._resolve(message.payload);
+ } else if (message.type === "error") {
+ request._reject(new Error(message.stack));
+ }
+ request._dispose();
+ }
+ this._requests.delete(message.replyToId);
+ }
+ this._sendPending();
+ } else if (e.type === "error") {
+ if (this._init) {
+ this._init.reject(new Error("worker error during init"));
+ }
+ console.error("worker error", e);
+ }
+ }
+
+ _getPendingRequest() {
+ for (const r of this._requests.values()) {
+ if (!r._worker) {
+ return r;
+ }
+ }
+ }
+
+ _getFreeWorker() {
+ for (const w of this._workers) {
+ if (!w.busy) {
+ return w;
+ }
+ }
+ }
+
+ _sendPending() {
+ this._pendingFlag = false;
+ let success;
+ do {
+ success = false;
+ const request = this._getPendingRequest();
+ if (request) {
+ const worker = this._getFreeWorker();
+ if (worker) {
+ this._sendWith(request, worker);
+ success = true;
+ }
+ }
+ } while (success);
+ }
+
+ _sendWith(request, worker) {
+ request._worker = worker;
+ worker.busy = true;
+ worker.worker.postMessage(request._message);
+ }
+
+ _enqueueRequest(message) {
+ this._counter += 1;
+ message.id = this._counter;
+ const request = new Request(message, this);
+ this._requests.set(message.id, request);
+ return request;
+ }
+
+ send(message) {
+ const request = this._enqueueRequest(message);
+ const worker = this._getFreeWorker();
+ if (worker) {
+ this._sendWith(request, worker);
+ }
+ return request;
+ }
+
+ // assumes all workers are free atm
+ sendAll(message) {
+ const promises = this._workers.map(worker => {
+ const request = this._enqueueRequest(Object.assign({}, message));
+ this._sendWith(request, worker);
+ return request.response();
+ });
+ return Promise.all(promises);
+ }
+
+ dispose() {
+ for (const w of this._workers) {
+ w.detach(this);
+ w.worker.terminate();
+ }
+ }
+
+ _trySendPendingInNextTick() {
+ if (!this._pendingFlag) {
+ this._pendingFlag = true;
+ Promise.resolve().then(() => {
+ this._sendPending();
+ });
+ }
+ }
+
+ _abortRequest(request) {
+ request._reject(new AbortError());
+ if (request._worker) {
+ request._worker.busy = false;
+ }
+ this._requests.delete(request._message.id);
+ // allow more requests to be aborted before trying to send other pending
+ this._trySendPendingInNextTick();
+ }
+}
diff --git a/src/worker.js b/src/worker.js
index 4b2a1e43..7c6642fb 100644
--- a/src/worker.js
+++ b/src/worker.js
@@ -96,7 +96,7 @@ class MessageHandler {
async _handleMessage(message) {
const {type} = message;
if (type === "ping") {
- this._sendReply(message, {type: "pong"});
+ this._sendReply(message, {type: "success"});
} else if (type === "load_olm") {
this._sendReply(message, await this._loadOlm(message.path));
} else if (type === "megolm_decrypt") {