Merge pull request #250 from vector-im/bwindels/olm-in-prepare-stage

Provide new room keys in prepare decryption phase of sync
This commit is contained in:
Bruno Windels 2021-03-03 13:03:25 +00:00 committed by GitHub
commit e191880379
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 477 additions and 279 deletions

View file

@ -14,11 +14,8 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
import {OLM_ALGORITHM, MEGOLM_ALGORITHM} from "./e2ee/common.js";
import {countBy} from "../utils/groupBy.js";
// key to store in session store
const PENDING_ENCRYPTED_EVENTS = "pendingEncryptedDeviceEvents";
import {OLM_ALGORITHM} from "./e2ee/common.js";
import {countBy, groupBy} from "../utils/groupBy.js";
export class DeviceMessageHandler {
constructor({storage}) {
@ -32,90 +29,50 @@ export class DeviceMessageHandler {
this._megolmDecryption = megolmDecryption;
}
/**
* @return {bool} whether messages are waiting to be decrypted and `decryptPending` should be called.
*/
async writeSync(toDeviceEvents, txn, log) {
obtainSyncLock(toDeviceEvents) {
return this._olmDecryption?.obtainDecryptionLock(toDeviceEvents);
}
async prepareSync(toDeviceEvents, lock, txn, log) {
log.set("messageTypes", countBy(toDeviceEvents, e => e.type));
const encryptedEvents = toDeviceEvents.filter(e => e.type === "m.room.encrypted");
log.set("eventsCount", countBy(toDeviceEvents, e => e.type));
if (!encryptedEvents.length) {
return false;
}
// store encryptedEvents
let pendingEvents = await this._getPendingEvents(txn);
pendingEvents = pendingEvents.concat(encryptedEvents);
txn.session.set(PENDING_ENCRYPTED_EVENTS, pendingEvents);
// we don't handle anything other for now
return true;
}
/**
* [_writeDecryptedEvents description]
* @param {Array<DecryptionResult>} olmResults
* @param {[type]} txn [description]
* @return {[type]} [description]
*/
async _writeDecryptedEvents(olmResults, txn, log) {
const megOlmRoomKeysResults = olmResults.filter(r => {
return r.event?.type === "m.room_key" && r.event.content?.algorithm === MEGOLM_ALGORITHM;
});
let roomKeys;
log.set("eventsCount", countBy(olmResults, r => r.event.type));
log.set("roomKeys", megOlmRoomKeysResults.length);
if (megOlmRoomKeysResults.length) {
roomKeys = await this._megolmDecryption.addRoomKeys(megOlmRoomKeysResults, txn, log);
}
log.set("newRoomKeys", roomKeys.length);
return {roomKeys};
}
async _applyDecryptChanges(rooms, {roomKeys}) {
if (Array.isArray(roomKeys)) {
for (const roomKey of roomKeys) {
const room = rooms.get(roomKey.roomId);
// TODO: this is less parallized than it could be (like sync)
await room?.notifyRoomKey(roomKey);
}
}
}
// not safe to call multiple times without awaiting first call
async decryptPending(rooms, log) {
if (!this._olmDecryption) {
return;
}
const readTxn = this._storage.readTxn([this._storage.storeNames.session]);
const pendingEvents = await this._getPendingEvents(readTxn);
log.set("eventCount", pendingEvents.length);
if (pendingEvents.length === 0) {
log.log("can't decrypt, encryption not enabled", log.level.Warn);
return;
}
// only know olm for now
const olmEvents = pendingEvents.filter(e => e.content?.algorithm === OLM_ALGORITHM);
const decryptChanges = await this._olmDecryption.decryptAll(olmEvents);
for (const err of decryptChanges.errors) {
const olmEvents = encryptedEvents.filter(e => e.content?.algorithm === OLM_ALGORITHM);
if (olmEvents.length) {
const olmDecryptChanges = await this._olmDecryption.decryptAll(olmEvents, lock, txn);
log.set("decryptedTypes", countBy(olmDecryptChanges.results, r => r.event?.type));
for (const err of olmDecryptChanges.errors) {
log.child("decrypt_error").catch(err);
}
const txn = this._storage.readWriteTxn([
// both to remove the pending events and to modify the olm account
this._storage.storeNames.session,
this._storage.storeNames.olmSessions,
this._storage.storeNames.inboundGroupSessions,
]);
let changes;
try {
changes = await this._writeDecryptedEvents(decryptChanges.results, txn, log);
decryptChanges.write(txn);
txn.session.remove(PENDING_ENCRYPTED_EVENTS);
} catch (err) {
txn.abort();
throw err;
const newRoomKeys = this._megolmDecryption.roomKeysFromDeviceMessages(olmDecryptChanges.results, log);
return new SyncPreparation(olmDecryptChanges, newRoomKeys);
}
await txn.complete();
await this._applyDecryptChanges(rooms, changes);
}
async _getPendingEvents(txn) {
return (await txn.session.get(PENDING_ENCRYPTED_EVENTS)) || [];
/** check that prep is not undefined before calling this */
async writeSync(prep, txn) {
// write olm changes
prep.olmDecryptChanges.write(txn);
await Promise.all(prep.newRoomKeys.map(key => this._megolmDecryption.writeRoomKey(key, txn)));
}
}
class SyncPreparation {
constructor(olmDecryptChanges, newRoomKeys) {
this.olmDecryptChanges = olmDecryptChanges;
this.newRoomKeys = newRoomKeys;
this.newKeysByRoom = groupBy(newRoomKeys, r => r.roomId);
}
dispose() {
if (this.newRoomKeys) {
for (const k of this.newRoomKeys) {
k.dispose();
}
}
}
}

View file

@ -374,12 +374,25 @@ export class Session {
return room;
}
async obtainSyncLock(syncResponse) {
const toDeviceEvents = syncResponse.to_device?.events;
if (Array.isArray(toDeviceEvents) && toDeviceEvents.length) {
return await this._deviceMessageHandler.obtainSyncLock(toDeviceEvents);
}
}
async prepareSync(syncResponse, lock, txn, log) {
const toDeviceEvents = syncResponse.to_device?.events;
if (Array.isArray(toDeviceEvents) && toDeviceEvents.length) {
return await log.wrap("deviceMsgs", log => this._deviceMessageHandler.prepareSync(toDeviceEvents, lock, txn, log));
}
}
/** @internal */
async writeSync(syncResponse, syncFilterId, txn, log) {
async writeSync(syncResponse, syncFilterId, preparation, txn, log) {
const changes = {
syncInfo: null,
e2eeAccountChanges: null,
deviceMessageDecryptionPending: false
};
const syncToken = syncResponse.next_batch;
if (syncToken !== this.syncToken) {
@ -399,10 +412,8 @@ export class Session {
await log.wrap("deviceLists", log => this._deviceTracker.writeDeviceChanges(deviceLists.changed, txn, log));
}
const toDeviceEvents = syncResponse.to_device?.events;
if (Array.isArray(toDeviceEvents) && toDeviceEvents.length) {
changes.deviceMessageDecryptionPending =
await log.wrap("deviceMsgs", log => this._deviceMessageHandler.writeSync(toDeviceEvents, txn, log));
if (preparation) {
await log.wrap("deviceMsgs", log => this._deviceMessageHandler.writeSync(preparation, txn, log));
}
// store account data
@ -431,9 +442,6 @@ export class Session {
/** @internal */
async afterSyncCompleted(changes, isCatchupSync, log) {
const promises = [];
if (changes.deviceMessageDecryptionPending) {
promises.push(log.wrap("decryptPending", log => this._deviceMessageHandler.decryptPending(this.rooms, log)));
}
// we don't start uploading one-time keys until we've caught up with
// to-device messages, to help us avoid throwing away one-time-keys that we
// are about to receive messages for

View file

@ -40,7 +40,7 @@ function timelineIsEmpty(roomResponse) {
* Sync steps in js-pseudocode:
* ```js
* // can only read some stores
* const preparation = await room.prepareSync(roomResponse, membership, prepareTxn);
* const preparation = await room.prepareSync(roomResponse, membership, newRoomKeys, prepareTxn);
* // can do async work that is not related to storage (such as decryption)
* await room.afterPrepareSync(preparation);
* // writes and calculates changes
@ -190,15 +190,21 @@ export class Sync {
const response = await this._currentRequest.response();
const isInitialSync = !syncToken;
const sessionState = new SessionSyncProcessState();
const roomStates = this._parseRoomsResponse(response.rooms, isInitialSync);
await log.wrap("prepare", log => this._prepareRooms(roomStates, log));
let sessionChanges;
try {
// take a lock on olm sessions used in this sync so sending a message doesn't change them while syncing
sessionState.lock = await log.wrap("obtainSyncLock", () => this._session.obtainSyncLock(response));
await log.wrap("prepare", log => this._prepareSessionAndRooms(sessionState, roomStates, response, log));
await log.wrap("afterPrepareSync", log => Promise.all(roomStates.map(rs => {
return rs.room.afterPrepareSync(rs.preparation, log);
})));
await log.wrap("write", async log => {
const syncTxn = this._openSyncTxn();
try {
sessionChanges = await log.wrap("session", log => this._session.writeSync(response, syncFilterId, syncTxn, log));
sessionState.changes = await log.wrap("session", log => this._session.writeSync(
response, syncFilterId, sessionState.preparation, syncTxn, log));
await Promise.all(roomStates.map(async rs => {
rs.changes = await log.wrap("room", log => rs.room.writeSync(
rs.roomResponse, isInitialSync, rs.preparation, syncTxn, log));
@ -216,9 +222,12 @@ export class Sync {
}
await syncTxn.complete();
});
} finally {
sessionState.dispose();
}
log.wrap("after", log => {
log.wrap("session", log => this._session.afterSync(sessionChanges, log), log.level.Detail);
log.wrap("session", log => this._session.afterSync(sessionState.changes, log), log.level.Detail);
// emit room related events after txn has been closed
for(let rs of roomStates) {
log.wrap("room", log => rs.room.afterSync(rs.changes, log), log.level.Detail);
@ -229,7 +238,7 @@ export class Sync {
return {
syncToken: response.next_batch,
roomStates,
sessionChanges,
sessionChanges: sessionState.changes,
hadToDeviceMessages: Array.isArray(toDeviceEvents) && toDeviceEvents.length > 0,
};
}
@ -237,18 +246,40 @@ export class Sync {
_openPrepareSyncTxn() {
const storeNames = this._storage.storeNames;
return this._storage.readTxn([
storeNames.olmSessions,
storeNames.inboundGroupSessions,
]);
}
async _prepareRooms(roomStates, log) {
async _prepareSessionAndRooms(sessionState, roomStates, response, log) {
const prepareTxn = this._openPrepareSyncTxn();
sessionState.preparation = await log.wrap("session", log => this._session.prepareSync(
response, sessionState.lock, prepareTxn, log));
const newKeysByRoom = sessionState.preparation?.newKeysByRoom;
// add any rooms with new keys but no sync response to the list of rooms to be synced
if (newKeysByRoom) {
const {hasOwnProperty} = Object.prototype;
for (const roomId of newKeysByRoom.keys()) {
const isRoomInResponse = response.rooms?.join && hasOwnProperty.call(response.rooms.join, roomId);
if (!isRoomInResponse) {
let room = this._session.rooms.get(roomId);
if (room) {
roomStates.push(new RoomSyncProcessState(room, {}, room.membership));
}
}
}
}
await Promise.all(roomStates.map(async rs => {
rs.preparation = await log.wrap("room", log => rs.room.prepareSync(rs.roomResponse, rs.membership, prepareTxn, log), log.level.Detail);
const newKeys = newKeysByRoom?.get(rs.room.id);
rs.preparation = await log.wrap("room", log => rs.room.prepareSync(
rs.roomResponse, rs.membership, newKeys, prepareTxn, log), log.level.Detail);
}));
// This is needed for safari to not throw TransactionInactiveErrors on the syncTxn. See docs/INDEXEDDB.md
await prepareTxn.complete();
await Promise.all(roomStates.map(rs => rs.room.afterPrepareSync(rs.preparation, log)));
}
_openSyncTxn() {
@ -269,6 +300,9 @@ export class Sync {
storeNames.outboundGroupSessions,
storeNames.operations,
storeNames.accountData,
// to decrypt and store new room keys
storeNames.olmSessions,
storeNames.inboundGroupSessions,
]);
}
@ -311,6 +345,19 @@ export class Sync {
}
}
class SessionSyncProcessState {
constructor() {
this.lock = null;
this.preparation = null;
this.changes = null;
}
dispose() {
this.lock?.release();
this.preparation?.dispose();
}
}
class RoomSyncProcessState {
constructor(room, roomResponse, membership) {
this.room = room;

View file

@ -93,7 +93,7 @@ export class RoomEncryption {
// this happens before entries exists, as they are created by the syncwriter
// but we want to be able to map it back to something in the timeline easily
// when retrying decryption.
async prepareDecryptAll(events, source, isTimelineOpen, txn) {
async prepareDecryptAll(events, newKeys, source, isTimelineOpen, txn) {
const errors = new Map();
const validEvents = [];
for (const event of events) {
@ -107,6 +107,8 @@ export class RoomEncryption {
}
let customCache;
let sessionCache;
// we have different caches so we can keep them small but still
// have backfill and sync not invalidate each other
if (source === DecryptionSource.Sync) {
sessionCache = this._megolmSyncCache;
} else if (source === DecryptionSource.Timeline) {
@ -120,7 +122,7 @@ export class RoomEncryption {
throw new Error("Unknown source: " + source);
}
const preparation = await this._megolmDecryption.prepareDecryptAll(
this._room.id, validEvents, sessionCache, txn);
this._room.id, validEvents, newKeys, sessionCache, txn);
if (customCache) {
customCache.dispose();
}
@ -188,21 +190,28 @@ export class RoomEncryption {
console.warn("Got session key back from backup with different sender key, ignoring", {session, senderKey});
return;
}
const txn = this._storage.readWriteTxn([this._storage.storeNames.inboundGroupSessions]);
let roomKey;
let roomKey = this._megolmDecryption.roomKeyFromBackup(this._room.id, sessionId, session);
if (roomKey) {
let keyIsBestOne = false;
try {
roomKey = await this._megolmDecryption.addRoomKeyFromBackup(
this._room.id, sessionId, session, txn);
const txn = this._storage.readWriteTxn([this._storage.storeNames.inboundGroupSessions]);
try {
keyIsBestOne = await this._megolmDecryption.writeRoomKey(roomKey, txn);
} catch (err) {
txn.abort();
throw err;
}
await txn.complete();
if (roomKey) {
// this will reattempt decryption
} finally {
// can still access properties on it afterwards
// this is just clearing the internal sessionInfo
roomKey.dispose();
}
if (keyIsBestOne) {
// wrote the key, meaning we didn't have a better one, go ahead and reattempt decryption
await this._room.notifyRoomKey(roomKey);
}
}
} else if (session?.algorithm) {
console.info(`Backed-up session of unknown algorithm: ${session.algorithm}`);
}
@ -212,12 +221,7 @@ export class RoomEncryption {
}
/**
* @type {RoomKeyDescription}
* @property {RoomKeyDescription} senderKey the curve25519 key of the sender
* @property {RoomKeyDescription} sessionId
*
*
* @param {Array<RoomKeyDescription>} roomKeys
* @param {RoomKey} roomKeys
* @return {Array<string>} the event ids that should be retried to decrypt
*/
getEventIdsForRoomKey(roomKey) {
@ -396,6 +400,18 @@ export class RoomEncryption {
await hsApi.sendToDevice(type, payload, txnId, {log}).response();
}
filterEventEntriesForKeys(entries, keys) {
return entries.filter(entry => {
const {event} = entry;
if (event) {
const senderKey = event.content?.["sender_key"];
const sessionId = event.content?.["session_id"];
return keys.some(key => senderKey === key.senderKey && sessionId === key.sessionId);
}
return false;
});
}
dispose() {
this._disposed = true;
this._megolmBackfillCache.dispose();

View file

@ -16,11 +16,12 @@ limitations under the License.
import {DecryptionError} from "../common.js";
import {groupBy} from "../../../utils/groupBy.js";
import * as RoomKey from "./decryption/RoomKey.js";
import {SessionInfo} from "./decryption/SessionInfo.js";
import {DecryptionPreparation} from "./decryption/DecryptionPreparation.js";
import {SessionDecryption} from "./decryption/SessionDecryption.js";
import {SessionCache} from "./decryption/SessionCache.js";
import {MEGOLM_ALGORITHM} from "../common.js";
function getSenderKey(event) {
return event.content?.["sender_key"];
@ -50,11 +51,12 @@ export class Decryption {
* Decryption can then happen outside of a storage transaction.
* @param {[type]} roomId [description]
* @param {[type]} events [description]
* @param {RoomKey[]?} newKeys keys as returned from extractRoomKeys, but not yet committed to storage. May be undefined.
* @param {[type]} sessionCache [description]
* @param {[type]} txn [description]
* @return {DecryptionPreparation}
*/
async prepareDecryptAll(roomId, events, sessionCache, txn) {
async prepareDecryptAll(roomId, events, newKeys, sessionCache, txn) {
const errors = new Map();
const validEvents = [];
@ -74,27 +76,38 @@ export class Decryption {
});
const sessionDecryptions = [];
await Promise.all(Array.from(eventsBySession.values()).map(async eventsForSession => {
const first = eventsForSession[0];
const senderKey = getSenderKey(first);
const sessionId = getSessionId(first);
const sessionInfo = await this._getSessionInfo(roomId, senderKey, sessionId, sessionCache, txn);
if (!sessionInfo) {
const firstEvent = eventsForSession[0];
const sessionInfo = await this._getSessionInfoForEvent(roomId, firstEvent, newKeys, sessionCache, txn);
if (sessionInfo) {
sessionDecryptions.push(new SessionDecryption(sessionInfo, eventsForSession, this._olmWorker));
} else {
for (const event of eventsForSession) {
errors.set(event.event_id, new DecryptionError("MEGOLM_NO_SESSION", event));
}
} else {
sessionDecryptions.push(new SessionDecryption(sessionInfo, eventsForSession, this._olmWorker));
}
}));
return new DecryptionPreparation(roomId, sessionDecryptions, errors);
}
async _getSessionInfo(roomId, senderKey, sessionId, sessionCache, txn) {
async _getSessionInfoForEvent(roomId, event, newKeys, sessionCache, txn) {
const senderKey = getSenderKey(event);
const sessionId = getSessionId(event);
let sessionInfo;
if (newKeys) {
const key = newKeys.find(k => k.roomId === roomId && k.senderKey === senderKey && k.sessionId === sessionId);
if (key) {
sessionInfo = await key.createSessionInfo(this._olm, this._pickleKey, txn);
if (sessionInfo) {
sessionCache.add(sessionInfo);
}
}
}
// look only in the cache after looking into newKeys as it may contains that are better
if (!sessionInfo) {
sessionInfo = sessionCache.get(roomId, senderKey, sessionId);
}
if (!sessionInfo) {
const sessionEntry = await txn.inboundGroupSessions.get(roomId, senderKey, sessionId);
if (sessionEntry) {
@ -113,111 +126,45 @@ export class Decryption {
}
/**
* @type {MegolmInboundSessionDescription}
* @property {string} senderKey the sender key of the session
* @property {string} sessionId the session identifier
*
* Adds room keys as inbound group sessions
* @param {Array<OlmDecryptionResult>} decryptionResults an array of m.room_key decryption results.
* @param {[type]} txn a storage transaction with read/write on inboundGroupSessions
* @return {Promise<Array<MegolmInboundSessionDescription>>} an array with the newly added sessions
* Writes the key as an inbound group session if there is not already a better key in the store
* @param {RoomKey} key
* @param {Transaction} txn a storage transaction with read/write on inboundGroupSessions
* @return {Promise<boolean>} whether the key was the best for the sessio id and was written
*/
async addRoomKeys(decryptionResults, txn, log) {
const newSessions = [];
for (const {senderCurve25519Key: senderKey, event, claimedEd25519Key} of decryptionResults) {
await log.wrap("room_key", async log => {
const roomId = event.content?.["room_id"];
const sessionId = event.content?.["session_id"];
const sessionKey = event.content?.["session_key"];
writeRoomKey(key, txn) {
return key.write(this._olm, this._pickleKey, txn);
}
log.set("roomId", roomId);
log.set("sessionId", sessionId);
if (
typeof roomId !== "string" ||
typeof sessionId !== "string" ||
typeof senderKey !== "string" ||
typeof sessionKey !== "string"
) {
/**
* Extracts room keys from decrypted device messages.
* The key won't be persisted yet, you need to call RoomKey.write for that.
*
* @param {Array<OlmDecryptionResult>} decryptionResults, any non megolm m.room_key messages will be ignored.
* @return {Array<RoomKey>} an array with validated RoomKey's. Note that it is possible we already have a better version of this key in storage though; writing the key will tell you so.
*/
roomKeysFromDeviceMessages(decryptionResults, log) {
let keys = [];
for (const dr of decryptionResults) {
if (dr.event?.type !== "m.room_key" || dr.event.content?.algorithm !== MEGOLM_ALGORITHM) {
continue;
}
log.wrap("room_key", log => {
const key = RoomKey.fromDeviceMessage(dr);
if (key) {
log.set("roomId", key.roomId);
log.set("id", key.sessionId);
keys.push(key);
} else {
log.logLevel = log.level.Warn;
log.set("invalid", true);
return;
}
const session = new this._olm.InboundGroupSession();
try {
session.create(sessionKey);
const sessionEntry = await this._writeInboundSession(
session, roomId, senderKey, claimedEd25519Key, sessionId, txn);
if (sessionEntry) {
newSessions.push(sessionEntry);
}
} finally {
session.free();
}
}, log.level.Detail);
}
// this will be passed to the Room in notifyRoomKeys
return newSessions;
return keys;
}
/*
sessionInfo is a response from key backup and has the following keys:
algorithm
forwarding_curve25519_key_chain
sender_claimed_keys
sender_key
session_key
*/
async addRoomKeyFromBackup(roomId, sessionId, sessionInfo, txn) {
const sessionKey = sessionInfo["session_key"];
const senderKey = sessionInfo["sender_key"];
// TODO: can we just trust this?
const claimedEd25519Key = sessionInfo["sender_claimed_keys"]?.["ed25519"];
if (
typeof roomId !== "string" ||
typeof sessionId !== "string" ||
typeof senderKey !== "string" ||
typeof sessionKey !== "string" ||
typeof claimedEd25519Key !== "string"
) {
return;
}
const session = new this._olm.InboundGroupSession();
try {
session.import_session(sessionKey);
return await this._writeInboundSession(
session, roomId, senderKey, claimedEd25519Key, sessionId, txn);
} finally {
session.free();
}
}
async _writeInboundSession(session, roomId, senderKey, claimedEd25519Key, sessionId, txn) {
let incomingSessionIsBetter = true;
const existingSessionEntry = await txn.inboundGroupSessions.get(roomId, senderKey, sessionId);
if (existingSessionEntry) {
const existingSession = new this._olm.InboundGroupSession();
try {
existingSession.unpickle(this._pickleKey, existingSessionEntry.session);
incomingSessionIsBetter = session.first_known_index() < existingSession.first_known_index();
} finally {
existingSession.free();
}
}
if (incomingSessionIsBetter) {
const sessionEntry = {
roomId,
senderKey,
sessionId,
session: session.pickle(this._pickleKey),
claimedKeys: {ed25519: claimedEd25519Key},
};
txn.inboundGroupSessions.set(sessionEntry);
return sessionEntry;
}
roomKeyFromBackup(roomId, sessionId, sessionInfo) {
return RoomKey.fromBackup(roomId, sessionId, sessionInfo);
}
}

View file

@ -47,6 +47,10 @@ export class DecryptionChanges {
};
}
// need to handle replay attack because
// if we redecrypted the same message twice and showed it again
// then it could be a malicious server admin replaying the word “yes”
// to make you respond to a msg you didnt say “yes” to, or something
async _handleReplayAttack(roomId, replayEntry, txn) {
const {messageIndex, sessionId, eventId, timestamp} = replayEntry;
const decryption = await txn.groupSessionDecryptions.get(roomId, sessionId, messageIndex);

View file

@ -0,0 +1,154 @@
import {SessionInfo} from "./SessionInfo.js";
export class BaseRoomKey {
constructor() {
this._sessionInfo = null;
this._isBetter = null;
}
async createSessionInfo(olm, pickleKey, txn) {
const session = new olm.InboundGroupSession();
try {
this._loadSessionKey(session);
this._isBetter = await this._isBetterThanKnown(session, olm, pickleKey, txn);
if (this._isBetter) {
const claimedKeys = {ed25519: this.claimedEd25519Key};
this._sessionInfo = new SessionInfo(this.roomId, this.senderKey, session, claimedKeys);
// retain the session so we don't have to create a new session during write.
this._sessionInfo.retain();
return this._sessionInfo;
} else {
session.free();
return;
}
} catch (err) {
this._sessionInfo = null;
session.free();
throw err;
}
}
async _isBetterThanKnown(session, olm, pickleKey, txn) {
let isBetter = true;
const existingSessionEntry = await txn.inboundGroupSessions.get(this.roomId, this.senderKey, this.sessionId);
if (existingSessionEntry) {
const existingSession = new olm.InboundGroupSession();
try {
existingSession.unpickle(pickleKey, existingSessionEntry.session);
isBetter = session.first_known_index() < existingSession.first_known_index();
} finally {
existingSession.free();
}
}
return isBetter;
}
async write(olm, pickleKey, txn) {
// we checked already and we had a better session in storage, so don't write
if (this._isBetter === false) {
return false;
}
if (!this._sessionInfo) {
await this.createSessionInfo(olm, pickleKey, txn);
}
if (this._sessionInfo) {
const session = this._sessionInfo.session;
const sessionEntry = {
roomId: this.roomId,
senderKey: this.senderKey,
sessionId: this.sessionId,
session: session.pickle(pickleKey),
claimedKeys: this._sessionInfo.claimedKeys,
};
txn.inboundGroupSessions.set(sessionEntry);
this.dispose();
return true;
}
return false;
}
dispose() {
if (this._sessionInfo) {
this._sessionInfo.release();
this._sessionInfo = null;
}
}
}
class DeviceMessageRoomKey extends BaseRoomKey {
constructor(decryptionResult) {
super();
this._decryptionResult = decryptionResult;
}
get roomId() { return this._decryptionResult.event.content?.["room_id"]; }
get senderKey() { return this._decryptionResult.senderCurve25519Key; }
get sessionId() { return this._decryptionResult.event.content?.["session_id"]; }
get claimedEd25519Key() { return this._decryptionResult.claimedEd25519Key; }
_loadSessionKey(session) {
const sessionKey = this._decryptionResult.event.content?.["session_key"];
session.create(sessionKey);
}
}
class BackupRoomKey extends BaseRoomKey {
constructor(roomId, sessionId, backupInfo) {
super();
this._roomId = roomId;
this._sessionId = sessionId;
this._backupInfo = backupInfo;
}
get roomId() { return this._roomId; }
get senderKey() { return this._backupInfo["sender_key"]; }
get sessionId() { return this._sessionId; }
get claimedEd25519Key() { return this._backupInfo["sender_claimed_keys"]?.["ed25519"]; }
_loadSessionKey(session) {
const sessionKey = this._backupInfo["session_key"];
session.import_session(sessionKey);
}
}
export function fromDeviceMessage(dr) {
const roomId = dr.event.content?.["room_id"];
const sessionId = dr.event.content?.["session_id"];
const sessionKey = dr.event.content?.["session_key"];
if (
typeof roomId === "string" ||
typeof sessionId === "string" ||
typeof senderKey === "string" ||
typeof sessionKey === "string"
) {
return new DeviceMessageRoomKey(dr);
}
}
/*
sessionInfo is a response from key backup and has the following keys:
algorithm
forwarding_curve25519_key_chain
sender_claimed_keys
sender_key
session_key
*/
export function fromBackup(roomId, sessionId, sessionInfo) {
const sessionKey = sessionInfo["session_key"];
const senderKey = sessionInfo["sender_key"];
// TODO: can we just trust this?
const claimedEd25519Key = sessionInfo["sender_claimed_keys"]?.["ed25519"];
if (
typeof roomId === "string" &&
typeof sessionId === "string" &&
typeof senderKey === "string" &&
typeof sessionKey === "string" &&
typeof claimedEd25519Key === "string"
) {
return new BackupRoomKey(roomId, sessionId, sessionInfo);
}
}

View file

@ -40,5 +40,6 @@ export class SessionInfo {
dispose() {
this.session.free();
this.session = null;
}
}

View file

@ -16,6 +16,7 @@ limitations under the License.
import {DecryptionError} from "../common.js";
import {groupBy} from "../../../utils/groupBy.js";
import {MultiLock} from "../../../utils/Lock.js";
import {Session} from "./Session.js";
import {DecryptionResult} from "../DecryptionResult.js";
@ -42,6 +43,29 @@ export class Decryption {
this._senderKeyLock = senderKeyLock;
}
// we need to lock because both encryption and decryption can't be done in one txn,
// so for them not to step on each other toes, we need to lock.
//
// the lock is release from 1 of 3 places, whichever comes first:
// - decryptAll below fails (to release the lock as early as we can)
// - DecryptionChanges.write succeeds
// - Sync finishes the writeSync phase (or an error was thrown, in case we never get to DecryptionChanges.write)
async obtainDecryptionLock(events) {
const senderKeys = new Set();
for (const event of events) {
const senderKey = event.content?.["sender_key"];
if (senderKey) {
senderKeys.add(senderKey);
}
}
// take a lock on all senderKeys so encryption or other calls to decryptAll (should not happen)
// don't modify the sessions at the same time
const locks = await Promise.all(Array.from(senderKeys).map(senderKey => {
return this._senderKeyLock.takeLock(senderKey);
}));
return new MultiLock(locks);
}
// we need decryptAll because there is some parallelization we can do for decrypting different sender keys at once
// but for the same sender key we need to do one by one
//
@ -54,34 +78,28 @@ export class Decryption {
//
/**
* It is importants the lock obtained from obtainDecryptionLock is for the same set of events as passed in here.
* [decryptAll description]
* @param {[type]} events
* @return {Promise<DecryptionChanges>} [description]
*/
async decryptAll(events) {
async decryptAll(events, lock, txn) {
try {
const eventsPerSenderKey = groupBy(events, event => event.content?.["sender_key"]);
const timestamp = this._now();
// take a lock on all senderKeys so encryption or other calls to decryptAll (should not happen)
// don't modify the sessions at the same time
const locks = await Promise.all(Array.from(eventsPerSenderKey.keys()).map(senderKey => {
return this._senderKeyLock.takeLock(senderKey);
}));
try {
const readSessionsTxn = this._storage.readTxn([this._storage.storeNames.olmSessions]);
// decrypt events for different sender keys in parallel
const senderKeyOperations = await Promise.all(Array.from(eventsPerSenderKey.entries()).map(([senderKey, events]) => {
return this._decryptAllForSenderKey(senderKey, events, timestamp, readSessionsTxn);
return this._decryptAllForSenderKey(senderKey, events, timestamp, txn);
}));
const results = senderKeyOperations.reduce((all, r) => all.concat(r.results), []);
const errors = senderKeyOperations.reduce((all, r) => all.concat(r.errors), []);
const senderKeyDecryptions = senderKeyOperations.map(r => r.senderKeyDecryption);
return new DecryptionChanges(senderKeyDecryptions, results, errors, this._account, locks);
return new DecryptionChanges(senderKeyDecryptions, results, errors, this._account, lock);
} catch (err) {
// make sure the locks are release if something throws
// otherwise they will be released in DecryptionChanges after having written
for (const lock of locks) {
// or after the writeSync phase in Sync
lock.release();
}
throw err;
}
}
@ -268,12 +286,12 @@ class SenderKeyDecryption {
* @property {Array<DecryptionError>} errors see DecryptionError.event to retrieve the event that failed to decrypt.
*/
class DecryptionChanges {
constructor(senderKeyDecryptions, results, errors, account, locks) {
constructor(senderKeyDecryptions, results, errors, account, lock) {
this._senderKeyDecryptions = senderKeyDecryptions;
this._account = account;
this.results = results;
this.errors = errors;
this._locks = locks;
this._lock = lock;
}
get hasNewSessions() {
@ -304,9 +322,7 @@ class DecryptionChanges {
}
}
} finally {
for (const lock of this._locks) {
lock.release();
}
this._lock.release();
}
}
}

View file

@ -148,7 +148,7 @@ export class Room extends EventEmitter {
return entry.eventType === EVENT_ENCRYPTED_TYPE;
}).map(entry => entry.event);
const isTimelineOpen = this._isTimelineOpen;
r.preparation = await this._roomEncryption.prepareDecryptAll(events, source, isTimelineOpen, inboundSessionTxn);
r.preparation = await this._roomEncryption.prepareDecryptAll(events, null, source, isTimelineOpen, inboundSessionTxn);
if (r.cancelled) return;
const changes = await r.preparation.decrypt();
r.preparation = null;
@ -176,8 +176,11 @@ export class Room extends EventEmitter {
return request;
}
async prepareSync(roomResponse, membership, txn, log) {
async prepareSync(roomResponse, membership, newKeys, txn, log) {
log.set("id", this.id);
if (newKeys) {
log.set("newKeys", newKeys.length);
}
const summaryChanges = this._summary.data.applySyncResponse(roomResponse, membership)
let roomEncryption = this._roomEncryption;
// encryption is enabled in this sync
@ -186,15 +189,28 @@ export class Room extends EventEmitter {
roomEncryption = this._createRoomEncryption(this, summaryChanges.encryption);
}
let retryEntries;
let decryptPreparation;
if (roomEncryption) {
const events = roomResponse?.timeline?.events;
if (Array.isArray(events)) {
// also look for events in timeline here
let events = roomResponse?.timeline?.events || [];
// when new keys arrive, also see if any events currently loaded in the timeline
// can now be retried to decrypt
if (this._timeline && newKeys) {
retryEntries = roomEncryption.filterEventEntriesForKeys(
this._timeline.remoteEntries, newKeys);
if (retryEntries.length) {
log.set("retry", retryEntries.length);
events = events.concat(retryEntries.map(entry => entry.event));
}
}
if (events.length) {
const eventsToDecrypt = events.filter(event => {
return event?.type === EVENT_ENCRYPTED_TYPE;
});
decryptPreparation = await roomEncryption.prepareDecryptAll(
eventsToDecrypt, DecryptionSource.Sync, this._isTimelineOpen, txn);
eventsToDecrypt, newKeys, DecryptionSource.Sync, this._isTimelineOpen, txn);
}
}
@ -203,6 +219,7 @@ export class Room extends EventEmitter {
summaryChanges,
decryptPreparation,
decryptChanges: null,
retryEntries
};
}
@ -217,12 +234,19 @@ export class Room extends EventEmitter {
}
/** @package */
async writeSync(roomResponse, isInitialSync, {summaryChanges, decryptChanges, roomEncryption}, txn, log) {
async writeSync(roomResponse, isInitialSync, {summaryChanges, decryptChanges, roomEncryption, retryEntries}, txn, log) {
log.set("id", this.id);
const {entries, newLiveKey, memberChanges} =
await log.wrap("syncWriter", log => this._syncWriter.writeSync(roomResponse, txn, log), log.level.Detail);
if (decryptChanges) {
const decryption = await decryptChanges.write(txn);
if (retryEntries?.length) {
// TODO: this will modify existing timeline entries (which we should not do in writeSync),
// but it is a temporary way of reattempting decryption while timeline is open
// won't need copies when tracking missing sessions properly
// prepend the retried entries, as we know they are older (not that it should matter much for the summary)
entries.unshift(...retryEntries);
}
decryption.applyToEntries(entries);
}
// pass member changes to device tracker
@ -251,7 +275,7 @@ export class Room extends EventEmitter {
return {
summaryChanges,
roomEncryption,
newTimelineEntries: entries,
newAndUpdatedEntries: entries,
newLiveKey,
removedPendingEvents,
memberChanges,
@ -264,7 +288,7 @@ export class Room extends EventEmitter {
* Called with the changes returned from `writeSync` to apply them and emit changes.
* No storage or network operations should be done here.
*/
afterSync({summaryChanges, newTimelineEntries, newLiveKey, removedPendingEvents, memberChanges, heroChanges, roomEncryption}, log) {
afterSync({summaryChanges, newAndUpdatedEntries, newLiveKey, removedPendingEvents, memberChanges, heroChanges, roomEncryption}, log) {
log.set("id", this.id);
this._syncWriter.afterSync(newLiveKey);
this._setEncryption(roomEncryption);
@ -297,10 +321,10 @@ export class Room extends EventEmitter {
this._emitUpdate();
}
if (this._timeline) {
this._timeline.appendLiveEntries(newTimelineEntries);
this._timeline.appendLiveEntries(newAndUpdatedEntries);
}
if (this._observedEvents) {
this._observedEvents.updateEvents(newTimelineEntries);
this._observedEvents.updateEvents(newAndUpdatedEntries);
}
if (removedPendingEvents) {
this._sendQueue.emitRemovals(removedPendingEvents);
@ -518,6 +542,10 @@ export class Room extends EventEmitter {
return !!this._summary.data.encryption;
}
get membership() {
return this._summary.data.membership;
}
enableSessionBackup(sessionBackup) {
this._roomEncryption?.enableSessionBackup(sessionBackup);
}

View file

@ -116,6 +116,14 @@ export class Timeline {
return this._allEntries;
}
/**
* @internal
* @return {Array<EventEntry>} remote event entries, should not be modified
*/
get remoteEntries() {
return this._remoteEntries.array;
}
/** @public */
dispose() {
if (this._closeCallback) {

View file

@ -167,7 +167,7 @@ export class SyncWriter {
}
async _writeTimeline(entries, timeline, currentKey, memberChanges, txn, log) {
if (Array.isArray(timeline.events) && timeline.events.length) {
if (Array.isArray(timeline?.events) && timeline.events.length) {
// only create a fragment when we will really write an event
currentKey = await this._ensureLiveFragment(currentKey, entries, timeline, txn, log);
const events = deduplicateEvents(timeline.events);

View file

@ -54,6 +54,18 @@ export class Lock {
}
}
export class MultiLock {
constructor(locks) {
this.locks = locks;
}
release() {
for (const lock of this.locks) {
lock.release();
}
}
}
export function tests() {
return {
"taking a lock twice returns false": assert => {