Merge pull request #238 from vector-im/bwindels/send-logging

Log session load, login, and sending messages

Commit 87296f434a
17 changed files with 537 additions and 309 deletions
@@ -47,6 +47,7 @@
 aside .values li span {
-    word-wrap: ;
+    word-wrap: anywhere;
+    padding: 4px;
 }

 aside .values {

@@ -63,7 +64,7 @@
 aside .values span.value {
-    width: 70%;
+    display: block;
     padding-left: 10px;
     white-space: pre-wrap;
 }

 aside .values li {

@@ -109,7 +110,7 @@
     margin: 0;
 }

-.timeline div.item {
+.timeline .item {
     --hue: 100deg;
     --brightness: 80%;
     background-color: hsl(var(--hue), 60%, var(--brightness));

@@ -120,15 +121,16 @@
     margin: 1px;
     flex: 1;
     min-width: 0;
     cursor: pointer;
     color: inherit;
     text-decoration: none;
 }

-.timeline div.item:not(.has-children) {
+.timeline .item:not(.has-children) {
     margin-left: calc(24px + 4px + 1px);
 }

-.timeline div.item .caption {
+.timeline .item .caption {
     white-space: nowrap;
     text-overflow: ellipsis;
     overflow: hidden;

@@ -156,15 +158,15 @@
     color: white;
 }

-.timeline div.item.type-network {
+.timeline .item.type-network {
     --hue: 30deg;
 }

-.timeline div.item.type-navigation {
+.timeline .item.type-navigation {
     --hue: 200deg;
 }

-.timeline div.item.selected {
+.timeline .item.selected {
     background-color: Highlight;
     border-color: Highlight;
     color: HighlightText;
@@ -21,37 +21,63 @@ const main = document.querySelector("main");

 let selectedItemNode;
 let rootItem;
+let itemByRef;

 const logLevels = [undefined, "All", "Debug", "Detail", "Info", "Warn", "Error", "Fatal", "Off"];

 main.addEventListener("click", event => {
-    if (selectedItemNode) {
-        selectedItemNode.classList.remove("selected");
-        selectedItemNode = null;
-    }
     if (event.target.classList.contains("toggleExpanded")) {
         const li = event.target.parentElement.parentElement;
         li.classList.toggle("expanded");
     } else {
+        // allow clicking any links other than .item in the timeline, like refs
+        if (event.target.tagName === "A" && !event.target.classList.contains("item")) {
+            return;
+        }
         const itemNode = event.target.closest(".item");
         if (itemNode) {
-            selectedItemNode = itemNode;
-            selectedItemNode.classList.add("selected");
-            const path = selectedItemNode.dataset.path;
-            let item = rootItem;
-            let parent;
-            if (path.length) {
-                const indices = path.split("/").map(i => parseInt(i, 10));
-                for(const i of indices) {
-                    parent = item;
-                    item = itemChildren(item)[i];
-                }
-            }
-            showItemDetails(item, parent, itemNode);
+            // we don't want scroll to jump when clicking
+            // so prevent default behaviour, and select and push to history manually
+            event.preventDefault();
+            selectNode(itemNode);
+            history.pushState(null, null, `#${itemNode.id}`);
         }
     }
 });

+window.addEventListener("hashchange", () => {
+    const id = window.location.hash.substr(1);
+    const itemNode = document.getElementById(id);
+    if (itemNode && itemNode.closest("main")) {
+        selectNode(itemNode);
+        itemNode.scrollIntoView({behavior: "smooth", block: "nearest"});
+    }
+});
+
+function selectNode(itemNode) {
+    if (selectedItemNode) {
+        selectedItemNode.classList.remove("selected");
+    }
+    selectedItemNode = itemNode;
+    selectedItemNode.classList.add("selected");
+    let item = rootItem;
+    let parent;
+    const indices = selectedItemNode.id.split("/").map(i => parseInt(i, 10));
+    for(const i of indices) {
+        parent = item;
+        item = itemChildren(item)[i];
+    }
+    showItemDetails(item, parent, selectedItemNode);
+}
+
+function stringifyItemValue(value) {
+    if (typeof value === "object" && value !== null) {
+        return JSON.stringify(value, undefined, 2);
+    } else {
+        return value + "";
+    }
+}
+
 function showItemDetails(item, parent, itemNode) {
     const parentOffset = itemStart(parent) ? `${itemStart(item) - itemStart(parent)}ms` : "none";
     const expandButton = t.button("Expand recursively");

@@ -64,11 +90,23 @@ function showItemDetails(item, parent, itemNode) {
         t.p([t.strong("Start: "), new Date(itemStart(item)).toString()]),
         t.p([t.strong("Duration: "), `${itemDuration(item)}ms`]),
         t.p([t.strong("Child count: "), itemChildren(item) ? `${itemChildren(item).length}` : "none"]),
+        t.p([t.strong("Forced finish: "), (itemForcedFinish(item) || false) + ""]),
         t.p(t.strong("Values:")),
         t.ul({class: "values"}, Object.entries(itemValues(item)).map(([key, value]) => {
+            let valueNode;
+            if (key === "ref") {
+                const refItem = itemByRef.get(value);
+                if (refItem) {
+                    valueNode = t.a({href: `#${refItem.id}`}, itemCaption(refItem));
+                } else {
+                    valueNode = `unknown ref ${value}`;
+                }
+            } else {
+                valueNode = stringifyItemValue(value);
+            }
             return t.li([
                 t.span({className: "key"}, normalizeValueKey(key)),
-                t.span({className: "value"}, value+"")
+                t.span({className: "value"}, valueNode)
             ]);
         })),
         t.p(expandButton)

@@ -89,15 +127,32 @@ function expandResursively(li) {

 document.getElementById("openFile").addEventListener("click", loadFile);

+function getRootItemHeader(prevItem, item) {
+    if (prevItem) {
+        const diff = itemStart(item) - itemEnd(prevItem);
+        if (diff >= 0) {
+            return `+ ${formatTime(diff)}`;
+        } else {
+            return `ran ${formatTime(-diff)} in parallel with`;
+        }
+    } else {
+        return new Date(itemStart(item)).toString();
+    }
+}
+
 async function loadFile() {
     const file = await openFile();
     const json = await readFileAsText(file);
     const logs = JSON.parse(json);
+    logs.items.sort((a, b) => itemStart(a) - itemStart(b));
+    rootItem = {c: logs.items};
+    itemByRef = new Map();
+    preprocessRecursively(rootItem, null, itemByRef, []);

     const fragment = logs.items.reduce((fragment, item, i, items) => {
         const prevItem = i === 0 ? null : items[i - 1];
         fragment.appendChild(t.section([
-            t.h2(prevItem ? `+ ${formatTime(itemStart(item) - itemEnd(prevItem))}` : new Date(itemStart(item)).toString()),
+            t.h2(getRootItemHeader(prevItem, item)),
             t.div({className: "timeline"}, t.ol(itemToNode(item, [i])))
         ]));
         return fragment;

@@ -105,6 +160,22 @@ async function loadFile() {
     main.replaceChildren(fragment);
 }

+function preprocessRecursively(item, parentElement, refsMap, path) {
+    item.s = (parentElement?.s || 0) + item.s;
+    if (itemRefSource(item)) {
+        refsMap.set(itemRefSource(item), item);
+    }
+    if (itemChildren(item)) {
+        for (let i = 0; i < itemChildren(item).length; i += 1) {
+            // do it in advance for a child as we don't want to do it for the rootItem
+            const child = itemChildren(item)[i];
+            const childPath = path.concat(i);
+            child.id = childPath.join("/");
+            preprocessRecursively(child, item, refsMap, childPath);
+        }
+    }
+}
+
 function formatTime(ms) {
     if (ms < 1000) {
         return `${ms}ms`;

@@ -128,6 +199,9 @@ function itemLevel(item) { return item.l; }
 function itemLabel(item) { return item.v?.l; }
 function itemType(item) { return item.v?.t; }
 function itemError(item) { return item.e; }
+function itemForcedFinish(item) { return item.f; }
+function itemRef(item) { return item.v?.ref; }
+function itemRefSource(item) { return item.v?.refId; }
 function itemShortErrorMessage(item) {
     if (itemError(item)) {
         const e = itemError(item);

@@ -144,6 +218,13 @@ function itemCaption(item) {
         return `${itemLabel(item)} (${itemValues(item).status})`;
     } else if (itemLabel(item) && itemError(item)) {
         return `${itemLabel(item)} (${itemShortErrorMessage(item)})`;
+    } else if (itemRef(item)) {
+        const refItem = itemByRef.get(itemRef(item));
+        if (refItem) {
+            return `ref "${itemCaption(refItem)}"`
+        } else {
+            return `unknown ref ${itemRef(item)}`
+        }
     } else {
         return itemLabel(item) || itemType(item);
     }

@@ -157,7 +238,7 @@ function normalizeValueKey(key) {
 }

 // returns the node and the total range (recursively) occupied by the node
-function itemToNode(item, path) {
+function itemToNode(item) {
     const hasChildren = !!itemChildren(item)?.length;
     const className = {
         item: true,

@@ -167,18 +248,29 @@ function itemToNode(item, path) {
         [`level-${itemLevel(item)}`]: true,
     };

+    const id = item.id;
+    let captionNode;
+    if (itemRef(item)) {
+        const refItem = itemByRef.get(itemRef(item));
+        if (refItem) {
+            captionNode = ["ref ", t.a({href: `#${refItem.id}`}, itemCaption(refItem))];
+        }
+    }
+    if (!captionNode) {
+        captionNode = itemCaption(item);
+    }
     const li = t.li([
         t.div([
             hasChildren ? t.button({className: "toggleExpanded"}) : "",
-            t.div({className, "data-path": path.join("/")}, [
-                t.span({class: "caption"}, itemCaption(item)),
+            t.a({className, id, href: `#${id}`}, [
+                t.span({class: "caption"}, captionNode),
                 t.span({class: "duration"}, `(${itemDuration(item)}ms)`),
             ])
         ])
     ]);
     if (itemChildren(item) && itemChildren(item).length) {
-        li.appendChild(t.ol(itemChildren(item).map((item, i) => {
-            return itemToNode(item, path.concat(i));
+        li.appendChild(t.ol(itemChildren(item).map(item => {
+            return itemToNode(item);
         })));
     }
     return li;
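The viewer changes above pair with the serialization changes in LogItem further down: child items are now persisted with start times relative to their parent, and detached operations are linked by refId/ref values. A worked example of what preprocessRecursively does to a loaded file, with invented numbers (not part of the commit):

    // one root item starting at t=17000ms with a child logged 250ms in:
    const rootItem = {c: [{s: 17000, v: {l: "op"}, c: [{s: 250, v: {l: "step", refId: 7}}]}]};
    const itemByRef = new Map();
    preprocessRecursively(rootItem, null, itemByRef, []);
    // - the child's s becomes 17000 + 250 = 17250, i.e. absolute again
    // - ids become tree paths: "0" for the root item, "0/0" for its child,
    //   which is also what the viewer now uses as anchor id and URL hash
    // - itemByRef.get(7) returns the child, so {ref: 7} values can link to it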
@@ -24,13 +24,46 @@ export class BaseLogger {
     }

     log(labelOrValues, logLevel = LogLevel.Info) {
-        const item = new LogItem(labelOrValues, logLevel, null, this._platform.clock);
+        const item = new LogItem(labelOrValues, logLevel, null, this);
         item._end = item._start;
         this._persistItem(item.serialize(null));
     }

-    run(labelOrValues, callback, logLevel = LogLevel.Info, filterCreator = null) {
-        const item = new LogItem(labelOrValues, logLevel, null, this._platform.clock);
+    /** if item is a log item, wrap the callback in a child of it, otherwise start a new root log item. */
+    wrapOrRun(item, labelOrValues, callback, logLevel = null, filterCreator = null) {
+        if (item) {
+            return item.wrap(labelOrValues, callback, logLevel, filterCreator);
+        } else {
+            return this.run(labelOrValues, callback, logLevel, filterCreator);
+        }
+    }
+
+    /** run a callback in detached mode,
+    where the (async) result or errors are not propagated but still logged.
+    Useful to pair with LogItem.refDetached.
+
+    @return {LogItem} the log item added, useful to pass to LogItem.refDetached */
+    runDetached(labelOrValues, callback, logLevel = null, filterCreator = null) {
+        if (logLevel === null) {
+            logLevel = LogLevel.Info;
+        }
+        const item = new LogItem(labelOrValues, logLevel, null, this);
+        this._run(item, callback, logLevel, filterCreator, false /* don't throw, nobody is awaiting */);
+        return item;
+    }
+
+    /** run a callback wrapped in a log operation.
+    Errors and duration are transparently logged, also for async operations.
+    Whatever the callback returns is returned here. */
+    run(labelOrValues, callback, logLevel = null, filterCreator = null) {
+        if (logLevel === null) {
+            logLevel = LogLevel.Info;
+        }
+        const item = new LogItem(labelOrValues, logLevel, null, this);
+        return this._run(item, callback, logLevel, filterCreator, true);
+    }
+
+    _run(item, callback, logLevel, filterCreator, shouldThrow) {
         this._openItems.add(item);

         const finishItem = () => {

@@ -64,7 +97,9 @@ export class BaseLogger {
                     return promiseResult;
                 }, err => {
                     finishItem();
-                    throw err;
+                    if (shouldThrow) {
+                        throw err;
+                    }
                 });
             } else {
                 finishItem();

@@ -72,7 +107,9 @@ export class BaseLogger {
             }
         } catch (err) {
             finishItem();
-            throw err;
+            if (shouldThrow) {
+                throw err;
+            }
         }
     }

@@ -85,6 +122,7 @@ export class BaseLogger {
             // about the duration of the item, etc ...
             const serialized = openItem.serialize(new LogFilter(), 0);
             if (serialized) {
+                serialized.f = true; //(f)orced
                 this._persistItem(serialized);
             }
         } catch (err) {

@@ -106,4 +144,12 @@ export class BaseLogger {
     get level() {
         return LogLevel;
     }
+
+    _now() {
+        return this._platform.clock.now();
+    }
+
+    _createRefId() {
+        return Math.round(this._platform.random() * Number.MAX_SAFE_INTEGER);
+    }
 }
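The three entry points added here differ mainly in error propagation and in whether an item already exists. A minimal usage sketch, based only on the calls visible in this diff (the callback and variable names are placeholders, not code from this commit):

    // run(): start a new root item; errors and duration are logged and rethrown
    await logger.run("sync", async log => {
        log.set("id", sessionId);                             // attach a value to the item
        await log.wrap("uploadKeys", log => uploadKeys(log)); // nested child item
    });

    // wrapOrRun(): nest under a passed-in item if there is one, else start a root item
    function sendEvent(content, log = null) {
        return logger.wrapOrRun(log, "send", log => enqueue(content, log));
    }

    // runDetached(): fire-and-forget; errors are logged but never thrown, and the
    // returned item can be referenced from another item via LogItem.refDetached
    const item = logger.runDetached("reconnect", log => reconnect(log));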
@@ -17,9 +17,9 @@ limitations under the License.
 import {LogLevel, LogFilter} from "./LogFilter.js";

 export class LogItem {
-    constructor(labelOrValues, logLevel, filterCreator, clock) {
-        this._clock = clock;
-        this._start = clock.now();
+    constructor(labelOrValues, logLevel, filterCreator, logger) {
+        this._logger = logger;
+        this._start = logger._now();
         this._end = null;
         // (l)abel
         this._values = typeof labelOrValues === "string" ? {l: labelOrValues} : labelOrValues;

@@ -29,6 +29,25 @@ export class LogItem {
         this._filterCreator = filterCreator;
     }

+    /** start a new root log item and run it detached mode, see BaseLogger.runDetached */
+    runDetached(labelOrValues, callback, logLevel, filterCreator) {
+        return this._logger.runDetached(labelOrValues, callback, logLevel, filterCreator);
+    }
+
+    /** start a new detached root log item and log a reference to it from this item */
+    wrapDetached(labelOrValues, callback, logLevel, filterCreator) {
+        this.refDetached(this.runDetached(labelOrValues, callback, logLevel, filterCreator));
+    }
+
+    /** logs a reference to a different log item, usually obtained from runDetached.
+    This is useful if the referenced operation can't be awaited. */
+    refDetached(logItem, logLevel = null) {
+        if (!logItem._values.refId) {
+            logItem.set("refId", this._logger._createRefId());
+        }
+        return this.log({ref: logItem._values.refId}, logLevel);
+    }
+
     /**
      * Creates a new child item and runs it in `callback`.
      */

@@ -69,7 +88,7 @@ export class LogItem {
     */
     log(labelOrValues, logLevel = null) {
         const item = this.child(labelOrValues, logLevel, null);
-        item.end = item.start;
+        item._end = item._start;
     }

     set(key, value) {

@@ -81,7 +100,7 @@ export class LogItem {
         }
     }

-    serialize(filter) {
+    serialize(filter, parentStartTime = null) {
         if (this._filterCreator) {
             try {
                 filter = this._filterCreator(new LogFilter(filter), this);

@@ -92,7 +111,7 @@ export class LogItem {
         let children;
         if (this._children !== null) {
             children = this._children.reduce((array, c) => {
-                const s = c.serialize(filter);
+                const s = c.serialize(filter, this._start);
                 if (s) {
                     if (array === null) {
                         array = [];

@@ -108,7 +127,7 @@ export class LogItem {
         // in (v)alues, (l)abel and (t)ype are also reserved.
         const item = {
             // (s)tart
-            s: this._start,
+            s: parentStartTime === null ? this._start : this._start - parentStartTime,
             // (d)uration
             d: this.duration,
             // (v)alues

@@ -127,6 +146,7 @@ export class LogItem {
             // (c)hildren
             item.c = children;
         }
+        // (f)orced can also be set on an item by the logger
         return item;
     }

@@ -177,7 +197,7 @@ export class LogItem {
                 c.finish();
             }
         }
-        this._end = this._clock.now();
+        this._end = this._logger._now();
     }
 }

@@ -200,7 +220,7 @@ export class LogItem {
         if (!logLevel) {
             logLevel = this.logLevel || LogLevel.Info;
         }
-        const item = new LogItem(labelOrValues, logLevel, filterCreator, this._clock);
+        const item = new LogItem(labelOrValues, logLevel, filterCreator, this._logger);
         if (this._children === null) {
             this._children = [];
         }
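With the serialize changes above, a persisted item now looks roughly like the sketch below (field values invented for illustration; the single-letter keys are the ones documented in the comments). Note that `s` on a child is relative to its parent's start, which is what preprocessRecursively in the log viewer undoes:

    {
        "s": 1617200000000,      // (s)tart: absolute on root items
        "d": 1250,               // (d)uration in ms
        "l": 4,                  // log level
        "v": {"l": "load session", "id": "abc"},  // (v)alues; (l)abel and (t)ype live in here
        "f": true,               // (f)orced finish, set by the logger on shutdown
        "c": [
            {"s": 10, "d": 200, "v": {"l": "uploadKeys"}},  // s relative to parent start
            {"s": 215, "d": 0, "v": {"ref": 42}}            // points at a detached item with refId 42
        ]
    }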
@@ -96,7 +96,6 @@ export class Session {

     // called once this._e2eeAccount is assigned
     _setupEncryption() {
-        console.log("loaded e2ee account with keys", this._e2eeAccount.identityKeys);
         // TODO: this should all go in a wrapper in e2ee/ that is bootstrapped by passing in the account
         // and can create RoomEncryption objects and handle encrypted to_device messages and device list changes.
         const senderKeyLock = new LockMap();

@@ -228,7 +227,7 @@ export class Session {
     }

     /** @internal */
-    async createIdentity() {
+    async createIdentity(log) {
         if (this._olm) {
             if (!this._e2eeAccount) {
                 this._e2eeAccount = await E2EEAccount.create({

@@ -240,15 +239,16 @@ export class Session {
                     olmWorker: this._olmWorker,
                     storage: this._storage,
                 });
+                log.set("keys", this._e2eeAccount.identityKeys);
                 this._setupEncryption();
             }
-            await this._e2eeAccount.generateOTKsIfNeeded(this._storage);
-            await this._e2eeAccount.uploadKeys(this._storage);
+            await this._e2eeAccount.generateOTKsIfNeeded(this._storage, log);
+            await log.wrap("uploadKeys", log => this._e2eeAccount.uploadKeys(this._storage, log));
         }
     }

     /** @internal */
-    async load() {
+    async load(log) {
         const txn = this._storage.readTxn([
             this._storage.storeNames.session,
             this._storage.storeNames.roomSummary,

@@ -271,6 +271,7 @@ export class Session {
                 txn
             });
             if (this._e2eeAccount) {
+                log.set("keys", this._e2eeAccount.identityKeys);
                 this._setupEncryption();
             }
         }

@@ -279,7 +280,7 @@ export class Session {
         const rooms = await txn.roomSummary.getAll();
         await Promise.all(rooms.map(summary => {
             const room = this.createRoom(summary.roomId, pendingEventsByRoomId.get(summary.roomId));
-            return room.load(summary, txn);
+            return log.wrap("room", log => room.load(summary, txn, log));
         }));
     }

@@ -297,7 +298,7 @@ export class Session {
     * and useful to store so we can later tell what capabilities
     * our homeserver has.
     */
-    async start(lastVersionResponse) {
+    async start(lastVersionResponse, log) {
         if (lastVersionResponse) {
             // store /versions response
             const txn = this._storage.readWriteTxn([

@@ -328,13 +329,13 @@ export class Session {
         const operations = await opsTxn.operations.getAll();
         const operationsByScope = groupBy(operations, o => o.scope);

-        for (const [, room] of this._rooms) {
+        for (const room of this._rooms.values()) {
             let roomOperationsByType;
             const roomOperations = operationsByScope.get(room.id);
             if (roomOperations) {
                 roomOperationsByType = groupBy(roomOperations, r => r.type);
             }
-            room.start(roomOperationsByType);
+            room.start(roomOperationsByType, log);
         }
     }

@@ -438,7 +439,7 @@ export class Session {
             // are about to receive messages for
             // (https://github.com/vector-im/riot-web/issues/2782).
             if (!isCatchupSync) {
-                const needsToUploadOTKs = await this._e2eeAccount.generateOTKsIfNeeded(this._storage);
+                const needsToUploadOTKs = await this._e2eeAccount.generateOTKsIfNeeded(this._storage, log);
                 if (needsToUploadOTKs) {
                     promises.push(log.wrap("uploadKeys", log => this._e2eeAccount.uploadKeys(this._storage, log)));
                 }
@@ -73,68 +73,79 @@ export class SessionContainer {
             return;
         }
-        this._status.set(LoadStatus.Loading);
-        try {
-            const sessionInfo = await this._platform.sessionInfoStorage.get(sessionId);
-            if (!sessionInfo) {
-                throw new Error("Invalid session id: " + sessionId);
+        await this._platform.logger.run("load session", async log => {
+            log.set("id", sessionId);
+            try {
+                const sessionInfo = await this._platform.sessionInfoStorage.get(sessionId);
+                if (!sessionInfo) {
+                    throw new Error("Invalid session id: " + sessionId);
+                }
+                await this._loadSessionInfo(sessionInfo, false, log);
+                log.set("status", this._status.get());
+            } catch (err) {
+                log.catch(err);
+                this._error = err;
+                this._status.set(LoadStatus.Error);
             }
-            await this._loadSessionInfo(sessionInfo, false);
-        } catch (err) {
-            this._error = err;
-            this._status.set(LoadStatus.Error);
-        }
+        });
     }

     async startWithLogin(homeServer, username, password) {
         if (this._status.get() !== LoadStatus.NotLoading) {
             return;
         }
-        this._status.set(LoadStatus.Login);
-        const clock = this._platform.clock;
-        let sessionInfo;
-        try {
-            const request = this._platform.request;
-            const hsApi = new HomeServerApi({homeServer, request, createTimeout: clock.createTimeout});
-            const loginData = await hsApi.passwordLogin(username, password, "Hydrogen").response();
-            const sessionId = this.createNewSessionId();
-            sessionInfo = {
-                id: sessionId,
-                deviceId: loginData.device_id,
-                userId: loginData.user_id,
-                homeServer: homeServer,
-                accessToken: loginData.access_token,
-                lastUsed: clock.now()
-            };
-            await this._platform.sessionInfoStorage.add(sessionInfo);
-        } catch (err) {
-            this._error = err;
-            if (err instanceof HomeServerError) {
-                if (err.errcode === "M_FORBIDDEN") {
-                    this._loginFailure = LoginFailure.Credentials;
+        await this._platform.logger.run("login", async log => {
+            this._status.set(LoadStatus.Login);
+            const clock = this._platform.clock;
+            let sessionInfo;
+            try {
+                const request = this._platform.request;
+                const hsApi = new HomeServerApi({homeServer, request, createTimeout: clock.createTimeout});
+                const loginData = await hsApi.passwordLogin(username, password, "Hydrogen", {log}).response();
+                const sessionId = this.createNewSessionId();
+                sessionInfo = {
+                    id: sessionId,
+                    deviceId: loginData.device_id,
+                    userId: loginData.user_id,
+                    homeServer: homeServer,
+                    accessToken: loginData.access_token,
+                    lastUsed: clock.now()
+                };
+                log.set("id", sessionId);
+                await this._platform.sessionInfoStorage.add(sessionInfo);
+            } catch (err) {
+                this._error = err;
+                if (err instanceof HomeServerError) {
+                    if (err.errcode === "M_FORBIDDEN") {
+                        this._loginFailure = LoginFailure.Credentials;
+                    } else {
+                        this._loginFailure = LoginFailure.Unknown;
+                    }
+                    log.set("loginFailure", this._loginFailure);
+                    this._status.set(LoadStatus.LoginFailed);
+                } else if (err instanceof ConnectionError) {
+                    this._loginFailure = LoginFailure.Connection;
+                    this._status.set(LoadStatus.LoginFailed);
                 } else {
-                    this._loginFailure = LoginFailure.Unknown;
+                    this._status.set(LoadStatus.Error);
                 }
-                this._status.set(LoadStatus.LoginFailed);
-            } else if (err instanceof ConnectionError) {
-                this._loginFailure = LoginFailure.Connection;
-                this._status.set(LoadStatus.LoginFailed);
-            } else {
-                this._status.set(LoadStatus.Error);
+                return;
             }
-            return;
-        }
-        // loading the session can only lead to
-        // LoadStatus.Error in case of an error,
-        // so separate try/catch
-        try {
-            await this._loadSessionInfo(sessionInfo, true);
-        } catch (err) {
-            this._error = err;
-            this._status.set(LoadStatus.Error);
-        }
+            // loading the session can only lead to
+            // LoadStatus.Error in case of an error,
+            // so separate try/catch
+            try {
+                await this._loadSessionInfo(sessionInfo, true, log);
+                log.set("status", this._status.get());
+            } catch (err) {
+                log.catch(err);
+                this._error = err;
+                this._status.set(LoadStatus.Error);
+            }
+        });
     }

-    async _loadSessionInfo(sessionInfo, isNewLogin) {
+    async _loadSessionInfo(sessionInfo, isNewLogin, log) {
         const clock = this._platform.clock;
         this._sessionStartedByReconnector = false;
+        this._status.set(LoadStatus.Loading);

@@ -178,24 +189,26 @@ export class SessionContainer {
             mediaRepository,
             platform: this._platform,
         });
-        await this._session.load();
+        await this._session.load(log);
         if (isNewLogin) {
             this._status.set(LoadStatus.SessionSetup);
-            await this._session.createIdentity();
+            await log.wrap("createIdentity", log => this._session.createIdentity(log));
         }

         this._sync = new Sync({hsApi: this._requestScheduler.hsApi, storage: this._storage, session: this._session, logger: this._platform.logger});
         // notify sync and session when back online
         this._reconnectSubscription = this._reconnector.connectionStatus.subscribe(state => {
             if (state === ConnectionStatus.Online) {
-                // needs to happen before sync and session or it would abort all requests
-                this._requestScheduler.start();
-                this._sync.start();
-                this._sessionStartedByReconnector = true;
-                this._session.start(this._reconnector.lastVersionsResponse);
+                this._platform.logger.runDetached("reconnect", async log => {
+                    // needs to happen before sync and session or it would abort all requests
+                    this._requestScheduler.start();
+                    this._sync.start();
+                    this._sessionStartedByReconnector = true;
+                    await log.wrap("session start", log => this._session.start(this._reconnector.lastVersionsResponse, log));
+                });
             }
         });
-        await this._waitForFirstSync();
+        await log.wrap("wait first sync", () => this._waitForFirstSync());

         this._status.set(LoadStatus.Ready);

@@ -204,8 +217,9 @@ export class SessionContainer {
         // started to session, so check first
         // to prevent an extra /versions request
         if (!this._sessionStartedByReconnector) {
-            const lastVersionsResponse = await hsApi.versions({timeout: 10000}).response();
-            this._session.start(lastVersionsResponse);
+            const lastVersionsResponse = await hsApi.versions({timeout: 10000, log}).response();
+            // log as ref as we don't want to await it
+            await log.wrap("session start", log => this._session.start(lastVersionsResponse, log));
         }
     }
@@ -15,12 +15,10 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */

-import {AbortError} from "./error.js";
 import {ObservableValue} from "../observable/ObservableValue.js";
 import {createEnum} from "../utils/enum.js";

 const INCREMENTAL_TIMEOUT = 30000;
-const SYNC_EVENT_LIMIT = 10;

 export const SyncStatus = createEnum(
     "InitialSync",
@@ -80,22 +80,24 @@ export class Account {
         return this._identityKeys;
     }

-    async uploadKeys(storage) {
+    async uploadKeys(storage, log) {
         const oneTimeKeys = JSON.parse(this._account.one_time_keys());
         // only one algorithm supported by olm atm, so hardcode its name
         const oneTimeKeysEntries = Object.entries(oneTimeKeys.curve25519);
         if (oneTimeKeysEntries.length || !this._areDeviceKeysUploaded) {
             const payload = {};
             if (!this._areDeviceKeysUploaded) {
+                log.set("identity", true);
                 const identityKeys = JSON.parse(this._account.identity_keys());
                 payload.device_keys = this._deviceKeysPayload(identityKeys);
             }
             if (oneTimeKeysEntries.length) {
+                log.set("otks", true);
                 payload.one_time_keys = this._oneTimeKeysPayload(oneTimeKeysEntries);
             }
-            const response = await this._hsApi.uploadKeys(payload, /*{log}*/).response();
+            const response = await this._hsApi.uploadKeys(payload, {log}).response();
             this._serverOTKCount = response?.one_time_key_counts?.signed_curve25519;
-            // log.set("serverOTKCount", this._serverOTKCount);
+            log.set("serverOTKCount", this._serverOTKCount);
             // TODO: should we not modify this in the txn like we do elsewhere?
             // we'd have to pickle and unpickle the account to clone it though ...
             // and the upload has succeed at this point, so in-memory would be correct

@@ -114,7 +116,7 @@ export class Account {
         }
     }

-    async generateOTKsIfNeeded(storage) {
+    async generateOTKsIfNeeded(storage, log) {
         const maxOTKs = this._account.max_number_of_one_time_keys();
         const limit = maxOTKs / 2;
         if (this._serverOTKCount < limit) {

@@ -128,11 +130,16 @@ export class Account {
             if (totalOTKCount < limit) {
                 // we could in theory also generated the keys and store them in
                 // writeSync, but then we would have to clone the account to avoid side-effects.
-                await this._updateSessionStorage(storage, sessionStore => {
+                await log.wrap("generate otks", log => this._updateSessionStorage(storage, sessionStore => {
                     const newKeyCount = maxOTKs - totalOTKCount;
+                    log.set("max", maxOTKs);
+                    log.set("server", this._serverOTKCount);
+                    log.set("unpublished", unpublishedOTKCount);
+                    log.set("new", newKeyCount);
+                    log.set("limit", limit);
                     this._account.generate_one_time_keys(newKeyCount);
                     sessionStore.set(ACCOUNT_SESSION_KEY, this._account.pickle(this._pickleKey));
-                });
+                }));
                 return true;
             }
         }
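The values logged in generateOTKsIfNeeded make the replenishment rule easy to follow. A worked example with invented numbers (the real maxOTKs comes from the olm account):

    const maxOTKs = 100;
    const limit = maxOTKs / 2;           // 50: only top up below half capacity
    const serverOTKCount = 20;           // signed_curve25519 count from the server
    const unpublishedOTKCount = 5;       // generated earlier, not yet uploaded
    const totalOTKCount = serverOTKCount + unpublishedOTKCount;  // 25 < 50, so top up
    const newKeyCount = maxOTKs - totalOTKCount;                 // generate 75 new keys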
@@ -69,11 +69,11 @@ export class DeviceTracker {
         }));
     }

-    async trackRoom(room) {
+    async trackRoom(room, log) {
         if (room.isTrackingMembers || !room.isEncrypted) {
             return;
         }
-        const memberList = await room.loadMemberList();
+        const memberList = await room.loadMemberList(log);
         try {
             const txn = this._storage.readWriteTxn([
                 this._storage.storeNames.roomSummary,

@@ -83,6 +83,7 @@ export class DeviceTracker {
             try {
                 isTrackingChanges = room.writeIsTrackingMembers(true, txn);
                 const members = Array.from(memberList.members.values());
+                log.set("members", members.length);
                 await this._writeJoinedMembers(members, txn);
             } catch (err) {
                 txn.abort();

@@ -142,7 +143,7 @@ export class DeviceTracker {
         }
     }

-    async _queryKeys(userIds, hsApi) {
+    async _queryKeys(userIds, hsApi, log) {
         // TODO: we need to handle the race here between /sync and /keys/query just like we need to do for the member list ...
         // there are multiple requests going out for /keys/query though and only one for /members

@@ -153,9 +154,9 @@ export class DeviceTracker {
                 return deviceKeysMap;
             }, {}),
             "token": this._getSyncToken()
-        }).response();
+        }, {log}).response();

-        const verifiedKeysPerUser = this._filterVerifiedDeviceKeys(deviceKeyResponse["device_keys"]);
+        const verifiedKeysPerUser = log.wrap("verify", log => this._filterVerifiedDeviceKeys(deviceKeyResponse["device_keys"], log));
         const txn = this._storage.readWriteTxn([
             this._storage.storeNames.userIdentities,
             this._storage.storeNames.deviceIdentities,

@@ -167,6 +168,7 @@ export class DeviceTracker {
                 return await this._storeQueriedDevicesForUserId(userId, deviceIdentities, txn);
             }));
             deviceIdentities = devicesIdentitiesPerUser.reduce((all, devices) => all.concat(devices), []);
+            log.set("devices", deviceIdentities.length);
         } catch (err) {
             txn.abort();
             throw err;

@@ -215,7 +217,7 @@ export class DeviceTracker {
     /**
     * @return {Array<{userId, verifiedKeys: Array<DeviceSection>>}
     */
-    _filterVerifiedDeviceKeys(keyQueryDeviceKeysResponse) {
+    _filterVerifiedDeviceKeys(keyQueryDeviceKeysResponse, parentLog) {
         const curve25519Keys = new Set();
         const verifiedKeys = Object.entries(keyQueryDeviceKeysResponse).map(([userId, keysByDevice]) => {
             const verifiedEntries = Object.entries(keysByDevice).filter(([deviceId, deviceKeys]) => {

@@ -233,11 +235,21 @@ export class DeviceTracker {
                     return false;
                 }
                 if (curve25519Keys.has(curve25519Key)) {
-                    console.warn("ignoring device with duplicate curve25519 key in /keys/query response", deviceKeys);
+                    parentLog.log({
+                        l: "ignore device with duplicate curve25519 key",
+                        keys: deviceKeys
+                    }, parentLog.level.Warn);
                     return false;
                 }
                 curve25519Keys.add(curve25519Key);
-                return this._hasValidSignature(deviceKeys);
+                const isValid = this._hasValidSignature(deviceKeys);
+                if (!isValid) {
+                    parentLog.log({
+                        l: "ignore device with invalid signature",
+                        keys: deviceKeys
+                    }, parentLog.level.Warn);
+                }
+                return isValid;
             });
             const verifiedKeys = verifiedEntries.map(([, deviceKeys]) => deviceKeys);
             return {userId, verifiedKeys};

@@ -258,7 +270,7 @@ export class DeviceTracker {
     * @param {String} roomId [description]
     * @return {[type]} [description]
     */
-    async devicesForTrackedRoom(roomId, hsApi) {
+    async devicesForTrackedRoom(roomId, hsApi, log) {
         const txn = this._storage.readTxn([
             this._storage.storeNames.roomMembers,
             this._storage.storeNames.userIdentities,

@@ -271,14 +283,14 @@ export class DeviceTracker {
         // So, this will also contain non-joined memberships
         const userIds = await txn.roomMembers.getAllUserIds(roomId);

-        return await this._devicesForUserIds(roomId, userIds, txn, hsApi);
+        return await this._devicesForUserIds(roomId, userIds, txn, hsApi, log);
     }

-    async devicesForRoomMembers(roomId, userIds, hsApi) {
+    async devicesForRoomMembers(roomId, userIds, hsApi, log) {
         const txn = this._storage.readTxn([
             this._storage.storeNames.userIdentities,
         ]);
-        return await this._devicesForUserIds(roomId, userIds, txn, hsApi);
+        return await this._devicesForUserIds(roomId, userIds, txn, hsApi, log);
     }

     /**

@@ -288,7 +300,7 @@ export class DeviceTracker {
     * @param {HomeServerApi} hsApi
     * @return {Array<DeviceIdentity>}
     */
-    async _devicesForUserIds(roomId, userIds, userIdentityTxn, hsApi) {
+    async _devicesForUserIds(roomId, userIds, userIdentityTxn, hsApi, log) {
         const allMemberIdentities = await Promise.all(userIds.map(userId => userIdentityTxn.userIdentities.get(userId)));
         const identities = allMemberIdentities.filter(identity => {
             // identity will be missing for any userIds that don't have

@@ -297,12 +309,14 @@ export class DeviceTracker {
         });
         const upToDateIdentities = identities.filter(i => i.deviceTrackingStatus === TRACKING_STATUS_UPTODATE);
         const outdatedIdentities = identities.filter(i => i.deviceTrackingStatus === TRACKING_STATUS_OUTDATED);
+        log.set("uptodate", upToDateIdentities.length);
+        log.set("outdated", outdatedIdentities.length);
         let queriedDevices;
         if (outdatedIdentities.length) {
             // TODO: ignore the race between /sync and /keys/query for now,
             // where users could get marked as outdated or added/removed from the room while
             // querying keys
-            queriedDevices = await this._queryKeys(outdatedIdentities.map(i => i.userId), hsApi);
+            queriedDevices = await this._queryKeys(outdatedIdentities.map(i => i.userId), hsApi, log);
         }

         const deviceTxn = this._storage.readTxn([
@@ -252,21 +252,21 @@ export class RoomEncryption {
     }

     /** shares the encryption key for the next message if needed */
-    async ensureMessageKeyIsShared(hsApi) {
+    async ensureMessageKeyIsShared(hsApi, log) {
         if (this._lastKeyPreShareTime?.measure() < MIN_PRESHARE_INTERVAL) {
             return;
         }
         this._lastKeyPreShareTime = this._clock.createMeasure();
         const roomKeyMessage = await this._megolmEncryption.ensureOutboundSession(this._room.id, this._encryptionParams);
         if (roomKeyMessage) {
-            await this._shareNewRoomKey(roomKeyMessage, hsApi);
+            await log.wrap("share key", log => this._shareNewRoomKey(roomKeyMessage, hsApi, log));
         }
     }

-    async encrypt(type, content, hsApi) {
-        const megolmResult = await this._megolmEncryption.encrypt(this._room.id, type, content, this._encryptionParams);
+    async encrypt(type, content, hsApi, log) {
+        const megolmResult = await log.wrap("megolm encrypt", () => this._megolmEncryption.encrypt(this._room.id, type, content, this._encryptionParams));
         if (megolmResult.roomKeyMessage) {
-            this._shareNewRoomKey(megolmResult.roomKeyMessage, hsApi);
+            log.wrapDetached("share key", log => this._shareNewRoomKey(megolmResult.roomKeyMessage, hsApi, log));
         }
         return {
             type: ENCRYPTED_TYPE,

@@ -283,9 +283,9 @@ export class RoomEncryption {
         return false;
     }

-    async _shareNewRoomKey(roomKeyMessage, hsApi) {
-        await this._deviceTracker.trackRoom(this._room);
-        const devices = await this._deviceTracker.devicesForTrackedRoom(this._room.id, hsApi);
+    async _shareNewRoomKey(roomKeyMessage, hsApi, log) {
+        await this._deviceTracker.trackRoom(this._room, log);
+        const devices = await this._deviceTracker.devicesForTrackedRoom(this._room.id, hsApi, log);
         const userIds = Array.from(devices.reduce((set, device) => set.add(device.userId), new Set()));

         // store operation for room key share, in case we don't finish here

@@ -297,13 +297,15 @@ export class RoomEncryption {
             writeOpTxn.abort();
             throw err;
         }
+        log.set("id", operationId);
+        log.set("sessionId", roomKeyMessage.session_id);
         await writeOpTxn.complete();
         // TODO: at this point we have the room key stored, and the rest is sort of optional
         // it would be nice if we could signal SendQueue that any error from here on is non-fatal and
         // return the encrypted payload.

         // send the room key
-        await this._sendRoomKey(roomKeyMessage, devices, hsApi);
+        await this._sendRoomKey(roomKeyMessage, devices, hsApi, log);

         // remove the operation
         const removeOpTxn = this._storage.readWriteTxn([this._storage.storeNames.operations]);

@@ -353,29 +355,33 @@ export class RoomEncryption {
                 if (operation.type !== "share_room_key") {
                     continue;
                 }
-                const devices = await this._deviceTracker.devicesForRoomMembers(this._room.id, operation.userIds, hsApi);
-                await this._sendRoomKey(operation.roomKeyMessage, devices, hsApi);
-                const removeTxn = this._storage.readWriteTxn([this._storage.storeNames.operations]);
-                try {
-                    removeTxn.operations.remove(operation.id);
-                } catch (err) {
-                    removeTxn.abort();
-                    throw err;
-                }
-                await removeTxn.complete();
+                await log.wrap("operation", async log => {
+                    log.set("id", operation.id);
+                    const devices = await this._deviceTracker.devicesForRoomMembers(this._room.id, operation.userIds, hsApi, log);
+                    await this._sendRoomKey(operation.roomKeyMessage, devices, hsApi, log);
+                    const removeTxn = this._storage.readWriteTxn([this._storage.storeNames.operations]);
+                    try {
+                        removeTxn.operations.remove(operation.id);
+                    } catch (err) {
+                        removeTxn.abort();
+                        throw err;
+                    }
+                    await removeTxn.complete();
+                });
             }
         } finally {
             this._isFlushingRoomKeyShares = false;
         }
     }

-    async _sendRoomKey(roomKeyMessage, devices, hsApi) {
-        const messages = await this._olmEncryption.encrypt(
-            "m.room_key", roomKeyMessage, devices, hsApi);
-        await this._sendMessagesToDevices(ENCRYPTED_TYPE, messages, hsApi);
+    async _sendRoomKey(roomKeyMessage, devices, hsApi, log) {
+        const messages = await log.wrap("olm encrypt", log => this._olmEncryption.encrypt(
+            "m.room_key", roomKeyMessage, devices, hsApi, log));
+        await log.wrap("send", log => this._sendMessagesToDevices(ENCRYPTED_TYPE, messages, hsApi, log));
     }

-    async _sendMessagesToDevices(type, messages, hsApi) {
+    async _sendMessagesToDevices(type, messages, hsApi, log) {
+        log.set("messages", messages.length);
         const messagesByUser = groupBy(messages, message => message.device.userId);
         const payload = {
             messages: Array.from(messagesByUser.entries()).reduce((userMap, [userId, messages]) => {

@@ -387,7 +393,7 @@ export class RoomEncryption {
             }, {})
         };
         const txnId = makeTxnId();
-        await hsApi.sendToDevice(type, payload, txnId).response();
+        await hsApi.sendToDevice(type, payload, txnId, {log}).response();
     }

     dispose() {
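Worth noting in encrypt() above: the room-key share is no longer awaited. log.wrapDetached starts it as a separate root log item and only records a reference on the current item, so a slow or failing key share no longer surfaces through the send call. In terms of the LogItem primitives added in this commit, the call is roughly equivalent to this sketch:

    // what log.wrapDetached("share key", callback) expands to:
    const detached = log.runDetached("share key", callback); // own root item; errors only logged
    log.refDetached(detached);                               // records {ref: <refId>} on this item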
@@ -47,17 +47,17 @@ export class Encryption {
         this._senderKeyLock = senderKeyLock;
     }

-    async encrypt(type, content, devices, hsApi) {
+    async encrypt(type, content, devices, hsApi, log) {
         let messages = [];
         for (let i = 0; i < devices.length ; i += MAX_BATCH_SIZE) {
             const batchDevices = devices.slice(i, i + MAX_BATCH_SIZE);
-            const batchMessages = await this._encryptForMaxDevices(type, content, batchDevices, hsApi);
+            const batchMessages = await this._encryptForMaxDevices(type, content, batchDevices, hsApi, log);
             messages = messages.concat(batchMessages);
         }
         return messages;
     }

-    async _encryptForMaxDevices(type, content, devices, hsApi) {
+    async _encryptForMaxDevices(type, content, devices, hsApi, log) {
         // TODO: see if we can only hold some of the locks until after the /keys/claim call (if needed)
         // take a lock on all senderKeys so decryption and other calls to encrypt (should not happen)
         // don't modify the sessions at the same time

@@ -75,16 +75,17 @@ export class Encryption {
         let encryptionTargets = [];
         try {
             if (devicesWithoutSession.length) {
-                const newEncryptionTargets = await this._createNewSessions(
-                    devicesWithoutSession, hsApi, timestamp);
+                const newEncryptionTargets = await log.wrap("create sessions", log => this._createNewSessions(
+                    devicesWithoutSession, hsApi, timestamp, log));
                 encryptionTargets = encryptionTargets.concat(newEncryptionTargets);
             }
             await this._loadSessions(existingEncryptionTargets);
             encryptionTargets = encryptionTargets.concat(existingEncryptionTargets);
-            const messages = encryptionTargets.map(target => {
+            const encryptLog = {l: "encrypt", targets: encryptionTargets.length};
+            const messages = log.wrap(encryptLog, () => encryptionTargets.map(target => {
                 const encryptedContent = this._encryptForDevice(type, content, target);
                 return new EncryptedMessage(encryptedContent, target.device);
-            });
+            }));
             await this._storeSessions(encryptionTargets, timestamp);
             return messages;
         } finally {

@@ -149,8 +150,8 @@ export class Encryption {
         }
     }

-    async _createNewSessions(devicesWithoutSession, hsApi, timestamp) {
-        const newEncryptionTargets = await this._claimOneTimeKeys(hsApi, devicesWithoutSession);
+    async _createNewSessions(devicesWithoutSession, hsApi, timestamp, log) {
+        const newEncryptionTargets = await log.wrap("claim", log => this._claimOneTimeKeys(hsApi, devicesWithoutSession, log));
         try {
             for (const target of newEncryptionTargets) {
                 const {device, oneTimeKey} = target;

@@ -166,7 +167,7 @@ export class Encryption {
         return newEncryptionTargets;
     }

-    async _claimOneTimeKeys(hsApi, deviceIdentities) {
+    async _claimOneTimeKeys(hsApi, deviceIdentities, log) {
         // create a Map<userId, Map<deviceId, deviceIdentity>>
         const devicesByUser = groupByWithCreator(deviceIdentities,
             device => device.userId,

@@ -183,11 +184,10 @@ export class Encryption {
         const claimResponse = await hsApi.claimKeys({
             timeout: 10000,
             one_time_keys: oneTimeKeys
-        }).response();
+        }, {log}).response();
         if (Object.keys(claimResponse.failures).length) {
-            console.warn("failures for claiming one time keys", oneTimeKeys, claimResponse.failures);
+            log.log({l: "failures", servers: Object.keys(claimResponse.failures)}, log.level.Warn);
         }
-        // TODO: log claimResponse.failures
         const userKeyMap = claimResponse?.["one_time_keys"];
         return this._verifyAndCreateOTKTargets(userKeyMap, devicesByUser);
     }
@@ -61,12 +61,13 @@ export class AttachmentUpload {
     }

     /** @package */
-    async upload(hsApi, progressCallback) {
+    async upload(hsApi, progressCallback, log) {
         this._uploadRequest = hsApi.uploadAttachment(this._transferredBlob, this._filename, {
             uploadProgress: sentBytes => {
                 this._sentBytes = sentBytes;
                 progressCallback();
-            }
+            },
+            log
         });
         const {content_uri} = await this._uploadRequest.response();
         this._mxcUrl = content_uri;
@@ -167,6 +167,7 @@ export class Room extends EventEmitter {
             throw err;
         }
         await writeTxn.complete();
+        // TODO: log decryption errors here
         decryption.applyToEntries(entries);
         if (this._observedEvents) {
             this._observedEvents.updateEvents(entries);

@@ -245,7 +246,7 @@ export class Room extends EventEmitter {
         }
         let removedPendingEvents;
         if (Array.isArray(roomResponse.timeline?.events)) {
-            removedPendingEvents = this._sendQueue.removeRemoteEchos(roomResponse.timeline.events, txn);
+            removedPendingEvents = this._sendQueue.removeRemoteEchos(roomResponse.timeline.events, txn, log);
         }
         return {
             summaryChanges,

@@ -318,30 +319,29 @@ export class Room extends EventEmitter {
     async afterSyncCompleted(changes, log) {
         log.set("id", this.id);
         if (this._roomEncryption) {
-            // TODO: pass log to flushPendingRoomKeyShares once we also have a logger in `start`
-            await this._roomEncryption.flushPendingRoomKeyShares(this._hsApi, null);
+            await this._roomEncryption.flushPendingRoomKeyShares(this._hsApi, null, log);
         }
     }

     /** @package */
-    async start(pendingOperations) {
+    start(pendingOperations, parentLog) {
         if (this._roomEncryption) {
-            try {
-                const roomKeyShares = pendingOperations?.get("share_room_key");
-                if (roomKeyShares) {
-                    // if we got interrupted last time sending keys to newly joined members
-                    await this._roomEncryption.flushPendingRoomKeyShares(this._hsApi, roomKeyShares);
-                }
-            } catch (err) {
-                // we should not throw here
-                console.error(`could not send out (all) pending room keys for room ${this.id}`, err.stack);
+            const roomKeyShares = pendingOperations?.get("share_room_key");
+            if (roomKeyShares) {
+                // if we got interrupted last time sending keys to newly joined members
+                parentLog.wrapDetached("flush room keys", log => {
+                    log.set("id", this.id);
+                    return this._roomEncryption.flushPendingRoomKeyShares(this._hsApi, roomKeyShares, log);
+                });
             }
         }
-        this._sendQueue.resumeSending();
+
+        this._sendQueue.resumeSending(parentLog);
     }

     /** @package */
-    async load(summary, txn) {
+    async load(summary, txn, log) {
+        log.set("id", this.id);
         try {
             this._summary.load(summary);
             if (this._summary.data.encryption) {

@@ -354,24 +354,33 @@ export class Room extends EventEmitter {
             const changes = await this._heroes.calculateChanges(this._summary.data.heroes, [], txn);
             this._heroes.applyChanges(changes, this._summary.data);
         }
-        return this._syncWriter.load(txn);
+        return this._syncWriter.load(txn, log);
         } catch (err) {
             throw new WrappedError(`Could not load room ${this._roomId}`, err);
         }
     }

     /** @public */
-    sendEvent(eventType, content, attachments) {
-        return this._sendQueue.enqueueEvent(eventType, content, attachments);
+    sendEvent(eventType, content, attachments, log = null) {
+        this._platform.logger.wrapOrRun(log, "send", log => {
+            log.set("id", this.id);
+            return this._sendQueue.enqueueEvent(eventType, content, attachments, log);
+        });
     }

     /** @public */
-    async ensureMessageKeyIsShared() {
-        return this._roomEncryption?.ensureMessageKeyIsShared(this._hsApi);
+    async ensureMessageKeyIsShared(log = null) {
+        if (!this._roomEncryption) {
+            return;
+        }
+        return this._platform.logger.wrapOrRun(log, "ensureMessageKeyIsShared", log => {
+            log.set("id", this.id);
+            return this._roomEncryption.ensureMessageKeyIsShared(this._hsApi, log);
+        });
     }

     /** @public */
-    async loadMemberList() {
+    async loadMemberList(log = null) {
         if (this._memberList) {
             // TODO: also await fetchOrLoadMembers promise here
             this._memberList.retain();

@@ -385,7 +394,8 @@ export class Room extends EventEmitter {
                 syncToken: this._getSyncToken(),
                 // to handle race between /members and /sync
                 setChangedMembersMap: map => this._changedMembersDuringSync = map,
-            });
+                log,
+            }, this._platform.logger);
             this._memberList = new MemberList({
                 members,
                 closeCallback: () => { this._memberList = null; }

@@ -395,57 +405,59 @@ export class Room extends EventEmitter {
     }

     /** @public */
-    async fillGap(fragmentEntry, amount) {
+    fillGap(fragmentEntry, amount, log = null) {
         // TODO move some/all of this out of Room
-        if (fragmentEntry.edgeReached) {
-            return;
-        }
-        const response = await this._hsApi.messages(this._roomId, {
-            from: fragmentEntry.token,
-            dir: fragmentEntry.direction.asApiString(),
-            limit: amount,
-            filter: {
-                lazy_load_members: true,
-                include_redundant_members: true,
-            }
-        }).response();
-        const txn = this._storage.readWriteTxn([
-            this._storage.storeNames.pendingEvents,
-            this._storage.storeNames.timelineEvents,
-            this._storage.storeNames.timelineFragments,
-        ]);
-        let removedPendingEvents;
-        let gapResult;
-        try {
-            // detect remote echos of pending messages in the gap
-            removedPendingEvents = this._sendQueue.removeRemoteEchos(response.chunk, txn);
-            // write new events into gap
-            const gapWriter = new GapWriter({
-                roomId: this._roomId,
-                storage: this._storage,
-                fragmentIdComparer: this._fragmentIdComparer,
-            });
-            gapResult = await gapWriter.writeFragmentFill(fragmentEntry, response, txn);
-        } catch (err) {
-            txn.abort();
-            throw err;
-        }
-        await txn.complete();
-        if (this._roomEncryption) {
-            const decryptRequest = this._decryptEntries(DecryptionSource.Timeline, gapResult.entries);
-            await decryptRequest.complete();
-        }
-        // once txn is committed, update in-memory state & emit events
-        for (const fragment of gapResult.fragments) {
-            this._fragmentIdComparer.add(fragment);
-        }
-        if (removedPendingEvents) {
-            this._sendQueue.emitRemovals(removedPendingEvents);
-        }
-        if (this._timeline) {
-            this._timeline.addGapEntries(gapResult.entries);
-        }
+        return this._platform.logger.wrapOrRun(log, "fillGap", async log => {
+            if (fragmentEntry.edgeReached) {
+                return;
+            }
+            const response = await this._hsApi.messages(this._roomId, {
+                from: fragmentEntry.token,
+                dir: fragmentEntry.direction.asApiString(),
+                limit: amount,
+                filter: {
+                    lazy_load_members: true,
+                    include_redundant_members: true,
+                }
+            }, {log}).response();
+            const txn = this._storage.readWriteTxn([
+                this._storage.storeNames.pendingEvents,
+                this._storage.storeNames.timelineEvents,
+                this._storage.storeNames.timelineFragments,
+            ]);
+            let removedPendingEvents;
+            let gapResult;
+            try {
+                // detect remote echos of pending messages in the gap
+                removedPendingEvents = this._sendQueue.removeRemoteEchos(response.chunk, txn, log);
+                // write new events into gap
+                const gapWriter = new GapWriter({
+                    roomId: this._roomId,
+                    storage: this._storage,
+                    fragmentIdComparer: this._fragmentIdComparer,
+                });
+                gapResult = await gapWriter.writeFragmentFill(fragmentEntry, response, txn);
+            } catch (err) {
+                txn.abort();
+                throw err;
+            }
+            await txn.complete();
+            if (this._roomEncryption) {
+                const decryptRequest = this._decryptEntries(DecryptionSource.Timeline, gapResult.entries);
+                await decryptRequest.complete();
+            }
+            // once txn is committed, update in-memory state & emit events
+            for (const fragment of gapResult.fragments) {
+                this._fragmentIdComparer.add(fragment);
+            }
+            if (removedPendingEvents) {
+                this._sendQueue.emitRemovals(removedPendingEvents);
+            }
+            if (this._timeline) {
+                this._timeline.addGapEntries(gapResult.entries);
+            }
+        });
     }

     /** @public */
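The public Room methods above all take an optional log item and route through wrapOrRun, so the same method works for callers outside and inside an existing operation. A sketch of the two call styles (event content invented for illustration):

    // UI code: no log item passed, wrapOrRun starts a fresh root "send" item
    room.sendEvent("m.room.message", {msgtype: "m.text", body: "hi"});

    // code already inside an operation: pass the item so "send" nests under it
    await logger.run("resend", async log => {
        await room.sendEvent("m.room.message", {msgtype: "m.text", body: "hi again"}, null, log);
    });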
@ -25,13 +25,13 @@ async function loadMembers({roomId, storage}) {
|
|||
return memberDatas.map(d => new RoomMember(d));
|
||||
}
|
||||
|
||||
async function fetchMembers({summary, syncToken, roomId, hsApi, storage, setChangedMembersMap}) {
|
||||
async function fetchMembers({summary, syncToken, roomId, hsApi, storage, setChangedMembersMap}, log) {
|
||||
// if any members are changed by sync while we're fetching members,
|
||||
// they will end up here, so we check not to override them
|
||||
const changedMembersDuringSync = new Map();
|
||||
setChangedMembersMap(changedMembersDuringSync);
|
||||
|
||||
const memberResponse = await hsApi.members(roomId, {at: syncToken}).response();
|
||||
const memberResponse = await hsApi.members(roomId, {at: syncToken}, {log}).response();
|
||||
|
||||
const txn = storage.readWriteTxn([
|
||||
storage.storeNames.roomSummary,
|
||||
|
@ -48,6 +48,7 @@ async function fetchMembers({summary, syncToken, roomId, hsApi, storage, setChan
|
|||
if (!Array.isArray(memberEvents)) {
|
||||
throw new Error("malformed");
|
||||
}
|
||||
log.set("members", memberEvents.length);
|
||||
members = await Promise.all(memberEvents.map(async memberEvent => {
|
||||
const userId = memberEvent?.state_key;
|
||||
if (!userId) {
|
||||
|
@@ -80,10 +81,11 @@ async function fetchMembers({summary, syncToken, roomId, hsApi, storage, setChan
    return members;
}

export async function fetchOrLoadMembers(options) {
export async function fetchOrLoadMembers(options, logger) {
    const {summary} = options;
    if (!summary.data.hasFetchedMembers) {
        return fetchMembers(options);
        // we only want to log if we fetch members, so start or continue the optional log operation here
        return logger.wrapOrRun(options.log, "fetchMembers", log => fetchMembers(options, log));
    } else {
        return loadMembers(options);
    }
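The comment above is the crux of this hunk: fetchOrLoadMembers may or may not be called with an ongoing log operation in options.log. wrapOrRun presumably continues it as a child item when one is present, and starts a fresh root operation otherwise. A plausible sketch of those semantics, inferred from the call site (the real implementation lives in the logging module, not in this diff):

// Assumed wrapOrRun semantics, inferred from the call site above.
wrapOrRun(logItem, label, callback) {
    if (logItem) {
        return logItem.wrap(label, callback); // continue: add a child to the ongoing operation
    } else {
        return this.run(label, callback);     // start: open a new root operation
    }
}
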
@@ -30,10 +30,7 @@ export class PendingEvent {
    constructor({data, remove, emitUpdate, attachments}) {
        this._data = data;
        this._attachments = attachments;
        this._emitUpdate = () => {
            console.log("PendingEvent status", this.status, this._attachments && Object.entries(this._attachments).map(([key, a]) => `${key}: ${a.sentBytes}/${a.size}`));
            emitUpdate();
        };
        this._emitUpdate = emitUpdate;
        this._removeFromQueueCallback = remove;
        this._aborted = false;
        this._status = SendStatus.Waiting;
@@ -100,7 +97,7 @@ export class PendingEvent {
        return this._attachments && Object.values(this._attachments).reduce((t, a) => t + a.sentBytes, 0);
    }

    async uploadAttachments(hsApi) {
    async uploadAttachments(hsApi, log) {
        if (!this.needsUpload) {
            return;
        }
@@ -111,7 +108,10 @@ export class PendingEvent {
        this._status = SendStatus.EncryptingAttachments;
        this._emitUpdate("status");
        for (const attachment of Object.values(this._attachments)) {
            await attachment.encrypt();
            await log.wrap("encrypt", () => {
                log.set("size", attachment.size);
                return attachment.encrypt();
            });
            if (this.aborted) {
                throw new AbortError();
            }
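From the call sites in this hunk, log.wrap(label, callback) opens a child log item, passes it to the callback, and returns the callback's (possibly async) result, so values set with log.set and any thrown error end up recorded on that child. Roughly, as an assumption about the logging API sketched for reading the diff, with child() and run() as hypothetical helpers:

// Assumed behaviour of log.wrap, sketched from how it is used above.
wrap(label, callback) {
    const child = this.child(label);  // hypothetical helper: nested item with its own timing
    return child.run(callback);       // runs callback(child); records duration and failures
}
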
@@ -123,8 +123,11 @@ export class PendingEvent {
        // upload smallest attachments first
        entries.sort(([, a1], [, a2]) => a1.size - a2.size);
        for (const [urlPath, attachment] of entries) {
            await attachment.upload(hsApi, () => {
                this._emitUpdate("attachmentsSentBytes");
            await log.wrap("upload", log => {
                log.set("size", attachment.size);
                return attachment.upload(hsApi, () => {
                    this._emitUpdate("attachmentsSentBytes");
                }, log);
            });
            attachment.applyToContent(urlPath, this.content);
        }
@@ -148,8 +151,7 @@ export class PendingEvent {
        return this._aborted;
    }

    async send(hsApi) {
        console.log(`sending event ${this.eventType} in ${this.roomId}`);
    async send(hsApi, log) {
        this._status = SendStatus.Sending;
        this._emitUpdate("status");
        const eventType = this._data.encryptedEventType || this._data.eventType;
@@ -158,11 +160,13 @@ export class PendingEvent {
            this.roomId,
            eventType,
            this.txnId,
            content
            content,
            {log}
        );
        const response = await this._sendRequest.response();
        this._sendRequest = null;
        this._data.remoteId = response.event_id;
        log.set("id", this._data.remoteId);
        this._status = SendStatus.Sent;
        this._emitUpdate("status");
    }

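The {log} options object threaded into the hsApi call above is the request-logging convention this PR introduces: each HTTP request is recorded as a child item of the operation that issued it, and the caller can then attach correlating values. A usage sketch of that convention as it appears in this hunk:

// Pass {log} and the request becomes a child of the current operation;
// the caller correlates the send with the event id the server assigned.
this._sendRequest = hsApi.send(this.roomId, eventType, this.txnId, content, {log});
const response = await this._sendRequest.response();
log.set("id", response.event_id);
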
@@ -26,9 +26,6 @@ export class SendQueue {
        this._storage = storage;
        this._hsApi = hsApi;
        this._pendingEvents = new SortedArray((a, b) => a.queueIndex - b.queueIndex);
        if (pendingEvents.length) {
            console.info(`SendQueue for room ${roomId} has ${pendingEvents.length} pending events`, pendingEvents);
        }
        this._pendingEvents.setManyUnsorted(pendingEvents.map(data => this._createPendingEvent(data)));
        this._isSending = false;
        this._offline = false;
@@ -49,48 +46,54 @@ export class SendQueue {
        this._roomEncryption = roomEncryption;
    }

    async _sendLoop() {
    _sendLoop(log) {
        this._isSending = true;
        try {
            for (let i = 0; i < this._pendingEvents.length; i += 1) {
                const pendingEvent = this._pendingEvents.get(i);
                try {
                    await this._sendEvent(pendingEvent);
                } catch(err) {
                    if (err instanceof ConnectionError) {
                        this._offline = true;
                        break;
                    } else {
                        pendingEvent.setError(err);
                    }
                }
        this._sendLoopLogItem = log.runDetached("send queue flush", async log => {
            try {
                for (let i = 0; i < this._pendingEvents.length; i += 1) {
                    await log.wrap("send event", async log => {
                        const pendingEvent = this._pendingEvents.get(i);
                        log.set("queueIndex", pendingEvent.queueIndex);
                        try {
                            await this._sendEvent(pendingEvent, log);
                        } catch(err) {
                            if (err instanceof ConnectionError) {
                                this._offline = true;
                                log.set("offline", true);
                            } else {
                                log.catch(err);
                                pendingEvent.setError(err);
                            }
                        }
                    });
                }
            } finally {
                this._isSending = false;
                this._sendLoopLogItem = null;
            }
        } finally {
            this._isSending = false;
        }
        });
    }

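runDetached is what keeps this loop's logging sane: the flush can outlive whatever triggered it (an enqueue, or resuming after reconnecting), so it gets its own root log item rather than becoming a child of the trigger. The item is stashed in this._sendLoopLogItem so later triggers can cross-reference an already-running flush instead of logging nothing. Under those assumptions, the relationship looks like this:

// Assumed relationship between runDetached and refDetached, based on this hunk.
const flushItem = log.runDetached("send queue flush", async log => {
    // long-running work; finishes independently of the operation that started it
});
// a later operation does not re-run the flush, it just points at it:
otherLog.refDetached(flushItem); // adds a cross-reference between the two log trees
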
    async _sendEvent(pendingEvent) {
    async _sendEvent(pendingEvent, log) {
        if (pendingEvent.needsUpload) {
            await pendingEvent.uploadAttachments(this._hsApi);
            console.log("attachments upload, content is now", pendingEvent.content);
            await log.wrap("upload attachments", log => pendingEvent.uploadAttachments(this._hsApi, log));
            await this._tryUpdateEvent(pendingEvent);
        }
        if (pendingEvent.needsEncryption) {
            pendingEvent.setEncrypting();
            const {type, content} = await this._roomEncryption.encrypt(
                pendingEvent.eventType, pendingEvent.content, this._hsApi);
            const {type, content} = await log.wrap("encrypt", log => this._roomEncryption.encrypt(
                pendingEvent.eventType, pendingEvent.content, this._hsApi, log));
            pendingEvent.setEncrypted(type, content);
            await this._tryUpdateEvent(pendingEvent);
        }
        if (pendingEvent.needsSending) {
            await pendingEvent.send(this._hsApi);
            console.log("writing remoteId");
            await pendingEvent.send(this._hsApi, log);

            await this._tryUpdateEvent(pendingEvent);
        }
    }

    removeRemoteEchos(events, txn) {
    removeRemoteEchos(events, txn, parentLog) {
        const removed = [];
        for (const event of events) {
            const txnId = event.unsigned && event.unsigned.transaction_id;

@@ -102,6 +105,7 @@ export class SendQueue {
            }
            if (idx !== -1) {
                const pendingEvent = this._pendingEvents.get(idx);
                parentLog.log({l: "removeRemoteEcho", id: pendingEvent.remoteId});
                txn.pendingEvents.remove(pendingEvent.roomId, pendingEvent.queueIndex);
                removed.push(pendingEvent);
            }
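For readers new to the codebase: a "remote echo" is the client's own message arriving back from the server. The match key is the transaction id the client chose when sending, which the server reflects in unsigned.transaction_id for the originating session. A simplified sketch of the rule the method above applies (not the full method):

// Simplified matching rule for remote echos.
function isRemoteEcho(event, pendingEvent) {
    const txnId = event.unsigned && event.unsigned.transaction_id;
    return !!txnId && txnId === pendingEvent.txnId; // our own send, confirmed by the server
}
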
@@ -134,19 +138,32 @@ export class SendQueue {
        }
    }

    resumeSending() {
    resumeSending(parentLog) {
        this._offline = false;
        if (!this._isSending) {
            this._sendLoop();
        if (this._pendingEvents.length) {
            parentLog.wrap("resumeSending", log => {
                log.set("id", this._roomId);
                log.set("pendingEvents", this._pendingEvents.length);
                if (!this._isSending) {
                    this._sendLoop(log);
                }
                if (this._sendLoopLogItem) {
                    log.refDetached(this._sendLoopLogItem);
                }
            });
        }
    }

    async enqueueEvent(eventType, content, attachments) {
    async enqueueEvent(eventType, content, attachments, log) {
        const pendingEvent = await this._createAndStoreEvent(eventType, content, attachments);
        this._pendingEvents.set(pendingEvent);
        console.log("added to _pendingEvents set", this._pendingEvents.length);
        log.set("queueIndex", pendingEvent.queueIndex);
        log.set("pendingEvents", this._pendingEvents.length);
        if (!this._isSending && !this._offline) {
            this._sendLoop();
            this._sendLoop(log);
        }
        if (this._sendLoopLogItem) {
            log.refDetached(this._sendLoopLogItem);
        }
    }

@@ -156,34 +173,25 @@ export class SendQueue {

    async _tryUpdateEvent(pendingEvent) {
        const txn = this._storage.readWriteTxn([this._storage.storeNames.pendingEvents]);
        console.log("_tryUpdateEvent: got txn");
        try {
            // pendingEvent might have been removed already here
            // by a racing remote echo, so check first so we don't recreate it
            console.log("_tryUpdateEvent: before exists");
            if (await txn.pendingEvents.exists(pendingEvent.roomId, pendingEvent.queueIndex)) {
                console.log("_tryUpdateEvent: inside if exists");
                txn.pendingEvents.update(pendingEvent.data);
            }
            console.log("_tryUpdateEvent: after exists");
        } catch (err) {
            txn.abort();
            console.log("_tryUpdateEvent: error", err);
            throw err;
        }
        console.log("_tryUpdateEvent: try complete");
        await txn.complete();
    }

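Stripped of its debug logging, _tryUpdateEvent above is an exists-then-update guard: a remote echo can delete the pending event between the moment _sendEvent decides to persist progress and the moment the transaction runs, and an unconditional update (an IndexedDB put) would silently re-create the deleted record. The guard reduced to its core, as an illustrative sketch with txn an open read/write transaction on pendingEvents:

// Core of the guard above: never update a record a racing remote echo removed.
async function tryUpdate(txn, pendingEvent) {
    if (await txn.pendingEvents.exists(pendingEvent.roomId, pendingEvent.queueIndex)) {
        txn.pendingEvents.update(pendingEvent.data); // still queued: persist progress
    } // else: already acked & removed; re-adding would resurrect a sent message
}
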
    async _createAndStoreEvent(eventType, content, attachments) {
        console.log("_createAndStoreEvent");
        const txn = this._storage.readWriteTxn([this._storage.storeNames.pendingEvents]);
        let pendingEvent;
        try {
            const pendingEventsStore = txn.pendingEvents;
            console.log("_createAndStoreEvent getting maxQueueIndex");
            const maxQueueIndex = await pendingEventsStore.getMaxQueueIndex(this._roomId) || 0;
            console.log("_createAndStoreEvent got maxQueueIndex", maxQueueIndex);
            const queueIndex = maxQueueIndex + 1;
            pendingEvent = this._createPendingEvent({
                roomId: this._roomId,
@@ -194,7 +202,6 @@ export class SendQueue {
                needsEncryption: !!this._roomEncryption,
                needsUpload: !!attachments
            }, attachments);
            console.log("_createAndStoreEvent: adding to pendingEventsStore");
            pendingEventsStore.add(pendingEvent.data);
        } catch (err) {
            txn.abort();
@@ -205,7 +212,7 @@ export class SendQueue {
    }

    dispose() {
        for (const pe in this._pendingEvents.array) {
        for (const pe of this._pendingEvents) {
            pe.dispose();
        }
    }

@@ -41,7 +41,7 @@ export class SyncWriter {
        this._lastLiveKey = null;
    }

    async load(txn) {
    async load(txn, log) {
        const liveFragment = await txn.timelineFragments.liveFragment(this._roomId);
        if (liveFragment) {
            const [lastEvent] = await txn.timelineEvents.lastEvents(this._roomId, liveFragment.id, 1);
@@ -53,7 +53,9 @@ export class SyncWriter {
        }
        // if there is no live fragment, we don't create it here because load gets a readonly txn.
        // this is on purpose, load shouldn't modify the store
        console.log("room persister load", this._roomId, this._lastLiveKey && this._lastLiveKey.toString());
        if (this._lastLiveKey) {
            log.set("live key", this._lastLiveKey.toString());
        }
    }

    async _createLiveFragment(txn, previousToken) {