Merge pull request #32 from bwindels/bwindels/fixconstrainterror-part2-rebased

Also only set session and summary data once txn is committed
This commit is contained in:
Bruno Windels 2020-03-14 19:53:31 +00:00 committed by GitHub
commit 2b0151c248
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 217 additions and 194 deletions

View file

@ -29,6 +29,5 @@
"postcss-import": "^12.0.1", "postcss-import": "^12.0.1",
"rollup": "^1.15.6", "rollup": "^1.15.6",
"serve-static": "^1.13.2" "serve-static": "^1.13.2"
}, }
"type": "module"
} }

View file

@ -20,22 +20,23 @@ export default class Room extends EventEmitter {
this._user = user; this._user = user;
} }
async persistSync(roomResponse, membership, txn) { async writeSync(roomResponse, membership, txn) {
const summaryChanged = this._summary.applySync(roomResponse, membership, txn); const summaryChanges = this._summary.writeSync(roomResponse, membership, txn);
const {entries, newLiveKey} = await this._syncWriter.writeSync(roomResponse, txn); const {entries, newLiveKey} = await this._syncWriter.writeSync(roomResponse, txn);
let removedPendingEvents; let removedPendingEvents;
if (roomResponse.timeline && roomResponse.timeline.events) { if (roomResponse.timeline && roomResponse.timeline.events) {
removedPendingEvents = this._sendQueue.removeRemoteEchos(roomResponse.timeline.events, txn); removedPendingEvents = this._sendQueue.removeRemoteEchos(roomResponse.timeline.events, txn);
} }
return {summaryChanged, newTimelineEntries: entries, newLiveKey, removedPendingEvents}; return {summaryChanges, newTimelineEntries: entries, newLiveKey, removedPendingEvents};
} }
emitSync({summaryChanged, newTimelineEntries, newLiveKey, removedPendingEvents}) { afterSync({summaryChanges, newTimelineEntries, newLiveKey, removedPendingEvents}) {
if (summaryChanged) { this._syncWriter.afterSync(newLiveKey);
if (summaryChanges) {
this._summary.afterSync(summaryChanges);
this.emit("change"); this.emit("change");
this._emitCollectionChange(this); this._emitCollectionChange(this);
} }
this._syncWriter.setKeyOnCompleted(newLiveKey);
if (this._timeline) { if (this._timeline) {
this._timeline.appendLiveEntries(newTimelineEntries); this._timeline.appendLiveEntries(newTimelineEntries);
} }

View file

@ -1,205 +1,170 @@
// import SummaryMembers from "./members"; function applySyncResponse(data, roomResponse, membership) {
if (roomResponse.summary) {
data = updateSummary(data, roomResponse.summary);
}
if (membership !== data.membership) {
data = data.cloneIfNeeded();
data.membership = membership;
}
// state comes before timeline
if (roomResponse.state) {
data = roomResponse.state.events.reduce(processEvent, data);
}
if (roomResponse.timeline) {
data = roomResponse.timeline.events.reduce(processEvent, data);
}
return data;
}
function processEvent(data, event) {
if (event.type === "m.room.encryption") {
if (!data.isEncrypted) {
data = data.cloneIfNeeded();
data.isEncrypted = true;
}
}
if (event.type === "m.room.name") {
const newName = event.content && event.content.name;
if (newName !== data.name) {
data = data.cloneIfNeeded();
data.name = newName;
}
} else if (event.type === "m.room.message") {
const content = event.content;
const body = content && content.body;
const msgtype = content && content.msgtype;
if (msgtype === "m.text") {
data = data.cloneIfNeeded();
data.lastMessageBody = body;
}
} else if (event.type === "m.room.canonical_alias") {
const content = event.content;
data = data.cloneIfNeeded();
data.canonicalAlias = content.alias;
data.altAliases = content.alt_aliases;
}
return data;
}
function updateSummary(data, summary) {
const heroes = summary["m.heroes"];
const inviteCount = summary["m.joined_member_count"];
const joinCount = summary["m.invited_member_count"];
if (heroes) {
data = data.cloneIfNeeded();
data.heroes = heroes;
}
if (Number.isInteger(inviteCount)) {
data = data.cloneIfNeeded();
data.inviteCount = inviteCount;
}
if (Number.isInteger(joinCount)) {
data = data.cloneIfNeeded();
data.joinCount = joinCount;
}
return data;
}
class SummaryData {
constructor(copy, roomId) {
this.roomId = copy ? copy.roomId : roomId;
this.name = copy ? copy.name : null;
this.lastMessageBody = copy ? copy.lastMessageBody : null;
this.unreadCount = copy ? copy.unreadCount : null;
this.mentionCount = copy ? copy.mentionCount : null;
this.isEncrypted = copy ? copy.isEncrypted : null;
this.isDirectMessage = copy ? copy.isDirectMessage : null;
this.membership = copy ? copy.membership : null;
this.inviteCount = copy ? copy.inviteCount : 0;
this.joinCount = copy ? copy.joinCount : 0;
this.heroes = copy ? copy.heroes : null;
this.canonicalAlias = copy ? copy.canonicalAlias : null;
this.altAliases = copy ? copy.altAliases : null;
this.cloned = copy ? true : false;
}
cloneIfNeeded() {
if (this.cloned) {
return this;
} else {
return new SummaryData(this);
}
}
serialize() {
const {cloned, ...serializedProps} = this;
return serializedProps;
}
}
export default class RoomSummary { export default class RoomSummary {
constructor(roomId) { constructor(roomId) {
// this._members = new SummaryMembers(); this._data = new SummaryData(null, roomId);
this._roomId = roomId;
this._name = null;
this._lastMessageBody = null;
this._unreadCount = null;
this._mentionCount = null;
this._isEncrypted = null;
this._isDirectMessage = null;
this._membership = null;
this._inviteCount = 0;
this._joinCount = 0;
this._readMarkerEventId = null;
this._heroes = null;
this._canonicalAlias = null;
this._aliases = null;
} }
get name() { get name() {
if (this._name) { if (this._data.name) {
return this._name; return this._data.name;
} }
if (this._canonicalAlias) { if (this._data.canonicalAlias) {
return this._canonicalAlias; return this._data.canonicalAlias;
} }
if (Array.isArray(this._aliases) && this._aliases.length !== 0) { if (Array.isArray(this._data.altAliases) && this._data.altAliases.length !== 0) {
return this._aliases[0]; return this._data.altAliases[0];
} }
if (Array.isArray(this._heroes) && this._heroes.length !== 0) { if (Array.isArray(this._data.heroes) && this._data.heroes.length !== 0) {
return this._heroes.join(", "); return this._data.heroes.join(", ");
} }
return this._roomId; return this._data.roomId;
} }
get lastMessage() { get lastMessage() {
return this._lastMessageBody; return this._data.lastMessageBody;
} }
get inviteCount() { get inviteCount() {
return this._inviteCount; return this._data.inviteCount;
} }
get joinCount() { get joinCount() {
return this._joinCount; return this._data.joinCount;
} }
applySync(roomResponse, membership, txn) { writeSync(roomResponse, membership, txn) {
const changed = this._processSyncResponse(roomResponse, membership); // clear cloned flag, so cloneIfNeeded makes a copy and
if (changed) { // this._data is not modified if any field is changed.
this._persist(txn); this._data.cloned = false;
} const data = applySyncResponse(this._data, roomResponse, membership);
return changed; if (data !== this._data) {
}
async load(summary) {
this._roomId = summary.roomId;
this._name = summary.name;
this._lastMessageBody = summary.lastMessageBody;
this._unreadCount = summary.unreadCount;
this._mentionCount = summary.mentionCount;
this._isEncrypted = summary.isEncrypted;
this._isDirectMessage = summary.isDirectMessage;
this._membership = summary.membership;
this._inviteCount = summary.inviteCount;
this._joinCount = summary.joinCount;
this._readMarkerEventId = summary.readMarkerEventId;
this._heroes = summary.heroes;
this._aliases = summary.aliases;
this._canonicalAlias = summary.canonicalAlias;
}
_persist(txn) {
// need to think here how we want to persist // need to think here how we want to persist
// things like unread status (as read marker, or unread count)? // things like unread status (as read marker, or unread count)?
// we could very well load additional things in the load method // we could very well load additional things in the load method
// ... the trade-off is between constantly writing the summary // ... the trade-off is between constantly writing the summary
// on every sync, or doing a bit of extra reading on load // on every sync, or doing a bit of extra reading on load
// and have in-memory only variables for visualization // and have in-memory only variables for visualization
const summary = { txn.roomSummary.set(data.serialize());
roomId: this._roomId, return data;
name: this._name, }
lastMessageBody: this._lastMessageBody,
unreadCount: this._unreadCount,
mentionCount: this._mentionCount,
isEncrypted: this._isEncrypted,
isDirectMessage: this._isDirectMessage,
membership: this._membership,
inviteCount: this._inviteCount,
joinCount: this._joinCount,
readMarkerEventId: this._readMarkerEventId,
heroes: this._heroes,
aliases: this._aliases,
canonicalAlias: this._canonicalAlias,
};
return txn.roomSummary.set(summary);
} }
_processSyncResponse(roomResponse, membership) { afterSync(data) {
let changed = false; this._data = data;
if (roomResponse.summary) {
this._updateSummary(roomResponse.summary);
changed = true;
}
if (membership !== this._membership) {
this._membership = membership;
changed = true;
}
// state comes before timeline
if (roomResponse.state) {
changed = roomResponse.state.events.reduce((changed, e) => {
return this._processEvent(e) || changed;
}, changed);
}
if (roomResponse.timeline) {
changed = roomResponse.timeline.events.reduce((changed, e) => {
return this._processEvent(e) || changed;
}, changed);
} }
return changed; async load(summary) {
this._data = new SummaryData(summary);
}
} }
_processEvent(event) { export function tests() {
if (event.type === "m.room.encryption") { return {
if (!this._isEncrypted) { "membership trigger change": function(assert) {
this._isEncrypted = true; const summary = new RoomSummary("id");
return true; const changes = summary.writeSync({}, "join");
} assert(changes);
} assert(changes.changed);
if (event.type === "m.room.name") {
const newName = event.content && event.content.name;
if (newName !== this._name) {
this._name = newName;
return true;
}
} else if (event.type === "m.room.member") {
return this._processMembership(event);
} else if (event.type === "m.room.message") {
const content = event.content;
const body = content && content.body;
const msgtype = content && content.msgtype;
if (msgtype === "m.text") {
this._lastMessageBody = body;
return true;
}
} else if (event.type === "m.room.canonical_alias") {
const content = event.content;
this._canonicalAlias = content.alias;
return true;
} else if (event.type === "m.room.aliases") {
const content = event.content;
this._aliases = content.aliases;
return true;
}
return false;
}
_processMembership(event) {
let changed = false;
const prevMembership = event.prev_content && event.prev_content.membership;
if (!event.content) {
return changed;
}
const content = event.content;
const membership = content.membership;
// danger of a replayed event getting the count out of sync
// but summary api will solve this.
// otherwise we'd have to store all the member ids in here
if (membership !== prevMembership) {
switch (prevMembership) {
case "invite": --this._inviteCount; break;
case "join": --this._joinCount; break;
}
switch (membership) {
case "invite": ++this._inviteCount; break;
case "join": ++this._joinCount; break;
}
changed = true;
}
// if (membership === "join" && content.name) {
// // TODO: avatar_url
// changed = this._members.applyMember(content.name, content.state_key) || changed;
// }
return changed;
}
_updateSummary(summary) {
const heroes = summary["m.heroes"];
const inviteCount = summary["m.joined_member_count"];
const joinCount = summary["m.invited_member_count"];
if (heroes) {
this._heroes = heroes;
}
if (Number.isInteger(inviteCount)) {
this._inviteCount = inviteCount;
}
if (Number.isInteger(joinCount)) {
this._joinCount = joinCount;
} }
} }
} }

View file

@ -129,7 +129,7 @@ export default class SyncWriter {
return {entries, newLiveKey: currentKey}; return {entries, newLiveKey: currentKey};
} }
setKeyOnCompleted(newLiveKey) { afterSync(newLiveKey) {
this._lastLiveKey = newLiveKey; this._lastLiveKey = newLiveKey;
} }
} }

