forked from mystiq/hydrogen-web
add some logging to sync
This commit is contained in:
parent
07f8500d29
commit
704708bd6c
3 changed files with 20 additions and 14 deletions
|
@ -184,7 +184,7 @@ export class SessionContainer {
|
||||||
await this._session.createIdentity();
|
await this._session.createIdentity();
|
||||||
}
|
}
|
||||||
|
|
||||||
this._sync = new Sync({hsApi: this._requestScheduler.hsApi, storage: this._storage, session: this._session});
|
this._sync = new Sync({hsApi: this._requestScheduler.hsApi, storage: this._storage, session: this._session, logger: this._platform.logger});
|
||||||
// notify sync and session when back online
|
// notify sync and session when back online
|
||||||
this._reconnectSubscription = this._reconnector.connectionStatus.subscribe(state => {
|
this._reconnectSubscription = this._reconnector.connectionStatus.subscribe(state => {
|
||||||
if (state === ConnectionStatus.Online) {
|
if (state === ConnectionStatus.Online) {
|
||||||
|
|
|
@ -56,8 +56,9 @@ function timelineIsEmpty(roomResponse) {
|
||||||
* ```
|
* ```
|
||||||
*/
|
*/
|
||||||
export class Sync {
|
export class Sync {
|
||||||
constructor({hsApi, session, storage}) {
|
constructor({hsApi, session, storage, logger}) {
|
||||||
this._hsApi = hsApi;
|
this._hsApi = hsApi;
|
||||||
|
this._logger = logger;
|
||||||
this._session = session;
|
this._session = session;
|
||||||
this._storage = storage;
|
this._storage = storage;
|
||||||
this._currentRequest = null;
|
this._currentRequest = null;
|
||||||
|
@ -108,7 +109,7 @@ export class Sync {
|
||||||
// for us. We do that by calling it with a zero timeout until it
|
// for us. We do that by calling it with a zero timeout until it
|
||||||
// doesn't give us any more to_device messages.
|
// doesn't give us any more to_device messages.
|
||||||
const timeout = this._status.get() === SyncStatus.Syncing ? INCREMENTAL_TIMEOUT : 0;
|
const timeout = this._status.get() === SyncStatus.Syncing ? INCREMENTAL_TIMEOUT : 0;
|
||||||
const syncResult = await this._syncRequest(syncToken, timeout);
|
const syncResult = await this._logger.run("sync", log => this._syncRequest(syncToken, timeout, log));
|
||||||
syncToken = syncResult.syncToken;
|
syncToken = syncResult.syncToken;
|
||||||
roomStates = syncResult.roomStates;
|
roomStates = syncResult.roomStates;
|
||||||
sessionChanges = syncResult.sessionChanges;
|
sessionChanges = syncResult.sessionChanges;
|
||||||
|
@ -169,28 +170,31 @@ export class Sync {
|
||||||
await Promise.all(roomsPromises.concat(sessionPromise));
|
await Promise.all(roomsPromises.concat(sessionPromise));
|
||||||
}
|
}
|
||||||
|
|
||||||
async _syncRequest(syncToken, timeout) {
|
async _syncRequest(syncToken, timeout, log) {
|
||||||
let {syncFilterId} = this._session;
|
let {syncFilterId} = this._session;
|
||||||
if (typeof syncFilterId !== "string") {
|
if (typeof syncFilterId !== "string") {
|
||||||
this._currentRequest = this._hsApi.createFilter(this._session.user.id, {room: {state: {lazy_load_members: true}}});
|
this._currentRequest = this._hsApi.createFilter(this._session.user.id, {room: {state: {lazy_load_members: true}}});
|
||||||
syncFilterId = (await this._currentRequest.response()).filter_id;
|
syncFilterId = (await this._currentRequest.response()).filter_id;
|
||||||
}
|
}
|
||||||
const totalRequestTimeout = timeout + (80 * 1000); // same as riot-web, don't get stuck on wedged long requests
|
const totalRequestTimeout = timeout + (80 * 1000); // same as riot-web, don't get stuck on wedged long requests
|
||||||
this._currentRequest = this._hsApi.sync(syncToken, syncFilterId, timeout, {timeout: totalRequestTimeout});
|
this._currentRequest = this._hsApi.sync(syncToken, syncFilterId, timeout, {timeout: totalRequestTimeout, log});
|
||||||
const response = await this._currentRequest.response();
|
const response = await this._currentRequest.response();
|
||||||
|
|
||||||
const isInitialSync = !syncToken;
|
const isInitialSync = !syncToken;
|
||||||
syncToken = response.next_batch;
|
syncToken = response.next_batch;
|
||||||
|
log.set("isInitialSync", isInitialSync);
|
||||||
|
log.set("syncToken", log.anonymize(syncToken));
|
||||||
|
log.set("status", this._status.get());
|
||||||
|
|
||||||
const roomStates = this._parseRoomsResponse(response.rooms, isInitialSync);
|
const roomStates = this._parseRoomsResponse(response.rooms, isInitialSync);
|
||||||
await this._prepareRooms(roomStates);
|
await log.wrap("prepare rooms", log => this._prepareRooms(roomStates, log));
|
||||||
let sessionChanges;
|
let sessionChanges;
|
||||||
const syncTxn = this._openSyncTxn();
|
const syncTxn = this._openSyncTxn();
|
||||||
try {
|
try {
|
||||||
sessionChanges = await this._session.writeSync(response, syncFilterId, syncTxn);
|
sessionChanges = await log.wrap("session.writeSync", log => this._session.writeSync(response, syncFilterId, syncTxn, log));
|
||||||
await Promise.all(roomStates.map(async rs => {
|
await Promise.all(roomStates.map(async rs => {
|
||||||
console.log(` * applying sync response to room ${rs.room.id} ...`);
|
rs.changes = await log.wrap("room.writeSync", log => rs.room.writeSync(
|
||||||
rs.changes = await rs.room.writeSync(
|
rs.roomResponse, isInitialSync, rs.preparation, syncTxn, log));
|
||||||
rs.roomResponse, isInitialSync, rs.preparation, syncTxn);
|
|
||||||
}));
|
}));
|
||||||
} catch(err) {
|
} catch(err) {
|
||||||
// avoid corrupting state by only
|
// avoid corrupting state by only
|
||||||
|
@ -232,10 +236,10 @@ export class Sync {
|
||||||
]);
|
]);
|
||||||
}
|
}
|
||||||
|
|
||||||
async _prepareRooms(roomStates) {
|
async _prepareRooms(roomStates, log) {
|
||||||
const prepareTxn = this._openPrepareSyncTxn();
|
const prepareTxn = this._openPrepareSyncTxn();
|
||||||
await Promise.all(roomStates.map(async rs => {
|
await Promise.all(roomStates.map(async rs => {
|
||||||
rs.preparation = await rs.room.prepareSync(rs.roomResponse, rs.membership, prepareTxn);
|
rs.preparation = await log.wrap("room.prepareSync", log => rs.room.prepareSync(rs.roomResponse, rs.membership, prepareTxn, log));
|
||||||
}));
|
}));
|
||||||
// This is needed for safari to not throw TransactionInactiveErrors on the syncTxn. See docs/INDEXEDDB.md
|
// This is needed for safari to not throw TransactionInactiveErrors on the syncTxn. See docs/INDEXEDDB.md
|
||||||
await prepareTxn.complete();
|
await prepareTxn.complete();
|
||||||
|
|
|
@ -175,7 +175,8 @@ export class Room extends EventEmitter {
|
||||||
return request;
|
return request;
|
||||||
}
|
}
|
||||||
|
|
||||||
async prepareSync(roomResponse, membership, txn) {
|
async prepareSync(roomResponse, membership, txn, log) {
|
||||||
|
log.set("roomId", this.id);
|
||||||
const summaryChanges = this._summary.data.applySyncResponse(roomResponse, membership)
|
const summaryChanges = this._summary.data.applySyncResponse(roomResponse, membership)
|
||||||
let roomEncryption = this._roomEncryption;
|
let roomEncryption = this._roomEncryption;
|
||||||
// encryption is enabled in this sync
|
// encryption is enabled in this sync
|
||||||
|
@ -211,7 +212,8 @@ export class Room extends EventEmitter {
|
||||||
}
|
}
|
||||||
|
|
||||||
/** @package */
|
/** @package */
|
||||||
async writeSync(roomResponse, isInitialSync, {summaryChanges, decryptChanges, roomEncryption}, txn) {
|
async writeSync(roomResponse, isInitialSync, {summaryChanges, decryptChanges, roomEncryption}, txn, log) {
|
||||||
|
log.set("roomId", this.id);
|
||||||
const {entries, newLiveKey, memberChanges} =
|
const {entries, newLiveKey, memberChanges} =
|
||||||
await this._syncWriter.writeSync(roomResponse, txn);
|
await this._syncWriter.writeSync(roomResponse, txn);
|
||||||
if (decryptChanges) {
|
if (decryptChanges) {
|
||||||
|
|
Loading…
Reference in a new issue