2020-08-05 22:08:55 +05:30
/*
Copyright 2020 Bruno Windels <bruno@windels.cloud>
Copyright 2020 The Matrix.org Foundation C.I.C.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
2020-04-04 21:04:46 +05:30
import { AbortError } from "./error.js" ;
2020-04-21 00:56:39 +05:30
import { ObservableValue } from "../observable/ObservableValue.js" ;
import { createEnum } from "../utils/enum.js" ;
2018-12-21 19:05:24 +05:30
2019-02-11 01:55:29 +05:30
// Timeout handed to the sync request once the loop has reached the Syncing
// state; before that a timeout of 0 is used (presumably milliseconds, as the
// request adds 80 * 1000 on top of it — confirm against the hsApi).
const INCREMENTAL_TIMEOUT = 30000;
// NOTE(review): not referenced anywhere in the visible part of this file.
const SYNC_EVENT_LIMIT = 10;
2018-12-21 19:05:24 +05:30
2020-04-19 23:22:26 +05:30
/**
 * The states the sync loop moves through. `Stopped` is the state a Sync
 * starts in and returns to when it is stopped or fails; `InitialSync` and
 * `CatchupSync` are chosen on start depending on whether a sync token
 * is already known.
 */
export const SyncStatus = createEnum("InitialSync", "CatchupSync", "Syncing", "Stopped");
2020-08-17 17:43:23 +05:30
/**
 * Tells whether the room section of a sync response carries an empty timeline.
 *
 * @param {object} [roomResponse] the per-room part of a sync response; may be
 *  missing or lack a `timeline`
 * @returns {boolean} `true` when `timeline.events` is an array without
 *  entries, or when reading it throws; `false` in every other case (including
 *  a missing timeline or a non-array `events`)
 */
function timelineIsEmpty(roomResponse) {
    try {
        const events = roomResponse?.timeline?.events;
        return Array.isArray(events) && events.length === 0;
    } catch (err) {
        // the response could not be read at all, treat it like an empty timeline
        return true;
    }
}
2020-09-10 15:41:43 +05:30
/**
 * Keeps requesting sync responses from the homeserver and hands every
 * response to the session and to the rooms it mentions.
 *
 * Sync steps in js-pseudocode:
 * ```js
 * // can only read some stores
 * const preparation = await room.prepareSync(roomResponse, membership, prepareTxn);
 * // can do async work that is not related to storage (such as decryption)
 * await room.afterPrepareSync(preparation);
 * // writes and calculates changes
 * const changes = await room.writeSync(roomResponse, isInitialSync, preparation, syncTxn);
 * // applies and emits changes once syncTxn is committed
 * room.afterSync(changes);
 * if (room.needsAfterSyncCompleted(changes)) {
 *     // can do network requests
 *     await room.afterSyncCompleted(changes);
 * }
 * ```
 */
export class Sync {
    /**
     * @param {object} options
     * @param {object} options.hsApi used for the `createFilter` and `sync` requests
     * @param {object} options.session supplies the sync token, filter id and rooms,
     *  and receives every sync response
     * @param {object} options.storage used to open the transactions a response is
     *  processed in
     */
    constructor({hsApi, session, storage}) {
        this._hsApi = hsApi;
        this._session = session;
        this._storage = storage;
        this._currentRequest = null;
        this._status = new ObservableValue(SyncStatus.Stopped);
        this._error = null;
    }

    /** the ObservableValue holding the current SyncStatus */
    get status() {
        return this._status;
    }

    /** the error that made the sync stop */
    get error() {
        return this._error;
    }

    /**
     * Starts the sync loop unless it is already running. Clears the error of
     * a previous run. The loop itself is not awaited, so this returns right away.
     */
    start() {
        // not already syncing?
        if (this._status.get() !== SyncStatus.Stopped) {
            return;
        }
        this._error = null;
        const syncToken = this._session.syncToken;
        if (syncToken) {
            this._status.set(SyncStatus.CatchupSync);
        } else {
            this._status.set(SyncStatus.InitialSync);
        }
        this._syncLoop(syncToken);
    }

