its syncing, sort off

This commit is contained in:
Bruno Windels 2019-02-10 21:25:29 +01:00
parent c05e40188b
commit b57c5abdd6
26 changed files with 466 additions and 274 deletions

14
.eslintrc.js Normal file
View file

@ -0,0 +1,14 @@
module.exports = {
"env": {
"browser": true,
"es6": true
},
"extends": "eslint:recommended",
"parserOptions": {
"ecmaVersion": 2018,
"sourceType": "module"
},
"rules": {
"no-console": "off"
}
};

2
.gitignore vendored Normal file
View file

@ -0,0 +1,2 @@
*.sublime-project
*.sublime-workspace

View file

@ -1,6 +1,7 @@
# Minimal thing to get working
- finish summary store
- move "sdk" bits over to "matrix" directory
- add eventemitter
- make sync work
- store summaries

View file

@ -2,8 +2,16 @@
<html>
<head>
<meta charset="utf-8">
<script type="module" src="src/main.js"></script>
</head>
<body>
<p id="syncstatus"></p>
<div><button id="stopsync">stop syncing</button></div>
<script>
const label = document.getElementById("syncstatus");
const button = document.getElementById("stopsync");
import("./src/main.js").then(main => {
main.default(label, button);
});
</script>
</body>
</html>

22
package.json Normal file
View file

@ -0,0 +1,22 @@
{
"name": "morpheusjs",
"version": "1.0.0",
"description": "A javascript matrix client prototype, trying to minize RAM usage by offloading as much as possible to IndexedDB",
"main": "index.js",
"directories": {
"doc": "doc"
},
"scripts": {
"test": "node --experimental-modules --loader ../js-inline-tests/src/resolve-hook.mjs ../js-inline-tests/src/main.mjs --entryPoint src/main.js --force-esm"
},
"repository": {
"type": "git",
"url": "git+https://github.com/bwindels/morpheusjs.git"
},
"author": "",
"license": "ISC",
"bugs": {
"url": "https://github.com/bwindels/morpheusjs/issues"
},
"homepage": "https://github.com/bwindels/morpheusjs#readme"
}

View file

