From c5b2d0c8b297d9ebc66be894f554d96688887b40 Mon Sep 17 00:00:00 2001 From: Bruno Windels Date: Fri, 28 Jun 2019 00:52:54 +0200 Subject: [PATCH 01/24] WIP --- doc/SENDING.md | 42 +++++++++-- src/matrix/error.js | 1 + src/matrix/room/sending/PendingEvent.js | 13 ++++ src/matrix/room/sending/SendQueue.js | 97 +++++++++++++++++++++++++ src/ui/web/WebPlatform.js | 4 + 5 files changed, 151 insertions(+), 6 deletions(-) create mode 100644 src/matrix/room/sending/PendingEvent.js create mode 100644 src/matrix/room/sending/SendQueue.js diff --git a/doc/SENDING.md b/doc/SENDING.md index 68fc394f..b4e6382c 100644 --- a/doc/SENDING.md +++ b/doc/SENDING.md @@ -76,8 +76,36 @@ steps of sending // sender is the thing that is shared across rooms to handle rate limiting. const sendQueue = new SendQueue({roomId, hsApi, sender, storage}); await sendQueue.load(); //loads the queue? - //might need to load members for e2e rooms + //might need to load members for e2e rooms ... + //events should be encrypted before storing them though ... + + // terminology ...? + // task: to let us wait for it to be our turn + // given rate limiting + class Sender { + acquireSlot() { + return new SendSlot(); + } + } + // terminology ...? + // task: after waiting for it to be our turn given rate-limiting, + // send the actual thing we want to send. + // this should be used for all rate-limited apis... ? + class SendSlot { + sendContent(content) { + + } + + sendRedaction() { + + } + + uploadMedia() { + + } + } + class SendQueue { // when trying to send enqueueEvent(pendingEvent) { @@ -93,11 +121,13 @@ steps of sending while (let pendingEvent = await findNextPendingEvent()) { pendingEvent.status = QUEUED; try { - await this.sender.sendEvent(() => { - // callback gets called - pendingEvent.status = SENDING; - return pendingEvent; - }); + const mediaSlot = await this.sender.acquireSlot(); + const mxcUrl = await mediaSlot.uploadMedia(pendingEvent.blob); + pendingEvent.content.url = mxcUrl; + const contentSlot = await this.sender.acquireSlot(); + contentSlot.sendContent(pendingEvent.content); + pendingEvent.status = SENDING; + await slot.sendContent(...); } catch (err) { //offline } diff --git a/src/matrix/error.js b/src/matrix/error.js index a70ec3fa..31f1e1ed 100644 --- a/src/matrix/error.js +++ b/src/matrix/error.js @@ -2,6 +2,7 @@ export class HomeServerError extends Error { constructor(method, url, body) { super(`${body.error} on ${method} ${url}`); this.errcode = body.errcode; + this.retry_after_ms = body.retry_after_ms; } } diff --git a/src/matrix/room/sending/PendingEvent.js b/src/matrix/room/sending/PendingEvent.js new file mode 100644 index 00000000..d653f7a6 --- /dev/null +++ b/src/matrix/room/sending/PendingEvent.js @@ -0,0 +1,13 @@ +export default class PendingEvent { + static fromRedaction(eventId) { + + } + + static fromContent(content) { + + } + + static fromStateKey(eventType, stateKey, content) { + + } +} diff --git a/src/matrix/room/sending/SendQueue.js b/src/matrix/room/sending/SendQueue.js new file mode 100644 index 00000000..82c754ce --- /dev/null +++ b/src/matrix/room/sending/SendQueue.js @@ -0,0 +1,97 @@ +class Sender { + constructor({hsApi}) { + this._hsApi = hsApi; + this._slotRequests = []; + this._sendScheduled = false; + this._offline = false; + this._waitTime = 0; + } + + // this should really be per roomId to avoid head-of-line blocking + acquireSlot() { + let request; + const promise = new Promise((resolve) => request = {resolve}); + this._slotRequests.push(request); + if (!this._sendScheduled) { + this._startNextSlot(); + } + return promise; + } + + async _startNextSlot() { + if (this._waitTime !== 0) { + await Platform.delay(this._waitTime); + } + const request = this._slotRequests.unshift(); + this._currentSlot = new SenderSlot(this); + request.resolve(this._currentSlot); + } + + _discardSlot(slot) { + if (slot === this._currentSlot) { + this._currentSlot = null; + this._sendScheduled = true; + Promise.resolve().then(() => this._startNextSlot()); + } + } + + async _doSend(slot, callback) { + this._sendScheduled = false; + if (slot !== this._currentSlot) { + throw new Error("slot is not active"); + } + try { + // loop is left by return or throw + while(true) { + try { + return await callback(this._hsApi); + } catch (err) { + if (err instanceof HomeServerError && err.errcode === "M_LIMIT_EXCEEDED") { + await Platform.delay(err.retry_after_ms); + } else { + throw err; + } + } + } + } catch (err) { + if (err instanceof NetworkError) { + this._offline = true; + // went offline, probably want to notify SendQueues somehow + } + throw err; + } finally { + this._currentSlot = null; + if (!this._offline && this._slotRequests.length) { + this._sendScheduled = true; + Promise.resolve().then(() => this._startNextSlot()); + } + } + } +} + +class SenderSlot { + constructor(sender) { + this._sender = sender; + } + + sendEvent(pendingEvent) { + return this._sender._doSend(this, async hsApi => { + const request = hsApi.send( + pendingEvent.roomId, + pendingEvent.eventType, + pendingEvent.txnId, + pendingEvent.content + ); + const response = await request.response(); + return response.event_id; + }); + } + + discard() { + this._sender._discardSlot(this); + } +} + +export default class SendQueue { + constructor({sender}) +} diff --git a/src/ui/web/WebPlatform.js b/src/ui/web/WebPlatform.js index 1026d142..4f3d9e06 100644 --- a/src/ui/web/WebPlatform.js +++ b/src/ui/web/WebPlatform.js @@ -13,4 +13,8 @@ export default { // for indexeddb, we use unsigned 32 bit integers as keys return 0xFFFFFFFF; }, + + delay(ms) { + return new Promise(resolve => setTimeout(resolve, ms)); + } } From f3d1128f28056813f6c6cbda631ef7fa8f5bea56 Mon Sep 17 00:00:00 2001 From: Bruno Windels Date: Mon, 1 Jul 2019 10:00:29 +0200 Subject: [PATCH 02/24] WIP --- doc/GOAL.md | 3 + src/matrix/error.js | 6 + src/matrix/room/sending/PendingEvent.js | 14 +- src/matrix/room/sending/SendQueue.js | 216 +++++++++++++----- src/matrix/storage/common.js | 9 +- src/matrix/storage/idb/create.js | 1 + src/matrix/storage/idb/query-target.js | 10 + .../storage/idb/stores/PendingEventStore.js | 44 ++++ .../storage/idb/stores/TimelineEventStore.js | 7 +- .../idb/stores/TimelineFragmentStore.js | 5 +- src/matrix/storage/idb/utils.js | 11 + src/observable/list/ObservableArray.js | 4 +- 12 files changed, 256 insertions(+), 74 deletions(-) create mode 100644 src/matrix/storage/idb/stores/PendingEventStore.js diff --git a/doc/GOAL.md b/doc/GOAL.md index 9ee0a283..3883cf27 100644 --- a/doc/GOAL.md +++ b/doc/GOAL.md @@ -3,3 +3,6 @@ goal: write client that works on lumia 950 phone, so I can use matrix on my phone. try approach offline to indexeddb. go low-memory, and test the performance of storing every event individually in indexeddb. + +try to use little bandwidth, mainly by being an offline application and storing all requested data in indexeddb. +be as functional as possible while offline diff --git a/src/matrix/error.js b/src/matrix/error.js index 31f1e1ed..888b1256 100644 --- a/src/matrix/error.js +++ b/src/matrix/error.js @@ -4,6 +4,12 @@ export class HomeServerError extends Error { this.errcode = body.errcode; this.retry_after_ms = body.retry_after_ms; } + + get isFatal() { + switch (this.errcode) { + + } + } } export class RequestAbortError extends Error { diff --git a/src/matrix/room/sending/PendingEvent.js b/src/matrix/room/sending/PendingEvent.js index d653f7a6..11e7987f 100644 --- a/src/matrix/room/sending/PendingEvent.js +++ b/src/matrix/room/sending/PendingEvent.js @@ -1,13 +1,11 @@ export default class PendingEvent { - static fromRedaction(eventId) { - + constructor(roomId, queueIndex, eventType, content, txnId) { + this._roomId = roomId; + this._eventType = eventType; + this._content = content; + this._txnId = txnId; + this._queueIndex = queueIndex; } - static fromContent(content) { - } - - static fromStateKey(eventType, stateKey, content) { - - } } diff --git a/src/matrix/room/sending/SendQueue.js b/src/matrix/room/sending/SendQueue.js index 82c754ce..2690bdd2 100644 --- a/src/matrix/room/sending/SendQueue.js +++ b/src/matrix/room/sending/SendQueue.js @@ -1,81 +1,124 @@ -class Sender { - constructor({hsApi}) { +import Platform from "../../../Platform.js"; + +class RateLimitingBackoff { + constructor() { + this._remainingRateLimitedRequest = 0; + } + + async waitAfterLimitExceeded(retryAfterMs) { + // this._remainingRateLimitedRequest = 5; + // if (typeof retryAfterMs !== "number") { + // } else { + // } + if (!retryAfterMs) { + retryAfterMs = 5000; + } + await Platform.delay(retryAfterMs); + } + + // do we have to know about succeeding requests? + // we can just + + async waitForNextSend() { + // this._remainingRateLimitedRequest = Math.max(0, this._remainingRateLimitedRequest - 1); + Platform.delay(1000); + } +} + +class SendScheduler { + constructor({hsApi, backoff}) { this._hsApi = hsApi; this._slotRequests = []; this._sendScheduled = false; this._offline = false; this._waitTime = 0; + this._backoff = backoff; } // this should really be per roomId to avoid head-of-line blocking - acquireSlot() { + // + // takes a callback instead of returning a promise with the slot + // to make sure the scheduler doesn't get blocked by a slot that is not consumed + runSlot(slotCallback) { let request; - const promise = new Promise((resolve) => request = {resolve}); + const promise = new Promise((resolve, reject) => request = {resolve, reject, slotCallback}); this._slotRequests.push(request); - if (!this._sendScheduled) { - this._startNextSlot(); + if (!this._sendScheduled && !this._offline) { + this._sendLoop(); } return promise; } - async _startNextSlot() { - if (this._waitTime !== 0) { - await Platform.delay(this._waitTime); + async _sendLoop() { + while (this._slotRequests.length) { + const request = this._slotRequests.unshift(); + this._currentSlot = new SendSlot(this); + // this can throw! + let result; + try { + result = await request.slotCallback(this._currentSlot); + } catch (err) { + if (err instanceof NetworkError) { + // we're offline, everybody will have + // to re-request slots when we come back online + this._offline = true; + for (const r of this._slotRequests) { + r.reject(err); + } + this._slotRequests = []; + } + request.reject(err); + break; + } + request.resolve(result); } - const request = this._slotRequests.unshift(); - this._currentSlot = new SenderSlot(this); - request.resolve(this._currentSlot); + // do next here instead of in _doSend } - _discardSlot(slot) { - if (slot === this._currentSlot) { - this._currentSlot = null; - this._sendScheduled = true; - Promise.resolve().then(() => this._startNextSlot()); - } - } - - async _doSend(slot, callback) { + async _doSend(slot, sendCallback) { this._sendScheduled = false; if (slot !== this._currentSlot) { - throw new Error("slot is not active"); + throw new Error("Slot is not active"); } - try { - // loop is left by return or throw - while(true) { - try { - return await callback(this._hsApi); - } catch (err) { - if (err instanceof HomeServerError && err.errcode === "M_LIMIT_EXCEEDED") { - await Platform.delay(err.retry_after_ms); - } else { - throw err; - } + await this._backoff.waitForNextSend(); + // loop is left by return or throw + while (true) { // eslint-disable-line no-constant-condition + try { + return await sendCallback(this._hsApi); + } catch (err) { + if (err instanceof HomeServerError && err.errcode === "M_LIMIT_EXCEEDED") { + await this._backoff.waitAfterLimitExceeded(err.retry_after_ms); + } else { + throw err; } } - } catch (err) { - if (err instanceof NetworkError) { - this._offline = true; - // went offline, probably want to notify SendQueues somehow - } - throw err; - } finally { - this._currentSlot = null; - if (!this._offline && this._slotRequests.length) { - this._sendScheduled = true; - Promise.resolve().then(() => this._startNextSlot()); - } } } } -class SenderSlot { - constructor(sender) { - this._sender = sender; +/* +this represents a slot to do one rate limited api call. +because rate-limiting is handled here, it should only +try to do one call, so the SendScheduler can safely +retry if the call ends up being rate limited. +This is also why we have this abstraction it hsApi is not +passed straight to SendQueue when it is its turn to send. +e.g. we wouldn't want to repeat the callback in SendQueue that could +have other side-effects before the call to hsApi that we wouldn't want +repeated (setting up progress handlers for file uploads, +... a UI update to say it started sending? + ... updating storage would probably only happen once the call succeeded + ... doing multiple hsApi calls for e.g. a file upload before sending a image message (they should individually be retried) +) maybe it is a bit overengineering, but lets stick with it for now. +At least the above is a clear definition why we have this class +*/ +class SendSlot { + constructor(scheduler) { + this._scheduler = scheduler; } - sendEvent(pendingEvent) { - return this._sender._doSend(this, async hsApi => { + sendContentEvent(pendingEvent) { + return this._scheduler._doSend(this, async hsApi => { const request = hsApi.send( pendingEvent.roomId, pendingEvent.eventType, @@ -87,11 +130,76 @@ class SenderSlot { }); } - discard() { - this._sender._discardSlot(this); + sendRedaction(pendingEvent) { + return this._scheduler._doSend(this, async hsApi => { + const request = hsApi.redact( + pendingEvent.roomId, + pendingEvent.redacts, + pendingEvent.txnId, + pendingEvent.reason + ); + const response = await request.response(); + return response.event_id; + }); + } + + // progressCallback should report the amount of bytes sent + uploadMedia(fileName, contentType, blob, progressCallback) { + } } -export default class SendQueue { - constructor({sender}) +function makeTxnId() { + const n = Math.floor(Math.random() * Number.MAX_SAFE_INTEGER); + const str = n.toString(16); + return "t" + "0".repeat(14 - str.length) + str; +} + +export default class SendQueue { + constructor({roomId, storage, scheduler, pendingEvents}) { + this._roomId = roomId; + this._storage = storage; + this._scheduler = scheduler; + this._pendingEvents = pendingEvents.map(d => PendingEvent.fromData(d)); + } + + async _sendLoop() { + let pendingEvent = null; + // eslint-disable-next-line no-cond-assign + while (pendingEvent = await this._nextPendingEvent()) { + // const mxcUrl = await this._scheduler.runSlot(slot => { + // return slot.uploadMedia(fileName, contentType, blob, bytesSent => { + // pendingEvent.updateAttachmentUploadProgress(bytesSent); + // }); + // }); + + // pendingEvent.setAttachmentUrl(mxcUrl); + //update storage for pendingEvent after updating url, + //remove blob only later to keep preview? + + await this._scheduler.runSlot(slot => { + if (pendingEvent.isRedaction) { + return slot.sendRedaction(pendingEvent); + } else if (pendingEvent.isContentEvent) { + return slot.sendContentEvent(pendingEvent); + } + }); + } + } + + async enqueueEvent(eventType, content) { + // temporary + const txn = await this._storage.readWriteTxn([this._storage.storeNames.pendingEvents]); + const pendingEventsStore = txn.pendingEvents; + const maxQueueIndex = await pendingEventsStore.getMaxQueueIndex(this._roomId) || 0; + const queueIndex = maxQueueIndex + 1; + const pendingEvent = new PendingEvent(this._roomId, queueIndex, eventType, content, makeTxnId()); + pendingEventsStore.add(pendingEvent.data); + await txn.complete(); + // create txnId + // create queueOrder + // store event + // if online and not running send loop + // start sending loop + } } diff --git a/src/matrix/storage/common.js b/src/matrix/storage/common.js index aa72fa0e..3b7e4ae8 100644 --- a/src/matrix/storage/common.js +++ b/src/matrix/storage/common.js @@ -1,4 +1,11 @@ -export const STORE_NAMES = Object.freeze(["session", "roomState", "roomSummary", "timelineEvents", "timelineFragments"]); +export const STORE_NAMES = Object.freeze([ + "session", + "roomState", + "roomSummary", + "timelineEvents", + "timelineFragments", + "pendingEvents", +]); export const STORE_MAP = Object.freeze(STORE_NAMES.reduce((nameMap, name) => { nameMap[name] = name; diff --git a/src/matrix/storage/idb/create.js b/src/matrix/storage/idb/create.js index 42b41ae0..5c476679 100644 --- a/src/matrix/storage/idb/create.js +++ b/src/matrix/storage/idb/create.js @@ -20,6 +20,7 @@ function createStores(db) { timelineEvents.createIndex("byEventId", "eventIdKey", {unique: true}); //key = room_id | event.type | event.state_key, db.createObjectStore("roomState", {keyPath: "key"}); + db.createObjectStore("pendingEvents", {keyPath: "key"}); // const roomMembers = db.createObjectStore("roomMembers", {keyPath: [ // "event.room_id", diff --git a/src/matrix/storage/idb/query-target.js b/src/matrix/storage/idb/query-target.js index e8eb36fb..45d268b6 100644 --- a/src/matrix/storage/idb/query-target.js +++ b/src/matrix/storage/idb/query-target.js @@ -71,6 +71,16 @@ export default class QueryTarget { return this._find(range, predicate, "prev"); } + async findMaxKey(range) { + const cursor = this._target.openKeyCursor(range, "prev"); + let maxKey; + await iterateCursor(cursor, (_, key) => { + maxKey = key; + return {done: true}; + }); + return maxKey; + } + /** * Checks if a given set of keys exist. * Calls `callback(key, found)` for each key in `keys`, in key sorting order (or reversed if backwards=true). diff --git a/src/matrix/storage/idb/stores/PendingEventStore.js b/src/matrix/storage/idb/stores/PendingEventStore.js new file mode 100644 index 00000000..82eeb571 --- /dev/null +++ b/src/matrix/storage/idb/stores/PendingEventStore.js @@ -0,0 +1,44 @@ +import { encodeUint32, decodeUint32 } from "../utils.js"; +import Platform from "../../../../Platform.js"; + +function encodeKey(roomId, queueIndex) { + return `${roomId}|${encodeUint32(queueIndex)}`; +} + +function decodeKey(key) { + const [roomId, encodedQueueIndex] = key.split("|"); + const queueIndex = decodeUint32(encodedQueueIndex); + return {roomId, queueIndex}; +} + +export default class PendingEventStore { + constructor(eventStore) { + this._eventStore = eventStore; + } + + async getMaxQueueIndex(roomId) { + const range = IDBKeyRange.bound( + encodeKey(roomId, Platform.minStorageKey), + encodeKey(roomId, Platform.maxStorageKey), + false, + false, + ); + const maxKey = await this._eventStore.findMaxKey(range); + if (maxKey) { + return decodeKey(maxKey).queueIndex; + } + } + + add(pendingEvent) { + pendingEvent.key = encodeKey(pendingEvent.roomId, pendingEvent.queueIndex); + return this._eventStore.add(pendingEvent); + } + + update(pendingEvent) { + return this._eventStore.put(pendingEvent); + } + + getAllEvents() { + return this._eventStore.selectAll(); + } +} diff --git a/src/matrix/storage/idb/stores/TimelineEventStore.js b/src/matrix/storage/idb/stores/TimelineEventStore.js index 5285cd6b..f54fc758 100644 --- a/src/matrix/storage/idb/stores/TimelineEventStore.js +++ b/src/matrix/storage/idb/stores/TimelineEventStore.js @@ -1,13 +1,8 @@ import EventKey from "../../../room/timeline/EventKey.js"; import { StorageError } from "../../common.js"; +import { encodeUint32 } from "../utils.js"; import Platform from "../../../../Platform.js"; -// storage keys are defined to be unsigned 32bit numbers in WebPlatform.js, which is assumed by idb -function encodeUint32(n) { - const hex = n.toString(16); - return "0".repeat(8 - hex.length) + hex; -} - function encodeKey(roomId, fragmentId, eventIndex) { return `${roomId}|${encodeUint32(fragmentId)}|${encodeUint32(eventIndex)}`; } diff --git a/src/matrix/storage/idb/stores/TimelineFragmentStore.js b/src/matrix/storage/idb/stores/TimelineFragmentStore.js index b84759bd..064daed7 100644 --- a/src/matrix/storage/idb/stores/TimelineFragmentStore.js +++ b/src/matrix/storage/idb/stores/TimelineFragmentStore.js @@ -1,10 +1,9 @@ import { StorageError } from "../../common.js"; import Platform from "../../../../Platform.js"; +import { encodeUint32 } from "../utils.js"; function encodeKey(roomId, fragmentId) { - let fragmentIdHex = fragmentId.toString(16); - fragmentIdHex = "0".repeat(8 - fragmentIdHex.length) + fragmentIdHex; - return `${roomId}|${fragmentIdHex}`; + return `${roomId}|${encodeUint32(fragmentId)}`; } export default class RoomFragmentStore { diff --git a/src/matrix/storage/idb/utils.js b/src/matrix/storage/idb/utils.js index 367d5e7d..e51c5dab 100644 --- a/src/matrix/storage/idb/utils.js +++ b/src/matrix/storage/idb/utils.js @@ -1,5 +1,16 @@ import { StorageError } from "../common.js"; + +// storage keys are defined to be unsigned 32bit numbers in WebPlatform.js, which is assumed by idb +export function encodeUint32(n) { + const hex = n.toString(16); + return "0".repeat(8 - hex.length) + hex; +} + +export function decodeUint32(str) { + return parseInt(str, 16); +} + export function openDatabase(name, createObjectStore, version) { const req = window.indexedDB.open(name, version); req.onupgradeneeded = (ev) => { diff --git a/src/observable/list/ObservableArray.js b/src/observable/list/ObservableArray.js index bb5c7758..47b0e24a 100644 --- a/src/observable/list/ObservableArray.js +++ b/src/observable/list/ObservableArray.js @@ -1,9 +1,9 @@ import BaseObservableList from "./BaseObservableList.js"; export default class ObservableArray extends BaseObservableList { - constructor() { + constructor(initialValues = []) { super(); - this._items = []; + this._items = initialValues; } append(item) { From ccb722d7660217440e4a68f3818721f13c5a14fc Mon Sep 17 00:00:00 2001 From: Bruno Windels Date: Fri, 26 Jul 2019 22:03:57 +0200 Subject: [PATCH 03/24] more WIP --- src/Platform.js | 8 +- src/matrix/SendScheduler.js | 121 ++++++++ src/matrix/hs-api.js | 9 + src/matrix/room/sending/PendingEvent.js | 17 +- src/matrix/room/sending/SendQueue.js | 288 +++++++----------- src/matrix/room/timeline/entries/BaseEntry.js | 5 + .../timeline/entries/PendingEventEntry.js | 34 +++ src/matrix/storage/common.js | 3 + src/matrix/storage/idb/store.js | 12 + .../storage/idb/stores/PendingEventStore.js | 11 + src/observable/list/SortedArray.js | 10 +- 11 files changed, 324 insertions(+), 194 deletions(-) create mode 100644 src/matrix/SendScheduler.js create mode 100644 src/matrix/room/timeline/entries/PendingEventEntry.js diff --git a/src/Platform.js b/src/Platform.js index c3ce381f..b68d821d 100644 --- a/src/Platform.js +++ b/src/Platform.js @@ -1,5 +1,5 @@ -// #ifdef PLATFORM_GNOME -// export {default} from "./ui/gnome/GnomePlatform.js"; -// #else +//#ifdef PLATFORM_GNOME +//##export {default} from "./ui/gnome/GnomePlatform.js"; +//#else export {default} from "./ui/web/WebPlatform.js"; -// #endif +//#endif diff --git a/src/matrix/SendScheduler.js b/src/matrix/SendScheduler.js new file mode 100644 index 00000000..9fbab290 --- /dev/null +++ b/src/matrix/SendScheduler.js @@ -0,0 +1,121 @@ +import Platform from "../../../Platform.js"; +import {HomeServerError, NetworkError} from "./error.js"; + +export class RateLimitingBackoff { + constructor() { + this._remainingRateLimitedRequest = 0; + } + + async waitAfterLimitExceeded(retryAfterMs) { + // this._remainingRateLimitedRequest = 5; + // if (typeof retryAfterMs !== "number") { + // } else { + // } + if (!retryAfterMs) { + retryAfterMs = 5000; + } + await Platform.delay(retryAfterMs); + } + + // do we have to know about succeeding requests? + // we can just + + async waitForNextSend() { + // this._remainingRateLimitedRequest = Math.max(0, this._remainingRateLimitedRequest - 1); + Platform.delay(1000); + } +} + +/* +this represents a slot to do one rate limited api call. +because rate-limiting is handled here, it should only +try to do one call, so the SendScheduler can safely +retry if the call ends up being rate limited. +This is also why we have this abstraction it hsApi is not +passed straight to SendQueue when it is its turn to send. +e.g. we wouldn't want to repeat the callback in SendQueue that could +have other side-effects before the call to hsApi that we wouldn't want +repeated (setting up progress handlers for file uploads, +... a UI update to say it started sending? + ... updating storage would probably only happen once the call succeeded + ... doing multiple hsApi calls for e.g. a file upload before sending a image message (they should individually be retried) +) maybe it is a bit overengineering, but lets stick with it for now. +At least the above is a clear definition why we have this class +*/ +//class SendSlot -- obsolete + +export class SendScheduler { + constructor({hsApi, backoff}) { + this._hsApi = hsApi; + this._sendRequests = []; + this._sendScheduled = false; + this._offline = false; + this._waitTime = 0; + this._backoff = backoff; + /* + we should have some sort of flag here that we enable + after all the rooms have been notified that they can resume + sending, so that from session, we can say scheduler.enable(); + this way, when we have better scheduling, it won't be first come, + first serve, when there are a lot of events in different rooms to send, + but we can apply some priorization of who should go first + */ + // this._enabled; + } + + // this should really be per roomId to avoid head-of-line blocking + // + // takes a callback instead of returning a promise with the slot + // to make sure the scheduler doesn't get blocked by a slot that is not consumed + request(sendCallback) { + let request; + const promise = new Promise((resolve, reject) => request = {resolve, reject, sendCallback}); + this._sendRequests.push(request); + if (!this._sendScheduled && !this._offline) { + this._sendLoop(); + } + return promise; + } + + async _sendLoop() { + while (this._sendRequests.length) { + const request = this._sendRequests.unshift(); + let result; + try { + // this can throw! + result = await this._doSend(request.sendCallback); + } catch (err) { + if (err instanceof NetworkError) { + // we're offline, everybody will have + // to re-request slots when we come back online + this._offline = true; + for (const r of this._sendRequests) { + r.reject(err); + } + this._sendRequests = []; + } + request.reject(err); + break; + } + request.resolve(result); + } + // do next here instead of in _doSend + } + + async _doSend(sendCallback) { + this._sendScheduled = false; + await this._backoff.waitForNextSend(); + // loop is left by return or throw + while (true) { // eslint-disable-line no-constant-condition + try { + return await sendCallback(this._hsApi); + } catch (err) { + if (err instanceof HomeServerError && err.errcode === "M_LIMIT_EXCEEDED") { + await this._backoff.waitAfterLimitExceeded(err.retry_after_ms); + } else { + throw err; + } + } + } + } +} diff --git a/src/matrix/hs-api.js b/src/matrix/hs-api.js index 2bc1ff72..0dd1e225 100644 --- a/src/matrix/hs-api.js +++ b/src/matrix/hs-api.js @@ -32,6 +32,7 @@ class RequestWrapper { } } +// todo: everywhere here, encode params in the url that could have slashes ... mainly event ids? export default class HomeServerApi { constructor(homeserver, accessToken) { // store these both in a closure somehow so it's harder to get at in case of XSS? @@ -98,6 +99,10 @@ export default class HomeServerApi { return this._request("POST", csPath, queryParams, body); } + _put(csPath, queryParams, body) { + return this._request("PUT", csPath, queryParams, body); + } + _get(csPath, queryParams, body) { return this._request("GET", csPath, queryParams, body); } @@ -111,6 +116,10 @@ export default class HomeServerApi { return this._get(`/rooms/${roomId}/messages`, params); } + send(roomId, eventType, txnId, content) { + return this._put(`/rooms/${roomId}/send/${eventType}/${txnId}`, {}, content); + } + passwordLogin(username, password) { return this._post("/login", undefined, { "type": "m.login.password", diff --git a/src/matrix/room/sending/PendingEvent.js b/src/matrix/room/sending/PendingEvent.js index 11e7987f..105da49b 100644 --- a/src/matrix/room/sending/PendingEvent.js +++ b/src/matrix/room/sending/PendingEvent.js @@ -1,11 +1,14 @@ export default class PendingEvent { - constructor(roomId, queueIndex, eventType, content, txnId) { - this._roomId = roomId; - this._eventType = eventType; - this._content = content; - this._txnId = txnId; - this._queueIndex = queueIndex; + constructor(data) { + this._data = data; } - + get roomId() { return this._data.roomId; } + get queueIndex() { return this._data.queueIndex; } + get eventType() { return this._data.eventType; } + get txnId() { return this._data.txnId; } + get remoteId() { return this._data.remoteId; } + set remoteId(value) { this._data.remoteId = value; } + get content() { return this._data.content; } + get data() { return this._data; } } diff --git a/src/matrix/room/sending/SendQueue.js b/src/matrix/room/sending/SendQueue.js index 2690bdd2..300e271e 100644 --- a/src/matrix/room/sending/SendQueue.js +++ b/src/matrix/room/sending/SendQueue.js @@ -1,153 +1,7 @@ -import Platform from "../../../Platform.js"; - -class RateLimitingBackoff { - constructor() { - this._remainingRateLimitedRequest = 0; - } - - async waitAfterLimitExceeded(retryAfterMs) { - // this._remainingRateLimitedRequest = 5; - // if (typeof retryAfterMs !== "number") { - // } else { - // } - if (!retryAfterMs) { - retryAfterMs = 5000; - } - await Platform.delay(retryAfterMs); - } - - // do we have to know about succeeding requests? - // we can just - - async waitForNextSend() { - // this._remainingRateLimitedRequest = Math.max(0, this._remainingRateLimitedRequest - 1); - Platform.delay(1000); - } -} - -class SendScheduler { - constructor({hsApi, backoff}) { - this._hsApi = hsApi; - this._slotRequests = []; - this._sendScheduled = false; - this._offline = false; - this._waitTime = 0; - this._backoff = backoff; - } - - // this should really be per roomId to avoid head-of-line blocking - // - // takes a callback instead of returning a promise with the slot - // to make sure the scheduler doesn't get blocked by a slot that is not consumed - runSlot(slotCallback) { - let request; - const promise = new Promise((resolve, reject) => request = {resolve, reject, slotCallback}); - this._slotRequests.push(request); - if (!this._sendScheduled && !this._offline) { - this._sendLoop(); - } - return promise; - } - - async _sendLoop() { - while (this._slotRequests.length) { - const request = this._slotRequests.unshift(); - this._currentSlot = new SendSlot(this); - // this can throw! - let result; - try { - result = await request.slotCallback(this._currentSlot); - } catch (err) { - if (err instanceof NetworkError) { - // we're offline, everybody will have - // to re-request slots when we come back online - this._offline = true; - for (const r of this._slotRequests) { - r.reject(err); - } - this._slotRequests = []; - } - request.reject(err); - break; - } - request.resolve(result); - } - // do next here instead of in _doSend - } - - async _doSend(slot, sendCallback) { - this._sendScheduled = false; - if (slot !== this._currentSlot) { - throw new Error("Slot is not active"); - } - await this._backoff.waitForNextSend(); - // loop is left by return or throw - while (true) { // eslint-disable-line no-constant-condition - try { - return await sendCallback(this._hsApi); - } catch (err) { - if (err instanceof HomeServerError && err.errcode === "M_LIMIT_EXCEEDED") { - await this._backoff.waitAfterLimitExceeded(err.retry_after_ms); - } else { - throw err; - } - } - } - } -} - -/* -this represents a slot to do one rate limited api call. -because rate-limiting is handled here, it should only -try to do one call, so the SendScheduler can safely -retry if the call ends up being rate limited. -This is also why we have this abstraction it hsApi is not -passed straight to SendQueue when it is its turn to send. -e.g. we wouldn't want to repeat the callback in SendQueue that could -have other side-effects before the call to hsApi that we wouldn't want -repeated (setting up progress handlers for file uploads, -... a UI update to say it started sending? - ... updating storage would probably only happen once the call succeeded - ... doing multiple hsApi calls for e.g. a file upload before sending a image message (they should individually be retried) -) maybe it is a bit overengineering, but lets stick with it for now. -At least the above is a clear definition why we have this class -*/ -class SendSlot { - constructor(scheduler) { - this._scheduler = scheduler; - } - - sendContentEvent(pendingEvent) { - return this._scheduler._doSend(this, async hsApi => { - const request = hsApi.send( - pendingEvent.roomId, - pendingEvent.eventType, - pendingEvent.txnId, - pendingEvent.content - ); - const response = await request.response(); - return response.event_id; - }); - } - - sendRedaction(pendingEvent) { - return this._scheduler._doSend(this, async hsApi => { - const request = hsApi.redact( - pendingEvent.roomId, - pendingEvent.redacts, - pendingEvent.txnId, - pendingEvent.reason - ); - const response = await request.response(); - return response.event_id; - }); - } - - // progressCallback should report the amount of bytes sent - uploadMedia(fileName, contentType, blob, progressCallback) { - - } -} +import SortedArray from "../../../observable/list/SortedArray.js"; +import {NetworkError} from "../../error.js"; +import {StorageError} from "../../storage/common.js"; +import PendingEvent from "./PendingEvent.js"; function makeTxnId() { const n = Math.floor(Math.random() * Number.MAX_SAFE_INTEGER); @@ -160,46 +14,118 @@ export default class SendQueue { this._roomId = roomId; this._storage = storage; this._scheduler = scheduler; - this._pendingEvents = pendingEvents.map(d => PendingEvent.fromData(d)); + this._pendingEvents = new SortedArray((a, b) => a.queueIndex - b.queueIndex); + this._pendingEvents.setManySorted(pendingEvents.map(data => new PendingEvent(data))); + this._isSending = false; + this._offline = false; + this._amountSent = 0; } async _sendLoop() { - let pendingEvent = null; - // eslint-disable-next-line no-cond-assign - while (pendingEvent = await this._nextPendingEvent()) { - // const mxcUrl = await this._scheduler.runSlot(slot => { - // return slot.uploadMedia(fileName, contentType, blob, bytesSent => { - // pendingEvent.updateAttachmentUploadProgress(bytesSent); - // }); - // }); - - // pendingEvent.setAttachmentUrl(mxcUrl); - //update storage for pendingEvent after updating url, - //remove blob only later to keep preview? - - await this._scheduler.runSlot(slot => { - if (pendingEvent.isRedaction) { - return slot.sendRedaction(pendingEvent); - } else if (pendingEvent.isContentEvent) { - return slot.sendContentEvent(pendingEvent); + this._isSending = true; + try { + while (this._amountSent < this._pendingEvents.length) { + const pendingEvent = this._pendingEvents.get(this._amountSent); + this._amountSent += 1; + if (pendingEvent.remoteId) { + continue; } - }); + const response = await this._scheduler.request(hsApi => { + return hsApi.send( + pendingEvent.roomId, + pendingEvent.eventType, + pendingEvent.txnId, + pendingEvent.content + ); + }); + pendingEvent.remoteId = response.event_id; + await this._tryUpdateEvent(pendingEvent); + } + } catch(err) { + if (err instanceof NetworkError) { + this._offline = true; + } + } finally { + this._isSending = false; + } + } + + + async receiveRemoteEcho(txnId) { + const idx = this._pendingEvents.array.findIndex(pe => pe.txnId === txnId); + if (idx !== 0) { + const pendingEvent = this._pendingEvents.get(idx); + this._amountSent -= 1; + this._pendingEvents.remove(idx); + await this._removeEvent(pendingEvent); + } + } + + resumeSending() { + this._offline = false; + if (!this._isSending) { + this._sendLoop(); } } async enqueueEvent(eventType, content) { - // temporary + const pendingEvent = await this._createAndStoreEvent(eventType, content); + this._pendingEvents.set(pendingEvent); + if (!this._isSending && !this._offline) { + this._sendLoop(); + } + } + + get pendingEvents() { + return this._pendingEvents; + } + + async _tryUpdateEvent(pendingEvent) { const txn = await this._storage.readWriteTxn([this._storage.storeNames.pendingEvents]); - const pendingEventsStore = txn.pendingEvents; - const maxQueueIndex = await pendingEventsStore.getMaxQueueIndex(this._roomId) || 0; - const queueIndex = maxQueueIndex + 1; - const pendingEvent = new PendingEvent(this._roomId, queueIndex, eventType, content, makeTxnId()); - pendingEventsStore.add(pendingEvent.data); + try { + // pendingEvent might have been removed already here + // by a racing remote echo, so check first so we don't recreate it + if (await txn.pendingEvents.exists(pendingEvent.roomId, pendingEvent.queueIndex)) { + txn.pendingEvents.update(pendingEvent.data); + } + } catch (err) { + txn.abort(); + throw err; + } await txn.complete(); - // create txnId - // create queueOrder - // store event - // if online and not running send loop - // start sending loop + } + + async _removeEvent(pendingEvent) { + const txn = await this._storage.readWriteTxn([this._storage.storeNames.pendingEvents]); + try { + txn.pendingEvents.remove(pendingEvent.roomId, pendingEvent.queueIndex); + } catch (err) { + txn.abort(); + throw err; + } + await txn.complete(); + } + + async _createAndStoreEvent(eventType, content) { + const txn = await this._storage.readWriteTxn([this._storage.storeNames.pendingEvents]); + let pendingEvent; + try { + const pendingEventsStore = txn.pendingEvents; + const maxQueueIndex = await pendingEventsStore.getMaxQueueIndex(this._roomId) || 0; + const queueIndex = maxQueueIndex + 1; + pendingEvent = new PendingEvent({ + roomId: this._roomId, + queueIndex, + eventType, + content, + txnId: makeTxnId() + }); + pendingEventsStore.add(pendingEvent.data); + } catch (err) { + txn.abort(); + throw err; + } + await txn.complete(); + return pendingEvent; } } diff --git a/src/matrix/room/timeline/entries/BaseEntry.js b/src/matrix/room/timeline/entries/BaseEntry.js index 3ef00862..3c3d31b2 100644 --- a/src/matrix/room/timeline/entries/BaseEntry.js +++ b/src/matrix/room/timeline/entries/BaseEntry.js @@ -1,5 +1,6 @@ //entries can be sorted, first by fragment, then by entry index. import EventKey from "../EventKey.js"; +import { PENDING_FRAGMENT_ID } from "./PendingEventEntry.js"; export default class BaseEntry { constructor(fragmentIdComparer) { @@ -17,6 +18,10 @@ export default class BaseEntry { compare(otherEntry) { if (this.fragmentId === otherEntry.fragmentId) { return this.entryIndex - otherEntry.entryIndex; + } else if (this.fragmentId === PENDING_FRAGMENT_ID) { + return 1; + } else if (otherEntry.fragmentId === PENDING_FRAGMENT_ID) { + return -1; } else { // This might throw if the relation of two fragments is unknown. return this._fragmentIdComparer.compare(this.fragmentId, otherEntry.fragmentId); diff --git a/src/matrix/room/timeline/entries/PendingEventEntry.js b/src/matrix/room/timeline/entries/PendingEventEntry.js new file mode 100644 index 00000000..03b79cf4 --- /dev/null +++ b/src/matrix/room/timeline/entries/PendingEventEntry.js @@ -0,0 +1,34 @@ +import BaseEntry from "./BaseEntry.js"; + +export const PENDING_FRAGMENT_ID = Number.MAX_SAFE_INTEGER; + +export default class PendingEventEntry extends BaseEntry { + constructor(pendingEvent) { + super(null); + this._pendingEvent = pendingEvent; + } + + get fragmentId() { + return PENDING_FRAGMENT_ID; + } + + get entryIndex() { + return this._pendingEvent.queueIndex; + } + + get content() { + return this._pendingEvent.content; + } + + get event() { + return null; + } + + get type() { + return this._pendingEvent.eventType; + } + + get id() { + return this._pendingEvent.txnId; + } +} diff --git a/src/matrix/storage/common.js b/src/matrix/storage/common.js index 3b7e4ae8..4a6c0a21 100644 --- a/src/matrix/storage/common.js +++ b/src/matrix/storage/common.js @@ -23,5 +23,8 @@ export class StorageError extends Error { fullMessage += cause.message; } super(fullMessage); + if (cause) { + this.errcode = cause.name; + } } } diff --git a/src/matrix/storage/idb/store.js b/src/matrix/storage/idb/store.js index eea33cc1..ae2008bb 100644 --- a/src/matrix/storage/idb/store.js +++ b/src/matrix/storage/idb/store.js @@ -47,6 +47,14 @@ class QueryTargetWrapper { } } + delete(...params) { + try { + return this._qt.delete(...params); + } catch(err) { + throw new StorageError("delete failed", err); + } + } + index(...params) { try { return this._qt.index(...params); @@ -76,4 +84,8 @@ export default class Store extends QueryTarget { add(value) { return reqAsPromise(this._idbStore.add(value)); } + + delete(keyOrKeyRange) { + return reqAsPromise(this._idbStore.delete(keyOrKeyRange)); + } } diff --git a/src/matrix/storage/idb/stores/PendingEventStore.js b/src/matrix/storage/idb/stores/PendingEventStore.js index 82eeb571..fa54848c 100644 --- a/src/matrix/storage/idb/stores/PendingEventStore.js +++ b/src/matrix/storage/idb/stores/PendingEventStore.js @@ -29,6 +29,17 @@ export default class PendingEventStore { } } + remove(roomId, queueIndex) { + const keyRange = IDBKeyRange.only(encodeKey(roomId, queueIndex)); + this._eventStore.delete(keyRange); + } + + async exists(roomId, queueIndex) { + const keyRange = IDBKeyRange.only(encodeKey(roomId, queueIndex)); + const key = await this._eventStore.getKey(keyRange); + return !!key; + } + add(pendingEvent) { pendingEvent.key = encodeKey(pendingEvent.roomId, pendingEvent.queueIndex); return this._eventStore.add(pendingEvent); diff --git a/src/observable/list/SortedArray.js b/src/observable/list/SortedArray.js index 8c90ce15..d38f94fd 100644 --- a/src/observable/list/SortedArray.js +++ b/src/observable/list/SortedArray.js @@ -32,8 +32,14 @@ export default class SortedArray extends BaseObservableList { } } - remove(item) { - throw new Error("unimplemented"); + get(idx) { + return this._items[idx]; + } + + remove(idx) { + const item = this._items[idx]; + this._items.splice(idx, 1); + this.emitRemove(idx, item); } get array() { From 707988f806c06e95cbc6d9282b19463da10bfa90 Mon Sep 17 00:00:00 2001 From: Bruno Windels Date: Fri, 26 Jul 2019 22:33:33 +0200 Subject: [PATCH 04/24] better handle remote echos and hookup in session and room --- src/matrix/room/room.js | 20 ++++++-- src/matrix/room/sending/SendQueue.js | 48 ++++++++++--------- src/matrix/session.js | 23 ++++++++- .../storage/idb/stores/PendingEventStore.js | 2 +- src/matrix/sync.js | 1 + 5 files changed, 66 insertions(+), 28 deletions(-) diff --git a/src/matrix/room/room.js b/src/matrix/room/room.js index 7c17b3b1..42d3eef1 100644 --- a/src/matrix/room/room.js +++ b/src/matrix/room/room.js @@ -3,9 +3,10 @@ import RoomSummary from "./summary.js"; import SyncWriter from "./timeline/persistence/SyncWriter.js"; import Timeline from "./timeline/Timeline.js"; import FragmentIdComparer from "./timeline/FragmentIdComparer.js"; +import SendQueue from "./sending/SendQueue.js"; export default class Room extends EventEmitter { - constructor({roomId, storage, hsApi, emitCollectionChange}) { + constructor({roomId, storage, hsApi, emitCollectionChange, sendScheduler, pendingEvents}) { super(); this._roomId = roomId; this._storage = storage; @@ -14,16 +15,21 @@ export default class Room extends EventEmitter { this._fragmentIdComparer = new FragmentIdComparer([]); this._syncWriter = new SyncWriter({roomId, storage, fragmentIdComparer: this._fragmentIdComparer}); this._emitCollectionChange = emitCollectionChange; + this._sendQueue = new SendQueue({roomId, storage, sendScheduler, pendingEvents}); this._timeline = null; } async persistSync(roomResponse, membership, txn) { const summaryChanged = this._summary.applySync(roomResponse, membership, txn); const newTimelineEntries = await this._syncWriter.writeSync(roomResponse, txn); - return {summaryChanged, newTimelineEntries}; + let removedPendingEvents; + if (roomResponse.timeline && roomResponse.timeline.events) { + removedPendingEvents = this._sendQueue.removeRemoteEchos(roomResponse.timeline.events, txn); + } + return {summaryChanged, newTimelineEntries, removedPendingEvents}; } - emitSync({summaryChanged, newTimelineEntries}) { + emitSync({summaryChanged, newTimelineEntries, removedPendingEvents}) { if (summaryChanged) { this.emit("change"); this._emitCollectionChange(this); @@ -31,6 +37,9 @@ export default class Room extends EventEmitter { if (this._timeline) { this._timeline.appendLiveEntries(newTimelineEntries); } + if (removedPendingEvents) { + this._sendQueue.emitRemovals(removedPendingEvents); + } } load(summary, txn) { @@ -38,6 +47,10 @@ export default class Room extends EventEmitter { return this._syncWriter.load(txn); } + sendEvent(eventType, content) { + this._sendQueue.enqueueEvent(eventType, content); + } + get name() { return this._summary.name; } @@ -55,6 +68,7 @@ export default class Room extends EventEmitter { storage: this._storage, hsApi: this._hsApi, fragmentIdComparer: this._fragmentIdComparer, + pendingEvents: this._sendQueue.pendingEvents, closeCallback: () => this._timeline = null, }); await this._timeline.load(); diff --git a/src/matrix/room/sending/SendQueue.js b/src/matrix/room/sending/SendQueue.js index 300e271e..d1666c02 100644 --- a/src/matrix/room/sending/SendQueue.js +++ b/src/matrix/room/sending/SendQueue.js @@ -1,6 +1,5 @@ import SortedArray from "../../../observable/list/SortedArray.js"; import {NetworkError} from "../../error.js"; -import {StorageError} from "../../storage/common.js"; import PendingEvent from "./PendingEvent.js"; function makeTxnId() { @@ -10,10 +9,11 @@ function makeTxnId() { } export default class SendQueue { - constructor({roomId, storage, scheduler, pendingEvents}) { + constructor({roomId, storage, sendScheduler, pendingEvents}) { + pendingEvents = pendingEvents || []; this._roomId = roomId; this._storage = storage; - this._scheduler = scheduler; + this._sendScheduler = sendScheduler; this._pendingEvents = new SortedArray((a, b) => a.queueIndex - b.queueIndex); this._pendingEvents.setManySorted(pendingEvents.map(data => new PendingEvent(data))); this._isSending = false; @@ -30,7 +30,7 @@ export default class SendQueue { if (pendingEvent.remoteId) { continue; } - const response = await this._scheduler.request(hsApi => { + const response = await this._sendScheduler.request(hsApi => { return hsApi.send( pendingEvent.roomId, pendingEvent.eventType, @@ -50,14 +50,29 @@ export default class SendQueue { } } + removeRemoteEchos(events, txn) { + const removed = []; + for (const event of events) { + const txnId = event.unsigned && event.unsigned.transaction_id; + if (txnId) { + const idx = this._pendingEvents.array.findIndex(pe => pe.txnId === txnId); + if (idx !== -1) { + const pendingEvent = this._pendingEvents.get(idx); + txn.pendingEvents.remove(pendingEvent.roomId, pendingEvent.queueIndex); + removed.push(pendingEvent); + } + } + } + return removed; + } - async receiveRemoteEcho(txnId) { - const idx = this._pendingEvents.array.findIndex(pe => pe.txnId === txnId); - if (idx !== 0) { - const pendingEvent = this._pendingEvents.get(idx); - this._amountSent -= 1; - this._pendingEvents.remove(idx); - await this._removeEvent(pendingEvent); + emitRemovals(pendingEvents) { + for (const pendingEvent of pendingEvents) { + const idx = this._pendingEvents.array.indexOf(pendingEvent); + if (idx !== -1) { + this._amountSent -= 1; + this._pendingEvents.remove(idx); + } } } @@ -95,17 +110,6 @@ export default class SendQueue { await txn.complete(); } - async _removeEvent(pendingEvent) { - const txn = await this._storage.readWriteTxn([this._storage.storeNames.pendingEvents]); - try { - txn.pendingEvents.remove(pendingEvent.roomId, pendingEvent.queueIndex); - } catch (err) { - txn.abort(); - throw err; - } - await txn.complete(); - } - async _createAndStoreEvent(eventType, content) { const txn = await this._storage.readWriteTxn([this._storage.storeNames.pendingEvents]); let pendingEvent; diff --git a/src/matrix/session.js b/src/matrix/session.js index a6b7b354..15b45181 100644 --- a/src/matrix/session.js +++ b/src/matrix/session.js @@ -1,5 +1,6 @@ import Room from "./room/room.js"; import { ObservableMap } from "../observable/index.js"; +import { SendScheduler, RateLimitingBackoff } from "./SendScheduler.js"; export default class Session { // sessionInfo contains deviceId, userId and homeServer @@ -9,6 +10,7 @@ export default class Session { this._session = null; this._sessionInfo = sessionInfo; this._rooms = new ObservableMap(); + this._sendScheduler = new SendScheduler({hsApi, backoff: new RateLimitingBackoff()}); this._roomUpdateCallback = (room, params) => this._rooms.update(room.id, params); } @@ -19,6 +21,7 @@ export default class Session { this._storage.storeNames.roomState, this._storage.storeNames.timelineEvents, this._storage.storeNames.timelineFragments, + this._storage.storeNames.pendingEvents, ]); // restore session object this._session = await txn.session.get(); @@ -26,24 +29,40 @@ export default class Session { this._session = {}; return; } + const pendingEventsByRoomId = await this._getPendingEventsByRoom(txn); // load rooms const rooms = await txn.roomSummary.getAll(); await Promise.all(rooms.map(summary => { - const room = this.createRoom(summary.roomId); + const room = this.createRoom(summary.roomId, pendingEventsByRoomId[summary.roomId]); return room.load(summary, txn); })); } + async _getPendingEventsByRoom(txn) { + const pendingEvents = await txn.pendingEvents.getAll(); + return pendingEvents.reduce((groups, pe) => { + const group = groups.get(pe.roomId); + if (group) { + group.push(pe); + } else { + groups.set(pe.roomId, [pe]); + } + return groups; + }, new Map()); + } + get rooms() { return this._rooms; } - createRoom(roomId) { + createRoom(roomId, pendingEvents) { const room = new Room({ roomId, storage: this._storage, emitCollectionChange: this._roomUpdateCallback, hsApi: this._hsApi, + sendScheduler: this._sendScheduler, + pendingEvents, }); this._rooms.add(roomId, room); return room; diff --git a/src/matrix/storage/idb/stores/PendingEventStore.js b/src/matrix/storage/idb/stores/PendingEventStore.js index fa54848c..d73b649e 100644 --- a/src/matrix/storage/idb/stores/PendingEventStore.js +++ b/src/matrix/storage/idb/stores/PendingEventStore.js @@ -49,7 +49,7 @@ export default class PendingEventStore { return this._eventStore.put(pendingEvent); } - getAllEvents() { + getAll() { return this._eventStore.selectAll(); } } diff --git a/src/matrix/sync.js b/src/matrix/sync.js index fd0d2bf6..270d72e3 100644 --- a/src/matrix/sync.js +++ b/src/matrix/sync.js @@ -77,6 +77,7 @@ export default class Sync extends EventEmitter { storeNames.roomState, storeNames.timelineEvents, storeNames.timelineFragments, + storeNames.pendingEvents, ]); const roomChanges = []; try { From 851100b88a6bff17736c31e3c226921057f8f56d Mon Sep 17 00:00:00 2001 From: Bruno Windels Date: Fri, 26 Jul 2019 22:40:39 +0200 Subject: [PATCH 05/24] send unsent messages after first sync --- src/main.js | 1 + src/matrix/room/room.js | 4 ++++ src/matrix/session.js | 6 ++++++ 3 files changed, 11 insertions(+) diff --git a/src/main.js b/src/main.js index 82a67efc..0b9ef4b8 100644 --- a/src/main.js +++ b/src/main.js @@ -76,6 +76,7 @@ export default async function main(container) { if (needsInitialSync) { showSession(container, session, sync); } + session.notifyNetworkAvailable(); } catch(err) { console.error(`${err.message}:\n${err.stack}`); } diff --git a/src/matrix/room/room.js b/src/matrix/room/room.js index 42d3eef1..cb47eb27 100644 --- a/src/matrix/room/room.js +++ b/src/matrix/room/room.js @@ -42,6 +42,10 @@ export default class Room extends EventEmitter { } } + resumeSending() { + this._sendQueue.resumeSending(); + } + load(summary, txn) { this._summary.load(summary); return this._syncWriter.load(txn); diff --git a/src/matrix/session.js b/src/matrix/session.js index 15b45181..251debb9 100644 --- a/src/matrix/session.js +++ b/src/matrix/session.js @@ -38,6 +38,12 @@ export default class Session { })); } + notifyNetworkAvailable() { + for (const room of this._rooms) { + room.resumeSending(); + } + } + async _getPendingEventsByRoom(txn) { const pendingEvents = await txn.pendingEvents.getAll(); return pendingEvents.reduce((groups, pe) => { From 3ed72df6205771f91e42ed94f6d8bce9a821daa0 Mon Sep 17 00:00:00 2001 From: Bruno Windels Date: Sat, 27 Jul 2019 10:40:56 +0200 Subject: [PATCH 06/24] put everything together to make it roughly work no local echo yet, and send errors are being swallowed --- doc/SENDING.md | 8 +++++++ src/domain/session/room/RoomViewModel.js | 4 ++++ src/main.js | 3 ++- src/matrix/SendScheduler.js | 5 ++-- src/matrix/room/sending/SendQueue.js | 16 +++++++++++++ src/matrix/room/timeline/entries/BaseEntry.js | 2 +- .../timeline/entries/PendingEventEntry.js | 4 +--- src/matrix/session.js | 4 ++-- src/matrix/storage/idb/query-target.js | 4 ++++ src/matrix/storage/idb/store.js | 8 +++++++ src/matrix/storage/idb/transaction.js | 5 ++++ src/ui/web/css/room.css | 8 +++++++ src/ui/web/general/html.js | 2 +- src/ui/web/session/room/MessageComposer.js | 23 +++++++++++++++++++ src/ui/web/session/room/RoomView.js | 6 ++++- 15 files changed, 91 insertions(+), 11 deletions(-) create mode 100644 src/ui/web/session/room/MessageComposer.js diff --git a/doc/SENDING.md b/doc/SENDING.md index b4e6382c..f7e1e7f3 100644 --- a/doc/SENDING.md +++ b/doc/SENDING.md @@ -1,3 +1,11 @@ +# Remaining stuffs + - don't swallow send errors, they should probably appear in the room error? + - not sure it makes sense to show them where the composer is, + because they might get sent a long time after you enter them in brawl, + so you don't neccessarily have the context of the composer anymore + - local echo + + takes care of rate limiting, and sending events from different rooms in parallel, NO: txnIds are created inside room. ~~making txnIds? ... it's rooms though that will receive the event in their sync response~~ diff --git a/src/domain/session/room/RoomViewModel.js b/src/domain/session/room/RoomViewModel.js index 3f3e8d77..a628349e 100644 --- a/src/domain/session/room/RoomViewModel.js +++ b/src/domain/session/room/RoomViewModel.js @@ -63,4 +63,8 @@ export default class RoomViewModel extends EventEmitter { get avatarInitials() { return avatarInitials(this._room.name); } + + sendMessage(message) { + this._room.sendEvent("m.room.message", {msgtype: "m.text", body: message}); + } } diff --git a/src/main.js b/src/main.js index 0b9ef4b8..9f957513 100644 --- a/src/main.js +++ b/src/main.js @@ -5,7 +5,7 @@ import Sync from "./matrix/sync.js"; import SessionView from "./ui/web/session/SessionView.js"; import SessionViewModel from "./domain/session/SessionViewModel.js"; -const HOST = "192.168.2.108"; +const HOST = "127.0.0.1"; const HOMESERVER = `http://${HOST}:8008`; const USERNAME = "bruno1"; const USER_ID = `@${USERNAME}:localhost`; @@ -76,6 +76,7 @@ export default async function main(container) { if (needsInitialSync) { showSession(container, session, sync); } + // this will start sending unsent messages session.notifyNetworkAvailable(); } catch(err) { console.error(`${err.message}:\n${err.stack}`); diff --git a/src/matrix/SendScheduler.js b/src/matrix/SendScheduler.js index 9fbab290..b7407238 100644 --- a/src/matrix/SendScheduler.js +++ b/src/matrix/SendScheduler.js @@ -1,4 +1,4 @@ -import Platform from "../../../Platform.js"; +import Platform from "../Platform.js"; import {HomeServerError, NetworkError} from "./error.js"; export class RateLimitingBackoff { @@ -79,7 +79,7 @@ export class SendScheduler { async _sendLoop() { while (this._sendRequests.length) { - const request = this._sendRequests.unshift(); + const request = this._sendRequests.shift(); let result; try { // this can throw! @@ -94,6 +94,7 @@ export class SendScheduler { } this._sendRequests = []; } + console.error("error for request", request); request.reject(err); break; } diff --git a/src/matrix/room/sending/SendQueue.js b/src/matrix/room/sending/SendQueue.js index d1666c02..b147d0f1 100644 --- a/src/matrix/room/sending/SendQueue.js +++ b/src/matrix/room/sending/SendQueue.js @@ -15,6 +15,9 @@ export default class SendQueue { this._storage = storage; this._sendScheduler = sendScheduler; this._pendingEvents = new SortedArray((a, b) => a.queueIndex - b.queueIndex); + if (pendingEvents.length) { + console.info(`SendQueue for room ${roomId} has ${pendingEvents.length} pending events`, pendingEvents); + } this._pendingEvents.setManySorted(pendingEvents.map(data => new PendingEvent(data))); this._isSending = false; this._offline = false; @@ -24,13 +27,17 @@ export default class SendQueue { async _sendLoop() { this._isSending = true; try { + console.log("start sending", this._amountSent, "<", this._pendingEvents.length); while (this._amountSent < this._pendingEvents.length) { const pendingEvent = this._pendingEvents.get(this._amountSent); + console.log("trying to send", pendingEvent.content.body); this._amountSent += 1; if (pendingEvent.remoteId) { continue; } + console.log("really sending now"); const response = await this._sendScheduler.request(hsApi => { + console.log("got sendScheduler slot"); return hsApi.send( pendingEvent.roomId, pendingEvent.eventType, @@ -39,7 +46,10 @@ export default class SendQueue { ); }); pendingEvent.remoteId = response.event_id; + // + console.log("writing remoteId now"); await this._tryUpdateEvent(pendingEvent); + console.log("keep sending?", this._amountSent, "<", this._pendingEvents.length); } } catch(err) { if (err instanceof NetworkError) { @@ -97,16 +107,22 @@ export default class SendQueue { async _tryUpdateEvent(pendingEvent) { const txn = await this._storage.readWriteTxn([this._storage.storeNames.pendingEvents]); + console.log("_tryUpdateEvent: got txn"); try { // pendingEvent might have been removed already here // by a racing remote echo, so check first so we don't recreate it + console.log("_tryUpdateEvent: before exists"); if (await txn.pendingEvents.exists(pendingEvent.roomId, pendingEvent.queueIndex)) { + console.log("_tryUpdateEvent: inside if exists"); txn.pendingEvents.update(pendingEvent.data); } + console.log("_tryUpdateEvent: after exists"); } catch (err) { txn.abort(); + console.log("_tryUpdateEvent: error", err); throw err; } + console.log("_tryUpdateEvent: try complete"); await txn.complete(); } diff --git a/src/matrix/room/timeline/entries/BaseEntry.js b/src/matrix/room/timeline/entries/BaseEntry.js index 3c3d31b2..6c55788c 100644 --- a/src/matrix/room/timeline/entries/BaseEntry.js +++ b/src/matrix/room/timeline/entries/BaseEntry.js @@ -1,6 +1,6 @@ //entries can be sorted, first by fragment, then by entry index. import EventKey from "../EventKey.js"; -import { PENDING_FRAGMENT_ID } from "./PendingEventEntry.js"; +export const PENDING_FRAGMENT_ID = Number.MAX_SAFE_INTEGER; export default class BaseEntry { constructor(fragmentIdComparer) { diff --git a/src/matrix/room/timeline/entries/PendingEventEntry.js b/src/matrix/room/timeline/entries/PendingEventEntry.js index 03b79cf4..8b3fd656 100644 --- a/src/matrix/room/timeline/entries/PendingEventEntry.js +++ b/src/matrix/room/timeline/entries/PendingEventEntry.js @@ -1,6 +1,4 @@ -import BaseEntry from "./BaseEntry.js"; - -export const PENDING_FRAGMENT_ID = Number.MAX_SAFE_INTEGER; +import BaseEntry, {PENDING_FRAGMENT_ID} from "./BaseEntry.js"; export default class PendingEventEntry extends BaseEntry { constructor(pendingEvent) { diff --git a/src/matrix/session.js b/src/matrix/session.js index 251debb9..d39404a8 100644 --- a/src/matrix/session.js +++ b/src/matrix/session.js @@ -33,13 +33,13 @@ export default class Session { // load rooms const rooms = await txn.roomSummary.getAll(); await Promise.all(rooms.map(summary => { - const room = this.createRoom(summary.roomId, pendingEventsByRoomId[summary.roomId]); + const room = this.createRoom(summary.roomId, pendingEventsByRoomId.get(summary.roomId)); return room.load(summary, txn); })); } notifyNetworkAvailable() { - for (const room of this._rooms) { + for (const [, room] of this._rooms) { room.resumeSending(); } } diff --git a/src/matrix/storage/idb/query-target.js b/src/matrix/storage/idb/query-target.js index 45d268b6..fa5b99f5 100644 --- a/src/matrix/storage/idb/query-target.js +++ b/src/matrix/storage/idb/query-target.js @@ -21,6 +21,10 @@ export default class QueryTarget { return reqAsPromise(this._target.get(key)); } + getKey(key) { + return reqAsPromise(this._target.getKey(key)); + } + reduce(range, reducer, initialValue) { return this._reduce(range, reducer, initialValue, "next"); } diff --git a/src/matrix/storage/idb/store.js b/src/matrix/storage/idb/store.js index ae2008bb..58709573 100644 --- a/src/matrix/storage/idb/store.js +++ b/src/matrix/storage/idb/store.js @@ -46,6 +46,14 @@ class QueryTargetWrapper { throw new StorageError("get failed", err); } } + + getKey(...params) { + try { + return this._qt.getKey(...params); + } catch(err) { + throw new StorageError("getKey failed", err); + } + } delete(...params) { try { diff --git a/src/matrix/storage/idb/transaction.js b/src/matrix/storage/idb/transaction.js index e66b6d5f..1c2c8286 100644 --- a/src/matrix/storage/idb/transaction.js +++ b/src/matrix/storage/idb/transaction.js @@ -6,6 +6,7 @@ import RoomSummaryStore from "./stores/RoomSummaryStore.js"; import TimelineEventStore from "./stores/TimelineEventStore.js"; import RoomStateStore from "./stores/RoomStateStore.js"; import TimelineFragmentStore from "./stores/TimelineFragmentStore.js"; +import PendingEventStore from "./stores/PendingEventStore.js"; export default class Transaction { constructor(txn, allowedStoreNames) { @@ -55,6 +56,10 @@ export default class Transaction { return this._store("roomState", idbStore => new RoomStateStore(idbStore)); } + get pendingEvents() { + return this._store("pendingEvents", idbStore => new PendingEventStore(idbStore)); + } + complete() { return txnAsPromise(this._txn); } diff --git a/src/ui/web/css/room.css b/src/ui/web/css/room.css index 3a9dc886..955ad177 100644 --- a/src/ui/web/css/room.css +++ b/src/ui/web/css/room.css @@ -69,3 +69,11 @@ .RoomView_error { color: red; } + +.MessageComposer > input { + display: block; + width: 100%; + box-sizing: border-box; + padding: 0.8em; + border: none; +} diff --git a/src/ui/web/general/html.js b/src/ui/web/general/html.js index e316d3c8..a5488b2b 100644 --- a/src/ui/web/general/html.js +++ b/src/ui/web/general/html.js @@ -70,7 +70,7 @@ export function text(str) { export const TAG_NAMES = [ "ol", "ul", "li", "div", "h1", "h2", "h3", "h4", "h5", "h6", "p", "strong", "em", "span", "img", "section", "main", "article", "aside", - "pre", "button", "time"]; + "pre", "button", "time", "input", "textarea"]; export const tag = {}; diff --git a/src/ui/web/session/room/MessageComposer.js b/src/ui/web/session/room/MessageComposer.js new file mode 100644 index 00000000..a4fb3715 --- /dev/null +++ b/src/ui/web/session/room/MessageComposer.js @@ -0,0 +1,23 @@ +import TemplateView from "../../general/TemplateView.js"; + +export default class MessageComposer extends TemplateView { + constructor(viewModel) { + super(viewModel); + this._input = null; + } + + render(t) { + this._input = t.input({ + placeholder: "Send a message ...", + onKeydown: e => this._onKeyDown(e) + }); + return t.div({className: "MessageComposer"}, [this._input]); + } + + _onKeyDown(event) { + if (event.key === "Enter") { + this.viewModel.sendMessage(this._input.value); + this._input.value = ""; + } + } +} diff --git a/src/ui/web/session/room/RoomView.js b/src/ui/web/session/room/RoomView.js index c9225294..f431c16c 100644 --- a/src/ui/web/session/room/RoomView.js +++ b/src/ui/web/session/room/RoomView.js @@ -1,5 +1,6 @@ import TemplateView from "../../general/TemplateView.js"; import TimelineList from "./TimelineList.js"; +import MessageComposer from "./MessageComposer.js"; export default class RoomView extends TemplateView { constructor(viewModel) { @@ -18,17 +19,20 @@ export default class RoomView extends TemplateView { ]), ]), t.div({className: "RoomView_error"}, vm => vm.error), - this._timelineList.mount() + this._timelineList.mount(), + this._composer.mount(), ]) ]); } mount() { + this._composer = new MessageComposer(this.viewModel); this._timelineList = new TimelineList(); return super.mount(); } unmount() { + this._composer.unmount(); this._timelineList.unmount(); super.unmount(); } From b26f7df689295e23b7d7b497a465f354734e7c95 Mon Sep 17 00:00:00 2001 From: Bruno Windels Date: Mon, 29 Jul 2019 09:54:34 +0200 Subject: [PATCH 07/24] pending events are not certain to be sorted here --- src/matrix/room/sending/SendQueue.js | 2 +- src/observable/list/SortedArray.js | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/src/matrix/room/sending/SendQueue.js b/src/matrix/room/sending/SendQueue.js index b147d0f1..ec7296e8 100644 --- a/src/matrix/room/sending/SendQueue.js +++ b/src/matrix/room/sending/SendQueue.js @@ -18,7 +18,7 @@ export default class SendQueue { if (pendingEvents.length) { console.info(`SendQueue for room ${roomId} has ${pendingEvents.length} pending events`, pendingEvents); } - this._pendingEvents.setManySorted(pendingEvents.map(data => new PendingEvent(data))); + this._pendingEvents.setManyUnsorted(pendingEvents.map(data => new PendingEvent(data))); this._isSending = false; this._offline = false; this._amountSent = 0; diff --git a/src/observable/list/SortedArray.js b/src/observable/list/SortedArray.js index d38f94fd..6b34afdf 100644 --- a/src/observable/list/SortedArray.js +++ b/src/observable/list/SortedArray.js @@ -8,6 +8,10 @@ export default class SortedArray extends BaseObservableList { this._items = []; } + setManyUnsorted(items) { + this.setManySorted(items); + } + setManySorted(items) { // TODO: we can make this way faster by only looking up the first and last key, // and merging whatever is inbetween with items From 56cee450d1d09e58eb970c0f9d69003adf20343d Mon Sep 17 00:00:00 2001 From: Bruno Windels Date: Mon, 29 Jul 2019 10:23:15 +0200 Subject: [PATCH 08/24] return syncing user id from PendingEventEntry.sender add User class where we later can track display name, avatar, ... --- src/domain/session/SessionViewModel.js | 2 +- src/matrix/User.js | 9 +++++++++ src/matrix/room/room.js | 4 +++- src/matrix/room/timeline/entries/PendingEventEntry.js | 7 ++++++- src/matrix/session.js | 7 +++++-- 5 files changed, 24 insertions(+), 5 deletions(-) create mode 100644 src/matrix/User.js diff --git a/src/domain/session/SessionViewModel.js b/src/domain/session/SessionViewModel.js index 321c0dbf..65750280 100644 --- a/src/domain/session/SessionViewModel.js +++ b/src/domain/session/SessionViewModel.js @@ -45,7 +45,7 @@ export default class SessionViewModel extends EventEmitter { } this._currentRoomViewModel = new RoomViewModel({ room, - ownUserId: this._session.userId, + ownUserId: this._session.user.id, closeCallback: () => this._closeCurrentRoom(), }); this._currentRoomViewModel.load(); diff --git a/src/matrix/User.js b/src/matrix/User.js new file mode 100644 index 00000000..5c0aa37f --- /dev/null +++ b/src/matrix/User.js @@ -0,0 +1,9 @@ +export default class User { + constructor(userId) { + this._userId = userId; + } + + get id() { + return this._userId; + } +} diff --git a/src/matrix/room/room.js b/src/matrix/room/room.js index cb47eb27..a16816fb 100644 --- a/src/matrix/room/room.js +++ b/src/matrix/room/room.js @@ -6,7 +6,7 @@ import FragmentIdComparer from "./timeline/FragmentIdComparer.js"; import SendQueue from "./sending/SendQueue.js"; export default class Room extends EventEmitter { - constructor({roomId, storage, hsApi, emitCollectionChange, sendScheduler, pendingEvents}) { + constructor({roomId, storage, hsApi, emitCollectionChange, sendScheduler, pendingEvents, user}) { super(); this._roomId = roomId; this._storage = storage; @@ -17,6 +17,7 @@ export default class Room extends EventEmitter { this._emitCollectionChange = emitCollectionChange; this._sendQueue = new SendQueue({roomId, storage, sendScheduler, pendingEvents}); this._timeline = null; + this._user = user; } async persistSync(roomResponse, membership, txn) { @@ -74,6 +75,7 @@ export default class Room extends EventEmitter { fragmentIdComparer: this._fragmentIdComparer, pendingEvents: this._sendQueue.pendingEvents, closeCallback: () => this._timeline = null, + user: this._user, }); await this._timeline.load(); return this._timeline; diff --git a/src/matrix/room/timeline/entries/PendingEventEntry.js b/src/matrix/room/timeline/entries/PendingEventEntry.js index 8b3fd656..0a316a5b 100644 --- a/src/matrix/room/timeline/entries/PendingEventEntry.js +++ b/src/matrix/room/timeline/entries/PendingEventEntry.js @@ -1,9 +1,10 @@ import BaseEntry, {PENDING_FRAGMENT_ID} from "./BaseEntry.js"; export default class PendingEventEntry extends BaseEntry { - constructor(pendingEvent) { + constructor({pendingEvent, user}) { super(null); this._pendingEvent = pendingEvent; + this._user = user; } get fragmentId() { @@ -26,6 +27,10 @@ export default class PendingEventEntry extends BaseEntry { return this._pendingEvent.eventType; } + get sender() { + return this._user.id; + } + get id() { return this._pendingEvent.txnId; } diff --git a/src/matrix/session.js b/src/matrix/session.js index d39404a8..91c0983b 100644 --- a/src/matrix/session.js +++ b/src/matrix/session.js @@ -1,6 +1,7 @@ import Room from "./room/room.js"; import { ObservableMap } from "../observable/index.js"; import { SendScheduler, RateLimitingBackoff } from "./SendScheduler.js"; +import User from "./User.js"; export default class Session { // sessionInfo contains deviceId, userId and homeServer @@ -12,6 +13,7 @@ export default class Session { this._rooms = new ObservableMap(); this._sendScheduler = new SendScheduler({hsApi, backoff: new RateLimitingBackoff()}); this._roomUpdateCallback = (room, params) => this._rooms.update(room.id, params); + this._user = new User(sessionInfo.userId); } async load() { @@ -69,6 +71,7 @@ export default class Session { hsApi: this._hsApi, sendScheduler: this._sendScheduler, pendingEvents, + user: this._user, }); this._rooms.add(roomId, room); return room; @@ -85,7 +88,7 @@ export default class Session { return this._session.syncToken; } - get userId() { - return this._sessionInfo.userId; + get user() { + return this._user; } } From 9b94c4bb613884541ce3363b496ccef56f9e7cc9 Mon Sep 17 00:00:00 2001 From: Bruno Windels Date: Mon, 29 Jul 2019 10:27:12 +0200 Subject: [PATCH 09/24] don't expose raw event object from entry, pending event doesn't have it it only has content and *some* of the meta fields, but we want to threat pendingevententry and evententry as one and the same in the rest of the application, so don't give access to entire event object. --- .../session/room/timeline/tiles/MessageTile.js | 7 +++---- .../room/timeline/tiles/RoomMemberTile.js | 17 ++++++++--------- .../session/room/timeline/tiles/RoomNameTile.js | 5 ++--- .../session/room/timeline/tiles/TextTile.js | 5 ++--- .../session/room/timeline/tilesCreator.js | 8 +++----- src/matrix/room/timeline/entries/EventEntry.js | 16 ++++++++++++---- .../room/timeline/entries/PendingEventEntry.js | 10 +++++++++- 7 files changed, 39 insertions(+), 29 deletions(-) diff --git a/src/domain/session/room/timeline/tiles/MessageTile.js b/src/domain/session/room/timeline/tiles/MessageTile.js index f164c948..46c6fb59 100644 --- a/src/domain/session/room/timeline/tiles/MessageTile.js +++ b/src/domain/session/room/timeline/tiles/MessageTile.js @@ -5,7 +5,7 @@ export default class MessageTile extends SimpleTile { constructor(options) { super(options); this._isOwn = this._entry.event.sender === options.ownUserId; - this._date = new Date(this._entry.event.origin_server_ts); + this._date = new Date(this._entry.timestamp); this._isContinuation = false; } @@ -14,7 +14,7 @@ export default class MessageTile extends SimpleTile { } get sender() { - return this._entry.event.sender; + return this._entry.sender; } get date() { @@ -34,8 +34,7 @@ export default class MessageTile extends SimpleTile { } _getContent() { - const event = this._entry.event; - return event && event.content; + return this._entry.content; } updatePreviousSibling(prev) { diff --git a/src/domain/session/room/timeline/tiles/RoomMemberTile.js b/src/domain/session/room/timeline/tiles/RoomMemberTile.js index 3f0535fa..58e722c9 100644 --- a/src/domain/session/room/timeline/tiles/RoomMemberTile.js +++ b/src/domain/session/room/timeline/tiles/RoomMemberTile.js @@ -7,21 +7,20 @@ export default class RoomNameTile extends SimpleTile { } get announcement() { - const event = this._entry.event; - const content = event.content; + const {sender, content, stateKey} = this._entry; switch (content.membership) { - case "invite": return `${event.state_key} was invited to the room by ${event.sender}`; - case "join": return `${event.state_key} joined the room`; + case "invite": return `${stateKey} was invited to the room by ${sender}`; + case "join": return `${stateKey} joined the room`; case "leave": { - if (event.state_key === event.sender) { - return `${event.state_key} left the room`; + if (stateKey === sender) { + return `${stateKey} left the room`; } else { const reason = content.reason; - return `${event.state_key} was kicked from the room by ${event.sender}${reason ? `: ${reason}` : ""}`; + return `${stateKey} was kicked from the room by ${sender}${reason ? `: ${reason}` : ""}`; } } - case "ban": return `${event.state_key} was banned from the room by ${event.sender}`; - default: return `${event.sender} membership changed to ${content.membership}`; + case "ban": return `${stateKey} was banned from the room by ${sender}`; + default: return `${sender} membership changed to ${content.membership}`; } } } diff --git a/src/domain/session/room/timeline/tiles/RoomNameTile.js b/src/domain/session/room/timeline/tiles/RoomNameTile.js index 32cd5adf..36ad7934 100644 --- a/src/domain/session/room/timeline/tiles/RoomNameTile.js +++ b/src/domain/session/room/timeline/tiles/RoomNameTile.js @@ -7,8 +7,7 @@ export default class RoomNameTile extends SimpleTile { } get announcement() { - const event = this._entry.event; - const content = event.content; - return `${event.sender} named the room "${content.name}"` + const content = this._entry.content; + return `${this._entry.sender} named the room "${content.name}"` } } diff --git a/src/domain/session/room/timeline/tiles/TextTile.js b/src/domain/session/room/timeline/tiles/TextTile.js index a13f24fb..47680ef0 100644 --- a/src/domain/session/room/timeline/tiles/TextTile.js +++ b/src/domain/session/room/timeline/tiles/TextTile.js @@ -4,9 +4,8 @@ export default class TextTile extends MessageTile { get text() { const content = this._getContent(); const body = content && content.body; - const sender = this._entry.event.sender; - if (this._entry.type === "m.emote") { - return `* ${sender} ${body}`; + if (content.msgtype === "m.emote") { + return `* ${this._entry.sender} ${body}`; } else { return body; } diff --git a/src/domain/session/room/timeline/tilesCreator.js b/src/domain/session/room/timeline/tilesCreator.js index 8af1f18f..9f583650 100644 --- a/src/domain/session/room/timeline/tilesCreator.js +++ b/src/domain/session/room/timeline/tilesCreator.js @@ -1,6 +1,5 @@ import GapTile from "./tiles/GapTile.js"; import TextTile from "./tiles/TextTile.js"; -import ImageTile from "./tiles/ImageTile.js"; import LocationTile from "./tiles/LocationTile.js"; import RoomNameTile from "./tiles/RoomNameTile.js"; import RoomMemberTile from "./tiles/RoomMemberTile.js"; @@ -10,11 +9,10 @@ export default function ({timeline, ownUserId}) { const options = {entry, emitUpdate, ownUserId}; if (entry.isGap) { return new GapTile(options, timeline); - } else if (entry.event) { - const event = entry.event; - switch (event.type) { + } else if (entry.eventType) { + switch (entry.eventType) { case "m.room.message": { - const content = event.content; + const content = entry.content; const msgtype = content && content.msgtype; switch (msgtype) { case "m.text": diff --git a/src/matrix/room/timeline/entries/EventEntry.js b/src/matrix/room/timeline/entries/EventEntry.js index ce3697fa..ea2143f1 100644 --- a/src/matrix/room/timeline/entries/EventEntry.js +++ b/src/matrix/room/timeline/entries/EventEntry.js @@ -18,12 +18,20 @@ export default class EventEntry extends BaseEntry { return this._eventEntry.event.content; } - get event() { - return this._eventEntry.event; + get eventType() { + return this._eventEntry.event.type; } - get type() { - return this._eventEntry.event.type; + get stateKey() { + return this._eventEntry.event.state_key; + } + + get sender() { + return this._eventEntry.event.sender; + } + + get timestamp() { + return this._eventEntry.event.origin_server_ts; } get id() { diff --git a/src/matrix/room/timeline/entries/PendingEventEntry.js b/src/matrix/room/timeline/entries/PendingEventEntry.js index 0a316a5b..4aa143d2 100644 --- a/src/matrix/room/timeline/entries/PendingEventEntry.js +++ b/src/matrix/room/timeline/entries/PendingEventEntry.js @@ -23,14 +23,22 @@ export default class PendingEventEntry extends BaseEntry { return null; } - get type() { + get eventType() { return this._pendingEvent.eventType; } + get stateKey() { + return null; + } + get sender() { return this._user.id; } + get timestamp() { + return null; + } + get id() { return this._pendingEvent.txnId; } From 0cf7cb36c4c1bed82ebfe366b49c04642af9c08f Mon Sep 17 00:00:00 2001 From: Bruno Windels Date: Mon, 29 Jul 2019 10:58:27 +0200 Subject: [PATCH 10/24] add failing test for unsubscribe using wrong handler --- src/observable/BaseObservableCollection.js | 23 ++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/src/observable/BaseObservableCollection.js b/src/observable/BaseObservableCollection.js index 17ea3c82..87e7c77b 100644 --- a/src/observable/BaseObservableCollection.js +++ b/src/observable/BaseObservableCollection.js @@ -30,3 +30,26 @@ export default class BaseObservableCollection { // Add iterator over handlers here } + +export function tests() { + class Collection extends BaseObservableCollection { + constructor() { + super(); + this.firstSubscribeCalls = 0; + this.firstUnsubscribeCalls = 0; + } + onSubscribeFirst() { this.firstSubscribeCalls += 1; } + onUnsubscribeLast() { this.firstUnsubscribeCalls += 1; } + } + + return { + test_unsubscribe(assert) { + const c = new Collection(); + const subscription = c.subscribe({}); + // unsubscribe + subscription(); + assert.equal(c.firstSubscribeCalls, 1); + assert.equal(c.firstUnsubscribeCalls, 1); + } + } +} From b53b60e142b455684b4a0025af5ef331a9c0a4bb Mon Sep 17 00:00:00 2001 From: Bruno Windels Date: Mon, 29 Jul 2019 10:59:07 +0200 Subject: [PATCH 11/24] use correct handler and make test succeed --- src/observable/BaseObservableCollection.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/observable/BaseObservableCollection.js b/src/observable/BaseObservableCollection.js index 87e7c77b..13b059ae 100644 --- a/src/observable/BaseObservableCollection.js +++ b/src/observable/BaseObservableCollection.js @@ -18,7 +18,7 @@ export default class BaseObservableCollection { } return () => { if (handler) { - this._handlers.delete(this._handler); + this._handlers.delete(handler); if (this._handlers.size === 0) { this.onUnsubscribeLast(); } From 0b5c2f9273fde69f3361486b85075372e9c1a958 Mon Sep 17 00:00:00 2001 From: Bruno Windels Date: Mon, 29 Jul 2019 10:59:49 +0200 Subject: [PATCH 12/24] better naming --- src/observable/BaseObservableCollection.js | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/observable/BaseObservableCollection.js b/src/observable/BaseObservableCollection.js index 13b059ae..f9370f10 100644 --- a/src/observable/BaseObservableCollection.js +++ b/src/observable/BaseObservableCollection.js @@ -45,9 +45,8 @@ export function tests() { return { test_unsubscribe(assert) { const c = new Collection(); - const subscription = c.subscribe({}); - // unsubscribe - subscription(); + const unsubscribe = c.subscribe({}); + unsubscribe(); assert.equal(c.firstSubscribeCalls, 1); assert.equal(c.firstUnsubscribeCalls, 1); } From 6f650d19b170e18bf7443b8636043138b96a74df Mon Sep 17 00:00:00 2001 From: Bruno Windels Date: Mon, 29 Jul 2019 19:17:55 +0200 Subject: [PATCH 13/24] map operator for observable lists --- src/observable/index.js | 1 + src/observable/list/MappedList.js | 115 ++++++++++++++++++++++++++++++ 2 files changed, 116 insertions(+) create mode 100644 src/observable/list/MappedList.js diff --git a/src/observable/index.js b/src/observable/index.js index 828a6aea..3a250fde 100644 --- a/src/observable/index.js +++ b/src/observable/index.js @@ -5,6 +5,7 @@ import BaseObservableMap from "./map/BaseObservableMap.js"; // re-export "root" (of chain) collections export { default as ObservableArray } from "./list/ObservableArray.js"; export { default as SortedArray } from "./list/SortedArray.js"; +export { default as MappedList } from "./list/MappedList.js"; export { default as ObservableMap } from "./map/ObservableMap.js"; // avoid circular dependency between these classes diff --git a/src/observable/list/MappedList.js b/src/observable/list/MappedList.js new file mode 100644 index 00000000..6d5f87eb --- /dev/null +++ b/src/observable/list/MappedList.js @@ -0,0 +1,115 @@ +import BaseObservableList from "./BaseObservableList.js"; + +export default class MappedList extends BaseObservableList { + constructor(sourceList, mapper, updater) { + super(); + this._sourceList = sourceList; + this._mapper = mapper; + this._updater = updater; + this._sourceUnsubscribe = null; + this._mappedValues = null; + } + + onSubscribeFirst() { + this._sourceUnsubscribe = this._sourceList.subscribe(this); + this._mappedValues = []; + for (const item of this._sourceList) { + this._mappedValues.push(this._mapper(item)); + } + } + + onReset() { + this._mappedValues = []; + this.emitReset(); + } + + onAdd(index, value) { + const mappedValue = this._mapper(value); + this._mappedValues.splice(index, 0, mappedValue); + this.emitAdd(index, mappedValue); + } + + onUpdate(index, value, params) { + const mappedValue = this._mappedValues[index]; + if (this._updater) { + this._updater(mappedValue, value); + } + this.emitUpdate(index, mappedValue, params); + } + + onRemove(index) { + const mappedValue = this._mappedValues[index]; + this._mappedValues.splice(index, 1); + this.emitRemove(index, mappedValue); + } + + onMove(fromIdx, toIdx) { + const mappedValue = this._mappedValues[fromIdx]; + this._mappedValues.splice(fromIdx, 1); + this._mappedValues.splice(toIdx, 0, mappedValue); + this.emitMove(fromIdx, toIdx, mappedValue); + } + + onUnsubscribeLast() { + this._sourceUnsubscribe(); + } + + get length() { + return this._mappedValues.length; + } + + [Symbol.iterator]() { + return this._mappedValues.values(); + } +} + +export async function tests() { + class MockList extends BaseObservableList { + get length() { + return 0; + } + [Symbol.iterator]() { + return [].values(); + } + } + + return { + test_add(assert) { + const source = new MockList(); + const mapped = new MappedList(source, n => {return {n: n*n};}); + let fired = false; + const unsubscribe = mapped.subscribe({ + onAdd(idx, value) { + fired = true; + assert.equal(idx, 0); + assert.equal(value.n, 36); + } + }); + source.emitAdd(0, 6); + assert(fired); + unsubscribe(); + }, + test_update(assert) { + const source = new MockList(); + const mapped = new MappedList( + source, + n => {return {n: n*n};}, + (o, n) => o.m = n*n + ); + let fired = false; + const unsubscribe = mapped.subscribe({ + onAdd() {}, + onUpdate(idx, value) { + fired = true; + assert.equal(idx, 0); + assert.equal(value.n, 36); + assert.equal(value.m, 49); + } + }); + source.emitAdd(0, 6); + source.emitUpdate(0, 7); + assert(fired); + unsubscribe(); + } + }; +} From 7a6e91de84712f13a8778042111a0aca7ca616fd Mon Sep 17 00:00:00 2001 From: Bruno Windels Date: Mon, 29 Jul 2019 19:18:11 +0200 Subject: [PATCH 14/24] concat operator for observable lists --- src/observable/index.js | 1 + src/observable/list/BaseObservableList.js | 10 +- src/observable/list/ConcatList.js | 130 ++++++++++++++++++++++ 3 files changed, 136 insertions(+), 5 deletions(-) create mode 100644 src/observable/list/ConcatList.js diff --git a/src/observable/index.js b/src/observable/index.js index 3a250fde..5444e27e 100644 --- a/src/observable/index.js +++ b/src/observable/index.js @@ -6,6 +6,7 @@ import BaseObservableMap from "./map/BaseObservableMap.js"; export { default as ObservableArray } from "./list/ObservableArray.js"; export { default as SortedArray } from "./list/SortedArray.js"; export { default as MappedList } from "./list/MappedList.js"; +export { default as ConcatList } from "./list/ConcatList.js"; export { default as ObservableMap } from "./map/ObservableMap.js"; // avoid circular dependency between these classes diff --git a/src/observable/list/BaseObservableList.js b/src/observable/list/BaseObservableList.js index 97e50b20..cdab32f3 100644 --- a/src/observable/list/BaseObservableList.js +++ b/src/observable/list/BaseObservableList.js @@ -3,26 +3,26 @@ import BaseObservableCollection from "../BaseObservableCollection.js"; export default class BaseObservableList extends BaseObservableCollection { emitReset() { for(let h of this._handlers) { - h.onReset(); + h.onReset(this); } } // we need batch events, mostly on index based collection though? // maybe we should get started without? emitAdd(index, value) { for(let h of this._handlers) { - h.onAdd(index, value); + h.onAdd(index, value, this); } } emitUpdate(index, value, params) { for(let h of this._handlers) { - h.onUpdate(index, value, params); + h.onUpdate(index, value, params, this); } } emitRemove(index, value) { for(let h of this._handlers) { - h.onRemove(index, value); + h.onRemove(index, value, this); } } @@ -30,7 +30,7 @@ export default class BaseObservableList extends BaseObservableCollection { // been removed from its fromIdx emitMove(fromIdx, toIdx, value) { for(let h of this._handlers) { - h.onMove(fromIdx, toIdx, value); + h.onMove(fromIdx, toIdx, value, this); } } diff --git a/src/observable/list/ConcatList.js b/src/observable/list/ConcatList.js new file mode 100644 index 00000000..49d30ce1 --- /dev/null +++ b/src/observable/list/ConcatList.js @@ -0,0 +1,130 @@ +import BaseObservableList from "./BaseObservableList.js"; + +export default class ConcatList extends BaseObservableList { + constructor(...sourceLists) { + super(); + this._sourceLists = sourceLists; + this._sourceUnsubscribes = null; + } + + _offsetForSource(sourceList) { + const listIdx = this._sourceLists.indexOf(sourceList); + let offset = 0; + for (let i = 0; i < listIdx; ++i) { + offset += this._sourceLists[i].length; + } + return offset; + } + + onSubscribeFirst() { + this._sourceUnsubscribes = []; + for (const sourceList of this._sourceLists) { + this._sourceUnsubscribes.push(sourceList.subscribe(this)); + } + } + + onUnsubscribeLast() { + for (const sourceUnsubscribe of this._sourceUnsubscribes) { + sourceUnsubscribe(); + } + } + + onReset() { + // TODO: not ideal if other source lists are large + // but working impl for now + // reset, and + this.emitReset(); + let idx = 0; + for(const item of this) { + this.emitAdd(idx, item); + idx += 1; + } + } + + onAdd(index, value, sourceList) { + this.emitAdd(this._offsetForSource(sourceList) + index, value); + } + + onUpdate(index, value, params, sourceList) { + this.emitAdd(this._offsetForSource(sourceList) + index, value, params); + } + + onRemove(index, value, sourceList) { + this.emitRemove(this._offsetForSource(sourceList) + index, value); + } + + onMove(fromIdx, toIdx, value, sourceList) { + const offset = this._offsetForSource(sourceList); + this.emitMove(offset + fromIdx, offset + toIdx, value); + } + + get length() { + let len = 0; + for (let i = 0; i < this._sourceLists.length; ++i) { + len += this._sourceLists[i].length; + } + return len; + } + + [Symbol.iterator]() { + let sourceListIdx = 0; + let it = this._sourceLists[0][Symbol.iterator](); + return { + next: () => { + let result = it.next(); + while (result.done) { + sourceListIdx += 1; + if (sourceListIdx >= this._sourceLists.length) { + return result; //done + } + it = this._sourceLists[sourceListIdx][Symbol.iterator](); + result = it.next(); + } + return result; + } + } + } +} + +import ObservableArray from "./ObservableArray.js"; +export async function tests() { + return { + test_length(assert) { + const all = new ConcatList( + new ObservableArray([1, 2, 3]), + new ObservableArray([11, 12, 13]) + ); + assert.equal(all.length, 6); + }, + test_iterator(assert) { + const all = new ConcatList( + new ObservableArray([1, 2, 3]), + new ObservableArray([11, 12, 13]) + ); + const it = all[Symbol.iterator](); + assert.equal(it.next().value, 1); + assert.equal(it.next().value, 2); + assert.equal(it.next().value, 3); + assert.equal(it.next().value, 11); + assert.equal(it.next().value, 12); + assert.equal(it.next().value, 13); + assert(it.next().done); + }, + test_add(assert) { + const list1 = new ObservableArray([1, 2, 3]); + const list2 = new ObservableArray([11, 12, 13]); + const all = new ConcatList(list1, list2); + let fired = false; + all.subscribe({ + onAdd(index, value) { + fired = true; + assert.equal(index, 4); + assert.equal(value, 11.5); + } + }); + list2.insert(1, 11.5); + assert(fired); + }, + + }; +} From bfdff353b06f19baa1c88f1c21872ce069f48f42 Mon Sep 17 00:00:00 2001 From: Bruno Windels Date: Mon, 29 Jul 2019 19:52:28 +0200 Subject: [PATCH 15/24] pass params in map operator update fn --- src/observable/list/MappedList.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/observable/list/MappedList.js b/src/observable/list/MappedList.js index 6d5f87eb..75c220fa 100644 --- a/src/observable/list/MappedList.js +++ b/src/observable/list/MappedList.js @@ -32,7 +32,7 @@ export default class MappedList extends BaseObservableList { onUpdate(index, value, params) { const mappedValue = this._mappedValues[index]; if (this._updater) { - this._updater(mappedValue, value); + this._updater(mappedValue, params, value); } this.emitUpdate(index, mappedValue, params); } From e23abe209f32edeb0b3bf505b07882977ba40efe Mon Sep 17 00:00:00 2001 From: Bruno Windels Date: Mon, 29 Jul 2019 19:53:07 +0200 Subject: [PATCH 16/24] fixup: leftover from removing access to events --- src/domain/session/room/timeline/tiles/MessageTile.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/domain/session/room/timeline/tiles/MessageTile.js b/src/domain/session/room/timeline/tiles/MessageTile.js index 46c6fb59..bb862047 100644 --- a/src/domain/session/room/timeline/tiles/MessageTile.js +++ b/src/domain/session/room/timeline/tiles/MessageTile.js @@ -4,7 +4,7 @@ export default class MessageTile extends SimpleTile { constructor(options) { super(options); - this._isOwn = this._entry.event.sender === options.ownUserId; + this._isOwn = this._entry.sender === options.ownUserId; this._date = new Date(this._entry.timestamp); this._isContinuation = false; } From 88a7d64091753084e4b94cc9e7933ad1a59e1d0e Mon Sep 17 00:00:00 2001 From: Bruno Windels Date: Mon, 29 Jul 2019 19:53:30 +0200 Subject: [PATCH 17/24] fixup: delay not awaited, but not needed for now --- src/matrix/SendScheduler.js | 1 - 1 file changed, 1 deletion(-) diff --git a/src/matrix/SendScheduler.js b/src/matrix/SendScheduler.js index b7407238..5340d093 100644 --- a/src/matrix/SendScheduler.js +++ b/src/matrix/SendScheduler.js @@ -22,7 +22,6 @@ export class RateLimitingBackoff { async waitForNextSend() { // this._remainingRateLimitedRequest = Math.max(0, this._remainingRateLimitedRequest - 1); - Platform.delay(1000); } } From 8665bcb897f9e22669e1c9810d121bcd142eebae Mon Sep 17 00:00:00 2001 From: Bruno Windels Date: Mon, 29 Jul 2019 19:53:58 +0200 Subject: [PATCH 18/24] concat synced events in timeline with pending events for local echo --- .../session/room/timeline/tiles/SimpleTile.js | 4 +++ src/matrix/room/timeline/Timeline.js | 25 ++++++++++++------- .../timeline/entries/PendingEventEntry.js | 8 ++++++ src/ui/web/css/timeline.css | 4 +++ .../session/room/timeline/TextMessageView.js | 2 +- 5 files changed, 33 insertions(+), 10 deletions(-) diff --git a/src/domain/session/room/timeline/tiles/SimpleTile.js b/src/domain/session/room/timeline/tiles/SimpleTile.js index ddfa0e4e..48935948 100644 --- a/src/domain/session/room/timeline/tiles/SimpleTile.js +++ b/src/domain/session/room/timeline/tiles/SimpleTile.js @@ -69,4 +69,8 @@ export default class SimpleTile { get internalId() { return this._entry.asEventKey().toString(); } + + get isPending() { + return this._entry.isPending; + } } diff --git a/src/matrix/room/timeline/Timeline.js b/src/matrix/room/timeline/Timeline.js index 05fcc3f5..5ca48561 100644 --- a/src/matrix/room/timeline/Timeline.js +++ b/src/matrix/room/timeline/Timeline.js @@ -1,32 +1,39 @@ -import { SortedArray } from "../../../observable/index.js"; +import { SortedArray, MappedList, ConcatList } from "../../../observable/index.js"; import Direction from "./Direction.js"; import GapWriter from "./persistence/GapWriter.js"; import TimelineReader from "./persistence/TimelineReader.js"; +import PendingEventEntry from "./entries/PendingEventEntry.js"; export default class Timeline { - constructor({roomId, storage, closeCallback, fragmentIdComparer, hsApi}) { + constructor({roomId, storage, closeCallback, fragmentIdComparer, pendingEvents, user, hsApi}) { this._roomId = roomId; this._storage = storage; this._closeCallback = closeCallback; this._fragmentIdComparer = fragmentIdComparer; this._hsApi = hsApi; - this._entriesList = new SortedArray((a, b) => a.compare(b)); + this._remoteEntries = new SortedArray((a, b) => a.compare(b)); this._timelineReader = new TimelineReader({ roomId: this._roomId, storage: this._storage, fragmentIdComparer: this._fragmentIdComparer }); + const localEntries = new MappedList(pendingEvents, pe => { + return new PendingEventEntry({pendingEvent: pe, user}); + }, (pee, params) => { + pee.notifyUpdate(params); + }); + this._allEntries = new ConcatList(this._remoteEntries, localEntries); } /** @package */ async load() { const entries = await this._timelineReader.readFromEnd(50); - this._entriesList.setManySorted(entries); + this._remoteEntries.setManySorted(entries); } /** @package */ appendLiveEntries(newEntries) { - this._entriesList.setManySorted(newEntries); + this._remoteEntries.setManySorted(newEntries); } /** @public */ @@ -42,12 +49,12 @@ export default class Timeline { fragmentIdComparer: this._fragmentIdComparer }); const newEntries = await gapWriter.writeFragmentFill(fragmentEntry, response); - this._entriesList.setManySorted(newEntries); + this._remoteEntries.setManySorted(newEntries); } // tries to prepend `amount` entries to the `entries` list. async loadAtTop(amount) { - const firstEventEntry = this._entriesList.array.find(e => !!e.event); + const firstEventEntry = this._remoteEntries.array.find(e => !!e.event); if (!firstEventEntry) { return; } @@ -56,12 +63,12 @@ export default class Timeline { Direction.Backward, amount ); - this._entriesList.setManySorted(entries); + this._remoteEntries.setManySorted(entries); } /** @public */ get entries() { - return this._entriesList; + return this._allEntries; } /** @public */ diff --git a/src/matrix/room/timeline/entries/PendingEventEntry.js b/src/matrix/room/timeline/entries/PendingEventEntry.js index 4aa143d2..63e8ba84 100644 --- a/src/matrix/room/timeline/entries/PendingEventEntry.js +++ b/src/matrix/room/timeline/entries/PendingEventEntry.js @@ -39,7 +39,15 @@ export default class PendingEventEntry extends BaseEntry { return null; } + get isPending() { + return true; + } + get id() { return this._pendingEvent.txnId; } + + notifyUpdate() { + + } } diff --git a/src/ui/web/css/timeline.css b/src/ui/web/css/timeline.css index aea0be49..0b38857c 100644 --- a/src/ui/web/css/timeline.css +++ b/src/ui/web/css/timeline.css @@ -53,6 +53,10 @@ background-color: darkgreen; } +.TextMessageView.pending .message-container { + background-color: #333; +} + .message-container p { margin: 5px 0; } diff --git a/src/ui/web/session/room/timeline/TextMessageView.js b/src/ui/web/session/room/timeline/TextMessageView.js index 3d489c5c..a4d698eb 100644 --- a/src/ui/web/session/room/timeline/TextMessageView.js +++ b/src/ui/web/session/room/timeline/TextMessageView.js @@ -4,7 +4,7 @@ export default class TextMessageView extends TemplateView { render(t, vm) { // no bindings ... should this be a template view? return t.li( - {className: {"TextMessageView": true, own: vm.isOwn}}, + {className: {"TextMessageView": true, own: vm.isOwn, pending: vm.isPending}}, t.div({className: "message-container"}, [ t.div({className: "sender"}, vm => vm.isContinuation ? "" : vm.sender), t.p([vm.text, t.time(vm.date + " " + vm.time)]), From 7218595c17b2b98948dca5317e95b82a86f3d8d8 Mon Sep 17 00:00:00 2001 From: Bruno Windels Date: Mon, 29 Jul 2019 19:54:21 +0200 Subject: [PATCH 19/24] only send message if it's non-empty --- src/domain/session/room/RoomViewModel.js | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/domain/session/room/RoomViewModel.js b/src/domain/session/room/RoomViewModel.js index a628349e..23875e20 100644 --- a/src/domain/session/room/RoomViewModel.js +++ b/src/domain/session/room/RoomViewModel.js @@ -65,6 +65,8 @@ export default class RoomViewModel extends EventEmitter { } sendMessage(message) { - this._room.sendEvent("m.room.message", {msgtype: "m.text", body: message}); + if (message) { + this._room.sendEvent("m.room.message", {msgtype: "m.text", body: message}); + } } } From 45cd85ead12816fb06067cecb4ca163bc70fbcf4 Mon Sep 17 00:00:00 2001 From: Bruno Windels Date: Mon, 29 Jul 2019 19:58:35 +0200 Subject: [PATCH 20/24] fixup: check for event entry with eventType, event isn't exposed anymore --- src/matrix/room/timeline/Timeline.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/matrix/room/timeline/Timeline.js b/src/matrix/room/timeline/Timeline.js index 5ca48561..b2afef57 100644 --- a/src/matrix/room/timeline/Timeline.js +++ b/src/matrix/room/timeline/Timeline.js @@ -54,7 +54,7 @@ export default class Timeline { // tries to prepend `amount` entries to the `entries` list. async loadAtTop(amount) { - const firstEventEntry = this._remoteEntries.array.find(e => !!e.event); + const firstEventEntry = this._remoteEntries.array.find(e => !!e.eventType); if (!firstEventEntry) { return; } From 3b7ab8f1c8c30d52d67b22a8effaef536bb64fa0 Mon Sep 17 00:00:00 2001 From: Bruno Windels Date: Mon, 29 Jul 2019 20:02:42 +0200 Subject: [PATCH 21/24] fix MappedList test after passing params as well to updater --- src/observable/list/MappedList.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/observable/list/MappedList.js b/src/observable/list/MappedList.js index 75c220fa..a2adcdbd 100644 --- a/src/observable/list/MappedList.js +++ b/src/observable/list/MappedList.js @@ -94,7 +94,7 @@ export async function tests() { const mapped = new MappedList( source, n => {return {n: n*n};}, - (o, n) => o.m = n*n + (o, p, n) => o.m = n*n ); let fired = false; const unsubscribe = mapped.subscribe({ From b723ab4cef02a67afa8d8cdc16811d72bef1a9d7 Mon Sep 17 00:00:00 2001 From: Bruno Windels Date: Mon, 29 Jul 2019 20:03:06 +0200 Subject: [PATCH 22/24] add failing test for ConcatList update --- src/observable/list/ConcatList.js | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/src/observable/list/ConcatList.js b/src/observable/list/ConcatList.js index 49d30ce1..aace70a3 100644 --- a/src/observable/list/ConcatList.js +++ b/src/observable/list/ConcatList.js @@ -125,6 +125,20 @@ export async function tests() { list2.insert(1, 11.5); assert(fired); }, - + test_update(assert) { + const list1 = new ObservableArray([1, 2, 3]); + const list2 = new ObservableArray([11, 12, 13]); + const all = new ConcatList(list1, list2); + let fired = false; + all.subscribe({ + onUpdate(index, value) { + fired = true; + assert.equal(index, 4); + assert.equal(value, 10); + } + }); + list2.emitUpdate(1, 10); + assert(fired); + }, }; } From de35df10d8e01a99bfdebfeed8945d19032c44e1 Mon Sep 17 00:00:00 2001 From: Bruno Windels Date: Mon, 29 Jul 2019 20:03:22 +0200 Subject: [PATCH 23/24] fix test + bug in ConcatList update --- src/observable/list/ConcatList.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/observable/list/ConcatList.js b/src/observable/list/ConcatList.js index aace70a3..6177f6f3 100644 --- a/src/observable/list/ConcatList.js +++ b/src/observable/list/ConcatList.js @@ -46,7 +46,7 @@ export default class ConcatList extends BaseObservableList { } onUpdate(index, value, params, sourceList) { - this.emitAdd(this._offsetForSource(sourceList) + index, value, params); + this.emitUpdate(this._offsetForSource(sourceList) + index, value, params); } onRemove(index, value, sourceList) { From 5b4d984645f913c3403ba9e64dda15c6a6a95802 Mon Sep 17 00:00:00 2001 From: Bruno Windels Date: Mon, 29 Jul 2019 20:11:15 +0200 Subject: [PATCH 24/24] height of app should always fill screen (not more or less) --- src/ui/web/css/layout.css | 5 +++++ src/ui/web/css/timeline.css | 1 - 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/src/ui/web/css/layout.css b/src/ui/web/css/layout.css index 098be686..048888b0 100644 --- a/src/ui/web/css/layout.css +++ b/src/ui/web/css/layout.css @@ -50,6 +50,11 @@ body { min-height: 0; display: flex; flex-direction: column; + height: 100%; +} + +.TimelinePanel ul { + flex: 1 0 0; } .RoomHeader { diff --git a/src/ui/web/css/timeline.css b/src/ui/web/css/timeline.css index 0b38857c..89d927f4 100644 --- a/src/ui/web/css/timeline.css +++ b/src/ui/web/css/timeline.css @@ -1,6 +1,5 @@ .TimelinePanel ul { - flex: 1; overflow-y: auto; list-style: none; padding: 0;