log sending messages

This commit is contained in:
Bruno Windels 2021-02-23 19:22:59 +01:00
parent 57bb75e864
commit dd38fc13d7
8 changed files with 159 additions and 114 deletions

View file

@ -69,11 +69,11 @@ export class DeviceTracker {
})); }));
} }
async trackRoom(room) { async trackRoom(room, log) {
if (room.isTrackingMembers || !room.isEncrypted) { if (room.isTrackingMembers || !room.isEncrypted) {
return; return;
} }
const memberList = await room.loadMemberList(); const memberList = await room.loadMemberList(log);
try { try {
const txn = this._storage.readWriteTxn([ const txn = this._storage.readWriteTxn([
this._storage.storeNames.roomSummary, this._storage.storeNames.roomSummary,
@ -83,6 +83,7 @@ export class DeviceTracker {
try { try {
isTrackingChanges = room.writeIsTrackingMembers(true, txn); isTrackingChanges = room.writeIsTrackingMembers(true, txn);
const members = Array.from(memberList.members.values()); const members = Array.from(memberList.members.values());
log.set("members", members.length);
await this._writeJoinedMembers(members, txn); await this._writeJoinedMembers(members, txn);
} catch (err) { } catch (err) {
txn.abort(); txn.abort();
@ -142,7 +143,7 @@ export class DeviceTracker {
} }
} }
async _queryKeys(userIds, hsApi) { async _queryKeys(userIds, hsApi, log) {
// TODO: we need to handle the race here between /sync and /keys/query just like we need to do for the member list ... // TODO: we need to handle the race here between /sync and /keys/query just like we need to do for the member list ...
// there are multiple requests going out for /keys/query though and only one for /members // there are multiple requests going out for /keys/query though and only one for /members
@ -153,9 +154,9 @@ export class DeviceTracker {
return deviceKeysMap; return deviceKeysMap;
}, {}), }, {}),
"token": this._getSyncToken() "token": this._getSyncToken()
}).response(); }, {log}).response();
const verifiedKeysPerUser = this._filterVerifiedDeviceKeys(deviceKeyResponse["device_keys"]); const verifiedKeysPerUser = log.wrap("verify", log => this._filterVerifiedDeviceKeys(deviceKeyResponse["device_keys"], log));
const txn = this._storage.readWriteTxn([ const txn = this._storage.readWriteTxn([
this._storage.storeNames.userIdentities, this._storage.storeNames.userIdentities,
this._storage.storeNames.deviceIdentities, this._storage.storeNames.deviceIdentities,
@ -167,6 +168,7 @@ export class DeviceTracker {
return await this._storeQueriedDevicesForUserId(userId, deviceIdentities, txn); return await this._storeQueriedDevicesForUserId(userId, deviceIdentities, txn);
})); }));
deviceIdentities = devicesIdentitiesPerUser.reduce((all, devices) => all.concat(devices), []); deviceIdentities = devicesIdentitiesPerUser.reduce((all, devices) => all.concat(devices), []);
log.set("devices", deviceIdentities.length);
} catch (err) { } catch (err) {
txn.abort(); txn.abort();
throw err; throw err;
@ -215,7 +217,7 @@ export class DeviceTracker {
/** /**
* @return {Array<{userId, verifiedKeys: Array<DeviceSection>>} * @return {Array<{userId, verifiedKeys: Array<DeviceSection>>}
*/ */
_filterVerifiedDeviceKeys(keyQueryDeviceKeysResponse) { _filterVerifiedDeviceKeys(keyQueryDeviceKeysResponse, parentLog) {
const curve25519Keys = new Set(); const curve25519Keys = new Set();
const verifiedKeys = Object.entries(keyQueryDeviceKeysResponse).map(([userId, keysByDevice]) => { const verifiedKeys = Object.entries(keyQueryDeviceKeysResponse).map(([userId, keysByDevice]) => {
const verifiedEntries = Object.entries(keysByDevice).filter(([deviceId, deviceKeys]) => { const verifiedEntries = Object.entries(keysByDevice).filter(([deviceId, deviceKeys]) => {
@ -233,11 +235,21 @@ export class DeviceTracker {
return false; return false;
} }
if (curve25519Keys.has(curve25519Key)) { if (curve25519Keys.has(curve25519Key)) {
console.warn("ignoring device with duplicate curve25519 key in /keys/query response", deviceKeys); parentLog.log({
l: "ignore device with duplicate curve25519 key",
keys: deviceKeys
}, parentLog.level.Warn);
return false; return false;
} }
curve25519Keys.add(curve25519Key); curve25519Keys.add(curve25519Key);
return this._hasValidSignature(deviceKeys); const isValid = this._hasValidSignature(deviceKeys);
if (!isValid) {
parentLog.log({
l: "ignore device with invalid signature",
keys: deviceKeys
}, parentLog.level.Warn);
}
return isValid;
}); });
const verifiedKeys = verifiedEntries.map(([, deviceKeys]) => deviceKeys); const verifiedKeys = verifiedEntries.map(([, deviceKeys]) => deviceKeys);
return {userId, verifiedKeys}; return {userId, verifiedKeys};
@ -258,7 +270,7 @@ export class DeviceTracker {
* @param {String} roomId [description] * @param {String} roomId [description]
* @return {[type]} [description] * @return {[type]} [description]
*/ */
async devicesForTrackedRoom(roomId, hsApi) { async devicesForTrackedRoom(roomId, hsApi, log) {
const txn = this._storage.readTxn([ const txn = this._storage.readTxn([
this._storage.storeNames.roomMembers, this._storage.storeNames.roomMembers,
this._storage.storeNames.userIdentities, this._storage.storeNames.userIdentities,
@ -271,14 +283,14 @@ export class DeviceTracker {
// So, this will also contain non-joined memberships // So, this will also contain non-joined memberships
const userIds = await txn.roomMembers.getAllUserIds(roomId); const userIds = await txn.roomMembers.getAllUserIds(roomId);
return await this._devicesForUserIds(roomId, userIds, txn, hsApi); return await this._devicesForUserIds(roomId, userIds, txn, hsApi, log);
} }
async devicesForRoomMembers(roomId, userIds, hsApi) { async devicesForRoomMembers(roomId, userIds, hsApi, log) {
const txn = this._storage.readTxn([ const txn = this._storage.readTxn([
this._storage.storeNames.userIdentities, this._storage.storeNames.userIdentities,
]); ]);
return await this._devicesForUserIds(roomId, userIds, txn, hsApi); return await this._devicesForUserIds(roomId, userIds, txn, hsApi, log);
} }
/** /**
@ -288,7 +300,7 @@ export class DeviceTracker {
* @param {HomeServerApi} hsApi * @param {HomeServerApi} hsApi
* @return {Array<DeviceIdentity>} * @return {Array<DeviceIdentity>}
*/ */
async _devicesForUserIds(roomId, userIds, userIdentityTxn, hsApi) { async _devicesForUserIds(roomId, userIds, userIdentityTxn, hsApi, log) {
const allMemberIdentities = await Promise.all(userIds.map(userId => userIdentityTxn.userIdentities.get(userId))); const allMemberIdentities = await Promise.all(userIds.map(userId => userIdentityTxn.userIdentities.get(userId)));
const identities = allMemberIdentities.filter(identity => { const identities = allMemberIdentities.filter(identity => {
// identity will be missing for any userIds that don't have // identity will be missing for any userIds that don't have
@ -297,12 +309,14 @@ export class DeviceTracker {
}); });
const upToDateIdentities = identities.filter(i => i.deviceTrackingStatus === TRACKING_STATUS_UPTODATE); const upToDateIdentities = identities.filter(i => i.deviceTrackingStatus === TRACKING_STATUS_UPTODATE);
const outdatedIdentities = identities.filter(i => i.deviceTrackingStatus === TRACKING_STATUS_OUTDATED); const outdatedIdentities = identities.filter(i => i.deviceTrackingStatus === TRACKING_STATUS_OUTDATED);
log.set("uptodate", upToDateIdentities.length);
log.set("outdated", outdatedIdentities.length);
let queriedDevices; let queriedDevices;
if (outdatedIdentities.length) { if (outdatedIdentities.length) {
// TODO: ignore the race between /sync and /keys/query for now, // TODO: ignore the race between /sync and /keys/query for now,
// where users could get marked as outdated or added/removed from the room while // where users could get marked as outdated or added/removed from the room while
// querying keys // querying keys
queriedDevices = await this._queryKeys(outdatedIdentities.map(i => i.userId), hsApi); queriedDevices = await this._queryKeys(outdatedIdentities.map(i => i.userId), hsApi, log);
} }
const deviceTxn = this._storage.readTxn([ const deviceTxn = this._storage.readTxn([

View file

@ -252,21 +252,21 @@ export class RoomEncryption {
} }
/** shares the encryption key for the next message if needed */ /** shares the encryption key for the next message if needed */
async ensureMessageKeyIsShared(hsApi) { async ensureMessageKeyIsShared(hsApi, log) {
if (this._lastKeyPreShareTime?.measure() < MIN_PRESHARE_INTERVAL) { if (this._lastKeyPreShareTime?.measure() < MIN_PRESHARE_INTERVAL) {
return; return;
} }
this._lastKeyPreShareTime = this._clock.createMeasure(); this._lastKeyPreShareTime = this._clock.createMeasure();
const roomKeyMessage = await this._megolmEncryption.ensureOutboundSession(this._room.id, this._encryptionParams); const roomKeyMessage = await this._megolmEncryption.ensureOutboundSession(this._room.id, this._encryptionParams);
if (roomKeyMessage) { if (roomKeyMessage) {
await this._shareNewRoomKey(roomKeyMessage, hsApi); await log.wrap("share key", log => this._shareNewRoomKey(roomKeyMessage, hsApi, log));
} }
} }
async encrypt(type, content, hsApi) { async encrypt(type, content, hsApi, log) {
const megolmResult = await this._megolmEncryption.encrypt(this._room.id, type, content, this._encryptionParams); const megolmResult = await log.wrap("megolm encrypt", () => this._megolmEncryption.encrypt(this._room.id, type, content, this._encryptionParams));
if (megolmResult.roomKeyMessage) { if (megolmResult.roomKeyMessage) {
this._shareNewRoomKey(megolmResult.roomKeyMessage, hsApi); log.wrapDetached("share key", log => this._shareNewRoomKey(megolmResult.roomKeyMessage, hsApi, log));
} }
return { return {
type: ENCRYPTED_TYPE, type: ENCRYPTED_TYPE,
@ -283,9 +283,9 @@ export class RoomEncryption {
return false; return false;
} }
async _shareNewRoomKey(roomKeyMessage, hsApi) { async _shareNewRoomKey(roomKeyMessage, hsApi, log) {
await this._deviceTracker.trackRoom(this._room); await this._deviceTracker.trackRoom(this._room, log);
const devices = await this._deviceTracker.devicesForTrackedRoom(this._room.id, hsApi); const devices = await this._deviceTracker.devicesForTrackedRoom(this._room.id, hsApi, log);
const userIds = Array.from(devices.reduce((set, device) => set.add(device.userId), new Set())); const userIds = Array.from(devices.reduce((set, device) => set.add(device.userId), new Set()));
// store operation for room key share, in case we don't finish here // store operation for room key share, in case we don't finish here
@ -297,13 +297,15 @@ export class RoomEncryption {
writeOpTxn.abort(); writeOpTxn.abort();
throw err; throw err;
} }
log.set("id", operationId);
log.set("sessionId", roomKeyMessage.session_id);
await writeOpTxn.complete(); await writeOpTxn.complete();
// TODO: at this point we have the room key stored, and the rest is sort of optional // TODO: at this point we have the room key stored, and the rest is sort of optional
// it would be nice if we could signal SendQueue that any error from here on is non-fatal and // it would be nice if we could signal SendQueue that any error from here on is non-fatal and
// return the encrypted payload. // return the encrypted payload.
// send the room key // send the room key
await this._sendRoomKey(roomKeyMessage, devices, hsApi); await this._sendRoomKey(roomKeyMessage, devices, hsApi, log);
// remove the operation // remove the operation
const removeOpTxn = this._storage.readWriteTxn([this._storage.storeNames.operations]); const removeOpTxn = this._storage.readWriteTxn([this._storage.storeNames.operations]);
@ -353,29 +355,33 @@ export class RoomEncryption {
if (operation.type !== "share_room_key") { if (operation.type !== "share_room_key") {
continue; continue;
} }
const devices = await this._deviceTracker.devicesForRoomMembers(this._room.id, operation.userIds, hsApi); await log.wrap("operation", async log => {
await this._sendRoomKey(operation.roomKeyMessage, devices, hsApi); log.set("id", operation.id);
const removeTxn = this._storage.readWriteTxn([this._storage.storeNames.operations]); const devices = await this._deviceTracker.devicesForRoomMembers(this._room.id, operation.userIds, hsApi, log);
try { await this._sendRoomKey(operation.roomKeyMessage, devices, hsApi, log);
removeTxn.operations.remove(operation.id); const removeTxn = this._storage.readWriteTxn([this._storage.storeNames.operations]);
} catch (err) { try {
removeTxn.abort(); removeTxn.operations.remove(operation.id);
throw err; } catch (err) {
} removeTxn.abort();
await removeTxn.complete(); throw err;
}
await removeTxn.complete();
});
} }
} finally { } finally {
this._isFlushingRoomKeyShares = false; this._isFlushingRoomKeyShares = false;
} }
} }
async _sendRoomKey(roomKeyMessage, devices, hsApi) { async _sendRoomKey(roomKeyMessage, devices, hsApi, log) {
const messages = await this._olmEncryption.encrypt( const messages = await log.wrap("olm encrypt", log => this._olmEncryption.encrypt(
"m.room_key", roomKeyMessage, devices, hsApi); "m.room_key", roomKeyMessage, devices, hsApi, log));
await this._sendMessagesToDevices(ENCRYPTED_TYPE, messages, hsApi); await log.wrap("send", log => this._sendMessagesToDevices(ENCRYPTED_TYPE, messages, hsApi, log));
} }
async _sendMessagesToDevices(type, messages, hsApi) { async _sendMessagesToDevices(type, messages, hsApi, log) {
log.set("messages", messages.length);
const messagesByUser = groupBy(messages, message => message.device.userId); const messagesByUser = groupBy(messages, message => message.device.userId);
const payload = { const payload = {
messages: Array.from(messagesByUser.entries()).reduce((userMap, [userId, messages]) => { messages: Array.from(messagesByUser.entries()).reduce((userMap, [userId, messages]) => {
@ -387,7 +393,7 @@ export class RoomEncryption {
}, {}) }, {})
}; };
const txnId = makeTxnId(); const txnId = makeTxnId();
await hsApi.sendToDevice(type, payload, txnId).response(); await hsApi.sendToDevice(type, payload, txnId, {log}).response();
} }
dispose() { dispose() {

View file

@ -47,17 +47,17 @@ export class Encryption {
this._senderKeyLock = senderKeyLock; this._senderKeyLock = senderKeyLock;
} }
async encrypt(type, content, devices, hsApi) { async encrypt(type, content, devices, hsApi, log) {
let messages = []; let messages = [];
for (let i = 0; i < devices.length ; i += MAX_BATCH_SIZE) { for (let i = 0; i < devices.length ; i += MAX_BATCH_SIZE) {
const batchDevices = devices.slice(i, i + MAX_BATCH_SIZE); const batchDevices = devices.slice(i, i + MAX_BATCH_SIZE);
const batchMessages = await this._encryptForMaxDevices(type, content, batchDevices, hsApi); const batchMessages = await this._encryptForMaxDevices(type, content, batchDevices, hsApi, log);
messages = messages.concat(batchMessages); messages = messages.concat(batchMessages);
} }
return messages; return messages;
} }
async _encryptForMaxDevices(type, content, devices, hsApi) { async _encryptForMaxDevices(type, content, devices, hsApi, log) {
// TODO: see if we can only hold some of the locks until after the /keys/claim call (if needed) // TODO: see if we can only hold some of the locks until after the /keys/claim call (if needed)
// take a lock on all senderKeys so decryption and other calls to encrypt (should not happen) // take a lock on all senderKeys so decryption and other calls to encrypt (should not happen)
// don't modify the sessions at the same time // don't modify the sessions at the same time
@ -75,16 +75,17 @@ export class Encryption {
let encryptionTargets = []; let encryptionTargets = [];
try { try {
if (devicesWithoutSession.length) { if (devicesWithoutSession.length) {
const newEncryptionTargets = await this._createNewSessions( const newEncryptionTargets = await log.wrap("create sessions", log => this._createNewSessions(
devicesWithoutSession, hsApi, timestamp); devicesWithoutSession, hsApi, timestamp, log));
encryptionTargets = encryptionTargets.concat(newEncryptionTargets); encryptionTargets = encryptionTargets.concat(newEncryptionTargets);
} }
await this._loadSessions(existingEncryptionTargets); await this._loadSessions(existingEncryptionTargets);
encryptionTargets = encryptionTargets.concat(existingEncryptionTargets); encryptionTargets = encryptionTargets.concat(existingEncryptionTargets);
const messages = encryptionTargets.map(target => { const encryptLog = {l: "encrypt", targets: encryptionTargets.length};
const messages = log.wrap(encryptLog, () => encryptionTargets.map(target => {
const encryptedContent = this._encryptForDevice(type, content, target); const encryptedContent = this._encryptForDevice(type, content, target);
return new EncryptedMessage(encryptedContent, target.device); return new EncryptedMessage(encryptedContent, target.device);
}); }));
await this._storeSessions(encryptionTargets, timestamp); await this._storeSessions(encryptionTargets, timestamp);
return messages; return messages;
} finally { } finally {
@ -149,8 +150,8 @@ export class Encryption {
} }
} }
async _createNewSessions(devicesWithoutSession, hsApi, timestamp) { async _createNewSessions(devicesWithoutSession, hsApi, timestamp, log) {
const newEncryptionTargets = await this._claimOneTimeKeys(hsApi, devicesWithoutSession); const newEncryptionTargets = await log.wrap("claim", log => this._claimOneTimeKeys(hsApi, devicesWithoutSession, log));
try { try {
for (const target of newEncryptionTargets) { for (const target of newEncryptionTargets) {
const {device, oneTimeKey} = target; const {device, oneTimeKey} = target;
@ -166,7 +167,7 @@ export class Encryption {
return newEncryptionTargets; return newEncryptionTargets;
} }
async _claimOneTimeKeys(hsApi, deviceIdentities) { async _claimOneTimeKeys(hsApi, deviceIdentities, log) {
// create a Map<userId, Map<deviceId, deviceIdentity>> // create a Map<userId, Map<deviceId, deviceIdentity>>
const devicesByUser = groupByWithCreator(deviceIdentities, const devicesByUser = groupByWithCreator(deviceIdentities,
device => device.userId, device => device.userId,
@ -183,11 +184,10 @@ export class Encryption {
const claimResponse = await hsApi.claimKeys({ const claimResponse = await hsApi.claimKeys({
timeout: 10000, timeout: 10000,
one_time_keys: oneTimeKeys one_time_keys: oneTimeKeys
}).response(); }, {log}).response();
if (Object.keys(claimResponse.failures).length) { if (Object.keys(claimResponse.failures).length) {
console.warn("failures for claiming one time keys", oneTimeKeys, claimResponse.failures); log.log({l: "failures", servers: Object.keys(claimResponse.failures)}, log.level.Warn);
} }
// TODO: log claimResponse.failures
const userKeyMap = claimResponse?.["one_time_keys"]; const userKeyMap = claimResponse?.["one_time_keys"];
return this._verifyAndCreateOTKTargets(userKeyMap, devicesByUser); return this._verifyAndCreateOTKTargets(userKeyMap, devicesByUser);
} }

View file

@ -61,12 +61,13 @@ export class AttachmentUpload {
} }
/** @package */ /** @package */
async upload(hsApi, progressCallback) { async upload(hsApi, progressCallback, log) {
this._uploadRequest = hsApi.uploadAttachment(this._transferredBlob, this._filename, { this._uploadRequest = hsApi.uploadAttachment(this._transferredBlob, this._filename, {
uploadProgress: sentBytes => { uploadProgress: sentBytes => {
this._sentBytes = sentBytes; this._sentBytes = sentBytes;
progressCallback(); progressCallback();
} },
log
}); });
const {content_uri} = await this._uploadRequest.response(); const {content_uri} = await this._uploadRequest.response();
this._mxcUrl = content_uri; this._mxcUrl = content_uri;

View file

@ -361,17 +361,26 @@ export class Room extends EventEmitter {
} }
/** @public */ /** @public */
sendEvent(eventType, content, attachments) { sendEvent(eventType, content, attachments, log = null) {
return this._sendQueue.enqueueEvent(eventType, content, attachments); this._platform.logger.wrapOrRun(log, "send", log => {
log.set("id", this.id);
return this._sendQueue.enqueueEvent(eventType, content, attachments, log);
});
} }
/** @public */ /** @public */
async ensureMessageKeyIsShared() { async ensureMessageKeyIsShared(log = null) {
return this._roomEncryption?.ensureMessageKeyIsShared(this._hsApi); if (!this._roomEncryption) {
return;
}
return this._platform.logger.wrapOrRun(log, "ensureMessageKeyIsShared", log => {
log.set("id", this.id);
return this._roomEncryption.ensureMessageKeyIsShared(this._hsApi, log);
});
} }
/** @public */ /** @public */
async loadMemberList() { async loadMemberList(log = null) {
if (this._memberList) { if (this._memberList) {
// TODO: also await fetchOrLoadMembers promise here // TODO: also await fetchOrLoadMembers promise here
this._memberList.retain(); this._memberList.retain();
@ -385,7 +394,8 @@ export class Room extends EventEmitter {
syncToken: this._getSyncToken(), syncToken: this._getSyncToken(),
// to handle race between /members and /sync // to handle race between /members and /sync
setChangedMembersMap: map => this._changedMembersDuringSync = map, setChangedMembersMap: map => this._changedMembersDuringSync = map,
}); log,
}, this._platform.logger);
this._memberList = new MemberList({ this._memberList = new MemberList({
members, members,
closeCallback: () => { this._memberList = null; } closeCallback: () => { this._memberList = null; }

View file

@ -25,13 +25,13 @@ async function loadMembers({roomId, storage}) {
return memberDatas.map(d => new RoomMember(d)); return memberDatas.map(d => new RoomMember(d));
} }
async function fetchMembers({summary, syncToken, roomId, hsApi, storage, setChangedMembersMap}) { async function fetchMembers({summary, syncToken, roomId, hsApi, storage, setChangedMembersMap}, log) {
// if any members are changed by sync while we're fetching members, // if any members are changed by sync while we're fetching members,
// they will end up here, so we check not to override them // they will end up here, so we check not to override them
const changedMembersDuringSync = new Map(); const changedMembersDuringSync = new Map();
setChangedMembersMap(changedMembersDuringSync); setChangedMembersMap(changedMembersDuringSync);
const memberResponse = await hsApi.members(roomId, {at: syncToken}).response(); const memberResponse = await hsApi.members(roomId, {at: syncToken}, {log}).response();
const txn = storage.readWriteTxn([ const txn = storage.readWriteTxn([
storage.storeNames.roomSummary, storage.storeNames.roomSummary,
@ -48,6 +48,7 @@ async function fetchMembers({summary, syncToken, roomId, hsApi, storage, setChan
if (!Array.isArray(memberEvents)) { if (!Array.isArray(memberEvents)) {
throw new Error("malformed"); throw new Error("malformed");
} }
log.set("members", memberEvents.length);
members = await Promise.all(memberEvents.map(async memberEvent => { members = await Promise.all(memberEvents.map(async memberEvent => {
const userId = memberEvent?.state_key; const userId = memberEvent?.state_key;
if (!userId) { if (!userId) {
@ -80,10 +81,11 @@ async function fetchMembers({summary, syncToken, roomId, hsApi, storage, setChan
return members; return members;
} }
export async function fetchOrLoadMembers(options) { export async function fetchOrLoadMembers(options, logger) {
const {summary} = options; const {summary} = options;
if (!summary.data.hasFetchedMembers) { if (!summary.data.hasFetchedMembers) {
return fetchMembers(options); // we only want to log if we fetch members, so start or continue the optional log operation here
return logger.wrapOrRun(options.log, "fetchMembers", log => fetchMembers(options, log));
} else { } else {
return loadMembers(options); return loadMembers(options);
} }

View file

@ -100,7 +100,7 @@ export class PendingEvent {
return this._attachments && Object.values(this._attachments).reduce((t, a) => t + a.sentBytes, 0); return this._attachments && Object.values(this._attachments).reduce((t, a) => t + a.sentBytes, 0);
} }
async uploadAttachments(hsApi) { async uploadAttachments(hsApi, log) {
if (!this.needsUpload) { if (!this.needsUpload) {
return; return;
} }
@ -111,7 +111,10 @@ export class PendingEvent {
this._status = SendStatus.EncryptingAttachments; this._status = SendStatus.EncryptingAttachments;
this._emitUpdate("status"); this._emitUpdate("status");
for (const attachment of Object.values(this._attachments)) { for (const attachment of Object.values(this._attachments)) {
await attachment.encrypt(); await log.wrap("encrypt", () => {
log.set("size", attachment.size);
return attachment.encrypt()
});
if (this.aborted) { if (this.aborted) {
throw new AbortError(); throw new AbortError();
} }
@ -123,8 +126,11 @@ export class PendingEvent {
// upload smallest attachments first // upload smallest attachments first
entries.sort(([, a1], [, a2]) => a1.size - a2.size); entries.sort(([, a1], [, a2]) => a1.size - a2.size);
for (const [urlPath, attachment] of entries) { for (const [urlPath, attachment] of entries) {
await attachment.upload(hsApi, () => { await log.wrap("upload", log => {
this._emitUpdate("attachmentsSentBytes"); log.set("size", attachment.size);
return attachment.upload(hsApi, () => {
this._emitUpdate("attachmentsSentBytes");
}, log);
}); });
attachment.applyToContent(urlPath, this.content); attachment.applyToContent(urlPath, this.content);
} }
@ -148,8 +154,7 @@ export class PendingEvent {
return this._aborted; return this._aborted;
} }
async send(hsApi) { async send(hsApi, log) {
console.log(`sending event ${this.eventType} in ${this.roomId}`);
this._status = SendStatus.Sending; this._status = SendStatus.Sending;
this._emitUpdate("status"); this._emitUpdate("status");
const eventType = this._data.encryptedEventType || this._data.eventType; const eventType = this._data.encryptedEventType || this._data.eventType;
@ -158,7 +163,8 @@ export class PendingEvent {
this.roomId, this.roomId,
eventType, eventType,
this.txnId, this.txnId,
content content,
{log}
); );
const response = await this._sendRequest.response(); const response = await this._sendRequest.response();
this._sendRequest = null; this._sendRequest = null;

View file

@ -26,9 +26,6 @@ export class SendQueue {
this._storage = storage; this._storage = storage;
this._hsApi = hsApi; this._hsApi = hsApi;
this._pendingEvents = new SortedArray((a, b) => a.queueIndex - b.queueIndex); this._pendingEvents = new SortedArray((a, b) => a.queueIndex - b.queueIndex);
if (pendingEvents.length) {
console.info(`SendQueue for room ${roomId} has ${pendingEvents.length} pending events`, pendingEvents);
}
this._pendingEvents.setManyUnsorted(pendingEvents.map(data => this._createPendingEvent(data))); this._pendingEvents.setManyUnsorted(pendingEvents.map(data => this._createPendingEvent(data)));
this._isSending = false; this._isSending = false;
this._offline = false; this._offline = false;
@ -49,43 +46,49 @@ export class SendQueue {
this._roomEncryption = roomEncryption; this._roomEncryption = roomEncryption;
} }
async _sendLoop() { _sendLoop(log) {
this._isSending = true; this._isSending = true;
try { this._sendLoopLogItem = log.runDetached("send queue flush", async log => {
for (let i = 0; i < this._pendingEvents.length; i += 1) { try {
const pendingEvent = this._pendingEvents.get(i); for (let i = 0; i < this._pendingEvents.length; i += 1) {
try { await log.wrap("send event", async log => {
await this._sendEvent(pendingEvent); const pendingEvent = this._pendingEvents.get(i);
} catch(err) { log.set("id", pendingEvent.queueIndex);
if (err instanceof ConnectionError) { try {
this._offline = true; await this._sendEvent(pendingEvent, log);
break; } catch(err) {
} else { if (err instanceof ConnectionError) {
pendingEvent.setError(err); this._offline = true;
} log.set("offline", true);
} } else {
log.catch(err);
pendingEvent.setError(err);
}
}
});
}
} finally {
this._isSending = false;
this._sendLoopLogItem = null;
} }
} finally { });
this._isSending = false;
}
} }
async _sendEvent(pendingEvent) { async _sendEvent(pendingEvent, log) {
if (pendingEvent.needsUpload) { if (pendingEvent.needsUpload) {
await pendingEvent.uploadAttachments(this._hsApi); await log.wrap("upload attachments", log => pendingEvent.uploadAttachments(this._hsApi, log));
console.log("attachments upload, content is now", pendingEvent.content);
await this._tryUpdateEvent(pendingEvent); await this._tryUpdateEvent(pendingEvent);
} }
if (pendingEvent.needsEncryption) { if (pendingEvent.needsEncryption) {
pendingEvent.setEncrypting(); pendingEvent.setEncrypting();
const {type, content} = await this._roomEncryption.encrypt( const {type, content} = await log.wrap("encrypt", log => this._roomEncryption.encrypt(
pendingEvent.eventType, pendingEvent.content, this._hsApi); pendingEvent.eventType, pendingEvent.content, this._hsApi, log));
pendingEvent.setEncrypted(type, content); pendingEvent.setEncrypted(type, content);
await this._tryUpdateEvent(pendingEvent); await this._tryUpdateEvent(pendingEvent);
} }
if (pendingEvent.needsSending) { if (pendingEvent.needsSending) {
await pendingEvent.send(this._hsApi); await pendingEvent.send(this._hsApi, log);
console.log("writing remoteId");
await this._tryUpdateEvent(pendingEvent); await this._tryUpdateEvent(pendingEvent);
} }
} }
@ -134,19 +137,32 @@ export class SendQueue {
} }
} }
resumeSending() { resumeSending(parentLog) {
this._offline = false; this._offline = false;
if (!this._isSending) { if (this._pendingEvents.length) {
this._sendLoop(); parentLog.wrap("resumeSending", log => {
log.set("id", this._roomId);
log.set("pendingEvents", this._pendingEvents.length);
if (!this._isSending) {
this._sendLoop(log);
}
if (this._sendLoopLogItem) {
log.refDetached(this._sendLoopLogItem);
}
});
} }
} }
async enqueueEvent(eventType, content, attachments) { async enqueueEvent(eventType, content, attachments, log) {
const pendingEvent = await this._createAndStoreEvent(eventType, content, attachments); const pendingEvent = await this._createAndStoreEvent(eventType, content, attachments);
this._pendingEvents.set(pendingEvent); this._pendingEvents.set(pendingEvent);
console.log("added to _pendingEvents set", this._pendingEvents.length); log.set("queueIndex", pendingEvent.queueIndex);
log.set("pendingEvents", this._pendingEvents.length);
if (!this._isSending && !this._offline) { if (!this._isSending && !this._offline) {
this._sendLoop(); this._sendLoop(log);
}
if (this._sendLoopLogItem) {
log.refDetached(this._sendLoopLogItem);
} }
} }
@ -156,34 +172,25 @@ export class SendQueue {
async _tryUpdateEvent(pendingEvent) { async _tryUpdateEvent(pendingEvent) {
const txn = this._storage.readWriteTxn([this._storage.storeNames.pendingEvents]); const txn = this._storage.readWriteTxn([this._storage.storeNames.pendingEvents]);
console.log("_tryUpdateEvent: got txn");
try { try {
// pendingEvent might have been removed already here // pendingEvent might have been removed already here
// by a racing remote echo, so check first so we don't recreate it // by a racing remote echo, so check first so we don't recreate it
console.log("_tryUpdateEvent: before exists");
if (await txn.pendingEvents.exists(pendingEvent.roomId, pendingEvent.queueIndex)) { if (await txn.pendingEvents.exists(pendingEvent.roomId, pendingEvent.queueIndex)) {
console.log("_tryUpdateEvent: inside if exists");
txn.pendingEvents.update(pendingEvent.data); txn.pendingEvents.update(pendingEvent.data);
} }
console.log("_tryUpdateEvent: after exists");
} catch (err) { } catch (err) {
txn.abort(); txn.abort();
console.log("_tryUpdateEvent: error", err);
throw err; throw err;
} }
console.log("_tryUpdateEvent: try complete");
await txn.complete(); await txn.complete();
} }
async _createAndStoreEvent(eventType, content, attachments) { async _createAndStoreEvent(eventType, content, attachments) {
console.log("_createAndStoreEvent");
const txn = this._storage.readWriteTxn([this._storage.storeNames.pendingEvents]); const txn = this._storage.readWriteTxn([this._storage.storeNames.pendingEvents]);
let pendingEvent; let pendingEvent;
try { try {
const pendingEventsStore = txn.pendingEvents; const pendingEventsStore = txn.pendingEvents;
console.log("_createAndStoreEvent getting maxQueueIndex");
const maxQueueIndex = await pendingEventsStore.getMaxQueueIndex(this._roomId) || 0; const maxQueueIndex = await pendingEventsStore.getMaxQueueIndex(this._roomId) || 0;
console.log("_createAndStoreEvent got maxQueueIndex", maxQueueIndex);
const queueIndex = maxQueueIndex + 1; const queueIndex = maxQueueIndex + 1;
pendingEvent = this._createPendingEvent({ pendingEvent = this._createPendingEvent({
roomId: this._roomId, roomId: this._roomId,
@ -194,7 +201,6 @@ export class SendQueue {
needsEncryption: !!this._roomEncryption, needsEncryption: !!this._roomEncryption,
needsUpload: !!attachments needsUpload: !!attachments
}, attachments); }, attachments);
console.log("_createAndStoreEvent: adding to pendingEventsStore");
pendingEventsStore.add(pendingEvent.data); pendingEventsStore.add(pendingEvent.data);
} catch (err) { } catch (err) {
txn.abort(); txn.abort();