From c934049523c3148a470b6caad142d3b05facbe7d Mon Sep 17 00:00:00 2001 From: Bruno Windels Date: Fri, 21 May 2021 10:47:48 +0200 Subject: [PATCH] also resolve related event ids when removing remote echo during sync as /sync races with /send, and remote echo may happen first. It's important for local echo that the pending redaction/relation will also get attached to the remote echo before /send returns, otherwise the remote echo would be "unannotated" until /send returns --- src/matrix/room/BaseRoom.js | 4 +- src/matrix/room/Room.js | 6 +-- src/matrix/room/sending/PendingEvent.js | 24 ++++------- src/matrix/room/sending/SendQueue.js | 53 ++++++++++++++++--------- 4 files changed, 48 insertions(+), 39 deletions(-) diff --git a/src/matrix/room/BaseRoom.js b/src/matrix/room/BaseRoom.js index e6e7ae33..9cb38974 100644 --- a/src/matrix/room/BaseRoom.js +++ b/src/matrix/room/BaseRoom.js @@ -263,7 +263,7 @@ export class BaseRoom extends EventEmitter { let gapResult; try { // detect remote echos of pending messages in the gap - extraGapFillChanges = this._writeGapFill(response.chunk, txn, log); + extraGapFillChanges = await this._writeGapFill(response.chunk, txn, log); // write new events into gap const gapWriter = new GapWriter({ roomId: this._roomId, @@ -300,7 +300,7 @@ export class BaseRoom extends EventEmitter { JoinedRoom uses this update remote echos. */ // eslint-disable-next-line no-unused-vars - _writeGapFill(chunk, txn, log) {} + async _writeGapFill(chunk, txn, log) {} _applyGapFill() {} /** @public */ diff --git a/src/matrix/room/Room.js b/src/matrix/room/Room.js index cb145b7d..da9eef52 100644 --- a/src/matrix/room/Room.js +++ b/src/matrix/room/Room.js @@ -159,7 +159,7 @@ export class Room extends BaseRoom { } let removedPendingEvents; if (Array.isArray(roomResponse.timeline?.events)) { - removedPendingEvents = this._sendQueue.removeRemoteEchos(roomResponse.timeline.events, txn, log); + removedPendingEvents = await this._sendQueue.removeRemoteEchos(roomResponse.timeline.events, txn, log); } return { summaryChanges, @@ -280,8 +280,8 @@ export class Room extends BaseRoom { } } - _writeGapFill(gapChunk, txn, log) { - const removedPendingEvents = this._sendQueue.removeRemoteEchos(gapChunk, txn, log); + async _writeGapFill(gapChunk, txn, log) { + const removedPendingEvents = await this._sendQueue.removeRemoteEchos(gapChunk, txn, log); return removedPendingEvents; } diff --git a/src/matrix/room/sending/PendingEvent.js b/src/matrix/room/sending/PendingEvent.js index 3e7971e7..ef5d086e 100644 --- a/src/matrix/room/sending/PendingEvent.js +++ b/src/matrix/room/sending/PendingEvent.js @@ -16,7 +16,6 @@ limitations under the License. import {createEnum} from "../../../utils/enum.js"; import {AbortError} from "../../../utils/error.js"; import {REDACTION_TYPE} from "../common.js"; -import {isTxnId} from "../../common.js"; export const SendStatus = createEnum( "Waiting", @@ -49,6 +48,13 @@ export class PendingEvent { get txnId() { return this._data.txnId; } get remoteId() { return this._data.remoteId; } get content() { return this._data.content; } + get relatedTxnId() { return this._data.relatedTxnId; } + get relatedEventId() { return this._data.relatedEventId; } + + setRelatedEventId(eventId) { + this._data.relatedEventId = eventId; + } + get data() { return this._data; } getAttachment(key) { @@ -164,10 +170,9 @@ export class PendingEvent { const eventType = this._data.encryptedEventType || this._data.eventType; const content = this._data.encryptedContent || this._data.content; if (eventType === REDACTION_TYPE) { - // TODO: should we double check here that this._data.redacts is not a txnId here anymore? this._sendRequest = hsApi.redact( this.roomId, - this._data.redacts, + this._data.relatedEventId, this.txnId, content, {log} @@ -197,17 +202,4 @@ export class PendingEvent { } } } - - get relatedTxnId() { - if (isTxnId(this._data.redacts)) { - return this._data.redacts; - } - return null; - } - - setRelatedEventId(eventId) { - if (this._data.redacts) { - this._data.redacts = eventId; - } - } } diff --git a/src/matrix/room/sending/SendQueue.js b/src/matrix/room/sending/SendQueue.js index 5fa6dca6..45bcb519 100644 --- a/src/matrix/room/sending/SendQueue.js +++ b/src/matrix/room/sending/SendQueue.js @@ -104,16 +104,11 @@ export class SendQueue { // the relatedTxnId to a related event id, they need to do so now. // We ensure this by writing the new remote id for the pending event and all related events // with unresolved relatedTxnId in the queue in one transaction. - const relatedEvents = this._pendingEvents.array.filter(pe => pe.relatedTxnId === pendingEvent.txnId); const txn = await this._storage.readWriteTxn([this._storage.storeNames.pendingEvents]); try { await this._tryUpdateEventWithTxn(pendingEvent, txn); - for (const relatedPE of relatedEvents) { - relatedPE.setRelatedEventId(pendingEvent.remoteId); - await this._tryUpdateEventWithTxn(relatedPE, txn); - // emit that we now have a related remote id - this._pendingEvents.update(relatedPE) - } + await this._resolveRemoteIdInPendingRelations( + pendingEvent.txnId, pendingEvent.remoteId, txn); } catch (err) { txn.abort(); throw err; @@ -122,7 +117,20 @@ export class SendQueue { } } - removeRemoteEchos(events, txn, parentLog) { + async _resolveRemoteIdInPendingRelations(txnId, remoteId, txn) { + const relatedEventWithoutRemoteId = this._pendingEvents.array.filter(pe => { + return pe.relatedTxnId === txnId && pe.relatedEventId !== remoteId; + }); + for (const relatedPE of relatedEventWithoutRemoteId) { + relatedPE.setRelatedEventId(remoteId); + await this._tryUpdateEventWithTxn(relatedPE, txn); + // emit that we now have a related remote id + // this._pendingEvents.update(relatedPE); + } + return relatedEventWithoutRemoteId; + } + + async removeRemoteEchos(events, txn, parentLog) { const removed = []; for (const event of events) { const txnId = event.unsigned && event.unsigned.transaction_id; @@ -134,9 +142,11 @@ export class SendQueue { } if (idx !== -1) { const pendingEvent = this._pendingEvents.get(idx); - parentLog.log({l: "removeRemoteEcho", queueIndex: pendingEvent.queueIndex, remoteId: event.event_id, txnId}); + const remoteId = event.event_id; + parentLog.log({l: "removeRemoteEcho", queueIndex: pendingEvent.queueIndex, remoteId, txnId}); txn.pendingEvents.remove(pendingEvent.roomId, pendingEvent.queueIndex); removed.push(pendingEvent); + await this._resolveRemoteIdInPendingRelations(txnId, remoteId, txn); } } return removed; @@ -184,12 +194,12 @@ export class SendQueue { } async enqueueEvent(eventType, content, attachments, log) { - await this._enqueueEvent(eventType, content, attachments, null, log); + await this._enqueueEvent(eventType, content, attachments, null, null, log); } - async _enqueueEvent(eventType, content, attachments, redacts, log) { - const pendingEvent = await this._createAndStoreEvent(eventType, content, redacts, attachments); + async _enqueueEvent(eventType, content, attachments, relatedTxnId, relatedEventId, log) { + const pendingEvent = await this._createAndStoreEvent(eventType, content, relatedTxnId, relatedEventId, attachments); this._pendingEvents.set(pendingEvent); log.set("queueIndex", pendingEvent.queueIndex); log.set("pendingEvents", this._pendingEvents.length); @@ -202,8 +212,11 @@ export class SendQueue { } async enqueueRedaction(eventIdOrTxnId, reason, log) { + let relatedTxnId; + let relatedEventId; if (isTxnId(eventIdOrTxnId)) { - log.set("txnIdToRedact", eventIdOrTxnId); + relatedTxnId = eventIdOrTxnId; + log.set("relatedTxnId", eventIdOrTxnId); const txnId = eventIdOrTxnId; const pe = this._pendingEvents.array.find(pe => pe.txnId === txnId); if (pe && !pe.remoteId && pe.status !== SendStatus.Sending) { @@ -211,7 +224,9 @@ export class SendQueue { // just remove it from the queue await pe.abort(); return; - } else if (!pe) { + } else if (pe) { + relatedEventId = pe.remoteId; + } else { // we don't have the pending event anymore, // the remote echo must have arrived in the meantime. // we could look for it in the timeline, but for now @@ -220,9 +235,10 @@ export class SendQueue { return; } } else { - log.set("eventIdToRedact", eventIdOrTxnId); + relatedEventId = eventIdOrTxnId; + log.set("relatedEventId", relatedEventId); } - await this._enqueueEvent(REDACTION_TYPE, {reason}, null, eventIdOrTxnId, log); + await this._enqueueEvent(REDACTION_TYPE, {reason}, null, relatedTxnId, relatedEventId, log); } get pendingEvents() { @@ -248,7 +264,7 @@ export class SendQueue { } } - async _createAndStoreEvent(eventType, content, redacts, attachments) { + async _createAndStoreEvent(eventType, content, relatedTxnId, relatedEventId, attachments) { const txn = await this._storage.readWriteTxn([this._storage.storeNames.pendingEvents]); let pendingEvent; try { @@ -261,7 +277,8 @@ export class SendQueue { queueIndex, eventType, content, - redacts, + relatedTxnId, + relatedEventId, txnId: makeTxnId(), needsEncryption, needsUpload: !!attachments