integrate invites into the sync lifecycle and expose them on session
This commit is contained in:
parent
de125441d1
commit
a072426e07
5 changed files with 117 additions and 17 deletions
|
@ -1,5 +1,6 @@
|
|||
/*
|
||||
Copyright 2020 Bruno Windels <bruno@windels.cloud>
|
||||
Copyright 2020, 2021 The Matrix.org Foundation C.I.C.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
|
@ -53,6 +54,8 @@ export class Session {
|
|||
this._sessionInfo = sessionInfo;
|
||||
this._rooms = new ObservableMap();
|
||||
this._roomUpdateCallback = (room, params) => this._rooms.update(room.id, params);
|
||||
this._invites = new ObservableMap();
|
||||
this._inviteRemoveCallback = invite => this._invites.remove(invite.id);
|
||||
this._user = new User(sessionInfo.userId);
|
||||
this._deviceMessageHandler = new DeviceMessageHandler({storage});
|
||||
this._olm = olm;
|
||||
|
@ -281,9 +284,10 @@ export class Session {
|
|||
const pendingEventsByRoomId = await this._getPendingEventsByRoom(txn);
|
||||
// load rooms
|
||||
const rooms = await txn.roomSummary.getAll();
|
||||
await Promise.all(rooms.map(summary => {
|
||||
await Promise.all(rooms.map(async summary => {
|
||||
const room = this.createRoom(summary.roomId, pendingEventsByRoomId.get(summary.roomId));
|
||||
return log.wrap("room", log => room.load(summary, txn, log));
|
||||
await log.wrap("room", log => room.load(summary, txn, log));
|
||||
this._rooms.add(room.id, room);
|
||||
}));
|
||||
}
|
||||
|
||||
|
@ -361,7 +365,7 @@ export class Session {
|
|||
|
||||
/** @internal */
|
||||
createRoom(roomId, pendingEvents) {
|
||||
const room = new Room({
|
||||
return new Room({
|
||||
roomId,
|
||||
getSyncToken: this._getSyncToken,
|
||||
storage: this._storage,
|
||||
|
@ -373,8 +377,31 @@ export class Session {
|
|||
createRoomEncryption: this._createRoomEncryption,
|
||||
platform: this._platform
|
||||
});
|
||||
this._rooms.add(roomId, room);
|
||||
return room;
|
||||
}
|
||||
|
||||
/** @internal */
|
||||
addRoomAfterSync(room) {
|
||||
this._rooms.add(room.id, room);
|
||||
}
|
||||
|
||||
get invites() {
|
||||
return this._invites;
|
||||
}
|
||||
|
||||
/** @internal */
|
||||
createInvite(roomId) {
|
||||
return new Invite({
|
||||
roomId,
|
||||
hsApi: this._hsApi,
|
||||
emitCollectionRemove: this._inviteRemoveCallback,
|
||||
user: this._user,
|
||||
clock: this._platform.clock,
|
||||
});
|
||||
}
|
||||
|
||||
/** @internal */
|
||||
addInviteAfterSync(invite) {
|
||||
this._invites.add(invite.id, invite);
|
||||
}
|
||||
|
||||
async obtainSyncLock(syncResponse) {
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
/*
|
||||
Copyright 2020 Bruno Windels <bruno@windels.cloud>
|
||||
Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||
Copyright 2020, 2021 The Matrix.org Foundation C.I.C.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
|
@ -191,7 +191,8 @@ export class Sync {
|
|||
|
||||
const isInitialSync = !syncToken;
|
||||
const sessionState = new SessionSyncProcessState();
|
||||
const roomStates = this._parseRoomsResponse(response.rooms, isInitialSync);
|
||||
const inviteStates = this._parseInvites(response.rooms);
|
||||
const roomStates = this._parseRoomsResponse(response.rooms, inviteStates, isInitialSync);
|
||||
|
||||
try {
|
||||
// take a lock on olm sessions used in this sync so sending a message doesn't change them while syncing
|
||||
|
@ -205,6 +206,10 @@ export class Sync {
|
|||
try {
|
||||
sessionState.changes = await log.wrap("session", log => this._session.writeSync(
|
||||
response, syncFilterId, sessionState.preparation, syncTxn, log));
|
||||
await Promise.all(inviteStates.map(async is => {
|
||||
is.changes = await log.wrap("invite", log => is.invite.writeSync(
|
||||
is.membership, is.roomResponse, syncTxn, log));
|
||||
}));
|
||||
await Promise.all(roomStates.map(async rs => {
|
||||
rs.changes = await log.wrap("room", log => rs.room.writeSync(
|
||||
rs.roomResponse, isInitialSync, rs.preparation, syncTxn, log));
|
||||
|
@ -228,9 +233,19 @@ export class Sync {
|
|||
|
||||
log.wrap("after", log => {
|
||||
log.wrap("session", log => this._session.afterSync(sessionState.changes, log), log.level.Detail);
|
||||
// emit invite related events after txn has been closed
|
||||
for(let is of inviteStates) {
|
||||
log.wrap("invite", () => is.invite.afterSync(is.changes), log.level.Detail);
|
||||
if (is.isNewInvite) {
|
||||
this._session.addInviteAfterSync(is.invite);
|
||||
}
|
||||
}
|
||||
// emit room related events after txn has been closed
|
||||
for(let rs of roomStates) {
|
||||
log.wrap("room", log => rs.room.afterSync(rs.changes, log), log.level.Detail);
|
||||
if (rs.isNewRoom) {
|
||||
this._session.addRoomAfterSync(rs.room);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
|
@ -267,7 +282,7 @@ export class Sync {
|
|||
if (!isRoomInResponse) {
|
||||
let room = this._session.rooms.get(roomId);
|
||||
if (room) {
|
||||
roomStates.push(new RoomSyncProcessState(room, {}, room.membership));
|
||||
roomStates.push(new RoomSyncProcessState(room, false, null, {}, room.membership));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -276,7 +291,7 @@ export class Sync {
|
|||
await Promise.all(roomStates.map(async rs => {
|
||||
const newKeys = newKeysByRoom?.get(rs.room.id);
|
||||
rs.preparation = await log.wrap("room", log => rs.room.prepareSync(
|
||||
rs.roomResponse, rs.membership, newKeys, prepareTxn, log), log.level.Detail);
|
||||
rs.roomResponse, rs.membership, rs.invite, newKeys, prepareTxn, log), log.level.Detail);
|
||||
}));
|
||||
|
||||
// This is needed for safari to not throw TransactionInactiveErrors on the syncTxn. See docs/INDEXEDDB.md
|
||||
|
@ -288,6 +303,7 @@ export class Sync {
|
|||
return this._storage.readWriteTxn([
|
||||
storeNames.session,
|
||||
storeNames.roomSummary,
|
||||
storeNames.invites,
|
||||
storeNames.roomState,
|
||||
storeNames.roomMembers,
|
||||
storeNames.timelineEvents,
|
||||
|
@ -307,10 +323,10 @@ export class Sync {
|
|||
]);
|
||||
}
|
||||
|
||||
_parseRoomsResponse(roomsSection, isInitialSync) {
|
||||
_parseRoomsResponse(roomsSection, inviteStates, isInitialSync) {
|
||||
const roomStates = [];
|
||||
if (roomsSection) {
|
||||
// don't do "invite", "leave" for now
|
||||
// don't do "leave" for now
|
||||
const allMemberships = ["join"];
|
||||
for(const membership of allMemberships) {
|
||||
const membershipSection = roomsSection[membership];
|
||||
|
@ -321,11 +337,20 @@ export class Sync {
|
|||
if (isInitialSync && timelineIsEmpty(roomResponse)) {
|
||||
continue;
|
||||
}
|
||||
let isNewRoom = false;
|
||||
let room = this._session.rooms.get(roomId);
|
||||
if (!room) {
|
||||
room = this._session.createRoom(roomId);
|
||||
isNewRoom = true;
|
||||
}
|
||||
roomStates.push(new RoomSyncProcessState(room, roomResponse, membership));
|
||||
const invite = this._session.invites.get(roomId);
|
||||
// if there is an existing invite, add a process state for it
|
||||
// so its writeSync and afterSync will run and remove the invite
|
||||
if (invite) {
|
||||
inviteStates.push(new InviteSyncProcessState(invite, false, membership, null));
|
||||
}
|
||||
roomStates.push(new RoomSyncProcessState(
|
||||
room, isNewRoom, invite, roomResponse, membership));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -333,6 +358,21 @@ export class Sync {
|
|||
return roomStates;
|
||||
}
|
||||
|
||||
_parseInvites(invites, roomsSection) {
|
||||
const inviteStates = [];
|
||||
if (roomsSection.invite) {
|
||||
for (const [roomId, roomResponse] of Object.entries(roomsSection.invite)) {
|
||||
let invite = this._session.invites.get(roomId);
|
||||
let isNewInvite = false;
|
||||
if (!invite) {
|
||||
invite = this._session.createInvite(roomId);
|
||||
isNewInvite = true;
|
||||
}
|
||||
inviteStates.push(new InviteSyncProcessState(invite, isNewInvite, "invite", roomResponse));
|
||||
}
|
||||
}
|
||||
return inviteStates;
|
||||
}
|
||||
|
||||
stop() {
|
||||
if (this._status.get() === SyncStatus.Stopped) {
|
||||
|
@ -360,11 +400,23 @@ class SessionSyncProcessState {
|
|||
}
|
||||
|
||||
class RoomSyncProcessState {
|
||||
constructor(room, roomResponse, membership) {
|
||||
constructor(room, isNewRoom, invite, roomResponse, membership) {
|
||||
this.room = room;
|
||||
this.isNewRoom = isNewRoom;
|
||||
this.invite = invite;
|
||||
this.roomResponse = roomResponse;
|
||||
this.membership = membership;
|
||||
this.preparation = null;
|
||||
this.changes = null;
|
||||
}
|
||||
}
|
||||
|
||||
class InviteSyncProcessState {
|
||||
constructor(invite, isNewInvite, membership, roomResponse) {
|
||||
this.invite = invite;
|
||||
this.isNewInvite = isNewInvite;
|
||||
this.membership = membership;
|
||||
this.roomResponse = roomResponse;
|
||||
this.changes = null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -107,7 +107,7 @@ export class Invite extends EventEmitter {
|
|||
}
|
||||
}
|
||||
|
||||
afterSync(changes, room) {
|
||||
afterSync(changes) {
|
||||
if (changes) {
|
||||
this._inviteData = changes.inviteData;
|
||||
this._inviter = changes.inviter;
|
||||
|
|
|
@ -189,12 +189,15 @@ export class Room extends EventEmitter {
|
|||
return retryEntries;
|
||||
}
|
||||
|
||||
async prepareSync(roomResponse, membership, newKeys, txn, log) {
|
||||
async prepareSync(roomResponse, membership, invite, newKeys, txn, log) {
|
||||
log.set("id", this.id);
|
||||
if (newKeys) {
|
||||
log.set("newKeys", newKeys.length);
|
||||
}
|
||||
const summaryChanges = this._summary.data.applySyncResponse(roomResponse, membership)
|
||||
let summaryChanges = this._summary.data.applySyncResponse(roomResponse, membership);
|
||||
if (invite) {
|
||||
summaryChanges = summaryChanges.applyInvite(invite);
|
||||
}
|
||||
let roomEncryption = this._roomEncryption;
|
||||
// encryption is enabled in this sync
|
||||
if (!roomEncryption && summaryChanges.encryption) {
|
||||
|
@ -379,7 +382,7 @@ export class Room extends EventEmitter {
|
|||
* Can be used to do longer running operations that resulted from the last sync,
|
||||
* like network operations.
|
||||
*/
|
||||
async afterSyncCompleted(changes, log) {
|
||||
async afterSyncCompleted(changes, isNewRoom, log) {
|
||||
log.set("id", this.id);
|
||||
if (this._roomEncryption) {
|
||||
await this._roomEncryption.flushPendingRoomKeyShares(this._hsApi, null, log);
|
||||
|
|
|
@ -148,6 +148,18 @@ function updateSummary(data, summary) {
|
|||
return data;
|
||||
}
|
||||
|
||||
function applyInvite(data, invite) {
|
||||
if (data.isDirectMessage !== invite.isDirectMessage) {
|
||||
data = data.cloneIfNeeded();
|
||||
data.isDirectMessage = invite.isDirectMessage;
|
||||
}
|
||||
if (data.dmUserId !== invite.inviter?.userId) {
|
||||
data = data.cloneIfNeeded();
|
||||
data.dmUserId = invite.inviter?.userId;
|
||||
}
|
||||
return data;
|
||||
}
|
||||
|
||||
export class SummaryData {
|
||||
constructor(copy, roomId) {
|
||||
this.roomId = copy ? copy.roomId : roomId;
|
||||
|
@ -166,6 +178,8 @@ export class SummaryData {
|
|||
this.notificationCount = copy ? copy.notificationCount : 0;
|
||||
this.highlightCount = copy ? copy.highlightCount : 0;
|
||||
this.tags = copy ? copy.tags : null;
|
||||
this.isDirectMessage = copy ? copy.isDirectMessage : false;
|
||||
this.dmUserId = copy ? copy.dmUserId : null;
|
||||
this.cloned = copy ? true : false;
|
||||
}
|
||||
|
||||
|
@ -202,6 +216,10 @@ export class SummaryData {
|
|||
return applySyncResponse(this, roomResponse, membership);
|
||||
}
|
||||
|
||||
applyInvite(invite) {
|
||||
return applyInvite(this, invite);
|
||||
}
|
||||
|
||||
get needsHeroes() {
|
||||
return !this.name && !this.canonicalAlias && this.heroes && this.heroes.length > 0;
|
||||
}
|
||||
|
|
Reference in a new issue