View file

@ -77,11 +77,19 @@ export default class Session {
return room; return room;
} }
persistSync(syncToken, syncFilterId, accountData, txn) { writeSync(syncToken, syncFilterId, accountData, txn) {
if (syncToken !== this._session.syncToken) { if (syncToken !== this._session.syncToken) {
this._session.syncToken = syncToken; // don't modify this._session because transaction might still fail
this._session.syncFilterId = syncFilterId; const newSessionData = Object.assign({}, this._session, {syncToken, syncFilterId});
txn.session.set(this._session); txn.session.set(newSessionData);
return newSessionData;
}
}
afterSync(newSessionData) {
if (newSessionData) {
// sync transaction succeeded, modify object state now
this._session = newSessionData;
} }
} }
@ -97,3 +105,51 @@ export default class Session {
return this._user; return this._user;
} }
} }
export function tests() {
function createStorageMock(session, pendingEvents) {
return {
readTxn() {
return Promise.resolve({
session: {
get() {
return Promise.resolve(Object.assign({}, session));
}
},
pendingEvents: {
getAll() {
return Promise.resolve(pendingEvents);
}
}
});
}
};
}
return {
"session data is not modified until after sync": async (assert) => {
const session = new Session({storage: createStorageMock({
syncToken: "a",
syncFilterId: 5,
})});
await session.load();
let txnSetCalled = false;
const syncTxn = {
session: {
set({syncToken, syncFilterId}) {
txnSetCalled = true;
assert.equals(syncToken, "b");
assert.equals(syncFilterId, 6);
}
}
};
const newSessionData = session.writeSync("b", 6, {}, syncTxn);
assert(txnSetCalled);
assert.equals(session.syncToken, "a");
assert.equals(session.syncFilterId, 5);
session.afterSync(newSessionData);
assert.equals(session.syncToken, "b");
assert.equals(session.syncFilterId, 6);
}
}
}

