forked from mystiq/hydrogen-web
Merge pull request #84 from vector-im/bwindels/megolm-decrypt
Implement megolm decryption and hooking up decryption in the room
This commit is contained in:
commit
b8ba4c5771
17 changed files with 464 additions and 58 deletions
|
@ -15,6 +15,7 @@ limitations under the License.
|
|||
*/
|
||||
|
||||
import {OLM_ALGORITHM, MEGOLM_ALGORITHM} from "./e2ee/common.js";
|
||||
import {groupBy} from "../utils/groupBy.js";
|
||||
|
||||
// key to store in session store
|
||||
const PENDING_ENCRYPTED_EVENTS = "pendingEncryptedDeviceEvents";
|
||||
|
@ -44,21 +45,23 @@ export class DeviceMessageHandler {
|
|||
const megOlmRoomKeysPayloads = payloads.filter(p => {
|
||||
return p.event?.type === "m.room_key" && p.event.content?.algorithm === MEGOLM_ALGORITHM;
|
||||
});
|
||||
let megolmChanges;
|
||||
let roomKeys;
|
||||
if (megOlmRoomKeysPayloads.length) {
|
||||
megolmChanges = await this._megolmDecryption.addRoomKeys(megOlmRoomKeysPayloads, txn);
|
||||
roomKeys = await this._megolmDecryption.addRoomKeys(megOlmRoomKeysPayloads, txn);
|
||||
}
|
||||
return {megolmChanges};
|
||||
return {roomKeys};
|
||||
}
|
||||
|
||||
_applyDecryptChanges({megolmChanges}) {
|
||||
if (megolmChanges) {
|
||||
this._megolmDecryption.applyRoomKeyChanges(megolmChanges);
|
||||
_applyDecryptChanges(rooms, {roomKeys}) {
|
||||
const roomKeysByRoom = groupBy(roomKeys, s => s.roomId);
|
||||
for (const [roomId, roomKeys] of roomKeysByRoom) {
|
||||
const room = rooms.get(roomId);
|
||||
room?.notifyRoomKeys(roomKeys);
|
||||
}
|
||||
}
|
||||
|
||||
// not safe to call multiple times without awaiting first call
|
||||
async decryptPending() {
|
||||
async decryptPending(rooms) {
|
||||
if (!this._olmDecryption) {
|
||||
return;
|
||||
}
|
||||
|
@ -89,7 +92,7 @@ export class DeviceMessageHandler {
|
|||
throw err;
|
||||
}
|
||||
await txn.complete();
|
||||
this._applyDecryptChanges(changes);
|
||||
this._applyDecryptChanges(rooms, changes);
|
||||
}
|
||||
|
||||
async _getPendingEvents(txn) {
|
||||
|
|
|
@ -49,6 +49,9 @@ export class Session {
|
|||
this._e2eeAccount = null;
|
||||
this._deviceTracker = null;
|
||||
this._olmEncryption = null;
|
||||
this._megolmEncryption = null;
|
||||
this._megolmDecryption = null;
|
||||
|
||||
if (olm) {
|
||||
this._olmUtil = new olm.Utility();
|
||||
this._deviceTracker = new DeviceTracker({
|
||||
|
@ -92,9 +95,12 @@ export class Session {
|
|||
storage: this._storage,
|
||||
now: this._clock.now,
|
||||
ownDeviceId: this._sessionInfo.deviceId,
|
||||
})
|
||||
const megolmDecryption = new MegOlmDecryption({pickleKey: PICKLE_KEY, olm: this._olm});
|
||||
this._deviceMessageHandler.enableEncryption({olmDecryption, megolmDecryption});
|
||||
});
|
||||
this._megolmDecryption = new MegOlmDecryption({
|
||||
pickleKey: PICKLE_KEY,
|
||||
olm: this._olm,
|
||||
});
|
||||
this._deviceMessageHandler.enableEncryption({olmDecryption, megolmDecryption: this._megolmDecryption});
|
||||
}
|
||||
|
||||
_createRoomEncryption(room, encryptionParams) {
|
||||
|
@ -118,6 +124,7 @@ export class Session {
|
|||
deviceTracker: this._deviceTracker,
|
||||
olmEncryption: this._olmEncryption,
|
||||
megolmEncryption: this._megolmEncryption,
|
||||
megolmDecryption: this._megolmDecryption,
|
||||
encryptionParams
|
||||
});
|
||||
}
|
||||
|
@ -150,7 +157,7 @@ export class Session {
|
|||
}
|
||||
await this._e2eeAccount.generateOTKsIfNeeded(this._storage);
|
||||
await this._e2eeAccount.uploadKeys(this._storage);
|
||||
await this._deviceMessageHandler.decryptPending();
|
||||
await this._deviceMessageHandler.decryptPending(this.rooms);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -285,7 +292,7 @@ export class Session {
|
|||
|
||||
async afterSyncCompleted() {
|
||||
const needsToUploadOTKs = await this._e2eeAccount.generateOTKsIfNeeded(this._storage);
|
||||
const promises = [this._deviceMessageHandler.decryptPending()];
|
||||
const promises = [this._deviceMessageHandler.decryptPending(this.rooms)];
|
||||
if (needsToUploadOTKs) {
|
||||
// TODO: we could do this in parallel with sync if it proves to be too slow
|
||||
// but I'm not sure how to not swallow errors in that case
|
||||
|
|
|
@ -133,6 +133,8 @@ export class Sync {
|
|||
storeNames.timelineFragments,
|
||||
storeNames.pendingEvents,
|
||||
storeNames.userIdentities,
|
||||
storeNames.inboundGroupSessions,
|
||||
storeNames.groupSessionDecryptions,
|
||||
]);
|
||||
const roomChanges = [];
|
||||
let sessionChanges;
|
||||
|
|
|
@ -14,25 +14,77 @@ See the License for the specific language governing permissions and
|
|||
limitations under the License.
|
||||
*/
|
||||
|
||||
import {MEGOLM_ALGORITHM} from "./common.js";
|
||||
import {groupBy} from "../../utils/groupBy.js";
|
||||
import {makeTxnId} from "../common.js";
|
||||
|
||||
const ENCRYPTED_TYPE = "m.room.encrypted";
|
||||
|
||||
export class RoomEncryption {
|
||||
constructor({room, deviceTracker, olmEncryption, megolmEncryption, encryptionParams}) {
|
||||
constructor({room, deviceTracker, olmEncryption, megolmEncryption, megolmDecryption, encryptionParams}) {
|
||||
this._room = room;
|
||||
this._deviceTracker = deviceTracker;
|
||||
this._olmEncryption = olmEncryption;
|
||||
this._megolmEncryption = megolmEncryption;
|
||||
this._megolmDecryption = megolmDecryption;
|
||||
// content of the m.room.encryption event
|
||||
this._encryptionParams = encryptionParams;
|
||||
|
||||
this._megolmBackfillCache = this._megolmDecryption.createSessionCache();
|
||||
this._megolmSyncCache = this._megolmDecryption.createSessionCache();
|
||||
// not `event_id`, but an internal event id passed in to the decrypt methods
|
||||
this._eventIdsByMissingSession = new Map();
|
||||
}
|
||||
|
||||
notifyTimelineClosed() {
|
||||
// empty the backfill cache when closing the timeline
|
||||
this._megolmBackfillCache.dispose();
|
||||
this._megolmBackfillCache = this._megolmDecryption.createSessionCache();
|
||||
}
|
||||
|
||||
async writeMemberChanges(memberChanges, txn) {
|
||||
return await this._deviceTracker.writeMemberChanges(this._room, memberChanges, txn);
|
||||
}
|
||||
|
||||
async decrypt(event, isSync, retryData, txn) {
|
||||
if (event.content?.algorithm !== MEGOLM_ALGORITHM) {
|
||||
throw new Error("Unsupported algorithm: " + event.content?.algorithm);
|
||||
}
|
||||
let sessionCache = isSync ? this._megolmSyncCache : this._megolmBackfillCache;
|
||||
const payload = await this._megolmDecryption.decrypt(
|
||||
this._room.id, event, sessionCache, txn);
|
||||
if (!payload) {
|
||||
this._addMissingSessionEvent(event, isSync, retryData);
|
||||
}
|
||||
return payload;
|
||||
}
|
||||
|
||||
_addMissingSessionEvent(event, isSync, data) {
|
||||
const senderKey = event.content?.["sender_key"];
|
||||
const sessionId = event.content?.["session_id"];
|
||||
const key = `${senderKey}|${sessionId}`;
|
||||
let eventIds = this._eventIdsByMissingSession.get(key);
|
||||
if (!eventIds) {
|
||||
eventIds = new Map();
|
||||
this._eventIdsByMissingSession.set(key, eventIds);
|
||||
}
|
||||
eventIds.set(event.event_id, {data, isSync});
|
||||
}
|
||||
|
||||
applyRoomKeys(roomKeys) {
|
||||
// retry decryption with the new sessions
|
||||
const retryEntries = [];
|
||||
for (const roomKey of roomKeys) {
|
||||
const key = `${roomKey.senderKey}|${roomKey.sessionId}`;
|
||||
const entriesForSession = this._eventIdsByMissingSession.get(key);
|
||||
if (entriesForSession) {
|
||||
this._eventIdsByMissingSession.delete(key);
|
||||
retryEntries.push(...entriesForSession.values());
|
||||
}
|
||||
}
|
||||
return retryEntries;
|
||||
}
|
||||
|
||||
async encrypt(type, content, hsApi) {
|
||||
const megolmResult = await this._megolmEncryption.encrypt(this._room.id, type, content, this._encryptionParams);
|
||||
// share the new megolm session if needed
|
||||
|
|
|
@ -14,12 +14,87 @@ See the License for the specific language governing permissions and
|
|||
limitations under the License.
|
||||
*/
|
||||
|
||||
import {DecryptionError} from "../common.js";
|
||||
|
||||
const CACHE_MAX_SIZE = 10;
|
||||
|
||||
export class Decryption {
|
||||
constructor({pickleKey, olm}) {
|
||||
this._pickleKey = pickleKey;
|
||||
this._olm = olm;
|
||||
}
|
||||
|
||||
createSessionCache() {
|
||||
return new SessionCache();
|
||||
}
|
||||
|
||||
async decrypt(roomId, event, sessionCache, txn) {
|
||||
const senderKey = event.content?.["sender_key"];
|
||||
const sessionId = event.content?.["session_id"];
|
||||
const ciphertext = event.content?.ciphertext;
|
||||
|
||||
if (
|
||||
typeof senderKey !== "string" ||
|
||||
typeof sessionId !== "string" ||
|
||||
typeof ciphertext !== "string"
|
||||
) {
|
||||
throw new DecryptionError("MEGOLM_INVALID_EVENT", event);
|
||||
}
|
||||
|
||||
let session = sessionCache.get(roomId, senderKey, sessionId);
|
||||
if (!session) {
|
||||
const sessionEntry = await txn.inboundGroupSessions.get(roomId, senderKey, sessionId);
|
||||
if (sessionEntry) {
|
||||
session = new this._olm.InboundGroupSession();
|
||||
try {
|
||||
session.unpickle(this._pickleKey, sessionEntry.session);
|
||||
} catch (err) {
|
||||
session.free();
|
||||
throw err;
|
||||
}
|
||||
sessionCache.add(roomId, senderKey, session);
|
||||
}
|
||||
}
|
||||
if (!session) {
|
||||
return;
|
||||
}
|
||||
const {plaintext, message_index: messageIndex} = session.decrypt(ciphertext);
|
||||
let payload;
|
||||
try {
|
||||
payload = JSON.parse(plaintext);
|
||||
} catch (err) {
|
||||
throw new DecryptionError("PLAINTEXT_NOT_JSON", event, {plaintext, err});
|
||||
}
|
||||
if (payload.room_id !== roomId) {
|
||||
throw new DecryptionError("MEGOLM_WRONG_ROOM", event,
|
||||
{encryptedRoomId: payload.room_id, eventRoomId: roomId});
|
||||
}
|
||||
await this._handleReplayAttack(roomId, sessionId, messageIndex, event, txn);
|
||||
// TODO: verify event came from said senderKey
|
||||
return payload;
|
||||
}
|
||||
|
||||
async _handleReplayAttack(roomId, sessionId, messageIndex, event, txn) {
|
||||
const eventId = event.event_id;
|
||||
const timestamp = event.origin_server_ts;
|
||||
const decryption = await txn.groupSessionDecryptions.get(roomId, sessionId, messageIndex);
|
||||
if (decryption && decryption.eventId !== eventId) {
|
||||
// the one with the newest timestamp should be the attack
|
||||
const decryptedEventIsBad = decryption.timestamp < timestamp;
|
||||
const badEventId = decryptedEventIsBad ? eventId : decryption.eventId;
|
||||
throw new DecryptionError("MEGOLM_REPLAYED_INDEX", event, {badEventId, otherEventId: decryption.eventId});
|
||||
}
|
||||
if (!decryption) {
|
||||
txn.groupSessionDecryptions.set({
|
||||
roomId,
|
||||
sessionId,
|
||||
messageIndex,
|
||||
eventId,
|
||||
timestamp
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
async addRoomKeys(payloads, txn) {
|
||||
const newSessions = [];
|
||||
for (const {senderKey, event} of payloads) {
|
||||
|
@ -36,6 +111,7 @@ export class Decryption {
|
|||
return;
|
||||
}
|
||||
|
||||
// TODO: compare first_known_index to see which session to keep
|
||||
const hasSession = await txn.inboundGroupSessions.has(roomId, senderKey, sessionId);
|
||||
if (!hasSession) {
|
||||
const session = new this._olm.InboundGroupSession();
|
||||
|
@ -56,13 +132,49 @@ export class Decryption {
|
|||
}
|
||||
|
||||
}
|
||||
// this will be passed to the Room in notifyRoomKeys
|
||||
return newSessions;
|
||||
}
|
||||
}
|
||||
|
||||
applyRoomKeyChanges(newSessions) {
|
||||
// retry decryption with the new sessions
|
||||
if (newSessions.length) {
|
||||
console.log(`I have ${newSessions.length} new inbound group sessions`, newSessions)
|
||||
class SessionCache {
|
||||
constructor() {
|
||||
this._sessions = [];
|
||||
}
|
||||
|
||||
get(roomId, senderKey, sessionId) {
|
||||
const idx = this._sessions.findIndex(s => {
|
||||
return s.roomId === roomId &&
|
||||
s.senderKey === senderKey &&
|
||||
sessionId === s.session.session_id();
|
||||
});
|
||||
if (idx !== -1) {
|
||||
const entry = this._sessions[idx];
|
||||
// move to top
|
||||
if (idx > 0) {
|
||||
this._sessions.splice(idx, 1);
|
||||
this._sessions.unshift(entry);
|
||||
}
|
||||
return entry.session;
|
||||
}
|
||||
}
|
||||
|
||||
add(roomId, senderKey, session) {
|
||||
// add new at top
|
||||
this._sessions.unshift({roomId, senderKey, session});
|
||||
if (this._sessions.length > CACHE_MAX_SIZE) {
|
||||
// free sessions we're about to remove
|
||||
for (let i = CACHE_MAX_SIZE; i < this._sessions.length; i += 1) {
|
||||
this._sessions[i].session.free();
|
||||
}
|
||||
this._sessions = this._sessions.slice(0, CACHE_MAX_SIZE);
|
||||
}
|
||||
}
|
||||
|
||||
dispose() {
|
||||
for (const entry of this._sessions) {
|
||||
entry.session.free();
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -36,13 +36,14 @@ export class Encryption {
|
|||
let roomKeyMessage;
|
||||
let encryptedContent;
|
||||
try {
|
||||
// TODO: we could consider keeping the session in memory for the current room
|
||||
let sessionEntry = await txn.outboundGroupSessions.get(roomId);
|
||||
if (sessionEntry) {
|
||||
session.unpickle(this._pickleKey, sessionEntry.session);
|
||||
}
|
||||
if (!sessionEntry || this._needsToRotate(session, sessionEntry.createdAt, encryptionParams)) {
|
||||
// in the case of rotating, recreate a session as we already unpickled into it
|
||||
if (session) {
|
||||
if (sessionEntry) {
|
||||
session.free();
|
||||
session = new this._olm.OutboundGroupSession();
|
||||
}
|
||||
|
@ -114,6 +115,11 @@ export class Encryption {
|
|||
session_id: session.session_id(),
|
||||
session_key: session.session_key(),
|
||||
algorithm: MEGOLM_ALGORITHM,
|
||||
// if we need to do this, do we need to create
|
||||
// the room key message after or before having encrypted
|
||||
// with the new session? I guess before as we do now
|
||||
// because the chain_index is where you should start decrypting?
|
||||
//
|
||||
// chain_index: session.message_index()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -115,13 +115,13 @@ export class Decryption {
|
|||
try {
|
||||
payload = JSON.parse(plaintext);
|
||||
} catch (err) {
|
||||
throw new DecryptionError("Could not JSON decode plaintext", event, {plaintext, err});
|
||||
throw new DecryptionError("PLAINTEXT_NOT_JSON", event, {plaintext, err});
|
||||
}
|
||||
this._validatePayload(payload, event);
|
||||
return {event: payload, senderKey};
|
||||
} else {
|
||||
throw new DecryptionError("Didn't find any session to decrypt with", event,
|
||||
{sessionIds: senderKeyDecryption.sessions.map(s => s.id)});
|
||||
throw new DecryptionError("OLM_NO_MATCHING_SESSION", event,
|
||||
{knownSessionIds: senderKeyDecryption.sessions.map(s => s.id)});
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -25,6 +25,8 @@ import {WrappedError} from "../error.js"
|
|||
import {fetchOrLoadMembers} from "./members/load.js";
|
||||
import {MemberList} from "./members/MemberList.js";
|
||||
import {Heroes} from "./members/Heroes.js";
|
||||
import {EventEntry} from "./timeline/entries/EventEntry.js";
|
||||
import {EventKey} from "./timeline/EventKey.js";
|
||||
|
||||
export class Room extends EventEmitter {
|
||||
constructor({roomId, storage, hsApi, emitCollectionChange, sendScheduler, pendingEvents, user, createRoomEncryption}) {
|
||||
|
@ -45,6 +47,79 @@ export class Room extends EventEmitter {
|
|||
this._roomEncryption = null;
|
||||
}
|
||||
|
||||
async notifyRoomKeys(roomKeys) {
|
||||
if (this._roomEncryption) {
|
||||
// array of {data, source}
|
||||
let retryEntries = this._roomEncryption.applyRoomKeys(roomKeys);
|
||||
let decryptedEntries = [];
|
||||
if (retryEntries.length) {
|
||||
// groupSessionDecryptions can be written, the other stores not
|
||||
const txn = await this._storage.readWriteTxn([
|
||||
this._storage.storeNames.timelineEvents,
|
||||
this._storage.storeNames.inboundGroupSessions,
|
||||
this._storage.storeNames.groupSessionDecryptions,
|
||||
]);
|
||||
try {
|
||||
for (const retryEntry of retryEntries) {
|
||||
const {data: eventKey} = retryEntry;
|
||||
let entry = this._timeline?.findEntry(eventKey);
|
||||
if (!entry) {
|
||||
const storageEntry = await txn.timelineEvents.get(this._roomId, eventKey.fragmentId, eventKey.entryIndex);
|
||||
if (storageEntry) {
|
||||
entry = new EventEntry(storageEntry, this._fragmentIdComparer);
|
||||
}
|
||||
}
|
||||
if (entry) {
|
||||
entry = await this._decryptEntry(entry, txn, retryEntry.isSync);
|
||||
decryptedEntries.push(entry);
|
||||
}
|
||||
}
|
||||
} catch (err) {
|
||||
txn.abort();
|
||||
throw err;
|
||||
}
|
||||
await txn.complete();
|
||||
}
|
||||
if (this._timeline) {
|
||||
// only adds if already present
|
||||
this._timeline.replaceEntries(decryptedEntries);
|
||||
}
|
||||
// pass decryptedEntries to roomSummary
|
||||
}
|
||||
}
|
||||
|
||||
_enableEncryption(encryptionParams) {
|
||||
this._roomEncryption = this._createRoomEncryption(this, encryptionParams);
|
||||
if (this._roomEncryption) {
|
||||
this._sendQueue.enableEncryption(this._roomEncryption);
|
||||
if (this._timeline) {
|
||||
this._timeline.enableEncryption(this._decryptEntries.bind(this));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async _decryptEntry(entry, txn, isSync) {
|
||||
if (entry.eventType === "m.room.encrypted") {
|
||||
try {
|
||||
const {fragmentId, entryIndex} = entry;
|
||||
const key = new EventKey(fragmentId, entryIndex);
|
||||
const decryptedEvent = await this._roomEncryption.decrypt(
|
||||
entry.event, isSync, key, txn);
|
||||
if (decryptedEvent) {
|
||||
entry.replaceWithDecrypted(decryptedEvent);
|
||||
}
|
||||
} catch (err) {
|
||||
console.warn("event decryption error", err, entry.event);
|
||||
entry.setDecryptionError(err);
|
||||
}
|
||||
}
|
||||
return entry;
|
||||
}
|
||||
|
||||
async _decryptEntries(entries, txn, isSync = false) {
|
||||
return await Promise.all(entries.map(async e => this._decryptEntry(e, txn, isSync)));
|
||||
}
|
||||
|
||||
/** @package */
|
||||
async writeSync(roomResponse, membership, isInitialSync, txn) {
|
||||
const isTimelineOpen = !!this._timeline;
|
||||
|
@ -53,7 +128,13 @@ export class Room extends EventEmitter {
|
|||
membership,
|
||||
isInitialSync, isTimelineOpen,
|
||||
txn);
|
||||
const {entries, newLiveKey, memberChanges} = await this._syncWriter.writeSync(roomResponse, txn);
|
||||
const {entries: encryptedEntries, newLiveKey, memberChanges} =
|
||||
await this._syncWriter.writeSync(roomResponse, txn);
|
||||
// decrypt if applicable
|
||||
let entries = encryptedEntries;
|
||||
if (this._roomEncryption) {
|
||||
entries = await this._decryptEntries(encryptedEntries, txn, true);
|
||||
}
|
||||
// fetch new members while we have txn open,
|
||||
// but don't make any in-memory changes yet
|
||||
let heroChanges;
|
||||
|
@ -85,12 +166,8 @@ export class Room extends EventEmitter {
|
|||
/** @package */
|
||||
afterSync({summaryChanges, newTimelineEntries, newLiveKey, removedPendingEvents, memberChanges, heroChanges}) {
|
||||
this._syncWriter.afterSync(newLiveKey);
|
||||
// encryption got enabled
|
||||
if (!this._summary.encryption && summaryChanges.encryption && !this._roomEncryption) {
|
||||
this._roomEncryption = this._createRoomEncryption(this, summaryChanges.encryption);
|
||||
if (this._roomEncryption) {
|
||||
this._sendQueue.enableEncryption(this._roomEncryption);
|
||||
}
|
||||
this._enableEncryption(summaryChanges.encryption);
|
||||
}
|
||||
if (memberChanges.size) {
|
||||
if (this._changedMembersDuringSync) {
|
||||
|
@ -139,10 +216,7 @@ export class Room extends EventEmitter {
|
|||
try {
|
||||
this._summary.load(summary);
|
||||
if (this._summary.encryption) {
|
||||
this._roomEncryption = this._createRoomEncryption(this, this._summary.encryption);
|
||||
if (this._roomEncryption) {
|
||||
this._sendQueue.enableEncryption(this._roomEncryption);
|
||||
}
|
||||
this._enableEncryption(this._summary.encryption);
|
||||
}
|
||||
// need to load members for name?
|
||||
if (this._summary.needsHeroes) {
|
||||
|
@ -200,11 +274,18 @@ export class Room extends EventEmitter {
|
|||
}
|
||||
}).response();
|
||||
|
||||
const txn = await this._storage.readWriteTxn([
|
||||
let stores = [
|
||||
this._storage.storeNames.pendingEvents,
|
||||
this._storage.storeNames.timelineEvents,
|
||||
this._storage.storeNames.timelineFragments,
|
||||
]);
|
||||
];
|
||||
if (this._roomEncryption) {
|
||||
stores = stores.concat([
|
||||
this._storage.storeNames.inboundGroupSessions,
|
||||
this._storage.storeNames.groupSessionDecryptions,
|
||||
]);
|
||||
}
|
||||
const txn = await this._storage.readWriteTxn(stores);
|
||||
let removedPendingEvents;
|
||||
let gapResult;
|
||||
try {
|
||||
|
@ -214,9 +295,12 @@ export class Room extends EventEmitter {
|
|||
const gapWriter = new GapWriter({
|
||||
roomId: this._roomId,
|
||||
storage: this._storage,
|
||||
fragmentIdComparer: this._fragmentIdComparer
|
||||
fragmentIdComparer: this._fragmentIdComparer,
|
||||
});
|
||||
gapResult = await gapWriter.writeFragmentFill(fragmentEntry, response, txn);
|
||||
if (this._roomEncryption) {
|
||||
gapResult.entries = await this._decryptEntries(gapResult.entries, txn, false);
|
||||
}
|
||||
} catch (err) {
|
||||
txn.abort();
|
||||
throw err;
|
||||
|
@ -341,9 +425,15 @@ export class Room extends EventEmitter {
|
|||
closeCallback: () => {
|
||||
console.log(`closing the timeline for ${this._roomId}`);
|
||||
this._timeline = null;
|
||||
if (this._roomEncryption) {
|
||||
this._roomEncryption.notifyTimelineClosed();
|
||||
}
|
||||
},
|
||||
user: this._user,
|
||||
});
|
||||
if (this._roomEncryption) {
|
||||
this._timeline.enableEncryption(this._decryptEntries.bind(this));
|
||||
}
|
||||
await this._timeline.load();
|
||||
return this._timeline;
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@ import {SortedArray, MappedList, ConcatList} from "../../../observable/index.js"
|
|||
import {Direction} from "./Direction.js";
|
||||
import {TimelineReader} from "./persistence/TimelineReader.js";
|
||||
import {PendingEventEntry} from "./entries/PendingEventEntry.js";
|
||||
import {EventEntry} from "./entries/EventEntry.js";
|
||||
|
||||
export class Timeline {
|
||||
constructor({roomId, storage, closeCallback, fragmentIdComparer, pendingEvents, user}) {
|
||||
|
@ -45,6 +46,27 @@ export class Timeline {
|
|||
this._remoteEntries.setManySorted(entries);
|
||||
}
|
||||
|
||||
findEntry(eventKey) {
|
||||
// a storage event entry has a fragmentId and eventIndex property, used for sorting,
|
||||
// just like an EventKey, so this will work, but perhaps a bit brittle.
|
||||
const entry = new EventEntry(eventKey, this._fragmentIdComparer);
|
||||
try {
|
||||
const idx = this._remoteEntries.indexOf(entry);
|
||||
if (idx !== -1) {
|
||||
return this._remoteEntries.get(idx);
|
||||
}
|
||||
} catch (err) {
|
||||
// fragmentIdComparer threw, ignore
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
replaceEntries(entries) {
|
||||
for (const entry of entries) {
|
||||
this._remoteEntries.replace(entry);
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: should we rather have generic methods for
|
||||
// - adding new entries
|
||||
// - updating existing entries (redaction, relations)
|
||||
|
@ -84,4 +106,8 @@ export class Timeline {
|
|||
this._closeCallback = null;
|
||||
}
|
||||
}
|
||||
|
||||
enableEncryption(decryptEntries) {
|
||||
this._timelineReader.enableEncryption(decryptEntries);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,6 +21,12 @@ export class EventEntry extends BaseEntry {
|
|||
constructor(eventEntry, fragmentIdComparer) {
|
||||
super(fragmentIdComparer);
|
||||
this._eventEntry = eventEntry;
|
||||
this._decryptionError = null;
|
||||
this._decryptedEvent = null;
|
||||
}
|
||||
|
||||
get event() {
|
||||
return this._eventEntry.event;
|
||||
}
|
||||
|
||||
get fragmentId() {
|
||||
|
@ -32,7 +38,7 @@ export class EventEntry extends BaseEntry {
|
|||
}
|
||||
|
||||
get content() {
|
||||
return this._eventEntry.event.content;
|
||||
return this._decryptedEvent?.content || this._eventEntry.event.content;
|
||||
}
|
||||
|
||||
get prevContent() {
|
||||
|
@ -40,7 +46,7 @@ export class EventEntry extends BaseEntry {
|
|||
}
|
||||
|
||||
get eventType() {
|
||||
return this._eventEntry.event.type;
|
||||
return this._decryptedEvent?.type || this._eventEntry.event.type;
|
||||
}
|
||||
|
||||
get stateKey() {
|
||||
|
@ -66,4 +72,12 @@ export class EventEntry extends BaseEntry {
|
|||
get id() {
|
||||
return this._eventEntry.event.event_id;
|
||||
}
|
||||
|
||||
replaceWithDecrypted(event) {
|
||||
this._decryptedEvent = event;
|
||||
}
|
||||
|
||||
setDecryptionError(err) {
|
||||
this._decryptionError = err;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,18 +24,41 @@ export class TimelineReader {
|
|||
this._roomId = roomId;
|
||||
this._storage = storage;
|
||||
this._fragmentIdComparer = fragmentIdComparer;
|
||||
this._decryptEntries = null;
|
||||
}
|
||||
|
||||
enableEncryption(decryptEntries) {
|
||||
this._decryptEntries = decryptEntries;
|
||||
}
|
||||
|
||||
_openTxn() {
|
||||
return this._storage.readTxn([
|
||||
this._storage.storeNames.timelineEvents,
|
||||
this._storage.storeNames.timelineFragments,
|
||||
]);
|
||||
if (this._decryptEntries) {
|
||||
return this._storage.readWriteTxn([
|
||||
this._storage.storeNames.timelineEvents,
|
||||
this._storage.storeNames.timelineFragments,
|
||||
this._storage.storeNames.inboundGroupSessions,
|
||||
this._storage.storeNames.groupSessionDecryptions,
|
||||
]);
|
||||
|
||||
} else {
|
||||
return this._storage.readTxn([
|
||||
this._storage.storeNames.timelineEvents,
|
||||
this._storage.storeNames.timelineFragments,
|
||||
]);
|
||||
}
|
||||
}
|
||||
|
||||
async readFrom(eventKey, direction, amount) {
|
||||
const txn = await this._openTxn();
|
||||
return this._readFrom(eventKey, direction, amount, txn);
|
||||
let entries;
|
||||
try {
|
||||
entries = await this._readFrom(eventKey, direction, amount, txn);
|
||||
} catch (err) {
|
||||
txn.abort();
|
||||
throw err;
|
||||
}
|
||||
await txn.complete();
|
||||
return entries;
|
||||
}
|
||||
|
||||
async _readFrom(eventKey, direction, amount, txn) {
|
||||
|
@ -50,7 +73,10 @@ export class TimelineReader {
|
|||
} else {
|
||||
eventsWithinFragment = await timelineStore.eventsBefore(this._roomId, eventKey, amount);
|
||||
}
|
||||
const eventEntries = eventsWithinFragment.map(e => new EventEntry(e, this._fragmentIdComparer));
|
||||
let eventEntries = eventsWithinFragment.map(e => new EventEntry(e, this._fragmentIdComparer));
|
||||
if (this._decryptEntries) {
|
||||
eventEntries = await this._decryptEntries(eventEntries, txn);
|
||||
}
|
||||
entries = directionalConcat(entries, eventEntries, direction);
|
||||
// prepend or append eventsWithinFragment to entries, and wrap them in EventEntry
|
||||
|
||||
|
@ -78,22 +104,24 @@ export class TimelineReader {
|
|||
|
||||
async readFromEnd(amount) {
|
||||
const txn = await this._openTxn();
|
||||
const liveFragment = await txn.timelineFragments.liveFragment(this._roomId);
|
||||
// room hasn't been synced yet
|
||||
if (!liveFragment) {
|
||||
return [];
|
||||
let entries;
|
||||
try {
|
||||
const liveFragment = await txn.timelineFragments.liveFragment(this._roomId);
|
||||
// room hasn't been synced yet
|
||||
if (!liveFragment) {
|
||||
entries = [];
|
||||
} else {
|
||||
this._fragmentIdComparer.add(liveFragment);
|
||||
const liveFragmentEntry = FragmentBoundaryEntry.end(liveFragment, this._fragmentIdComparer);
|
||||
const eventKey = liveFragmentEntry.asEventKey();
|
||||
entries = await this._readFrom(eventKey, Direction.Backward, amount, txn);
|
||||
entries.unshift(liveFragmentEntry);
|
||||
}
|
||||
} catch (err) {
|
||||
txn.abort();
|
||||
throw err;
|
||||
}
|
||||
this._fragmentIdComparer.add(liveFragment);
|
||||
const liveFragmentEntry = FragmentBoundaryEntry.end(liveFragment, this._fragmentIdComparer);
|
||||
const eventKey = liveFragmentEntry.asEventKey();
|
||||
const entries = await this._readFrom(eventKey, Direction.Backward, amount, txn);
|
||||
entries.unshift(liveFragmentEntry);
|
||||
await txn.complete();
|
||||
return entries;
|
||||
}
|
||||
|
||||
// reads distance up and down from eventId
|
||||
// or just expose eventIdToKey?
|
||||
readAtEventId(eventId, distance) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -27,6 +27,7 @@ export const STORE_NAMES = Object.freeze([
|
|||
"olmSessions",
|
||||
"inboundGroupSessions",
|
||||
"outboundGroupSessions",
|
||||
"groupSessionDecryptions",
|
||||
]);
|
||||
|
||||
export const STORE_MAP = Object.freeze(STORE_NAMES.reduce((nameMap, name) => {
|
||||
|
|
|
@ -29,6 +29,7 @@ import {DeviceIdentityStore} from "./stores/DeviceIdentityStore.js";
|
|||
import {OlmSessionStore} from "./stores/OlmSessionStore.js";
|
||||
import {InboundGroupSessionStore} from "./stores/InboundGroupSessionStore.js";
|
||||
import {OutboundGroupSessionStore} from "./stores/OutboundGroupSessionStore.js";
|
||||
import {GroupSessionDecryptionStore} from "./stores/GroupSessionDecryptionStore.js";
|
||||
|
||||
export class Transaction {
|
||||
constructor(txn, allowedStoreNames) {
|
||||
|
@ -105,6 +106,11 @@ export class Transaction {
|
|||
get outboundGroupSessions() {
|
||||
return this._store("outboundGroupSessions", idbStore => new OutboundGroupSessionStore(idbStore));
|
||||
}
|
||||
|
||||
get groupSessionDecryptions() {
|
||||
return this._store("groupSessionDecryptions", idbStore => new GroupSessionDecryptionStore(idbStore));
|
||||
}
|
||||
|
||||
complete() {
|
||||
return txnAsPromise(this._txn);
|
||||
}
|
||||
|
|
|
@ -13,6 +13,7 @@ export const schema = [
|
|||
createOlmSessionStore,
|
||||
createInboundGroupSessionsStore,
|
||||
createOutboundGroupSessionsStore,
|
||||
createGroupSessionDecryptions,
|
||||
];
|
||||
// TODO: how to deal with git merge conflicts of this array?
|
||||
|
||||
|
@ -89,3 +90,7 @@ function createOutboundGroupSessionsStore(db) {
|
|||
db.createObjectStore("outboundGroupSessions", {keyPath: "roomId"});
|
||||
}
|
||||
|
||||
//v8
|
||||
function createGroupSessionDecryptions(db) {
|
||||
db.createObjectStore("groupSessionDecryptions", {keyPath: "key"});
|
||||
}
|
||||
|
|
34
src/matrix/storage/idb/stores/GroupSessionDecryptionStore.js
Normal file
34
src/matrix/storage/idb/stores/GroupSessionDecryptionStore.js
Normal file
|
@ -0,0 +1,34 @@
|
|||
/*
|
||||
Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
function encodeKey(roomId, sessionId, messageIndex) {
|
||||
return `${roomId}|${sessionId}|${messageIndex}`;
|
||||
}
|
||||
|
||||
export class GroupSessionDecryptionStore {
|
||||
constructor(store) {
|
||||
this._store = store;
|
||||
}
|
||||
|
||||
get(roomId, sessionId, messageIndex) {
|
||||
return this._store.get(encodeKey(roomId, sessionId, messageIndex));
|
||||
}
|
||||
|
||||
set(decryption) {
|
||||
decryption.key = encodeKey(decryption.roomId, decryption.sessionId, decryption.messageIndex);
|
||||
this._store.put(decryption);
|
||||
}
|
||||
}
|
|
@ -29,6 +29,10 @@ export class InboundGroupSessionStore {
|
|||
return key === fetchedKey;
|
||||
}
|
||||
|
||||
get(roomId, senderKey, sessionId) {
|
||||
return this._store.get(encodeKey(roomId, senderKey, sessionId));
|
||||
}
|
||||
|
||||
set(session) {
|
||||
session.key = encodeKey(session.roomId, session.senderKey, session.sessionId);
|
||||
this._store.put(session);
|
||||
|
|
|
@ -41,6 +41,22 @@ export class SortedArray extends BaseObservableList {
|
|||
}
|
||||
}
|
||||
|
||||
replace(item) {
|
||||
const idx = this.indexOf(item);
|
||||
if (idx !== -1) {
|
||||
this._items[idx] = item;
|
||||
}
|
||||
}
|
||||
|
||||
indexOf(item) {
|
||||
const idx = sortedIndex(this._items, item, this._comparator);
|
||||
if (idx < this._items.length && this._comparator(this._items[idx], item) === 0) {
|
||||
return idx;
|
||||
} else {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
set(item, updateParams = null) {
|
||||
const idx = sortedIndex(this._items, item, this._comparator);
|
||||
if (idx >= this._items.length || this._comparator(this._items[idx], item) !== 0) {
|
||||
|
|
Loading…
Reference in a new issue