This commit is contained in:
Bruno Windels 2020-04-18 19:16:16 +02:00
parent 378b75c98a
commit 1f15ca6498
15 changed files with 362 additions and 106 deletions

View file

@ -0,0 +1,3 @@
- sync comes under session
- sessioncontainer/client orchestrating reconnection
-

View file

@ -0,0 +1,7 @@
we should automatically fill gaps (capped at a certain (large) amount of events, 5000?) after a limited sync for a room
## E2EE rooms
during these fills (once supported), we should calculate push actions and trigger notifications, as we would otherwise have received this through sync.
we could also trigger notifications when just backfilling on initial sync up to a certain amount of time in the past?

View file

@ -29,6 +29,39 @@
rooms should report how many messages they have queued up, and each time they sent one? rooms should report how many messages they have queued up, and each time they sent one?
`SendReporter` (passed from `Session` to `Room`, passed down to `SendQueue`), with: `SendReporter` (passed from `Session` to `Room`, passed down to `SendQueue`), with:
- setPendingEventCount(roomId, count) - setPendingEventCount(roomId, count). This should probably use the generic Room updating mechanism, e.g. a pendingMessageCount on Room that is updated. Then session listens for this in `_roomUpdateCallback`.
`Session` listens to `Reconnector` to update it's status, but perhaps we wait to send messages until catchup sync is done `Session` listens to `Reconnector` to update it's status, but perhaps we wait to send messages until catchup sync is done
# TODO
- finish (Base)ObservableValue
- put in own file
- add waitFor
- decide whether we want to inherit (no?)
- cleanup Reconnector with recent changes, move generic code, make imports work
- add SyncStatus as ObservableValue of enum in Sync
- show load progress in LoginView/SessionPickView and do away with loading screen
- change main.js to pass in a creation function of a SessionContainer instead of everything it is replacing
- adjust BrawlViewModel, SessionPickViewModel and LoginViewModel to use a SessionContainer
- rename SessionsStore to SessionInfoStorage
- make sure we've renamed all \*State enums and fields to \*Status
- add pendingMessageCount prop to SendQueue and Room, aggregate this in Session
- add completedFirstSync to Sync, so we can check if the catchup or initial sync is still in progress
- update SyncStatusViewModel to use reconnector.connectionStatus, sync.completedFirstSync, session.syncToken (is initial sync?) and session.pendingMessageCount to show these messages:
- disconnected, retrying in x seconds. [try now].
- reconnecting...
- doing catchup sync
- syncing, sending x messages
- syncing
perhaps we will want to put this as an ObservableValue on the SessionContainer ?
NO: When connected, syncing and not sending anything, just hide the thing for now? although when you send messages it will just pop in and out all the time.
- see if it makes sense for SendScheduler to use the same RetryDelay as Reconnector
- finally adjust all file names to their class names? e.g. camel case
- see if we want more dependency injection
- for classes from outside sdk
- for internal sdk classes? probably not yet

View file

@ -0,0 +1,18 @@
what should this new container be called?
- Client
- SessionContainer
it is what is returned from bootstrapping a ... thing
it allows you to replace classes within the client through IoC?
it wires up the different components
it unwires the components when you're done with the thing
it could hold all the dependencies for setting up a client, even before login
- online detection api
- clock
- homeserver
- requestFn
we'll be explicitly making its parts public though, like session, sync, reconnector
merge the connectionstate and

View file

