diff --git a/src/matrix/Sync3.ts b/src/matrix/Sync3.ts index d4a06bc8..72e12fd0 100644 --- a/src/matrix/Sync3.ts +++ b/src/matrix/Sync3.ts @@ -126,6 +126,8 @@ export class Sync3 { // Hydrogen only has 1 list currently (no DM section) so we only need 1 range this.ranges = [[0, 99]]; this.roomIndexToRoomId = {}; + console.log("session", session); + console.log("storage", storage); } // Start syncing. Probably call this at startup once you have an access_token. @@ -139,6 +141,17 @@ export class Sync3 { this.syncLoop(undefined); } + stop() { + if (this.status.get() === SyncStatus.Stopped) { + return; + } + this.status.set(SyncStatus.Stopped); + if (this.currentRequest) { + this.currentRequest.abort(); + this.currentRequest = null; + } + } + // The purpose of this function is to do repeated /sync calls and call processResponse. It doesn't // know or care how to handle the response, it only cares about the position and retries. private async syncLoop(pos?: number) { @@ -181,7 +194,7 @@ export class Sync3 { backoffCounter = 0; // we have to wait for some parts of the response to be saved to disk before we can go on // hence the await. - await this.processResponse(resp); + await this.processResponse(isFirstSync, resp); // increment our position to tell the server we got everything, similar to using ?since= in v2 pos = resp.pos; if (isFirstSync) { @@ -211,37 +224,49 @@ export class Sync3 { } } - // TODO: Handle room updates - // TODO: atomically swap indexToRoom - // The purpose of this function is process the /sync response and atomically update sync state. - private async processResponse(resp: Sync3Response) { + private async processResponse(isFirstSync: boolean, resp: Sync3Response) { console.log(resp); let { indexToRoom, updates } = this.processOps(resp.ops); // process the room updates: new rooms, new timeline events, updated room names, that sort of thing. + // we're kinda forced to use the logger as most functions expect an ILogItem + await this.logger.run("sync", async log => { + const syncTxn = await this.openSyncTxn(); + try { + // this.session.writeSync() // write account data, device lists, etc. + await Promise.all(updates.map(async (roomResponse) => { + // get or create a room + let room = this.session.rooms.get(roomResponse.room_id); + if (!room) { + room = this.session.createRoom(roomResponse.room_id); + } + room.writeSync( + roomResponse, isFirstSync, {}, syncTxn, log + ) + })) + } catch (err) { + // avoid corrupting state by only + // storing the sync up till the point + // the exception occurred + syncTxn.abort(log); + throw syncTxn.getCause(err); + } + await syncTxn.complete(log); - this.prepareResponse(updates); + // update in-memory structs + this.session.afterSync(); // ??? + + updates.forEach((roomResponse) => { + // get room then afterSync() ??? + }); + + this.session.applyRoomCollectionChangesAfterSync(null, roomStates, null); + }); - /* - try { - await log.wrap("prepare", log => this._prepareSync(sessionState, roomStates, response, log)); - await log.wrap("afterPrepareSync", log => Promise.all(roomStates.map(rs => { - return rs.room.afterPrepareSync(rs.preparation, log); - }))); - await log.wrap("write", async log => this._writeSync( - sessionState, inviteStates, roomStates, archivedRoomStates, - response, syncFilterId, isInitialSync, log)); - } finally { - sessionState.dispose(); - } - // sync txn comitted, emit updates and apply changes to in-memory state - log.wrap("after", log => this._afterSync( - sessionState, inviteStates, roomStates, archivedRoomStates, log)); - */ // instantly move all the rooms to their new positions - + this.roomIndexToRoomId = indexToRoom; } // The purpose of this function is to process the response `ops` array by modifying the current @@ -368,60 +393,35 @@ export class Sync3 { }; } - private async prepareResponse(updates: RoomResponse[]) { - /* - // IF WE HAVE TO-DEVICE MSGS THEN this.session.obtainSyncLock(response) (which checks response.to_device.events) - const prepareTxn = await this.openPrepareSyncTxn(); - // IF WE HAVE TO-DEVICE MSGS THEN this.session.prepareSync(syncResponse, lock, txn, log) => {newKeysByRoom} - // purpose: add any rooms with new keys but no sync response to the list of rooms to be synced - - await Promise.all(updates.map(async (roomResponse) => { - let storedRoom = this.session.rooms.get(roomResponse.room_id); - if (!storedRoom) { - storedRoom = this.session.createRoom(roomResponse.room_id); - } else { - // if previously joined and we still have the timeline for it, - // this loads the syncWriter at the correct position to continue writing the timeline - await storedRoom.load(null, prepareTxn, this.logger); - } - return storedRoom.prepareSync( - rs.roomResponse, rs.membership, rs.invite, newKeys, prepareTxn, log) - })); - // This is needed for safari to not throw TransactionInactiveErrors on the syncTxn. See docs/INDEXEDDB.md - await prepareTxn.complete(); - await Promise.all(updates.map(rs => { - return rs.room.afterPrepareSync(rs.preparation, log); - })); */ - } - - stop() { - if (this.status.get() === SyncStatus.Stopped) { - return; - } - this.status.set(SyncStatus.Stopped); - if (this.currentRequest) { - this.currentRequest.abort(); - this.currentRequest = null; - } - } - - // storage locking shenanigans here - - openPrepareSyncTxn() { + private openSyncTxn() { const storeNames = this.storage.storeNames; - return this.storage.readTxn([ + return this.storage.readWriteTxn([ + storeNames.session, + storeNames.roomSummary, + storeNames.archivedRoomSummary, + storeNames.invites, + storeNames.roomState, + storeNames.roomMembers, + storeNames.timelineEvents, + storeNames.timelineRelations, + storeNames.timelineFragments, + storeNames.pendingEvents, + storeNames.userIdentities, + storeNames.groupSessionDecryptions, + storeNames.deviceIdentities, + // to discard outbound session when somebody leaves a room + // and to create room key messages when somebody joins + storeNames.outboundGroupSessions, + storeNames.operations, + storeNames.accountData, + // to decrypt and store new room keys storeNames.olmSessions, storeNames.inboundGroupSessions, - // to read fragments when loading sync writer when rejoining archived room - storeNames.timelineFragments, - // to read fragments when loading sync writer when rejoining archived room - // to read events that can now be decrypted - storeNames.timelineEvents, ]); } } -const sleep = (ms) => { +const sleep = (ms: number) => { return new Promise((resolve) => setTimeout(resolve, ms)); }; @@ -430,7 +430,7 @@ const sleep = (ms) => { // a b c d e f // a b c d _ f // e a b c d f <--- c=3 is wrong as we are not tracking it, ergo we need to see if `i` is in range else drop it -const indexInRange = (ranges, i) => { +const indexInRange = (ranges: number[][], i: number) => { let isInRange = false; ranges.forEach((r) => { if (r[0] <= i && i <= r[1]) { diff --git a/src/matrix/room/Room.js b/src/matrix/room/Room.js index aaf66be1..0e4d488d 100644 --- a/src/matrix/room/Room.js +++ b/src/matrix/room/Room.js @@ -54,6 +54,7 @@ export class Room extends BaseRoom { return false; } + // {}, string, bool?, [], txn, log async prepareSync(roomResponse, membership, invite, newKeys, txn, log) { log.set("id", this.id); if (newKeys) { diff --git a/src/placeholder-rooms.html b/src/placeholder-rooms.html index 909d9108..83ade20d 100644 --- a/src/placeholder-rooms.html +++ b/src/placeholder-rooms.html @@ -15,17 +15,58 @@