    /**
     * Requests sync responses until the status becomes Stopped.
     *
     * A timed-out connection is retried with the same token. Any other failure
     * stops the loop; it is also stored in `error` unless it is an abort.
     *
     * @param {string} [syncToken] token to continue from
     */
    async _syncLoop(syncToken) {
        // if syncToken is falsy, it will first do an initial sync ...
        while (this._status.get() !== SyncStatus.Stopped) {
            let roomStates;
            let sessionChanges;
            try {
                console.log(` starting sync request with since ${syncToken} ... `);
                // unless we are happily syncing already, we want the server to return
                // as quickly as possible, even if there are no events queued. This
                // serves two purposes:
                //
                // * When the connection dies, we want to know asap when it comes back,
                //   so that we can hide the error from the user. (We don't want to
                //   have to wait for an event or a timeout).
                //
                // * We want to know if the server has any to_device messages queued up
                //   for us. We do that by calling it with a zero timeout until it
                //   doesn't give us any more to_device messages.
                const timeout = this._status.get() === SyncStatus.Syncing ? INCREMENTAL_TIMEOUT : 0;
                const syncResult = await this._syncRequest(syncToken, timeout);
                syncToken = syncResult.syncToken;
                roomStates = syncResult.roomStates;
                sessionChanges = syncResult.sessionChanges;
                // initial sync or catchup sync
                if (this._status.get() !== SyncStatus.Syncing && syncResult.hadToDeviceMessages) {
                    this._status.set(SyncStatus.CatchupSync);
                } else {
                    this._status.set(SyncStatus.Syncing);
                }
            } catch (err) {
                // retry same request on timeout
                if (err.name === "ConnectionError" && err.isTimeout) {
                    // don't run afterSyncCompleted
                    continue;
                }
                // An aborted request is what stop() produces, so it must not be
                // reported as the reason the sync stopped. The name has to be
                // compared as a string: comparing it to the AbortError class
                // itself never matches.
                const wasAborted = err instanceof AbortError || err.name === "AbortError";
                if (!wasAborted) {
                    console.warn("stopping sync because of error");
                    console.error(err);
                    this._error = err;
                }
                this._status.set(SyncStatus.Stopped);
            }
            if (this._status.get() !== SyncStatus.Stopped) {
                // TODO: if we're not going to run this phase in parallel with the next
                // sync request (because this causes OTKs to be uploaded twice)
                // should we move this inside _syncRequest?
                // Alternatively, we can try to fix the OTK upload issue while still
                // running in parallel.
                await this._runAfterSyncCompleted(sessionChanges, roomStates);
            }
        }
    }

    /**
     * Runs the `afterSyncCompleted` step of the session and of every room
     * that asks for it, all in parallel. Failures are logged and do not
     * reject the returned promise.
     *
     * @param {*} sessionChanges what `session.writeSync` returned for this response
     * @param {RoomSyncProcessState[]} roomStates the rooms processed for this response
     */
    async _runAfterSyncCompleted(sessionChanges, roomStates) {
        const isCatchupSync = this._status.get() === SyncStatus.CatchupSync;
        const sessionPromise = (async () => {
            try {
                await this._session.afterSyncCompleted(sessionChanges, isCatchupSync);
            } catch (err) {
                console.error("error during session afterSyncCompleted, continuing", err.stack);
            }
        })();
        const roomsNeedingAfterSyncCompleted = roomStates.filter(rs => {
            return rs.room.needsAfterSyncCompleted(rs.changes);
        });
        const roomsPromises = roomsNeedingAfterSyncCompleted.map(async rs => {
            try {
                await rs.room.afterSyncCompleted(rs.changes);
            } catch (err) {
                console.error(` error during room ${rs.room.id} afterSyncCompleted, continuing `, err.stack);
            }
        });
        // run everything in parallel,
        // we don't want to delay the next sync too much
        // Also, since all promises won't reject (as they have a try/catch)
        // it's fine to use Promise.all
        await Promise.all(roomsPromises.concat(sessionPromise));
    }

    /**
     * Does one sync request and writes the response to storage.
     *
     * Creates the sync filter first when the session has no filter id yet.
     * The write transaction is aborted when writing fails; the `afterSync`
     * steps only run once it has been committed.
     *
     * @param {string} [syncToken] token to continue from; falsy for an initial sync
     * @param {number} timeout timeout passed on to the sync request
     * @returns {Promise<{syncToken: string, roomStates: RoomSyncProcessState[], sessionChanges: *, hadToDeviceMessages: boolean}>}
     *  the token for the next request together with what was processed
     * @throws whatever the requests, the write step or the commit throw
     */
    async _syncRequest(syncToken, timeout) {
        let {syncFilterId} = this._session;
        if (typeof syncFilterId !== "string") {
            this._currentRequest = this._hsApi.createFilter(this._session.user.id, {room: {state: {lazy_load_members: true}}});
            syncFilterId = (await this._currentRequest.response()).filter_id;
        }
        const totalRequestTimeout = timeout + (80 * 1000); // same as riot-web, don't get stuck on wedged long requests
        this._currentRequest = this._hsApi.sync(syncToken, syncFilterId, timeout, {timeout: totalRequestTimeout});
        const response = await this._currentRequest.response();

        const isInitialSync = !syncToken;
        const nextSyncToken = response.next_batch;
        const roomStates = this._parseRoomsResponse(response.rooms, isInitialSync);
        await this._prepareRooms(roomStates);
        let sessionChanges;
        const syncTxn = this._openSyncTxn();
        try {
            sessionChanges = await this._session.writeSync(response, syncFilterId, syncTxn);
            await Promise.all(roomStates.map(async rs => {
                console.log(` * applying sync response to room ${rs.room.id} ... `);
                rs.changes = await rs.room.writeSync(
                    rs.roomResponse, isInitialSync, rs.preparation, syncTxn);
            }));
        } catch (err) {
            // avoid corrupting state by only
            // storing the sync up till the point
            // the exception occurred
            try {
                syncTxn.abort();
            } catch (abortErr) {
                console.error("Could not abort sync transaction, the sync response was probably only partially written and may have put storage in a inconsistent state.", abortErr);
            }
            throw err;
        }
        try {
            await syncTxn.complete();
            console.info("syncTxn committed!!");
        } catch (err) {
            console.error("unable to commit sync tranaction");
            throw err;
        }
        this._session.afterSync(sessionChanges);
        // emit room related events after txn has been closed
        for (const rs of roomStates) {
            rs.room.afterSync(rs.changes);
        }
        const toDeviceEvents = response.to_device?.events;
        return {
            syncToken: nextSyncToken,
            roomStates,
            sessionChanges,
            hadToDeviceMessages: Array.isArray(toDeviceEvents) && toDeviceEvents.length > 0,
        };
    }

