forked from mystiq/hydrogen-web
add more sync logging
parent e14929bd4f
commit f321968ac3
8 changed files with 164 additions and 147 deletions
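All of the changes below follow the same pattern: the sync code paths now receive a `log` item, annotate it with `log.set(key, value)`, and run sub-steps under child items with `log.wrap(label, callback)` (optionally passing a level such as `log.level.Detail`), while the whole sync attempt runs under `this._logger.run("sync", callback, level, filterCallback)`. A rough sketch of that calling convention, with an illustrative class and method names; only `set`, `wrap`, `child`, `run` and the level names are taken from the diff itself:

// Sketch only: how a sync step can accept and use a log item
// (the class and helper names here are made up for illustration).
class ExampleSyncStep {
    async writeSync(events, txn, log) {
        // record facts about this step on the current item
        log.set("eventCount", events.length);
        // run a sub-step under its own child item, with its own timing and error state
        return await log.wrap("decrypt", log => this._decryptAll(events, txn, log));
    }
}

The root item is opened with a label, an async callback that receives the item, a default level, and a filter callback, as seen in Sync._syncLoop further down.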
@@ -89,6 +89,7 @@ export class LogItem {
if (!filter.filter(this, children)) {
return null;
}
// in (v)alues, (l)abel and (t)ype are also reserved.
const item = {
// (s)tart
s: this._start,

@@ -34,7 +34,7 @@ export class DeviceMessageHandler {
/**
* @return {bool} whether messages are waiting to be decrypted and `decryptPending` should be called.
*/
async writeSync(toDeviceEvents, txn) {
async writeSync(toDeviceEvents, txn, log) {
const encryptedEvents = toDeviceEvents.filter(e => e.type === "m.room.encrypted");
if (!encryptedEvents.length) {
return false;

@@ -53,14 +53,14 @@ export class DeviceMessageHandler {
* @param {[type]} txn [description]
* @return {[type]} [description]
*/
async _writeDecryptedEvents(olmResults, txn) {
async _writeDecryptedEvents(olmResults, txn, log) {
const megOlmRoomKeysResults = olmResults.filter(r => {
return r.event?.type === "m.room_key" && r.event.content?.algorithm === MEGOLM_ALGORITHM;
});
let roomKeys;
log.set("roomKeyCount", megOlmRoomKeysResults.length);
if (megOlmRoomKeysResults.length) {
console.log("new room keys", megOlmRoomKeysResults);
roomKeys = await this._megolmDecryption.addRoomKeys(megOlmRoomKeysResults, txn);
roomKeys = await this._megolmDecryption.addRoomKeys(megOlmRoomKeysResults, txn, log);
}
return {roomKeys};
}

@@ -76,12 +76,13 @@ export class DeviceMessageHandler {
}

// not safe to call multiple times without awaiting first call
async decryptPending(rooms) {
async decryptPending(rooms, log) {
if (!this._olmDecryption) {
return;
}
const readTxn = this._storage.readTxn([this._storage.storeNames.session]);
const pendingEvents = await this._getPendingEvents(readTxn);
log.set("eventCount", pendingEvents.length);
if (pendingEvents.length === 0) {
return;
}

@@ -89,7 +90,7 @@ export class DeviceMessageHandler {
const olmEvents = pendingEvents.filter(e => e.content?.algorithm === OLM_ALGORITHM);
const decryptChanges = await this._olmDecryption.decryptAll(olmEvents);
for (const err of decryptChanges.errors) {
console.warn("decryption failed for event", err, err.event);
log.child("decrypt_error").catch(err);
}
const txn = this._storage.readWriteTxn([
// both to remove the pending events and to modify the olm account

@@ -99,7 +100,7 @@ export class DeviceMessageHandler {
]);
let changes;
try {
changes = await this._writeDecryptedEvents(decryptChanges.results, txn);
changes = await this._writeDecryptedEvents(decryptChanges.results, txn, log);
decryptChanges.write(txn);
txn.session.remove(PENDING_ENCRYPTED_EVENTS);
} catch (err) {

@@ -374,7 +374,7 @@ export class Session {
}

/** @internal */
async writeSync(syncResponse, syncFilterId, txn) {
async writeSync(syncResponse, syncFilterId, txn, log) {
const changes = {
syncInfo: null,
e2eeAccountChanges: null,

@@ -390,20 +390,20 @@ export class Session {

const deviceOneTimeKeysCount = syncResponse.device_one_time_keys_count;
if (this._e2eeAccount && deviceOneTimeKeysCount) {
changes.e2eeAccountChanges = this._e2eeAccount.writeSync(deviceOneTimeKeysCount, txn);
changes.e2eeAccountChanges = this._e2eeAccount.writeSync(deviceOneTimeKeysCount, txn, log);
}

if (this._deviceTracker) {
const deviceLists = syncResponse.device_lists;
if (deviceLists) {
await this._deviceTracker.writeDeviceChanges(deviceLists, txn);
await log.wrap("deviceTracker", log => this._deviceTracker.writeDeviceChanges(deviceLists, txn, log));
}
}

const toDeviceEvents = syncResponse.to_device?.events;
if (Array.isArray(toDeviceEvents)) {
changes.deviceMessageDecryptionPending =
await this._deviceMessageHandler.writeSync(toDeviceEvents, txn);
await log.wrap("deviceMsgs", log => this._deviceMessageHandler.writeSync(toDeviceEvents, txn, log));
}

// store account data

@@ -430,10 +430,10 @@ export class Session {
}

/** @internal */
async afterSyncCompleted(changes, isCatchupSync) {
async afterSyncCompleted(changes, isCatchupSync, log) {
const promises = [];
if (changes.deviceMessageDecryptionPending) {
promises.push(this._deviceMessageHandler.decryptPending(this.rooms));
promises.push(log.wrap("decryptPending", log => this._deviceMessageHandler.decryptPending(this.rooms, log)));
}
// we don't start uploading one-time keys until we've caught up with
// to-device messages, to help us avoid throwing away one-time-keys that we

@@ -442,7 +442,7 @@ export class Session {
if (!isCatchupSync) {
const needsToUploadOTKs = await this._e2eeAccount.generateOTKsIfNeeded(this._storage);
if (needsToUploadOTKs) {
promises.push(this._e2eeAccount.uploadKeys(this._storage));
promises.push(log.wrap("uploadKeys", log => this._e2eeAccount.uploadKeys(this._storage, log)));
}
}
if (promises.length) {

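One detail the Session changes above rely on: results computed inside a wrapped callback are assigned straight through the wrap, e.g. `changes.deviceMessageDecryptionPending = await log.wrap("deviceMsgs", log => ...writeSync(...))`. That only works if `wrap` returns (and, for async callbacks, resolves to) whatever the callback returns. A minimal sketch of that contract, method body only; this is an assumption about the shape of the API, not the actual LogItem implementation, and the real method also accepts a log level (see the `log.level.Detail` arguments further down):

// Illustrative sketch of the wrap() contract the assignments above depend on.
async wrap(label, callback) {
    const child = this.child(label);  // child() and catch() appear elsewhere in this diff
    try {
        return await callback(child); // forward the callback's (resolved) return value
    } catch (err) {
        child.catch(err);             // record the failure on the child item
        throw err;
    }
}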
@@ -90,21 +90,17 @@ export class Sync {
this._syncLoop(syncToken);
}

_createLogFilter(filter, log) {
if (log.duration >= 2000 || log.error || this._status.get() === SyncStatus.CatchupSync) {
return filter.minLevel(log.level.Detail);
} else {
return filter.minLevel(log.level.Info);
}
}

async _syncLoop(syncToken) {
// if syncToken is falsy, it will first do an initial sync ...
while(this._status.get() !== SyncStatus.Stopped) {
let roomStates;
let sessionChanges;
let wasCatchup = this._status.get() === SyncStatus.CatchupSync;
await this._logger.run("sync", async log => {
log.set("token", syncToken);
log.set("status", this._status.get());
try {
console.log(`starting sync request with since ${syncToken} ...`);
// unless we are happily syncing already, we want the server to return
// as quickly as possible, even if there are no events queued. This
// serves two purposes:

@@ -117,11 +113,7 @@ export class Sync {
// for us. We do that by calling it with a zero timeout until it
// doesn't give us any more to_device messages.
const timeout = this._status.get() === SyncStatus.Syncing ? INCREMENTAL_TIMEOUT : 0;
const syncResult = await this._logger.run("sync",
log => this._syncRequest(syncToken, timeout, log),
this._logger.level.Info,
this._createLogFilter.bind(this)
);
const syncResult = await this._syncRequest(syncToken, timeout, log);
syncToken = syncResult.syncToken;
roomStates = syncResult.roomStates;
sessionChanges = syncResult.sessionChanges;

@@ -135,13 +127,16 @@ export class Sync {
// retry same request on timeout
if (err.name === "ConnectionError" && err.isTimeout) {
// don't run afterSyncCompleted
continue;
return;
}
this._error = err;
if (err.name !== "AbortError") {
console.warn("stopping sync because of error");
console.error(err);
// sync wasn't asked to stop, but is stopping
// because of the error.
log.error = err;
log.logLevel = log.level.Fatal;
}
log.set("stopping", true);
this._status.set(SyncStatus.Stopped);
}
if (this._status.get() !== SyncStatus.Stopped) {

@@ -150,19 +145,26 @@ export class Sync {
// should we move this inside _syncRequest?
// Alternatively, we can try to fix the OTK upload issue while still
// running in parallel.
await this._runAfterSyncCompleted(sessionChanges, roomStates);
await log.wrap("afterSyncCompleted", log => this._runAfterSyncCompleted(sessionChanges, roomStates, log));
}
},
this._logger.level.Info,
(filter, log) => {
if (log.duration >= 2000 || log.error || wasCatchup) {
return filter.minLevel(log.level.Detail);
} else {
return filter.minLevel(log.level.Info);
}
});
}
}

async _runAfterSyncCompleted(sessionChanges, roomStates) {
async _runAfterSyncCompleted(sessionChanges, roomStates, log) {
const isCatchupSync = this._status.get() === SyncStatus.CatchupSync;
const sessionPromise = (async () => {
try {
await this._session.afterSyncCompleted(sessionChanges, isCatchupSync);
} catch (err) {
console.error("error during session afterSyncCompleted, continuing", err.stack);
}
await log.wrap("session", log => this._session.afterSyncCompleted(sessionChanges, isCatchupSync, log));
} catch (err) {} // error is logged, but don't fail sessionPromise
})();

const roomsNeedingAfterSyncCompleted = roomStates.filter(rs => {

@@ -170,10 +172,8 @@ export class Sync {
});
const roomsPromises = roomsNeedingAfterSyncCompleted.map(async rs => {
try {
await rs.room.afterSyncCompleted(rs.changes);
} catch (err) {
console.error(`error during room ${rs.room.id} afterSyncCompleted, continuing`, err.stack);
}
await log.wrap("room", log => rs.room.afterSyncCompleted(rs.changes, log));
} catch (err) {} // error is logged, but don't fail roomsPromises
});
// run everything in parallel,
// we don't want to delay the next sync too much

@@ -185,7 +185,7 @@ export class Sync {
async _syncRequest(syncToken, timeout, log) {
let {syncFilterId} = this._session;
if (typeof syncFilterId !== "string") {
this._currentRequest = this._hsApi.createFilter(this._session.user.id, {room: {state: {lazy_load_members: true}}});
this._currentRequest = this._hsApi.createFilter(this._session.user.id, {room: {state: {lazy_load_members: true}}}, {log});
syncFilterId = (await this._currentRequest.response()).filter_id;
}
const totalRequestTimeout = timeout + (80 * 1000); // same as riot-web, don't get stuck on wedged long requests

@@ -193,19 +193,20 @@ export class Sync {
const response = await this._currentRequest.response();

const isInitialSync = !syncToken;
syncToken = response.next_batch;
log.set("syncToken", syncToken);
log.set("status", this._status.get());

const roomStates = this._parseRoomsResponse(response.rooms, isInitialSync);
await log.wrap("prepare rooms", log => this._prepareRooms(roomStates, log));
log.set("roomCount", roomStates.length);

await log.wrap("prepare", log => this._prepareRooms(roomStates, log));

let sessionChanges;

await log.wrap("write", async log => {
const syncTxn = this._openSyncTxn();
try {
sessionChanges = await log.wrap("session.writeSync", log => this._session.writeSync(response, syncFilterId, syncTxn, log));
sessionChanges = await log.wrap("session", log => this._session.writeSync(response, syncFilterId, syncTxn, log), log.level.Detail);
await Promise.all(roomStates.map(async rs => {
rs.changes = await log.wrap("room.writeSync", log => rs.room.writeSync(
rs.roomResponse, isInitialSync, rs.preparation, syncTxn, log));
rs.changes = await log.wrap("room", log => rs.room.writeSync(
rs.roomResponse, isInitialSync, rs.preparation, syncTxn, log), log.level.Detail);
}));
} catch(err) {
// avoid corrupting state by only

@@ -214,26 +215,24 @@ export class Sync {
try {
syncTxn.abort();
} catch (abortErr) {
console.error("Could not abort sync transaction, the sync response was probably only partially written and may have put storage in a inconsistent state.", abortErr);
log.set("couldNotAbortTxn", true);
}
throw err;
}
try {
await syncTxn.complete();
console.info("syncTxn committed!!");
} catch (err) {
console.error("unable to commit sync tranaction");
throw err;
}
this._session.afterSync(sessionChanges);
});

log.wrap("after", log => {
log.wrap("session", log => this._session.afterSync(sessionChanges, log), log.level.Detail);
// emit room related events after txn has been closed
for(let rs of roomStates) {
rs.room.afterSync(rs.changes);
log.wrap("room", log => rs.room.afterSync(rs.changes, log), log.level.Detail);
}
});

const toDeviceEvents = response.to_device?.events;
return {
syncToken,
syncToken: response.next_batch,
roomStates,
sessionChanges,
hadToDeviceMessages: Array.isArray(toDeviceEvents) && toDeviceEvents.length > 0,

@@ -250,11 +249,11 @@ export class Sync {
async _prepareRooms(roomStates, log) {
const prepareTxn = this._openPrepareSyncTxn();
await Promise.all(roomStates.map(async rs => {
rs.preparation = await log.wrap("room.prepareSync", log => rs.room.prepareSync(rs.roomResponse, rs.membership, prepareTxn, log));
rs.preparation = await log.wrap("room", log => rs.room.prepareSync(rs.roomResponse, rs.membership, prepareTxn, log), log.level.Detail);
}));
// This is needed for safari to not throw TransactionInactiveErrors on the syncTxn. See docs/INDEXEDDB.md
await prepareTxn.complete();
await Promise.all(roomStates.map(rs => rs.room.afterPrepareSync(rs.preparation)));
await Promise.all(roomStates.map(rs => rs.room.afterPrepareSync(rs.preparation, log)));
}

_openSyncTxn() {

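The Sync changes above drop the `_createLogFilter` helper and instead pass an inline filter callback to `this._logger.run(...)`: slow runs (2 seconds or longer), failed runs, and catch-up syncs keep log items down to Detail level, while ordinary incremental syncs only keep Info and above. `wasCatchup` is captured before the run starts, presumably because the status can already have changed by the time the filter is evaluated. A small sketch of the same decision written as a reusable factory; `filter.minLevel(...)` is the only filter API visible in the diff, the rest is illustrative:

// Illustrative: build the filter callback the logger calls once the "sync" item finishes.
function createSyncLogFilter(wasCatchup) {
    return (filter, log) => {
        const keepDetails = log.duration >= 2000 || log.error || wasCatchup;
        return filter.minLevel(keepDetails ? log.level.Detail : log.level.Info);
    };
}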
@@ -80,7 +80,7 @@ export class Account {
return this._identityKeys;
}

async uploadKeys(storage) {
async uploadKeys(storage, log) {
const oneTimeKeys = JSON.parse(this._account.one_time_keys());
// only one algorithm supported by olm atm, so hardcode its name
const oneTimeKeysEntries = Object.entries(oneTimeKeys.curve25519);

@@ -93,8 +93,9 @@ export class Account {
if (oneTimeKeysEntries.length) {
payload.one_time_keys = this._oneTimeKeysPayload(oneTimeKeysEntries);
}
const response = await this._hsApi.uploadKeys(payload).response();
const response = await this._hsApi.uploadKeys(payload, {log}).response();
this._serverOTKCount = response?.one_time_key_counts?.signed_curve25519;
log.set("serverOTKCount", this._serverOTKCount);
// TODO: should we not modify this in the txn like we do elsewhere?
// we'd have to pickle and unpickle the account to clone it though ...
// and the upload has succeed at this point, so in-memory would be correct

@@ -173,11 +174,12 @@ export class Account {
txn.session.set(ACCOUNT_SESSION_KEY, this._account.pickle(this._pickleKey));
}

writeSync(deviceOneTimeKeysCount, txn) {
writeSync(deviceOneTimeKeysCount, txn, log) {
// we only upload signed_curve25519 otks
const otkCount = deviceOneTimeKeysCount.signed_curve25519 || 0;
if (Number.isSafeInteger(otkCount) && otkCount !== this._serverOTKCount) {
txn.session.set(SERVER_OTK_COUNT_SESSION_KEY, otkCount);
log.set("otkCount", otkCount);
return otkCount;
}
}

@@ -337,7 +337,7 @@ export class RoomEncryption {
return id;
}

async flushPendingRoomKeyShares(hsApi, operations = null) {
async flushPendingRoomKeyShares(hsApi, operations, log) {
// this has to be reentrant as it can be called from Room.start while still running
if (this._isFlushingRoomKeyShares) {
return;

@@ -122,19 +122,25 @@ export class Decryption {
* @param {[type]} txn a storage transaction with read/write on inboundGroupSessions
* @return {Promise<Array<MegolmInboundSessionDescription>>} an array with the newly added sessions
*/
async addRoomKeys(decryptionResults, txn) {
async addRoomKeys(decryptionResults, txn, log) {
const newSessions = [];
for (const {senderCurve25519Key: senderKey, event, claimedEd25519Key} of decryptionResults) {
await log.wrap("room_key", async log => {
const roomId = event.content?.["room_id"];
const sessionId = event.content?.["session_id"];
const sessionKey = event.content?.["session_key"];

log.set("roomId", roomId);
log.set("sessionId", sessionId);

if (
typeof roomId !== "string" ||
typeof sessionId !== "string" ||
typeof senderKey !== "string" ||
typeof sessionKey !== "string"
) {
log.logLevel = log.level.Warn;
log.set("invalid", true);
return;
}

@@ -149,6 +155,7 @@ export class Decryption {
} finally {
session.free();
}
}, log.level.Detail);
}
// this will be passed to the Room in notifyRoomKeys
return newSessions;

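The Decryption change above is split across two hunks; pieced together, each incoming key gets its own "room_key" child item at Detail level, the room and session ids are recorded on it, and malformed payloads are skipped while bumping the item to Warn rather than aborting the whole batch. A condensed sketch of that validate-and-annotate shape, with the storage step elided and a made-up function name; it is not the full addRoomKeys implementation:

// Condensed sketch of the per-key logging pattern above.
async function recordRoomKey(parentLog, senderKey, event) {
    await parentLog.wrap("room_key", async log => {
        const roomId = event.content?.["room_id"];
        const sessionId = event.content?.["session_id"];
        log.set("roomId", roomId);
        log.set("sessionId", sessionId);
        if (typeof roomId !== "string" || typeof sessionId !== "string" || typeof senderKey !== "string") {
            log.logLevel = log.level.Warn;   // make the dropped key stand out when reading logs
            log.set("invalid", true);
            return;                          // skip this key, keep processing the rest
        }
        // ... create and store the inbound group session here ...
    }, parentLog.level.Detail);
}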
@@ -176,11 +176,12 @@ export class Room extends EventEmitter {
}

async prepareSync(roomResponse, membership, txn, log) {
log.set("roomId", this.id);
log.set("id", this.id);
const summaryChanges = this._summary.data.applySyncResponse(roomResponse, membership)
let roomEncryption = this._roomEncryption;
// encryption is enabled in this sync
if (!roomEncryption && summaryChanges.encryption) {
log.set("enableEncryption", true);
roomEncryption = this._createRoomEncryption(this, summaryChanges.encryption);
}

@@ -204,16 +205,19 @@ export class Room extends EventEmitter {
};
}

async afterPrepareSync(preparation) {
async afterPrepareSync(preparation, parentLog) {
if (preparation.decryptPreparation) {
await parentLog.wrap("afterPrepareSync decrypt", async log => {
log.set("id", this.id);
preparation.decryptChanges = await preparation.decryptPreparation.decrypt();
preparation.decryptPreparation = null;
});
}
}

/** @package */
async writeSync(roomResponse, isInitialSync, {summaryChanges, decryptChanges, roomEncryption}, txn, log) {
log.set("roomId", this.id);
log.set("id", this.id);
const {entries, newLiveKey, memberChanges} =
await this._syncWriter.writeSync(roomResponse, txn);
if (decryptChanges) {

@@ -259,7 +263,8 @@ export class Room extends EventEmitter {
* Called with the changes returned from `writeSync` to apply them and emit changes.
* No storage or network operations should be done here.
*/
afterSync({summaryChanges, newTimelineEntries, newLiveKey, removedPendingEvents, memberChanges, heroChanges, roomEncryption}) {
afterSync({summaryChanges, newTimelineEntries, newLiveKey, removedPendingEvents, memberChanges, heroChanges, roomEncryption}, log) {
log.set("id", this.id);
this._syncWriter.afterSync(newLiveKey);
this._setEncryption(roomEncryption);
if (memberChanges.size) {

@@ -310,9 +315,11 @@ export class Room extends EventEmitter {
* Can be used to do longer running operations that resulted from the last sync,
* like network operations.
*/
async afterSyncCompleted() {
async afterSyncCompleted(changes, log) {
log.set("id", this.id);
if (this._roomEncryption) {
await this._roomEncryption.flushPendingRoomKeyShares(this._hsApi);
// TODO: pass log to flushPendingRoomKeyShares once we also have a logger in `start`
await this._roomEncryption.flushPendingRoomKeyShares(this._hsApi, null);
}
}