From dd38fc13d7cca410a2ad38f80a5b0144aa2ac5af Mon Sep 17 00:00:00 2001 From: Bruno Windels Date: Tue, 23 Feb 2021 19:22:59 +0100 Subject: [PATCH] log sending messages --- src/matrix/e2ee/DeviceTracker.js | 42 +++++++---- src/matrix/e2ee/RoomEncryption.js | 56 ++++++++------- src/matrix/e2ee/olm/Encryption.js | 26 +++---- src/matrix/room/AttachmentUpload.js | 5 +- src/matrix/room/Room.js | 22 ++++-- src/matrix/room/members/load.js | 10 +-- src/matrix/room/sending/PendingEvent.js | 20 ++++-- src/matrix/room/sending/SendQueue.js | 92 +++++++++++++------------ 8 files changed, 159 insertions(+), 114 deletions(-) diff --git a/src/matrix/e2ee/DeviceTracker.js b/src/matrix/e2ee/DeviceTracker.js index 89ceae7b..a740636c 100644 --- a/src/matrix/e2ee/DeviceTracker.js +++ b/src/matrix/e2ee/DeviceTracker.js @@ -69,11 +69,11 @@ export class DeviceTracker { })); } - async trackRoom(room) { + async trackRoom(room, log) { if (room.isTrackingMembers || !room.isEncrypted) { return; } - const memberList = await room.loadMemberList(); + const memberList = await room.loadMemberList(log); try { const txn = this._storage.readWriteTxn([ this._storage.storeNames.roomSummary, @@ -83,6 +83,7 @@ export class DeviceTracker { try { isTrackingChanges = room.writeIsTrackingMembers(true, txn); const members = Array.from(memberList.members.values()); + log.set("members", members.length); await this._writeJoinedMembers(members, txn); } catch (err) { txn.abort(); @@ -142,7 +143,7 @@ export class DeviceTracker { } } - async _queryKeys(userIds, hsApi) { + async _queryKeys(userIds, hsApi, log) { // TODO: we need to handle the race here between /sync and /keys/query just like we need to do for the member list ... // there are multiple requests going out for /keys/query though and only one for /members @@ -153,9 +154,9 @@ export class DeviceTracker { return deviceKeysMap; }, {}), "token": this._getSyncToken() - }).response(); + }, {log}).response(); - const verifiedKeysPerUser = this._filterVerifiedDeviceKeys(deviceKeyResponse["device_keys"]); + const verifiedKeysPerUser = log.wrap("verify", log => this._filterVerifiedDeviceKeys(deviceKeyResponse["device_keys"], log)); const txn = this._storage.readWriteTxn([ this._storage.storeNames.userIdentities, this._storage.storeNames.deviceIdentities, @@ -167,6 +168,7 @@ export class DeviceTracker { return await this._storeQueriedDevicesForUserId(userId, deviceIdentities, txn); })); deviceIdentities = devicesIdentitiesPerUser.reduce((all, devices) => all.concat(devices), []); + log.set("devices", deviceIdentities.length); } catch (err) { txn.abort(); throw err; @@ -215,7 +217,7 @@ export class DeviceTracker { /** * @return {Array<{userId, verifiedKeys: Array>} */ - _filterVerifiedDeviceKeys(keyQueryDeviceKeysResponse) { + _filterVerifiedDeviceKeys(keyQueryDeviceKeysResponse, parentLog) { const curve25519Keys = new Set(); const verifiedKeys = Object.entries(keyQueryDeviceKeysResponse).map(([userId, keysByDevice]) => { const verifiedEntries = Object.entries(keysByDevice).filter(([deviceId, deviceKeys]) => { @@ -233,11 +235,21 @@ export class DeviceTracker { return false; } if (curve25519Keys.has(curve25519Key)) { - console.warn("ignoring device with duplicate curve25519 key in /keys/query response", deviceKeys); + parentLog.log({ + l: "ignore device with duplicate curve25519 key", + keys: deviceKeys + }, parentLog.level.Warn); return false; } curve25519Keys.add(curve25519Key); - return this._hasValidSignature(deviceKeys); + const isValid = this._hasValidSignature(deviceKeys); + if (!isValid) { + parentLog.log({ + l: "ignore device with invalid signature", + keys: deviceKeys + }, parentLog.level.Warn); + } + return isValid; }); const verifiedKeys = verifiedEntries.map(([, deviceKeys]) => deviceKeys); return {userId, verifiedKeys}; @@ -258,7 +270,7 @@ export class DeviceTracker { * @param {String} roomId [description] * @return {[type]} [description] */ - async devicesForTrackedRoom(roomId, hsApi) { + async devicesForTrackedRoom(roomId, hsApi, log) { const txn = this._storage.readTxn([ this._storage.storeNames.roomMembers, this._storage.storeNames.userIdentities, @@ -271,14 +283,14 @@ export class DeviceTracker { // So, this will also contain non-joined memberships const userIds = await txn.roomMembers.getAllUserIds(roomId); - return await this._devicesForUserIds(roomId, userIds, txn, hsApi); + return await this._devicesForUserIds(roomId, userIds, txn, hsApi, log); } - async devicesForRoomMembers(roomId, userIds, hsApi) { + async devicesForRoomMembers(roomId, userIds, hsApi, log) { const txn = this._storage.readTxn([ this._storage.storeNames.userIdentities, ]); - return await this._devicesForUserIds(roomId, userIds, txn, hsApi); + return await this._devicesForUserIds(roomId, userIds, txn, hsApi, log); } /** @@ -288,7 +300,7 @@ export class DeviceTracker { * @param {HomeServerApi} hsApi * @return {Array} */ - async _devicesForUserIds(roomId, userIds, userIdentityTxn, hsApi) { + async _devicesForUserIds(roomId, userIds, userIdentityTxn, hsApi, log) { const allMemberIdentities = await Promise.all(userIds.map(userId => userIdentityTxn.userIdentities.get(userId))); const identities = allMemberIdentities.filter(identity => { // identity will be missing for any userIds that don't have @@ -297,12 +309,14 @@ export class DeviceTracker { }); const upToDateIdentities = identities.filter(i => i.deviceTrackingStatus === TRACKING_STATUS_UPTODATE); const outdatedIdentities = identities.filter(i => i.deviceTrackingStatus === TRACKING_STATUS_OUTDATED); + log.set("uptodate", upToDateIdentities.length); + log.set("outdated", outdatedIdentities.length); let queriedDevices; if (outdatedIdentities.length) { // TODO: ignore the race between /sync and /keys/query for now, // where users could get marked as outdated or added/removed from the room while // querying keys - queriedDevices = await this._queryKeys(outdatedIdentities.map(i => i.userId), hsApi); + queriedDevices = await this._queryKeys(outdatedIdentities.map(i => i.userId), hsApi, log); } const deviceTxn = this._storage.readTxn([ diff --git a/src/matrix/e2ee/RoomEncryption.js b/src/matrix/e2ee/RoomEncryption.js index d5f12a51..c0dc17e7 100644 --- a/src/matrix/e2ee/RoomEncryption.js +++ b/src/matrix/e2ee/RoomEncryption.js @@ -252,21 +252,21 @@ export class RoomEncryption { } /** shares the encryption key for the next message if needed */ - async ensureMessageKeyIsShared(hsApi) { + async ensureMessageKeyIsShared(hsApi, log) { if (this._lastKeyPreShareTime?.measure() < MIN_PRESHARE_INTERVAL) { return; } this._lastKeyPreShareTime = this._clock.createMeasure(); const roomKeyMessage = await this._megolmEncryption.ensureOutboundSession(this._room.id, this._encryptionParams); if (roomKeyMessage) { - await this._shareNewRoomKey(roomKeyMessage, hsApi); + await log.wrap("share key", log => this._shareNewRoomKey(roomKeyMessage, hsApi, log)); } } - async encrypt(type, content, hsApi) { - const megolmResult = await this._megolmEncryption.encrypt(this._room.id, type, content, this._encryptionParams); + async encrypt(type, content, hsApi, log) { + const megolmResult = await log.wrap("megolm encrypt", () => this._megolmEncryption.encrypt(this._room.id, type, content, this._encryptionParams)); if (megolmResult.roomKeyMessage) { - this._shareNewRoomKey(megolmResult.roomKeyMessage, hsApi); + log.wrapDetached("share key", log => this._shareNewRoomKey(megolmResult.roomKeyMessage, hsApi, log)); } return { type: ENCRYPTED_TYPE, @@ -283,9 +283,9 @@ export class RoomEncryption { return false; } - async _shareNewRoomKey(roomKeyMessage, hsApi) { - await this._deviceTracker.trackRoom(this._room); - const devices = await this._deviceTracker.devicesForTrackedRoom(this._room.id, hsApi); + async _shareNewRoomKey(roomKeyMessage, hsApi, log) { + await this._deviceTracker.trackRoom(this._room, log); + const devices = await this._deviceTracker.devicesForTrackedRoom(this._room.id, hsApi, log); const userIds = Array.from(devices.reduce((set, device) => set.add(device.userId), new Set())); // store operation for room key share, in case we don't finish here @@ -297,13 +297,15 @@ export class RoomEncryption { writeOpTxn.abort(); throw err; } + log.set("id", operationId); + log.set("sessionId", roomKeyMessage.session_id); await writeOpTxn.complete(); // TODO: at this point we have the room key stored, and the rest is sort of optional // it would be nice if we could signal SendQueue that any error from here on is non-fatal and // return the encrypted payload. // send the room key - await this._sendRoomKey(roomKeyMessage, devices, hsApi); + await this._sendRoomKey(roomKeyMessage, devices, hsApi, log); // remove the operation const removeOpTxn = this._storage.readWriteTxn([this._storage.storeNames.operations]); @@ -353,29 +355,33 @@ export class RoomEncryption { if (operation.type !== "share_room_key") { continue; } - const devices = await this._deviceTracker.devicesForRoomMembers(this._room.id, operation.userIds, hsApi); - await this._sendRoomKey(operation.roomKeyMessage, devices, hsApi); - const removeTxn = this._storage.readWriteTxn([this._storage.storeNames.operations]); - try { - removeTxn.operations.remove(operation.id); - } catch (err) { - removeTxn.abort(); - throw err; - } - await removeTxn.complete(); + await log.wrap("operation", async log => { + log.set("id", operation.id); + const devices = await this._deviceTracker.devicesForRoomMembers(this._room.id, operation.userIds, hsApi, log); + await this._sendRoomKey(operation.roomKeyMessage, devices, hsApi, log); + const removeTxn = this._storage.readWriteTxn([this._storage.storeNames.operations]); + try { + removeTxn.operations.remove(operation.id); + } catch (err) { + removeTxn.abort(); + throw err; + } + await removeTxn.complete(); + }); } } finally { this._isFlushingRoomKeyShares = false; } } - async _sendRoomKey(roomKeyMessage, devices, hsApi) { - const messages = await this._olmEncryption.encrypt( - "m.room_key", roomKeyMessage, devices, hsApi); - await this._sendMessagesToDevices(ENCRYPTED_TYPE, messages, hsApi); + async _sendRoomKey(roomKeyMessage, devices, hsApi, log) { + const messages = await log.wrap("olm encrypt", log => this._olmEncryption.encrypt( + "m.room_key", roomKeyMessage, devices, hsApi, log)); + await log.wrap("send", log => this._sendMessagesToDevices(ENCRYPTED_TYPE, messages, hsApi, log)); } - async _sendMessagesToDevices(type, messages, hsApi) { + async _sendMessagesToDevices(type, messages, hsApi, log) { + log.set("messages", messages.length); const messagesByUser = groupBy(messages, message => message.device.userId); const payload = { messages: Array.from(messagesByUser.entries()).reduce((userMap, [userId, messages]) => { @@ -387,7 +393,7 @@ export class RoomEncryption { }, {}) }; const txnId = makeTxnId(); - await hsApi.sendToDevice(type, payload, txnId).response(); + await hsApi.sendToDevice(type, payload, txnId, {log}).response(); } dispose() { diff --git a/src/matrix/e2ee/olm/Encryption.js b/src/matrix/e2ee/olm/Encryption.js index 8ce4583a..c26c0239 100644 --- a/src/matrix/e2ee/olm/Encryption.js +++ b/src/matrix/e2ee/olm/Encryption.js @@ -47,17 +47,17 @@ export class Encryption { this._senderKeyLock = senderKeyLock; } - async encrypt(type, content, devices, hsApi) { + async encrypt(type, content, devices, hsApi, log) { let messages = []; for (let i = 0; i < devices.length ; i += MAX_BATCH_SIZE) { const batchDevices = devices.slice(i, i + MAX_BATCH_SIZE); - const batchMessages = await this._encryptForMaxDevices(type, content, batchDevices, hsApi); + const batchMessages = await this._encryptForMaxDevices(type, content, batchDevices, hsApi, log); messages = messages.concat(batchMessages); } return messages; } - async _encryptForMaxDevices(type, content, devices, hsApi) { + async _encryptForMaxDevices(type, content, devices, hsApi, log) { // TODO: see if we can only hold some of the locks until after the /keys/claim call (if needed) // take a lock on all senderKeys so decryption and other calls to encrypt (should not happen) // don't modify the sessions at the same time @@ -75,16 +75,17 @@ export class Encryption { let encryptionTargets = []; try { if (devicesWithoutSession.length) { - const newEncryptionTargets = await this._createNewSessions( - devicesWithoutSession, hsApi, timestamp); + const newEncryptionTargets = await log.wrap("create sessions", log => this._createNewSessions( + devicesWithoutSession, hsApi, timestamp, log)); encryptionTargets = encryptionTargets.concat(newEncryptionTargets); } await this._loadSessions(existingEncryptionTargets); encryptionTargets = encryptionTargets.concat(existingEncryptionTargets); - const messages = encryptionTargets.map(target => { + const encryptLog = {l: "encrypt", targets: encryptionTargets.length}; + const messages = log.wrap(encryptLog, () => encryptionTargets.map(target => { const encryptedContent = this._encryptForDevice(type, content, target); return new EncryptedMessage(encryptedContent, target.device); - }); + })); await this._storeSessions(encryptionTargets, timestamp); return messages; } finally { @@ -149,8 +150,8 @@ export class Encryption { } } - async _createNewSessions(devicesWithoutSession, hsApi, timestamp) { - const newEncryptionTargets = await this._claimOneTimeKeys(hsApi, devicesWithoutSession); + async _createNewSessions(devicesWithoutSession, hsApi, timestamp, log) { + const newEncryptionTargets = await log.wrap("claim", log => this._claimOneTimeKeys(hsApi, devicesWithoutSession, log)); try { for (const target of newEncryptionTargets) { const {device, oneTimeKey} = target; @@ -166,7 +167,7 @@ export class Encryption { return newEncryptionTargets; } - async _claimOneTimeKeys(hsApi, deviceIdentities) { + async _claimOneTimeKeys(hsApi, deviceIdentities, log) { // create a Map> const devicesByUser = groupByWithCreator(deviceIdentities, device => device.userId, @@ -183,11 +184,10 @@ export class Encryption { const claimResponse = await hsApi.claimKeys({ timeout: 10000, one_time_keys: oneTimeKeys - }).response(); + }, {log}).response(); if (Object.keys(claimResponse.failures).length) { - console.warn("failures for claiming one time keys", oneTimeKeys, claimResponse.failures); + log.log({l: "failures", servers: Object.keys(claimResponse.failures)}, log.level.Warn); } - // TODO: log claimResponse.failures const userKeyMap = claimResponse?.["one_time_keys"]; return this._verifyAndCreateOTKTargets(userKeyMap, devicesByUser); } diff --git a/src/matrix/room/AttachmentUpload.js b/src/matrix/room/AttachmentUpload.js index a66fbd45..e2e7e3bf 100644 --- a/src/matrix/room/AttachmentUpload.js +++ b/src/matrix/room/AttachmentUpload.js @@ -61,12 +61,13 @@ export class AttachmentUpload { } /** @package */ - async upload(hsApi, progressCallback) { + async upload(hsApi, progressCallback, log) { this._uploadRequest = hsApi.uploadAttachment(this._transferredBlob, this._filename, { uploadProgress: sentBytes => { this._sentBytes = sentBytes; progressCallback(); - } + }, + log }); const {content_uri} = await this._uploadRequest.response(); this._mxcUrl = content_uri; diff --git a/src/matrix/room/Room.js b/src/matrix/room/Room.js index e562e8b1..1419a96d 100644 --- a/src/matrix/room/Room.js +++ b/src/matrix/room/Room.js @@ -361,17 +361,26 @@ export class Room extends EventEmitter { } /** @public */ - sendEvent(eventType, content, attachments) { - return this._sendQueue.enqueueEvent(eventType, content, attachments); + sendEvent(eventType, content, attachments, log = null) { + this._platform.logger.wrapOrRun(log, "send", log => { + log.set("id", this.id); + return this._sendQueue.enqueueEvent(eventType, content, attachments, log); + }); } /** @public */ - async ensureMessageKeyIsShared() { - return this._roomEncryption?.ensureMessageKeyIsShared(this._hsApi); + async ensureMessageKeyIsShared(log = null) { + if (!this._roomEncryption) { + return; + } + return this._platform.logger.wrapOrRun(log, "ensureMessageKeyIsShared", log => { + log.set("id", this.id); + return this._roomEncryption.ensureMessageKeyIsShared(this._hsApi, log); + }); } /** @public */ - async loadMemberList() { + async loadMemberList(log = null) { if (this._memberList) { // TODO: also await fetchOrLoadMembers promise here this._memberList.retain(); @@ -385,7 +394,8 @@ export class Room extends EventEmitter { syncToken: this._getSyncToken(), // to handle race between /members and /sync setChangedMembersMap: map => this._changedMembersDuringSync = map, - }); + log, + }, this._platform.logger); this._memberList = new MemberList({ members, closeCallback: () => { this._memberList = null; } diff --git a/src/matrix/room/members/load.js b/src/matrix/room/members/load.js index aa14d2fb..2b8853de 100644 --- a/src/matrix/room/members/load.js +++ b/src/matrix/room/members/load.js @@ -25,13 +25,13 @@ async function loadMembers({roomId, storage}) { return memberDatas.map(d => new RoomMember(d)); } -async function fetchMembers({summary, syncToken, roomId, hsApi, storage, setChangedMembersMap}) { +async function fetchMembers({summary, syncToken, roomId, hsApi, storage, setChangedMembersMap}, log) { // if any members are changed by sync while we're fetching members, // they will end up here, so we check not to override them const changedMembersDuringSync = new Map(); setChangedMembersMap(changedMembersDuringSync); - const memberResponse = await hsApi.members(roomId, {at: syncToken}).response(); + const memberResponse = await hsApi.members(roomId, {at: syncToken}, {log}).response(); const txn = storage.readWriteTxn([ storage.storeNames.roomSummary, @@ -48,6 +48,7 @@ async function fetchMembers({summary, syncToken, roomId, hsApi, storage, setChan if (!Array.isArray(memberEvents)) { throw new Error("malformed"); } + log.set("members", memberEvents.length); members = await Promise.all(memberEvents.map(async memberEvent => { const userId = memberEvent?.state_key; if (!userId) { @@ -80,10 +81,11 @@ async function fetchMembers({summary, syncToken, roomId, hsApi, storage, setChan return members; } -export async function fetchOrLoadMembers(options) { +export async function fetchOrLoadMembers(options, logger) { const {summary} = options; if (!summary.data.hasFetchedMembers) { - return fetchMembers(options); + // we only want to log if we fetch members, so start or continue the optional log operation here + return logger.wrapOrRun(options.log, "fetchMembers", log => fetchMembers(options, log)); } else { return loadMembers(options); } diff --git a/src/matrix/room/sending/PendingEvent.js b/src/matrix/room/sending/PendingEvent.js index 1980b30b..4e3a18b2 100644 --- a/src/matrix/room/sending/PendingEvent.js +++ b/src/matrix/room/sending/PendingEvent.js @@ -100,7 +100,7 @@ export class PendingEvent { return this._attachments && Object.values(this._attachments).reduce((t, a) => t + a.sentBytes, 0); } - async uploadAttachments(hsApi) { + async uploadAttachments(hsApi, log) { if (!this.needsUpload) { return; } @@ -111,7 +111,10 @@ export class PendingEvent { this._status = SendStatus.EncryptingAttachments; this._emitUpdate("status"); for (const attachment of Object.values(this._attachments)) { - await attachment.encrypt(); + await log.wrap("encrypt", () => { + log.set("size", attachment.size); + return attachment.encrypt() + }); if (this.aborted) { throw new AbortError(); } @@ -123,8 +126,11 @@ export class PendingEvent { // upload smallest attachments first entries.sort(([, a1], [, a2]) => a1.size - a2.size); for (const [urlPath, attachment] of entries) { - await attachment.upload(hsApi, () => { - this._emitUpdate("attachmentsSentBytes"); + await log.wrap("upload", log => { + log.set("size", attachment.size); + return attachment.upload(hsApi, () => { + this._emitUpdate("attachmentsSentBytes"); + }, log); }); attachment.applyToContent(urlPath, this.content); } @@ -148,8 +154,7 @@ export class PendingEvent { return this._aborted; } - async send(hsApi) { - console.log(`sending event ${this.eventType} in ${this.roomId}`); + async send(hsApi, log) { this._status = SendStatus.Sending; this._emitUpdate("status"); const eventType = this._data.encryptedEventType || this._data.eventType; @@ -158,7 +163,8 @@ export class PendingEvent { this.roomId, eventType, this.txnId, - content + content, + {log} ); const response = await this._sendRequest.response(); this._sendRequest = null; diff --git a/src/matrix/room/sending/SendQueue.js b/src/matrix/room/sending/SendQueue.js index ce26d849..5aa59870 100644 --- a/src/matrix/room/sending/SendQueue.js +++ b/src/matrix/room/sending/SendQueue.js @@ -26,9 +26,6 @@ export class SendQueue { this._storage = storage; this._hsApi = hsApi; this._pendingEvents = new SortedArray((a, b) => a.queueIndex - b.queueIndex); - if (pendingEvents.length) { - console.info(`SendQueue for room ${roomId} has ${pendingEvents.length} pending events`, pendingEvents); - } this._pendingEvents.setManyUnsorted(pendingEvents.map(data => this._createPendingEvent(data))); this._isSending = false; this._offline = false; @@ -49,43 +46,49 @@ export class SendQueue { this._roomEncryption = roomEncryption; } - async _sendLoop() { + _sendLoop(log) { this._isSending = true; - try { - for (let i = 0; i < this._pendingEvents.length; i += 1) { - const pendingEvent = this._pendingEvents.get(i); - try { - await this._sendEvent(pendingEvent); - } catch(err) { - if (err instanceof ConnectionError) { - this._offline = true; - break; - } else { - pendingEvent.setError(err); - } - } + this._sendLoopLogItem = log.runDetached("send queue flush", async log => { + try { + for (let i = 0; i < this._pendingEvents.length; i += 1) { + await log.wrap("send event", async log => { + const pendingEvent = this._pendingEvents.get(i); + log.set("id", pendingEvent.queueIndex); + try { + await this._sendEvent(pendingEvent, log); + } catch(err) { + if (err instanceof ConnectionError) { + this._offline = true; + log.set("offline", true); + } else { + log.catch(err); + pendingEvent.setError(err); + } + } + }); + } + } finally { + this._isSending = false; + this._sendLoopLogItem = null; } - } finally { - this._isSending = false; - } + }); } - async _sendEvent(pendingEvent) { + async _sendEvent(pendingEvent, log) { if (pendingEvent.needsUpload) { - await pendingEvent.uploadAttachments(this._hsApi); - console.log("attachments upload, content is now", pendingEvent.content); + await log.wrap("upload attachments", log => pendingEvent.uploadAttachments(this._hsApi, log)); await this._tryUpdateEvent(pendingEvent); } if (pendingEvent.needsEncryption) { pendingEvent.setEncrypting(); - const {type, content} = await this._roomEncryption.encrypt( - pendingEvent.eventType, pendingEvent.content, this._hsApi); + const {type, content} = await log.wrap("encrypt", log => this._roomEncryption.encrypt( + pendingEvent.eventType, pendingEvent.content, this._hsApi, log)); pendingEvent.setEncrypted(type, content); await this._tryUpdateEvent(pendingEvent); } if (pendingEvent.needsSending) { - await pendingEvent.send(this._hsApi); - console.log("writing remoteId"); + await pendingEvent.send(this._hsApi, log); + await this._tryUpdateEvent(pendingEvent); } } @@ -134,19 +137,32 @@ export class SendQueue { } } - resumeSending() { + resumeSending(parentLog) { this._offline = false; - if (!this._isSending) { - this._sendLoop(); + if (this._pendingEvents.length) { + parentLog.wrap("resumeSending", log => { + log.set("id", this._roomId); + log.set("pendingEvents", this._pendingEvents.length); + if (!this._isSending) { + this._sendLoop(log); + } + if (this._sendLoopLogItem) { + log.refDetached(this._sendLoopLogItem); + } + }); } } - async enqueueEvent(eventType, content, attachments) { + async enqueueEvent(eventType, content, attachments, log) { const pendingEvent = await this._createAndStoreEvent(eventType, content, attachments); this._pendingEvents.set(pendingEvent); - console.log("added to _pendingEvents set", this._pendingEvents.length); + log.set("queueIndex", pendingEvent.queueIndex); + log.set("pendingEvents", this._pendingEvents.length); if (!this._isSending && !this._offline) { - this._sendLoop(); + this._sendLoop(log); + } + if (this._sendLoopLogItem) { + log.refDetached(this._sendLoopLogItem); } } @@ -156,34 +172,25 @@ export class SendQueue { async _tryUpdateEvent(pendingEvent) { const txn = this._storage.readWriteTxn([this._storage.storeNames.pendingEvents]); - console.log("_tryUpdateEvent: got txn"); try { // pendingEvent might have been removed already here // by a racing remote echo, so check first so we don't recreate it - console.log("_tryUpdateEvent: before exists"); if (await txn.pendingEvents.exists(pendingEvent.roomId, pendingEvent.queueIndex)) { - console.log("_tryUpdateEvent: inside if exists"); txn.pendingEvents.update(pendingEvent.data); } - console.log("_tryUpdateEvent: after exists"); } catch (err) { txn.abort(); - console.log("_tryUpdateEvent: error", err); throw err; } - console.log("_tryUpdateEvent: try complete"); await txn.complete(); } async _createAndStoreEvent(eventType, content, attachments) { - console.log("_createAndStoreEvent"); const txn = this._storage.readWriteTxn([this._storage.storeNames.pendingEvents]); let pendingEvent; try { const pendingEventsStore = txn.pendingEvents; - console.log("_createAndStoreEvent getting maxQueueIndex"); const maxQueueIndex = await pendingEventsStore.getMaxQueueIndex(this._roomId) || 0; - console.log("_createAndStoreEvent got maxQueueIndex", maxQueueIndex); const queueIndex = maxQueueIndex + 1; pendingEvent = this._createPendingEvent({ roomId: this._roomId, @@ -194,7 +201,6 @@ export class SendQueue { needsEncryption: !!this._roomEncryption, needsUpload: !!attachments }, attachments); - console.log("_createAndStoreEvent: adding to pendingEventsStore"); pendingEventsStore.add(pendingEvent.data); } catch (err) { txn.abort();