    /** Opens the read transaction handed to the rooms' `prepareSync` step. */
    _openPrepareSyncTxn() {
        const storeNames = this._storage.storeNames;
        return this._storage.readTxn([
            storeNames.inboundGroupSessions,
        ]);
    }

    /**
     * Runs `prepareSync` for every room within one read transaction, then
     * `afterPrepareSync` once that transaction has completed.
     *
     * @param {RoomSyncProcessState[]} roomStates gets `preparation` assigned on every entry
     */
    async _prepareRooms(roomStates) {
        const prepareTxn = this._openPrepareSyncTxn();
        await Promise.all(roomStates.map(async rs => {
            rs.preparation = await rs.room.prepareSync(rs.roomResponse, rs.membership, prepareTxn);
        }));
        // This is needed for safari to not throw TransactionInactiveErrors on the syncTxn. See docs/INDEXEDDB.md
        await prepareTxn.complete();
        await Promise.all(roomStates.map(rs => rs.room.afterPrepareSync(rs.preparation)));
    }

    /** Opens the read-write transaction the sync response is written in. */
    _openSyncTxn() {
        const storeNames = this._storage.storeNames;
        return this._storage.readWriteTxn([
            storeNames.session,
            storeNames.roomSummary,
            storeNames.roomState,
            storeNames.roomMembers,
            storeNames.timelineEvents,
            storeNames.timelineFragments,
            storeNames.pendingEvents,
            storeNames.userIdentities,
            storeNames.groupSessionDecryptions,
            storeNames.deviceIdentities,
            // to discard outbound session when somebody leaves a room
            // and to create room key messages when somebody joins
            storeNames.outboundGroupSessions,
            storeNames.operations,
            storeNames.accountData,
        ]);
    }

    /**
     * Turns the `rooms` section of a sync response into one process state per
     * joined room, creating rooms the session does not know yet.
     *
     * @param {object} [roomsSection] the `rooms` part of the sync response
     * @param {boolean} isInitialSync when true, rooms with an empty timeline are skipped
     * @returns {RoomSyncProcessState[]}
     */
    _parseRoomsResponse(roomsSection, isInitialSync) {
        const roomStates = [];
        if (roomsSection) {
            // don't do "invite", "leave" for now
            const allMemberships = ["join"];
            for (const membership of allMemberships) {
                const membershipSection = roomsSection[membership];
                if (membershipSection) {
                    for (const [roomId, roomResponse] of Object.entries(membershipSection)) {
                        // ignore rooms with empty timelines during initial sync,
                        // see https://github.com/vector-im/hydrogen-web/issues/15
                        if (isInitialSync && timelineIsEmpty(roomResponse)) {
                            continue;
                        }
                        let room = this._session.rooms.get(roomId);
                        if (!room) {
                            room = this._session.createRoom(roomId);
                        }
                        roomStates.push(new RoomSyncProcessState(room, roomResponse, membership));
                    }
                }
            }
        }
        return roomStates;
    }

    /**
     * Stops the sync loop and aborts the request that is in flight, if any.
     * Does nothing when the sync is already stopped.
     */
    stop() {
        if (this._status.get() === SyncStatus.Stopped) {
            return;
        }
        this._status.set(SyncStatus.Stopped);
        if (this._currentRequest) {
            this._currentRequest.abort();
            this._currentRequest = null;
        }
    }
}
2020-09-10 15:41:43 +05:30
/**
 * Everything that is tracked for a single room while one sync response is
 * being processed.
 */
class RoomSyncProcessState {
    /**
     * @param {object} room the room the response section belongs to
     * @param {object} roomResponse the section of the sync response for this room
     * @param {string} membership the membership section the room was listed under
     */
    constructor(room, roomResponse, membership) {
        Object.assign(this, {
            room,
            roomResponse,
            membership,
            // both are filled in later on while the response is processed
            preparation: null,
            changes: null,
        });
    }
}