Compare commits

...
This repository has been archived on 2022-08-19. You can view files and clone it, but cannot push or open issues or pull requests.

1 commit

Author SHA1 Message Date
Bruno Windels 93f200673f allow members to be persisted as part of a larger txn when fetched 2020-08-31 08:48:14 +02:00
2 changed files with 105 additions and 55 deletions

View file

@ -22,7 +22,7 @@ import {Timeline} from "./timeline/Timeline.js";
import {FragmentIdComparer} from "./timeline/FragmentIdComparer.js"; import {FragmentIdComparer} from "./timeline/FragmentIdComparer.js";
import {SendQueue} from "./sending/SendQueue.js"; import {SendQueue} from "./sending/SendQueue.js";
import {WrappedError} from "../error.js" import {WrappedError} from "../error.js"
import {fetchOrLoadMembers} from "./members/load.js"; import {loadMembers, fetchMemberSnapshot} from "./members/load.js";
import {MemberList} from "./members/MemberList.js"; import {MemberList} from "./members/MemberList.js";
import {Heroes} from "./members/Heroes.js"; import {Heroes} from "./members/Heroes.js";
@ -141,20 +141,60 @@ export class Room extends EventEmitter {
return this._sendQueue.enqueueEvent(eventType, content); return this._sendQueue.enqueueEvent(eventType, content);
} }
/**
* @package
* @return {MemberSnapshot} the member snapshot, be sure to call dispose
*/
async fetchMemberSnapshot() {
if (!this.hasFetchedMembers) {
if (this._changedMembersDuringSync) {
throw new Error("already snapshot being written");
}
return await fetchMemberSnapshot({
summary: this._summary,
roomId: this._roomId,
hsApi: this._hsApi,
// to handle race between /members and /sync
setChangedMembersMap: map => this._changedMembersDuringSync = map,
});
}
}
async _fetchAndWriteMembers() {
const snapshot = this.fetchMemberSnapshot();
try {
const txn = await this._storage.readWriteTxn([
this._storage.storeNames.roomSummary,
this._storage.storeNames.roomMembers,
]);
let changes;
try {
changes = snapshot.write(txn);
} catch (err) {
txn.abort();
throw err;
}
await txn.complete();
snapshot.applyWrite(changes);
return snapshot.members;
} finally {
snapshot.dispose();
}
}
/** @public */ /** @public */
async loadMemberList() { async loadMemberList() {
if (this._memberList) { if (this._memberList) {
this._memberList.retain(); this._memberList.retain();
return this._memberList; return this._memberList;
} else { } else {
const members = await fetchOrLoadMembers({ let members;
summary: this._summary, if (this.hasFetchedMembers) {
roomId: this._roomId, const txn = await this._storage.readTxn([this._storage.storeNames.roomMembers]);
hsApi: this._hsApi, members = await loadMembers(this._roomId, txn);
storage: this._storage, } else {
// to handle race between /members and /sync members = await this._fetchAndWriteMembers();
setChangedMembersMap: map => this._changedMembersDuringSync = map, }
});
this._memberList = new MemberList({ this._memberList = new MemberList({
members, members,
closeCallback: () => { this._memberList = null; } closeCallback: () => { this._memberList = null; }
@ -256,6 +296,10 @@ export class Room extends EventEmitter {
return !!(tags && tags['m.lowpriority']); return !!(tags && tags['m.lowpriority']);
} }
get hasFetchedMembers() {
return this._summary.hasFetchedMembers;
}
async _getLastEventId() { async _getLastEventId() {
const lastKey = this._syncWriter.lastMessageKey; const lastKey = this._syncWriter.lastMessageKey;
if (lastKey) { if (lastKey) {

View file

@ -17,74 +17,80 @@ limitations under the License.
import {RoomMember} from "./RoomMember.js"; import {RoomMember} from "./RoomMember.js";
async function loadMembers({roomId, storage}) { export async function loadMembers(roomId, txn) {
const txn = await storage.readTxn([
storage.storeNames.roomMembers,
]);
const memberDatas = await txn.roomMembers.getAll(roomId); const memberDatas = await txn.roomMembers.getAll(roomId);
return memberDatas.map(d => new RoomMember(d)); return memberDatas.map(d => new RoomMember(d));
} }
async function fetchMembers({summary, roomId, hsApi, storage, setChangedMembersMap}) { export async function fetchMemberSnapshot({summary, roomId, hsApi, setChangedMembersMap}) {
// if any members are changed by sync while we're fetching members, // if any members are changed by sync while we're fetching members,
// they will end up here, so we check not to override them // they will end up here, so we check not to override them
const changedMembersDuringSync = new Map(); const changedMembersDuringSync = new Map();
setChangedMembersMap(changedMembersDuringSync); setChangedMembersMap(changedMembersDuringSync);
const memberResponse = await hsApi.members(roomId, {at: summary.lastPaginationToken}).response; const memberResponse = await hsApi.members(roomId, {at: summary.lastPaginationToken}).response();
if (!Array.isArray(memberResponse?.chunk)) {
throw new Error("malformed");
}
return new MemberSnapshot({memberEvents: memberResponse.chunk,
setChangedMembersMap, changedMembersDuringSync, summary, roomId});
}
const txn = await storage.readWriteTxn([ /** Container for fetching /members while handling race with /sync. Can be persisted as part of a wider transaction */
storage.storeNames.roomSummary, class MemberSnapshot {
storage.storeNames.roomMembers, constructor({memberEvents, setChangedMembersMap, changedMembersDuringSync, summary, roomId}) {
]); this._memberEvents = memberEvents;
this._setChangedMembersMap = setChangedMembersMap;
this._changedMembersDuringSync = changedMembersDuringSync;
this._summary = summary;
this._roomId = roomId;
this._members = null;
}
let summaryChanges; write(txn) {
let members; let summaryChanges;
// this needs to happen after the txn is opened to prevent a race
try { // between awaiting the opening of the txn and the sync
summaryChanges = summary.writeHasFetchedMembers(true, txn); this._members = this._memberEvents.map(memberEvent => {
const {roomMembers} = txn;
const memberEvents = memberResponse.chunk;
if (!Array.isArray(memberEvents)) {
throw new Error("malformed");
}
members = await Promise.all(memberEvents.map(async memberEvent => {
const userId = memberEvent?.state_key; const userId = memberEvent?.state_key;
if (!userId) { if (!userId) {
throw new Error("malformed"); throw new Error("malformed");
} }
// this member was changed during a sync that happened while calling /members // this member was changed during a sync that happened while calling /members
// and thus is more recent, so don't overwrite // and thus is more recent, so don't overwrite
const changedMember = changedMembersDuringSync.get(userId); const changedMember = this._changedMembersDuringSync.get(userId);
if (changedMember) { if (changedMember) {
return changedMember; return changedMember;
} else { } else {
const member = RoomMember.fromMemberEvent(roomId, memberEvent); return RoomMember.fromMemberEvent(this._roomId, memberEvent);
if (member) {
roomMembers.set(member.serialize());
}
return member;
} }
})); });
} catch (err) { // store members
// abort txn on any error const {roomMembers} = txn;
txn.abort(); for (const member of this._members) {
throw err; if (member) {
} finally { roomMembers.set(member.serialize());
}
}
// store flag
summaryChanges = this._summary.writeHasFetchedMembers(true, txn);
return summaryChanges;
}
applyWrite(summaryChanges) {
this._summary.applyChanges(summaryChanges);
}
get members() {
if (!this._members) {
throw new Error("call write first");
}
return this._members;
}
dispose() {
// important this gets cleared // important this gets cleared
// or otherwise Room remains in "fetching-members" mode // or otherwise Room remains in "fetching-members" mode
setChangedMembersMap(null); this._setChangedMembersMap(null);
}
await txn.complete();
summary.applyChanges(summaryChanges);
return members;
}
export async function fetchOrLoadMembers(options) {
const {summary} = options;
if (!summary.hasFetchedMembers) {
return fetchMembers(options);
} else {
return loadMembers(options);
} }
} }