From fd81111bfbf75c84d6dc3107c8ee43849ead90ca Mon Sep 17 00:00:00 2001 From: Bruno Windels Date: Wed, 18 Nov 2020 13:02:38 +0100 Subject: [PATCH] merge state machine from AttachmentUpload into PendingEvent to have less state machines, and we are mostly interested in the aggregate status of all attachments of an event this will also drive updates through the pending events collection that already exists rather than an extra observablevalue, so less housekeeping to update the UI. --- src/matrix/room/AttachmentUpload.js | 90 ++++-------- src/matrix/room/Room.js | 4 +- src/matrix/room/sending/PendingEvent.js | 129 +++++++++++++++++- src/matrix/room/sending/SendQueue.js | 112 ++++++++------- .../timeline/entries/PendingEventEntry.js | 4 +- 5 files changed, 215 insertions(+), 124 deletions(-) diff --git a/src/matrix/room/AttachmentUpload.js b/src/matrix/room/AttachmentUpload.js index bccbbb67..dc25ce16 100644 --- a/src/matrix/room/AttachmentUpload.js +++ b/src/matrix/room/AttachmentUpload.js @@ -15,81 +15,33 @@ limitations under the License. */ import {encryptAttachment} from "../e2ee/attachment.js"; -import {createEnum} from "../../utils/enum.js"; -import {ObservableValue} from "../../observable/ObservableValue.js"; -import {AbortError} from "../../utils/error.js"; - -export const UploadStatus = createEnum("Waiting", "Encrypting", "Uploading", "Uploaded", "Error"); export class AttachmentUpload { - constructor({filename, blob, hsApi, platform, isEncrypted}) { + constructor({filename, blob, platform}) { this._filename = filename; + // need to keep around for local preview while uploading this._unencryptedBlob = blob; - this._isEncrypted = isEncrypted; + this._transferredBlob = this._unencryptedBlob; this._platform = platform; - this._hsApi = hsApi; this._mxcUrl = null; - this._transferredBlob = null; this._encryptionInfo = null; this._uploadRequest = null; this._aborted = false; this._error = null; - this._status = new ObservableValue(UploadStatus.Waiting); - this._progress = new ObservableValue(0); + this._sentBytes = 0; } - get status() { - return this._status; + /** important to call after encrypt() if encryption is needed */ + get size() { + return this._transferredBlob.size; } - get uploadProgress() { - return this._progress; - } - - async upload() { - if (this._status.get() === UploadStatus.Waiting) { - this._upload(); - } - await this._status.waitFor(s => { - return s === UploadStatus.Error || s === UploadStatus.Uploaded; - }).promise; - if (this._status.get() === UploadStatus.Error) { - throw this._error; - } - } - - /** @package */ - async _upload() { - try { - let transferredBlob = this._unencryptedBlob; - if (this._isEncrypted) { - this._status.set(UploadStatus.Encrypting); - const {info, blob} = await encryptAttachment(this._platform, this._unencryptedBlob); - transferredBlob = blob; - this._encryptionInfo = info; - } - if (this._aborted) { - throw new AbortError("upload aborted during encryption"); - } - this._progress.set(0); - this._status.set(UploadStatus.Uploading); - this._uploadRequest = this._hsApi.uploadAttachment(transferredBlob, this._filename, { - uploadProgress: sentBytes => this._progress.set(sentBytes / transferredBlob.size) - }); - const {content_uri} = await this._uploadRequest.response(); - this._progress.set(1); - this._mxcUrl = content_uri; - this._transferredBlob = transferredBlob; - this._status.set(UploadStatus.Uploaded); - } catch (err) { - this._error = err; - this._status.set(UploadStatus.Error); - } + get sentBytes() { + return this._sentBytes; } /** @public */ abort() { - this._aborted = true; this._uploadRequest?.abort(); } @@ -98,8 +50,26 @@ export class AttachmentUpload { return this._unencryptedBlob; } - get error() { - return this._error; + /** @package */ + async encrypt() { + if (this._encryptionInfo) { + throw new Error("already encrypted"); + } + const {info, blob} = await encryptAttachment(this._platform, this._transferredBlob); + this._transferredBlob = blob; + this._encryptionInfo = info; + } + + /** @package */ + async upload(hsApi, progressCallback) { + this._uploadRequest = hsApi.uploadAttachment(this._transferredBlob, this._filename, { + uploadProgress: sentBytes => { + this._sentBytes = sentBytes; + progressCallback(); + } + }); + const {content_uri} = await this._uploadRequest.response(); + this._mxcUrl = content_uri; } /** @package */ @@ -110,7 +80,7 @@ export class AttachmentUpload { let prefix = urlPath.substr(0, urlPath.lastIndexOf("url")); setPath(`${prefix}info.size`, content, this._transferredBlob.size); setPath(`${prefix}info.mimetype`, content, this._unencryptedBlob.mimeType); - if (this._isEncrypted) { + if (this._encryptionInfo) { setPath(`${prefix}file`, content, Object.assign(this._encryptionInfo, { mimetype: this._unencryptedBlob.mimeType, url: this._mxcUrl diff --git a/src/matrix/room/Room.js b/src/matrix/room/Room.js index 69829e9c..f2635492 100644 --- a/src/matrix/room/Room.js +++ b/src/matrix/room/Room.js @@ -634,9 +634,7 @@ export class Room extends EventEmitter { } createAttachment(blob, filename) { - const attachment = new AttachmentUpload({blob, filename, - hsApi: this._hsApi, platform: this._platform, isEncrypted: this.isEncrypted}); - return attachment; + return new AttachmentUpload({blob, filename, platform: this._platform}); } dispose() { diff --git a/src/matrix/room/sending/PendingEvent.js b/src/matrix/room/sending/PendingEvent.js index b1e7f5a2..a1e906ae 100644 --- a/src/matrix/room/sending/PendingEvent.js +++ b/src/matrix/room/sending/PendingEvent.js @@ -13,11 +13,31 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ +import {createEnum} from "../../../utils/enum.js"; +import {AbortError} from "../../../utils/error.js"; + +export const SendStatus = createEnum( + "Waiting", + "EncryptingAttachments", + "UploadingAttachments", + "Encrypting", + "Sending", + "Sent", + "Error", +); export class PendingEvent { - constructor(data, attachments) { + constructor({data, remove, emitUpdate, attachments}) { this._data = data; - this.attachments = attachments; + this._attachments = attachments; + this._emitUpdate = () => { + console.log("PendingEvent status", this.status, this._attachments && Object.entries(this._attachments).map(([key, a]) => `${key}: ${a.sentBytes}/${a.size}`)); + emitUpdate(); + }; + this._removeFromQueueCallback = remove; + this._aborted = false; + this._status = SendStatus.Waiting; + this._sendRequest = null; } get roomId() { return this._data.roomId; } @@ -25,14 +45,111 @@ export class PendingEvent { get eventType() { return this._data.eventType; } get txnId() { return this._data.txnId; } get remoteId() { return this._data.remoteId; } - set remoteId(value) { this._data.remoteId = value; } get content() { return this._data.content; } - get needsEncryption() { return this._data.needsEncryption; } get data() { return this._data; } + getAttachment(key) { + return this._attachments && this._attachments[key]; + } + + get needsSending() { + return !this.remoteId && !this.aborted; + } + + get needsEncryption() { + return this._data.needsEncryption && !this.aborted; + } + + get needsUpload() { + return this._data.needsUpload && !this.aborted; + } + + setEncrypting() { + this._status = SendStatus.Encrypting; + this._emitUpdate("status"); + } + setEncrypted(type, content) { - this._data.eventType = type; - this._data.content = content; + this._data.encryptedEventType = type; + this._data.encryptedContent = content; this._data.needsEncryption = false; } + + setError(error) { + this._status = SendStatus.Error; + this._error = error; + this._emitUpdate("status"); + } + + get status() { return this._status; } + get error() { return this._error; } + + get attachmentsTotalBytes() { + return Object.values(this._attachments).reduce((t, a) => t + a.size, 0); + } + + get attachmentsSentBytes() { + return Object.values(this._attachments).reduce((t, a) => t + a.sentBytes, 0); + } + + async uploadAttachments(hsApi) { + if (!this.needsUpload) { + return; + } + if (this.needsEncryption) { + this._status = SendStatus.EncryptingAttachments; + this._emitUpdate("status"); + for (const attachment of Object.values(this._attachments)) { + await attachment.encrypt(); + if (this.aborted) { + throw new AbortError(); + } + } + } + this._status = SendStatus.UploadingAttachments; + this._emitUpdate("status"); + for (const [urlPath, attachment] of Object.entries(this._attachments)) { + await attachment.upload(hsApi, () => { + this._emitUpdate("attachmentsSentBytes"); + }); + attachment.applyToContent(urlPath, this.content); + } + this._data.needsUpload = false; + } + + abort() { + if (!this._aborted) { + this._aborted = true; + if (this._attachments) { + for (const attachment of Object.values(this._attachments)) { + attachment.abort(); + } + } + this._sendRequest?.abort(); + this._removeFromQueueCallback(); + } + } + + get aborted() { + return this._aborted; + } + + async send(hsApi) { + console.log(`sending event ${this.eventType} in ${this.roomId}`); + this._status = SendStatus.Sending; + this._emitUpdate("status"); + const eventType = this._data.encryptedEventType || this._data.eventType; + const content = this._data.encryptedContent || this._data.content; + this._sendRequest = hsApi.send( + this.roomId, + eventType, + this.txnId, + content + ); + const response = await this._sendRequest.response(); + this._sendRequest = null; + this._data.remoteId = response.event_id; + this._status = SendStatus.Sent; + this._emitUpdate("status"); + } } diff --git a/src/matrix/room/sending/SendQueue.js b/src/matrix/room/sending/SendQueue.js index 23019358..aa3a29b9 100644 --- a/src/matrix/room/sending/SendQueue.js +++ b/src/matrix/room/sending/SendQueue.js @@ -29,13 +29,22 @@ export class SendQueue { if (pendingEvents.length) { console.info(`SendQueue for room ${roomId} has ${pendingEvents.length} pending events`, pendingEvents); } - this._pendingEvents.setManyUnsorted(pendingEvents.map(data => new PendingEvent(data))); + this._pendingEvents.setManyUnsorted(pendingEvents.map(data => this._createPendingEvent(data))); this._isSending = false; this._offline = false; - this._amountSent = 0; this._roomEncryption = null; } + _createPendingEvent(data, attachments = null) { + const pendingEvent = new PendingEvent({ + data, + remove: () => this._removeEvent(pendingEvent), + emitUpdate: () => this._pendingEvents.set(pendingEvent), + attachments + }); + return pendingEvent; + } + enableEncryption(roomEncryption) { this._roomEncryption = roomEncryption; } @@ -43,53 +52,44 @@ export class SendQueue { async _sendLoop() { this._isSending = true; try { - console.log("start sending", this._amountSent, "<", this._pendingEvents.length); - while (this._amountSent < this._pendingEvents.length) { - const pendingEvent = this._pendingEvents.get(this._amountSent); - console.log("trying to send", pendingEvent.content.body); - if (pendingEvent.remoteId) { - this._amountSent += 1; - continue; - } - if (pendingEvent.attachments) { - try { - await this._uploadAttachments(pendingEvent); - } catch (err) { - console.log("upload failed, skip sending message", err, pendingEvent); - this._amountSent += 1; - continue; + for (let i = 0; i < this._pendingEvents.length; i += 1) { + const pendingEvent = this._pendingEvents.get(i); + try { + await this._sendEvent(pendingEvent); + } catch(err) { + if (err instanceof ConnectionError) { + this._offline = true; + break; + } else { + pendingEvent.setError(err); } - console.log("attachments upload, content is now", pendingEvent.content); - } - if (pendingEvent.needsEncryption) { - const {type, content} = await this._roomEncryption.encrypt( - pendingEvent.eventType, pendingEvent.content, this._hsApi); - pendingEvent.setEncrypted(type, content); - await this._tryUpdateEvent(pendingEvent); - } - console.log("really sending now"); - const response = await this._hsApi.send( - pendingEvent.roomId, - pendingEvent.eventType, - pendingEvent.txnId, - pendingEvent.content - ).response(); - pendingEvent.remoteId = response.event_id; - // - console.log("writing remoteId now"); - await this._tryUpdateEvent(pendingEvent); - console.log("keep sending?", this._amountSent, "<", this._pendingEvents.length); - this._amountSent += 1; - } - } catch(err) { - if (err instanceof ConnectionError) { - this._offline = true; + } } } finally { this._isSending = false; } } + async _sendEvent(pendingEvent) { + if (pendingEvent.needsUpload) { + await pendingEvent.uploadAttachments(this._hsApi); + console.log("attachments upload, content is now", pendingEvent.content); + await this._tryUpdateEvent(pendingEvent); + } + if (pendingEvent.needsEncryption) { + pendingEvent.setEncrypting(); + const {type, content} = await this._roomEncryption.encrypt( + pendingEvent.eventType, pendingEvent.content, this._hsApi); + pendingEvent.setEncrypted(type, content); + await this._tryUpdateEvent(pendingEvent); + } + if (pendingEvent.needsSending) { + await pendingEvent.send(this._hsApi); + console.log("writing remoteId"); + await this._tryUpdateEvent(pendingEvent); + } + } + removeRemoteEchos(events, txn) { const removed = []; for (const event of events) { @@ -109,11 +109,24 @@ export class SendQueue { return removed; } + async _removeEvent(pendingEvent) { + const idx = this._pendingEvents.array.indexOf(pendingEvent); + if (idx !== -1) { + const txn = this._storage.readWriteTxn([this._storage.storeNames.pendingEvents]); + try { + txn.pendingEvents.remove(pendingEvent.roomId, pendingEvent.queueIndex); + } catch (err) { + txn.abort(); + } + await txn.complete(); + this._pendingEvents.remove(idx); + } + } + emitRemovals(pendingEvents) { for (const pendingEvent of pendingEvents) { const idx = this._pendingEvents.array.indexOf(pendingEvent); if (idx !== -1) { - this._amountSent -= 1; this._pendingEvents.remove(idx); } } @@ -170,13 +183,14 @@ export class SendQueue { const maxQueueIndex = await pendingEventsStore.getMaxQueueIndex(this._roomId) || 0; console.log("_createAndStoreEvent got maxQueueIndex", maxQueueIndex); const queueIndex = maxQueueIndex + 1; - pendingEvent = new PendingEvent({ + pendingEvent = this._createPendingEvent({ roomId: this._roomId, queueIndex, eventType, content, txnId: makeTxnId(), - needsEncryption: !!this._roomEncryption + needsEncryption: !!this._roomEncryption, + needsUpload: !!attachments }, attachments); console.log("_createAndStoreEvent: adding to pendingEventsStore"); pendingEventsStore.add(pendingEvent.data); @@ -187,12 +201,4 @@ export class SendQueue { await txn.complete(); return pendingEvent; } - - async _uploadAttachments(pendingEvent) { - const {attachments} = pendingEvent; - for (const [urlPath, attachment] of Object.entries(attachments)) { - await attachment.upload(); - attachment.applyToContent(urlPath, pendingEvent.content); - } - } } diff --git a/src/matrix/room/timeline/entries/PendingEventEntry.js b/src/matrix/room/timeline/entries/PendingEventEntry.js index e31e56d5..eff14cb5 100644 --- a/src/matrix/room/timeline/entries/PendingEventEntry.js +++ b/src/matrix/room/timeline/entries/PendingEventEntry.js @@ -64,8 +64,8 @@ export class PendingEventEntry extends BaseEntry { return this._pendingEvent.txnId; } - get attachments() { - return this._pendingEvent.attachments; + get pendingEvent() { + return this._pendingEvent; } notifyUpdate() {