Merge pull request #90 from vector-im/bwindels/room_key_share_operation
Store room key shares in operations store
This commit is contained in:
commit
f3d3e3c014
11 changed files with 189 additions and 130 deletions
|
@ -28,6 +28,7 @@ import {MEGOLM_ALGORITHM} from "./e2ee/common.js";
|
|||
import {RoomEncryption} from "./e2ee/RoomEncryption.js";
|
||||
import {DeviceTracker} from "./e2ee/DeviceTracker.js";
|
||||
import {LockMap} from "../utils/LockMap.js";
|
||||
import {groupBy} from "../utils/groupBy.js";
|
||||
|
||||
const PICKLE_KEY = "DEFAULT_KEY";
|
||||
|
||||
|
@ -212,9 +213,20 @@ export class Session {
|
|||
await txn.complete();
|
||||
}
|
||||
|
||||
const opsTxn = await this._storage.readWriteTxn([
|
||||
this._storage.storeNames.operations
|
||||
]);
|
||||
const operations = await opsTxn.operations.getAll();
|
||||
const operationsByScope = groupBy(operations, o => o.scope);
|
||||
|
||||
this._sendScheduler.start();
|
||||
for (const [, room] of this._rooms) {
|
||||
room.start();
|
||||
let roomOperationsByType;
|
||||
const roomOperations = operationsByScope.get(room.id);
|
||||
if (roomOperations) {
|
||||
roomOperationsByType = groupBy(roomOperations, r => r.type);
|
||||
}
|
||||
room.start(roomOperationsByType);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -226,7 +226,9 @@ export class Sync {
|
|||
storeNames.groupSessionDecryptions,
|
||||
storeNames.deviceIdentities,
|
||||
// to discard outbound session when somebody leaves a room
|
||||
storeNames.outboundGroupSessions
|
||||
// and to create room key messages when somebody leaves
|
||||
storeNames.outboundGroupSessions,
|
||||
storeNames.operations
|
||||
]);
|
||||
}
|
||||
|
||||
|
|
|
@ -47,13 +47,14 @@ export class RoomEncryption {
|
|||
}
|
||||
|
||||
async writeMemberChanges(memberChanges, txn) {
|
||||
for (const m of memberChanges.values()) {
|
||||
if (m.hasLeft) {
|
||||
this._megolmEncryption.discardOutboundSession(this._room.id, txn);
|
||||
break;
|
||||
}
|
||||
const memberChangesArray = Array.from(memberChanges.values());
|
||||
if (memberChangesArray.some(m => m.hasLeft)) {
|
||||
this._megolmEncryption.discardOutboundSession(this._room.id, txn);
|
||||
}
|
||||
return await this._deviceTracker.writeMemberChanges(this._room, memberChanges, txn);
|
||||
if (memberChangesArray.some(m => m.hasJoined)) {
|
||||
await this._addShareRoomKeyOperationForNewMembers(memberChangesArray, txn);
|
||||
}
|
||||
await this._deviceTracker.writeMemberChanges(this._room, memberChanges, txn);
|
||||
}
|
||||
|
||||
// this happens before entries exists, as they are created by the syncwriter
|
||||
|
@ -146,16 +147,10 @@ export class RoomEncryption {
|
|||
}
|
||||
|
||||
async encrypt(type, content, hsApi) {
|
||||
await this._deviceTracker.trackRoom(this._room);
|
||||
const megolmResult = await this._megolmEncryption.encrypt(this._room.id, type, content, this._encryptionParams);
|
||||
// share the new megolm session if needed
|
||||
if (megolmResult.roomKeyMessage) {
|
||||
await this._deviceTracker.trackRoom(this._room);
|
||||
const devices = await this._deviceTracker.devicesForTrackedRoom(this._room.id, hsApi);
|
||||
await this._sendRoomKey(megolmResult.roomKeyMessage, devices, hsApi);
|
||||
// if we happen to rotate the session before we have sent newly joined members the room key
|
||||
// then mark those members as not needing the key anymore
|
||||
const userIds = Array.from(devices.reduce((set, device) => set.add(device.userId), new Set()));
|
||||
await this._clearNeedsRoomKeyFlag(userIds);
|
||||
this._shareNewRoomKey(megolmResult.roomKeyMessage, hsApi);
|
||||
}
|
||||
return {
|
||||
type: ENCRYPTED_TYPE,
|
||||
|
@ -165,64 +160,87 @@ export class RoomEncryption {
|
|||
|
||||
needsToShareKeys(memberChanges) {
|
||||
for (const m of memberChanges.values()) {
|
||||
if (m.member.needsRoomKey) {
|
||||
if (m.hasJoined) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
async shareRoomKeyToPendingMembers(hsApi) {
|
||||
// sucks to call this for all encrypted rooms on startup?
|
||||
const txn = await this._storage.readTxn([this._storage.storeNames.roomMembers]);
|
||||
const pendingUserIds = await txn.roomMembers.getUserIdsNeedingRoomKey(this._room.id);
|
||||
return await this._shareRoomKey(pendingUserIds, hsApi);
|
||||
}
|
||||
async _shareNewRoomKey(roomKeyMessage, hsApi) {
|
||||
const devices = await this._deviceTracker.devicesForTrackedRoom(this._room.id, hsApi);
|
||||
const userIds = Array.from(devices.reduce((set, device) => set.add(device.userId), new Set()));
|
||||
|
||||
async shareRoomKeyForMemberChanges(memberChanges, hsApi) {
|
||||
const pendingUserIds = [];
|
||||
for (const m of memberChanges.values()) {
|
||||
if (m.member.needsRoomKey) {
|
||||
pendingUserIds.push(m.userId);
|
||||
}
|
||||
}
|
||||
return await this._shareRoomKey(pendingUserIds, hsApi);
|
||||
}
|
||||
|
||||
async _shareRoomKey(userIds, hsApi) {
|
||||
if (userIds.length === 0) {
|
||||
return;
|
||||
}
|
||||
const readRoomKeyTxn = await this._storage.readTxn([this._storage.storeNames.outboundGroupSessions]);
|
||||
const roomKeyMessage = await this._megolmEncryption.createRoomKeyMessage(this._room.id, readRoomKeyTxn);
|
||||
// no room key if we haven't created a session yet
|
||||
// (or we removed it and will create a new one on the next send)
|
||||
if (roomKeyMessage) {
|
||||
const devices = await this._deviceTracker.devicesForRoomMembers(this._room.id, userIds, hsApi);
|
||||
await this._sendRoomKey(roomKeyMessage, devices, hsApi);
|
||||
const actuallySentUserIds = Array.from(devices.reduce((set, device) => set.add(device.userId), new Set()));
|
||||
await this._clearNeedsRoomKeyFlag(actuallySentUserIds);
|
||||
} else {
|
||||
// we don't have a session yet, clear them all
|
||||
await this._clearNeedsRoomKeyFlag(userIds);
|
||||
}
|
||||
}
|
||||
|
||||
async _clearNeedsRoomKeyFlag(userIds) {
|
||||
const txn = await this._storage.readWriteTxn([this._storage.storeNames.roomMembers]);
|
||||
// store operation for room key share, in case we don't finish here
|
||||
const writeOpTxn = await this._storage.readWriteTxn([this._storage.storeNames.operations]);
|
||||
let operationId;
|
||||
try {
|
||||
await Promise.all(userIds.map(async userId => {
|
||||
const memberData = await txn.roomMembers.get(this._room.id, userId);
|
||||
if (memberData.needsRoomKey) {
|
||||
memberData.needsRoomKey = false;
|
||||
txn.roomMembers.set(memberData);
|
||||
}
|
||||
}));
|
||||
operationId = this._writeRoomKeyShareOperation(roomKeyMessage, userIds, writeOpTxn);
|
||||
} catch (err) {
|
||||
txn.abort();
|
||||
writeOpTxn.abort();
|
||||
throw err;
|
||||
}
|
||||
await txn.complete();
|
||||
await writeOpTxn.complete();
|
||||
// TODO: at this point we have the room key stored, and the rest is sort of optional
|
||||
// it would be nice if we could signal SendQueue that any error from here on is non-fatal and
|
||||
// return the encrypted payload.
|
||||
|
||||
// send the room key
|
||||
await this._sendRoomKey(roomKeyMessage, devices, hsApi);
|
||||
|
||||
// remove the operation
|
||||
const removeOpTxn = await this._storage.readWriteTxn([this._storage.storeNames.operations]);
|
||||
try {
|
||||
removeOpTxn.operations.remove(operationId);
|
||||
} catch (err) {
|
||||
removeOpTxn.abort();
|
||||
throw err;
|
||||
}
|
||||
await removeOpTxn.complete();
|
||||
}
|
||||
|
||||
async _addShareRoomKeyOperationForNewMembers(memberChangesArray, txn) {
|
||||
const userIds = memberChangesArray.filter(m => m.hasJoined).map(m => m.userId);
|
||||
const roomKeyMessage = await this._megolmEncryption.createRoomKeyMessage(
|
||||
this._room.id, txn);
|
||||
if (roomKeyMessage) {
|
||||
this._writeRoomKeyShareOperation(roomKeyMessage, userIds, txn);
|
||||
}
|
||||
}
|
||||
|
||||
_writeRoomKeyShareOperation(roomKeyMessage, userIds, txn) {
|
||||
const id = Math.floor(Math.random() * Number.MAX_SAFE_INTEGER).toString();
|
||||
txn.operations.add({
|
||||
id,
|
||||
type: "share_room_key",
|
||||
scope: this._room.id,
|
||||
userIds,
|
||||
roomKeyMessage,
|
||||
});
|
||||
return id;
|
||||
}
|
||||
|
||||
async flushPendingRoomKeyShares(hsApi, operations = null) {
|
||||
if (!operations) {
|
||||
const txn = await this._storage.readTxn([this._storage.storeNames.operations]);
|
||||
operations = await txn.operations.getAllByTypeAndScope("share_room_key", this._room.id);
|
||||
}
|
||||
for (const operation of operations) {
|
||||
// just to be sure
|
||||
if (operation.type !== "share_room_key") {
|
||||
continue;
|
||||
}
|
||||
const devices = await this._deviceTracker.devicesForRoomMembers(this._room.id, operation.userIds, hsApi);
|
||||
await this._sendRoomKey(operation.roomKeyMessage, devices, hsApi);
|
||||
const removeTxn = await this._storage.readWriteTxn([this._storage.storeNames.operations]);
|
||||
try {
|
||||
removeTxn.operations.remove(operation.id);
|
||||
} catch (err) {
|
||||
removeTxn.abort();
|
||||
throw err;
|
||||
}
|
||||
await removeTxn.complete();
|
||||
}
|
||||
}
|
||||
|
||||
async _sendRoomKey(roomKeyMessage, devices, hsApi) {
|
||||
|
|
|
@ -158,7 +158,7 @@ export class Room extends EventEmitter {
|
|||
decryption = await decryptChanges.write(txn);
|
||||
}
|
||||
const {entries, newLiveKey, memberChanges} =
|
||||
await this._syncWriter.writeSync(roomResponse, this.isTrackingMembers, txn);
|
||||
await this._syncWriter.writeSync(roomResponse, txn);
|
||||
if (decryption) {
|
||||
decryption.applyToEntries(entries);
|
||||
}
|
||||
|
@ -251,21 +251,24 @@ export class Room extends EventEmitter {
|
|||
* Can be used to do longer running operations that resulted from the last sync,
|
||||
* like network operations.
|
||||
*/
|
||||
async afterSyncCompleted({memberChanges}) {
|
||||
async afterSyncCompleted() {
|
||||
if (this._roomEncryption) {
|
||||
await this._roomEncryption.shareRoomKeyForMemberChanges(memberChanges, this._hsApi);
|
||||
await this._roomEncryption.flushPendingRoomKeyShares(this._hsApi);
|
||||
}
|
||||
}
|
||||
|
||||
/** @package */
|
||||
async start() {
|
||||
async start(pendingOperations) {
|
||||
if (this._roomEncryption) {
|
||||
try {
|
||||
// if we got interrupted last time sending keys to newly joined members
|
||||
await this._roomEncryption.shareRoomKeyToPendingMembers(this._hsApi);
|
||||
const roomKeyShares = pendingOperations?.get("share_room_key");
|
||||
if (roomKeyShares) {
|
||||
// if we got interrupted last time sending keys to newly joined members
|
||||
await this._roomEncryption.flushPendingRoomKeyShares(this._hsApi, roomKeyShares);
|
||||
}
|
||||
} catch (err) {
|
||||
// we should not throw here
|
||||
console.error(`could not send out pending room keys for room ${this.id}`, err.stack);
|
||||
console.error(`could not send out (all) pending room keys for room ${this.id}`, err.stack);
|
||||
}
|
||||
}
|
||||
this._sendQueue.resumeSending();
|
||||
|
|
|
@ -67,14 +67,6 @@ export class RoomMember {
|
|||
});
|
||||
}
|
||||
|
||||
get needsRoomKey() {
|
||||
return this._data.needsRoomKey;
|
||||
}
|
||||
|
||||
set needsRoomKey(value) {
|
||||
this._data.needsRoomKey = !!value;
|
||||
}
|
||||
|
||||
get membership() {
|
||||
return this._data.membership;
|
||||
}
|
||||
|
|
|
@ -98,48 +98,40 @@ export class SyncWriter {
|
|||
return {oldFragment, newFragment};
|
||||
}
|
||||
|
||||
async _writeMember(event, trackNewlyJoined, txn) {
|
||||
_writeMember(event, txn) {
|
||||
const userId = event.state_key;
|
||||
if (userId) {
|
||||
const memberChange = new MemberChange(this._roomId, event);
|
||||
const {member} = memberChange;
|
||||
if (member) {
|
||||
if (trackNewlyJoined) {
|
||||
const existingMemberData = await txn.roomMembers.get(this._roomId, userId);
|
||||
// mark new members so we know who needs our the room key for our outbound megolm session
|
||||
member.needsRoomKey = existingMemberData?.needsRoomKey || memberChange.hasJoined;
|
||||
}
|
||||
txn.roomMembers.set(member.serialize());
|
||||
return memberChange;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async _writeStateEvent(event, trackNewlyJoined, txn) {
|
||||
_writeStateEvent(event, txn) {
|
||||
if (event.type === MEMBER_EVENT_TYPE) {
|
||||
return await this._writeMember(event, trackNewlyJoined, txn);
|
||||
return this._writeMember(event, txn);
|
||||
} else {
|
||||
txn.roomState.set(this._roomId, event);
|
||||
}
|
||||
}
|
||||
|
||||
async _writeStateEvents(roomResponse, trackNewlyJoined, txn) {
|
||||
const memberChanges = new Map();
|
||||
_writeStateEvents(roomResponse, memberChanges, txn) {
|
||||
// persist state
|
||||
const {state} = roomResponse;
|
||||
if (Array.isArray(state?.events)) {
|
||||
await Promise.all(state.events.map(async event => {
|
||||
const memberChange = await this._writeStateEvent(event, trackNewlyJoined, txn);
|
||||
for (const event of state.events) {
|
||||
const memberChange = this._writeStateEvent(event, txn);
|
||||
if (memberChange) {
|
||||
memberChanges.set(memberChange.userId, memberChange);
|
||||
}
|
||||
}));
|
||||
}
|
||||
}
|
||||
return memberChanges;
|
||||
}
|
||||
|
||||
async _writeTimeline(entries, timeline, currentKey, trackNewlyJoined, txn) {
|
||||
const memberChanges = new Map();
|
||||
async _writeTimeline(entries, timeline, currentKey, memberChanges, txn) {
|
||||
if (Array.isArray(timeline.events)) {
|
||||
const events = deduplicateEvents(timeline.events);
|
||||
for(const event of events) {
|
||||
|
@ -153,19 +145,17 @@ export class SyncWriter {
|
|||
}
|
||||
txn.timelineEvents.insert(entry);
|
||||
entries.push(new EventEntry(entry, this._fragmentIdComparer));
|
||||
}
|
||||
// process live state events first, so new member info is available
|
||||
// also run async state event writing in parallel
|
||||
await Promise.all(events.filter(event => {
|
||||
return typeof event.state_key === "string";
|
||||
}).map(async stateEvent => {
|
||||
const memberChange = await this._writeStateEvent(stateEvent, trackNewlyJoined, txn);
|
||||
if (memberChange) {
|
||||
memberChanges.set(memberChange.userId, memberChange);
|
||||
|
||||
// process live state events first, so new member info is available
|
||||
if (typeof event.state_key === "string") {
|
||||
const memberChange = this._writeStateEvent(event, txn);
|
||||
if (memberChange) {
|
||||
memberChanges.set(memberChange.userId, memberChange);
|
||||
}
|
||||
}
|
||||
}));
|
||||
}
|
||||
}
|
||||
return {currentKey, memberChanges};
|
||||
return currentKey;
|
||||
}
|
||||
|
||||
async _findMemberData(userId, events, txn) {
|
||||
|
@ -193,11 +183,10 @@ export class SyncWriter {
|
|||
* @property {Map<string, MemberChange>} memberChanges member changes in the processed sync ny user id
|
||||
*
|
||||
* @param {Object} roomResponse [description]
|
||||
* @param {Boolean} trackNewlyJoined needed to know if we need to keep track whether a user needs keys when they join an encrypted room
|
||||
* @param {Transaction} txn
|
||||
* @return {SyncWriterResult}
|
||||
*/
|
||||
async writeSync(roomResponse, trackNewlyJoined, txn) {
|
||||
async writeSync(roomResponse, txn) {
|
||||
const entries = [];
|
||||
const {timeline} = roomResponse;
|
||||
let currentKey = this._lastLiveKey;
|
||||
|
@ -217,16 +206,11 @@ export class SyncWriter {
|
|||
entries.push(FragmentBoundaryEntry.end(oldFragment, this._fragmentIdComparer));
|
||||
entries.push(FragmentBoundaryEntry.start(newFragment, this._fragmentIdComparer));
|
||||
}
|
||||
const memberChanges = new Map();
|
||||
// important this happens before _writeTimeline so
|
||||
// members are available in the transaction
|
||||
const memberChanges = await this._writeStateEvents(roomResponse, trackNewlyJoined, txn);
|
||||
// TODO: remove trackNewlyJoined and pass in memberChanges
|
||||
const timelineResult = await this._writeTimeline(entries, timeline, currentKey, trackNewlyJoined, txn);
|
||||
currentKey = timelineResult.currentKey;
|
||||
// merge member changes from state and timeline, giving precedence to the latter
|
||||
for (const [userId, memberChange] of timelineResult.memberChanges.entries()) {
|
||||
memberChanges.set(userId, memberChange);
|
||||
}
|
||||
this._writeStateEvents(roomResponse, memberChanges, txn);
|
||||
currentKey = await this._writeTimeline(entries, timeline, currentKey, memberChanges, txn);
|
||||
return {entries, newLiveKey: currentKey, memberChanges};
|
||||
}
|
||||
|
||||
|
|
|
@ -28,6 +28,7 @@ export const STORE_NAMES = Object.freeze([
|
|||
"inboundGroupSessions",
|
||||
"outboundGroupSessions",
|
||||
"groupSessionDecryptions",
|
||||
"operations"
|
||||
]);
|
||||
|
||||
export const STORE_MAP = Object.freeze(STORE_NAMES.reduce((nameMap, name) => {
|
||||
|
|
|
@ -30,6 +30,7 @@ import {OlmSessionStore} from "./stores/OlmSessionStore.js";
|
|||
import {InboundGroupSessionStore} from "./stores/InboundGroupSessionStore.js";
|
||||
import {OutboundGroupSessionStore} from "./stores/OutboundGroupSessionStore.js";
|
||||
import {GroupSessionDecryptionStore} from "./stores/GroupSessionDecryptionStore.js";
|
||||
import {OperationStore} from "./stores/OperationStore.js";
|
||||
|
||||
export class Transaction {
|
||||
constructor(txn, allowedStoreNames) {
|
||||
|
@ -111,6 +112,10 @@ export class Transaction {
|
|||
return this._store("groupSessionDecryptions", idbStore => new GroupSessionDecryptionStore(idbStore));
|
||||
}
|
||||
|
||||
get operations() {
|
||||
return this._store("operations", idbStore => new OperationStore(idbStore));
|
||||
}
|
||||
|
||||
complete() {
|
||||
return txnAsPromise(this._txn);
|
||||
}
|
||||
|
|
|
@ -74,4 +74,6 @@ function createE2EEStores(db) {
|
|||
db.createObjectStore("inboundGroupSessions", {keyPath: "key"});
|
||||
db.createObjectStore("outboundGroupSessions", {keyPath: "roomId"});
|
||||
db.createObjectStore("groupSessionDecryptions", {keyPath: "key"});
|
||||
const operations = db.createObjectStore("operations", {keyPath: "id"});
|
||||
operations.createIndex("byTypeAndScope", "typeScopeKey", {unique: false});
|
||||
}
|
||||
|
|
55
src/matrix/storage/idb/stores/OperationStore.js
Normal file
55
src/matrix/storage/idb/stores/OperationStore.js
Normal file
|
@ -0,0 +1,55 @@
|
|||
/*
|
||||
Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
function encodeTypeScopeKey(type, scope) {
|
||||
return `${type}|${scope}`;
|
||||
}
|
||||
|
||||
export class OperationStore {
|
||||
constructor(store) {
|
||||
this._store = store;
|
||||
}
|
||||
|
||||
getAll() {
|
||||
return this._store.selectAll();
|
||||
}
|
||||
|
||||
async getAllByTypeAndScope(type, scope) {
|
||||
const key = encodeTypeScopeKey(type, scope);
|
||||
const results = [];
|
||||
await this._store.index("byTypeAndScope").iterateWhile(key, value => {
|
||||
if (value.typeScopeKey !== key) {
|
||||
return false;
|
||||
}
|
||||
results.push(value);
|
||||
return true;
|
||||
});
|
||||
return results;
|
||||
}
|
||||
|
||||
add(operation) {
|
||||
operation.typeScopeKey = encodeTypeScopeKey(operation.type, operation.scope);
|
||||
this._store.add(operation);
|
||||
}
|
||||
|
||||
update(operation) {
|
||||
this._store.set(operation);
|
||||
}
|
||||
|
||||
remove(id) {
|
||||
this._store.delete(id);
|
||||
}
|
||||
}
|
|
@ -60,19 +60,4 @@ export class RoomMemberStore {
|
|||
});
|
||||
return userIds;
|
||||
}
|
||||
|
||||
async getUserIdsNeedingRoomKey(roomId) {
|
||||
const userIds = [];
|
||||
const range = IDBKeyRange.lowerBound(encodeKey(roomId, ""));
|
||||
await this._roomMembersStore.iterateWhile(range, member => {
|
||||
if (member.roomId !== roomId) {
|
||||
return false;
|
||||
}
|
||||
if (member.needsRoomKey) {
|
||||
userIds.push(member.userId);
|
||||
}
|
||||
return true;
|
||||
});
|
||||
return userIds;
|
||||
}
|
||||
}
|
||||
|
|
Reference in a new issue