Merge pull request #77 from vector-im/bwindels/devicetracking

Implement device tracking for E2EE rooms
This commit is contained in:
Bruno Windels 2020-08-31 14:16:46 +00:00 committed by GitHub
commit cb940bf143
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
23 changed files with 602 additions and 55 deletions

View file

@ -19,6 +19,7 @@ import { ObservableMap } from "../observable/index.js";
import { SendScheduler, RateLimitingBackoff } from "./SendScheduler.js";
import {User} from "./User.js";
import {Account as E2EEAccount} from "./e2ee/Account.js";
import {DeviceTracker} from "./e2ee/DeviceTracker.js";
const PICKLE_KEY = "DEFAULT_KEY";
export class Session {
@ -34,6 +35,11 @@ export class Session {
this._user = new User(sessionInfo.userId);
this._olm = olm;
this._e2eeAccount = null;
this._deviceTracker = olm ? new DeviceTracker({
storage,
getSyncToken: () => this.syncToken,
olm,
}) : null;
}
async beforeFirstSync(isNewLogin) {
@ -152,7 +158,7 @@ export class Session {
return room;
}
writeSync(syncResponse, syncFilterId, txn) {
async writeSync(syncResponse, syncFilterId, roomChanges, txn) {
const changes = {};
const syncToken = syncResponse.next_batch;
const deviceOneTimeKeysCount = syncResponse.device_one_time_keys_count;
@ -166,6 +172,17 @@ export class Session {
txn.session.set("sync", syncInfo);
changes.syncInfo = syncInfo;
}
if (this._deviceTracker) {
for (const {room, changes} of roomChanges) {
if (room.isTrackingMembers && changes.memberChanges?.size) {
await this._deviceTracker.writeMemberChanges(room, changes.memberChanges, txn);
}
}
const deviceLists = syncResponse.device_lists;
if (deviceLists) {
await this._deviceTracker.writeDeviceChanges(deviceLists, txn);
}
}
return changes;
}

View file

@ -100,11 +100,13 @@ export class Sync {
this._status.set(SyncStatus.Stopped);
}
}
try {
await this._session.afterSyncCompleted();
} catch (err) {
console.err("error during after sync completed, continuing to sync.", err.stack);
// swallowing error here apart from logging
if (!this._error) {
try {
await this._session.afterSyncCompleted();
} catch (err) {
console.err("error during after sync completed, continuing to sync.", err.stack);
// swallowing error here apart from logging
}
}
}
}
@ -129,11 +131,11 @@ export class Sync {
storeNames.timelineEvents,
storeNames.timelineFragments,
storeNames.pendingEvents,
storeNames.userIdentities,
]);
const roomChanges = [];
let sessionChanges;
try {
sessionChanges = this._session.writeSync(response, syncFilterId, syncTxn);
// to_device
// presence
if (response.rooms) {
@ -153,6 +155,7 @@ export class Sync {
});
await Promise.all(promises);
}
sessionChanges = await this._session.writeSync(response, syncFilterId, roomChanges, syncTxn);
} catch(err) {
console.warn("aborting syncTxn because of error");
console.error(err);

View file

@ -15,14 +15,12 @@ limitations under the License.
*/
import anotherjson from "../../../lib/another-json/index.js";
import {SESSION_KEY_PREFIX, OLM_ALGORITHM, MEGOLM_ALGORITHM} from "./common.js";
// use common prefix so it's easy to clear properties that are not e2ee related during session clear
export const SESSION_KEY_PREFIX = "e2ee:";
const ACCOUNT_SESSION_KEY = SESSION_KEY_PREFIX + "olmAccount";
const DEVICE_KEY_FLAG_SESSION_KEY = SESSION_KEY_PREFIX + "areDeviceKeysUploaded";
const SERVER_OTK_COUNT_SESSION_KEY = SESSION_KEY_PREFIX + "serverOTKCount";
const OLM_ALGORITHM = "m.olm.v1.curve25519-aes-sha2";
const MEGOLM_ALGORITHM = "m.megolm.v1.aes-sha2";
export class Account {
static async load({olm, pickleKey, hsApi, userId, deviceId, txn}) {

View file

@ -0,0 +1,280 @@
/*
Copyright 2020 The Matrix.org Foundation C.I.C.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
import anotherjson from "../../../lib/another-json/index.js";
const TRACKING_STATUS_OUTDATED = 0;
const TRACKING_STATUS_UPTODATE = 1;
const DEVICE_KEYS_SIGNATURE_ALGORITHM = "ed25519";
// map 1 device from /keys/query response to DeviceIdentity
function deviceKeysAsDeviceIdentity(deviceSection) {
const deviceId = deviceSection["device_id"];
const userId = deviceSection["user_id"];
return {
userId,
deviceId,
ed25519Key: deviceSection.keys?.[`ed25519:${deviceId}`],
curve25519Key: deviceSection.keys?.[`curve25519:${deviceId}`],
algorithms: deviceSection.algorithms,
displayName: deviceSection.unsigned?.device_display_name,
};
}
export class DeviceTracker {
constructor({storage, getSyncToken, olm}) {
this._storage = storage;
this._getSyncToken = getSyncToken;
this._identityChangedForRoom = null;
this._olmUtil = new olm.Utility();
}
async writeDeviceChanges(deviceLists, txn) {
const {userIdentities} = txn;
if (Array.isArray(deviceLists.changed) && deviceLists.changed.length) {
await Promise.all(deviceLists.changed.map(async userId => {
const user = await userIdentities.get(userId);
if (user) {
user.deviceTrackingStatus = TRACKING_STATUS_OUTDATED;
userIdentities.set(user);
} else {
console.warn("changed device userid not found", userId);
}
}));
}
}
writeMemberChanges(room, memberChanges, txn) {
return Promise.all(Array.from(memberChanges.values()).map(async memberChange => {
return this._applyMemberChange(memberChange, txn);
}));
}
async trackRoom(room) {
if (room.isTrackingMembers) {
return;
}
const memberList = await room.loadMemberList();
try {
const txn = await this._storage.readWriteTxn([
this._storage.storeNames.roomSummary,
this._storage.storeNames.userIdentities,
]);
let isTrackingChanges;
try {
isTrackingChanges = room.writeIsTrackingMembers(true, txn);
const members = Array.from(memberList.members.values());
await this._writeJoinedMembers(members, txn);
} catch (err) {
txn.abort();
throw err;
}
await txn.complete();
room.applyIsTrackingMembersChanges(isTrackingChanges);
} finally {
memberList.release();
}
}
async _writeJoinedMembers(members, txn) {
await Promise.all(members.map(async member => {
if (member.membership === "join") {
await this._writeMember(member, txn);
}
}));
}
async _writeMember(member, txn) {
const {userIdentities} = txn;
const identity = await userIdentities.get(member.userId);
if (!identity) {
userIdentities.set({
userId: member.userId,
roomIds: [member.roomId],
deviceTrackingStatus: TRACKING_STATUS_OUTDATED,
});
} else {
if (!identity.roomIds.includes(member.roomId)) {
identity.roomIds.push(member.roomId);
userIdentities.set(identity);
}
}
}
async _applyMemberChange(memberChange, txn) {
// TODO: depends whether we encrypt for invited users??
// add room
if (memberChange.previousMembership !== "join" && memberChange.membership === "join") {
await this._writeMember(memberChange.member, txn);
}
// remove room
else if (memberChange.previousMembership === "join" && memberChange.membership !== "join") {
const {userIdentities} = txn;
const identity = await userIdentities.get(memberChange.userId);
if (identity) {
identity.roomIds = identity.roomIds.filter(roomId => roomId !== memberChange.roomId);
// no more encrypted rooms with this user, remove
if (identity.roomIds.length === 0) {
userIdentities.remove(identity.userId);
} else {
userIdentities.set(identity);
}
}
}
}
async _queryKeys(userIds, hsApi) {
// TODO: we need to handle the race here between /sync and /keys/query just like we need to do for the member list ...
// there are multiple requests going out for /keys/query though and only one for /members
const deviceKeyResponse = await hsApi.queryKeys({
"timeout": 10000,
"device_keys": userIds.reduce((deviceKeysMap, userId) => {
deviceKeysMap[userId] = [];
return deviceKeysMap;
}, {}),
"token": this._getSyncToken()
}).response();
const verifiedKeysPerUser = this._filterVerifiedDeviceKeys(deviceKeyResponse["device_keys"]);
const flattenedVerifiedKeysPerUser = verifiedKeysPerUser.reduce((all, {verifiedKeys}) => all.concat(verifiedKeys), []);
const deviceIdentitiesWithPossibleChangedKeys = flattenedVerifiedKeysPerUser.map(deviceKeysAsDeviceIdentity);
const txn = await this._storage.readWriteTxn([
this._storage.storeNames.userIdentities,
this._storage.storeNames.deviceIdentities,
]);
let deviceIdentities;
try {
// check ed25519 key has not changed if we've seen the device before
deviceIdentities = await Promise.all(deviceIdentitiesWithPossibleChangedKeys.map(async (deviceIdentity) => {
const existingDevice = await txn.deviceIdentities.get(deviceIdentity.userId, deviceIdentity.deviceId);
if (!existingDevice || existingDevice.ed25519Key === deviceIdentity.ed25519Key) {
return deviceIdentity;
}
// ignore devices where the keys have changed
return null;
}));
// filter out nulls
deviceIdentities = deviceIdentities.filter(di => !!di);
// store devices
for (const deviceIdentity of deviceIdentities) {
txn.deviceIdentities.set(deviceIdentity);
}
// mark user identities as up to date
await Promise.all(verifiedKeysPerUser.map(async ({userId}) => {
const identity = await txn.userIdentities.get(userId);
identity.deviceTrackingStatus = TRACKING_STATUS_UPTODATE;
txn.userIdentities.set(identity);
}));
} catch (err) {
txn.abort();
throw err;
}
await txn.complete();
return deviceIdentities;
}
_filterVerifiedDeviceKeys(keyQueryDeviceKeysResponse) {
const verifiedKeys = Object.entries(keyQueryDeviceKeysResponse).map(([userId, keysByDevice]) => {
const verifiedEntries = Object.entries(keysByDevice).filter(([deviceId, deviceKeys]) => {
const deviceIdOnKeys = deviceKeys["device_id"];
const userIdOnKeys = deviceKeys["user_id"];
if (userIdOnKeys !== userId) {
return false;
}
if (deviceIdOnKeys !== deviceId) {
return false;
}
return this._verifyUserDeviceKeys(deviceKeys);
});
const verifiedKeys = verifiedEntries.map(([, deviceKeys]) => deviceKeys);
return {userId, verifiedKeys};
});
return verifiedKeys;
}
_verifyUserDeviceKeys(deviceSection) {
const deviceId = deviceSection["device_id"];
const userId = deviceSection["user_id"];
const clone = Object.assign({}, deviceSection);
delete clone.unsigned;
delete clone.signatures;
const canonicalJson = anotherjson.stringify(clone);
const key = deviceSection?.keys?.[`${DEVICE_KEYS_SIGNATURE_ALGORITHM}:${deviceId}`];
const signature = deviceSection?.signatures?.[userId]?.[`${DEVICE_KEYS_SIGNATURE_ALGORITHM}:${deviceId}`];
try {
if (!signature) {
throw new Error("no signature");
}
// throws when signature is invalid
this._olmUtil.ed25519_verify(key, canonicalJson, signature);
return true;
} catch (err) {
console.warn("Invalid device signature, ignoring device.", key, canonicalJson, signature, err);
return false;
}
}
/**
* Gives all the device identities for a room that is already tracked.
* Assumes room is already tracked. Call `trackRoom` first if unsure.
* @param {String} roomId [description]
* @return {[type]} [description]
*/
async deviceIdentitiesForTrackedRoom(roomId, hsApi) {
let identities;
const txn = await this._storage.readTxn([
this._storage.storeNames.roomMembers,
this._storage.storeNames.userIdentities,
]);
// because we don't have multiEntry support in IE11, we get a set of userIds that is pretty close to what we
// need as a good first filter (given that non-join memberships will be in there). After fetching the identities,
// we check which ones have the roomId for the room we're looking at.
// So, this will also contain non-joined memberships
const userIds = await txn.roomMembers.getAllUserIds(roomId);
const allMemberIdentities = await Promise.all(userIds.map(userId => txn.userIdentities.get(userId)));
identities = allMemberIdentities.filter(identity => {
// identity will be missing for any userIds that don't have
// membership join in any of your encrypted rooms
return identity && identity.roomIds.includes(roomId);
});
const upToDateIdentities = identities.filter(i => i.deviceTrackingStatus === TRACKING_STATUS_UPTODATE);
const outdatedIdentities = identities.filter(i => i.deviceTrackingStatus === TRACKING_STATUS_OUTDATED);
let queriedDevices;
if (outdatedIdentities.length) {
// TODO: ignore the race between /sync and /keys/query for now,
// where users could get marked as outdated or added/removed from the room while
// querying keys
queriedDevices = await this._queryKeys(outdatedIdentities.map(i => i.userId), hsApi);
}
const deviceTxn = await this._storage.readTxn([
this._storage.storeNames.deviceIdentities,
]);
const devicesPerUser = await Promise.all(upToDateIdentities.map(identity => {
return deviceTxn.deviceIdentities.getAllForUserId(identity.userId);
}));
let flattenedDevices = devicesPerUser.reduce((all, devicesForUser) => all.concat(devicesForUser), []);
if (queriedDevices && queriedDevices.length) {
flattenedDevices = flattenedDevices.concat(queriedDevices);
}
return flattenedDevices;
}
}

20
src/matrix/e2ee/common.js Normal file
View file

@ -0,0 +1,20 @@
/*
Copyright 2020 The Matrix.org Foundation C.I.C.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// use common prefix so it's easy to clear properties that are not e2ee related during session clear
export const SESSION_KEY_PREFIX = "e2ee:";
export const OLM_ALGORITHM = "m.olm.v1.curve25519-aes-sha2";
export const MEGOLM_ALGORITHM = "m.megolm.v1.aes-sha2";

View file

@ -164,6 +164,10 @@ export class HomeServerApi {
return this._post("/keys/upload", null, payload, options);
}
queryKeys(queryRequest, options = null) {
return this._post("/keys/query", null, queryRequest, options);
}
get mediaRepository() {
return this._mediaRepository;
}

View file

@ -40,6 +40,7 @@ export class Room extends EventEmitter {
this._timeline = null;
this._user = user;
this._changedMembersDuringSync = null;
this._memberList = null;
}
/** @package */
@ -50,7 +51,7 @@ export class Room extends EventEmitter {
membership,
isInitialSync, isTimelineOpen,
txn);
const {entries, newLiveKey, changedMembers} = await this._syncWriter.writeSync(roomResponse, txn);
const {entries, newLiveKey, memberChanges} = await this._syncWriter.writeSync(roomResponse, txn);
// fetch new members while we have txn open,
// but don't make any in-memory changes yet
let heroChanges;
@ -59,7 +60,7 @@ export class Room extends EventEmitter {
if (!this._heroes) {
this._heroes = new Heroes(this._roomId);
}
heroChanges = await this._heroes.calculateChanges(summaryChanges.heroes, changedMembers, txn);
heroChanges = await this._heroes.calculateChanges(summaryChanges.heroes, memberChanges, txn);
}
let removedPendingEvents;
if (roomResponse.timeline && roomResponse.timeline.events) {
@ -70,22 +71,22 @@ export class Room extends EventEmitter {
newTimelineEntries: entries,
newLiveKey,
removedPendingEvents,
changedMembers,
memberChanges,
heroChanges
};
}
/** @package */
afterSync({summaryChanges, newTimelineEntries, newLiveKey, removedPendingEvents, changedMembers, heroChanges}) {
afterSync({summaryChanges, newTimelineEntries, newLiveKey, removedPendingEvents, memberChanges, heroChanges}) {
this._syncWriter.afterSync(newLiveKey);
if (changedMembers.length) {
if (memberChanges.size) {
if (this._changedMembersDuringSync) {
for (const member of changedMembers) {
this._changedMembersDuringSync.set(member.userId, member);
for (const [userId, memberChange] of memberChanges.entries()) {
this._changedMembersDuringSync.set(userId, memberChange.member);
}
}
if (this._memberList) {
this._memberList.afterSync(changedMembers);
this._memberList.afterSync(memberChanges);
}
}
let emitChange = false;
@ -144,6 +145,7 @@ export class Room extends EventEmitter {
/** @public */
async loadMemberList() {
if (this._memberList) {
// TODO: also await fetchOrLoadMembers promise here
this._memberList.retain();
return this._memberList;
} else {
@ -256,6 +258,14 @@ export class Room extends EventEmitter {
return !!(tags && tags['m.lowpriority']);
}
get isEncrypted() {
return !!this._summary.encryption;
}
get isTrackingMembers() {
return this._summary.isTrackingMembers;
}
async _getLastEventId() {
const lastKey = this._syncWriter.lastMessageKey;
if (lastKey) {
@ -322,5 +332,15 @@ export class Room extends EventEmitter {
get mediaRepository() {
return this._hsApi.mediaRepository;
}
/** @package */
writeIsTrackingMembers(value, txn) {
return this._summary.writeIsTrackingMembers(value, txn);
}
/** @package */
applyIsTrackingMembersChanges(changes) {
this._summary.applyChanges(changes);
}
}

View file

@ -14,6 +14,8 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
import {MEGOLM_ALGORITHM} from "../e2ee/common.js";
function applySyncResponse(data, roomResponse, membership, isInitialSync, isTimelineOpen, ownUserId) {
if (roomResponse.summary) {
data = updateSummary(data, roomResponse.summary);
@ -68,9 +70,10 @@ function processRoomAccountData(data, event) {
function processStateEvent(data, event) {
if (event.type === "m.room.encryption") {
if (!data.isEncrypted) {
const algorithm = event.content?.algorithm;
if (!data.encryption && algorithm === MEGOLM_ALGORITHM) {
data = data.cloneIfNeeded();
data.isEncrypted = true;
data.encryption = event.content;
}
} else if (event.type === "m.room.name") {
const newName = event.content?.name;
@ -113,7 +116,9 @@ function updateSummary(data, summary) {
const heroes = summary["m.heroes"];
const joinCount = summary["m.joined_member_count"];
const inviteCount = summary["m.invited_member_count"];
// TODO: we could easily calculate if all members are available here and set hasFetchedMembers?
// so we can avoid calling /members...
// we'd need to do a count query in the roomMembers store though ...
if (heroes && Array.isArray(heroes)) {
data = data.cloneIfNeeded();
data.heroes = heroes;
@ -136,7 +141,7 @@ class SummaryData {
this.lastMessageBody = copy ? copy.lastMessageBody : null;
this.lastMessageTimestamp = copy ? copy.lastMessageTimestamp : null;
this.isUnread = copy ? copy.isUnread : false;
this.isEncrypted = copy ? copy.isEncrypted : false;
this.encryption = copy ? copy.encryption : null;
this.isDirectMessage = copy ? copy.isDirectMessage : false;
this.membership = copy ? copy.membership : null;
this.inviteCount = copy ? copy.inviteCount : 0;
@ -144,6 +149,7 @@ class SummaryData {
this.heroes = copy ? copy.heroes : null;
this.canonicalAlias = copy ? copy.canonicalAlias : null;
this.hasFetchedMembers = copy ? copy.hasFetchedMembers : false;
this.isTrackingMembers = copy ? copy.isTrackingMembers : false;
this.lastPaginationToken = copy ? copy.lastPaginationToken : null;
this.avatarUrl = copy ? copy.avatarUrl : null;
this.notificationCount = copy ? copy.notificationCount : 0;
@ -190,6 +196,11 @@ export class RoomSummary {
return this._data.heroes;
}
get encryption() {
return this._data.encryption;
}
// whether the room name should be determined with Heroes
get needsHeroes() {
return needsHeroes(this._data);
}
@ -230,6 +241,10 @@ export class RoomSummary {
return this._data.hasFetchedMembers;
}
get isTrackingMembers() {
return this._data.isTrackingMembers;
}
get lastPaginationToken() {
return this._data.lastPaginationToken;
}
@ -254,6 +269,13 @@ export class RoomSummary {
return data;
}
writeIsTrackingMembers(value, txn) {
const data = new SummaryData(this._data);
data.isTrackingMembers = value;
txn.roomSummary.set(data.serialize());
return data;
}
writeSync(roomResponse, membership, isInitialSync, isTimelineOpen, txn) {
// clear cloned flag, so cloneIfNeeded makes a copy and
// this._data is not modified if any field is changed.

21
src/matrix/room/common.js Normal file
View file

@ -0,0 +1,21 @@
/*
Copyright 2020 The Matrix.org Foundation C.I.C.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
export function getPrevContentFromStateEvent(event) {
// where to look for prev_content is a bit of a mess,
// see https://matrix.to/#/!NasysSDfxKxZBzJJoE:matrix.org/$DvrAbZJiILkOmOIuRsNoHmh2v7UO5CWp_rYhlGk34fQ?via=matrix.org&via=pixie.town&via=amorgan.xyz
return event.unsigned?.prev_content || event.prev_content;
}

View file

@ -42,11 +42,11 @@ export class Heroes {
/**
* @param {string[]} newHeroes array of user ids
* @param {RoomMember[]} changedMembers array of changed members in this sync
* @param {Map<string, MemberChange>} memberChanges map of changed memberships
* @param {Transaction} txn
* @return {Promise}
*/
async calculateChanges(newHeroes, changedMembers, txn) {
async calculateChanges(newHeroes, memberChanges, txn) {
const updatedHeroMembers = new Map();
const removedUserIds = [];
// remove non-present members
@ -56,9 +56,9 @@ export class Heroes {
}
}
// update heroes with synced member changes
for (const member of changedMembers) {
if (this._members.has(member.userId) || newHeroes.indexOf(member.userId) !== -1) {
updatedHeroMembers.set(member.userId, member);
for (const [userId, memberChange] of memberChanges.entries()) {
if (this._members.has(userId) || newHeroes.indexOf(userId) !== -1) {
updatedHeroMembers.set(userId, memberChange.member);
}
}
// load member for new heroes from storage

View file

@ -26,9 +26,9 @@ export class MemberList {
this._retentionCount = 1;
}
afterSync(updatedMembers) {
for (const member of updatedMembers) {
this._members.add(member.userId, member);
afterSync(memberChanges) {
for (const [userId, memberChange] of memberChanges.entries()) {
this._members.add(userId, memberChange.member);
}
}

View file

@ -1,5 +1,4 @@
/*
Copyright 2020 Bruno Windels <bruno@windels.cloud>
Copyright 2020 The Matrix.org Foundation C.I.C.
Licensed under the Apache License, Version 2.0 (the "License");
@ -15,6 +14,8 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
import {getPrevContentFromStateEvent} from "../common.js";
export const EVENT_TYPE = "m.room.member";
export class RoomMember {
@ -28,7 +29,7 @@ export class RoomMember {
return;
}
const content = memberEvent.content;
const prevContent = memberEvent.unsigned?.prev_content;
const prevContent = getPrevContentFromStateEvent(memberEvent);
const membership = content?.membership;
// fall back to prev_content for these as synapse doesn't (always?)
// put them on content for "leave" memberships
@ -45,7 +46,7 @@ export class RoomMember {
if (typeof userId !== "string") {
return;
}
const content = memberEvent.unsigned?.prev_content
const content = getPrevContentFromStateEvent(memberEvent);
return this._validateAndCreateMember(roomId, userId,
content?.membership,
content?.displayname,
@ -66,6 +67,10 @@ export class RoomMember {
});
}
get membership() {
return this._data.membership;
}
/**
* @return {String?} the display name, if any
*/
@ -99,3 +104,34 @@ export class RoomMember {
return this._data;
}
}
export class MemberChange {
constructor(roomId, memberEvent) {
this._roomId = roomId;
this._memberEvent = memberEvent;
this._member = null;
}
get member() {
if (!this._member) {
this._member = RoomMember.fromMemberEvent(this._roomId, this._memberEvent);
}
return this._member;
}
get roomId() {
return this._roomId;
}
get userId() {
return this._memberEvent.state_key;
}
get previousMembership() {
return getPrevContentFromStateEvent(this._memberEvent)?.membership;
}
get membership() {
return this._memberEvent.content?.membership;
}
}

View file

@ -31,7 +31,7 @@ async function fetchMembers({summary, roomId, hsApi, storage, setChangedMembersM
const changedMembersDuringSync = new Map();
setChangedMembersMap(changedMembersDuringSync);
const memberResponse = await hsApi.members(roomId, {at: summary.lastPaginationToken}).response;
const memberResponse = await hsApi.members(roomId, {at: summary.lastPaginationToken}).response();
const txn = await storage.readWriteTxn([
storage.storeNames.roomSummary,

View file

@ -15,6 +15,7 @@ limitations under the License.
*/
import {BaseEntry} from "./BaseEntry.js";
import {getPrevContentFromStateEvent} from "../../common.js";
export class EventEntry extends BaseEntry {
constructor(eventEntry, fragmentIdComparer) {
@ -35,7 +36,7 @@ export class EventEntry extends BaseEntry {
}
get prevContent() {
return this._eventEntry.event.unsigned?.prev_content;
return getPrevContentFromStateEvent(this._eventEntry.event);
}
get eventType() {

View file

@ -18,7 +18,7 @@ import {EventKey} from "../EventKey.js";
import {EventEntry} from "../entries/EventEntry.js";
import {FragmentBoundaryEntry} from "../entries/FragmentBoundaryEntry.js";
import {createEventEntry} from "./common.js";
import {RoomMember, EVENT_TYPE as MEMBER_EVENT_TYPE} from "../../members/RoomMember.js";
import {MemberChange, RoomMember, EVENT_TYPE as MEMBER_EVENT_TYPE} from "../../members/RoomMember.js";
// Synapse bug? where the m.room.create event appears twice in sync response
// when first syncing the room
@ -102,13 +102,13 @@ export class SyncWriter {
if (event.type === MEMBER_EVENT_TYPE) {
const userId = event.state_key;
if (userId) {
const member = RoomMember.fromMemberEvent(this._roomId, event);
if (member) {
const memberChange = new MemberChange(this._roomId, event);
if (memberChange.member) {
// as this is sync, we can just replace the member
// if it is there already
txn.roomMembers.set(member.serialize());
txn.roomMembers.set(memberChange.member.serialize());
return memberChange;
}
return member;
}
} else {
txn.roomState.set(this._roomId, event);
@ -116,22 +116,22 @@ export class SyncWriter {
}
_writeStateEvents(roomResponse, txn) {
const changedMembers = [];
const memberChanges = new Map();
// persist state
const {state} = roomResponse;
if (Array.isArray(state?.events)) {
for (const event of state.events) {
const member = this._writeStateEvent(event, txn);
if (member) {
changedMembers.push(member);
const memberChange = this._writeStateEvent(event, txn);
if (memberChange) {
memberChanges.set(memberChange.userId, memberChange);
}
}
}
return changedMembers;
return memberChanges;
}
async _writeTimeline(entries, timeline, currentKey, txn) {
const changedMembers = [];
const memberChanges = new Map();
if (timeline.events) {
const events = deduplicateEvents(timeline.events);
for(const event of events) {
@ -148,14 +148,14 @@ export class SyncWriter {
// process live state events first, so new member info is available
if (typeof event.state_key === "string") {
const member = this._writeStateEvent(event, txn);
if (member) {
changedMembers.push(member);
const memberChange = this._writeStateEvent(event, txn);
if (memberChange) {
memberChanges.set(memberChange.userId, memberChange);
}
}
}
}
return {currentKey, changedMembers};
return {currentKey, memberChanges};
}
async _findMemberData(userId, events, txn) {
@ -198,12 +198,14 @@ export class SyncWriter {
}
// important this happens before _writeTimeline so
// members are available in the transaction
const changedMembers = this._writeStateEvents(roomResponse, txn);
const memberChanges = this._writeStateEvents(roomResponse, txn);
const timelineResult = await this._writeTimeline(entries, timeline, currentKey, txn);
currentKey = timelineResult.currentKey;
changedMembers.push(...timelineResult.changedMembers);
return {entries, newLiveKey: currentKey, changedMembers};
// merge member changes from state and timeline, giving precedence to the latter
for (const [userId, memberChange] of timelineResult.memberChanges.entries()) {
memberChanges.set(userId, memberChange);
}
return {entries, newLiveKey: currentKey, memberChanges};
}
afterSync(newLiveKey) {

View file

@ -22,6 +22,8 @@ export const STORE_NAMES = Object.freeze([
"timelineEvents",
"timelineFragments",
"pendingEvents",
"userIdentities",
"deviceIdentities",
]);
export const STORE_MAP = Object.freeze(STORE_NAMES.reduce((nameMap, name) => {

View file

@ -105,6 +105,13 @@ export class QueryTarget {
return maxKey;
}
async iterateKeys(range, callback) {
const cursor = this._target.openKeyCursor(range, "next");
await iterateCursor(cursor, (_, key) => {
return {done: callback(key)};
});
}
/**
* Checks if a given set of keys exist.
* Calls `callback(key, found)` for each key in `keys`, in key sorting order (or reversed if backwards=true).

View file

@ -24,6 +24,8 @@ import {RoomStateStore} from "./stores/RoomStateStore.js";
import {RoomMemberStore} from "./stores/RoomMemberStore.js";
import {TimelineFragmentStore} from "./stores/TimelineFragmentStore.js";
import {PendingEventStore} from "./stores/PendingEventStore.js";
import {UserIdentityStore} from "./stores/UserIdentityStore.js";
import {DeviceIdentityStore} from "./stores/DeviceIdentityStore.js";
export class Transaction {
constructor(txn, allowedStoreNames) {
@ -81,6 +83,14 @@ export class Transaction {
return this._store("pendingEvents", idbStore => new PendingEventStore(idbStore));
}
get userIdentities() {
return this._store("userIdentities", idbStore => new UserIdentityStore(idbStore));
}
get deviceIdentities() {
return this._store("deviceIdentities", idbStore => new DeviceIdentityStore(idbStore));
}
complete() {
return txnAsPromise(this._txn);
}

View file

@ -9,6 +9,7 @@ export const schema = [
createInitialStores,
createMemberStore,
migrateSession,
createIdentityStores,
];
// TODO: how to deal with git merge conflicts of this array?
@ -46,7 +47,7 @@ async function createMemberStore(db, txn) {
}
});
}
//v3
async function migrateSession(db, txn) {
const session = txn.objectStore("session");
try {
@ -64,3 +65,8 @@ async function migrateSession(db, txn) {
console.error("could not migrate session", err.stack);
}
}
//v4
function createIdentityStores(db) {
db.createObjectStore("userIdentities", {keyPath: "userId"});
db.createObjectStore("deviceIdentities", {keyPath: "key"});
}

View file

@ -0,0 +1,41 @@
/*
Copyright 2020 The Matrix.org Foundation C.I.C.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
function encodeKey(userId, deviceId) {
return `${userId}|${deviceId}`;
}
export class DeviceIdentityStore {
constructor(store) {
this._store = store;
}
getAllForUserId(userId) {
const range = IDBKeyRange.lowerBound(encodeKey(userId, ""));
return this._store.selectWhile(range, device => {
return device.userId === userId;
});
}
get(userId, deviceId) {
return this._store.get(encodeKey(userId, deviceId));
}
set(deviceIdentity) {
deviceIdentity.key = encodeKey(deviceIdentity.userId, deviceIdentity.deviceId);
return this._store.put(deviceIdentity);
}
}

View file

@ -19,6 +19,11 @@ function encodeKey(roomId, userId) {
return `${roomId}|${userId}`;
}
function decodeKey(key) {
const [roomId, userId] = key.split("|");
return {roomId, userId};
}
// no historical members
export class RoomMemberStore {
constructor(roomMembersStore) {
@ -40,4 +45,19 @@ export class RoomMemberStore {
return member.roomId === roomId;
});
}
async getAllUserIds(roomId) {
const userIds = [];
const range = IDBKeyRange.lowerBound(encodeKey(roomId, ""));
await this._roomMembersStore.iterateKeys(range, key => {
const decodedKey = decodeKey(key);
// prevent running into the next room
if (decodedKey.roomId === roomId) {
userIds.push(decodedKey.userId);
return false; // fetch more
}
return true; // done
});
return userIds;
}
}

View file

@ -0,0 +1,33 @@
/*
Copyright 2020 The Matrix.org Foundation C.I.C.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
export class UserIdentityStore {
constructor(store) {
this._store = store;
}
get(userId) {
return this._store.get(userId);
}
set(userIdentity) {
this._store.put(userIdentity);
}
remove(userId) {
return this._store.delete(userId);
}
}

View file

@ -70,6 +70,10 @@ export class ObservableMap extends BaseObservableMap {
[Symbol.iterator]() {
return this._values.entries();
}
values() {
return this._values.values();
}
}
export function tests() {