whitespace

This commit is contained in:
Bruno Windels 2019-05-12 20:26:46 +02:00
parent e3328f0fef
commit 10457611f9
2 changed files with 139 additions and 139 deletions

View file

@ -3,59 +3,59 @@ import { ObservableMap } from "../observable/index.js";
export default class Session { export default class Session {
// sessionInfo contains deviceId, userId and homeServer // sessionInfo contains deviceId, userId and homeServer
constructor({storage, hsApi, sessionInfo}) { constructor({storage, hsApi, sessionInfo}) {
this._storage = storage; this._storage = storage;
this._hsApi = hsApi; this._hsApi = hsApi;
this._session = null; this._session = null;
this._sessionInfo = sessionInfo; this._sessionInfo = sessionInfo;
this._rooms = new ObservableMap(); this._rooms = new ObservableMap();
this._roomUpdateCallback = (room, params) => this._rooms.update(room.id, params); this._roomUpdateCallback = (room, params) => this._rooms.update(room.id, params);
} }
async load() { async load() {
const txn = await this._storage.readTxn([ const txn = await this._storage.readTxn([
this._storage.storeNames.session, this._storage.storeNames.session,
this._storage.storeNames.roomSummary, this._storage.storeNames.roomSummary,
this._storage.storeNames.roomState, this._storage.storeNames.roomState,
this._storage.storeNames.roomTimeline, this._storage.storeNames.timelineEvents,
]); ]);
// restore session object // restore session object
this._session = await txn.session.get(); this._session = await txn.session.get();
if (!this._session) { if (!this._session) {
this._session = {}; this._session = {};
return; return;
} }
// load rooms // load rooms
const rooms = await txn.roomSummary.getAll(); const rooms = await txn.roomSummary.getAll();
await Promise.all(rooms.map(summary => { await Promise.all(rooms.map(summary => {
const room = this.createRoom(summary.roomId); const room = this.createRoom(summary.roomId);
return room.load(summary, txn); return room.load(summary, txn);
})); }));
} }
get rooms() { get rooms() {
return this._rooms; return this._rooms;
} }
createRoom(roomId) { createRoom(roomId) {
const room = new Room({ const room = new Room({
roomId, roomId,
storage: this._storage, storage: this._storage,
emitCollectionChange: this._roomUpdateCallback, emitCollectionChange: this._roomUpdateCallback,
hsApi: this._hsApi, hsApi: this._hsApi,
}); });
this._rooms.add(roomId, room); this._rooms.add(roomId, room);
return room; return room;
} }
persistSync(syncToken, accountData, txn) { persistSync(syncToken, accountData, txn) {
if (syncToken !== this._session.syncToken) { if (syncToken !== this._session.syncToken) {
this._session.syncToken = syncToken; this._session.syncToken = syncToken;
txn.session.set(this._session); txn.session.set(this._session);
} }
} }
get syncToken() { get syncToken() {
return this._session.syncToken; return this._session.syncToken;
} }
} }

View file

