replace isSyncing and emit with an Observable SyncStatus

This commit is contained in:
Bruno Windels 2020-04-19 19:52:26 +02:00
parent 80f7caadbe
commit 72b0eefccb
3 changed files with 41 additions and 32 deletions

View file

@ -41,11 +41,12 @@ rooms should report how many messages they have queued up, and each time they se
- add waitFor (won't this leak if the promise never resolves?)
- decide whether we want to inherit (no?)
- DONE: cleanup Reconnector with recent changes, move generic code, make imports work
- add SyncStatus as ObservableValue of enum in Sync
- show load progress in LoginView/SessionPickView and do away with loading screen
- DONE: add SyncStatus as ObservableValue of enum in Sync
- cleanup SessionContainer
- change main.js to pass in a creation function of a SessionContainer instead of everything it is replacing
- show load progress in LoginView/SessionPickView and do away with loading screen
- adjust BrawlViewModel, SessionPickViewModel and LoginViewModel to use a SessionContainer
- rename SessionsStore to SessionInfoStorage
- DONE: rename SessionsStore to SessionInfoStorage
- make sure we've renamed all \*State enums and fields to \*Status
- add pendingMessageCount prop to SendQueue and Room, aggregate this in Session
- add completedFirstSync to Sync, so we can check if the catchup or initial sync is still in progress

View file

@ -6,8 +6,7 @@ export const LoadStatus = createEnum(
"LoginFailed",
"Loading",
"Migrating", //not used atm, but would fit here
"InitialSync",
"CatchupSync",
"FirstSync",
"Error",
"Ready",
);
@ -127,7 +126,6 @@ export class SessionContainer {
if (!needsInitialSync) {
this._status.set(LoadStatus.CatchupSync);
} else {
this._status.set(LoadStatus.InitialSync);
}
this._sync = new Sync({hsApi, storage, session: this._session});
@ -148,7 +146,8 @@ export class SessionContainer {
async _waitForFirstSync() {
try {
await this._sync.start();
this._sync.start();
this._status.set(LoadStatus.FirstSync);
} catch (err) {
// swallow ConnectionError here and continue,
// as the reconnector above will call

View file

@ -1,9 +1,17 @@
import {AbortError} from "./error.js";
import EventEmitter from "../EventEmitter.js";
import ObservableValue from "../observable/ObservableValue.js";
import {createEnum} from "../utils/enum.js";
const INCREMENTAL_TIMEOUT = 30000;
const SYNC_EVENT_LIMIT = 10;
export const SyncStatus = createEnum(
"InitialSync",
"CatchupSync",
"Syncing",
"Stopped"
);
function parseRooms(roomsSection, roomCallback) {
if (roomsSection) {
const allMemberships = ["join", "invite", "leave"];
@ -19,53 +27,54 @@ function parseRooms(roomsSection, roomCallback) {
return [];
}
export default class Sync extends EventEmitter {
export default class Sync {
constructor({hsApi, session, storage}) {
super();
this._hsApi = hsApi;
this._session = session;
this._storage = storage;
this._isSyncing = false;
this._currentRequest = null;
this._status = new ObservableValue(SyncStatus.Stopped);
this._error = null;
}
get isSyncing() {
return this._isSyncing;
get status() {
return this._status;
}
// this should not throw?
// returns when initial sync is done
async start() {
if (this._isSyncing) {
/** the error that made the sync stop */
get error() {
return this._error;
}
start() {
// not already syncing?
if (this._status.get() !== SyncStatus.Stopped) {
return;
}
this._isSyncing = true;
this.emit("status", "started");
let syncToken = this._session.syncToken;
// do initial sync if needed
if (!syncToken) {
// need to create limit filter here
syncToken = await this._syncRequest();
if (syncToken) {
this._status.set(SyncStatus.CatchupSync);
} else {
this._status.set(SyncStatus.InitialSync);
}
this._syncLoop(syncToken);
}
async _syncLoop(syncToken) {
// if syncToken is falsy, it will first do an initial sync ...
while(this._isSyncing) {
while(this._status.get() !== SyncStatus.Stopped) {
try {
console.log(`starting sync request with since ${syncToken} ...`);
syncToken = await this._syncRequest(syncToken, INCREMENTAL_TIMEOUT);
const timeout = syncToken ? INCREMENTAL_TIMEOUT : undefined;
syncToken = await this._syncRequest(syncToken, timeout);
this._status.set(SyncStatus.Syncing);
} catch (err) {
this._isSyncing = false;
if (!(err instanceof AbortError)) {
console.error("stopping sync because of error");
console.error(err);
this.emit("status", "error", err);
this._error = err;
this._status.set(SyncStatus.Stopped);
}
}
}
this.emit("status", "stopped");
}
async _syncRequest(syncToken, timeout) {
@ -128,10 +137,10 @@ export default class Sync extends EventEmitter {
}
stop() {
if (!this._isSyncing) {
if (this._status.get() === SyncStatus.Stopped) {
return;
}
this._isSyncing = false;
this._status.set(SyncStatus.Stopped);
if (this._currentRequest) {
this._currentRequest.abort();
this._currentRequest = null;