This commit is contained in:
Bruno Windels 2020-04-05 15:11:15 +02:00
parent c980f682c6
commit ef267ca331
8 changed files with 118 additions and 94 deletions

View file

@ -1,22 +1,22 @@
# Reconnecting # Reconnecting
`HomeServerApi` notifies `Reconnecter` of network call failure `HomeServerApi` notifies `Reconnector` of network call failure
`Reconnecter` listens for online/offline event `Reconnector` listens for online/offline event
`Reconnecter` polls `/versions` with a `RetryDelay` (implemented as ExponentialRetryDelay, also used by SendScheduler if no retry_after_ms is given) `Reconnector` polls `/versions` with a `RetryDelay` (implemented as ExponentialRetryDelay, also used by SendScheduler if no retry_after_ms is given)
`Reconnecter` emits an event when sync and message sending should retry `Reconnector` emits an event when sync and message sending should retry
`Sync` listen to `Reconnecter` `Sync` listen to `Reconnector`
`Sync` notifies when the catchup sync has happened `Sync` notifies when the catchup sync has happened
`Reconnecter` has state: `Reconnector` has state:
- disconnected (and retrying at x seconds from timestamp) - disconnected (and retrying at x seconds from timestamp)
- reconnecting (call /versions, and if successful /sync) - reconnecting (call /versions, and if successful /sync)
- connected - connected
`Reconnecter` has a method to try to connect now `Reconnector` has a method to try to connect now
`SessionStatus` can be: `SessionStatus` can be:
- disconnected (and retrying at x seconds from timestamp) - disconnected (and retrying at x seconds from timestamp)
@ -31,4 +31,4 @@ rooms should report how many messages they have queued up, and each time they se
`SendReporter` (passed from `Session` to `Room`, passed down to `SendQueue`), with: `SendReporter` (passed from `Session` to `Room`, passed down to `SendQueue`), with:
- setPendingEventCount(roomId, count) - setPendingEventCount(roomId, count)
`Session` listens to `Reconnecter` to update it's status, but perhaps we wait to send messages until catchup sync is done `Session` listens to `Reconnector` to update it's status, but perhaps we wait to send messages until catchup sync is done

View file

