diff --git a/src/matrix/session.js b/src/matrix/session.js index 9dbeb010..d86065dc 100644 --- a/src/matrix/session.js +++ b/src/matrix/session.js @@ -3,59 +3,59 @@ import { ObservableMap } from "../observable/index.js"; export default class Session { // sessionInfo contains deviceId, userId and homeServer - constructor({storage, hsApi, sessionInfo}) { - this._storage = storage; + constructor({storage, hsApi, sessionInfo}) { + this._storage = storage; this._hsApi = hsApi; - this._session = null; + this._session = null; this._sessionInfo = sessionInfo; - this._rooms = new ObservableMap(); + this._rooms = new ObservableMap(); this._roomUpdateCallback = (room, params) => this._rooms.update(room.id, params); - } + } - async load() { - const txn = await this._storage.readTxn([ - this._storage.storeNames.session, - this._storage.storeNames.roomSummary, - this._storage.storeNames.roomState, - this._storage.storeNames.roomTimeline, - ]); - // restore session object - this._session = await txn.session.get(); - if (!this._session) { + async load() { + const txn = await this._storage.readTxn([ + this._storage.storeNames.session, + this._storage.storeNames.roomSummary, + this._storage.storeNames.roomState, + this._storage.storeNames.timelineEvents, + ]); + // restore session object + this._session = await txn.session.get(); + if (!this._session) { this._session = {}; - return; - } - // load rooms - const rooms = await txn.roomSummary.getAll(); - await Promise.all(rooms.map(summary => { - const room = this.createRoom(summary.roomId); - return room.load(summary, txn); - })); - } + return; + } + // load rooms + const rooms = await txn.roomSummary.getAll(); + await Promise.all(rooms.map(summary => { + const room = this.createRoom(summary.roomId); + return room.load(summary, txn); + })); + } get rooms() { return this._rooms; } - createRoom(roomId) { - const room = new Room({ + createRoom(roomId) { + const room = new Room({ roomId, storage: this._storage, emitCollectionChange: this._roomUpdateCallback, hsApi: this._hsApi, }); - this._rooms.add(roomId, room); - return room; - } + this._rooms.add(roomId, room); + return room; + } - persistSync(syncToken, accountData, txn) { - if (syncToken !== this._session.syncToken) { - this._session.syncToken = syncToken; - txn.session.set(this._session); - } - } + persistSync(syncToken, accountData, txn) { + if (syncToken !== this._session.syncToken) { + this._session.syncToken = syncToken; + txn.session.set(this._session); + } + } - get syncToken() { - return this._session.syncToken; - } + get syncToken() { + return this._session.syncToken; + } } diff --git a/src/matrix/sync.js b/src/matrix/sync.js index 4ad89466..b96f2110 100644 --- a/src/matrix/sync.js +++ b/src/matrix/sync.js @@ -1,7 +1,7 @@ import { - RequestAbortError, - HomeServerError, - StorageError + RequestAbortError, + HomeServerError, + StorageError } from "./error.js"; import EventEmitter from "../EventEmitter.js"; @@ -9,119 +9,119 @@ const INCREMENTAL_TIMEOUT = 30000; const SYNC_EVENT_LIMIT = 10; function parseRooms(roomsSection, roomCallback) { - if (!roomsSection) { - return; - } - const allMemberships = ["join", "invite", "leave"]; - for(const membership of allMemberships) { - const membershipSection = roomsSection[membership]; - if (membershipSection) { - const rooms = Object.entries(membershipSection) - for (const [roomId, roomResponse] of rooms) { - roomCallback(roomId, roomResponse, membership); - } - } - } + if (!roomsSection) { + return; + } + const allMemberships = ["join", "invite", "leave"]; + for(const membership of allMemberships) { + const membershipSection = roomsSection[membership]; + if (membershipSection) { + const rooms = Object.entries(membershipSection) + for (const [roomId, roomResponse] of rooms) { + roomCallback(roomId, roomResponse, membership); + } + } + } } export default class Sync extends EventEmitter { - constructor(hsApi, session, storage) { - super(); - this._hsApi = hsApi; - this._session = session; - this._storage = storage; - this._isSyncing = false; - this._currentRequest = null; - } - // returns when initial sync is done - async start() { - if (this._isSyncing) { - return; - } - this._isSyncing = true; - let syncToken = this._session.syncToken; - // do initial sync if needed - if (!syncToken) { - // need to create limit filter here - syncToken = await this._syncRequest(); - } - this._syncLoop(syncToken); - } + constructor(hsApi, session, storage) { + super(); + this._hsApi = hsApi; + this._session = session; + this._storage = storage; + this._isSyncing = false; + this._currentRequest = null; + } + // returns when initial sync is done + async start() { + if (this._isSyncing) { + return; + } + this._isSyncing = true; + let syncToken = this._session.syncToken; + // do initial sync if needed + if (!syncToken) { + // need to create limit filter here + syncToken = await this._syncRequest(); + } + this._syncLoop(syncToken); + } - async _syncLoop(syncToken) { - // if syncToken is falsy, it will first do an initial sync ... - while(this._isSyncing) { - try { - console.log(`starting sync request with since ${syncToken} ...`); - syncToken = await this._syncRequest(syncToken, INCREMENTAL_TIMEOUT); - } catch (err) { - this._isSyncing = false; - if (!(err instanceof RequestAbortError)) { - console.warn("stopping sync because of error"); - this.emit("error", err); - } - } - } - this.emit("stopped"); - } + async _syncLoop(syncToken) { + // if syncToken is falsy, it will first do an initial sync ... + while(this._isSyncing) { + try { + console.log(`starting sync request with since ${syncToken} ...`); + syncToken = await this._syncRequest(syncToken, INCREMENTAL_TIMEOUT); + } catch (err) { + this._isSyncing = false; + if (!(err instanceof RequestAbortError)) { + console.warn("stopping sync because of error"); + this.emit("error", err); + } + } + } + this.emit("stopped"); + } - async _syncRequest(syncToken, timeout) { - this._currentRequest = this._hsApi.sync(syncToken, undefined, timeout); - const response = await this._currentRequest.response(); - syncToken = response.next_batch; - const storeNames = this._storage.storeNames; - const syncTxn = await this._storage.readWriteTxn([ - storeNames.session, - storeNames.roomSummary, - storeNames.roomTimeline, - storeNames.roomState, - ]); + async _syncRequest(syncToken, timeout) { + this._currentRequest = this._hsApi.sync(syncToken, undefined, timeout); + const response = await this._currentRequest.response(); + syncToken = response.next_batch; + const storeNames = this._storage.storeNames; + const syncTxn = await this._storage.readWriteTxn([ + storeNames.session, + storeNames.roomSummary, + storeNames.timelineEvents, + storeNames.roomState, + ]); const roomChanges = []; try { this._session.persistSync(syncToken, response.account_data, syncTxn); // to_device // presence - if (response.rooms) { - parseRooms(response.rooms, (roomId, roomResponse, membership) => { - let room = this._session.rooms.get(roomId); - if (!room) { - room = this._session.createRoom(roomId); - } - console.log(` * applying sync response to room ${roomId} ...`); - const changes = room.persistSync(roomResponse, membership, syncTxn); + if (response.rooms) { + parseRooms(response.rooms, (roomId, roomResponse, membership) => { + let room = this._session.rooms.get(roomId); + if (!room) { + room = this._session.createRoom(roomId); + } + console.log(` * applying sync response to room ${roomId} ...`); + const changes = room.persistSync(roomResponse, membership, syncTxn); roomChanges.push({room, changes}); - }); - } - } catch(err) { - console.warn("aborting syncTxn because of error"); - // avoid corrupting state by only - // storing the sync up till the point - // the exception occurred - syncTxn.abort(); - throw err; - } - try { - await syncTxn.complete(); - console.info("syncTxn committed!!"); - } catch (err) { - throw new StorageError("unable to commit sync tranaction", err); - } + }); + } + } catch(err) { + console.warn("aborting syncTxn because of error"); + // avoid corrupting state by only + // storing the sync up till the point + // the exception occurred + syncTxn.abort(); + throw err; + } + try { + await syncTxn.complete(); + console.info("syncTxn committed!!"); + } catch (err) { + throw new StorageError("unable to commit sync tranaction", err); + } // emit room related events after txn has been closed for(let {room, changes} of roomChanges) { room.emitSync(changes); } - return syncToken; - } + return syncToken; + } - stop() { - if (!this._isSyncing) { - return; - } - this._isSyncing = false; - if (this._currentRequest) { - this._currentRequest.abort(); - this._currentRequest = null; - } - } + stop() { + if (!this._isSyncing) { + return; + } + this._isSyncing = false; + if (this._currentRequest) { + this._currentRequest.abort(); + this._currentRequest = null; + } + } }