From f3d1128f28056813f6c6cbda631ef7fa8f5bea56 Mon Sep 17 00:00:00 2001 From: Bruno Windels Date: Mon, 1 Jul 2019 10:00:29 +0200 Subject: [PATCH] WIP --- doc/GOAL.md | 3 + src/matrix/error.js | 6 + src/matrix/room/sending/PendingEvent.js | 14 +- src/matrix/room/sending/SendQueue.js | 216 +++++++++++++----- src/matrix/storage/common.js | 9 +- src/matrix/storage/idb/create.js | 1 + src/matrix/storage/idb/query-target.js | 10 + .../storage/idb/stores/PendingEventStore.js | 44 ++++ .../storage/idb/stores/TimelineEventStore.js | 7 +- .../idb/stores/TimelineFragmentStore.js | 5 +- src/matrix/storage/idb/utils.js | 11 + src/observable/list/ObservableArray.js | 4 +- 12 files changed, 256 insertions(+), 74 deletions(-) create mode 100644 src/matrix/storage/idb/stores/PendingEventStore.js diff --git a/doc/GOAL.md b/doc/GOAL.md index 9ee0a283..3883cf27 100644 --- a/doc/GOAL.md +++ b/doc/GOAL.md @@ -3,3 +3,6 @@ goal: write client that works on lumia 950 phone, so I can use matrix on my phone. try approach offline to indexeddb. go low-memory, and test the performance of storing every event individually in indexeddb. + +try to use little bandwidth, mainly by being an offline application and storing all requested data in indexeddb. +be as functional as possible while offline diff --git a/src/matrix/error.js b/src/matrix/error.js index 31f1e1ed..888b1256 100644 --- a/src/matrix/error.js +++ b/src/matrix/error.js @@ -4,6 +4,12 @@ export class HomeServerError extends Error { this.errcode = body.errcode; this.retry_after_ms = body.retry_after_ms; } + + get isFatal() { + switch (this.errcode) { + + } + } } export class RequestAbortError extends Error { diff --git a/src/matrix/room/sending/PendingEvent.js b/src/matrix/room/sending/PendingEvent.js index d653f7a6..11e7987f 100644 --- a/src/matrix/room/sending/PendingEvent.js +++ b/src/matrix/room/sending/PendingEvent.js @@ -1,13 +1,11 @@ export default class PendingEvent { - static fromRedaction(eventId) { - + constructor(roomId, queueIndex, eventType, content, txnId) { + this._roomId = roomId; + this._eventType = eventType; + this._content = content; + this._txnId = txnId; + this._queueIndex = queueIndex; } - static fromContent(content) { - } - - static fromStateKey(eventType, stateKey, content) { - - } } diff --git a/src/matrix/room/sending/SendQueue.js b/src/matrix/room/sending/SendQueue.js index 82c754ce..2690bdd2 100644 --- a/src/matrix/room/sending/SendQueue.js +++ b/src/matrix/room/sending/SendQueue.js @@ -1,81 +1,124 @@ -class Sender { - constructor({hsApi}) { +import Platform from "../../../Platform.js"; + +class RateLimitingBackoff { + constructor() { + this._remainingRateLimitedRequest = 0; + } + + async waitAfterLimitExceeded(retryAfterMs) { + // this._remainingRateLimitedRequest = 5; + // if (typeof retryAfterMs !== "number") { + // } else { + // } + if (!retryAfterMs) { + retryAfterMs = 5000; + } + await Platform.delay(retryAfterMs); + } + + // do we have to know about succeeding requests? + // we can just + + async waitForNextSend() { + // this._remainingRateLimitedRequest = Math.max(0, this._remainingRateLimitedRequest - 1); + Platform.delay(1000); + } +} + +class SendScheduler { + constructor({hsApi, backoff}) { this._hsApi = hsApi; this._slotRequests = []; this._sendScheduled = false; this._offline = false; this._waitTime = 0; + this._backoff = backoff; } // this should really be per roomId to avoid head-of-line blocking - acquireSlot() { + // + // takes a callback instead of returning a promise with the slot + // to make sure the scheduler doesn't get blocked by a slot that is not consumed + runSlot(slotCallback) { let request; - const promise = new Promise((resolve) => request = {resolve}); + const promise = new Promise((resolve, reject) => request = {resolve, reject, slotCallback}); this._slotRequests.push(request); - if (!this._sendScheduled) { - this._startNextSlot(); + if (!this._sendScheduled && !this._offline) { + this._sendLoop(); } return promise; } - async _startNextSlot() { - if (this._waitTime !== 0) { - await Platform.delay(this._waitTime); + async _sendLoop() { + while (this._slotRequests.length) { + const request = this._slotRequests.unshift(); + this._currentSlot = new SendSlot(this); + // this can throw! + let result; + try { + result = await request.slotCallback(this._currentSlot); + } catch (err) { + if (err instanceof NetworkError) { + // we're offline, everybody will have + // to re-request slots when we come back online + this._offline = true; + for (const r of this._slotRequests) { + r.reject(err); + } + this._slotRequests = []; + } + request.reject(err); + break; + } + request.resolve(result); } - const request = this._slotRequests.unshift(); - this._currentSlot = new SenderSlot(this); - request.resolve(this._currentSlot); + // do next here instead of in _doSend } - _discardSlot(slot) { - if (slot === this._currentSlot) { - this._currentSlot = null; - this._sendScheduled = true; - Promise.resolve().then(() => this._startNextSlot()); - } - } - - async _doSend(slot, callback) { + async _doSend(slot, sendCallback) { this._sendScheduled = false; if (slot !== this._currentSlot) { - throw new Error("slot is not active"); + throw new Error("Slot is not active"); } - try { - // loop is left by return or throw - while(true) { - try { - return await callback(this._hsApi); - } catch (err) { - if (err instanceof HomeServerError && err.errcode === "M_LIMIT_EXCEEDED") { - await Platform.delay(err.retry_after_ms); - } else { - throw err; - } + await this._backoff.waitForNextSend(); + // loop is left by return or throw + while (true) { // eslint-disable-line no-constant-condition + try { + return await sendCallback(this._hsApi); + } catch (err) { + if (err instanceof HomeServerError && err.errcode === "M_LIMIT_EXCEEDED") { + await this._backoff.waitAfterLimitExceeded(err.retry_after_ms); + } else { + throw err; } } - } catch (err) { - if (err instanceof NetworkError) { - this._offline = true; - // went offline, probably want to notify SendQueues somehow - } - throw err; - } finally { - this._currentSlot = null; - if (!this._offline && this._slotRequests.length) { - this._sendScheduled = true; - Promise.resolve().then(() => this._startNextSlot()); - } } } } -class SenderSlot { - constructor(sender) { - this._sender = sender; +/* +this represents a slot to do one rate limited api call. +because rate-limiting is handled here, it should only +try to do one call, so the SendScheduler can safely +retry if the call ends up being rate limited. +This is also why we have this abstraction it hsApi is not +passed straight to SendQueue when it is its turn to send. +e.g. we wouldn't want to repeat the callback in SendQueue that could +have other side-effects before the call to hsApi that we wouldn't want +repeated (setting up progress handlers for file uploads, +... a UI update to say it started sending? + ... updating storage would probably only happen once the call succeeded + ... doing multiple hsApi calls for e.g. a file upload before sending a image message (they should individually be retried) +) maybe it is a bit overengineering, but lets stick with it for now. +At least the above is a clear definition why we have this class +*/ +class SendSlot { + constructor(scheduler) { + this._scheduler = scheduler; } - sendEvent(pendingEvent) { - return this._sender._doSend(this, async hsApi => { + sendContentEvent(pendingEvent) { + return this._scheduler._doSend(this, async hsApi => { const request = hsApi.send( pendingEvent.roomId, pendingEvent.eventType, @@ -87,11 +130,76 @@ class SenderSlot { }); } - discard() { - this._sender._discardSlot(this); + sendRedaction(pendingEvent) { + return this._scheduler._doSend(this, async hsApi => { + const request = hsApi.redact( + pendingEvent.roomId, + pendingEvent.redacts, + pendingEvent.txnId, + pendingEvent.reason + ); + const response = await request.response(); + return response.event_id; + }); + } + + // progressCallback should report the amount of bytes sent + uploadMedia(fileName, contentType, blob, progressCallback) { + } } -export default class SendQueue { - constructor({sender}) +function makeTxnId() { + const n = Math.floor(Math.random() * Number.MAX_SAFE_INTEGER); + const str = n.toString(16); + return "t" + "0".repeat(14 - str.length) + str; +} + +export default class SendQueue { + constructor({roomId, storage, scheduler, pendingEvents}) { + this._roomId = roomId; + this._storage = storage; + this._scheduler = scheduler; + this._pendingEvents = pendingEvents.map(d => PendingEvent.fromData(d)); + } + + async _sendLoop() { + let pendingEvent = null; + // eslint-disable-next-line no-cond-assign + while (pendingEvent = await this._nextPendingEvent()) { + // const mxcUrl = await this._scheduler.runSlot(slot => { + // return slot.uploadMedia(fileName, contentType, blob, bytesSent => { + // pendingEvent.updateAttachmentUploadProgress(bytesSent); + // }); + // }); + + // pendingEvent.setAttachmentUrl(mxcUrl); + //update storage for pendingEvent after updating url, + //remove blob only later to keep preview? + + await this._scheduler.runSlot(slot => { + if (pendingEvent.isRedaction) { + return slot.sendRedaction(pendingEvent); + } else if (pendingEvent.isContentEvent) { + return slot.sendContentEvent(pendingEvent); + } + }); + } + } + + async enqueueEvent(eventType, content) { + // temporary + const txn = await this._storage.readWriteTxn([this._storage.storeNames.pendingEvents]); + const pendingEventsStore = txn.pendingEvents; + const maxQueueIndex = await pendingEventsStore.getMaxQueueIndex(this._roomId) || 0; + const queueIndex = maxQueueIndex + 1; + const pendingEvent = new PendingEvent(this._roomId, queueIndex, eventType, content, makeTxnId()); + pendingEventsStore.add(pendingEvent.data); + await txn.complete(); + // create txnId + // create queueOrder + // store event + // if online and not running send loop + // start sending loop + } } diff --git a/src/matrix/storage/common.js b/src/matrix/storage/common.js index aa72fa0e..3b7e4ae8 100644 --- a/src/matrix/storage/common.js +++ b/src/matrix/storage/common.js @@ -1,4 +1,11 @@ -export const STORE_NAMES = Object.freeze(["session", "roomState", "roomSummary", "timelineEvents", "timelineFragments"]); +export const STORE_NAMES = Object.freeze([ + "session", + "roomState", + "roomSummary", + "timelineEvents", + "timelineFragments", + "pendingEvents", +]); export const STORE_MAP = Object.freeze(STORE_NAMES.reduce((nameMap, name) => { nameMap[name] = name; diff --git a/src/matrix/storage/idb/create.js b/src/matrix/storage/idb/create.js index 42b41ae0..5c476679 100644 --- a/src/matrix/storage/idb/create.js +++ b/src/matrix/storage/idb/create.js @@ -20,6 +20,7 @@ function createStores(db) { timelineEvents.createIndex("byEventId", "eventIdKey", {unique: true}); //key = room_id | event.type | event.state_key, db.createObjectStore("roomState", {keyPath: "key"}); + db.createObjectStore("pendingEvents", {keyPath: "key"}); // const roomMembers = db.createObjectStore("roomMembers", {keyPath: [ // "event.room_id", diff --git a/src/matrix/storage/idb/query-target.js b/src/matrix/storage/idb/query-target.js index e8eb36fb..45d268b6 100644 --- a/src/matrix/storage/idb/query-target.js +++ b/src/matrix/storage/idb/query-target.js @@ -71,6 +71,16 @@ export default class QueryTarget { return this._find(range, predicate, "prev"); } + async findMaxKey(range) { + const cursor = this._target.openKeyCursor(range, "prev"); + let maxKey; + await iterateCursor(cursor, (_, key) => { + maxKey = key; + return {done: true}; + }); + return maxKey; + } + /** * Checks if a given set of keys exist. * Calls `callback(key, found)` for each key in `keys`, in key sorting order (or reversed if backwards=true). diff --git a/src/matrix/storage/idb/stores/PendingEventStore.js b/src/matrix/storage/idb/stores/PendingEventStore.js new file mode 100644 index 00000000..82eeb571 --- /dev/null +++ b/src/matrix/storage/idb/stores/PendingEventStore.js @@ -0,0 +1,44 @@ +import { encodeUint32, decodeUint32 } from "../utils.js"; +import Platform from "../../../../Platform.js"; + +function encodeKey(roomId, queueIndex) { + return `${roomId}|${encodeUint32(queueIndex)}`; +} + +function decodeKey(key) { + const [roomId, encodedQueueIndex] = key.split("|"); + const queueIndex = decodeUint32(encodedQueueIndex); + return {roomId, queueIndex}; +} + +export default class PendingEventStore { + constructor(eventStore) { + this._eventStore = eventStore; + } + + async getMaxQueueIndex(roomId) { + const range = IDBKeyRange.bound( + encodeKey(roomId, Platform.minStorageKey), + encodeKey(roomId, Platform.maxStorageKey), + false, + false, + ); + const maxKey = await this._eventStore.findMaxKey(range); + if (maxKey) { + return decodeKey(maxKey).queueIndex; + } + } + + add(pendingEvent) { + pendingEvent.key = encodeKey(pendingEvent.roomId, pendingEvent.queueIndex); + return this._eventStore.add(pendingEvent); + } + + update(pendingEvent) { + return this._eventStore.put(pendingEvent); + } + + getAllEvents() { + return this._eventStore.selectAll(); + } +} diff --git a/src/matrix/storage/idb/stores/TimelineEventStore.js b/src/matrix/storage/idb/stores/TimelineEventStore.js index 5285cd6b..f54fc758 100644 --- a/src/matrix/storage/idb/stores/TimelineEventStore.js +++ b/src/matrix/storage/idb/stores/TimelineEventStore.js @@ -1,13 +1,8 @@ import EventKey from "../../../room/timeline/EventKey.js"; import { StorageError } from "../../common.js"; +import { encodeUint32 } from "../utils.js"; import Platform from "../../../../Platform.js"; -// storage keys are defined to be unsigned 32bit numbers in WebPlatform.js, which is assumed by idb -function encodeUint32(n) { - const hex = n.toString(16); - return "0".repeat(8 - hex.length) + hex; -} - function encodeKey(roomId, fragmentId, eventIndex) { return `${roomId}|${encodeUint32(fragmentId)}|${encodeUint32(eventIndex)}`; } diff --git a/src/matrix/storage/idb/stores/TimelineFragmentStore.js b/src/matrix/storage/idb/stores/TimelineFragmentStore.js index b84759bd..064daed7 100644 --- a/src/matrix/storage/idb/stores/TimelineFragmentStore.js +++ b/src/matrix/storage/idb/stores/TimelineFragmentStore.js @@ -1,10 +1,9 @@ import { StorageError } from "../../common.js"; import Platform from "../../../../Platform.js"; +import { encodeUint32 } from "../utils.js"; function encodeKey(roomId, fragmentId) { - let fragmentIdHex = fragmentId.toString(16); - fragmentIdHex = "0".repeat(8 - fragmentIdHex.length) + fragmentIdHex; - return `${roomId}|${fragmentIdHex}`; + return `${roomId}|${encodeUint32(fragmentId)}`; } export default class RoomFragmentStore { diff --git a/src/matrix/storage/idb/utils.js b/src/matrix/storage/idb/utils.js index 367d5e7d..e51c5dab 100644 --- a/src/matrix/storage/idb/utils.js +++ b/src/matrix/storage/idb/utils.js @@ -1,5 +1,16 @@ import { StorageError } from "../common.js"; + +// storage keys are defined to be unsigned 32bit numbers in WebPlatform.js, which is assumed by idb +export function encodeUint32(n) { + const hex = n.toString(16); + return "0".repeat(8 - hex.length) + hex; +} + +export function decodeUint32(str) { + return parseInt(str, 16); +} + export function openDatabase(name, createObjectStore, version) { const req = window.indexedDB.open(name, version); req.onupgradeneeded = (ev) => { diff --git a/src/observable/list/ObservableArray.js b/src/observable/list/ObservableArray.js index bb5c7758..47b0e24a 100644 --- a/src/observable/list/ObservableArray.js +++ b/src/observable/list/ObservableArray.js @@ -1,9 +1,9 @@ import BaseObservableList from "./BaseObservableList.js"; export default class ObservableArray extends BaseObservableList { - constructor() { + constructor(initialValues = []) { super(); - this._items = []; + this._items = initialValues; } append(item) {