Worker WIP

This commit is contained in:
Bruno Windels 2020-09-10 15:40:30 +01:00
parent fdbc5f3c1d
commit 0bf1723d99
12 changed files with 339 additions and 92 deletions

View file

@ -36,8 +36,7 @@ export class ViewModel extends EventEmitter {
if (!this.disposables) {
this.disposables = new Disposables();
}
this.disposables.track(disposable);
return disposable;
return this.disposables.track(disposable);
}
dispose() {

View file

@ -38,7 +38,8 @@ export class RoomViewModel extends ViewModel {
async load() {
this._room.on("change", this._onRoomChange);
try {
this._timeline = await this._room.openTimeline();
this._timeline = this._room.openTimeline();
await this._timeline.load();
this._timelineVM = new TimelineViewModel(this.childOptions({
room: this._room,
timeline: this._timeline,

View file

@ -268,7 +268,7 @@ class DecryptionPreparation {
}
dispose() {
this._megolmDecryptionChanges.dispose();
this._megolmDecryptionPreparation.dispose();
}
}

View file

@ -21,7 +21,7 @@ import {SessionInfo} from "./decryption/SessionInfo.js";
import {DecryptionPreparation} from "./decryption/DecryptionPreparation.js";
import {SessionDecryption} from "./decryption/SessionDecryption.js";
import {SessionCache} from "./decryption/SessionCache.js";
import {DecryptionWorker} from "./decryption/DecryptionWorker.js";
import {DecryptionWorker, WorkerPool} from "./decryption/DecryptionWorker.js";
function getSenderKey(event) {
return event.content?.["sender_key"];
@ -40,7 +40,7 @@ export class Decryption {
this._pickleKey = pickleKey;
this._olm = olm;
// this._decryptor = new DecryptionWorker(new Worker("./src/worker.js"));
this._decryptor = new DecryptionWorker(new Worker("worker-3074010154.js"));
this._decryptor = new DecryptionWorker(new WorkerPool("worker-1039452087.js", 4));
this._initPromise = this._decryptor.init();
}

View file

@ -14,51 +14,200 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
export class DecryptionWorker {
import {AbortError} from "../../../../utils/error.js";
class WorkerState {
constructor(worker) {
this._worker = worker;
this.worker = worker;
this.busy = false;
}
attach(pool) {
this.worker.addEventListener("message", pool);
this.worker.addEventListener("error", pool);
}
detach(pool) {
this.worker.removeEventListener("message", pool);
this.worker.removeEventListener("error", pool);
}
}
class Request {
constructor(message, pool) {
this._promise = new Promise((_resolve, _reject) => {
this._resolve = _resolve;
this._reject = _reject;
});
this._message = message;
this._pool = pool;
this._worker = null;
}
abort() {
if (this._isNotDisposed) {
this._pool._abortRequest(this);
this._dispose();
}
}
response() {
return this._promise;
}
_dispose() {
this._reject = null;
this._resolve = null;
}
get _isNotDisposed() {
return this._resolve && this._reject;
}
}
export class WorkerPool {
constructor(path, amount) {
this._workers = [];
for (let i = 0; i < amount ; ++i) {
const worker = new WorkerState(new Worker(path));
worker.attach(this);
this._workers[i] = worker;
}
this._requests = new Map();
this._counter = 0;
this._worker.addEventListener("message", this);
this._pendingFlag = false;
}
handleEvent(e) {
if (e.type === "message") {
const message = e.data;
console.log("worker reply", message);
const request = this._requests.get(message.replyToId);
if (request) {
if (message.type === "success") {
request.resolve(message.payload);
} else if (message.type === "error") {
request.reject(new Error(message.stack));
request._worker.busy = false;
if (request._isNotDisposed) {
if (message.type === "success") {
request._resolve(message.payload);
} else if (message.type === "error") {
request._reject(new Error(message.stack));
}
request._dispose();
}
this._requests.delete(message.ref_id);
this._requests.delete(message.replyToId);
}
console.log("got worker reply", message, this._requests.size);
this._sendPending();
} else if (e.type === "error") {
console.error("worker error", e);
}
}
_getPendingRequest() {
for (const r of this._requests.values()) {
if (!r._worker) {
return r;
}
}
}
_send(message) {
_getFreeWorker() {
for (const w of this._workers) {
if (!w.busy) {
return w;
}
}
}
_sendPending() {
this._pendingFlag = false;
console.log("seeing if there is anything to send", this._requests.size);
let success;
do {
success = false;
const request = this._getPendingRequest();
if (request) {
console.log("sending pending request", request);
const worker = this._getFreeWorker();
if (worker) {
this._sendWith(request, worker);
success = true;
}
}
} while (success);
}
_sendWith(request, worker) {
request._worker = worker;
worker.busy = true;
console.log("sending message to worker", request._message);
worker.worker.postMessage(request._message);
}
_enqueueRequest(message) {
this._counter += 1;
message.id = this._counter;
let resolve;
let reject;
const promise = new Promise((_resolve, _reject) => {
resolve = _resolve;
reject = _reject;
const request = new Request(message, this);
this._requests.set(message.id, request);
return request;
}
send(message) {
const request = this._enqueueRequest(message);
const worker = this._getFreeWorker();
if (worker) {
this._sendWith(request, worker);
}
return request;
}
// assumes all workers are free atm
sendAll(message) {
const promises = this._workers.map(worker => {
const request = this._enqueueRequest(message);
this._sendWith(request, worker);
return request.response();
});
this._requests.set(message.id, {reject, resolve});
this._worker.postMessage(message);
return promise;
return Promise.all(promises);
}
dispose() {
for (const w of this._workers) {
w.worker.terminate();
w.detach(this);
}
}
_trySendPendingInNextTick() {
if (!this._pendingFlag) {
this._pendingFlag = true;
Promise.resolve().then(() => {
this._sendPending();
});
}
}
_abortRequest(request) {
request._reject(new AbortError());
if (request._worker) {
request._worker.busy = false;
}
this._requests.delete(request._message.id);
// allow more requests to be aborted before trying to send other pending
this._trySendPendingInNextTick();
}
}
export class DecryptionWorker {
constructor(workerPool) {
this._workerPool = workerPool;
}
decrypt(session, ciphertext) {
const sessionKey = session.export_session(session.first_known_index());
return this._send({type: "megolm_decrypt", ciphertext, sessionKey});
return this._workerPool.send({type: "megolm_decrypt", ciphertext, sessionKey});
}
init() {
return this._send({type: "load_olm", path: "olm_legacy-3232457086.js"});
async init() {
await this._workerPool.sendAll({type: "load_olm", path: "olm_legacy-3232457086.js"});
// return this._send({type: "load_olm", path: "../lib/olm/olm_legacy.js"});
}
}

View file

@ -27,6 +27,7 @@ export class SessionDecryption {
this._sessionInfo = sessionInfo;
this._events = events;
this._decryptor = decryptor;
this._decryptionRequests = decryptor ? [] : null;
}
async decryptAll() {
@ -39,7 +40,16 @@ export class SessionDecryption {
try {
const {session} = this._sessionInfo;
const ciphertext = event.content.ciphertext;
const {plaintext, message_index: messageIndex} = await this._decryptor.decrypt(session, ciphertext);
let decryptionResult;
if (this._decryptor) {
const request = this._decryptor.decrypt(session, ciphertext);
this._decryptionRequests.push(request);
decryptionResult = await request.response();
} else {
decryptionResult = session.decrypt(ciphertext);
}
const plaintext = decryptionResult.plaintext;
const messageIndex = decryptionResult.message_index;
let payload;
try {
payload = JSON.parse(plaintext);
@ -54,6 +64,10 @@ export class SessionDecryption {
const result = new DecryptionResult(payload, this._sessionInfo.senderKey, this._sessionInfo.claimedKeys);
results.set(event.event_id, result);
} catch (err) {
// ignore AbortError from cancelling decryption requests in dispose method
if (err.name === "AbortError") {
return;
}
if (!errors) {
errors = new Map();
}
@ -65,6 +79,12 @@ export class SessionDecryption {
}
dispose() {
if (this._decryptionRequests) {
for (const r of this._decryptionRequests) {
r.abort();
}
}
// TODO: cancel decryptions here
this._sessionInfo.release();
}
}

View file

@ -65,7 +65,8 @@ export class Room extends EventEmitter {
retryEntries.push(new EventEntry(storageEntry, this._fragmentIdComparer));
}
}
await this._decryptEntries(DecryptionSource.Retry, retryEntries, txn);
const decryptRequest = this._decryptEntries(DecryptionSource.Retry, retryEntries, txn);
await decryptRequest.complete();
if (this._timeline) {
// only adds if already present
this._timeline.replaceEntries(retryEntries);
@ -89,31 +90,39 @@ export class Room extends EventEmitter {
* Used for decrypting when loading/filling the timeline, and retrying decryption,
* not during sync, where it is split up during the multiple phases.
*/
async _decryptEntries(source, entries, inboundSessionTxn = null) {
if (!inboundSessionTxn) {
inboundSessionTxn = await this._storage.readTxn([this._storage.storeNames.inboundGroupSessions]);
}
const events = entries.filter(entry => {
return entry.eventType === EVENT_ENCRYPTED_TYPE;
}).map(entry => entry.event);
const isTimelineOpen = this._isTimelineOpen;
const preparation = await this._roomEncryption.prepareDecryptAll(events, source, isTimelineOpen, inboundSessionTxn);
const changes = await preparation.decrypt();
const stores = [this._storage.storeNames.groupSessionDecryptions];
if (isTimelineOpen) {
// read to fetch devices if timeline is open
stores.push(this._storage.storeNames.deviceIdentities);
}
const writeTxn = await this._storage.readWriteTxn(stores);
let decryption;
try {
decryption = await changes.write(writeTxn);
} catch (err) {
writeTxn.abort();
throw err;
}
await writeTxn.complete();
decryption.applyToEntries(entries);
_decryptEntries(source, entries, inboundSessionTxn = null) {
const request = new DecryptionRequest(async r => {
if (!inboundSessionTxn) {
inboundSessionTxn = await this._storage.readTxn([this._storage.storeNames.inboundGroupSessions]);
}
if (r.cancelled) return;
const events = entries.filter(entry => {
return entry.eventType === EVENT_ENCRYPTED_TYPE;
}).map(entry => entry.event);
const isTimelineOpen = this._isTimelineOpen;
r.preparation = await this._roomEncryption.prepareDecryptAll(events, source, isTimelineOpen, inboundSessionTxn);
if (r.cancelled) return;
// TODO: should this throw an AbortError?
const changes = await r.preparation.decrypt();
r.preparation = null;
if (r.cancelled) return;
const stores = [this._storage.storeNames.groupSessionDecryptions];
if (isTimelineOpen) {
// read to fetch devices if timeline is open
stores.push(this._storage.storeNames.deviceIdentities);
}
const writeTxn = await this._storage.readWriteTxn(stores);
let decryption;
try {
decryption = await changes.write(writeTxn);
} catch (err) {
writeTxn.abort();
throw err;
}
await writeTxn.complete();
decryption.applyToEntries(entries);
});
return request;
}
get needsPrepareSync() {
@ -349,7 +358,8 @@ export class Room extends EventEmitter {
}
await txn.complete();
if (this._roomEncryption) {
await this._decryptEntries(DecryptionSource.Timeline, gapResult.entries);
const decryptRequest = this._decryptEntries(DecryptionSource.Timeline, gapResult.entries);
await decryptRequest.complete();
}
// once txn is committed, update in-memory state & emit events
for (const fragment of gapResult.fragments) {
@ -461,7 +471,7 @@ export class Room extends EventEmitter {
}
/** @public */
async openTimeline() {
openTimeline() {
if (this._timeline) {
throw new Error("not dealing with load race here for now");
}
@ -483,7 +493,6 @@ export class Room extends EventEmitter {
if (this._roomEncryption) {
this._timeline.enableEncryption(this._decryptEntries.bind(this, DecryptionSource.Timeline));
}
await this._timeline.load();
return this._timeline;
}
@ -502,3 +511,25 @@ export class Room extends EventEmitter {
}
}
class DecryptionRequest {
constructor(decryptFn) {
this._cancelled = false;
this.preparation = null;
this._promise = decryptFn(this);
}
complete() {
return this._promise;
}
get cancelled() {
return this._cancelled;
}
dispose() {
this._cancelled = true;
if (this.preparation) {
this.preparation.dispose();
}
}
}

View file

@ -15,6 +15,7 @@ limitations under the License.
*/
import {SortedArray, MappedList, ConcatList} from "../../../observable/index.js";
import {Disposables} from "../../../utils/Disposables.js";
import {Direction} from "./Direction.js";
import {TimelineReader} from "./persistence/TimelineReader.js";
import {PendingEventEntry} from "./entries/PendingEventEntry.js";
@ -26,12 +27,14 @@ export class Timeline {
this._storage = storage;
this._closeCallback = closeCallback;
this._fragmentIdComparer = fragmentIdComparer;
this._disposables = new Disposables();
this._remoteEntries = new SortedArray((a, b) => a.compare(b));
this._timelineReader = new TimelineReader({
roomId: this._roomId,
storage: this._storage,
fragmentIdComparer: this._fragmentIdComparer
});
this._readerRequest = null;
const localEntries = new MappedList(pendingEvents, pe => {
return new PendingEventEntry({pendingEvent: pe, user});
}, (pee, params) => {
@ -42,8 +45,14 @@ export class Timeline {
/** @package */
async load() {
const entries = await this._timelineReader.readFromEnd(25);
this._remoteEntries.setManySorted(entries);
// 30 seems to be a good amount to fill the entire screen
const readerRequest = this._disposables.track(this._timelineReader.readFromEnd(30));
try {
const entries = await readerRequest.complete();
this._remoteEntries.setManySorted(entries);
} finally {
this._disposables.disposeTracked(readerRequest);
}
}
replaceEntries(entries) {
@ -71,12 +80,17 @@ export class Timeline {
if (!firstEventEntry) {
return;
}
const entries = await this._timelineReader.readFrom(
const readerRequest = this._disposables.track(this._timelineReader.readFrom(
firstEventEntry.asEventKey(),
Direction.Backward,
amount
);
this._remoteEntries.setManySorted(entries);
));
try {
const entries = await readerRequest.complete();
this._remoteEntries.setManySorted(entries);
} finally {
this._disposables.disposeTracked(readerRequest);
}
}
/** @public */
@ -87,6 +101,8 @@ export class Timeline {
/** @public */
close() {
if (this._closeCallback) {
this._readerRequest?.dispose();
this._readerRequest = null;
this._closeCallback();
this._closeCallback = null;
}

View file

@ -19,6 +19,24 @@ import {Direction} from "../Direction.js";
import {EventEntry} from "../entries/EventEntry.js";
import {FragmentBoundaryEntry} from "../entries/FragmentBoundaryEntry.js";
class ReaderRequest {
constructor(fn) {
this.decryptRequest = null;
this._promise = fn(this);
}
complete() {
return this._promise;
}
dispose() {
if (this.decryptRequest) {
this.decryptRequest.dispose();
this.decryptRequest = null;
}
}
}
export class TimelineReader {
constructor({roomId, storage, fragmentIdComparer}) {
this._roomId = roomId;
@ -42,12 +60,33 @@ export class TimelineReader {
return this._storage.readTxn(stores);
}
async readFrom(eventKey, direction, amount) {
const txn = await this._openTxn();
return await this._readFrom(eventKey, direction, amount, txn);
readFrom(eventKey, direction, amount) {
return new ReaderRequest(async r => {
const txn = await this._openTxn();
return await this._readFrom(eventKey, direction, amount, r, txn);
});
}
async _readFrom(eventKey, direction, amount, txn) {
readFromEnd(amount) {
return new ReaderRequest(async r => {
const txn = await this._openTxn();
const liveFragment = await txn.timelineFragments.liveFragment(this._roomId);
let entries;
// room hasn't been synced yet
if (!liveFragment) {
entries = [];
} else {
this._fragmentIdComparer.add(liveFragment);
const liveFragmentEntry = FragmentBoundaryEntry.end(liveFragment, this._fragmentIdComparer);
const eventKey = liveFragmentEntry.asEventKey();
entries = await this._readFrom(eventKey, Direction.Backward, amount, r, txn);
entries.unshift(liveFragmentEntry);
}
return entries;
});
}
async _readFrom(eventKey, direction, amount, r, txn) {
let entries = [];
const timelineStore = txn.timelineEvents;
const fragmentStore = txn.timelineFragments;
@ -83,25 +122,12 @@ export class TimelineReader {
}
if (this._decryptEntries) {
await this._decryptEntries(entries, txn);
}
return entries;
}
async readFromEnd(amount) {
const txn = await this._openTxn();
const liveFragment = await txn.timelineFragments.liveFragment(this._roomId);
let entries;
// room hasn't been synced yet
if (!liveFragment) {
entries = [];
} else {
this._fragmentIdComparer.add(liveFragment);
const liveFragmentEntry = FragmentBoundaryEntry.end(liveFragment, this._fragmentIdComparer);
const eventKey = liveFragmentEntry.asEventKey();
entries = await this._readFrom(eventKey, Direction.Backward, amount, txn);
entries.unshift(liveFragmentEntry);
r.decryptRequest = this._decryptEntries(entries, txn);
try {
await r.decryptRequest.complete();
} finally {
r.decryptRequest = null;
}
}
return entries;
}

View file

@ -29,6 +29,7 @@ export class Disposables {
track(disposable) {
this._disposables.push(disposable);
return disposable;
}
dispose() {

View file

@ -14,6 +14,10 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
// polyfills needed for IE11
// just enough to run olm, have promises and async/await
import "regenerator-runtime/runtime";
import "core-js/modules/es.promise";
import "core-js/modules/es.math.imul";
import "core-js/modules/es.math.clz32";

View file

@ -94,13 +94,13 @@ class MessageHandler {
}
async _handleMessage(message) {
switch (message.type) {
case "load_olm":
this._sendReply(message, await this._loadOlm(message.path));
break;
case "megolm_decrypt":
this._sendReply(message, this._megolmDecrypt(message.sessionKey, message.ciphertext));
break;
const {type} = message;
if (type === "ping") {
this._sendReply(message, {type: "pong"});
} else if (type === "load_olm") {
this._sendReply(message, await this._loadOlm(message.path));
} else if (type === "megolm_decrypt") {
this._sendReply(message, this._megolmDecrypt(message.sessionKey, message.ciphertext));
}
}
}