From 9d171682da300d44f7324de4408a873d7afa3fb4 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Wed, 8 Dec 2021 11:43:56 +0000 Subject: [PATCH] Implement room subscriptions (with caveats) When a room is clicked on then a room subscription is made. `Sync3.ts` handles unsubscribing from old rooms. Caveats: - currently we don't read the `room_subscription` response. - currently the hook for which room is visible doesn't honour the default room on refresh. - lacks unit tests. --- .../session/leftpanel/LeftPanelViewModel.js | 5 + src/matrix/Sync3.ts | 150 ++++++++++++++++-- 2 files changed, 145 insertions(+), 10 deletions(-) diff --git a/src/domain/session/leftpanel/LeftPanelViewModel.js b/src/domain/session/leftpanel/LeftPanelViewModel.js index 86040da8..124547d7 100644 --- a/src/domain/session/leftpanel/LeftPanelViewModel.js +++ b/src/domain/session/leftpanel/LeftPanelViewModel.js @@ -41,6 +41,10 @@ export class LeftPanelViewModel extends ViewModel { this._compareFn = compareFn; } + _subscribeToRoom(roomId) { + this._sync.setRoomSubscriptions([roomId]); + } + _mapTileViewModels(list) { const mapper = (roomOrInvite, emitChange) => { let vm; @@ -117,6 +121,7 @@ export class LeftPanelViewModel extends ViewModel { } } if (targetVM) { + this._subscribeToRoom(roomId); this._currentTileVM = targetVM; this._currentTileVM?.open(); } diff --git a/src/matrix/Sync3.ts b/src/matrix/Sync3.ts index c1ae4d6d..6f97b955 100644 --- a/src/matrix/Sync3.ts +++ b/src/matrix/Sync3.ts @@ -32,8 +32,27 @@ import { ILogger } from "../logging/types"; * repeat content in that link. */ +// the state events which will be returned for every visible entry in the left panel room list +const ROOM_LIST_STATE_EVENTS = [ + ["m.room.avatar", ""], // don't need m.room.name etc as server calculates this for us +]; +// the number of timeline events to get for every visible room in the left panel room list +const ROOM_LIST_TIMELINE_LIMIT = 1; +// the state events which will be returned for every currently visible room on the central panel +const ROOM_SUB_STATE_EVENTS = [ + ["m.room.avatar", ""], + ["m.room.topic", ""], + ["m.room.join_rules", ""], + ["m.room.history_visibility", ""], + ["m.room.power_levels", ""], + ["m.room.create", ""], // TODO: Does H need this? + ["m.room.encrypted", ""], +]; +// the number of timeline events to get for every currently visible room timeline +const ROOM_SUB_TIMELINE_LIMIT = 50; + // sync v2 code has this so we probably want something similar, though v3 has no concept of a catchup -// sync hence it isn't here. +// sync hence it isn't here. Nothing in Hydrogen seems to use CatchupSync apart from for logging. export enum SyncStatus { InitialSync, // valid: on startup until the first response. Request has no ?pos= Syncing, // valid: after the first response. Requests have a ?pos= @@ -62,6 +81,7 @@ interface Sync3RequestBody { session_id: string; lists: Sync3List[]; room_subscriptions?: RoomSubscriptionsRequest; + unsubscribe_rooms?: string[]; }; // Response types are below: See api.md for what these fields mean. @@ -96,6 +116,8 @@ interface RoomResponse { highlight_count: number; }; +// Some internal data structure types + type IndexToRoomId = { [index: number]: string; }; @@ -110,13 +132,23 @@ export class Sync3 { private storage: Storage; private currentRequest?: HomeServerRequest; - // sync v3 specific: contains the sliding window ranges to request + // sync v3 specific: contains the sliding window ranges to request as well as the data structures + // to remember the indexes for each room. private ranges: number[][]; private roomIndexToRoomId: IndexToRoomId; private roomIdToRoomIndex: RoomIdToIndex; private totalRooms: number; + // Tracks room subscriptions (array of room IDs). + // Current = ones confirmed with the server. + // Next = triggered by Hydrogen but not yet confirmed with the server. + private currentRoomSubscriptions: string[]; + private nextRoomSubscriptions: string[]; + + // Sync v2 has this; unsure how it should be used correctly, maybe remove it? public error?: any; + + // Sync v2 has this; seems to be used to ensure that the first sync is done before loading the app. public status: ObservableValue; // same params as sync v2 @@ -133,8 +165,32 @@ export class Sync3 { this.roomIndexToRoomId = {}; this.roomIdToRoomIndex = {}; this.totalRooms = 0; + this.currentRoomSubscriptions = []; + this.nextRoomSubscriptions = []; } + /** + * Set the room subscriptions for sync v3. This must be the entire set of room subscriptions + * (not a delta). Replaces all previous subscriptions. + * + * Any room the user is currently viewing needs to have a subscription so you see things like + * the topic/join rules/other state events you don't need to see/retrieve on the room list. + * @param roomIds The new room subscriptions. An array to support grid view. + */ + setRoomSubscriptions(roomIds: string[]) { + this.nextRoomSubscriptions = roomIds; + // interrupt the current request so we can update the subscriptions + this.currentRequest?.abort(); + } + + /** + * Load a new sliding window range for sync v3. + * + * This range should be the part of the room list the user is currently looking at. No index + * padding will be performed (e.g viewing 10-20 so request 5-25). + * @param start The start index (inclusive) + * @param end The end index (inclusive) + */ loadRange(start, end) { let range = [start, end]; if (end < this.ranges[0][1]) { @@ -157,7 +213,7 @@ export class Sync3 { } this.ranges[1][0] = start; this.ranges[1][1] = end; - console.log("new ranges: ", JSON.stringify(this.ranges), this.roomIndexToRoomId); + console.log("new sync v3 ranges: ", JSON.stringify(this.ranges)); // interrupt the sync request to send up the new ranges this.currentRequest?.abort(); } @@ -184,10 +240,20 @@ export class Sync3 { } } + /** + * Get the number of joined rooms for this user. + * @returns The total number of joined rooms for this user. + */ count(): number { + // TODO: We may want this to be an ObservableValue? In practice that hasn't been necessary yet. return this.totalRooms; } + /** + * Map this room ID to an index position in the room list. Not all rooms will have an index. + * @param roomId The room to find the index of + * @returns The index or -1 if there is no index. + */ indexOfRoom(roomId: string): number { const index = this.roomIdToRoomIndex[roomId]; if (index === undefined) { @@ -196,6 +262,11 @@ export class Sync3 { return index; } + /** + * Map this index in the room list to a room ID. + * @param index The index position + * @returns The room ID or null if there is no room at this index. + */ roomAtIndex(index: number): string | null { const roomID = this.roomIndexToRoomId[index]; if (roomID === undefined) { @@ -204,7 +275,7 @@ export class Sync3 { return roomID; } - // TODO REMOVE + // TODO REMOVE BECAUSE THIS WAS A HACK BACK WHEN WE MANUALLY SORTED CLIENT-SIDE compare(roomIdA: string, roomIdB: string): number { if (roomIdA === roomIdB) { return 0; @@ -227,12 +298,14 @@ export class Sync3 { } // The purpose of this function is to do repeated /sync calls and call processResponse. It doesn't - // know or care how to handle the response, it only cares about the position and retries. + // know or care how to handle the response, it only cares about room subs, the position and retries. private async syncLoop(pos?: number) { // In sync v2 a user can have many devices and each device has a single access token. // In sync v3 it's the same but IN ADDITION a single device can have many sessions. // This exists to fix to-device msgs being deleted prematurely caused by implicit ACKs. // Ergo, we need to specify a session ID when we start and provide it on all our requests. + // TODO: Really this ID should be stored in indexeddb so we don't make a new session + // every time we refresh the app, so can make use of persistence more. const sessionId = new Date().getTime() + ""; // Set this too low and we'll do many more needless sync requests which consume bandwidth when there's no traffic. @@ -251,26 +324,39 @@ export class Sync3 { // add in sticky params, these are set once (initially) and then can be omitted and // the server will remember them (hence 'sticky'). list.sort = ["by_highlight_count", "by_notification_count", "by_recency"]; - list.timeline_limit = 20; - list.required_state = [ - ["m.room.avatar", ""], - ] + list.timeline_limit = ROOM_LIST_TIMELINE_LIMIT; + list.required_state = ROOM_LIST_STATE_EVENTS; } - const requestBody: Sync3RequestBody = { + let requestBody: Sync3RequestBody = { session_id: sessionId, lists: [list], }; + if (this.nextRoomSubscriptions.length > 0) { + // we may have been interruped before to update the room subscriptions, if this array + // is populated then work out the subscription delta and set it on the request + requestBody = setRoomSubscriptionDelta(requestBody, { + required_state: ROOM_SUB_STATE_EVENTS, + timeline_limit: ROOM_SUB_TIMELINE_LIMIT, + }, this.nextRoomSubscriptions, this.currentRoomSubscriptions); + } try { await sleep(100); let resp: Sync3Response; this.currentRequest = await this.hsApi.sync3(requestBody, pos); resp = await this.currentRequest.response(); backoffCounter = 0; + // regardless of whether we process the sync response without error, update the room subs + // to reflect the new reality. + if (this.nextRoomSubscriptions.length > 0) { + this.currentRoomSubscriptions = this.nextRoomSubscriptions; + } // we have to wait for some parts of the response to be saved to disk before we can go on // hence the await. await this.processResponse(isFirstSync, resp); // increment our position to tell the server we got everything, similar to using ?since= in v2 pos = resp.pos; + // reset the room subs so we don't send up subscriptions again (they are sticky) + this.nextRoomSubscriptions = []; if (isFirstSync) { this.status.set(SyncStatus.Syncing); } @@ -475,6 +561,50 @@ export class Sync3 { } } +const setRoomSubscriptionDelta = (requestBody: Sync3RequestBody, subData: RoomSubscriptionRequest, next: string[], current: string[]): Sync3RequestBody => { + // find distinct and overlapping room IDs like so: + // next + // .-----------. + // A B C D E F G H I + // `-----------` + // current + // + // new subscriptions => A,B,C + // delete subscriptions => G,H,I + // no-op subscriptions (still subscribed) => D,E,F + const allSet = new Set(); + const nextSet = new Set(); + next.forEach((r) => { + allSet.add(r); + nextSet.add(r); + }); + const currSet = new Set(); + current.forEach((r) => { + allSet.add(r); + currSet.add(r); + }); + requestBody.room_subscriptions = {}; + requestBody.unsubscribe_rooms = []; + for (let roomId of allSet) { + if (nextSet.has(roomId)) { + if (currSet.has(roomId)) { + // no-op subscription + } else { + // exists in next but not current, new subscription + requestBody.room_subscriptions[roomId] = subData; + } + } else { + if (currSet.has(roomId)) { + // doesn't exist in next, existed in current, delete subscription + requestBody.unsubscribe_rooms.push(roomId); + } + // shouldn't be possible for something in allSet to not exist in either nextSet/currSet + } + } + + return requestBody; +} + const sleep = (ms: number) => { return new Promise((resolve) => setTimeout(resolve, ms)); };