add support for observing room state for single room + initial state

This commit is contained in:
Bruno Windels 2022-05-12 17:27:03 +02:00
parent db05338596
commit a50ea7e77b
10 changed files with 192 additions and 15 deletions

View File

@ -22,7 +22,7 @@ export {Platform} from "./platform/web/Platform.js";
export {Client, LoadStatus} from "./matrix/Client.js";
export {RoomStatus} from "./matrix/room/common";
// export everything needed to observe state events on all rooms using session.observeRoomState
export type {RoomStateHandler} from "./matrix/room/common";
export type {RoomStateHandler} from "./matrix/room/state/types";
export type {MemberChange} from "./matrix/room/members/RoomMember";
export type {Transaction} from "./matrix/storage/idb/Transaction";
export type {Room} from "./matrix/room/Room";

View File

@ -48,7 +48,7 @@ import {SecretStorage} from "./ssss/SecretStorage";
import {ObservableValue} from "../observable/value/ObservableValue";
import {RetainedObservableValue} from "../observable/value/RetainedObservableValue";
import {CallHandler} from "./calls/CallHandler";
import {RoomStateHandlerSet} from "./RoomStateHandlerSet";
import {RoomStateHandlerSet} from "./room/state/RoomStateHandlerSet";
const PICKLE_KEY = "DEFAULT_KEY";
const PUSHER_KEY = "pusher";

View File

