Merge pull request #234 from vector-im/bwindels/sync-logging
Better sync logging
commit b2621b3001
17 changed files with 276 additions and 210 deletions
@@ -135,18 +135,35 @@
     flex: 1;
 }
 
-.timeline div.item.level-3 {
+.timeline .item.level-3 {
     --brightness: 90%;
 }
 
-.timeline div.item.level-6 {
+.timeline .item.level-2 {
+    --brightness: 95%;
+}
+
+.timeline .item.level-5 {
+    --brightness: 80%;
+}
+
+.timeline .item.level-6, .timeline .item.level-7 {
     --hue: 0deg !important;
 }
 
+.timeline .item.level-7 {
+    --brightness: 50%;
+    color: white;
+}
+
 .timeline div.item.type-network {
     --hue: 30deg;
 }
 
+.timeline div.item.type-navigation {
+    --hue: 200deg;
+}
+
 .timeline div.item.selected {
     background-color: Highlight;
     border-color: Highlight;
@@ -68,7 +68,7 @@ function showItemDetails(item, parent, itemNode) {
         t.ul({class: "values"}, Object.entries(itemValues(item)).map(([key, value]) => {
             return t.li([
                 t.span({className: "key"}, normalizeValueKey(key)),
-                t.span({className: "value"}, value)
+                t.span({className: "value"}, value+"")
             ]);
         })),
         t.p(expandButton)
@@ -128,11 +128,22 @@ function itemLevel(item) { return item.l; }
 function itemLabel(item) { return item.v?.l; }
 function itemType(item) { return item.v?.t; }
 function itemError(item) { return item.e; }
+function itemShortErrorMessage(item) {
+    if (itemError(item)) {
+        const e = itemError(item);
+        return e.name || e.stack.substr(0, e.stack.indexOf("\n"));
+    }
+}
 
 function itemCaption(item) {
     if (itemType(item) === "network") {
         return `${itemValues(item)?.method} ${itemValues(item)?.url}`;
     } else if (itemLabel(item) && itemValues(item)?.id) {
         return `${itemLabel(item)} ${itemValues(item).id}`;
+    } else if (itemLabel(item) && itemValues(item)?.status) {
+        return `${itemLabel(item)} (${itemValues(item).status})`;
+    } else if (itemLabel(item) && itemError(item)) {
+        return `${itemLabel(item)} (${itemShortErrorMessage(item)})`;
     } else {
         return itemLabel(item) || itemType(item);
     }
@@ -23,6 +23,12 @@ export class BaseLogger {
         this._platform = platform;
     }
 
+    log(labelOrValues, logLevel = LogLevel.Info) {
+        const item = new LogItem(labelOrValues, logLevel, null, this._platform.clock);
+        item._end = item._start;
+        this._persistItem(item.serialize(null));
+    }
+
     run(labelOrValues, callback, logLevel = LogLevel.Info, filterCreator = null) {
         const item = new LogItem(labelOrValues, logLevel, null, this._platform.clock);
         this._openItems.add(item);
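The added `BaseLogger.log` records a single zero-duration item, while `run` wraps an operation in an item that tracks duration and children. A minimal sketch of the difference (`logger` and `sessionId` are assumed to be in scope):

logger.log({l: "app started", t: "navigation"});    // fire-and-forget entry
await logger.run("load session", async log => {     // item with duration and children
    log.set("sessionId", sessionId);                // sessionId is illustrative
});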
@@ -31,7 +37,7 @@ export class BaseLogger {
         let filter = new LogFilter();
         if (filterCreator) {
             try {
-                filter = filterCreator(filter, this);
+                filter = filterCreator(filter, item);
             } catch (err) {
                 console.error("Error while creating log filter", err);
             }
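With this change the filter creator receives the finished root item instead of the logger, so the persisted detail can depend on the run's outcome. A sketch under that reading (`doSync` is a placeholder):

logger.run("sync", doSync, LogLevel.Info, (filter, item) => {
    // keep everything when the run failed, otherwise only Info and up
    return item.error ? filter.minLevel(LogLevel.Debug)
                      : filter.minLevel(LogLevel.Info);
});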
@@ -40,7 +46,7 @@ export class BaseLogger {
             filter = filter.minLevel(logLevel);
         }
         try {
-            const serialized = item.serialize(filter, 0);
+            const serialized = item.serialize(filter);
             if (serialized) {
                 this._persistItem(serialized);
             }
@@ -20,7 +20,6 @@ import {
     reqAsPromise,
     iterateCursor,
     fetchResults,
-    encodeUint64
 } from "../matrix/storage/idb/utils.js";
 import {BaseLogger} from "./BaseLogger.js";
 
@@ -30,9 +29,6 @@ export class IDBLogger extends BaseLogger {
         const {name, flushInterval = 60 * 1000, limit = 3000} = options;
         this._name = name;
         this._limit = limit;
-        // does not get loaded from idb on startup as we only use it to
-        // differentiate between two items with the same start time
-        this._itemCounter = 0;
         this._queuedItems = this._loadQueuedItems();
         // TODO: also listen for unload just in case sync keeps on running after pagehide is fired?
         window.addEventListener("pagehide", this, false);
@@ -82,6 +78,7 @@ export class IDBLogger extends BaseLogger {
 
     _finishAllAndFlush() {
         this._finishOpenItems();
+        this.log({l: "pagehide, closing logs", t: "navigation"});
         this._persistQueuedItems(this._queuedItems);
     }
 
@@ -100,13 +97,11 @@ export class IDBLogger extends BaseLogger {
     }
 
     _openDB() {
-        return openDatabase(this._name, db => db.createObjectStore("logs", {keyPath: "id"}), 1);
+        return openDatabase(this._name, db => db.createObjectStore("logs", {keyPath: "id", autoIncrement: true}), 1);
     }
 
     _persistItem(serializedItem) {
-        this._itemCounter += 1;
         this._queuedItems.push({
-            id: `${encodeUint64(serializedItem.s)}:${this._itemCounter}`,
             json: JSON.stringify(serializedItem)
         });
     }
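Switching the object store to autoIncrement lets IndexedDB assign increasing integer keys on insert, so the export can rely on cursor order instead of the removed encodeUint64-based ids. A sketch of the behavior this relies on:

// generated keys fill keyPath "id" in insertion order;
// openCursor() then iterates in ascending key order.
const txn = db.transaction("logs", "readwrite");
txn.objectStore("logs").add({json: "{}"}); // "id" assigned by IndexedDB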
@@ -126,10 +121,7 @@ export class IDBLogger extends BaseLogger {
             const logs = txn.objectStore("logs");
             const storedItems = await fetchResults(logs.openCursor(), () => false);
             const allItems = storedItems.concat(this._queuedItems);
-            const sortedItems = allItems.sort((a, b) => {
-                return a.id > b.id;
-            });
-            return new IDBLogExport(sortedItems, this, this._platform);
+            return new IDBLogExport(allItems, this, this._platform);
         } finally {
             try {
                 db.close();
@@ -179,7 +171,8 @@ class IDBLogExport {
 
     asBlob() {
         const log = {
-            version: 1,
+            formatVersion: 1,
+            appVersion: this._platform.updateService?.version,
             items: this._items.map(i => JSON.parse(i.json))
         };
         const json = JSON.stringify(log);
@@ -17,30 +17,28 @@ limitations under the License.
 export const LogLevel = {
     All: 1,
     Debug: 2,
-    Info: 3,
-    Warn: 4,
-    Error: 5,
-    Fatal: 6,
-    Off: 7,
+    Detail: 3,
+    Info: 4,
+    Warn: 5,
+    Error: 6,
+    Fatal: 7,
+    Off: 8,
 }
 
 export class LogFilter {
     constructor(parentFilter) {
         this._parentFilter = parentFilter;
         this._min = null;
-        this._maxDepth = null;
     }
 
-    filter(item, children, depth) {
+    filter(item, children) {
         if (this._parentFilter) {
-            if (!this._parentFilter.filter(item, children, depth)) {
+            if (!this._parentFilter.filter(item, children)) {
                 return false;
             }
         }
         // neither our children or us have a loglevel high enough, filter out.
-        if (this._min !== null && children === null && item.logLevel < this._min) {
-            return false;
-        } if (this._maxDepth !== null && depth > this._maxDepth) {
+        if (this._min !== null && !Array.isArray(children) && item.logLevel < this._min) {
             return false;
         } else {
             return true;
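A sketch of how the renumbered levels behave with `minLevel`: the new Detail level (3) sits between Debug and Info, so a filter at Info drops Detail items unless surviving children keep them:

const filter = new LogFilter().minLevel(LogLevel.Info);
filter.filter({logLevel: LogLevel.Detail}, null); // false: Detail (3) < Info (4)
filter.filter({logLevel: LogLevel.Warn}, null);   // true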
@@ -52,9 +50,4 @@ export class LogFilter {
         this._min = logLevel;
         return this;
     }
-
-    maxDepth(depth) {
-        this._maxDepth = depth;
-        return this;
-    }
 }
@@ -32,7 +32,7 @@ export class LogItem {
     /**
      * Creates a new child item and runs it in `callback`.
      */
-    wrap(labelOrValues, callback, logLevel = LogLevel.Info, filterCreator = null) {
+    wrap(labelOrValues, callback, logLevel = null, filterCreator = null) {
         const item = this.child(labelOrValues, logLevel, filterCreator);
         return item.run(callback);
     }
@@ -45,13 +45,29 @@ export class LogItem {
         }
     }
 
+    durationWithoutType(type) {
+        return this.duration - this.durationOfType(type);
+    }
+
+    durationOfType(type) {
+        if (this._values.t === type) {
+            return this.duration;
+        } else if (this._children) {
+            return this._children.reduce((sum, c) => {
+                return sum + c.durationOfType(type);
+            }, 0);
+        } else {
+            return 0;
+        }
+    }
+
     /**
      * Creates a new child item that finishes immediately
      * and can hence not be modified anymore.
      *
      * Hence, the child item is not returned.
      */
-    log(labelOrValues, logLevel = LogLevel.Info) {
+    log(labelOrValues, logLevel = null) {
         const item = this.child(labelOrValues, logLevel, null);
         item.end = item.start;
     }
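These helpers compute how much of an item's duration is spent in children of a given type: an item of that type contributes its whole duration, otherwise its children are summed recursively. The sync filter further down in this diff uses it roughly like this:

// time spent in the sync run outside "network" children, in ms
const localTime = syncLogItem.durationWithoutType("network");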
@@ -65,18 +81,18 @@ export class LogItem {
         }
     }
 
-    serialize(filter, depth) {
+    serialize(filter) {
         if (this._filterCreator) {
             try {
                 filter = this._filterCreator(new LogFilter(filter), this);
             } catch (err) {
-                console.error("Error creating log item", err);
+                console.error("Error creating log filter", err);
             }
         }
         let children;
         if (this._children !== null) {
             children = this._children.reduce((array, c) => {
-                const s = c.serialize(filter, depth + 1);
+                const s = c.serialize(filter);
                 if (s) {
                     if (array === null) {
                         array = [];
@@ -86,9 +102,10 @@ export class LogItem {
                 return array;
             }, null);
         }
-        if (!filter.filter(this, children, depth)) {
+        if (filter && !filter.filter(this, children)) {
             return null;
         }
+        // in (v)alues, (l)abel and (t)ype are also reserved.
         const item = {
             // (s)tart
             s: this._start,
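A side effect of the added null check (a reading of the diff, not part of it): `BaseLogger.log` can call `item.serialize(null)` and the one-off item bypasses filtering entirely:

const serialized = item.serialize(null); // no filter: item is always kept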
@@ -180,6 +197,9 @@ export class LogItem {
         if (this._end !== null) {
             console.trace("log item is finished, additional logs will likely not be recorded");
         }
+        if (!logLevel) {
+            logLevel = this.logLevel || LogLevel.Info;
+        }
         const item = new LogItem(labelOrValues, logLevel, filterCreator, this._clock);
         if (this._children === null) {
             this._children = [];
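Combined with the `logLevel = null` defaults on `wrap` and `log` above, children now inherit their parent's level unless one is passed explicitly. A sketch (`parse` and `store` are placeholders):

logItem.wrap("parse", log => parse(log));                  // inherits logItem's level
logItem.wrap("store", log => store(log), LogLevel.Detail); // explicit override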
@@ -20,6 +20,8 @@ export class NullLogger {
         this._item = new NullLogItem();
     }
 
+    log() {}
+
     run(_, callback) {
         return callback(this._item);
     }
@@ -34,8 +34,12 @@ export class DeviceMessageHandler {
     /**
      * @return {bool} whether messages are waiting to be decrypted and `decryptPending` should be called.
      */
-    async writeSync(toDeviceEvents, txn) {
+    async writeSync(toDeviceEvents, txn, log) {
         const encryptedEvents = toDeviceEvents.filter(e => e.type === "m.room.encrypted");
+        log.set("encryptedCount", encryptedEvents.length);
+        const keyRequestCount = toDeviceEvents.reduce((sum, e) => sum + (e.type === "m.room_key_request" ? 1 : 0), 0);
+        log.set("keyRequestCount", keyRequestCount);
+        log.set("otherCount", toDeviceEvents.length - encryptedEvents.length - keyRequestCount);
         if (!encryptedEvents.length) {
             return false;
         }
@@ -53,14 +57,14 @@ export class DeviceMessageHandler {
      * @param {[type]} txn [description]
      * @return {[type]} [description]
      */
-    async _writeDecryptedEvents(olmResults, txn) {
+    async _writeDecryptedEvents(olmResults, txn, log) {
         const megOlmRoomKeysResults = olmResults.filter(r => {
             return r.event?.type === "m.room_key" && r.event.content?.algorithm === MEGOLM_ALGORITHM;
         });
         let roomKeys;
+        log.set("roomKeyCount", megOlmRoomKeysResults.length);
         if (megOlmRoomKeysResults.length) {
-            console.log("new room keys", megOlmRoomKeysResults);
-            roomKeys = await this._megolmDecryption.addRoomKeys(megOlmRoomKeysResults, txn);
+            roomKeys = await this._megolmDecryption.addRoomKeys(megOlmRoomKeysResults, txn, log);
         }
         return {roomKeys};
     }
@@ -76,12 +80,13 @@ export class DeviceMessageHandler {
     }
 
     // not safe to call multiple times without awaiting first call
-    async decryptPending(rooms) {
+    async decryptPending(rooms, log) {
         if (!this._olmDecryption) {
             return;
         }
         const readTxn = this._storage.readTxn([this._storage.storeNames.session]);
         const pendingEvents = await this._getPendingEvents(readTxn);
+        log.set("eventCount", pendingEvents.length);
         if (pendingEvents.length === 0) {
             return;
         }
@@ -89,7 +94,7 @@ export class DeviceMessageHandler {
         const olmEvents = pendingEvents.filter(e => e.content?.algorithm === OLM_ALGORITHM);
         const decryptChanges = await this._olmDecryption.decryptAll(olmEvents);
         for (const err of decryptChanges.errors) {
-            console.warn("decryption failed for event", err, err.event);
+            log.child("decrypt_error").catch(err);
         }
         const txn = this._storage.readWriteTxn([
             // both to remove the pending events and to modify the olm account
@@ -99,7 +104,7 @@ export class DeviceMessageHandler {
         ]);
         let changes;
         try {
-            changes = await this._writeDecryptedEvents(decryptChanges.results, txn);
+            changes = await this._writeDecryptedEvents(decryptChanges.results, txn, log);
             decryptChanges.write(txn);
             txn.session.remove(PENDING_ENCRYPTED_EVENTS);
         } catch (err) {
@@ -374,7 +374,7 @@ export class Session {
     }
 
     /** @internal */
-    async writeSync(syncResponse, syncFilterId, txn) {
+    async writeSync(syncResponse, syncFilterId, txn, log) {
         const changes = {
             syncInfo: null,
             e2eeAccountChanges: null,
@@ -390,20 +390,18 @@ export class Session {
 
         const deviceOneTimeKeysCount = syncResponse.device_one_time_keys_count;
         if (this._e2eeAccount && deviceOneTimeKeysCount) {
-            changes.e2eeAccountChanges = this._e2eeAccount.writeSync(deviceOneTimeKeysCount, txn);
+            changes.e2eeAccountChanges = this._e2eeAccount.writeSync(deviceOneTimeKeysCount, txn, log);
         }
 
-        if (this._deviceTracker) {
-            const deviceLists = syncResponse.device_lists;
-            if (deviceLists) {
-                await this._deviceTracker.writeDeviceChanges(deviceLists, txn);
-            }
+        const deviceLists = syncResponse.device_lists;
+        if (this._deviceTracker && Array.isArray(deviceLists?.changed) && deviceLists.changed.length) {
+            await log.wrap("deviceLists", log => this._deviceTracker.writeDeviceChanges(deviceLists.changed, txn, log));
         }
 
         const toDeviceEvents = syncResponse.to_device?.events;
-        if (Array.isArray(toDeviceEvents)) {
+        if (Array.isArray(toDeviceEvents) && toDeviceEvents.length) {
             changes.deviceMessageDecryptionPending =
-                await this._deviceMessageHandler.writeSync(toDeviceEvents, txn);
+                await log.wrap("deviceMsgs", log => this._deviceMessageHandler.writeSync(toDeviceEvents, txn, log));
         }
 
         // store account data
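This is the `log.wrap` pattern the rest of the diff applies throughout: each wrapped step becomes a child item with its own label, values, duration, and error state, and the callback receives that child to thread further down:

await log.wrap("deviceMsgs", log => {
    // `log` is now the child item labeled "deviceMsgs"
    return this._deviceMessageHandler.writeSync(toDeviceEvents, txn, log);
});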
@@ -430,10 +428,10 @@ export class Session {
     }
 
     /** @internal */
-    async afterSyncCompleted(changes, isCatchupSync) {
+    async afterSyncCompleted(changes, isCatchupSync, log) {
         const promises = [];
         if (changes.deviceMessageDecryptionPending) {
-            promises.push(this._deviceMessageHandler.decryptPending(this.rooms));
+            promises.push(log.wrap("decryptPending", log => this._deviceMessageHandler.decryptPending(this.rooms, log)));
         }
         // we don't start uploading one-time keys until we've caught up with
         // to-device messages, to help us avoid throwing away one-time-keys that we
@@ -442,7 +440,7 @@ export class Session {
         if (!isCatchupSync) {
             const needsToUploadOTKs = await this._e2eeAccount.generateOTKsIfNeeded(this._storage);
             if (needsToUploadOTKs) {
-                promises.push(this._e2eeAccount.uploadKeys(this._storage));
+                promises.push(log.wrap("uploadKeys", log => this._e2eeAccount.uploadKeys(this._storage, log)));
             }
         }
         if (promises.length) {
@@ -95,73 +95,75 @@ export class Sync {
         while(this._status.get() !== SyncStatus.Stopped) {
             let roomStates;
             let sessionChanges;
-            try {
-                console.log(`starting sync request with since ${syncToken} ...`);
-                // unless we are happily syncing already, we want the server to return
-                // as quickly as possible, even if there are no events queued. This
-                // serves two purposes:
-                //
-                // * When the connection dies, we want to know asap when it comes back,
-                //   so that we can hide the error from the user. (We don't want to
-                //   have to wait for an event or a timeout).
-                //
-                // * We want to know if the server has any to_device messages queued up
-                //   for us. We do that by calling it with a zero timeout until it
-                //   doesn't give us any more to_device messages.
-                const timeout = this._status.get() === SyncStatus.Syncing ? INCREMENTAL_TIMEOUT : 0;
-                const syncResult = await this._logger.run("sync",
-                    log => this._syncRequest(syncToken, timeout, log),
-                    this._logger.level.Info,
-                    (filter, log) => {
-                        if (log.duration >= 2000 || this._status.get() === SyncStatus.CatchupSync) {
-                            return filter.minLevel(log.level.Info);
-                        } else if (log.error) {
-                            return filter.minLevel(log.level.Error);
-                        } else {
-                            return filter.maxDepth(0);
-                        }
-                    });
-                syncToken = syncResult.syncToken;
-                roomStates = syncResult.roomStates;
-                sessionChanges = syncResult.sessionChanges;
-                // initial sync or catchup sync
-                if (this._status.get() !== SyncStatus.Syncing && syncResult.hadToDeviceMessages) {
-                    this._status.set(SyncStatus.CatchupSync);
-                } else {
-                    this._status.set(SyncStatus.Syncing);
-                }
-            } catch (err) {
-                // retry same request on timeout
-                if (err.name === "ConnectionError" && err.isTimeout) {
-                    // don't run afterSyncCompleted
-                    continue;
-                }
-                this._error = err;
-                if (err.name !== "AbortError") {
-                    console.warn("stopping sync because of error");
-                    console.error(err);
-                }
-                this._status.set(SyncStatus.Stopped);
-            }
-            if (this._status.get() !== SyncStatus.Stopped) {
-                // TODO: if we're not going to run this phase in parallel with the next
-                // sync request (because this causes OTKs to be uploaded twice)
-                // should we move this inside _syncRequest?
-                // Alternatively, we can try to fix the OTK upload issue while still
-                // running in parallel.
-                await this._runAfterSyncCompleted(sessionChanges, roomStates);
-            }
+            let wasCatchupOrInitial = this._status.get() === SyncStatus.CatchupSync || this._status.get() === SyncStatus.InitialSync;
+            await this._logger.run("sync", async log => {
+                log.set("token", syncToken);
+                log.set("status", this._status.get());
+                try {
+                    // unless we are happily syncing already, we want the server to return
+                    // as quickly as possible, even if there are no events queued. This
+                    // serves two purposes:
+                    //
+                    // * When the connection dies, we want to know asap when it comes back,
+                    //   so that we can hide the error from the user. (We don't want to
+                    //   have to wait for an event or a timeout).
+                    //
+                    // * We want to know if the server has any to_device messages queued up
+                    //   for us. We do that by calling it with a zero timeout until it
+                    //   doesn't give us any more to_device messages.
+                    const timeout = this._status.get() === SyncStatus.Syncing ? INCREMENTAL_TIMEOUT : 0;
+                    const syncResult = await this._syncRequest(syncToken, timeout, log);
+                    syncToken = syncResult.syncToken;
+                    roomStates = syncResult.roomStates;
+                    sessionChanges = syncResult.sessionChanges;
+                    // initial sync or catchup sync
+                    if (this._status.get() !== SyncStatus.Syncing && syncResult.hadToDeviceMessages) {
+                        this._status.set(SyncStatus.CatchupSync);
+                    } else {
+                        this._status.set(SyncStatus.Syncing);
+                    }
+                } catch (err) {
+                    // retry same request on timeout
+                    if (err.name === "ConnectionError" && err.isTimeout) {
+                        // don't run afterSyncCompleted
+                        return;
+                    }
+                    this._error = err;
+                    if (err.name !== "AbortError") {
+                        // sync wasn't asked to stop, but is stopping
+                        // because of the error.
+                        log.error = err;
+                        log.logLevel = log.level.Fatal;
+                    }
+                    log.set("stopping", true);
+                    this._status.set(SyncStatus.Stopped);
+                }
+                if (this._status.get() !== SyncStatus.Stopped) {
+                    // TODO: if we're not going to run this phase in parallel with the next
+                    // sync request (because this causes OTKs to be uploaded twice)
+                    // should we move this inside _syncRequest?
+                    // Alternatively, we can try to fix the OTK upload issue while still
+                    // running in parallel.
+                    await log.wrap("afterSyncCompleted", log => this._runAfterSyncCompleted(sessionChanges, roomStates, log));
+                }
+            },
+            this._logger.level.Info,
+            (filter, log) => {
+                if (log.durationWithoutType("network") >= 2000 || log.error || wasCatchupOrInitial) {
+                    return filter.minLevel(log.level.Detail);
+                } else {
+                    return filter.minLevel(log.level.Info);
+                }
+            });
         }
     }
 
-    async _runAfterSyncCompleted(sessionChanges, roomStates) {
+    async _runAfterSyncCompleted(sessionChanges, roomStates, log) {
         const isCatchupSync = this._status.get() === SyncStatus.CatchupSync;
         const sessionPromise = (async () => {
             try {
-                await this._session.afterSyncCompleted(sessionChanges, isCatchupSync);
-            } catch (err) {
-                console.error("error during session afterSyncCompleted, continuing", err.stack);
-            }
+                await log.wrap("session", log => this._session.afterSyncCompleted(sessionChanges, isCatchupSync, log), log.level.Detail);
+            } catch (err) {} // error is logged, but don't fail sessionPromise
         })();
 
         const roomsNeedingAfterSyncCompleted = roomStates.filter(rs => {
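Taken together, one sync iteration now produces a single tree of log items, roughly shaped like this (labels from the diff, shape illustrative):

// sync (token, status)
// ├─ prepare
// │  └─ room (id)
// ├─ write
// │  ├─ session
// │  └─ room (id), with a syncWriter child
// └─ afterSyncCompleted
//    ├─ session, with decryptPending / uploadKeys children
//    └─ room (id)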
@@ -169,10 +171,8 @@
         });
         const roomsPromises = roomsNeedingAfterSyncCompleted.map(async rs => {
             try {
-                await rs.room.afterSyncCompleted(rs.changes);
-            } catch (err) {
-                console.error(`error during room ${rs.room.id} afterSyncCompleted, continuing`, err.stack);
-            }
+                await log.wrap("room", log => rs.room.afterSyncCompleted(rs.changes, log), log.level.Detail);
+            } catch (err) {} // error is logged, but don't fail roomsPromises
         });
         // run everything in parallel,
         // we don't want to delay the next sync too much
@@ -184,7 +184,7 @@ export class Sync {
     async _syncRequest(syncToken, timeout, log) {
         let {syncFilterId} = this._session;
         if (typeof syncFilterId !== "string") {
-            this._currentRequest = this._hsApi.createFilter(this._session.user.id, {room: {state: {lazy_load_members: true}}});
+            this._currentRequest = this._hsApi.createFilter(this._session.user.id, {room: {state: {lazy_load_members: true}}}, {log});
             syncFilterId = (await this._currentRequest.response()).filter_id;
         }
         const totalRequestTimeout = timeout + (80 * 1000);  // same as riot-web, don't get stuck on wedged long requests
@@ -192,47 +192,44 @@ export class Sync {
         const response = await this._currentRequest.response();
 
         const isInitialSync = !syncToken;
-        syncToken = response.next_batch;
-        log.set("syncToken", syncToken);
-        log.set("status", this._status.get());
 
         const roomStates = this._parseRoomsResponse(response.rooms, isInitialSync);
-        await log.wrap("prepare rooms", log => this._prepareRooms(roomStates, log));
+        await log.wrap("prepare", log => this._prepareRooms(roomStates, log));
 
         let sessionChanges;
-        const syncTxn = this._openSyncTxn();
-        try {
-            sessionChanges = await log.wrap("session.writeSync", log => this._session.writeSync(response, syncFilterId, syncTxn, log));
-            await Promise.all(roomStates.map(async rs => {
-                rs.changes = await log.wrap("room.writeSync", log => rs.room.writeSync(
-                    rs.roomResponse, isInitialSync, rs.preparation, syncTxn, log));
-            }));
-        } catch(err) {
-            // avoid corrupting state by only
-            // storing the sync up till the point
-            // the exception occurred
+        await log.wrap("write", async log => {
+            const syncTxn = this._openSyncTxn();
             try {
-                syncTxn.abort();
-            } catch (abortErr) {
-                console.error("Could not abort sync transaction, the sync response was probably only partially written and may have put storage in a inconsistent state.", abortErr);
+                sessionChanges = await log.wrap("session", log => this._session.writeSync(response, syncFilterId, syncTxn, log));
+                await Promise.all(roomStates.map(async rs => {
+                    rs.changes = await log.wrap("room", log => rs.room.writeSync(
+                        rs.roomResponse, isInitialSync, rs.preparation, syncTxn, log));
+                }));
+            } catch(err) {
+                // avoid corrupting state by only
+                // storing the sync up till the point
+                // the exception occurred
+                try {
+                    syncTxn.abort();
+                } catch (abortErr) {
+                    log.set("couldNotAbortTxn", true);
+                }
+                throw err;
             }
-            throw err;
-        }
-        try {
             await syncTxn.complete();
-            console.info("syncTxn committed!!");
-        } catch (err) {
-            console.error("unable to commit sync tranaction");
-            throw err;
-        }
-        this._session.afterSync(sessionChanges);
-        // emit room related events after txn has been closed
-        for(let rs of roomStates) {
-            rs.room.afterSync(rs.changes);
-        }
+        });
+        log.wrap("after", log => {
+            log.wrap("session", log => this._session.afterSync(sessionChanges, log), log.level.Detail);
+            // emit room related events after txn has been closed
+            for(let rs of roomStates) {
+                log.wrap("room", log => rs.room.afterSync(rs.changes, log), log.level.Detail);
+            }
+        });
 
         const toDeviceEvents = response.to_device?.events;
         return {
-            syncToken,
+            syncToken: response.next_batch,
             roomStates,
             sessionChanges,
             hadToDeviceMessages: Array.isArray(toDeviceEvents) && toDeviceEvents.length > 0,
@@ -249,11 +246,11 @@ export class Sync {
     async _prepareRooms(roomStates, log) {
         const prepareTxn = this._openPrepareSyncTxn();
         await Promise.all(roomStates.map(async rs => {
-            rs.preparation = await log.wrap("room.prepareSync", log => rs.room.prepareSync(rs.roomResponse, rs.membership, prepareTxn, log));
+            rs.preparation = await log.wrap("room", log => rs.room.prepareSync(rs.roomResponse, rs.membership, prepareTxn, log), log.level.Detail);
         }));
         // This is needed for safari to not throw TransactionInactiveErrors on the syncTxn. See docs/INDEXEDDB.md
         await prepareTxn.complete();
-        await Promise.all(roomStates.map(rs => rs.room.afterPrepareSync(rs.preparation)));
+        await Promise.all(roomStates.map(rs => rs.room.afterPrepareSync(rs.preparation, log)));
     }
 
     _openSyncTxn() {
@@ -93,8 +93,9 @@ export class Account {
         if (oneTimeKeysEntries.length) {
             payload.one_time_keys = this._oneTimeKeysPayload(oneTimeKeysEntries);
         }
-        const response = await this._hsApi.uploadKeys(payload).response();
+        const response = await this._hsApi.uploadKeys(payload, /*{log}*/).response();
         this._serverOTKCount = response?.one_time_key_counts?.signed_curve25519;
+        // log.set("serverOTKCount", this._serverOTKCount);
         // TODO: should we not modify this in the txn like we do elsewhere?
         // we'd have to pickle and unpickle the account to clone it though ...
         // and the upload has succeed at this point, so in-memory would be correct
@@ -173,11 +174,12 @@ export class Account {
         txn.session.set(ACCOUNT_SESSION_KEY, this._account.pickle(this._pickleKey));
     }
 
-    writeSync(deviceOneTimeKeysCount, txn) {
+    writeSync(deviceOneTimeKeysCount, txn, log) {
         // we only upload signed_curve25519 otks
         const otkCount = deviceOneTimeKeysCount.signed_curve25519 || 0;
         if (Number.isSafeInteger(otkCount) && otkCount !== this._serverOTKCount) {
             txn.session.set(SERVER_OTK_COUNT_SESSION_KEY, otkCount);
+            log.set("otkCount", otkCount);
             return otkCount;
         }
     }
@@ -43,7 +43,7 @@ export class DeviceTracker {
         this._ownDeviceId = ownDeviceId;
     }
 
-    async writeDeviceChanges(deviceLists, txn) {
+    async writeDeviceChanges(changed, txn, log) {
         const {userIdentities} = txn;
         // TODO: should we also look at left here to handle this?:
         // the usual problem here is that you share a room with a user,
@@ -52,15 +52,15 @@ export class DeviceTracker {
         // At which point you come online, all of this happens in the gap,
         // and you don't notice that they ever left,
         // and so the client doesn't invalidate their device cache for the user
-        if (Array.isArray(deviceLists.changed) && deviceLists.changed.length) {
-            await Promise.all(deviceLists.changed.map(async userId => {
-                const user = await userIdentities.get(userId);
-                if (user) {
-                    user.deviceTrackingStatus = TRACKING_STATUS_OUTDATED;
-                    userIdentities.set(user);
-                }
-            }));
-        }
+        log.set("changed", changed.length);
+        await Promise.all(changed.map(async userId => {
+            const user = await userIdentities.get(userId);
+            if (user) {
+                log.log({l: "outdated", id: userId});
+                user.deviceTrackingStatus = TRACKING_STATUS_OUTDATED;
+                userIdentities.set(user);
+            }
+        }));
     }
 
     writeMemberChanges(room, memberChanges, txn) {
@@ -337,7 +337,7 @@ export class RoomEncryption {
         return id;
     }
 
-    async flushPendingRoomKeyShares(hsApi, operations = null) {
+    async flushPendingRoomKeyShares(hsApi, operations, log) {
         // this has to be reentrant as it can be called from Room.start while still running
         if (this._isFlushingRoomKeyShares) {
             return;
@@ -122,33 +122,40 @@ export class Decryption {
      * @param {[type]} txn a storage transaction with read/write on inboundGroupSessions
      * @return {Promise<Array<MegolmInboundSessionDescription>>} an array with the newly added sessions
      */
-    async addRoomKeys(decryptionResults, txn) {
+    async addRoomKeys(decryptionResults, txn, log) {
         const newSessions = [];
         for (const {senderCurve25519Key: senderKey, event, claimedEd25519Key} of decryptionResults) {
-            const roomId = event.content?.["room_id"];
-            const sessionId = event.content?.["session_id"];
-            const sessionKey = event.content?.["session_key"];
-
-            if (
-                typeof roomId !== "string" ||
-                typeof sessionId !== "string" ||
-                typeof senderKey !== "string" ||
-                typeof sessionKey !== "string"
-            ) {
-                return;
-            }
-
-            const session = new this._olm.InboundGroupSession();
-            try {
-                session.create(sessionKey);
-                const sessionEntry = await this._writeInboundSession(
-                    session, roomId, senderKey, claimedEd25519Key, sessionId, txn);
-                if (sessionEntry) {
-                    newSessions.push(sessionEntry);
-                }
-            } finally {
-                session.free();
-            }
+            await log.wrap("room_key", async log => {
+                const roomId = event.content?.["room_id"];
+                const sessionId = event.content?.["session_id"];
+                const sessionKey = event.content?.["session_key"];
+
+                log.set("roomId", roomId);
+                log.set("sessionId", sessionId);
+
+                if (
+                    typeof roomId !== "string" ||
+                    typeof sessionId !== "string" ||
+                    typeof senderKey !== "string" ||
+                    typeof sessionKey !== "string"
+                ) {
+                    log.logLevel = log.level.Warn;
+                    log.set("invalid", true);
+                    return;
+                }
+
+                const session = new this._olm.InboundGroupSession();
+                try {
+                    session.create(sessionKey);
+                    const sessionEntry = await this._writeInboundSession(
+                        session, roomId, senderKey, claimedEd25519Key, sessionId, txn);
+                    if (sessionEntry) {
+                        newSessions.push(sessionEntry);
+                    }
+                } finally {
+                    session.free();
+                }
+            }, log.level.Detail);
         }
         // this will be passed to the Room in notifyRoomKeys
         return newSessions;
@@ -108,7 +108,7 @@ export class HomeServerApi {
         if (options?.log) {
             const parent = options?.log;
             log = parent.child({
-                kind: "request",
+                t: "network",
                 url,
                 method,
             }, parent.level.Info);
@@ -176,11 +176,12 @@ export class Room extends EventEmitter {
     }
 
     async prepareSync(roomResponse, membership, txn, log) {
-        log.set("roomId", this.id);
+        log.set("id", this.id);
         const summaryChanges = this._summary.data.applySyncResponse(roomResponse, membership)
         let roomEncryption = this._roomEncryption;
         // encryption is enabled in this sync
         if (!roomEncryption && summaryChanges.encryption) {
+            log.set("enableEncryption", true);
             roomEncryption = this._createRoomEncryption(this, summaryChanges.encryption);
         }
 
@@ -204,18 +205,21 @@ export class Room extends EventEmitter {
         };
     }
 
-    async afterPrepareSync(preparation) {
+    async afterPrepareSync(preparation, parentLog) {
         if (preparation.decryptPreparation) {
-            preparation.decryptChanges = await preparation.decryptPreparation.decrypt();
-            preparation.decryptPreparation = null;
+            await parentLog.wrap("afterPrepareSync decrypt", async log => {
+                log.set("id", this.id);
+                preparation.decryptChanges = await preparation.decryptPreparation.decrypt();
+                preparation.decryptPreparation = null;
+            }, parentLog.level.Detail);
         }
     }
 
     /** @package */
     async writeSync(roomResponse, isInitialSync, {summaryChanges, decryptChanges, roomEncryption}, txn, log) {
-        log.set("roomId", this.id);
+        log.set("id", this.id);
         const {entries, newLiveKey, memberChanges} =
-            await this._syncWriter.writeSync(roomResponse, txn);
+            await log.wrap("syncWriter", log => this._syncWriter.writeSync(roomResponse, txn, log), log.level.Detail);
         if (decryptChanges) {
             const decryption = await decryptChanges.write(txn);
             decryption.applyToEntries(entries);
@@ -259,7 +263,8 @@ export class Room extends EventEmitter {
      * Called with the changes returned from `writeSync` to apply them and emit changes.
      * No storage or network operations should be done here.
      */
-    afterSync({summaryChanges, newTimelineEntries, newLiveKey, removedPendingEvents, memberChanges, heroChanges, roomEncryption}) {
+    afterSync({summaryChanges, newTimelineEntries, newLiveKey, removedPendingEvents, memberChanges, heroChanges, roomEncryption}, log) {
+        log.set("id", this.id);
         this._syncWriter.afterSync(newLiveKey);
         this._setEncryption(roomEncryption);
         if (memberChanges.size) {
@@ -310,9 +315,11 @@ export class Room extends EventEmitter {
      * Can be used to do longer running operations that resulted from the last sync,
      * like network operations.
      */
-    async afterSyncCompleted() {
+    async afterSyncCompleted(changes, log) {
+        log.set("id", this.id);
         if (this._roomEncryption) {
-            await this._roomEncryption.flushPendingRoomKeyShares(this._hsApi);
+            // TODO: pass log to flushPendingRoomKeyShares once we also have a logger in `start`
+            await this._roomEncryption.flushPendingRoomKeyShares(this._hsApi, null);
         }
     }
 
@@ -106,7 +106,7 @@ export class SyncWriter {
      * @param {Transaction} txn used to read and write from the fragment store
      * @return {EventKey} the new event key to start writing events at
      */
-    async _ensureLiveFragment(currentKey, entries, timeline, txn) {
+    async _ensureLiveFragment(currentKey, entries, timeline, txn, log) {
         if (!currentKey) {
             // means we haven't synced this room yet (just joined or did initial sync)
 
@@ -115,6 +115,7 @@ export class SyncWriter {
             let liveFragment = await this._createLiveFragment(txn, timeline.prev_batch);
             currentKey = new EventKey(liveFragment.id, EventKey.defaultLiveKey.eventIndex);
             entries.push(FragmentBoundaryEntry.start(liveFragment, this._fragmentIdComparer));
+            log.log({l: "live fragment", first: true, id: currentKey.fragmentId});
         } else if (timeline.limited) {
             // replace live fragment for limited sync, *only* if we had a live fragment already
             const oldFragmentId = currentKey.fragmentId;
@@ -122,6 +123,7 @@ export class SyncWriter {
             const {oldFragment, newFragment} = await this._replaceLiveFragment(oldFragmentId, currentKey.fragmentId, timeline.prev_batch, txn);
             entries.push(FragmentBoundaryEntry.end(oldFragment, this._fragmentIdComparer));
             entries.push(FragmentBoundaryEntry.start(newFragment, this._fragmentIdComparer));
+            log.log({l: "live fragment", limited: true, id: currentKey.fragmentId});
         }
         return currentKey;
     }
@@ -148,10 +150,11 @@ export class SyncWriter {
         }
     }
 
-    _writeStateEvents(roomResponse, memberChanges, txn) {
+    _writeStateEvents(roomResponse, memberChanges, txn, log) {
         // persist state
         const {state} = roomResponse;
         if (Array.isArray(state?.events)) {
+            log.set("stateEvents", state.events.length);
             for (const event of state.events) {
                 const memberChange = this._writeStateEvent(event, txn);
                 if (memberChange) {
@@ -161,11 +164,13 @@ export class SyncWriter {
         }
     }
 
-    async _writeTimeline(entries, timeline, currentKey, memberChanges, txn) {
+    async _writeTimeline(entries, timeline, currentKey, memberChanges, txn, log) {
         if (Array.isArray(timeline.events) && timeline.events.length) {
             // only create a fragment when we will really write an event
-            currentKey = await this._ensureLiveFragment(currentKey, entries, timeline, txn);
+            currentKey = await this._ensureLiveFragment(currentKey, entries, timeline, txn, log);
             const events = deduplicateEvents(timeline.events);
+            log.set("timelineEvents", events.length);
+            let timelineStateEventCount = 0;
             for(const event of events) {
                 // store event in timeline
                 currentKey = currentKey.nextKey();
@@ -180,12 +185,14 @@ export class SyncWriter {
 
                 // process live state events first, so new member info is available
                 if (typeof event.state_key === "string") {
+                    timelineStateEventCount += 1;
                     const memberChange = this._writeStateEvent(event, txn);
                     if (memberChange) {
                         memberChanges.set(memberChange.userId, memberChange);
                     }
                 }
             }
+            log.set("timelineStateEventCount", timelineStateEventCount);
         }
         return currentKey;
     }
@@ -218,14 +225,15 @@ export class SyncWriter {
      * @param {Transaction} txn
      * @return {SyncWriterResult}
      */
-    async writeSync(roomResponse, txn) {
+    async writeSync(roomResponse, txn, log) {
         const entries = [];
         const {timeline} = roomResponse;
         const memberChanges = new Map();
         // important this happens before _writeTimeline so
         // members are available in the transaction
-        this._writeStateEvents(roomResponse, memberChanges, txn);
-        const currentKey = await this._writeTimeline(entries, timeline, this._lastLiveKey, memberChanges, txn);
+        this._writeStateEvents(roomResponse, memberChanges, txn, log);
+        const currentKey = await this._writeTimeline(entries, timeline, this._lastLiveKey, memberChanges, txn, log);
+        log.set("memberChanges", memberChanges.size);
         return {entries, newLiveKey: currentKey, memberChanges};
     }