diff --git a/doc/- sync comes under session b/doc/- sync comes under session new file mode 100644 index 00000000..4db2baf1 --- /dev/null +++ b/doc/- sync comes under session @@ -0,0 +1,3 @@ + - sync comes under session + - sessioncontainer/client orchestrating reconnection + - \ No newline at end of file diff --git a/doc/impl-thoughts/CATCHUP-BACKFILL.md b/doc/impl-thoughts/CATCHUP-BACKFILL.md new file mode 100644 index 00000000..12c6ca3b --- /dev/null +++ b/doc/impl-thoughts/CATCHUP-BACKFILL.md @@ -0,0 +1,7 @@ +we should automatically fill gaps (capped at a certain (large) amount of events, 5000?) after a limited sync for a room + +## E2EE rooms + +during these fills (once supported), we should calculate push actions and trigger notifications, as we would otherwise have received this through sync. + +we could also trigger notifications when just backfilling on initial sync up to a certain amount of time in the past? diff --git a/doc/impl-thoughts/RECONNECTING.md b/doc/impl-thoughts/RECONNECTING.md index f648481f..18ea49d9 100644 --- a/doc/impl-thoughts/RECONNECTING.md +++ b/doc/impl-thoughts/RECONNECTING.md @@ -29,6 +29,39 @@ rooms should report how many messages they have queued up, and each time they sent one? `SendReporter` (passed from `Session` to `Room`, passed down to `SendQueue`), with: - - setPendingEventCount(roomId, count) + - setPendingEventCount(roomId, count). This should probably use the generic Room updating mechanism, e.g. a pendingMessageCount on Room that is updated. Then session listens for this in `_roomUpdateCallback`. `Session` listens to `Reconnector` to update it's status, but perhaps we wait to send messages until catchup sync is done + + +# TODO + + - finish (Base)ObservableValue + - put in own file + - add waitFor + - decide whether we want to inherit (no?) + - cleanup Reconnector with recent changes, move generic code, make imports work + - add SyncStatus as ObservableValue of enum in Sync + - show load progress in LoginView/SessionPickView and do away with loading screen + - change main.js to pass in a creation function of a SessionContainer instead of everything it is replacing + - adjust BrawlViewModel, SessionPickViewModel and LoginViewModel to use a SessionContainer + - rename SessionsStore to SessionInfoStorage + - make sure we've renamed all \*State enums and fields to \*Status + - add pendingMessageCount prop to SendQueue and Room, aggregate this in Session + - add completedFirstSync to Sync, so we can check if the catchup or initial sync is still in progress + - update SyncStatusViewModel to use reconnector.connectionStatus, sync.completedFirstSync, session.syncToken (is initial sync?) and session.pendingMessageCount to show these messages: + - disconnected, retrying in x seconds. [try now]. + - reconnecting... + - doing catchup sync + - syncing, sending x messages + - syncing + + perhaps we will want to put this as an ObservableValue on the SessionContainer ? + + NO: When connected, syncing and not sending anything, just hide the thing for now? although when you send messages it will just pop in and out all the time. + + - see if it makes sense for SendScheduler to use the same RetryDelay as Reconnector + - finally adjust all file names to their class names? e.g. camel case + - see if we want more dependency injection + - for classes from outside sdk + - for internal sdk classes? probably not yet diff --git a/doc/impl-thoughts/session-container.md b/doc/impl-thoughts/session-container.md new file mode 100644 index 00000000..d84d8ce9 --- /dev/null +++ b/doc/impl-thoughts/session-container.md @@ -0,0 +1,18 @@ +what should this new container be called? + - Client + - SessionContainer + + +it is what is returned from bootstrapping a ... thing +it allows you to replace classes within the client through IoC? +it wires up the different components +it unwires the components when you're done with the thing +it could hold all the dependencies for setting up a client, even before login + - online detection api + - clock + - homeserver + - requestFn + +we'll be explicitly making its parts public though, like session, sync, reconnector + +merge the connectionstate and diff --git a/src/domain/ViewModel.js b/src/domain/ViewModel.js index 2e26faae..cc4a6fff 100644 --- a/src/domain/ViewModel.js +++ b/src/domain/ViewModel.js @@ -1,3 +1,7 @@ +// ViewModel should just be an eventemitter, not an ObservableValue +// as in some cases it would really be more convenient to have multiple events (like telling the timeline to scroll down) +// we do need to return a disposable from EventEmitter.on, or at least have a method here to easily track a subscription to an EventEmitter + export class ViewModel extends ObservableValue { constructor(options) { super(); diff --git a/src/main.js b/src/main.js index b8e4ab33..64461f4c 100644 --- a/src/main.js +++ b/src/main.js @@ -20,6 +20,14 @@ export default async function main(container) { // window.getBrawlFetchLog = () => recorder.log(); // normal network: const request = fetchRequest; + const clock = new DOMClock(); + + const sessionContainer = new SessionContainer({ + clock, + request, + storageFactory: new StorageFactory(), + }); + const vm = new BrawlViewModel({ storageFactory: new StorageFactory(), createHsApi: (homeServer, accessToken, reconnector) => new HomeServerApi({homeServer, accessToken, request, reconnector}), diff --git a/src/matrix/Reconnector.js b/src/matrix/Reconnector.js index 0cb85851..92b47fb7 100644 --- a/src/matrix/Reconnector.js +++ b/src/matrix/Reconnector.js @@ -16,7 +16,7 @@ export class ExponentialRetryDelay { const powerOfTwo = (seconds * seconds) * 1000; this._current = Math.max(this._max, powerOfTwo); } catch(err) { - // swallow AbortError, means skipWaiting was called + // swallow AbortError, means abort was called if (!(err instanceof AbortError)) { throw err; } @@ -25,7 +25,7 @@ export class ExponentialRetryDelay { } } - skipWaiting() { + abort() { if (this._timeout) { this._timeout.abort(); } @@ -33,7 +33,7 @@ export class ExponentialRetryDelay { reset() { this._current = this._start; - this.skipWaiting(); + this.abort(); } get nextValue() { @@ -66,14 +66,13 @@ export const ConnectionState = createEnum( "Online" ); -export class Reconnector extends ObservableValue { - constructor({retryDelay, createTimeMeasure}) { - this._online +export class Reconnector { + constructor({retryDelay, createTimeMeasure, isOnline}) { + this._isOnline = isOnline; this._retryDelay = retryDelay; - this._currentDelay = null; this._createTimeMeasure = createTimeMeasure; // assume online, and do our thing when something fails - this._state = ConnectionState.Online; + this._state = new ObservableValue(ConnectionState.Online); this._isReconnecting = false; this._versionsResponse = null; } @@ -82,39 +81,53 @@ export class Reconnector extends ObservableValue { return this._versionsResponse; } - get state() { + get connectionState() { return this._state; } get retryIn() { - if (this._state === ConnectionState.Waiting) { + if (this._state.get() === ConnectionState.Waiting) { return this._retryDelay.nextValue - this._stateSince.measure(); } return 0; } - onRequestFailed(hsApi) { + async onRequestFailed(hsApi) { if (!this._isReconnecting) { this._setState(ConnectionState.Offline); - this._reconnectLoop(hsApi); + + const isOnlineSubscription = this._isOnline && this._isOnline.subscribe(online => { + if (online) { + this.tryNow(); + } + }); + + try { + await this._reconnectLoop(hsApi); + } finally { + if (isOnlineSubscription) { + // unsubscribe from this._isOnline + isOnlineSubscription(); + } + } } } tryNow() { if (this._retryDelay) { - this._retryDelay.skipWaiting(); + // this will interrupt this._retryDelay.waitForRetry() in _reconnectLoop + this._retryDelay.abort(); } } _setState(state) { - if (state !== this._state) { - this._state = state; - if (this._state === ConnectionState.Waiting) { + if (state !== this._state.get()) { + if (state === ConnectionState.Waiting) { this._stateSince = this._createTimeMeasure(); } else { this._stateSince = null; } - this.emit(state); + this._state.set(state); } } @@ -123,19 +136,34 @@ export class Reconnector extends ObservableValue { this._versionsResponse = null; this._retryDelay.reset(); - while (!this._versionsResponse) { - try { - this._setState(ConnectionState.Reconnecting); - // use 10s timeout, because we don't want to be waiting for - // a stale connection when we just came online again - const versionsRequest = hsApi.versions({timeout: 10000}); - this._versionsResponse = await versionsRequest.response(); - this._setState(ConnectionState.Online); - } catch (err) { - // NetworkError or AbortError from timeout - this._setState(ConnectionState.Waiting); - await this._retryDelay.waitForRetry(); + try { + while (!this._versionsResponse) { + try { + this._setState(ConnectionState.Reconnecting); + // use 10s timeout, because we don't want to be waiting for + // a stale connection when we just came online again + const versionsRequest = hsApi.versions({timeout: 10000}); + this._versionsResponse = await versionsRequest.response(); + this._setState(ConnectionState.Online); + } catch (err) { + if (err instanceof NetworkError) { + this._setState(ConnectionState.Waiting); + try { + await this._retryDelay.waitForRetry(); + } catch (err) { + if (!(err instanceof AbortError)) { + throw err; + } + } + } else { + throw err; + } + } } + } catch (err) { + // nothing is catching the error above us, + // so just log here + console.err(err); } } } diff --git a/src/matrix/SendScheduler.js b/src/matrix/SendScheduler.js index 5340d093..3beb4fa1 100644 --- a/src/matrix/SendScheduler.js +++ b/src/matrix/SendScheduler.js @@ -62,6 +62,10 @@ export class SendScheduler { // this._enabled; } + stop() { + // TODO: abort current requests and set offline + } + // this should really be per roomId to avoid head-of-line blocking // // takes a callback instead of returning a promise with the slot diff --git a/src/matrix/SessionContainer.js b/src/matrix/SessionContainer.js index 86be76d9..5cf616c3 100644 --- a/src/matrix/SessionContainer.js +++ b/src/matrix/SessionContainer.js @@ -1,95 +1,226 @@ -const factory = { - Clock: () => new DOMClock(), - Request: () => fetchRequest, - Online: () => new DOMOnline(), - HomeServerApi: () -} +import HomeServerApi from "./hs-api.js"; -export const LoadState = createEnum( +export const LoadStatus = createEnum( + "NotLoading", + "Login", + "LoginFailed", "Loading", - "InitialSync", "Migrating", //not used atm, but would fit here + "InitialSync", + "CatchupSync", "Error", "Ready", ); -class SessionContainer extends ObservableValue { - constructor({clock, random, isOnline, request, storageFactory, factory}) { - this.disposables = new Disposables(); +export const LoginFailure = createEnum( + "Network", + "Credentials", + "Unknown", +); + +export class SessionContainer { + constructor({clock, random, onlineStatus, request, storageFactory, sessionsStore}) { + this._random = random; + this._clock = clock; + this._onlineStatus = onlineStatus; + this._request = request; + this._storageFactory = storageFactory; + this._sessionsStore = sessionsStore; + + this._status = new ObservableValue(LoadStatus.NotLoading); + this._error = null; + this._loginFailure = null; + this._reconnector = null; + this._session = null; + this._sync = null; } - dispose() { - this.disposables.dispose(); + _createNewSessionId() { + return (Math.floor(this._random() * Number.MAX_SAFE_INTEGER)).toString(); } - get state() { - return this._state; - } - - _setState(state) { - if (state !== this._state) { - const previousState = this._state; - this._state = state; - this.emit(previousState); + async startWithExistingSession(sessionId) { + if (this._status.get() !== LoadStatus.NotLoading) { + return; + } + this._status.set(LoadStatus.Loading); + try { + const sessionInfo = await this._sessionsStore.get(sessionId); + await this._loadSessionInfo(sessionInfo); + } catch (err) { + this._error = err; + this._status.set(LoadStatus.Error); } } + async startWithLogin(homeServer, username, password) { + if (this._status.get() !== LoadStatus.NotLoading) { + return; + } + this._status.set(LoadStatus.Login); + let sessionInfo; + try { + const hsApi = new HomeServerApi({homeServer, request: this._request}); + const loginData = await hsApi.passwordLogin(username, password).response(); + const sessionId = this._createNewSessionId(); + sessionInfo = { + id: sessionId, + deviceId: loginData.device_id, + userId: loginData.user_id, + homeServer: homeServer, + accessToken: loginData.access_token, + lastUsed: this._clock.now() + }; + await this._sessionsStore.add(sessionInfo); + } catch (err) { + this._error = err; + if (err instanceof HomeServerError) { + if (err.statusCode === 403) { + this._loginFailure = LoginFailure.Credentials; + } else { + this._loginFailure = LoginFailure.Unknown; + } + this._status.set(LoadStatus.LoginFailure); + } else if (err instanceof NetworkError) { + this._loginFailure = LoginFailure.Network; + this._status.set(LoadStatus.LoginFailure); + } else { + this._status.set(LoadStatus.Error); + } + return; + } + // loading the session can only lead to + // LoadStatus.Error in case of an error, + // so separate try/catch + try { + await this._loadSessionInfo(sessionInfo); + } catch (err) { + this._error = err; + this._status.set(LoadStatus.Error); + } + } + + async _loadSessionInfo(sessionInfo) { + this._status.set(LoadStatus.Loading); + this._reconnector = new Reconnector({ + onlineStatus: this._onlineStatus, + delay: new ExponentialRetryDelay(2000, this._clock.createTimeout), + createMeasure: this._clock.createMeasure + }); + const hsApi = new HomeServerApi({ + homeServer: sessionInfo.homeServer, + accessToken: sessionInfo.accessToken, + request: this._request, + reconnector: this._reconnector, + }); + const storage = await this._storageFactory.create(sessionInfo.id); + // no need to pass access token to session + const filteredSessionInfo = { + deviceId: sessionInfo.deviceId, + userId: sessionInfo.userId, + homeServer: sessionInfo.homeServer, + }; + this._session = new Session({storage, sessionInfo: filteredSessionInfo, hsApi}); + await this._session.load(); + + const needsInitialSync = !this._session.syncToken; + if (!needsInitialSync) { + this._status.set(LoadStatus.CatchupSync); + } else { + this._status.set(LoadStatus.InitialSync); + } + + this._sync = new Sync({hsApi, storage, session: this._session}); + + // notify sync and session when back online + this._reconnectSubscription = this._reconnector.connectionStatus.subscribe(state => { + if (state === ConnectionStatus.Online) { + this._sync.start(); + this._session.start(this._reconnector.lastVersionsResponse); + } + }); + + try { + await this._sync.start(); + } catch (err) { + // swallow NetworkError here and continue, + // as the reconnector above will call + // sync.start again to retry in this case + if (!(err instanceof NetworkError)) { + throw err; + } + } + // only transition into Ready once the first sync has succeeded + await this._sync.status.waitFor(s => s === SyncStatus.Syncing); + this._status.set(LoadStatus.Ready); + + // if this fails, the reconnector will start polling versions to reconnect + const lastVersionsResponse = await hsApi.versions({timeout: 10000}).response(); + this._session.start(lastVersionsResponse); + } + + + get loadStatus() { + return this._status; + } + + get loadError() { + return this._error; + } + + /** only set at loadStatus InitialSync, CatchupSync or Ready */ get sync() { return this._sync; } + /** only set at loadStatus InitialSync, CatchupSync or Ready */ get session() { return this._session; } - _createReconnector() { - const reconnector = new Reconnector( - new ExponentialRetryDelay(2000, this._clock.createTimeout), - this._clock.createMeasure - ); - // retry connection immediatly when online is detected - this.disposables.track(isOnline.subscribe(online => { - if(online) { - reconnector.tryNow(); - } - })); - return reconnector; - } - - async start(sessionInfo) { - try { - this._setState(LoadState.Loading); - this._reconnector = this._createReconnector(); - const hsApi = this._createHsApi(sessionInfo.homeServer, sessionInfo.accessToken, this._reconnector); - const storage = await this._storageFactory.create(sessionInfo.id); - // no need to pass access token to session - const filteredSessionInfo = { - deviceId: sessionInfo.deviceId, - userId: sessionInfo.userId, - homeServer: sessionInfo.homeServer, - }; - this._session = new Session({storage, sessionInfo: filteredSessionInfo, hsApi}); - await this._session.load(); - this._sync = new Sync({hsApi, storage, this._session}); - - // notify sync and session when back online - this.disposables.track(reconnector.subscribe(state => { - this._sync.start(); - session.notifyNetworkAvailable(reconnector.lastVersionsResponse); - })); - - const needsInitialSync = !this._session.syncToken; - if (!needsInitialSync) { - this._setState(LoadState.Ready); - } else { - this._setState(LoadState.InitialSync); - } - await this._sync.start(); - this._setState(LoadState.Ready); - this._session.notifyNetworkAvailable(); - } catch (err) { - this._error = err; - this._setState(LoadState.Error); - } + stop() { + this._reconnectSubscription(); + this._reconnectSubscription = null; + this._sync.stop(); + this._session.stop(); } } + +/* +function main() { + // these are only required for external classes, + // SessionFactory has it's defaults for internal classes + const sessionFactory = new SessionFactory({ + Clock: DOMClock, + OnlineState: DOMOnlineState, + SessionsStore: LocalStorageSessionStore, // should be called SessionInfoStore? + StorageFactory: window.indexedDB ? IDBStorageFactory : MemoryStorageFactory, // should be called StorageManager? + // should be moved to StorageFactory as `KeyBounds`?: minStorageKey, middleStorageKey, maxStorageKey + // would need to pass it into EventKey though + request, + }); + + // lets not do this in a first cut + // internally in the matrix lib + const room = new creator.ctor("Room", Room)({}); + + // or short + const sessionFactory = new SessionFactory(WebFactory); + // sessionFactory.sessionInfoStore + + // registration + // const registration = sessionFactory.registerUser(); + // registration.stage + + + const container = sessionFactory.startWithRegistration(registration); + const container = sessionFactory.startWithLogin(server, username, password); + const container = sessionFactory.startWithExistingSession(sessionId); + // container.loadStatus is an ObservableValue + await container.loadStatus.waitFor(s => s === LoadStatus.Loaded || s === LoadStatus.CatchupSync); + + // loader isn't needed anymore from now on + const {session, sync, reconnector} = container; + container.stop(); +} +*/ diff --git a/src/matrix/error.js b/src/matrix/error.js index 42dcac73..2ba95037 100644 --- a/src/matrix/error.js +++ b/src/matrix/error.js @@ -3,6 +3,7 @@ export class HomeServerError extends Error { super(`${body ? body.error : status} on ${method} ${url}`); this.errcode = body ? body.errcode : null; this.retry_after_ms = body ? body.retry_after_ms : 0; + this.statusCode = status; } get isFatal() { diff --git a/src/matrix/hs-api.js b/src/matrix/hs-api.js index 314a9324..755d4825 100644 --- a/src/matrix/hs-api.js +++ b/src/matrix/hs-api.js @@ -81,7 +81,6 @@ export default class HomeServerApi { requestResult.response().then(() => timeout.abort()); } - const wrapper = new RequestWrapper(method, url, requestResult); if (this._reconnector) { diff --git a/src/matrix/net/fetch.js b/src/matrix/net/fetch.js index 36713475..76929c6b 100644 --- a/src/matrix/net/fetch.js +++ b/src/matrix/net/fetch.js @@ -57,7 +57,7 @@ export default function fetchRequest(url, options) { // this can either mean user is offline, server is offline, or a CORS error (server misconfiguration). // // One could check navigator.onLine to rule out the first - // but the 2 later ones are indistinguishable from javascript. + // but the 2 latter ones are indistinguishable from javascript. throw new NetworkError(`${options.method} ${url}: ${err.message}`); } throw err; diff --git a/src/matrix/session.js b/src/matrix/session.js index 837cb51a..d513d0b2 100644 --- a/src/matrix/session.js +++ b/src/matrix/session.js @@ -40,7 +40,11 @@ export default class Session { })); } - notifyNetworkAvailable(lastVersionResponse) { + stop() { + this._sendScheduler.stop(); + } + + start(lastVersionResponse) { for (const [, room] of this._rooms) { room.resumeSending(); } diff --git a/src/observable/BaseObservableCollection.js b/src/observable/BaseObservableCollection.js index 2a8a2336..a12269cf 100644 --- a/src/observable/BaseObservableCollection.js +++ b/src/observable/BaseObservableCollection.js @@ -32,7 +32,7 @@ export default class BaseObservableCollection { } // like an EventEmitter, but doesn't have an event type -export class ObservableValue extends BaseObservableCollection { +export class BaseObservableValue extends BaseObservableCollection { emit(argument) { for (const h of this._handlers) { h(argument); @@ -40,6 +40,22 @@ export class ObservableValue extends BaseObservableCollection { } } +export class ObservableValue extends BaseObservableValue { + constructor(initialValue) { + super(); + this._value = initialValue; + } + + get() { + return this._value; + } + + set(value) { + this._value = value; + this.emit(this._value); + } +} + export function tests() { class Collection extends BaseObservableCollection { constructor() { diff --git a/src/ui/web/dom/Online.js b/src/ui/web/dom/OnlineStatus.js similarity index 92% rename from src/ui/web/dom/Online.js rename to src/ui/web/dom/OnlineStatus.js index 0fa5ee7f..56276de0 100644 --- a/src/ui/web/dom/Online.js +++ b/src/ui/web/dom/OnlineStatus.js @@ -1,4 +1,4 @@ -export class Online extends ObservableValue { +export class OnlineStatus extends ObservableValue { constructor() { super(); this._onOffline = this._onOffline.bind(this);