@ -35,7 +35,7 @@ import type {Options as GroupCallOptions} from "./group/GroupCall";
import type {Transaction} from "../storage/idb/Transaction";
import type {CallEntry} from "../storage/idb/stores/CallStore";
import type {Clock} from "../../platform/web/dom/Clock";
import type {RoomStateHandler} from "../room/common";
import type {RoomStateHandler} from "../room/state/types";
export type Options = Omit<GroupCallOptions, "emitUpdate" | "createTimeout"> & {
clock: Clock

View File

@ -31,6 +31,8 @@ import {ensureLogItem} from "../../logging/utils";
import {PowerLevels} from "./PowerLevels.js";
import {RetainedObservableValue} from "../../observable/value/RetainedObservableValue";
import {TimelineReader} from "./timeline/persistence/TimelineReader";
import {ObservedStateTypeMap} from "./state/ObservedStateTypeMap";
import {ObservedStateKeyValue} from "./state/ObservedStateKeyValue";
const EVENT_ENCRYPTED_TYPE = "m.room.encrypted";
@ -53,11 +55,35 @@ export class BaseRoom extends EventEmitter {
this._getSyncToken = getSyncToken;
this._platform = platform;
this._observedEvents = null;
this._roomStateObservers = new Set();
this._powerLevels = null;
this._powerLevelLoading = null;
this._observedMembers = null;
}
async observeStateType(type, txn = undefined) {
const map = new ObservedStateTypeMap(type);
await this._addStateObserver(map, txn);
return map;
}
async observeStateTypeAndKey(type, stateKey, txn = undefined) {
const value = new ObservedStateKeyValue(type, stateKey);
await this._addStateObserver(value, txn);
return value;
}
async _addStateObserver(stateObserver, txn) {
if (!txn) {
txn = await this._storage.readTxn([this._storage.storeNames.roomState]);
}
await stateObserver.load(this.id, txn);
this._roomStateObservers.add(stateObserver);
stateObserver.setRemoveCallback(() => {
this._roomStateObservers.delete(stateObserver);
});
}
async _eventIdsToEntries(eventIds, txn) {
const retryEntries = [];
await Promise.all(eventIds.map(async eventId => {

View File

@ -182,6 +182,7 @@ export class Room extends BaseRoom {
const powerLevelsEvent = this._getPowerLevelsEvent(roomResponse);
this._runRoomStateHandlers(roomResponse, txn, log);
return {
roomResponse,
summaryChanges,
roomEncryption,
newEntries,
@ -204,7 +205,7 @@ export class Room extends BaseRoom {
const {
summaryChanges, newEntries, updatedEntries, newLiveKey,
removedPendingEvents, memberChanges, powerLevelsEvent,
heroChanges, roomEncryption
heroChanges, roomEncryption, roomResponse
} = changes;
log.set("id", this.id);
this._syncWriter.afterSync(newLiveKey);
@ -264,6 +265,7 @@ export class Room extends BaseRoom {
if (removedPendingEvents) {
this._sendQueue.emitRemovals(removedPendingEvents);
}
this._emitSyncRoomState(roomResponse);
}
_updateObservedMembers(memberChanges) {
@ -457,8 +459,14 @@ export class Room extends BaseRoom {
this._roomStateHandler.handleRoomState(this, event, txn, log);
});
}
/** local room state observers, run during after sync step */
_emitSyncRoomState(roomResponse) {
iterateResponseStateEvents(roomResponse, event => {
for (const handler of this._roomStateObservers) {
handler.handleStateEvent(event);
}
}
});
}
/** @package */

View File

@ -47,9 +47,6 @@ export enum RoomType {
Public
}
export interface RoomStateHandler {
handleRoomState(room: Room, stateEvent: StateEvent, txn: Transaction, log: ILogItem);
updateRoomMembers(room: Room, memberChanges: Map<string, MemberChange>);
type RoomResponse = {
state?: {
events?: Array<StateEvent>

View File

@ -0,0 +1,55 @@
/*
Copyright 2022 The Matrix.org Foundation C.I.C.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
import type {StateObserver} from "./types";
import type {StateEvent} from "../../storage/types";
import type {Transaction} from "../../storage/idb/Transaction";
import {BaseObservableValue} from "../../../observable/value/BaseObservableValue";
/**
* Observable value for a state event with a given type and state key.
* Unsubscribes when last subscription is removed */
export class ObservedStateKeyValue extends BaseObservableValue<StateEvent | undefined> implements StateObserver {
private event?: StateEvent;
private removeCallback?: () => void;
constructor(private readonly type: string, private readonly stateKey: string) {
super();
}
/** @internal */
async load(roomId: string, txn: Transaction): Promise<void> {
this.event = (await txn.roomState.get(roomId, this.type, this.stateKey))?.event;
}
/** @internal */
handleStateEvent(event: StateEvent) {
if (event.type === this.type && event.state_key === this.stateKey) {
this.event = event;
this.emit(this.get());
}
}
get(): StateEvent | undefined {
return this.event;
}
setRemoveCallback(callback: () => void) {
this.removeCallback = callback;
}
onUnsubscribeLast() {
this.removeCallback?.();
}
}

View File

@ -0,0 +1,53 @@
/*
Copyright 2022 The Matrix.org Foundation C.I.C.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
import type {StateObserver} from "./types";
import type {StateEvent} from "../../storage/types";
import type {Transaction} from "../../storage/idb/Transaction";
import {ObservableMap} from "../../../observable/map/ObservableMap";
/**
* Observable map for a given type with state keys as map keys.
* Unsubscribes when last subscription is removed */
export class ObservedStateTypeMap extends ObservableMap<string, StateEvent> implements StateObserver {
private removeCallback?: () => void;
constructor(private readonly type: string) {
super();
}
/** @internal */
async load(roomId: string, txn: Transaction): Promise<void> {
const events = await txn.roomState.getAllForType(roomId, this.type);
for (let i = 0; i < events.length; ++i) {
const {event} = events[i];
this.add(event.state_key, event);
}
}
/** @internal */
handleStateEvent(event: StateEvent) {
if (event.type === this.type) {
this.set(event.state_key, event);
}
}
setRemoveCallback(callback: () => void) {
this.removeCallback = callback;
}
onUnsubscribeLast() {
this.removeCallback?.();
}
}

View File

@ -14,13 +14,13 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
import type {ILogItem} from "../logging/types";
import type {StateEvent} from "./storage/types";
import type {Transaction} from "./storage/idb/Transaction";
import type {Room} from "./room/Room";
import type {MemberChange} from "./room/members/RoomMember";
import type {RoomStateHandler} from "./room/common";
import {BaseObservable} from "../observable/BaseObservable";
import type {ILogItem} from "../../../logging/types";
import type {StateEvent} from "../../storage/types";
import type {Transaction} from "../../storage/idb/Transaction";
import type {Room} from "../Room";
import type {MemberChange} from "../members/RoomMember";
import type {RoomStateHandler} from "./types";
import {BaseObservable} from "../../../observable/BaseObservable";
/** keeps track of all handlers registered with Session.observeRoomState */
export class RoomStateHandlerSet extends BaseObservable<RoomStateHandler> implements RoomStateHandler {

View File

@ -0,0 +1,38 @@
/*
Copyright 2022 The Matrix.org Foundation C.I.C.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
import type {Room} from "../Room";
import type {StateEvent} from "../../storage/types";
import type {Transaction} from "../../storage/idb/Transaction";
import type {ILogItem} from "../../../logging/types";
import type {MemberChange} from "../members/RoomMember";
/** used for Session.observeRoomState, which observes in all room, but without loading from storage
* It receives the sync write transaction, so other stores can be updated as part of the same transaction. */
export interface RoomStateHandler {
handleRoomState(room: Room, stateEvent: StateEvent, syncWriteTxn: Transaction, log: ILogItem);
updateRoomMembers(room: Room, memberChanges: Map<string, MemberChange>);
}
/**
* used for Room.observeStateType and Room.observeStateTypeAndKey
* @internal
* */
export interface StateObserver {
handleStateEvent(event: StateEvent);
load(roomId: string, txn: Transaction): Promise<void>;
setRemoveCallback(callback: () => void);
}