Merge pull request #265 from vector-im/bwindels/fix-201
Send withheld message when we can't create an olm session with a given device
This commit is contained in:
commit
2a106c5053
3 changed files with 52 additions and 13 deletions
|
@ -26,6 +26,7 @@ const ENCRYPTED_TYPE = "m.room.encrypted";
|
|||
// note that encrypt could still create a new session
|
||||
const MIN_PRESHARE_INTERVAL = 60 * 1000; // 1min
|
||||
|
||||
// TODO: this class is a good candidate for splitting up into encryption and decryption, there doesn't seem to be much overlap
|
||||
export class RoomEncryption {
|
||||
constructor({room, deviceTracker, olmEncryption, megolmEncryption, megolmDecryption, encryptionParams, storage, sessionBackup, notifyMissingMegolmSession, clock}) {
|
||||
this._room = room;
|
||||
|
@ -349,36 +350,63 @@ export class RoomEncryption {
|
|||
|
||||
async _processShareRoomKeyOperation(operation, hsApi, log) {
|
||||
log.set("id", operation.id);
|
||||
|
||||
await this._deviceTracker.trackRoom(this._room, log);
|
||||
let devices;
|
||||
if (operation.userIds === null) {
|
||||
devices = await this._deviceTracker.devicesForTrackedRoom(this._room.id, hsApi, log);
|
||||
const userIds = Array.from(devices.reduce((set, device) => set.add(device.userId), new Set()));
|
||||
operation.userIds = userIds;
|
||||
const userIdsTxn = await this._storage.readWriteTxn([this._storage.storeNames.operations]);
|
||||
try {
|
||||
userIdsTxn.operations.update(operation);
|
||||
} catch (err) {
|
||||
userIdsTxn.abort();
|
||||
throw err;
|
||||
}
|
||||
await userIdsTxn.complete();
|
||||
await this._updateOperationsStore(operations => operations.update(operation));
|
||||
} else {
|
||||
devices = await this._deviceTracker.devicesForRoomMembers(this._room.id, operation.userIds, hsApi, log);
|
||||
}
|
||||
|
||||
const messages = await log.wrap("olm encrypt", log => this._olmEncryption.encrypt(
|
||||
"m.room_key", operation.roomKeyMessage, devices, hsApi, log));
|
||||
const missingDevices = devices.filter(d => !messages.some(m => m.device === d));
|
||||
await log.wrap("send", log => this._sendMessagesToDevices(ENCRYPTED_TYPE, messages, hsApi, log));
|
||||
if (missingDevices.length) {
|
||||
await log.wrap("missingDevices", async log => {
|
||||
log.set("devices", missingDevices.map(d => d.deviceId));
|
||||
const unsentUserIds = operation.userIds.filter(userId => missingDevices.some(d => d.userId === userId));
|
||||
log.set("unsentUserIds", unsentUserIds);
|
||||
operation.userIds = unsentUserIds;
|
||||
// first remove the users that we've sent the keys already from the operation,
|
||||
// so if anything fails, we don't send them again
|
||||
await this._updateOperationsStore(operations => operations.update(operation));
|
||||
// now, let the devices we could not claim their key
|
||||
const withheldMessage = this._megolmEncryption.createWithheldMessage(operation.roomKeyMessage, "m.no_olm", "OTKs exhausted");
|
||||
await this._sendSharedMessageToDevices("org.matrix.room_key.withheld", withheldMessage, missingDevices, hsApi, log);
|
||||
});
|
||||
}
|
||||
await this._updateOperationsStore(operations => operations.remove(operation.id));
|
||||
}
|
||||
|
||||
const removeTxn = await this._storage.readWriteTxn([this._storage.storeNames.operations]);
|
||||
async _updateOperationsStore(callback) {
|
||||
const writeTxn = await this._storage.readWriteTxn([this._storage.storeNames.operations]);
|
||||
try {
|
||||
removeTxn.operations.remove(operation.id);
|
||||
callback(writeTxn.operations);
|
||||
} catch (err) {
|
||||
removeTxn.abort();
|
||||
writeTxn.abort();
|
||||
throw err;
|
||||
}
|
||||
await removeTxn.complete();
|
||||
await writeTxn.complete();
|
||||
}
|
||||
|
||||
async _sendSharedMessageToDevices(type, message, devices, hsApi, log) {
|
||||
const devicesByUser = groupBy(devices, device => device.userId);
|
||||
const payload = {
|
||||
messages: Array.from(devicesByUser.entries()).reduce((userMap, [userId, devices]) => {
|
||||
userMap[userId] = devices.reduce((deviceMap, device) => {
|
||||
deviceMap[device.deviceId] = message;
|
||||
return deviceMap;
|
||||
}, {});
|
||||
return userMap;
|
||||
}, {})
|
||||
};
|
||||
const txnId = makeTxnId();
|
||||
await hsApi.sendToDevice(type, payload, txnId, {log}).response();
|
||||
}
|
||||
|
||||
async _sendMessagesToDevices(type, messages, hsApi, log) {
|
||||
|
|
|
@ -43,6 +43,17 @@ export class Encryption {
|
|||
}
|
||||
}
|
||||
|
||||
createWithheldMessage(roomMessage, code, reason) {
|
||||
return {
|
||||
algorithm: roomMessage.algorithm,
|
||||
code,
|
||||
reason,
|
||||
room_id: roomMessage.room_id,
|
||||
sender_key: this._account.identityKeys.curve25519,
|
||||
session_id: roomMessage.session_id
|
||||
};
|
||||
}
|
||||
|
||||
async ensureOutboundSession(roomId, encryptionParams) {
|
||||
let session = new this._olm.OutboundGroupSession();
|
||||
try {
|
||||
|
|
|
@ -157,7 +157,7 @@ export class Encryption {
|
|||
const {device, oneTimeKey} = target;
|
||||
target.session = await this._account.createOutboundOlmSession(device.curve25519Key, oneTimeKey);
|
||||
}
|
||||
this._storeSessions(newEncryptionTargets, timestamp);
|
||||
await this._storeSessions(newEncryptionTargets, timestamp);
|
||||
} catch (err) {
|
||||
for (const target of newEncryptionTargets) {
|
||||
target.dispose();
|
||||
|
|
Reference in a new issue