diff --git a/src/matrix/SendScheduler.js b/src/matrix/SendScheduler.js index e7627c81..4080e8ad 100644 --- a/src/matrix/SendScheduler.js +++ b/src/matrix/SendScheduler.js @@ -14,72 +14,70 @@ See the License for the specific language governing permissions and limitations under the License. */ -import {Platform} from "../Platform.js"; -import {HomeServerError, ConnectionError} from "./error.js"; +import {AbortError} from "../utils/error.js"; +import {HomeServerError} from "./error.js"; +import {HomeServerApi} from "./net/HomeServerApi.js"; +import {ExponentialRetryDelay} from "./net/ExponentialRetryDelay.js"; -export class RateLimitingBackoff { - constructor() { - this._remainingRateLimitedRequest = 0; +class Request { + constructor(methodName, args) { + this._methodName = methodName; + this._args = args; + this._responsePromise = new Promise((resolve, reject) => { + this._resolve = resolve; + this._reject = reject; + }); + this._requestResult = null; } - async waitAfterLimitExceeded(retryAfterMs) { - // this._remainingRateLimitedRequest = 5; - // if (typeof retryAfterMs !== "number") { - // } else { - // } - if (!retryAfterMs) { - retryAfterMs = 5000; + abort() { + if (this._requestResult) { + this._requestResult.abort(); + } else { + this._reject(new AbortError()); } - await Platform.delay(retryAfterMs); } - // do we have to know about succeeding requests? - // we can just - - async waitForNextSend() { - // this._remainingRateLimitedRequest = Math.max(0, this._remainingRateLimitedRequest - 1); + response() { + return this._responsePromise; } } -/* -this represents a slot to do one rate limited api call. -because rate-limiting is handled here, it should only -try to do one call, so the SendScheduler can safely -retry if the call ends up being rate limited. -This is also why we have this abstraction it hsApi is not -passed straight to SendQueue when it is its turn to send. -e.g. we wouldn't want to repeat the callback in SendQueue that could -have other side-effects before the call to hsApi that we wouldn't want -repeated (setting up progress handlers for file uploads, -... a UI update to say it started sending? - ... updating storage would probably only happen once the call succeeded - ... doing multiple hsApi calls for e.g. a file upload before sending a image message (they should individually be retried) -) maybe it is a bit overengineering, but lets stick with it for now. -At least the above is a clear definition why we have this class -*/ -//class SendSlot -- obsolete +class HomeServerApiWrapper { + constructor(scheduler) { + this._scheduler = scheduler; + } +} + +// add request-wrapping methods to prototype +for (const methodName of Object.getOwnPropertyNames(HomeServerApi.prototype)) { + if (methodName !== "constructor" && !methodName.startsWith("_")) { + HomeServerApiWrapper.prototype[methodName] = function(...args) { + return this._scheduler._hsApiRequest(methodName, args); + }; + } +} export class SendScheduler { - constructor({hsApi, backoff}) { + constructor({hsApi, clock}) { this._hsApi = hsApi; - this._sendRequests = []; - this._sendScheduled = false; + this._clock = clock; + this._requests = new Set(); + this._isRateLimited = false; + this._isDrainingRateLimit = false; this._stopped = false; - this._waitTime = 0; - this._backoff = backoff; - /* - we should have some sort of flag here that we enable - after all the rooms have been notified that they can resume - sending, so that from session, we can say scheduler.enable(); - this way, when we have better scheduling, it won't be first come, - first serve, when there are a lot of events in different rooms to send, - but we can apply some priorization of who should go first - */ - // this._enabled; + } + + createHomeServerApiWrapper() { + return new HomeServerApiWrapper(this); } stop() { - // TODO: abort current requests and set offline + this._stopped = true; + for (const request of this._requests) { + request.abort(); + } + this._requests.clear(); } start() { @@ -90,60 +88,45 @@ export class SendScheduler { return !this._stopped; } - // this should really be per roomId to avoid head-of-line blocking - // - // takes a callback instead of returning a promise with the slot - // to make sure the scheduler doesn't get blocked by a slot that is not consumed - request(sendCallback) { - let request; - const promise = new Promise((resolve, reject) => request = {resolve, reject, sendCallback}); - this._sendRequests.push(request); - if (!this._sendScheduled && !this._stopped) { - this._sendLoop(); - } - return promise; + _hsApiRequest(name, args) { + const request = new Request(name, args); + this._doSend(request); + return request; } - async _sendLoop() { - while (this._sendRequests.length) { - const request = this._sendRequests.shift(); - let result; - try { - // this can throw! - result = await this._doSend(request.sendCallback); - } catch (err) { - if (err instanceof ConnectionError) { - // we're offline, everybody will have - // to re-request slots when we come back online - this._stopped = true; - for (const r of this._sendRequests) { - r.reject(err); + async _doSend(request) { + this._requests.add(request); + try { + let retryDelay; + while (!this._stopped) { + try { + const requestResult = this._hsApi[request._methodName].apply(this._hsApi, request._args); + // so the request can be aborted + request._requestResult = requestResult; + const response = await requestResult.response(); + request._resolve(response); + return; + } catch (err) { + if (err instanceof HomeServerError && err.errcode === "M_LIMIT_EXCEEDED") { + if (Number.isSafeInteger(err.retry_after_ms)) { + await this._clock.createTimeout(err.retry_after_ms).elapsed(); + } else { + if (!retryDelay) { + retryDelay = new ExponentialRetryDelay(this._clock.createTimeout); + } + await retryDelay.waitForRetry(); + } + } else { + request._reject(err); + return; } - this._sendRequests = []; - } - console.error("error for request", err); - request.reject(err); - break; - } - request.resolve(result); - } - // do next here instead of in _doSend - } - - async _doSend(sendCallback) { - this._sendScheduled = false; - await this._backoff.waitForNextSend(); - // loop is left by return or throw - while (true) { // eslint-disable-line no-constant-condition - try { - return await sendCallback(this._hsApi); - } catch (err) { - if (err instanceof HomeServerError && err.errcode === "M_LIMIT_EXCEEDED") { - await this._backoff.waitAfterLimitExceeded(err.retry_after_ms); - } else { - throw err; } } + if (this._stopped) { + request.abort(); + } + } finally { + this._requests.delete(request); } } } diff --git a/src/matrix/Session.js b/src/matrix/Session.js index c2c2e20a..32fe96b5 100644 --- a/src/matrix/Session.js +++ b/src/matrix/Session.js @@ -16,7 +16,7 @@ limitations under the License. import {Room} from "./room/Room.js"; import { ObservableMap } from "../observable/index.js"; -import { SendScheduler, RateLimitingBackoff } from "./SendScheduler.js"; +import {SendScheduler} from "./SendScheduler.js"; import {User} from "./User.js"; import {DeviceMessageHandler} from "./DeviceMessageHandler.js"; import {Account as E2EEAccount} from "./e2ee/Account.js"; @@ -42,15 +42,15 @@ const PICKLE_KEY = "DEFAULT_KEY"; export class Session { // sessionInfo contains deviceId, userId and homeServer - constructor({clock, storage, hsApi, sessionInfo, olm, olmWorker, cryptoDriver, mediaRepository}) { + constructor({clock, storage, unwrappedHsApi, sessionInfo, olm, olmWorker, cryptoDriver, mediaRepository}) { this._clock = clock; this._storage = storage; - this._hsApi = hsApi; + this._sendScheduler = new SendScheduler({hsApi: unwrappedHsApi, clock}); + this._hsApi = this._sendScheduler.createHomeServerApiWrapper(); this._mediaRepository = mediaRepository; this._syncInfo = null; this._sessionInfo = sessionInfo; this._rooms = new ObservableMap(); - this._sendScheduler = new SendScheduler({hsApi, backoff: new RateLimitingBackoff()}); this._roomUpdateCallback = (room, params) => this._rooms.update(room.id, params); this._user = new User(sessionInfo.userId); this._deviceMessageHandler = new DeviceMessageHandler({storage}); @@ -332,9 +332,8 @@ export class Session { storage: this._storage, emitCollectionChange: this._roomUpdateCallback, hsApi: this._hsApi, - sendScheduler: this._sendScheduler, -._hsApi, - mediaRepository: this._mediaRep pendingEvents, + mediaRepository: this._mediaRepository, + pendingEvents, user: this._user, createRoomEncryption: this._createRoomEncryption, clock: this._clock diff --git a/src/matrix/room/Room.js b/src/matrix/room/Room.js index 18b5d18c..99c2155e 100644 --- a/src/matrix/room/Room.js +++ b/src/matrix/room/Room.js @@ -31,7 +31,7 @@ import {DecryptionSource} from "../e2ee/common.js"; const EVENT_ENCRYPTED_TYPE = "m.room.encrypted"; export class Room extends EventEmitter { - constructor({roomId, storage, hsApi, mediaRepository, emitCollectionChange, sendScheduler, pendingEvents, user, createRoomEncryption, getSyncToken, clock}) { + constructor({roomId, storage, hsApi, mediaRepository, emitCollectionChange, pendingEvents, user, createRoomEncryption, getSyncToken, clock}) { super(); this._roomId = roomId; this._storage = storage; @@ -41,7 +41,7 @@ export class Room extends EventEmitter { this._fragmentIdComparer = new FragmentIdComparer([]); this._syncWriter = new SyncWriter({roomId, fragmentIdComparer: this._fragmentIdComparer}); this._emitCollectionChange = emitCollectionChange; - this._sendQueue = new SendQueue({roomId, storage, sendScheduler, pendingEvents}); + this._sendQueue = new SendQueue({roomId, storage, hsApi, pendingEvents}); this._timeline = null; this._user = user; this._changedMembersDuringSync = null; diff --git a/src/matrix/room/sending/SendQueue.js b/src/matrix/room/sending/SendQueue.js index fe7afe77..52c2e7b8 100644 --- a/src/matrix/room/sending/SendQueue.js +++ b/src/matrix/room/sending/SendQueue.js @@ -20,11 +20,11 @@ import {PendingEvent} from "./PendingEvent.js"; import {makeTxnId} from "../../common.js"; export class SendQueue { - constructor({roomId, storage, sendScheduler, pendingEvents}) { + constructor({roomId, storage, hsApi, pendingEvents}) { pendingEvents = pendingEvents || []; this._roomId = roomId; this._storage = storage; - this._sendScheduler = sendScheduler; + this._hsApi = hsApi; this._pendingEvents = new SortedArray((a, b) => a.queueIndex - b.queueIndex); if (pendingEvents.length) { console.info(`SendQueue for room ${roomId} has ${pendingEvents.length} pending events`, pendingEvents); @@ -51,22 +51,18 @@ export class SendQueue { continue; } if (pendingEvent.needsEncryption) { - const {type, content} = await this._sendScheduler.request(async hsApi => { - return await this._roomEncryption.encrypt(pendingEvent.eventType, pendingEvent.content, hsApi); - }); + const {type, content} = await this._roomEncryption.encrypt( + pendingEvent.eventType, pendingEvent.content, this._hsApi); pendingEvent.setEncrypted(type, content); await this._tryUpdateEvent(pendingEvent); } console.log("really sending now"); - const response = await this._sendScheduler.request(hsApi => { - console.log("got sendScheduler slot"); - return hsApi.send( + const response = await this._hsApi.send( pendingEvent.roomId, pendingEvent.eventType, pendingEvent.txnId, pendingEvent.content ).response(); - }); pendingEvent.remoteId = response.event_id; // console.log("writing remoteId now");