diff --git a/index.html b/index.html
index b09286a0..74e44c99 100644
--- a/index.html
+++ b/index.html
@@ -19,9 +19,12 @@
` +
+ `` +
`` +
- ``);
+ ``);
removeOrEnableScript(doc("script#service-worker"), offline);
const versionScript = doc("script#version");
@@ -180,23 +186,24 @@ async function buildHtml(doc, version, assetPaths, manifestPath) {
await fs.writeFile(path.join(targetDir, "index.html"), doc.html(), "utf8");
}
-async function buildJs() {
+async function buildJs(inputFile, outputName) {
// create js bundle
const bundle = await rollup({
- input: 'src/main.js',
+ input: inputFile,
plugins: [removeJsComments({comments: "none"})]
});
const {output} = await bundle.generate({
format: 'es',
+ // TODO: can remove this?
name: `${PROJECT_ID}Bundle`
});
const code = output[0].code;
- const bundlePath = resource(`${PROJECT_ID}.js`, code);
+ const bundlePath = resource(outputName, code);
await fs.writeFile(bundlePath, code, "utf8");
return bundlePath;
}
-async function buildJsLegacy() {
+async function buildJsLegacy(inputFile, outputName) {
// compile down to whatever IE 11 needs
const babelPlugin = babel.babel({
babelHelpers: 'bundled',
@@ -214,7 +221,7 @@ async function buildJsLegacy() {
});
// create js bundle
const rollupConfig = {
- input: ['src/legacy-polyfill.js', 'src/main.js'],
+ input: ['src/legacy-polyfill.js', inputFile],
plugins: [multi(), commonjs(), nodeResolve(), babelPlugin, removeJsComments({comments: "none"})]
};
const bundle = await rollup(rollupConfig);
@@ -223,7 +230,39 @@ async function buildJsLegacy() {
name: `${PROJECT_ID}Bundle`
});
const code = output[0].code;
- const bundlePath = resource(`${PROJECT_ID}-legacy.js`, code);
+ const bundlePath = resource(outputName, code);
+ await fs.writeFile(bundlePath, code, "utf8");
+ return bundlePath;
+}
+
+async function buildWorkerJsLegacy(inputFile, outputName) {
+ // compile down to whatever IE 11 needs
+ const babelPlugin = babel.babel({
+ babelHelpers: 'bundled',
+ exclude: 'node_modules/**',
+ presets: [
+ [
+ "@babel/preset-env",
+ {
+ useBuiltIns: "entry",
+ corejs: "3",
+ targets: "IE 11"
+ }
+ ]
+ ]
+ });
+ // create js bundle
+ const rollupConfig = {
+ input: ['src/worker-polyfill.js', inputFile],
+ plugins: [multi(), commonjs(), nodeResolve(), babelPlugin, removeJsComments({comments: "none"})]
+ };
+ const bundle = await rollup(rollupConfig);
+ const {output} = await bundle.generate({
+ format: 'iife',
+ name: `${PROJECT_ID}Bundle`
+ });
+ const code = output[0].code;
+ const bundlePath = resource(outputName, code);
await fs.writeFile(bundlePath, code, "utf8");
return bundlePath;
}
diff --git a/src/domain/ViewModel.js b/src/domain/ViewModel.js
index bc35fabd..15812f8c 100644
--- a/src/domain/ViewModel.js
+++ b/src/domain/ViewModel.js
@@ -36,8 +36,7 @@ export class ViewModel extends EventEmitter {
if (!this.disposables) {
this.disposables = new Disposables();
}
- this.disposables.track(disposable);
- return disposable;
+ return this.disposables.track(disposable);
}
dispose() {
diff --git a/src/domain/session/room/RoomViewModel.js b/src/domain/session/room/RoomViewModel.js
index 32e09fbe..a2ea5f66 100644
--- a/src/domain/session/room/RoomViewModel.js
+++ b/src/domain/session/room/RoomViewModel.js
@@ -38,7 +38,8 @@ export class RoomViewModel extends ViewModel {
async load() {
this._room.on("change", this._onRoomChange);
try {
- this._timeline = await this._room.openTimeline();
+ this._timeline = this.track(this._room.openTimeline());
+ await this._timeline.load();
this._timelineVM = new TimelineViewModel(this.childOptions({
room: this._room,
timeline: this._timeline,
@@ -62,17 +63,15 @@ export class RoomViewModel extends ViewModel {
}
dispose() {
- // this races with enable, on the await openTimeline()
- if (this._timeline) {
- // will stop the timeline from delivering updates on entries
- this._timeline.close();
- }
+ super.dispose();
if (this._clearUnreadTimout) {
this._clearUnreadTimout.abort();
this._clearUnreadTimout = null;
}
}
+ // called from view to close room
+ // parent vm will dispose this vm
close() {
this._closeCallback();
}
diff --git a/src/domain/session/room/timeline/TimelineViewModel.js b/src/domain/session/room/timeline/TimelineViewModel.js
index 1527d3ae..16d529cb 100644
--- a/src/domain/session/room/timeline/TimelineViewModel.js
+++ b/src/domain/session/room/timeline/TimelineViewModel.js
@@ -54,7 +54,7 @@ export class TimelineViewModel extends ViewModel {
if (firstTile.shape === "gap") {
return firstTile.fill();
} else {
- await this._timeline.loadAtTop(50);
+ await this._timeline.loadAtTop(10);
return false;
}
}
diff --git a/src/legacy-polyfill.js b/src/legacy-polyfill.js
index 5665158c..a48416c7 100644
--- a/src/legacy-polyfill.js
+++ b/src/legacy-polyfill.js
@@ -23,4 +23,4 @@ if (!Element.prototype.remove) {
Element.prototype.remove = function remove() {
this.parentNode.removeChild(this);
};
-}
\ No newline at end of file
+}
diff --git a/src/main.js b/src/main.js
index 79f5698d..6f279910 100644
--- a/src/main.js
+++ b/src/main.js
@@ -25,6 +25,7 @@ import {BrawlViewModel} from "./domain/BrawlViewModel.js";
import {BrawlView} from "./ui/web/BrawlView.js";
import {Clock} from "./ui/web/dom/Clock.js";
import {OnlineStatus} from "./ui/web/dom/OnlineStatus.js";
+import {WorkerPool} from "./utils/WorkerPool.js";
function addScript(src) {
return new Promise(function (resolve, reject) {
@@ -55,10 +56,27 @@ async function loadOlm(olmPaths) {
return null;
}
+// make path relative to basePath,
+// assuming it and basePath are relative to document
+function relPath(path, basePath) {
+ const idx = basePath.lastIndexOf("/");
+ const dir = idx === -1 ? "" : basePath.slice(0, idx);
+ const dirCount = dir.length ? dir.split("/").length : 0;
+ return "../".repeat(dirCount) + path;
+}
+
+async function loadWorker(paths) {
+ const workerPool = new WorkerPool(paths.worker, 4);
+ await workerPool.init();
+ const path = relPath(paths.olm.legacyBundle, paths.worker);
+ await workerPool.sendAll({type: "load_olm", path});
+ return workerPool;
+}
+
// Don't use a default export here, as we use multiple entries during legacy build,
// which does not support default exports,
// see https://github.com/rollup/plugins/tree/master/packages/multi-entry
-export async function main(container, olmPaths) {
+export async function main(container, paths) {
try {
// to replay:
// const fetchLog = await (await fetch("/fetchlogs/constrainterror.json")).json();
@@ -79,6 +97,13 @@ export async function main(container, olmPaths) {
const sessionInfoStorage = new SessionInfoStorage("brawl_sessions_v1");
const storageFactory = new StorageFactory();
+ // if wasm is not supported, we'll want
+ // to run some olm operations in a worker (mainly for IE11)
+ let workerPromise;
+ if (!window.WebAssembly) {
+ workerPromise = loadWorker(paths);
+ }
+
const vm = new BrawlViewModel({
createSessionContainer: () => {
return new SessionContainer({
@@ -88,7 +113,8 @@ export async function main(container, olmPaths) {
sessionInfoStorage,
request,
clock,
- olmPromise: loadOlm(olmPaths),
+ olmPromise: loadOlm(paths.olm),
+ workerPromise,
});
},
sessionInfoStorage,
diff --git a/src/matrix/Session.js b/src/matrix/Session.js
index 19725b58..be3f6a06 100644
--- a/src/matrix/Session.js
+++ b/src/matrix/Session.js
@@ -33,7 +33,7 @@ const PICKLE_KEY = "DEFAULT_KEY";
export class Session {
// sessionInfo contains deviceId, userId and homeServer
- constructor({clock, storage, hsApi, sessionInfo, olm}) {
+ constructor({clock, storage, hsApi, sessionInfo, olm, workerPool}) {
this._clock = clock;
this._storage = storage;
this._hsApi = hsApi;
@@ -52,6 +52,7 @@ export class Session {
this._megolmEncryption = null;
this._megolmDecryption = null;
this._getSyncToken = () => this.syncToken;
+ this._workerPool = workerPool;
if (olm) {
this._olmUtil = new olm.Utility();
@@ -100,6 +101,7 @@ export class Session {
this._megolmDecryption = new MegOlmDecryption({
pickleKey: PICKLE_KEY,
olm: this._olm,
+ workerPool: this._workerPool,
});
this._deviceMessageHandler.enableEncryption({olmDecryption, megolmDecryption: this._megolmDecryption});
}
@@ -202,6 +204,7 @@ export class Session {
}
stop() {
+ this._workerPool?.dispose();
this._sendScheduler.stop();
}
@@ -255,7 +258,7 @@ export class Session {
return room;
}
- async writeSync(syncResponse, syncFilterId, roomChanges, txn) {
+ async writeSync(syncResponse, syncFilterId, txn) {
const changes = {};
const syncToken = syncResponse.next_batch;
const deviceOneTimeKeysCount = syncResponse.device_one_time_keys_count;
@@ -362,7 +365,7 @@ export function tests() {
}
}
};
- const newSessionData = await session.writeSync({next_batch: "b"}, 6, {}, syncTxn);
+ const newSessionData = await session.writeSync({next_batch: "b"}, 6, syncTxn);
assert(syncSet);
assert.equal(session.syncToken, "a");
assert.equal(session.syncFilterId, 5);
diff --git a/src/matrix/SessionContainer.js b/src/matrix/SessionContainer.js
index c07190eb..1e868eba 100644
--- a/src/matrix/SessionContainer.js
+++ b/src/matrix/SessionContainer.js
@@ -42,7 +42,7 @@ export const LoginFailure = createEnum(
);
export class SessionContainer {
- constructor({clock, random, onlineStatus, request, storageFactory, sessionInfoStorage, olmPromise}) {
+ constructor({clock, random, onlineStatus, request, storageFactory, sessionInfoStorage, olmPromise, workerPromise}) {
this._random = random;
this._clock = clock;
this._onlineStatus = onlineStatus;
@@ -59,6 +59,7 @@ export class SessionContainer {
this._sessionId = null;
this._storage = null;
this._olmPromise = olmPromise;
+ this._workerPromise = workerPromise;
}
createNewSessionId() {
@@ -152,8 +153,13 @@ export class SessionContainer {
homeServer: sessionInfo.homeServer,
};
const olm = await this._olmPromise;
+ let workerPool = null;
+ if (this._workerPromise) {
+ workerPool = await this._workerPromise;
+ }
this._session = new Session({storage: this._storage,
- sessionInfo: filteredSessionInfo, hsApi, olm, clock: this._clock});
+ sessionInfo: filteredSessionInfo, hsApi, olm,
+ clock: this._clock, workerPool});
await this._session.load();
this._status.set(LoadStatus.SessionSetup);
await this._session.beforeFirstSync(isNewLogin);
diff --git a/src/matrix/Sync.js b/src/matrix/Sync.js
index 598b9169..c81acee0 100644
--- a/src/matrix/Sync.js
+++ b/src/matrix/Sync.js
@@ -29,21 +29,6 @@ export const SyncStatus = createEnum(
"Stopped"
);
-function parseRooms(roomsSection, roomCallback) {
- if (roomsSection) {
- const allMemberships = ["join", "invite", "leave"];
- for(const membership of allMemberships) {
- const membershipSection = roomsSection[membership];
- if (membershipSection) {
- return Object.entries(membershipSection).map(([roomId, roomResponse]) => {
- return roomCallback(roomId, roomResponse, membership);
- });
- }
- }
- }
- return [];
-}
-
function timelineIsEmpty(roomResponse) {
try {
const events = roomResponse?.timeline?.events;
@@ -53,6 +38,26 @@ function timelineIsEmpty(roomResponse) {
}
}
+/**
+ * Sync steps in js-pseudocode:
+ * ```js
+ * let preparation;
+ * if (room.needsPrepareSync) {
+ * // can only read some stores
+ * preparation = await room.prepareSync(roomResponse, prepareTxn);
+ * // can do async work that is not related to storage (such as decryption)
+ * preparation = await room.afterPrepareSync(preparation);
+ * }
+ * // writes and calculates changes
+ * const changes = await room.writeSync(roomResponse, membership, isInitialSync, preparation, syncTxn);
+ * // applies and emits changes once syncTxn is committed
+ * room.afterSync(changes);
+ * if (room.needsAfterSyncCompleted(changes)) {
+ * // can do network requests
+ * await room.afterSyncCompleted(changes);
+ * }
+ * ```
+ */
export class Sync {
constructor({hsApi, session, storage}) {
this._hsApi = hsApi;
@@ -90,13 +95,13 @@ export class Sync {
let afterSyncCompletedPromise = Promise.resolve();
// if syncToken is falsy, it will first do an initial sync ...
while(this._status.get() !== SyncStatus.Stopped) {
- let roomChanges;
+ let roomStates;
try {
console.log(`starting sync request with since ${syncToken} ...`);
const timeout = syncToken ? INCREMENTAL_TIMEOUT : undefined;
const syncResult = await this._syncRequest(syncToken, timeout, afterSyncCompletedPromise);
syncToken = syncResult.syncToken;
- roomChanges = syncResult.roomChanges;
+ roomStates = syncResult.roomStates;
this._status.set(SyncStatus.Syncing);
} catch (err) {
if (!(err instanceof AbortError)) {
@@ -105,12 +110,12 @@ export class Sync {
}
}
if (!this._error) {
- afterSyncCompletedPromise = this._runAfterSyncCompleted(roomChanges);
+ afterSyncCompletedPromise = this._runAfterSyncCompleted(roomStates);
}
}
}
- async _runAfterSyncCompleted(roomChanges) {
+ async _runAfterSyncCompleted(roomStates) {
const sessionPromise = (async () => {
try {
await this._session.afterSyncCompleted();
@@ -118,23 +123,22 @@ export class Sync {
console.error("error during session afterSyncCompleted, continuing", err.stack);
}
})();
- let allPromises = [sessionPromise];
- const roomsNeedingAfterSyncCompleted = roomChanges.filter(rc => {
- return rc.changes.needsAfterSyncCompleted;
+ const roomsNeedingAfterSyncCompleted = roomStates.filter(rs => {
+ return rs.room.needsAfterSyncCompleted(rs.changes);
+ });
+ const roomsPromises = roomsNeedingAfterSyncCompleted.map(async rs => {
+ try {
+ await rs.room.afterSyncCompleted(rs.changes);
+ } catch (err) {
+ console.error(`error during room ${rs.room.id} afterSyncCompleted, continuing`, err.stack);
+ }
});
- if (roomsNeedingAfterSyncCompleted.length) {
- allPromises = allPromises.concat(roomsNeedingAfterSyncCompleted.map(async ({room, changes}) => {
- try {
- await room.afterSyncCompleted(changes);
- } catch (err) {
- console.error(`error during room ${room.id} afterSyncCompleted, continuing`, err.stack);
- }
- }));
- }
// run everything in parallel,
// we don't want to delay the next sync too much
- await Promise.all(allPromises);
+ // Also, since all promises won't reject (as they have a try/catch)
+ // it's fine to use Promise.all
+ await Promise.all(roomsPromises.concat(sessionPromise));
}
async _syncRequest(syncToken, timeout, prevAfterSyncCompletedPromise) {
@@ -152,16 +156,17 @@ export class Sync {
const isInitialSync = !syncToken;
syncToken = response.next_batch;
- const syncTxn = await this._openSyncTxn();
- let roomChanges = [];
+ const roomStates = this._parseRoomsResponse(response.rooms, isInitialSync);
+ await this._prepareRooms(roomStates);
let sessionChanges;
+ const syncTxn = await this._openSyncTxn();
try {
- // to_device
- // presence
- if (response.rooms) {
- roomChanges = await this._writeRoomResponses(response.rooms, isInitialSync, syncTxn);
- }
- sessionChanges = await this._session.writeSync(response, syncFilterId, roomChanges, syncTxn);
+ await Promise.all(roomStates.map(async rs => {
+ console.log(` * applying sync response to room ${rs.room.id} ...`);
+ rs.changes = await rs.room.writeSync(
+ rs.roomResponse, rs.membership, isInitialSync, rs.preparation, syncTxn);
+ }));
+ sessionChanges = await this._session.writeSync(response, syncFilterId, syncTxn);
} catch(err) {
console.warn("aborting syncTxn because of error");
console.error(err);
@@ -180,31 +185,31 @@ export class Sync {
}
this._session.afterSync(sessionChanges);
// emit room related events after txn has been closed
- for(let {room, changes} of roomChanges) {
- room.afterSync(changes);
+ for(let rs of roomStates) {
+ rs.room.afterSync(rs.changes);
}
- return {syncToken, roomChanges};
+ return {syncToken, roomStates};
}
- async _writeRoomResponses(roomResponses, isInitialSync, syncTxn) {
- const roomChanges = [];
- const promises = parseRooms(roomResponses, async (roomId, roomResponse, membership) => {
- // ignore rooms with empty timelines during initial sync,
- // see https://github.com/vector-im/hydrogen-web/issues/15
- if (isInitialSync && timelineIsEmpty(roomResponse)) {
- return;
- }
- let room = this._session.rooms.get(roomId);
- if (!room) {
- room = this._session.createRoom(roomId);
- }
- console.log(` * applying sync response to room ${roomId} ...`);
- const changes = await room.writeSync(roomResponse, membership, isInitialSync, syncTxn);
- roomChanges.push({room, changes});
- });
- await Promise.all(promises);
- return roomChanges;
+ async _openPrepareSyncTxn() {
+ const storeNames = this._storage.storeNames;
+ return await this._storage.readTxn([
+ storeNames.inboundGroupSessions,
+ ]);
+ }
+
+ async _prepareRooms(roomStates) {
+ const prepareRoomStates = roomStates.filter(rs => rs.room.needsPrepareSync);
+ if (prepareRoomStates.length) {
+ const prepareTxn = await this._openPrepareSyncTxn();
+ await Promise.all(prepareRoomStates.map(async rs => {
+ rs.preparation = await rs.room.prepareSync(rs.roomResponse, prepareTxn);
+ }));
+ await Promise.all(prepareRoomStates.map(async rs => {
+ rs.preparation = await rs.room.afterPrepareSync(rs.preparation);
+ }));
+ }
}
async _openSyncTxn() {
@@ -218,13 +223,39 @@ export class Sync {
storeNames.timelineFragments,
storeNames.pendingEvents,
storeNames.userIdentities,
- storeNames.inboundGroupSessions,
storeNames.groupSessionDecryptions,
storeNames.deviceIdentities,
// to discard outbound session when somebody leaves a room
storeNames.outboundGroupSessions
]);
}
+
+ _parseRoomsResponse(roomsSection, isInitialSync) {
+ const roomStates = [];
+ if (roomsSection) {
+ // don't do "invite", "leave" for now
+ const allMemberships = ["join"];
+ for(const membership of allMemberships) {
+ const membershipSection = roomsSection[membership];
+ if (membershipSection) {
+ for (const [roomId, roomResponse] of Object.entries(membershipSection)) {
+ // ignore rooms with empty timelines during initial sync,
+ // see https://github.com/vector-im/hydrogen-web/issues/15
+ if (isInitialSync && timelineIsEmpty(roomResponse)) {
+ return;
+ }
+ let room = this._session.rooms.get(roomId);
+ if (!room) {
+ room = this._session.createRoom(roomId);
+ }
+ roomStates.push(new RoomSyncProcessState(room, roomResponse, membership));
+ }
+ }
+ }
+ }
+ return roomStates;
+ }
+
stop() {
if (this._status.get() === SyncStatus.Stopped) {
@@ -237,3 +268,13 @@ export class Sync {
}
}
}
+
+class RoomSyncProcessState {
+ constructor(room, roomResponse, membership) {
+ this.room = room;
+ this.roomResponse = roomResponse;
+ this.membership = membership;
+ this.preparation = null;
+ this.changes = null;
+ }
+}
diff --git a/src/matrix/e2ee/README.md b/src/matrix/e2ee/README.md
new file mode 100644
index 00000000..46f4e95f
--- /dev/null
+++ b/src/matrix/e2ee/README.md
@@ -0,0 +1,44 @@
+## Integratation within the sync lifetime cycle
+
+### prepareSync
+
+ The session can start its own read/write transactions here, rooms only read from a shared transaction
+
+ - session
+ - device handler
+ - txn
+ - write pending encrypted
+ - txn
+ - olm decryption read
+ - olm async decryption
+ - dispatch to worker
+ - txn
+ - olm decryption write / remove pending encrypted
+ - rooms (with shared read txn)
+ - megolm decryption read
+
+### afterPrepareSync
+
+ - rooms
+ - megolm async decryption
+ - dispatch to worker
+
+### writeSync
+
+ - rooms (with shared readwrite txn)
+ - megolm decryption write, yielding decrypted events
+ - use decrypted events to write room summary
+
+### afterSync
+
+ - rooms
+ - emit changes
+
+### afterSyncCompleted
+
+ - session
+ - e2ee account
+ - generate more otks if needed
+ - upload new otks if needed or device keys if not uploaded before
+ - rooms
+ - share new room keys if needed
diff --git a/src/matrix/e2ee/RoomEncryption.js b/src/matrix/e2ee/RoomEncryption.js
index c8f993e5..44229b97 100644
--- a/src/matrix/e2ee/RoomEncryption.js
+++ b/src/matrix/e2ee/RoomEncryption.js
@@ -14,8 +14,9 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
-import {MEGOLM_ALGORITHM} from "./common.js";
+import {MEGOLM_ALGORITHM, DecryptionSource} from "./common.js";
import {groupBy} from "../../utils/groupBy.js";
+import {mergeMap} from "../../utils/mergeMap.js";
import {makeTxnId} from "../common.js";
const ENCRYPTED_TYPE = "m.room.encrypted";
@@ -55,23 +56,54 @@ export class RoomEncryption {
return await this._deviceTracker.writeMemberChanges(this._room, memberChanges, txn);
}
- async decrypt(event, isSync, isTimelineOpen, retryData, txn) {
- if (event.redacted_because || event.unsigned?.redacted_because) {
- return;
+ // this happens before entries exists, as they are created by the syncwriter
+ // but we want to be able to map it back to something in the timeline easily
+ // when retrying decryption.
+ async prepareDecryptAll(events, source, isTimelineOpen, txn) {
+ const errors = [];
+ const validEvents = [];
+ for (const event of events) {
+ if (event.redacted_because || event.unsigned?.redacted_because) {
+ continue;
+ }
+ if (event.content?.algorithm !== MEGOLM_ALGORITHM) {
+ errors.set(event.event_id, new Error("Unsupported algorithm: " + event.content?.algorithm));
+ }
+ validEvents.push(event);
}
- if (event.content?.algorithm !== MEGOLM_ALGORITHM) {
- throw new Error("Unsupported algorithm: " + event.content?.algorithm);
+ let customCache;
+ let sessionCache;
+ if (source === DecryptionSource.Sync) {
+ sessionCache = this._megolmSyncCache;
+ } else if (source === DecryptionSource.Timeline) {
+ sessionCache = this._megolmBackfillCache;
+ } else if (source === DecryptionSource.Retry) {
+ // when retrying, we could have mixed events from at the bottom of the timeline (sync)
+ // and somewhere else, so create a custom cache we use just for this operation.
+ customCache = this._megolmEncryption.createSessionCache();
+ sessionCache = customCache;
+ } else {
+ throw new Error("Unknown source: " + source);
}
- let sessionCache = isSync ? this._megolmSyncCache : this._megolmBackfillCache;
- const result = await this._megolmDecryption.decrypt(
- this._room.id, event, sessionCache, txn);
- if (!result) {
- this._addMissingSessionEvent(event, isSync, retryData);
+ const preparation = await this._megolmDecryption.prepareDecryptAll(
+ this._room.id, validEvents, sessionCache, txn);
+ if (customCache) {
+ customCache.dispose();
}
- if (result && isTimelineOpen) {
- await this._verifyDecryptionResult(result, txn);
+ return new DecryptionPreparation(preparation, errors, {isTimelineOpen}, this);
+ }
+
+ async _processDecryptionResults(results, errors, flags, txn) {
+ for (const error of errors.values()) {
+ if (error.code === "MEGOLM_NO_SESSION") {
+ this._addMissingSessionEvent(error.event);
+ }
+ }
+ if (flags.isTimelineOpen) {
+ for (const result of results.values()) {
+ await this._verifyDecryptionResult(result, txn);
+ }
}
- return result;
}
async _verifyDecryptionResult(result, txn) {
@@ -87,30 +119,30 @@ export class RoomEncryption {
}
}
- _addMissingSessionEvent(event, isSync, data) {
+ _addMissingSessionEvent(event) {
const senderKey = event.content?.["sender_key"];
const sessionId = event.content?.["session_id"];
const key = `${senderKey}|${sessionId}`;
let eventIds = this._eventIdsByMissingSession.get(key);
if (!eventIds) {
- eventIds = new Map();
+ eventIds = new Set();
this._eventIdsByMissingSession.set(key, eventIds);
}
- eventIds.set(event.event_id, {data, isSync});
+ eventIds.add(event.event_id);
}
applyRoomKeys(roomKeys) {
// retry decryption with the new sessions
- const retryEntries = [];
+ const retryEventIds = [];
for (const roomKey of roomKeys) {
const key = `${roomKey.senderKey}|${roomKey.sessionId}`;
const entriesForSession = this._eventIdsByMissingSession.get(key);
if (entriesForSession) {
this._eventIdsByMissingSession.delete(key);
- retryEntries.push(...entriesForSession.values());
+ retryEventIds.push(...entriesForSession);
}
}
- return retryEntries;
+ return retryEventIds;
}
async encrypt(type, content, hsApi) {
@@ -214,3 +246,65 @@ export class RoomEncryption {
await hsApi.sendToDevice(type, payload, txnId).response();
}
}
+
+/**
+ * wrappers around megolm decryption classes to be able to post-process
+ * the decryption results before turning them
+ */
+class DecryptionPreparation {
+ constructor(megolmDecryptionPreparation, extraErrors, flags, roomEncryption) {
+ this._megolmDecryptionPreparation = megolmDecryptionPreparation;
+ this._extraErrors = extraErrors;
+ this._flags = flags;
+ this._roomEncryption = roomEncryption;
+ }
+
+ async decrypt() {
+ return new DecryptionChanges(
+ await this._megolmDecryptionPreparation.decrypt(),
+ this._extraErrors,
+ this._flags,
+ this._roomEncryption);
+ }
+
+ dispose() {
+ this._megolmDecryptionPreparation.dispose();
+ }
+}
+
+class DecryptionChanges {
+ constructor(megolmDecryptionChanges, extraErrors, flags, roomEncryption) {
+ this._megolmDecryptionChanges = megolmDecryptionChanges;
+ this._extraErrors = extraErrors;
+ this._flags = flags;
+ this._roomEncryption = roomEncryption;
+ }
+
+ async write(txn) {
+ const {results, errors} = await this._megolmDecryptionChanges.write(txn);
+ mergeMap(this._extraErrors, errors);
+ await this._roomEncryption._processDecryptionResults(results, errors, this._flags, txn);
+ return new BatchDecryptionResult(results, errors);
+ }
+}
+
+class BatchDecryptionResult {
+ constructor(results, errors) {
+ this.results = results;
+ this.errors = errors;
+ }
+
+ applyToEntries(entries) {
+ for (const entry of entries) {
+ const result = this.results.get(entry.id);
+ if (result) {
+ entry.setDecryptionResult(result);
+ } else {
+ const error = this.errors.get(entry.id);
+ if (error) {
+ entry.setDecryptionError(error);
+ }
+ }
+ }
+ }
+}
diff --git a/src/matrix/e2ee/common.js b/src/matrix/e2ee/common.js
index 3312032b..190f2fa2 100644
--- a/src/matrix/e2ee/common.js
+++ b/src/matrix/e2ee/common.js
@@ -15,6 +15,9 @@ limitations under the License.
*/
import anotherjson from "../../../lib/another-json/index.js";
+import {createEnum} from "../../utils/enum.js";
+
+export const DecryptionSource = createEnum(["Sync", "Timeline", "Retry"]);
// use common prefix so it's easy to clear properties that are not e2ee related during session clear
export const SESSION_KEY_PREFIX = "e2ee:";
diff --git a/src/matrix/e2ee/megolm/Decryption.js b/src/matrix/e2ee/megolm/Decryption.js
index bd3665b3..544fa0a3 100644
--- a/src/matrix/e2ee/megolm/Decryption.js
+++ b/src/matrix/e2ee/megolm/Decryption.js
@@ -15,102 +15,102 @@ limitations under the License.
*/
import {DecryptionError} from "../common.js";
-import {DecryptionResult} from "../DecryptionResult.js";
+import {groupBy} from "../../../utils/groupBy.js";
-const CACHE_MAX_SIZE = 10;
+import {SessionInfo} from "./decryption/SessionInfo.js";
+import {DecryptionPreparation} from "./decryption/DecryptionPreparation.js";
+import {SessionDecryption} from "./decryption/SessionDecryption.js";
+import {SessionCache} from "./decryption/SessionCache.js";
+import {DecryptionWorker} from "./decryption/DecryptionWorker.js";
+
+function getSenderKey(event) {
+ return event.content?.["sender_key"];
+}
+
+function getSessionId(event) {
+ return event.content?.["session_id"];
+}
+
+function getCiphertext(event) {
+ return event.content?.ciphertext;
+}
export class Decryption {
- constructor({pickleKey, olm}) {
+ constructor({pickleKey, olm, workerPool}) {
this._pickleKey = pickleKey;
this._olm = olm;
+ this._decryptor = workerPool ? new DecryptionWorker(workerPool) : null;
}
- createSessionCache() {
- return new SessionCache();
+ createSessionCache(fallback) {
+ return new SessionCache(fallback);
}
/**
- * [decrypt description]
+ * Reads all the state from storage to be able to decrypt the given events.
+ * Decryption can then happen outside of a storage transaction.
* @param {[type]} roomId [description]
- * @param {[type]} event [description]
+ * @param {[type]} events [description]
* @param {[type]} sessionCache [description]
* @param {[type]} txn [description]
- * @return {DecryptionResult?} the decrypted event result, or undefined if the session id is not known.
+ * @return {DecryptionPreparation}
*/
- async decrypt(roomId, event, sessionCache, txn) {
- const senderKey = event.content?.["sender_key"];
- const sessionId = event.content?.["session_id"];
- const ciphertext = event.content?.ciphertext;
+ async prepareDecryptAll(roomId, events, sessionCache, txn) {
+ const errors = new Map();
+ const validEvents = [];
- if (
- typeof senderKey !== "string" ||
- typeof sessionId !== "string" ||
- typeof ciphertext !== "string"
- ) {
- throw new DecryptionError("MEGOLM_INVALID_EVENT", event);
+ for (const event of events) {
+ const isValid = typeof getSenderKey(event) === "string" &&
+ typeof getSessionId(event) === "string" &&
+ typeof getCiphertext(event) === "string";
+ if (isValid) {
+ validEvents.push(event);
+ } else {
+ errors.set(event.event_id, new DecryptionError("MEGOLM_INVALID_EVENT", event))
+ }
}
- let session;
- let claimedKeys;
- const cacheEntry = sessionCache.get(roomId, senderKey, sessionId);
- if (cacheEntry) {
- session = cacheEntry.session;
- claimedKeys = cacheEntry.claimedKeys;
- } else {
+ const eventsBySession = groupBy(validEvents, event => {
+ return `${getSenderKey(event)}|${getSessionId(event)}`;
+ });
+
+ const sessionDecryptions = [];
+
+ await Promise.all(Array.from(eventsBySession.values()).map(async eventsForSession => {
+ const first = eventsForSession[0];
+ const senderKey = getSenderKey(first);
+ const sessionId = getSessionId(first);
+ const sessionInfo = await this._getSessionInfo(roomId, senderKey, sessionId, sessionCache, txn);
+ if (!sessionInfo) {
+ for (const event of eventsForSession) {
+ errors.set(event.event_id, new DecryptionError("MEGOLM_NO_SESSION", event));
+ }
+ } else {
+ sessionDecryptions.push(new SessionDecryption(sessionInfo, eventsForSession, this._decryptor));
+ }
+ }));
+
+ return new DecryptionPreparation(roomId, sessionDecryptions, errors);
+ }
+
+ async _getSessionInfo(roomId, senderKey, sessionId, sessionCache, txn) {
+ let sessionInfo;
+ sessionInfo = sessionCache.get(roomId, senderKey, sessionId);
+ if (!sessionInfo) {
const sessionEntry = await txn.inboundGroupSessions.get(roomId, senderKey, sessionId);
if (sessionEntry) {
- session = new this._olm.InboundGroupSession();
+ let session = new this._olm.InboundGroupSession();
try {
session.unpickle(this._pickleKey, sessionEntry.session);
+ sessionInfo = new SessionInfo(roomId, senderKey, session, sessionEntry.claimedKeys);
} catch (err) {
session.free();
throw err;
}
- claimedKeys = sessionEntry.claimedKeys;
- sessionCache.add(roomId, senderKey, session, claimedKeys);
+ sessionCache.add(sessionInfo);
}
}
- if (!session) {
- return;
- }
- const {plaintext, message_index: messageIndex} = session.decrypt(ciphertext);
- let payload;
- try {
- payload = JSON.parse(plaintext);
- } catch (err) {
- throw new DecryptionError("PLAINTEXT_NOT_JSON", event, {plaintext, err});
- }
- if (payload.room_id !== roomId) {
- throw new DecryptionError("MEGOLM_WRONG_ROOM", event,
- {encryptedRoomId: payload.room_id, eventRoomId: roomId});
- }
- await this._handleReplayAttack(roomId, sessionId, messageIndex, event, txn);
- return new DecryptionResult(payload, senderKey, claimedKeys);
- }
-
- async _handleReplayAttack(roomId, sessionId, messageIndex, event, txn) {
- const eventId = event.event_id;
- const timestamp = event.origin_server_ts;
- const decryption = await txn.groupSessionDecryptions.get(roomId, sessionId, messageIndex);
- if (decryption && decryption.eventId !== eventId) {
- // the one with the newest timestamp should be the attack
- const decryptedEventIsBad = decryption.timestamp < timestamp;
- const badEventId = decryptedEventIsBad ? eventId : decryption.eventId;
- throw new DecryptionError("MEGOLM_REPLAYED_INDEX", event, {
- messageIndex,
- badEventId,
- otherEventId: decryption.eventId
- });
- }
- if (!decryption) {
- txn.groupSessionDecryptions.set({
- roomId,
- sessionId,
- messageIndex,
- eventId,
- timestamp
- });
- }
+ return sessionInfo;
}
/**
@@ -165,55 +165,3 @@ export class Decryption {
}
}
-class SessionCache {
- constructor() {
- this._sessions = [];
- }
-
- /**
- * @type {CacheEntry}
- * @property {InboundGroupSession} session the unpickled session
- * @property {Object} claimedKeys an object with the claimed ed25519 key
- *
- *
- * @param {string} roomId
- * @param {string} senderKey
- * @param {string} sessionId
- * @return {CacheEntry?}
- */
- get(roomId, senderKey, sessionId) {
- const idx = this._sessions.findIndex(s => {
- return s.roomId === roomId &&
- s.senderKey === senderKey &&
- sessionId === s.session.session_id();
- });
- if (idx !== -1) {
- const entry = this._sessions[idx];
- // move to top
- if (idx > 0) {
- this._sessions.splice(idx, 1);
- this._sessions.unshift(entry);
- }
- return entry;
- }
- }
-
- add(roomId, senderKey, session, claimedKeys) {
- // add new at top
- this._sessions.unshift({roomId, senderKey, session, claimedKeys});
- if (this._sessions.length > CACHE_MAX_SIZE) {
- // free sessions we're about to remove
- for (let i = CACHE_MAX_SIZE; i < this._sessions.length; i += 1) {
- this._sessions[i].session.free();
- }
- this._sessions = this._sessions.slice(0, CACHE_MAX_SIZE);
- }
- }
-
- dispose() {
- for (const entry of this._sessions) {
- entry.session.free();
- }
-
- }
-}
diff --git a/src/matrix/e2ee/megolm/decryption/DecryptionChanges.js b/src/matrix/e2ee/megolm/decryption/DecryptionChanges.js
new file mode 100644
index 00000000..5597aaf7
--- /dev/null
+++ b/src/matrix/e2ee/megolm/decryption/DecryptionChanges.js
@@ -0,0 +1,78 @@
+/*
+Copyright 2020 The Matrix.org Foundation C.I.C.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+import {DecryptionError} from "../../common.js";
+
+export class DecryptionChanges {
+ constructor(roomId, results, errors, replayEntries) {
+ this._roomId = roomId;
+ this._results = results;
+ this._errors = errors;
+ this._replayEntries = replayEntries;
+ }
+
+ /**
+ * @type MegolmBatchDecryptionResult
+ * @property {Map} results a map of event id to decryption result
+ * @property {Map} errors event id -> errors
+ *
+ * Handle replay attack detection, and return result
+ * @param {[type]} txn [description]
+ * @return {MegolmBatchDecryptionResult}
+ */
+ async write(txn) {
+ await Promise.all(this._replayEntries.map(async replayEntry => {
+ try {
+ this._handleReplayAttack(this._roomId, replayEntry, txn);
+ } catch (err) {
+ this._errors.set(replayEntry.eventId, err);
+ }
+ }));
+ return {
+ results: this._results,
+ errors: this._errors
+ };
+ }
+
+ async _handleReplayAttack(roomId, replayEntry, txn) {
+ const {messageIndex, sessionId, eventId, timestamp} = replayEntry;
+ const decryption = await txn.groupSessionDecryptions.get(roomId, sessionId, messageIndex);
+
+ if (decryption && decryption.eventId !== eventId) {
+ // the one with the newest timestamp should be the attack
+ const decryptedEventIsBad = decryption.timestamp < timestamp;
+ const badEventId = decryptedEventIsBad ? eventId : decryption.eventId;
+ // discard result
+ this._results.delete(eventId);
+
+ throw new DecryptionError("MEGOLM_REPLAYED_INDEX", event, {
+ messageIndex,
+ badEventId,
+ otherEventId: decryption.eventId
+ });
+ }
+
+ if (!decryption) {
+ txn.groupSessionDecryptions.set({
+ roomId,
+ sessionId,
+ messageIndex,
+ eventId,
+ timestamp
+ });
+ }
+ }
+}
diff --git a/src/matrix/e2ee/megolm/decryption/DecryptionPreparation.js b/src/matrix/e2ee/megolm/decryption/DecryptionPreparation.js
new file mode 100644
index 00000000..02ee32df
--- /dev/null
+++ b/src/matrix/e2ee/megolm/decryption/DecryptionPreparation.js
@@ -0,0 +1,52 @@
+/*
+Copyright 2020 The Matrix.org Foundation C.I.C.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+import {DecryptionChanges} from "./DecryptionChanges.js";
+import {mergeMap} from "../../../../utils/mergeMap.js";
+
+/**
+ * Class that contains all the state loaded from storage to decrypt the given events
+ */
+export class DecryptionPreparation {
+ constructor(roomId, sessionDecryptions, errors) {
+ this._roomId = roomId;
+ this._sessionDecryptions = sessionDecryptions;
+ this._initialErrors = errors;
+ }
+
+ async decrypt() {
+ try {
+ const errors = this._initialErrors;
+ const results = new Map();
+ const replayEntries = [];
+ await Promise.all(this._sessionDecryptions.map(async sessionDecryption => {
+ const sessionResult = await sessionDecryption.decryptAll();
+ mergeMap(sessionResult.errors, errors);
+ mergeMap(sessionResult.results, results);
+ replayEntries.push(...sessionResult.replayEntries);
+ }));
+ return new DecryptionChanges(this._roomId, results, errors, replayEntries);
+ } finally {
+ this.dispose();
+ }
+ }
+
+ dispose() {
+ for (const sd of this._sessionDecryptions) {
+ sd.dispose();
+ }
+ }
+}
diff --git a/src/matrix/e2ee/megolm/decryption/DecryptionWorker.js b/src/matrix/e2ee/megolm/decryption/DecryptionWorker.js
new file mode 100644
index 00000000..b44694a0
--- /dev/null
+++ b/src/matrix/e2ee/megolm/decryption/DecryptionWorker.js
@@ -0,0 +1,26 @@
+/*
+Copyright 2020 The Matrix.org Foundation C.I.C.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+export class DecryptionWorker {
+ constructor(workerPool) {
+ this._workerPool = workerPool;
+ }
+
+ decrypt(session, ciphertext) {
+ const sessionKey = session.export_session(session.first_known_index());
+ return this._workerPool.send({type: "megolm_decrypt", ciphertext, sessionKey});
+ }
+}
diff --git a/src/matrix/e2ee/megolm/decryption/README.md b/src/matrix/e2ee/megolm/decryption/README.md
new file mode 100644
index 00000000..b9bb3568
--- /dev/null
+++ b/src/matrix/e2ee/megolm/decryption/README.md
@@ -0,0 +1,6 @@
+Lots of classes here. The complexity comes from needing to offload decryption to a webworker, mainly for IE11. We can't keep a idb transaction open while waiting for the response from the worker, so need to batch decryption of multiple events and do decryption in multiple steps:
+
+ 1. Read all used inbound sessions for the batch of events, requires a read txn. This happens in `Decryption`. Sessions are loaded into `SessionInfo` objects, which are also kept in a `SessionCache` to prevent having to read and unpickle them all the time.
+ 2. Actually decrypt. No txn can stay open during this step, as it can be offloaded to a worker and is thus async. This happens in `DecryptionPreparation`, which delegates to `SessionDecryption` per session.
+ 3. Read and write for the replay detection, requires a read/write txn. This happens in `DecryptionChanges`
+ 4. Return the decrypted entries, and errors if any
diff --git a/src/matrix/e2ee/megolm/decryption/ReplayDetectionEntry.js b/src/matrix/e2ee/megolm/decryption/ReplayDetectionEntry.js
new file mode 100644
index 00000000..e5ce2845
--- /dev/null
+++ b/src/matrix/e2ee/megolm/decryption/ReplayDetectionEntry.js
@@ -0,0 +1,24 @@
+/*
+Copyright 2020 The Matrix.org Foundation C.I.C.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+export class ReplayDetectionEntry {
+ constructor(sessionId, messageIndex, event) {
+ this.sessionId = sessionId;
+ this.messageIndex = messageIndex;
+ this.eventId = event.event_id;
+ this.timestamp = event.origin_server_ts;
+ }
+}
diff --git a/src/matrix/e2ee/megolm/decryption/SessionCache.js b/src/matrix/e2ee/megolm/decryption/SessionCache.js
new file mode 100644
index 00000000..efb7ef54
--- /dev/null
+++ b/src/matrix/e2ee/megolm/decryption/SessionCache.js
@@ -0,0 +1,68 @@
+/*
+Copyright 2020 The Matrix.org Foundation C.I.C.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+const CACHE_MAX_SIZE = 10;
+
+/**
+ * Cache of unpickled inbound megolm session.
+ */
+export class SessionCache {
+ constructor() {
+ this._sessions = [];
+ }
+
+ /**
+ * @param {string} roomId
+ * @param {string} senderKey
+ * @param {string} sessionId
+ * @return {SessionInfo?}
+ */
+ get(roomId, senderKey, sessionId) {
+ const idx = this._sessions.findIndex(s => {
+ return s.roomId === roomId &&
+ s.senderKey === senderKey &&
+ sessionId === s.session.session_id();
+ });
+ if (idx !== -1) {
+ const sessionInfo = this._sessions[idx];
+ // move to top
+ if (idx > 0) {
+ this._sessions.splice(idx, 1);
+ this._sessions.unshift(sessionInfo);
+ }
+ return sessionInfo;
+ }
+ }
+
+ add(sessionInfo) {
+ sessionInfo.retain();
+ // add new at top
+ this._sessions.unshift(sessionInfo);
+ if (this._sessions.length > CACHE_MAX_SIZE) {
+ // free sessions we're about to remove
+ for (let i = CACHE_MAX_SIZE; i < this._sessions.length; i += 1) {
+ this._sessions[i].release();
+ }
+ this._sessions = this._sessions.slice(0, CACHE_MAX_SIZE);
+ }
+ }
+
+ dispose() {
+ for (const sessionInfo of this._sessions) {
+ sessionInfo.release();
+ }
+ }
+}
diff --git a/src/matrix/e2ee/megolm/decryption/SessionDecryption.js b/src/matrix/e2ee/megolm/decryption/SessionDecryption.js
new file mode 100644
index 00000000..30ca432e
--- /dev/null
+++ b/src/matrix/e2ee/megolm/decryption/SessionDecryption.js
@@ -0,0 +1,90 @@
+/*
+Copyright 2020 The Matrix.org Foundation C.I.C.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+import {DecryptionResult} from "../../DecryptionResult.js";
+import {DecryptionError} from "../../common.js";
+import {ReplayDetectionEntry} from "./ReplayDetectionEntry.js";
+
+/**
+ * Does the actual decryption of all events for a given megolm session in a batch
+ */
+export class SessionDecryption {
+ constructor(sessionInfo, events, decryptor) {
+ sessionInfo.retain();
+ this._sessionInfo = sessionInfo;
+ this._events = events;
+ this._decryptor = decryptor;
+ this._decryptionRequests = decryptor ? [] : null;
+ }
+
+ async decryptAll() {
+ const replayEntries = [];
+ const results = new Map();
+ let errors;
+ const roomId = this._sessionInfo.roomId;
+
+ await Promise.all(this._events.map(async event => {
+ try {
+ const {session} = this._sessionInfo;
+ const ciphertext = event.content.ciphertext;
+ let decryptionResult;
+ if (this._decryptor) {
+ const request = this._decryptor.decrypt(session, ciphertext);
+ this._decryptionRequests.push(request);
+ decryptionResult = await request.response();
+ } else {
+ decryptionResult = session.decrypt(ciphertext);
+ }
+ const plaintext = decryptionResult.plaintext;
+ const messageIndex = decryptionResult.message_index;
+ let payload;
+ try {
+ payload = JSON.parse(plaintext);
+ } catch (err) {
+ throw new DecryptionError("PLAINTEXT_NOT_JSON", event, {plaintext, err});
+ }
+ if (payload.room_id !== roomId) {
+ throw new DecryptionError("MEGOLM_WRONG_ROOM", event,
+ {encryptedRoomId: payload.room_id, eventRoomId: roomId});
+ }
+ replayEntries.push(new ReplayDetectionEntry(session.session_id(), messageIndex, event));
+ const result = new DecryptionResult(payload, this._sessionInfo.senderKey, this._sessionInfo.claimedKeys);
+ results.set(event.event_id, result);
+ } catch (err) {
+ // ignore AbortError from cancelling decryption requests in dispose method
+ if (err.name === "AbortError") {
+ return;
+ }
+ if (!errors) {
+ errors = new Map();
+ }
+ errors.set(event.event_id, err);
+ }
+ }));
+
+ return {results, errors, replayEntries};
+ }
+
+ dispose() {
+ if (this._decryptionRequests) {
+ for (const r of this._decryptionRequests) {
+ r.abort();
+ }
+ }
+ // TODO: cancel decryptions here
+ this._sessionInfo.release();
+ }
+}
diff --git a/src/matrix/e2ee/megolm/decryption/SessionInfo.js b/src/matrix/e2ee/megolm/decryption/SessionInfo.js
new file mode 100644
index 00000000..dedc3222
--- /dev/null
+++ b/src/matrix/e2ee/megolm/decryption/SessionInfo.js
@@ -0,0 +1,44 @@
+/*
+Copyright 2020 The Matrix.org Foundation C.I.C.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+/**
+ * session loaded in memory with everything needed to create DecryptionResults
+ * and to store/retrieve it in the SessionCache
+ */
+export class SessionInfo {
+ constructor(roomId, senderKey, session, claimedKeys) {
+ this.roomId = roomId;
+ this.senderKey = senderKey;
+ this.session = session;
+ this.claimedKeys = claimedKeys;
+ this._refCounter = 0;
+ }
+
+ retain() {
+ this._refCounter += 1;
+ }
+
+ release() {
+ this._refCounter -= 1;
+ if (this._refCounter <= 0) {
+ this.dispose();
+ }
+ }
+
+ dispose() {
+ this.session.free();
+ }
+}
diff --git a/src/matrix/room/Room.js b/src/matrix/room/Room.js
index a2b84717..3704223e 100644
--- a/src/matrix/room/Room.js
+++ b/src/matrix/room/Room.js
@@ -26,6 +26,9 @@ import {fetchOrLoadMembers} from "./members/load.js";
import {MemberList} from "./members/MemberList.js";
import {Heroes} from "./members/Heroes.js";
import {EventEntry} from "./timeline/entries/EventEntry.js";
+import {DecryptionSource} from "../e2ee/common.js";
+
+const EVENT_ENCRYPTED_TYPE = "m.room.encrypted";
export class Room extends EventEmitter {
constructor({roomId, storage, hsApi, emitCollectionChange, sendScheduler, pendingEvents, user, createRoomEncryption, getSyncToken}) {
@@ -49,43 +52,27 @@ export class Room extends EventEmitter {
async notifyRoomKeys(roomKeys) {
if (this._roomEncryption) {
- // array of {data, isSync}
- let retryEntries = this._roomEncryption.applyRoomKeys(roomKeys);
- let decryptedEntries = [];
- if (retryEntries.length) {
- // groupSessionDecryptions can be written, the other stores not
- const txn = await this._storage.readWriteTxn([
+ let retryEventIds = this._roomEncryption.applyRoomKeys(roomKeys);
+ if (retryEventIds.length) {
+ const retryEntries = [];
+ const txn = await this._storage.readTxn([
this._storage.storeNames.timelineEvents,
this._storage.storeNames.inboundGroupSessions,
- this._storage.storeNames.groupSessionDecryptions,
- this._storage.storeNames.deviceIdentities,
]);
- try {
- for (const retryEntry of retryEntries) {
- const {data: eventKey} = retryEntry;
- let entry = this._timeline?.findEntry(eventKey);
- if (!entry) {
- const storageEntry = await txn.timelineEvents.get(this._roomId, eventKey);
- if (storageEntry) {
- entry = new EventEntry(storageEntry, this._fragmentIdComparer);
- }
- }
- if (entry) {
- entry = await this._decryptEntry(entry, txn, retryEntry.isSync);
- decryptedEntries.push(entry);
- }
+ for (const eventId of retryEventIds) {
+ const storageEntry = await txn.timelineEvents.getByEventId(this._roomId, eventId);
+ if (storageEntry) {
+ retryEntries.push(new EventEntry(storageEntry, this._fragmentIdComparer));
}
- } catch (err) {
- txn.abort();
- throw err;
}
- await txn.complete();
+ const decryptRequest = this._decryptEntries(DecryptionSource.Retry, retryEntries, txn);
+ await decryptRequest.complete();
+ if (this._timeline) {
+ // only adds if already present
+ this._timeline.replaceEntries(retryEntries);
+ }
+ // pass decryptedEntries to roomSummary
}
- if (this._timeline) {
- // only adds if already present
- this._timeline.replaceEntries(decryptedEntries);
- }
- // pass decryptedEntries to roomSummary
}
}
@@ -94,46 +81,95 @@ export class Room extends EventEmitter {
if (this._roomEncryption) {
this._sendQueue.enableEncryption(this._roomEncryption);
if (this._timeline) {
- this._timeline.enableEncryption(this._decryptEntries.bind(this));
+ this._timeline.enableEncryption(this._decryptEntries.bind(this, DecryptionSource.Timeline));
}
}
}
- async _decryptEntry(entry, txn, isSync) {
- if (entry.eventType === "m.room.encrypted") {
+ /**
+ * Used for decrypting when loading/filling the timeline, and retrying decryption,
+ * not during sync, where it is split up during the multiple phases.
+ */
+ _decryptEntries(source, entries, inboundSessionTxn = null) {
+ const request = new DecryptionRequest(async r => {
+ if (!inboundSessionTxn) {
+ inboundSessionTxn = await this._storage.readTxn([this._storage.storeNames.inboundGroupSessions]);
+ }
+ if (r.cancelled) return;
+ const events = entries.filter(entry => {
+ return entry.eventType === EVENT_ENCRYPTED_TYPE;
+ }).map(entry => entry.event);
+ const isTimelineOpen = this._isTimelineOpen;
+ r.preparation = await this._roomEncryption.prepareDecryptAll(events, source, isTimelineOpen, inboundSessionTxn);
+ if (r.cancelled) return;
+ const changes = await r.preparation.decrypt();
+ r.preparation = null;
+ if (r.cancelled) return;
+ const stores = [this._storage.storeNames.groupSessionDecryptions];
+ if (isTimelineOpen) {
+ // read to fetch devices if timeline is open
+ stores.push(this._storage.storeNames.deviceIdentities);
+ }
+ const writeTxn = await this._storage.readWriteTxn(stores);
+ let decryption;
try {
- const decryptionResult = await this._roomEncryption.decrypt(
- entry.event, isSync, !!this._timeline, entry.asEventKey(), txn);
- if (decryptionResult) {
- entry.setDecryptionResult(decryptionResult);
- }
+ decryption = await changes.write(writeTxn);
} catch (err) {
- console.warn("event decryption error", err, entry.event);
- entry.setDecryptionError(err);
+ writeTxn.abort();
+ throw err;
}
- }
- return entry;
+ await writeTxn.complete();
+ decryption.applyToEntries(entries);
+ });
+ return request;
}
- async _decryptEntries(entries, txn, isSync = false) {
- return await Promise.all(entries.map(async e => this._decryptEntry(e, txn, isSync)));
+ get needsPrepareSync() {
+ // only encrypted rooms need the prepare sync steps
+ return !!this._roomEncryption;
+ }
+
+ async prepareSync(roomResponse, txn) {
+ if (this._roomEncryption) {
+ const events = roomResponse?.timeline?.events;
+ if (Array.isArray(events)) {
+ const eventsToDecrypt = events.filter(event => {
+ return event?.type === EVENT_ENCRYPTED_TYPE;
+ });
+ const preparation = await this._roomEncryption.prepareDecryptAll(
+ eventsToDecrypt, DecryptionSource.Sync, this._isTimelineOpen, txn);
+ return preparation;
+ }
+ }
+ }
+
+ async afterPrepareSync(preparation) {
+ if (preparation) {
+ const decryptChanges = await preparation.decrypt();
+ return decryptChanges;
+ }
}
/** @package */
- async writeSync(roomResponse, membership, isInitialSync, txn) {
- const isTimelineOpen = !!this._timeline;
+ async writeSync(roomResponse, membership, isInitialSync, decryptChanges, txn) {
+ let decryption;
+ if (this._roomEncryption && decryptChanges) {
+ decryption = await decryptChanges.write(txn);
+ }
+ const {entries, newLiveKey, memberChanges} =
+ await this._syncWriter.writeSync(roomResponse, this.isTrackingMembers, txn);
+ if (decryption) {
+ decryption.applyToEntries(entries);
+ }
+ // pass member changes to device tracker
+ if (this._roomEncryption && this.isTrackingMembers && memberChanges?.size) {
+ await this._roomEncryption.writeMemberChanges(memberChanges, txn);
+ }
const summaryChanges = this._summary.writeSync(
roomResponse,
membership,
- isInitialSync, isTimelineOpen,
+ isInitialSync, this._isTimelineOpen,
txn);
- const {entries: encryptedEntries, newLiveKey, memberChanges} =
- await this._syncWriter.writeSync(roomResponse, this.isTrackingMembers, txn);
- // decrypt if applicable
- let entries = encryptedEntries;
- if (this._roomEncryption) {
- entries = await this._decryptEntries(encryptedEntries, txn, true);
- }
// fetch new members while we have txn open,
// but don't make any in-memory changes yet
let heroChanges;
@@ -144,10 +180,6 @@ export class Room extends EventEmitter {
}
heroChanges = await this._heroes.calculateChanges(summaryChanges.heroes, memberChanges, txn);
}
- // pass member changes to device tracker
- if (this._roomEncryption && this.isTrackingMembers && memberChanges?.size) {
- await this._roomEncryption.writeMemberChanges(memberChanges, txn);
- }
let removedPendingEvents;
if (roomResponse.timeline && roomResponse.timeline.events) {
removedPendingEvents = this._sendQueue.removeRemoteEchos(roomResponse.timeline.events, txn);
@@ -159,7 +191,6 @@ export class Room extends EventEmitter {
removedPendingEvents,
memberChanges,
heroChanges,
- needsAfterSyncCompleted: this._roomEncryption?.needsToShareKeys(memberChanges)
};
}
@@ -210,6 +241,10 @@ export class Room extends EventEmitter {
}
}
+ needsAfterSyncCompleted({memberChanges}) {
+ return this._roomEncryption?.needsToShareKeys(memberChanges);
+ }
+
/**
* Only called if the result of writeSync had `needsAfterSyncCompleted` set.
* Can be used to do longer running operations that resulted from the last sync,
@@ -299,19 +334,11 @@ export class Room extends EventEmitter {
}
}).response();
- let stores = [
+ const txn = await this._storage.readWriteTxn([
this._storage.storeNames.pendingEvents,
this._storage.storeNames.timelineEvents,
this._storage.storeNames.timelineFragments,
- ];
- if (this._roomEncryption) {
- stores = stores.concat([
- this._storage.storeNames.inboundGroupSessions,
- this._storage.storeNames.groupSessionDecryptions,
- this._storage.storeNames.deviceIdentities,
- ]);
- }
- const txn = await this._storage.readWriteTxn(stores);
+ ]);
let removedPendingEvents;
let gapResult;
try {
@@ -324,14 +351,15 @@ export class Room extends EventEmitter {
fragmentIdComparer: this._fragmentIdComparer,
});
gapResult = await gapWriter.writeFragmentFill(fragmentEntry, response, txn);
- if (this._roomEncryption) {
- gapResult.entries = await this._decryptEntries(gapResult.entries, txn, false);
- }
} catch (err) {
txn.abort();
throw err;
}
await txn.complete();
+ if (this._roomEncryption) {
+ const decryptRequest = this._decryptEntries(DecryptionSource.Timeline, gapResult.entries);
+ await decryptRequest.complete();
+ }
// once txn is committed, update in-memory state & emit events
for (const fragment of gapResult.fragments) {
this._fragmentIdComparer.add(fragment);
@@ -406,6 +434,10 @@ export class Room extends EventEmitter {
}
}
+ get _isTimelineOpen() {
+ return !!this._timeline;
+ }
+
async clearUnread() {
if (this.isUnread || this.notificationCount) {
const txn = await this._storage.readWriteTxn([
@@ -438,7 +470,7 @@ export class Room extends EventEmitter {
}
/** @public */
- async openTimeline() {
+ openTimeline() {
if (this._timeline) {
throw new Error("not dealing with load race here for now");
}
@@ -458,9 +490,8 @@ export class Room extends EventEmitter {
user: this._user,
});
if (this._roomEncryption) {
- this._timeline.enableEncryption(this._decryptEntries.bind(this));
+ this._timeline.enableEncryption(this._decryptEntries.bind(this, DecryptionSource.Timeline));
}
- await this._timeline.load();
return this._timeline;
}
@@ -479,3 +510,25 @@ export class Room extends EventEmitter {
}
}
+class DecryptionRequest {
+ constructor(decryptFn) {
+ this._cancelled = false;
+ this.preparation = null;
+ this._promise = decryptFn(this);
+ }
+
+ complete() {
+ return this._promise;
+ }
+
+ get cancelled() {
+ return this._cancelled;
+ }
+
+ dispose() {
+ this._cancelled = true;
+ if (this.preparation) {
+ this.preparation.dispose();
+ }
+ }
+}
diff --git a/src/matrix/room/timeline/Timeline.js b/src/matrix/room/timeline/Timeline.js
index c2e9d0ce..74362a13 100644
--- a/src/matrix/room/timeline/Timeline.js
+++ b/src/matrix/room/timeline/Timeline.js
@@ -15,10 +15,10 @@ limitations under the License.
*/
import {SortedArray, MappedList, ConcatList} from "../../../observable/index.js";
+import {Disposables} from "../../../utils/Disposables.js";
import {Direction} from "./Direction.js";
import {TimelineReader} from "./persistence/TimelineReader.js";
import {PendingEventEntry} from "./entries/PendingEventEntry.js";
-import {EventEntry} from "./entries/EventEntry.js";
export class Timeline {
constructor({roomId, storage, closeCallback, fragmentIdComparer, pendingEvents, user}) {
@@ -26,12 +26,14 @@ export class Timeline {
this._storage = storage;
this._closeCallback = closeCallback;
this._fragmentIdComparer = fragmentIdComparer;
+ this._disposables = new Disposables();
this._remoteEntries = new SortedArray((a, b) => a.compare(b));
this._timelineReader = new TimelineReader({
roomId: this._roomId,
storage: this._storage,
fragmentIdComparer: this._fragmentIdComparer
});
+ this._readerRequest = null;
const localEntries = new MappedList(pendingEvents, pe => {
return new PendingEventEntry({pendingEvent: pe, user});
}, (pee, params) => {
@@ -42,22 +44,13 @@ export class Timeline {
/** @package */
async load() {
- const entries = await this._timelineReader.readFromEnd(50);
- this._remoteEntries.setManySorted(entries);
- }
-
- findEntry(eventKey) {
- // a storage event entry has a fragmentId and eventIndex property, used for sorting,
- // just like an EventKey, so this will work, but perhaps a bit brittle.
- const entry = new EventEntry(eventKey, this._fragmentIdComparer);
+ // 30 seems to be a good amount to fill the entire screen
+ const readerRequest = this._disposables.track(this._timelineReader.readFromEnd(30));
try {
- const idx = this._remoteEntries.indexOf(entry);
- if (idx !== -1) {
- return this._remoteEntries.get(idx);
- }
- } catch (err) {
- // fragmentIdComparer threw, ignore
- return;
+ const entries = await readerRequest.complete();
+ this._remoteEntries.setManySorted(entries);
+ } finally {
+ this._disposables.disposeTracked(readerRequest);
}
}
@@ -86,12 +79,17 @@ export class Timeline {
if (!firstEventEntry) {
return;
}
- const entries = await this._timelineReader.readFrom(
+ const readerRequest = this._disposables.track(this._timelineReader.readFrom(
firstEventEntry.asEventKey(),
Direction.Backward,
amount
- );
- this._remoteEntries.setManySorted(entries);
+ ));
+ try {
+ const entries = await readerRequest.complete();
+ this._remoteEntries.setManySorted(entries);
+ } finally {
+ this._disposables.disposeTracked(readerRequest);
+ }
}
/** @public */
@@ -100,8 +98,9 @@ export class Timeline {
}
/** @public */
- close() {
+ dispose() {
if (this._closeCallback) {
+ this._disposables.dispose();
this._closeCallback();
this._closeCallback = null;
}
diff --git a/src/matrix/room/timeline/persistence/SyncWriter.js b/src/matrix/room/timeline/persistence/SyncWriter.js
index 130b22d1..9f42163d 100644
--- a/src/matrix/room/timeline/persistence/SyncWriter.js
+++ b/src/matrix/room/timeline/persistence/SyncWriter.js
@@ -140,7 +140,7 @@ export class SyncWriter {
async _writeTimeline(entries, timeline, currentKey, trackNewlyJoined, txn) {
const memberChanges = new Map();
- if (timeline.events) {
+ if (Array.isArray(timeline.events)) {
const events = deduplicateEvents(timeline.events);
for(const event of events) {
// store event in timeline
@@ -220,6 +220,7 @@ export class SyncWriter {
// important this happens before _writeTimeline so
// members are available in the transaction
const memberChanges = await this._writeStateEvents(roomResponse, trackNewlyJoined, txn);
+ // TODO: remove trackNewlyJoined and pass in memberChanges
const timelineResult = await this._writeTimeline(entries, timeline, currentKey, trackNewlyJoined, txn);
currentKey = timelineResult.currentKey;
// merge member changes from state and timeline, giving precedence to the latter
diff --git a/src/matrix/room/timeline/persistence/TimelineReader.js b/src/matrix/room/timeline/persistence/TimelineReader.js
index 4446eaf1..f5983a19 100644
--- a/src/matrix/room/timeline/persistence/TimelineReader.js
+++ b/src/matrix/room/timeline/persistence/TimelineReader.js
@@ -19,6 +19,24 @@ import {Direction} from "../Direction.js";
import {EventEntry} from "../entries/EventEntry.js";
import {FragmentBoundaryEntry} from "../entries/FragmentBoundaryEntry.js";
+class ReaderRequest {
+ constructor(fn) {
+ this.decryptRequest = null;
+ this._promise = fn(this);
+ }
+
+ complete() {
+ return this._promise;
+ }
+
+ dispose() {
+ if (this.decryptRequest) {
+ this.decryptRequest.dispose();
+ this.decryptRequest = null;
+ }
+ }
+}
+
export class TimelineReader {
constructor({roomId, storage, fragmentIdComparer}) {
this._roomId = roomId;
@@ -32,37 +50,43 @@ export class TimelineReader {
}
_openTxn() {
+ const stores = [
+ this._storage.storeNames.timelineEvents,
+ this._storage.storeNames.timelineFragments,
+ ];
if (this._decryptEntries) {
- return this._storage.readWriteTxn([
- this._storage.storeNames.timelineEvents,
- this._storage.storeNames.timelineFragments,
- this._storage.storeNames.inboundGroupSessions,
- this._storage.storeNames.groupSessionDecryptions,
- this._storage.storeNames.deviceIdentities,
- ]);
-
- } else {
- return this._storage.readTxn([
- this._storage.storeNames.timelineEvents,
- this._storage.storeNames.timelineFragments,
- ]);
+ stores.push(this._storage.storeNames.inboundGroupSessions);
}
+ return this._storage.readTxn(stores);
}
- async readFrom(eventKey, direction, amount) {
- const txn = await this._openTxn();
- let entries;
- try {
- entries = await this._readFrom(eventKey, direction, amount, txn);
- } catch (err) {
- txn.abort();
- throw err;
- }
- await txn.complete();
- return entries;
+ readFrom(eventKey, direction, amount) {
+ return new ReaderRequest(async r => {
+ const txn = await this._openTxn();
+ return await this._readFrom(eventKey, direction, amount, r, txn);
+ });
}
- async _readFrom(eventKey, direction, amount, txn) {
+ readFromEnd(amount) {
+ return new ReaderRequest(async r => {
+ const txn = await this._openTxn();
+ const liveFragment = await txn.timelineFragments.liveFragment(this._roomId);
+ let entries;
+ // room hasn't been synced yet
+ if (!liveFragment) {
+ entries = [];
+ } else {
+ this._fragmentIdComparer.add(liveFragment);
+ const liveFragmentEntry = FragmentBoundaryEntry.end(liveFragment, this._fragmentIdComparer);
+ const eventKey = liveFragmentEntry.asEventKey();
+ entries = await this._readFrom(eventKey, Direction.Backward, amount, r, txn);
+ entries.unshift(liveFragmentEntry);
+ }
+ return entries;
+ });
+ }
+
+ async _readFrom(eventKey, direction, amount, r, txn) {
let entries = [];
const timelineStore = txn.timelineEvents;
const fragmentStore = txn.timelineFragments;
@@ -75,9 +99,6 @@ export class TimelineReader {
eventsWithinFragment = await timelineStore.eventsBefore(this._roomId, eventKey, amount);
}
let eventEntries = eventsWithinFragment.map(e => new EventEntry(e, this._fragmentIdComparer));
- if (this._decryptEntries) {
- eventEntries = await this._decryptEntries(eventEntries, txn);
- }
entries = directionalConcat(entries, eventEntries, direction);
// prepend or append eventsWithinFragment to entries, and wrap them in EventEntry
@@ -100,29 +121,14 @@ export class TimelineReader {
}
}
- return entries;
- }
-
- async readFromEnd(amount) {
- const txn = await this._openTxn();
- let entries;
- try {
- const liveFragment = await txn.timelineFragments.liveFragment(this._roomId);
- // room hasn't been synced yet
- if (!liveFragment) {
- entries = [];
- } else {
- this._fragmentIdComparer.add(liveFragment);
- const liveFragmentEntry = FragmentBoundaryEntry.end(liveFragment, this._fragmentIdComparer);
- const eventKey = liveFragmentEntry.asEventKey();
- entries = await this._readFrom(eventKey, Direction.Backward, amount, txn);
- entries.unshift(liveFragmentEntry);
+ if (this._decryptEntries) {
+ r.decryptRequest = this._decryptEntries(entries, txn);
+ try {
+ await r.decryptRequest.complete();
+ } finally {
+ r.decryptRequest = null;
}
- } catch (err) {
- txn.abort();
- throw err;
}
- await txn.complete();
return entries;
}
}
diff --git a/src/utils/Disposables.js b/src/utils/Disposables.js
index e5690319..efc49897 100644
--- a/src/utils/Disposables.js
+++ b/src/utils/Disposables.js
@@ -28,7 +28,11 @@ export class Disposables {
}
track(disposable) {
+ if (this.isDisposed) {
+ throw new Error("Already disposed, check isDisposed after await if needed");
+ }
this._disposables.push(disposable);
+ return disposable;
}
dispose() {
@@ -40,8 +44,12 @@ export class Disposables {
}
}
+ get isDisposed() {
+ return this._disposables === null;
+ }
+
disposeTracked(value) {
- if (value === undefined || value === null) {
+ if (value === undefined || value === null || this.isDisposed) {
return null;
}
const idx = this._disposables.indexOf(value);
diff --git a/src/utils/WorkerPool.js b/src/utils/WorkerPool.js
new file mode 100644
index 00000000..56feaf8c
--- /dev/null
+++ b/src/utils/WorkerPool.js
@@ -0,0 +1,211 @@
+/*
+Copyright 2020 The Matrix.org Foundation C.I.C.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+import {AbortError} from "./error.js";
+
+class WorkerState {
+ constructor(worker) {
+ this.worker = worker;
+ this.busy = false;
+ }
+
+ attach(pool) {
+ this.worker.addEventListener("message", pool);
+ this.worker.addEventListener("error", pool);
+ }
+
+ detach(pool) {
+ this.worker.removeEventListener("message", pool);
+ this.worker.removeEventListener("error", pool);
+ }
+}
+
+class Request {
+ constructor(message, pool) {
+ this._promise = new Promise((_resolve, _reject) => {
+ this._resolve = _resolve;
+ this._reject = _reject;
+ });
+ this._message = message;
+ this._pool = pool;
+ this._worker = null;
+ }
+
+ abort() {
+ if (this._isNotDisposed) {
+ this._pool._abortRequest(this);
+ this._dispose();
+ }
+ }
+
+ response() {
+ return this._promise;
+ }
+
+ _dispose() {
+ this._reject = null;
+ this._resolve = null;
+ }
+
+ get _isNotDisposed() {
+ return this._resolve && this._reject;
+ }
+}
+
+export class WorkerPool {
+ // TODO: extract DOM specific bits and write unit tests
+ constructor(path, amount) {
+ this._workers = [];
+ for (let i = 0; i < amount ; ++i) {
+ const worker = new WorkerState(new Worker(path));
+ worker.attach(this);
+ this._workers[i] = worker;
+ }
+ this._requests = new Map();
+ this._counter = 0;
+ this._pendingFlag = false;
+ this._init = null;
+
+ }
+
+ init() {
+ const promise = new Promise((resolve, reject) => {
+ this._init = {resolve, reject};
+ });
+ this.sendAll({type: "ping"})
+ .then(this._init.resolve, this._init.reject)
+ .finally(() => {
+ this._init = null;
+ });
+ return promise;
+ }
+
+ handleEvent(e) {
+ if (e.type === "message") {
+ const message = e.data;
+ const request = this._requests.get(message.replyToId);
+ if (request) {
+ request._worker.busy = false;
+ if (request._isNotDisposed) {
+ if (message.type === "success") {
+ request._resolve(message.payload);
+ } else if (message.type === "error") {
+ request._reject(new Error(message.stack));
+ }
+ request._dispose();
+ }
+ this._requests.delete(message.replyToId);
+ }
+ this._sendPending();
+ } else if (e.type === "error") {
+ if (this._init) {
+ this._init.reject(new Error("worker error during init"));
+ }
+ console.error("worker error", e);
+ }
+ }
+
+ _getPendingRequest() {
+ for (const r of this._requests.values()) {
+ if (!r._worker) {
+ return r;
+ }
+ }
+ }
+
+ _getFreeWorker() {
+ for (const w of this._workers) {
+ if (!w.busy) {
+ return w;
+ }
+ }
+ }
+
+ _sendPending() {
+ this._pendingFlag = false;
+ let success;
+ do {
+ success = false;
+ const request = this._getPendingRequest();
+ if (request) {
+ const worker = this._getFreeWorker();
+ if (worker) {
+ this._sendWith(request, worker);
+ success = true;
+ }
+ }
+ } while (success);
+ }
+
+ _sendWith(request, worker) {
+ request._worker = worker;
+ worker.busy = true;
+ worker.worker.postMessage(request._message);
+ }
+
+ _enqueueRequest(message) {
+ this._counter += 1;
+ message.id = this._counter;
+ const request = new Request(message, this);
+ this._requests.set(message.id, request);
+ return request;
+ }
+
+ send(message) {
+ const request = this._enqueueRequest(message);
+ const worker = this._getFreeWorker();
+ if (worker) {
+ this._sendWith(request, worker);
+ }
+ return request;
+ }
+
+ // assumes all workers are free atm
+ sendAll(message) {
+ const promises = this._workers.map(worker => {
+ const request = this._enqueueRequest(Object.assign({}, message));
+ this._sendWith(request, worker);
+ return request.response();
+ });
+ return Promise.all(promises);
+ }
+
+ dispose() {
+ for (const w of this._workers) {
+ w.detach(this);
+ w.worker.terminate();
+ }
+ }
+
+ _trySendPendingInNextTick() {
+ if (!this._pendingFlag) {
+ this._pendingFlag = true;
+ Promise.resolve().then(() => {
+ this._sendPending();
+ });
+ }
+ }
+
+ _abortRequest(request) {
+ request._reject(new AbortError());
+ if (request._worker) {
+ request._worker.busy = false;
+ }
+ this._requests.delete(request._message.id);
+ // allow more requests to be aborted before trying to send other pending
+ this._trySendPendingInNextTick();
+ }
+}
diff --git a/src/utils/mergeMap.js b/src/utils/mergeMap.js
new file mode 100644
index 00000000..a0aed207
--- /dev/null
+++ b/src/utils/mergeMap.js
@@ -0,0 +1,41 @@
+/*
+Copyright 2020 Bruno Windels
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+export function mergeMap(src, dst) {
+ if (src) {
+ for (const [key, value] of src.entries()) {
+ dst.set(key, value);
+ }
+ }
+}
+
+export function tests() {
+ return {
+ "mergeMap with src": assert => {
+ const src = new Map();
+ src.set(1, "a");
+ const dst = new Map();
+ dst.set(2, "b");
+ mergeMap(src, dst);
+ assert.equal(dst.get(1), "a");
+ assert.equal(dst.get(2), "b");
+ assert.equal(src.get(2), null);
+ },
+ "mergeMap without src doesn't fail": () => {
+ mergeMap(undefined, new Map());
+ }
+ }
+}
diff --git a/src/worker-polyfill.js b/src/worker-polyfill.js
new file mode 100644
index 00000000..15b955d5
--- /dev/null
+++ b/src/worker-polyfill.js
@@ -0,0 +1,23 @@
+/*
+Copyright 2020 The Matrix.org Foundation C.I.C.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+
+// polyfills needed for IE11
+// just enough to run olm, have promises and async/await
+import "regenerator-runtime/runtime";
+import "core-js/modules/es.promise";
+import "core-js/modules/es.math.imul";
+import "core-js/modules/es.math.clz32";
diff --git a/src/worker.js b/src/worker.js
new file mode 100644
index 00000000..7c6642fb
--- /dev/null
+++ b/src/worker.js
@@ -0,0 +1,108 @@
+/*
+Copyright 2020 The Matrix.org Foundation C.I.C.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+function asErrorMessage(err) {
+ return {
+ type: "error",
+ message: err.message,
+ stack: err.stack
+ };
+}
+
+function asSuccessMessage(payload) {
+ return {
+ type: "success",
+ payload
+ };
+}
+
+class MessageHandler {
+ constructor() {
+ this._olm = null;
+ }
+
+ handleEvent(e) {
+ if (e.type === "message") {
+ this._handleMessage(e.data);
+ }
+ }
+
+ _sendReply(refMessage, reply) {
+ reply.replyToId = refMessage.id;
+ self.postMessage(reply);
+ }
+
+ _toMessage(fn) {
+ try {
+ let payload = fn();
+ if (payload instanceof Promise) {
+ return payload.then(
+ payload => asSuccessMessage(payload),
+ err => asErrorMessage(err)
+ );
+ } else {
+ return asSuccessMessage(payload);
+ }
+ } catch (err) {
+ return asErrorMessage(err);
+ }
+ }
+
+ _loadOlm(path) {
+ return this._toMessage(async () => {
+ // might have some problems here with window vs self as global object?
+ if (self.msCrypto && !self.crypto) {
+ self.crypto = self.msCrypto;
+ }
+ self.importScripts(path);
+ const olm = self.olm_exports;
+ // mangle the globals enough to make olm load believe it is running in a browser
+ self.window = self;
+ self.document = {};
+ await olm.init();
+ delete self.document;
+ delete self.window;
+ this._olm = olm;
+ });
+ }
+
+ _megolmDecrypt(sessionKey, ciphertext) {
+ return this._toMessage(() => {
+ let session;
+ try {
+ session = new this._olm.InboundGroupSession();
+ session.import_session(sessionKey);
+ // returns object with plaintext and message_index
+ return session.decrypt(ciphertext);
+ } finally {
+ session?.free();
+ }
+ });
+ }
+
+ async _handleMessage(message) {
+ const {type} = message;
+ if (type === "ping") {
+ this._sendReply(message, {type: "success"});
+ } else if (type === "load_olm") {
+ this._sendReply(message, await this._loadOlm(message.path));
+ } else if (type === "megolm_decrypt") {
+ this._sendReply(message, this._megolmDecrypt(message.sessionKey, message.ciphertext));
+ }
+ }
+}
+
+self.addEventListener("message", new MessageHandler());