From c5b2d0c8b297d9ebc66be894f554d96688887b40 Mon Sep 17 00:00:00 2001 From: Bruno Windels Date: Fri, 28 Jun 2019 00:52:54 +0200 Subject: [PATCH] WIP --- doc/SENDING.md | 42 +++++++++-- src/matrix/error.js | 1 + src/matrix/room/sending/PendingEvent.js | 13 ++++ src/matrix/room/sending/SendQueue.js | 97 +++++++++++++++++++++++++ src/ui/web/WebPlatform.js | 4 + 5 files changed, 151 insertions(+), 6 deletions(-) create mode 100644 src/matrix/room/sending/PendingEvent.js create mode 100644 src/matrix/room/sending/SendQueue.js diff --git a/doc/SENDING.md b/doc/SENDING.md index 68fc394f..b4e6382c 100644 --- a/doc/SENDING.md +++ b/doc/SENDING.md @@ -76,8 +76,36 @@ steps of sending // sender is the thing that is shared across rooms to handle rate limiting. const sendQueue = new SendQueue({roomId, hsApi, sender, storage}); await sendQueue.load(); //loads the queue? - //might need to load members for e2e rooms + //might need to load members for e2e rooms ... + //events should be encrypted before storing them though ... + + // terminology ...? + // task: to let us wait for it to be our turn + // given rate limiting + class Sender { + acquireSlot() { + return new SendSlot(); + } + } + // terminology ...? + // task: after waiting for it to be our turn given rate-limiting, + // send the actual thing we want to send. + // this should be used for all rate-limited apis... ? + class SendSlot { + sendContent(content) { + + } + + sendRedaction() { + + } + + uploadMedia() { + + } + } + class SendQueue { // when trying to send enqueueEvent(pendingEvent) { @@ -93,11 +121,13 @@ steps of sending while (let pendingEvent = await findNextPendingEvent()) { pendingEvent.status = QUEUED; try { - await this.sender.sendEvent(() => { - // callback gets called - pendingEvent.status = SENDING; - return pendingEvent; - }); + const mediaSlot = await this.sender.acquireSlot(); + const mxcUrl = await mediaSlot.uploadMedia(pendingEvent.blob); + pendingEvent.content.url = mxcUrl; + const contentSlot = await this.sender.acquireSlot(); + contentSlot.sendContent(pendingEvent.content); + pendingEvent.status = SENDING; + await slot.sendContent(...); } catch (err) { //offline } diff --git a/src/matrix/error.js b/src/matrix/error.js index a70ec3fa..31f1e1ed 100644 --- a/src/matrix/error.js +++ b/src/matrix/error.js @@ -2,6 +2,7 @@ export class HomeServerError extends Error { constructor(method, url, body) { super(`${body.error} on ${method} ${url}`); this.errcode = body.errcode; + this.retry_after_ms = body.retry_after_ms; } } diff --git a/src/matrix/room/sending/PendingEvent.js b/src/matrix/room/sending/PendingEvent.js new file mode 100644 index 00000000..d653f7a6 --- /dev/null +++ b/src/matrix/room/sending/PendingEvent.js @@ -0,0 +1,13 @@ +export default class PendingEvent { + static fromRedaction(eventId) { + + } + + static fromContent(content) { + + } + + static fromStateKey(eventType, stateKey, content) { + + } +} diff --git a/src/matrix/room/sending/SendQueue.js b/src/matrix/room/sending/SendQueue.js new file mode 100644 index 00000000..82c754ce --- /dev/null +++ b/src/matrix/room/sending/SendQueue.js @@ -0,0 +1,97 @@ +class Sender { + constructor({hsApi}) { + this._hsApi = hsApi; + this._slotRequests = []; + this._sendScheduled = false; + this._offline = false; + this._waitTime = 0; + } + + // this should really be per roomId to avoid head-of-line blocking + acquireSlot() { + let request; + const promise = new Promise((resolve) => request = {resolve}); + this._slotRequests.push(request); + if (!this._sendScheduled) { + this._startNextSlot(); + } + return promise; + } + + async _startNextSlot() { + if (this._waitTime !== 0) { + await Platform.delay(this._waitTime); + } + const request = this._slotRequests.unshift(); + this._currentSlot = new SenderSlot(this); + request.resolve(this._currentSlot); + } + + _discardSlot(slot) { + if (slot === this._currentSlot) { + this._currentSlot = null; + this._sendScheduled = true; + Promise.resolve().then(() => this._startNextSlot()); + } + } + + async _doSend(slot, callback) { + this._sendScheduled = false; + if (slot !== this._currentSlot) { + throw new Error("slot is not active"); + } + try { + // loop is left by return or throw + while(true) { + try { + return await callback(this._hsApi); + } catch (err) { + if (err instanceof HomeServerError && err.errcode === "M_LIMIT_EXCEEDED") { + await Platform.delay(err.retry_after_ms); + } else { + throw err; + } + } + } + } catch (err) { + if (err instanceof NetworkError) { + this._offline = true; + // went offline, probably want to notify SendQueues somehow + } + throw err; + } finally { + this._currentSlot = null; + if (!this._offline && this._slotRequests.length) { + this._sendScheduled = true; + Promise.resolve().then(() => this._startNextSlot()); + } + } + } +} + +class SenderSlot { + constructor(sender) { + this._sender = sender; + } + + sendEvent(pendingEvent) { + return this._sender._doSend(this, async hsApi => { + const request = hsApi.send( + pendingEvent.roomId, + pendingEvent.eventType, + pendingEvent.txnId, + pendingEvent.content + ); + const response = await request.response(); + return response.event_id; + }); + } + + discard() { + this._sender._discardSlot(this); + } +} + +export default class SendQueue { + constructor({sender}) +} diff --git a/src/ui/web/WebPlatform.js b/src/ui/web/WebPlatform.js index 1026d142..4f3d9e06 100644 --- a/src/ui/web/WebPlatform.js +++ b/src/ui/web/WebPlatform.js @@ -13,4 +13,8 @@ export default { // for indexeddb, we use unsigned 32 bit integers as keys return 0xFFFFFFFF; }, + + delay(ms) { + return new Promise(resolve => setTimeout(resolve, ms)); + } }