Merge pull request #611 from vector-im/threading-fallback-relation
Threading fallback - PR 1 - Link events with their related event
This commit is contained in:
commit
17ebc8a066
12 changed files with 441 additions and 85 deletions
|
@ -254,8 +254,14 @@ export function tests() {
|
|||
// 2. setup queue & timeline
|
||||
const queue = new SendQueue({roomId, storage, hsApi: new MockHomeServer().api});
|
||||
const powerLevelsObservable = new ObservableValue(new PowerLevels({ ownUserId: alice, membership: "join" }));
|
||||
const timeline = new Timeline({roomId, storage, fragmentIdComparer,
|
||||
clock: new MockClock(), pendingEvents: queue.pendingEvents, powerLevelsObservable});
|
||||
const timeline = new Timeline({
|
||||
roomId,
|
||||
storage,
|
||||
fragmentIdComparer,
|
||||
clock: new MockClock(),
|
||||
pendingEvents: queue.pendingEvents,
|
||||
powerLevelsObservable
|
||||
});
|
||||
// 3. load the timeline, which will load the message with the reaction
|
||||
await timeline.load(new User(alice), "join", new NullLogItem());
|
||||
const tiles = mapMessageEntriesToBaseMessageTile(timeline, queue);
|
||||
|
|
|
@ -128,6 +128,10 @@ export class HomeServerApi {
|
|||
return this._get("/sync", {since, timeout, filter}, undefined, options);
|
||||
}
|
||||
|
||||
context(roomId: string, eventId: string, limit: number, filter: string): IHomeServerRequest {
|
||||
return this._get(`/rooms/${encodeURIComponent(roomId)}/context/${encodeURIComponent(eventId)}`, {filter, limit});
|
||||
}
|
||||
|
||||
// params is from, dir and optionally to, limit, filter.
|
||||
messages(roomId: string, params: Record<string, any>, options?: IRequestOptions): IHomeServerRequest {
|
||||
return this._get(`/rooms/${encodeURIComponent(roomId)}/messages`, params, undefined, options);
|
||||
|
|
|
@ -30,6 +30,7 @@ import {DecryptionSource} from "../e2ee/common.js";
|
|||
import {ensureLogItem} from "../../logging/utils";
|
||||
import {PowerLevels} from "./PowerLevels.js";
|
||||
import {RetainedObservableValue} from "../../observable/ObservableValue";
|
||||
import {TimelineReader} from "./timeline/persistence/TimelineReader";
|
||||
|
||||
const EVENT_ENCRYPTED_TYPE = "m.room.encrypted";
|
||||
|
||||
|
@ -501,7 +502,8 @@ export class BaseRoom extends EventEmitter {
|
|||
},
|
||||
clock: this._platform.clock,
|
||||
logger: this._platform.logger,
|
||||
powerLevelsObservable: await this.observePowerLevels()
|
||||
powerLevelsObservable: await this.observePowerLevels(),
|
||||
hsApi: this._hsApi
|
||||
});
|
||||
try {
|
||||
if (this._roomEncryption) {
|
||||
|
@ -543,23 +545,11 @@ export class BaseRoom extends EventEmitter {
|
|||
}
|
||||
|
||||
async _readEventById(eventId) {
|
||||
let stores = [this._storage.storeNames.timelineEvents];
|
||||
if (this.isEncrypted) {
|
||||
stores.push(this._storage.storeNames.inboundGroupSessions);
|
||||
}
|
||||
const txn = await this._storage.readTxn(stores);
|
||||
const storageEntry = await txn.timelineEvents.getByEventId(this._roomId, eventId);
|
||||
if (storageEntry) {
|
||||
const entry = new EventEntry(storageEntry, this._fragmentIdComparer);
|
||||
if (entry.eventType === EVENT_ENCRYPTED_TYPE) {
|
||||
const request = this._decryptEntries(DecryptionSource.Timeline, [entry], txn);
|
||||
await request.complete();
|
||||
}
|
||||
return entry;
|
||||
}
|
||||
const reader = new TimelineReader({ roomId: this._roomId, storage: this._storage, fragmentIdComparer: this._fragmentIdComparer });
|
||||
const entry = await reader.readById(eventId);
|
||||
return entry;
|
||||
}
|
||||
|
||||
|
||||
dispose() {
|
||||
this._roomEncryption?.dispose();
|
||||
this._timeline?.dispose();
|
||||
|
|
|
@ -23,9 +23,12 @@ import {PendingEventEntry} from "./entries/PendingEventEntry.js";
|
|||
import {RoomMember} from "../members/RoomMember.js";
|
||||
import {getRelation, ANNOTATION_RELATION_TYPE} from "./relations.js";
|
||||
import {REDACTION_TYPE} from "../common.js";
|
||||
import {NonPersistedEventEntry} from "./entries/NonPersistedEventEntry.js";
|
||||
import {DecryptionSource} from "../../e2ee/common.js";
|
||||
import {EVENT_TYPE as MEMBER_EVENT_TYPE} from "../members/RoomMember.js";
|
||||
|
||||
export class Timeline {
|
||||
constructor({roomId, storage, closeCallback, fragmentIdComparer, pendingEvents, clock, powerLevelsObservable}) {
|
||||
constructor({roomId, storage, closeCallback, fragmentIdComparer, pendingEvents, clock, powerLevelsObservable, hsApi}) {
|
||||
this._roomId = roomId;
|
||||
this._storage = storage;
|
||||
this._closeCallback = closeCallback;
|
||||
|
@ -43,6 +46,11 @@ export class Timeline {
|
|||
});
|
||||
this._readerRequest = null;
|
||||
this._allEntries = null;
|
||||
/** Stores event entries that we had to fetch from hs/storage for reply previews (because they were not in timeline) */
|
||||
this._contextEntriesNotInTimeline = new Map();
|
||||
/** Only used to decrypt non-persisted context entries fetched from the homeserver */
|
||||
this._decryptEntries = null;
|
||||
this._hsApi = hsApi;
|
||||
this.initializePowerLevels(powerLevelsObservable);
|
||||
}
|
||||
|
||||
|
@ -78,6 +86,7 @@ export class Timeline {
|
|||
const readerRequest = this._disposables.track(this._timelineReader.readFromEnd(20, txn, log));
|
||||
try {
|
||||
const entries = await readerRequest.complete();
|
||||
this._loadContextEntriesWhereNeeded(entries);
|
||||
this._setupEntries(entries);
|
||||
} finally {
|
||||
this._disposables.disposeTracked(readerRequest);
|
||||
|
@ -115,6 +124,7 @@ export class Timeline {
|
|||
pendingEvent: pe, member: this._ownMember,
|
||||
clock: this._clock, redactingEntry
|
||||
});
|
||||
this._loadContextEntriesWhereNeeded([pee]);
|
||||
this._applyAndEmitLocalRelationChange(pee, target => target.addLocalRelation(pee));
|
||||
return pee;
|
||||
}
|
||||
|
@ -125,28 +135,29 @@ export class Timeline {
|
|||
const params = updater(e);
|
||||
return params ? params : false;
|
||||
};
|
||||
this._findAndUpdateRelatedEntry(pee.pendingEvent.relatedTxnId, pee.relatedEventId, updateOrFalse);
|
||||
this._findAndUpdateEntryById(pee.pendingEvent.relatedTxnId, pee.relatedEventId, updateOrFalse);
|
||||
// also look for a relation target to update with this redaction
|
||||
if (pee.redactingEntry) {
|
||||
// redactingEntry might be a PendingEventEntry or an EventEntry, so don't assume pendingEvent
|
||||
const relatedTxnId = pee.redactingEntry.pendingEvent?.relatedTxnId;
|
||||
this._findAndUpdateRelatedEntry(relatedTxnId, pee.redactingEntry.relatedEventId, updateOrFalse);
|
||||
this._findAndUpdateEntryById(relatedTxnId, pee.redactingEntry.relatedEventId, updateOrFalse);
|
||||
pee.redactingEntry.contextForEntries?.forEach(e => this._emitUpdateForEntry(e, "contextEntry"));
|
||||
}
|
||||
}
|
||||
|
||||
_findAndUpdateRelatedEntry(relatedTxnId, relatedEventId, updateOrFalse) {
|
||||
_findAndUpdateEntryById(txnId, eventId, updateOrFalse) {
|
||||
let found = false;
|
||||
// first, look in local entries based on txn id
|
||||
if (relatedTxnId) {
|
||||
if (txnId) {
|
||||
found = this._localEntries.findAndUpdate(
|
||||
e => e.id === relatedTxnId,
|
||||
e => e.id === txnId,
|
||||
updateOrFalse,
|
||||
);
|
||||
}
|
||||
// if not found here, look in remote entries based on event id
|
||||
if (!found && relatedEventId) {
|
||||
if (!found && eventId) {
|
||||
this._remoteEntries.findAndUpdate(
|
||||
e => e.id === relatedEventId,
|
||||
e => e.id === eventId,
|
||||
updateOrFalse
|
||||
);
|
||||
}
|
||||
|
@ -206,6 +217,8 @@ export class Timeline {
|
|||
|
||||
// used in replaceEntries
|
||||
static _entryUpdater(existingEntry, entry) {
|
||||
// ensure other entries for which this existingEntry is a context point to the new entry instead of existingEntry
|
||||
existingEntry.contextForEntries?.forEach(event => event.setContextEntry(entry));
|
||||
entry.updateFrom(existingEntry);
|
||||
return entry;
|
||||
}
|
||||
|
@ -216,6 +229,13 @@ export class Timeline {
|
|||
for (const entry of entries) {
|
||||
try {
|
||||
this._remoteEntries.getAndUpdate(entry, Timeline._entryUpdater);
|
||||
const oldEntry = this._contextEntriesNotInTimeline.get(entry.id)
|
||||
if (oldEntry) {
|
||||
Timeline._entryUpdater(oldEntry, entry);
|
||||
this._contextEntriesNotInTimeline.set(entry.id, entry);
|
||||
}
|
||||
// Since this entry changed, all dependent entries should be updated
|
||||
entry.contextForEntries?.forEach(e => this._emitUpdateForEntry(e, "contextEntry"));
|
||||
} catch (err) {
|
||||
if (err.name === "CompareError") {
|
||||
// see FragmentIdComparer, if the replacing entry is on a fragment
|
||||
|
@ -236,7 +256,115 @@ export class Timeline {
|
|||
/** @package */
|
||||
addEntries(newEntries) {
|
||||
this._addLocalRelationsToNewRemoteEntries(newEntries);
|
||||
this._updateEntriesFetchedFromHomeserver(newEntries);
|
||||
this._moveEntryToRemoteEntries(newEntries);
|
||||
this._remoteEntries.setManySorted(newEntries);
|
||||
this._loadContextEntriesWhereNeeded(newEntries);
|
||||
}
|
||||
|
||||
/**
|
||||
* Update entries based on newly received events.
|
||||
* This is specific to events that are not in the timeline but had to be fetched from the homeserver
|
||||
* because they are context-events for other events in the timeline (i.e fetched from hs so that we
|
||||
* can render things like reply previews)
|
||||
*/
|
||||
_updateEntriesFetchedFromHomeserver(entries) {
|
||||
/**
|
||||
* Updates for entries in timeline is handled by remoteEntries observable collection
|
||||
* Updates for entries not in timeline but fetched from storage is handled in this.replaceEntries()
|
||||
* This code is specific to entries fetched from HomeServer i.e NonPersistedEventEntry
|
||||
*/
|
||||
for (const entry of entries) {
|
||||
const relatedEntry = this._contextEntriesNotInTimeline.get(entry.relatedEventId);
|
||||
if (relatedEntry?.isNonPersisted && relatedEntry?.addLocalRelation(entry)) {
|
||||
// update other entries for which this entry is a context entry
|
||||
relatedEntry.contextForEntries?.forEach(e => this._emitUpdateForEntry(e, "contextEntry"));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* If an event we had to fetch from hs/storage is now in the timeline (for eg, due to gap fill),
|
||||
* remove the event from _contextEntriesNotInTimeline since it is now in remoteEntries
|
||||
*/
|
||||
_moveEntryToRemoteEntries(entries) {
|
||||
for (const entry of entries) {
|
||||
const fetchedEntry = this._contextEntriesNotInTimeline.get(entry.id);
|
||||
if (fetchedEntry) {
|
||||
fetchedEntry.contextForEntries.forEach(e => {
|
||||
e.setContextEntry(entry);
|
||||
this._emitUpdateForEntry(e, "contextEntry");
|
||||
});
|
||||
this._contextEntriesNotInTimeline.delete(entry.id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
_emitUpdateForEntry(entry, param) {
|
||||
const txnId = entry.isPending ? entry.id : null;
|
||||
const eventId = entry.isPending ? null : entry.id;
|
||||
this._findAndUpdateEntryById(txnId, eventId, () => param);
|
||||
}
|
||||
|
||||
/**
|
||||
* For each entry in entries, this method associates a context-entry (if needed) to it.
|
||||
* The context-entry is fetched using the following strategies (in the same order as given):
|
||||
* - timeline
|
||||
* - storage
|
||||
* - homeserver
|
||||
* @param {EventEntry[]} entries
|
||||
*/
|
||||
async _loadContextEntriesWhereNeeded(entries) {
|
||||
for (const entry of entries) {
|
||||
if (!entry.contextEventId) {
|
||||
continue;
|
||||
}
|
||||
const id = entry.contextEventId;
|
||||
let contextEvent = this._findLoadedEventById(id);
|
||||
if (!contextEvent) {
|
||||
contextEvent = await this._getEventFromStorage(id) ?? await this._getEventFromHomeserver(id);
|
||||
if (contextEvent) {
|
||||
// this entry was created from storage/hs, so it's not tracked by remoteEntries
|
||||
// we track them here so that we can update reply previews later
|
||||
this._contextEntriesNotInTimeline.set(id, contextEvent);
|
||||
}
|
||||
}
|
||||
if (contextEvent) {
|
||||
entry.setContextEntry(contextEvent);
|
||||
this._emitUpdateForEntry(entry, "contextEntry");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetches an entry with the given event-id from localEntries, remoteEntries or contextEntriesNotInTimeline.
|
||||
* @param {string} eventId event-id of the entry
|
||||
* @returns entry if found, undefined otherwise
|
||||
*/
|
||||
_findLoadedEventById(eventId) {
|
||||
return this.getByEventId(eventId) ?? this._contextEntriesNotInTimeline.get(eventId);
|
||||
}
|
||||
|
||||
async _getEventFromStorage(eventId) {
|
||||
const entry = await this._timelineReader.readById(eventId);
|
||||
return entry;
|
||||
}
|
||||
|
||||
async _getEventFromHomeserver(eventId) {
|
||||
const response = await this._hsApi.context(this._roomId, eventId, 0).response();
|
||||
const sender = response.event.sender;
|
||||
const member = response.state.find(e => e.type === MEMBER_EVENT_TYPE && e.user_id === sender);
|
||||
const entry = {
|
||||
event: response.event,
|
||||
displayName: member.content.displayname,
|
||||
avatarUrl: member.content.avatar_url
|
||||
};
|
||||
const eventEntry = new NonPersistedEventEntry(entry, this._fragmentIdComparer);
|
||||
if (this._decryptEntries) {
|
||||
const request = this._decryptEntries(DecryptionSource.Timeline, [eventEntry]);
|
||||
await request.complete();
|
||||
}
|
||||
return eventEntry;
|
||||
}
|
||||
|
||||
// tries to prepend `amount` entries to the `entries` list.
|
||||
|
@ -278,18 +406,7 @@ export class Timeline {
|
|||
}
|
||||
}
|
||||
if (eventId) {
|
||||
const loadedEntry = this.getByEventId(eventId);
|
||||
if (loadedEntry) {
|
||||
return loadedEntry;
|
||||
} else {
|
||||
const txn = await this._storage.readWriteTxn([
|
||||
this._storage.storeNames.timelineEvents,
|
||||
]);
|
||||
const redactionTargetEntry = await txn.timelineEvents.getByEventId(this._roomId, eventId);
|
||||
if (redactionTargetEntry) {
|
||||
return new EventEntry(redactionTargetEntry, this._fragmentIdComparer);
|
||||
}
|
||||
}
|
||||
return this.getByEventId(eventId) ?? await this._getEventFromStorage(eventId);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
@ -328,6 +445,7 @@ export class Timeline {
|
|||
|
||||
/** @internal */
|
||||
enableEncryption(decryptEntries) {
|
||||
this._decryptEntries = decryptEntries;
|
||||
this._timelineReader.enableEncryption(decryptEntries);
|
||||
}
|
||||
|
||||
|
@ -345,7 +463,7 @@ import {poll} from "../../../mocks/poll.js";
|
|||
import {Clock as MockClock} from "../../../mocks/Clock.js";
|
||||
import {createMockStorage} from "../../../mocks/Storage";
|
||||
import {ListObserver} from "../../../mocks/ListObserver.js";
|
||||
import {createEvent, withTextBody, withContent, withSender} from "../../../mocks/event.js";
|
||||
import {createEvent, withTextBody, withContent, withSender, withRedacts, withReply} from "../../../mocks/event.js";
|
||||
import {NullLogItem} from "../../../logging/NullLogger";
|
||||
import {EventEntry} from "./entries/EventEntry.js";
|
||||
import {User} from "../../User.js";
|
||||
|
@ -357,6 +475,22 @@ export function tests() {
|
|||
const roomId = "$abc";
|
||||
const alice = "@alice:hs.tld";
|
||||
const bob = "@bob:hs.tld";
|
||||
const hsApi = {
|
||||
context() {
|
||||
const result = {
|
||||
event: withTextBody("foo", createEvent("m.room.message", "event_id_1", alice)),
|
||||
state: [{
|
||||
type: MEMBER_EVENT_TYPE,
|
||||
user_id: alice,
|
||||
content: {
|
||||
displayName: "",
|
||||
avatarUrl: ""
|
||||
}
|
||||
}]
|
||||
};
|
||||
return { response: () => result };
|
||||
}
|
||||
};
|
||||
|
||||
function getIndexFromIterable(it, n) {
|
||||
let i = 0;
|
||||
|
@ -486,7 +620,7 @@ export function tests() {
|
|||
// 1. setup timeline
|
||||
const pendingEvents = new ObservableArray();
|
||||
const timeline = new Timeline({roomId, storage: await createMockStorage(),
|
||||
closeCallback: () => {}, fragmentIdComparer, pendingEvents, clock: new MockClock()});
|
||||
closeCallback: () => { }, fragmentIdComparer, pendingEvents, clock: new MockClock()});
|
||||
await timeline.load(new User(bob), "join", new NullLogItem());
|
||||
timeline.entries.subscribe(new ListObserver());
|
||||
// 2. add message and reaction to timeline
|
||||
|
@ -600,6 +734,109 @@ export function tests() {
|
|||
assert.equal(type, "update");
|
||||
assert.equal(value.eventType, "m.room.message");
|
||||
assert.equal(value.content.body, "hi bob!");
|
||||
},
|
||||
|
||||
"context entry is fetched from remoteEntries": async assert => {
|
||||
const timeline = new Timeline({roomId, storage: await createMockStorage(), closeCallback: () => {},
|
||||
fragmentIdComparer, pendingEvents: new ObservableArray(), clock: new MockClock()});
|
||||
const entryA = new EventEntry({ event: withTextBody("foo", createEvent("m.room.message", "event_id_1", alice)) });
|
||||
const entryB = new EventEntry({ event: withReply("event_id_1", createEvent("m.room.message", "event_id_2", bob)), eventIndex: 2 });
|
||||
await timeline.load(new User(alice), "join", new NullLogItem());
|
||||
timeline.entries.subscribe({ onAdd: () => null, });
|
||||
timeline.addEntries([entryA, entryB]);
|
||||
assert.deepEqual(entryB.contextEntry, entryA);
|
||||
},
|
||||
|
||||
"context entry is fetched from storage": async assert => {
|
||||
const storage = await createMockStorage();
|
||||
const txn = await storage.readWriteTxn([storage.storeNames.timelineEvents, storage.storeNames.timelineRelations]);
|
||||
txn.timelineEvents.tryInsert({ event: withTextBody("foo", createEvent("m.room.message", "event_id_1", alice)), fragmentId: 1, eventIndex: 1, roomId });
|
||||
await txn.complete();
|
||||
const timeline = new Timeline({roomId, storage, closeCallback: () => {},
|
||||
fragmentIdComparer, pendingEvents: new ObservableArray(), clock: new MockClock()});
|
||||
const entryB = new EventEntry({ event: withReply("event_id_1", createEvent("m.room.message", "event_id_2", bob)), eventIndex: 2 });
|
||||
await timeline.load(new User(alice), "join", new NullLogItem());
|
||||
timeline.entries.subscribe({ onAdd: () => null, onUpdate: () => null });
|
||||
timeline.addEntries([entryB]);
|
||||
await poll(() => entryB.contextEntry);
|
||||
assert.strictEqual(entryB.contextEntry.id, "event_id_1");
|
||||
},
|
||||
|
||||
"context entry is fetched from hs": async assert => {
|
||||
const timeline = new Timeline({roomId, storage: await createMockStorage(), closeCallback: () => {},
|
||||
fragmentIdComparer, pendingEvents: new ObservableArray(), clock: new MockClock(), hsApi});
|
||||
const entryB = new EventEntry({ event: withReply("event_id_1", createEvent("m.room.message", "event_id_2", bob)), eventIndex: 2 });
|
||||
await timeline.load(new User(alice), "join", new NullLogItem());
|
||||
timeline.entries.subscribe({ onAdd: () => null, onUpdate: () => null });
|
||||
timeline.addEntries([entryB]);
|
||||
await poll(() => entryB.contextEntry);
|
||||
assert.strictEqual(entryB.contextEntry.id, "event_id_1");
|
||||
},
|
||||
|
||||
"context entry has a list of entries to which it forms the context": async assert => {
|
||||
const timeline = new Timeline({roomId, storage: await createMockStorage(), closeCallback: () => {},
|
||||
fragmentIdComparer, pendingEvents: new ObservableArray(), clock: new MockClock()});
|
||||
const entryA = new EventEntry({ event: withTextBody("foo", createEvent("m.room.message", "event_id_1", alice)), eventIndex: 1 });
|
||||
const entryB = new EventEntry({ event: withReply("event_id_1", createEvent("m.room.message", "event_id_2", bob)), eventIndex: 2 });
|
||||
const entryC = new EventEntry({ event: withReply("event_id_1", createEvent("m.room.message", "event_id_3", bob)), eventIndex: 3 });
|
||||
await timeline.load(new User(alice), "join", new NullLogItem());
|
||||
timeline.entries.subscribe({ onAdd: () => null, onUpdate: () => null });
|
||||
timeline.addEntries([entryA, entryB, entryC]);
|
||||
await poll(() => entryA.contextForEntries.length === 2);
|
||||
assert.deepEqual(entryA.contextForEntries, [entryB, entryC]);
|
||||
},
|
||||
|
||||
"context entry in contextEntryNotInTimeline gets updated based on incoming redaction": async assert => {
|
||||
const timeline = new Timeline({roomId, storage: await createMockStorage(), closeCallback: () => {},
|
||||
fragmentIdComparer, pendingEvents: new ObservableArray(), clock: new MockClock(), hsApi});
|
||||
const entryB = new EventEntry({ event: withReply("event_id_1", createEvent("m.room.message", "event_id_2", bob)), eventIndex: 2 });
|
||||
await timeline.load(new User(alice), "join", new NullLogItem());
|
||||
timeline.entries.subscribe({ onAdd: () => null, onUpdate: () => null });
|
||||
timeline.addEntries([entryB]);
|
||||
await poll(() => entryB.contextEntry);
|
||||
const redactingEntry = new EventEntry({ event: withRedacts("event_id_1", "foo", createEvent("m.room.redaction", "event_id_3", alice)), eventIndex: 3 });
|
||||
timeline.addEntries([redactingEntry]);
|
||||
assert.strictEqual(entryB.contextEntry.isRedacted, true);
|
||||
},
|
||||
|
||||
"redaction of context entry triggers updates in other entries": async assert => {
|
||||
const timeline = new Timeline({roomId, storage: await createMockStorage(), closeCallback: () => {},
|
||||
fragmentIdComparer, pendingEvents: new ObservableArray(), clock: new MockClock(), hsApi});
|
||||
const entryB = new EventEntry({ event: withReply("event_id_1", createEvent("m.room.message", "event_id_2", bob)), eventIndex: 2 });
|
||||
const entryC = new EventEntry({ event: withReply("event_id_1", createEvent("m.room.message", "event_id_3", bob)), eventIndex: 3 });
|
||||
await timeline.load(new User(alice), "join", new NullLogItem());
|
||||
const bin = [];
|
||||
timeline.entries.subscribe({
|
||||
onUpdate: (index) => {
|
||||
const e = timeline.remoteEntries[index];
|
||||
bin.push(e.id);
|
||||
},
|
||||
onAdd: () => null,
|
||||
});
|
||||
timeline.addEntries([entryB, entryC]);
|
||||
await poll(() => timeline._remoteEntries.array.length === 2 && timeline._contextEntriesNotInTimeline.get("event_id_1"));
|
||||
const redactingEntry = new EventEntry({ event: withRedacts("event_id_1", "foo", createEvent("m.room.redaction", "event_id_3", alice)) });
|
||||
timeline.addEntries([redactingEntry]);
|
||||
assert.strictEqual(bin.includes("event_id_2"), true);
|
||||
assert.strictEqual(bin.includes("event_id_3"), true);
|
||||
},
|
||||
|
||||
"context entries fetched from storage/hs are moved to remoteEntries": async assert => {
|
||||
const timeline = new Timeline({roomId, storage: await createMockStorage(), closeCallback: () => {},
|
||||
fragmentIdComparer, pendingEvents: new ObservableArray(), clock: new MockClock(), hsApi});
|
||||
const entryA = new EventEntry({ event: withTextBody("foo", createEvent("m.room.message", "event_id_1", alice)), eventIndex: 1 });
|
||||
const entryB = new EventEntry({ event: withReply("event_id_1", createEvent("m.room.message", "event_id_2", bob)), eventIndex: 2 });
|
||||
await timeline.load(new User(alice), "join", new NullLogItem());
|
||||
timeline.entries.subscribe({ onAdd: () => null, onUpdate: () => null });
|
||||
timeline.addEntries([entryB]);
|
||||
await poll(() => entryB.contextEntry);
|
||||
assert.strictEqual(timeline._contextEntriesNotInTimeline.has(entryA.id), true);
|
||||
timeline.addEntries([entryA]);
|
||||
assert.strictEqual(timeline._contextEntriesNotInTimeline.has(entryA.id), false);
|
||||
const movedEntry = timeline.remoteEntries[0];
|
||||
assert.deepEqual(movedEntry, entryA);
|
||||
assert.deepEqual(movedEntry.contextForEntries[0], entryB);
|
||||
assert.deepEqual(entryB.contextEntry, movedEntry);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
|
|
@ -17,3 +17,50 @@ limitations under the License.
|
|||
export function isValidFragmentId(id) {
|
||||
return typeof id === "number";
|
||||
}
|
||||
|
||||
// copied over from matrix-js-sdk, copyright 2016 OpenMarket Ltd
|
||||
/* _REDACT_KEEP_KEY_MAP gives the keys we keep when an event is redacted
|
||||
*
|
||||
* This is specified here:
|
||||
* http://matrix.org/speculator/spec/HEAD/client_server/latest.html#redactions
|
||||
*
|
||||
* Also:
|
||||
* - We keep 'unsigned' since that is created by the local server
|
||||
* - We keep user_id for backwards-compat with v1
|
||||
*/
|
||||
const _REDACT_KEEP_KEY_MAP = [
|
||||
'event_id', 'type', 'room_id', 'user_id', 'sender', 'state_key', 'prev_state',
|
||||
'content', 'unsigned', 'origin_server_ts',
|
||||
].reduce(function(ret, val) {
|
||||
ret[val] = 1; return ret;
|
||||
}, {});
|
||||
|
||||
// a map from event type to the .content keys we keep when an event is redacted
|
||||
const _REDACT_KEEP_CONTENT_MAP = {
|
||||
'm.room.member': {'membership': 1},
|
||||
'm.room.create': {'creator': 1},
|
||||
'm.room.join_rules': {'join_rule': 1},
|
||||
'm.room.power_levels': {'ban': 1, 'events': 1, 'events_default': 1,
|
||||
'kick': 1, 'redact': 1, 'state_default': 1,
|
||||
'users': 1, 'users_default': 1,
|
||||
},
|
||||
'm.room.aliases': {'aliases': 1},
|
||||
};
|
||||
// end of matrix-js-sdk code
|
||||
|
||||
export function redactEvent(redactionEvent, redactedEvent) {
|
||||
for (const key of Object.keys(redactedEvent)) {
|
||||
if (!_REDACT_KEEP_KEY_MAP[key]) {
|
||||
delete redactedEvent[key];
|
||||
}
|
||||
}
|
||||
const { content } = redactedEvent;
|
||||
const keepMap = _REDACT_KEEP_CONTENT_MAP[redactedEvent.type];
|
||||
for (const key of Object.keys(content)) {
|
||||
if (!keepMap?.[key]) {
|
||||
delete content[key];
|
||||
}
|
||||
}
|
||||
redactedEvent.unsigned = redactedEvent.unsigned || {};
|
||||
redactedEvent.unsigned.redacted_because = redactionEvent;
|
||||
}
|
||||
|
|
|
@ -27,6 +27,8 @@ export class BaseEventEntry extends BaseEntry {
|
|||
super(fragmentIdComparer);
|
||||
this._pendingRedactions = null;
|
||||
this._pendingAnnotations = null;
|
||||
this._contextEntry = null;
|
||||
this._contextForEntries = null;
|
||||
}
|
||||
|
||||
get isReply() {
|
||||
|
@ -52,8 +54,31 @@ export class BaseEventEntry extends BaseEntry {
|
|||
return null;
|
||||
}
|
||||
|
||||
setContextEntry(entry) {
|
||||
this._contextEntry = entry;
|
||||
entry._setAsContextOf(this);
|
||||
}
|
||||
|
||||
_setAsContextOf(entry) {
|
||||
if (!this._contextForEntries) {
|
||||
this._contextForEntries = [];
|
||||
}
|
||||
this._contextForEntries.push(entry);
|
||||
}
|
||||
|
||||
get contextForEntries() {
|
||||
return this._contextForEntries;
|
||||
}
|
||||
|
||||
get contextEntry() {
|
||||
return this._contextEntry;
|
||||
}
|
||||
|
||||
/**
|
||||
aggregates local relation or local redaction of remote relation.
|
||||
Aggregates relation or redaction of remote relation.
|
||||
Used in two situations:
|
||||
- to aggregate local relation/redaction of remote relation
|
||||
- to mark this entry as being redacted in Timeline._updateEntriesFetchedFromHomeserver
|
||||
@return [string] returns the name of the field that has changed, if any
|
||||
*/
|
||||
addLocalRelation(entry) {
|
||||
|
|
|
@ -39,6 +39,8 @@ export class EventEntry extends BaseEventEntry {
|
|||
if (other._decryptionError && !this._decryptionError) {
|
||||
this._decryptionError = other._decryptionError;
|
||||
}
|
||||
this._contextForEntries = other.contextForEntries;
|
||||
this._contextEntry = other.contextEntry;
|
||||
}
|
||||
|
||||
get event() {
|
||||
|
@ -144,6 +146,15 @@ export class EventEntry extends BaseEventEntry {
|
|||
const originalRelation = originalContent && getRelationFromContent(originalContent);
|
||||
return originalRelation || getRelationFromContent(this.content);
|
||||
}
|
||||
|
||||
// similar to relatedEventID but only for replies
|
||||
get contextEventId() {
|
||||
if (this.isReply) {
|
||||
return this.relatedEventId;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
import {withTextBody, withContent, createEvent} from "../../../../mocks/event.js";
|
||||
|
|
44
src/matrix/room/timeline/entries/NonPersistedEventEntry.js
Normal file
44
src/matrix/room/timeline/entries/NonPersistedEventEntry.js
Normal file
|
@ -0,0 +1,44 @@
|
|||
/*
|
||||
Copyright 2021 The Matrix.org Foundation C.I.C.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
import {EventEntry} from "./EventEntry.js";
|
||||
|
||||
// EventEntry but without the two properties that are populated via SyncWriter
|
||||
// Useful if you want to create an EventEntry that is ephemeral
|
||||
|
||||
export class NonPersistedEventEntry extends EventEntry {
|
||||
get fragmentId() {
|
||||
throw new Error("Cannot access fragmentId for non-persisted EventEntry");
|
||||
}
|
||||
|
||||
get entryIndex() {
|
||||
throw new Error("Cannot access entryIndex for non-persisted EventEntry");
|
||||
}
|
||||
|
||||
get isNonPersisted() {
|
||||
return true;
|
||||
}
|
||||
|
||||
// overridden here because we reuse addLocalRelation() for updating this entry
|
||||
// we don't want the RedactedTile created using this entry to ever show "is being redacted"
|
||||
get isRedacting() {
|
||||
return false;
|
||||
}
|
||||
|
||||
get isRedacted() {
|
||||
return super.isRedacting;
|
||||
}
|
||||
}
|
|
@ -100,4 +100,11 @@ export class PendingEventEntry extends BaseEventEntry {
|
|||
get redactingEntry() {
|
||||
return this._redactingEntry;
|
||||
}
|
||||
|
||||
get contextEventId() {
|
||||
if (this.isReply) {
|
||||
return this._pendingEvent.relatedEventId ?? this._pendingEvent.relatedTxnId;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,6 +17,7 @@ limitations under the License.
|
|||
import {EventEntry} from "../entries/EventEntry.js";
|
||||
import {REDACTION_TYPE, isRedacted} from "../../common.js";
|
||||
import {ANNOTATION_RELATION_TYPE, getRelation} from "../relations.js";
|
||||
import {redactEvent} from "../common.js";
|
||||
|
||||
export class RelationWriter {
|
||||
constructor({roomId, ownUserId, fragmentIdComparer}) {
|
||||
|
@ -127,21 +128,7 @@ export class RelationWriter {
|
|||
// check if we're the target of a relation and remove all relations then as well
|
||||
txn.timelineRelations.removeAllForTarget(this._roomId, redactedEvent.event_id);
|
||||
|
||||
for (const key of Object.keys(redactedEvent)) {
|
||||
if (!_REDACT_KEEP_KEY_MAP[key]) {
|
||||
delete redactedEvent[key];
|
||||
}
|
||||
}
|
||||
const {content} = redactedEvent;
|
||||
const keepMap = _REDACT_KEEP_CONTENT_MAP[redactedEvent.type];
|
||||
for (const key of Object.keys(content)) {
|
||||
if (!keepMap?.[key]) {
|
||||
delete content[key];
|
||||
}
|
||||
}
|
||||
redactedEvent.unsigned = redactedEvent.unsigned || {};
|
||||
redactedEvent.unsigned.redacted_because = redactionEvent;
|
||||
|
||||
redactEvent(redactionEvent, redactedEvent);
|
||||
delete redactedStorageEntry.annotations;
|
||||
|
||||
return true;
|
||||
|
@ -223,35 +210,6 @@ function isObjectEmpty(obj) {
|
|||
return true;
|
||||
}
|
||||
|
||||
// copied over from matrix-js-sdk, copyright 2016 OpenMarket Ltd
|
||||
/* _REDACT_KEEP_KEY_MAP gives the keys we keep when an event is redacted
|
||||
*
|
||||
* This is specified here:
|
||||
* http://matrix.org/speculator/spec/HEAD/client_server/latest.html#redactions
|
||||
*
|
||||
* Also:
|
||||
* - We keep 'unsigned' since that is created by the local server
|
||||
* - We keep user_id for backwards-compat with v1
|
||||
*/
|
||||
const _REDACT_KEEP_KEY_MAP = [
|
||||
'event_id', 'type', 'room_id', 'user_id', 'sender', 'state_key', 'prev_state',
|
||||
'content', 'unsigned', 'origin_server_ts',
|
||||
].reduce(function(ret, val) {
|
||||
ret[val] = 1; return ret;
|
||||
}, {});
|
||||
|
||||
// a map from event type to the .content keys we keep when an event is redacted
|
||||
const _REDACT_KEEP_CONTENT_MAP = {
|
||||
'm.room.member': {'membership': 1},
|
||||
'm.room.create': {'creator': 1},
|
||||
'm.room.join_rules': {'join_rule': 1},
|
||||
'm.room.power_levels': {'ban': 1, 'events': 1, 'events_default': 1,
|
||||
'kick': 1, 'redact': 1, 'state_default': 1,
|
||||
'users': 1, 'users_default': 1,
|
||||
},
|
||||
'm.room.aliases': {'aliases': 1},
|
||||
};
|
||||
// end of matrix-js-sdk code
|
||||
|
||||
import {createMockStorage} from "../../../../mocks/Storage";
|
||||
import {createEvent, withTextBody, withRedacts, withContent} from "../../../../mocks/event.js";
|
||||
|
|
|
@ -132,6 +132,23 @@ export class TimelineReader {
|
|||
}, log);
|
||||
}
|
||||
|
||||
async readById(id, log) {
|
||||
let stores = [this._storage.storeNames.timelineEvents];
|
||||
if (this._decryptEntries) {
|
||||
stores.push(this._storage.storeNames.inboundGroupSessions);
|
||||
}
|
||||
const txn = await this._storage.readTxn(stores); // todo: can we just use this.readTxnStores here? probably
|
||||
const storageEntry = await txn.timelineEvents.getByEventId(this._roomId, id);
|
||||
if (storageEntry) {
|
||||
const entry = new EventEntry(storageEntry, this._fragmentIdComparer);
|
||||
if (this._decryptEntries) {
|
||||
const request = this._decryptEntries([entry], txn, log);
|
||||
await request.complete();
|
||||
}
|
||||
return entry;
|
||||
}
|
||||
}
|
||||
|
||||
async _readFrom(eventKey, direction, amount, r, txn, log) {
|
||||
const entries = await readRawTimelineEntriesWithTxn(this._roomId, eventKey, direction, amount, this._fragmentIdComparer, txn);
|
||||
if (this._decryptEntries) {
|
||||
|
|
|
@ -37,3 +37,13 @@ export function withTxnId(txnId, event) {
|
|||
export function withRedacts(redacts, reason, event) {
|
||||
return Object.assign({redacts, content: {reason}}, event);
|
||||
}
|
||||
|
||||
export function withReply(replyToId, event) {
|
||||
return withContent({
|
||||
"m.relates_to": {
|
||||
"m.in_reply_to": {
|
||||
"event_id": replyToId
|
||||
}
|
||||
}
|
||||
}, event);
|
||||
}
|
||||
|
|
Reference in a new issue