View file

@ -85,8 +85,9 @@ export default class Sync extends EventEmitter {
storeNames.pendingEvents, storeNames.pendingEvents,
]); ]);
const roomChanges = []; const roomChanges = [];
let sessionChanges;
try { try {
this._session.persistSync(syncToken, syncFilterId, response.account_data, syncTxn); sessionChanges = this._session.writeSync(syncToken, syncFilterId, response.account_data, syncTxn);
// to_device // to_device
// presence // presence
if (response.rooms) { if (response.rooms) {
@ -96,7 +97,7 @@ export default class Sync extends EventEmitter {
room = this._session.createRoom(roomId); room = this._session.createRoom(roomId);
} }
console.log(` * applying sync response to room ${roomId} ...`); console.log(` * applying sync response to room ${roomId} ...`);
const changes = await room.persistSync(roomResponse, membership, syncTxn); const changes = await room.writeSync(roomResponse, membership, syncTxn);
roomChanges.push({room, changes}); roomChanges.push({room, changes});
}); });
await Promise.all(promises); await Promise.all(promises);
@ -116,9 +117,10 @@ export default class Sync extends EventEmitter {
console.error("unable to commit sync tranaction"); console.error("unable to commit sync tranaction");
throw err; throw err;
} }
this._session.afterSync(sessionChanges);
// emit room related events after txn has been closed // emit room related events after txn has been closed
for(let {room, changes} of roomChanges) { for(let {room, changes} of roomChanges) {
room.emitSync(changes); room.afterSync(changes);
} }
return syncToken; return syncToken;