Write sync interactions from scratch

Doesn't work because we lack `room.preparation` objects when
writing sync results...
This commit is contained in:
Kegan Dougal 2021-11-26 18:25:20 +00:00
parent 81fddc008c
commit 1ef6963018
3 changed files with 136 additions and 89 deletions

View file

@ -126,6 +126,8 @@ export class Sync3 {
// Hydrogen only has 1 list currently (no DM section) so we only need 1 range
this.ranges = [[0, 99]];
this.roomIndexToRoomId = {};
console.log("session", session);
console.log("storage", storage);
}
// Start syncing. Probably call this at startup once you have an access_token.
@ -139,6 +141,17 @@ export class Sync3 {
this.syncLoop(undefined);
}
stop() {
if (this.status.get() === SyncStatus.Stopped) {
return;
}
this.status.set(SyncStatus.Stopped);
if (this.currentRequest) {
this.currentRequest.abort();
this.currentRequest = null;
}
}
// The purpose of this function is to do repeated /sync calls and call processResponse. It doesn't
// know or care how to handle the response, it only cares about the position and retries.
private async syncLoop(pos?: number) {
@ -181,7 +194,7 @@ export class Sync3 {
backoffCounter = 0;
// we have to wait for some parts of the response to be saved to disk before we can go on
// hence the await.
await this.processResponse(resp);
await this.processResponse(isFirstSync, resp);
// increment our position to tell the server we got everything, similar to using ?since= in v2
pos = resp.pos;
if (isFirstSync) {
@ -211,37 +224,49 @@ export class Sync3 {
}
}
// TODO: Handle room updates
// TODO: atomically swap indexToRoom
// The purpose of this function is process the /sync response and atomically update sync state.
private async processResponse(resp: Sync3Response) {
private async processResponse(isFirstSync: boolean, resp: Sync3Response) {
console.log(resp);
let { indexToRoom, updates } = this.processOps(resp.ops);
// process the room updates: new rooms, new timeline events, updated room names, that sort of thing.
// we're kinda forced to use the logger as most functions expect an ILogItem
await this.logger.run("sync", async log => {
const syncTxn = await this.openSyncTxn();
try {
// this.session.writeSync() // write account data, device lists, etc.
await Promise.all(updates.map(async (roomResponse) => {
// get or create a room
let room = this.session.rooms.get(roomResponse.room_id);
if (!room) {
room = this.session.createRoom(roomResponse.room_id);
}
room.writeSync(
roomResponse, isFirstSync, {}, syncTxn, log
)
}))
} catch (err) {
// avoid corrupting state by only
// storing the sync up till the point
// the exception occurred
syncTxn.abort(log);
throw syncTxn.getCause(err);
}
await syncTxn.complete(log);
this.prepareResponse(updates);
// update in-memory structs
this.session.afterSync(); // ???
updates.forEach((roomResponse) => {
// get room then afterSync() ???
});
this.session.applyRoomCollectionChangesAfterSync(null, roomStates, null);
});
/*
try {
await log.wrap("prepare", log => this._prepareSync(sessionState, roomStates, response, log));
await log.wrap("afterPrepareSync", log => Promise.all(roomStates.map(rs => {
return rs.room.afterPrepareSync(rs.preparation, log);
})));
await log.wrap("write", async log => this._writeSync(
sessionState, inviteStates, roomStates, archivedRoomStates,
response, syncFilterId, isInitialSync, log));
} finally {
sessionState.dispose();
}
// sync txn comitted, emit updates and apply changes to in-memory state
log.wrap("after", log => this._afterSync(
sessionState, inviteStates, roomStates, archivedRoomStates, log));
*/
// instantly move all the rooms to their new positions
this.roomIndexToRoomId = indexToRoom;
}
// The purpose of this function is to process the response `ops` array by modifying the current
@ -368,60 +393,35 @@ export class Sync3 {
};
}
private async prepareResponse(updates: RoomResponse[]) {
/*
// IF WE HAVE TO-DEVICE MSGS THEN this.session.obtainSyncLock(response) (which checks response.to_device.events)
const prepareTxn = await this.openPrepareSyncTxn();
// IF WE HAVE TO-DEVICE MSGS THEN this.session.prepareSync(syncResponse, lock, txn, log) => {newKeysByRoom}
// purpose: add any rooms with new keys but no sync response to the list of rooms to be synced
await Promise.all(updates.map(async (roomResponse) => {
let storedRoom = this.session.rooms.get(roomResponse.room_id);
if (!storedRoom) {
storedRoom = this.session.createRoom(roomResponse.room_id);
} else {
// if previously joined and we still have the timeline for it,
// this loads the syncWriter at the correct position to continue writing the timeline
await storedRoom.load(null, prepareTxn, this.logger);
}
return storedRoom.prepareSync(
rs.roomResponse, rs.membership, rs.invite, newKeys, prepareTxn, log)
}));
// This is needed for safari to not throw TransactionInactiveErrors on the syncTxn. See docs/INDEXEDDB.md
await prepareTxn.complete();
await Promise.all(updates.map(rs => {
return rs.room.afterPrepareSync(rs.preparation, log);
})); */
}
stop() {
if (this.status.get() === SyncStatus.Stopped) {
return;
}
this.status.set(SyncStatus.Stopped);
if (this.currentRequest) {
this.currentRequest.abort();
this.currentRequest = null;
}
}
// storage locking shenanigans here
openPrepareSyncTxn() {
private openSyncTxn() {
const storeNames = this.storage.storeNames;
return this.storage.readTxn([
return this.storage.readWriteTxn([
storeNames.session,
storeNames.roomSummary,
storeNames.archivedRoomSummary,
storeNames.invites,
storeNames.roomState,
storeNames.roomMembers,
storeNames.timelineEvents,
storeNames.timelineRelations,
storeNames.timelineFragments,
storeNames.pendingEvents,
storeNames.userIdentities,
storeNames.groupSessionDecryptions,
storeNames.deviceIdentities,
// to discard outbound session when somebody leaves a room
// and to create room key messages when somebody joins
storeNames.outboundGroupSessions,
storeNames.operations,
storeNames.accountData,
// to decrypt and store new room keys
storeNames.olmSessions,
storeNames.inboundGroupSessions,
// to read fragments when loading sync writer when rejoining archived room
storeNames.timelineFragments,
// to read fragments when loading sync writer when rejoining archived room
// to read events that can now be decrypted
storeNames.timelineEvents,
]);
}
}
const sleep = (ms) => {
const sleep = (ms: number) => {
return new Promise((resolve) => setTimeout(resolve, ms));
};
@ -430,7 +430,7 @@ const sleep = (ms) => {
// a b c d e f
// a b c d _ f
// e a b c d f <--- c=3 is wrong as we are not tracking it, ergo we need to see if `i` is in range else drop it
const indexInRange = (ranges, i) => {
const indexInRange = (ranges: number[][], i: number) => {
let isInRange = false;
ranges.forEach((r) => {
if (r[0] <= i && i <= r[1]) {

View file

@ -54,6 +54,7 @@ export class Room extends BaseRoom {
return false;
}
// {}, string, bool?, [], txn, log
async prepareSync(roomResponse, membership, invite, newKeys, txn, log) {
log.set("id", this.id);
if (newKeys) {

View file

@ -15,17 +15,58 @@
<input id="tokensubmit" type="button" value="Start" />
<div id="session-status" class="hydrogen" style="height: 500px;"></div>
<script id="main" type="module">
// core hydrogen imports
import {Platform} from "./platform/web/Platform";
import {createNavigation, createRouter} from "./domain/navigation/index.js";
import {StorageFactory} from "./matrix/storage/idb/StorageFactory";
import {Session} from "./matrix/Session.js";
import {ObservableMap} from "./observable/index.js";
// left panel specific
import {LeftPanelView} from "./platform/web/ui/session/leftpanel/LeftPanelView.js";
import {LeftPanelViewModel} from "./domain/session/leftpanel/LeftPanelViewModel";
import {Navigation} from "./domain/navigation/Navigation.js";
import {URLRouter} from "./domain/navigation/URLRouter.js";
import {parseUrlPath, stringifyPath} from "./domain/navigation/index.js";
import {ObservableMap} from "./observable/index.js";
import {History} from "./platform/web/dom/History.js";
// matrix specific bits
import {HomeServerApi} from "./matrix/net/HomeServerApi.js";
import {Sync3} from "./matrix/Sync3";
import {xhrRequest} from "./platform/web/dom/request/xhr.js";
const navigation = new Navigation(() => false);
const sleep = (ms) => {
return new Promise((resolve) => setTimeout(resolve, ms));
};
// dependency inject everything...
const platform = new Platform(document.body, {
// worker: "src/worker.js",
downloadSandbox: "assets/download-sandbox.html",
defaultHomeServer: "localhost",
// NOTE: uncomment this if you want the service worker for local development
// serviceWorker: "sw.js",
// NOTE: provide push config if you want push notifs for local development
// see assets/config.json for what the config looks like
// push: {...},
olm: {
wasm: "lib/olm/olm.wasm",
legacyBundle: "lib/olm/olm_legacy.js",
wasmBundle: "lib/olm/olm.js",
}
}, null, {development: true});
const navigation = createNavigation();
platform.setNavigation(navigation);
const urlRouter = createRouter({navigation, history: platform.history});
urlRouter.attach();
const hydrogenSessionID = "demo";
const factory = new StorageFactory(null); // TODO: this needs to be a fake idb
let storage;
const loadStorage = async () => {
await platform.logger.run("login", async log => {
storage = await factory.create(hydrogenSessionID, log);
});
};
await loadStorage();
// make some placeholder rooms
const rooms = new ObservableMap();
for (let i = 0; i < 1000; i++) {
let r = {
@ -35,21 +76,15 @@
};
rooms.add(r.id, r);
}
// make a left panel
const leftPanel = new LeftPanelViewModel({
invites: new ObservableMap(),
rooms: rooms,
navigation: navigation,
urlCreator: new URLRouter({
navigation: navigation,
history: new History(),
stringifyPath: stringifyPath,
parseUrlPath: parseUrlPath,
}),
urlCreator: urlRouter,
platform: null,
});
const sleep = (ms) => {
return new Promise((resolve) => setTimeout(resolve, ms));
};
leftPanel.loadRoomRange = async (range) => {
// pretend to load something
await sleep(200);
@ -71,15 +106,26 @@
const view = new LeftPanelView(leftPanel);
document.getElementById("session-status").appendChild(view.mount());
// kick off a sync v3 loop when an access token is provided
document.getElementById("tokensubmit").addEventListener("click", () => {
const accessToken = document.getElementById("tokeninput").value;
const sessionId = new Date().getTime() + "";
const hs = new HomeServerApi({
homeserver: "http://localhost:8008",
accessToken: accessToken,
request: xhrRequest,
request: platform.request,
});
const syncer = new Sync3(hs, null, null, null);
const session = new Session({
storage: storage,
hsApi: hs,
sessionInfo: {
id: hydrogenSessionID,
deviceId: null,
userId: null,
homeserver: "http://localhost:8008",
},
});
const syncer = new Sync3(hs, session, storage, platform.logger);
syncer.start();
});