manage request scheduler in session container

so we can start it before sync does its first request,
which otherwise gets aborted because the scheduler hasn't started yet
This commit is contained in:
Bruno Windels 2020-09-22 16:39:41 +02:00
parent 150f06b9bf
commit 137f55b44d
3 changed files with 21 additions and 24 deletions

View file

@ -16,7 +16,6 @@ limitations under the License.
import {Room} from "./room/Room.js";
import { ObservableMap } from "../observable/index.js";
import {RequestScheduler} from "./net/RequestScheduler.js";
import {User} from "./User.js";
import {DeviceMessageHandler} from "./DeviceMessageHandler.js";
import {Account as E2EEAccount} from "./e2ee/Account.js";
@ -45,8 +44,7 @@ export class Session {
constructor({clock, storage, hsApi, sessionInfo, olm, olmWorker, cryptoDriver, mediaRepository}) {
this._clock = clock;
this._storage = storage;
this._requestScheduler = new RequestScheduler({hsApi, clock});
this._hsApi = this._requestScheduler.createHomeServerApiWrapper();
this._hsApi = hsApi;
this._mediaRepository = mediaRepository;
this._syncInfo = null;
this._sessionInfo = sessionInfo;
@ -267,13 +265,8 @@ export class Session {
}));
}
get isStarted() {
return this._requestScheduler.isStarted;
}
dispose() {
this._olmWorker?.dispose();
this._requestScheduler.stop();
this._sessionBackup?.dispose();
for (const room of this._rooms.values()) {
room.dispose();
@ -297,7 +290,6 @@ export class Session {
const operations = await opsTxn.operations.getAll();
const operationsByScope = groupBy(operations, o => o.scope);
this._requestScheduler.start();
for (const [, room] of this._rooms) {
let roomOperationsByType;
const roomOperations = operationsByScope.get(room.id);

View file

@ -20,6 +20,7 @@ import {HomeServerApi} from "./net/HomeServerApi.js";
import {Reconnector, ConnectionStatus} from "./net/Reconnector.js";
import {ExponentialRetryDelay} from "./net/ExponentialRetryDelay.js";
import {MediaRepository} from "./net/MediaRepository.js";
import {RequestScheduler} from "./net/RequestScheduler.js";
import {HomeServerError, ConnectionError, AbortError} from "./error.js";
import {Sync, SyncStatus} from "./Sync.js";
import {Session} from "./Session.js";
@ -50,7 +51,7 @@ export class SessionContainer {
this._request = request;
this._storageFactory = storageFactory;
this._sessionInfoStorage = sessionInfoStorage;
this._sessionStartedByReconnector = false;
this._status = new ObservableValue(LoadStatus.NotLoading);
this._error = null;
this._loginFailure = null;
@ -59,6 +60,7 @@ export class SessionContainer {
this._sync = null;
this._sessionId = null;
this._storage = null;
this._requestScheduler = null;
this._olmPromise = olmPromise;
this._workerPromise = workerPromise;
this._cryptoDriver = cryptoDriver;
@ -133,6 +135,7 @@ export class SessionContainer {
}
async _loadSessionInfo(sessionInfo, isNewLogin) {
this._sessionStartedByReconnector = false;
this._status.set(LoadStatus.Loading);
this._reconnector = new Reconnector({
onlineStatus: this._onlineStatus,
@ -159,10 +162,12 @@ export class SessionContainer {
if (this._workerPromise) {
olmWorker = await this._workerPromise;
}
this._requestScheduler = new RequestScheduler({hsApi, clock: this._clock});
this._requestScheduler.start();
this._session = new Session({
storage: this._storage,
sessionInfo: filteredSessionInfo,
hsApi,
hsApi: this._requestScheduler.hsApi,
olm,
clock: this._clock,
olmWorker,
@ -173,11 +178,14 @@ export class SessionContainer {
this._status.set(LoadStatus.SessionSetup);
await this._session.beforeFirstSync(isNewLogin);
this._sync = new Sync({hsApi, storage: this._storage, session: this._session});
this._sync = new Sync({hsApi: this._requestScheduler.hsApi, storage: this._storage, session: this._session});
// notify sync and session when back online
this._reconnectSubscription = this._reconnector.connectionStatus.subscribe(state => {
if (state === ConnectionStatus.Online) {
// needs to happen before sync and session or it would abort all requests
this._requestScheduler.start();
this._sync.start();
this._sessionStartedByReconnector = true;
this._session.start(this._reconnector.lastVersionsResponse);
}
});
@ -189,11 +197,7 @@ export class SessionContainer {
// restored the connection, it would have already
// started to session, so check first
// to prevent an extra /versions request
// TODO: this doesn't look logical, but works. Why?
// I think because isStarted is true by default. That's probably not what we intend.
// I think there is a bug here, in that even if the reconnector already started the session, we'd still do this.
if (this._session.isStarted) {
if (!this._sessionStartedByReconnector) {
const lastVersionsResponse = await hsApi.versions({timeout: 10000}).response();
this._session.start(lastVersionsResponse);
}
@ -259,6 +263,9 @@ export class SessionContainer {
this._reconnectSubscription();
this._reconnectSubscription = null;
}
if (this._requestScheduler) {
this._requestScheduler.stop();
}
if (this._sync) {
this._sync.stop();
}

View file

@ -1,5 +1,6 @@
/*
Copyright 2020 Bruno Windels <bruno@windels.cloud>
Copyright 2020 The Matrix.org Foundation C.I.C.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@ -65,11 +66,12 @@ export class RequestScheduler {
this._requests = new Set();
this._isRateLimited = false;
this._isDrainingRateLimit = false;
this._stopped = false;
this._stopped = true;
this._wrapper = new HomeServerApiWrapper(this);
}
createHomeServerApiWrapper() {
return new HomeServerApiWrapper(this);
get hsApi() {
return this._wrapper;
}
stop() {
@ -84,10 +86,6 @@ export class RequestScheduler {
this._stopped = false;
}
get isStarted() {
return !this._stopped;
}
_hsApiRequest(name, args) {
const request = new Request(name, args);
this._doSend(request);