@ -126,7 +126,11 @@ export default class BrawlViewModel extends EventEmitter {
try { try {
this._loading = true; this._loading = true;
this._loadingText = "Loading your conversations…"; this._loadingText = "Loading your conversations…";
const hsApi = this._createHsApi(sessionInfo.homeServer, sessionInfo.accessToken); const reconnector = new Reconnector(
new ExponentialRetryDelay(2000, this._clock.createTimeout),
this._clock.createMeasure
);
const hsApi = this._createHsApi(sessionInfo.homeServer, sessionInfo.accessToken, reconnector);
const storage = await this._storageFactory.create(sessionInfo.id); const storage = await this._storageFactory.create(sessionInfo.id);
// no need to pass access token to session // no need to pass access token to session
const filteredSessionInfo = { const filteredSessionInfo = {
@ -136,10 +140,16 @@ export default class BrawlViewModel extends EventEmitter {
}; };
const session = new Session({storage, sessionInfo: filteredSessionInfo, hsApi}); const session = new Session({storage, sessionInfo: filteredSessionInfo, hsApi});
// show spinner now, with title loading stored data? // show spinner now, with title loading stored data?
this.emit("change", "activeSection"); this.emit("change", "activeSection");
await session.load(); await session.load();
const sync = new Sync({hsApi, storage, session}); const sync = new Sync({hsApi, storage, session});
reconnector.on("state", state => {
if (state === ConnectionState.Online) {
sync.start();
session.notifyNetworkAvailable(reconnector.lastVersionsResponse);
}
});
const needsInitialSync = !session.syncToken; const needsInitialSync = !session.syncToken;
if (!needsInitialSync) { if (!needsInitialSync) {

View file

@ -5,6 +5,7 @@ import StorageFactory from "./matrix/storage/idb/create.js";
import SessionsStore from "./matrix/sessions-store/localstorage/SessionsStore.js"; import SessionsStore from "./matrix/sessions-store/localstorage/SessionsStore.js";
import BrawlViewModel from "./domain/BrawlViewModel.js"; import BrawlViewModel from "./domain/BrawlViewModel.js";
import BrawlView from "./ui/web/BrawlView.js"; import BrawlView from "./ui/web/BrawlView.js";
import DOMClock from "./utils/DOMClock.js";
export default async function main(container) { export default async function main(container) {
try { try {
@ -17,14 +18,13 @@ export default async function main(container) {
// const recorder = new RecordRequester(fetchRequest); // const recorder = new RecordRequester(fetchRequest);
// const request = recorder.request; // const request = recorder.request;
// window.getBrawlFetchLog = () => recorder.log(); // window.getBrawlFetchLog = () => recorder.log();
// normal network: // normal network:
const request = fetchRequest; const request = fetchRequest;
const vm = new BrawlViewModel({ const vm = new BrawlViewModel({
storageFactory: new StorageFactory(), storageFactory: new StorageFactory(),
createHsApi: (homeServer, accessToken = null) => new HomeServerApi({homeServer, accessToken, request}), createHsApi: (homeServer, accessToken, reconnector) => new HomeServerApi({homeServer, accessToken, request, reconnector}),
sessionStore: new SessionsStore("brawl_sessions_v1"), sessionStore: new SessionsStore("brawl_sessions_v1"),
clock: Date //just for `now` fn clock: new DOMClock(),
}); });
await vm.load(); await vm.load();
const view = new BrawlView(vm); const view = new BrawlView(vm);

View file

@ -1,53 +1,49 @@
class Clock {
// use cases
// StopWatch: not sure I like that name ... but measure time difference from start to current time
// Timeout: wait for a given number of ms, and be able to interrupt the wait
// Clock.timeout() -> creates a new timeout?
// Now: get current timestamp
// Clock.now(), or pass Clock.now so others can do now()
//
// should use subinterfaces so we can only pass part needed to other constructors
//
}
// need to prevent memory leaks here! // need to prevent memory leaks here!
export class DomOnlineDetected { export class DomOnlineDetected {
constructor(reconnecter) { constructor(reconnector) {
// window.addEventListener('offline', () => appendOnlineStatus(false)); // window.addEventListener('offline', () => appendOnlineStatus(false));
// window.addEventListener('online', () => appendOnlineStatus(true)); // window.addEventListener('online', () => appendOnlineStatus(true));
// appendOnlineStatus(navigator.onLine); // appendOnlineStatus(navigator.onLine);
// on online, reconnecter.tryNow() // on online, reconnector.tryNow()
} }
} }
export class ExponentialRetryDelay { export class ExponentialRetryDelay {
constructor(start = 2000, delay) { constructor(start = 2000, createTimeout) {
this._start = start; this._start = start;
this._current = start; this._current = start;
this._delay = delay; this._createTimeout = createTimeout;
this._max = 60 * 5 * 1000; //5 min this._max = 60 * 5 * 1000; //5 min
this._timer = null; this._timeout = null;
} }
async waitForRetry() { async waitForRetry() {
this._timer = this._delay(this._current); this._timeout = this._createTimeout(this._current);
try { try {
await this._timer.timeout(); await this._timeout.elapsed();
// only increase delay if we didn't get interrupted // only increase delay if we didn't get interrupted
const seconds = this._current / 1000; const seconds = this._current / 1000;
const powerOfTwo = (seconds * seconds) * 1000; const powerOfTwo = (seconds * seconds) * 1000;
this._current = Math.max(this._max, powerOfTwo); this._current = Math.max(this._max, powerOfTwo);
} catch(err) {
// swallow AbortError, means skipWaiting was called
if (!(err instanceof AbortError)) {
throw err;
}
} finally { } finally {
this._timer = null; this._timeout = null;
}
}
skipWaiting() {
if (this._timeout) {
this._timeout.abort();
} }
} }
reset() { reset() {
this._current = this._start; this._current = this._start;
if (this._timer) { this.skipWaiting();
this._timer.abort();
}
} }
get nextValue() { get nextValue() {
@ -80,13 +76,12 @@ export const ConnectionState = createEnum(
"Online" "Online"
); );
export class Reconnecter { export class Reconnector {
constructor({hsApi, retryDelay, clock}) { constructor({retryDelay, createTimeMeasure}) {
this._online this._online
this._retryDelay = retryDelay; this._retryDelay = retryDelay;
this._currentDelay = null; this._currentDelay = null;
this._hsApi = hsApi; this._createTimeMeasure = createTimeMeasure;
this._clock = clock;
// assume online, and do our thing when something fails // assume online, and do our thing when something fails
this._state = ConnectionState.Online; this._state = ConnectionState.Online;
this._isReconnecting = false; this._isReconnecting = false;
@ -102,25 +97,22 @@ export class Reconnecter {
} }
get retryIn() { get retryIn() {
return this._stateSince.measure(); if (this._state === ConnectionState.Waiting) {
return this._retryDelay.nextValue - this._stateSince.measure();
}
return 0;
} }
onRequestFailed() { onRequestFailed(hsApi) {
if (!this._isReconnecting) { if (!this._isReconnecting) {
this._setState(ConnectionState.Offline); this._setState(ConnectionState.Offline);
// do something with versions response of loop here? this._reconnectLoop(hsApi);
// we might want to pass it to session to know what server supports?
// so emit it ...
this._reconnectLoop();
// start loop
} }
} }
// don't throw from here
tryNow() { tryNow() {
// skip waiting if (this._retryDelay) {
if (this._currentDelay) { this._retryDelay.skipWaiting();
this._currentDelay.abort();
} }
} }
@ -128,7 +120,7 @@ export class Reconnecter {
if (state !== this._state) { if (state !== this._state) {
this._state = state; this._state = state;
if (this._state === ConnectionState.Waiting) { if (this._state === ConnectionState.Waiting) {
this._stateSince = this._clock.stopwatch(); this._stateSince = this._createTimeMeasure();
} else { } else {
this._stateSince = null; this._stateSince = null;
} }
@ -136,30 +128,23 @@ export class Reconnecter {
} }
} }
async _reconnectLoop() { async _reconnectLoop(hsApi) {
this._isReconnecting = true; this._isReconnecting = true;
this._retryDelay.reset();
this._versionsResponse = null; this._versionsResponse = null;
this._retryDelay.reset();
while (!this._versionsResponse) { while (!this._versionsResponse) {
// TODO: should we wait first or request first?
// as we've just failed a request? I guess no harm in trying immediately
try { try {
this._setState(ConnectionState.Reconnecting); this._setState(ConnectionState.Reconnecting);
const versionsRequest = this._hsApi.versions(10000); // use 10s timeout, because we don't want to be waiting for
// a stale connection when we just came online again
const versionsRequest = hsApi.versions({timeout: 10000});
this._versionsResponse = await versionsRequest.response(); this._versionsResponse = await versionsRequest.response();
this._setState(ConnectionState.Online); this._setState(ConnectionState.Online);
} catch (err) { } catch (err) {
// NetworkError or AbortError from timeout
this._setState(ConnectionState.Waiting); this._setState(ConnectionState.Waiting);
this._currentDelay = this._retryDelay.next(); await this._retryDelay.waitForRetry();
try {
await this._currentDelay
} catch (err) {
// waiting interrupted, we should retry immediately,
// swallow error
} finally {
this._currentDelay = null;
}
} }
} }
} }

View file

@ -1,5 +1,6 @@
import { import {
HomeServerError, HomeServerError,
NetworkError,
} from "./error.js"; } from "./error.js";
class RequestWrapper { class RequestWrapper {
@ -28,19 +29,21 @@ class RequestWrapper {
} }
export default class HomeServerApi { export default class HomeServerApi {
constructor({homeServer, accessToken, request}) { constructor({homeServer, accessToken, request, createTimeout, reconnector}) {
// store these both in a closure somehow so it's harder to get at in case of XSS? // store these both in a closure somehow so it's harder to get at in case of XSS?
// one could change the homeserver as well so the token gets sent there, so both must be protected from read/write // one could change the homeserver as well so the token gets sent there, so both must be protected from read/write
this._homeserver = homeServer; this._homeserver = homeServer;
this._accessToken = accessToken; this._accessToken = accessToken;
this._requestFn = request; this._requestFn = request;
this._createTimeout = createTimeout;
this._reconnector = reconnector;
} }
_url(csPath) { _url(csPath) {
return `${this._homeserver}/_matrix/client/r0${csPath}`; return `${this._homeserver}/_matrix/client/r0${csPath}`;
} }
_request(method, url, queryParams = {}, body) { _request(method, url, queryParams = {}, body, options) {
const queryString = Object.entries(queryParams) const queryString = Object.entries(queryParams)
.filter(([, value]) => value !== undefined) .filter(([, value]) => value !== undefined)
.map(([name, value]) => { .map(([name, value]) => {
@ -66,51 +69,73 @@ export default class HomeServerApi {
headers, headers,
body: bodyString, body: bodyString,
}); });
return new RequestWrapper(method, url, requestResult);
if (options.timeout) {
const timeout = this._createTimeout(options.timeout);
// abort request if timeout finishes first
timeout.elapsed().then(
() => requestResult.abort(),
() => {} // ignore AbortError
);
// abort timeout if request finishes first
requestResult.response().then(() => timeout.abort());
}
const wrapper = new RequestWrapper(method, url, requestResult);
if (this._reconnector) {
wrapper.response().catch(err => {
if (err instanceof NetworkError) {
this._reconnector.onRequestFailed(this);
}
});
}
return wrapper;
} }
_post(csPath, queryParams, body) { _post(csPath, queryParams, body, options) {
return this._request("POST", this._url(csPath), queryParams, body); return this._request("POST", this._url(csPath), queryParams, body, options);
} }
_put(csPath, queryParams, body) { _put(csPath, queryParams, body, options) {
return this._request("PUT", this._url(csPath), queryParams, body); return this._request("PUT", this._url(csPath), queryParams, body, options);
} }
_get(csPath, queryParams, body) { _get(csPath, queryParams, body, options) {
return this._request("GET", this._url(csPath), queryParams, body); return this._request("GET", this._url(csPath), queryParams, body, options);
} }
sync(since, filter, timeout) { sync(since, filter, timeout, options = null) {
return this._get("/sync", {since, timeout, filter}); return this._get("/sync", {since, timeout, filter}, null, options);
} }
// params is from, dir and optionally to, limit, filter. // params is from, dir and optionally to, limit, filter.
messages(roomId, params) { messages(roomId, params, options = null) {
return this._get(`/rooms/${encodeURIComponent(roomId)}/messages`, params); return this._get(`/rooms/${encodeURIComponent(roomId)}/messages`, params, null, options);
} }
send(roomId, eventType, txnId, content) { send(roomId, eventType, txnId, content, options = null) {
return this._put(`/rooms/${encodeURIComponent(roomId)}/send/${encodeURIComponent(eventType)}/${encodeURIComponent(txnId)}`, {}, content); return this._put(`/rooms/${encodeURIComponent(roomId)}/send/${encodeURIComponent(eventType)}/${encodeURIComponent(txnId)}`, {}, content, options);
} }
passwordLogin(username, password) { passwordLogin(username, password, options = null) {
return this._post("/login", undefined, { return this._post("/login", null, {
"type": "m.login.password", "type": "m.login.password",
"identifier": { "identifier": {
"type": "m.id.user", "type": "m.id.user",
"user": username "user": username
}, },
"password": password "password": password
}); }, options);
} }
createFilter(userId, filter) { createFilter(userId, filter, options = null) {
return this._post(`/user/${encodeURIComponent(userId)}/filter`, undefined, filter); return this._post(`/user/${encodeURIComponent(userId)}/filter`, null, filter, options);
} }
versions(timeout) { versions(options = null) {
// TODO: implement timeout return this._request("GET", `${this._homeserver}/_matrix/client/versions`, null, options);
return this._request("GET", `${this._homeserver}/_matrix/client/versions`);
} }
} }

View file

@ -40,7 +40,7 @@ export default class Session {
})); }));
} }
notifyNetworkAvailable() { notifyNetworkAvailable(lastVersionResponse) {
for (const [, room] of this._rooms) { for (const [, room] of this._rooms) {
room.resumeSending(); room.resumeSending();
} }

View file

@ -33,6 +33,7 @@ export default class Sync extends EventEmitter {
return this._isSyncing; return this._isSyncing;
} }
// this should not throw?
// returns when initial sync is done // returns when initial sync is done
async start() { async start() {
if (this._isSyncing) { if (this._isSyncing) {

View file

@ -3,22 +3,25 @@ import {AbortError} from "./error.js";
class DOMTimeout { class DOMTimeout {
constructor(ms) { constructor(ms) {
this._reject = null; this._reject = null;
this._handle = null;
this._promise = new Promise((resolve, reject) => { this._promise = new Promise((resolve, reject) => {
this._reject = reject; this._reject = reject;
setTimeout(() => { this._handle = setTimeout(() => {
this._reject = null; this._reject = null;
resolve(); resolve();
}, ms); }, ms);
}); });
} }
get elapsed() { elapsed() {
return this._promise; return this._promise;
} }
abort() { abort() {
if (this._reject) { if (this._reject) {
this._reject(new AbortError()); this._reject(new AbortError());
clearTimeout(this._handle);
this._handle = null;
this._reject = null; this._reject = null;
} }
} }