load worker in main and pass paths so it works both on compiled and non-compiled

This commit is contained in:
Bruno Windels 2020-09-10 18:41:23 +02:00
parent de1cc0d739
commit af36c71a59
9 changed files with 268 additions and 200 deletions

View file

@ -19,9 +19,12 @@
<script id="main" type="module">
import {main} from "./src/main.js";
main(document.body, {
wasm: "lib/olm/olm.wasm",
legacyBundle: "lib/olm/olm_legacy.js",
wasmBundle: "lib/olm/olm.js",
worker: "src/worker.js",
olm: {
wasm: "lib/olm/olm.wasm",
legacyBundle: "lib/olm/olm_legacy.js",
wasmBundle: "lib/olm/olm.js",
}
});
</script>
<script id="service-worker" type="disabled">

View file

@ -163,10 +163,14 @@ async function buildHtml(doc, version, assetPaths, manifestPath) {
findThemes(doc, (themeName, theme) => {
theme.attr("href", assetPaths.cssThemeBundle(themeName));
});
const pathsJSON = JSON.stringify({
worker: assetPaths.jsWorker(),
olm: olmFiles
});
doc("script#main").replaceWith(
`<script type="module">import {main} from "./${assetPaths.jsBundle()}"; main(document.body, ${JSON.stringify(olmFiles)});</script>` +
`<script type="module">import {main} from "./${assetPaths.jsBundle()}"; main(document.body, ${pathsJSON});</script>` +
`<script type="text/javascript" nomodule src="${assetPaths.jsLegacyBundle()}"></script>` +
`<script type="text/javascript" nomodule>${PROJECT_ID}Bundle.main(document.body, ${JSON.stringify(olmFiles)});</script>`);
`<script type="text/javascript" nomodule>${PROJECT_ID}Bundle.main(document.body, ${pathsJSON});</script>`);
removeOrEnableScript(doc("script#service-worker"), offline);
const versionScript = doc("script#version");

View file