@ -1,7 +1,7 @@
import { import {
RequestAbortError, RequestAbortError,
HomeServerError, HomeServerError,
StorageError StorageError
} from "./error.js"; } from "./error.js";
import EventEmitter from "../EventEmitter.js"; import EventEmitter from "../EventEmitter.js";
@ -9,119 +9,119 @@ const INCREMENTAL_TIMEOUT = 30000;
const SYNC_EVENT_LIMIT = 10; const SYNC_EVENT_LIMIT = 10;
function parseRooms(roomsSection, roomCallback) { function parseRooms(roomsSection, roomCallback) {
if (!roomsSection) { if (!roomsSection) {
return; return;
} }
const allMemberships = ["join", "invite", "leave"]; const allMemberships = ["join", "invite", "leave"];
for(const membership of allMemberships) { for(const membership of allMemberships) {
const membershipSection = roomsSection[membership]; const membershipSection = roomsSection[membership];
if (membershipSection) { if (membershipSection) {
const rooms = Object.entries(membershipSection) const rooms = Object.entries(membershipSection)
for (const [roomId, roomResponse] of rooms) { for (const [roomId, roomResponse] of rooms) {
roomCallback(roomId, roomResponse, membership); roomCallback(roomId, roomResponse, membership);
} }
} }
} }
} }
export default class Sync extends EventEmitter { export default class Sync extends EventEmitter {
constructor(hsApi, session, storage) { constructor(hsApi, session, storage) {
super(); super();
this._hsApi = hsApi; this._hsApi = hsApi;
this._session = session; this._session = session;
this._storage = storage; this._storage = storage;
this._isSyncing = false; this._isSyncing = false;
this._currentRequest = null; this._currentRequest = null;
} }
// returns when initial sync is done // returns when initial sync is done
async start() { async start() {
if (this._isSyncing) { if (this._isSyncing) {
return; return;
} }
this._isSyncing = true; this._isSyncing = true;
let syncToken = this._session.syncToken; let syncToken = this._session.syncToken;
// do initial sync if needed // do initial sync if needed
if (!syncToken) { if (!syncToken) {
// need to create limit filter here // need to create limit filter here
syncToken = await this._syncRequest(); syncToken = await this._syncRequest();
} }
this._syncLoop(syncToken); this._syncLoop(syncToken);
} }
async _syncLoop(syncToken) { async _syncLoop(syncToken) {
// if syncToken is falsy, it will first do an initial sync ... // if syncToken is falsy, it will first do an initial sync ...
while(this._isSyncing) { while(this._isSyncing) {
try { try {
console.log(`starting sync request with since ${syncToken} ...`); console.log(`starting sync request with since ${syncToken} ...`);
syncToken = await this._syncRequest(syncToken, INCREMENTAL_TIMEOUT); syncToken = await this._syncRequest(syncToken, INCREMENTAL_TIMEOUT);
} catch (err) { } catch (err) {
this._isSyncing = false; this._isSyncing = false;
if (!(err instanceof RequestAbortError)) { if (!(err instanceof RequestAbortError)) {
console.warn("stopping sync because of error"); console.warn("stopping sync because of error");
this.emit("error", err); this.emit("error", err);
} }
} }
} }
this.emit("stopped"); this.emit("stopped");
} }
async _syncRequest(syncToken, timeout) { async _syncRequest(syncToken, timeout) {
this._currentRequest = this._hsApi.sync(syncToken, undefined, timeout); this._currentRequest = this._hsApi.sync(syncToken, undefined, timeout);
const response = await this._currentRequest.response(); const response = await this._currentRequest.response();
syncToken = response.next_batch; syncToken = response.next_batch;
const storeNames = this._storage.storeNames; const storeNames = this._storage.storeNames;
const syncTxn = await this._storage.readWriteTxn([ const syncTxn = await this._storage.readWriteTxn([
storeNames.session, storeNames.session,
storeNames.roomSummary, storeNames.roomSummary,
storeNames.roomTimeline, storeNames.timelineEvents,
storeNames.roomState, storeNames.roomState,
]); ]);
const roomChanges = []; const roomChanges = [];
try { try {
this._session.persistSync(syncToken, response.account_data, syncTxn); this._session.persistSync(syncToken, response.account_data, syncTxn);
// to_device // to_device
// presence // presence
if (response.rooms) { if (response.rooms) {
parseRooms(response.rooms, (roomId, roomResponse, membership) => { parseRooms(response.rooms, (roomId, roomResponse, membership) => {
let room = this._session.rooms.get(roomId); let room = this._session.rooms.get(roomId);
if (!room) { if (!room) {
room = this._session.createRoom(roomId); room = this._session.createRoom(roomId);
} }
console.log(` * applying sync response to room ${roomId} ...`); console.log(` * applying sync response to room ${roomId} ...`);
const changes = room.persistSync(roomResponse, membership, syncTxn); const changes = room.persistSync(roomResponse, membership, syncTxn);
roomChanges.push({room, changes}); roomChanges.push({room, changes});
}); });
} }
} catch(err) { } catch(err) {
console.warn("aborting syncTxn because of error"); console.warn("aborting syncTxn because of error");
// avoid corrupting state by only // avoid corrupting state by only
// storing the sync up till the point // storing the sync up till the point
// the exception occurred // the exception occurred
syncTxn.abort(); syncTxn.abort();
throw err; throw err;
} }
try { try {
await syncTxn.complete(); await syncTxn.complete();
console.info("syncTxn committed!!"); console.info("syncTxn committed!!");
} catch (err) { } catch (err) {
throw new StorageError("unable to commit sync tranaction", err); throw new StorageError("unable to commit sync tranaction", err);
} }
// emit room related events after txn has been closed // emit room related events after txn has been closed
for(let {room, changes} of roomChanges) { for(let {room, changes} of roomChanges) {
room.emitSync(changes); room.emitSync(changes);
} }
return syncToken; return syncToken;
} }
stop() { stop() {
if (!this._isSyncing) { if (!this._isSyncing) {
return; return;
} }
this._isSyncing = false; this._isSyncing = false;
if (this._currentRequest) { if (this._currentRequest) {
this._currentRequest.abort(); this._currentRequest.abort();
this._currentRequest = null; this._currentRequest = null;
} }
} }
} }