forked from mystiq/hydrogen-web
Write all members of a sync in one go
so state member events written but not causing a memberChange.hasJoined don't prevent timeline member events for the same user from doing so
This commit is contained in:
parent
d0c1ddb51b
commit
826de7e9cb
4 changed files with 332 additions and 162 deletions
|
@ -30,6 +30,7 @@ const EVENT_ENCRYPTED_TYPE = "m.room.encrypted";
|
|||
export class Room extends BaseRoom {
|
||||
constructor(options) {
|
||||
super(options);
|
||||
// TODO: pass pendingEvents to start like pendingOperations?
|
||||
const {pendingEvents} = options;
|
||||
const relationWriter = new RelationWriter({
|
||||
roomId: this.id,
|
||||
|
@ -120,7 +121,8 @@ export class Room extends BaseRoom {
|
|||
txn.roomMembers.removeAllForRoom(this.id);
|
||||
}
|
||||
const {entries: newEntries, updatedEntries, newLiveKey, memberChanges} =
|
||||
await log.wrap("syncWriter", log => this._syncWriter.writeSync(roomResponse, isRejoin, txn, log), log.level.Detail);
|
||||
await log.wrap("syncWriter", log => this._syncWriter.writeSync(
|
||||
roomResponse, isRejoin, summaryChanges.hasFetchedMembers, txn, log), log.level.Detail);
|
||||
if (decryptChanges) {
|
||||
const decryption = await log.wrap("decryptChanges", log => decryptChanges.write(txn, log));
|
||||
log.set("decryptionResults", decryption.results.size);
|
||||
|
|
|
@ -141,6 +141,16 @@ export class MemberChange {
|
|||
return this.previousMembership === "join" && this.membership !== "join";
|
||||
}
|
||||
|
||||
/** The result can be a false negative when all of these apply:
|
||||
* - the complete set of room members hasn't been fetched yet.
|
||||
* - the member event for this change was received in the
|
||||
* state section and wasn't present in the timeline section.
|
||||
* - the room response was limited, e.g. there was a gap.
|
||||
*
|
||||
* This is because during sync, in this case it is not possible
|
||||
* to distinguish between a new member that joined the room
|
||||
* during a gap and a lazy-loading member.
|
||||
* */
|
||||
get hasJoined() {
|
||||
return this.previousMembership !== "join" && this.membership === "join";
|
||||
}
|
||||
|
|
|
@ -23,57 +23,27 @@ export class MemberWriter {
|
|||
this._cache = new LRUCache(5, member => member.userId);
|
||||
}
|
||||
|
||||
writeTimelineMemberEvent(event, txn) {
|
||||
return this._writeMemberEvent(event, false, txn);
|
||||
prepareMemberSync(stateEvents, timelineEvents, hasFetchedMembers) {
|
||||
return new MemberSync(this, stateEvents, timelineEvents, hasFetchedMembers);
|
||||
}
|
||||
|
||||
writeStateMemberEvent(event, isLimited, txn) {
|
||||
// member events in the state section when the room response
|
||||
// is not limited must always be lazy loaded members.
|
||||
// If they are not, they will be repeated in the timeline anyway.
|
||||
return this._writeMemberEvent(event, !isLimited, txn);
|
||||
}
|
||||
|
||||
async _writeMemberEvent(event, isLazyLoadingMember, txn) {
|
||||
const userId = event.state_key;
|
||||
if (!userId) {
|
||||
return;
|
||||
}
|
||||
const member = RoomMember.fromMemberEvent(this._roomId, event);
|
||||
if (!member) {
|
||||
return;
|
||||
}
|
||||
|
||||
let existingMember = this._cache.get(userId);
|
||||
async _writeMember(member, txn) {
|
||||
let existingMember = this._cache.get(member.userId);
|
||||
if (!existingMember) {
|
||||
const memberData = await txn.roomMembers.get(this._roomId, userId);
|
||||
const memberData = await txn.roomMembers.get(this._roomId, member.userId);
|
||||
if (memberData) {
|
||||
existingMember = new RoomMember(memberData);
|
||||
}
|
||||
}
|
||||
|
||||
// either never heard of the member, or something changed
|
||||
if (!existingMember || !existingMember.equals(member)) {
|
||||
txn.roomMembers.set(member.serialize());
|
||||
this._cache.set(member);
|
||||
// we also return a member change for lazy loading members if something changed,
|
||||
// so when the dupe timeline event comes and it doesn't see a diff
|
||||
// with the cache, we already returned the event here.
|
||||
//
|
||||
// it's just important that we don't consider the first LL event
|
||||
// for a user we see as a membership change, or we'll share keys with
|
||||
// them, etc...
|
||||
if (isLazyLoadingMember && !existingMember) {
|
||||
// we don't have a previous member, but we know this is not a
|
||||
// membership change as it's a lazy loaded
|
||||
// member so take the membership from the member
|
||||
return new MemberChange(member, member.membership);
|
||||
}
|
||||
return new MemberChange(member, existingMember?.membership);
|
||||
}
|
||||
}
|
||||
|
||||
async lookupMember(userId, event, timelineEvents, txn) {
|
||||
async lookupMember(userId, txn) {
|
||||
let member = this._cache.get(userId);
|
||||
if (!member) {
|
||||
const memberData = await txn.roomMembers.get(this._roomId, userId);
|
||||
|
@ -82,60 +52,170 @@ export class MemberWriter {
|
|||
this._cache.set(member);
|
||||
}
|
||||
}
|
||||
if (!member) {
|
||||
// sometimes the member event isn't included in state, but rather in the timeline,
|
||||
// even if it is not the first event in the timeline. In this case, go look for
|
||||
// the last one before the event, or if none is found,
|
||||
// the least recent matching member event in the timeline.
|
||||
// The latter is needed because of new joins picking up their own display name
|
||||
let foundEvent = false;
|
||||
let memberEventBefore;
|
||||
let firstMemberEvent;
|
||||
for (let i = timelineEvents.length - 1; i >= 0; i -= 1) {
|
||||
const e = timelineEvents[i];
|
||||
let matchingEvent;
|
||||
if (e.type === MEMBER_EVENT_TYPE && e.state_key === userId) {
|
||||
matchingEvent = e;
|
||||
firstMemberEvent = matchingEvent;
|
||||
}
|
||||
if (!foundEvent) {
|
||||
if (e.event_id === event.event_id) {
|
||||
foundEvent = true;
|
||||
return member;
|
||||
}
|
||||
}
|
||||
|
||||
class MemberSync {
|
||||
constructor(memberWriter, stateEvents, timelineEvents, hasFetchedMembers) {
|
||||
this._memberWriter = memberWriter;
|
||||
this._timelineEvents = timelineEvents;
|
||||
this._hasFetchedMembers = hasFetchedMembers;
|
||||
this._newStateMembers = stateEvents && this._stateEventsToMembers(stateEvents);
|
||||
}
|
||||
|
||||
get _roomId() {
|
||||
return this._memberWriter._roomId;
|
||||
}
|
||||
|
||||
_stateEventsToMembers(stateEvents) {
|
||||
let members;
|
||||
for (const event of stateEvents) {
|
||||
if (event.type === MEMBER_EVENT_TYPE) {
|
||||
const member = RoomMember.fromMemberEvent(this._roomId, event);
|
||||
if (member) {
|
||||
if (!members) {
|
||||
members = new Map();
|
||||
}
|
||||
} else if (matchingEvent) {
|
||||
memberEventBefore = matchingEvent;
|
||||
break;
|
||||
members.set(member.userId, member);
|
||||
}
|
||||
}
|
||||
}
|
||||
return members;
|
||||
}
|
||||
|
||||
_timelineEventsToMembers() {
|
||||
let members;
|
||||
// iterate backwards to only add the last member in the timeline
|
||||
for (let i = this._timelineEvents.length - 1; i >= 0; i--) {
|
||||
const e = this._timelineEvents[i];
|
||||
const userId = e.state_key;
|
||||
if (e.type === MEMBER_EVENT_TYPE && !members?.has(userId)) {
|
||||
const member = RoomMember.fromMemberEvent(this._roomId, e);
|
||||
if (member) {
|
||||
if (!members) {
|
||||
members = new Map();
|
||||
}
|
||||
members.set(member.userId, member);
|
||||
}
|
||||
}
|
||||
}
|
||||
return members;
|
||||
}
|
||||
|
||||
async lookupMemberAtEvent(userId, event, txn) {
|
||||
let member = this._findPrecedingMemberEventInTimeline(userId, event);
|
||||
if (member) {
|
||||
return member;
|
||||
}
|
||||
member = this._newStateMembers?.get(userId);
|
||||
if (member) {
|
||||
return member;
|
||||
}
|
||||
return await this._memberWriter.lookupMember(userId, txn);
|
||||
}
|
||||
|
||||
async write(txn) {
|
||||
const memberChanges = new Map();
|
||||
const newTimelineMembers = this._timelineEventsToMembers();
|
||||
if (this._newStateMembers) {
|
||||
for (const member of this._newStateMembers.values()) {
|
||||
if (!newTimelineMembers?.has(member.userId)) {
|
||||
const memberChange = await this._memberWriter._writeMember(member, txn);
|
||||
if (memberChange) {
|
||||
// if the member event appeared only in the state section,
|
||||
// AND we haven't heard about it AND we haven't fetched all members yet (to avoid #470),
|
||||
// this may be a lazy loading member (if it's not in a gap, we are certain
|
||||
// it is a ll member, in a gap, we can't tell), so we pass in our own membership as
|
||||
// as the previous one so we won't consider it a join to not have false positives (to avoid #192).
|
||||
// see also MemberChange.hasJoined
|
||||
const maybeLazyLoadingMember = !this._hasFetchedMembers && !memberChange.previousMembership;
|
||||
if (maybeLazyLoadingMember) {
|
||||
memberChange.previousMembership = member.membership;
|
||||
}
|
||||
memberChanges.set(memberChange.userId, memberChange);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (newTimelineMembers) {
|
||||
for (const member of newTimelineMembers.values()) {
|
||||
const memberChange = await this._memberWriter._writeMember(member, txn);
|
||||
if (memberChange) {
|
||||
memberChanges.set(memberChange.userId, memberChange);
|
||||
}
|
||||
}
|
||||
}
|
||||
return memberChanges;
|
||||
}
|
||||
|
||||
// try to find the first member event before the given event,
|
||||
// so we respect historical display names within the chunk of timeline
|
||||
_findPrecedingMemberEventInTimeline(userId, event) {
|
||||
let eventIndex = -1;
|
||||
for (let i = this._timelineEvents.length - 1; i >= 0; i--) {
|
||||
const e = this._timelineEvents[i];
|
||||
if (e.event_id === event.event_id) {
|
||||
eventIndex = i;
|
||||
break;
|
||||
}
|
||||
}
|
||||
for (let i = eventIndex - 1; i >= 0; i--) {
|
||||
const e = this._timelineEvents[i];
|
||||
if (e.type === MEMBER_EVENT_TYPE && e.state_key === userId) {
|
||||
const member = RoomMember.fromMemberEvent(this._roomId, e);
|
||||
if (member) {
|
||||
return member;
|
||||
}
|
||||
}
|
||||
// first see if we found a member event before the event we're looking up the sender for
|
||||
if (memberEventBefore) {
|
||||
member = RoomMember.fromMemberEvent(this._roomId, memberEventBefore);
|
||||
}
|
||||
// and only if we didn't, fall back to the first member event,
|
||||
// regardless of where it is positioned relative to the lookup event
|
||||
else if (firstMemberEvent) {
|
||||
member = RoomMember.fromMemberEvent(this._roomId, firstMemberEvent);
|
||||
}
|
||||
}
|
||||
return member;
|
||||
}
|
||||
}
|
||||
|
||||
export function tests() {
|
||||
|
||||
let idCounter = 0;
|
||||
|
||||
function createMemberEvent(membership, userId, displayName, avatarUrl) {
|
||||
idCounter += 1;
|
||||
return {
|
||||
content: {
|
||||
membership,
|
||||
"displayname": displayName,
|
||||
"avatar_url": avatarUrl
|
||||
},
|
||||
event_id: `$${idCounter}`,
|
||||
sender: userId,
|
||||
"state_key": userId,
|
||||
type: "m.room.member"
|
||||
};
|
||||
}
|
||||
|
||||
function createRoomResponse(stateEvents, timelineEvents) {
|
||||
if (!Array.isArray(timelineEvents)) {
|
||||
timelineEvents = [timelineEvents];
|
||||
}
|
||||
if (!Array.isArray(stateEvents)) {
|
||||
stateEvents = [stateEvents];
|
||||
}
|
||||
return {
|
||||
timeline: {
|
||||
limited: false,
|
||||
events: timelineEvents,
|
||||
},
|
||||
state: {
|
||||
events: stateEvents
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
function createTimelineResponse(timelineEvents) {
|
||||
return createRoomResponse([], timelineEvents)
|
||||
}
|
||||
|
||||
function createStateReponse(stateEvents) {
|
||||
return createRoomResponse(stateEvents, []);
|
||||
}
|
||||
|
||||
function createStorage(initialMembers = []) {
|
||||
const members = new Map();
|
||||
|
@ -163,81 +243,126 @@ export function tests() {
|
|||
const alice = "@alice:hs.tld";
|
||||
const avatar = "mxc://hs.tld/def";
|
||||
|
||||
/*
|
||||
join without previous membership
|
||||
join during gap with hasFetchedMembers=false
|
||||
join during gap with hasFetchedMembers=true
|
||||
join after invite
|
||||
|
||||
*/
|
||||
|
||||
return {
|
||||
"new join through state": async assert => {
|
||||
"new join": async assert => {
|
||||
const writer = new MemberWriter(roomId);
|
||||
const txn = createStorage();
|
||||
const change = await writer.writeStateMemberEvent(createMemberEvent("join", alice), true, txn);
|
||||
const memberSync = writer.prepareMemberSync([], [createMemberEvent("join", alice)], false);
|
||||
const changes = await memberSync.write(txn);
|
||||
assert.equal(changes.size, 1);
|
||||
const change = changes.get(alice);
|
||||
assert(change.hasJoined);
|
||||
assert.equal(txn.members.get(alice).membership, "join");
|
||||
},
|
||||
"accept invite through state": async assert => {
|
||||
"accept invite": async assert => {
|
||||
const writer = new MemberWriter(roomId);
|
||||
const txn = createStorage([member("invite", alice)]);
|
||||
const change = await writer.writeStateMemberEvent(createMemberEvent("join", alice), true, txn);
|
||||
const memberSync = writer.prepareMemberSync([], [createMemberEvent("join", alice)], false);
|
||||
const changes = await memberSync.write(txn);
|
||||
assert.equal(changes.size, 1);
|
||||
const change = changes.get(alice);
|
||||
assert.equal(change.previousMembership, "invite");
|
||||
assert(change.hasJoined);
|
||||
assert.equal(txn.members.get(alice).membership, "join");
|
||||
},
|
||||
"change display name through timeline": async assert => {
|
||||
"change display name": async assert => {
|
||||
const writer = new MemberWriter(roomId);
|
||||
const txn = createStorage([member("join", alice, "Alice")]);
|
||||
const change = await writer.writeTimelineMemberEvent(createMemberEvent("join", alice, "Alies"), txn);
|
||||
const memberSync = writer.prepareMemberSync([], [createMemberEvent("join", alice, "Alies")], false);
|
||||
const changes = await memberSync.write(txn);
|
||||
assert.equal(changes.size, 1);
|
||||
const change = changes.get(alice);
|
||||
assert(!change.hasJoined);
|
||||
assert.equal(change.member.displayName, "Alies");
|
||||
assert.equal(txn.members.get(alice).displayName, "Alies");
|
||||
},
|
||||
"set avatar through timeline": async assert => {
|
||||
"set avatar": async assert => {
|
||||
const writer = new MemberWriter(roomId);
|
||||
const txn = createStorage([member("join", alice, "Alice")]);
|
||||
const change = await writer.writeTimelineMemberEvent(createMemberEvent("join", alice, "Alice", avatar), txn);
|
||||
const memberSync = writer.prepareMemberSync([], [createMemberEvent("join", alice, "Alice", avatar)], false);
|
||||
const changes = await memberSync.write(txn);
|
||||
assert.equal(changes.size, 1);
|
||||
const change = changes.get(alice);
|
||||
assert(!change.hasJoined);
|
||||
assert.equal(change.member.avatarUrl, avatar);
|
||||
assert.equal(txn.members.get(alice).avatarUrl, avatar);
|
||||
},
|
||||
"ignore redundant member event": async assert => {
|
||||
"ignore redundant member event in timeline": async assert => {
|
||||
const writer = new MemberWriter(roomId);
|
||||
const txn = createStorage([member("join", alice, "Alice", avatar)]);
|
||||
const change = await writer.writeTimelineMemberEvent(createMemberEvent("join", alice, "Alice", avatar), txn);
|
||||
assert(!change);
|
||||
const memberSync = writer.prepareMemberSync([], [createMemberEvent("join", alice, "Alice", avatar)], false);
|
||||
const changes = await memberSync.write(txn);
|
||||
assert.equal(changes.size, 0);
|
||||
},
|
||||
"ignore redundant member event in state": async assert => {
|
||||
const writer = new MemberWriter(roomId);
|
||||
const txn = createStorage([member("join", alice, "Alice", avatar)]);
|
||||
const memberSync = writer.prepareMemberSync([createMemberEvent("join", alice, "Alice", avatar)], [], false);
|
||||
const changes = await memberSync.write(txn);
|
||||
assert.equal(changes.size, 0);
|
||||
},
|
||||
"leave": async assert => {
|
||||
const writer = new MemberWriter(roomId);
|
||||
const txn = createStorage([member("join", alice, "Alice")]);
|
||||
const change = await writer.writeTimelineMemberEvent(createMemberEvent("leave", alice, "Alice"), txn);
|
||||
const memberSync = writer.prepareMemberSync([], [createMemberEvent("leave", alice, "Alice")], false);
|
||||
const changes = await memberSync.write(txn);
|
||||
assert.equal(changes.size, 1);
|
||||
const change = changes.get(alice);
|
||||
assert(change.hasLeft);
|
||||
assert(!change.hasJoined);
|
||||
},
|
||||
"ban": async assert => {
|
||||
const writer = new MemberWriter(roomId);
|
||||
const txn = createStorage([member("join", alice, "Alice")]);
|
||||
const change = await writer.writeTimelineMemberEvent(createMemberEvent("ban", alice, "Alice"), txn);
|
||||
const memberSync = writer.prepareMemberSync([], [createMemberEvent("ban", alice, "Alice")], false);
|
||||
const changes = await memberSync.write(txn);
|
||||
assert.equal(changes.size, 1);
|
||||
const change = changes.get(alice);
|
||||
assert(change.hasLeft);
|
||||
assert(!change.hasJoined);
|
||||
},
|
||||
"reject invite": async assert => {
|
||||
const writer = new MemberWriter(roomId);
|
||||
const txn = createStorage([member("invite", alice, "Alice")]);
|
||||
const change = await writer.writeTimelineMemberEvent(createMemberEvent("leave", alice, "Alice"), txn);
|
||||
const memberSync = writer.prepareMemberSync([], [createMemberEvent("leave", alice, "Alice")], false);
|
||||
const changes = await memberSync.write(txn);
|
||||
assert.equal(changes.size, 1);
|
||||
const change = changes.get(alice);
|
||||
assert(!change.hasLeft);
|
||||
assert(!change.hasJoined);
|
||||
},
|
||||
"lazy loaded member we already know about doens't return change": async assert => {
|
||||
const writer = new MemberWriter(roomId);
|
||||
const txn = createStorage([member("join", alice, "Alice")]);
|
||||
const change = await writer.writeStateMemberEvent(createMemberEvent("join", alice, "Alice"), false, txn);
|
||||
assert(!change);
|
||||
const memberSync = writer.prepareMemberSync([createMemberEvent("join", alice, "Alice")], [], false);
|
||||
const changes = await memberSync.write(txn);
|
||||
assert.equal(changes.size, 0);
|
||||
},
|
||||
"lazy loaded member we already know about changes display name": async assert => {
|
||||
const writer = new MemberWriter(roomId);
|
||||
const txn = createStorage([member("join", alice, "Alice")]);
|
||||
const change = await writer.writeStateMemberEvent(createMemberEvent("join", alice, "Alies"), false, txn);
|
||||
const memberSync = writer.prepareMemberSync([createMemberEvent("join", alice, "Alies")], [], false);
|
||||
const changes = await memberSync.write(txn);
|
||||
assert.equal(changes.size, 1);
|
||||
const change = changes.get(alice);
|
||||
assert(!change.hasJoined);
|
||||
assert.equal(change.member.displayName, "Alies");
|
||||
},
|
||||
"unknown lazy loaded member returns change, but not considered a membership change": async assert => {
|
||||
"unknown lazy loaded member returns change, but not considered a join": async assert => {
|
||||
const writer = new MemberWriter(roomId);
|
||||
const txn = createStorage();
|
||||
const change = await writer.writeStateMemberEvent(createMemberEvent("join", alice, "Alice"), false, txn);
|
||||
const memberSync = writer.prepareMemberSync([createMemberEvent("join", alice, "Alice")], [], false);
|
||||
const changes = await memberSync.write(txn);
|
||||
assert.equal(changes.size, 1);
|
||||
const change = changes.get(alice);
|
||||
assert(!change.hasJoined);
|
||||
assert(!change.hasLeft);
|
||||
assert.equal(change.member.membership, "join");
|
||||
|
@ -247,30 +372,68 @@ export function tests() {
|
|||
const writer = new MemberWriter(roomId);
|
||||
const txn = createStorage();
|
||||
const aliceJoin = createMemberEvent("join", alice, "Alice");
|
||||
const change = await writer.writeStateMemberEvent(aliceJoin, false, txn);
|
||||
assert(!change.hasJoined);
|
||||
assert(!change.hasLeft);
|
||||
const timelineChange = await writer.writeTimelineMemberEvent(aliceJoin, txn);
|
||||
const memberSync = writer.prepareMemberSync([aliceJoin], [aliceJoin], false);
|
||||
const changes = await memberSync.write(txn);
|
||||
assert.equal(changes.size, 1);
|
||||
const change = changes.get(alice);
|
||||
assert(change.hasJoined);
|
||||
assert(!change.hasLeft);
|
||||
},
|
||||
"newly joined member causes a change with lookup done first": async assert => {
|
||||
const event = createMemberEvent("join", alice, "Alice");
|
||||
"change display name in timeline with lazy loaded member in state": async assert => {
|
||||
const writer = new MemberWriter(roomId);
|
||||
const txn = createStorage();
|
||||
const member = await writer.lookupMember(event.sender, event, [event], txn);
|
||||
assert(member);
|
||||
const change = await writer.writeTimelineMemberEvent(event, txn);
|
||||
assert(change);
|
||||
const memberSync = writer.prepareMemberSync(
|
||||
[createMemberEvent("join", alice, "Alice")],
|
||||
[createMemberEvent("join", alice, "Alies")],
|
||||
false);
|
||||
const changes = await memberSync.write(txn);
|
||||
assert.equal(changes.size, 1);
|
||||
const change = changes.get(alice);
|
||||
assert(change.hasJoined);
|
||||
assert(!change.hasLeft);
|
||||
assert.equal(change.member.displayName, "Alies");
|
||||
},
|
||||
"lookupMember returns closest member in the past": async assert => {
|
||||
"lookupMemberAtEvent returns closest member in the past": async assert => {
|
||||
const event1 = createMemberEvent("join", alice, "Alice");
|
||||
const event2 = createMemberEvent("join", alice, "Alies");
|
||||
const event3 = createMemberEvent("join", alice, "Alys");
|
||||
const events = [event1, event2, event3];
|
||||
// we write first because the MemberWriter assumes it is called before
|
||||
// the SyncWriter does any lookups
|
||||
const writer = new MemberWriter(roomId);
|
||||
const txn = createStorage();
|
||||
const member = await writer.lookupMember(event3.sender, event3, [event1, event2, event3], txn);
|
||||
const memberSync = await writer.prepareMemberSync([], events, false);
|
||||
let member = await memberSync.lookupMemberAtEvent(event1.sender, event1, txn);
|
||||
assert.equal(member, undefined);
|
||||
member = await memberSync.lookupMemberAtEvent(event2.sender, event2, txn);
|
||||
assert.equal(member.displayName, "Alice");
|
||||
member = await memberSync.lookupMemberAtEvent(event3.sender, event3, txn);
|
||||
assert.equal(member.displayName, "Alies");
|
||||
|
||||
assert.equal(txn.members.size, 0);
|
||||
const changes = await memberSync.write(txn);
|
||||
assert.equal(txn.members.size, 1);
|
||||
assert.equal(changes.size, 1);
|
||||
const change = changes.get(alice);
|
||||
assert(change.hasJoined);
|
||||
},
|
||||
"lookupMemberAtEvent falls back on state event": async assert => {
|
||||
const event1 = createMemberEvent("join", alice, "Alice");
|
||||
const event2 = createMemberEvent("join", alice, "Alies");
|
||||
// we write first because the MemberWriter assumes it is called before
|
||||
// the SyncWriter does any lookups
|
||||
const writer = new MemberWriter(roomId);
|
||||
const txn = createStorage();
|
||||
const memberSync = await writer.prepareMemberSync([event1], [event2], false);
|
||||
const member = await memberSync.lookupMemberAtEvent(event2.sender, event2, txn);
|
||||
assert.equal(member.displayName, "Alice");
|
||||
|
||||
assert.equal(txn.members.size, 0);
|
||||
const changes = await memberSync.write(txn);
|
||||
assert.equal(txn.members.size, 1);
|
||||
assert.equal(changes.size, 1);
|
||||
const change = changes.get(alice);
|
||||
assert(change.hasJoined);
|
||||
},
|
||||
};
|
||||
}
|
||||
|
|
|
@ -133,66 +133,51 @@ export class SyncWriter {
|
|||
return currentKey;
|
||||
}
|
||||
|
||||
async _writeStateEvents(roomResponse, memberChanges, isLimited, txn, log) {
|
||||
// persist state
|
||||
const {state} = roomResponse;
|
||||
if (Array.isArray(state?.events)) {
|
||||
log.set("stateEvents", state.events.length);
|
||||
for (const event of state.events) {
|
||||
if (event.type === MEMBER_EVENT_TYPE) {
|
||||
const memberChange = await this._memberWriter.writeStateMemberEvent(event, isLimited, txn);
|
||||
if (memberChange) {
|
||||
memberChanges.set(memberChange.userId, memberChange);
|
||||
}
|
||||
} else {
|
||||
txn.roomState.set(this._roomId, event);
|
||||
}
|
||||
async _writeStateEvents(stateEvents, txn, log) {
|
||||
let nonMemberStateEvents = 0;
|
||||
for (const event of stateEvents) {
|
||||
// member events are written prior by MemberWriter
|
||||
if (event.type !== MEMBER_EVENT_TYPE) {
|
||||
txn.roomState.set(this._roomId, event);
|
||||
nonMemberStateEvents += 1;
|
||||
}
|
||||
}
|
||||
log.set("stateEvents", nonMemberStateEvents);
|
||||
}
|
||||
|
||||
async _writeTimeline(timeline, currentKey, memberChanges, txn, log) {
|
||||
async _writeTimeline(timelineEvents, timeline, memberSync, currentKey, txn, log) {
|
||||
const entries = [];
|
||||
const updatedEntries = [];
|
||||
if (Array.isArray(timeline?.events) && timeline.events.length) {
|
||||
// only create a fragment when we will really write an event
|
||||
currentKey = await this._ensureLiveFragment(currentKey, entries, timeline, txn, log);
|
||||
const events = deduplicateEvents(timeline.events);
|
||||
log.set("timelineEvents", events.length);
|
||||
let timelineStateEventCount = 0;
|
||||
for(const event of events) {
|
||||
// store event in timeline
|
||||
currentKey = currentKey.nextKey();
|
||||
const storageEntry = createEventEntry(currentKey, this._roomId, event);
|
||||
let member = await this._memberWriter.lookupMember(event.sender, event, events, txn);
|
||||
if (member) {
|
||||
storageEntry.displayName = member.displayName;
|
||||
storageEntry.avatarUrl = member.avatarUrl;
|
||||
}
|
||||
txn.timelineEvents.insert(storageEntry);
|
||||
const entry = new EventEntry(storageEntry, this._fragmentIdComparer);
|
||||
entries.push(entry);
|
||||
const updatedRelationTargetEntries = await this._relationWriter.writeRelation(entry, txn, log);
|
||||
if (updatedRelationTargetEntries) {
|
||||
updatedEntries.push(...updatedRelationTargetEntries);
|
||||
}
|
||||
// update state events after writing event, so for a member event,
|
||||
// we only update the member info after having written the member event
|
||||
// to the timeline, as we want that event to have the old profile info
|
||||
if (typeof event.state_key === "string") {
|
||||
timelineStateEventCount += 1;
|
||||
if (event.type === MEMBER_EVENT_TYPE) {
|
||||
const memberChange = await this._memberWriter.writeTimelineMemberEvent(event, txn);
|
||||
if (memberChange) {
|
||||
memberChanges.set(memberChange.userId, memberChange);
|
||||
}
|
||||
} else {
|
||||
txn.roomState.set(this._roomId, event);
|
||||
}
|
||||
}
|
||||
// only create a fragment when we will really write an event
|
||||
currentKey = await this._ensureLiveFragment(currentKey, entries, timeline, txn, log);
|
||||
log.set("timelineEvents", timelineEvents.length);
|
||||
let timelineStateEventCount = 0;
|
||||
for(const event of timelineEvents) {
|
||||
// store event in timeline
|
||||
currentKey = currentKey.nextKey();
|
||||
const storageEntry = createEventEntry(currentKey, this._roomId, event);
|
||||
let member = await memberSync.lookupMemberAtEvent(event.sender, event, txn);
|
||||
if (member) {
|
||||
storageEntry.displayName = member.displayName;
|
||||
storageEntry.avatarUrl = member.avatarUrl;
|
||||
}
|
||||
txn.timelineEvents.insert(storageEntry);
|
||||
const entry = new EventEntry(storageEntry, this._fragmentIdComparer);
|
||||
entries.push(entry);
|
||||
const updatedRelationTargetEntries = await this._relationWriter.writeRelation(entry, txn, log);
|
||||
if (updatedRelationTargetEntries) {
|
||||
updatedEntries.push(...updatedRelationTargetEntries);
|
||||
}
|
||||
// update state events after writing event, so for a member event,
|
||||
// we only update the member info after having written the member event
|
||||
// to the timeline, as we want that event to have the old profile info.
|
||||
// member events are written prior by MemberWriter.
|
||||
if (typeof event.state_key === "string" && event.type !== MEMBER_EVENT_TYPE) {
|
||||
timelineStateEventCount += 1;
|
||||
txn.roomState.set(this._roomId, event);
|
||||
}
|
||||
log.set("timelineStateEventCount", timelineStateEventCount);
|
||||
}
|
||||
log.set("timelineStateEventCount", timelineStateEventCount);
|
||||
return {currentKey, entries, updatedEntries};
|
||||
}
|
||||
|
||||
|
@ -224,14 +209,13 @@ export class SyncWriter {
|
|||
* @type {SyncWriterResult}
|
||||
* @property {Array<BaseEntry>} entries new timeline entries written
|
||||
* @property {EventKey} newLiveKey the advanced key to write events at
|
||||
* @property {Map<string, MemberChange>} memberChanges member changes in the processed sync ny user id
|
||||
*
|
||||
* @param {Object} roomResponse [description]
|
||||
* @param {boolean} isRejoin whether the room was rejoined in the sync being processed
|
||||
* @param {Transaction} txn
|
||||
* @return {SyncWriterResult}
|
||||
*/
|
||||
async writeSync(roomResponse, isRejoin, txn, log) {
|
||||
async writeSync(roomResponse, isRejoin, hasFetchedMembers, txn, log) {
|
||||
let {timeline} = roomResponse;
|
||||
// we have rejoined the room after having synced it before,
|
||||
// check for overlap with the last synced event
|
||||
|
@ -239,13 +223,24 @@ export class SyncWriter {
|
|||
if (isRejoin) {
|
||||
timeline = await this._handleRejoinOverlap(timeline, txn, log);
|
||||
}
|
||||
const memberChanges = new Map();
|
||||
// important this happens before _writeTimeline so
|
||||
// members are available in the transaction
|
||||
await this._writeStateEvents(roomResponse, memberChanges, timeline?.limited, txn, log);
|
||||
const {currentKey, entries, updatedEntries} =
|
||||
await this._writeTimeline(timeline, this._lastLiveKey, memberChanges, txn, log);
|
||||
log.set("memberChanges", memberChanges.size);
|
||||
let timelineEvents;
|
||||
if (Array.isArray(timeline?.events)) {
|
||||
timelineEvents = deduplicateEvents(timeline.events);
|
||||
}
|
||||
const {state} = roomResponse;
|
||||
let stateEvents;
|
||||
if (Array.isArray(state?.events)) {
|
||||
stateEvents = state.events;
|
||||
}
|
||||
const memberSync = this._memberWriter.prepareMemberSync(stateEvents, timelineEvents, hasFetchedMembers);
|
||||
if (stateEvents) {
|
||||
await this._writeStateEvents(roomResponse, txn, log);
|
||||
}
|
||||
if (timelineEvents?.length) {
|
||||
const {currentKey, entries, updatedEntries} =
|
||||
await this._writeTimeline(timelineEvents, timeline, memberSync, this._lastLiveKey, txn, log);
|
||||
}
|
||||
const memberChanges = await memberSync.write(txn);
|
||||
return {entries, updatedEntries, newLiveKey: currentKey, memberChanges};
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue