From a50ea7e77b84dbe955bf4dc062d0a098924e81d4 Mon Sep 17 00:00:00 2001 From: Bruno Windels <274386+bwindels@users.noreply.github.com> Date: Thu, 12 May 2022 17:27:03 +0200 Subject: [PATCH] add support for observing room state for single room + initial state --- src/lib.ts | 2 +- src/matrix/Session.js | 2 +- src/matrix/calls/CallHandler.ts | 2 +- src/matrix/room/BaseRoom.js | 26 +++++++++ src/matrix/room/Room.js | 12 +++- src/matrix/room/common.ts | 3 - .../room/state/ObservedStateKeyValue.ts | 55 +++++++++++++++++++ src/matrix/room/state/ObservedStateTypeMap.ts | 53 ++++++++++++++++++ .../{ => room/state}/RoomStateHandlerSet.ts | 14 ++--- src/matrix/room/state/types.ts | 38 +++++++++++++ 10 files changed, 192 insertions(+), 15 deletions(-) create mode 100644 src/matrix/room/state/ObservedStateKeyValue.ts create mode 100644 src/matrix/room/state/ObservedStateTypeMap.ts rename src/matrix/{ => room/state}/RoomStateHandlerSet.ts (75%) create mode 100644 src/matrix/room/state/types.ts diff --git a/src/lib.ts b/src/lib.ts index 8c5c4e8e..839fbb15 100644 --- a/src/lib.ts +++ b/src/lib.ts @@ -22,7 +22,7 @@ export {Platform} from "./platform/web/Platform.js"; export {Client, LoadStatus} from "./matrix/Client.js"; export {RoomStatus} from "./matrix/room/common"; // export everything needed to observe state events on all rooms using session.observeRoomState -export type {RoomStateHandler} from "./matrix/room/common"; +export type {RoomStateHandler} from "./matrix/room/state/types"; export type {MemberChange} from "./matrix/room/members/RoomMember"; export type {Transaction} from "./matrix/storage/idb/Transaction"; export type {Room} from "./matrix/room/Room"; diff --git a/src/matrix/Session.js b/src/matrix/Session.js index 6211c456..cd676fc5 100644 --- a/src/matrix/Session.js +++ b/src/matrix/Session.js @@ -48,7 +48,7 @@ import {SecretStorage} from "./ssss/SecretStorage"; import {ObservableValue} from "../observable/value/ObservableValue"; import {RetainedObservableValue} from "../observable/value/RetainedObservableValue"; import {CallHandler} from "./calls/CallHandler"; -import {RoomStateHandlerSet} from "./RoomStateHandlerSet"; +import {RoomStateHandlerSet} from "./room/state/RoomStateHandlerSet"; const PICKLE_KEY = "DEFAULT_KEY"; const PUSHER_KEY = "pusher"; diff --git a/src/matrix/calls/CallHandler.ts b/src/matrix/calls/CallHandler.ts index 2386076b..e585bb40 100644 --- a/src/matrix/calls/CallHandler.ts +++ b/src/matrix/calls/CallHandler.ts @@ -35,7 +35,7 @@ import type {Options as GroupCallOptions} from "./group/GroupCall"; import type {Transaction} from "../storage/idb/Transaction"; import type {CallEntry} from "../storage/idb/stores/CallStore"; import type {Clock} from "../../platform/web/dom/Clock"; -import type {RoomStateHandler} from "../room/common"; +import type {RoomStateHandler} from "../room/state/types"; export type Options = Omit & { clock: Clock diff --git a/src/matrix/room/BaseRoom.js b/src/matrix/room/BaseRoom.js index cc65a320..b8f172d0 100644 --- a/src/matrix/room/BaseRoom.js +++ b/src/matrix/room/BaseRoom.js @@ -31,6 +31,8 @@ import {ensureLogItem} from "../../logging/utils"; import {PowerLevels} from "./PowerLevels.js"; import {RetainedObservableValue} from "../../observable/value/RetainedObservableValue"; import {TimelineReader} from "./timeline/persistence/TimelineReader"; +import {ObservedStateTypeMap} from "./state/ObservedStateTypeMap"; +import {ObservedStateKeyValue} from "./state/ObservedStateKeyValue"; const EVENT_ENCRYPTED_TYPE = "m.room.encrypted"; @@ -53,11 +55,35 @@ export class BaseRoom extends EventEmitter { this._getSyncToken = getSyncToken; this._platform = platform; this._observedEvents = null; + this._roomStateObservers = new Set(); this._powerLevels = null; this._powerLevelLoading = null; this._observedMembers = null; } + async observeStateType(type, txn = undefined) { + const map = new ObservedStateTypeMap(type); + await this._addStateObserver(map, txn); + return map; + } + + async observeStateTypeAndKey(type, stateKey, txn = undefined) { + const value = new ObservedStateKeyValue(type, stateKey); + await this._addStateObserver(value, txn); + return value; + } + + async _addStateObserver(stateObserver, txn) { + if (!txn) { + txn = await this._storage.readTxn([this._storage.storeNames.roomState]); + } + await stateObserver.load(this.id, txn); + this._roomStateObservers.add(stateObserver); + stateObserver.setRemoveCallback(() => { + this._roomStateObservers.delete(stateObserver); + }); + } + async _eventIdsToEntries(eventIds, txn) { const retryEntries = []; await Promise.all(eventIds.map(async eventId => { diff --git a/src/matrix/room/Room.js b/src/matrix/room/Room.js index a2eadfeb..796474d3 100644 --- a/src/matrix/room/Room.js +++ b/src/matrix/room/Room.js @@ -182,6 +182,7 @@ export class Room extends BaseRoom { const powerLevelsEvent = this._getPowerLevelsEvent(roomResponse); this._runRoomStateHandlers(roomResponse, txn, log); return { + roomResponse, summaryChanges, roomEncryption, newEntries, @@ -204,7 +205,7 @@ export class Room extends BaseRoom { const { summaryChanges, newEntries, updatedEntries, newLiveKey, removedPendingEvents, memberChanges, powerLevelsEvent, - heroChanges, roomEncryption + heroChanges, roomEncryption, roomResponse } = changes; log.set("id", this.id); this._syncWriter.afterSync(newLiveKey); @@ -264,6 +265,7 @@ export class Room extends BaseRoom { if (removedPendingEvents) { this._sendQueue.emitRemovals(removedPendingEvents); } + this._emitSyncRoomState(roomResponse); } _updateObservedMembers(memberChanges) { @@ -457,8 +459,14 @@ export class Room extends BaseRoom { this._roomStateHandler.handleRoomState(this, event, txn, log); }); } + + /** local room state observers, run during after sync step */ + _emitSyncRoomState(roomResponse) { + iterateResponseStateEvents(roomResponse, event => { + for (const handler of this._roomStateObservers) { + handler.handleStateEvent(event); } - } + }); } /** @package */ diff --git a/src/matrix/room/common.ts b/src/matrix/room/common.ts index 38070925..7556cfb0 100644 --- a/src/matrix/room/common.ts +++ b/src/matrix/room/common.ts @@ -47,9 +47,6 @@ export enum RoomType { Public } -export interface RoomStateHandler { - handleRoomState(room: Room, stateEvent: StateEvent, txn: Transaction, log: ILogItem); - updateRoomMembers(room: Room, memberChanges: Map); type RoomResponse = { state?: { events?: Array diff --git a/src/matrix/room/state/ObservedStateKeyValue.ts b/src/matrix/room/state/ObservedStateKeyValue.ts new file mode 100644 index 00000000..41cc3c7b --- /dev/null +++ b/src/matrix/room/state/ObservedStateKeyValue.ts @@ -0,0 +1,55 @@ +/* +Copyright 2022 The Matrix.org Foundation C.I.C. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +import type {StateObserver} from "./types"; +import type {StateEvent} from "../../storage/types"; +import type {Transaction} from "../../storage/idb/Transaction"; +import {BaseObservableValue} from "../../../observable/value/BaseObservableValue"; + +/** + * Observable value for a state event with a given type and state key. + * Unsubscribes when last subscription is removed */ +export class ObservedStateKeyValue extends BaseObservableValue implements StateObserver { + private event?: StateEvent; + private removeCallback?: () => void; + + constructor(private readonly type: string, private readonly stateKey: string) { + super(); + } + /** @internal */ + async load(roomId: string, txn: Transaction): Promise { + this.event = (await txn.roomState.get(roomId, this.type, this.stateKey))?.event; + } + /** @internal */ + handleStateEvent(event: StateEvent) { + if (event.type === this.type && event.state_key === this.stateKey) { + this.event = event; + this.emit(this.get()); + } + } + + get(): StateEvent | undefined { + return this.event; + } + + setRemoveCallback(callback: () => void) { + this.removeCallback = callback; + } + + onUnsubscribeLast() { + this.removeCallback?.(); + } +} diff --git a/src/matrix/room/state/ObservedStateTypeMap.ts b/src/matrix/room/state/ObservedStateTypeMap.ts new file mode 100644 index 00000000..e8fa6f7b --- /dev/null +++ b/src/matrix/room/state/ObservedStateTypeMap.ts @@ -0,0 +1,53 @@ +/* +Copyright 2022 The Matrix.org Foundation C.I.C. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +import type {StateObserver} from "./types"; +import type {StateEvent} from "../../storage/types"; +import type {Transaction} from "../../storage/idb/Transaction"; +import {ObservableMap} from "../../../observable/map/ObservableMap"; + +/** + * Observable map for a given type with state keys as map keys. + * Unsubscribes when last subscription is removed */ +export class ObservedStateTypeMap extends ObservableMap implements StateObserver { + private removeCallback?: () => void; + + constructor(private readonly type: string) { + super(); + } + /** @internal */ + async load(roomId: string, txn: Transaction): Promise { + const events = await txn.roomState.getAllForType(roomId, this.type); + for (let i = 0; i < events.length; ++i) { + const {event} = events[i]; + this.add(event.state_key, event); + } + } + /** @internal */ + handleStateEvent(event: StateEvent) { + if (event.type === this.type) { + this.set(event.state_key, event); + } + } + + setRemoveCallback(callback: () => void) { + this.removeCallback = callback; + } + + onUnsubscribeLast() { + this.removeCallback?.(); + } +} diff --git a/src/matrix/RoomStateHandlerSet.ts b/src/matrix/room/state/RoomStateHandlerSet.ts similarity index 75% rename from src/matrix/RoomStateHandlerSet.ts rename to src/matrix/room/state/RoomStateHandlerSet.ts index cf202097..986cb0f9 100644 --- a/src/matrix/RoomStateHandlerSet.ts +++ b/src/matrix/room/state/RoomStateHandlerSet.ts @@ -14,13 +14,13 @@ See the License for the specific language governing permissions and limitations under the License. */ -import type {ILogItem} from "../logging/types"; -import type {StateEvent} from "./storage/types"; -import type {Transaction} from "./storage/idb/Transaction"; -import type {Room} from "./room/Room"; -import type {MemberChange} from "./room/members/RoomMember"; -import type {RoomStateHandler} from "./room/common"; -import {BaseObservable} from "../observable/BaseObservable"; +import type {ILogItem} from "../../../logging/types"; +import type {StateEvent} from "../../storage/types"; +import type {Transaction} from "../../storage/idb/Transaction"; +import type {Room} from "../Room"; +import type {MemberChange} from "../members/RoomMember"; +import type {RoomStateHandler} from "./types"; +import {BaseObservable} from "../../../observable/BaseObservable"; /** keeps track of all handlers registered with Session.observeRoomState */ export class RoomStateHandlerSet extends BaseObservable implements RoomStateHandler { diff --git a/src/matrix/room/state/types.ts b/src/matrix/room/state/types.ts new file mode 100644 index 00000000..ef99c727 --- /dev/null +++ b/src/matrix/room/state/types.ts @@ -0,0 +1,38 @@ +/* +Copyright 2022 The Matrix.org Foundation C.I.C. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +import type {Room} from "../Room"; +import type {StateEvent} from "../../storage/types"; +import type {Transaction} from "../../storage/idb/Transaction"; +import type {ILogItem} from "../../../logging/types"; +import type {MemberChange} from "../members/RoomMember"; + +/** used for Session.observeRoomState, which observes in all room, but without loading from storage + * It receives the sync write transaction, so other stores can be updated as part of the same transaction. */ +export interface RoomStateHandler { + handleRoomState(room: Room, stateEvent: StateEvent, syncWriteTxn: Transaction, log: ILogItem); + updateRoomMembers(room: Room, memberChanges: Map); +} + +/** + * used for Room.observeStateType and Room.observeStateTypeAndKey + * @internal + * */ +export interface StateObserver { + handleStateEvent(event: StateEvent); + load(roomId: string, txn: Transaction): Promise; + setRemoveCallback(callback: () => void); +}