Compare commits

...

1 commit

Author SHA1 Message Date
Bruno Windels
93f200673f allow members to be persisted as part of a larger txn when fetched 2020-08-31 08:48:14 +02:00
2 changed files with 105 additions and 55 deletions

View file

@ -22,7 +22,7 @@ import {Timeline} from "./timeline/Timeline.js";
import {FragmentIdComparer} from "./timeline/FragmentIdComparer.js";
import {SendQueue} from "./sending/SendQueue.js";
import {WrappedError} from "../error.js"
import {fetchOrLoadMembers} from "./members/load.js";
import {loadMembers, fetchMemberSnapshot} from "./members/load.js";
import {MemberList} from "./members/MemberList.js";
import {Heroes} from "./members/Heroes.js";
@ -141,20 +141,60 @@ export class Room extends EventEmitter {
return this._sendQueue.enqueueEvent(eventType, content);
}
/**
* @package
* @return {MemberSnapshot} the member snapshot, be sure to call dispose
*/
async fetchMemberSnapshot() {
if (!this.hasFetchedMembers) {
if (this._changedMembersDuringSync) {
throw new Error("already snapshot being written");
}
return await fetchMemberSnapshot({
summary: this._summary,
roomId: this._roomId,
hsApi: this._hsApi,
// to handle race between /members and /sync
setChangedMembersMap: map => this._changedMembersDuringSync = map,
});
}
}
async _fetchAndWriteMembers() {
const snapshot = this.fetchMemberSnapshot();
try {
const txn = await this._storage.readWriteTxn([
this._storage.storeNames.roomSummary,
this._storage.storeNames.roomMembers,
]);
let changes;
try {
changes = snapshot.write(txn);
} catch (err) {
txn.abort();
throw err;
}
await txn.complete();
snapshot.applyWrite(changes);
return snapshot.members;
} finally {
snapshot.dispose();
}
}
/** @public */
async loadMemberList() {
if (this._memberList) {
this._memberList.retain();
return this._memberList;
} else {
const members = await fetchOrLoadMembers({
summary: this._summary,
roomId: this._roomId,
hsApi: this._hsApi,
storage: this._storage,
// to handle race between /members and /sync
setChangedMembersMap: map => this._changedMembersDuringSync = map,
});
let members;
if (this.hasFetchedMembers) {
const txn = await this._storage.readTxn([this._storage.storeNames.roomMembers]);
members = await loadMembers(this._roomId, txn);
} else {
members = await this._fetchAndWriteMembers();
}
this._memberList = new MemberList({
members,
closeCallback: () => { this._memberList = null; }
@ -256,6 +296,10 @@ export class Room extends EventEmitter {
return !!(tags && tags['m.lowpriority']);
}
get hasFetchedMembers() {
return this._summary.hasFetchedMembers;
}
async _getLastEventId() {
const lastKey = this._syncWriter.lastMessageKey;
if (lastKey) {

View file

@ -17,74 +17,80 @@ limitations under the License.
import {RoomMember} from "./RoomMember.js";
async function loadMembers({roomId, storage}) {
const txn = await storage.readTxn([
storage.storeNames.roomMembers,
]);
export async function loadMembers(roomId, txn) {
const memberDatas = await txn.roomMembers.getAll(roomId);
return memberDatas.map(d => new RoomMember(d));
}
async function fetchMembers({summary, roomId, hsApi, storage, setChangedMembersMap}) {
export async function fetchMemberSnapshot({summary, roomId, hsApi, setChangedMembersMap}) {
// if any members are changed by sync while we're fetching members,
// they will end up here, so we check not to override them
const changedMembersDuringSync = new Map();
setChangedMembersMap(changedMembersDuringSync);
const memberResponse = await hsApi.members(roomId, {at: summary.lastPaginationToken}).response;
const txn = await storage.readWriteTxn([
storage.storeNames.roomSummary,
storage.storeNames.roomMembers,
]);
let summaryChanges;
let members;
try {
summaryChanges = summary.writeHasFetchedMembers(true, txn);
const {roomMembers} = txn;
const memberEvents = memberResponse.chunk;
if (!Array.isArray(memberEvents)) {
const memberResponse = await hsApi.members(roomId, {at: summary.lastPaginationToken}).response();
if (!Array.isArray(memberResponse?.chunk)) {
throw new Error("malformed");
}
members = await Promise.all(memberEvents.map(async memberEvent => {
return new MemberSnapshot({memberEvents: memberResponse.chunk,
setChangedMembersMap, changedMembersDuringSync, summary, roomId});
}
/** Container for fetching /members while handling race with /sync. Can be persisted as part of a wider transaction */
class MemberSnapshot {
constructor({memberEvents, setChangedMembersMap, changedMembersDuringSync, summary, roomId}) {
this._memberEvents = memberEvents;
this._setChangedMembersMap = setChangedMembersMap;
this._changedMembersDuringSync = changedMembersDuringSync;
this._summary = summary;
this._roomId = roomId;
this._members = null;
}
write(txn) {
let summaryChanges;
// this needs to happen after the txn is opened to prevent a race
// between awaiting the opening of the txn and the sync
this._members = this._memberEvents.map(memberEvent => {
const userId = memberEvent?.state_key;
if (!userId) {
throw new Error("malformed");
}
// this member was changed during a sync that happened while calling /members
// and thus is more recent, so don't overwrite
const changedMember = changedMembersDuringSync.get(userId);
const changedMember = this._changedMembersDuringSync.get(userId);
if (changedMember) {
return changedMember;
} else {
const member = RoomMember.fromMemberEvent(roomId, memberEvent);
return RoomMember.fromMemberEvent(this._roomId, memberEvent);
}
});
// store members
const {roomMembers} = txn;
for (const member of this._members) {
if (member) {
roomMembers.set(member.serialize());
}
return member;
}
}));
} catch (err) {
// abort txn on any error
txn.abort();
throw err;
} finally {
// important this gets cleared
// or otherwise Room remains in "fetching-members" mode
setChangedMembersMap(null);
}
await txn.complete();
summary.applyChanges(summaryChanges);
return members;
// store flag
summaryChanges = this._summary.writeHasFetchedMembers(true, txn);
return summaryChanges;
}
export async function fetchOrLoadMembers(options) {
const {summary} = options;
if (!summary.hasFetchedMembers) {
return fetchMembers(options);
} else {
return loadMembers(options);
applyWrite(summaryChanges) {
this._summary.applyChanges(summaryChanges);
}
get members() {
if (!this._members) {
throw new Error("call write first");
}
return this._members;
}
dispose() {
// important this gets cleared
// or otherwise Room remains in "fetching-members" mode
this._setChangedMembersMap(null);
}
}