implement subscribing to a single event

This commit is contained in:
Bruno Windels 2020-10-30 15:19:51 +01:00
parent 7d81306a49
commit 137264edcb
4 changed files with 150 additions and 1 deletions

View file

@ -0,0 +1,90 @@
/*
Copyright 2020 The Matrix.org Foundation C.I.C.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
import {BaseObservableValue} from "../../observable/ObservableValue.js";
export class ObservedEventMap {
constructor(notifyEmpty) {
this._map = new Map();
this._notifyEmpty = notifyEmpty;
}
observe(eventId, eventEntry = null) {
let observable = this._map.get(eventId);
if (!observable) {
observable = new ObservedEvent(this, eventEntry);
this._map.set(eventId, observable);
}
return observable;
}
updateEvents(eventEntries) {
for (let i = 0; i < eventEntries.length; i += 1) {
const entry = eventEntries[i];
const observable = this._map.get(entry.id);
observable?.update(entry);
}
}
_remove(observable) {
this._map.delete(observable.get().id);
if (this._map.size === 0) {
this._notifyEmpty();
}
}
}
class ObservedEvent extends BaseObservableValue {
constructor(eventMap, entry) {
super();
this._eventMap = eventMap;
this._entry = entry;
// remove subscription in microtask after creating it
// otherwise ObservedEvents would easily never get
// removed if you never subscribe
Promise.resolve().then(() => {
if (!this.hasSubscriptions) {
this._eventMap.remove(this);
this._eventMap = null;
}
});
}
subscribe(handler) {
if (!this._eventMap) {
throw new Error("ObservedEvent expired, subscribe right after calling room.observeEvent()");
}
return super.subscribe(handler);
}
onUnsubscribeLast() {
this._eventMap._remove(this);
this._eventMap = null;
super.onUnsubscribeLast();
}
update(entry) {
// entries are mostly updated in-place,
// apart from when they are created,
// but doesn't hurt to reassign
this._entry = entry;
this.emit(this._entry);
}
get() {
return this._entry;
}
}

View file

@ -29,8 +29,8 @@ import {Heroes} from "./members/Heroes.js";
import {EventEntry} from "./timeline/entries/EventEntry.js";
import {EventKey} from "./timeline/EventKey.js";
import {Direction} from "./timeline/Direction.js";
import {ObservedEventMap} from "./ObservedEventMap.js";
import {DecryptionSource} from "../e2ee/common.js";
const EVENT_ENCRYPTED_TYPE = "m.room.encrypted";
export class Room extends EventEmitter {
@ -53,6 +53,7 @@ export class Room extends EventEmitter {
this._roomEncryption = null;
this._getSyncToken = getSyncToken;
this._clock = clock;
this._observedEvents = null;
}
_readRetryDecryptCandidateEntries(sinceEventKey, txn) {
@ -165,6 +166,9 @@ export class Room extends EventEmitter {
}
await writeTxn.complete();
decryption.applyToEntries(entries);
if (this._observedEvents) {
this._observedEvents.updateEvents(entries);
}
});
return request;
}
@ -285,6 +289,9 @@ export class Room extends EventEmitter {
if (this._timeline) {
this._timeline.appendLiveEntries(newTimelineEntries);
}
if (this._observedEvents) {
this._observedEvents.updateEvents(newTimelineEntries);
}
if (removedPendingEvents) {
this._sendQueue.emitRemovals(removedPendingEvents);
}
@ -580,6 +587,45 @@ export class Room extends EventEmitter {
this._summary.applyChanges(changes);
}
observeEvent(eventId) {
if (!this._observedEvents) {
this._observedEvents = new ObservedEventMap(() => {
this._observedEvents = null;
});
}
let entry = null;
if (this._timeline) {
entry = this._timeline.getByEventId(eventId);
}
const observable = this._observedEvents.observe(eventId, entry);
if (!entry) {
// update in the background
this._readEventById(eventId).then(entry => {
observable.update(entry);
}).catch(err => {
console.warn(`could not load event ${eventId} from storage`, err);
});
}
return observable;
}
async _readEventById(eventId) {
let stores = [this._storage.storeNames.timelineEvents];
if (this.isEncrypted) {
stores.push(this._storage.storeNames.inboundGroupSessions);
}
const txn = this._storage.readTxn(stores);
const storageEntry = await txn.timelineEvents.getByEventId(this._roomId, eventId);
if (storageEntry) {
const entry = new EventEntry(storageEntry, this._fragmentIdComparer);
if (entry.eventType === EVENT_ENCRYPTED_TYPE) {
const request = this._decryptEntries(DecryptionSource.Timeline, [entry], txn);
await request.complete();
}
return entry;
}
}
dispose() {
this._roomEncryption?.dispose();
this._timeline?.dispose();

View file

@ -95,6 +95,15 @@ export class Timeline {
}
}
getByEventId(eventId) {
for (let i = 0; i < this._remoteEntries.length; i += 1) {
const entry = this._remoteEntries.get(i);
if (entry.id === eventId) {
return entry;
}
}
}
/** @public */
get entries() {
return this._allEntries;

View file

@ -48,6 +48,10 @@ export class BaseObservable {
return null;
}
get hasSubscriptions() {
return this._handlers.size !== 0;
}
// Add iterator over handlers here
}