Implement room subscriptions (with caveats)

When a room is clicked on then a room subscription is made.
`Sync3.ts` handles unsubscribing from old rooms. Caveats:

- currently we don't read the `room_subscription` response.
- currently the hook for which room is visible doesn't honour the default room on refresh.
- lacks unit tests.
This commit is contained in:
Kegan Dougal 2021-12-08 11:43:56 +00:00
parent 7d35e861e3
commit 9d171682da
2 changed files with 145 additions and 10 deletions

View file

@ -41,6 +41,10 @@ export class LeftPanelViewModel extends ViewModel {
this._compareFn = compareFn; this._compareFn = compareFn;
} }
_subscribeToRoom(roomId) {
this._sync.setRoomSubscriptions([roomId]);
}
_mapTileViewModels(list) { _mapTileViewModels(list) {
const mapper = (roomOrInvite, emitChange) => { const mapper = (roomOrInvite, emitChange) => {
let vm; let vm;
@ -117,6 +121,7 @@ export class LeftPanelViewModel extends ViewModel {
} }
} }
if (targetVM) { if (targetVM) {
this._subscribeToRoom(roomId);
this._currentTileVM = targetVM; this._currentTileVM = targetVM;
this._currentTileVM?.open(); this._currentTileVM?.open();
} }

View file

@ -32,8 +32,27 @@ import { ILogger } from "../logging/types";
* repeat content in that link. * repeat content in that link.
*/ */
// the state events which will be returned for every visible entry in the left panel room list
const ROOM_LIST_STATE_EVENTS = [
["m.room.avatar", ""], // don't need m.room.name etc as server calculates this for us
];
// the number of timeline events to get for every visible room in the left panel room list
const ROOM_LIST_TIMELINE_LIMIT = 1;
// the state events which will be returned for every currently visible room on the central panel
const ROOM_SUB_STATE_EVENTS = [
["m.room.avatar", ""],
["m.room.topic", ""],
["m.room.join_rules", ""],
["m.room.history_visibility", ""],
["m.room.power_levels", ""],
["m.room.create", ""], // TODO: Does H need this?
["m.room.encrypted", ""],
];
// the number of timeline events to get for every currently visible room timeline
const ROOM_SUB_TIMELINE_LIMIT = 50;
// sync v2 code has this so we probably want something similar, though v3 has no concept of a catchup // sync v2 code has this so we probably want something similar, though v3 has no concept of a catchup
// sync hence it isn't here. // sync hence it isn't here. Nothing in Hydrogen seems to use CatchupSync apart from for logging.
export enum SyncStatus { export enum SyncStatus {
InitialSync, // valid: on startup until the first response. Request has no ?pos= InitialSync, // valid: on startup until the first response. Request has no ?pos=
Syncing, // valid: after the first response. Requests have a ?pos= Syncing, // valid: after the first response. Requests have a ?pos=
@ -62,6 +81,7 @@ interface Sync3RequestBody {
session_id: string; session_id: string;
lists: Sync3List[]; lists: Sync3List[];
room_subscriptions?: RoomSubscriptionsRequest; room_subscriptions?: RoomSubscriptionsRequest;
unsubscribe_rooms?: string[];
}; };
// Response types are below: See api.md for what these fields mean. // Response types are below: See api.md for what these fields mean.
@ -96,6 +116,8 @@ interface RoomResponse {
highlight_count: number; highlight_count: number;
}; };
// Some internal data structure types
type IndexToRoomId = { type IndexToRoomId = {
[index: number]: string; [index: number]: string;
}; };
@ -110,13 +132,23 @@ export class Sync3 {
private storage: Storage; private storage: Storage;
private currentRequest?: HomeServerRequest; private currentRequest?: HomeServerRequest;
// sync v3 specific: contains the sliding window ranges to request // sync v3 specific: contains the sliding window ranges to request as well as the data structures
// to remember the indexes for each room.
private ranges: number[][]; private ranges: number[][];
private roomIndexToRoomId: IndexToRoomId; private roomIndexToRoomId: IndexToRoomId;
private roomIdToRoomIndex: RoomIdToIndex; private roomIdToRoomIndex: RoomIdToIndex;
private totalRooms: number; private totalRooms: number;
// Tracks room subscriptions (array of room IDs).
// Current = ones confirmed with the server.
// Next = triggered by Hydrogen but not yet confirmed with the server.
private currentRoomSubscriptions: string[];
private nextRoomSubscriptions: string[];
// Sync v2 has this; unsure how it should be used correctly, maybe remove it?
public error?: any; public error?: any;
// Sync v2 has this; seems to be used to ensure that the first sync is done before loading the app.
public status: ObservableValue<SyncStatus>; public status: ObservableValue<SyncStatus>;
// same params as sync v2 // same params as sync v2
@ -133,8 +165,32 @@ export class Sync3 {
this.roomIndexToRoomId = {}; this.roomIndexToRoomId = {};
this.roomIdToRoomIndex = {}; this.roomIdToRoomIndex = {};
this.totalRooms = 0; this.totalRooms = 0;
this.currentRoomSubscriptions = [];
this.nextRoomSubscriptions = [];
} }
/**
* Set the room subscriptions for sync v3. This must be the entire set of room subscriptions
* (not a delta). Replaces all previous subscriptions.
*
* Any room the user is currently viewing needs to have a subscription so you see things like
* the topic/join rules/other state events you don't need to see/retrieve on the room list.
* @param roomIds The new room subscriptions. An array to support grid view.
*/
setRoomSubscriptions(roomIds: string[]) {
this.nextRoomSubscriptions = roomIds;
// interrupt the current request so we can update the subscriptions
this.currentRequest?.abort();
}
/**
* Load a new sliding window range for sync v3.
*
* This range should be the part of the room list the user is currently looking at. No index
* padding will be performed (e.g viewing 10-20 so request 5-25).
* @param start The start index (inclusive)
* @param end The end index (inclusive)
*/
loadRange(start, end) { loadRange(start, end) {
let range = [start, end]; let range = [start, end];
if (end < this.ranges[0][1]) { if (end < this.ranges[0][1]) {
@ -157,7 +213,7 @@ export class Sync3 {
} }
this.ranges[1][0] = start; this.ranges[1][0] = start;
this.ranges[1][1] = end; this.ranges[1][1] = end;
console.log("new ranges: ", JSON.stringify(this.ranges), this.roomIndexToRoomId); console.log("new sync v3 ranges: ", JSON.stringify(this.ranges));
// interrupt the sync request to send up the new ranges // interrupt the sync request to send up the new ranges
this.currentRequest?.abort(); this.currentRequest?.abort();
} }
@ -184,10 +240,20 @@ export class Sync3 {
} }
} }
/**
* Get the number of joined rooms for this user.
* @returns The total number of joined rooms for this user.
*/
count(): number { count(): number {
// TODO: We may want this to be an ObservableValue? In practice that hasn't been necessary yet.
return this.totalRooms; return this.totalRooms;
} }
/**
* Map this room ID to an index position in the room list. Not all rooms will have an index.
* @param roomId The room to find the index of
* @returns The index or -1 if there is no index.
*/
indexOfRoom(roomId: string): number { indexOfRoom(roomId: string): number {
const index = this.roomIdToRoomIndex[roomId]; const index = this.roomIdToRoomIndex[roomId];
if (index === undefined) { if (index === undefined) {
@ -196,6 +262,11 @@ export class Sync3 {
return index; return index;
} }
/**
* Map this index in the room list to a room ID.
* @param index The index position
* @returns The room ID or null if there is no room at this index.
*/
roomAtIndex(index: number): string | null { roomAtIndex(index: number): string | null {
const roomID = this.roomIndexToRoomId[index]; const roomID = this.roomIndexToRoomId[index];
if (roomID === undefined) { if (roomID === undefined) {
@ -204,7 +275,7 @@ export class Sync3 {
return roomID; return roomID;
} }
// TODO REMOVE // TODO REMOVE BECAUSE THIS WAS A HACK BACK WHEN WE MANUALLY SORTED CLIENT-SIDE
compare(roomIdA: string, roomIdB: string): number { compare(roomIdA: string, roomIdB: string): number {
if (roomIdA === roomIdB) { if (roomIdA === roomIdB) {
return 0; return 0;
@ -227,12 +298,14 @@ export class Sync3 {
} }
// The purpose of this function is to do repeated /sync calls and call processResponse. It doesn't // The purpose of this function is to do repeated /sync calls and call processResponse. It doesn't
// know or care how to handle the response, it only cares about the position and retries. // know or care how to handle the response, it only cares about room subs, the position and retries.
private async syncLoop(pos?: number) { private async syncLoop(pos?: number) {
// In sync v2 a user can have many devices and each device has a single access token. // In sync v2 a user can have many devices and each device has a single access token.
// In sync v3 it's the same but IN ADDITION a single device can have many sessions. // In sync v3 it's the same but IN ADDITION a single device can have many sessions.
// This exists to fix to-device msgs being deleted prematurely caused by implicit ACKs. // This exists to fix to-device msgs being deleted prematurely caused by implicit ACKs.
// Ergo, we need to specify a session ID when we start and provide it on all our requests. // Ergo, we need to specify a session ID when we start and provide it on all our requests.
// TODO: Really this ID should be stored in indexeddb so we don't make a new session
// every time we refresh the app, so can make use of persistence more.
const sessionId = new Date().getTime() + ""; const sessionId = new Date().getTime() + "";
// Set this too low and we'll do many more needless sync requests which consume bandwidth when there's no traffic. // Set this too low and we'll do many more needless sync requests which consume bandwidth when there's no traffic.
@ -251,26 +324,39 @@ export class Sync3 {
// add in sticky params, these are set once (initially) and then can be omitted and // add in sticky params, these are set once (initially) and then can be omitted and
// the server will remember them (hence 'sticky'). // the server will remember them (hence 'sticky').
list.sort = ["by_highlight_count", "by_notification_count", "by_recency"]; list.sort = ["by_highlight_count", "by_notification_count", "by_recency"];
list.timeline_limit = 20; list.timeline_limit = ROOM_LIST_TIMELINE_LIMIT;
list.required_state = [ list.required_state = ROOM_LIST_STATE_EVENTS;
["m.room.avatar", ""],
]
} }
const requestBody: Sync3RequestBody = { let requestBody: Sync3RequestBody = {
session_id: sessionId, session_id: sessionId,
lists: [list], lists: [list],
}; };
if (this.nextRoomSubscriptions.length > 0) {
// we may have been interruped before to update the room subscriptions, if this array
// is populated then work out the subscription delta and set it on the request
requestBody = setRoomSubscriptionDelta(requestBody, {
required_state: ROOM_SUB_STATE_EVENTS,
timeline_limit: ROOM_SUB_TIMELINE_LIMIT,
}, this.nextRoomSubscriptions, this.currentRoomSubscriptions);
}
try { try {
await sleep(100); await sleep(100);
let resp: Sync3Response; let resp: Sync3Response;
this.currentRequest = await this.hsApi.sync3(requestBody, pos); this.currentRequest = await this.hsApi.sync3(requestBody, pos);
resp = await this.currentRequest.response(); resp = await this.currentRequest.response();
backoffCounter = 0; backoffCounter = 0;
// regardless of whether we process the sync response without error, update the room subs
// to reflect the new reality.
if (this.nextRoomSubscriptions.length > 0) {
this.currentRoomSubscriptions = this.nextRoomSubscriptions;
}
// we have to wait for some parts of the response to be saved to disk before we can go on // we have to wait for some parts of the response to be saved to disk before we can go on
// hence the await. // hence the await.
await this.processResponse(isFirstSync, resp); await this.processResponse(isFirstSync, resp);
// increment our position to tell the server we got everything, similar to using ?since= in v2 // increment our position to tell the server we got everything, similar to using ?since= in v2
pos = resp.pos; pos = resp.pos;
// reset the room subs so we don't send up subscriptions again (they are sticky)
this.nextRoomSubscriptions = [];
if (isFirstSync) { if (isFirstSync) {
this.status.set(SyncStatus.Syncing); this.status.set(SyncStatus.Syncing);
} }
@ -475,6 +561,50 @@ export class Sync3 {
} }
} }
const setRoomSubscriptionDelta = (requestBody: Sync3RequestBody, subData: RoomSubscriptionRequest, next: string[], current: string[]): Sync3RequestBody => {
// find distinct and overlapping room IDs like so:
// next
// .-----------.
// A B C D E F G H I
// `-----------`
// current
//
// new subscriptions => A,B,C
// delete subscriptions => G,H,I
// no-op subscriptions (still subscribed) => D,E,F
const allSet = new Set<string>();
const nextSet = new Set<string>();
next.forEach((r) => {
allSet.add(r);
nextSet.add(r);
});
const currSet = new Set<string>();
current.forEach((r) => {
allSet.add(r);
currSet.add(r);
});
requestBody.room_subscriptions = {};
requestBody.unsubscribe_rooms = [];
for (let roomId of allSet) {
if (nextSet.has(roomId)) {
if (currSet.has(roomId)) {
// no-op subscription
} else {
// exists in next but not current, new subscription
requestBody.room_subscriptions[roomId] = subData;
}
} else {
if (currSet.has(roomId)) {
// doesn't exist in next, existed in current, delete subscription
requestBody.unsubscribe_rooms.push(roomId);
}
// shouldn't be possible for something in allSet to not exist in either nextSet/currSet
}
}
return requestBody;
}
const sleep = (ms: number) => { const sleep = (ms: number) => {
return new Promise((resolve) => setTimeout(resolve, ms)); return new Promise((resolve) => setTimeout(resolve, ms));
}; };