@ -1,9 +1,13 @@
export class HomeServerError extends Error {
constructor(body) {
super(body.error);
constructor(method, url, body) {
super(`${body.error} on ${method} ${url}`);
this.errcode = body.errcode;
}
}
export class StorageError extends Error {
}
export class RequestAbortError extends Error {
}

69
src/event-emitter.js Normal file
View file

@ -0,0 +1,69 @@
export default class EventEmitter {
constructor() {
this._handlersByName = {};
}
emit(name, value) {
const handlers = this._handlersByName[name];
if (handlers) {
for(const h of handlers) {
h(value);
}
}
}
on(name, callback) {
let handlers = this._handlersByName[name];
if (!handlers) {
this._handlersByName[name] = handlers = new Set();
}
handlers.add(callback);
}
off(name, callback) {
const handlers = this._handlersByName[name];
if (handlers) {
handlers.delete(callback);
if (handlers.length === 0) {
delete this._handlersByName[name];
}
}
}
}
//#ifdef TESTS
export function tests() {
return {
test_on_off(assert) {
let counter = 0;
const e = new EventEmitter();
const callback = () => counter += 1;
e.on("change", callback);
e.emit("change");
e.off("change", callback);
e.emit("change");
assert.equal(counter, 1);
},
test_emit_value(assert) {
let value = 0;
const e = new EventEmitter();
const callback = (v) => value = v;
e.on("change", callback);
e.emit("change", 5);
e.off("change", callback);
assert.equal(value, 5);
},
test_double_on(assert) {
let counter = 0;
const e = new EventEmitter();
const callback = () => counter += 1;
e.on("change", callback);
e.on("change", callback);
e.emit("change");
e.off("change", callback);
assert.equal(counter, 1);
}
};
}
//#endif

View file

@ -1,3 +1,5 @@
import {HomeServerError} from "./error.js";
class RequestWrapper {
constructor(promise, controller) {
this._promise = promise;
@ -47,13 +49,13 @@ export default class HomeServerApi {
body: bodyString,
signal: controller.signal
});
promise = promise.then(response => {
promise = promise.then(async (response) => {
if (response.ok) {
return response.json();
return await response.json();
} else {
switch (response.status) {
default:
throw new HomeServerError(response.json())
throw new HomeServerError(method, url, await response.json())
}
}
});
@ -68,8 +70,8 @@ export default class HomeServerApi {
return this._request("GET", csPath, queryParams, body);
}
sync(timeout = 0, since = undefined) {
return this._get("/sync", {since, timeout});
sync(since, filter, timeout) {
return this._get("/sync", {since, timeout, filter});
}
passwordLogin(username, password) {

View file

@ -1,6 +1,7 @@
import HomeServerApi from "./hs-api.js";
import Session from "./session.js";
import createIdbStorage from "./storage/idb/create.js";
import Sync from "./sync.js";
const HOST = "localhost";
const HOMESERVER = `http://${HOST}:8008`;
@ -31,30 +32,37 @@ async function login(username, password, homeserver) {
return {sessionId, loginData};
}
async function main() {
let sessionId = getSessionId(USER_ID);
let loginData;
if (!sessionId) {
({sessionId, loginData} = await login(USERNAME, PASSWORD, HOMESERVER));
// eslint-disable-next-line no-unused-vars
export default async function main(label, button) {
try {
let sessionId = getSessionId(USER_ID);
let loginData;
if (!sessionId) {
({sessionId, loginData} = await login(USERNAME, PASSWORD, HOMESERVER));
}
const storage = await createIdbStorage(`morpheus_session_${sessionId}`);
const session = new Session(storage);
if (loginData) {
await session.setLoginData(loginData);
}
await session.load();
const hsApi = new HomeServerApi(HOMESERVER, session.accessToken);
console.log("session loaded");
if (!session.syncToken) {
console.log("session needs initial sync");
}
const sync = new Sync(hsApi, session, storage);
await sync.start();
label.innerText = "sync running";
button.addEventListener("click", () => sync.stop());
sync.on("error", err => {
label.innerText = "sync error";
console.error("sync error", err);
});
sync.on("stopped", () => {
label.innerText = "sync stopped";
});
} catch(err) {
console.error(err);
}
const storage = await createIdbStorage(`morpheus_session_${sessionId}`);
const session = new Session(storage);
if (loginData) {
await session.setLoginData(loginData);
}
await session.load();
const hsApi = new HomeServerApi(HOMESERVER, session.accessToken);
console.log("session loaded");
if (!session.syncToken) {
console.log("session needs initial sync");
}
return;
const sync = new Sync(hsApi, session, storage);
await sync.start();
sync.on("error", err => {
console.error("sync error", err);
});
}
main().catch(err => console.error(err));

View file

@ -1,51 +1,56 @@
class RoomPersister {
import SortKey from "../storage/sortkey.js";
export default class RoomPersister {
constructor(roomId) {
this._roomId = roomId;
this._lastSortKey = null;
this._lastSortKey = new SortKey();
}
async loadFromStorage(storage) {
const lastEvent = await storage.timeline.lastEvents(1);
async load(txn) {
//fetch key here instead?
const [lastEvent] = await txn.roomTimeline.lastEvents(this._roomId, 1);
if (lastEvent) {
this._lastSortKey = lastEvent.sortKey;
} else {
this._lastSortKey = new GapSortKey();
console.log("room persister load", this._roomId, lastEvent);
this._lastSortKey = new SortKey(lastEvent.sortKey);
}
}
async persistGapFill(...) {
// async persistGapFill(...) {
}
// }
async persistSync(roomResponse, txn) {
let nextKey;
let nextKey = this._lastSortKey;
const timeline = roomResponse.timeline;
// is limited true for initial sync???? or do we need to handle that as a special case?
// I suppose it will, yes
if (timeline.limited) {
nextKey = this._lastSortKey.nextKeyWithGap();
txn.timeline.appendGap(this._roomId, nextKey, {prev_batch: timeline.prev_batch});
nextKey = nextKey.nextKeyWithGap();
txn.roomTimeline.appendGap(this._roomId, nextKey, {prev_batch: timeline.prev_batch});
}
nextKey = this._lastSortKey.nextKey();
const startOfChunkSortKey = nextKey;
// const startOfChunkSortKey = nextKey;
if (timeline.events) {
for(const event of timeline.events) {
txn.timeline.appendEvent(this._roomId, nextKey, event);
nextKey = nextKey.nextKey();
txn.roomTimeline.appendEvent(this._roomId, nextKey, event);
}
}
// what happens here when the txn fails?
this._lastSortKey = nextKey;
// right thing to do? if the txn fails, not sure we'll continue anyways ...
// only advance the key once the transaction has
// succeeded
txn.complete().then(() => {
console.log("txn complete, setting key");
this._lastSortKey = nextKey;
});
// persist state
const state = roomResponse.state;
if (state.events) {
const promises = state.events.map((event) => {
txn.state.setStateEventAt(startOfChunkSortKey, event)
});
await Promise.all(promises);
for (const event of state.events) {
txn.roomState.setStateEvent(this._roomId, event)
}
}
}
}

View file

@ -1,20 +1,21 @@
class Room {
import RoomSummary from "./summary.js";
import RoomPersister from "./persister.js";
export default class Room {
constructor(roomId, storage) {
this._roomId = roomId;
this._storage = storage;
this._summary = new RoomSummary(this._roomId, this._storage);
this._summary = new RoomSummary(roomId);
this._persister = new RoomPersister(roomId);
}
async applyInitialSync(roomResponse, membership) {
async applySync(roomResponse, membership, txn) {
this._summary.applySync(roomResponse, membership, txn);
this._persister.persistSync(roomResponse, txn);
}
async applyIncrementalSync(roomResponse, membership) {
}
async load() {
load(summary, txn) {
this._summary.load(summary);
return this._persister.load(txn);
}
}

View file

@ -1,24 +1,22 @@
const SUMMARY_NAME_COUNT = 3;
// import SummaryMembers from "./members";
function disambiguateMember(name, userId) {
return `${name} (${userId})`;
}
// could even split name calculation in a separate class
// as the summary will grow more
export class RoomSummary {
export default class RoomSummary {
constructor(roomId) {
this._members = new SummaryMembers();
// this._members = new SummaryMembers();
this._roomId = roomId;
this._name = null;
this._lastMessage = null;
this._unreadCount = null;
this._mentionCount = null;
this._isEncrypted = null;
this._isDirectMessage = null;
this._membership = null;
this._inviteCount = 0;
this._joinCount = 0;
this._calculatedName = null;
this._nameFromEvent = null;
this._lastMessageBody = null;
}
get name() {
return this._nameFromEvent || this._calculatedName;
return this._name || "Room without a name";
}
get lastMessage() {
@ -33,53 +31,59 @@ export class RoomSummary {
return this._joinCount;
}
async applySync(roomResponse) {
const changed = this._processSyncResponse(roomResponse);
async applySync(roomResponse, membership, txn) {
const changed = this._processSyncResponse(roomResponse, membership);
if (changed) {
await this._persist();
await this._persist(txn);
}
return changed;
}
async load() {
const summary = await storage.getSummary(this._roomId);
async load(summary) {
this._roomId = summary.roomId;
this._name = summary.name;
this._lastMessage = summary.lastMessage;
this._unreadCount = summary.unreadCount;
this._mentionCount = summary.mentionCount;
this._isEncrypted = summary.isEncrypted;
this._isDirectMessage = summary.isDirectMessage;
this._membership = summary.membership;
this._inviteCount = summary.inviteCount;
this._joinCount = summary.joinCount;
this._calculatedName = summary.calculatedName;
this._nameFromEvent = summary.nameFromEvent;
this._lastMessageBody = summary.lastMessageBody;
this._members = new SummaryMembers(summary.members);
}
_persist() {
_persist(txn) {
const summary = {
roomId: this._roomId,
heroes: this._heroes,
inviteCount: this._inviteCount,
joinCount: this._joinCount,
calculatedName: this._calculatedName,
nameFromEvent: this._nameFromEvent,
lastMessageBody: this._lastMessageBody,
members: this._members.asArray()
name: this._name,
lastMessageBody: this._lastMessageBody
};
return this.storage.saveSummary(this.room_id, summary);
return txn.roomSummary.set(summary);
}
_processSyncResponse(roomResponse) {
_processSyncResponse(roomResponse, membership) {
// lets not do lazy loading for now
// if (roomResponse.summary) {
// this._updateSummary(roomResponse.summary);
// }
let changed = false;
if (roomResponse.limited) {
if (membership !== this._membership) {
this._membership = membership;
changed = true;
}
if (roomResponse.state_events) {
changed = roomResponse.state_events.events.reduce((changed, e) => {
return this._processEvent(e) || changed;
}, changed);
}
changed = roomResponse.timeline.events.reduce((changed, e) => {
return this._processEvent(e) || changed;
}, changed);
if (roomResponse.timeline) {
changed = roomResponse.timeline.events.reduce((changed, e) => {
return this._processEvent(e) || changed;
}, changed);
}
return changed;
}
@ -87,8 +91,8 @@ export class RoomSummary {
_processEvent(event) {
if (event.type === "m.room.name") {
const newName = event.content && event.content.name;
if (newName !== this._nameFromEvent) {
this._nameFromEvent = newName;
if (newName !== this._name) {
this._name = newName;
return true;
}
} else if (event.type === "m.room.member") {
@ -108,25 +112,29 @@ export class RoomSummary {
_processMembership(event) {
let changed = false;
const prevMembership = event.prev_content && event.prev_content.membership;
const membership = event.content && event.content.membership;
if (!event.content) {
return changed;
}
const content = event.content;
const membership = content.membership;
// danger of a replayed event getting the count out of sync
// but summary api will solve this.
// otherwise we'd have to store all the member ids in here
if (membership !== prevMembership) {
switch (prevMembership) {
case "invite": --this._inviteCount;
case "join": --this._joinCount;
case "invite": --this._inviteCount; break;
case "join": --this._joinCount; break;
}
switch (membership) {
case "invite": ++this._inviteCount;
case "join": ++this._joinCount;
case "invite": ++this._inviteCount; break;
case "join": ++this._joinCount; break;
}
changed = true;
}
if (membership === "join" && content.name) {
// TODO: avatar_url
changed = this._members.applyMember(content.name, content.state_key) || changed;
}
// if (membership === "join" && content.name) {
// // TODO: avatar_url
// changed = this._members.applyMember(content.name, content.state_key) || changed;
// }
return changed;
}
@ -147,40 +155,3 @@ export class RoomSummary {
// this._recaculateNameIfNoneSet();
}
}
class SummaryMembers {
constructor(initialMembers = []) {
this._alphabeticalNames = initialMembers.map(m => m.name);
}
applyMember(name, userId) {
let insertionIndex = 0;
for (var i = this._alphabeticalNames.length - 1; i >= 0; i--) {
const cmp = this._alphabeticalNames[i].localeCompare(name);
// name is already in the list, disambiguate
if (cmp === 0) {
name = disambiguateMember(name, userId);
}
// name should come after already present name, stop
if (cmp >= 0) {
insertionIndex = i + 1;
break;
}
}
// don't append names if list is full already
if (insertionIndex < SUMMARY_NAME_COUNT) {
this._alphabeticalNames.splice(insertionIndex, 0, name);
}
if (this._alphabeticalNames > SUMMARY_NAME_COUNT) {
this._alphabeticalNames = this._alphabeticalNames.slice(0, SUMMARY_NAME_COUNT);
}
}
get names() {
return this._alphabeticalNames;
}
asArray() {
return this._alphabeticalNames.map(n => {name: n});
}
}

View file

@ -1,12 +1,15 @@
import Room from "./room/room.js";
export default class Session {
constructor(storage) {
this._storage = storage;
this._session = null;
this._rooms = null;
this._rooms = {};
}
// should be called before load
// loginData has device_id, user_id, home_server, access_token
async setLoginData(loginData) {
console.log("session.setLoginData");
const txn = this._storage.readWriteTxn([this._storage.storeNames.session]);
const session = {loginData};
txn.session.set(session);
@ -17,6 +20,8 @@ export default class Session {
const txn = this._storage.readTxn([
this._storage.storeNames.session,
this._storage.storeNames.roomSummary,
this._storage.storeNames.roomState,
this._storage.storeNames.roomTimeline,
]);
// restore session object
this._session = await txn.session.get();
@ -25,9 +30,9 @@ export default class Session {
}
// load rooms
const rooms = await txn.roomSummary.getAll();
await Promise.all(rooms.map(roomSummary => {
const room = this.createRoom(room.roomId);
return room.load(roomSummary);
await Promise.all(rooms.map(summary => {
const room = this.createRoom(summary.roomId);
return room.load(summary, txn);
}));
}

View file

@ -12,12 +12,21 @@ function createStores(db) {
db.createObjectStore("roomSummary", {keyPath: "roomId"});
// needs roomId separate because it might hold a gap and no event
const timeline = db.createObjectStore("roomTimeline", {keyPath: ["roomId", "sortKey"]});
timeline.createIndex("byEventId", ["roomId", "event.event_id"], {unique: true});
// how to get the first/last x events for a room?
// we don't want to specify the sort key, but would need an index for the room_id?
// take sort_key as primary key then and have index on event_id?
// still, you also can't have a PK of [room_id, sort_key] and get the last or first events with just the room_id? the only thing that changes it that the PK will provide an inherent sorting that you inherit in an index that only has room_id as keyPath??? There must be a better way, need to write a prototype test for this.
// SOLUTION: with numeric keys, you can just us a min/max value to get first/last
// db.createObjectStore("members", ["room_id", "state_key"]);
const state = db.createObjectStore("roomState", {keyPath: ["event.room_id", "event.type", "event.state_key"]});
timeline.createIndex("byEventId", [
"roomId",
"event.event_id"
], {unique: true});
db.createObjectStore("roomState", {keyPath: [
"roomId",
"event.type",
"event.state_key"
]});
// const roomMembers = db.createObjectStore("roomMembers", {keyPath: [
// "event.room_id",
// "event.content.membership",
// "event.state_key"
// ]});
// roomMembers.createIndex("byName", ["room_id", "content.name"]);
}

View file

@ -29,13 +29,14 @@ export default class QueryTarget {
return this._selectWhile(range, predicate, "prev");
}
selectAll(range, direction) {
async selectAll(range, direction) {
const cursor = this._target.openCursor(range, direction);
const results = [];
return iterateCursor(cursor, (value) => {
await iterateCursor(cursor, (value) => {
results.push(value);
return true;
});
return results;
}
selectFirst(range) {
@ -69,13 +70,14 @@ export default class QueryTarget {
}, direction);
}
_selectWhile(range, predicate, direction) {
async _selectWhile(range, predicate, direction) {
const cursor = this._target.openCursor(range, direction);
const results = [];
return iterateCursor(cursor, (value) => {
await iterateCursor(cursor, (value) => {
results.push(value);
return predicate(results);
});
return results;
}
async _find(range, predicate, direction) {

View file

@ -13,9 +13,9 @@ export default class Storage {
}
_validateStoreNames(storeNames) {
const unknownStoreName = storeNames.find(name => !STORE_NAMES.includes(name));
if (unknownStoreName) {
throw new Error(`Tried to open a transaction for unknown store ${unknownStoreName}`);
const idx = storeNames.findIndex(name => !STORE_NAMES.includes(name));
if (idx !== -1) {
throw new Error(`Tried to open a transaction for unknown store ${storeNames[idx]}`);
}
}

View file

@ -2,23 +2,23 @@ import QueryTarget from "./query-target.js";
import { reqAsPromise } from "./utils.js";
export default class Store extends QueryTarget {
constructor(store) {
super(store);
constructor(idbStore) {
super(idbStore);
}
get _store() {
get _idbStore() {
return this._target;
}
index(indexName) {
return new QueryTarget(this._store.index(indexName));
return new QueryTarget(this._idbStore.index(indexName));
}
put(value) {
return reqAsPromise(this._store.put(value));
return reqAsPromise(this._idbStore.put(value));
}
add(value) {
return reqAsPromise(this._store.add(value));
return reqAsPromise(this._idbStore.add(value));
}
}

View file

@ -0,0 +1,17 @@
export default class RoomStateStore {
constructor(idbStore) {
this._roomStateStore = idbStore;
}
async getEvents(type) {
}
async getEventsForKey(type, stateKey) {
}
async setStateEvent(roomId, event) {
return this._roomStateStore.put({roomId, event});
}
}

View file

@ -8,6 +8,8 @@ store contains:
isEncrypted
isDirectMessage
membership
inviteCount
joinCount
*/
export default class RoomSummaryStore {
constructor(summaryStore) {
@ -17,4 +19,8 @@ export default class RoomSummaryStore {
getAll() {
return this._summaryStore.selectAll();
}
set(summary) {
return this._summaryStore.put(summary);
}
}

View file

@ -1,25 +1,25 @@
import SortKey from "../../sortkey.js";
class TimelineStore {
export default class RoomTimelineStore {
constructor(timelineStore) {
this._timelineStore = timelineStore;
}
async lastEvents(roomId, amount) {
return this.eventsBefore(roomId, GapSortKey.maxKey());
return this.eventsBefore(roomId, SortKey.maxKey, amount);
}
async firstEvents(roomId, amount) {
return this.eventsAfter(roomId, GapSortKey.minKey());
return this.eventsAfter(roomId, SortKey.minKey, amount);
}
eventsAfter(roomId, sortKey, amount) {
const range = IDBKeyRange.lowerBound([roomId, sortKey], true);
const range = IDBKeyRange.lowerBound([roomId, sortKey.buffer], true);
return this._timelineStore.selectLimit(range, amount);
}
async eventsBefore(roomId, sortKey, amount) {
const range = IDBKeyRange.upperBound([roomId, sortKey], true);
const range = IDBKeyRange.upperBound([roomId, sortKey.buffer], true);
const events = await this._timelineStore.selectLimitReverse(range, amount);
events.reverse(); // because we fetched them backwards
return events;
@ -36,7 +36,7 @@ class TimelineStore {
appendGap(roomId, sortKey, gap) {
this._timelineStore.add({
roomId: roomId,
sortKey: sortKey,
sortKey: sortKey.buffer,
content: {
event: null,
gap: gap,
@ -45,9 +45,10 @@ class TimelineStore {
}
appendEvent(roomId, sortKey, event) {
console.info(`appending event for room ${roomId} with key ${sortKey}`);
this._timelineStore.add({
roomId: roomId,
sortKey: sortKey,
sortKey: sortKey.buffer,
content: {
event: event,
gap: null,
@ -56,6 +57,6 @@ class TimelineStore {
}
async removeEvent(roomId, sortKey) {
this._timelineStore.delete([roomId, sortKey]);
this._timelineStore.delete([roomId, sortKey.buffer]);
}
}

View file

@ -1,27 +0,0 @@
class RoomStore {
constructor(summary, db, syncTxn) {
this._summary = summary;
}
getSummary() {
return Promise.resolve(this._summary);
}
async setSummary(summary) {
this._summary = summary;
//...
}
get timelineStore() {
}
get memberStore() {
}
get stateStore() {
}
}

View file

@ -1,17 +0,0 @@
class StateStore {
constructor(db) {
}
async getEvents(type) {
}
async getEventsForKey(type, stateKey) {
}
async setState(events) {
}
}

View file

@ -1,7 +1,9 @@
import {txnAsPromise} from "./utils.js";
import Store from "./store.js";
// import TimelineStore from "./stores/timeline.js";
import SessionStore from "./stores/session.js";
import SessionStore from "./stores/SessionStore.js";
import RoomSummaryStore from "./stores/RoomSummaryStore.js";
import RoomTimelineStore from "./stores/RoomTimelineStore.js";
import RoomStateStore from "./stores/RoomStateStore.js";
export default class Transaction {
constructor(txn, allowedStoreNames) {
@ -31,14 +33,22 @@ export default class Transaction {
return this._stores[name];
}
// get roomTimeline() {
// return this._store("roomTimeline", idbStore => new TimelineStore(idbStore));
// }
get session() {
return this._store("session", idbStore => new SessionStore(idbStore));
}
get roomSummary() {
return this._store("roomSummary", idbStore => new RoomSummaryStore(idbStore));
}
get roomTimeline() {
return this._store("roomTimeline", idbStore => new RoomTimelineStore(idbStore));
}
get roomState() {
return this._store("roomState", idbStore => new RoomStateStore(idbStore));
}
complete() {
return txnAsPromise(this._txn);
}

View file

@ -1,6 +1,13 @@
class GapSortKey {
constructor() {
this._keys = new Int32Array(2);
const MIN_INT32 = -2147483648;
const MAX_INT32 = 2147483647;
export default class SortKey {
constructor(buffer) {
if (buffer) {
this._keys = new Int32Array(buffer, 2);
} else {
this._keys = new Int32Array(2);
}
}
get gapKey() {
@ -19,49 +26,99 @@ class GapSortKey {
this._keys[1] = value;
}
buffer() {
get buffer() {
return this._keys.buffer;
}
nextKeyWithGap() {
const k = new Key();
const k = new SortKey();
k.gapKey = this.gapKey + 1;
k.eventKey = 0;
return k;
}
nextKey() {
const k = new Key();
const k = new SortKey();
k.gapKey = this.gapKey;
k.eventKey = this.eventKey + 1;
return k;
}
previousKey() {
const k = new Key();
const k = new SortKey();
k.gapKey = this.gapKey;
k.eventKey = this.eventKey - 1;
return k;
}
clone() {
const k = new Key();
const k = new SortKey();
k.gapKey = this.gapKey;
k.eventKey = this.eventKey;
return k;
}
static get maxKey() {
const maxKey = new GapSortKey();
maxKey.gapKey = Number.MAX_SAFE_INTEGER;
maxKey.eventKey = Number.MAX_SAFE_INTEGER;
const maxKey = new SortKey();
maxKey.gapKey = MAX_INT32;
maxKey.eventKey = MAX_INT32;
return maxKey;
}
static get minKey() {
const minKey = new GapSortKey();
minKey.gapKey = 0;
minKey.eventKey = 0;
const minKey = new SortKey();
minKey.gapKey = MIN_INT32;
minKey.eventKey = MIN_INT32;
return minKey;
}
toString() {
return `[${this.gapKey}/${this.eventKey}]`;
}
}
//#ifdef TESTS
export function tests() {
return {
test_default_key(assert) {
const k = new SortKey();
assert.equal(k.gapKey, 0);
assert.equal(k.eventKey, 0);
},
test_inc(assert) {
const a = new SortKey();
const b = a.nextKey();
assert.equal(a.gapKey, b.gapKey);
assert.equal(a.eventKey + 1, b.eventKey);
const c = b.previousKey();
assert.equal(b.gapKey, c.gapKey);
assert.equal(c.eventKey + 1, b.eventKey);
assert.equal(a.eventKey, c.eventKey);
},
test_min_key(assert) {
const minKey = SortKey.minKey;
const k = new SortKey();
assert(minKey.gapKey < k.gapKey);
assert(minKey.eventKey < k.eventKey);
},
test_max_key(assert) {
const maxKey = SortKey.maxKey;
const k = new SortKey();
assert(maxKey.gapKey > k.gapKey);
assert(maxKey.eventKey > k.eventKey);
},
test_immutable(assert) {
const a = new SortKey();
const gapKey = a.gapKey;
const eventKey = a.gapKey;
a.nextKeyWithGap();
assert.equal(a.gapKey, gapKey);
assert.equal(a.eventKey, eventKey);
}
};
}
//#endif

View file

@ -1,21 +1,32 @@
import {RequestAbortError} from "./hs-api.js";
import {HomeServerError, StorageError} from "./error.js";
import {
RequestAbortError,
HomeServerError,
StorageError
} from "./error.js";
import EventEmitter from "./event-emitter.js";
const INCREMENTAL_TIMEOUT = 30;
const INCREMENTAL_TIMEOUT = 30000;
const SYNC_EVENT_LIMIT = 10;
function parseRooms(responseSections, roomMapper) {
return ["join", "invite", "leave"].map(membership => {
const membershipSection = responseSections[membership];
const results = Object.entries(membershipSection).map(([roomId, roomResponse]) => {
const room = roomMapper(roomId, membership);
return room.processInitialSync(roomResponse);
});
return results;
}).reduce((allResults, sectionResults) => allResults.concat(sectionResults), []);
function parseRooms(roomsSection, roomCallback) {
if (!roomsSection) {
return;
}
const allMemberships = ["join", "invite", "leave"];
for(const membership of allMemberships) {
const membershipSection = roomsSection[membership];
if (membershipSection) {
const rooms = Object.entries(membershipSection)
for (const [roomId, roomResponse] of rooms) {
roomCallback(roomId, roomResponse, membership);
}
}
}
}
export class Sync {
export default class Sync extends EventEmitter {
constructor(hsApi, session, storage) {
super();
this._hsApi = hsApi;
this._session = session;
this._storage = storage;
@ -28,9 +39,10 @@ export class Sync {
return;
}
this._isSyncing = true;
let syncToken = session.syncToken;
let syncToken = this._session.syncToken;
// do initial sync if needed
if (!syncToken) {
// need to create limit filter here
syncToken = await this._syncRequest();
}
this._syncLoop(syncToken);
@ -40,43 +52,53 @@ export class Sync {
// if syncToken is falsy, it will first do an initial sync ...
while(this._isSyncing) {
try {
syncToken = await this._syncRequest(INCREMENTAL_TIMEOUT, syncToken);
console.log(`starting sync request with since ${syncToken} ...`);
syncToken = await this._syncRequest(syncToken, INCREMENTAL_TIMEOUT);
} catch (err) {
console.warn("stopping sync because of error");
this._isSyncing = false;
this.emit("error", err);
}
}
this.emit("stopped");
}
async _syncRequest(timeout, syncToken) {
this._currentRequest = this._hsApi.sync(timeout, syncToken);
const response = await this._currentRequest.response;
async _syncRequest(syncToken, timeout) {
this._currentRequest = this._hsApi.sync(syncToken, undefined, timeout);
const response = await this._currentRequest.response();
syncToken = response.next_batch;
const storeNames = this._storage.storeNames;
const syncTxn = this._storage.startReadWriteTxn([
storeNames.timeline,
const syncTxn = this._storage.readWriteTxn([
storeNames.session,
storeNames.state
storeNames.roomSummary,
storeNames.roomTimeline,
storeNames.roomState,
]);
try {
session.applySync(syncToken, response.account_data, syncTxn);
this._session.applySync(syncToken, response.account_data, syncTxn);
// to_device
// presence
parseRooms(response.rooms, async (roomId, roomResponse, membership) => {
let room = session.getRoom(roomId);
if (!room) {
room = session.createRoom(roomId);
}
room.applySync(roomResponse, membership, syncTxn);
});
if (response.rooms) {
parseRooms(response.rooms, (roomId, roomResponse, membership) => {
let room = this._session.getRoom(roomId);
if (!room) {
room = this._session.createRoom(roomId);
}
console.log(` * applying sync response to room ${roomId} ...`);
room.applySync(roomResponse, membership, syncTxn);
});
}
} catch(err) {
console.warn("aborting syncTxn because of error");
// avoid corrupting state by only
// storing the sync up till the point
// the exception occurred
txn.abort();
syncTxn.abort();
throw err;
}
try {
await txn.complete();
await syncTxn.complete();
console.info("syncTxn committed!!");
} catch (err) {
throw new StorageError("unable to commit sync tranaction", err);
}