@ -25,6 +25,7 @@ import {BrawlViewModel} from "./domain/BrawlViewModel.js";
import {BrawlView} from "./ui/web/BrawlView.js";
import {Clock} from "./ui/web/dom/Clock.js";
import {OnlineStatus} from "./ui/web/dom/OnlineStatus.js";
import {WorkerPool} from "./utils/WorkerPool.js";
function addScript(src) {
return new Promise(function (resolve, reject) {
@ -55,10 +56,27 @@ async function loadOlm(olmPaths) {
return null;
}
// make path relative to basePath,
// assuming it and basePath are relative to document
function relPath(path, basePath) {
const idx = basePath.lastIndexOf("/");
const dir = idx === -1 ? "" : basePath.slice(0, idx);
const dirCount = dir.length ? dir.split("/").length : 0;
return "../".repeat(dirCount) + path;
}
async function loadWorker(paths) {
const workerPool = new WorkerPool(paths.worker, 4);
await workerPool.init();
const path = relPath(paths.olm.legacyBundle, paths.worker);
await workerPool.sendAll({type: "load_olm", path});
return workerPool;
}
// Don't use a default export here, as we use multiple entries during legacy build,
// which does not support default exports,
// see https://github.com/rollup/plugins/tree/master/packages/multi-entry
export async function main(container, olmPaths) {
export async function main(container, paths) {
try {
// to replay:
// const fetchLog = await (await fetch("/fetchlogs/constrainterror.json")).json();
@ -79,6 +97,13 @@ export async function main(container, olmPaths) {
const sessionInfoStorage = new SessionInfoStorage("brawl_sessions_v1");
const storageFactory = new StorageFactory();
// if wasm is not supported, we'll want
// to run some olm operations in a worker (mainly for IE11)
let workerPromise;
if (!window.WebAssembly) {
workerPromise = loadWorker(paths);
}
const vm = new BrawlViewModel({
createSessionContainer: () => {
return new SessionContainer({
@ -88,7 +113,8 @@ export async function main(container, olmPaths) {
sessionInfoStorage,
request,
clock,
olmPromise: loadOlm(olmPaths),
olmPromise: loadOlm(paths.olm),
workerPromise,
});
},
sessionInfoStorage,

View file

@ -33,7 +33,7 @@ const PICKLE_KEY = "DEFAULT_KEY";
export class Session {
// sessionInfo contains deviceId, userId and homeServer
constructor({clock, storage, hsApi, sessionInfo, olm}) {
constructor({clock, storage, hsApi, sessionInfo, olm, workerPool}) {
this._clock = clock;
this._storage = storage;
this._hsApi = hsApi;
@ -52,6 +52,7 @@ export class Session {
this._megolmEncryption = null;
this._megolmDecryption = null;
this._getSyncToken = () => this.syncToken;
this._workerPool = workerPool;
if (olm) {
this._olmUtil = new olm.Utility();
@ -100,6 +101,7 @@ export class Session {
this._megolmDecryption = new MegOlmDecryption({
pickleKey: PICKLE_KEY,
olm: this._olm,
workerPool: this._workerPool,
});
this._deviceMessageHandler.enableEncryption({olmDecryption, megolmDecryption: this._megolmDecryption});
}
@ -202,6 +204,7 @@ export class Session {
}
stop() {
this._workerPool?.dispose();
this._sendScheduler.stop();
}

View file

@ -42,7 +42,7 @@ export const LoginFailure = createEnum(
);
export class SessionContainer {
constructor({clock, random, onlineStatus, request, storageFactory, sessionInfoStorage, olmPromise}) {
constructor({clock, random, onlineStatus, request, storageFactory, sessionInfoStorage, olmPromise, workerPromise}) {
this._random = random;
this._clock = clock;
this._onlineStatus = onlineStatus;
@ -59,6 +59,7 @@ export class SessionContainer {
this._sessionId = null;
this._storage = null;
this._olmPromise = olmPromise;
this._workerPromise = workerPromise;
}
createNewSessionId() {
@ -152,8 +153,13 @@ export class SessionContainer {
homeServer: sessionInfo.homeServer,
};
const olm = await this._olmPromise;
let workerPool = null;
if (this._workerPromise) {
workerPool = await this._workerPromise;
}
this._session = new Session({storage: this._storage,
sessionInfo: filteredSessionInfo, hsApi, olm, clock: this._clock});
sessionInfo: filteredSessionInfo, hsApi, olm,
clock: this._clock, workerPool});
await this._session.load();
this._status.set(LoadStatus.SessionSetup);
await this._session.beforeFirstSync(isNewLogin);

View file

@ -21,7 +21,7 @@ import {SessionInfo} from "./decryption/SessionInfo.js";
import {DecryptionPreparation} from "./decryption/DecryptionPreparation.js";
import {SessionDecryption} from "./decryption/SessionDecryption.js";
import {SessionCache} from "./decryption/SessionCache.js";
import {DecryptionWorker, WorkerPool} from "./decryption/DecryptionWorker.js";
import {DecryptionWorker} from "./decryption/DecryptionWorker.js";
function getSenderKey(event) {
return event.content?.["sender_key"];
@ -36,12 +36,10 @@ function getCiphertext(event) {
}
export class Decryption {
constructor({pickleKey, olm}) {
constructor({pickleKey, olm, workerPool}) {
this._pickleKey = pickleKey;
this._olm = olm;
this._decryptor = new DecryptionWorker(new WorkerPool("worker-1039452087.js", 4));
//this._decryptor = new DecryptionWorker(new WorkerPool("./src/worker.js", 4));
this._initPromise = this._decryptor.init();
this._decryptor = workerPool ? new DecryptionWorker(workerPool) : null;
}
createSessionCache(fallback) {
@ -58,7 +56,6 @@ export class Decryption {
* @return {DecryptionPreparation}
*/
async prepareDecryptAll(roomId, events, sessionCache, txn) {
await this._initPromise;
const errors = new Map();
const validEvents = [];

View file

@ -14,184 +14,6 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
import {AbortError} from "../../../../utils/error.js";
class WorkerState {
constructor(worker) {
this.worker = worker;
this.busy = false;
}
attach(pool) {
this.worker.addEventListener("message", pool);
this.worker.addEventListener("error", pool);
}
detach(pool) {
this.worker.removeEventListener("message", pool);
this.worker.removeEventListener("error", pool);
}
}
class Request {
constructor(message, pool) {
this._promise = new Promise((_resolve, _reject) => {
this._resolve = _resolve;
this._reject = _reject;
});
this._message = message;
this._pool = pool;
this._worker = null;
}
abort() {
if (this._isNotDisposed) {
this._pool._abortRequest(this);
this._dispose();
}
}
response() {
return this._promise;
}
_dispose() {
this._reject = null;
this._resolve = null;
}
get _isNotDisposed() {
return this._resolve && this._reject;
}
}
export class WorkerPool {
constructor(path, amount) {
this._workers = [];
for (let i = 0; i < amount ; ++i) {
const worker = new WorkerState(new Worker(path));
worker.attach(this);
this._workers[i] = worker;
}
this._requests = new Map();
this._counter = 0;
this._pendingFlag = false;
}
handleEvent(e) {
if (e.type === "message") {
const message = e.data;
const request = this._requests.get(message.replyToId);
if (request) {
request._worker.busy = false;
if (request._isNotDisposed) {
if (message.type === "success") {
request._resolve(message.payload);
} else if (message.type === "error") {
request._reject(new Error(message.stack));
}
request._dispose();
}
this._requests.delete(message.replyToId);
}
this._sendPending();
} else if (e.type === "error") {
console.error("worker error", e);
}
}
_getPendingRequest() {
for (const r of this._requests.values()) {
if (!r._worker) {
return r;
}
}
}
_getFreeWorker() {
for (const w of this._workers) {
if (!w.busy) {
return w;
}
}
}
_sendPending() {
this._pendingFlag = false;
let success;
do {
success = false;
const request = this._getPendingRequest();
if (request) {
const worker = this._getFreeWorker();
if (worker) {
this._sendWith(request, worker);
success = true;
}
}
} while (success);
}
_sendWith(request, worker) {
request._worker = worker;
worker.busy = true;
worker.worker.postMessage(request._message);
}
_enqueueRequest(message) {
this._counter += 1;
message.id = this._counter;
const request = new Request(message, this);
this._requests.set(message.id, request);
return request;
}
send(message) {
const request = this._enqueueRequest(message);
const worker = this._getFreeWorker();
if (worker) {
this._sendWith(request, worker);
}
return request;
}
// assumes all workers are free atm
sendAll(message) {
const promises = this._workers.map(worker => {
const request = this._enqueueRequest(Object.assign({}, message));
this._sendWith(request, worker);
return request.response();
});
return Promise.all(promises);
}
dispose() {
for (const w of this._workers) {
w.worker.terminate();
w.detach(this);
}
}
_trySendPendingInNextTick() {
if (!this._pendingFlag) {
this._pendingFlag = true;
Promise.resolve().then(() => {
this._sendPending();
});
}
}
_abortRequest(request) {
request._reject(new AbortError());
if (request._worker) {
request._worker.busy = false;
}
this._requests.delete(request._message.id);
// allow more requests to be aborted before trying to send other pending
this._trySendPendingInNextTick();
}
}
export class DecryptionWorker {
constructor(workerPool) {
this._workerPool = workerPool;
@ -201,9 +23,4 @@ export class DecryptionWorker {
const sessionKey = session.export_session(session.first_known_index());
return this._workerPool.send({type: "megolm_decrypt", ciphertext, sessionKey});
}
async init() {
await this._workerPool.sendAll({type: "load_olm", path: "olm_legacy-3232457086.js"});
//await this._workerPool.sendAll({type: "load_olm", path: "../lib/olm/olm_legacy.js"});
}
}

212
src/utils/WorkerPool.js Normal file
View file

@ -0,0 +1,212 @@
/*
Copyright 2020 The Matrix.org Foundation C.I.C.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
import {AbortError} from "./error.js";
class WorkerState {
constructor(worker) {
this.worker = worker;
this.busy = false;
}
attach(pool) {
this.worker.addEventListener("message", pool);
this.worker.addEventListener("error", pool);
}
detach(pool) {
this.worker.removeEventListener("message", pool);
this.worker.removeEventListener("error", pool);
}
}
class Request {
constructor(message, pool) {
this._promise = new Promise((_resolve, _reject) => {
this._resolve = _resolve;
this._reject = _reject;
});
this._message = message;
this._pool = pool;
this._worker = null;
}
abort() {
if (this._isNotDisposed) {
this._pool._abortRequest(this);
this._dispose();
}
}
response() {
return this._promise;
}
_dispose() {
this._reject = null;
this._resolve = null;
}
get _isNotDisposed() {
return this._resolve && this._reject;
}
}
export class WorkerPool {
// TODO: extract DOM specific bits and write unit tests
constructor(path, amount) {
this._workers = [];
for (let i = 0; i < amount ; ++i) {
const worker = new WorkerState(new Worker(path));
worker.attach(this);
this._workers[i] = worker;
}
this._requests = new Map();
this._counter = 0;
this._pendingFlag = false;
this._init = null;
}
init() {
const promise = new Promise((resolve, reject) => {
this._init = {resolve, reject};
});
this.sendAll({type: "ping"})
.then(this._init.resolve, this._init.reject)
.finally(() => {
this._init = null;
});
return promise;
}
handleEvent(e) {
console.log("WorkerPool event", e);
if (e.type === "message") {
const message = e.data;
const request = this._requests.get(message.replyToId);
if (request) {
request._worker.busy = false;
if (request._isNotDisposed) {
if (message.type === "success") {
request._resolve(message.payload);
} else if (message.type === "error") {
request._reject(new Error(message.stack));
}
request._dispose();
}
this._requests.delete(message.replyToId);
}
this._sendPending();
} else if (e.type === "error") {
if (this._init) {
this._init.reject(new Error("worker error during init"));
}
console.error("worker error", e);
}
}
_getPendingRequest() {
for (const r of this._requests.values()) {
if (!r._worker) {
return r;
}
}
}
_getFreeWorker() {
for (const w of this._workers) {
if (!w.busy) {
return w;
}
}
}
_sendPending() {
this._pendingFlag = false;
let success;
do {
success = false;
const request = this._getPendingRequest();
if (request) {
const worker = this._getFreeWorker();
if (worker) {
this._sendWith(request, worker);
success = true;
}
}
} while (success);
}
_sendWith(request, worker) {
request._worker = worker;
worker.busy = true;
worker.worker.postMessage(request._message);
}
_enqueueRequest(message) {
this._counter += 1;
message.id = this._counter;
const request = new Request(message, this);
this._requests.set(message.id, request);
return request;
}
send(message) {
const request = this._enqueueRequest(message);
const worker = this._getFreeWorker();
if (worker) {
this._sendWith(request, worker);
}
return request;
}
// assumes all workers are free atm
sendAll(message) {
const promises = this._workers.map(worker => {
const request = this._enqueueRequest(Object.assign({}, message));
this._sendWith(request, worker);
return request.response();
});
return Promise.all(promises);
}
dispose() {
for (const w of this._workers) {
w.detach(this);
w.worker.terminate();
}
}
_trySendPendingInNextTick() {
if (!this._pendingFlag) {
this._pendingFlag = true;
Promise.resolve().then(() => {
this._sendPending();
});
}
}
_abortRequest(request) {
request._reject(new AbortError());
if (request._worker) {
request._worker.busy = false;
}
this._requests.delete(request._message.id);
// allow more requests to be aborted before trying to send other pending
this._trySendPendingInNextTick();
}
}

View file

@ -96,7 +96,7 @@ class MessageHandler {
async _handleMessage(message) {
const {type} = message;
if (type === "ping") {
this._sendReply(message, {type: "pong"});
this._sendReply(message, {type: "success"});
} else if (type === "load_olm") {
this._sendReply(message, await this._loadOlm(message.path));
} else if (type === "megolm_decrypt") {