forked from mystiq/hydrogen-web
implement room key sharing with operations store
This commit is contained in:
parent
b00865510f
commit
ab1fe711ad
3 changed files with 86 additions and 65 deletions
|
@ -226,7 +226,9 @@ export class Sync {
|
|||
storeNames.groupSessionDecryptions,
|
||||
storeNames.deviceIdentities,
|
||||
// to discard outbound session when somebody leaves a room
|
||||
storeNames.outboundGroupSessions
|
||||
// and to create room key messages when somebody leaves
|
||||
storeNames.outboundGroupSessions,
|
||||
storeNames.operations
|
||||
]);
|
||||
}
|
||||
|
||||
|
|
|
@ -47,13 +47,14 @@ export class RoomEncryption {
|
|||
}
|
||||
|
||||
async writeMemberChanges(memberChanges, txn) {
|
||||
for (const m of memberChanges.values()) {
|
||||
if (m.hasLeft) {
|
||||
this._megolmEncryption.discardOutboundSession(this._room.id, txn);
|
||||
break;
|
||||
}
|
||||
const memberChangesArray = Array.from(memberChanges.values());
|
||||
if (memberChangesArray.some(m => m.hasLeft)) {
|
||||
this._megolmEncryption.discardOutboundSession(this._room.id, txn);
|
||||
}
|
||||
return await this._deviceTracker.writeMemberChanges(this._room, memberChanges, txn);
|
||||
if (memberChangesArray.some(m => m.hasJoined)) {
|
||||
await this._addShareRoomKeyOperationForNewMembers(memberChangesArray, txn);
|
||||
}
|
||||
await this._deviceTracker.writeMemberChanges(this._room, memberChanges, txn);
|
||||
}
|
||||
|
||||
// this happens before entries exists, as they are created by the syncwriter
|
||||
|
@ -146,16 +147,10 @@ export class RoomEncryption {
|
|||
}
|
||||
|
||||
async encrypt(type, content, hsApi) {
|
||||
await this._deviceTracker.trackRoom(this._room);
|
||||
const megolmResult = await this._megolmEncryption.encrypt(this._room.id, type, content, this._encryptionParams);
|
||||
// share the new megolm session if needed
|
||||
if (megolmResult.roomKeyMessage) {
|
||||
await this._deviceTracker.trackRoom(this._room);
|
||||
const devices = await this._deviceTracker.devicesForTrackedRoom(this._room.id, hsApi);
|
||||
await this._sendRoomKey(megolmResult.roomKeyMessage, devices, hsApi);
|
||||
// if we happen to rotate the session before we have sent newly joined members the room key
|
||||
// then mark those members as not needing the key anymore
|
||||
const userIds = Array.from(devices.reduce((set, device) => set.add(device.userId), new Set()));
|
||||
await this._clearNeedsRoomKeyFlag(userIds);
|
||||
this._shareNewRoomKey(megolmResult.roomKeyMessage, hsApi);
|
||||
}
|
||||
return {
|
||||
type: ENCRYPTED_TYPE,
|
||||
|
@ -165,64 +160,87 @@ export class RoomEncryption {
|
|||
|
||||
needsToShareKeys(memberChanges) {
|
||||
for (const m of memberChanges.values()) {
|
||||
if (m.member.needsRoomKey) {
|
||||
if (m.hasJoined) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
async shareRoomKeyToPendingMembers(hsApi) {
|
||||
// sucks to call this for all encrypted rooms on startup?
|
||||
const txn = await this._storage.readTxn([this._storage.storeNames.roomMembers]);
|
||||
const pendingUserIds = await txn.roomMembers.getUserIdsNeedingRoomKey(this._room.id);
|
||||
return await this._shareRoomKey(pendingUserIds, hsApi);
|
||||
}
|
||||
async _shareNewRoomKey(roomKeyMessage, hsApi) {
|
||||
const devices = await this._deviceTracker.devicesForTrackedRoom(this._room.id, hsApi);
|
||||
const userIds = Array.from(devices.reduce((set, device) => set.add(device.userId), new Set()));
|
||||
|
||||
async shareRoomKeyForMemberChanges(memberChanges, hsApi) {
|
||||
const pendingUserIds = [];
|
||||
for (const m of memberChanges.values()) {
|
||||
if (m.member.needsRoomKey) {
|
||||
pendingUserIds.push(m.userId);
|
||||
}
|
||||
}
|
||||
return await this._shareRoomKey(pendingUserIds, hsApi);
|
||||
}
|
||||
|
||||
async _shareRoomKey(userIds, hsApi) {
|
||||
if (userIds.length === 0) {
|
||||
return;
|
||||
}
|
||||
const readRoomKeyTxn = await this._storage.readTxn([this._storage.storeNames.outboundGroupSessions]);
|
||||
const roomKeyMessage = await this._megolmEncryption.createRoomKeyMessage(this._room.id, readRoomKeyTxn);
|
||||
// no room key if we haven't created a session yet
|
||||
// (or we removed it and will create a new one on the next send)
|
||||
if (roomKeyMessage) {
|
||||
const devices = await this._deviceTracker.devicesForRoomMembers(this._room.id, userIds, hsApi);
|
||||
await this._sendRoomKey(roomKeyMessage, devices, hsApi);
|
||||
const actuallySentUserIds = Array.from(devices.reduce((set, device) => set.add(device.userId), new Set()));
|
||||
await this._clearNeedsRoomKeyFlag(actuallySentUserIds);
|
||||
} else {
|
||||
// we don't have a session yet, clear them all
|
||||
await this._clearNeedsRoomKeyFlag(userIds);
|
||||
}
|
||||
}
|
||||
|
||||
async _clearNeedsRoomKeyFlag(userIds) {
|
||||
const txn = await this._storage.readWriteTxn([this._storage.storeNames.roomMembers]);
|
||||
// store operation for room key share, in case we don't finish here
|
||||
const writeOpTxn = await this._storage.readWriteTxn([this._storage.storeNames.operations]);
|
||||
let operationId;
|
||||
try {
|
||||
await Promise.all(userIds.map(async userId => {
|
||||
const memberData = await txn.roomMembers.get(this._room.id, userId);
|
||||
if (memberData.needsRoomKey) {
|
||||
memberData.needsRoomKey = false;
|
||||
txn.roomMembers.set(memberData);
|
||||
}
|
||||
}));
|
||||
operationId = this._writeRoomKeyShareOperation(roomKeyMessage, userIds, writeOpTxn);
|
||||
} catch (err) {
|
||||
txn.abort();
|
||||
writeOpTxn.abort();
|
||||
throw err;
|
||||
}
|
||||
await txn.complete();
|
||||
await writeOpTxn.complete();
|
||||
// TODO: at this point we have the room key stored, and the rest is sort of optional
|
||||
// it would be nice if we could signal SendQueue that any error from here on is non-fatal and
|
||||
// return the encrypted payload.
|
||||
|
||||
// send the room key
|
||||
await this._sendRoomKey(roomKeyMessage, devices, hsApi);
|
||||
|
||||
// remove the operation
|
||||
const removeOpTxn = await this._storage.readWriteTxn([this._storage.storeNames.operations]);
|
||||
try {
|
||||
removeOpTxn.operations.remove(operationId);
|
||||
} catch (err) {
|
||||
removeOpTxn.abort();
|
||||
throw err;
|
||||
}
|
||||
await removeOpTxn.complete();
|
||||
}
|
||||
|
||||
async _addShareRoomKeyOperationForNewMembers(memberChangesArray, txn) {
|
||||
const userIds = memberChangesArray.filter(m => m.hasJoined).map(m => m.userId);
|
||||
const roomKeyMessage = await this._megolmEncryption.createRoomKeyMessage(
|
||||
this._room.id, txn);
|
||||
if (roomKeyMessage) {
|
||||
this._writeRoomKeyShareOperation(roomKeyMessage, userIds, txn);
|
||||
}
|
||||
}
|
||||
|
||||
_writeRoomKeyShareOperation(roomKeyMessage, userIds, txn) {
|
||||
const id = Math.floor(Math.random() * Number.MAX_SAFE_INTEGER).toString();
|
||||
txn.operations.add({
|
||||
id,
|
||||
type: "share_room_key",
|
||||
scope: this._room.id,
|
||||
userIds,
|
||||
roomKeyMessage,
|
||||
});
|
||||
return id;
|
||||
}
|
||||
|
||||
async flushPendingRoomKeyShares(hsApi, operations = null) {
|
||||
if (!operations) {
|
||||
const txn = await this._storage.readTxn([this._storage.storeNames.operations]);
|
||||
operations = await txn.operations.getAllByTypeAndScope("share_room_key", this._room.id);
|
||||
}
|
||||
for (const operation of operations) {
|
||||
// just to be sure
|
||||
if (operation.type !== "share_room_key") {
|
||||
continue;
|
||||
}
|
||||
const devices = await this._deviceTracker.devicesForRoomMembers(this._room.id, operation.userIds, hsApi);
|
||||
await this._sendRoomKey(operation.roomKeyMessage, devices, hsApi);
|
||||
const removeTxn = await this._storage.readWriteTxn([this._storage.storeNames.operations]);
|
||||
try {
|
||||
removeTxn.operations.remove(operation.id);
|
||||
} catch (err) {
|
||||
removeTxn.abort();
|
||||
throw err;
|
||||
}
|
||||
await removeTxn.complete();
|
||||
}
|
||||
}
|
||||
|
||||
async _sendRoomKey(roomKeyMessage, devices, hsApi) {
|
||||
|
|
|
@ -243,7 +243,8 @@ export class Room extends EventEmitter {
|
|||
}
|
||||
|
||||
needsAfterSyncCompleted({memberChanges}) {
|
||||
return this._roomEncryption?.needsToShareKeys(memberChanges);
|
||||
const result = this._roomEncryption?.needsToShareKeys(memberChanges);
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -251,9 +252,9 @@ export class Room extends EventEmitter {
|
|||
* Can be used to do longer running operations that resulted from the last sync,
|
||||
* like network operations.
|
||||
*/
|
||||
async afterSyncCompleted({memberChanges}) {
|
||||
async afterSyncCompleted() {
|
||||
if (this._roomEncryption) {
|
||||
await this._roomEncryption.shareRoomKeyForMemberChanges(memberChanges, this._hsApi);
|
||||
await this._roomEncryption.flushPendingRoomKeyShares(this._hsApi);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue