Compare commits
1 commit
master
...
bwindels/m
Author | SHA1 | Date | |
---|---|---|---|
|
93f200673f |
2 changed files with 105 additions and 55 deletions
|
@ -22,7 +22,7 @@ import {Timeline} from "./timeline/Timeline.js";
|
|||
import {FragmentIdComparer} from "./timeline/FragmentIdComparer.js";
|
||||
import {SendQueue} from "./sending/SendQueue.js";
|
||||
import {WrappedError} from "../error.js"
|
||||
import {fetchOrLoadMembers} from "./members/load.js";
|
||||
import {loadMembers, fetchMemberSnapshot} from "./members/load.js";
|
||||
import {MemberList} from "./members/MemberList.js";
|
||||
import {Heroes} from "./members/Heroes.js";
|
||||
|
||||
|
@ -141,20 +141,60 @@ export class Room extends EventEmitter {
|
|||
return this._sendQueue.enqueueEvent(eventType, content);
|
||||
}
|
||||
|
||||
/**
|
||||
* @package
|
||||
* @return {MemberSnapshot} the member snapshot, be sure to call dispose
|
||||
*/
|
||||
async fetchMemberSnapshot() {
|
||||
if (!this.hasFetchedMembers) {
|
||||
if (this._changedMembersDuringSync) {
|
||||
throw new Error("already snapshot being written");
|
||||
}
|
||||
return await fetchMemberSnapshot({
|
||||
summary: this._summary,
|
||||
roomId: this._roomId,
|
||||
hsApi: this._hsApi,
|
||||
// to handle race between /members and /sync
|
||||
setChangedMembersMap: map => this._changedMembersDuringSync = map,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
async _fetchAndWriteMembers() {
|
||||
const snapshot = this.fetchMemberSnapshot();
|
||||
try {
|
||||
const txn = await this._storage.readWriteTxn([
|
||||
this._storage.storeNames.roomSummary,
|
||||
this._storage.storeNames.roomMembers,
|
||||
]);
|
||||
let changes;
|
||||
try {
|
||||
changes = snapshot.write(txn);
|
||||
} catch (err) {
|
||||
txn.abort();
|
||||
throw err;
|
||||
}
|
||||
await txn.complete();
|
||||
snapshot.applyWrite(changes);
|
||||
return snapshot.members;
|
||||
} finally {
|
||||
snapshot.dispose();
|
||||
}
|
||||
}
|
||||
|
||||
/** @public */
|
||||
async loadMemberList() {
|
||||
if (this._memberList) {
|
||||
this._memberList.retain();
|
||||
return this._memberList;
|
||||
} else {
|
||||
const members = await fetchOrLoadMembers({
|
||||
summary: this._summary,
|
||||
roomId: this._roomId,
|
||||
hsApi: this._hsApi,
|
||||
storage: this._storage,
|
||||
// to handle race between /members and /sync
|
||||
setChangedMembersMap: map => this._changedMembersDuringSync = map,
|
||||
});
|
||||
let members;
|
||||
if (this.hasFetchedMembers) {
|
||||
const txn = await this._storage.readTxn([this._storage.storeNames.roomMembers]);
|
||||
members = await loadMembers(this._roomId, txn);
|
||||
} else {
|
||||
members = await this._fetchAndWriteMembers();
|
||||
}
|
||||
this._memberList = new MemberList({
|
||||
members,
|
||||
closeCallback: () => { this._memberList = null; }
|
||||
|
@ -256,6 +296,10 @@ export class Room extends EventEmitter {
|
|||
return !!(tags && tags['m.lowpriority']);
|
||||
}
|
||||
|
||||
get hasFetchedMembers() {
|
||||
return this._summary.hasFetchedMembers;
|
||||
}
|
||||
|
||||
async _getLastEventId() {
|
||||
const lastKey = this._syncWriter.lastMessageKey;
|
||||
if (lastKey) {
|
||||
|
|
|
@ -17,74 +17,80 @@ limitations under the License.
|
|||
|
||||
import {RoomMember} from "./RoomMember.js";
|
||||
|
||||
async function loadMembers({roomId, storage}) {
|
||||
const txn = await storage.readTxn([
|
||||
storage.storeNames.roomMembers,
|
||||
]);
|
||||
export async function loadMembers(roomId, txn) {
|
||||
const memberDatas = await txn.roomMembers.getAll(roomId);
|
||||
return memberDatas.map(d => new RoomMember(d));
|
||||
}
|
||||
|
||||
async function fetchMembers({summary, roomId, hsApi, storage, setChangedMembersMap}) {
|
||||
export async function fetchMemberSnapshot({summary, roomId, hsApi, setChangedMembersMap}) {
|
||||
// if any members are changed by sync while we're fetching members,
|
||||
// they will end up here, so we check not to override them
|
||||
const changedMembersDuringSync = new Map();
|
||||
setChangedMembersMap(changedMembersDuringSync);
|
||||
|
||||
const memberResponse = await hsApi.members(roomId, {at: summary.lastPaginationToken}).response;
|
||||
const memberResponse = await hsApi.members(roomId, {at: summary.lastPaginationToken}).response();
|
||||
if (!Array.isArray(memberResponse?.chunk)) {
|
||||
throw new Error("malformed");
|
||||
}
|
||||
return new MemberSnapshot({memberEvents: memberResponse.chunk,
|
||||
setChangedMembersMap, changedMembersDuringSync, summary, roomId});
|
||||
}
|
||||
|
||||
const txn = await storage.readWriteTxn([
|
||||
storage.storeNames.roomSummary,
|
||||
storage.storeNames.roomMembers,
|
||||
]);
|
||||
/** Container for fetching /members while handling race with /sync. Can be persisted as part of a wider transaction */
|
||||
class MemberSnapshot {
|
||||
constructor({memberEvents, setChangedMembersMap, changedMembersDuringSync, summary, roomId}) {
|
||||
this._memberEvents = memberEvents;
|
||||
this._setChangedMembersMap = setChangedMembersMap;
|
||||
this._changedMembersDuringSync = changedMembersDuringSync;
|
||||
this._summary = summary;
|
||||
this._roomId = roomId;
|
||||
this._members = null;
|
||||
}
|
||||
|
||||
let summaryChanges;
|
||||
let members;
|
||||
|
||||
try {
|
||||
summaryChanges = summary.writeHasFetchedMembers(true, txn);
|
||||
const {roomMembers} = txn;
|
||||
const memberEvents = memberResponse.chunk;
|
||||
if (!Array.isArray(memberEvents)) {
|
||||
throw new Error("malformed");
|
||||
}
|
||||
members = await Promise.all(memberEvents.map(async memberEvent => {
|
||||
write(txn) {
|
||||
let summaryChanges;
|
||||
// this needs to happen after the txn is opened to prevent a race
|
||||
// between awaiting the opening of the txn and the sync
|
||||
this._members = this._memberEvents.map(memberEvent => {
|
||||
const userId = memberEvent?.state_key;
|
||||
if (!userId) {
|
||||
throw new Error("malformed");
|
||||
}
|
||||
// this member was changed during a sync that happened while calling /members
|
||||
// and thus is more recent, so don't overwrite
|
||||
const changedMember = changedMembersDuringSync.get(userId);
|
||||
const changedMember = this._changedMembersDuringSync.get(userId);
|
||||
if (changedMember) {
|
||||
return changedMember;
|
||||
} else {
|
||||
const member = RoomMember.fromMemberEvent(roomId, memberEvent);
|
||||
if (member) {
|
||||
roomMembers.set(member.serialize());
|
||||
}
|
||||
return member;
|
||||
return RoomMember.fromMemberEvent(this._roomId, memberEvent);
|
||||
}
|
||||
}));
|
||||
} catch (err) {
|
||||
// abort txn on any error
|
||||
txn.abort();
|
||||
throw err;
|
||||
} finally {
|
||||
});
|
||||
// store members
|
||||
const {roomMembers} = txn;
|
||||
for (const member of this._members) {
|
||||
if (member) {
|
||||
roomMembers.set(member.serialize());
|
||||
}
|
||||
}
|
||||
// store flag
|
||||
summaryChanges = this._summary.writeHasFetchedMembers(true, txn);
|
||||
return summaryChanges;
|
||||
}
|
||||
|
||||
applyWrite(summaryChanges) {
|
||||
this._summary.applyChanges(summaryChanges);
|
||||
}
|
||||
|
||||
get members() {
|
||||
if (!this._members) {
|
||||
throw new Error("call write first");
|
||||
}
|
||||
return this._members;
|
||||
}
|
||||
|
||||
dispose() {
|
||||
// important this gets cleared
|
||||
// or otherwise Room remains in "fetching-members" mode
|
||||
setChangedMembersMap(null);
|
||||
}
|
||||
await txn.complete();
|
||||
summary.applyChanges(summaryChanges);
|
||||
return members;
|
||||
}
|
||||
|
||||
export async function fetchOrLoadMembers(options) {
|
||||
const {summary} = options;
|
||||
if (!summary.hasFetchedMembers) {
|
||||
return fetchMembers(options);
|
||||
} else {
|
||||
return loadMembers(options);
|
||||
this._setChangedMembersMap(null);
|
||||
}
|
||||
}
|
||||
|
|
Reference in a new issue