From b57c5abdd6fae1b1a5716c8545690e8891dfd853 Mon Sep 17 00:00:00 2001 From: Bruno Windels Date: Sun, 10 Feb 2019 21:25:29 +0100 Subject: [PATCH] its syncing, sort off --- .eslintrc.js | 14 ++ .gitignore | 2 + doc/TODO.md | 1 + index.html | 10 +- package.json | 22 +++ src/error.js | 8 +- src/event-emitter.js | 69 +++++++++ src/hs-api.js | 12 +- src/main.js | 58 ++++---- src/room/persister.js | 49 ++++--- src/room/room.js | 21 +-- src/room/summary.js | 133 +++++++----------- src/session.js | 13 +- src/storage/idb/create.js | 25 ++-- src/storage/idb/query-target.js | 10 +- src/storage/idb/storage.js | 6 +- src/storage/idb/store.js | 12 +- src/storage/idb/stores/RoomStateStore.js | 17 +++ .../{summary.js => RoomSummaryStore.js} | 6 + .../{timeline.js => RoomTimelineStore.js} | 17 +-- .../stores/{session.js => SessionStore.js} | 0 src/storage/idb/stores/room.js | 27 ---- src/storage/idb/stores/state.js | 17 --- src/storage/idb/transaction.js | 22 ++- src/storage/sortkey.js | 85 +++++++++-- src/sync.js | 84 +++++++---- 26 files changed, 466 insertions(+), 274 deletions(-) create mode 100644 .eslintrc.js create mode 100644 .gitignore create mode 100644 package.json create mode 100644 src/event-emitter.js create mode 100644 src/storage/idb/stores/RoomStateStore.js rename src/storage/idb/stores/{summary.js => RoomSummaryStore.js} (76%) rename src/storage/idb/stores/{timeline.js => RoomTimelineStore.js} (68%) rename src/storage/idb/stores/{session.js => SessionStore.js} (100%) delete mode 100644 src/storage/idb/stores/room.js delete mode 100644 src/storage/idb/stores/state.js diff --git a/.eslintrc.js b/.eslintrc.js new file mode 100644 index 00000000..521ea791 --- /dev/null +++ b/.eslintrc.js @@ -0,0 +1,14 @@ +module.exports = { + "env": { + "browser": true, + "es6": true + }, + "extends": "eslint:recommended", + "parserOptions": { + "ecmaVersion": 2018, + "sourceType": "module" + }, + "rules": { + "no-console": "off" + } +}; \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 00000000..2be9c7c0 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +*.sublime-project +*.sublime-workspace \ No newline at end of file diff --git a/doc/TODO.md b/doc/TODO.md index 3b0eace1..732f8ab9 100644 --- a/doc/TODO.md +++ b/doc/TODO.md @@ -1,6 +1,7 @@ # Minimal thing to get working - finish summary store + - move "sdk" bits over to "matrix" directory - add eventemitter - make sync work - store summaries diff --git a/index.html b/index.html index d0cabfa3..a1e3e5df 100644 --- a/index.html +++ b/index.html @@ -2,8 +2,16 @@ - +

