merge state machine from AttachmentUpload into PendingEvent

to have less state machines, and we are mostly interested in the
aggregate status of all attachments of an event

this will also drive updates through the pending events collection
that already exists rather than an extra observablevalue, so less
housekeeping to update the UI.
This commit is contained in:
Bruno Windels 2020-11-18 13:02:38 +01:00
parent 91f7970d66
commit fd81111bfb
5 changed files with 215 additions and 124 deletions

View file

@ -15,81 +15,33 @@ limitations under the License.
*/
import {encryptAttachment} from "../e2ee/attachment.js";
import {createEnum} from "../../utils/enum.js";
import {ObservableValue} from "../../observable/ObservableValue.js";
import {AbortError} from "../../utils/error.js";
export const UploadStatus = createEnum("Waiting", "Encrypting", "Uploading", "Uploaded", "Error");
export class AttachmentUpload {
constructor({filename, blob, hsApi, platform, isEncrypted}) {
constructor({filename, blob, platform}) {
this._filename = filename;
// need to keep around for local preview while uploading
this._unencryptedBlob = blob;
this._isEncrypted = isEncrypted;
this._transferredBlob = this._unencryptedBlob;
this._platform = platform;
this._hsApi = hsApi;
this._mxcUrl = null;
this._transferredBlob = null;
this._encryptionInfo = null;
this._uploadRequest = null;
this._aborted = false;
this._error = null;
this._status = new ObservableValue(UploadStatus.Waiting);
this._progress = new ObservableValue(0);
this._sentBytes = 0;
}
get status() {
return this._status;
/** important to call after encrypt() if encryption is needed */
get size() {
return this._transferredBlob.size;
}
get uploadProgress() {
return this._progress;
}
async upload() {
if (this._status.get() === UploadStatus.Waiting) {
this._upload();
}
await this._status.waitFor(s => {
return s === UploadStatus.Error || s === UploadStatus.Uploaded;
}).promise;
if (this._status.get() === UploadStatus.Error) {
throw this._error;
}
}
/** @package */
async _upload() {
try {
let transferredBlob = this._unencryptedBlob;
if (this._isEncrypted) {
this._status.set(UploadStatus.Encrypting);
const {info, blob} = await encryptAttachment(this._platform, this._unencryptedBlob);
transferredBlob = blob;
this._encryptionInfo = info;
}
if (this._aborted) {
throw new AbortError("upload aborted during encryption");
}
this._progress.set(0);
this._status.set(UploadStatus.Uploading);
this._uploadRequest = this._hsApi.uploadAttachment(transferredBlob, this._filename, {
uploadProgress: sentBytes => this._progress.set(sentBytes / transferredBlob.size)
});
const {content_uri} = await this._uploadRequest.response();
this._progress.set(1);
this._mxcUrl = content_uri;
this._transferredBlob = transferredBlob;
this._status.set(UploadStatus.Uploaded);
} catch (err) {
this._error = err;
this._status.set(UploadStatus.Error);
}
get sentBytes() {
return this._sentBytes;
}
/** @public */
abort() {
this._aborted = true;
this._uploadRequest?.abort();
}
@ -98,8 +50,26 @@ export class AttachmentUpload {
return this._unencryptedBlob;
}
get error() {
return this._error;
/** @package */
async encrypt() {
if (this._encryptionInfo) {
throw new Error("already encrypted");
}
const {info, blob} = await encryptAttachment(this._platform, this._transferredBlob);
this._transferredBlob = blob;
this._encryptionInfo = info;
}
/** @package */
async upload(hsApi, progressCallback) {
this._uploadRequest = hsApi.uploadAttachment(this._transferredBlob, this._filename, {
uploadProgress: sentBytes => {
this._sentBytes = sentBytes;
progressCallback();
}
});
const {content_uri} = await this._uploadRequest.response();
this._mxcUrl = content_uri;
}
/** @package */
@ -110,7 +80,7 @@ export class AttachmentUpload {
let prefix = urlPath.substr(0, urlPath.lastIndexOf("url"));
setPath(`${prefix}info.size`, content, this._transferredBlob.size);
setPath(`${prefix}info.mimetype`, content, this._unencryptedBlob.mimeType);
if (this._isEncrypted) {
if (this._encryptionInfo) {
setPath(`${prefix}file`, content, Object.assign(this._encryptionInfo, {
mimetype: this._unencryptedBlob.mimeType,
url: this._mxcUrl

View file

@ -634,9 +634,7 @@ export class Room extends EventEmitter {
}
createAttachment(blob, filename) {
const attachment = new AttachmentUpload({blob, filename,
hsApi: this._hsApi, platform: this._platform, isEncrypted: this.isEncrypted});
return attachment;
return new AttachmentUpload({blob, filename, platform: this._platform});
}
dispose() {

View file

@ -13,11 +13,31 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
import {createEnum} from "../../../utils/enum.js";
import {AbortError} from "../../../utils/error.js";
export const SendStatus = createEnum(
"Waiting",
"EncryptingAttachments",
"UploadingAttachments",
"Encrypting",
"Sending",
"Sent",
"Error",
);
export class PendingEvent {
constructor(data, attachments) {
constructor({data, remove, emitUpdate, attachments}) {
this._data = data;
this.attachments = attachments;
this._attachments = attachments;
this._emitUpdate = () => {
console.log("PendingEvent status", this.status, this._attachments && Object.entries(this._attachments).map(([key, a]) => `${key}: ${a.sentBytes}/${a.size}`));
emitUpdate();
};
this._removeFromQueueCallback = remove;
this._aborted = false;
this._status = SendStatus.Waiting;
this._sendRequest = null;
}
get roomId() { return this._data.roomId; }
@ -25,14 +45,111 @@ export class PendingEvent {
get eventType() { return this._data.eventType; }
get txnId() { return this._data.txnId; }
get remoteId() { return this._data.remoteId; }
set remoteId(value) { this._data.remoteId = value; }
get content() { return this._data.content; }
get needsEncryption() { return this._data.needsEncryption; }
get data() { return this._data; }
getAttachment(key) {
return this._attachments && this._attachments[key];
}
get needsSending() {
return !this.remoteId && !this.aborted;
}
get needsEncryption() {
return this._data.needsEncryption && !this.aborted;
}
get needsUpload() {
return this._data.needsUpload && !this.aborted;
}
setEncrypting() {
this._status = SendStatus.Encrypting;
this._emitUpdate("status");
}
setEncrypted(type, content) {
this._data.eventType = type;
this._data.content = content;
this._data.encryptedEventType = type;
this._data.encryptedContent = content;
this._data.needsEncryption = false;
}
setError(error) {
this._status = SendStatus.Error;
this._error = error;
this._emitUpdate("status");
}
get status() { return this._status; }
get error() { return this._error; }
get attachmentsTotalBytes() {
return Object.values(this._attachments).reduce((t, a) => t + a.size, 0);
}
get attachmentsSentBytes() {
return Object.values(this._attachments).reduce((t, a) => t + a.sentBytes, 0);
}
async uploadAttachments(hsApi) {
if (!this.needsUpload) {
return;
}
if (this.needsEncryption) {
this._status = SendStatus.EncryptingAttachments;
this._emitUpdate("status");
for (const attachment of Object.values(this._attachments)) {
await attachment.encrypt();
if (this.aborted) {
throw new AbortError();
}
}
}
this._status = SendStatus.UploadingAttachments;
this._emitUpdate("status");
for (const [urlPath, attachment] of Object.entries(this._attachments)) {
await attachment.upload(hsApi, () => {
this._emitUpdate("attachmentsSentBytes");
});
attachment.applyToContent(urlPath, this.content);
}
this._data.needsUpload = false;
}
abort() {
if (!this._aborted) {
this._aborted = true;
if (this._attachments) {
for (const attachment of Object.values(this._attachments)) {
attachment.abort();
}
}
this._sendRequest?.abort();
this._removeFromQueueCallback();
}
}
get aborted() {
return this._aborted;
}
async send(hsApi) {
console.log(`sending event ${this.eventType} in ${this.roomId}`);
this._status = SendStatus.Sending;
this._emitUpdate("status");
const eventType = this._data.encryptedEventType || this._data.eventType;
const content = this._data.encryptedContent || this._data.content;
this._sendRequest = hsApi.send(
this.roomId,
eventType,
this.txnId,
content
);
const response = await this._sendRequest.response();
this._sendRequest = null;
this._data.remoteId = response.event_id;
this._status = SendStatus.Sent;
this._emitUpdate("status");
}
}

View file

@ -29,13 +29,22 @@ export class SendQueue {
if (pendingEvents.length) {
console.info(`SendQueue for room ${roomId} has ${pendingEvents.length} pending events`, pendingEvents);
}
this._pendingEvents.setManyUnsorted(pendingEvents.map(data => new PendingEvent(data)));
this._pendingEvents.setManyUnsorted(pendingEvents.map(data => this._createPendingEvent(data)));
this._isSending = false;
this._offline = false;
this._amountSent = 0;
this._roomEncryption = null;
}
_createPendingEvent(data, attachments = null) {
const pendingEvent = new PendingEvent({
data,
remove: () => this._removeEvent(pendingEvent),
emitUpdate: () => this._pendingEvents.set(pendingEvent),
attachments
});
return pendingEvent;
}
enableEncryption(roomEncryption) {
this._roomEncryption = roomEncryption;
}
@ -43,53 +52,44 @@ export class SendQueue {
async _sendLoop() {
this._isSending = true;
try {
console.log("start sending", this._amountSent, "<", this._pendingEvents.length);
while (this._amountSent < this._pendingEvents.length) {
const pendingEvent = this._pendingEvents.get(this._amountSent);
console.log("trying to send", pendingEvent.content.body);
if (pendingEvent.remoteId) {
this._amountSent += 1;
continue;
}
if (pendingEvent.attachments) {
try {
await this._uploadAttachments(pendingEvent);
} catch (err) {
console.log("upload failed, skip sending message", err, pendingEvent);
this._amountSent += 1;
continue;
for (let i = 0; i < this._pendingEvents.length; i += 1) {
const pendingEvent = this._pendingEvents.get(i);
try {
await this._sendEvent(pendingEvent);
} catch(err) {
if (err instanceof ConnectionError) {
this._offline = true;
break;
} else {
pendingEvent.setError(err);
}
console.log("attachments upload, content is now", pendingEvent.content);
}
if (pendingEvent.needsEncryption) {
const {type, content} = await this._roomEncryption.encrypt(
pendingEvent.eventType, pendingEvent.content, this._hsApi);
pendingEvent.setEncrypted(type, content);
await this._tryUpdateEvent(pendingEvent);
}
console.log("really sending now");
const response = await this._hsApi.send(
pendingEvent.roomId,
pendingEvent.eventType,
pendingEvent.txnId,
pendingEvent.content
).response();
pendingEvent.remoteId = response.event_id;
//
console.log("writing remoteId now");
await this._tryUpdateEvent(pendingEvent);
console.log("keep sending?", this._amountSent, "<", this._pendingEvents.length);
this._amountSent += 1;
}
} catch(err) {
if (err instanceof ConnectionError) {
this._offline = true;
}
}
} finally {
this._isSending = false;
}
}
async _sendEvent(pendingEvent) {
if (pendingEvent.needsUpload) {
await pendingEvent.uploadAttachments(this._hsApi);
console.log("attachments upload, content is now", pendingEvent.content);
await this._tryUpdateEvent(pendingEvent);
}
if (pendingEvent.needsEncryption) {
pendingEvent.setEncrypting();
const {type, content} = await this._roomEncryption.encrypt(
pendingEvent.eventType, pendingEvent.content, this._hsApi);
pendingEvent.setEncrypted(type, content);
await this._tryUpdateEvent(pendingEvent);
}
if (pendingEvent.needsSending) {
await pendingEvent.send(this._hsApi);
console.log("writing remoteId");
await this._tryUpdateEvent(pendingEvent);
}
}
removeRemoteEchos(events, txn) {
const removed = [];
for (const event of events) {
@ -109,11 +109,24 @@ export class SendQueue {
return removed;
}
async _removeEvent(pendingEvent) {
const idx = this._pendingEvents.array.indexOf(pendingEvent);
if (idx !== -1) {
const txn = this._storage.readWriteTxn([this._storage.storeNames.pendingEvents]);
try {
txn.pendingEvents.remove(pendingEvent.roomId, pendingEvent.queueIndex);
} catch (err) {
txn.abort();
}
await txn.complete();
this._pendingEvents.remove(idx);
}
}
emitRemovals(pendingEvents) {
for (const pendingEvent of pendingEvents) {
const idx = this._pendingEvents.array.indexOf(pendingEvent);
if (idx !== -1) {
this._amountSent -= 1;
this._pendingEvents.remove(idx);
}
}
@ -170,13 +183,14 @@ export class SendQueue {
const maxQueueIndex = await pendingEventsStore.getMaxQueueIndex(this._roomId) || 0;
console.log("_createAndStoreEvent got maxQueueIndex", maxQueueIndex);
const queueIndex = maxQueueIndex + 1;
pendingEvent = new PendingEvent({
pendingEvent = this._createPendingEvent({
roomId: this._roomId,
queueIndex,
eventType,
content,
txnId: makeTxnId(),
needsEncryption: !!this._roomEncryption
needsEncryption: !!this._roomEncryption,
needsUpload: !!attachments
}, attachments);
console.log("_createAndStoreEvent: adding to pendingEventsStore");
pendingEventsStore.add(pendingEvent.data);
@ -187,12 +201,4 @@ export class SendQueue {
await txn.complete();
return pendingEvent;
}
async _uploadAttachments(pendingEvent) {
const {attachments} = pendingEvent;
for (const [urlPath, attachment] of Object.entries(attachments)) {
await attachment.upload();
attachment.applyToContent(urlPath, pendingEvent.content);
}
}
}

View file

@ -64,8 +64,8 @@ export class PendingEventEntry extends BaseEntry {
return this._pendingEvent.txnId;
}
get attachments() {
return this._pendingEvent.attachments;
get pendingEvent() {
return this._pendingEvent;
}
notifyUpdate() {