store key share operations before tracking the room

This commit is contained in:
Bruno Windels 2021-03-03 18:56:16 +01:00
parent aa23672694
commit 4445b1ee01
2 changed files with 49 additions and 52 deletions

View file

@ -281,38 +281,18 @@ export class RoomEncryption {
} }
async _shareNewRoomKey(roomKeyMessage, hsApi, log) { async _shareNewRoomKey(roomKeyMessage, hsApi, log) {
await this._deviceTracker.trackRoom(this._room, log); let writeOpTxn = this._storage.readWriteTxn([this._storage.storeNames.operations]);
const devices = await this._deviceTracker.devicesForTrackedRoom(this._room.id, hsApi, log); let operation;
const userIds = Array.from(devices.reduce((set, device) => set.add(device.userId), new Set()));
// store operation for room key share, in case we don't finish here
const writeOpTxn = this._storage.readWriteTxn([this._storage.storeNames.operations]);
let operationId;
try { try {
operationId = this._writeRoomKeyShareOperation(roomKeyMessage, userIds, writeOpTxn); operation = this._writeRoomKeyShareOperation(roomKeyMessage, null, writeOpTxn);
} catch (err) { } catch (err) {
writeOpTxn.abort(); writeOpTxn.abort();
throw err; throw err;
} }
log.set("id", operationId);
log.set("sessionId", roomKeyMessage.session_id);
await writeOpTxn.complete();
// TODO: at this point we have the room key stored, and the rest is sort of optional // TODO: at this point we have the room key stored, and the rest is sort of optional
// it would be nice if we could signal SendQueue that any error from here on is non-fatal and // it would be nice if we could signal SendQueue that any error from here on is non-fatal and
// return the encrypted payload. // return the encrypted payload.
await this._processShareRoomKeyOperation(operation, hsApi, log);
// send the room key
await this._sendRoomKey(roomKeyMessage, devices, hsApi, log);
// remove the operation
const removeOpTxn = this._storage.readWriteTxn([this._storage.storeNames.operations]);
try {
removeOpTxn.operations.remove(operationId);
} catch (err) {
removeOpTxn.abort();
throw err;
}
await removeOpTxn.complete();
} }
async _addShareRoomKeyOperationForNewMembers(memberChangesArray, txn, log) { async _addShareRoomKeyOperationForNewMembers(memberChangesArray, txn, log) {
@ -331,18 +311,6 @@ export class RoomEncryption {
return false; return false;
} }
_writeRoomKeyShareOperation(roomKeyMessage, userIds, txn) {
const id = Math.floor(Math.random() * Number.MAX_SAFE_INTEGER).toString();
txn.operations.add({
id,
type: "share_room_key",
scope: this._room.id,
userIds,
roomKeyMessage,
});
return id;
}
async flushPendingRoomKeyShares(hsApi, operations, log) { async flushPendingRoomKeyShares(hsApi, operations, log) {
// this has to be reentrant as it can be called from Room.start while still running // this has to be reentrant as it can be called from Room.start while still running
if (this._isFlushingRoomKeyShares) { if (this._isFlushingRoomKeyShares) {
@ -359,29 +327,58 @@ export class RoomEncryption {
if (operation.type !== "share_room_key") { if (operation.type !== "share_room_key") {
continue; continue;
} }
await log.wrap("operation", async log => { await log.wrap("operation", log => this._processShareRoomKeyOperation(operation, hsApi, log));
log.set("id", operation.id);
const devices = await this._deviceTracker.devicesForRoomMembers(this._room.id, operation.userIds, hsApi, log);
await this._sendRoomKey(operation.roomKeyMessage, devices, hsApi, log);
const removeTxn = this._storage.readWriteTxn([this._storage.storeNames.operations]);
try {
removeTxn.operations.remove(operation.id);
} catch (err) {
removeTxn.abort();
throw err;
}
await removeTxn.complete();
});
} }
} finally { } finally {
this._isFlushingRoomKeyShares = false; this._isFlushingRoomKeyShares = false;
} }
} }
async _sendRoomKey(roomKeyMessage, devices, hsApi, log) { _writeRoomKeyShareOperation(roomKeyMessage, userIds, txn) {
const id = Math.floor(Math.random() * Number.MAX_SAFE_INTEGER).toString();
const operation = {
id,
type: "share_room_key",
scope: this._room.id,
userIds,
roomKeyMessage,
};
txn.operations.add(operation);
return operation;
}
async _processShareRoomKeyOperation(operation, hsApi, log) {
log.set("id", operation.id);
await this._deviceTracker.trackRoom(this._room, log);
let devices;
if (operation.userIds === null) {
devices = await this._deviceTracker.devicesForTrackedRoom(this._room.id, hsApi, log);
const userIds = Array.from(devices.reduce((set, device) => set.add(device.userId), new Set()));
operation.userIds = userIds;
const userIdsTxn = this._storage.readWriteTxn([this._storage.storeNames.operations]);
try {
userIdsTxn.operations.update(operation);
} catch (err) {
userIdsTxn.abort();
throw err;
}
await userIdsTxn.complete();
} else {
devices = await this._deviceTracker.devicesForRoomMembers(this._room.id, operation.userIds, hsApi, log);
}
const messages = await log.wrap("olm encrypt", log => this._olmEncryption.encrypt( const messages = await log.wrap("olm encrypt", log => this._olmEncryption.encrypt(
"m.room_key", roomKeyMessage, devices, hsApi, log)); "m.room_key", operation.roomKeyMessage, devices, hsApi, log));
await log.wrap("send", log => this._sendMessagesToDevices(ENCRYPTED_TYPE, messages, hsApi, log)); await log.wrap("send", log => this._sendMessagesToDevices(ENCRYPTED_TYPE, messages, hsApi, log));
const removeTxn = this._storage.readWriteTxn([this._storage.storeNames.operations]);
try {
removeTxn.operations.remove(operation.id);
} catch (err) {
removeTxn.abort();
throw err;
}
await removeTxn.complete();
} }
async _sendMessagesToDevices(type, messages, hsApi, log) { async _sendMessagesToDevices(type, messages, hsApi, log) {

View file

@ -46,7 +46,7 @@ export class OperationStore {
} }
update(operation) { update(operation) {
this._store.set(operation); this._store.put(operation);
} }
remove(id) { remove(id) {