+
+ \ No newline at end of file diff --git a/package.json b/package.json new file mode 100644 index 00000000..02b47c4e --- /dev/null +++ b/package.json @@ -0,0 +1,22 @@ +{ + "name": "morpheusjs", + "version": "1.0.0", + "description": "A javascript matrix client prototype, trying to minize RAM usage by offloading as much as possible to IndexedDB", + "main": "index.js", + "directories": { + "doc": "doc" + }, + "scripts": { + "test": "node --experimental-modules --loader ../js-inline-tests/src/resolve-hook.mjs ../js-inline-tests/src/main.mjs --entryPoint src/main.js --force-esm" + }, + "repository": { + "type": "git", + "url": "git+https://github.com/bwindels/morpheusjs.git" + }, + "author": "", + "license": "ISC", + "bugs": { + "url": "https://github.com/bwindels/morpheusjs/issues" + }, + "homepage": "https://github.com/bwindels/morpheusjs#readme" +} diff --git a/src/error.js b/src/error.js index 7ae036a5..83cc8652 100644 --- a/src/error.js +++ b/src/error.js @@ -1,9 +1,13 @@ export class HomeServerError extends Error { - constructor(body) { - super(body.error); + constructor(method, url, body) { + super(`${body.error} on ${method} ${url}`); this.errcode = body.errcode; } } export class StorageError extends Error { } + +export class RequestAbortError extends Error { + +} \ No newline at end of file diff --git a/src/event-emitter.js b/src/event-emitter.js new file mode 100644 index 00000000..460d75a0 --- /dev/null +++ b/src/event-emitter.js @@ -0,0 +1,69 @@ +export default class EventEmitter { + constructor() { + this._handlersByName = {}; + } + + emit(name, value) { + const handlers = this._handlersByName[name]; + if (handlers) { + for(const h of handlers) { + h(value); + } + } + } + + on(name, callback) { + let handlers = this._handlersByName[name]; + if (!handlers) { + this._handlersByName[name] = handlers = new Set(); + } + handlers.add(callback); + } + + off(name, callback) { + const handlers = this._handlersByName[name]; + if (handlers) { + handlers.delete(callback); + if (handlers.length === 0) { + delete this._handlersByName[name]; + } + } + } +} +//#ifdef TESTS +export function tests() { + return { + test_on_off(assert) { + let counter = 0; + const e = new EventEmitter(); + const callback = () => counter += 1; + e.on("change", callback); + e.emit("change"); + e.off("change", callback); + e.emit("change"); + assert.equal(counter, 1); + }, + + test_emit_value(assert) { + let value = 0; + const e = new EventEmitter(); + const callback = (v) => value = v; + e.on("change", callback); + e.emit("change", 5); + e.off("change", callback); + assert.equal(value, 5); + }, + + test_double_on(assert) { + let counter = 0; + const e = new EventEmitter(); + const callback = () => counter += 1; + e.on("change", callback); + e.on("change", callback); + e.emit("change"); + e.off("change", callback); + assert.equal(counter, 1); + } + }; +} +//#endif \ No newline at end of file diff --git a/src/hs-api.js b/src/hs-api.js index d8ab5d16..828f91ca 100644 --- a/src/hs-api.js +++ b/src/hs-api.js @@ -1,3 +1,5 @@ +import {HomeServerError} from "./error.js"; + class RequestWrapper { constructor(promise, controller) { this._promise = promise; @@ -47,13 +49,13 @@ export default class HomeServerApi { body: bodyString, signal: controller.signal }); - promise = promise.then(response => { + promise = promise.then(async (response) => { if (response.ok) { - return response.json(); + return await response.json(); } else { switch (response.status) { default: - throw new HomeServerError(response.json()) + throw new HomeServerError(method, url, await response.json()) } } }); @@ -68,8 +70,8 @@ export default class HomeServerApi { return this._request("GET", csPath, queryParams, body); } - sync(timeout = 0, since = undefined) { - return this._get("/sync", {since, timeout}); + sync(since, filter, timeout) { + return this._get("/sync", {since, timeout, filter}); } passwordLogin(username, password) { diff --git a/src/main.js b/src/main.js index cbcccaf5..9ea71599 100644 --- a/src/main.js +++ b/src/main.js @@ -1,6 +1,7 @@ import HomeServerApi from "./hs-api.js"; import Session from "./session.js"; import createIdbStorage from "./storage/idb/create.js"; +import Sync from "./sync.js"; const HOST = "localhost"; const HOMESERVER = `http://${HOST}:8008`; @@ -31,30 +32,37 @@ async function login(username, password, homeserver) { return {sessionId, loginData}; } -async function main() { - let sessionId = getSessionId(USER_ID); - let loginData; - if (!sessionId) { - ({sessionId, loginData} = await login(USERNAME, PASSWORD, HOMESERVER)); +// eslint-disable-next-line no-unused-vars +export default async function main(label, button) { + try { + let sessionId = getSessionId(USER_ID); + let loginData; + if (!sessionId) { + ({sessionId, loginData} = await login(USERNAME, PASSWORD, HOMESERVER)); + } + const storage = await createIdbStorage(`morpheus_session_${sessionId}`); + const session = new Session(storage); + if (loginData) { + await session.setLoginData(loginData); + } + await session.load(); + const hsApi = new HomeServerApi(HOMESERVER, session.accessToken); + console.log("session loaded"); + if (!session.syncToken) { + console.log("session needs initial sync"); + } + const sync = new Sync(hsApi, session, storage); + await sync.start(); + label.innerText = "sync running"; + button.addEventListener("click", () => sync.stop()); + sync.on("error", err => { + label.innerText = "sync error"; + console.error("sync error", err); + }); + sync.on("stopped", () => { + label.innerText = "sync stopped"; + }); + } catch(err) { + console.error(err); } - const storage = await createIdbStorage(`morpheus_session_${sessionId}`); - const session = new Session(storage); - if (loginData) { - await session.setLoginData(loginData); - } - await session.load(); - const hsApi = new HomeServerApi(HOMESERVER, session.accessToken); - console.log("session loaded"); - if (!session.syncToken) { - console.log("session needs initial sync"); - } - return; - const sync = new Sync(hsApi, session, storage); - await sync.start(); - - sync.on("error", err => { - console.error("sync error", err); - }); } - -main().catch(err => console.error(err)); \ No newline at end of file diff --git a/src/room/persister.js b/src/room/persister.js index aed18474..67bcfbf6 100644 --- a/src/room/persister.js +++ b/src/room/persister.js @@ -1,51 +1,56 @@ -class RoomPersister { +import SortKey from "../storage/sortkey.js"; + +export default class RoomPersister { constructor(roomId) { this._roomId = roomId; - this._lastSortKey = null; + this._lastSortKey = new SortKey(); } - async loadFromStorage(storage) { - const lastEvent = await storage.timeline.lastEvents(1); + async load(txn) { + //fetch key here instead? + const [lastEvent] = await txn.roomTimeline.lastEvents(this._roomId, 1); if (lastEvent) { - this._lastSortKey = lastEvent.sortKey; - } else { - this._lastSortKey = new GapSortKey(); + console.log("room persister load", this._roomId, lastEvent); + this._lastSortKey = new SortKey(lastEvent.sortKey); } } - async persistGapFill(...) { + // async persistGapFill(...) { - } + // } async persistSync(roomResponse, txn) { - - let nextKey; + let nextKey = this._lastSortKey; const timeline = roomResponse.timeline; // is limited true for initial sync???? or do we need to handle that as a special case? + // I suppose it will, yes if (timeline.limited) { - nextKey = this._lastSortKey.nextKeyWithGap(); - txn.timeline.appendGap(this._roomId, nextKey, {prev_batch: timeline.prev_batch}); + nextKey = nextKey.nextKeyWithGap(); + txn.roomTimeline.appendGap(this._roomId, nextKey, {prev_batch: timeline.prev_batch}); } - nextKey = this._lastSortKey.nextKey(); - const startOfChunkSortKey = nextKey; + // const startOfChunkSortKey = nextKey; if (timeline.events) { for(const event of timeline.events) { - txn.timeline.appendEvent(this._roomId, nextKey, event); nextKey = nextKey.nextKey(); + txn.roomTimeline.appendEvent(this._roomId, nextKey, event); } } - // what happens here when the txn fails? - this._lastSortKey = nextKey; + // right thing to do? if the txn fails, not sure we'll continue anyways ... + // only advance the key once the transaction has + // succeeded + txn.complete().then(() => { + console.log("txn complete, setting key"); + this._lastSortKey = nextKey; + }); // persist state const state = roomResponse.state; if (state.events) { - const promises = state.events.map((event) => { - txn.state.setStateEventAt(startOfChunkSortKey, event) - }); - await Promise.all(promises); + for (const event of state.events) { + txn.roomState.setStateEvent(this._roomId, event) + } } } } \ No newline at end of file diff --git a/src/room/room.js b/src/room/room.js index d94c029b..1fadccaf 100644 --- a/src/room/room.js +++ b/src/room/room.js @@ -1,20 +1,21 @@ -class Room { +import RoomSummary from "./summary.js"; +import RoomPersister from "./persister.js"; +export default class Room { constructor(roomId, storage) { this._roomId = roomId; this._storage = storage; - this._summary = new RoomSummary(this._roomId, this._storage); + this._summary = new RoomSummary(roomId); + this._persister = new RoomPersister(roomId); } - async applyInitialSync(roomResponse, membership) { - + async applySync(roomResponse, membership, txn) { + this._summary.applySync(roomResponse, membership, txn); + this._persister.persistSync(roomResponse, txn); } - async applyIncrementalSync(roomResponse, membership) { - - } - - async load() { - + load(summary, txn) { + this._summary.load(summary); + return this._persister.load(txn); } } \ No newline at end of file diff --git a/src/room/summary.js b/src/room/summary.js index bcf9e5e4..bbb5ad28 100644 --- a/src/room/summary.js +++ b/src/room/summary.js @@ -1,24 +1,22 @@ -const SUMMARY_NAME_COUNT = 3; +// import SummaryMembers from "./members"; -function disambiguateMember(name, userId) { - return `${name} (${userId})`; -} - -// could even split name calculation in a separate class -// as the summary will grow more -export class RoomSummary { +export default class RoomSummary { constructor(roomId) { - this._members = new SummaryMembers(); + // this._members = new SummaryMembers(); this._roomId = roomId; + this._name = null; + this._lastMessage = null; + this._unreadCount = null; + this._mentionCount = null; + this._isEncrypted = null; + this._isDirectMessage = null; + this._membership = null; this._inviteCount = 0; this._joinCount = 0; - this._calculatedName = null; - this._nameFromEvent = null; - this._lastMessageBody = null; } get name() { - return this._nameFromEvent || this._calculatedName; + return this._name || "Room without a name"; } get lastMessage() { @@ -33,53 +31,59 @@ export class RoomSummary { return this._joinCount; } - async applySync(roomResponse) { - const changed = this._processSyncResponse(roomResponse); + async applySync(roomResponse, membership, txn) { + const changed = this._processSyncResponse(roomResponse, membership); if (changed) { - await this._persist(); + await this._persist(txn); } return changed; } - async load() { - const summary = await storage.getSummary(this._roomId); + async load(summary) { this._roomId = summary.roomId; + this._name = summary.name; + this._lastMessage = summary.lastMessage; + this._unreadCount = summary.unreadCount; + this._mentionCount = summary.mentionCount; + this._isEncrypted = summary.isEncrypted; + this._isDirectMessage = summary.isDirectMessage; + this._membership = summary.membership; this._inviteCount = summary.inviteCount; this._joinCount = summary.joinCount; - this._calculatedName = summary.calculatedName; - this._nameFromEvent = summary.nameFromEvent; - this._lastMessageBody = summary.lastMessageBody; - this._members = new SummaryMembers(summary.members); } - _persist() { + _persist(txn) { const summary = { roomId: this._roomId, heroes: this._heroes, inviteCount: this._inviteCount, joinCount: this._joinCount, - calculatedName: this._calculatedName, - nameFromEvent: this._nameFromEvent, - lastMessageBody: this._lastMessageBody, - members: this._members.asArray() + name: this._name, + lastMessageBody: this._lastMessageBody }; - return this.storage.saveSummary(this.room_id, summary); + return txn.roomSummary.set(summary); } - _processSyncResponse(roomResponse) { + _processSyncResponse(roomResponse, membership) { // lets not do lazy loading for now // if (roomResponse.summary) { // this._updateSummary(roomResponse.summary); // } let changed = false; - if (roomResponse.limited) { + if (membership !== this._membership) { + this._membership = membership; + changed = true; + } + if (roomResponse.state_events) { changed = roomResponse.state_events.events.reduce((changed, e) => { return this._processEvent(e) || changed; }, changed); } - changed = roomResponse.timeline.events.reduce((changed, e) => { - return this._processEvent(e) || changed; - }, changed); + if (roomResponse.timeline) { + changed = roomResponse.timeline.events.reduce((changed, e) => { + return this._processEvent(e) || changed; + }, changed); + } return changed; } @@ -87,8 +91,8 @@ export class RoomSummary { _processEvent(event) { if (event.type === "m.room.name") { const newName = event.content && event.content.name; - if (newName !== this._nameFromEvent) { - this._nameFromEvent = newName; + if (newName !== this._name) { + this._name = newName; return true; } } else if (event.type === "m.room.member") { @@ -108,25 +112,29 @@ export class RoomSummary { _processMembership(event) { let changed = false; const prevMembership = event.prev_content && event.prev_content.membership; - const membership = event.content && event.content.membership; + if (!event.content) { + return changed; + } + const content = event.content; + const membership = content.membership; // danger of a replayed event getting the count out of sync // but summary api will solve this. // otherwise we'd have to store all the member ids in here if (membership !== prevMembership) { switch (prevMembership) { - case "invite": --this._inviteCount; - case "join": --this._joinCount; + case "invite": --this._inviteCount; break; + case "join": --this._joinCount; break; } switch (membership) { - case "invite": ++this._inviteCount; - case "join": ++this._joinCount; + case "invite": ++this._inviteCount; break; + case "join": ++this._joinCount; break; } changed = true; } - if (membership === "join" && content.name) { - // TODO: avatar_url - changed = this._members.applyMember(content.name, content.state_key) || changed; - } + // if (membership === "join" && content.name) { + // // TODO: avatar_url + // changed = this._members.applyMember(content.name, content.state_key) || changed; + // } return changed; } @@ -147,40 +155,3 @@ export class RoomSummary { // this._recaculateNameIfNoneSet(); } } - -class SummaryMembers { - constructor(initialMembers = []) { - this._alphabeticalNames = initialMembers.map(m => m.name); - } - - applyMember(name, userId) { - let insertionIndex = 0; - for (var i = this._alphabeticalNames.length - 1; i >= 0; i--) { - const cmp = this._alphabeticalNames[i].localeCompare(name); - // name is already in the list, disambiguate - if (cmp === 0) { - name = disambiguateMember(name, userId); - } - // name should come after already present name, stop - if (cmp >= 0) { - insertionIndex = i + 1; - break; - } - } - // don't append names if list is full already - if (insertionIndex < SUMMARY_NAME_COUNT) { - this._alphabeticalNames.splice(insertionIndex, 0, name); - } - if (this._alphabeticalNames > SUMMARY_NAME_COUNT) { - this._alphabeticalNames = this._alphabeticalNames.slice(0, SUMMARY_NAME_COUNT); - } - } - - get names() { - return this._alphabeticalNames; - } - - asArray() { - return this._alphabeticalNames.map(n => {name: n}); - } -} diff --git a/src/session.js b/src/session.js index f12b25a9..da86b195 100644 --- a/src/session.js +++ b/src/session.js @@ -1,12 +1,15 @@ +import Room from "./room/room.js"; + export default class Session { constructor(storage) { this._storage = storage; this._session = null; - this._rooms = null; + this._rooms = {}; } // should be called before load // loginData has device_id, user_id, home_server, access_token async setLoginData(loginData) { + console.log("session.setLoginData"); const txn = this._storage.readWriteTxn([this._storage.storeNames.session]); const session = {loginData}; txn.session.set(session); @@ -17,6 +20,8 @@ export default class Session { const txn = this._storage.readTxn([ this._storage.storeNames.session, this._storage.storeNames.roomSummary, + this._storage.storeNames.roomState, + this._storage.storeNames.roomTimeline, ]); // restore session object this._session = await txn.session.get(); @@ -25,9 +30,9 @@ export default class Session { } // load rooms const rooms = await txn.roomSummary.getAll(); - await Promise.all(rooms.map(roomSummary => { - const room = this.createRoom(room.roomId); - return room.load(roomSummary); + await Promise.all(rooms.map(summary => { + const room = this.createRoom(summary.roomId); + return room.load(summary, txn); })); } diff --git a/src/storage/idb/create.js b/src/storage/idb/create.js index 6ee0a5c9..42f66e04 100644 --- a/src/storage/idb/create.js +++ b/src/storage/idb/create.js @@ -12,12 +12,21 @@ function createStores(db) { db.createObjectStore("roomSummary", {keyPath: "roomId"}); // needs roomId separate because it might hold a gap and no event const timeline = db.createObjectStore("roomTimeline", {keyPath: ["roomId", "sortKey"]}); - timeline.createIndex("byEventId", ["roomId", "event.event_id"], {unique: true}); - // how to get the first/last x events for a room? - // we don't want to specify the sort key, but would need an index for the room_id? - // take sort_key as primary key then and have index on event_id? - // still, you also can't have a PK of [room_id, sort_key] and get the last or first events with just the room_id? the only thing that changes it that the PK will provide an inherent sorting that you inherit in an index that only has room_id as keyPath??? There must be a better way, need to write a prototype test for this. - // SOLUTION: with numeric keys, you can just us a min/max value to get first/last - // db.createObjectStore("members", ["room_id", "state_key"]); - const state = db.createObjectStore("roomState", {keyPath: ["event.room_id", "event.type", "event.state_key"]}); + timeline.createIndex("byEventId", [ + "roomId", + "event.event_id" + ], {unique: true}); + + db.createObjectStore("roomState", {keyPath: [ + "roomId", + "event.type", + "event.state_key" + ]}); + + // const roomMembers = db.createObjectStore("roomMembers", {keyPath: [ + // "event.room_id", + // "event.content.membership", + // "event.state_key" + // ]}); + // roomMembers.createIndex("byName", ["room_id", "content.name"]); } \ No newline at end of file diff --git a/src/storage/idb/query-target.js b/src/storage/idb/query-target.js index c8052dfb..22fc325b 100644 --- a/src/storage/idb/query-target.js +++ b/src/storage/idb/query-target.js @@ -29,13 +29,14 @@ export default class QueryTarget { return this._selectWhile(range, predicate, "prev"); } - selectAll(range, direction) { + async selectAll(range, direction) { const cursor = this._target.openCursor(range, direction); const results = []; - return iterateCursor(cursor, (value) => { + await iterateCursor(cursor, (value) => { results.push(value); return true; }); + return results; } selectFirst(range) { @@ -69,13 +70,14 @@ export default class QueryTarget { }, direction); } - _selectWhile(range, predicate, direction) { + async _selectWhile(range, predicate, direction) { const cursor = this._target.openCursor(range, direction); const results = []; - return iterateCursor(cursor, (value) => { + await iterateCursor(cursor, (value) => { results.push(value); return predicate(results); }); + return results; } async _find(range, predicate, direction) { diff --git a/src/storage/idb/storage.js b/src/storage/idb/storage.js index fe13a5ab..8f411317 100644 --- a/src/storage/idb/storage.js +++ b/src/storage/idb/storage.js @@ -13,9 +13,9 @@ export default class Storage { } _validateStoreNames(storeNames) { - const unknownStoreName = storeNames.find(name => !STORE_NAMES.includes(name)); - if (unknownStoreName) { - throw new Error(`Tried to open a transaction for unknown store ${unknownStoreName}`); + const idx = storeNames.findIndex(name => !STORE_NAMES.includes(name)); + if (idx !== -1) { + throw new Error(`Tried to open a transaction for unknown store ${storeNames[idx]}`); } } diff --git a/src/storage/idb/store.js b/src/storage/idb/store.js index 0d2e2385..9b5daf46 100644 --- a/src/storage/idb/store.js +++ b/src/storage/idb/store.js @@ -2,23 +2,23 @@ import QueryTarget from "./query-target.js"; import { reqAsPromise } from "./utils.js"; export default class Store extends QueryTarget { - constructor(store) { - super(store); + constructor(idbStore) { + super(idbStore); } - get _store() { + get _idbStore() { return this._target; } index(indexName) { - return new QueryTarget(this._store.index(indexName)); + return new QueryTarget(this._idbStore.index(indexName)); } put(value) { - return reqAsPromise(this._store.put(value)); + return reqAsPromise(this._idbStore.put(value)); } add(value) { - return reqAsPromise(this._store.add(value)); + return reqAsPromise(this._idbStore.add(value)); } } \ No newline at end of file diff --git a/src/storage/idb/stores/RoomStateStore.js b/src/storage/idb/stores/RoomStateStore.js new file mode 100644 index 00000000..0a6ca949 --- /dev/null +++ b/src/storage/idb/stores/RoomStateStore.js @@ -0,0 +1,17 @@ +export default class RoomStateStore { + constructor(idbStore) { + this._roomStateStore = idbStore; + } + + async getEvents(type) { + + } + + async getEventsForKey(type, stateKey) { + + } + + async setStateEvent(roomId, event) { + return this._roomStateStore.put({roomId, event}); + } +} \ No newline at end of file diff --git a/src/storage/idb/stores/summary.js b/src/storage/idb/stores/RoomSummaryStore.js similarity index 76% rename from src/storage/idb/stores/summary.js rename to src/storage/idb/stores/RoomSummaryStore.js index e0487522..5d4c99ec 100644 --- a/src/storage/idb/stores/summary.js +++ b/src/storage/idb/stores/RoomSummaryStore.js @@ -8,6 +8,8 @@ store contains: isEncrypted isDirectMessage membership + inviteCount + joinCount */ export default class RoomSummaryStore { constructor(summaryStore) { @@ -17,4 +19,8 @@ export default class RoomSummaryStore { getAll() { return this._summaryStore.selectAll(); } + + set(summary) { + return this._summaryStore.put(summary); + } } diff --git a/src/storage/idb/stores/timeline.js b/src/storage/idb/stores/RoomTimelineStore.js similarity index 68% rename from src/storage/idb/stores/timeline.js rename to src/storage/idb/stores/RoomTimelineStore.js index 29b6e09e..df729277 100644 --- a/src/storage/idb/stores/timeline.js +++ b/src/storage/idb/stores/RoomTimelineStore.js @@ -1,25 +1,25 @@ import SortKey from "../../sortkey.js"; -class TimelineStore { +export default class RoomTimelineStore { constructor(timelineStore) { this._timelineStore = timelineStore; } async lastEvents(roomId, amount) { - return this.eventsBefore(roomId, GapSortKey.maxKey()); + return this.eventsBefore(roomId, SortKey.maxKey, amount); } async firstEvents(roomId, amount) { - return this.eventsAfter(roomId, GapSortKey.minKey()); + return this.eventsAfter(roomId, SortKey.minKey, amount); } eventsAfter(roomId, sortKey, amount) { - const range = IDBKeyRange.lowerBound([roomId, sortKey], true); + const range = IDBKeyRange.lowerBound([roomId, sortKey.buffer], true); return this._timelineStore.selectLimit(range, amount); } async eventsBefore(roomId, sortKey, amount) { - const range = IDBKeyRange.upperBound([roomId, sortKey], true); + const range = IDBKeyRange.upperBound([roomId, sortKey.buffer], true); const events = await this._timelineStore.selectLimitReverse(range, amount); events.reverse(); // because we fetched them backwards return events; @@ -36,7 +36,7 @@ class TimelineStore { appendGap(roomId, sortKey, gap) { this._timelineStore.add({ roomId: roomId, - sortKey: sortKey, + sortKey: sortKey.buffer, content: { event: null, gap: gap, @@ -45,9 +45,10 @@ class TimelineStore { } appendEvent(roomId, sortKey, event) { + console.info(`appending event for room ${roomId} with key ${sortKey}`); this._timelineStore.add({ roomId: roomId, - sortKey: sortKey, + sortKey: sortKey.buffer, content: { event: event, gap: null, @@ -56,6 +57,6 @@ class TimelineStore { } async removeEvent(roomId, sortKey) { - this._timelineStore.delete([roomId, sortKey]); + this._timelineStore.delete([roomId, sortKey.buffer]); } } diff --git a/src/storage/idb/stores/session.js b/src/storage/idb/stores/SessionStore.js similarity index 100% rename from src/storage/idb/stores/session.js rename to src/storage/idb/stores/SessionStore.js diff --git a/src/storage/idb/stores/room.js b/src/storage/idb/stores/room.js deleted file mode 100644 index fe5df86b..00000000 --- a/src/storage/idb/stores/room.js +++ /dev/null @@ -1,27 +0,0 @@ -class RoomStore { - - constructor(summary, db, syncTxn) { - this._summary = summary; - } - - getSummary() { - return Promise.resolve(this._summary); - } - - async setSummary(summary) { - this._summary = summary; - //... - } - - get timelineStore() { - - } - - get memberStore() { - - } - - get stateStore() { - - } -} diff --git a/src/storage/idb/stores/state.js b/src/storage/idb/stores/state.js deleted file mode 100644 index f303dd8d..00000000 --- a/src/storage/idb/stores/state.js +++ /dev/null @@ -1,17 +0,0 @@ -class StateStore { - constructor(db) { - - } - - async getEvents(type) { - - } - - async getEventsForKey(type, stateKey) { - - } - - async setState(events) { - - } -} \ No newline at end of file diff --git a/src/storage/idb/transaction.js b/src/storage/idb/transaction.js index 872fcc9c..78883964 100644 --- a/src/storage/idb/transaction.js +++ b/src/storage/idb/transaction.js @@ -1,7 +1,9 @@ import {txnAsPromise} from "./utils.js"; import Store from "./store.js"; -// import TimelineStore from "./stores/timeline.js"; -import SessionStore from "./stores/session.js"; +import SessionStore from "./stores/SessionStore.js"; +import RoomSummaryStore from "./stores/RoomSummaryStore.js"; +import RoomTimelineStore from "./stores/RoomTimelineStore.js"; +import RoomStateStore from "./stores/RoomStateStore.js"; export default class Transaction { constructor(txn, allowedStoreNames) { @@ -31,14 +33,22 @@ export default class Transaction { return this._stores[name]; } - // get roomTimeline() { - // return this._store("roomTimeline", idbStore => new TimelineStore(idbStore)); - // } - get session() { return this._store("session", idbStore => new SessionStore(idbStore)); } + get roomSummary() { + return this._store("roomSummary", idbStore => new RoomSummaryStore(idbStore)); + } + + get roomTimeline() { + return this._store("roomTimeline", idbStore => new RoomTimelineStore(idbStore)); + } + + get roomState() { + return this._store("roomState", idbStore => new RoomStateStore(idbStore)); + } + complete() { return txnAsPromise(this._txn); } diff --git a/src/storage/sortkey.js b/src/storage/sortkey.js index 4a576fb8..58cd3e78 100644 --- a/src/storage/sortkey.js +++ b/src/storage/sortkey.js @@ -1,6 +1,13 @@ -class GapSortKey { - constructor() { - this._keys = new Int32Array(2); +const MIN_INT32 = -2147483648; +const MAX_INT32 = 2147483647; + +export default class SortKey { + constructor(buffer) { + if (buffer) { + this._keys = new Int32Array(buffer, 2); + } else { + this._keys = new Int32Array(2); + } } get gapKey() { @@ -19,49 +26,99 @@ class GapSortKey { this._keys[1] = value; } - buffer() { + get buffer() { return this._keys.buffer; } nextKeyWithGap() { - const k = new Key(); + const k = new SortKey(); k.gapKey = this.gapKey + 1; k.eventKey = 0; return k; } nextKey() { - const k = new Key(); + const k = new SortKey(); k.gapKey = this.gapKey; k.eventKey = this.eventKey + 1; return k; } previousKey() { - const k = new Key(); + const k = new SortKey(); k.gapKey = this.gapKey; k.eventKey = this.eventKey - 1; return k; } clone() { - const k = new Key(); + const k = new SortKey(); k.gapKey = this.gapKey; k.eventKey = this.eventKey; return k; } static get maxKey() { - const maxKey = new GapSortKey(); - maxKey.gapKey = Number.MAX_SAFE_INTEGER; - maxKey.eventKey = Number.MAX_SAFE_INTEGER; + const maxKey = new SortKey(); + maxKey.gapKey = MAX_INT32; + maxKey.eventKey = MAX_INT32; return maxKey; } static get minKey() { - const minKey = new GapSortKey(); - minKey.gapKey = 0; - minKey.eventKey = 0; + const minKey = new SortKey(); + minKey.gapKey = MIN_INT32; + minKey.eventKey = MIN_INT32; return minKey; } + + toString() { + return `[${this.gapKey}/${this.eventKey}]`; + } } + +//#ifdef TESTS +export function tests() { + return { + test_default_key(assert) { + const k = new SortKey(); + assert.equal(k.gapKey, 0); + assert.equal(k.eventKey, 0); + }, + + test_inc(assert) { + const a = new SortKey(); + const b = a.nextKey(); + assert.equal(a.gapKey, b.gapKey); + assert.equal(a.eventKey + 1, b.eventKey); + const c = b.previousKey(); + assert.equal(b.gapKey, c.gapKey); + assert.equal(c.eventKey + 1, b.eventKey); + assert.equal(a.eventKey, c.eventKey); + }, + + test_min_key(assert) { + const minKey = SortKey.minKey; + const k = new SortKey(); + assert(minKey.gapKey < k.gapKey); + assert(minKey.eventKey < k.eventKey); + }, + + test_max_key(assert) { + const maxKey = SortKey.maxKey; + const k = new SortKey(); + assert(maxKey.gapKey > k.gapKey); + assert(maxKey.eventKey > k.eventKey); + }, + + test_immutable(assert) { + const a = new SortKey(); + const gapKey = a.gapKey; + const eventKey = a.gapKey; + a.nextKeyWithGap(); + assert.equal(a.gapKey, gapKey); + assert.equal(a.eventKey, eventKey); + } + }; +} +//#endif \ No newline at end of file diff --git a/src/sync.js b/src/sync.js index 5b037a52..b37b0f1b 100644 --- a/src/sync.js +++ b/src/sync.js @@ -1,21 +1,32 @@ -import {RequestAbortError} from "./hs-api.js"; -import {HomeServerError, StorageError} from "./error.js"; +import { + RequestAbortError, + HomeServerError, + StorageError +} from "./error.js"; +import EventEmitter from "./event-emitter.js"; -const INCREMENTAL_TIMEOUT = 30; +const INCREMENTAL_TIMEOUT = 30000; +const SYNC_EVENT_LIMIT = 10; -function parseRooms(responseSections, roomMapper) { - return ["join", "invite", "leave"].map(membership => { - const membershipSection = responseSections[membership]; - const results = Object.entries(membershipSection).map(([roomId, roomResponse]) => { - const room = roomMapper(roomId, membership); - return room.processInitialSync(roomResponse); - }); - return results; - }).reduce((allResults, sectionResults) => allResults.concat(sectionResults), []); +function parseRooms(roomsSection, roomCallback) { + if (!roomsSection) { + return; + } + const allMemberships = ["join", "invite", "leave"]; + for(const membership of allMemberships) { + const membershipSection = roomsSection[membership]; + if (membershipSection) { + const rooms = Object.entries(membershipSection) + for (const [roomId, roomResponse] of rooms) { + roomCallback(roomId, roomResponse, membership); + } + } + } } -export class Sync { +export default class Sync extends EventEmitter { constructor(hsApi, session, storage) { + super(); this._hsApi = hsApi; this._session = session; this._storage = storage; @@ -28,9 +39,10 @@ export class Sync { return; } this._isSyncing = true; - let syncToken = session.syncToken; + let syncToken = this._session.syncToken; // do initial sync if needed if (!syncToken) { + // need to create limit filter here syncToken = await this._syncRequest(); } this._syncLoop(syncToken); @@ -40,43 +52,53 @@ export class Sync { // if syncToken is falsy, it will first do an initial sync ... while(this._isSyncing) { try { - syncToken = await this._syncRequest(INCREMENTAL_TIMEOUT, syncToken); + console.log(`starting sync request with since ${syncToken} ...`); + syncToken = await this._syncRequest(syncToken, INCREMENTAL_TIMEOUT); } catch (err) { + console.warn("stopping sync because of error"); + this._isSyncing = false; this.emit("error", err); } } + this.emit("stopped"); } - async _syncRequest(timeout, syncToken) { - this._currentRequest = this._hsApi.sync(timeout, syncToken); - const response = await this._currentRequest.response; + async _syncRequest(syncToken, timeout) { + this._currentRequest = this._hsApi.sync(syncToken, undefined, timeout); + const response = await this._currentRequest.response(); syncToken = response.next_batch; const storeNames = this._storage.storeNames; - const syncTxn = this._storage.startReadWriteTxn([ - storeNames.timeline, + const syncTxn = this._storage.readWriteTxn([ storeNames.session, - storeNames.state + storeNames.roomSummary, + storeNames.roomTimeline, + storeNames.roomState, ]); try { - session.applySync(syncToken, response.account_data, syncTxn); + this._session.applySync(syncToken, response.account_data, syncTxn); // to_device // presence - parseRooms(response.rooms, async (roomId, roomResponse, membership) => { - let room = session.getRoom(roomId); - if (!room) { - room = session.createRoom(roomId); - } - room.applySync(roomResponse, membership, syncTxn); - }); + if (response.rooms) { + parseRooms(response.rooms, (roomId, roomResponse, membership) => { + let room = this._session.getRoom(roomId); + if (!room) { + room = this._session.createRoom(roomId); + } + console.log(` * applying sync response to room ${roomId} ...`); + room.applySync(roomResponse, membership, syncTxn); + }); + } } catch(err) { + console.warn("aborting syncTxn because of error"); // avoid corrupting state by only // storing the sync up till the point // the exception occurred - txn.abort(); + syncTxn.abort(); throw err; } try { - await txn.complete(); + await syncTxn.complete(); + console.info("syncTxn committed!!"); } catch (err) { throw new StorageError("unable to commit sync tranaction", err); }