hydrogen-web/src/matrix/sync.js

120 lines
3.1 KiB
JavaScript
Raw Normal View History

2019-02-11 01:55:29 +05:30
import {
RequestAbortError,
HomeServerError,
StorageError
} from "./error.js";
import EventEmitter from "../event-emitter.js";
2018-12-21 19:05:24 +05:30
2019-02-11 01:55:29 +05:30
const INCREMENTAL_TIMEOUT = 30000;
const SYNC_EVENT_LIMIT = 10;
2018-12-21 19:05:24 +05:30
2019-02-11 01:55:29 +05:30
function parseRooms(roomsSection, roomCallback) {
if (!roomsSection) {
return;
}
const allMemberships = ["join", "invite", "leave"];
for(const membership of allMemberships) {
const membershipSection = roomsSection[membership];
if (membershipSection) {
const rooms = Object.entries(membershipSection)
for (const [roomId, roomResponse] of rooms) {
roomCallback(roomId, roomResponse, membership);
}
}
}
2019-02-05 03:56:45 +05:30
}
2019-02-11 01:55:29 +05:30
export default class Sync extends EventEmitter {
2019-02-07 05:55:12 +05:30
constructor(hsApi, session, storage) {
2019-02-11 01:55:29 +05:30
super();
2019-02-07 05:55:12 +05:30
this._hsApi = hsApi;
2018-12-21 19:05:24 +05:30
this._session = session;
2019-02-04 02:47:24 +05:30
this._storage = storage;
2018-12-21 19:05:24 +05:30
this._isSyncing = false;
this._currentRequest = null;
}
2019-02-04 02:47:24 +05:30
// returns when initial sync is done
async start() {
2018-12-21 19:05:24 +05:30
if (this._isSyncing) {
return;
}
this._isSyncing = true;
2019-02-11 01:55:29 +05:30
let syncToken = this._session.syncToken;
// do initial sync if needed
if (!syncToken) {
2019-02-11 01:55:29 +05:30
// need to create limit filter here
syncToken = await this._syncRequest();
2018-12-21 19:05:24 +05:30
}
2019-02-04 02:47:24 +05:30
this._syncLoop(syncToken);
2018-12-21 19:05:24 +05:30
}
async _syncLoop(syncToken) {
2019-02-04 02:47:24 +05:30
// if syncToken is falsy, it will first do an initial sync ...
2018-12-21 19:05:24 +05:30
while(this._isSyncing) {
2019-02-04 02:47:24 +05:30
try {
2019-02-11 01:55:29 +05:30
console.log(`starting sync request with since ${syncToken} ...`);
syncToken = await this._syncRequest(syncToken, INCREMENTAL_TIMEOUT);
2019-02-04 02:47:24 +05:30
} catch (err) {
2019-02-11 01:55:29 +05:30
this._isSyncing = false;
if (!(err instanceof RequestAbortError)) {
console.warn("stopping sync because of error");
this.emit("error", err);
}
2019-02-04 02:47:24 +05:30
}
}
2019-02-11 01:55:29 +05:30
this.emit("stopped");
2019-02-04 02:47:24 +05:30
}
2019-02-11 01:55:29 +05:30
async _syncRequest(syncToken, timeout) {
this._currentRequest = this._hsApi.sync(syncToken, undefined, timeout);
const response = await this._currentRequest.response();
2019-02-04 02:47:24 +05:30
syncToken = response.next_batch;
2019-02-05 04:51:50 +05:30
const storeNames = this._storage.storeNames;
const syncTxn = await this._storage.readWriteTxn([
storeNames.session,
2019-02-11 01:55:29 +05:30
storeNames.roomSummary,
storeNames.roomTimeline,
storeNames.roomState,
2019-02-05 04:51:50 +05:30
]);
2019-02-04 02:47:24 +05:30
try {
2019-02-11 01:55:29 +05:30
this._session.applySync(syncToken, response.account_data, syncTxn);
2018-12-21 19:05:24 +05:30
// to_device
// presence
2019-02-11 01:55:29 +05:30
if (response.rooms) {
parseRooms(response.rooms, (roomId, roomResponse, membership) => {
let room = this._session.getRoom(roomId);
if (!room) {
room = this._session.createRoom(roomId);
}
console.log(` * applying sync response to room ${roomId} ...`);
room.applySync(roomResponse, membership, syncTxn);
});
}
2019-02-04 02:47:24 +05:30
} catch(err) {
2019-02-11 01:55:29 +05:30
console.warn("aborting syncTxn because of error");
2019-02-04 02:47:24 +05:30
// avoid corrupting state by only
// storing the sync up till the point
// the exception occurred
2019-02-11 01:55:29 +05:30
syncTxn.abort();
2019-02-04 02:47:24 +05:30
throw err;
}
try {
2019-02-11 01:55:29 +05:30
await syncTxn.complete();
console.info("syncTxn committed!!");
2019-02-04 02:47:24 +05:30
} catch (err) {
throw new StorageError("unable to commit sync tranaction", err);
2018-12-21 19:05:24 +05:30
}
2019-02-04 02:47:24 +05:30
return syncToken;
2018-12-21 19:05:24 +05:30
}
stop() {
if (!this._isSyncing) {
return;
}
this._isSyncing = false;
if (this._currentRequest) {
this._currentRequest.abort();
this._currentRequest = null;
}
}
}