@ -1,3 +1,7 @@
// ViewModel should just be an eventemitter, not an ObservableValue
// as in some cases it would really be more convenient to have multiple events (like telling the timeline to scroll down)
// we do need to return a disposable from EventEmitter.on, or at least have a method here to easily track a subscription to an EventEmitter
export class ViewModel extends ObservableValue { export class ViewModel extends ObservableValue {
constructor(options) { constructor(options) {
super(); super();

View file

@ -20,6 +20,14 @@ export default async function main(container) {
// window.getBrawlFetchLog = () => recorder.log(); // window.getBrawlFetchLog = () => recorder.log();
// normal network: // normal network:
const request = fetchRequest; const request = fetchRequest;
const clock = new DOMClock();
const sessionContainer = new SessionContainer({
clock,
request,
storageFactory: new StorageFactory(),
});
const vm = new BrawlViewModel({ const vm = new BrawlViewModel({
storageFactory: new StorageFactory(), storageFactory: new StorageFactory(),
createHsApi: (homeServer, accessToken, reconnector) => new HomeServerApi({homeServer, accessToken, request, reconnector}), createHsApi: (homeServer, accessToken, reconnector) => new HomeServerApi({homeServer, accessToken, request, reconnector}),

View file

@ -16,7 +16,7 @@ export class ExponentialRetryDelay {
const powerOfTwo = (seconds * seconds) * 1000; const powerOfTwo = (seconds * seconds) * 1000;
this._current = Math.max(this._max, powerOfTwo); this._current = Math.max(this._max, powerOfTwo);
} catch(err) { } catch(err) {
// swallow AbortError, means skipWaiting was called // swallow AbortError, means abort was called
if (!(err instanceof AbortError)) { if (!(err instanceof AbortError)) {
throw err; throw err;
} }
@ -25,7 +25,7 @@ export class ExponentialRetryDelay {
} }
} }
skipWaiting() { abort() {
if (this._timeout) { if (this._timeout) {
this._timeout.abort(); this._timeout.abort();
} }
@ -33,7 +33,7 @@ export class ExponentialRetryDelay {
reset() { reset() {
this._current = this._start; this._current = this._start;
this.skipWaiting(); this.abort();
} }
get nextValue() { get nextValue() {
@ -66,14 +66,13 @@ export const ConnectionState = createEnum(
"Online" "Online"
); );
export class Reconnector extends ObservableValue { export class Reconnector {
constructor({retryDelay, createTimeMeasure}) { constructor({retryDelay, createTimeMeasure, isOnline}) {
this._online this._isOnline = isOnline;
this._retryDelay = retryDelay; this._retryDelay = retryDelay;
this._currentDelay = null;
this._createTimeMeasure = createTimeMeasure; this._createTimeMeasure = createTimeMeasure;
// assume online, and do our thing when something fails // assume online, and do our thing when something fails
this._state = ConnectionState.Online; this._state = new ObservableValue(ConnectionState.Online);
this._isReconnecting = false; this._isReconnecting = false;
this._versionsResponse = null; this._versionsResponse = null;
} }
@ -82,39 +81,53 @@ export class Reconnector extends ObservableValue {
return this._versionsResponse; return this._versionsResponse;
} }
get state() { get connectionState() {
return this._state; return this._state;
} }
get retryIn() { get retryIn() {
if (this._state === ConnectionState.Waiting) { if (this._state.get() === ConnectionState.Waiting) {
return this._retryDelay.nextValue - this._stateSince.measure(); return this._retryDelay.nextValue - this._stateSince.measure();
} }
return 0; return 0;
} }
onRequestFailed(hsApi) { async onRequestFailed(hsApi) {
if (!this._isReconnecting) { if (!this._isReconnecting) {
this._setState(ConnectionState.Offline); this._setState(ConnectionState.Offline);
this._reconnectLoop(hsApi);
const isOnlineSubscription = this._isOnline && this._isOnline.subscribe(online => {
if (online) {
this.tryNow();
}
});
try {
await this._reconnectLoop(hsApi);
} finally {
if (isOnlineSubscription) {
// unsubscribe from this._isOnline
isOnlineSubscription();
}
}
} }
} }
tryNow() { tryNow() {
if (this._retryDelay) { if (this._retryDelay) {
this._retryDelay.skipWaiting(); // this will interrupt this._retryDelay.waitForRetry() in _reconnectLoop
this._retryDelay.abort();
} }
} }
_setState(state) { _setState(state) {
if (state !== this._state) { if (state !== this._state.get()) {
this._state = state; if (state === ConnectionState.Waiting) {
if (this._state === ConnectionState.Waiting) {
this._stateSince = this._createTimeMeasure(); this._stateSince = this._createTimeMeasure();
} else { } else {
this._stateSince = null; this._stateSince = null;
} }
this.emit(state); this._state.set(state);
} }
} }
@ -123,6 +136,7 @@ export class Reconnector extends ObservableValue {
this._versionsResponse = null; this._versionsResponse = null;
this._retryDelay.reset(); this._retryDelay.reset();
try {
while (!this._versionsResponse) { while (!this._versionsResponse) {
try { try {
this._setState(ConnectionState.Reconnecting); this._setState(ConnectionState.Reconnecting);
@ -132,10 +146,24 @@ export class Reconnector extends ObservableValue {
this._versionsResponse = await versionsRequest.response(); this._versionsResponse = await versionsRequest.response();
this._setState(ConnectionState.Online); this._setState(ConnectionState.Online);
} catch (err) { } catch (err) {
// NetworkError or AbortError from timeout if (err instanceof NetworkError) {
this._setState(ConnectionState.Waiting); this._setState(ConnectionState.Waiting);
try {
await this._retryDelay.waitForRetry(); await this._retryDelay.waitForRetry();
} catch (err) {
if (!(err instanceof AbortError)) {
throw err;
} }
} }
} else {
throw err;
}
}
}
} catch (err) {
// nothing is catching the error above us,
// so just log here
console.err(err);
}
} }
} }

View file

@ -62,6 +62,10 @@ export class SendScheduler {
// this._enabled; // this._enabled;
} }
stop() {
// TODO: abort current requests and set offline
}
// this should really be per roomId to avoid head-of-line blocking // this should really be per roomId to avoid head-of-line blocking
// //
// takes a callback instead of returning a promise with the slot // takes a callback instead of returning a promise with the slot

View file

@ -1,66 +1,118 @@
const factory = { import HomeServerApi from "./hs-api.js";
Clock: () => new DOMClock(),
Request: () => fetchRequest,
Online: () => new DOMOnline(),
HomeServerApi: ()
}
export const LoadState = createEnum( export const LoadStatus = createEnum(
"NotLoading",
"Login",
"LoginFailed",
"Loading", "Loading",
"InitialSync",
"Migrating", //not used atm, but would fit here "Migrating", //not used atm, but would fit here
"InitialSync",
"CatchupSync",
"Error", "Error",
"Ready", "Ready",
); );
class SessionContainer extends ObservableValue { export const LoginFailure = createEnum(
constructor({clock, random, isOnline, request, storageFactory, factory}) { "Network",
this.disposables = new Disposables(); "Credentials",
"Unknown",
);
export class SessionContainer {
constructor({clock, random, onlineStatus, request, storageFactory, sessionsStore}) {
this._random = random;
this._clock = clock;
this._onlineStatus = onlineStatus;
this._request = request;
this._storageFactory = storageFactory;
this._sessionsStore = sessionsStore;
this._status = new ObservableValue(LoadStatus.NotLoading);
this._error = null;
this._loginFailure = null;
this._reconnector = null;
this._session = null;
this._sync = null;
} }
dispose() { _createNewSessionId() {
this.disposables.dispose(); return (Math.floor(this._random() * Number.MAX_SAFE_INTEGER)).toString();
} }
get state() { async startWithExistingSession(sessionId) {
return this._state; if (this._status.get() !== LoadStatus.NotLoading) {
return;
} }
this._status.set(LoadStatus.Loading);
_setState(state) {
if (state !== this._state) {
const previousState = this._state;
this._state = state;
this.emit(previousState);
}
}
get sync() {
return this._sync;
}
get session() {
return this._session;
}
_createReconnector() {
const reconnector = new Reconnector(
new ExponentialRetryDelay(2000, this._clock.createTimeout),
this._clock.createMeasure
);
// retry connection immediatly when online is detected
this.disposables.track(isOnline.subscribe(online => {
if(online) {
reconnector.tryNow();
}
}));
return reconnector;
}
async start(sessionInfo) {
try { try {
this._setState(LoadState.Loading); const sessionInfo = await this._sessionsStore.get(sessionId);
this._reconnector = this._createReconnector(); await this._loadSessionInfo(sessionInfo);
const hsApi = this._createHsApi(sessionInfo.homeServer, sessionInfo.accessToken, this._reconnector); } catch (err) {
this._error = err;
this._status.set(LoadStatus.Error);
}
}
async startWithLogin(homeServer, username, password) {
if (this._status.get() !== LoadStatus.NotLoading) {
return;
}
this._status.set(LoadStatus.Login);
let sessionInfo;
try {
const hsApi = new HomeServerApi({homeServer, request: this._request});
const loginData = await hsApi.passwordLogin(username, password).response();
const sessionId = this._createNewSessionId();
sessionInfo = {
id: sessionId,
deviceId: loginData.device_id,
userId: loginData.user_id,
homeServer: homeServer,
accessToken: loginData.access_token,
lastUsed: this._clock.now()
};
await this._sessionsStore.add(sessionInfo);
} catch (err) {
this._error = err;
if (err instanceof HomeServerError) {
if (err.statusCode === 403) {
this._loginFailure = LoginFailure.Credentials;
} else {
this._loginFailure = LoginFailure.Unknown;
}
this._status.set(LoadStatus.LoginFailure);
} else if (err instanceof NetworkError) {
this._loginFailure = LoginFailure.Network;
this._status.set(LoadStatus.LoginFailure);
} else {
this._status.set(LoadStatus.Error);
}
return;
}
// loading the session can only lead to
// LoadStatus.Error in case of an error,
// so separate try/catch
try {
await this._loadSessionInfo(sessionInfo);
} catch (err) {
this._error = err;
this._status.set(LoadStatus.Error);
}
}
async _loadSessionInfo(sessionInfo) {
this._status.set(LoadStatus.Loading);
this._reconnector = new Reconnector({
onlineStatus: this._onlineStatus,
delay: new ExponentialRetryDelay(2000, this._clock.createTimeout),
createMeasure: this._clock.createMeasure
});
const hsApi = new HomeServerApi({
homeServer: sessionInfo.homeServer,
accessToken: sessionInfo.accessToken,
request: this._request,
reconnector: this._reconnector,
});
const storage = await this._storageFactory.create(sessionInfo.id); const storage = await this._storageFactory.create(sessionInfo.id);
// no need to pass access token to session // no need to pass access token to session
const filteredSessionInfo = { const filteredSessionInfo = {
@ -70,26 +122,105 @@ class SessionContainer extends ObservableValue {
}; };
this._session = new Session({storage, sessionInfo: filteredSessionInfo, hsApi}); this._session = new Session({storage, sessionInfo: filteredSessionInfo, hsApi});
await this._session.load(); await this._session.load();
this._sync = new Sync({hsApi, storage, this._session});
// notify sync and session when back online
this.disposables.track(reconnector.subscribe(state => {
this._sync.start();
session.notifyNetworkAvailable(reconnector.lastVersionsResponse);
}));
const needsInitialSync = !this._session.syncToken; const needsInitialSync = !this._session.syncToken;
if (!needsInitialSync) { if (!needsInitialSync) {
this._setState(LoadState.Ready); this._status.set(LoadStatus.CatchupSync);
} else { } else {
this._setState(LoadState.InitialSync); this._status.set(LoadStatus.InitialSync);
} }
this._sync = new Sync({hsApi, storage, session: this._session});
// notify sync and session when back online
this._reconnectSubscription = this._reconnector.connectionStatus.subscribe(state => {
if (state === ConnectionStatus.Online) {
this._sync.start();
this._session.start(this._reconnector.lastVersionsResponse);
}
});
try {
await this._sync.start(); await this._sync.start();
this._setState(LoadState.Ready);
this._session.notifyNetworkAvailable();
} catch (err) { } catch (err) {
this._error = err; // swallow NetworkError here and continue,
this._setState(LoadState.Error); // as the reconnector above will call
// sync.start again to retry in this case
if (!(err instanceof NetworkError)) {
throw err;
} }
} }
// only transition into Ready once the first sync has succeeded
await this._sync.status.waitFor(s => s === SyncStatus.Syncing);
this._status.set(LoadStatus.Ready);
// if this fails, the reconnector will start polling versions to reconnect
const lastVersionsResponse = await hsApi.versions({timeout: 10000}).response();
this._session.start(lastVersionsResponse);
}
get loadStatus() {
return this._status;
}
get loadError() {
return this._error;
}
/** only set at loadStatus InitialSync, CatchupSync or Ready */
get sync() {
return this._sync;
}
/** only set at loadStatus InitialSync, CatchupSync or Ready */
get session() {
return this._session;
}
stop() {
this._reconnectSubscription();
this._reconnectSubscription = null;
this._sync.stop();
this._session.stop();
}
} }
/*
function main() {
// these are only required for external classes,
// SessionFactory has it's defaults for internal classes
const sessionFactory = new SessionFactory({
Clock: DOMClock,
OnlineState: DOMOnlineState,
SessionsStore: LocalStorageSessionStore, // should be called SessionInfoStore?
StorageFactory: window.indexedDB ? IDBStorageFactory : MemoryStorageFactory, // should be called StorageManager?
// should be moved to StorageFactory as `KeyBounds`?: minStorageKey, middleStorageKey, maxStorageKey
// would need to pass it into EventKey though
request,
});
// lets not do this in a first cut
// internally in the matrix lib
const room = new creator.ctor("Room", Room)({});
// or short
const sessionFactory = new SessionFactory(WebFactory);
// sessionFactory.sessionInfoStore
// registration
// const registration = sessionFactory.registerUser();
// registration.stage
const container = sessionFactory.startWithRegistration(registration);
const container = sessionFactory.startWithLogin(server, username, password);
const container = sessionFactory.startWithExistingSession(sessionId);
// container.loadStatus is an ObservableValue<LoadStatus>
await container.loadStatus.waitFor(s => s === LoadStatus.Loaded || s === LoadStatus.CatchupSync);
// loader isn't needed anymore from now on
const {session, sync, reconnector} = container;
container.stop();
}
*/

View file

@ -3,6 +3,7 @@ export class HomeServerError extends Error {
super(`${body ? body.error : status} on ${method} ${url}`); super(`${body ? body.error : status} on ${method} ${url}`);
this.errcode = body ? body.errcode : null; this.errcode = body ? body.errcode : null;
this.retry_after_ms = body ? body.retry_after_ms : 0; this.retry_after_ms = body ? body.retry_after_ms : 0;
this.statusCode = status;
} }
get isFatal() { get isFatal() {

View file

@ -81,7 +81,6 @@ export default class HomeServerApi {
requestResult.response().then(() => timeout.abort()); requestResult.response().then(() => timeout.abort());
} }
const wrapper = new RequestWrapper(method, url, requestResult); const wrapper = new RequestWrapper(method, url, requestResult);
if (this._reconnector) { if (this._reconnector) {

View file

@ -57,7 +57,7 @@ export default function fetchRequest(url, options) {
// this can either mean user is offline, server is offline, or a CORS error (server misconfiguration). // this can either mean user is offline, server is offline, or a CORS error (server misconfiguration).
// //
// One could check navigator.onLine to rule out the first // One could check navigator.onLine to rule out the first
// but the 2 later ones are indistinguishable from javascript. // but the 2 latter ones are indistinguishable from javascript.
throw new NetworkError(`${options.method} ${url}: ${err.message}`); throw new NetworkError(`${options.method} ${url}: ${err.message}`);
} }
throw err; throw err;

View file

@ -40,7 +40,11 @@ export default class Session {
})); }));
} }
notifyNetworkAvailable(lastVersionResponse) { stop() {
this._sendScheduler.stop();
}
start(lastVersionResponse) {
for (const [, room] of this._rooms) { for (const [, room] of this._rooms) {
room.resumeSending(); room.resumeSending();
} }

View file

@ -32,7 +32,7 @@ export default class BaseObservableCollection {
} }
// like an EventEmitter, but doesn't have an event type // like an EventEmitter, but doesn't have an event type
export class ObservableValue extends BaseObservableCollection { export class BaseObservableValue extends BaseObservableCollection {
emit(argument) { emit(argument) {
for (const h of this._handlers) { for (const h of this._handlers) {
h(argument); h(argument);
@ -40,6 +40,22 @@ export class ObservableValue extends BaseObservableCollection {
} }
} }
export class ObservableValue extends BaseObservableValue {
constructor(initialValue) {
super();
this._value = initialValue;
}
get() {
return this._value;
}
set(value) {
this._value = value;
this.emit(this._value);
}
}
export function tests() { export function tests() {
class Collection extends BaseObservableCollection { class Collection extends BaseObservableCollection {
constructor() { constructor() {

View file

@ -1,4 +1,4 @@
export class Online extends ObservableValue { export class OnlineStatus extends ObservableValue {
constructor() { constructor() {
super(); super();
this._onOffline = this._onOffline.bind(this); this._onOffline = this._onOffline.bind(this);