From ccb722d7660217440e4a68f3818721f13c5a14fc Mon Sep 17 00:00:00 2001 From: Bruno Windels Date: Fri, 26 Jul 2019 22:03:57 +0200 Subject: [PATCH] more WIP --- src/Platform.js | 8 +- src/matrix/SendScheduler.js | 121 ++++++++ src/matrix/hs-api.js | 9 + src/matrix/room/sending/PendingEvent.js | 17 +- src/matrix/room/sending/SendQueue.js | 288 +++++++----------- src/matrix/room/timeline/entries/BaseEntry.js | 5 + .../timeline/entries/PendingEventEntry.js | 34 +++ src/matrix/storage/common.js | 3 + src/matrix/storage/idb/store.js | 12 + .../storage/idb/stores/PendingEventStore.js | 11 + src/observable/list/SortedArray.js | 10 +- 11 files changed, 324 insertions(+), 194 deletions(-) create mode 100644 src/matrix/SendScheduler.js create mode 100644 src/matrix/room/timeline/entries/PendingEventEntry.js diff --git a/src/Platform.js b/src/Platform.js index c3ce381f..b68d821d 100644 --- a/src/Platform.js +++ b/src/Platform.js @@ -1,5 +1,5 @@ -// #ifdef PLATFORM_GNOME -// export {default} from "./ui/gnome/GnomePlatform.js"; -// #else +//#ifdef PLATFORM_GNOME +//##export {default} from "./ui/gnome/GnomePlatform.js"; +//#else export {default} from "./ui/web/WebPlatform.js"; -// #endif +//#endif diff --git a/src/matrix/SendScheduler.js b/src/matrix/SendScheduler.js new file mode 100644 index 00000000..9fbab290 --- /dev/null +++ b/src/matrix/SendScheduler.js @@ -0,0 +1,121 @@ +import Platform from "../../../Platform.js"; +import {HomeServerError, NetworkError} from "./error.js"; + +export class RateLimitingBackoff { + constructor() { + this._remainingRateLimitedRequest = 0; + } + + async waitAfterLimitExceeded(retryAfterMs) { + // this._remainingRateLimitedRequest = 5; + // if (typeof retryAfterMs !== "number") { + // } else { + // } + if (!retryAfterMs) { + retryAfterMs = 5000; + } + await Platform.delay(retryAfterMs); + } + + // do we have to know about succeeding requests? + // we can just + + async waitForNextSend() { + // this._remainingRateLimitedRequest = Math.max(0, this._remainingRateLimitedRequest - 1); + Platform.delay(1000); + } +} + +/* +this represents a slot to do one rate limited api call. +because rate-limiting is handled here, it should only +try to do one call, so the SendScheduler can safely +retry if the call ends up being rate limited. +This is also why we have this abstraction it hsApi is not +passed straight to SendQueue when it is its turn to send. +e.g. we wouldn't want to repeat the callback in SendQueue that could +have other side-effects before the call to hsApi that we wouldn't want +repeated (setting up progress handlers for file uploads, +... a UI update to say it started sending? + ... updating storage would probably only happen once the call succeeded + ... doing multiple hsApi calls for e.g. a file upload before sending a image message (they should individually be retried) +) maybe it is a bit overengineering, but lets stick with it for now. +At least the above is a clear definition why we have this class +*/ +//class SendSlot -- obsolete + +export class SendScheduler { + constructor({hsApi, backoff}) { + this._hsApi = hsApi; + this._sendRequests = []; + this._sendScheduled = false; + this._offline = false; + this._waitTime = 0; + this._backoff = backoff; + /* + we should have some sort of flag here that we enable + after all the rooms have been notified that they can resume + sending, so that from session, we can say scheduler.enable(); + this way, when we have better scheduling, it won't be first come, + first serve, when there are a lot of events in different rooms to send, + but we can apply some priorization of who should go first + */ + // this._enabled; + } + + // this should really be per roomId to avoid head-of-line blocking + // + // takes a callback instead of returning a promise with the slot + // to make sure the scheduler doesn't get blocked by a slot that is not consumed + request(sendCallback) { + let request; + const promise = new Promise((resolve, reject) => request = {resolve, reject, sendCallback}); + this._sendRequests.push(request); + if (!this._sendScheduled && !this._offline) { + this._sendLoop(); + } + return promise; + } + + async _sendLoop() { + while (this._sendRequests.length) { + const request = this._sendRequests.unshift(); + let result; + try { + // this can throw! + result = await this._doSend(request.sendCallback); + } catch (err) { + if (err instanceof NetworkError) { + // we're offline, everybody will have + // to re-request slots when we come back online + this._offline = true; + for (const r of this._sendRequests) { + r.reject(err); + } + this._sendRequests = []; + } + request.reject(err); + break; + } + request.resolve(result); + } + // do next here instead of in _doSend + } + + async _doSend(sendCallback) { + this._sendScheduled = false; + await this._backoff.waitForNextSend(); + // loop is left by return or throw + while (true) { // eslint-disable-line no-constant-condition + try { + return await sendCallback(this._hsApi); + } catch (err) { + if (err instanceof HomeServerError && err.errcode === "M_LIMIT_EXCEEDED") { + await this._backoff.waitAfterLimitExceeded(err.retry_after_ms); + } else { + throw err; + } + } + } + } +} diff --git a/src/matrix/hs-api.js b/src/matrix/hs-api.js index 2bc1ff72..0dd1e225 100644 --- a/src/matrix/hs-api.js +++ b/src/matrix/hs-api.js @@ -32,6 +32,7 @@ class RequestWrapper { } } +// todo: everywhere here, encode params in the url that could have slashes ... mainly event ids? export default class HomeServerApi { constructor(homeserver, accessToken) { // store these both in a closure somehow so it's harder to get at in case of XSS? @@ -98,6 +99,10 @@ export default class HomeServerApi { return this._request("POST", csPath, queryParams, body); } + _put(csPath, queryParams, body) { + return this._request("PUT", csPath, queryParams, body); + } + _get(csPath, queryParams, body) { return this._request("GET", csPath, queryParams, body); } @@ -111,6 +116,10 @@ export default class HomeServerApi { return this._get(`/rooms/${roomId}/messages`, params); } + send(roomId, eventType, txnId, content) { + return this._put(`/rooms/${roomId}/send/${eventType}/${txnId}`, {}, content); + } + passwordLogin(username, password) { return this._post("/login", undefined, { "type": "m.login.password", diff --git a/src/matrix/room/sending/PendingEvent.js b/src/matrix/room/sending/PendingEvent.js index 11e7987f..105da49b 100644 --- a/src/matrix/room/sending/PendingEvent.js +++ b/src/matrix/room/sending/PendingEvent.js @@ -1,11 +1,14 @@ export default class PendingEvent { - constructor(roomId, queueIndex, eventType, content, txnId) { - this._roomId = roomId; - this._eventType = eventType; - this._content = content; - this._txnId = txnId; - this._queueIndex = queueIndex; + constructor(data) { + this._data = data; } - + get roomId() { return this._data.roomId; } + get queueIndex() { return this._data.queueIndex; } + get eventType() { return this._data.eventType; } + get txnId() { return this._data.txnId; } + get remoteId() { return this._data.remoteId; } + set remoteId(value) { this._data.remoteId = value; } + get content() { return this._data.content; } + get data() { return this._data; } } diff --git a/src/matrix/room/sending/SendQueue.js b/src/matrix/room/sending/SendQueue.js index 2690bdd2..300e271e 100644 --- a/src/matrix/room/sending/SendQueue.js +++ b/src/matrix/room/sending/SendQueue.js @@ -1,153 +1,7 @@ -import Platform from "../../../Platform.js"; - -class RateLimitingBackoff { - constructor() { - this._remainingRateLimitedRequest = 0; - } - - async waitAfterLimitExceeded(retryAfterMs) { - // this._remainingRateLimitedRequest = 5; - // if (typeof retryAfterMs !== "number") { - // } else { - // } - if (!retryAfterMs) { - retryAfterMs = 5000; - } - await Platform.delay(retryAfterMs); - } - - // do we have to know about succeeding requests? - // we can just - - async waitForNextSend() { - // this._remainingRateLimitedRequest = Math.max(0, this._remainingRateLimitedRequest - 1); - Platform.delay(1000); - } -} - -class SendScheduler { - constructor({hsApi, backoff}) { - this._hsApi = hsApi; - this._slotRequests = []; - this._sendScheduled = false; - this._offline = false; - this._waitTime = 0; - this._backoff = backoff; - } - - // this should really be per roomId to avoid head-of-line blocking - // - // takes a callback instead of returning a promise with the slot - // to make sure the scheduler doesn't get blocked by a slot that is not consumed - runSlot(slotCallback) { - let request; - const promise = new Promise((resolve, reject) => request = {resolve, reject, slotCallback}); - this._slotRequests.push(request); - if (!this._sendScheduled && !this._offline) { - this._sendLoop(); - } - return promise; - } - - async _sendLoop() { - while (this._slotRequests.length) { - const request = this._slotRequests.unshift(); - this._currentSlot = new SendSlot(this); - // this can throw! - let result; - try { - result = await request.slotCallback(this._currentSlot); - } catch (err) { - if (err instanceof NetworkError) { - // we're offline, everybody will have - // to re-request slots when we come back online - this._offline = true; - for (const r of this._slotRequests) { - r.reject(err); - } - this._slotRequests = []; - } - request.reject(err); - break; - } - request.resolve(result); - } - // do next here instead of in _doSend - } - - async _doSend(slot, sendCallback) { - this._sendScheduled = false; - if (slot !== this._currentSlot) { - throw new Error("Slot is not active"); - } - await this._backoff.waitForNextSend(); - // loop is left by return or throw - while (true) { // eslint-disable-line no-constant-condition - try { - return await sendCallback(this._hsApi); - } catch (err) { - if (err instanceof HomeServerError && err.errcode === "M_LIMIT_EXCEEDED") { - await this._backoff.waitAfterLimitExceeded(err.retry_after_ms); - } else { - throw err; - } - } - } - } -} - -/* -this represents a slot to do one rate limited api call. -because rate-limiting is handled here, it should only -try to do one call, so the SendScheduler can safely -retry if the call ends up being rate limited. -This is also why we have this abstraction it hsApi is not -passed straight to SendQueue when it is its turn to send. -e.g. we wouldn't want to repeat the callback in SendQueue that could -have other side-effects before the call to hsApi that we wouldn't want -repeated (setting up progress handlers for file uploads, -... a UI update to say it started sending? - ... updating storage would probably only happen once the call succeeded - ... doing multiple hsApi calls for e.g. a file upload before sending a image message (they should individually be retried) -) maybe it is a bit overengineering, but lets stick with it for now. -At least the above is a clear definition why we have this class -*/ -class SendSlot { - constructor(scheduler) { - this._scheduler = scheduler; - } - - sendContentEvent(pendingEvent) { - return this._scheduler._doSend(this, async hsApi => { - const request = hsApi.send( - pendingEvent.roomId, - pendingEvent.eventType, - pendingEvent.txnId, - pendingEvent.content - ); - const response = await request.response(); - return response.event_id; - }); - } - - sendRedaction(pendingEvent) { - return this._scheduler._doSend(this, async hsApi => { - const request = hsApi.redact( - pendingEvent.roomId, - pendingEvent.redacts, - pendingEvent.txnId, - pendingEvent.reason - ); - const response = await request.response(); - return response.event_id; - }); - } - - // progressCallback should report the amount of bytes sent - uploadMedia(fileName, contentType, blob, progressCallback) { - - } -} +import SortedArray from "../../../observable/list/SortedArray.js"; +import {NetworkError} from "../../error.js"; +import {StorageError} from "../../storage/common.js"; +import PendingEvent from "./PendingEvent.js"; function makeTxnId() { const n = Math.floor(Math.random() * Number.MAX_SAFE_INTEGER); @@ -160,46 +14,118 @@ export default class SendQueue { this._roomId = roomId; this._storage = storage; this._scheduler = scheduler; - this._pendingEvents = pendingEvents.map(d => PendingEvent.fromData(d)); + this._pendingEvents = new SortedArray((a, b) => a.queueIndex - b.queueIndex); + this._pendingEvents.setManySorted(pendingEvents.map(data => new PendingEvent(data))); + this._isSending = false; + this._offline = false; + this._amountSent = 0; } async _sendLoop() { - let pendingEvent = null; - // eslint-disable-next-line no-cond-assign - while (pendingEvent = await this._nextPendingEvent()) { - // const mxcUrl = await this._scheduler.runSlot(slot => { - // return slot.uploadMedia(fileName, contentType, blob, bytesSent => { - // pendingEvent.updateAttachmentUploadProgress(bytesSent); - // }); - // }); - - // pendingEvent.setAttachmentUrl(mxcUrl); - //update storage for pendingEvent after updating url, - //remove blob only later to keep preview? - - await this._scheduler.runSlot(slot => { - if (pendingEvent.isRedaction) { - return slot.sendRedaction(pendingEvent); - } else if (pendingEvent.isContentEvent) { - return slot.sendContentEvent(pendingEvent); + this._isSending = true; + try { + while (this._amountSent < this._pendingEvents.length) { + const pendingEvent = this._pendingEvents.get(this._amountSent); + this._amountSent += 1; + if (pendingEvent.remoteId) { + continue; } - }); + const response = await this._scheduler.request(hsApi => { + return hsApi.send( + pendingEvent.roomId, + pendingEvent.eventType, + pendingEvent.txnId, + pendingEvent.content + ); + }); + pendingEvent.remoteId = response.event_id; + await this._tryUpdateEvent(pendingEvent); + } + } catch(err) { + if (err instanceof NetworkError) { + this._offline = true; + } + } finally { + this._isSending = false; + } + } + + + async receiveRemoteEcho(txnId) { + const idx = this._pendingEvents.array.findIndex(pe => pe.txnId === txnId); + if (idx !== 0) { + const pendingEvent = this._pendingEvents.get(idx); + this._amountSent -= 1; + this._pendingEvents.remove(idx); + await this._removeEvent(pendingEvent); + } + } + + resumeSending() { + this._offline = false; + if (!this._isSending) { + this._sendLoop(); } } async enqueueEvent(eventType, content) { - // temporary + const pendingEvent = await this._createAndStoreEvent(eventType, content); + this._pendingEvents.set(pendingEvent); + if (!this._isSending && !this._offline) { + this._sendLoop(); + } + } + + get pendingEvents() { + return this._pendingEvents; + } + + async _tryUpdateEvent(pendingEvent) { const txn = await this._storage.readWriteTxn([this._storage.storeNames.pendingEvents]); - const pendingEventsStore = txn.pendingEvents; - const maxQueueIndex = await pendingEventsStore.getMaxQueueIndex(this._roomId) || 0; - const queueIndex = maxQueueIndex + 1; - const pendingEvent = new PendingEvent(this._roomId, queueIndex, eventType, content, makeTxnId()); - pendingEventsStore.add(pendingEvent.data); + try { + // pendingEvent might have been removed already here + // by a racing remote echo, so check first so we don't recreate it + if (await txn.pendingEvents.exists(pendingEvent.roomId, pendingEvent.queueIndex)) { + txn.pendingEvents.update(pendingEvent.data); + } + } catch (err) { + txn.abort(); + throw err; + } await txn.complete(); - // create txnId - // create queueOrder - // store event - // if online and not running send loop - // start sending loop + } + + async _removeEvent(pendingEvent) { + const txn = await this._storage.readWriteTxn([this._storage.storeNames.pendingEvents]); + try { + txn.pendingEvents.remove(pendingEvent.roomId, pendingEvent.queueIndex); + } catch (err) { + txn.abort(); + throw err; + } + await txn.complete(); + } + + async _createAndStoreEvent(eventType, content) { + const txn = await this._storage.readWriteTxn([this._storage.storeNames.pendingEvents]); + let pendingEvent; + try { + const pendingEventsStore = txn.pendingEvents; + const maxQueueIndex = await pendingEventsStore.getMaxQueueIndex(this._roomId) || 0; + const queueIndex = maxQueueIndex + 1; + pendingEvent = new PendingEvent({ + roomId: this._roomId, + queueIndex, + eventType, + content, + txnId: makeTxnId() + }); + pendingEventsStore.add(pendingEvent.data); + } catch (err) { + txn.abort(); + throw err; + } + await txn.complete(); + return pendingEvent; } } diff --git a/src/matrix/room/timeline/entries/BaseEntry.js b/src/matrix/room/timeline/entries/BaseEntry.js index 3ef00862..3c3d31b2 100644 --- a/src/matrix/room/timeline/entries/BaseEntry.js +++ b/src/matrix/room/timeline/entries/BaseEntry.js @@ -1,5 +1,6 @@ //entries can be sorted, first by fragment, then by entry index. import EventKey from "../EventKey.js"; +import { PENDING_FRAGMENT_ID } from "./PendingEventEntry.js"; export default class BaseEntry { constructor(fragmentIdComparer) { @@ -17,6 +18,10 @@ export default class BaseEntry { compare(otherEntry) { if (this.fragmentId === otherEntry.fragmentId) { return this.entryIndex - otherEntry.entryIndex; + } else if (this.fragmentId === PENDING_FRAGMENT_ID) { + return 1; + } else if (otherEntry.fragmentId === PENDING_FRAGMENT_ID) { + return -1; } else { // This might throw if the relation of two fragments is unknown. return this._fragmentIdComparer.compare(this.fragmentId, otherEntry.fragmentId); diff --git a/src/matrix/room/timeline/entries/PendingEventEntry.js b/src/matrix/room/timeline/entries/PendingEventEntry.js new file mode 100644 index 00000000..03b79cf4 --- /dev/null +++ b/src/matrix/room/timeline/entries/PendingEventEntry.js @@ -0,0 +1,34 @@ +import BaseEntry from "./BaseEntry.js"; + +export const PENDING_FRAGMENT_ID = Number.MAX_SAFE_INTEGER; + +export default class PendingEventEntry extends BaseEntry { + constructor(pendingEvent) { + super(null); + this._pendingEvent = pendingEvent; + } + + get fragmentId() { + return PENDING_FRAGMENT_ID; + } + + get entryIndex() { + return this._pendingEvent.queueIndex; + } + + get content() { + return this._pendingEvent.content; + } + + get event() { + return null; + } + + get type() { + return this._pendingEvent.eventType; + } + + get id() { + return this._pendingEvent.txnId; + } +} diff --git a/src/matrix/storage/common.js b/src/matrix/storage/common.js index 3b7e4ae8..4a6c0a21 100644 --- a/src/matrix/storage/common.js +++ b/src/matrix/storage/common.js @@ -23,5 +23,8 @@ export class StorageError extends Error { fullMessage += cause.message; } super(fullMessage); + if (cause) { + this.errcode = cause.name; + } } } diff --git a/src/matrix/storage/idb/store.js b/src/matrix/storage/idb/store.js index eea33cc1..ae2008bb 100644 --- a/src/matrix/storage/idb/store.js +++ b/src/matrix/storage/idb/store.js @@ -47,6 +47,14 @@ class QueryTargetWrapper { } } + delete(...params) { + try { + return this._qt.delete(...params); + } catch(err) { + throw new StorageError("delete failed", err); + } + } + index(...params) { try { return this._qt.index(...params); @@ -76,4 +84,8 @@ export default class Store extends QueryTarget { add(value) { return reqAsPromise(this._idbStore.add(value)); } + + delete(keyOrKeyRange) { + return reqAsPromise(this._idbStore.delete(keyOrKeyRange)); + } } diff --git a/src/matrix/storage/idb/stores/PendingEventStore.js b/src/matrix/storage/idb/stores/PendingEventStore.js index 82eeb571..fa54848c 100644 --- a/src/matrix/storage/idb/stores/PendingEventStore.js +++ b/src/matrix/storage/idb/stores/PendingEventStore.js @@ -29,6 +29,17 @@ export default class PendingEventStore { } } + remove(roomId, queueIndex) { + const keyRange = IDBKeyRange.only(encodeKey(roomId, queueIndex)); + this._eventStore.delete(keyRange); + } + + async exists(roomId, queueIndex) { + const keyRange = IDBKeyRange.only(encodeKey(roomId, queueIndex)); + const key = await this._eventStore.getKey(keyRange); + return !!key; + } + add(pendingEvent) { pendingEvent.key = encodeKey(pendingEvent.roomId, pendingEvent.queueIndex); return this._eventStore.add(pendingEvent); diff --git a/src/observable/list/SortedArray.js b/src/observable/list/SortedArray.js index 8c90ce15..d38f94fd 100644 --- a/src/observable/list/SortedArray.js +++ b/src/observable/list/SortedArray.js @@ -32,8 +32,14 @@ export default class SortedArray extends BaseObservableList { } } - remove(item) { - throw new Error("unimplemented"); + get(idx) { + return this._items[idx]; + } + + remove(idx) { + const item = this._items[idx]; + this._items.splice(idx, 1); + this.emitRemove(idx, item); } get array() {