diff --git a/src/matrix/room/AttachmentUpload.js b/src/matrix/room/AttachmentUpload.js index bccbbb67..dc25ce16 100644 --- a/src/matrix/room/AttachmentUpload.js +++ b/src/matrix/room/AttachmentUpload.js @@ -15,81 +15,33 @@ limitations under the License. */ import {encryptAttachment} from "../e2ee/attachment.js"; -import {createEnum} from "../../utils/enum.js"; -import {ObservableValue} from "../../observable/ObservableValue.js"; -import {AbortError} from "../../utils/error.js"; - -export const UploadStatus = createEnum("Waiting", "Encrypting", "Uploading", "Uploaded", "Error"); export class AttachmentUpload { - constructor({filename, blob, hsApi, platform, isEncrypted}) { + constructor({filename, blob, platform}) { this._filename = filename; + // need to keep around for local preview while uploading this._unencryptedBlob = blob; - this._isEncrypted = isEncrypted; + this._transferredBlob = this._unencryptedBlob; this._platform = platform; - this._hsApi = hsApi; this._mxcUrl = null; - this._transferredBlob = null; this._encryptionInfo = null; this._uploadRequest = null; this._aborted = false; this._error = null; - this._status = new ObservableValue(UploadStatus.Waiting); - this._progress = new ObservableValue(0); + this._sentBytes = 0; } - get status() { - return this._status; + /** important to call after encrypt() if encryption is needed */ + get size() { + return this._transferredBlob.size; } - get uploadProgress() { - return this._progress; - } - - async upload() { - if (this._status.get() === UploadStatus.Waiting) { - this._upload(); - } - await this._status.waitFor(s => { - return s === UploadStatus.Error || s === UploadStatus.Uploaded; - }).promise; - if (this._status.get() === UploadStatus.Error) { - throw this._error; - } - } - - /** @package */ - async _upload() { - try { - let transferredBlob = this._unencryptedBlob; - if (this._isEncrypted) { - this._status.set(UploadStatus.Encrypting); - const {info, blob} = await encryptAttachment(this._platform, this._unencryptedBlob); - transferredBlob = blob; - this._encryptionInfo = info; - } - if (this._aborted) { - throw new AbortError("upload aborted during encryption"); - } - this._progress.set(0); - this._status.set(UploadStatus.Uploading); - this._uploadRequest = this._hsApi.uploadAttachment(transferredBlob, this._filename, { - uploadProgress: sentBytes => this._progress.set(sentBytes / transferredBlob.size) - }); - const {content_uri} = await this._uploadRequest.response(); - this._progress.set(1); - this._mxcUrl = content_uri; - this._transferredBlob = transferredBlob; - this._status.set(UploadStatus.Uploaded); - } catch (err) { - this._error = err; - this._status.set(UploadStatus.Error); - } + get sentBytes() { + return this._sentBytes; } /** @public */ abort() { - this._aborted = true; this._uploadRequest?.abort(); } @@ -98,8 +50,26 @@ export class AttachmentUpload { return this._unencryptedBlob; } - get error() { - return this._error; + /** @package */ + async encrypt() { + if (this._encryptionInfo) { + throw new Error("already encrypted"); + } + const {info, blob} = await encryptAttachment(this._platform, this._transferredBlob); + this._transferredBlob = blob; + this._encryptionInfo = info; + } + + /** @package */ + async upload(hsApi, progressCallback) { + this._uploadRequest = hsApi.uploadAttachment(this._transferredBlob, this._filename, { + uploadProgress: sentBytes => { + this._sentBytes = sentBytes; + progressCallback(); + } + }); + const {content_uri} = await this._uploadRequest.response(); + this._mxcUrl = content_uri; } /** @package */ @@ -110,7 +80,7 @@ export class AttachmentUpload { let prefix = urlPath.substr(0, urlPath.lastIndexOf("url")); setPath(`${prefix}info.size`, content, this._transferredBlob.size); setPath(`${prefix}info.mimetype`, content, this._unencryptedBlob.mimeType); - if (this._isEncrypted) { + if (this._encryptionInfo) { setPath(`${prefix}file`, content, Object.assign(this._encryptionInfo, { mimetype: this._unencryptedBlob.mimeType, url: this._mxcUrl diff --git a/src/matrix/room/Room.js b/src/matrix/room/Room.js index 69829e9c..f2635492 100644 --- a/src/matrix/room/Room.js +++ b/src/matrix/room/Room.js @@ -634,9 +634,7 @@ export class Room extends EventEmitter { } createAttachment(blob, filename) { - const attachment = new AttachmentUpload({blob, filename, - hsApi: this._hsApi, platform: this._platform, isEncrypted: this.isEncrypted}); - return attachment; + return new AttachmentUpload({blob, filename, platform: this._platform}); } dispose() { diff --git a/src/matrix/room/sending/PendingEvent.js b/src/matrix/room/sending/PendingEvent.js index b1e7f5a2..a1e906ae 100644 --- a/src/matrix/room/sending/PendingEvent.js +++ b/src/matrix/room/sending/PendingEvent.js @@ -13,11 +13,31 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ +import {createEnum} from "../../../utils/enum.js"; +import {AbortError} from "../../../utils/error.js"; + +export const SendStatus = createEnum( + "Waiting", + "EncryptingAttachments", + "UploadingAttachments", + "Encrypting", + "Sending", + "Sent", + "Error", +); export class PendingEvent { - constructor(data, attachments) { + constructor({data, remove, emitUpdate, attachments}) { this._data = data; - this.attachments = attachments; + this._attachments = attachments; + this._emitUpdate = () => { + console.log("PendingEvent status", this.status, this._attachments && Object.entries(this._attachments).map(([key, a]) => `${key}: ${a.sentBytes}/${a.size}`)); + emitUpdate(); + }; + this._removeFromQueueCallback = remove; + this._aborted = false; + this._status = SendStatus.Waiting; + this._sendRequest = null; } get roomId() { return this._data.roomId; } @@ -25,14 +45,111 @@ export class PendingEvent { get eventType() { return this._data.eventType; } get txnId() { return this._data.txnId; } get remoteId() { return this._data.remoteId; } - set remoteId(value) { this._data.remoteId = value; } get content() { return this._data.content; } - get needsEncryption() { return this._data.needsEncryption; } get data() { return this._data; } + getAttachment(key) { + return this._attachments && this._attachments[key]; + } + + get needsSending() { + return !this.remoteId && !this.aborted; + } + + get needsEncryption() { + return this._data.needsEncryption && !this.aborted; + } + + get needsUpload() { + return this._data.needsUpload && !this.aborted; + } + + setEncrypting() { + this._status = SendStatus.Encrypting; + this._emitUpdate("status"); + } + setEncrypted(type, content) { - this._data.eventType = type; - this._data.content = content; + this._data.encryptedEventType = type; + this._data.encryptedContent = content; this._data.needsEncryption = false; } + + setError(error) { + this._status = SendStatus.Error; + this._error = error; + this._emitUpdate("status"); + } + + get status() { return this._status; } + get error() { return this._error; } + + get attachmentsTotalBytes() { + return Object.values(this._attachments).reduce((t, a) => t + a.size, 0); + } + + get attachmentsSentBytes() { + return Object.values(this._attachments).reduce((t, a) => t + a.sentBytes, 0); + } + + async uploadAttachments(hsApi) { + if (!this.needsUpload) { + return; + } + if (this.needsEncryption) { + this._status = SendStatus.EncryptingAttachments; + this._emitUpdate("status"); + for (const attachment of Object.values(this._attachments)) { + await attachment.encrypt(); + if (this.aborted) { + throw new AbortError(); + } + } + } + this._status = SendStatus.UploadingAttachments; + this._emitUpdate("status"); + for (const [urlPath, attachment] of Object.entries(this._attachments)) { + await attachment.upload(hsApi, () => { + this._emitUpdate("attachmentsSentBytes"); + }); + attachment.applyToContent(urlPath, this.content); + } + this._data.needsUpload = false; + } + + abort() { + if (!this._aborted) { + this._aborted = true; + if (this._attachments) { + for (const attachment of Object.values(this._attachments)) { + attachment.abort(); + } + } + this._sendRequest?.abort(); + this._removeFromQueueCallback(); + } + } + + get aborted() { + return this._aborted; + } + + async send(hsApi) { + console.log(`sending event ${this.eventType} in ${this.roomId}`); + this._status = SendStatus.Sending; + this._emitUpdate("status"); + const eventType = this._data.encryptedEventType || this._data.eventType; + const content = this._data.encryptedContent || this._data.content; + this._sendRequest = hsApi.send( + this.roomId, + eventType, + this.txnId, + content + ); + const response = await this._sendRequest.response(); + this._sendRequest = null; + this._data.remoteId = response.event_id; + this._status = SendStatus.Sent; + this._emitUpdate("status"); + } } diff --git a/src/matrix/room/sending/SendQueue.js b/src/matrix/room/sending/SendQueue.js index 23019358..aa3a29b9 100644 --- a/src/matrix/room/sending/SendQueue.js +++ b/src/matrix/room/sending/SendQueue.js @@ -29,13 +29,22 @@ export class SendQueue { if (pendingEvents.length) { console.info(`SendQueue for room ${roomId} has ${pendingEvents.length} pending events`, pendingEvents); } - this._pendingEvents.setManyUnsorted(pendingEvents.map(data => new PendingEvent(data))); + this._pendingEvents.setManyUnsorted(pendingEvents.map(data => this._createPendingEvent(data))); this._isSending = false; this._offline = false; - this._amountSent = 0; this._roomEncryption = null; } + _createPendingEvent(data, attachments = null) { + const pendingEvent = new PendingEvent({ + data, + remove: () => this._removeEvent(pendingEvent), + emitUpdate: () => this._pendingEvents.set(pendingEvent), + attachments + }); + return pendingEvent; + } + enableEncryption(roomEncryption) { this._roomEncryption = roomEncryption; } @@ -43,53 +52,44 @@ export class SendQueue { async _sendLoop() { this._isSending = true; try { - console.log("start sending", this._amountSent, "<", this._pendingEvents.length); - while (this._amountSent < this._pendingEvents.length) { - const pendingEvent = this._pendingEvents.get(this._amountSent); - console.log("trying to send", pendingEvent.content.body); - if (pendingEvent.remoteId) { - this._amountSent += 1; - continue; - } - if (pendingEvent.attachments) { - try { - await this._uploadAttachments(pendingEvent); - } catch (err) { - console.log("upload failed, skip sending message", err, pendingEvent); - this._amountSent += 1; - continue; + for (let i = 0; i < this._pendingEvents.length; i += 1) { + const pendingEvent = this._pendingEvents.get(i); + try { + await this._sendEvent(pendingEvent); + } catch(err) { + if (err instanceof ConnectionError) { + this._offline = true; + break; + } else { + pendingEvent.setError(err); } - console.log("attachments upload, content is now", pendingEvent.content); - } - if (pendingEvent.needsEncryption) { - const {type, content} = await this._roomEncryption.encrypt( - pendingEvent.eventType, pendingEvent.content, this._hsApi); - pendingEvent.setEncrypted(type, content); - await this._tryUpdateEvent(pendingEvent); - } - console.log("really sending now"); - const response = await this._hsApi.send( - pendingEvent.roomId, - pendingEvent.eventType, - pendingEvent.txnId, - pendingEvent.content - ).response(); - pendingEvent.remoteId = response.event_id; - // - console.log("writing remoteId now"); - await this._tryUpdateEvent(pendingEvent); - console.log("keep sending?", this._amountSent, "<", this._pendingEvents.length); - this._amountSent += 1; - } - } catch(err) { - if (err instanceof ConnectionError) { - this._offline = true; + } } } finally { this._isSending = false; } } + async _sendEvent(pendingEvent) { + if (pendingEvent.needsUpload) { + await pendingEvent.uploadAttachments(this._hsApi); + console.log("attachments upload, content is now", pendingEvent.content); + await this._tryUpdateEvent(pendingEvent); + } + if (pendingEvent.needsEncryption) { + pendingEvent.setEncrypting(); + const {type, content} = await this._roomEncryption.encrypt( + pendingEvent.eventType, pendingEvent.content, this._hsApi); + pendingEvent.setEncrypted(type, content); + await this._tryUpdateEvent(pendingEvent); + } + if (pendingEvent.needsSending) { + await pendingEvent.send(this._hsApi); + console.log("writing remoteId"); + await this._tryUpdateEvent(pendingEvent); + } + } + removeRemoteEchos(events, txn) { const removed = []; for (const event of events) { @@ -109,11 +109,24 @@ export class SendQueue { return removed; } + async _removeEvent(pendingEvent) { + const idx = this._pendingEvents.array.indexOf(pendingEvent); + if (idx !== -1) { + const txn = this._storage.readWriteTxn([this._storage.storeNames.pendingEvents]); + try { + txn.pendingEvents.remove(pendingEvent.roomId, pendingEvent.queueIndex); + } catch (err) { + txn.abort(); + } + await txn.complete(); + this._pendingEvents.remove(idx); + } + } + emitRemovals(pendingEvents) { for (const pendingEvent of pendingEvents) { const idx = this._pendingEvents.array.indexOf(pendingEvent); if (idx !== -1) { - this._amountSent -= 1; this._pendingEvents.remove(idx); } } @@ -170,13 +183,14 @@ export class SendQueue { const maxQueueIndex = await pendingEventsStore.getMaxQueueIndex(this._roomId) || 0; console.log("_createAndStoreEvent got maxQueueIndex", maxQueueIndex); const queueIndex = maxQueueIndex + 1; - pendingEvent = new PendingEvent({ + pendingEvent = this._createPendingEvent({ roomId: this._roomId, queueIndex, eventType, content, txnId: makeTxnId(), - needsEncryption: !!this._roomEncryption + needsEncryption: !!this._roomEncryption, + needsUpload: !!attachments }, attachments); console.log("_createAndStoreEvent: adding to pendingEventsStore"); pendingEventsStore.add(pendingEvent.data); @@ -187,12 +201,4 @@ export class SendQueue { await txn.complete(); return pendingEvent; } - - async _uploadAttachments(pendingEvent) { - const {attachments} = pendingEvent; - for (const [urlPath, attachment] of Object.entries(attachments)) { - await attachment.upload(); - attachment.applyToContent(urlPath, pendingEvent.content); - } - } } diff --git a/src/matrix/room/timeline/entries/PendingEventEntry.js b/src/matrix/room/timeline/entries/PendingEventEntry.js index e31e56d5..eff14cb5 100644 --- a/src/matrix/room/timeline/entries/PendingEventEntry.js +++ b/src/matrix/room/timeline/entries/PendingEventEntry.js @@ -64,8 +64,8 @@ export class PendingEventEntry extends BaseEntry { return this._pendingEvent.txnId; } - get attachments() { - return this._pendingEvent.attachments; + get pendingEvent() { + return this._pendingEvent; } notifyUpdate() {